广告
返回顶部
首页 > 资讯 > 后端开发 > 其他教程 >详解RedisStream做消息队列
  • 358
分享到

详解RedisStream做消息队列

2024-04-02 19:04:59 358人浏览 八月长安
摘要

目录ListPub/subStreamConsumer Grouplast_delivered_idpending_idscurdpending_ids如何避免消息丢失嵌入

List

众所周知redis数据结构中的list的lpush与rpop可以用于常规消息队列,从集合的最左端写入,最右端弹出消费。并且支持多个生产者与多个消费者并发拿数据,数据只能由一个消费者拿到。

但这个方案并不能保证消费者消费消息后是否成功处理的问题(服务挂掉或处理异常等),机制属于点对点模式不能做广播模式(发布/订阅模式)

Pub/sub

于是redis提供了相应的发布订阅功能,为了解除点对点的强绑定模式引入了Channel管道

当生产者向管道中发布消息,订阅了该管道的消费者能够同时接收到该消息,而且为了简化订阅多个管道需要显式关注多个名称提供了pattern能力。

通过名称匹配如果接收消息的频道wmyskxz.chat,consumer3也会收到消息。

但这个方案也有很大的诟病就是不会持久化,如果服务挂掉重启数据就全丢弃了,也没有提供ack机制,不保证数据可靠性,不管有没有消费成功发后既忘。

Stream

stream的话结构很像kafka的设计思想,提供了consumer group和offset机制,结构上感觉跟kafka的topic差不多,只是没有对应partation副本机制,而是一个追加消息的链表结构。客户端调用XADD时候自动创建stream。每个消息都会持久化并存在唯一的id标识

Consumer Group

消费者组的概念跟kafka的消费者概念如出一辙,消费者既可以用XREAD命令进行独立消费,也可以多个消费者同时加入一个消费者组。一条消息只能由一个消费者组中的一个消费者消费。这样可以在分布式系统中保证消息的唯一性。

其实这个特性我后来仔细琢磨了一下当时自认为无懈可击的流式图表为了保证分布式系统消息唯一做了redis分布式。有点鸡肋,明明消费者组已经保证了数据的唯一性。只能说加锁可以压缩资源成本

last_delivered_id

用于标识消费者组消费在stream上消费位置的游标,每个消费者组都有一个stream内唯一的名称,消费者组不会自动创建,需要用XGROUP CREATE显式创建。

pending_ids

每个消费者内部都有一个状态变量。用来表示已经被客户端消费但没有ack的消费。目的是为了保证客户端至少消费了消息一次(atleastonce)。如果消费者收到了消息处理完了但是没有回复ack,就会导致列表不断增长,如果有很多消费组的话,那么这个列表占用的内存就会放大

curd

  • xadd 追加消息
  • xdel 删除消息,这里的删除仅仅是设置了标志位,不影响消息总长度
  • xrange 获取消息列表,会自动过滤已经删除的消息
  • xlen 消息长度
  • del 删除Stream

pending_ids如何避免消息丢失

在客户端消费者读取Stream消息时,Redis服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。

但是pending_ids里已经保存了发出去的消息ID。待客户端重新连上之后,可以再次收到pending_ids中的消息ID列表。

不过此时xreadgroup的起始消息必须是任意有效的消息ID,一般将参数设为0-0,表示读取所有的pending_ids消息以及自last_delivered_id之后的新消息。

嵌入springBoot

redis stream虽然还是有一些弊端,但是相比较而言用kafka之类的消息组件太重,redis用作消息队列已经很合适了。

这里简单提一下思路,本质上是提供一个管理消息的一个小功能,定义一个注解用于创建stream管道

创建一个注解类,标注该注解的类必须继承StreamListener<String, ObjectRecord<String, Object>>类且重写onMessage方法。方法上也加这个注解

创建一个config类实现BeanPostProcessor接口,重写bean声明周期postProcessAfterInitializationpostProcessBeforeInitialization方法。该方法会在spring启动流程里的refresh方法加载bean的声明周期中扫描到所有加了注解的bean。

通过线程池挨个创建stream的group组与stream的consumer监听连接,config类记得继承DisposableBean类在destroy方法里把连接关掉免得oom。

注册redis stream api提供的consumer容器

这里一定注意pollTimeout参数,看名字就知道默认拉取数据时间间隔,这个参数如果写的值很小或者写0,你就看你cpu高不高就完了。

@Bean("listenerContainer")
@DependsOn(value = "redisConnectionFactory")
public StreamMessageListenerContainer<String, ObjectRecord<String, Object>> init() {
   StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>>
   options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
         .batchSize(10)
         .serializer(new StringRedisSerializer())
         .executor(new ForkJoinPool())
         .pollTimeout(Duration.ofSeconds(3))
         .targetType(Object.class)
         .build();
   return StreamMessageListenerContainer.create(redisConnectionFactory, options);
}

创建消费者

private Subscription createSubscription(RedisConnectionFactory factory, StreamListener streamListener, String streamKey, String group, String consumerName) {
   StreamOperations<String, String, Object> streamOperations = this.stringRedisTemplate.opsForStream();

   if (stringRedisTemplate.hasKey(streamKey)) {
      StreamInfo.XInfoGroups groups = streamOperations.groups(streamKey);

      AtomicReference<Boolean> groupHasKey = new AtomicReference<>(false);

      groups.forEach(groupInfo -> {
         if (Objects.equals(group, groupInfo.getRaw().get("name"))) {
            groupHasKey.set(true);
         }
      });

      if (groups.isEmpty() || !groupHasKey.get()) {
         creatGroup(streamKey, group);
      } else {
         groups.stream().forEach(g -> {
            log.info("XInfoGroups:{}", g);
            StreamInfo.XInfoConsumers consumers = streamOperations.consumers(streamKey, g.groupName());
            log.info("XInfoConsumers:{}", consumers);
         });
      }
   } else {
      creatGroup(streamKey, group);
   }
   StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
   Consumer consumer = Consumer.from(group, consumerName);

   Subscription subscription = listenerContainer.receive(consumer, streamOffset, streamListener);
   listenerContainer.start();
   this.containerList.add(listenerContainer);
   return subscription;
}

到此这篇关于详解Redis Stream做消息队列的文章就介绍到这了,更多相关Redis Stream内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: 详解RedisStream做消息队列

本文链接: https://www.lsjlt.com/news/168247.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 详解RedisStream做消息队列
    目录ListPub/subStreamConsumer Grouplast_delivered_idpending_idscurdpending_ids如何避免消息丢失嵌入...
    99+
    2022-11-13
  • redisstream实现消息队列的实践
    目录redis 实现消息对列4中方法发布订阅list 队列zset 队列Stream 队列基本命令xadd 生产消息读取消息xgroup 消费者组xreadgroup 消费消息Pen...
    99+
    2022-11-13
    redis stream 消息队列 redis 消息队列
  • 详解Redis Stream做消息队列
    目录ListPub/subStreamConsumer Grouplast_delivered_idpending_idscurdpending_ids如何避免消息丢失嵌入SpringBoot注册Redis s...
    99+
    2022-09-23
  • FreeRTOS-消息队列详解
    ✅作者简介:嵌入式入坑者,与大家一起加油,希望文章能够帮助各位!!!! 📃个人主页:@rivencode的个人主页 🔥系列专栏:玩转FreeRTOS Ὂ...
    99+
    2023-09-29
    java 网络 开发语言
  • MSMQ微软消息队列详解
    一、引言 Windows Communication Foundation(WCF)是Microsoft为构建面向服务的应用程序而提供的统一编程模型,该服务模型提供了支持松散耦合和版...
    99+
    2022-11-13
  • SpringBoot整合rockerMQ消息队列详解
    目录Springboot整合RockerMQ使用总结消费模式生产者组和消费者组生产者投递消息的三种方式如何保证消息不丢失顺序消息分布式事务Springboot整合RockerMQ 1...
    99+
    2022-11-13
  • thinkphp5.0消息队列topthink/think-queue详解
    第一、安装topthink/think-queue composer require topthink/think-queue=1.1.6 第二、配置queue.php信息 找到应用目录下面的extra/queue.php进行配置,队列驱...
    99+
    2023-10-07
    php 开发语言
  • 如何利用redis做消息队列
    利用redis做消息队列的示例:生产者模拟程序,代码:package scheduleTest; import java.util.Random; import java.util.UUID; import redis.clients.je...
    99+
    2022-10-14
  • AndroidMessageQueue消息队列主要作用详解
    目录定义模型关系内部属性行为约束使用建议消息队列相关概念消息队列的消费场景消息至多被消费一次消息至少被消费一次消息仅被消费一次实践Hello World消息队列好处或功能定义 队列是...
    99+
    2023-02-16
    Android MessageQueue Android MessageQueue消息队列 Android消息队列
  • 消息队列 Kafka
    Kafka Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域 为什么使用消息队列MQ 在高并发环境下,同步请求来不及处理会发生堵塞,从而触发too many conn...
    99+
    2023-10-23
    kafka 分布式
  • Python消息队列
    消息中间件 --->就是消息队列异步方式:不需要立马得到结果,需要排队同步方式:需要实时获得数据,坚决不能排队例子:#多进程模块multiprocessingfrom multiprocessing import Processfro...
    99+
    2023-01-31
    队列 消息 Python
  • RabbitMQ消息队列
      一、简介   RabbitMQ是一个在AMQP基础上完整的、可复用的企业消息系统,遵循Mozilla Public License开源协议。MQ全称Message Queue(消息队列),它是一种应用程序对应用程序的通信方式。应用程序...
    99+
    2023-01-31
    队列 消息 RabbitMQ
  • RabbitMQ 消息队列
    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写...
    99+
    2023-01-31
    队列 消息 RabbitMQ
  • redis能不能用来做消息队列
    这篇文章主要介绍了redis能不能用来做消息队列,具有一定借鉴价值,需要的朋友可以参考下。希望大家阅读完这篇文章后大有收获。下面让小编带着大家一起了解一下。应用场景:例如秒杀。瞬时大量写入订单到数据库,导致...
    99+
    2022-10-18
  • redis做消息队列有什么缺点
    redis做消息队列的缺点:基于List的LPUSH+BRPOP的消息队列缺点:做消费确认ACK比较麻烦。不能做广播模式,例如典型的Pub/Discribe模式。不能重复消费,一旦消费就会被删除。不支持分组消费。...
    99+
    2022-10-09
  • redis中怎么用list做消息队列
    本文小编为大家详细介绍“redis中怎么用list做消息队列”,内容详细,步骤清晰,细节处理妥当,希望这篇“redis中怎么用list做消息队列”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。leftPush消息入...
    99+
    2023-06-29
  • 分布式消息队列RocketMQ概念详解
    目录1.MQ概述1.1 RocketMQ简介1.2 MQ用途1.3 常见MQ产品2.RocketMQ 基本概念2.1 消息2.2 主题2.3 标签2.4 队列2.5 Producer...
    99+
    2023-05-19
    分布式消息队列RocketMQ概念 消息队列RocketMQ RocketMQ概念
  • 详解Redis用链表实现消息队列
    前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样...
    99+
    2022-06-04
    队列 详解 链表
  • Java RabbitMQ消息队列详解常见问题
    目录消息堆积保证消息不丢失死信队列延迟队列RabbitMQ消息幂等问题RabbitMQ消息自动重试机制合理的选择重试机制消费者开启手动ack模式rabbitMQ如何解决消息幂等问题R...
    99+
    2022-11-13
  • Android 消息队列模型详解及实例
    Android 消息队列模型详解及实例Android系统的消息队列和消息循环都是针对具体线程的,一个线程可以存在(当然也可以不存在)一个消息队列(Message Queue)和一个消息循环(Looper)。Android中除了UI线程(主线...
    99+
    2023-05-31
    android 消息队列 roi
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作