iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >redis怎么实现队列阻塞、延时、发布和订阅
  • 714
分享到

redis怎么实现队列阻塞、延时、发布和订阅

2023-07-02 08:07:50 714人浏览 薄情痞子
摘要

这篇“redis怎么实现队列阻塞、延时、发布和订阅”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Redis怎么实现队列阻塞、

这篇“redis怎么实现队列阻塞、延时、发布和订阅”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Redis怎么实现队列阻塞、延时、发布和订阅”文章吧。

Redis不仅可作为缓存服务器,还可以用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:

redis怎么实现队列阻塞、延时、发布和订阅

由于Redis的列表是使用双向链表实现的,保存了头节点和尾节点,所以在列表的头部和尾部两边插入或获取元素都是非常快的,时间复杂度为O(1)。

普通队列

可以直接使用Redis的list数据类型实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。

  • lpush+rpop:左进右出的队列

  • rpush+lpop:左出右进的队列

下面使用redis的命令来模拟普通队列。
使用lpush命令生产消息:

>lpush queue:single 1"1">lpush queue:single 2"2">lpush queue:single 3"3"

使用rpop命令消费消息:

>rpop queue:single"1">rpop queue:single"2">rpop queue:single"3"

下面使用Java代码来实现普通队列。

生产者SingleProducer

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;public class SingleProducer {    public static final String SINGLE_QUEUE_NAME = "queue:single";    public static void main(String[] args) {        Jedis jedis = new Jedis();        for (int i = 0; i < 100; i++) {            jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i);        }        jedis.close();    }}

消费者SingleConsumer:

package com.morris.redis.demo.queue.single;import redis.clients.jedis.Jedis;import java.util.Objects;import java.util.concurrent.TimeUnit;public class SingleConsumer {    public static void main(String[] args) throws InterruptedException {        Jedis jedis = new Jedis();        while (true) {            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);            if(Objects.nonNull(message)) {                System.out.println(message);            } else {                TimeUnit.MILLISECONDS.sleep(500);            }        }    }}

上面的代码已经基本实现了普通队列的生产与消费,但是上述的例子中消息的消费者存在两个问题:

  • 消费者需要不停的调用rpop方法查看redis的list中是否有待处理的数据(消息)。每调用一次都会发起一次连接,有可能list中没有数据,造成大量的空轮询,导致造成不必要的浪费。也许你可以使用Thread.sleep()等方法让消费者线程隔一段时间再消费,如果睡眠时间过长,这样不能处理一些时效性要求高的消息,睡眠时间过短,也会在连接上造成比较大的开销。

  • 如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

阻塞队列

消费者可以使用brpop指令从redis的list中获取数据,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端就不需要休眠后获取数据了,这样就相当于实现了一个阻塞队列,

使用redis的brpop命令来模拟阻塞队列。

>brpop queue:single 30

可以看到命令行阻塞在了brpop这里了,30s后没数据就返回。

Java代码实现如下:

生产者与普通队列的生产者一致。

消费者BlockConsumer:

package com.morris.redis.demo.queue.block;import redis.clients.jedis.Jedis;import java.util.List;public class BlockConsumer {    public static void main(String[] args) {        Jedis jedis = new Jedis();        while (true) {            // 超时时间为1s            List<String> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);            if (null != messageList && !messageList.isEmpty()) {                System.out.println(messageList);            }        }    }}

缺点:无法实现一次生产多次消费。

发布订阅模式

Redis除了对消息队列提供支持外,还提供了一组命令用于支持发布/订阅模式。利用Redis的pub/sub模式可以实现一次生产多次消费的队列。

发布:PUBLISH指令可用于发布一条消息,格式:

PUBLISH channel message

返回值表示订阅了该消息的数量。

订阅:SUBSCRIBE指令用于接收一条消息,格式:

SUBSCRIBE channel

使用SUBSCRIBE指令后进入了订阅模式,但是不会接收到订阅之前publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。

回复分为三种类型:

  • 如果为subscribe,第二个值表示订阅的频道,第三个值表示是已订阅的频道的数量

  • 如果为message(消息),第二个值为产生该消息的频道,第三个值为消息

  • 如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。

下面使用redis的命令来模拟发布订阅模式。

生产者:

127.0.0.1:6379> publish queue hello(integer) 1127.0.0.1:6379> publish queue hi(integer) 1

消费者:

127.0.0.1:6379> subscribe queueReading messages... (press Ctrl-C to quit)1) "subscribe"2) "queue"3) (integer) 11) "message"2) "queue"3) "hello"1) "message"2) "queue"3) "hi"

Java代码实现如下:

生产者PubsubProducer:

package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;public class PubsubProducer {    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";    public static void main(String[] args) {        Jedis jedis = new Jedis();        for (int i = 0; i < 100; i++) {            jedis.publish(PUBSUB_QUEUE_NAME, "hello " + i);        }        jedis.close();    }}

消费者PubsubConsumer:

package com.morris.redis.demo.queue.pubsub;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;public class PubsubConsumer {    public static void main(String[] args) throws InterruptedException {        Jedis jedis = new Jedis();        JedisPubSub jedisPubSub = new JedisPubSub() {            @Override            public void onMessage(String channel, String message) {                System.out.println("receive message: " + message);                if(message.indexOf("99") > -1) {                    this.unsubscribe();                }            }            @Override            public void onSubscribe(String channel, int subscribedChannels) {                System.out.println("subscribe channel: " + channel);            }            @Override            public void onUnsubscribe(String channel, int subscribedChannels) {                System.out.println("unsubscribe channel " + channel);            }        };        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);    }}

消费者可以启动多个,每个消费者都能收到所有的消息。

可以使用指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。

Redis还支持基于通配符的消息订阅,使用指令PSUBSCRIBE (pattern subscribe),例如:

psubscribe channel.*

用PSUBSCRIBE指令订阅的频道也要使用指令PUNSUBSCRIBE指令退订,该指令无法退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUBSCRIBE指令订阅的频道。

同时PUNSUBSCRIBE指令通配符不会展开。例如:PUNSUBSCRIBE \*不会匹配到channel.\*,所以要取消订阅channel.\*就要这样写PUBSUBSCRIBE channel.\*。

Redis的pub/sub也有其缺点,那就是如果消费者下线,生产者的消息会丢失。

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3(integer) 0

消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores1) "order1"2) "1"127.0.0.1:6379> zrem queue:delay order1(integer) 1

Java代码如下:

生产者DelayProducer:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import java.util.Date;import java.util.Random;public class DelayProducer {    public static final String DELAY_QUEUE_NAME = "queue:delay";    public static void main(String[] args) {        Jedis jedis = new Jedis();        long now = new Date().getTime();        Random random = new Random();        for (int i = 0; i < 10; i++) {            int second = random.nextInt(30); // 随机订单失效时间            jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i);        }        jedis.close();    }}

消费者:

package com.morris.redis.demo.queue.delay;import redis.clients.jedis.Jedis;import redis.clients.jedis.Tuple;import java.util.Date;import java.util.List;import java.util.Set;import java.util.concurrent.TimeUnit;public class DelayConsumer {    public static void main(String[] args) throws InterruptedException {        Jedis jedis = new Jedis();        while (true) {            long now = new Date().getTime();            Set<Tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);            if(tupleSet.isEmpty()) {                TimeUnit.MILLISECONDS.sleep(500);            } else {                for (Tuple tuple : tupleSet) {                    Double score = tuple.getScore();                    long time = score.longValue();                    if(time < now) {                        jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement());                        System.out.println("order[" + tuple.getElement() +"] is timeout at " + time);                    } else {                        TimeUnit.MILLISECONDS.sleep(500);                    }                    break;                }            }        }    }}

应用场景

延时队列可用于订单超时失效的场景
二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。

以上就是关于“redis怎么实现队列阻塞、延时、发布和订阅”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网精选频道。

--结束END--

本文标题: redis怎么实现队列阻塞、延时、发布和订阅

本文链接: https://www.lsjlt.com/news/340555.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • redis怎么实现队列阻塞、延时、发布和订阅
    这篇“redis怎么实现队列阻塞、延时、发布和订阅”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“redis怎么实现队列阻塞、...
    99+
    2023-07-02
  • redis实现队列的阻塞、延时、发布和订阅
    目录普通队列阻塞队列发布订阅模式延时队列和优先级队列应用场景Redis不仅可作为缓存服务器,还可以用作消息队列。它的列表类型天生支持用作消息队列。如下图所示: 由于Redis的列表...
    99+
    2024-04-02
  • redis如何实现队列的阻塞、延时、发布和订阅
    这篇文章主要介绍了redis如何实现队列的阻塞、延时、发布和订阅的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇redis如何实现队列的阻塞、延时、发布和订阅文章都会有所收获,下...
    99+
    2024-04-02
  • 怎么在Redis中实现延迟队列和分布式延迟队列
    这篇文章给大家介绍怎么在Redis中实现延迟队列和分布式延迟队列,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。1. 实现一个简单的延迟队列。  我们知道目前JAVA可以有DelayedQueue,我们首先开一个Dela...
    99+
    2023-06-15
  • go+redis实现消息队列发布与订阅的详细过程
    在做项目过程中,实现websocket得时候,不知道哪里写的不太合适,客户端消息收到一定程度,剩下的消息收不到,修改了缓冲区大小,还是没有解决问题,后面因为项目结束期比较紧张,没有时...
    99+
    2024-04-02
  • Java阻塞队列BlockingQueue怎么实现
    这篇文章主要讲解了“Java阻塞队列BlockingQueue怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java阻塞队列BlockingQueue怎么实现”吧!BlockingQ...
    99+
    2023-06-02
  • Redis延迟队列和分布式延迟队列的简答实现
            最近,又重新学习了下Redis,Redis不仅能快还能慢,简直利器,今天就为大家介绍一下Redi...
    99+
    2024-04-02
  • Redis中如何实现消息队列和延时消息队列
    这篇文章将为大家详细讲解有关Redis中如何实现消息队列和延时消息队列,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。list的几个命令lpush (left push)由...
    99+
    2024-04-02
  • 怎么利用Jedis实现Redis的订阅与发布
    怎么利用Jedis实现Redis的订阅与发布?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。什么是Redis发布订阅Redis发布订阅是一种消息通信模式,发送者通...
    99+
    2023-05-31
    redis jedis
  • kafka延时队列怎么实现
    Kafka是一个分布式的消息队列系统,它本身并不直接支持延时队列的功能。但是可以通过一些策略来实现延时队列的功能,下面是一种常见的实...
    99+
    2023-08-08
    kafka
  • Java中怎么利用阻塞队列实现搜索
    这期内容当中小编将会给大家带来有关Java中怎么利用阻塞队列实现搜索,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。队列以一种先进先出的方式管理数据。如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从...
    99+
    2023-06-17
  • 如何利用Redis和C++实现发布-订阅功能
    如何利用Redis和C++实现发布-订阅功能,需要具体代码示例引言:Redis是一种开源的高性能键值存储系统,它支持多种数据结构,并提供了一系列的客户端库,适用于各种编程语言。Redis的发布-订阅功能是其最常用的功能之一,可以实现消息的发...
    99+
    2023-10-22
    C++ redis 发布订阅
  • 使用Redis怎么实现延迟队列
    本篇文章给大家分享的是有关使用Redis怎么实现延迟队列,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。方案一:采用通过定时任务采用数据库/非关系型数据库轮询方案。优点: 实现简...
    99+
    2023-06-15
  • spring发布订阅模式怎么实现
    在Spring中,可以使用Spring的事件机制来实现发布订阅模式。 创建事件对象:首先,需要创建一个事件对象,该对象包含了需要发...
    99+
    2024-02-29
    spring
  • Redis中的发布订阅和事务怎么使用
    本篇内容主要讲解“Redis中的发布订阅和事务怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Redis中的发布订阅和事务怎么使用”吧!发布订阅redis的发布订阅系统有点类似于我们生活中...
    99+
    2023-06-19
  • python中sub-pub机制怎么实现Redis的订阅与发布
    本篇内容介绍了“python中sub-pub机制怎么实现Redis的订阅与发布”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!先介绍一下red...
    99+
    2023-07-05
  • node.js中怎么实现发布订阅模式
    node.js中怎么实现发布订阅模式,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。实例如下://导入内置模块 let Event...
    99+
    2024-04-02
  • Redis中怎么实现实时订阅推送功能
    本篇文章为大家展示了Redis中怎么实现实时订阅推送功能,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。方案1:MQ的延迟投递。MQ虽然支持消息的延迟投递但尺度太大1...
    99+
    2024-04-02
  • 怎么用redis发布订阅方式实现简易的消息系统
    这篇文章主要讲解了“怎么用redis发布订阅方式实现简易的消息系统”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么用redis发布订阅方式实现简易的消息系统”吧!I. 基本使用1. 配置我...
    99+
    2023-06-19
  • FreeRTOS实时操作系统空闲任务的阻塞延时怎么实现
    这篇文章主要介绍“FreeRTOS实时操作系统空闲任务的阻塞延时怎么实现”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“FreeRTOS实时操作系统空闲任务的阻塞延时怎么实现”文章能帮助大家解决问题。...
    99+
    2023-06-29
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作