iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > GO >Go操作Kafka和Etcd方法详解
  • 419
分享到

Go操作Kafka和Etcd方法详解

2024-04-02 19:04:59 419人浏览 薄情痞子
摘要

目录操作kafkasarama下载及安装注意事项连接 kafka 发送消息连接 kafka 消费消息操作Etcd安装put和get操作watch操作安装报错:操作Kafka Kafk

操作Kafka

Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展等特点。本文介绍了如何使用 Go 语言发送和接收 kafka 消息。

sarama

Go 语言中连接 kafka 使用第三方库:GitHub.com/Shopify/sar…。

下载及安装

go get github.com/Shopify/sarama

注意事项

sarama v1.20 之后的版本加入了zstd压缩算法,需要用到 cgo,在 windows 平台编译时会提示类似如下错误:

# github.com/DataDog/zstd
exec: "GCc":executable file not found in %PATH%

所以在 Windows 平台请使用 v1.19 版本的 sarama。

连接 kafka 发送消息

package main
import (
	"fmt"
	"github.com/Shopify/sarama"
)
// 基于sarama第三方库开发的kafka client
func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "WEB_log"
	msg.Value = sarama.StringEncoder("this is a test log")
	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()
	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

连接 kafka 消费消息

package main
import (
	"fmt"
	"github.com/Shopify/sarama"
)
// kafka consumer
func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList { // 遍历所有的分区
		// 针对每个分区创建一个对应的分区消费者
		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// 异步从每个分区消费信息
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
}

操作Etcd

这里使用官方的etcd/clientv3包来连接etcd并进行相关操作。

安装

go get go.etcd.io/etcd/clientv3

put和get操作

put命令用来设置键值对数据,get命令用来根据key获取值。

package main
import (
	"context"
	"fmt"
	"time"
	"go.etcd.io/etcd/clientv3"
)
func main(){
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"122.51.79.172:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// handle error!
		fmt.Printf("connect to etcd failed, err:%v\n", err)
		return
	}
	fmt.Println("connect to etcd success")
	defer cli.Close()
	// put
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Put(ctx, "coolops", "test")
	cancel()
	if err != nil {
		fmt.Printf("put to etcd failed, err:%v\n", err)
		return
	}
	// get
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Get(ctx, "coolops")
	cancel()
	if err != nil {
		fmt.Printf("get from etcd failed, err:%v\n", err)
		return
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s:%s\n", ev.Key, ev.Value)
	}
}

watch操作

watch用来获取未来更改的通知。

package main
import (
	"context"
	"fmt"
	"time"
	"go.etcd.io/etcd/clientv3"
)
func main(){
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"122.51.79.172:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// handle error!
		fmt.Printf("connect to etcd failed, err:%v\n", err)
		return
	}
	fmt.Println("connect to etcd success")
	defer cli.Close()
	// watch 操作,返回的是一个通道
	rch := cli.Watch(context.Background(), "coolops") // <-chan WatchResponse
	for wresp := range rch {
		for _, ev := range wresp.Events {
			fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

安装报错:

go: finding github.com/coreos/pkg latest
# github.com/coreos/etcd/clientv3/balancer/resolver/endpoint
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\resolver\endpoint\endpoint.go:114:78: undefined: resolver.BuildOption
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\resolver\endpoint\endpoint.go:182:31: undefined: resolver.ResolveNowOption
# github.com/coreos/etcd/clientv3/balancer/picker
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\picker\err.go:37:44: undefined: balancer.PickOptions
E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\picker\roundrobin_balanced.go:55:54: undefined: balancer.PickOptions

解决: 将go.mod里的prpc改为1.26.0版本

google.golang.org/grpc v1.26.0

以上就是Go操作Kafka和Etcd方法详解的详细内容,更多关于Go操作Kafka Etcd的资料请关注编程网其它相关文章!

您可能感兴趣的文档:

--结束END--

本文标题: Go操作Kafka和Etcd方法详解

本文链接: https://www.lsjlt.com/news/121099.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Go操作Kafka和Etcd方法详解
    目录操作Kafkasarama下载及安装注意事项连接 kafka 发送消息连接 kafka 消费消息操作Etcd安装put和get操作watch操作安装报错:操作Kafka Kafk...
    99+
    2024-04-02
  • go操作Kafka使用示例详解
    目录1. Kafka介绍1.1 Kafka是什么1.2 Kafka的特点1.3 常用的场景1.4 Kafka中包含以下基础概念1.5 消息1.6 消息格式2. Kafka深层介绍2....
    99+
    2022-12-08
    go操作Kfaka go Kfaka
  • Go读写锁操作方法示例详解
    目录引言读写锁有很多方法读操作写操作引言 前面讲到,在资源竞争的时候可以使用互斥锁,保证了资源访问的唯一性,但也降低了性能,仔细分析一下场景,如果只是读取数据,无论多少个gorout...
    99+
    2024-04-02
  • Go语言学习之文件操作方法详解
    目录引言1. 打开和关闭文件2. 读取文件2.1 defer 语句2.2 手动宕机处理2.3 打开文件并获取内容2.4 bufio 读取文件2.5 ioutil 读取文件2.6 读取...
    99+
    2024-04-02
  • C#操作XML方法详解
    目录 using System.Xml; //初始化一个xml实例 XmlDocument xml=new XmlDocument(); //导入指定xml文件 xml.Load(...
    99+
    2024-04-02
  • python操作yaml的方法详解
    目录一、参考链接二、python类型转换为yaml三、yaml转换为python类型总结一、参考链接 https://pyyaml.org/wiki/PyYAMLDocumentat...
    99+
    2024-04-02
  • GO的锁和原子操作的示例详解
    目录GO的锁和原子操作分享锁是什么锁是用来做什么的互斥锁互斥锁 - 解决问题读写锁我们先来写一个读写锁的DEMO自旋锁和互斥锁的区别如何选择锁啥是原子操作总结GO的锁和原子操作分享 ...
    99+
    2023-02-24
    GO锁 原子操作 GO锁 GO 原子操作
  • PostgreSql JDBC事务操作方法详解
    目录JDBC事务相关方法简介禁用自动提交模式提交事务回滚事务PostgreSQL JDBC 事务示例JDBC事务相关方法简介 本文将借助示例,简单讲解下JDBC操作Pg事务的流程。 首先来简单讲解下事务的定义:为了确保两...
    99+
    2022-11-19
    PostgreSql JDBC事务操作 PostgreSql JDBC
  • 详解python中的IO操作方法
    目录python文件I/Oraw_input函数input函数打开和关闭文件open 函数file对象的属性close()方法write()方法read()方法Python with...
    99+
    2024-04-02
  • Jquery操作DOM元素方法详解
    一、文本输入框: text <input type=”text” value=”99.com” size=12 id=”input1” /> 1、获取文本值: $("#i...
    99+
    2024-04-02
  • Laravel操作redis和缓存操作详解
    目录一:操作redis1:redis拓展安装2:配置redis3:操作redis二:缓存操作1:缓存配置2:缓存操作一:操作redis 1:redis拓展安装 composer re...
    99+
    2023-02-13
    Laravel操作redis Laravel 缓存操作 Laravel redis
  • 详解操作cookie的原生方法cookieStore
    目录1. 平时如何操作 cookie2. 新方式 cookieStore2.1 基本方法2.2 设置 cookie2.3 获取 cookie2.4 获取所有的 cookie2.5 删...
    99+
    2024-04-02
  • React使用refs操作DOM方法详解
    在react框架 甚至说是三大框架中都是不太支持大家直接去操作dom的 因为也没什么必要 当然也会有特殊情况 例如视频播放 强制动画 第三方插件的一些渲染或初始化 官方也给了我们对应...
    99+
    2022-11-13
    React操作DOM React refs操作DOM
  • 详解C#操作XML的方法总结
    本文的主要模块为: 1.生成xml文件 2.遍历xml文件的节点信息 3.修改xml文件的节点信息 4.向xml文件添加节点信息 5.删除指定xml文件的节点信息 假设我们需要设计出...
    99+
    2022-11-13
    C#操作XML方法 C#操作XML C# XML
  • C++资源管理操作方法详解
    目录以对象管理资源在资源管理类中小心copy行为在资源管理类中提供对原始资源的访问成对使用new和delete时要采用相同形式以独立语句将new对象置入智能指针以对象管理资源 cla...
    99+
    2024-04-02
  • SpringDataJPA详解增删改查操作方法
    目录1、服务层调用dao继承的接口中的方法2、使用jpql语句进行查询3、可以引入原生的sql语句4、根据jpa规定的特殊命名方法完成查询5、动态查询1、服务层调用dao继承的接口中...
    99+
    2024-04-02
  • Go语言类方法和对象方法详解
    go语言中,类方法(type)作用于整个结构体类型,用于执行不操作具体实例的操作。对象方法(receiver)绑定到具体实例,用于操作实例数据。实战案例中,类方法用于创建和获取员工信息,...
    99+
    2024-04-03
    方法 go语言
  • 使用GO操作MongoDB的方法
    目录安装MongoDB驱动程序连接MongoDB列出所有数据库从MongDB中查询数据查询单个文档查询多个文档更新多个文档删除MongoDB文档获取MongoDB服务状态安装Mong...
    99+
    2024-04-02
  • C#操作INI文件的方法详解
    目录INI文件介绍kernel32Demo案例实现功能程序代码扩展作用本文主要介绍通过调用kernel32函数,实现对ini文件的读取和写入。 INI文件介绍 INI文件全称是Ini...
    99+
    2022-11-13
    C#操作INI文件 C#操作INI C# INI文件
  • Go操作redis与redigo的方法
    这篇文章主要介绍了Go操作redis与redigo的方法的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Go操作redis与redigo的方法文章都会有所收获,下面我们一起来看看吧。Go-操作redis安装gol...
    99+
    2023-06-30
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作