iis服务器助手广告广告
返回顶部
首页 > 资讯 > 数据库 >flink学习33:flinkSQL连接mysql,查询插入数据
  • 356
分享到

flink学习33:flinkSQL连接mysql,查询插入数据

mysql数据库flink大数据scala 2023-09-07 10:09:08 356人浏览 八月长安
摘要

总览 生成运行时env 生成表环境 接上数据流,数据流数据生成表 把数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册 查询表,可以根据注册表名查询 插

总览

生成运行时env

生成表环境

接上数据流,数据流数据生成表

数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册

查询表,可以根据注册表名查询

插入表,可以根据生成的flink表进行数据插入

完整案例:

import org.apache.flink.streaming.api.Scala._import org.apache.flink.table.api.bridge.scala._import org.apache.flink.table.api._import org.apache.flink.table.api.{DataTypes, Table}import org.apache.flink.table.descriptors._object sqlReadMysql {  def main(args: Array[String]): Unit = {    // creat env    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment    //parallelism    bsEnv.setParallelism(1)    //set env    val bsSetting = EnvironmentSettings      .newInstance()      .useBlinkPlanner()      .inStreamingMode()      .build()    //create table env    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSetting)    //create ds    val dataStream = bsEnv.fromElements(Tuple2("01","lisi" ))    val table1 = bsTableEnv.fromDataStream(dataStream)    //create table    val sinkDDL =      """        |create table student2_flink (        |code varchar(20) null,        |name varchar(20) null        |)with(        |'connector.type'='jdbc',        |'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',        |'connector.table'='student2',        |'connector.driver'='com.mysql.jdbc.Driver',        |'connector.username'='root',        |'connector.passWord'='root'        |)        |""".stripMargin    println(sinkDDL)    // execute the create table sql    bsTableEnv.executeSql(sinkDDL)    //reGISter table    val myStudent = bsTableEnv.from("student2_flink")    //execute query    val result = bsTableEnv.sqlQuery(s"select * from $myStudent")    result.toRetractStream[(String, String)].print()    //insert data    table1.executeInsert("student2_flink")    //execute    bsEnv.execute()  }}

POM文件:

    4.0.0    org.sinopharm.gksk    gksk-bigdata    1.0-SNAPSHOT    gksk-bigdata        Http://www.example.com            UTF-8        1.8        1.8        1.2.17        1.7.25        1.7.25                                    org.apache.flink            flink-java            1.14.4                            org.apache.flink            flink-streaming-java_2.12            1.14.4                                    org.apache.flink            flink-scala_2.12            1.14.4                            org.apache.flink            flink-streaming-scala_2.12            1.14.4                                    org.apache.flink            flink-table-api-scala-bridge_2.12            1.14.4                            org.apache.flink            flink-table-planner_2.12            1.14.4                            org.apache.flink            flink-table-planner-blink_2.12            1.12.0            provided                            org.apache.flink            flink-table-common            1.14.4                            org.apache.flink            flink-csv            1.14.4                                    org.apache.flink            flink-connector-kafka_2.12            1.14.4                                    org.apache.flink            flink-statebackend-rocksdb_2.12            1.14.4                            org.apache.flink            flink-connector-jdbc_2.12            1.14.4                            mysql            mysql-connector-java            8.0.16                                    org.apache.flink            flink-clients_2.12            1.14.4                                    org.apache.flink            flink-runtime-WEB_2.12            1.14.4            runtime                                    junit            junit            4.11            test                                    org.slf4j            slf4j-log4j12            ${slf4j.version}            runtime                            log4j            log4j            ${log4j.version}            runtime                                                    src/main/java                                        src/main/scala                                                                                org.apache.Maven.plugins                maven-compiler-plugin                3.1                                    1.8                    1.8                                                                    net.alchim31.maven                scala-maven-plugin                3.2.2                                                            compiletestCompile                                                                                    

Could not instantiate the executor. Make sure a planner module is on the classpath

原因:pom文件中缺少 planner

解决办法:添加

    org.apache.flink    flink-table-planner_2.12    1.14.4

ps:注意有时候 配置两个planner也会报错

flinksql 连接mysql报错 JDBC-Class not found. - com.mysql.jdbc.Driver

原因:缺少mysql的jar

解决:pom文件添加:

    mysql    mysql-connector-java    8.0.16

open() failed.The server time zone value '�й���׼ʱ��' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.

原因:URL没有指定时区,jdbc 6.0以上都有这个问题

解决:在URL后边加时区

'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',


 

  • useUnicode=true  表示使用Unicode字符,因此可以使用中文
  • characterEncoding=utf8  设置编码方式
  • useSSL=true   设置安全连接
  • serverTimezone=UTC    设置全球标准时间
     

open() failed.Cannot load connection class because of underlying exception: com.mysql.cj.exceptions.WrongArgumentException: MalfORMed database URL, failed to parse the main URL sections.

原因:连接的URL写错了

解决:好好看看,字符 、格式

来源地址:https://blog.csdn.net/hzp666/article/details/128796575

您可能感兴趣的文档:

--结束END--

本文标题: flink学习33:flinkSQL连接mysql,查询插入数据

本文链接: https://www.lsjlt.com/news/397983.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作