Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Sample Analysis of Spark Remote Shuffle Service Best practices

2025-02-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/01 Report--

This article will explain in detail the example analysis of Spark Remote Shuffle Service best practices. The content of the article is of high quality, so the editor shares it for you as a reference. I hope you will have some understanding of the relevant knowledge after reading this article.

Introduction:

After nearly half a year of launch and operation, RSS jointly developed by Xitoutiao big data team and Aliyun EMR team can perfectly solve the technical challenges faced by Spark Shuffle and provide a strong guarantee for the stability and containerization of the cluster. Its business value is mainly reflected in the following aspects:

The effect of reducing cost and increasing efficiency is obvious.

SLA significantly improved

Job execution efficiency has been significantly improved.

Significant improvement in architectural flexibility

Business scenario and status quo

Qitoutiao, a technology company that relies on big data, experienced rapid business development from 2018 to 2019. The number of daily active users of the main App and other innovative App increased more than tenfold, and the corresponding big data system increased from the initial 100 machines to more than 1000. Multiple business lines rely on big data platform to carry out business. The efficiency and stability of big data's system has become the cornerstone of the company's business development. In big data's architecture, we use a mature solution in the industry. Storage is built on HDFS, computing resource scheduling depends on Yarn, table metadata is managed by Hive, and Spark is used for calculation, as shown in figure 1:

Fig. 1 Architecture diagram of offline big data platform of interesting headlines

Among them, Yarn cluster uses a single large cluster scheme, HDFS uses a federated scheme, and based on cost factors, HDFS and Yarn services mix DataNode and NodeManager on ECS.

In Qitoutiao, 6W + Spark tasks run on the Yarn cluster every day, and the new Spark tasks are stable at around 100. the rapid development of the company requires rapid implementation, and a lot of governance debts have been accumulated. Various problems show that the stability of the cluster needs to be improved, among which the stability of Shuffle has increasingly become the shackles of the cluster and needs to be solved urgently.

Challenges and Reflections on the current big data platform

The main business indicator of the big data platform in the past six months is to reduce costs and increase efficiency. On the one hand, the business side hopes that the offline platform can carry more operations every day, on the other hand, we have our own demand for cost reduction. How to support more business volume under the premise of cost reduction is a great challenge for every technical person. Students who are familiar with Spark should be well aware that in large-scale cluster scenarios, Spark Shuffle has major defects in implementation, which are reflected in the following aspects:

There are a large number of network packets in the Spark Shuffle Fetch process, and the existing External Shuffle Service design does not deal with these RPC requests very carefully. In large-scale scenarios, a lot of connection reset will occur, resulting in FetchFailed, resulting in stage recalculation.

There are a large number of random reads in the Spark Shuffle Fetch process. Under the condition of large-scale high-load clusters, disk IO load is high and CPU full load often occurs, which is very easy to occur FetchFailed, which leads to stage recalculation.

The recalculation process will magnify the busy degree of the cluster and preempt the machine resources, resulting in a serious vicious circle and the failure of the SLA, which requires the operation and maintenance personnel to manually run the jobs in the idle Label cluster.

The computing and Shuffle process architecture cannot be separated, the Shuffle cannot be limited to a specified cluster, and some SSD machines cannot be utilized.

Multiple shuffle process: for 10K mapper,5K reducer level homework, basically can not finish.

NodeManager and Spark Shuffle Service are the same process, and the Shuffle process is too heavy, which often leads to NodeManager restart, which affects the stability of Yarn scheduling.

The above problems are very painful for Spark R & D students. The variance of many assignments running every day is very large, and there are always some assignments that can not be completed, either split the business or go to the unique Yarn cluster. In addition to the existing challenges, we are also actively building next-generation infrastructure. As the concept of cloud native Kubernetes becomes more and more popular, Spark community also provides Spark on Kubernetes version. Compared with Yarn, Kubernetes can make better use of cloud native elasticity and provide richer OPS, deployment, isolation and other features. However, there are still many problems in Spark on Kubernetes, including Shuffle in the container, dynamic resource scheduling, limited scheduling performance and so on. In view of the landing of Kubernetes in Qitoutiao, we mainly have the following requirements:

Real-time cluster, OLAP cluster and Spark cluster are all independent of each other before, how can these resources form a unified big data resource pool. Through the natural isolation feature of Kubernetes, we can better realize the mixed part of offline business and real-time business, and achieve the purpose of reducing cost and increasing efficiency.

The company's online business runs in the Kubernetes cluster, how to make use of the different characteristics of online business and big data business for off-peak scheduling to achieve the least total resources of ECS.

Hope to be able to accommodate online services, big data, AI and other infrastructure based on Kubernetes, so as to unify the operation and maintenance system.

Because all big data business of Qitoutiao is currently deployed on Aliyun, Aliyun's EMR team and Qitoutiao's big data team have jointly developed Remote Shuffle Service (hereinafter referred to as RSS), which aims to solve all the problems mentioned at the Spark on Yarn level and provide Shuffle basic components for Spark to run on Kubernetes.

Design and implementation of Remote Shuffle Service

Background of Remote Shuffle Service as early as 2019, we noticed that there was already a corresponding discussion in the community, such as SPARK-25299. The main problem that the Issue wants to solve is that in the cloud native environment, Spark needs to write Shuffle data to remote services. However, we found that Spark 3.0 (the previous branch of master) only supported part of the interfaces, but there was no corresponding implementation. The main purpose of this interface is to write data to remote services under the existing Shuffle code framework. If it is implemented in this way, for example, writing Shuffle directly to a high-speed memory system such as HDFS or Alluxio, there will be considerable performance overhead. Qitoutiao has also done some corresponding work and part of the Poc. The performance is very different from that of the original Spark Shuffle implementation, and the worst performance can be reduced by more than 3 times. At the same time, we also investigated the implementation solutions of some other companies, such as Facebook's Riffle solution and LinkedIn's open source Magnet. These implementations first write the Shuffle file locally, and then in Merge or Upload to remote services, this is not compatible with our subsequent Kubernetes architecture, because in the Kubernetes scenario, local disk Hostpath or LocalPV is not a required option, and there will be isolation and permission problems.

Based on the above background, we jointly developed Remote Shuffle Service with Aliyun EMR team. RSS can provide the following capabilities, perfectly solve the technical challenges faced by Spark Shuffle, and provide a strong guarantee for the stability and containerization of our cluster, mainly reflected in the following aspects:

The design idea of high-performance server is lighter, general-purpose and stable than the original Shuffle Service,RPC of Spark.

Two copies of this mechanism, which can guarantee the minimum probability of Shuffle fetch failure (less than 0.01%).

Merging shuffle files, changing from shuffle N times to shuffle N times, and reading HDD disks sequentially will significantly improve the performance of shuffle heavy jobs.

Reduce the memory pressure during Executor calculation and avoid Shuffle Spill in the map process.

The computing and storage separation architecture allows Shuffle Service to be deployed to special hardware environments, such as SSD machines, which can guarantee jobs with extremely high SLA.

Perfectly solves the dependence on local disks in the Spark on Kubernetes solution.

3.2Achievement of Remote Shuffle Service 3.2.1 overall design

The Spark RSS architecture consists of three roles: Master, Worker, and Client. Master and Worker constitute the server, and Client is integrated into Spark ShuffleManager in a non-intrusive way (RssShuffleManager implements the ShuffleManager interface).

The main responsibility of Master is resource allocation and state management.

The primary responsibility of Worker is to process and store Shuffle data.

The primary responsibility of Client is to cache and push Shuffle data.

The overall process is shown below (where ResourceManager and MetaService are components of Master), as shown in figure 2.

Figure 2 overall architecture diagram of RSS

3.2.2 implementation process

Let's focus on the process of the implementation:

RSS adopts the shuffle mode of Push Style. Each Mapper holds a cache delimited by Partition. Shuffle data is first written to the cache, and PushData is triggered whenever the cache of a Partition is full.

Driver first makes a request for StageStart with Master. After receiving the RPC, Master will assign the corresponding Worker Partition and return it to Driver,Shuffle Client to get the meta-information, and then push the data.

Client starts pushing data to the master copy. After receiving the request, the master replica Worker caches the data to the local memory and forwards the request to the slave replica in the way of Pipeline, thus implementing the 2-copy mechanism.

In order not to block the PushData request, Worker receives the PushData request and passes it asynchronously to the proprietary thread pool for asynchronous processing. According to the Partition to which the Data belongs, it is copied to the pre-assigned buffer, and if the buffer is full, the flush is triggered. RSS supports a variety of storage backends, including DFS and Local. If the backend is DFS, only one of the master and slave replicas will flush, relying on two copies of DFS to ensure fault tolerance; if the backend is Local, both master and slave will flush.

After all the Mapper ends, the Driver triggers the StageEnd request. After Master receives the RPC, it sends a CommitFiles request to all Worker. After receiving it, the Worker flush the data belonging to the Stage buffer to the storage layer, the close file, and release the buffer. After Master receives all the responses, it records the list of files corresponding to each partition. If the CommitFiles request fails, Master marks the Stage as DataLost.

In the Reduce phase, reduce task first requests the file list corresponding to the Partition to Master. If the return code is DataLost, Stage recalculation or direct abort job will be triggered. If it returns to normal, the file data is read directly.

Generally speaking, the design points of RSS are summarized in three levels:

Using PushStyle to do shuffle avoids local storage, thus adapting to the computing storage separation architecture.

Aggregating according to reduce avoids random reading and writing of small files and network requests for small amounts of data.

Two copies are made to improve the stability of the system.

3.2.3 Fault tolerance

For RSS system, fault tolerance is very important, and we can implement it in the following dimensions:

PushData failed

When the number of PushData failures (Worker hanging, network busy, CPU busy, etc.) exceeds MaxRetry, Client will send a message to Master requesting a new Partition Location. After that, this Client will use a new Location address, which is called Revive.

If the Revive is caused by the client side rather than the Worker, the same Partition data will be distributed on different Worker, and the Meta component of Master will handle this situation correctly.

If WorkerLost occurs, it will cause a large number of PushData to fail at the same time, and a large number of Revive requests of the same Partition will call Master. To avoid assigning too many Location,Master to the same Partition to ensure that only one Revive request is actually processed, the rest of the requests are stuffed into the pending queue and returned to the same Location after Revive processing.

Worker downtime

When WorkerLost occurs, the Master sends a request for CommitFile to its peer for the replica data on the Worker, and then cleans up the buffer on the peer. If the Commit Files fails, the Stage is recorded as DataLost;. If successful, the subsequent PushData re-applies for the Location through the Revive mechanism.

Data deduplication

Speculation task and task recalculations can lead to data duplication. The solution is that the mapId,attemptId and batchId of each PushData are encoded in the data slice, and the Master records the attemtpId of the successful commit for each map task. The read side filters different attempt data through attemptId and the duplicate data of the same attempt through batchId.

Multiple copies

RSS currently supports both DFS and Local storage backends.

In DFS mode, ReadPartition failure directly results in Stage recalculation or abort job. In Local mode, ReadPartition failure triggers slave peer location read, and if both master and slave fail, Stage recalculation or abort job is triggered.

3.2.4 High availability

You can see that Master is a single point in the design of RSS, although the load of Master is very small and will not hang up easily, but this is undoubtedly a risk point for online stability. In the initial launch phase of the project, we hope that workaround can be carried out through SubCluster, that is, multiple sets of RSS can be deployed to host different businesses, so that even if the RSS Master goes down, only a limited part of the business will be affected. However, with the in-depth use of the system, we decided to face the problem and introduce highly available Master. The main implementations are as follows:

First of all, Master currently has a lot of metadata, so we can sink part of the metadata related to ApplD+ShuffleId itself into the ShuffleManager of Driver. Because there is not much metadata, the increased memory overhead of Driver is very limited.

In addition, with regard to the global load balancing metadata and scheduling-related metadata, we use Raft to achieve the high availability of Master components, so that we can truly achieve large-scale scalable requirements by deploying 3 or 5 Master.

Actual effect and analysis

4.1 performance and stability

The team tested TeraSort,TPC-DS and a large number of internal jobs, reduced the cost of random reading in the Reduce phase, and greatly improved the stability and performance of the task.

Figure 3 is the benchmark of TeraSort. Take 10T Terasort as an example, the Shuffle volume is about 5.6T after compression. You can see that in the RSS scenario, the performance of jobs of this magnitude will be greatly improved because the Shuffle read is read sequentially.

Figure 3 TeraSort performance test (RSS performance is better)

Figure 4 shows a large Shuffle heavy task after online desensitization. Before, it was possible to finish running in a mixed cluster, and the daily task SLA could not be completed on time. The main reason for the analysis is that a large number of FetchFailed causes stage to recalculate. After using RSS, you can run steadily every day, and there will not be any FetchFailed scenes in the 2.1T shuffle. Performance and SLA performance are more pronounced in larger datasets.

Fig. 4 Job stage diagram of actual business (using RSS to guarantee stability and performance)

4.2 Business effect

With the joint efforts of big data team and Aliyun EMR team, after nearly half a year of launch, operation of RSS, and long-term testing with business units, the business value is mainly reflected in the following aspects:

The effect of reducing cost and increasing efficiency is obvious. on the basis of a slight decline in the size of the cluster, it supports more computing tasks, and the TCO cost is reduced by 20%.

SLA has improved significantly, and large-scale Spark Shuffle tasks have changed from unfinished to capable of running. We can merge different SLA-level jobs into the same cluster, reduce the number of cluster nodes, achieve unified management, and reduce costs. Originally, some jobs with high SLA run in a unique Yarn cluster B. because the load of the main Yarn cluster An is very high, if it runs into cluster A, it will often hang up. After using RSS, you can safely run the job to the main cluster A, thus releasing the unique Yarn cluster B.

The efficiency of homework execution is significantly improved, running slowly-> running fast. We compared several typical Shuffle heavy jobs. An important line of business job originally took 3 hours, while the RSS version needed 1.6 hours. There are 5-10 jobs extracted online. The performance improvement of large jobs is quite obvious. Different jobs have an average performance improvement of more than 30%. Even for jobs with a small amount of shuffle, the long-run aPCge time will be reduced by 10% and 20% because it is relatively stable and does not need stage recalculation.

The flexibility of the architecture has been significantly improved and upgraded to a computing and storage separation architecture. In the process of Spark running in the container, using RSS as the basic component, Spark containerization can be landed on a large scale, which lays a foundation for offline unified resources and unified scheduling.

This is the end of the sample analysis of Spark Remote Shuffle Service best practices. I hope the above can be helpful and learn more. If you think the article is good, you can share it for more people to see.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report