
Flink CDC 2.4: Whole-Database Real-Time Sync from MySQL to Doris


Environment

        Flink 1.15.4

Motivation

        There are already plenty of tools that support no-code real-time sync from MySQL to Doris,

        for example the feature packages released by SelectDB, as well as

                Dinky, SeaTunnel, TIS, and so on.

        However, most of them either do not support table schema changes or do not support multiple sinks. Our business has to handle schema changes in real time, because tables are modified at the column level: column types are changed, columns are renamed, dropped, added, and so on.

        So to get whole-database sync together with real-time schema changes, we had to write it ourselves.

Required JARs

        flink-doris-connector-1.15-1.4.0.jar  -- provides the one-click whole-database ("tens of thousands of tables") sync capability

        flink-sql-connector-mysql-cdc-2.4.0.jar  -- bundles all related dependencies, so there is no need to import Debezium, the CDC core, etc. separately
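If these connectors are pulled in through a build tool instead of being dropped into Flink's lib directory, the declaration would look roughly like the sbt sketch below; the coordinates shown are the commonly published ones for these two artifacts and should be double-checked against your repository:

        // build.sbt -- a minimal sketch; verify the exact coordinates for your setup
        libraryDependencies ++= Seq(
          // Doris connector compiled against Flink 1.15 (Doris sink / stream-load helpers)
          "org.apache.doris" % "flink-doris-connector-1.15" % "1.4.0",
          // "fat" MySQL CDC connector that already shades Debezium and its dependencies
          "com.ververica" % "flink-sql-connector-mysql-cdc" % "2.4.0"
        )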

Workflow

        1. Create the databases and tables with a script

        2. Schema-sync program

        3. Flink CDC program

Compared with the first version ("Using Flink CDC to sync MySQL data and table schemas into Apache Doris in real time"), efficiency has improved:

        During the initial sync, keyBy followed by windowed aggregation caused data skew.

        The aggregation was switched from string concatenation to JSONArray to avoid backpressure caused by the aggregation step; concatenating strings is far too slow once the data volume grows (see the sketch after this list).
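The core of the change is easy to show in isolation: the table name in the key is salted with a small random bucket so a single hot table is spread over several parallel windows, and the window reduce merges per-record JSONArrays instead of concatenating strings. A minimal sketch of both ideas (the full implementation follows in FlinkCDCSyncETL.binLogETLOne and combineJsonArray below):

        // Minimal sketch of the two optimizations described above (illustrative only).
        import com.alibaba.fastjson2.JSONArray
        import java.util.Random

        // 1) Salt the key: one hot table becomes up to 4 distinct keys, so its records
        //    land on several parallel window instances instead of one skewed subtask.
        def saltedTableKey(table: String): String = table + "-" + new Random().nextInt(4)

        // 2) Batch rows as a JSONArray and merge arrays in the reduce -- no large intermediate strings.
        def combine(a: JSONArray, b: JSONArray): JSONArray = { a.addAll(b); a }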

Flink CDC Code

        1. FlinkSingleSync.scala

        

package com.zbkj.sync

import com.alibaba.fastjson2.{JSON, JSONArray, JSONObject}
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import com.zbkj.util.SinkBuilder.getKafkaSink
import com.zbkj.util._
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

import java.util.Properties

object FlinkSingleSync {

  PropertiesManager.initUtil()
  val props: PropertiesUtil = PropertiesManager.getUtil

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // parallelism
    env.setParallelism(props.parallelism)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val parameters: ParameterTool = ParameterTool.fromArgs(args)
    val memberID = parameters.getInt("memberID", 0)
    val source = parameters.get("source", "")
    val log = parameters.getBoolean("log", true)
    if (memberID == 0) {
      sys.exit(0)
    }
    val thisMember = "ttk_member_%d".format(memberID)
    val jobName = "Sync Member %d".format(memberID)
    val syncTopic = "sync_data_%d".format(memberID)
    println(syncTopic)
    val sourceFormat = SourceFormat.sourceFormat(source)
    env.setParallelism(4)

    // Enable checkpointing and set the trigger interval in milliseconds (default 500 ms); checkpointing is disabled by default
    env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint timeout, 10 minutes by default
    env.getCheckpointConfig.setCheckpointTimeout(600000)
    env.getCheckpointConfig.setCheckpointStorage("file:///data/flink-checkpoints/sync/%d".format(memberID))
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
    // By default only one checkpoint may run at a time;
    // allowing several concurrent checkpoints can improve overall checkpointing throughput
    // env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    // Number of tolerable checkpoint failures
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
    // Tolerable checkpoint failures; the default of 0 means no checkpoint failure is tolerated
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
    env.disableOperatorChaining()
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 30000L))

    val dataBaseList = thisMember
    var tableList = thisMember + ".*"
    if (!log) {
      tableList = "lb_crm_customer_log|.*(?   // the rest of this regex was truncated in the original post
    }

    // NOTE: the code that builds the MySQL CDC source, the `streamSource` JSON stream and the
    // `dorisStreamLoad` instance was also truncated in the original post; only this tail survived:
    //   ... .map(line => JSON.parseObject(line)).setParallelism(4)

    val DDLSqlStream: DataStream[JSONObject] = streamSource.filter(line => !line.containsKey("op")).uid("ddlSqlStream")
    val DMLStream: DataStream[JSONObject] = streamSource.filter(line => line.containsKey("op")).uid("dmlStream")

    val DMLDataStream = FlinkCDCSyncETL.binLogETLOne(DMLStream)
    val keyByDMLDataStream: DataStream[(String, String, String, JSONArray)] = DMLDataStream
      .keyBy(keys => (keys._1, keys._2, keys._3))
      .timeWindow(Time.milliseconds(props.window_time_milliseconds))
      .reduce((itemFirst, itemSecond) => (itemFirst._1, itemFirst._2, itemFirst._3, combineJsonArray(itemFirst._4, itemSecond._4)))
      .map(line => (line._1, line._2, line._3.split("-")(0), line._4))
      .name("分组聚合").uid("keyBy")

    keyByDMLDataStream.addSink(new SinkDoris(dorisStreamLoad)).name("数据写入Doris").uid("SinkDoris").setParallelism(4)

    val DDLKafkaSink = getKafkaSink("schema_change")
    DDLSqlStream.map(jsObj => jsObj.toJSONString()).sinkTo(DDLKafkaSink).name("同步DDL入Kafka").uid("SinkDDLKafka")

    val kafkaSink = getKafkaSink(syncTopic)
    keyByDMLDataStream.map(line => (line._2, line._3, 1)).filter(!_._2.endsWith("_sql"))
      .keyBy(keys => (keys._1, keys._2))
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1))).sum(2)
      .map(line => {
        val json = new JSONObject()
        json.put("member_id", line._1)
        json.put("table", line._2)
        json.toJSONString()
      }).sinkTo(kafkaSink).name("同步数据库表入Kafka").uid("syncDataTableToKafka")

    env.execute(jobName)
  }

  def combineJsonArray(jsr1: JSONArray, jsr2: JSONArray): JSONArray = {
    jsr1.addAll(jsr2)
    jsr1
  }
}
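The part of FlinkSingleSync.scala that actually builds the MySQL CDC source (and the DorisStreamLoad2 instance) did not survive in the original post. Purely as a hypothetical reconstruction, based on the imports in the file above and the standard Flink CDC 2.4 builder API, the missing section would typically look roughly like the sketch below; the props.mysql_* field names are placeholders, not the author's actual configuration keys.

        // Hypothetical sketch of the truncated section, NOT the author's original code.
        // Assumes the same imports as FlinkSingleSync.scala above; props.mysql_* names are made up.
        val customConverterConfigs = new java.util.HashMap[String, Object]()
        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "NUMERIC") // emit decimals as plain numbers

        val mySqlSource = MySqlSource.builder[String]()
          .hostname(props.mysql_host)                   // placeholder
          .port(props.mysql_port)                       // placeholder
          .databaseList(dataBaseList)
          .tableList(tableList)
          .username(props.mysql_user)                   // placeholder
          .password(props.mysql_password)               // placeholder
          .startupOptions(StartupOptions.initial())     // full snapshot first, then continue from the binlog
          .includeSchemaChanges(true)                   // also emit DDL events, which the job routes to Kafka
          .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs))
          .build()

        val dorisStreamLoad = new DorisStreamLoad2(props)

        val streamSource = env
          .fromSource(mySqlSource, WatermarkStrategy.noWatermarks[String](), "MySQL CDC Source")
          .map(line => JSON.parseObject(line)).setParallelism(4)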

2. FlinkCDCSyncETL.scala

package com.zbkj.util

import com.alibaba.fastjson2.{JSON, JSONArray, JSONObject}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.DataStream

import java.util.Random

object FlinkCDCSyncETL {

  def binLogETLOne(dataStreamSource: DataStream[JSONObject]): DataStream[(String, String, String, JSONArray)] = {
    val tupleData: DataStream[(String, String, String, JSONArray)] = dataStreamSource.map(line => {
      var data: JSONObject = new JSONObject()
      val jsr: JSONArray = new JSONArray()
      var mergeType = "APPEND"
      val source = line.getJSONObject("source")
      val db = source.getString("db")
      val table = source.getString("table")
      val op = line.getString("op")
      if ("d" == op) {
        data = line.getJSONObject("before")
        mergeType = "DELETE"
      } else if ("u" == op) {
        data = line.getJSONObject("after")
        mergeType = "APPEND"
      } else if ("c" == op) {
        data = line.getJSONObject("after")
      } else if ("r" == op) {
        data = line.getJSONObject("after")
        mergeType = "APPEND"
      }
      jsr.add(data)
      Tuple4(mergeType, db, table + "-" + new Random().nextInt(4), jsr)
    })
    tupleData
  }
}
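For orientation, the Debezium-style JSON that binLogETLOne consumes carries the operation code in op ("c" insert, "u" update, "d" delete, "r" snapshot read), the before/after row images, and a source block with the database and table. A made-up example of the mapping (values are illustrative only):

        // Illustrative only: a made-up change event and the tuple binLogETLOne produces from it.
        val event = JSON.parseObject(
          """{
            |  "op": "u",
            |  "source": { "db": "ttk_member_1", "table": "lb_crm_customer" },
            |  "before": { "id": 7, "name": "old" },
            |  "after":  { "id": 7, "name": "new" }
            |}""".stripMargin)
        // binLogETLOne maps this event to roughly:
        //   ("APPEND", "ttk_member_1", "lb_crm_customer-2", [ {"id": 7, "name": "new"} ])
        // The "-2" suffix is the random 0-3 bucket appended to the table name to spread hot tables
        // across parallel windows; FlinkSingleSync.scala strips it again with split("-")(0) after the reduce.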

3. DorisStreamLoad2.scala

package com.zbkj.util

import org.apache.doris.flink.exception.StreamLoadException
import org.apache.doris.flink.sink.HttpPutBuilder
import org.apache.http.client.methods.CloseableHttpResponse
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{DefaultRedirectStrategy, HttpClientBuilder, HttpClients}
import org.apache.http.util.EntityUtils
import org.slf4j.{Logger, LoggerFactory}

import java.util.{Properties, UUID}

class DorisStreamLoad2(props: PropertiesUtil) extends Serializable {

  private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoad2])

  private lazy val httpClientBuilder: HttpClientBuilder = HttpClients.custom.setRedirectStrategy(new DefaultRedirectStrategy() {
    override protected def isRedirectable(method: String): Boolean = {
      // If the connection target is FE, the 307 redirect has to be followed.
      true
    }
  })

  def loadJson(jsonData: String, mergeType: String, db: String, table: String): Unit = try {
    val loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"
    val entity = new StringEntity(jsonData, "UTF-8")
    val streamLoadProp = new Properties()
    streamLoadProp.setProperty("merge_type", mergeType)
    streamLoadProp.setProperty("format", "json")
    streamLoadProp.setProperty("column_separator", ",")
    streamLoadProp.setProperty("line_delimiter", ",")
    streamLoadProp.setProperty("strip_outer_array", "true")
    streamLoadProp.setProperty("exec_mem_limit", "6442450944")
    streamLoadProp.setProperty("strict_mode", "true")
    val httpClient = httpClientBuilder.build
    val loadUrlStr = String.format(loadUrlPattern, props.doris_load_host, db, table)
    try {
      val builder = new HttpPutBuilder()
      val label = UUID.randomUUID.toString
      builder.setUrl(loadUrlStr)
        .baseAuth(props.doris_user, props.doris_password)
        .addCommonHeader()
        .setLabel(label)
        .setEntity(entity)
        .addProperties(streamLoadProp)
      handlePreCommitResponse(httpClient.execute(builder.build()))
    }

    def handlePreCommitResponse(response: CloseableHttpResponse): Unit = {
      val statusCode: Int = response.getStatusLine.getStatusCode
      if (statusCode == 200 && response.getEntity != null) {
        val loadResult: String = EntityUtils.toString(response.getEntity)
        logger.info("load Result {}", loadResult)
      } else {
        throw new StreamLoadException("stream load error: " + response.getStatusLine.toString)
      }
    }
  }
}
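A note on the Stream Load properties used above: format=json together with strip_outer_array=true tells Doris to treat the request body as a single JSON array and load each element as one row, which is why the sink can post the whole windowed JSONArray in a single request. merge_type switches between APPEND and DELETE per batch so that deletes captured from the binlog actually remove the matching rows in Doris (this assumes the target tables use Doris's Unique Key model).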

4. SinkDoris.scala

package com.zbkj.util

import com.alibaba.fastjson2.JSONArray
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class SinkDoris(dorisStreamLoad: DorisStreamLoad2) extends RichSinkFunction[(String, String, String, JSONArray)] {

  override def open(parameters: Configuration): Unit = {}

  override def invoke(value: (String, String, String, JSONArray)): Unit = {
    dorisStreamLoad.loadJson(value._4.toString, value._1, value._2, value._3)
  }

  override def close(): Unit = {}
}

Source: https://blog.csdn.net/xiaofei2017/article/details/131459614
