iis服务器助手广告广告
返回顶部
首页 > 资讯 > 数据库 >redis stream 实现消息队列的实践
  • 318
分享到

redis stream 实现消息队列的实践

redisstream消息队列redis消息队列 2022-08-10 20:08:24 318人浏览 安东尼
摘要

目录Redis 实现消息对列4中方法发布订阅list 队列zset 队列Stream 队列基本命令xadd 生产消息读取消息xgroup 消费者组xreadgroup 消费消息Pending 等待列表消息确认消息转移信息

Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列MQ,Message Queue)的完善实现。

基于redis实现消息队列的方式有很多:

  • PUB/SUB,订阅/发布模式
  • 基于List的 LPUSH+BRPOP 的实现

redis 实现消息对列4中方法

发布订阅

发布订阅优点: 典型的一对的,所有消费者都能同时消费到消息。主动通知订阅者而不是订阅者轮询去读。

发布订阅缺点: 不支持多个消费者公平消费消息,消息没有持久化,不管订阅者是否收到消息,消息都会丢失。

使用场景:微服务间的消息同步,如 分布式WEBSocker,数据同步等。

list 队列

生产者通过lpush生成消息,消费者通过blpop阻塞读取消息。

**list队列优点:**支持多个消费者公平消费消息,对消息进行存储,可以通过lrange查询队列内的消息。

**list队列缺点:**blpop仍然会阻塞当前连接,导致连接不可用。一旦blpop成功消息就丢弃了,期间如果服务器宕机消息会丢失,不支持一对多消费者。

zset 队列

生产者通过zadd 创建消息时指定分数,可以确定消息的顺序,消费者通过zrange获取消息后进行消费,消费完后通zrem删除消息。

zset优点: 保证了消息的顺序,消费者消费失败后重新入队不会打乱消费顺序。

zset缺点: 不支持一对多消费,多个消费者消费时可能出现读取同一条消息的情况,得通过加或其他方式解决消费的幂等性。

zset使用场景:由于数据是有序的,常常被用于延迟队列,如 redisson的DelayQueue

Stream 队列

Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。

参考kafka的思想,通过多个消费者组和消费者支持一对多消费,公平消费,消费者内维护了pending列表防止消息丢失。

提供消息ack机制。

redis stream 实现消息队列的实践

基本命令

xadd 生产消息

往 stream 内创建消息 语法为:

XADD key ID field string [field string …]

# * 表示自动生成id redis会根据时间戳+序列号自动生成id,不建议我们自己指定id
xadd stream1 * name zs age 23  

读取消息

读取stream内的消息,这个并不是消费,只是提供了查看数据的功能,语法为:

XREAD [COUNT count] [block milliseconds] STREAMS key [key …] ID [ID …]

#表示从 stream1 内取出一条消息,从第0条消息读取(0表示最小的id)
xread count 1 streams stream1 0
#表示从 stream1 内 id=1649143363972-0 开始读取一条消息,读取的是指定id的下一条消息
xread count 1 streams msg 1649143363972-0

#表示一直阻塞读取最新的消息($表示获取下一个生成的消息)
xread count 1 block 0 streams stream1 $ 

xrange stream - + 10

XRANGE key startID endID count

#表示从stream1内取10条消息 起始位置为 -(最小ID) 结束位置为+(最大ID)
xrange stream1 - + 10 

xgroup 消费者组

redis stream 借鉴了kafka的设计,采用了消费者和消费者组的概念。允许多个消费者组消费stream的消息,每个消费者组都能收到完整的消息,例如:stream内有10条消息,消费者组A和消费者组B同时消费时,都能获取到这10条消息。

每个消费者组内可以有多个消费者消费,消息会平均分摊给各个消费者,例如:stream有10条消息,消费者A,B,C同时在同一个组内消费,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9

创建消费者组:

#消费消息首先得创建消费者组
# 表示为队列 stream1 创建一个消费者组 group1 从消息id=0(第一条消息)开始读取消息
xgroup create stream1 group1 0

#查询stream1内的所有消费者组信息
xinfo groups stream1

xreadgroup 消费消息

通过xreadgroup可以在消费者组内创建消费者消费消息

XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

#创建消费者读取消息
#在group1消费者组内通过consumer1消费stream1内的消息,消费1条未分配的消息 (> 表示未分配过消费者的消息)
xreadgrup group group1 consumer1 count 1 streams stream1 > 

Pending 等待列表

通过 xreadgroup 读取消息时消息会分配给对应的消费者,每个消费者内都维护了一个Pending列表用于保存接收到的消息,当消息ack后会从pending列表内移除,也就是说pending列表内维护的是所有未ack的消息id

每个Pending的消息有4个属性:

  • 消息ID
  • 所属消费者
  • IDLE,已读取时长
  • delivery counter,消息被读取次数

XPENDING key group [start end count] [consumer]

#查看pending列表
# 查看group1组内的consumer1的pending列表 - 表示最小的消息id + 表示最大的消息ID
xpending stream1 group1 - + 10 consumer1
# 查看group1组内的所有消费者pending类表
xpending stream1 group1 - + 10 

消息确认

当消费者消费了消息,需要通过 xack 命令确认消息,xack后的消息会从pending列表移除

XACK key gruopName ID

xack stream1 group1 xxx

消息转移

当消费者接收到消息却不能正确消费时(报错或其他原因),可以使用 XCLaiM 将消息转移给其他消费者消费,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。

通过xclaim转移的消息只是将消息移入另一个消费者的pending列表,消费者并不能通过xreadgroup读取到消息,只能通过xpending读取到。

# 表示将ID为 1553585533795-1 的消息转移到消费者B消费,前提是消费
XCLAIM stream1 group1 consumer1 3600000 1553585533795-1

信息监控

redis提供了xinfo来查看stream的信息

#查看sream信息
xinfo stream steam1
#查询消费者组信息
xinfo groups group1 

#查询消费者信息
xinfo consumers consumer1

springBoot 整合

1 引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2 编写消费者

@Slf4j
@Component
public class EmailConsumer implements StreamListener<String, MapRecord<String,String,String>> {

    public final String streamName      = "emailStream";
    public final String groupName       = "emailGroup";
    public final String consumerName    = "emailConsumer";


    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void onMessage(MapRecord<String, String, String> message) {

        //log.info("stream名称-->{}",message.getStream());
        //log.info("消息ID-->{}",message.getId());
        log.info("消息内容-->{}",message.getValue());

        Map<String, String> msgMap = message.getValue();

        if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){
            //消费异常导致未能ack时,消息会进入pending列表,我们可以启动定时任务来读取pending列表处理失败的任务
            log.info("消费异常-->"+message);
           return;
        }

        StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream();
        //消息应答
        streamOperations.acknowledge( streamName,groupName,message.getId() );

    }
	//我们可以启动定时任务不断监听pending列表,处理死信消息
}

3 配置redis

序列化配置

@EnableCaching
@Configuration
public class RedisConfig {


    
    @Bean
    public Jackson2JSONRedisSerializer<Object> jackson2jsonRedisSerializer(){
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        return jackson2JsonRedisSerializer;
    }

    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory,
                                                       Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) {

        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();

        // key序列化
        redisTemplate.seTKEySerializer(stringSerializer);
        // value序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);

        // Hash key序列化
        redisTemplate.setHashKeySerializer(stringSerializer);
        // Hash value序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}

消费者组和消费者配置

@Slf4j
@Configuration
public class RedisStreamConfig {

    @Autowired
    private EmailConsumer emailConsumer;

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Bean
    public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                //block读取超时时间
                .pollTimeout(Duration.ofSeconds(3))
                //count 数量(一次只获取一条消息)
                .BATchSize(1)
                //序列化规则
                .serializer( stringRedisSerializer )
                .build();
    }

    
    @Bean
    public StreamMessageListenerContainer<String,MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory,
                                                                 StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){

        StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory,
                streamMessageListenerContainerOptions);

        //如果 流不存在 创建 stream 流
        if( !redisTemplate.hasKey(emailConsumer.streamName)){
            redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", ""));
            log.info("初始化stream {} success",emailConsumer.streamName);
        }

        //创建消费者组
        try {
            redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName);
        } catch (Exception e) {
            log.info("消费者组 {} 已存在",emailConsumer.groupName);
        }

        //注册消费者 消费者名称,从哪条消息开始消费,消费者类
        // > 表示没消费过的消息
        // $ 表示最新的消息
        listenerContainer.receive(
            Consumer.from(emailConsumer.groupName, emailConsumer.consumerName),
            StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()),
            emailConsumer
        );


        listenerContainer.start();
        return listenerContainer;
    }

}

4.生产者生产消息

@GetMapping("/redis/ps")
public String redisPublish(String content,Integer count){

    StreamOperations streamOperations = redisTemplate.opsForStream();

    for (int i = 0; i < count; i++) {
        AtomicInteger num = new AtomicInteger(i);

        Map msgMap = new HashMap();
        msgMap.put("count", i);
        msgMap.put("sID", num);
        //新增消息
        streamOperations.add("emailStream",msgMap);
    }
    return "success";
}

参考文档:

redis Stream 消息队列

SpringBoot整合redis stream 实现消息队列

到此这篇关于redis stream 实现消息队列的实践的文章就介绍到这了,更多相关redis stream 消息队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

您可能感兴趣的文档:

--结束END--

本文标题: redis stream 实现消息队列的实践

本文链接: https://www.lsjlt.com/news/33639.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • redis stream 实现消息队列的实践
    目录Redis 实现消息对列4中方法发布订阅list 队列zset 队列Stream 队列基本命令xadd 生产消息读取消息xgroup 消费者组xreadgroup 消费消息Pending 等待列表消息确认消息转移信息...
    99+
    2022-08-10
    redisstream消息队列 redis消息队列
  • redisstream实现消息队列的实践
    目录redis 实现消息对列4中方法发布订阅list 队列zset 队列Stream 队列基本命令xadd 生产消息读取消息xgroup 消费者组xreadgroup 消费消息Pen...
    99+
    2022-11-13
    redis stream 消息队列 redis 消息队列
  • 详解Redis Stream做消息队列
    目录ListPub/subStreamConsumer Grouplast_delivered_idpending_idscurdpending_ids如何避免消息丢失嵌入SpringBoot注册Redis s...
    99+
    2022-09-23
  • Redis中如何实现消息队列和延时消息队列
    这篇文章将为大家详细讲解有关Redis中如何实现消息队列和延时消息队列,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。list的几个命令lpush (left push)由...
    99+
    2022-10-19
  • redis怎么实现消息队列
    Redis可以通过以下几种方式实现消息队列:1. List数据结构:使用Redis的List数据结构实现简单的消息队列。生产者将消息...
    99+
    2023-09-14
    redis
  • redis实现消息队列的方法
    这期内容当中的小编将会给大家带来有关redis实现消息队列的方法,以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。消息队列,Message Queue,常用于解决并发系统中的资源一致性问题...
    99+
    2022-10-18
  • Java实现Redis延时消息队列
    目录什么是延时任务 延时任务的特点 实现思路: 代码实现 1.消息模型2.RedisMq 消息队列实现类3.消息生产者 4.消息消费者 5. 消息执接口 6. 任务类型的实现类:可以...
    99+
    2022-11-12
  • Redis如何实现消息队列功能
    Redis如何实现消息队列功能随着互联网的发展,消息队列在分布式系统中变得越来越重要。消息队列允许不同的应用程序之间通过异步通信来传递和处理消息,提高了系统的可伸缩性和可靠性。Redis作为一款快速、可靠、灵活的内存数据库,也可以用来实现消...
    99+
    2023-11-07
    redis 实现 消息队列
  • Redis消息队列怎么实现秒杀
    要实现秒杀功能,可以使用Redis的消息队列来进行异步处理。下面是一种基本的实现方法:1. 准备工作:创建一个商品库存键值对,如"s...
    99+
    2023-10-11
    Redis
  • 如何使用redis实现消息队列
    使用redis实现消息队列的示例:redis的pubsub功能实现发布订阅模式,代码:import redisclass Task(object):def __init__(self):self.rcon = redis.StrictRed...
    99+
    2022-10-24
  • redis中的消息队列
    这期内容当中的小编将会给大家带来有关redis中的消息队列介绍,以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。一、认识消息队列1.1 消息队列概念“消息”是在两台计算机间传送的数据单位。...
    99+
    2022-11-30
    redis 消息队列 edi
  • golang消息队列实现
    Golang是一种开源的编程语言,它适用于创建高性能的网络应用程序和消息队列等分布式系统。在这篇文章中,我们将探讨如何使用Golang来实现一个消息队列。什么是消息队列?在分布式系统中,应用程序通常需要在不同的节点之间共享数据。消息队列是用...
    99+
    2023-05-15
  • 详解Redis用链表实现消息队列
    前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样...
    99+
    2022-06-04
    队列 详解 链表
  • redis中队列消息实现应用解耦
    本篇文章给大家分享的是有关redis中队列消息实现应用解耦,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。1、如果redis服务器挂掉了怎么办啊...
    99+
    2022-10-18
  • Redis怎么使用ZSET实现消息队列
    这篇文章主要介绍了Redis怎么使用ZSET实现消息队列的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Redis怎么使用ZSET实现消息队列文章都会有所收获,下面我们一起来看看吧。1.redis 用zset做消...
    99+
    2023-07-05
  • redis用list做消息队列的实现示例
    目录生产消息服务消费消息服务,定时任务日志测试leftPush消息入队,rightPop对应,消息出队。 rightPop(RedisConstant.MQ_LIST, 0L, Ti...
    99+
    2022-11-13
  • 怎么在springboot中用redis实现消息队列
    本篇内容主要讲解“怎么在springboot中用redis实现消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么在springboot中用redis实现消息队列”吧!准备阶段安装redi...
    99+
    2023-06-19
  • 详解redis是如何实现队列消息的ack
    前言 由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列。 原生的redis中通过L/R PUSH/POP方式来实现队列的功能,这个当然是没办法满...
    99+
    2022-06-04
    队列 如何实现 详解
  • Redis 使用 List 实现消息队列的优缺点
    目录什么是消息队列消息队列满足哪些特性消息有序性重复消息处理可靠性List 实现消息队列LPUSHRPOP实时消费问题重复消费消息可靠性需要注意的是Redission 实战添加依赖J...
    99+
    2022-11-12
  • SpringBoot2实现MessageQueue消息队列
    目录什么是消息队列一、异步与同步1.1 同步通讯与异步通讯1.2 同步调用的问题1.3 异步调用方案二、MQ消息队列2.1 单机部署MQ2.2 结构和概念2.3 常见的消息模型三、S...
    99+
    2023-05-17
    SpringBoot2 MessageQueue消息队列 SpringBoot MessageQueue
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作