总览 生成运行时env 生成表环境 接上数据流,数据流数据生成表 把数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册 查询表,可以根据注册表名查询 插
生成运行时env
生成表环境
接上数据流,数据流数据生成表
把数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册
查询表,可以根据注册表名查询
插入表,可以根据生成的flink表进行数据插入
import org.apache.flink.streaming.api.Scala._import org.apache.flink.table.api.bridge.scala._import org.apache.flink.table.api._import org.apache.flink.table.api.{DataTypes, Table}import org.apache.flink.table.descriptors._object sqlReadMysql { def main(args: Array[String]): Unit = { // creat env val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment //parallelism bsEnv.setParallelism(1) //set env val bsSetting = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build() //create table env val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSetting) //create ds val dataStream = bsEnv.fromElements(Tuple2("01","lisi" )) val table1 = bsTableEnv.fromDataStream(dataStream) //create table val sinkDDL = """ |create table student2_flink ( |code varchar(20) null, |name varchar(20) null |)with( |'connector.type'='jdbc', |'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC', |'connector.table'='student2', |'connector.driver'='com.mysql.jdbc.Driver', |'connector.username'='root', |'connector.passWord'='root' |) |""".stripMargin println(sinkDDL) // execute the create table sql bsTableEnv.executeSql(sinkDDL) //reGISter table val myStudent = bsTableEnv.from("student2_flink") //execute query val result = bsTableEnv.sqlQuery(s"select * from $myStudent") result.toRetractStream[(String, String)].print() //insert data table1.executeInsert("student2_flink") //execute bsEnv.execute() }}
4.0.0 org.sinopharm.gksk gksk-bigdata 1.0-SNAPSHOT gksk-bigdata Http://www.example.com UTF-8 1.8 1.8 1.2.17 1.7.25 1.7.25 org.apache.flink flink-java 1.14.4 org.apache.flink flink-streaming-java_2.12 1.14.4 org.apache.flink flink-scala_2.12 1.14.4 org.apache.flink flink-streaming-scala_2.12 1.14.4 org.apache.flink flink-table-api-scala-bridge_2.12 1.14.4 org.apache.flink flink-table-planner_2.12 1.14.4 org.apache.flink flink-table-planner-blink_2.12 1.12.0 provided org.apache.flink flink-table-common 1.14.4 org.apache.flink flink-csv 1.14.4 org.apache.flink flink-connector-kafka_2.12 1.14.4 org.apache.flink flink-statebackend-rocksdb_2.12 1.14.4 org.apache.flink flink-connector-jdbc_2.12 1.14.4 mysql mysql-connector-java 8.0.16 org.apache.flink flink-clients_2.12 1.14.4 org.apache.flink flink-runtime-WEB_2.12 1.14.4 runtime junit junit 4.11 test org.slf4j slf4j-log4j12 ${slf4j.version} runtime log4j log4j ${log4j.version} runtime src/main/java src/main/scala org.apache.Maven.plugins maven-compiler-plugin 3.1 1.8 net.alchim31.maven scala-maven-plugin 3.2.2 compile testCompile
原因:pom文件中缺少 planner
解决办法:添加
org.apache.flink flink-table-planner_2.12 1.14.4
ps:注意有时候 配置两个planner也会报错
原因:缺少mysql的jar包
解决:pom文件添加:
mysql mysql-connector-java 8.0.16
原因:URL没有指定时区,jdbc 6.0以上都有这个问题
解决:在URL后边加时区
'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
原因:连接的URL写错了
解决:好好看看,字符 、格式
--结束END--
本文标题: flink学习33:flinkSQL连接mysql,查询插入数据
本文链接: https://www.lsjlt.com/news/397983.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-05-12
2024-05-12
2024-05-12
2024-05-12
2024-05-12
2024-05-12
2024-05-12
2024-05-11
2024-05-11
2024-05-11
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0