iis服务器助手广告
返回顶部
首页 > 资讯 > 服务器 >Flink CDC整库同步(多表异构同步)
  • 135
分享到

Flink CDC整库同步(多表异构同步)

flink大数据 2023-10-03 22:10:15 135人浏览 薄情痞子
摘要

前言 flinkcdc单表同步比较简单,按照官方案例基本都能成功,多表异构同步、整库同步这块一直想尝试一下,社区说使用api可以做到,但是一直没能白嫖到可行方案(代码),然后自己动手尝试了下,咳咳,无

前言

flinkcdc单表同步比较简单,按照官方案例基本都能成功,多表异构同步、整库同步这块一直想尝试一下,社区说使用api可以做到,但是一直没能白嫖到可行方案(代码),然后自己动手尝试了下,咳咳,无奈技术太菜,java各种语法都搞的不是太明白,时间跨度蛮久,中间遇到了不少问题,中途偶然间在群里看到了很久很久以前群友发的一份同步方案,可惜缺少了反序列化的过程,借鉴过来改巴改巴(也改了好几个星期,太菜了),勉强是能跑了,分享出来,能帮到大家一点也就很好了。

方案思路

这个方案的整体思路我先说一下(大佬的思路,我借鉴的),首先我们先使用Mysqlcatalog获取到各个表的信息(列名、列类型之类的),然后创建相应的sink table,然后flinkcdc的DataStream是提供了整库获取数据的能力的,所以我们就采用DataStream的方式拿到数据,然后在自定义反序列化里形成的输出,得到DataStream<,然后根据tableName将这个流拆分(过滤),就相当于一个tablename对应一个自己的DataStream,然后将每个流转为一个sourcetable,然后insert into sinktable select * from sourcetable,然后…gameover。

走起:

flink版本:1.15.2(1.15以下版本貌似还没有mysqlcatalog,如果要使用低版本,代码需要调整一下)
flink cdc版本:2.3.0
注意:需先在sink库创建好相应的表(之前忘记写了)

不巴拉了,直接上代码,场景是mysql -> mysql,sink端如果是其他数据库理论上应该是一样,source表需要有主键,这是flinkcdc底层约定好的,没有会报错。

package com.cityos;import com.ververica.cdc.connectors.mysql.source.MySqlSource;import com.ververica.cdc.connectors.mysql.table.StartupOptions;import org.apache.commons.lang3.StringUtils;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.typeinfo.TypeInfORMation;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.calcite.shaded.com.Google.common.collect.Maps;import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Schema;import org.apache.flink.table.api.StatementSet;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.catalog.DefaultCatalogTable;import org.apache.flink.table.catalog.ObjectPath;import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;import org.apache.flink.table.types.DataType;import org.apache.flink.table.types.logical.LogicalType;import org.apache.flink.table.types.logical.RowType;import org.apache.flink.types.Row;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.List;import java.util.Map;public class FlinkCdcMultiSyncJdbc {    private static final Logger log = LoggerFactory.getLogger(FlinkCdcMultiSyncJdbc.class);    public static void main(String[] args) throws Exception {       // source端连接信息        String userName = "root";        String passWord = "18772247265Ldy@";        String host = "localhost";        String db = "flinktest1";       // 如果是整库,tableList = ".*"        String tableList = "lidy.NLP_category,lidy.nlp_classify_man_made3";        int port = 33306;       // sink连接信息模板        String sink_url = "jdbc:mysql://localhost:33306/flinktest?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai";        String sink_username = "root";        String sink_password = "18772247265Ldy@";        String connectorWithBody =                " with (\n" +                        " 'connector' = 'jdbc',\n" +                        " 'url' = '${sink_url}',\n" +                        " 'username' = '${sink_username}',\n" +                        " 'password' = '${sink_password}',\n" +                        " 'table-name' = '${tableName}'\n" +                        ")";        connectorWithBody = connectorWithBody.replace("${sink_url}", sink_url)                .replace("${sink_username}", sink_username)                .replace("${sink_password}", sink_password);        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.enableCheckpointing(3000);        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);        // 注册同步的库对应的catalog        MySqlCatalog mysqlCatalog = new MySqlCatalog("mysql-catalog", db, userName, passWord, String.format("jdbc:mysql://%s:%d", host, port));        List<String> tables = new ArrayList<>();       // 如果整库同步,则从catalog里取所有表,否则从指定表中取表名        if (".*".equals(tableList)) {            tables = mysqlCatalog.listTables(db);        } else {            String[] tableArray = tableList.split(",");            for (String table : tableArray) {                tables.add(table.split("\\.")[1]);            }        }       // 创建表名和对应RowTypeInfo映射的Map        Map<String, RowTypeInfo> tableTypeInformationMap = Maps.newConcurrentMap();        Map<String, DataType[]> tableDataTypesMap = Maps.newConcurrentMap();        Map<String, RowType> tableRowTypeMap = Maps.newConcurrentMap();        for (String table : tables) {            // 获取mysql catalog中注册的表            ObjectPath objectPath = new ObjectPath(db, table);            DefaultCatalogTable catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath);            // 获取表的Schema            Schema schema = catalogBaseTable.getUnresolvedSchema();            // 获取表中字段名列表            String[] fieldNames = new String[schema.getColumns().size()];            // 获取DataType            DataType[] fieldDataTypes = new DataType[schema.getColumns().size()];            LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()];            // 获取表字段类型            TypeInformation<?>[] fieldTypes = new TypeInformation[schema.getColumns().size()];            // 获取表的主键            List<String> primaryKeys = schema.getPrimaryKey().get().getColumnNames();            for (int i = 0; i < schema.getColumns().size(); i++) {                Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i);                fieldNames[i] = column.getName();                fieldDataTypes[i] = (DataType) column.getDataType();                fieldTypes[i] = InternalTypeInfo.of(((DataType) column.getDataType()).getLogicalType());                logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType();            }            RowType rowType = RowType.of(logicalTypes, fieldNames);            tableRowTypeMap.put(table, rowType);            // 组装sink表ddl sql            StringBuilder stmt = new StringBuilder();            String tableName = table;            String jdbcSinkTableName = String.format("jdbc_sink_%s", tableName);            stmt.append("create table ").append(jdbcSinkTableName).append("(\n");            for (int i = 0; i < fieldNames.length; i++) {                String column = fieldNames[i];                String fieldDataType = fieldDataTypes[i].toString();                stmt.append("\t").append(column).append(" ").append(fieldDataType).append(",\n");            }            stmt.append(String.format("PRIMARY KEY (%s) NOT ENFORCED\n)", StringUtils.join(primaryKeys, ",")));            String formatJdbcSinkWithBody = connectorWithBody                    .replace("${tableName}", jdbcSinkTableName);            String createSinkTableDdl = stmt.toString() + formatJdbcSinkWithBody;            // 创建sink表            log.info("createSinkTableDdl: {}", createSinkTableDdl);            tEnv.executeSql(createSinkTableDdl);            tableDataTypesMap.put(tableName, fieldDataTypes);            tableTypeInformationMap.put(tableName, new RowTypeInfo(fieldTypes, fieldNames));        }       // 监控mysql binlog        MySqlSource mySqlSource = MySqlSource.<Tuple2<String, Row>>builder()                .hostname(host)                .port(port)                .databaseList(db)                .tableList(tableList)                .username(userName)                .password(passWord)                .deserializer(new CustomDebeziumDeserializer(tableRowTypeMap))                .startupOptions(StartupOptions.initial())                .build();        SingleOutputStreamOperator<Tuple2<String, Row>> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining();        StatementSet statementSet = tEnv.createStatementSet();        // dataStream转Table,创建临时视图,插入sink表        for (Map.Entry<String, RowTypeInfo> entry : tableTypeInformationMap.entrySet()) {            String tableName = entry.geTKEy();            RowTypeInfo rowTypeInfo = entry.getValue();            SingleOutputStreamOperator<Row> mapStream = dataStreamSource.filter(data -> data.f0.equals(tableName)).map(data -> data.f1, rowTypeInfo);            Table table = tEnv.fromChangelogStream(mapStream);            String temporaryViewName = String.format("t_%s", tableName);            tEnv.createTemporaryView(temporaryViewName, table);            String sinkTableName = String.format("jdbc_sink_%s", tableName);            String insertSql = String.format("insert into %s select * from %s", sinkTableName, temporaryViewName);            log.info("add insertSql for {},sql: {}", tableName, insertSql);            statementSet.addInsertSql(insertSql);        }        statementSet.execute();    }}

对应的反序列化代码

package com.cityos;import com.ververica.cdc.debezium.DebeziumDeserializationSchema;import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;import com.ververica.cdc.debezium.utils.TemporalConversions;import io.debezium.data.Envelope;import io.debezium.data.SpecialValueDecimal;import io.debezium.data.VariableScaleDecimal;import io.debezium.time.*;import org.apache.flink.api.common.typeinfo.TypeHint;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;import org.apache.flink.table.data.DecimalData;import org.apache.flink.table.data.RowData;import org.apache.flink.table.data.StringData;import org.apache.flink.table.data.TimestampData;import org.apache.flink.table.types.logical.DecimalType;import org.apache.flink.table.types.logical.LogicalType;import org.apache.flink.table.types.logical.RowType;import org.apache.flink.types.Row;import org.apache.flink.types.RowKind;import org.apache.flink.util.Collector;import org.apache.kafka.connect.data.Decimal;import org.apache.kafka.connect.data.Field;import org.apache.kafka.connect.data.Schema;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;import java.math.BigDecimal;import java.NIO.ByteBuffer;import java.time.Instant;import java.time.LocalDateTime;import java.time.ZoneId;import java.util.Map;public class CustomDebeziumDeserializer implements DebeziumDeserializationSchema {        private final Map<String, RowType> tableRowTypeMap;    private Map<String, DeserializationRuntimeConverter> physicalConverterMap = Maps.newConcurrentMap();    CustomDebeziumDeserializer(Map tableRowTypeMap) {        this.tableRowTypeMap = tableRowTypeMap;        for (String tablename : this.tableRowTypeMap.keySet()) {            RowType rowType = this.tableRowTypeMap.get(tablename);            DeserializationRuntimeConverter physicalConverter =createNotNullConverter(rowType);            this.physicalConverterMap.put(tablename,physicalConverter);        }    }    @Override    public void deserialize(SourceRecord record, Collector out) throws Exception {        Envelope.Operation op = Envelope.operationFor(record);        Struct value = (Struct) record.value();        Schema valueSchema = record.valueSchema();        Struct source = value.getStruct("source");        String tablename = source.get("table").toString();        DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tablename);        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {            Row insert = extractAfterRow(value, valueSchema, physicalConverter);            insert.setKind(RowKind.INSERT);            out.collect(Tuple2.of(tablename,insert));        } else if (op == Envelope.Operation.DELETE) {            Row delete = extractBeforeRow(value, valueSchema, physicalConverter);            delete.setKind(RowKind.DELETE);            out.collect(Tuple2.of(tablename,delete));        } else {            Row before = extractBeforeRow(value, valueSchema, physicalConverter);            before.setKind(RowKind.UPDATE_BEFORE);            out.collect(Tuple2.of(tablename,before));            Row after = extractAfterRow(value, valueSchema, physicalConverter);            after.setKind(RowKind.UPDATE_AFTER);            out.collect(Tuple2.of(tablename,after));        }    }    private Row extractAfterRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();        Struct after = value.getStruct(Envelope.FieldName.AFTER);        return (Row) physicalConverter.convert(after, afterSchema);    }    private Row extractBeforeRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();        Struct before = value.getStruct(Envelope.FieldName.BEFORE);        return (Row) physicalConverter.convert(before, beforeSchema);    }    @Override    public TypeInformation<Tuple2<String, Row>> getProducedType() {        return TypeInformation.of(new TypeHint<Tuple2<String, Row>>() {        });    }    public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {        switch (type.getTypeRoot()) {            case NULL:                return new DeserializationRuntimeConverter() {                    private static final long serialVersionUID = 1L;                    @Override                    public Object convert(Object dbzObj, Schema schema) {                        return null;                    }                };            case BOOLEAN:                return convertToBoolean();            case TINYINT:                return new DeserializationRuntimeConverter() {                    private static final long serialVersionUID = 1L;                    @Override                    public Object convert(Object dbzObj, Schema schema) {                        return Byte.parseByte(dbzObj.toString());                    }                };            case SMALLINT:                return new DeserializationRuntimeConverter() {                    private static final long serialVersionUID = 1L;                    @Override                    public Object convert(Object dbzObj, Schema schema) {                        return Short.parseShort(dbzObj.toString());                    }                };            case INTEGER:            case INTERVAL_YEAR_MONTH:                return convertToInt();            case BIGINT:            case INTERVAL_DAY_TIME:                return convertToLong();            case DATE:                return convertToDate();            case TIME_WITHOUT_TIME_ZONE:                return convertToTime();            case TIMESTAMP_WITHOUT_TIME_ZONE:                return convertToTimestamp(ZoneId.of("UTC"));            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:                return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));            case FLOAT:                return convertToFloat();            case DOUBLE:                return convertToDouble();            case CHAR:            case VARCHAR:                return convertToString();            case BINARY:            case VARBINARY:                return convertToBinary();            case DECIMAL:                return createDecimalConverter((DecimalType) type);            case ROW:                return createRowConverter(                        (RowType) type);            case ARRAY:            case MAP:            case MULTISET:            case RAW:            default:                throw new UnsupportedOperationException("Unsupported type: " + type);        }    }    private static DeserializationRuntimeConverter convertToBoolean() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Boolean) {                    return dbzObj;                } else if (dbzObj instanceof Byte) {                    return (byte) dbzObj == 1;                } else if (dbzObj instanceof Short) {                    return (short) dbzObj == 1;                } else {                    return Boolean.parseBoolean(dbzObj.toString());                }            }        };    }    private static DeserializationRuntimeConverter convertToInt() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Integer) {                    return dbzObj;                } else if (dbzObj instanceof Long) {                    return ((Long) dbzObj).intValue();                } else {                    return Integer.parseInt(dbzObj.toString());                }            }        };    }    private static DeserializationRuntimeConverter convertToLong() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Integer) {                    return ((Integer) dbzObj).longValue();                } else if (dbzObj instanceof Long) {                    return dbzObj;                } else {                    return Long.parseLong(dbzObj.toString());                }            }        };    }    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {        final int precision = decimalType.getPrecision();        final int scale = decimalType.getScale();        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                BigDecimal bigDecimal;                if (dbzObj instanceof byte[]) {                    // decimal.handling.mode=precise                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);                } else if (dbzObj instanceof String) {                    // decimal.handling.mode=string                    bigDecimal = new BigDecimal((String) dbzObj);                } else if (dbzObj instanceof Double) {                    // decimal.handling.mode=double                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);                } else {                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {                        SpecialValueDecimal decimal =    VariableScaleDecimal.toLogical((Struct) dbzObj);                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);                    } else {                        // fallback to string                        bigDecimal = new BigDecimal(dbzObj.toString());                    }                }                return DecimalData.fromBigDecimal(bigDecimal, precision, scale);            }        };    }    private static DeserializationRuntimeConverter convertToDouble() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Float) {                    return ((Float) dbzObj).doubleValue();                } else if (dbzObj instanceof Double) {                    return dbzObj;                } else {                    return Double.parseDouble(dbzObj.toString());                }            }        };    }    private static DeserializationRuntimeConverter convertToFloat() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Float) {                    return dbzObj;                } else if (dbzObj instanceof Double) {                    return ((Double) dbzObj).floatValue();                } else {                    return Float.parseFloat(dbzObj.toString());                }            }        };    }    private static DeserializationRuntimeConverter convertToDate() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();            }        };    }    private static DeserializationRuntimeConverter convertToTime() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Long) {                    switch (schema.name()) {                        case MicroTime.SCHEMA_NAME:return (int) ((long) dbzObj / 1000);                        case NanoTime.SCHEMA_NAME:return (int) ((long) dbzObj / 1000_000);                    }                } else if (dbzObj instanceof Integer) {                    return dbzObj;                }                // get number of milliseconds of the day                return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;            }        };    }    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Long) {                    switch (schema.name()) {                        case Timestamp.SCHEMA_NAME:return TimestampData.fromEpochMillis((Long) dbzObj);                        case MicroTimestamp.SCHEMA_NAME:long micro = (long) dbzObj;return TimestampData.fromEpochMillis(        micro / 1000, (int) (micro % 1000 * 1000));                        case NanoTimestamp.SCHEMA_NAME:long nano = (long) dbzObj;return TimestampData.fromEpochMillis(        nano / 1000_000, (int) (nano % 1000_000));                    }                }                LocalDateTime localDateTime =                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);                return TimestampData.fromLocalDateTime(localDateTime);            }        };    }    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(            ZoneId serverTimeZone) {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof String) {                    String str = (String) dbzObj;                    // TIMESTAMP_LTZ type is encoded in string type                    Instant instant = Instant.parse(str);                    return TimestampData.fromLocalDateTime(LocalDateTime.ofInstant(instant, serverTimeZone));                }                throw new IllegalArgumentException(                        "Unable to convert to TimestampData from unexpected value '"    + dbzObj    + "' of type "    + dbzObj.getClass().getName());            }        };    }    private static DeserializationRuntimeConverter convertToString() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                return StringData.fromString(dbzObj.toString());            }        };    }    private static DeserializationRuntimeConverter convertToBinary() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof byte[]) {                    return dbzObj;                } else if (dbzObj instanceof ByteBuffer) {                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;                    byte[] bytes = new byte[byteBuffer.remaining()];                    byteBuffer.get(bytes);                    return bytes;                } else {                    throw new UnsupportedOperationException("Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());                }            }        };    }    private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {        final DeserializationRuntimeConverter[] fieldConverters =                rowType.getFields().stream()                        .map(RowType.RowField::getType)                        .map(    logicType ->            createNotNullConverter( logicType))                        .toArray(DeserializationRuntimeConverter[]::new);        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) throws Exception {                Struct struct = (Struct) dbzObj;                int arity = fieldNames.length;                Row row = new Row(arity);                for (int i = 0; i < arity; i++) {                    String fieldName = fieldNames[i];                    Field field = schema.field(fieldName);                    if (field == null) {                        row.setField(i, null);                    } else {                        Object fieldValue = struct.getWithoutDefault(fieldName);                        Schema fieldSchema = schema.field(fieldName).schema();                        Object convertedField =    convertField(fieldConverters[i], fieldValue, fieldSchema);                        row.setField(i, convertedField);                    }                }                return row;            }        };    }    private static Object convertField(            DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)            throws Exception {        if (fieldValue == null) {            return null;        } else {            return fieldConverter.convert(fieldValue, fieldSchema);        }    }}

再贴上我的pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="Http://Maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>com.cityos</groupId>    <artifactId>flink_1_15</artifactId>    <version>1.0-SNAPSHOT</version>    <properties>        <java.version>1.8</java.version>        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>        <flink.version>1.15.2</flink.version>        <Scala.binary.version>2.12</scala.binary.version>        <!--        <scala.version>2.12.12</scala.version>-->    </properties>    <repositories>        <repository>            <id>scala-tools.org</id>            <name>Scala-Tools Maven2 Repository</name>            <url>http://scala-tools.org/repo-releases</url>        </repository>        <repository>            <id>spring</id>            <url>https://maven.aliyun.com/repository/spring</url>        </repository>        <repository>            <id>cloudera</id>            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>        </repository>    </repositories>    <dependencies>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-scala_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>            <version>${flink.version}</version>            <!--            <scope>provided</scope>-->        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-common</artifactId>            <version>${flink.version}</version>            <!--            <scope>provided</scope>-->        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-clients</artifactId>            <version>${flink.version}</version>        </dependency>        <!-- flink-connector-kafka -->        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-connector-kafka</artifactId>            <version>${flink.version}</version>        </dependency>        <!-- flink-connector-jdbc -->        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-connector-jdbc</artifactId>            <version>${flink.version}</version>        </dependency>        <!--        mysql-cdc-->        <dependency>            <groupId>com.ververica</groupId>            <artifactId>flink-connector-mysql-cdc</artifactId>            <version>2.3.0</version>        </dependency>        <!--        mysql-->        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>            <version>8.0.29</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-JSON</artifactId>            <version>${flink.version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-csv</artifactId>            <version>${flink.version}</version>        </dependency>        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->        <dependency>            <groupId>org.slf4j</groupId>            <artifactId>slf4j-log4j12</artifactId>            <version>1.7.21</version>            <scope>compile</scope>        </dependency>        <!-- https://mvnrepository.com/artifact/log4j/log4j -->        <dependency>            <groupId>log4j</groupId>            <artifactId>log4j</artifactId>            <version>1.2.17</version>        </dependency>    </dependencies>    <dependencyManagement>        <dependencies>            <dependency>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-dependencies</artifactId>                <version>${spring-boot.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>        </dependencies>    </dependencyManagement>    <build>        <plugins>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <version>3.8.1</version>                <configuration>                    <source>1.8</source>                    <target>1.8</target>                    <encoding>UTF-8</encoding>                </configuration>            </plugin>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>                <version>2.3.7.RELEASE</version>                <configuration>                    <mainClass>com.cityos.Flink1142Application</mainClass>                </configuration>                <executions>                    <execution>                        <id>repackage</id>                        <goals><goal>repackage</goal>                        </goals>                    </execution>                </executions>            </plugin>        </plugins>    </build></project>

有兴趣的看看,没兴趣的或者感觉不屑的划过就好,莫喷我,代码写的确实是丑。

来源地址:https://blog.csdn.net/qq_36062467/article/details/128117647

--结束END--

本文标题: Flink CDC整库同步(多表异构同步)

本文链接: https://www.lsjlt.com/news/423189.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作