This article introduces how to build a Flink environment on Kubernetes (K8s) and should be a useful reference for anyone interested.
Earlier posts covered some of Flink's basic components but never its environment setup, so let's walk through building a basic environment.
1. Reasons for using StatefulSet
For Flink, the biggest reason to use a StatefulSet (sts) is that pod hostnames are ordinal and stable. The potential benefits are:
- The pods whose hostnames end in -0 and -1 can be designated directly as JobManagers, so a single StatefulSet can start the whole cluster, whereas a Deployment-based setup needs two (JobManager and TaskManager deployed independently).
- After a pod fails for whatever reason, the pod the StatefulSet brings back up keeps the same hostname, so cluster recovery can in theory be faster than with a Deployment (whose pod hostnames are random). A short sketch of this naming behaviour follows.
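As a minimal illustration (assuming the headless service and StatefulSet are both named flink-cluster in the default namespace, as in the manifests later in this article), the hostnames and DNS records are predictable:

# Pods of a StatefulSet get stable, ordinal names: <sts-name>-0, <sts-name>-1, ...
kubectl get pods -l app=flink-cluster -o name
# pod/flink-cluster-0
# pod/flink-cluster-1
# ...
# With a headless service of the same name, each pod also gets a stable DNS name:
kubectl exec flink-cluster-0 -- hostname -f
# flink-cluster-0.flink-cluster.default.svc.cluster.local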
2. Deploy Flink using StatefulSet
2.1 Docker entrypoint
Since the hostname decides whether a pod starts a JobManager or a TaskManager, the entrypoint has to check whether the pod's hostname matches one of the configured JobManager hostnames.
The arguments passed are "cluster ha": the script then picks the role to start based on the hostname. You can also pass a role name (jobmanager, taskmanager or local) directly.
The docker-entrypoint.sh script is as follows:
#!/bin/sh

# If unspecified, the hostname of the container is taken as the JobManager address
ACTION_CMD="$1"

# if use cluster model, pod ${JOB_CLUSTER_NAME}-0, ${JOB_CLUSTER_NAME}-1 as jobmanager
if [ ${ACTION_CMD} == "cluster" ]; then
  jobmanagers=(${JOB_MANGER_HOSTS//,/ })
  ACTION_CMD="taskmanager"
  for i in ${!jobmanagers[@]}
  do
    if [ "$(hostname -s)" == "${jobmanagers[i]}" ]; then
      ACTION_CMD="jobmanager"
      echo "pod hostname match jobmanager config host, change action to jobmanager."
    fi
  done
fi

# if ha model, replace ha configuration
if [ "$2" == "ha" ]; then
  sed -i -e "s|high-availability.cluster-id: cluster-id|high-availability.cluster-id: ${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.zookeeper.quorum: localhost:2181|high-availability.zookeeper.quorum: ${FLINK_ZK_QUORUM}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|state.backend.fs.checkpointdir: checkpointdir|state.backend.fs.checkpointdir: hdfs:///user/flink/flink-checkpoints/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.storageDir: hdfs:///flink/ha/|high-availability.storageDir: hdfs:///user/flink/ha/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
fi

if [ ${ACTION_CMD} == "help" ]; then
  echo "Usage: $(basename "$0") (cluster ha|jobmanager|taskmanager|local|help)"
  exit 0
elif [ ${ACTION_CMD} == "jobmanager" ]; then
  JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
  echo "Starting Job Manager"
  sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s/jobmanager.heap.mb: 1024/jobmanager.heap.mb: ${JOB_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
  echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
  exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground cluster
elif [ ${ACTION_CMD} == "taskmanager" ]; then
  TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
  echo "Starting Task Manager"
  sed -i -e "s/taskmanager.heap.mb: 1024/taskmanager.heap.mb: ${TASK_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_CONF_DIR/flink-conf.yaml"
  echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
  exec "$FLINK_HOME/bin/taskmanager.sh" start-foreground
elif [ ${ACTION_CMD} == "local" ]; then
  echo "Starting local cluster"
  exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground local
fi

exec "$@"
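The container arguments therefore select the mode; as used by the StatefulSet later in this article, the pods start in cluster mode with ZooKeeper HA:

# as invoked by the StatefulSet manifest below (args: ["cluster", "ha"]);
# the jobmanager/taskmanager role is then chosen by matching the pod hostname
# against JOB_MANGER_HOSTS
./docker-entrypoint.sh cluster ha
# a role can also be forced explicitly:
./docker-entrypoint.sh jobmanager
./docker-entrypoint.sh taskmanager
./docker-entrypoint.sh local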
2.2 Using ConfigMap to distribute hdfs and flink configuration files
For an introduction to ConfigMap, see:
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#create-configmaps-from-files
Q: Why use ConfigMap?
A: Because the hadoop configuration files differ between environments, baking them into the image is awkward. That leaves two reasonable options: ConfigMap, or an InitContainer on the Pod. With an InitContainer you can wget a remote configuration file, but that in turn depends on a configuration service. By comparison, ConfigMap is simpler.
Create the ConfigMaps:
[root@rc-mzgjg ~]# kubectl create configmap hdfs-conf --from-file=hdfs-site.xml --from-file=core-site.xml
[root@rc-mzgjg ~]# kubectl create configmap flink-conf --from-file=flink-conf/log4j-console.properties --from-file=flink-conf/flink-conf.yaml
Use the describe command to view the ConfigMap named hdfs-conf; the contents of the files are printed to the console (truncated here):
[root@rc-mzgjg ~]# kubectl describe configmap hdfs-conf
Name:         hdfs-conf
Namespace:    default
Labels:       <none>
Annotations:  <none>

Data
====
core-site.xml:
----
(file contents omitted)
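If you want the raw file contents rather than the describe view, the ConfigMap can also be dumped directly (standard kubectl commands; the jsonpath key escaping assumes a reasonably recent kubectl):

# full ConfigMap, including file contents, as YAML
kubectl get configmap hdfs-conf -o yaml
# a single file only
kubectl get configmap hdfs-conf -o jsonpath='{.data.core-site\.xml}'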
Using ConfigMap with volumeMounts
If a container in a Pod needs the configuration files, you can mount the ConfigMap into the container through volumeMounts. The following demo mounts the configuration files at /home/xxxx/conf/hadoop:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-jm
spec:
  selector:
    matchLabels:
      app: flink-jm
  serviceName: flink-jm
  replicas: 2
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: flink-jm
    spec:
      terminationGracePeriodSeconds: 2
      containers:
      - name: test
        imagePullPolicy: Always
        image: ip:5000/test:latest
        args: ["sleep", "1d"]
        volumeMounts:
        - name: hdfs-conf
          mountPath: /home/xxxx/conf/hadoop
      volumes:
      - name: hdfs-conf
        configMap:
          # Provide the name of the ConfigMap containing the files you want to add to the container
          name: hdfs-conf
After creating the Pod, check that the configuration files are mounted:
[hadoop@flink-jm-0 hadoop]$ ll /home/xxxx/conf/hadoop
total 0
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 core-site.xml -> ..data/core-site.xml
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 hdfs-site.xml -> ..data/hdfs-site.xml
The configuration files are symlinks into the ..data directory.
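This ..data indirection is how the kubelet swaps in updated ConfigMap contents atomically; a quick way to see it from inside the container (paths as mounted above, the actual directory name will differ):

# ..data is itself a symlink to a timestamped directory; on a ConfigMap update
# the kubelet writes a new directory and atomically repoints ..data to it
readlink /home/xxxx/conf/hadoop/..data
# e.g. ..2018_04_09_06_54_00.123456789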
Kubernetes 1.10 and namespace sharing between containers in a Pod
The original idea was to run multiple containers in one Pod, with one container's image carrying the hdfs configuration files, and to test whether the other container could read them; as expected, the data directories are not visible across containers. In the snippet below, the hdfs-conf container's image exists only to carry the hdfs configuration:
containers:
- name: hdfs-conf
  imagePullPolicy: Always
  image: ip:5000/hdfs-dev:2.6
  args: ["sleep", "1d"]
- name: flink-jm
  imagePullPolicy: Always
  image: ip:5000/flink:1.4.2
This verified that the two containers only share the network; their file directories cannot see each other.
"Share Process Namespace between Containers in a Pod" is only supported from Kubernetes 1.10; see
https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/
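For reference, a minimal sketch of that 1.10+ feature (not used in this deployment; the pod name and busybox images are placeholders). Note that it only shares the PID namespace between the containers, not their filesystems:

# requires Kubernetes 1.10+ (with the feature enabled on older versions)
kubectl apply -f - <<'EOF'
apiVersion: v1
kind: Pod
metadata:
  name: share-pid-demo
spec:
  shareProcessNamespace: true
  containers:
  - name: first
    image: busybox
    command: ["sleep", "3600"]
  - name: second
    image: busybox
    command: ["sleep", "3600"]
EOF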
2.3 Configuration of the StatefulSet
The Flink and hadoop configuration files are distributed via ConfigMap. The environment variables used by the entrypoint are listed below:
Environment variable | Parameters | Example | Description
FLINK_CLUSTER_IDENT | namespace/StatefulSet.name | default/flink-cluster | Used as the zk HA cluster-id and as the root directory for the hdfs checkpoints
FLINK_ZK_QUORUM | env:FLINK_ZK_QUORUM | ip:2181 | ZooKeeper quorum address for HA
JOB_MANAGER_HEAP_MB | env:JOB_MANAGER_HEAP_MB, value: containers.resources.memory.limit - 1024 | 512 | JM heap size; because of Netty's off-heap memory it must stay below container.resources.memory.limits, otherwise the pod is easily OOM-killed
JOB_MANGER_HOSTS | StatefulSet.name-0,StatefulSet.name-1 | flink-cluster-0,flink-cluster-1 | Hostnames of the JMs; FQDNs can be used instead of short hostnames
TASK_MANAGER_HEAP_MB | env:TASK_MANAGER_HEAP_MB, value: containers.resources.memory.limit - 1024 | 512 | TM heap size; because of Netty's off-heap memory it must stay below container.resources.memory.limits, otherwise the pod is easily OOM-killed
TASK_MANAGER_NUMBER_OF_TASK_SLOTS | containers.resources.cpu.limits | 2 | Number of slots per TM, set according to resources.cpu.limits
In the manifest below the memory limit is 1536Mi, so both heap values come out to 512 (1536 - 1024).
The Pod's imagePullPolicy is Always in the test environment, so the image is pulled on every start to make verification easier; online it is IfNotPresent. If you change an image, you must also change its tag.
The complete manifest is as follows:
# headless service for statefulset
apiVersion: v1
kind: Service
metadata:
  name: flink-cluster
  labels:
    app: flink-cluster
spec:
  clusterIP: None
  ports:
  - port: 8080
    name: ui
  selector:
    app: flink-cluster
---
# create flink statefulset
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-cluster
spec:
  selector:
    matchLabels:
      app: flink-cluster
  serviceName: flink-cluster
  replicas: 4
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: flink-cluster
    spec:
      terminationGracePeriodSeconds: 2
      containers:
      - name: flink-cluster
        imagePullPolicy: Always
        image: ip:5000/flink:1.4.2
        args: ["cluster", "ha"]
        volumeMounts:
        - name: hdfs-conf
          mountPath: /home/xxxx/conf/hadoop
        - name: flink-conf
          mountPath: /home/xxxx/conf/flink
        - name: flink-log
          mountPath: /home/xxxx/logs
        resources:
          requests:
            memory: "1536Mi"
            cpu: 1
          limits:
            memory: "1536Mi"
            cpu: 2
        env:
        - name: JOB_MANGER_HOSTS
          value: "flink-cluster-0,flink-cluster-1"
        - name: FLINK_CLUSTER_IDENT
          value: "default/flink-cluster"
        - name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
          value: "2"
        - name: FLINK_ZK_QUORUM
          value: "ip:2181"
        - name: JOB_MANAGER_HEAP_MB
          value: "512"
        - name: TASK_MANAGER_HEAP_MB
          value: "512"
        ports:
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8080
          name: flink-ui
      volumes:
      - name: hdfs-conf
        configMap:
          # Provide the name of the ConfigMap containing the files you want to add to the container
          name: hdfs-conf
      - name: flink-conf
        configMap:
          name: flink-conf
      - name: flink-log
        hostPath:
          # directory location on host
          path: /tmp
          # this field is optional
          type: Directory
3. Exposing the Flink UI in the test environment
Because the test environment uses Flannel for networking, the Flink UI's IP and port cannot be reached from outside the K8s cluster, so the internal address needs to be exposed through a NodePort Service. The configuration is as follows:
# only for test K8s cluster
# use service to expose flink jobmanager 0's web port
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-0
  name: flink-web-0
  namespace: default
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-0
  type: NodePort
---
# use service to expose flink jobmanager 1's web port
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-1
  name: flink-web-1
  namespace: default
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-1
  type: NodePort
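Assuming the manifests above are saved to files (the file names here are placeholders), apply them in the usual way:

kubectl apply -f flink-cluster.yaml   # headless service + StatefulSet
kubectl apply -f flink-web.yaml       # NodePort services for the two jobmanagers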
4. Service deployment status
After performing the previous operations, you can view the current StatefulSet status:
[root@rc-mzgjg ~]# kubectl get sts flink-cluster -o wide
NAME            DESIRED   CURRENT   AGE   CONTAINERS      IMAGES
flink-cluster   4         4         1h    flink-cluster   ip:5000/flink:1.4.2
Pod status:
[root@rc-mzgjg ~]# kubectl get pod -l app=flink-cluster -o wide
NAME              READY   STATUS    RESTARTS   AGE   IP    NODE
flink-cluster-0   1/1     Running   0          1h    ip1   ip5
flink-cluster-1   1/1     Running   0          1h    ip2   ip6
flink-cluster-2   1/1     Running   0          1h    ip3   ip7
flink-cluster-3   1/1     Running   0          1h    ip4   ip8
Related Service information:
[root@rc-mzgjg ~]# kubectl get svc -l app=flink-cluster -o wide
NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE   SELECTOR
flink-cluster   ClusterIP   None             <none>        8080/TCP         2h    app=flink-cluster
flink-web-0     NodePort    10.254.8.103     <none>        8080:30495/TCP   1h    app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-0
flink-web-1     NodePort    10.254.172.158   <none>        8080:30158/TCP   1h    app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-1
From the Service information, the Flink UI can be reached through any K8s node IP plus the mapped NodePort (30495 or 30158 above).
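For example (the node IP is a placeholder; the NodePorts are the ones assigned above):

# look up the NodePort assigned to the jobmanager-0 web service
kubectl get svc flink-web-0 -o jsonpath='{.spec.ports[0].nodePort}'
# then open the UI through any node's IP
curl http://<any-node-ip>:30495/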
One point worth noting: a permissions problem was encountered during setup.
The error log was as follows:
ERROR setFile(null,true) call failed
FileNotFoundException: no such file or directory
Reason: the flink service lacks write permission on the log directory.
Fix:
1. adduser flink (add the appropriate user)
2. chown -R flink:flink /home/xxxx/logs