文章目录 一、docker 搭建 flink1、选择合适的 flink 版本2、重新创建 JobManager、TaskManager 容器并挂载配置文件 二、flink 简单示例1
docker 安装就不介绍了,去 dockerHub 搜索 flink 镜像,选择合适的版本安装 https://hub.docker.com/_/flink/tags
使用 docker 命令 docker pull flink: 1.16.0-Scala_2.12-java8
拉去镜像
0-scala_2.12-java8 镜像版本说明,flink 1.16.0,flink 内置 scala 版本 2.12,Java 版本 8
建议先简单启动 flink 容器 JobManager、TaskManager 两个容器将配置文件复制出来方便挂载
# 创建 docker 网络,方便 JobManager 和 TaskManager 内部访问 docker network create flink-network# 创建 JobManager docker run \ -itd \ --name=jobmanager \ --publish 8081:8081 \ --network flink-network \ --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \ flink:1.16.0-scala_2.12-java8 jobmanager # 创建 TaskManager docker run \ -itd \ --name=taskmanager \ --network flink-network \ --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \ flink:1.16.0-scala_2.12-java8 taskmanager
启动成功
访问 8081 端口如下
copy 配置文件
# jobmanager 容器 docker cp jobmanager:/opt/flink/conf ./JobManager/# taskmanager 容器docker cp taskmanager:/opt/flink/conf ./TaskManager/
修改 JobManager/conf/flink-conf.yaml WEB 端口号为 18081
修改 TaskManager/conf/flink-conf.yaml 容器任务槽为 5
启动容器挂载配置文件
# 启动 jobmanager docker run -itd -v /root/docker/flink/JobManager/conf/:/opt/flink/conf/ --name=jobmanager --publish 18081:18081 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" --network flink-network flink:1.16.0-scala_2.12-java8 jobmanager# 启动 taskmanager docker run -itd -v /root/docker/flink/TaskManager/conf/:/opt/flink/conf/ --name=taskmanager --network flink-network --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:1.16.0-scala_2.12-java8 taskmanager
参数解释
如下两个容器启动成功,可以看到 web 端口为 18081,taskmanager 启动一个,包含 5 个任务槽
官网参考地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/configuration/overview/
使用 Maven 命令指定原型 Flink Maven Archetype 快速创建一个包含了必要依赖的 Flink 程序骨架,自定义项目 groupId、artifactId、package 等信息
mvn archetype:generate ^ -DarchetypeGroupId=org.apache.flink ^ -DarchetypeArtifactId=flink-quickstart-java ^ -DarchetypeVersion=1.16.0^ -DgroupId=com.ye ^ -dartifactId=flink-study ^ -Dversion=0.1 ^ -Dpackage=com.ye ^ -DinteractiveMode=false
下载成功打开项目目录
如下:注意运行需要设置启动参数,否则启动会找不到类,因为 pom.xml 文件 flink 相关包都添加了
表示只用于生产环境,另一种方法就是将
修改为
流处理和批处理在 flink 低版本(貌似1.12)需要区分,目前都使用流处理写法
下面代码用来统计单词出现的的次数
public class DataBatchJob { public static void main(String[] args) throws Exception { // 获取 flink 环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 添加数据源 DataStreamSource<String> streamSource = env.fromElements("hello world", "hello flink", "flink", "hello", "world"); // 对传入的流数据分组 SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { // value 传入的数据,out // Tuple2 二元组 // out 传出的值 @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] split = value.split(" "); for (String s : split) { out.collect(Tuple2.of(s, 1)); } } }); // 按二元组的第 0 个位置分组 KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = streamOperator.keyBy(0); // 按二元组的第 1 个位置求和 SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1); sum.print(); env.execute("统计单词出现的次数"); }}
执行结果如下
上传 flink 集群
下面示例通过 Socket 文本源,对输入的大于 500 和小于 500 的分别求和
public class DataStreamJob { private static final Logger logger = LoggerFactory.getLogger(DataStreamJob.class); public static void main(String[] args) throws Exception { // 获取 flink 环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 添加 socket 文本流数据源 //DataStreamSource streamSource = env.fromElements("200", "100", "6000", "500", "2000", "300", "1500", "900"); DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 7777); // 对大于 500 和小于 500 进行分组 KeyedStream<String, String> stringKeyedStream = streamSource.keyBy(new KeySelector<String, String>() { @Override public String geTKEy(String s) throws Exception { int i = Integer.parseInt(s); return i > 500 ? "ge" : "lt"; } }); // 开 10 秒滚动窗口,每 10 秒为一批数据 【00:00:00 ~ 00:00:10)、【00:00:10 ~ 00:00:20)左闭右开区间 WindowedStream<String, String, TimeWindow> windowedStream = stringKeyedStream.window(TumblingProcessingTimewindows.of(Time.seconds(10))); // 窗口处理函数,泛型 String, Integer, String, TimeWindow 依次对应 输入类型、输出类型、 KEY类型(即keyBy 返回的类型), 窗口 SingleOutputStreamOperator<Integer> outputStreamOperator = windowedStream.process(new ProcessWindowFunction<String, Integer, String, TimeWindow>() { @Override public void process(String key, ProcessWindowFunction<String, Integer, String, TimeWindow>.Context context, Iterable<String> elements, Collector<Integer> out) throws Exception { System.out.println(key); AtomicInteger sum = new AtomicInteger(); elements.forEach(item -> sum.addAndGet(Integer.parseInt(item))); out.collect(sum.get()); } }); // 输出 outputStreamOperator.print(); env.execute("分组求和"); }}
在 window 或 linux 开启 Socket 文本流测试
打包项目:可以在 pom.xml 修改启动类,也可以在命令启动或者 ui 界面上传设置启动类参数
使用 ui 界面上传 jar 到 flink 集群,点击 submit 运行
# 如果集群( 即JobManager) 在当前服务器可以使用如下命令$ bin/flink run -Dexecution.runtime-mode=BATCH <jarFile># 如果集群( 即JobManager) 不在当前服务器,在 TaskManager 服务器提交作业可以使用如下命令# -m 指定 JobManager 服务器地址# -c 指定作业入口程序# -p 指定并行度$ bin/flink run -m 192.168.1.1:8081 -c com.ye.StreamWordCount -p 2 <jarFile># 撤销任务$ bin/flink cancle <jobId>
批处理运行完成
流处理正在运行
使用 docker 启动的 flink 集群发现 UI 界面的 stdout 没有 print 输出
来源地址:https://blog.csdn.net/qq_41538097/article/details/129113866
--结束END--
本文标题: docker 搭建 flink 并上传任务
本文链接: https://www.lsjlt.com/news/394142.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-04-01
2024-04-03
2024-04-03
2024-01-21
2024-01-21
2024-01-21
2024-01-21
2023-12-23
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0