广告
返回顶部
首页 > 资讯 > 后端开发 > Python >消息队列-kafka消费异常问题
  • 821
分享到

消息队列-kafka消费异常问题

2024-04-02 19:04:59 821人浏览 独家记忆

Python 官方文档:入门教程 => 点击学习

摘要

目录概述重试一定次数(消息丢失)加入到死讯队列(消息不丢失)总结概述 在kafka中,或者是说在任何消息队列中都有个消费顺序的问题。为了保证一个队列顺序消费,当当中一个消息消费异常时

概述

kafka中,或者是说在任何消息队列中都有个消费顺序的问题。为了保证一个队列顺序消费,当当中一个消息消费异常时,必将影响后续队列消息的消费,这样业务岂不是卡住了。比如笔者举个最简单的例子:我发送1-100的消息,在我的处理逻辑当中 msg%5==0我就进行 int i=1/0操作,这必将抛异常,一直阻塞在msg=5上,后面6-100无法消费。下面笔者给出解决方案。

重试一定次数(消息丢失)


@KafkaHandler
    @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2", concurrency = "1")
    public void test6(String msg){
              businessProcess(msg);
            }
           private void businessProcess(String msg){
        System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
       if (Integer.valueOf(msg) % 5 == 0) {
            int i = 1 / 0;
        }
    }

说明:如果读者使用的是java客户端,也就是spring进行实现,那么在不做任何处理的情况下,会自动重试10次,然后消息会被直接处理掉。也就是说如果你的业务允许消息丢失,那么你不需要额外的编码处理

加入到死讯队列(消息不丢失)

消费端代码:


//1.启用手动提交offset
//2.配置errorHandler,用来加入到死讯队列
//3.不管业务处理是否处理异常还是正常都提交offset
@KafkaHandler
    @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2",
            errorHandler ="kafkaListenerErrorHandler", concurrency = "1")
    public void test6(String msg,Acknowledgment ack){
        try {
            businessProcess(msg);
        }finally {
            //手动提交
            ack.acknowledge();
        }
    }
//1.专门处理死讯队列消息,都是topicName+.DLT的主题
//2.死讯队列里,只有消费成功的才提交offset,否则等待bug修复完上线,继续处理
    @KafkaHandler
    @KafkaListener(topics = {"quickstart-events.DLT"},groupId = "test-consumer-group-2", concurrency = "1")
    public void test7(String msg,Acknowledgment ack){
        try {
            businessProcess(msg);
            ack.acknowledge();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
//业务代码
    private void businessProcess(String msg){
        System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
        if (Integer.valueOf(msg) % 5 == 0) {
            int i = 1 / 0;
        }
    }

异常处理器


//1.向容器注册一个KafkaListenerErrorHandler类型的bean
//2.该bean就是当处理消息异常的时候,将消息加入到.DLT主题中
@Component("kafkaListenerErrorHandler")
public class KafkaListenerErrorHandlerTest implements KafkaListenerErrorHandler {
   @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    private static final String TOPIC_DLT=".DLT";
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        System.out.println("消费失败消息:"+message.toString());
        //获取消息处理异常主题
        MessageHeaders headers = message.getHeaders();
        String topic=headers.get("kafka_receivedTopic")+TOPIC_DLT;
        //放入死讯队列
        kafkaTemplate.send(topic,message.getPayload());
        return message;
    }
}

效果图

image.png 

说明:以上基本上就是使用死讯队列的方案,也许读者会觉得这样编码复杂度很高,但其实不用担心,其实上面这些代码基本上是使用死讯队列的模板代码,在成熟一点的公司,一般会使用上述代码进行简单封装,这里笔者给个思路,有兴趣同学可以实现一下。我们其实可以使用aop思想,进行自定义一个@EnableDLT这样的注解去实现,这样上面这个方案使用起来是不是就简单优雅了。之前笔者在开发过程中使用过亚马逊的消息队列服务,也不过是这样实现罢了。

总结

本篇文章就到这里了,希望可以给你带来一些帮助,也希望您能够多多关注编程网的更多内容!

--结束END--

本文标题: 消息队列-kafka消费异常问题

本文链接: https://www.lsjlt.com/news/129652.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 消息队列-kafka消费异常问题
    目录概述重试一定次数(消息丢失)加入到死讯队列(消息不丢失)总结概述 在kafka中,或者是说在任何消息队列中都有个消费顺序的问题。为了保证一个队列顺序消费,当当中一个消息消费异常时...
    99+
    2022-11-12
  • 消息队列 Kafka
    Kafka Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域 为什么使用消息队列MQ 在高并发环境下,同步请求来不及处理会发生堵塞,从而触发too many conn...
    99+
    2023-10-23
    kafka 分布式
  • kafka之消息队列
    大数据工具 kafka 学习 之前需要先了解队列的相关知识 了解万队列就知道kafka的用处 之后再详细了解kafka的具体知识和操作 ...
    99+
    2021-05-31
    kafka之消息队列
  • Kafka:消费者消费失败处理-重试队列
    kafka没有重试机制不支持消息重试,也没有死信队列,因此使用kafka做消息队列时,需要自己实 现消息重试的功能。 实现 创建新的kafka主题作为重试队列: 创建一个topic作为重试topic,用于接收等待重试的消息。普通topic消...
    99+
    2023-10-26
    kafka java 分布式 重试队列 消费者
  • Java RabbitMQ消息队列详解常见问题
    目录消息堆积保证消息不丢失死信队列延迟队列RabbitMQ消息幂等问题RabbitMQ消息自动重试机制合理的选择重试机制消费者开启手动ack模式rabbitMQ如何解决消息幂等问题R...
    99+
    2022-11-13
  • 解决RabbitMq消息队列Qos Prefetch消息堵塞问题
    mq是实现代码扩展的有利手段,个人喜欢用概念来学习新知识,介绍堵塞问题的之前,先来段概念的学习。 ConnectionFactory:创建connection的工厂类 Connect...
    99+
    2022-11-13
  • KOA+egg.js如何集成kafka消息队列
    这篇文章主要为大家展示了“KOA+egg.js如何集成kafka消息队列”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“KOA+egg.js如何集成kafka消息...
    99+
    2022-10-19
  • kafka队列消费情况怎么查看
    要查看Kafka队列的消费情况,你可以使用以下方法:1. 使用命令行工具:Kafka提供了一些命令行工具,可以用来查看消费情况。例如...
    99+
    2023-08-08
    kafka
  • 大数据Kafka:消息队列和Kafka基本介绍
    目录一、什么是消息队列二、消息队列的应用场景异步处理 应用耦合限流削峰消息驱动系统 三、消息队列的两种方式点对点模式发布/订阅模式四、常见的消息队列的产品1) RabbitMQ2) ...
    99+
    2022-11-12
  • 如何解决RabbitMq消息队列Qos Prefetch消息堵塞问题
    本篇内容介绍了“如何解决RabbitMq消息队列Qos Prefetch消息堵塞问题”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!...
    99+
    2023-06-29
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者
    1、查看kafka队列中topic信息 1.1、查看所有topic ./kafka-topics.sh --zookeeper 10.128.106.52:2181 --list 1.2、查看kafka中指定topic的详情 ./kafk...
    99+
    2023-08-21
    kafka 分布式 java
  • 关于Kafka消息队列原理的总结
    目录Kafka消息队列原理Kafka的逻辑数据模型Kafka的分发策略Kafka的物理存储模型和查找数据的设计Kafka的持久化策略设计Kafka的节点间的数据一致性策略设计Kafk...
    99+
    2022-11-13
  • Java分布式学习之Kafka消息队列
    目录介绍Kafka核心相关名称kafka集群安装kafka使用kafka文件存储Springboot整合kafka介绍 Apache Kafka 是分布式发布-订阅消息系统,在 ka...
    99+
    2022-11-13
  • RabbitMq同一队列多个消费者问题
    RabbitMQ只有Queue,如果多个消费者绑定同一个queue,那么一条消息,只能被其中一个消费者取走(轮询)。本质上,RabbitMq的消费者的消息确认机制,就注定不可能让多个消费者同时去消费同一个队列中的同一条消息,只能轮询的方式去...
    99+
    2023-09-20
    java-rabbitmq rabbitmq java
  • Kafka中消息队列的两种模式讲解
    目录Kafka消息队列的两种模式1、点对点模式 2、发布/订阅模式 Kafka消息队列模型图解Kafka消息队列的两种模式 消息队列包括两种模式,点对点模式(po...
    99+
    2022-11-13
  • kafka消费不到数据问题
    出问题现象 最近项目使用到了kafka,别的系统作为生产者,我们系统作为消费者,但是经常出现消费者消费一段时间就不消费了,根本就触发不了kafkaListener的拉取动作。换一个消费者组,从最新的位置消费又可以消费的到,但是消费一段时间就...
    99+
    2023-08-16
    kafka 分布式 java
  • 队列在PHP与MySQL中的消息积压和消息消费的处理方法
    当网站系统涉及到大量并发操作时,往往需要处理大量的请求和消息,并确保消息的可靠传递。而消息队列则是一种高效、可靠的解决方案,可以有效地处理消息的积压和消费问题。本文将介绍队列在PHP与MySQL中的消息积压和消息消费的处理方法,并提供相应的...
    99+
    2023-10-21
    消息队列 积压处理 消费处理
  • RocketMQ消费者没有成功消费消息的问题排查
    背景 今天下游同事反馈,有一些以取消的订单库存还原异常了,导致部分商品库存没有还原。查日志发现没有收到还原消息,但是查看发送方是可以确认消息是已经发了的,那么是什么原因导致消费者没有收到,或者收到后没有处理消息呢。最后发现这些消息的状态都...
    99+
    2023-09-23
    java-rocketmq rocketmq java 开发语言
  • C#开发中如何处理消息队列和异步通信问题
    C#开发中如何处理消息队列和异步通信问题引言:在现代软件开发中,随着应用程序的规模和复杂程度不断增加,有效处理消息队列和实现异步通信变得非常重要。一些常见的应用场景包括分布式系统间的消息传递、后台任务队列的处理、事件驱动的编程等。本文将探讨...
    99+
    2023-10-22
    消息队列 异步通信 C#开发
  • kafka rabbitMQ及rocketMQ队列的消息可靠性保证分析
    目录1.消息丢失1.生产者发送失败2.消费者消费失败3.队列因为自身体原因丢失数据2.消息顺序1.kafka2.rocketMQ3.rabbitMQ3.消息重复1.消息丢失 1.生产...
    99+
    2022-11-13
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作