iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >kafka核心消费逻辑是什么
  • 262
分享到

kafka核心消费逻辑是什么

2023-07-06 01:07:44 262人浏览 薄情痞子
摘要

这篇文章主要介绍“kafka核心消费逻辑是什么”,在日常操作中,相信很多人在kafka核心消费逻辑是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”kafka核心消费逻辑是什么”的疑惑有所帮助!接下来,请跟

这篇文章主要介绍“kafka核心消费逻辑是什么”,在日常操作中,相信很多人在kafka核心消费逻辑是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”kafka核心消费逻辑是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

kafka消费者线程

突击检查八股文,实现线程的方法有哪些?嗯?没复习是吧,行没关系,那感谢参加本次面试哈。

常用的几种方式分别是:

  • 继承Thread类,重写run方法

  • 实现Runbale接口,重写run方法

  • 实现Callable接口,重写call方法

这里我们直接创捷出一个任务类实现Runable方法,重写run方法,一个线程当作一个kafka client,所以要在任务类中声明一个KafkaConsumer的成员变量,另外创建任务需要指定当前任务的名称也就是线程名,还有要监听的topic主题。

private KafkaConsumer<String, String> consumer;private String topic;private String threadName;

name和topic通过构造方法传进来,同时在构造方法里完成对client的初始化操作。

   public KafkaConsumerRunnable(String bootServer, String groupId, String topic) {       this.topic = topic;       Properties props = new Properties();       props.put("bootstrap.servers", bootServer);       props.put("group.id", groupId);       props.put("enable.auto.commit", "false");       props.put("auto.offset.reset", "latest");       props.put("max.poll.records", 5);       props.put("session.timeout.ms", "60000");       props.put("max.poll.interval.ms", 300000);       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //键反序列化方式       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");       this.consumer = new KafkaConsumer&lt;&gt;(props);   }

这里封装kafka client的必要信息,入参bootServer为kafka集群ip,groupId为threadName,我们规定一个线程为一个kafka消费链接,消费一个topic。

上一篇线程池保证了任务不会轻易挂掉,就算挂掉了也会重新提交,所以为了节省资源不做所谓的同groupId的负载操作。session.timeout.ms和max.poll.interval.ms可以根据当前的kafka资源灵活配置,不然可能会引发一些reblance。

enable.auto.commit设置为false,手动提交offset,auto.offset.reset这块由于业务特殊,本来就是流式图表瞬时的展示,如果真的出现了数据丢失那就丢了吧,从最新的数据读取。

接下来只需要处理下消费逻辑,consumer.subscribe(Collections.singletonList(this.topic))开始订阅监听kafka数据,搞一个while true不断的消费数据,try catch只需要对WakeupException做处理,kafka客户端会在关闭的时候抛出WakeupException异常。

finally里提交offset,无论这条offset对应的数据消费成功还是失败都是消费过了,失败了就过去了。

   @Override   public void run() {   consumer.subscribe(Collections.singletonList(this.topic));   String key = "stream_chart:" + this.name;   Thread.currentThread().setName(key);   try {      while (true) {         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));         // 如果队列中没有消息 等待KAFKA_TIME_OUT后调用poll,如果有消息立即消费         for (ConsumerRecord<String, String> record : records) {            String value = record.value();            log.info("线程 {} 消费kafka数据 -> {} \n", Thread.currentThread().getName(), value);            RedisConfig.getRedisTemplate().opsForZSet().add(key, value, Instant.now().getEpochSecond() * 1000);         }      }   } catch (WakeupException e) {      log.info("ignore for shutdown", e);   } finally {      consumer.commitAsync();   }}

我们消费到数据直接放到redis的zset结构里,当前的时间戳作为score,最后留一个关闭客户端的后门

// 退出后关掉客户端public void shutDown() {   consumer.wakeup();}

任务提交

任务提交这块只需要在业务service中注入线程池,创建对应的KafkaRunable任务封装对应的信息,执行execute即可。

这里有个坑需要注意下,第二次突击检查八股文,线程池提交方法submitexecute的区别说一下。不知道的立刻去熟读并背诵。

public class TestTheadPool {    public static void main(String[] args) {        ExecutorService executorService= Executors.newFixedThreadPool(1);        executorService.submit(new task("submit"));        executorService.execute(new task("execute"));    }}class task implements  Runnable{    private String name;    public task(String name) {        this.name = name;    }    @Override    public void run() {        System.out.println(this.name + " start task");        int i=1/0;    }}

熟悉的同学通过示例代码可以看出来,submit提交的线程不会抛出异常代码,只有获取Future返回值并执行get方法才会捕获到异常。这块涉及到异步的东西不再赘述

try {    Future<?> submit = executorService.submit(new task("submit"));    submit.get();} catch (InterruptedException e) {    e.printStackTrace();} catch (ExecutionException e) {    e.printStackTrace();}

所以我们要使用execute执行,不然kafka消费线程里消费失败了拦截不到就不会被重新提交,导致线程挂掉。

到此,关于“kafka核心消费逻辑是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

--结束END--

本文标题: kafka核心消费逻辑是什么

本文链接: https://www.lsjlt.com/news/357302.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • kafka核心消费逻辑是什么
    这篇文章主要介绍“kafka核心消费逻辑是什么”,在日常操作中,相信很多人在kafka核心消费逻辑是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”kafka核心消费逻辑是什么”的疑惑有所帮助!接下来,请跟...
    99+
    2023-07-06
  • kafka核心消费逻辑源码分析
    本篇内容主要讲解“kafka核心消费逻辑源码分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“kafka核心消费逻辑源码分析”吧!消费逻辑框架搭建好之后着手开发下kafka的核心消费逻辑,流式图...
    99+
    2023-07-06
  • 流式图表拒绝增删改查之kafka核心消费逻辑下篇
    目录前篇回顾kafka消费者线程任务提交前篇回顾 流式图表框架搭建kafka核心消费逻辑线程池搭建 kafka消费者线程 突击检查八股文,实现线程的方法有哪些?嗯?没复习是吧,行没...
    99+
    2023-05-15
    kafka消费逻辑流式图表 流式图表拒绝增删改查
  • 流式图表拒绝增删改查之kafka核心消费逻辑上篇
    目录消费逻辑代码设计消费池消费逻辑 上文 流式图表框架搭建 框架搭建好之后着手开发下kafka的核心消费逻辑,流式图表的核心消费逻辑就是实现一个消费链接池维护消费者客户端...
    99+
    2023-05-15
    kafka消费流式图表 流式图表拒绝增删改查
  • Kafka心跳与消费机制是什么
    这篇“Kafka心跳与消费机制是什么”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Kafka心跳与消费机制是什么”文章吧。K...
    99+
    2023-06-27
  • Kafka的核心实践是什么
    这篇文章将为大家详细讲解有关Kafka的核心实践是什么,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。Kafka的核心实践  1.版本升级  之前大数据平台一...
    99+
    2024-04-02
  • kafka消费者配置的步骤是什么
    使用Kafka消费者需要以下步骤: 配置消费者属性:包括设置消费者组ID、服务器地址、自动提交偏移量等参数。 创建Kafka消费者...
    99+
    2024-04-02
  • mysql逻辑主键是什么
    mysql逻辑主键是指在数据库中用于标识一条记录的字段或字段组合,但是它并不是唯一的。逻辑主键通常被用于数据查询和数据操作。逻辑主键可以是任何具有标识性质的字段,比如在用户表中,用户名可以作为逻辑主键,因为它可以用于标识一条记录,但是它并不...
    99+
    2023-07-10
  • vuex核心是什么
    这篇文章主要介绍“vuex核心是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“vuex核心是什么”文章能帮助大家解决问题。 vuex是专门帮助vue管理的一个...
    99+
    2024-04-02
  • SEO逻辑指的是什么
    这篇文章给大家分享的是有关SEO逻辑指的是什么的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。  众所周知,SEO行业在刚萌芽的时候,其发展速度是相当快的,犹如坐火箭一般,操作起来也是容易的很,简单的关键词叠加、软...
    99+
    2023-06-10
  • hadoop核心是什么
    hadoop的核心是分布式文件系统hdfs和MapReduce,HDFS为海量的数据提供了存储,而MapReduce则为海量的数据提供了计算,Hadoop是一个由Apache基金会所开发的分布式系统基础架构,用户可以在不了解分布式底层细节的...
    99+
    2024-04-02
  • MySQL逻辑架构是什么
    MySQL逻辑架构是什么,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。逻辑架构图:我们把上面的图简化一下,就有了如下所示的MySQL简易的...
    99+
    2024-04-02
  • python短路逻辑是什么
    短路逻辑是一种在条件语句中使用逻辑运算符时的行为规则。在Python中,短路逻辑是指当使用"and"和"or"逻辑运算符时,如果表达...
    99+
    2023-08-15
    python
  • Kafka中生产者和消费者指的是什么
    在Kafka中,生产者和消费者是指Kafka消息系统中参与消息传递的两种角色。 生产者是指负责向Kafka集群中的主题(topic)...
    99+
    2024-03-14
    Kafka
  • hadoop的核心是是什么
    Hadoop的核心是一个分布式存储和计算框架,它允许用户在大规模集群上存储和处理大量数据。Hadoop包括两个主要组件:Hadoop...
    99+
    2024-04-02
  • 什么是LVM逻辑卷管理
    什么是LVM逻辑卷管理,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。  Linux因其开放性而受到许多企业和开发者的喜爱,it互联网市场相应的也增加了对Linu...
    99+
    2023-06-05
  • html中什么是逻辑部分
    这篇文章将为大家详细讲解有关html中什么是逻辑部分,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。 什么是逻辑部分?它是页面上相互关联的一组元素。如网页中的独立的栏目版...
    99+
    2024-04-02
  • jmeter逻辑控制器是什么
    本篇内容主要讲解“jmeter逻辑控制器是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“jmeter逻辑控制器是什么”吧!Jmeter逻辑控制器(Logic Controller)介绍: J...
    99+
    2023-06-05
  • Arrays.sort(arr)代码逻辑是什么
    本篇内容介绍了“Arrays.sort(arr)代码逻辑是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!首先看源码: publ...
    99+
    2023-06-29
  • 什么是JavaScript的核心
    本篇内容主要讲解“什么是JavaScript的核心”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“什么是JavaScript的核心”吧! ...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作