Flink on Kubernetes

Reader contribution · 945 views · 2022-10-24



First, set up a local Kubernetes environment, using either of two tools:

(1) Minikube:

https://kubernetes.io/docs/setup/learning-environment/minikube/

(2) Kind:

https://github.com/kubernetes-sigs/kind

Deploying a basic Flink session cluster on Kubernetes involves three components:

(1) A Deployment/Job that runs the JobManager

(2) A Deployment that manages a group of TaskManagers

(3) A Service that exposes the JobManager's REST and UI ports

Next, build the job image.

(1) Install Docker

https://docs.docker.com/install/

(2) Create the user-code jars; they are packed into the same image as the Flink binaries

(3) Customize the Dockerfile; the image is built from it:

FROM openjdk:8-jre-alpine

# Install requirements
RUN apk add --no-cache bash snappy libc6-compat

# Flink environment variables
ENV FLINK_INSTALL_PATH=/opt
ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
ENV FLINK_LIB_DIR $FLINK_HOME/lib
ENV FLINK_PLUGINS_DIR $FLINK_HOME/plugins
ENV FLINK_OPT_DIR $FLINK_HOME/opt
ENV FLINK_JOB_ARTIFACTS_DIR $FLINK_INSTALL_PATH/artifacts
ENV PATH $PATH:$FLINK_HOME/bin

# flink-dist can point to a directory or a tarball on the local system
ARG flink_dist=NOT_SET
ARG job_artifacts=NOT_SET
ARG python_version=NOT_SET
# hadoop jar is optional
ARG hadoop_jar=NOT_SET*

# Install Python
RUN \
  if [ "$python_version" = "2" ]; then \
    apk add --no-cache python; \
  elif [ "$python_version" = "3" ]; then \
    apk add --no-cache python3 && ln -s /usr/bin/python3 /usr/bin/python; \
  fi

# Install build dependencies and flink
ADD $flink_dist $hadoop_jar $FLINK_INSTALL_PATH/
ADD $job_artifacts/* $FLINK_JOB_ARTIFACTS_DIR/
RUN set -x && \
  ln -s $FLINK_INSTALL_PATH/flink-[0-9]* $FLINK_HOME && \
  for jar in $FLINK_JOB_ARTIFACTS_DIR/*.jar; do [ -f "$jar" ] || continue; ln -s $jar $FLINK_LIB_DIR; done && \
  if [ -n "$python_version" ]; then ln -s $FLINK_OPT_DIR/flink-python*.jar $FLINK_LIB_DIR; fi && \
  if [ -f ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* ]; then ln -s ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* $FLINK_LIB_DIR; fi && \
  addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
  chown -R flink:flink ${FLINK_INSTALL_PATH}/flink-* && \
  chown -R flink:flink ${FLINK_JOB_ARTIFACTS_DIR}/ && \
  chown -h flink:flink $FLINK_HOME

COPY docker-entrypoint.sh /

USER flink
EXPOSE 8081 6123
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["--help"]
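As a sketch, a direct build against this Dockerfile could look like the following. The dist tarball name, artifacts path, and image tag are assumptions to adjust for your setup; the command is composed and printed so it can be reviewed before running it against your Docker daemon.

```shell
# Hypothetical build invocation for the Dockerfile above.
FLINK_DIST=flink-1.9.1-bin-scala_2.11.tgz   # assumed local Flink dist tarball
JOB_ARTIFACTS=./artifacts                   # assumed directory holding the job jar
IMAGE=flink-with-job:latest                 # assumed image tag
BUILD_CMD="docker build --build-arg flink_dist=$FLINK_DIST --build-arg job_artifacts=$JOB_ARTIFACTS --build-arg python_version=3 -t $IMAGE ."
# Printed rather than executed so it can be checked first:
echo "$BUILD_CMD"
```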

(4) Three ways to build the image

1. Build from a local Flink dist

build.sh --from-local-dist --job-artifacts [--with-python2|--with-python3] --image-name

Note: this assumes you have already built the Flink binaries:

mvn package -pl flink-dist -am

2. Build the Flink image from a stored archive

build.sh --from-archive --job-artifacts [--with-python2|--with-python3] --image-name

3. Build an image for a specific Flink/Hadoop/Scala version

build.sh --from-release --flink-version 1.6.0 --hadoop-version 2.8 --scala-version 2.11 --job-artifacts [--with-python2|--with-python3] --image-name
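For instance, a filled-in invocation of this third variant might look as follows; the jar path and image name are assumptions.

```shell
# Hypothetical concrete build.sh call (jar path and image name are assumptions)
BUILD_CMD="./build.sh --from-release --flink-version 1.6.0 --hadoop-version 2.8 --scala-version 2.11 --job-artifacts /path/to/my-job.jar --with-python3 --image-name my-flink-job"
echo "$BUILD_CMD"
```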

With the image in place, deploy the session cluster.

(1) Write the YAML files

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.total-process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

jobmanager-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

taskmanager-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
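Because the TaskManagers are a plain Deployment, capacity can later be changed by resizing it; each new TaskManager pod registers itself with the JobManager. A sketch, with the replica count as an assumption (the command is printed so it can be run against your own cluster):

```shell
# Sketch: scale the TaskManager Deployment (replica count is an assumption).
DEPLOY=flink-taskmanager
REPLICAS=4
SCALE_CMD="kubectl scale deployment $DEPLOY --replicas=$REPLICAS"
echo "$SCALE_CMD"   # run against your cluster
```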

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml. This file is optional and generally not used in production. A NodePort service is the most primitive way to route external traffic to a service: as the name suggests, it opens a specific port on every Kubernetes node, and any traffic sent to that port is forwarded to the service.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager
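Since NodePort assigns the external port only at creation time, it has to be looked up afterwards. A sketch using kubectl's jsonpath output (printed here; run it against your cluster):

```shell
# Sketch: read the node port Kubernetes assigned to the REST service.
PORT_CMD="kubectl get svc flink-jobmanager-rest -o jsonpath={.spec.ports[0].nodePort}"
echo "$PORT_CMD"   # prints a port in the NodePort range, e.g. 30000-32767
```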

(2) Create the resources in order

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
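The same sequence can be expressed as a loop, which keeps the ordering (ConfigMap and Service before the Deployments that consume them) explicit:

```shell
# Apply the manifests in dependency order; echo is a dry-run stand-in,
# drop it to actually create the resources.
for f in flink-configuration-configmap.yaml \
         jobmanager-service.yaml \
         jobmanager-deployment.yaml \
         taskmanager-deployment.yaml; do
  echo kubectl create -f "$f"
done
```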

(3) Inspect the resources

# List pods; -n selects the namespace you created
kubectl get pod -n namespace
# List deployments
kubectl get deploy
# List services
kubectl get svc
# List jobs
kubectl get job

The JobManager UI can then be reached in several ways.

(1) kubectl proxy

Run kubectl proxy in a terminal, then open http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy in a browser.
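The proxy URL above is composed from the namespace, the Service name, and the Service's named port; a sketch of that composition (the values assume the default namespace and the manifests above):

```shell
# Compose the kubectl-proxy URL for the Flink UI.
NAMESPACE=default        # assumed namespace
SERVICE=flink-jobmanager # Service name from jobmanager-service.yaml
PORT_NAME=ui             # named port on that Service
UI_URL="http://localhost:8001/api/v1/namespaces/${NAMESPACE}/services/${SERVICE}:${PORT_NAME}/proxy"
echo "$UI_URL"
```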

(2) kubectl port-forward

1. Run the following command to forward the JobManager web UI port to a local port:

kubectl port-forward ${flink-jobmanager-pod} 8081:8081
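The ${flink-jobmanager-pod} placeholder stands for the generated pod name, which can be resolved from the labels set in jobmanager-deployment.yaml. A sketch (printed here; run it against your cluster):

```shell
# Sketch: resolve the JobManager pod name by label before port-forwarding.
SELECTOR="app=flink,component=jobmanager"   # labels from jobmanager-deployment.yaml
POD_CMD="kubectl get pods -l $SELECTOR -o jsonpath={.items[0].metadata.name}"
echo "$POD_CMD"   # the printed command yields the pod name to port-forward
```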

2. Open http://localhost:8081 in a browser

(3) Specify the JobManager address on the command line when submitting a job (this builds on approach (2)):

./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar

(4) Create a NodePort service for the JobManager's REST service

# Create the NodePort service
kubectl create -f jobmanager-rest-service.yaml
# Query the service's assigned node port
kubectl get svc flink-jobmanager-rest
# Submit a job
./bin/flink run -m <node-ip>:<node-port> ./examples/streaming/WordCount.jar

Finally, tear down the cluster by deleting the resources:

kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f flink-configuration-configmap.yaml


