
Syncing MySQL data to Kafka with Flink SQL (each job can only sync one table)

Summary

This post shows how to sync MySQL data to Kafka with Flink SQL, using the MySQL CDC connector as the source and Kafka as the sink; with this approach, each job can only sync a single table. The complete Java program is given below.


package flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQL_CDC {

    public static void main(String[] args) throws Exception {
//        Configuration conf = new Configuration();
//        conf.setInteger("rest.port", 3335);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the Flink MySQL CDC source table
        TableResult tableResult = tableEnv.executeSql("CREATE TABLE table_name (" +
                "  id INT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'hadoop102'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = 'xxxx'," +
                "  'database-name' = 'student'," +
                "  'table-name' = 'table_name'," +
                "  'server-time-zone' = 'Asia/Shanghai'," +
                "  'scan.startup.mode' = 'initial'" +
                ")"
        );

        // 3. Register the sink table: plain 'kafka' connector (append-only, kept for reference)
//        tableEnv.executeSql("" +
//                "CREATE TABLE kafka_binlog ( " +
//                "  user_id INT, " +
//                "  user_name STRING, " +
//                "  `proc_time` AS PROCTIME()" +
//                ") WITH ( " +
//                "  'connector' = 'kafka', " +
//                "  'topic' = 'test2', " +
//                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +
//                "  'format' = 'json' " +
//                ")");

        // 3. Register the sink table: upsert-kafka, which accepts the update/delete changelog emitted by the CDC source
        tableEnv.executeSql("" +
                "CREATE TABLE kafka_binlog ( " +
                "  user_id INT, " +
                "  user_name STRING, " +
                "  `proc_time` AS PROCTIME()," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH ( " +
                "  'connector' = 'upsert-kafka', " +
                "  'topic' = 'test2', " +
                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +
                "  'key.format' = 'json', " +
                "  'value.format' = 'json' " +
                ")");

        // 4. Query the source table and write the result into the sink table
        tableEnv.executeSql("insert into kafka_binlog select * from table_name");
        tableEnv.executeSql("select * from kafka_binlog").print();

        // env.execute() is not needed here: executeSql() already submits the jobs,
        // and print() blocks on the streaming result.
    }
}
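The commented-out sink uses the plain 'kafka' connector, which is append-only; the active version uses 'upsert-kafka' because the MySQL CDC source emits updates and deletes that an append-only sink cannot accept. To check that rows actually land in the test2 topic, the following is a minimal consumer sketch using the standard Apache Kafka Java client, assuming the hadoop102:9092 broker from the example; the class name and consumer group id are made up for illustration.

package flink;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class VerifyKafkaBinlog {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address taken from the example above; adjust to your environment
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "verify-binlog");
        // upsert-kafka writes both the key and the value as JSON strings
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test2"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // A tombstone (null value) means the corresponding row was deleted upstream
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                }
            }
        }
    }
}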

Source: https://blog.csdn.net/m0_37759590/article/details/132558090
