iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > ASP.NET >.Net Core 集成 Kafka的步骤
  • 910
分享到

.Net Core 集成 Kafka的步骤

2024-04-02 19:04:59 910人浏览 八月长安
摘要

目录kafkabrokertopicpartitionconsumer group安装kafka.net 操作 kafka生产者消费者运行一下总结最近维护的一个系统并发有点高,所以想

最近维护的一个系统并发有点高,所以想引入一个消息队列来进行削峰。考察了一些产品,最终决定使用kafka来当做消息队列。以下是关于kafka的一些知识的整理笔记

kafka

kafka 是分布式流式平台。它由linkedin开发,后贡献给了Apache开源组织并成为顶级开源项目。它可以应用在高并发场景下的日志系统,也可以当作消息队列来使用,也可以当作消息服务对系统进行解耦。

流处理平台有以下三种特性:

  • 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
  • 可以储存流式的记录,并且有较好的容错性。
  • 可以在流式记录产生时就进行处理。

一般它可以应用于两个场景:

  • 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
  • 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)

broker

kafka中的每个节点即每个服务器就是一个broker 。

topic

kafka中的topic是一个分类的概念,表示一类消息。生产者在生产消息的时候需要指定topic,消费者在消费消息的时候也需要指定topic。

partition

partition是分区的概念。kafka的一个topic可以有多个partition。每个partition会分散到不同的broker上,起到负载均衡的作用。生产者的消息会通过算法均匀的分散在各个partition上。

consumer group

kafka的消费者有个组的概念。一个partition可以被多consumer group订阅。每个消息会广播到每一个group中。但是每个消息只会被group中的一个consumer消费。相当于每个group,一个partition只能有一个consumer订阅,所以group中的consumer数量不可以超过topic中partition的数量。并且消息的消费的顺序在每个partition中是保证有序的,但是在多个partition之间是不保证的,因为consumer的消费速度是有快慢的。
所以如果要用kafka实现严格的消息队列点对点模式那么我们可以设置一个partition并且设置一个consumer。如果对消息消费的顺序不是那么敏感,那么可以设置多个partition来并行消费消息,提高吞吐量。

安装kafka

为了能体验下kafka,我们还是要实际安装一下kafka,毕竟空想是没有用的。现在有了Docker,安装起来也是相当滴简单。我们只需要定义好docker-compose的yml就行了。


version: '3'
services:
  ZooKeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.117
      KAFKA_CREATE_TOPICS: "test:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

1.我们在yml里定义2个service:

2.zookeeper,kafka的分布式依赖zookeeper,所以我需要先定义它。
kafka ,kafka的定义有几个地方要注意的。

  • depends_on:zookeeper 指定kafka依赖zookeeper这个service,当启动kafka的时候自动会启动zookeeper。
  • KAFKA_ADVERTISED_HOST_NAME 这里要指定宿主机的ip
  • KAFKA_CREATE_TOPICS 这个变量只是的默认创建的topic。"test:3:1"代表创建一个名为test的topic并且创建3个分区1个复制。

定义好这些之后我们只需要使用docker-compose命令运行它:


sudo docker-compose up -d

.net 操作 kafka

安装好kafka的docker环境之后,下面演示下如何使用.net操作kafka,进行消息的生产与消费。

生产者


        static async Task Main(string[] args)
        {
            Console.WriteLine("Hello World Producer!");

            var config = new ProducerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                ClientId = Dns.GetHostName(),
            };


            using (var producer = new ProducerBuilder<Null, string>(config).Build())
            {
                string topic = "test";
                for (int i = 0; i < 100; i++)
                {
                    var msg = "message " + i;
                    Console.WriteLine($"Send message:   value {msg}");
                    var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = msg });
                    Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}");
                    Thread.Sleep(500);
                }
            }

            Console.ReadLine();

        }

新建一个控制台项目,从nuget安装kafka的官方client。


Install-Package Confluent.Kafka

代码非常简单,使用ProducerBuilder构造一个producer,然后调用ProduceAsync方法发送消息。
其中需要注意的是如果你的场景并发非常之高,官方文档推荐的方法是Produce而不是ProduceAsync。这是一个比较迷的地方。按常理使用ProduceAsync应该比使用同步方法Produce能获得更高的并发才对。但是文档确确实实说高并发场景请使用Produce。可能是为了避免ProduceAsync结果返回的时候异步线程上下文切换造成的性能开销。
原文:

There are a couple of additional benefits of using the Produce method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ProduceAsync, this is not the case because Tasks may complete on any thread pool thread. Second, Produce is more perfORMant because there is unavoidable overhead in the higher level Task based api.

消费者


        static void Main(string[] args)
        {
            Console.WriteLine("Hello World kafka consumer !");

            var config = new ConsumerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                GroupId = "foo",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            var cancel = false;

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                var topic = "test";
                consumer.Subscribe(topic);

                while (!cancel)
                {
                    var consumeResult = consumer.Consume(CancellationToken.None);

                    Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                }

                consumer.Close();
            }
        }

消费者的演示代码同样很简单。我们需要指定groupId,然后订阅topic。使用ConsumerBuilder构造一个consumer,然后调用Consume方法进行消费就可以。
注意:
这里默认是自动commit消费。你也可以根据情况手动提交commit。

运行一下

我们运行一个生产者进程,按照500ms的速度生产消息。运行三个consumer进行消费,可以看到消息被均匀的推送到三个consumer上去。

总结

以上简单的介绍了kafka的背景、安装方法、使用场景。还简单演示了如何使用.net来操作kafka。它可以当作流式计算平台来使用,也可以当作传统的消息队列使用。它当前非常流行,网上的资料也多如牛毛。官方也提供了简单易用的.net sdk ,为.net 平台集成kafka提供了便利。

以上就是.net core 集成 Kafka的步骤的详细内容,更多关于.Net Core 集成 Kafka的资料请关注编程网其它相关文章!

--结束END--

本文标题: .Net Core 集成 Kafka的步骤

本文链接: https://www.lsjlt.com/news/124771.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • .Net Core 集成 Kafka的步骤
    目录kafkabrokertopicpartitionconsumer group安装kafka.net 操作 kafka生产者消费者运行一下总结最近维护的一个系统并发有点高,所以想...
    99+
    2024-04-02
  • ASP.NET Core 集成 React SPA应用的步骤
    目录wwwroot\uiReactUIMiddleware运行一下总结AgileConfig的UI使用react重写快完成了。上次搞定了基于jwt的登录模式(AntDesign Pr...
    99+
    2024-04-02
  • .Net集成敏感词组件的步骤
    目录ToolGood.WordsValidationAttribute敏感词热重载效果结语ToolGood.Words 首先我们要使用的开源组件是 ToolGood.Words ...
    99+
    2024-04-02
  • kafka-2.11集群的搭建步骤
    这篇文章主要介绍“kafka-2.11集群的搭建步骤”,在日常操作中,相信很多人在kafka-2.11集群的搭建步骤问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”kafka-...
    99+
    2024-04-02
  • .Net中集成敏感词组件的操作步骤
    小编给大家分享一下.Net中集成敏感词组件的操作步骤,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!ToolGood.Words首先我们要使用的开源组件是 Tool...
    99+
    2023-06-15
  • .Net Core微服务网关Ocelot集成Consul
    有consul基础的都知道,consul可以发现新增的服务,剔除掉无效的服务,赋予应用自动伸缩的能力。而ocelot如果集成了consul,那ocelot也能拥有这些能力,还可以自主...
    99+
    2024-04-02
  • SpringBoot集成EasyExcel的步骤
    目录一 、EasyExcel简介二、常用注解 三、依赖四、监听五、接口导入Excel六、接口 导出Excel (HttpServletResponse response, HttpS...
    99+
    2024-04-02
  • kafka集群重启的步骤是什么
    重新启动Kafka集群通常需要以下步骤: 停止所有Kafka节点:首先需要停止所有Kafka节点,可以使用命令./bin/kaf...
    99+
    2024-03-15
    kafka
  • .NET Core部署为Windows服务的详细步骤
    目录一、概述二、.NET Core部署Windows服务1、项目中需要的配置2、服务器中使用sc.exe工具部署Windows服务三、NetCore项目部署为Linux服务1、安装 ...
    99+
    2022-11-13
    .NET Core部署Windows .NET Core Windows服务
  • springboot集成apidoc的步骤
    本篇内容主要讲解“springboot集成apidoc的步骤”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“springboot集成apidoc的步骤”吧!一、apidoc简介apidoc通过在你...
    99+
    2023-06-19
  • openinstall 的Android集成步骤
    openinstall的作用:        登入openinstall官网,没有账号就注册个没有应用就添加一个应用(选择高级版)        下载导入sdk...
    99+
    2023-06-04
  • springboot 集成dubbo的步骤详解
    目录第一步 搭建zookeeper环境第二步 springboot集成dubbo1.项目目录机构2.代码编写2.1 api目录2.2 consumer目录 web访问、接口调用以及d...
    99+
    2024-04-02
  • springboot集成mybatisplus的详细步骤
    目录Mybatis-Plus介绍简介特性(官网提供)一、引入POM依赖二、配置文件application.yml三、编写表映射实体类四、编写Mapper五、测试Controller六...
    99+
    2022-11-13
    springboot集成mybatisplus springboot mybatisplus
  • 用.NET生成数据库的方法步骤
    本篇内容介绍了“用.NET生成数据库的方法步骤”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!开篇语本文主要...
    99+
    2024-04-02
  • SpringBoot Admin2.0集成Arthas的实现步骤
    小编给大家分享一下SpringBoot Admin2.0集成Arthas的实现步骤,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!项目最初使用 Arthas 主要有...
    99+
    2023-06-14
  • SpringBoot Admin2.0 集成Arthas的实现步骤
    目录技术选型相关 整体结构实现效果 改造方案1. 整体工程结构2. 外部链接配置 3. 对应 Spring MVC controller 实现 4. Arthas Spring Bo...
    99+
    2024-04-02
  • spring集成mybatis的步骤是什么
    集成Spring和MyBatis的步骤如下:1. 引入相关依赖:在项目的pom.xml文件中添加Spring和MyBatis的依赖。...
    99+
    2023-10-18
    spring mybatis
  • SpringBoot2.x 集成 Thymeleaf的详细步骤
    这篇文章主要介绍“SpringBoot2.x 集成 Thymeleaf的详细步骤”,在日常操作中,相信很多人在SpringBoot2.x 集成 Thymeleaf的详细步骤问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对...
    99+
    2023-06-20
  • .Net Core微服务网关Ocelot基础介绍及集成
    网关是什么 简单来说,网关就是暴露给外部的请求入口。就和门卫一样,外面的人想要进来,必须要经过门卫。当然,网关并不一定是必须的,后端服务通过http也可以很好的向客户端提供服务。但是...
    99+
    2024-04-02
  • SpringBoot集成Caffeine缓存的实现步骤
    目录Maven依赖 配置 示例 Maven依赖 要开始使用咖啡因Caffeine和Spring Boot,我们首先添加spring-boot-starter-cache和咖啡因Ca...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作