
Five Ways Flink Reads Data (File, Socket, Kafka, MySQL, Custom Source)

flink / big data    2023-09-08 17:09:20    212 views    八月长安
Abstract

Five ways Flink reads data: from a file, from a Socket, from Kafka, from MySQL, and from a custom data source. Reading from a file is the simplest: for functional testing, data can be kept in a file, read back, and used to verify that the streaming logic behaves as expected.

Five ways Flink reads data

Reading data from a file

This is the simplest way to read data. For functional testing, you can keep the data in a file, read it back, and verify that the streaming logic behaves as expected.

Program code:

package cn.jihui.flink

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object readFile {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val file_name = "C:\\Users\\32985\\ideaProjects\\flink_demo1\\resources\\wc.txt"
    val streamData = env.readTextFile(file_name)
    streamData.print
    env.execute("read data from file")
  }
}

Output:

"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=61478:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\32985\IdeaProjects\flink_demo1\out\production\flink_demo1;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar;D:\Bigdata\flink-1.10.1\lib\log4j-1.2.17.jar;D:\Bigdata\flink-1.10.1\lib\slf4j-log4j12-1.7.15.jar;D:\Bigdata\flink-1.10.1\lib\flink-dist_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table-blink_2.12-1.10.1.jar cn.jihui.flink.readFilelog4j:WARN No appenders could be found for logger (org.apache.flink.api.java.typeutils.TypeExtractor).log4j:WARN Please initialize the log4j system properly.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.WARNING: An illegal reflective access operation has occurredWARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/D:/Bigdata/flink-1.10.1/lib/flink-dist_2.12-1.10.1.jar) to field java.lang.String.valueWARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleanerWARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operationsWARNING: All illegal access operations will be denied in a future release2> hello world3> how are you5> I am fine7> how old are youProcess finished with exit code 0

Reading data from a Socket

This is very convenient for verifying scenarios where data arrives over a socket.

Program code:

package cn.jihui.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object readSocket {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ip = "172.16.3.6"
    val port = 9999
    val streamData = env.socketTextStream(ip, port)
    streamData.print
    env.execute("read data from socket")
  }
}

For testing, first start nc on the 172.16.3.6 server, then start the Flink job to read the data. For example:

[bigdata@vm6 ~]$ nc -lk 9999
hello world
how are you
happy new year

Every line typed into nc is received by the Flink job.

Flink output:

"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=61731:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\32985\IdeaProjects\flink_demo1\out\production\flink_demo1;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar;D:\Bigdata\flink-1.10.1\lib\log4j-1.2.17.jar;D:\Bigdata\flink-1.10.1\lib\slf4j-log4j12-1.7.15.jar;D:\Bigdata\flink-1.10.1\lib\flink-dist_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table-blink_2.12-1.10.1.jar cn.jihui.flink.readSocketlog4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).log4j:WARN Please initialize the log4j system properly.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.WARNING: An illegal reflective access operation has occurredWARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/D:/Bigdata/flink-1.10.1/lib/flink-dist_2.12-1.10.1.jar) to field java.lang.String.valueWARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleanerWARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operationsWARNING: All illegal access operations will be denied in a future release3> hello world4> how are you5> happy new year

Reading data from Kafka

Reading from Kafka is probably the most widely used way to feed data into Flink. My current customers also process their streaming data this way.

To read from Kafka, use the flink-connector-kafka connector.

Create a Maven project and add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.2</version>
</dependency>
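Note that the connector alone is not enough: the example program also imports the Flink streaming API, and the run log further below shows flink-scala_2.12-1.10.1 jars on the classpath. A hedged sketch of the extra dependency this typically implies; the exact artifact and version here are assumptions, not taken from the original project:

<dependency>
    <!-- assumed: provides org.apache.flink.streaming.api.scala._ for Scala 2.12 -->
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.10.1</version>
</dependency>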

Program code:

package cn.jihui.flink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object readKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "172.16.3.6:9092")
    properties.setProperty("group.id", "data-group")
    val streamData = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema, properties))
    streamData.print()
    env.execute()
  }
}

Start Kafka:

[bigdata@vm6 kafka-2.6.0]$ ./bin/kafka-server-start.sh -daemon config/server.properties

Check that it is running:

[bigdata@vm6 ~]$ jps
8690 QuorumPeerMain
21068 Jps
10366 Kafka
[bigdata@vm6 ~]$
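If the sensor topic does not exist yet (and automatic topic creation is disabled on the broker), it may need to be created first. A minimal sketch assuming a single-broker setup; the partition and replication settings are illustrative only:

[bigdata@vm6 bin]$ ./kafka-topics.sh --create --bootstrap-server 172.16.3.6:9092 --topic sensor --partitions 1 --replication-factor 1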

Start the Kafka console producer for testing:

[bigdata@vm6 bin]$ ./kafka-console-producer.sh --broker-list 172.16.3.6:9092 --topic sensor
>hello world
>happy new year
>what are you doing here?
>

With the Flink program running, every line typed into the console producer is received and printed.

Output:

"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=60325:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\32985\IdeaProjects\flink_demo\target\classes;C:\Users\32985\.m2\repository\org\apache\flink\flink-scala_2.12\1.10.1\flink-scala_2.12-1.10.1.jar;C:\Users\32985\.m2\repository\org\apache\flink\flink-core\1.10.1\flink-core-1.10.1.jar;C:\Users\32985\.m2\repository\org\apache\flink\flink-annotations\1.10.1\flink-annotations-1.10.1.jar;C:......:\Users\32985\.m2\repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;C:\Users\32985\.m2\repository\org\xerial\snappy\snappy-java\1.1.2.6\snappy-java-1.1.2.6.jar;C:\Users\32985\.m2\repository\mysql\mysql-connector-java\5.1.46\mysql-connector-java-5.1.46.jar;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar cn.jihui.flink.readKafkaSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.WARNING: An illegal reflective access operation has occurredWARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/C:/Users/32985/.m2/repository/org/apache/flink/flink-core/1.10.1/flink-core-1.10.1.jar) to field java.lang.String.valueWARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleanerWARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operationsWARNING: All illegal access operations will be denied in a future release7> hello world7> happy new year7> what are you doing here?Process finished with exit code -1

Reading data from MySQL

When data has to be read from a relational database, a simple JDBC-based source is enough.

Program code:

package cn.jihui.flink

import java.sql.{DriverManager, ResultSet}

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

//case class SensorReading(id: String, timestamp: Long, temperature: Double)

object readmysql {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamData = env.addSource(new MySQLSource)
    streamData.print()
    env.execute("read data from mysql")
  }
}

class MySQLSource extends SourceFunction[SensorReading] {
  var running: Boolean = true

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val strConn = "jdbc:mysql://172.16.3.6:3306/flink"
    val conn = DriverManager.getConnection(strConn, "jihui", "111111")
    val selectStmt = conn.prepareStatement("select * from t_sensor;")
    val resultRs: ResultSet = selectStmt.executeQuery()
    while (resultRs.next()) {
      if (running) {
        val id = resultRs.getString(1)
        val timestamp = resultRs.getLong(2)
        val temperature = resultRs.getDouble(3)
        sourceContext.collect(SensorReading(id, timestamp, temperature))
      }
    }
    resultRs.close()
    selectStmt.close()
    conn.close()
  }

  override def cancel(): Unit = {
    running = false
  }
}
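The source above reads the first three columns of t_sensor as a string, a long, and a double. A minimal sketch of what such a table could look like; the column names and types here are illustrative assumptions, not taken from the original article:

-- illustrative schema; adjust to the real t_sensor layout
CREATE TABLE t_sensor (
    id          VARCHAR(32),   -- e.g. 'Sensor_1'
    ts          BIGINT,        -- epoch milliseconds
    temperature DOUBLE
);

INSERT INTO t_sensor VALUES ('Sensor_1', 1687944878278, 20.0778);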

Output:

"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=61886:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath ......C:\Users\32985\.m2\repository\org\apache\kafka\kafka-clients\0.11.0.2\kafka-clients-0.11.0.2.jar;C:\Users\32985\.m2\repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;C:\Users\32985\.m2\repository\org\xerial\snappy\snappy-java\1.1.2.6\snappy-java-1.1.2.6.jar;C:\Users\32985\.m2\repository\mysql\mysql-connector-java\5.1.46\mysql-connector-java-5.1.46.jar;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar cn.jihui.flink.readMySQLSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.WARNING: An illegal reflective access operation has occurredWARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/C:/Users/32985/.m2/repository/org/apache/flink/flink-core/1.10.1/flink-core-1.10.1.jar) to field java.lang.String.valueWARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleanerWARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operationsWARNING: All illegal access operations will be denied in a future release7> SensorReading(Sensor_4,1687944878278,42.6205)1> SensorReading(Sensor_10,1687944879294,31.3545)7> SensorReading(Sensor_7,1687944878278,16.3773)1> SensorReading(Sensor_4,1687944879294,42.6205)7> SensorReading(Sensor_8,1687944879294,7.3655)7> SensorReading(Sensor_2,1687944878278,32.3747)8> SensorReading(Sensor_1,1687944879294,20.0778)8> SensorReading(Sensor_9,1687944878278,7.65736)5> SensorReading(Sensor_3,1687944878278,32.9043)6> SensorReading(Sensor_1,1687944880296,20.0778)5> SensorReading(Sensor_10,1687944880296,31.3545)2> SensorReading(Sensor_8,1687944880296,7.3655)6> SensorReading(Sensor_6,1687944878278,4.37994)2> SensorReading(Sensor_5,1687944880296,42.1028)2> SensorReading(Sensor_2,1687944880296,32.3747)4> SensorReading(Sensor_10,1687944878278,31.3545)4> SensorReading(Sensor_9,1687944879294,7.65736)4> SensorReading(Sensor_9,1687944880296,7.65736)5> SensorReading(Sensor_2,1687944879294,32.3747)5> SensorReading(Sensor_6,1687944879294,4.37994)3> SensorReading(Sensor_5,1687944879294,42.1028)4> SensorReading(Sensor_7,1687944879294,16.3773)6> SensorReading(Sensor_5,1687944878278,42.1028)8> SensorReading(Sensor_8,1687944878278,7.3655)1> SensorReading(Sensor_4,1687944880296,42.6205)6> SensorReading(Sensor_3,1687944879294,32.9043)3> SensorReading(Sensor_7,1687944880296,16.3773)8> SensorReading(Sensor_6,1687944880296,4.37994)3> SensorReading(Sensor_3,1687944880296,32.9043)3> SensorReading(Sensor_1,1687944878278,20.0778)Process finished with exit code 0

Reading data from a custom source

When you need to run performance tests against Flink, a custom data source can simplify the test setup considerably.

Flink supports custom data sources: a loop can generate test data in whatever format is needed, which makes this approach very flexible.

Program code:

package cn.jihui.flink

import java.util.Random

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object readCustom {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataCustom = env.addSource(new MySensorSource())
    dataCustom.print()
    env.execute("custom data source")
  }
}

class MySensorSource extends SourceFunction[SensorReading] {
  var running: Boolean = true

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    // initial temperature for each of the 10 sensors
    var currTemp = 1.to(10).map(i => ("Sensor_" + i, rand.nextDouble() * 50))
    while (running) {
      val currTime = System.currentTimeMillis()
      // drift each temperature with gaussian noise (reassign, otherwise the update is lost)
      currTemp = currTemp.map(
        data => (data._1, data._2 + rand.nextGaussian())
      )
      currTemp.foreach(
        data => sourceContext.collect(SensorReading(data._1, currTime, data._2))
      )
      // pace the source at roughly one batch of readings per second
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}

After the program starts, it produces a continuous stream of data.

Output:

"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=62739:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\32985\IdeaProjects\flink_demo1\out\production\flink_demo1;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar;D:\Bigdata\flink-1.10.1\lib\log4j-1.2.17.jar;D:\Bigdata\flink-1.10.1\lib\slf4j-log4j12-1.7.15.jar;D:\Bigdata\flink-1.10.1\lib\flink-dist_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table-blink_2.12-1.10.1.jar cn.jihui.flink.readCustomlog4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).log4j:WARN Please initialize the log4j system properly.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.WARNING: An illegal reflective access operation has occurredWARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/D:/Bigdata/flink-1.10.1/lib/flink-dist_2.12-1.10.1.jar) to field java.lang.String.valueWARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleanerWARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operationsWARNING: All illegal access operations will be denied in a future release7> SensorReading(Sensor_4,1687944878278,42.62052120270371)1> SensorReading(Sensor_6,1687944878278,4.3799399141547735)3> SensorReading(Sensor_8,1687944878278,7.365503979569099)5> SensorReading(Sensor_2,1687944878278,32.374658798648426)6> SensorReading(Sensor_3,1687944878278,32.9042516066123)2> SensorReading(Sensor_7,1687944878278,16.37730093567341)8> SensorReading(Sensor_5,1687944878278,42.1027668775983)4> SensorReading(Sensor_1,1687944878278,20.077801500835886)5> SensorReading(Sensor_10,1687944878278,31.35446091105863)4> SensorReading(Sensor_9,1687944878278,7.657356771011931)5> SensorReading(Sensor_8,1687944879294,7.365503979569099)3> SensorReading(Sensor_6,1687944879294,4.3799399141547735)6> SensorReading(Sensor_1,1687944879294,20.077801500835886)1> SensorReading(Sensor_4,1687944879294,42.62052120270371)4> SensorReading(Sensor_7,1687944879294,16.37730093567341)2> SensorReading(Sensor_5,1687944879294,42.1027668775983)6> SensorReading(Sensor_9,1687944879294,7.657356771011931)7> SensorReading(Sensor_2,1687944879294,32.374658798648426)8> SensorReading(Sensor_3,1687944879294,32.9042516066123)7> SensorReading(Sensor_10,1687944879294,31.35446091105863)4> SensorReading(Sensor_5,1687944880296,42.1027668775983)1> SensorReading(Sensor_2,1687944880296,32.374658798648426)5> SensorReading(Sensor_6,1687944880296,4.3799399141547735)7> SensorReading(Sensor_8,1687944880296,7.365503979569099)6> SensorReading(Sensor_7,1687944880296,16.37730093567341)2> SensorReading(Sensor_3,1687944880296,32.9042516066123)3> SensorReading(Sensor_4,1687944880296,42.62052120270371)8> SensorReading(Sensor_1,1687944880296,20.077801500835886)1> SensorReading(Sensor_10,1687944880296,31.35446091105863)8> SensorReading(Sensor_9,1687944880296,7.657356771011931)Process finished with exit code -1

Source: https://blog.csdn.net/jihui8848/article/details/131478669
