iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >基于k8s如何部署Session模式Flink集群
  • 594
分享到

基于k8s如何部署Session模式Flink集群

2023-07-05 12:07:15 594人浏览 安东尼
摘要

这篇文章主要介绍“基于k8s如何部署Session模式flink集群”,在日常操作中,相信很多人在基于k8s如何部署Session模式Flink集群问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”基于k8s如何

这篇文章主要介绍“基于k8s如何部署Session模式flink集群”,在日常操作中,相信很多人在基于k8s如何部署Session模式Flink集群问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”基于k8s如何部署Session模式Flink集群”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

基于k8s部署Session模式Flink集群

分布式计算领域中,Apache Flink是一个快速、可靠且易于使用的计算引擎。Flink集群是一个分布式系统,它由Flink JobManager和多个Flink TaskManager组成。部署Flink集群时,高可用性是非常重要的一个考虑因素。

什么是Session模式

在Flink中,有两种部署模式:Standalone和Session。Standalone模式下,Flink集群是一组独立的进程,它们共享同一个配置文件,并通过Akka通信。Session模式下,Flink集群是动态的、可伸缩的,可以根据需要启动或停止。Session模式下,Flink JobManager和TaskManager进程运行在容器中,可以通过k8s进行动态管理。

Session模式的优点是:

  • 可以根据需要启动或停止Flink集群

  • 可以动态添加或删除TaskManager

  • 可以使用k8s的伸缩功能自动调整Flink集群的大小

  • 可以与k8s的其他资源进行整合,例如存储卷、网络策略等

因此,Session模式是在kubernetes上部署Flink集群的首选模式。

Flink的filesystem

在 Flink 的处理过程中,数据可能会存储在不同的文件系统中,如本地文件系统、hdfs、S3 等。为了统一处理这些文件系统,Flink 引入了 FileSystem 的概念,它是一个抽象的接口,提供了对不同文件系统的统一访问方式。

fileSystem 的实现类可以通过 Flink 的配置文件指定。Flink 支持多种文件系统,包括本地文件系统、HDFS、S3、Google Cloud Storage 等,因为miNIO实现了s3协议,所以也可以使用minio来作为文件系统。

基于k8s部署高可用Session模式Flink集群

各组件版本号

组件版本号
kubernetes1.15.12
flink1.15.3

制作镜像

使用minio作为文件系统需要增加s3相关的依赖jar包,所以需要自己制作镜像

Dockerfile:

FROM apache/flink:1.15.3-Scala_2.12# 需要用到的jar包# flink-cdcADD lib/flink-sql-connector-mysql-cdc-2.3.0.jar /opt/flink/lib/# jdbc连接器ADD lib/flink-connector-jdbc-1.15.3.jar /opt/flink/lib/# Mysql驱动ADD lib/mysql-connector-j-8.0.32.jar /opt/flink/lib/# oracle驱动ADD lib/ojdbc8-21.9.0.0.jar /opt/flink/lib/# 文件系统插件需要放到插件目录,按规范放置RUN mkdir /opt/flink/plugins/s3-fs-presto && cp -f /opt/flink/opt/flink-s3-fs-presto-1.15.3.jar /opt/flink/plugins/s3-fs-presto/

构建镜像:

docker build -t sivdead/flink:1.15.3_scala_2.12 -f .\DockerFile .

配置文件(ConfigMap)

配置文件分两个部分,flink-conf.yamllog4j-console.properties

apiVersion: v1kind: ConfigMapmetadata:  name: flink-config  namespace: szyx-flink  labels:    app: flinkdata:  flink-conf.yaml: |+    kubernetes.cluster-id: szyx-flink    # 所在的命名空间    kubernetes.namespace: szyx-flink    jobmanager.rpc.address: flink-jobmanager    taskmanager.numberOfTaskSlots: 2    blob.server.port: 6124    jobmanager.rpc.port: 6123    taskmanager.rpc.port: 6122    queryable-state.proxy.ports: 6125    jobmanager.memory.process.size: 1600m    taskmanager.memory.process.size: 2867m    parallelism.default: 2    execution.checkpointing.interval: 10s        # 文件系统    fs.default-scheme: s3    # minio地址    s3.endpoint: https://minio.k8s.io:9000    # minio的bucket    s3.flink.bucket: szyxflink    s3.access-key: <minio账号>    s3.secret-key: <minio密码>    # 状态存储格式    state.backend: rocksdb    s3.path.style.access: true    blob.storage.directory: /opt/flink/tmp/blob    WEB.upload.dir: /opt/flink/tmp/upload    io.tmp.dirs: /opt/flink/tmp    # 状态管理    # checkpoint存储地址    state.checkpoints.dir: s3://szyxflink/state/checkpoint    # savepoint存储地址    state.savepoints.dir: s3://szyxflink/state/savepoint    # checkpoint间隔    execution.checkpointing.interval: 5000    execution.checkpointing.mode: EXACTLY_ONCE    # checkpoint保留数量    state.checkpoints.num-retained: 3    # history-server# 监视以下目录中已完成的作业    jobmanager.arcHive.fs.dir: s3://szyxflink/completed-jobs    # 每 10 秒刷新一次    historyserver.archive.fs.refresh-interval: 10000    historyserver.archive.fs.dir: s3://szyxflink/completed-jobs    # 高可用    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory    high-availability.storageDir: s3://szyxflink/ha    # 每6个小时触发一次savepoint    kubernetes.operator.periodic.savepoint.interval: 6h    kubernetes.operator.savepoint.history.max.age: 24h    kubernetes.operator.savepoint.history.max.count: 5    # Restart of unhealthy job deployments    kubernetes.operator.cluster.health-check.enabled: true    # Restart failed job deployments     kubernetes.operator.job.restart.failed: true  log4j-console.properties: |+    # This affects logging for both user code and Flink    rootLogger.level = INFO    rootLogger.appenderRef.console.ref = ConsoleAppender    rootLogger.appenderRef.rolling.ref = RollingFileAppender    # Uncomment this if you want to _only_ change Flink's logging    #logger.flink.name = org.apache.flink    #logger.flink.level = INFO    # The following lines keep the log level of common libraries/connectors on    # log level INFO. The root logger does not override this. You have to manually    # change the log levels here.    logger.akka.name = akka    logger.akka.level = INFO    logger.kafka.name= org.apache.kafka    logger.kafka.level = INFO    logger.hadoop.name = org.apache.hadoop    logger.hadoop.level = INFO    logger.ZooKeeper.name = org.apache.zookeeper    logger.zookeeper.level = INFO    # Log all infos to the console    appender.console.name = ConsoleAppender    appender.console.type = CONSOLE    appender.console.layout.type = PatternLayout    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n    # Log all infos in the given rolling file    appender.rolling.name = RollingFileAppender    appender.rolling.type = RollingFile    appender.rolling.append = false    appender.rolling.fileName = ${sys:log.file}    appender.rolling.filePattern = ${sys:log.file}.%i    appender.rolling.layout.type = PatternLayout    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n    appender.rolling.policies.type = Policies    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy    appender.rolling.policies.size.size=100MB    appender.rolling.strategy.type = DefaultRolloverStrategy    appender.rolling.strategy.max = 10    # Suppress the irrelevant (wrong) warnings from the Netty channel handler    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline    logger.netty.level = OFF

添加serviceAccount并授权

在 Kubernetes 上部署 Flink 集群时,需要创建一个 serviceAccount 来授权 Flink 任务在 Kubernetes 集群中执行。ServiceAccount 是 Kubernetes 中一种资源对象,用于授权 Pod 访问 Kubernetes API。当 Flink JobManager 或 TaskManager 启动时,需要使用这个 serviceAccount 来与 Kubernetes API 交互,获取集群资源并进行任务的调度和执行。

apiVersion: v1kind: ServiceAccountmetadata:  name: flink-service-account  namespace: szyx-flink---apiVersion: rbac.authorization.k8s.io/v1kind: Rolemetadata:  namespace: szyx-flink  name: flinkrules:- apiGroups: [""]  resources: ["pods", "services","configmaps"]  verbs: ["create", "get", "list", "watch", "delete"]- apiGroups: [""]  resources: ["pods/log"]  verbs: ["get"]- apiGroups: ["batch"]  resources: ["jobs"]  verbs: ["create", "get", "list", "watch", "delete"]- apiGroups: ["extensions"]  resources: ["ingresses"]  verbs: ["create", "get", "list", "watch", "delete"]---apiVersion: rbac.authorization.k8s.io/v1kind: RoleBindingmetadata:  namespace: szyx-flink  name: flink-role-bindingroleRef:  apiGroup: rbac.authorization.k8s.io  kind: Role  name: flinksubjects:- kind: ServiceAccount  name: flink-service-account  namespace: flink

部署JobManager

jobManager挂载用pvc

apiVersion: v1kind: PersistentVolumeClaimmetadata:  name: flink-tmp  namespace: szyx-flinkspec:  acceSSModes:    - ReadWriteOnce  resources:    requests:      storage: 40Gi

Deployment:

apiVersion: apps/v1kind: Deploymentmetadata:  name: flink-jobmanager  namespace: szyx-flinkspec:  replicas: 1 # Set the value to greater than 1 to start standby JobManagers  selector:    matchLabels:      app: flink      component: jobmanager  template:    metadata:      labels:        app: flink        component: jobmanager    spec:      containers:      - name: jobmanager        imagePullPolicy: Always        image: sivdead/flink:1.15.3_scala_2.12        env:        # 注入POD的ip到容器内        - name: POD_IP          valueFrom:            fieldRef:              apiVersion: v1              fieldPath: status.podIP        # 时区        - name: TZ          value: Asia/Shanghai        # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.        args: ["jobmanager", "$(POD_IP)"]        ports:        - containerPort: 6123          name: rpc        - containerPort: 6124          name: blob-server        - containerPort: 8081          name: webui        livenessProbe:          tcpSocket:            port: 6123          initialDelaySeconds: 30          periodSeconds: 60        resources:          requests:            memory: "8192Mi"            cpu: "4"          limits:            memory: "8192Mi"            cpu: "4"        volumeMounts:        - name: flink-config-volume          mountPath: /opt/flink/conf        - name: tmp-dir          mountPath: /opt/flink/tmp        securityContext:          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps      # 节点选择器      nodeSelector:        zone: mainland      # 节点容忍      tolerations:        - key: zone          value: mainland          effect: NoSchedule      volumes:      - name: flink-config-volume        configMap:          name: flink-config          items:          - key: flink-conf.yaml            path: flink-conf.yaml          - key: log4j-console.properties            path: log4j-console.properties        name: tmp-dir        persistentVolumeClaim:          claimName: flink-tmp

Service:

apiVersion: v1kind: Servicemetadata:  name: flink-jobmanagerspec:  type: ClusterIP  ports:  - name: rpc    port: 6123  - name: blob-server    port: 6124  - name: webui    port: 8081  selector:    app: flink    component: jobmanager

Ingress:

apiVersion: extensions/v1beta1kind: Ingressmetadata:  annotations:    # 因为有可能需要上传jar包,所以需要设置大一些    Nginx.ingress.kubernetes.io/proxy-body-size: 300m    nginx.ingress.kubernetes.io/rewrite-target: /$1  name: job-manager  namespace: szyx-flinkspec:  rules:  - host: flink.k8s.io    Http:      paths:      - backend:          serviceName: flink-jobmanager          servicePort: 8081        path: /flink/(.*)

访问http://flink.k8s.io/flink/能打开flink界面,说明部署完成

基于k8s如何部署Session模式Flink集群

部署TaskManager

Deployment:

apiVersion: apps/v1kind: Deploymentmetadata:  name: flink-taskmanager  namespace: szyx-flinkspec:  replicas: 2  selector:    matchLabels:      app: flink      component: taskmanager  template:    metadata:      labels:        app: flink        component: taskmanager    spec:      containers:      - name: taskmanager        imagePullPolicy: Always        image: sivdead/flink:1.15.3_scala_2.12        args: ["taskmanager"]        ports:        - containerPort: 6122          name: rpc        - containerPort: 6125          name: query-state        livenessProbe:          tcpSocket:            port: 6122          initialDelaySeconds: 30          periodSeconds: 60        volumeMounts:        - name: flink-config-volume          mountPath: /opt/flink/conf/        securityContext:          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary        resources:          requests:            memory: "8192Mi"            cpu: "4"          limits:            memory: "8192Mi"            cpu: "4"            # 节点选择器      nodeSelector:        zone: mainland      # 节点容忍      tolerations:        - key: zone          value: mainland          effect: NoSchedule      volumes:      - name: flink-config-volume        configMap:          name: flink-config          items:          - key: flink-conf.yaml            path: flink-conf.yaml          - key: log4j-console.properties            path: log4j-console.properties

部署完成后,打开flink页面,查看TaskManages:

基于k8s如何部署Session模式Flink集群

测试提交作业

  • 在页面上提交flink自带的示例:WordCount.jar

基于k8s如何部署Session模式Flink集群

  • 重启jobmanager,检查作业jar包是否依然存在

运行作业

基于k8s如何部署Session模式Flink集群

检查运行结果

基于k8s如何部署Session模式Flink集群

基于k8s如何部署Session模式Flink集群

到此,关于“基于k8s如何部署Session模式Flink集群”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

--结束END--

本文标题: 基于k8s如何部署Session模式Flink集群

本文链接: https://www.lsjlt.com/news/351743.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 基于k8s如何部署Session模式Flink集群
    这篇文章主要介绍“基于k8s如何部署Session模式Flink集群”,在日常操作中,相信很多人在基于k8s如何部署Session模式Flink集群问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”基于k8s如何...
    99+
    2023-07-05
  • 一文详解基于k8s部署Session模式Flink集群
    目录基于k8s部署Session模式Flink集群什么是Session模式Flink的filesystem基于k8s部署高可用Session模式Flink集群各组件版本号制作镜像配置...
    99+
    2023-03-15
    k8s部署Session模式Flink集群 k8s部署集群
  • k8s如何部署redis集群
    这篇文章主要讲解了“k8s如何部署redis集群”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“k8s如何部署redis集群”吧!redis集群搭建1.1使用redis-cli创建集群#&nb...
    99+
    2023-07-05
  • k8s中如何部署redis集群
    在Kubernetes中部署Redis集群通常可以通过以下步骤进行: 创建Redis的ConfigMap:在Kubernetes中...
    99+
    2024-04-09
    redis
  • k8s中如何部署mysql集群
    在Kubernetes中部署MySQL集群可以使用StatefulSet和PersistentVolume。以下是一个简单的步骤: ...
    99+
    2024-04-02
  • 如何在K8s上部署Redis集群
    这篇文章将为大家详细讲解有关如何在K8s上部署Redis集群,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一、前言 架构原理:每个Master都可以拥有多个Slave。当...
    99+
    2024-04-02
  • Kubernetes中如何使用Rancher部署K8S集群
    本篇文章给大家分享的是有关Kubernetes中如何使用Rancher部署K8S集群,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。1.安装Rancher这里使用三台机器来搭建K...
    99+
    2023-06-19
  • 如何部署mysql的集群模式galera-cluster
    这篇文章将为大家详细讲解有关如何部署mysql的集群模式galera-cluster,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一: galera-cluster 的介...
    99+
    2024-04-02
  • 关于Rancher部署并导入K8S集群的问题
    Rancher 的部署可以有三种架构: 高可用 Kubernetes 安装: 建议使用 Kubernetes 程序包管理器 Helm 在专用的 Kubernetes 集群上...
    99+
    2024-04-02
  • 如何基于k8s的Ingress部署hexo博客
    这篇文章主要介绍“如何基于k8s的Ingress部署hexo博客”,在日常操作中,相信很多人在如何基于k8s的Ingress部署hexo博客问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何基于k8s的Ing...
    99+
    2023-06-19
  • redis如何部署集群
    这篇文章主要介绍“redis如何部署集群”,在日常操作中,相信很多人在redis如何部署集群问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”redis如何部署集群”的疑惑有所帮...
    99+
    2024-04-02
  • 如何部署Spark集群
    今天就跟大家聊聊有关如何部署Spark集群,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。1. 安装环境简介硬件环境:两台四核cpu、4G内存、500...
    99+
    2024-04-02
  • Linux系统安装部署nacos集群:基于nacos2.0.3
    目录 前言 1.yum安装方式 2.docker安装方式 一、前置条件 1.操作系统或者虚拟机上安装jdk,版本>=8 2.下载好nacos2.0.3的压缩包 二、解压 三、部署  1.将 cluster.conf.example 文件复制...
    99+
    2023-08-31
    linux 运维 服务器
  • php中什么是集群部署?如何实现集群部署?
    随着互联网进入快速发展的时代,各种网站、应用如雨后春笋般出现,人们对于服务的需求越来越高。而随着用户量增加,单一服务器已经无法满足需求,集群部署PHP项目成为解决方案之一。一、什么是集群部署?集群部署是将多台服务器组合在一起,按照特定的方式...
    99+
    2023-05-14
    集群部署 php
  • 基于Redis6.2.6版本部署Redis Cluster集群的问题
    目录1.Redis6.2.6简介以及环境规划2.二进制安装Redis程序2.1.二进制安装redis6.2.62.2.创建Reids Cluster集群目录3.配置Redis Clu...
    99+
    2024-04-02
  • docker如何部署etcd集群
    目录创建etcd数据目录创建docker网络etcd-cluster-compose.yml启动并验证集群启动验证集群k/v操作CURLetcdctl总结需要安装: dockerdo...
    99+
    2023-03-19
    docker部署etcd集群 docker部署 docker etcd集群
  • docker如何部署kafka集群
    要部署Kafka集群,可以使用Docker来简化整个过程。下面是一个基本的步骤:1. 安装Docker和Docker Compose...
    99+
    2023-10-08
    kafka docker
  • k8s如何部署分布式jenkins
    k8s如何部署分布式jenkins,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。Kubernetes是一个开源的,用于管理云平台中多个主机上的容器化的应用,Kubernet...
    99+
    2023-06-04
  • Linux中如何部署Hadoop集群
    这篇文章给大家分享的是有关Linux中如何部署Hadoop集群的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。Hadoop 是一个分布式系统基础架构,由Apache基金会开发。用户可以在不了解分布式底层细节的情况下...
    99+
    2023-06-27
  • MariaDB如何进行集群部署
    在MariaDB中进行集群部署通常使用Galera Cluster来实现。Galera Cluster是一个同步多主集群解决方案,可...
    99+
    2024-04-09
    MariaDB
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作