iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > JAVA >Redis队列详解(springboot实战)
  • 778
分享到

Redis队列详解(springboot实战)

java数据结构redisspringbootPoweredby金山文档 2023-10-25 21:10:21 778人浏览 八月长安
摘要

前言 MQ应用有很多,比如ActiveMQ,RabbitMQ,kafka等,但是也可以基于Redis来实现,可以降低系统的维护成本和实现复杂度,本篇介绍redis中实现消息队列的几种方案,并通过SpringBoot实战使其更易懂。 1

前言

MQ应用有很多,比如ActiveMQ,RabbitMQ,kafka等,但是也可以基于Redis来实现,可以降低系统的维护成本和实现复杂度,本篇介绍redis中实现消息队列的几种方案,并通过SpringBoot实战使其更易懂。

1. 基于List的 LPUSH+BRPOP 的实现

2. 基于Sorted-Set的实现

3. PUB/SUB,订阅/发布模式

4. 基于Stream类型的实现

1. 基于List的 LPUSH+BRPOP 的实现

描述

使用rpushlpush操作入队列,lpoprpop操作出队列。

List支持多个生产者和消费者并发进出消息,每个消费者拿到都是不同的列表元素。

优点

一旦数据到来则立刻醒过来,消息延迟几乎为零。

缺点

  • 不能重复消费,一旦消费就会被删除

  • 不能做广播模式 , 不支持分组消费

  • lpop和rpop会一直空轮训,消耗资源 ,但可以 引入阻塞读blpop和brpop 同时也有新的问题 如果线程一直阻塞在那里,Redis客户端的连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,减少闲置资源占用,这个时候blpop和brpop或抛出异常

实战

代码

@Slf4j@Servicepublic class ListRedisQueue {    //队列名    public static final String KEY = "listQueue";    @Resource    private RedisTemplate redisTemplate;    public void produce(String message) {        redisTemplate.opsForList().rightPush(KEY, message);    }    public void consume() {        while (true) {            String msg = (String) redisTemplate.opsForList().leftPop(KEY);            log.info("疯狂获取消息:" + msg);        }    }    public void blockinGConsume() {        while (true) {            List obj = redisTemplate.executePipelined(new RedisCallback() {                @Override                public Object doInRedis(RedisConnection connection) throws DataAccessException {                    //队列没有元素会阻塞操作,直到队列获取新的元素或超时,5表示如果没元素就每五秒去拿一次消息                    return connection.bRPop(5, KEY.getBytes());                }            }, new StringRedisSerializer());            for (Object str : obj) {                log.info("blockingConsume获取消息 : {}", str);            }        }    }}  

测试

lPop/rPop消费数据

    @Autowired    private ListRedisQueue listRedisQueue;    @Test    public void produce() {        for (int i = 0; i < 5; i++) {            listRedisQueue.produce("第"+i + "个数据");        }    }    @Test    public void consume() {        produce();        logger.info("生产消息完毕");        listRedisQueue.consume();    }

输出

blpop / brpop 消费数据
    @Test    public void blockingConsume() {        produce();        logger.info("生产消息完毕");        listRedisQueue.blockingConsume();    }

输出

2. 基于Sorted-Set的实现延时队列

描述

其实zset就是sorted set。为了避免sorted set简写sset导致命令冲突,所以改为zset。同理例如class-->clazz

sorted set从字面意思上,很容易就可以理解,是个有序且不可重复的数据集合。类似set和hash的混合体,但是相比于set,zset内部由score进行排序.

优点

可以自定义消息ID,在消息ID有意义时,比较重要。

缺点

缺点也明显,不允许重复消息(因为是集合),同时消息ID确定有错误会导致消息的顺序出错。

实战

代码

@Slf4j@Servicepublic class SortedSetRedisQueue {    //队列名    public static final String KEY = "sortedSet_queue";    @Autowired    private RedisTemplate redisTemplate;    public void produce(String msg, Double score) {        // 创建Sorted Set实例        ZSetOperations zSetOperations = redisTemplate.opsForZSet();        // 添加数据        zSetOperations.add(KEY, msg, score);    }    public void consumer() throws InterruptedException {        // 创建SortedSet实例        ZSetOperations zSetOperations = redisTemplate.opsForZSet();        while (true) {            // 拿取数据 (rangeByScore返回有序集合中指定分数区间的成员列表。有序集成员按分数值递增(从小到大)次序排列)            Set order = zSetOperations.rangeByScore(KEY, 0, System.currentTimeMillis(), 0, 1);            if (ObjectUtils.isEmpty(order)) {                log.info("当前没有数据 当前线程睡眠3秒");                TimeUnit.SECONDS.sleep(3);                // 跳过本次循环 重新循环拿取数据                continue;            }            // 利用迭代器拿取Set中的数据            String massage = order.iterator().next();            // 过河拆迁,拿到就删除消息            if (zSetOperations.remove(KEY, massage) > 0) {                //做些业务处理                log.info("我拿到的消息:" + massage);            }        }    }}

测试

    @Autowired    private SortedSetRedisQueue sortedSetRedisQueue;    @Test    public void sortedSetProduce() throws InterruptedException {        for (int i = 0; i < 5; i++) {            TimeUnit.SECONDS.sleep(1);            // 生成分数            double score = System.currentTimeMillis();            sortedSetRedisQueue.produce("第"+i + "个数据",score);        }    }    @Test    public void sortedSetConsumer() throws InterruptedException {        sortedSetProduce();        logger.info("生产消息完毕");        sortedSetRedisQueue.consumer();    }}

输出

3.PUB/SUB,订阅/发布模式

描述

SUBSCRIBE,用于订阅信道

PUBLISH,向信道发送消息

UNSUBSCRIBE,取消订阅

此模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。

优点

  • 一个消息可以发布到多个消费者

  • 消费者可以同时订阅多个信道,因此可以接收多种消息(处理时先根据信道判断)

  • 消息即时发送,消费者会自动接收到信道发布的消息

缺点

  • 消息发布时,如果客户端不在线,则消息丢失

  • 消费者处理消息时出现了大量消息积压,则可能会断开通道,导致消息丢失

  • 消费者接收消息的时间不一定是一致的,可能会有差异(业务处理需要判重)

实战

监听器

@Slf4j@Componentpublic class RedisMessageListenerListener implements MessageListener {    @Autowired    private RedisTemplate redisTemplate;        @Override    public void onMessage(Message message, byte[] pattern) {        String channel = new String(pattern);        log.info("onMessage --> 消息通道是:{}", channel);        RedisSerializer valueSerializer = redisTemplate.getValueSerializer();        Object deserialize = valueSerializer.deserialize(message.getBody());        log.info("反序列化的结果:{}", deserialize);        if (deserialize == null) return;        String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));        log.info("计算得到的key: {}", md5DigestAsHex);        Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, "1", 20, TimeUnit.SECONDS);        if (Boolean.TRUE.equals(result)) {            // redis消息进行处理            log.info("接收的结果:{}", deserialize.toString());        } else {            log.info("其他服务处理中");        }    }}

实现MessageListener 接口,就可以通过onMessage()方法接收到消息了,该方法有两个参数:

  • 参数 message 的 getBody() 方法以二进制形式获取消息体, getChannel() 以二进制形式获取消息通道

  • 参数 pattern 二进制形式的消息通道(实际和 message.getChannel() 返回值相同)

绑定监听器

@Configurationpublic class RedisMessageListenerConfig {    @Bean    public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,                  RedisMessageListenerListener redisMessageListenerListener) {        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);        redisMessageListenerContainer.addMessageListener(redisMessageListenerListener, new ChannelTopic(PubSubRedisQueue.KEY));        return redisMessageListenerContainer;    }}

RedisMessageListenerContainer 是为Redis消息侦听器 MessageListener 提供异步行为的容器。处理侦听、转换和消息分派的低级别详细信息。

本文使用的是主题订阅:ChannelTopic,你也可以使用模式匹配:PatternTopic,从而匹配多个信道。

生产者

@Servicepublic class PubSubRedisQueue {    //队列名    public static final String KEY = "pub_sub_queue";    @Autowired    private RedisTemplate redisTemplate;    public void produce(String message) {        redisTemplate.convertAndSend(KEY, message);    }}  

测试

@Slf4j@RestController@RequestMapping(value = "/queue")public class RedisMQController {    @Autowired    private PubSubRedisQueue pubSubRedisQueue;    @RequestMapping(value = "/pubsub/produce", method = RequestMethod.GET)    public void pubsubProduce(@RequestParam(name = "msg") String msg) {        pubSubRedisQueue.produce(msg);    }

随便找个浏览器请求生产者接口:

所以每插入一条消息,监听者则立即进去消费

4. 基于Stream类型的实现(Redis Version5.0)

描述

Stream为redis 5.0后新增的数据结构。支持多播的可持久化消息队列,实现借鉴了Kafka设计。

Redis Stream的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容消息是持久化的,Redis重启后,内容还在。

每个Stream都有唯一的名称,它就是Redis的key,在我们首次使用xadd指令追加消息时自动创建

每个Stream都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个Stream内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化last_delivered_id变量。

每个消费组(Consumer Group)的状态都是独立的,相互不受影响。也就是说同一份Stream内部的消息会被每个消费组都消费到

同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者者有一个组内唯一名称。

消费者(Consumer)内部会有个状态变量pending_ids,它记录了当前已经被客户端读取的消息,但是还没有ack。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量在Redis官方被称之为PEL,也就是Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

优点

  1. 高性能:可以在非常短的时间内处理大量的消息。

  1. 持久化:支持数据持久化,即使Redis服务器宕机,也可以恢复之前的消息。

  1. 顺序性:保证消息的顺序性,即使是并发的消息也会按照发送顺序排列。

  1. 灵活性:可以方便地扩展和分布式部署,可以满足不同场景下的需求。

缺点

  1. 功能相对简单:Redis Stream相对于其他的消息队列,功能相对简单,无法满足一些复杂的需求。

  1. 不支持消息回溯:即消费者无法获取之前已经消费过的消息。

  1. 不支持多消费者分组:无法实现多个消费者并发消费消息的功能。

实战

自动ack消费者

@Slf4j@Componentpublic class AutoAckStreamConsumeListener implements StreamListener> {    //分组名    public static final String GROUP = "autoack_stream";    @Autowired    private RedisTemplate redisTemplate;    @Override    public void onMessage(MapRecord message) {        String stream = message.getStream();        RecordId id = message.getId();        Map map = message.getValue();        log.info("[自动ACK]接收到一个消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);        redisTemplate.opsForStream().delete(GROUP, id.getValue());    }}

手动ack消费者

@Slf4j@Componentpublic class BasicAckStreamConsumeListener implements StreamListener> {    //分组名    public static final String GROUP = "basicack_stream";    @Autowired    private RedisTemplate redisTemplate;    @Override    public void onMessage(MapRecord message) {        String stream = message.getStream();        RecordId id = message.getId();        Map map = message.getValue();        log.info("[手动ACK]接收到一个消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);        redisTemplate.opsForStream().acknowledge(stream, GROUP, id.getValue());        //消费完毕删除该条消息        redisTemplate.opsForStream().delete(GROUP, id.getValue());    }}

绑定关系

@Slf4j@Configurationpublic class RedisStreamConfiguration {    @Autowired    private RedisConnectionFactory redisConnectionFactory;    @Autowired    private AutoAckStreamConsumeListener autoAckStreamConsumeListener;    @Autowired    private BasicAckStreamConsumeListener basicAckStreamConsumeListener;    @Autowired    private RedisTemplate redisTemplate;    @Bean(initMethod = "start", destroyMethod = "stop")    public StreamMessageListenerContainer> streamMessageListenerContainer() {        AtomicInteger index = new AtomicInteger(1);        int processors = Runtime.getRuntime().availableProcessors();        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,                new LinkedBlockingDeque<>(), r -> {            Thread thread = new Thread(r);            thread.setName("async-stream-consumer-" + index.getAndIncrement());            thread.setDaemon(true);            return thread;        });        StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options =                StreamMessageListenerContainer.StreamMessageListenerContainerOptions                        .builder()                        // 一次最多获取多少条消息                        .batchSize(3)                        // 运行 Stream 的 poll task                        .executor(executor)                        // Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小                        .pollTimeout(Duration.ofSeconds(3))                        // 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理                        .errorHandler(new ErrorHandler() {@Overridepublic void handleError(Throwable t) {    log.info("出现异常就来这里了" + t);}                        })                        .build();        StreamMessageListenerContainer> streamMessageListenerContainer =                StreamMessageListenerContainer.create(redisConnectionFactory, options);        // 独立消费        // 消费组A,自动ack        // 从消费组中没有分配给消费者的消息开始消费        if (!isStreamGroupExists(StreamRedisQueue.KEY,AutoAckStreamConsumeListener.GROUP)){            redisTemplate.opsForStream().createGroup(StreamRedisQueue.KEY,AutoAckStreamConsumeListener.GROUP);        }        streamMessageListenerContainer.receiveAutoAck(Consumer.from(AutoAckStreamConsumeListener.GROUP, "AutoAckConsumer"),                StreamOffset.create(StreamRedisQueue.KEY, ReadOffset.lastConsumed()), autoAckStreamConsumeListener);        // 消费组B,不自动ack        if (!isStreamGroupExists(StreamRedisQueue.KEY,BasicAckStreamConsumeListener.GROUP)){            redisTemplate.opsForStream().createGroup(StreamRedisQueue.KEY,BasicAckStreamConsumeListener.GROUP);        }        streamMessageListenerContainer.receive(Consumer.from(BasicAckStreamConsumeListener.GROUP, "BasicAckConsumer"),                StreamOffset.create(StreamRedisQueue.KEY, ReadOffset.lastConsumed()), basicAckStreamConsumeListener);        return streamMessageListenerContainer;    }        public boolean isStreamGroupExists(String streamKey, String groupName) {        RedisStreamCommands commands = redisConnectionFactory.getConnection().streamCommands();        //首先检查Stream Key是否存在,否则下面代码可能会因为尝试检查不存在的Stream Key而导致异常        if (!redisTemplate.hasKey(streamKey)){            return false;        }        //获取streamKey下的所有groups        StreamInfo.XInfoGroups xInfoGroups = commands.xInfoGroups(streamKey.getBytes());        AtomicBoolean exists= new AtomicBoolean(false);        xInfoGroups.forEach(xInfoGroup -> {            if (xInfoGroup.groupName().equals(groupName)){                exists.set(true);            }        });        return exists.get();    }}

生产工具

@Slf4j@Servicepublic class StreamRedisQueue {    //队列名    public static final String KEY = "stream_queue";    @Autowired    private RedisTemplate redisTemplate;    public String  produce(Map value) {        return redisTemplate.opsForStream().add(KEY, value).getValue();    }    public void createGroup(String key, String group){        redisTemplate.opsForStream().createGroup(key, group);    }}

测试

生产消息

@Slf4j@RestController@RequestMapping(value = "/queue")public class RedisMQController {    @Autowired    private StreamRedisQueue streamRedisQueue;    @RequestMapping(value = "/stream/produce", method = RequestMethod.GET)    public void streamProduce() {        Map map = new HashMap<>();        map.put("刘德华", "大家好我是刘德华");        map.put("周杰伦", "周杰伦");        map.put("time", DateUtil.now());        String result = streamRedisQueue.produce(map);        log.info("返回结果:{}", result);    }}

只要有消息,消费者就会消费

来源地址:https://blog.csdn.net/qq_63815371/article/details/129729737

--结束END--

本文标题: Redis队列详解(springboot实战)

本文链接: https://www.lsjlt.com/news/442881.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Redis队列详解(springboot实战)
    前言 MQ应用有很多,比如ActiveMQ,RabbitMQ,Kafka等,但是也可以基于redis来实现,可以降低系统的维护成本和实现复杂度,本篇介绍redis中实现消息队列的几种方案,并通过springboot实战使其更易懂。 1...
    99+
    2023-10-25
    java 数据结构 redis spring boot Powered by 金山文档
  • Python算法应用实战之队列详解
    队列(queue) 队列是先进先出(FIFO, First-In-First-Out)的线性表,在具体应用中通常用链表或者数组来实现,队列只允许在后端(称为rear)进行插入操作,在前端(称为front)进...
    99+
    2022-06-04
    队列 算法 详解
  • Redis 实现队列原理的实例详解
    Redis 实现队列原理的实例详解 场景说明: ·用于处理比较耗时的请求,例如批量发送邮件,如果直接在网页触发执行发送,程序会出现超时 ·高并发场景,当某个时刻请求瞬间增加时,可以把请求写入到队列,后台在去...
    99+
    2022-06-04
    队列 详解 实例
  • 详解Redis Stream做消息队列
    目录ListPub/subStreamConsumer Grouplast_delivered_idpending_idscurdpending_ids如何避免消息丢失嵌入SpringBoot注册Redis s...
    99+
    2022-09-23
  • 详解Redis用链表实现消息队列
    前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样...
    99+
    2022-06-04
    队列 详解 链表
  • 从实战角度详解Disruptor高性能队列
    目录一、背景二、Java内置队列三、ArrayBlockingQueue的问题1.加锁a.关于锁和CASb.锁c.原子变量2.伪共享a.什么是共享b.缓存行c.什么是伪共享四、Dis...
    99+
    2022-11-12
  • SpringBoot整合rockerMQ消息队列详解
    目录Springboot整合RockerMQ使用总结消费模式生产者组和消费者组生产者投递消息的三种方式如何保证消息不丢失顺序消息分布式事务Springboot整合RockerMQ 1...
    99+
    2022-11-13
  • springboot整合redis之消息队列
    目录一、项目准备二、配置类三、redis中list数据类型定时器监听队列运行即监控队列四、发布/订阅模式五、ZSet实现延迟队列一、项目准备 依赖 <!-- R...
    99+
    2022-11-13
  • Redis实现延迟队列的全流程详解
    目录1、前言1.1、什么是延迟队列1.2、应用场景1.3、为什么要使用延迟队列2、Redis sorted set3、Redis 过期键监听回调4、Quartz定时任务5、Delay...
    99+
    2023-03-14
    Redis延迟队列实现 Redis延迟队列原理
  • 详解redis是如何实现队列消息的ack
    前言 由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列。 原生的redis中通过L/R PUSH/POP方式来实现队列的功能,这个当然是没办法满...
    99+
    2022-06-04
    队列 如何实现 详解
  • 怎么在springboot中用redis实现消息队列
    本篇内容主要讲解“怎么在springboot中用redis实现消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么在springboot中用redis实现消息队列”吧!准备阶段安装redi...
    99+
    2023-06-19
  • SpringBoot整合RabbitMQ实现延迟队列的示例详解
    目录如何保证消息不丢失什么是消息投递可靠性ttl死信队列什么是死信队列消息有哪几种情况成为死信延迟队列springboot整合rabbitmq实现订单超时自动关闭如何保证消息不丢失 ...
    99+
    2023-05-16
    SpringBoot RabbitMQ实现延迟队列 SpringBoot RabbitMQ延迟队列 SpringBoot RabbitMQ队列 SpringBoot RabbitMQ
  • laravel使用redis队列实例讲解
    1、队列配置文件是config/queue.php(这里我默认配置即可): 2、 创建迁移表(failed-table 、jobs、migrations) php artis...
    99+
    2022-11-11
  • redis实现简单队列
    在工作中,时常会有用到队列的场景,比较常见的用rabbitMQ这些专业的组件,官网地址是:http://www.rabbitmq.com,重要的是官方有.net的客户端,但是如果对rabbitMQ不熟悉的话...
    99+
    2022-06-04
    队列 简单 redis
  • 实战干货之基于SpringBoot的RabbitMQ多种模式队列
    目录环境准备安装RabbitMQ依赖连接配置五种队列模式实现1 点对点的队列2 工作队列模式Work Queue3 路由模式Routing4 发布/订阅模式Publish/Subsc...
    99+
    2022-11-12
  • SpringBoot结合Redis实现序列化的方法详解
    目录前言配置类配置 Jackson2JsonRedisSerializer 序列化策略配置  RedisTemplate配置缓存策略测试代码完整代码前言 最近在学...
    99+
    2022-11-13
  • SpringBoot自定义Redis实现缓存序列化详解
    目录1、自定义RedisTemplate1.1、Redis API默认序列化机制1.2、自定义RedisTemplate序列化机制1.3、效果测试2、自定义RedisCacheMan...
    99+
    2022-11-13
  • TP5 queue队列详解
    thinkphp-queue 笔记 前言一 代码示例 1.1 安装 thinkphp-queue1.2 搭建消息队列的存储环境1.3 配置消息队列的驱动1.4 消息的创建与推送1.5 消息的消费与删除1.6 ...
    99+
    2023-08-31
    php
  • JAVA队列( Queue ) 详解
    队列(Queue)是一种常见的数据结构,它遵循先进先出(First-In-First-Out,FIFO)的原则。在队列中,新元素插入...
    99+
    2023-09-15
    Java
  • 详解SpringBoot集成消息队列的案例应用
    目录背景方案规划统一设计集成Redis消息队列集成ActiveMQ消息队列使用示例背景 最近在对公司开发框架进行优化,框架内涉及到多处入库的日志记录,例如登录日志/操作日志/访问日志...
    99+
    2022-11-13
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作