iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > 其他教程 >基于Redis实现阻塞队列的方式
  • 374
分享到

基于Redis实现阻塞队列的方式

2024-04-02 19:04:59 374人浏览 安东尼
摘要

日常需求开发过程中,不免会遇到需要通过代码进行异步处理的情况,比如批量发送邮件,批量发送短信,数据导入,为了减少用户的等待,不希望一直菊花转啊转,因此需要进行异步处理,做法就是讲要处

日常需求开发过程中,不免会遇到需要通过代码进行异步处理的情况,比如批量发送邮件,批量发送短信,数据导入,为了减少用户的等待,不希望一直菊花转啊转,因此需要进行异步处理,做法就是讲要处理的数据添加到队列当中,然后按照排队的先后顺序进行异步处理。

这个队列,可以是专业的消息队列,如 RocketMQ/RabbitMQ 等,一般项目中,如果只是为了进行异步,未免有点杀鸡用牛刀的意味。
也可以使用基于 JVM 内存实现队列,但是如果项目进行了重启,就会造成队列数据丢失。
大部分的项目都会用到 Redis 中间件作为缓存使用,此时使用 Redis 的 list 结构来实现队列则是非常合适的选择。

因此,本文主要讲解基于 Redis 的方式实现异步队列。

本文首发个人技术博客: https://nullpointer.pw/redis-block-queue.html

基于 Redis 的 list 实现队列的方式也有多种,先说第一种不推荐的方式,即使用LPUSH生产消息,然后 while(true) 中通过RPOP消费消息,这种方式的确可以实现,但是不断代码不断的轮询,势必会消耗一些系统的资源。

第二种方式也是不推荐的方式,也是通过 LPUSH生产消息,然后通过 BRPOP 进行阻塞地等待并消费消息,这种方式较第一种方式减少了无用的轮询,降低系统资源的消耗,但是可能会存在队列消息丢失的情况,如果取出了消息然后处理失败,这个被取出的消息就将丢失。

第二种方式就是下文要介绍的方式,首先也是通过 LPUSH 生产消息,然后通过 BRPOPLPUSH阻塞地等待 list 新消息到来,有了新消息才开始消费,同时将消息备份到另外一个 list 当中,这种方式具备了第二种方式的优点,即减少了无用的轮询,同时也对消息进行了备份不会丢失数据,如果处理成功,可以通过 LREM 对备份的 list 中当前的这条消息进行删除处理。这种方式实现方式可以参考 模式: 安全的队列 .

Redis 基础


# 将一个或多个值 value 插入到列表 key 的表头
LPUSH key value [value …]

# 阻塞式等待,将列表 source 中的最后一个元素 (尾元素) 弹出,并返回给客户端。将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。超时参数 timeout 接受一个以秒为单位的数字作为值。超时参数设为 0 表示阻塞时间可以无限期延长 (block indefinitely) 。
BRPOPLPUSH source destination timeout

# 根据参数 count 的值,移除列表中与参数 value 相等的元素。
LREM key count value

代码实现队列消息生产者

笔者使用的是 spring 相关 api 实现对 Redis 指令的调用。首先实现消息的生产代码,封装到一个工具类方法当中。这里很简单,就是调用了 lpush 方法,将序列化的 key 和 value 添加到列表当中去。


@Resource
private RedisConnectionFactory connectionFactory;

public void lPush(@Nonnull String key, @Nonnull String value) {
  RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
  try {
    byte[] byteKey = RedisSerializer.string().serialize(geTKEy(key));
    byte[] byteValue = RedisSerializer.string().serialize(value);
    assert byteKey != null;
    connection.lPush(byteKey, byteValue);
  } finally {
    RedisConnectionUtils.releaseConnection(connection, connectionFactory);
  }
}

代码实现队列消息消费者

因为实现队列消费消息的代码比较多,不可能每个需要阻塞消费的地方,对需要写这一坨代码,因此使用 Java8 的函数式接口实现方法的传递,同时阻塞式获取消息代码使用新线程去执行。

有人看到以下代码要吐槽了,不是说不用 while(true) 吗,怎么你这里面还是有,这里稍微解释一下,因为 SpringBoot 一般会指定 timeout 的全局超时时间,即使 BRPOPLPUSH 设置了 0,即无限期,当超出了 timeout 设置的值时,就会抛出 QueryTimeoutException 异常导致线程退出,因此添加了 try/catch 对异常进行捕获并忽略,同时使用 while(true) 保证线程可以继续执行。
代码中记录了当前消息处理结果,如果处理结果为成功,需要对备份队列的当前消息进行删除。


public void bRPopLPush(@Nonnull String key, Consumer<String> consumer) {
  CompletableFuture.runAsync(() -> {
    RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
    try {
      byte[] srcKey = RedisSerializer.string().serialize(getKey(key));
      byte[] dstKey = RedisSerializer.string().serialize(getBackupKey(key));
      assert srcKey != null;
      assert dstKey != null;
      while (true) {
        byte[] byteValue = new byte[0];
        boolean success = false;
        try {
          byteValue = connection.bRPopLPush(0, srcKey, dstKey);
          if (byteValue != null && byteValue.length != 0) {
            consumer.accept(new String(byteValue));
            success = true;
          }
        } catch (Exception ignored) {
          // 防止获取 key 达到超时时间抛出 QueryTimeoutException 异常退出
        } finally {
          if (success) {
            // 处理成功才删除备份队列的 key
            connection.lRem(dstKey, 1, byteValue);
          }
        }
      }
    } finally {
      RedisConnectionUtils.releaseConnection(connection, connectionFactory);
    }
  });
}

测试代码


@Test
public void testLPush() throws InterruptedException {
  String queueA = "queueA";
  int i = 0;
  while (true) {
    String msg = "Hello-" + i++;
    redisBlockQueue.lPush(queueA, msg);
    System.out.println("lPush: " + msg);
    Thread.sleep(3000);
  }
}

@Test
public void testBRPopLPush() {
  String queueA = "queueA";
  redisBlockQueue.bRPopLPush(queueA, (val) -> {
    // 在这里处理具体的业务逻辑
    System.out.println("val: " + val);
  });

  // 防止 Junit 进程退出
  LockSupport.park();
}

项目使用方式

为了方便使用,我将其抽取为了一个工具类,使用时通过 Spring 注入使用即可,
队列消费可以使用如下方式在项目启动的时候就进行阻塞监听队列,等待消费


@Resource
private RedisBlockQueue redisBlockQueue;

@PostConstruct
public void init() {
   redisBlockQueue.bRPopLPush(xx, (value) -> {
     //...
   });
}

本文完整代码下载GitHub 地址

到此这篇关于基于Redis实现阻塞队列的文章就介绍到这了,更多相关Redis阻塞队列内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: 基于Redis实现阻塞队列的方式

本文链接: https://www.lsjlt.com/news/160357.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 基于Redis实现阻塞队列的方式
    日常需求开发过程中,不免会遇到需要通过代码进行异步处理的情况,比如批量发送邮件,批量发送短信,数据导入,为了减少用户的等待,不希望一直菊花转啊转,因此需要进行异步处理,做法就是讲要处...
    99+
    2022-11-12
  • 基于Redis如何实现阻塞队列
    这篇文章主要介绍“基于Redis如何实现阻塞队列”,在日常操作中,相信很多人在基于Redis如何实现阻塞队列问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”基于Redis如何实现阻塞队列”的疑惑有所帮助!接下来...
    99+
    2023-06-22
  • redis实现队列的阻塞、延时、发布和订阅
    目录普通队列阻塞队列发布订阅模式延时队列和优先级队列应用场景Redis不仅可作为缓存服务器,还可以用作消息队列。它的列表类型天生支持用作消息队列。如下图所示: 由于Redis的列表...
    99+
    2022-11-13
  • Java阻塞队列的实现及应用
    目录1.手写生产者消费者模型2.手写定时器总结1.手写生产者消费者模型 所谓生产者消费者模型,可以用我们生活中的例子来类比:我去一个小摊儿买吃的,老板把已经做好的小吃都放在摆盘上,供...
    99+
    2022-11-12
  • redis怎么实现队列阻塞、延时、发布和订阅
    这篇“redis怎么实现队列阻塞、延时、发布和订阅”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“redis怎么实现队列阻塞、...
    99+
    2023-07-02
  • redis如何实现队列的阻塞、延时、发布和订阅
    这篇文章主要介绍了redis如何实现队列的阻塞、延时、发布和订阅的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇redis如何实现队列的阻塞、延时、发布和订阅文章都会有所收获,下...
    99+
    2022-10-19
  • Java阻塞队列的实现原理是什么
    本篇文章给大家分享的是有关Java阻塞队列的实现原理是什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。BlockingQueue接口提供了3个添加元素方法:add:添加元素到...
    99+
    2023-06-17
  • 详解Java阻塞队列(BlockingQueue)的实现原理
    阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取...
    99+
    2023-05-31
    java 阻塞队列 ava
  • 基于Redis延迟队列的实现代码
    使用场景 工作中大家往往会遇到类似的场景: 1.对于红包场景,账户 A 对账户 B 发出红包通常在 1 天后会自动归还到原账户。 2.对于实时支付场景,如果账户 A 对商户 S 付款...
    99+
    2022-11-12
  • 基于Redis实现分布式锁以及任务队列
    一、前言   双十一刚过不久,大家都知道在天猫、京东、苏宁等等电商网站上有很多秒杀活动,例如在某一个时刻抢购一个原价1999现在秒杀价只要999的手机时,会迎来一个用户请求的高峰期,可能会有几十万几百万的并...
    99+
    2022-06-04
    队列 分布式 Redis
  • Golang实现基于Redis的可靠延迟队列
    目录前言原理详解pending2ReadyScriptready2UnackScriptunack2RetryScriptackconsume前言 在之前探讨延时队列的文章中我们提到了 redisson delayque...
    99+
    2022-06-22
    Golang Redis可靠延迟队列 Golang Redis 延迟队列 Golang 延迟队列
  • 基于Redis实现延时队列的优化方案小结
    目录一、延时队列的应用二、延时队列的实现三、总结一、延时队列的应用 近期在开发部门的新项目,其中有个关键功能就是智能推送,即根据用户行为在特定的时间点向用户推送相应的提醒消息,比如以下业务场景: 在用户点击充值项后,半小...
    99+
    2022-07-05
    Redis延时队列 Redis延时队列优化
  • 百行代码实现基于Redis的可靠延迟队列
    目录原理详解pending2ReadyScriptready2UnackScriptunack2RetryScriptackconsume在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用...
    99+
    2022-06-23
    Redis可靠延迟队列 Redis延迟队列
  • .NETCore基于RabbitMQ实现延时队列的两方法
    目录前言实现延时队列的两种方式利用rabbitmq死信队列x-dead-letter-exchange和x-dead-letter-routing-key.NETCore实现方式ra...
    99+
    2022-11-13
  • 微服务Spring Boot 整合Redis 阻塞队列实现异步秒杀下单思路详解
    目录⛅引言一、秒杀优化 - 异步秒杀思路二、秒杀优化 - 基于Redis完成秒杀资格判断三、基于阻塞队列完成异步秒杀下单四、测试程序五、源码地址⛅引言 本章节,介绍使用阻塞队列实现秒...
    99+
    2022-11-13
    Spring Boot 整合Redis 阻塞队列 springboot异步秒杀下单
  • redis实现消息队列的方法
    这期内容当中的小编将会给大家带来有关redis实现消息队列的方法,以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。消息队列,Message Queue,常用于解决并发系统中的资源一致性问题...
    99+
    2022-10-18
  • redis实现延时队列的两种方式(小结)
    背景 项目中的流程监控,有几种节点,需要监控每一个节点是否超时。按传统的做法,肯定是通过定时任务,去扫描然后判断,但是定时任务有缺点:1,数据量大会慢;2,时间不好控制,太短,怕一...
    99+
    2022-11-12
  • Redis延迟队列和分布式延迟队列的简答实现
            最近,又重新学习了下Redis,Redis不仅能快还能慢,简直利器,今天就为大家介绍一下Redi...
    99+
    2022-11-12
  • 基于 Redis 实现接口限流的方式
    目录基于 Redis 实现接口限流1. 准备工作2. 限流注解3. 定制 RedisTemplate4. 开发 Lua 脚本5. 注解解析6. 接口测试7. 全局异常处理总结基于 R...
    99+
    2022-11-13
  • 基于Java数组实现循环队列的两种方法小结
    用java实现循环队列的方法:1、添加一个属性size用来记录眼下的元素个数。目的是当head=rear的时候。通过size=0还是size=数组长度。来区分队列为空,或者队列已满。2、数组中仅仅存储数组大小-1个元素,保证rear转一圈之...
    99+
    2023-05-30
    java 数组 循环队列
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作