
Hands-on Java Spring Boot: Using Flink CDC to Capture Incremental Change Data from SQL Server

Published 2023-09-01 · 泡泡鱼 · Tags: database, Java, Spring Boot, Flink, SQL Server

Contents

Preface

1. Spring Boot dependencies

2. YML configuration file

3. Create the SQL Server CDC change-data listener

4. Deserialize the data into a change JSON object

5. CDC data entity class

6. Custom ApplicationContextUtil

7. Custom sink, managed by Spring, to process the change data

Preface:

My use case is capturing incremental data for specific tables in a SQL Server database. After evaluating several approaches to incremental data capture, I settled on Flink's flink-connector-sqlserver-cdc connector, which relies on SQL Server's CDC (Change Data Capture) feature. The database must be configured for CDC before any data can be processed; if you are unsure how to do that, see my article 《SQL Server数据库开启CDC变更数据捕获操作指引》 (a guide to enabling CDC on SQL Server).
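For reference, enabling CDC boils down to two system stored procedures, sys.sp_cdc_enable_db and sys.sp_cdc_enable_table. The sketch below runs them over plain JDBC; it is a minimal illustration only, the connection details and the dbo.t1 table are placeholders taken from the yml configuration later in this article, and the full prerequisites (sysadmin rights, a running SQL Server Agent) are covered in the guide linked above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class EnableCdcExample {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details matching the yml config in this article.
        String url = "jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001";
        try (Connection conn = DriverManager.getConnection(url, "sa", "root");
             Statement st = conn.createStatement()) {
            // Enable CDC at the database level (requires sysadmin).
            st.execute("EXEC sys.sp_cdc_enable_db");
            // Enable CDC for one table; repeat for every table in CDC.DataSource.tableList.
            st.execute("EXEC sys.sp_cdc_enable_table "
                    + "@source_schema = N'dbo', @source_name = N't1', @role_name = NULL");
        }
    }
}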

With the database prepared, let's get straight to the code. Corrections and suggestions are welcome.

1. Spring Boot dependencies:

<properties>
    <flink.version>1.16.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>9.4.0.jre8</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
</dependencies>

2. YML configuration file

spring:
  datasource:
    url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001
    username: sa
    password: root
    driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver

# Real-time SQL Server sync (CDC) configuration
CDC:
  DataSource:
    host: 127.0.0.1
    port: 1433
    database: HM_5001
    tableList: dbo.t1,dbo.t2,dbo.t3,dbo.t4
    username: sa
    password: sa
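The listener in the next step injects these CDC.DataSource values one by one with @Value. As an aside, a hypothetical alternative (not part of the original code) is to bind the whole block to a single properties bean; this assumes Spring Boot's relaxed binding, which maps a YAML key like CDC.DataSource.tableList onto the canonical prefix cdc.datasource and field tableList:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Hypothetical convenience class; fields bind to CDC.DataSource.* via relaxed binding.
@Component
@ConfigurationProperties(prefix = "cdc.datasource")
@Data
public class CdcDataSourceProperties {
    private String host;
    private Integer port;
    private String database;
    private String tableList;
    private String username;
    private String password;
}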

3. Create the SQL Server CDC change-data listener

import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;

@Component
@Slf4j
public class SQLServerCDCListener implements ApplicationRunner, Serializable {

    @Value("${CDC.DataSource.host}")
    private String host;
    @Value("${CDC.DataSource.port}")
    private String port;
    @Value("${CDC.DataSource.database}")
    private String database;
    @Value("${CDC.DataSource.tableList}")
    private String tableList;
    @Value("${CDC.DataSource.username}")
    private String username;
    @Value("${CDC.DataSource.password}")
    private String password;

    private final DataChangeSink dataChangeSink;

    public SQLServerCDCListener(DataChangeSink dataChangeSink) {
        this.dataChangeSink = dataChangeSink;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("Starting Flink CDC to capture ERP change data...");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> dataChangeSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(dataChangeSource, "SQLServer-source")
                .setParallelism(1);
        streamSource.addSink(dataChangeSink);
        env.execute("SQLServer-stream-cdc");
    }

    private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = tableList.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(host)
                .port(Integer.parseInt(port))
                .database(database)   // SQL Server database to monitor
                .tableList(tables)    // tables to monitor, e.g. dbo.t1
                .username(username)
                .password(password)
                .startupOptions(StartupOptions.latest()) // only capture changes made after startup
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to DataChangeInfo
                .build();
    }
}
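Note that env.execute() blocks, so this ApplicationRunner effectively pins a thread for the lifetime of the stream. Also, no checkpointing is configured above, so a restarted job begins again from StartupOptions.latest(). If you want the Debezium source to persist its offsets and resume after a failure, a minimal sketch (my addition, not part of the original setup) is:

// Inside run(), before building the source (an assumption, not the author's setup):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Checkpoint every 10 seconds; the CDC source stores its offsets in checkpoints,
// so a job restarted with restored state continues where it left off.
env.enableCheckpointing(10_000L);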

4. Deserialize the data into a change JSON object

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;

@Slf4j
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        try {
            // Topic format: <server-name>.<database>.<table>
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // Operation type: CREATE/UPDATE/DELETE -> 1 insert, 2 update, 3 delete
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = CREATE.equals(type) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS))
                    .map(x -> Long.parseLong(x.toString()))
                    .orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            // Emit the change object downstream
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("SQL Server CDC deserialization failed: {}", e.getMessage(), e);
        }
    }

    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                jsonObject.put(field.name(), element.get(field));
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
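To make the output concrete, here is what the deserializer might emit for an UPDATE on dbo.t1; the column names and values are invented purely for illustration:

// Illustrative only: a DataChangeInfo as deserialize() would emit it (hypothetical values).
DataChangeInfo info = new DataChangeInfo();
info.setDatabase("HM_5001");
info.setTableName("t1");
info.setEventType(2); // 2 = update, per the mapping above
info.setChangeTime(LocalDateTime.now());
info.setBeforeData("{\"id\":1,\"name\":\"old\"}");
info.setAfterData("{\"id\":1,\"name\":\"new\"}");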

5. CDC data entity class

import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

@Data
public class DataChangeInfo implements Serializable {
    /** Database name */
    private String database;
    /** Table name */
    private String tableName;
    /** Time of the change */
    private LocalDateTime changeTime;
    /** Event type: 1 insert, 2 update, 3 delete */
    private Integer eventType;
    /** Row data before the change, as JSON */
    private String beforeData;
    /** Row data after the change, as JSON */
    private String afterData;
}

6. Custom ApplicationContextUtil

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.io.Serializable;

@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {

    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextUtil.context = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return context;
    }

    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }
}

7. Custom sink, managed by Spring, to process the change data

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {

    private static final long serialVersionUID = -74375380912179188L;

    private UserMapper userMapper;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Flink serializes the sink and runs it on a task thread, so Spring beans
        // cannot be @Autowired here; look them up from the application context instead.
        userMapper = ApplicationContextUtil.getBean(UserMapper.class);
    }

    @Override
    public void invoke(DataChangeInfo dataChangeInfo, Context context) {
        log.info("Received raw change data: {}", dataChangeInfo);
        // TODO: process your data here
    }
}
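What you do inside invoke() is your business logic. One possible sketch (my assumption, not the author's production code) dispatches on eventType and parses the JSON payloads with fastjson, which is already used in step 4; the userMapper calls in the comments are hypothetical:

// A possible body for DataChangeSink.invoke(), sketched under the assumptions above.
// Requires com.alibaba.fastjson.JSON and com.alibaba.fastjson.JSONObject imports.
@Override
public void invoke(DataChangeInfo info, Context context) {
    switch (info.getEventType()) {
        case 1: // insert: the new row is in afterData
            JSONObject inserted = JSON.parseObject(info.getAfterData());
            // e.g. userMapper.insert(inserted.toJavaObject(User.class));
            break;
        case 2: // update: afterData holds the new values, beforeData the old ones
            JSONObject updated = JSON.parseObject(info.getAfterData());
            // e.g. userMapper.update(updated.toJavaObject(User.class));
            break;
        case 3: // delete: only beforeData is populated
            JSONObject deleted = JSON.parseObject(info.getBeforeData());
            // e.g. userMapper.deleteById(deleted.getLong("id"));
            break;
        default:
            log.warn("Unknown event type: {}", info.getEventType());
    }
}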

Everything above has been verified by me and is running in production. Corrections and suggestions are welcome.

Source: https://blog.csdn.net/weixin_42717648/article/details/130148168
