iis服务器助手广告
返回顶部
首页 > 资讯 > 后端开发 > Python >java发送kafka事务消息的实现方法
  • 112
分享到

java发送kafka事务消息的实现方法

2024-04-02 19:04:59 112人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

前言 事务对java开发的同学来说并不陌生,我们使用事务的目的在于避免产生重复数据或者说利用数据存储中间件的事务特性确保数据的精准性,比如大家熟悉的Mysql,我们在程序开始时,只需

前言

事务对java开发的同学来说并不陌生,我们使用事务的目的在于避免产生重复数据或者说利用数据存储中间件的事务特性确保数据的精准性,比如大家熟悉的Mysql,我们在程序开始时,只需要在程序中添加上事务注解即可

kafka客户端事务,直接使用客户端提供的相关的api即可,和jdbc事务的使用很类似,主要包含下面5个API

// 1 初始化事务
void initTransactions();


// 2 开启事务
void beginTransaction() throws ProducerFencedException;


// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws ProducerFencedException;


// 4 提交事务
void commitTransaction() throws ProducerFencedException;


// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

下面结合实际的代码以及效果演示进行说明

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
 
public class ProducerTransaction {
 
    public static void main(String[] args) throws Exception {
 
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
 
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
 
        // 设置事务 id(必须),事务 id 任意起名
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
 
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
 
        // 初始化事务
        kafkaProducer.initTransactions();
        // 开启事务
        kafkaProducer.beginTransaction();
        System.out.println("开始发送消息");
        try {
            // 4. 调用 send 方法,发送消息
            for (int i = 0; i < 5; i++) {
                // 发送消息
                kafkaProducer.send(new ProducerRecord<>("zcy222", "hello kafka " + i));
            }
            //int i = 1 / 0;
            // 提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            System.out.println(e);
            // 终止事务
            kafkaProducer.abortTransaction();
        } finally {
            // 5. 关闭资源
            kafkaProducer.close();
        }
    }
 
}

运行上面的代码,正常是可以发送到指定的topic下

接下来,我们将上面的代码中的 1/0 放开,再次运行程序,可以看到,程序中抛异常了,但是消息并没有发送到kafka的broker,说明事务的配置生效了

 到此这篇关于java发送kafka事务消息的实现方法的文章就介绍到这了,更多相关java发送kafka事务消息内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: java发送kafka事务消息的实现方法

本文链接: https://www.lsjlt.com/news/164298.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • java发送kafka事务消息的实现方法
    前言 事务对java开发的同学来说并不陌生,我们使用事务的目的在于避免产生重复数据或者说利用数据存储中间件的事务特性确保数据的精准性,比如大家熟悉的mysql,我们在程序开始时,只需...
    99+
    2024-04-02
  • kafka批量发送消息的方法是什么
    Kafka通过Producer API提供了批量发送消息的方法。以下是使用Kafka Producer API进行批量发送消息的步骤...
    99+
    2023-10-20
    kafka
  • kafka发送消息的方式有哪些
    今天小编给大家分享一下kafka发送消息的方式有哪些的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。kafka发送消息的方式p...
    99+
    2023-07-05
  • 使用Spring Boot和Kafka实现消息发送和订阅
    文章目录 一,新建Spring Boot1,Maven配置2,无法识别为SpringBoot项目3,无效的源发行版4,无法访问SpringApplication5,运行直接Finish6,服务...
    99+
    2023-09-05
    spring boot kafka 后端 订阅 消息队列
  • 关于kafka发送消息的三种方式总结
    目录kafka发送消息的方式需要引入文件测试方法MAC下操作指令windows操作指令总结kafka发送消息的方式 package com.zl.kafkademo; impor...
    99+
    2023-05-14
    kafka发送消息 kafka发送 kafka消息
  • RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略问题
    目录消息队列常见问题处理分布式事务什么是分布式事务常见的分布式事务解决方案基于 MQ 实现的分布式事务本地消息表-最终一致性MQ事务-最终一致性RocketMQ中如何处理事务Kafk...
    99+
    2024-04-02
  • Kafka中如何实现消息的事务性保证
    Kafka中可以通过以下几种方式实现消息的事务性保证: 使用生产者事务:Kafka提供了生产者事务API,可以确保消息的原子性提...
    99+
    2024-04-02
  • MFC模拟实现自定义消息发送的方法
    这篇文章主要讲解了“MFC模拟实现自定义消息发送的方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“MFC模拟实现自定义消息发送的方法”吧!在MFC框架下,有很多系统已经定义好的消息,例如O...
    99+
    2023-06-29
  • SpringBoot实现MQTT消息发送和接收方式
    目录Spring integration交互逻辑1、maven依赖2、yaml配置文件3、mqtt生产者消费者配置类4、消息处理类 5、mqtt发送接口 6、mq...
    99+
    2023-03-11
    SpringBoot MQTT消息 MQTT消息发送 MQTT消息接收
  • python实现定时发送qq消息
    因为生活中老是忘记各种事情,刚好又在学python,便突发奇想通过python实现提醒任务的功能(尽管TIM有定时功能),也可定时给好友、群、讨论组发送qq消息。其工作流程是:访问数据库提取最近计划——>根据数据内容(提醒时间、提醒...
    99+
    2023-01-31
    消息 python qq
  • RocketMQ的事务消息发送流程是什么
    本篇内容介绍了“RocketMQ的事务消息发送流程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!事务消息发送流程半消息实现了分布式环境...
    99+
    2023-07-05
  • java实现web实时消息推送的七种方案
    目录引言什么是消息推送(push)短轮询长轮询iframe流SSE (我的方式)MQTTWebsocket自定义推送Github地址引言 做了一个小破站,现在要实现一个站内信web消...
    99+
    2024-04-02
  • Java实现微信公众号发送模版消息
    微信公众号发送模版消息 背景:如下图,当用户发布需求的时候,公众号自定推送消息。例如:微信支付的时候,公众号会推送支付成功消息 前提:发送模版消息,顾名思义,前提就是需要有模版,那...
    99+
    2024-04-02
  • Kafka中的Producer发送消息失败后如何处理
    当Kafka中的Producer发送消息失败后,可以通过以下几种方式处理: 重试发送:Producer可以设置重试机制,当发送消...
    99+
    2024-04-02
  • react-native消息推送实现方式
    目录react-native极光推送一、安装插件二、配置安卓配置IOS配置三、使用解决ios角标无法清除总结react-native极光推送 先去官网注册:https://www.j...
    99+
    2023-02-18
    react-native消息推送 react-native 消息推送
  • .NET对接极光消息推送的实现方法
    目录什么是APP消息推送?极光推送介绍快速对接Jpush极光推送.NET FX 4.5项目接入相关链接地址什么是APP消息推送?   很多手机APP会不定时的给用户推送消息,例如一些...
    99+
    2024-04-02
  • Java网络编程UDP实现消息发送及聊天
    TCP可以实现聊天,UDP也可以实现消息发送及聊天。不同的是,TCP需要有服务端和客户端的连接,但UDP不需要,只需要有发送方和接收方即可。 一、实现消息发送 发送方: pack...
    99+
    2024-04-02
  • Kafka中Producer如何处理消息发送失败的情况
    在Kafka中,Producer在发送消息时可能会遇到消息发送失败的情况。Producer可以通过以下几种方式来处理消息发送失败的情...
    99+
    2024-03-12
    Kafka
  • Java利用钉钉机器人实现发送群消息
    目录添加群机器人Java请求示例官方SDK请求示例pom引入官方SDK群消息通知方法添加群机器人 可以查看这篇文章:添加机器人到钉钉群 使用命令行工具curl快速验证自定义机器人是否...
    99+
    2024-04-02
  • java连接websocket服务器并发送消息
    一、用python快速启动一个websocker服务器 import tornado.ioloopimport tornado.webimport tornado.websocketclass WebSocketHandler(tornad...
    99+
    2023-09-12
    java websocket 服务器
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作