iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >java中kafka怎么使用
  • 885
分享到

java中kafka怎么使用

kafkajava 2023-09-14 05:09:11 885人浏览 薄情痞子
摘要

在Java中使用kafka,首先需要添加Kafka的依赖项。你可以在Maven的pom.xml文件中添加以下依赖项:```xmlor

在Java中使用kafka,首先需要添加Kafka的依赖项。你可以在Maven的pom.xml文件中添加以下依赖项:
```xml

org.apache.kafka
kafka-clients
2.8.0

```
接下来,你可以使用Kafka的Java客户端来编写代码。以下是一个简单的示例,演示了如何使用Java发送和接收消息:
```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
public class KafkaExample {
private static final String TOPIC = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-group";
public static void main(String[] args) {
// 创建生产者
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(producerProps);
// 发送消息
ProducerRecord record = new ProducerRecord<>(TOPIC, "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.printf("Sent record to topic=%s, partition=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
// 关闭生产者
producer.close();
// 创建消费者
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer consumer = new KafkaConsumer<>(consumerProps);
// 订阅主题并消费消息
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.printf("Received record: key=%s, value=%s%n", record.key(), record.value());
}
}
}
}
```
在上面的示例中,我们首先创建了一个生产者,并使用`ProducerConfig`类的常量来配置生产者的属性,例如Kafka集群的地址、键和值的序列化方式等。然后,我们创建了一个`ProducerRecord`对象,指定要发送的主题、键和值。我们调用生产者的`send()`方法来发送消息,并通过`Callback`来处理发送结果。最后,我们关闭了生产者。
然后,我们创建了一个消费者,并使用`ConsumerConfig`类的常量来配置消费者的属性,例如Kafka集群的地址、键和值的反序列化方式、消费者组等。我们订阅了一个主题,并在一个无限循环中调用`poll()`方法来获取消息。我们遍历消息并进行处理。
请注意,这只是一个简单的示例,用于演示如何使用Java操作Kafka。在实际应用中,你可能需要更复杂的逻辑来处理消息,并使用更多的配置选项来优化性能和确保可靠性。

--结束END--

本文标题: java中kafka怎么使用

本文链接: https://www.lsjlt.com/news/406588.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作