目录 前言: 1、springboot引入依赖: 2、yml配置文件 3、创建SQL server CDC变更数据监听器 4、反序列化数据,转为变更JSON对象 5、CDC 数据实体类 6、自定义ApplicationContextUtil
目录
我的场景是从SQL Server数据库获取指定表的增量数据,查询了很多获取增量数据的方案,最终选择了flink的 flink-connector-sqlserver-cdc ,这个需要用到SQL Server 的CDC(变更数据捕获),通过CDC来获取增量数据,处理数据前需要对数据库进行配置,如果不清楚如何配置可以看看我这篇文章:《SQL Server数据库开启CDC变更数据捕获操作指引》
废话不多说,直接上干货,如有不足还请指正
<properties>
    <flink.version>1.16.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>9.4.0.jre8</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
</dependencies>
spring:
  datasource:
    url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001
    username: sa
    password: root
    driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver

# 实时同步SQL Server数据库配置
CDC:
  DataSource:
    host: 127.0.0.1
    port: 1433
    database: HM_5001
    tableList: dbo.t1,dbo.Tt2,dbo.t3,dbo.t4
    username: sa
    password: sa
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;import com.ververica.cdc.debezium.DebeziumSourceFunction;import lombok.extern.slf4j.Slf4j;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.ApplicationArguments;import org.springframework.boot.ApplicationRunner;import org.springframework.stereotype.Component;import java.io.Serializable;@Component@Slf4jpublic class SQLServerCDCListener implements ApplicationRunner, Serializable { @Value("${CDC.DataSource.host}") private String host; @Value("${CDC.DataSource.port}") private String port; @Value("${CDC.DataSource.database}") private String database; @Value("${CDC.DataSource.tableList}") private String tableList; @Value("${CDC.DataSource.username}") private String username; @Value("${CDC.DataSource.password}") private String password; private final DataChangeSink dataChangeSink; public SQLServerCDCListener(DataChangeSink dataChangeSink) { this.dataChangeSink = dataChangeSink; } @Override public void run(ApplicationArguments args) throws Exception { log.info("开始启动Flink CDC获取ERP变更数据......"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DebeziumSourceFunction dataChangeInfoMysqlSource = buildDataChangeSource(); DataStream streamSource = env .addSource(dataChangeInfomysqlSource, "SQLServer-source") .setParallelism(1); streamSource.addSink(dataChangeSink); env.execute("SQLServer-stream-cdc"); } private DebeziumSourceFunction buildDataChangeSource() { String[] tables = tableList.replace(" ", "").split(","); return SqlServerSource.builder() .hostname(host) .port(Integer.parseInt(port)) .database(database) // monitor sqlserver database .tableList(tables) // monitor products table .username(username) 
.password(password) .startupOptions(StartupOptions.latest()) .deserializer(new JSONDebeziumDeserializationSchema()) // converts SourceRecord to jsON String .build(); }}
import com.alibaba.fastjson.JSONObject;import com.ververica.cdc.debezium.DebeziumDeserializationSchema;import io.debezium.data.Envelope;import lombok.extern.slf4j.Slf4j;import org.apache.flink.api.common.typeinfo.TypeInfORMation;import org.apache.flink.util.Collector;import org.apache.kafka.connect.data.Field;import org.apache.kafka.connect.data.Schema;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;import java.time.Instant;import java.time.LocalDateTime;import java.time.ZoneId;import java.util.List;import java.util.Optional;@Slf4jpublic class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema { public static final String TS_MS = "ts_ms"; public static final String BEFORE = "before"; public static final String AFTER = "after"; public static final String SOURCE = "source"; public static final String CREATE = "CREATE"; public static final String UPDATE = "UPDATE"; @Override public void deserialize(SourceRecord sourceRecord, Collector collector) { try { String topic = sourceRecord.topic(); String[] fields = topic.split("\\."); String database = fields[1]; String tableName = fields[2]; Struct struct = (Struct) sourceRecord.value(); final Struct source = struct.getStruct(SOURCE); DataChangeInfo dataChangeInfo = new DataChangeInfo(); dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString()); dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString()); // 获取操作类型 CREATE UPDATE DELETE 1新增 2修改 3删除 Envelope.Operation operation = Envelope.operationFor(sourceRecord); String type = operation.toString().toUpperCase(); int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 
2 : 3; dataChangeInfo.setEventType(eventType); dataChangeInfo.setDatabase(database); dataChangeInfo.setTableName(tableName); ZoneId zone = ZoneId.systemDefault(); Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis); dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone)); //7.输出数据 collector.collect(dataChangeInfo); } catch (Exception e) { log.error("SQLServer消息读取自定义序列化报错:{}", e.getMessage()); e.printStackTrace(); } } private JSONObject getJsonObject(Struct value, String fieldElement) { Struct element = value.getStruct(fieldElement); JSONObject jsonObject = new JSONObject(); if (element != null) { Schema afterSchema = element.schema(); List fieldList = afterSchema.fields(); for (Field field : fieldList) { Object afterValue = element.get(field); jsonObject.put(field.name(), afterValue); } } return jsonObject; } @Override public TypeInformation getProducedType() { return TypeInformation.of(DataChangeInfo.class); }}
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * Immutable-in-spirit carrier for one CDC change event produced by
 * {@code JsonDebeziumDeserializationSchema}; getters/setters/equals/hashCode/
 * toString are generated by Lombok's {@code @Data}.
 */
@Data
public class DataChangeInfo implements Serializable {
    /** Source database name (parsed from the Debezium topic). */
    private String database;
    /** Source table name (parsed from the Debezium topic). */
    private String tableName;
    /** Event timestamp derived from the envelope's ts_ms (system default zone). */
    private LocalDateTime changeTime;
    /** Operation type: 1 = insert, 2 = update, 3 = delete. */
    private Integer eventType;
    /** Row image before the change, serialized as a JSON string. */
    private String beforeData;
    /** Row image after the change, serialized as a JSON string. */
    private String afterData;
}
import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Component;import java.io.Serializable;@Componentpublic class ApplicationContextUtil implements ApplicationContextAware, Serializable { private static ApplicationContext context; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.context = applicationContext; } public static ApplicationContext getApplicationContext() { return context; } public static T getBean(Class beanClass) { return context.getBean(beanClass); }}
import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;@Component@Slf4jpublic class DataChangeSink extends RichSinkFunction { private static final long serialVersionUID = -74375380912179188L; private UserMapper userMapper; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); userMapper = ApplicationContextUtil.getBean(UserMapper.class); } @Override public void invoke(DataChangeInfo dataChangeInfo, Context context) { log.info("收到变更原始数据:{}", dataChangeInfo); // TODO 开始处理你的数据吧 }
以上是我亲自验证测试的结果,已发布生产环境,如有不足还请指正。
来源地址:https://blog.csdn.net/weixin_42717648/article/details/130148168
--结束END--
本文标题: 实战Java springboot 采用Flink CDC操作SQL Server数据库获取增量变更数据
本文链接: https://www.lsjlt.com/news/387971.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-04-01
2024-04-03
2024-04-03
2024-01-21
2024-01-21
2024-01-21
2024-01-21
2023-12-23
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0