
What is the Spark On Kubernetes practice of TalkingData?


This article introduces TalkingData's Spark On Kubernetes practice in detail. Interested readers can use it for reference; I hope you find it helpful.

As we all know, Spark is a fast, general-purpose large-scale data processing platform, similar in role to Hadoop's MapReduce computing framework. Compared with MapReduce, however, Spark is scalable, computes in memory, and can directly read and write data in any format on Hadoop, which makes batch processing more efficient and lowers latency. In fact, Spark has become a unified platform for lightweight, fast big data processing.

As a data computing platform and framework, Spark focuses on managing Spark applications, while the actual underlying resource scheduling and management rely more on external platforms:

Spark officially supports four types of cluster manager: the Spark standalone cluster manager, Mesos, YARN, and Kubernetes. Because TalkingData uses Kubernetes as its resource scheduling and management platform, Spark On Kubernetes is the best solution for us.

How to build a production-ready Kubernetes cluster

At present, there are many ways to set up Kubernetes, such as building from scratch, kubeadm, Minikube, or various hosted solutions. Because we needed a functional verification cluster quickly and simply, we chose kubeadm as the cluster deployment tool. The deployment steps are simple. On the master, execute:

$ kubeadm init

On each node, execute:

$ kubeadm join --token <token> <master-ip>:<master-port> --discovery-token-ca-cert-hash sha256:<hash>

For more information on configuration, please see the official document: https://kubernetes.io/docs/setup/independent/create-cluster-kubeadm/.

It should be noted that, due to network restrictions within China, many images cannot be pulled from k8s.gcr.io. We need to replace them with images provided by third parties, such as https://hub.docker.com/u/mirrorgooglecontainers/.
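For example, one common workaround is to pull the mirrored image and retag it under the k8s.gcr.io name that kubeadm expects; the image name and tag below are only illustrative:

$ docker pull mirrorgooglecontainers/kube-apiserver-amd64:v1.10.0
$ docker tag mirrorgooglecontainers/kube-apiserver-amd64:v1.10.0 k8s.gcr.io/kube-apiserver-amd64:v1.10.0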

Network

The Kubernetes network is implemented through CNI by default, and the mainstream CNI plugins are Linux Bridge, MACVLAN, Flannel, Calico, Kube-router, Weave Net, and so on. Flannel mainly uses VXLAN tunnels for network communication between pods, while Calico and Kube-router use BGP. Because software VXLAN costs a lot of host performance and network throughput, BGP places certain requirements on hardware switches, and our underlay network is already a Layer 2 network implemented with VXLAN, we finally chose MACVLAN.

An example of CNI MACVLAN configuration is as follows:

{"name": "mynet", "type": "macvlan", "master": "eth0", "ipam": {"type": "host-local", "subnet": "10.0.0.0amp 17", "rangeStart": "10.0.64.1", "rangeEnd": "10.0.64.126", "gateway": "10.0.127.254" "routes": [{"dst": "0.0.0.0 dst 0"}, {"dst": "10.0.80.0 Accord 24", "gw": "10.0.0.61"}]}}

The pod subnet is 10.0.0.0/17, the actual pod IP pool is 10.0.64.0/20, and the cluster CIDR is 10.0.80.0/24. The IPAM we use is host-local, and the rule is to carve out a /25 subnet on each Kubernetes node, which provides 126 IPs. We also configured a static route to the cluster CIDR 10.0.80.0/24 with the host as the gateway. This is because when a container is configured with macvlan, egress traffic does not pass through the host's iptables, which is quite different from Linux Bridge: in Linux Bridge mode, as long as you set the kernel parameter net.bridge.bridge-nf-call-iptables = 1, all traffic entering the bridge passes through the host's iptables. After analyzing kube-proxy, we found that we can rely on the KUBE-FORWARD chain for pod-to-service network forwarding:

-A FORWARD -m comment --comment "kubernetes forward rules" -j KUBE-FORWARD
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -s 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -d 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT

Finally, traffic is DNATed to the back-end pods through the KUBE-SERVICES chain. If a pod accesses other network segments, it goes through the physical gateway 10.0.127.254.
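For illustration, the service DNAT rules that kube-proxy programs can be listed from the nat table on a node (a quick check, assuming kube-proxy runs in iptables mode):

$ iptables -t nat -L KUBE-SERVICES -n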

It is also important to note that, for kernel security reasons, a macvlan sub-interface cannot communicate directly with its parent physical interface, which means the container cannot use the host as its gateway. We used a small trick to work around this limitation: we created another macvlan sub-interface on the physical interface, moved the host's IP onto it, and left the physical interface purely as a network uplink:

$ cat /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
IPV6INIT=no
BOOTPROTO=none

$ cat /etc/sysconfig/network-scripts/ifcfg-macvlan
DEVICE=macvlan
NAME=macvlan
BOOTPROTO=none
ONBOOT=yes
TYPE=macvlan
DEVICETYPE=macvlan
DEFROUTE=yes
PEERDNS=yes
PEERROUTES=yes
IPV4_FAILURE_FATAL=no
IPADDR=10.0.0.61
PREFIX=17
GATEWAY=10.0.127.254
MACVLAN_PARENT=eth0
MACVLAN_MODE=bridge

In this way, the two macvlan interfaces can communicate with each other.
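For reference, the same setup can also be expressed with iproute2 commands; this is only a non-persistent sketch equivalent to the ifcfg files above:

# assumes 10.0.0.61 has already been removed from eth0
$ ip link add link eth0 name macvlan type macvlan mode bridge
$ ip addr add 10.0.0.61/17 dev macvlan
$ ip link set dev macvlan up
$ ip route add default via 10.0.127.254 dev macvlan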

Kube-dns

By default, Kubernetes uses kube-dns for DNS resolution and service discovery. In practice, however, we found that accessing a service by its service domain name from a pod always incurred a 5-second delay. Capturing packets with tcpdump showed that the delay occurred on the DNS AAAA query. Further investigation revealed that the problem was caused by a race condition between conntrack and SNAT in netfilter. In short, the DNS A and AAAA request packets are sent in parallel, which causes netfilter to treat the second packet as a duplicate (because the 5-tuple is identical) at __nf_conntrack_confirm, resulting in packet loss. For details, please see the issue I filed: https://github.com/kubernetes/kubernetes/issues/62628. A simple workaround is to add "options single-request-reopen" to /etc/resolv.conf so that the A and AAAA requests use different source ports. The PR I mentioned is: https://github.com/kubernetes/kubernetes/issues/62628, which you can refer to. Our own solution is not to use the Kubernetes service at all, but to set hostNetwork=true and provide DNS services over the host network. Because our underlay network is Layer 2, pods and nodes can communicate directly, which avoids conntrack and SNAT.
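If you prefer the resolv.conf workaround, one way to inject the option per pod is through dnsConfig; the following is a minimal sketch (the pod name and image are placeholders, and it assumes a Kubernetes version where pod dnsConfig is available):

apiVersion: v1
kind: Pod
metadata:
  name: dns-workaround-demo
spec:
  containers:
  - name: app
    image: busybox
    command: ["sleep", "3600"]
  dnsConfig:
    options:
    - name: single-request-reopen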

Integration of Spark and Kubernetes

Thanks to Spark's abstract design, we can use third-party resource management platforms such as YARN, Mesos, and Kubernetes to schedule and manage Spark jobs. There is currently an official experimental project for running Spark on Kubernetes: https://spark.apache.org/docs/latest/running-on-kubernetes.html.

Basic principles

When we submit a Spark job to the Kubernetes cluster through spark-submit, the following process is performed:

Spark creates the Spark driver in a Kubernetes pod

The driver calls the Kubernetes API to create executor pods, and the executor pods execute the job code

When the computation finishes, the executor pods are reclaimed and cleaned up

The driver pod remains in the Completed state and keeps its logs until Kubernetes garbage collection or manual cleanup
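Once a job is submitted, a quick way to observe this lifecycle is to list the driver and executor pods by the spark-role label applied by the Spark 2.3 Kubernetes backend (shown here only as an illustrative check):

$ kubectl get pods -l spark-role=driver
$ kubectl get pods -l spark-role=executor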

Preconditions

Spark 2.3+

Kubernetes 1.6+

Have list, create, edit and delete permissions for Kubernetes pods

Kubernetes cluster must be correctly configured with Kubernetes DNS [1]
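A quick way to verify the pod permissions listed above is kubectl auth can-i; the namespace and service account used here are the ones created later in this article, taken only as an example:

$ kubectl auth can-i create pods --as=system:serviceaccount:spark:spark -n spark
$ kubectl auth can-i delete pods --as=system:serviceaccount:spark:spark -n spark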

How to integrate

Docker images

Since both the Spark driver and executors run in Kubernetes pods, and we use Docker as the container runtime environment, we first need to build a Docker image for Spark.

The build script and Dockerfile are already included in the Spark distribution, and you can build an image with the following commands:

$ ./bin/docker-image-tool.sh -r <repo> -t my-tag build
$ ./bin/docker-image-tool.sh -r <repo> -t my-tag push

Submit the job

After building the Spark image, we can submit the job with the following command:

$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --jars https://path/to/dependency1.jar,https://path/to/dependency2.jar \
    --files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2 \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    https://path/to/examples.jar

Here the Spark master is the address of the Kubernetes API server, which can be obtained with the following command:

$ kubectl cluster-info
Kubernetes master is running at http://127.0.0.1:6443

The Spark job code and dependencies can be specified with --jars, --files, and as the final argument; the supported protocols include http, https, and HDFS.

After executing the submit command, spark-submit prints the status of the submitted driver pod (output not reproduced here).

When the task ends, the output shows the driver pod in the completed state (again not reproduced here).

Visit Spark Driver UI

We can use kubectl port-forward to access the Driver UI locally:

$ kubectl port-forward <driver-pod-name> 4040:4040

Access via http://localhost:4040 after execution.

Access log

All Spark logs can be accessed through the Kubernetes API and the kubectl CLI:

$ kubectl -n <namespace> logs -f <driver-pod-name>

How to achieve tenant and resource isolation

Kubernetes Namespace

In Kubernetes, we can use namespaces to achieve resource allocation, isolation, and quotas among multiple users. Spark On Kubernetes also supports specifying the namespace in which Spark jobs are created.

First, create a Kubernetes namespace:

$ kubectl create namespace spark
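To enforce the quotas mentioned above, a ResourceQuota can be attached to this namespace; the following is only a sketch with illustrative values:

apiVersion: v1
kind: ResourceQuota
metadata:
  name: spark-quota
  namespace: spark
spec:
  hard:
    requests.cpu: "64"
    requests.memory: 256Gi
    limits.cpu: "64"
    limits.memory: 256Gi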

Since our Kubernetes cluster uses RBAC, we also need to create a serviceaccount and bind a role:

$ kubectl create serviceaccount spark -n spark
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark

And add the following configuration to spark-submit:

$ bin/spark-submit \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.namespace=spark \
    ...

Resource isolation

Considering the characteristics of our Spark workloads and the need for compute resource isolation, we chose a more stable physical isolation scheme in the early stage. The specific approach is to provide a separate Kubernetes namespace for each group, and computation tasks are submitted in their own namespace. Computing resources are allocated per physical machine, converted into CPU and memory, and brought under unified Kubernetes management. Within the cluster, computing resources are associated with a namespace through node labels and the PodNodeSelector admission controller, so that when a Spark job is submitted, its pods are always scheduled onto the nodes associated with that namespace.

The specific practices are as follows:

1. Create a node label

$ kubectl label nodes <node-name> spark=spark

2. Enable Kubernetes admission controller

Since we installed the Kubernetes cluster with kubeadm, modify /etc/kubernetes/manifests/kube-apiserver.yaml and append PodNodeSelector to --admission-control.

$ cat /etc/kubernetes/manifests/kube-apiserver.yaml
apiVersion: v1
kind: Pod
metadata:
  annotations:
    scheduler.alpha.kubernetes.io/critical-pod: ""
  creationTimestamp: null
  labels:
    component: kube-apiserver
    tier: control-plane
  name: kube-apiserver
  namespace: kube-system
spec:
  containers:
  - command:
    - kube-apiserver
    - --secure-port=6443
    ...
    - --proxy-client-cert-file=/etc/kubernetes/pki/front-proxy-client.crt
    - --admission-control=Initializers,NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,NodeRestriction,ResourceQuota,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,PodNodeSelector
    ...

3. Configure PodNodeSelector

Add scheduler.alpha.kubernetes.io/node-selector: spark=spark to the namespace's annotations.

apiVersion: v1
kind: Namespace
metadata:
  annotations:
    scheduler.alpha.kubernetes.io/node-selector: spark=spark
  name: spark

After completing the above configuration, you can verify it by submitting a test job with spark-submit:

$ spark-submit \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.namespace=spark \
    --master k8s://https://xxxx:6443 \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=xxxx/library/spark:v2.3 \
    http://xxxx:81/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar

We can see that the Spark job's pods are all assigned to the three associated nodes, hadooptest-001 through hadooptest-003.

Problems to be solved

Kubernetes HA

Kubernetes cluster state is essentially stored in etcd, so etcd is the key to HA. As we are still in a semi-production stage, we have not given HA much consideration yet. Readers who are interested can check out: https://kubernetes.io/docs/setup/independent/high-availability/.

Logging

Under Spark On YARN, you can turn on yarn.log-aggregation-enable to aggregate logs into HDFS for viewing. In Spark On Kubernetes, however, this log aggregation mechanism is missing, and we can only view Spark logs through the log output of the Kubernetes pods:

$ kubectl -n <namespace> logs -f <driver-pod-name>

For log collection and aggregation, we will integrate with Elasticsearch later.

Monitoring

We have our own open-source monitoring platform, OWL [2], within TalkingData. In the future, we plan to write a metric plugin to connect Kubernetes to OWL.

Hybrid deployment

To ensure the availability of computing resources for Spark jobs, we adopted a physical isolation scheme in the early stage. Obviously, this approach significantly reduces the utilization of physical resources. As the next step, we plan to adopt a hybrid deployment scheme, which can be implemented in the following three ways:

Hybrid deployment of HDFS and Kubernetes

Give Spark jobs higher priority and run other stateless production services on the same Kubernetes nodes at lower priority (see the sketch after this list)

Use the cloud to scale resources horizontally and absorb sudden spikes in resource demand
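One Kubernetes building block for the priority-based mixing above is a PriorityClass; the following is only an illustrative sketch (the name, value, and API version depend on the cluster version), not a configuration from our environment:

apiVersion: scheduling.k8s.io/v1beta1
kind: PriorityClass
metadata:
  name: spark-high-priority
value: 1000000
globalDefault: false
description: "Higher scheduling priority for Spark jobs in a mixed cluster."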

Resource expansion

When resource utilization is increased in the following two ways, the cluster may face resource shortage and availability problems:

Hybrid deployment

Resource overselling (overcommitment)

This can lead to situations where the resources in use exceed the actual physical resources (I call it a resource run). One approach is to tier resources and give priority to guaranteeing supply for certain tiers. Another approach is to scale resources horizontally, dynamically replenishing available resources and automatically releasing them after the peak. I elaborated on this design in another article: https://xiaoxubeii.github.io/articles/k8s-on-cloud/.

TalkingData has a self-developed multi-cloud management platform. Our solution is to implement a separate tdcloud-controller-manager as the resource provider and manager for Kubernetes, and to trigger horizontal resource scaling from TalkingData OWL monitoring alarms.

This concludes the introduction to TalkingData's Spark On Kubernetes practice. I hope the above content is helpful to you. If you think the article is good, feel free to share it with more people.
