iis服务器助手广告
返回顶部
首页 > 资讯 > 精选 >kafka生产者发送消息流程是什么
  • 757
分享到

kafka生产者发送消息流程是什么

2023-07-05 18:07:07 757人浏览 薄情痞子
摘要

今天小编给大家分享一下kafka生产者发送消息流程是什么的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。消息发送过程消息的发送

今天小编给大家分享一下kafka生产者发送消息流程是什么的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

消息发送过程

消息的发送可能会经过拦截器、序列化、分区器等过程。消息发送的主要涉及两个线程,分别为main线程和sender线程。

kafka生产者发送消息流程是什么

如图所示,主线程由 afkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器RecordAccumulator (也称为消息收集器)中。 Sender 线程负责从RecordAccumulator 获取消息并将其发送到 Kafka中。

拦截器

在消息序列化之前会经过消息拦截器,自定义拦截器需要实现ProducerInterceptor接口,接口主要有两个方案#onSend和#onAcknowledgement,在消息发送之前会调用前者方法,可以在发送之前假如处理逻辑,比如计费。在收到服务端ack响应后会触发后者方法。需要注意的是拦截器中不要加入过多的复杂业务逻辑,以免影响发送效率。

消息分区

消息ProducerRecord会将消息路由到那个分区中,分两种情况:

指定了partition字段

如果消息ProducerRecord中指定了 partition字段,那么就不需要走分区器,直接发往指定得partition分区中。

没有指定partition,但自定义了分区器

没指定parittion,也没有自定义分区器,但key不为空

没指定parittion,也没有自定义分区器,key也为空

源码

// KafkaProducer#partitionprivate int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {//指定分区partition则直接返回,否则走分区器        Integer partition = record.partition();        return partition != null ?                partition :                partitioner.partition(                        record.topic(), record.key(), serializedKey, record.value(),                 serializedValue, cluster);}
//DefaultPartitioner#partitionpublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {        if (keyBytes == null) {            return stickyPartitionCache.partition(topic, cluster);        }         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);        int numPartitions = partitions.size();        // hash the keyBytes to choose a partition        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;    }

partition 方法中定义了分区分配逻辑 如果 ke 不为 null , 那 么默认的分区器会对 key 进行哈 希(采 MurmurHash3 算法 ,具备高运算性能及 低碰 撞率),最终根据得到 哈希值来 算分区号, 有相同 key 的消息会被写入同一个分区 如果 key null ,那么消息将会以轮询的方式发往主题内的各个可用分区。

消息累加器

分区确定好了之后,消息并不是直接发送给broker,因为一个个发送网络消耗太大,而是先缓存到消息累加器RecordAccumulator,RecordAccumulator主要用来缓存消息 Sender 线程可以批量发送,进 减少网络传输 的资源消耗以提升性能 RecordAccumulator 缓存的大 小可以通过生产者客户端参数 buffer memory 配置,默认值为 33554432B ,即 32MB如果生产者发送消息的速度超过发 送到服务器的速度 ,则会导致生产者空间不足,这个时候 KafkaProducer的send()方法调用要么 被阻塞,要么抛出异常,这个取决于参数 max block ms 的配置,此参数的默认值为 60秒。

消息累加器本质上是个ConcurrentMap,

ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

发送流程源码分析

//KafkaProducer@Overridepublic Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptions    //首先执行拦截器链ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {        TopicPartition tp = null;try {throwIfProducerClosed();// first make sure the metadata for the topic is availablelong nowMs = time.milliseconds();ClusterAndWaitTime clusterAndWaitTime;try {clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);} catch (KafkaException e) {if (metadata.isClosed())throw new KafkaException("Producer closed while send in progress", e);throw e;}nowMs += clusterAndWaitTime.waitedOnMetadataMs;long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);Cluster cluster = clusterAndWaitTime.cluster;byte[] serializedKey;try {//key序列化serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {//value序列化serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}//获取分区partitionint partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);setReadOnly(record.headers());Header[] headers = record.headers().toArray();//消息压缩int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),compressionType, serializedKey, serializedValue, headers);//判断消息是否超过最大允许大小,消息缓存空间是否已满ensureValidRecordSize(serializedSize);long timestamp = record.timestamp() == null ? nowMs : record.timestamp();if (log.isTraceEnabled()) {log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);}// producer callback will make sure to call both 'callback' and interceptor callbackCallback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); if (transactionManager != null && transactionManager.isTransactional()) {transactionManager.failIfNotReadyForSend();}//将消息缓存在消息累加器RecordAccumulator中RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);        //开辟新的ProducerBatchif (result.abortForNewBatch) {int prevPartition = partition;partitioner.onNewBatch(record.topic(), cluster, prevPartition);partition = partition(record, serializedKey, serializedValue, cluster);tp = new TopicPartition(record.topic(), partition);if (log.isTraceEnabled()) {log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);}// producer callback will make sure to call both 'callback' and interceptor callbackinterceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);}if (transactionManager != null && transactionManager.isTransactional())transactionManager.maybeAddPartitionToTransaction(tp);//判断消息是否已满,唤醒sender线程进行发送消息if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}return result.future;// handling exceptions and record the errors;// for API exceptions return them in the future,// for other exceptions throw directly} catch (Exception e) {// we notify interceptor about all exceptions, since onSend is called before anything else in this methodthis.interceptors.onSendError(record, tp, e);throw e;}}

生产消息的可靠性

消息发送到broker,什么情况下生产者才确定消息写入成功了呢?ack是生产者一个重要的参数,它有三个值,ack=1表示leader副本写入成功服务端即可返回给生产者,是吞吐量和消息可靠性的平衡方案;ack=0表示生产者发送消息之后不需要等服务端响应,这种消息丢失风险最大;ack=-1表示生产者需要等等ISR中所有副本写入成功后才能收到响应,这种消息可靠性最高但吞吐量也是最小的。

以上就是“kafka生产者发送消息流程是什么”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注编程网精选频道。

--结束END--

本文标题: kafka生产者发送消息流程是什么

本文链接: https://www.lsjlt.com/news/354534.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • kafka生产者发送消息流程是什么
    今天小编给大家分享一下kafka生产者发送消息流程是什么的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。消息发送过程消息的发送...
    99+
    2023-07-05
  • Kafka中生产者和消费者指的是什么
    在Kafka中,生产者和消费者是指Kafka消息系统中参与消息传递的两种角色。 生产者是指负责向Kafka集群中的主题(topic)...
    99+
    2024-03-14
    Kafka
  • kafka生产者发送超时如何解决
    要解决Kafka生产者发送超时问题,可以采取以下几个方法: 增加发送超时时间:在创建生产者时,可以通过设置max.block.ms...
    99+
    2023-10-20
    kafka
  • kafka批量发送消息的方法是什么
    Kafka通过Producer API提供了批量发送消息的方法。以下是使用Kafka Producer API进行批量发送消息的步骤...
    99+
    2023-10-20
    kafka
  • RocketMQ的事务消息发送流程是什么
    本篇内容介绍了“RocketMQ的事务消息发送流程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!事务消息发送流程半消息实现了分布式环境...
    99+
    2023-07-05
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者
    1、查看kafka队列中topic信息 1.1、查看所有topic ./kafka-topics.sh --zookeeper 10.128.106.52:2181 --list 1.2、查看kafka中指定topic的详情 ./kafk...
    99+
    2023-08-21
    kafka 分布式 java
  • 如何在没有生产者的情况下创建kafka消息?
    最近发现不少小伙伴都对Golang很感兴趣,所以今天继续给大家介绍Golang相关的知识,本文《如何在没有生产者的情况下创建kafka消息?》主要内容涉及到等等知识点,希望能帮到你!当然如果阅读本文...
    99+
    2024-04-04
  • golang 并发编程之生产者消费者详解
    golang 最吸引人的地方可能就是并发了,无论代码的编写上,还是性能上面,golang 都有绝对的优势 学习一个语言的并发特性,我喜欢实现一个生产者消费者模型,这个模型非常经典,适...
    99+
    2024-04-02
  • RocketMQ消息发送流程源码剖析
    目录正文读源码1 调用defaultMQProducerImpl.send()2 设置过期时间3 执行defaultMQProducerImpl.sendDefaultImpl()方...
    99+
    2022-11-13
    RocketMQ消息发送流程 RocketMQ 消息
  • Java多线程中消费者与生产者的关系是什么
    这篇文章将为大家详细讲解有关Java多线程中消费者与生产者的关系是什么,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。  多线程:CPU中各种任务在交替执行过程中,被称为多线程处理。其中,每个任务的一次动态...
    99+
    2023-06-02
  • 什么是Kafka的消息模型
    Kafka的消息模型是指Kafka中消息的结构和传递方式。Kafka的消息模型基于发布-订阅的模式,其中消息被发布到一个或多个主题(...
    99+
    2024-03-12
    Kafka
  • Kafka的消息传递语义是什么
    Kafka的消息传递语义是至少一次传递。这意味着当消息发布到Kafka中时,Kafka会尽最大努力确保消息至少被传递一次,即使出现了...
    99+
    2024-04-02
  • Java实现生产者消费者的两种方式分别是什么
    本篇文章给大家分享的是有关Java实现生产者消费者的两种方式分别是什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。   我在8年前去面试程序员的时候,一个...
    99+
    2023-06-03
  • kafka消费者配置的步骤是什么
    使用Kafka消费者需要以下步骤: 配置消费者属性:包括设置消费者组ID、服务器地址、自动提交偏移量等参数。 创建Kafka消费者...
    99+
    2024-04-02
  • Springboot详解RocketMQ实现消息发送与接收流程
    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送、同步发送和异步发送。 下面来介绍下 springboot+rockermq 整合...
    99+
    2024-04-02
  • android消息推送机制是什么
    Android消息推送机制是一种通过网络将消息推送给已经安装了应用程序的Android设备的技术。它主要依靠Google提供的Fir...
    99+
    2023-09-28
    android
  • ssl邮件发送流程是什么
    SSL邮件发送流程如下:1. 客户端(发件人)建立与邮件服务器的连接。2. 客户端发送与SMTP(Simple Mail Trans...
    99+
    2023-09-05
    ssl
  • Kafka中的消息过期策略是什么
    Kafka中的消息过期策略是通过设置消息的时间戳(timestamp)和过期时间(TTL)来实现的。当消息被发送到Kafka集群时,...
    99+
    2024-04-02
  • Java多线程中消费者生产者模式怎么实现
    这篇文章主要讲解了“Java多线程中消费者生产者模式怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java多线程中消费者生产者模式怎么实现”吧!  //主类&nb...
    99+
    2023-06-17
  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程
    目录一、创建Springboot项目添加rockermq依赖二、配置rocketmq三、新建一个controller来做消息发送四、创建消费端监听消息消费消息五、启动服务测试顺序消息...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作