iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > ASP.NET >.Net实现延迟队列
  • 169
分享到

.Net实现延迟队列

2024-04-02 19:04:59 169人浏览 薄情痞子
摘要

目录介绍使用场景方案Redis过期事件配置控制台订阅webapi中订阅RabbitMQ延迟队列生产消息消费消息其他方案介绍 具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就

介绍

具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

使用场景

延时队列在项目中的应用还是比较多的,尤其像电商类平台:

  • 订单成功后,在30分钟内没有支付,自动取消订单
  • 外卖平台发送订餐通知,下单成功后60s给用户推送短信。
  • 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
  • 淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

该介绍来自其他文章

方案

下面的例子没有进行封装,所以代码仅供参考

Redis过期事件

注意:

不保证在设定的过期时间立即删除并发送通知,数据量大的时候会延迟比较大

不保证一定送达

发送即忘策略,不包含持久化

但是比如有些场景,对这个时间不是那么看重,并且有其他措施双层保障,该实现方案是比较简单。

redis自2.8.0之后版本提供Keyspace Notifications功能,允许客户订阅Pub / Sub频道,以便以某种方式接收影响Redis数据集事件。

配置

需要修改配置启用过期事件,比如在windows客户端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改内容是:

-- 取消注释
notify-keyspace-events Ex

-- 注释
#notify-keyspace-events ""

然后重新启动服务器,比如windows

 .\redis-server.exe  .\redis.windows.conf

或者linux中使用Docker-compose重新部署redis

  redis:
    container_name: redis
    image: redis
    hostname: redis
    restart: always
    ports: 
      - "6379:6379"
    volumes: 
      - $PWD/redis/redis.conf:/etc/redis.conf
      - /root/common-docker-compose/redis/data:/data
    command: 
      /bin/bash -c "redis-server /etc/redis.conf" #启动执行指定的redis.conf文件

然后使用客户端订阅事件

-- windows
.\redis-cli
 
-- linux
docker exec -it 容器标识 redis-cli
 
psubscribe __keyevent@0__:expired

控制台订阅

使用StackExchange.Redis组件订阅过期事件

var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);

db.StringSet("orderno:123456", "订单创建", TimeSpan.FromSeconds(10));
Console.WriteLine("开始订阅");

var subscriber = connectionMultiplexer.GetSubscriber();

//订阅库0的过期通知事件
subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{
    Console.WriteLine($"key过期 channel:{channel} key:{key}");
});

Console.ReadLine();

输出结果:

key过期 channel:keyevent@0:expired key:orderno:123456

如果启动多个客户端监听,那么多个客户端都可以收到过期事件。

WEBapi中订阅

创建RedisListenService继承自:BackgroundService

public class RedisListenService : BackgroundService
{
    private readonly ISubscriber _subscriber;

    public RedisListenService(IServiceScopeFactory serviceScopeFactory)
    {
        using var scope = serviceScopeFactory.CreateScope();
        var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();

        var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
        var db = connectionMultiplexer.GetDatabase(0);
        _subscriber = connectionMultiplexer.GetSubscriber();
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        //订阅库0的过期通知事件
        _subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
        {
            Console.WriteLine($"key过期 channel:{channel} key:{key}");
        });

        return Task.CompletedTask;
    }
}

注册该后台服务

services.AddHostedService<RedisListenService>();

启用项目,给redis指定库设置值,等过期后会接收到过期通知事件。

RabbitMQ延迟队列

版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2

要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的

插件介绍:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

下载地址:Https://GitHub.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下:

docker run -d --name myrabbit -p 9005:15672 -p 5672:5672  -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez  rabbitmq:3-management-alpine

进入容器

docker exec -it 容器名称/标识 bash

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看是否启用

rabbitmq-plugins list

[E]和[e]表示启用,然后重启服务

rabbitmq-server restart

然后在管理界面添加交换机可以看到

生产消息

发送的消息类型是:x-delayed-message

[HttpGet("send/delay")]
public string SendDelayedMessage()
{
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",//IP地址
        Port = 5672,//端口号
        UserName = "admin",//用户账号
        PassWord = "123456",//用户密码
        VirtualHost = "customer"
    };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    var exchangeName = "delay-exchange";
    var routingkey = "delay.delay";
    var queueName = "delay_queueName";

    //设置Exchange队列类型
    var argMaps = new Dictionary<string, object>()
    {
        {"x-delayed-type", "topic"}
    };
    //设置当前消息为延时队列
    channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
    channel.QueueDeclare(queueName, true, false, false, argMaps);
    channel.QueueBind(queueName, exchangeName, routingkey);

    var time = 1000 * 5;
    var message = $"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}";
    var body = Encoding.UTF8.GetBytes(message);
    var props = channel.CreateBasicProperties();
    //设置消息的过期时间
    props.Headers = new Dictionary<string, object>()
            {
                {  "x-delay", time }
            };
    channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
    Console.WriteLine("成功发送消息:" + message);

    return "success";
}

消费消息

消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理

public class RabbitmqDelayedHostService : BackgroundService
{
    private readonly IModel _channel;
    private readonly IConnection _connection;

    public RabbitmqDelayedHostService()
    {
        var connFactory = new ConnectionFactory//创建连接工厂对象
        {
            HostName = "localhost",//IP地址
            Port = 5672,//端口号
            UserName = "admin",//用户账号
            Password = "123456",//用户密码
            VirtualHost = "customer"
        };
        _connection = connFactory.CreateConnection();
        _channel = _connection.CreateModel();

        //交换机名称
        var exchangeName = "exchangeDelayed";
        var queueName = "delay_queueName";
        var routingkey = "delay.delay";
        var argMaps = new Dictionary<string, object>()
        {
            {"x-delayed-type", "topic"}
        };
        _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
        _channel.QueueDeclare(queueName, true, false, false, argMaps);
        _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
        //声明为手动确认
        _channel.BasicQos(0, 1, false);
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var queueName = "delay_queueName";

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            var routingKey = ea.RoutingKey;
            Console.WriteLine($"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");

            //手动确认
            _channel.BasicAck(ea.DeliveryTag, true);
        };
        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        _connection.Dispose();
        _channel.Dispose();
        base.Dispose();
    }
}

注册该后台任务

services.AddHostedService<RabbitmqDelayedHostService>();

输出结果

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

其他方案

  • Hangfire延迟队列
BackgroundJob.Schedule(
  () => Console.WriteLine("Delayed!"),
   TimeSpan.FromDays(7));
  • 时间轮
  • Redisson DelayQueue
  • 计时管理器

到此这篇关于.net实现延迟队列的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持编程网。

--结束END--

本文标题: .Net实现延迟队列

本文链接: https://www.lsjlt.com/news/153464.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • .Net实现延迟队列
    目录介绍使用场景方案Redis过期事件配置控制台订阅WebApi中订阅RabbitMq延迟队列生产消息消费消息其他方案介绍 具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就...
    99+
    2022-11-13
  • Redis延迟队列和分布式延迟队列的简答实现
            最近,又重新学习了下Redis,Redis不仅能快还能慢,简直利器,今天就为大家介绍一下Redi...
    99+
    2022-11-12
  • 怎么在Redis中实现延迟队列和分布式延迟队列
    这篇文章给大家介绍怎么在Redis中实现延迟队列和分布式延迟队列,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。1. 实现一个简单的延迟队列。  我们知道目前JAVA可以有DelayedQueue,我们首先开一个Dela...
    99+
    2023-06-15
  • RabbitMQ延迟队列
    目录 一、概念 二、使用场景 三、RabbitMQ 中的 TTL (一)队列设置 TTL (二)消息设置 TTL (三)两者的区别 四、整合SpringBoot实现延迟队列 (一)创建项目 (二)添加依赖 (三)修改配置文件 (四)添加Sw...
    99+
    2023-09-01
    rabbitmq 分布式 java-rabbitmq 后端 java
  • 如何实现Redis延迟队列
    这期内容当中小编将会给大家带来有关如何实现Redis延迟队列,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。延迟队列,顾名思义它是一种带有延迟功能的消息队列。那么,是在什么...
    99+
    2022-10-18
  • Redis如何实现延迟队列
    目录Redis实现延迟队列Redis延迟队列Redis实现延时队列的优化方案延时队列的应用延时队列的实现总结Redis实现延迟队列 Redis延迟队列 Redis 是通过有序集合(ZSet)的方式来实现延迟消息队列的,Z...
    99+
    2023-04-28
    Redis延迟队列 Redis实现延迟队列 Redis队列
  • Go+Redis实现延迟队列实操
    目录前言简单的实现定义消息PushConsume存在的问题多消费者实现定义消息PushConsume存在的问题总结前言 延迟队列是一种非常使用的数据结构,我们经常有需要延迟推送处理消...
    99+
    2022-11-11
  • 如何实现一个延迟队列
    本篇内容介绍了“如何实现一个延迟队列”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!延迟队列定义首先,队列这...
    99+
    2022-10-19
  • 基于Golang实现延迟队列(DelayQueue)
    目录背景原理堆随机删除重置元素到期时间Golang实现数据结构实现原理添加元素阻塞获取元素Channel方式阻塞读取性能测试总结背景 延迟队列是一种特殊的队列,元素入队时需要指定到期...
    99+
    2022-11-11
  • 使用Redis怎么实现延迟队列
    本篇文章给大家分享的是有关使用Redis怎么实现延迟队列,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。方案一:采用通过定时任务采用数据库/非关系型数据库轮询方案。优点: 实现简...
    99+
    2023-06-15
  • Redis实现延迟队列方法介绍
    延迟队列,顾名思义它是一种带有延迟功能的消息队列。那么,是在什么场景下我才需要这样的队列呢? 1. 背景 我们先看看以下业务场景: 当订单一直处于未支付状态时,如何及时的关闭订单如何定期检查处于退款状态的订单是否已经退款成功在订单长时间没有...
    99+
    2023-09-17
    redis java java-rabbitmq
  • Java如何实现异步延迟队列
    这篇文章主要介绍“Java如何实现异步延迟队列”,在日常操作中,相信很多人在Java如何实现异步延迟队列问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java如何实现异步延迟队列”的疑惑有所帮助!接下来,请跟...
    99+
    2023-07-05
  • ThinkPHP6 think-queue 消息队列(延迟队列)
    安装 composer require topthink/think-queue 配置 配置文件位于 config/queue.php [ 'default'=>'sync' //...
    99+
    2023-09-04
    php 数据库 mysql
  • thinkphp6、thinkphp5.0 使用think-queue实现普通队列和延迟队列
    何为异步消息队列: 所谓消息队列,就是一个以队列数据结构为基础的一个实体,这个实体是真实存在的,比如程序中的数组,数据库中的表,或者redis等等,都可以。 异步队列的作用: 个人认为消息队列的主...
    99+
    2023-08-31
    redis php
  • 基于Redis延迟队列的实现代码
    使用场景 工作中大家往往会遇到类似的场景: 1.对于红包场景,账户 A 对账户 B 发出红包通常在 1 天后会自动归还到原账户。 2.对于实时支付场景,如果账户 A 对商户 S 付款...
    99+
    2022-11-12
  • thinkphp6使用think-queue怎么实现普通队列和延迟队列
    本文小编为大家详细介绍“thinkphp6使用think-queue怎么实现普通队列和延迟队列”,内容详细,步骤清晰,细节处理妥当,希望这篇“thinkphp6使用think-queue怎么实现普通队列和延迟队列”文章能帮助大家解决疑惑,下...
    99+
    2023-06-30
  • Redis实现延迟队列的全流程详解
    目录1、前言1.1、什么是延迟队列1.2、应用场景1.3、为什么要使用延迟队列2、Redis sorted set3、Redis 过期键监听回调4、Quartz定时任务5、Delay...
    99+
    2023-03-14
    Redis延迟队列实现 Redis延迟队列原理
  • Redis实现延迟队列的方法是什么
    这篇文章主要介绍“Redis实现延迟队列的方法是什么”,在日常操作中,相信很多人在Redis实现延迟队列的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Redis实现延迟队列的方法是什么”的疑惑有所...
    99+
    2023-07-05
  • RabbitMQ消息队列怎么实现延迟任务
    这篇文章主要介绍“RabbitMQ消息队列怎么实现延迟任务”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“RabbitMQ消息队列怎么实现延迟任务”文章能帮助大家解决问题。一、序言延迟任务应用广泛,延...
    99+
    2023-06-29
  • Golang实现基于Redis的可靠延迟队列
    目录前言原理详解pending2ReadyScriptready2UnackScriptunack2RetryScriptackconsume前言 在之前探讨延时队列的文章中我们提到了 redisson delayque...
    99+
    2022-06-22
    Golang Redis可靠延迟队列 Golang Redis 延迟队列 Golang 延迟队列
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作