iis服务器助手广告
返回顶部
首页 > 资讯 > 精选 >怎么基于sqlite实现kafka延时消息
  • 716
分享到

怎么基于sqlite实现kafka延时消息

2023-06-26 04:06:54 716人浏览 安东尼
摘要

这篇文章主要介绍“怎么基于sqlite实现kafka延时消息”,在日常操作中,相信很多人在怎么基于sqlite实现kafka延时消息问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么基于sqlite实现kaf

这篇文章主要介绍“怎么基于sqlite实现kafka延时消息”,在日常操作中,相信很多人在怎么基于sqlite实现kafka延时消息问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么基于sqlite实现kafka延时消息”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

    1、需求

    延时消息(或者说定时消息)是业务系统里一个常见的功能点。常用业务场景如:

    1) 订单超时取消

    2) 离线超过指定时间的用户,召回通知

    3) 手机消失多久后通知监护人……

    现流行的实现方案主要有:

    1)数据库定时轮询,扫描到达到延时时间的记录,业务处理,删除该记录

    2)jdk 自带延时队列(DelayQueue),或优化的时间轮算法

    3)redis 有序集合

    4)支持延时消息的分布式消息队列

    但以上方案,都存在各种缺陷:

    1)定时轮询间隔小,则对数据库造成很大压力,分布式微服务架构不好适配。

    2)jdk 自带延时队列,占用内存高,服务重启则丢失消息,分布式微服务架构不好适配。

    3)Redis 有序集合比较合适,但内存贵,分布式微服务架构不好适配。

    4)现在主流的 RocketMQ 不支持任意延时时间的延时消息,RabbitMQ或ActiveMQ 性能不够好,发送配置麻烦,kafka不支持延时消息。

    因此,我想实现一个适配分布式微服务架构、高性能、方便业务系统使用的延时消息转发中间件

    2、实现思路

    要保证高性能,推荐使用 kafka 或者 RocketMQ 做分布式消息队列。当前是基于 sqlite 实现 kafka 延时消息。

    当前实现思路是基于kafka的,实际适用于任意MQ产品。

    2.1 整体实现思路

    怎么基于sqlite实现kafka延时消息

    2.2 程序业务逻辑

    1)业务系统先推送延时消息到统一延时消息队列

    2)定时读取延时消息队列的延时消息,保存于本地,提交偏移量

    3)定时扫描本地到达延时期限的消息,转发到实际业务消息队列

    4)删除本地延时消息

    2.3 实现细节

    1)一个业务处理流程使用一个sqlite数据库文件,可并发执行提高性能。

    2)使用雪花算法生成 id 。

    3)没有延时消息时,线程休眠一定时间,减低kafka集群、和本地io压力。

    4)本地存储使用 sqlite。

    2.4 依赖框架

    1)kafka-client

    2)sqlite

    3)slf4j+log4j2

    4)jackson

    3、性能测试

    测试机器: i5-6500,16GB内存,机械硬盘

    延时消息大小: 1kb

    并发处理数:1

    已本地简单测试,性能表现:

    1) 1个并发处理数就可以达到1秒存储、转发、删除 约15000条延时消息,2 个可以达到 30000条/s ……

    2) 一次性处理1万条记录,是经过多次对比试验得出的合适批次大小

    也测试了其它两个本地存储方案的性能:

    1)直接存读 JSON 文件,读写性能太差(约1200条记录/s,慢在频繁创建、打开、关闭文件,随机磁盘io);

    2)RocksDB 存读,写入性能非常好(97000条记录/s),但筛选到期延时消息性能太差了,在数据量大于100w时,表现不如 sqlite,而且运行时占用内存、cpu 资源非常高。

    4、部署

    4.1 系统环境依赖

    1)jdk 1.8

    2)kafka 1.1.0

    可以自行替换为符合实际kafka版本的jar包(不会有冲突的,jar包版本和kafka服务版本不一致可能会有异常[无法拉取消息、提交失败等])。

    可修改pom.xml内的 kafka_version

    <kafka_version>1.1.0</kafka_version>

    重新打包即可。当前程序可以独立部署,对现有工程项目无侵入性。

    4.2 安装

    1)在项目根目录执行 Maven 打包后,会生成 dev_ops 文件

    2)在 dev_ops 目录下执行 java -jar kafka_delay_sqlite-20220102.jar 即可启动程序

    3)如需修改配置,可在dev_ops目录内创建kafka.properties文件,设置自定义配置

    默认配置如下:

    # kafka 连接url [ip:port,ip:port……]kafka.url=127.0.0.1:9092# 延时消息本地存储路径,建议使用绝对值kafka.delay.store.path=/data/kafka_delay# 统一延时消息topickafka.delay.topic=common_delay_msg# 消费者组idkafka.delay.group.id=common_delay_app# 并发处理数。限制条件: workers 小于等于topic分区数kafka.delay.workers=2

    4)业务方发送 kafka 消息到 topic (common_delay_msg)

    消息体参数说明:

    {  "topic": "实际业务topic",  "messageKey": "消息的key,影响发送到那个分区",  "message": "业务消息内容",  "delayTime": 1641470704}

    delayTime: 指定延时时限,秒级别时间戳

    消息体案例:

    {  "topic": "cancel_order",  "messageKey": "123456",  "message": "{\"orderId\":123456789123456,\"userId\":\"yhh\"}",  "delayTime": 1641470704}

    4.3 程序迁移

    复制 延时消息保存目录 到新机器,重启部署、启动程序即可。(该配置项所在目录 kafka.delay.store.path=/data/kafka_delay)

    4.4 排查日志

    日志默认输出到 /logs/kafka_delay/ ,日志输出方式为异步输出。

    system.log 记录了系统 info 级别以上的日志,info级别日志不是立刻输出的,所以程序重启时,可能会丢失部分日志

    exception.log 记录了系统 warn 级别以上的日志,日志配置为立即输出,程序正常重启,不会丢失日志,重点关注这个日志即可。

    如需自定义日志配置,可以在 log4j2.xml 进行配置。

    如果要进行本地调试,可以解开注释,否则控制台没有日志输出:

            <Root level="info">            <!--非本地调试环境下,建议注释掉 console_appender-->            <!--<AppenderRef ref="console_appender"/>-->            <AppenderRef ref="system_log_appender"/>            <AppenderRef ref="system_error_log_appender"/>        </Root>

    5、注意事项

    1) 由于设置了线程空闲时休眠机制,延时消息最大可能会推迟8秒钟发送。

    如果觉得延迟时间比较大,可以自行修改源码的配置,重新打包即可。

    KafkaUtils.subscribe()

    MsgTransferTask.run()

    2) 当前程序严格依赖于系统时钟,注意配置程序部署服务器的时钟和业务服务器时钟一致

    3) 建议配置统一延时消息队列(common_delay_msg)的分区数为 2 的倍数

    4) 每个 kafka.delay.workers 约需要 200 mb 内存,默认配置为2 , JVM 建议配置 1 GB 以上内存,避免频繁GC

    workers 增大后,不要再减小,否则会导致部分 sqlite 数据库没有线程访问,消息丢失。

    并发处理数越大,延时消息处理效率越高,但需要注意不要大于topic的分区数。

    需要自行测试多少个并发处理数就会达到磁盘io、网络带宽上限。

    当前程序主要瓶颈在于磁盘io和网络带宽,实际内存和cpu资源占用极低。

    5) 程序运行时,不要操作延时消息保存目录即里面的文件

    6) 当前配置为正常情况下不会抛弃消息模式,但程序重启时,存在重复发送消息的可能,下游业务系统需要做好幂等性处理。

    如果kafka集群异常,当前配置为重新发送16次,如果仍不能恢复过来,则抛弃当前消息,实际生产环境里,基本不可能出现该场景。

    如果确定消息不能抛弃,需要自行修改源码(MsgTransferTask.run,KafkaUtils.send(&hellip;&hellip;)),重新打包、部署。

    7) 程序出现未知异常(sqlite被手动修改、磁盘满了&hellip;&hellip;),会直接结束程序运行。

    到此,关于“怎么基于sqlite实现kafka延时消息”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

    --结束END--

    本文标题: 怎么基于sqlite实现kafka延时消息

    本文链接: https://www.lsjlt.com/news/306762.html(转载时请注明来源链接)

    有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

    本篇文章演示代码以及资料文档资料下载

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档
    猜你喜欢
    • 怎么基于sqlite实现kafka延时消息
      这篇文章主要介绍“怎么基于sqlite实现kafka延时消息”,在日常操作中,相信很多人在怎么基于sqlite实现kafka延时消息问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么基于sqlite实现kaf...
      99+
      2023-06-26
    • 如何基于sqlite实现kafka延时消息详解
      目录1、需求2、实现思路2.1 整体实现思路2.2 程序业务逻辑2.3 实现细节2.4 依赖框架3、性能测试4、部署4.1 系统环境依赖4.2 安装4.3 程序迁移4.4 排查日志5...
      99+
      2024-04-02
    • 基于kafka实现SpringCloudBus消息总线
      目录一、什么是消息总线二、整合消息总线实现配置自动刷新2.1 面向客户端基本架构2.2 面向服务端的架构三、利用kafka实现消息总线3.1 Spring Boot 整合kafka3...
      99+
      2024-04-02
    • 基于kafka怎么实现Spring Cloud Bus消息总线
      这篇文章主要介绍“基于kafka怎么实现Spring Cloud Bus消息总线”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“基于kafka怎么实现Spring Clo...
      99+
      2023-06-30
    • Go+Kafka实现延迟消息的实现示例
      目录前言原理简单的实现生产者延迟服务消费者改进点通用的延迟服务生产者负责延迟服务总结前言 延迟队列是一个非常有用的工具,我们经常遇到需要使用延迟队列的场景,比如延迟通知,订单关闭等等...
      99+
      2024-04-02
    • kafka延时队列怎么实现
      Kafka是一个分布式的消息队列系统,它本身并不直接支持延时队列的功能。但是可以通过一些策略来实现延时队列的功能,下面是一种常见的实...
      99+
      2023-08-08
      kafka
    • Java实现Redis延时消息队列
      目录什么是延时任务 延时任务的特点 实现思路: 代码实现 1.消息模型2.RedisMq 消息队列实现类3.消息生产者 4.消息消费者 5. 消息执接口 6. 任务类型的实现类:可以...
      99+
      2024-04-02
    • Redis中如何实现消息队列和延时消息队列
      这篇文章将为大家详细讲解有关Redis中如何实现消息队列和延时消息队列,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。list的几个命令lpush (left push)由...
      99+
      2024-04-02
    • RabbitMQ消息队列怎么实现延迟任务
      这篇文章主要介绍“RabbitMQ消息队列怎么实现延迟任务”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“RabbitMQ消息队列怎么实现延迟任务”文章能帮助大家解决问题。一、序言延迟任务应用广泛,延...
      99+
      2023-06-29
    • 基于JS怎么实现消消乐游戏
      这篇文章主要讲解了“基于JS怎么实现消消乐游戏”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“基于JS怎么实现消消乐游戏”吧!游戏的准备工作首先我们思考游戏的机制: 游戏有一个“棋盘”,是一个...
      99+
      2023-06-30
    • 基于Canal以及消息队列实现MySQL的Binlog近实时同步
      基于Canal以及消息队列实现MySQL的Binlog近实时同步 1.canal的应用场景 目前普遍基于日志增量订阅和消费的业务,主要包括 基于数据库增量日志解析,提供增量数据订阅和消费数据库镜像数据库实时备份索引构建和实时维护(拆分异构索...
      99+
      2023-08-21
      mysql 数据库 java
    • JavaScript基于ChatGPT实现打字机消息回复
      目录1 背景2 简介3 服务端实现3.1 协议3.2 消息格式3.2.1 event3.2.2 id3.2.3 retry3.2.4 data3.3 示例4 浏览器 API4.1 建...
      99+
      2023-05-19
      JavaScript基于ChatGPT实现消息回复 JavaScript ChatGPT消息回复 JavaScrip ChatGPT
    • Python基于钉钉监控发送消息提醒怎么实现
      这篇文章主要介绍“Python基于钉钉监控发送消息提醒怎么实现”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Python基于钉钉监控发送消息提醒怎么实现”文章能帮助大家解决问题。一.使用前设置钉钉既...
      99+
      2023-07-02
    • 如何利用rabbitMq的死信队列实现延时消息
      目录前言mq基本的消息模型mq死信队列的消息模型maven依赖配置普通队列和死信队列死信队列消费者发送消息测试测试成功总结前言 使用mq自带的死信去实现延时消息要注意一个坑点,就是m...
      99+
      2023-01-28
      rabbitMq死信队列 rabbitMq延时消息 rabbitMq延时队列
    • Pytorch实现GCN(基于Message Passing消息传递机制实现)
      文章目录 前言 一、导入相关库 二、加载Cora数据集 三、定义GCN网络 3.1 定义GCN层 3.1.1 消息传递阶段(message...
      99+
      2023-09-03
      pytorch 深度学习 python 人工智能 神经网络
    • Kafka怎么实现消息的持久性和高可靠性
      Kafka实现消息的持久性和高可靠性主要通过以下几个方面: 分区复制:Kafka采用分区复制的机制来实现数据的持久性和高可靠性。...
      99+
      2024-03-14
      Kafka
    • .NETCore基于RabbitMQ实现延时队列的两方法
      目录前言实现延时队列的两种方式利用rabbitmq死信队列x-dead-letter-exchange和x-dead-letter-routing-key.NETCore实现方式ra...
      99+
      2024-04-02
    • Android基于Sqlite怎么实现注册和登录功能
      本篇内容主要讲解“Android基于Sqlite怎么实现注册和登录功能”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Android基于Sqlite怎么实现注册和登录功能”吧!实现逻辑项目的图片结...
      99+
      2023-06-30
    • 基于rabbitmq延迟插件怎么实现分布式延迟任务
      本文小编为大家详细介绍“基于rabbitmq延迟插件怎么实现分布式延迟任务”,内容详细,步骤清晰,细节处理妥当,希望这篇“基于rabbitmq延迟插件怎么实现分布式延迟任务”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知...
      99+
      2023-06-26
    • 基于WPF实现带蒙版的MessageBox消息提示框
      介绍 框架使用大于等于.NET40; Visual Studio 2022; 项目使用 MIT 开源许可协议; Nuget Install-Packag...
      99+
      2022-11-13
      WPF消息提示框 WPF 提示框 WPF 消息框
    软考高级职称资格查询
    编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
    • 官方手机版

    • 微信公众号

    • 商务合作