iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >SQL Server CDC配合Kafka Connect监听数据变化的示例分析
  • 379
分享到

SQL Server CDC配合Kafka Connect监听数据变化的示例分析

2023-06-22 06:06:18 379人浏览 泡泡鱼
摘要

这篇文章主要为大家展示了“sql Server CDC配合kafka Connect监听数据变化的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“SQL&nb

这篇文章主要为大家展示了“sql Server CDC配合kafka Connect监听数据变化的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“SQL Server CDC配合Kafka Connect监听数据变化的示例分析”这篇文章吧。

写在前面

  进入主题,通常企业为了实现数据统计、数据分析数据挖掘、解决信息孤岛等全局数据的系统化运作管理 ,为BI、经营分析、决策支持系统等深度开发应用奠定基础,挖掘数据价值 ,企业会开始着手建立数据仓库,数据中台。而这些数据来源则来自于企业的各个业务系统的数据或爬取外部的数据,从业务系统数据到数据仓库的过程就是一个ETL(Extract-TransfORM-Load)行为,包括了采集、清洗、数据转换等主要过程,通常异构数据抽取转换使用Sqoop、DataX等,日志采集Flume、Logstash、Filebeat等。

  数据抽取分为全量抽取和增量抽取,全量抽取类似于数据迁移或数据复制,全量抽取很好理解;增量抽取在全量的基础上做增量,只监听、捕捉动态变化的数据。如何捕捉数据的变化是增量抽取的关键,一是准确性,必须保证准确的捕捉到数据的动态变化,二是性能,不能对业务系统造成太大的压力。

增量抽取方式

  通常增量抽取有几种方式,各有优缺点。

1. 触发器

  在源数据库上的目标表创建触发器,监听增、删、改操作,捕捉到数据的变更写入临时表。

优点:操作简单、规则清晰,对源表不影响;

缺点:对源数据库有侵入,对业务系统有一定的影响;

2. 全表比对

  在ETL过程中,抽取方建立临时表待全量抽取存储,然后在进行比对数据。

优点:对源数据库、源表都无需改动,完全交付ETL过程处理,统一管理;

缺点:ETL效率低、设计复杂,数据量越大,速度越慢,时效性不确定;

3. 全表删除后再插入

  在抽取数据之前,先将表中数据清空,然后全量抽取。

优点:ETL 操作简单,速度快。

缺点:全量抽取一般采取T+1的形式,抽取数据量大的表容易对数据库造成压力;

4. 时间戳

  时间戳的方式即在源表上增加时间戳列,对发生变更的表进行更新,然后根据时间戳进行提取。

优点:操作简单,ELT逻辑清晰,性能比较好;

缺点:对业务系统有侵入,数据库表也需要额外增加字段。对于老的业务系统可能不容易做变更。

5. CDC方式

  变更数据捕获Change Data Capture(简称CDC),SQLServer为实时更新数据同步提供了CDC机制,类似于Mysql的binlog,将数据更新操作维护到一张CDC表中。开启CDC的源表在插入INSERT、更新UPDATE和删除DELETE活动时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中,通过cdc提供的查询函数,可以捕获这部分数据。详情可以查看官方介绍:关于变更数据捕获 (SQL Server)

SQL Server CDC配合Kafka Connect监听数据变化的示例分析

优点:提供易于使用的api 来设置CDC 环境,缩短ETL 的时间,无需修改业务系统表结构。

缺点:受数据库版本的限制,实现过程相对复杂。

CDC增量抽取

先决条件

已搭建好Kafka集群ZooKeeper集群;

源数据库支持CDC,版本采用开发版或企业版。

案例环境:

ubuntu 20.04

Kafka2.13-2.7.0

Zookeeper 3.6.2

SQL Server 2012

步骤

  除了数据库开启CDC支持以外,主要还是要将变更的数据通过Kafka Connect传输数据,Debezium是目前官方推荐的连接器,它支持绝大多数主流数据库:Mysqlpostgresql、SQL Server、oracle等等,详情查看Connectors。

1. 数据库步骤

开启数据库CDC支持

  在源数据库执行以下命令:

EXEC sys.sp_cdc_enable_db Go

  附上关闭语句:

exec sys.sp_cdc_disable_db

查询是否启用

select * from sys.databases where is_cdc_enabled = 1

创建测试数据表:(已有表则跳过此步骤)

create  table T_LiocDC(    ID int identity(1,1) primary key ,    Name nvarchar(16),    Sex bit,    CreateTime datetime,    UpdateTime datetime);

对源表开启CDC支持:

exec sp_cdc_enable_table @source_schema='dbo', @source_name='T_LioCDC', @role_name=null,@supports_net_changes = 1;

确认是否有权限访问CDC Table:

EXEC sys.sp_cdc_help_change_data_capture

SQL Server CDC配合Kafka Connect监听数据变化的示例分析

确认SQL Server Agent已开启:

EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'

SQL Server CDC配合Kafka Connect监听数据变化的示例分析

  以上则完成对数据库的CDC操作。

2. Kafka步骤

  Kafka Connect的工作模式分为两种,分别是standalone模式和distributed模式。standalone用于单机测试,本文用distributed模式,用于生产环境。(Kafka必须先运行启动,再进行以下步骤进行配置。)

下载Sql Server Connector

  下载连接器后,创建一个文件夹来存放,解压到该目录下即可,例子路径:/usr/soft/kafka/kafka_2.13_2.7.0/plugins(记住这个路径,配置中要用到)

SQL Server CDC配合Kafka Connect监听数据变化的示例分析

下载地址:debezium-connector-sqlserver-1.5.0.Final-plugin.tar.gz

SQL Server CDC配合Kafka Connect监听数据变化的示例分析

编辑connect-distributed.properties配置

  修改Kafka connect配置文件,$KAFKA_HOME/config/connect-distributed.properties,变更内容如下:

//kafka集群ip+portbootstrap.servers=172.192.10.210:9092,172.192.10.211:9092,172.192.10.212:9092key.converter.schemas.enable=falsevalue.converter.schemas.enable=falseoffset.storage.topic=connect-offsetsoffset.storage.replication.factor=1offset.storage.partitions=3offset.storage.cleanup.policy=compactconfig.storage.topic=connect-configsconfig.storage.replication.factor=1status.storage.topic=connect-statusstatus.storage.replication.factor=1status.storage.partitions=3//刚刚下载连接器解压的路径plugin.path=/usr/soft/kafka/kafka_2.13_2.7.0/plugins

看到配置中有三个Topic,分别是

config.storage.topic:用以保存connector和task的配置信息,需要注意的是这个主题的分区数只能是1,而且是有多副本的。

offset.storage.topic:用以保存offset信息。

status.storage.topic:用以保存connetor的状态信息。

这些Topic可以不用创建,启动后会默认创建。

启动Kafka集群

  保存配置之后,将connect-distributed.properties分发到集群中,然后启动:

bin/connect-distributed.sh config/connect-distributed.properties

检查是否启动

  connector支持REST API的方式进行管理,所以用Post man或者Fiddler可以调用相关接口进行管理。检查是否启动:

SQL Server CDC配合Kafka Connect监听数据变化的示例分析

不用奇怪,上面配置集群的IP是172段,这里的192.168.1.177仍是我的集群中的一个服务器,因为服务器都使用了双网卡。因为还没有连接器相关配置,所以接口返回是一个空数组,接下来将新增一个连接器。

编写sqlserver-cdc-source.JSON

{    "name": "sqlserver-cdc-source",    "config": {        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",        "database.server.name" : "JnServer",        "database.hostname" : "172.192.20.2", --目标数据库的ip        "database.port" : "1433",  --目标数据库的端口        "database.user" : "sa",   --目标数据库的账号        "database.passWord" : "123456",  --密码        "database.dbname" : "Dis",  --目标数据库的数据库名称        "table.whitelist": "dbo.T_LioCDC", --监听表名         "schemas.enable" : "false",           "mode":"incrementing",  --增量模式         "incrementing.column.name": "ID", --增量列名        "database.history.kafka.bootstrap.servers" : "172.192.10.210:9092,172.192.10.211:9092,172.192.10.212", --kafka集群        "database.history.kafka.topic": "TopicTLioCDC",  --kafka topic内部使用,不是由消费者使用        "value.converter.schemas.enable":"false",        "value.converter":"org.apache.kafka.connect.json.JsonConverter"    }}//源文地址: https://www.cnblogs.com/EminemJK/p/14688907.html

还有其他额外的配置,可以参考官方文档。然后执行

SQL Server CDC配合Kafka Connect监听数据变化的示例分析

继续执行检查,就发现连接器已经成功配置了:

SQL Server CDC配合Kafka Connect监听数据变化的示例分析

其他API

GET /connectors – 返回所有正在运行的connector名。POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。GET /connectors/{name} – 获取指定connetor的信息。GET /connectors/{name}/config – 获取指定connector的配置信息。PUT /connectors/{name}/config – 更新指定connector的配置信息。GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。GET /connectors/{name}/tasks – 获取指定connector正在运行的task。GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。PUT /connectors/{name}/resume – 恢复一个被暂停的connector。POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。//源文地址: Https://www.cnblogs.com/EminemJK/p/14688907.html

查看Topic

/usr/soft/kafka/kafka_2.13_2.7.0# bin/kafka-topics.sh --list --zookeeper localhost:2000

SQL Server CDC配合Kafka Connect监听数据变化的示例分析

TopicJnServer.dbo.T_LioCDC则是供我们消费的主题,启动一个消费者进行监听测试:

bin/kafka-console-consumer.sh --bootstrap-server 172.192.10.210:9092  --consumer-property group.id=group1 --consumer-property client.id=consumer-1  --topic JnServer.dbo.T_LioCDC

然后再源表进行一些列增删改操作,

--测试代码insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('A',1,getdate(),getdate())insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('B',0,getdate(),getdate())insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('C',1,getdate(),getdate())insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('D',0,getdate(),getdate())insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('E',1,getdate(),getdate())insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('F',1,getdate(),getdate())insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('G',0,getdate(),getdate())update T_LioCDCset Name='Lio.Huang',UpdateTime=getdate()where ID=7

SQL Server CDC配合Kafka Connect监听数据变化的示例分析

已经成功捕捉到数据的变更,对比几个操作Json,依次是insert、update、delete:

SQL Server CDC配合Kafka Connect监听数据变化的示例分析SQL Server CDC配合Kafka Connect监听数据变化的示例分析SQL Server CDC配合Kafka Connect监听数据变化的示例分析

以上是“SQL Server CDC配合Kafka Connect监听数据变化的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注编程网精选频道!

--结束END--

本文标题: SQL Server CDC配合Kafka Connect监听数据变化的示例分析

本文链接: https://www.lsjlt.com/news/303054.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • SQL Server CDC配合Kafka Connect监听数据变化的示例分析
    这篇文章主要为大家展示了“SQL Server CDC配合Kafka Connect监听数据变化的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“SQL&nb...
    99+
    2023-06-22
  • 解析SQL Server CDC配合Kafka Connect监听数据变化的问题
    写在前面   好久没更新Blog了,从CRUD Boy转型大数据开发,拉宽了不少的知识面,从今年年初开始筹备、组建、招兵买马,到现在稳定开搞中,期间踏过无数的火坑,也许除了这篇还很写...
    99+
    2024-04-02
  • VUE监听窗口变化事件的示例分析
    这篇文章将为大家详细讲解有关VUE监听窗口变化事件的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Vuejs 本身就是一个 MVVM 的框架。但是在监听 wind...
    99+
    2024-04-02
  • sql server跟踪数据库的示例分析
    今天就跟大家聊聊有关sql server跟踪数据库的示例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。 select * FROM ...
    99+
    2024-04-02
  • sql server 2005数据修改的示例分析
    这篇文章主要为大家展示了“sql server 2005数据修改的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“sql server 2005数据修改的...
    99+
    2024-04-02
  • SQL Server数据库分区分表的示例分析
    这篇文章主要介绍SQL Server数据库分区分表的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!1、 需求说明将数据库Demo中的表按照日期字段进行水平分区分表。要求数据文件按一年一个文件存储,且...
    99+
    2023-06-14
  • SQL Server中Table字典数据查询SQL的示例分析
    这篇文章主要介绍SQL Server中Table字典数据查询SQL的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!前言在数据库系统原理与设计(第3版)教科书中这样写道:数据库...
    99+
    2024-04-02
  • Sql Server数据库常用Transact-SQL脚本的示例分析
    这篇文章主要介绍了Sql Server数据库常用Transact-SQL脚本的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。Tran...
    99+
    2024-04-02
  • SQL Server生成sql脚本并执行还原数据库的示例分析
    这篇文章将为大家详细讲解有关SQL Server生成sql脚本并执行还原数据库的示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。上次介绍过如何通过ba...
    99+
    2024-04-02
  • MySQL数据库性能优化之SQL优化的示例分析
    这篇文章将为大家详细讲解有关MySQL数据库性能优化之SQL优化的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。  注:这篇文章是以 MySQL 为背景,很多内容...
    99+
    2024-04-02
  • SQL Server数据库中表名称、字段比较的示例分析
    这篇文章主要为大家展示了“SQL Server数据库中表名称、字段比较的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“SQL Server数据库中表名称...
    99+
    2024-04-02
  • SQL Server 2008处理隐式数据类型转换的示例分析
    小编给大家分享一下SQL Server 2008处理隐式数据类型转换的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧! ...
    99+
    2024-04-02
  • 配置操作系统重启后Oracle数据库和监听自动启动的示例分析
    小编给大家分享一下配置操作系统重启后Oracle数据库和监听自动启动的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧! --配置操作系统重启后,实例自动启...
    99+
    2024-04-02
  • MySQL 5.6下table_open_cache参数优化合理配置的示例分析
    这篇文章主要介绍MySQL 5.6下table_open_cache参数优化合理配置的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!1、简介 table_cache是一个非常...
    99+
    2024-04-02
  • mysql数据库中my.cnf配置文件重要参数优化配置的示例分析
    小编给大家分享一下mysql数据库中my.cnf配置文件重要参数优化配置的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作