广告
返回顶部
首页 > 资讯 > 后端开发 > Python >SpringCloud Stream使用解析
  • 365
分享到

SpringCloud Stream使用解析

2024-04-02 19:04:59 365人浏览 独家记忆

Python 官方文档:入门教程 => 点击学习

摘要

目录SpringCloudStream下面用RabbitMQ来说明使用!案例之消息驱动之生产者案例之消息驱动消费者测试补充说明springCloudStream 官方定义sprin

springCloudStream

官方定义spring cloud Stream 是一个构建消息驱动微服务框架
应用通过inputs和outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream中的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以及实现消息事件驱动。
目前仅支持RabbitMQkafka

下面用RabbitMQ来说明使用!

案例之消息驱动之生产者

1.建一个项目,并添加如下的依赖:


 <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-WEB</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

2.编写下面的yml文件


server:
 port: 8801

spring:
 application:
  name: cloud-stream-provider
 rabbitmq:
  host: 192.168.43.76
  port: 5672
  username: guest
  passWord: guest
 cloud:
  stream:
   binders: # 在此处配置要绑定的rabbitmq的服务信息;
    defaultRabbit: # 表示定义的名称,用于于binding整合
     type: rabbit # 消息组件类型
#     environment: # 设置rabbitmq的相关的环境配置,(本机方式)
#      spring:
#       rabbitmq:
#        host: localhost
#        port: 5672
#        username: guest
#        password: guest
   bindings: # 服务的整合处理
    output: # 这个名字是一个通道的名称
     destination: studyExchange # 表示要使用的Exchange名称定义
     content-type: application/JSON # 设置消息类型,本次为json,文本则设置“text/plain”
     binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红不要管)

eureka:
 client: # 客户端进行Eureka注册的配置
  service-url:
   defaultZone: Http://localhost:7001/eureka
 instance:
  lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
  lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
  instance-id: send-8801.com # 在信息列表时显示主机名称
  prefer-ip-address: true   # 访问的路径变为IP地址

3.编写service,下面仅展示实现类:


import org.lzl.sprinGCloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;


//注意这里不需要写@Service,因为该service是跟rabbitmq打交道的
@EnableBinding(Source.class)//定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {

  @Resource
  private MessageChannel output;

  @Override
  public String send() {
    String serial = UUID.randomUUID().toString();
    output.send(MessageBuilder.withPayload(serial).build());
    System.out.println("*****serial:"+serial);
    return null;
  }
}

4.编写controller


import org.lzl.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class SendMessageController {
  @Resource
  private IMessageProvider messageProvider;

  @GetMapping(value = "/sendMessage")
  public String sendMessage(){
    return messageProvider.send();
  }
}

案例之消息驱动消费者

1.写pom,加上下面的依赖


	<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

2.编写yml,和生产者唯一的区别在于,下面是input


server:
 port: 8802

spring:
 application:
  name: cloud-stream-consumer
 rabbitmq:
  host: 192.168.43.76
  port: 5672
  username: guest
  password: guest
 cloud:
  stream:
   binders: # 在此处配置要绑定的rabbitmq的服务信息;
    defaultRabbit: # 表示定义的名称,用于于binding整合
     type: rabbit # 消息组件类型
#     environment: # 设置rabbitmq的相关的环境配置
#      spring:
#       rabbitmq:
#        host: localhost
#        port: 5672
#        username: guest
#        password: guest
   bindings: # 服务的整合处理
    input: # 这个名字是一个通道的名称
     destination: studyExchange # 表示要使用的Exchange名称定义
     content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
     binder: defaultRabbit # 设置要绑定的消息服务的具体设置



eureka:
 client: # 客户端进行Eureka注册的配置
  service-url:
   defaultZone: http://localhost:7001/eureka
 instance:
  lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
  lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
  instance-id: receive-8802.com # 在信息列表时显示主机名称
  prefer-ip-address: true   # 访问的路径变为IP地址

3.编写controller,该controller不向外界暴露端口,起到实时监控消息管道的作用!


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
  @Value("${server.port}")
  private String serverPort;

  @StreamListener(Sink.INPUT)//只要8801发送消息,8802就会接收到8801的消息
  public void input(Message<String> message){
    System.out.println("消费者1号--------》接收到的消息:"+message.getPayload()+"\t port: "+serverPort);
  }

}

测试

启动rabbitMQ和上面的两个项目,访问http://localhost:8801/sendMessage
在消费者的控制台中就会出现下面的订单流水号:

在这里插入图片描述

补充说明

我们打开rabbitmq的监控界面:发现默认是帮我们分组的

在这里插入图片描述

想要自定义分组只需要在消费者的yml文件中加上下面的一行:

在这里插入图片描述

如果有多个消费者,为了避免出现重复消费的问题,应将组名设置成一样的。一个组的成员轮循消费,不同组的成员进行相同的消费。

到此这篇关于SpringCloud Stream介绍的文章就介绍到这了,更多相关SpringCloud Stream介绍内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: SpringCloud Stream使用解析

本文链接: https://www.lsjlt.com/news/122636.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • SpringCloud Stream使用解析
    目录SpringCloudStream下面用RabbitMQ来说明使用!案例之消息驱动之生产者案例之消息驱动消费者测试补充说明SpringCloudStream 官方定义Sprin...
    99+
    2022-11-12
  • java springcloud的Stream怎么使用
    本篇内容主要讲解“java springcloud的Stream怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“java springcloud的Stream怎么使用”吧!Spring C...
    99+
    2023-06-05
  • Stream怎么在SpringCloud中使用
    本篇文章为大家展示了Stream怎么在SpringCloud中使用,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。1.建一个项目,并添加如下的依赖: <dependency>&n...
    99+
    2023-06-14
  • 最新SpringCloud Stream消息驱动讲解
    目录SpringCloud Stream消息驱动1、SpringCloud Stream概述1.1、设计思想1.2、标准的流程套路1.3、编码API和常用注解2、消息驱动之生产者(o...
    99+
    2022-11-13
    SpringCloud Stream消息驱动 SpringCloud 消息驱动
  • SpringCloud @FeignClient参数的用法解析
    目录SpringCloud @FeignClient 参数详解@FeignClient 注解常用参数SpringCloud @FeignClient 参数详解 今天因为工作中遇到Fe...
    99+
    2022-11-12
  • 解析Java8 Stream原理
    目录一、前言二、Stream流水线解决方案2.1、操作如何记录2.2、操作如何叠加2.3、叠加之后的操作如何执行一、前言 首先我们先看一个使用Stream API的示例,具体代码如下...
    99+
    2022-11-12
  • 浅析Node.js 中 Stream API 的使用
    本文由浅入深给大家介绍node.js stream api,具体详情请看下文吧。 基本介绍 在 Node.js 中,读取文件的方式有两种,一种是用 fs.readFile ,另外一种是利用 fs.creat...
    99+
    2022-06-04
    js Node API
  • java List的stream()方法解析
    一、简介 常用的4种stream()用法。 (1) list.stream().map().collect() 方法,可以获取list中JavaBean的某个字段,转成一个新的list。 (2) list.stream().filte...
    99+
    2023-09-04
    java list stream 方法 代码 Powered by 金山文档
  • SpringCloud Netflix Ribbon源码解析(推荐)
    目录SpringCloud Netflix Ribbon源码解析配置和实例初始化与OpenFeign 的集成负载均衡器LoadBalancerClientILoadBalancer负...
    99+
    2022-11-11
  • 【Java系列】深入解析Stream API
    序言 你只管努力,其他交给时间,时间会证明一切。 文章标记颜色说明: 黄色:重要标题红色:用来标记结论绿色:用来标记论点蓝色:用来标记论点 希望这篇文章能让你不仅有一定的收获,而且可以愉快的学习,如果有什么建议,都可以留...
    99+
    2023-09-01
    java 算法 python
  • SpringCloud LoadBalancerClient 负载均衡原理解析
    目录深入解析 LoadBalancerClient 接口源码:1、LoadBalancerClient 源码解析:2、ILoadBalancer 源码解析:  &...
    99+
    2022-11-13
  • SpringCloud-Spring Boot Starter使用测试实例分析
    这篇文章主要介绍了SpringCloud-Spring Boot Starter使用测试实例分析的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringCloud-Spring ...
    99+
    2023-07-02
  • SpringCloud之@FeignClient()注解的使用方式
    目录@FeignClient()注解的使用@FeignClient标签的常用属性如下SpringCloud 服务间互相调用 @FeignClient注解我在FEIGN-CONSUME...
    99+
    2022-11-12
  • SpringCloud中Gateway的使用教程详解
    目录1.基础教程2.将配置放在配置文件里3.放在springcloud里面4.使用服务名而不是IP1.基础教程 pom.xml <parent> ...
    99+
    2022-11-13
    SpringCloud Gateway使用 SpringCloud Gateway
  • 深入理解JDK8中Stream使用
    概述 Stream 是 Java8 中处理集合的关键抽象概念,它可以指定你希望对集合进行的操作,可以执行非常复杂的查找、过滤和映射数据等操作。使用Stream API 对集合数据进行...
    99+
    2022-11-12
  • Redis Stream类型的使用详解
    目录一、背景二、redis中Stream类型的特点三、Stream的结构四、Stream的命令1、XADD 往Stream末尾添加消息1、命令格式2、举例2、XRANGE查看Stre...
    99+
    2022-11-12
  • Java流处理stream使用详解
    目录基本流中间操作与终端操作一些常见的终端操作进阶流筛选各异的元素截断跳过元素映射流mapflatMap匹配全匹配与非全匹配OptionalfindAnyfindFirst归约求和归...
    99+
    2022-11-13
  • SpringCloud@RefreshScope注解源码层面深入分析
    目录写在前面正文写在前面 最近在研究Spring Cloud和Spring Cloud Alibaba源码,在看到Nacos的配置中心的时候,有注意到自动刷新配置的玩法,底层实现依靠...
    99+
    2023-05-15
    SpringCloud @RefreshScope SpringCloud @RefreshScope注解
  • SpringCloud Tencent 全套解决方案源码分析
    目录Spring Cloud Tencent 是什么?项目源码地址一、安装北极星二、服务注册与发现三、配置管理四、服务限流五、服务路由六、限流熔断Spring Cloud Tence...
    99+
    2022-11-13
  • SpringCloud Hystrix的使用
    目录简介 服务熔断 实践 服务降级 实践 服务熔断与服务降级的区别 服务监控 Dashboard 建立项目 简介 在分布式系统中,服务与服务之间依赖错综复杂,一种不可避免的情况就是...
    99+
    2022-11-12
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作