How to build Environment based on K8s in flink

2025-03-01 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

This article introduces how to build an environment for Flink on K8s. It should have some reference value; interested readers can use it as a starting point.

I have written about some of Flink's basic components before, but not yet about setting up the environment, so let's go through a basic environment setup.

1. Reasons for using StatefulSet

For Flink, the biggest reason to use a StatefulSet (sts) is that pod hostnames are ordinal and stable; the potential benefits are:

The pods with hostnames ending in -0 and -1 can be designated directly as the jobmanagers, so one StatefulSet can start a whole cluster, whereas a Deployment would need two (JobManager and TaskManager deployed independently).

After a pod fails for whatever reason, the replacement pod that the StatefulSet pulls up keeps the same hostname, so cluster recovery can in theory be faster than with a Deployment (where each pod's hostname is random).
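As a minimal local sketch of that stability (the hostname is hard-coded here for illustration; a real pod would use $(hostname -s)): every StatefulSet pod's hostname ends in a fixed ordinal, which can be parsed off with plain shell expansion.

```shell
# hypothetical stand-in for $(hostname -s) on a StatefulSet pod
hostname="flink-cluster-0"
# strip everything up to the last "-" to get the stable ordinal
ordinal=${hostname##*-}
echo "$ordinal"    # prints "0"
```

Because the ordinal never changes across restarts, role decisions keyed on it survive pod replacement.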

2. Deploy Flink using StatefulSet

2.1 entrypoint for docker

Since the hostname determines whether a jobmanager or a taskmanager is started, the entrypoint needs to check whether the pod's hostname matches one of the configured jobmanager hostnames.

The parameters passed are: cluster ha. The role to start is then determined automatically from the hostname; alternatively, a role name can be passed directly.

The script for docker-entrypoint.sh is as follows:

#!/bin/bash
# If unspecified, the hostname of the container is taken as the JobManager address
ACTION_CMD="$1"

# In cluster mode, pods ${JOB_CLUSTER_NAME}-0 and ${JOB_CLUSTER_NAME}-1 act as jobmanagers
if [ "${ACTION_CMD}" == "cluster" ]; then
    jobmanagers=(${JOB_MANGER_HOSTS//,/ })
    ACTION_CMD="taskmanager"
    for i in "${!jobmanagers[@]}"; do
        if [ "$(hostname -s)" == "${jobmanagers[i]}" ]; then
            ACTION_CMD="jobmanager"
            echo "pod hostname matches a jobmanager config host, changing action to jobmanager."
        fi
    done
fi

# In ha mode, replace the ha configuration
if [ "$2" == "ha" ]; then
    sed -i -e "s|high-availability.cluster-id: cluster-id|high-availability.cluster-id: ${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s|high-availability.zookeeper.quorum: localhost:2181|high-availability.zookeeper.quorum: ${FLINK_ZK_QUORUM}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s|state.backend.fs.checkpointdir: checkpointdir|state.backend.fs.checkpointdir: hdfs:///user/flink/flink-checkpoints/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s|high-availability.storageDir: hdfs:///flink/ha/|high-availability.storageDir: hdfs:///user/flink/ha/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
fi

if [ "${ACTION_CMD}" == "help" ]; then
    echo "Usage: $(basename "$0") (cluster ha|jobmanager|taskmanager|local|help)"
    exit 0
elif [ "${ACTION_CMD}" == "jobmanager" ]; then
    JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
    echo "Starting Job Manager"
    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s/jobmanager.heap.mb: 1024/jobmanager.heap.mb: ${JOB_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    echo "config file:" && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground cluster
elif [ "${ACTION_CMD}" == "taskmanager" ]; then
    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
    echo "Starting Task Manager"
    sed -i -e "s/taskmanager.heap.mb: 1024/taskmanager.heap.mb: ${TASK_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    echo "config file:" && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/taskmanager.sh" start-foreground
elif [ "${ACTION_CMD}" == "local" ]; then
    echo "Starting local cluster"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground local
fi

exec "$@"
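The role-selection loop at the top of the entrypoint can be sketched standalone like this (the hostname and jobmanager list are hard-coded for illustration; the real script reads $(hostname -s) and the JOB_MANGER_HOSTS environment variable):

```shell
JOB_MANGER_HOSTS="flink-cluster-0,flink-cluster-1"
my_hostname="flink-cluster-1"      # stand-in for $(hostname -s)
role="taskmanager"                 # default: any non-jobmanager pod runs a TM
for jm in $(echo "$JOB_MANGER_HOSTS" | tr ',' ' '); do
  if [ "$my_hostname" = "$jm" ]; then
    role="jobmanager"              # hostname matched a configured JM host
  fi
done
echo "$role"    # prints "jobmanager"
```

Running the same logic with my_hostname="flink-cluster-2" would leave the role as taskmanager, which is exactly how all pods share one image and one StatefulSet.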

2.2 Using ConfigMap to distribute hdfs and flink configuration files

ConfigMap introduction reference:

https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#create-configmaps-from-files

Q: Why use ConfigMap?

A: Because the hadoop configuration files differ between environments, it is awkward to bake them into the image, which leaves two reasonable options: ConfigMap, or a Pod InitContainer. With an InitContainer you can wget a remote configuration file, but that in turn depends on a configuration service; by comparison, ConfigMap is simpler.

Create ConfigMap

[root@rc-mzgjg ~]# kubectl create configmap hdfs-conf --from-file=hdfs-site.xml --from-file=core-site.xml

[root@rc-mzgjg ~]# kubectl create configmap flink-conf --from-file=flink-conf/log4j-console.properties --from-file=flink-conf/flink-conf.yaml

Use the describe command to view the ConfigMap named hdfs-conf; the contents of its files are printed to the console:

[root@rc-mzgjg ~]# kubectl describe configmap hdfs-conf
Name:         hdfs-conf
Namespace:    default
Labels:       <none>
Annotations:  <none>

Data
====
core-site.xml:

Using ConfigMap with volumeMounts

To make a configuration file available to a Pod's containers, mount it into the container through volumeMounts. The following demo mounts the configuration files at /home/xxxx/conf/hadoop:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-jm
spec:
  selector:
    matchLabels:
      app: flink-jm
  serviceName: flink-jm
  replicas: 2
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: flink-jm
    spec:
      terminationGracePeriodSeconds: 2
      containers:
      - name: test
        imagePullPolicy: Always
        image: ip:5000/test:latest
        args: ["sleep", "1d"]
        volumeMounts:
        - name: hdfs-conf
          mountPath: /home/xxxx/conf/hadoop
      volumes:
      - name: hdfs-conf
        configMap:
          # Provide the name of the ConfigMap containing the files
          # you want to add to the container
          name: hdfs-conf

After creating the Pod, check the mount of the configuration file

[hadoop@flink-jm-0 hadoop]$ ll /home/xxxx/conf/hadoop
total 0
lrwxrwxrwx. 1 root root 20 Apr 9 06:54 core-site.xml -> ..data/core-site.xml
lrwxrwxrwx. 1 root root 20 Apr 9 06:54 hdfs-site.xml -> ..data/hdfs-site.xml

The configuration files are symlinks into the ..data directory.
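The ..data indirection can be reproduced locally (a temporary directory stands in for the real mount; this is only a sketch of the layout, not how kubelet creates it, though kubelet uses the extra level of indirection so a ConfigMap's files can be updated atomically by swapping the ..data link):

```shell
# build a miniature version of the ConfigMap mount layout
mount=$(mktemp -d)
mkdir "$mount/..data"
echo "dummy-conf" > "$mount/..data/core-site.xml"
# the visible file is just a relative symlink into ..data, as in the ll output above
ln -s ..data/core-site.xml "$mount/core-site.xml"
cat "$mount/core-site.xml"    # prints "dummy-conf"
```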

Namespace sharing between a Pod's multiple containers requires Kubernetes 1.10.

The original idea was to run multiple containers in one Pod, with one container providing the configuration files. Testing verified that the containers' data directories cannot see each other; as expected, the image of one of the containers carried the hdfs-conf configuration files:

containers:
- name: hdfs-conf
  imagePullPolicy: Always
  image: ip:5000/hdfs-dev:2.6
  args: ["sleep", "1d"]
- name: flink-jm
  imagePullPolicy: Always
  image: ip:5000/flink:1.4.2

This verified that the two containers share only the network; their file directories cannot see each other.

"Share Process Namespace between Containers in a Pod" this is only supported by Kubernates 1.10. Refer to

Https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/

2.3 configuration of StatefulSet

The Flink and hadoop configuration files rely on ConfigMap for distribution. The environment variables are as follows:

Environment variable name: FLINK_CLUSTER_IDENT
Parameter: namespace/StatefulSet.name
Content: default/flink-cluster
Description: used as the zk ha cluster id and as the root directory of the hdfs checkpoints

Environment variable name: FLINK_ZK_QUORUM
Parameter: env:FLINK_ZK_QUORUM
Content: ip:2181
Description: the ZK quorum address used for HA

Environment variable name: JOB_MANGER_HOSTS
Parameter: StatefulSet.name-0,StatefulSet.name-1
Content: flink-cluster-0,flink-cluster-1
Description: hostnames of the JMs; an FQDN can also be used instead of the short hostname

Environment variable name: JOB_MANAGER_HEAP_MB
Parameter: env:JOB_MANAGER_HEAP_MB, value: containers.resources.memory.limit - 1024
Content: 512
Description: heap size of the JM; because of Netty's off-heap memory it needs to be smaller than containers.resources.memory.limits, otherwise the pod is easily OOM-killed

Environment variable name: TASK_MANAGER_HEAP_MB
Parameter: env:TASK_MANAGER_HEAP_MB, value: containers.resources.memory.limit - 1024
Content: 512
Description: heap size of the TM; because of Netty's off-heap memory it needs to be smaller than containers.resources.memory.limits, otherwise the pod is easily OOM-killed

Environment variable name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
Parameter: containers.resources.cpu.limits
Content: 2
Description: number of slots per TM, set according to resources.cpu.limits
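The heap sizing rule above (heap = container memory limit minus headroom for Netty's off-heap memory) is simple shell arithmetic; the 1536 and 1024 figures here come from this article's manifest, and the exact headroom is an assumption to tune for your workload:

```shell
memory_limit_mb=1536               # containers.resources.memory.limit from the manifest
offheap_headroom_mb=1024           # assumed headroom for Netty off-heap memory
heap_mb=$((memory_limit_mb - offheap_headroom_mb))
echo "$heap_mb"    # prints "512", the JOB_MANAGER_HEAP_MB / TASK_MANAGER_HEAP_MB value
```

Setting heap equal to the full limit would leave no room for off-heap allocations and invite the OOM kill described above.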

The Pod's imagePullPolicy: the test environment uses Always (pull on every start, which makes verification easy); online it is IfNotPresent, so if you change an image you must also change its tag.

The complete content can be referred to as follows:

# headless service for statefulset
apiVersion: v1
kind: Service
metadata:
  name: flink-cluster
  labels:
    app: flink-cluster
spec:
  clusterIP: None
  ports:
  - port: 8080
    name: ui
  selector:
    app: flink-cluster
---
# create flink statefulset
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-cluster
spec:
  selector:
    matchLabels:
      app: flink-cluster
  serviceName: flink-cluster
  replicas: 4
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: flink-cluster
    spec:
      terminationGracePeriodSeconds: 2
      containers:
      - name: flink-cluster
        imagePullPolicy: Always
        image: ip:5000/flink:1.4.2
        args: ["cluster", "ha"]
        volumeMounts:
        - name: hdfs-conf
          mountPath: /home/xxxx/conf/hadoop
        - name: flink-conf
          mountPath: /home/xxxx/conf/flink
        - name: flink-log
          mountPath: /home/xxxx/logs
        resources:
          requests:
            memory: "1536Mi"
            cpu: 1
          limits:
            memory: "1536Mi"
            cpu: 2
        env:
        - name: JOB_MANGER_HOSTS
          value: "flink-cluster-0,flink-cluster-1"
        - name: FLINK_CLUSTER_IDENT
          value: "default/flink-cluster"
        - name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
          value: "2"
        - name: FLINK_ZK_QUORUM
          value: "ip:2181"
        - name: JOB_MANAGER_HEAP_MB
          value: "512"
        - name: TASK_MANAGER_HEAP_MB
          value: "512"
        ports:
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8080
          name: flink-ui
      volumes:
      - name: hdfs-conf
        configMap:
          # Provide the name of the ConfigMap containing the files
          # you want to add to the container
          name: hdfs-conf
      - name: flink-conf
        configMap:
          name: flink-conf
      - name: flink-log
        hostPath:
          # directory location on host
          path: /tmp
          # this field is optional
          type: Directory

3. Exposing the Flink UI from the test environment

Because the test environment uses Flannel for networking, the Flink UI's IP and port cannot be reached from outside the K8s cluster, so the internal IP needs to be mapped out via NodePort. The configuration is as follows:

# only for test K8s cluster
# use service to expose flink jobmanager 0's web port
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-0
  name: flink-web-0
  namespace: default
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-0
  type: NodePort
---
# use service to expose flink jobmanager 1's web port
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-1
  name: flink-web-1
  namespace: default
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-1
  type: NodePort

4. Service deployment status

After performing the previous operation, you can view the current StatefulSet status

[root@rc-mzgjg ~]# kubectl get sts flink-cluster -o wide
NAME            DESIRED   CURRENT   AGE   CONTAINERS      IMAGES
flink-cluster   4         4         1h    flink-cluster   ip:5000/flink:1.4.2

Status of the pods:

[root@rc-mzgjg ~]# kubectl get pod -l app=flink-cluster -o wide
NAME              READY   STATUS    RESTARTS   AGE   IP    NODE
flink-cluster-0   1/1     Running   0          1h    ip1   ip5
flink-cluster-1   1/1     Running   0          1h    ip2   ip6
flink-cluster-2   1/1     Running   0          1h    ip3   ip7
flink-cluster-3   1/1     Running   0          1h    ip4   ip8

Related Service information

[root@rc-mzgjg ~]# kubectl get svc -l app=flink-cluster -o wide
NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE   SELECTOR
flink-cluster   ClusterIP   None             <none>        8080/TCP         2h    app=flink-cluster
flink-web-0     NodePort    10.254.8.103     <none>        8080:30495/TCP   1h    app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-0
flink-web-1     NodePort    10.254.172.158   <none>        8080:30158/TCP   1h    app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-1

Based on the Service information, the Flink UI can be accessed through any K8s node's IP plus the mapped port.
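For reference, the node port is the second number in the PORT(S) column; extracting it from a value like the one for flink-web-0 is plain string surgery (the value is hard-coded here as an example rather than read from kubectl):

```shell
ports="8080:30495/TCP"             # PORT(S) value for flink-web-0 above
nodeport=${ports#*:}               # drop the cluster port, leaving "30495/TCP"
nodeport=${nodeport%/*}            # drop the protocol, leaving "30495"
echo "$nodeport"    # prints "30495"
```

The UI would then be reachable at http://<any-node-ip>:30495/.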

One more point worth noting: a permissions problem came up during the build.

The error log is as follows

ERROR setFile(null,true) call failed

FileNotFoundException: no such file or directory

Cause: the flink service lacks permissions on the log directory.

Fix:

1. adduser flink (add the appropriate user)

2. chown -R flink:flink /home/xxxx/logs
