广告
返回顶部
首页 > 资讯 > 精选 >怎么使用canal+Kafka进行数据库同步操作
  • 771
分享到

怎么使用canal+Kafka进行数据库同步操作

2023-06-27 10:06:59 771人浏览 安东尼
摘要

这篇文章主要介绍了怎么使用canal+kafka进行数据库同步操作的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇怎么使用canal+Kafka进行数据库同步操作文章都会有所收获,下面我们一起来看看吧。平时工作中

这篇文章主要介绍了怎么使用canal+kafka进行数据库同步操作的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇怎么使用canal+Kafka进行数据库同步操作文章都会有所收获,下面我们一起来看看吧。

平时工作中数据库是我们经常使用的,在微服务拆分的架构中,各服务拥有自己的数据库,所以常常会遇到服务之间数据通信的问题。比如,B 服务数据库的数据来源于A服务的数据库;A 服务的数据有变更操作时,需要同步到 B 服务中。

第一种解决方案:

在代码逻辑中,有相关 A 服务数据写操作时,以调用接口的方式,调用 B 服务接口,B 服务再将数据写到新的数据库中。这种方式看似简单,但其实“坑”很多。在 A 服务代码逻辑中会增加大量这种调用接口同步的代码,增加了项目代码的复杂度,以后会越来越难维护。并且,接口调用的方式并不是一个稳定的方式,没有重试机制,没有同步位置记录,接口调用失败了怎么处理,突然的大量接口调用会产生的问题等,这些都要考虑并且在业务中处理。这里会有不少工作量。想到这里,就将这个方案排除了。

第二种解决方案:

通过数据库的binlog进行同步。这种解决方案,与 A 服务是独立的,不会和 A 服务有代码上的耦合。可以直接 tcp连接进行传输数据,优于接口调用的方式。 这是一套成熟的生产解决方案,也有不少binlog同步的中间件工具,所以我们关注的就是哪个工具能够更好的构建稳定、性能满足且易于高可用部署的方案。

经过调研,我们选择了canal。canal是阿里巴巴 MySQL binlog 增量订阅&消费组件,已经有在生产上实践的例子,并且方便的支持和其他常用的中间件组件组合,比如kafkaelasticsearch等,也有了canal-Go go语言的client库,满足我们在go上的需求。

工作流程

  1. Canal连接到 A 数据库,模拟slave

  2. canal-clientCanal建立连接,并订阅对应的数据库表

  3. A 数据库发生变更写入到binlogCanal向数据库发送dump请求,获取binlog并解析,发送解析后的数据给canal-client

  4. canal-client收到数据,将数据同步到新的数据库

Protocol Buffer的序列化速度还是很快的。反序列化后得到的数据,是每一行的数据,按照字段名和字段的值的结构,放到一个数组中 代码简单示例:

func Handler(entry protocol.Entry)  {var keys []stringrowChange := &protocol.RowChange{}proto.Unmarshal(entry.GetStoreValue(), rowChange)if rowChange != nil {eventType := rowChange.GetEventType()for _, rowData := range rowChange.GetRowDatas() { // 遍历每一行数据if eventType == protocol.EventType_DELETE || eventType == protocol.EventType_UPDATE {columns := rowData.GetBeforeColumns() // 得到更改前的所有字段属性} else if eventType == protocol.EventType_INSERT {columns := rowData.GetAfterColumns() // 得到更后前的所有字段属性}......}}}

遇到的问题

为了高可用和更高的性能,我们会创建多个canal-client构成一个集群,来进行解析并同步到新的数据库。这里就出现了一个比较重要的问题,如何保证canal-client集群解析消费binlog的顺序性呢?

我们使用的binlogrow模式。每一个写操作都会产生一条binlog日志。 举个简单的例子:插入了一条 a 记录,并且立马修改 a 记录。这样会有两个消息发送给canal-client,如果由于网络等原因,更新的消息早于插入的消息被处理了,还没有插入记录,更新操作的最后效果是失败的。

怎么办呢? canal可以和消息队列组合呀!而且支持kafkaRabbitMQRocketMQ多种选择,如此优秀。我们在消息队列这层来实现消息的顺序性。

选择canal+kafka方案

我们选择了消息队列的业界标杆: kafka UCloud提供了kafkarocketMQ消息队列产品服务,使用它们能够快速便捷的搭建起一套消息队列系统。加速开发,方便运维

下面就让我们来一探究竟:

选择kafka消息队列产品,并申请开通

![kafka消息队列](https://atts.yisu.com/attachments/image/20200817/1597643207499951.jpg "kafka消息队列")

开通完成后,在管理界面,创建kafka集群,根据自身需求,选择相应的硬件配置

![硬件配置](Https://atts.yisu.com/attachments/image/20200817/1597643247605810.jpg "硬件配置")

一个kafka + ZooKeeper集群就搭建出来了,给力!

![kafka+ZooKeeper集群](https://atts.yisu.com/attachments/image/20200817/1597643275823817.jpg "kafka+ZooKeeper集群")

并且包含了节点管理、Topic管理、Consumer Group管理,能够非常方便的直接在控制台对配置进行修改

监控视图方面,监控的数据包括kafka生成和消费QPS,集群监控,ZooKeeper的监控。能够比较完善的提供监控指标。

![监控指标](https://atts.yisu.com/attachments/image/20200817/1597643316579478.jpg "监控指标")

![监控指标](https://atts.yisu.com/attachments/image/20200817/1597643347799100.jpg "监控指标")

![监控指标](https://atts.yisu.com/attachments/image/20200817/1597643363690805.jpg "监控指标")

canal的kafka配置

canal配上kafka也非常的简单。 vi /usr/local/canal/conf/canal.properties

# 可选项: tcp(默认), kafka, RocketMQcanal.serverMode = kafka# ...# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092canal.mq.servers = 127.0.0.1:9002canal.mq.retries = 0# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限canal.mq.batchSize = 16384canal.mq.maxRequestSize = 1048576# flatMessage模式下请将该值改大, 建议50-200canal.mq.lingerMs = 1canal.mq.bufferMemory = 33554432# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)canal.mq.canalBatchSize = 50# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时canal.mq.canalGetTimeout = 100# 是否为flat JSON格式对象canal.mq.flatMessage = falsecanal.mq.compressionType = nonecanal.mq.acks = all# kafka消息投递是否使用事务canal.mq.transaction = false# mq confiGCanal.mq.topic=default# dynamic topic route by schema or table regex#canal.mq.dynamicTopic=mytest1.user,mytest2\\\\..*,.*\\\\..*canal.mq.dynamicTopic=mydatabase.mytablecanal.mq.partition=0# hash partition configcanal.mq.partitionsNum=3canal.mq.partitionHash=mydatabase.mytable

解决顺序消费问题

看到下面这一行配置

canal.mq.partitionHash=mydatabase.mytable

我们配置了kafkapartitionHash,并且我们一个Topic就是一个表。这样的效果就是,一个表的数据只会推到一个固定的partition中,然后再推给consumer进行消费处理,同步到新的数据库。通过这种方式,解决了之前碰到的binlog日志顺序处理的问题。这样即使我们部署了多个kafka consumer端,构成一个集群,这样consumer从一个partition消费消息,就是消费处理同一个表的数据。这样对于一个表来说,牺牲掉了并行处理,不过个人觉得,凭借kafka的性能强大的处理架构,我们的业务在kafka这个节点产生瓶颈并不容易。并且我们的业务目的不是实时一致性,在一定延迟下,两个数据库保证最终一致性。

(推荐微课:sql微课)

下图是最终的同步架构,我们在每一个服务节点都实现了集群化。全都跑在UCloudUk8s服务上,保证了服务节点的高可用性。

canal也是集群换,但是某一时刻只会有一台canal在处理binlog,其他都是冗余服务。当这台canal服务挂了,其中一台冗余服务就会切换到工作状态。同样的,也是因为要保证binlog的顺序读取,所以只能有一台canal在工作。

并且,我们还用这套架构进行缓存失效的同步。我们使用的缓存模式是:Cache-Aside。同样的,如果在代码中数据更改的地方进行缓存失效操作,会将代码变得复杂。所以,在上述架构的基础上,将复杂的触发缓存失效的逻辑放到kafka-client端统一处理,达到一定解耦的目的。

关于“怎么使用canal+Kafka进行数据库同步操作”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“怎么使用canal+Kafka进行数据库同步操作”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注编程网精选频道。

--结束END--

本文标题: 怎么使用canal+Kafka进行数据库同步操作

本文链接: https://www.lsjlt.com/news/309882.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 怎么使用canal+Kafka进行数据库同步操作
    这篇文章主要介绍了怎么使用canal+Kafka进行数据库同步操作的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇怎么使用canal+Kafka进行数据库同步操作文章都会有所收获,下面我们一起来看看吧。平时工作中...
    99+
    2023-06-27
  • 使用phonegap怎么对数据库进行操作
    使用phonegap怎么对数据库进行操作?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。实例如下:<!DOCTYPE html> ...
    99+
    2023-06-09
  • 怎么使用Kafka进行数据分析
    使用Kafka进行数据分析可以分为以下几个步骤:1. 安装和配置Kafka:首先需要下载和安装Kafka,并进行相关的配置。配置文件...
    99+
    2023-10-21
    kafka
  • 使用Node怎么对MongoDB数据库进行操作
    这篇文章给大家介绍使用Node怎么对MongoDB数据库进行操作,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。1.使用 MongoDB模块 进行操作 首先在工作目录安装 mo...
    99+
    2022-10-18
  • 使用gorm怎么对MySql数据库进行操作
    本篇文章给大家分享的是有关使用gorm怎么对MySql数据库进行操作,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。1、表中字段区分大小写的设置在使用gorm查询的时候,会出现账...
    99+
    2023-06-07
  • 如何使用MySQL进行跨数据库的数据同步?
    如何使用MySQL进行跨数据库的数据同步?在现代的软件开发中,数据库的使用无处不在。而随着软件项目的增长,数据的同步和备份变得越来越重要。MySQL是一个强大的关系型数据库管理系统,同时也提供了一些可靠的方法来实现跨数据库的数据同步。本文将...
    99+
    2023-10-22
    数据同步 MySQL跨数据库 跨数据库同步
  • 怎么使用PHP查询数据库数值进行操作
    这篇文章主要介绍“怎么使用PHP查询数据库数值进行操作”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“怎么使用PHP查询数据库数值进行操作”文章能帮助大家解决问题。首先,我们需要连接到数据库。PHP提...
    99+
    2023-07-05
  • 使用Apache SeaTunnel进行数据库同步(MySQL to MySQL)
    Apache SeaTunnel 起到的主要作用是什么? 目前,大数据体系里有各种各样的数据引擎,有大数据生态的 Hadoop、Hive、Kudu、Kafka、HDFS,也有泛大数据库体系的 MongoDB、Redis、ClickHouse...
    99+
    2023-08-17
    数据库 mysql seaTunnel
  • 怎么运用PHP进行数据库操作类
    这篇文章主要为大家展示了“怎么运用PHP进行数据库操作类”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“怎么运用PHP进行数据库操作类”这篇文章吧。PHP高级实战...
    99+
    2022-10-18
  • 怎么使用PHP进行数据库查找和修改操作
    今天小编给大家分享一下怎么使用PHP进行数据库查找和修改操作的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。一、数据库连接在进...
    99+
    2023-07-05
  • Android中怎么利用LitePal对数据库进行操作
    Android中怎么利用LitePal对数据库进行操作,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。首先在app/build.grade文件中编辑dependencies{.....
    99+
    2023-06-04
  • thinkphp如何使用ORM进行数据库操作
    本篇内容介绍了“thinkphp如何使用ORM进行数据库操作”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!ThinkPHP 是一款基于 PH...
    99+
    2023-07-06
  • ODBC中怎么利用CRecordset类对数据库进行操作
    ODBC中怎么利用CRecordset类对数据库进行操作,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。 1.MFC中的ODBC类...
    99+
    2022-10-18
  • oracle怎么用shell脚本链接数据库进行操作
    本篇内容主要讲解“oracle怎么用shell脚本链接数据库进行操作”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“oracle怎么用shell脚本链接数据库进行...
    99+
    2022-10-18
  • 使用Golang怎么操作数据库
    使用Golang怎么操作数据库?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。golang适合做什么golang可以做服务器端开发,但golang很适合做日志处理、数据打包、...
    99+
    2023-06-14
  • 使用Apache Doris自动同步整个 MySQL/Oracle 数据库进行数据分析
    Flink-Doris-Connector 1.4.0 允许用户一步将包含数千个表的整个数据库(MySQL或Oracle )摄取到Apache Doris(一种实时分析数据库)中。 通过内置的Flink CDC,连接器可以直接将上游源的表...
    99+
    2023-09-27
    Doris hadoop mysql oracle
  • php怎么进行数据库查询和修改操作
    PHP 是一种广泛使用的服务器端脚本语言,用于 Web 开发。当开发者需要从数据库中读取数据或对数据库进行修改时,PHP 提供了一些简单而强大的功能,这些功能使得查询和修改数据库变得轻松自如。本文将介绍一些 PHP 中常用的数据库查询和修改...
    99+
    2023-05-14
  • 如何使用PHP查询数据库数值进行操作
    PHP是一种非常流行的服务器端编程语言,广泛用于Web开发。在Web开发中,PHP通常与数据库一起使用,以更好地管理和操作数据。在本文中,我们将介绍如何使用PHP查询数据库数值进行操作。首先,我们需要连接到数据库。PHP提供了许多库和扩展来...
    99+
    2023-05-14
    php 数据库
  • 利用mybatis怎么对数据库进行增删改查操作
    这篇文章将为大家详细讲解有关利用mybatis怎么对数据库进行增删改查操作,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。所需要用到的其他工具或技术:项目管理工具 : Maven测试运行工具 ...
    99+
    2023-05-31
    mybatis
  • Java中怎么利用Streams对数据库进行查询操作
    Java中怎么利用Streams对数据库进行查询操作,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。示例数据库我们使用的示例数据库是Saki...
    99+
    2022-10-18
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作