Author | Wu Yishun
Reposted from | JD.com Technology
Why does JD.com need a 10,000-node Hadoop cluster?
As JD.com's business grew, the original Hadoop cluster could no longer keep up with rapidly increasing storage and computing demands. Splitting the cluster relieves some of the pressure, but it introduces other problems. For example, after a split a business will inevitably need data that lives on another cluster, and reading data across clusters seriously hurts job execution efficiency. In addition, every cluster has idle periods, and the resources of an idle cluster are simply wasted.
To raise production efficiency and cut costs, the previously scattered cluster resources must be managed together as one very large cluster, so that the various parallel frameworks can all use its storage and computing resources for business processing.
Overview of Hadoop
As a big data processing platform, Hadoop has a history of more than ten years. Its design idea is to build a large cluster from inexpensive commodity machines for distributed computing and data storage, to ensure data safety and high availability through redundant replicas, and to process very large data sets quickly through parallel computing.
The computing and storage capacity of a Hadoop cluster is increased by adding more nodes. In distributed parallel data processing, moving the computation code is usually cheaper than moving the data, so Hadoop's MapReduce framework ships the computation code to the data nodes for execution and exploits data locality to reduce network traffic and improve performance.
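The locality idea can be illustrated with a small sketch. The function and node names below are hypothetical and this is not Hadoop's actual scheduler. It only shows the preference for running a task on a node that already holds a replica of the data block, falling back to any free node otherwise.

```python
from typing import Dict, List, Optional, Set

def pick_node(block_replicas: Set[str], free_nodes: List[str]) -> Optional[str]:
    """Prefer a free node that already stores the block, so only code moves."""
    for node in free_nodes:
        if node in block_replicas:
            return node  # data-local: no block transfer needed
    # No replica holder is free: use any free node (the block must be copied).
    return free_nodes[0] if free_nodes else None

# Hypothetical layout: block "b1" is replicated on three nodes.
replicas: Dict[str, Set[str]] = {"b1": {"node2", "node5", "node7"}}
chosen = pick_node(replicas["b1"], ["node1", "node5", "node9"])
```

In this example the task lands on the free node that holds a replica, so the block itself never crosses the network.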
Before Hadoop 2.0, the design of Hadoop consisted of two parts: the distributed storage system HDFS and the MapReduce computing framework. Starting with Hadoop 2.0, the computing framework was optimized and upgraded into the YARN (Yet Another Resource Negotiator) we use today. YARN provides distributed resource management and job scheduling, together with a unified programming model through which many computing frameworks can be ported to YARN.
In terms of vision, Hadoop aims to handle complex data processing and computation, to store both structured and unstructured data, and to provide distributed, parallel processing of massive data sets.
Looking back, when we implemented distributed programs with MPI and OpenMP, we had to control the remote start and stop of the programs ourselves and write our own fault-tolerance code. Hadoop now abstracts these tedious, generic functions into the framework, so developers only need to focus on their own business logic and no longer have to write error-retry or communication code, which greatly improves development efficiency. At the same time, data engineers who are less experienced at writing code can easily use a Hadoop cluster to implement their own distributed processing and analysis programs.
Under the Hadoop 2.0 YARN architecture, the main components are:
1. ResourceManager: the master node service, responsible for maintaining node information, resource management, and job scheduling. High availability can be achieved with ZooKeeper.
2. NodeManager: the compute node service, responsible for running and managing the Container processes on its node. Multiple NodeManagers can be deployed.
3. ApplicationMaster: each application submitted by a user includes an ApplicationMaster, which communicates with the RM to request or release resources, works with the NMs to start and stop tasks, and monitors the running status of tasks.
4. Container: the resource abstraction in YARN, which encapsulates resources along multiple dimensions such as CPU, memory, and disk.
5. Client: responsible for submitting jobs; it also provides some command-line tools.
Introduction to JD.com's Hadoop distributed resource management and job scheduling
JD.com started using Hadoop a long time ago and has hit many pitfalls along the way, from feeling our way forward at the beginning to the modest success we have today. Whether the problems came from the business side or from the Hadoop framework itself, we have run into them.
In solving these problems we have made many functional upgrades and modifications to Hadoop. Some have been contributed back to the community, while others remain in our own branch. Today our Hadoop big data platform offers rich functionality and mature tooling to support JD.com's big data business.
At present, in JD.com's big data environment, different businesses have different requirements for the runtime environment. We use Docker on YARN to isolate runtime environments, so that each user can customize an environment and install their own algorithm libraries. Linux cgroups provide strict isolation of computing resources, so that the resources of one job are not affected by other jobs. We have also extended the resource and scheduling model to support scheduling of GPUs and other hardware, and we provide a unified log query tool that helps business teams locate errors quickly.
In the past, the big data platform contained various small clusters, such as Presto and Alluxio. Each small cluster had its own batch of machines, and often only one service was deployed on each machine. Utilization was low and resources were wasted. We decided to use YARN for unified resource management and scheduling. After several years of development, we have ported most of the parallel frameworks (such as Presto and Alluxio) to run on YARN. By taking advantage of YARN's scheduling characteristics, we make full use of these machines, which has greatly improved cluster resource utilization.
We have also developed a series of deep learning frameworks and tools, such as TensorFlow on YARN and Caffe on YARN, which let algorithm engineers use the Hadoop cluster directly for algorithm work. This greatly speeds up the iteration of algorithms and services and gives the big data platform deep learning capabilities.
Later, to better support multi-site active-active deployment and cross-region expansion, we upgraded the platform and implemented a distributed resource management and scheduling system for a 10,000-node Hadoop cluster. This removed the earlier bottleneck in scaling a single cluster and the inability to support cross-data-center scheduling and disaster recovery effectively. The system has been deployed in production and passed the test of this year's 618 promotion, where it proved rock solid.
As the system was gradually rolled out, we interconnected several of JD.com's big data data centers across regions. Our HDFS gained the same cross-data-center capability. At that point, JD.com's big data processing platform truly had the ability to deploy and expand across regions.
The system is highly flexible. Jobs and data can be migrated across data centers simply by modifying the scheduling routing policies and the storage data mapping tables. Between different clusters in the same data center, jobs can run across sub-clusters to make full use of each cluster's resources. This feature can be switched on or off dynamically at any time according to sub-cluster load, without user involvement and completely transparently to users.
To make the new big data platform friendlier and easier to manage and use, the team started a project to build a graphical interface. We used web technology to implement an administrator-facing management system for the big data platform. With it, we can flexibly bring sub-clusters online and offline and manage and modify scheduling policies in real time, without logging in to the physical servers to run commands as before. Through standardization and systematization, we encapsulated the operations commands in code. Each command is preceded and followed by validation and permission checks, which reduces mistakes made during manual operation, and the system rolls back automatically if an error occurs.
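The "check, execute, verify, roll back" pattern described here might look roughly like the following sketch. It is an illustration under assumptions, not the platform's real code: the `Command` type, `run_with_rollback`, and the example steps are all hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class Command:
    """One operations step: a pre-check, the action, a post-check, and an undo."""
    name: str
    precheck: Callable[[], bool]
    apply: Callable[[], None]
    verify: Callable[[], bool]
    undo: Callable[[], None]

def run_with_rollback(commands: List[Command], is_admin: bool) -> bool:
    """Run commands in order; on any failure undo the applied ones in reverse."""
    if not is_admin:
        raise PermissionError("administrator permission required")
    applied: List[Command] = []
    for cmd in commands:
        try:
            if not cmd.precheck():
                raise RuntimeError(f"pre-check failed: {cmd.name}")
            cmd.apply()
            applied.append(cmd)
            if not cmd.verify():
                raise RuntimeError(f"post-check failed: {cmd.name}")
        except RuntimeError:
            for done in reversed(applied):
                done.undo()
            return False
    return True

# Hypothetical example: bring sub-cluster "sc2" online, then apply a policy
# whose post-check fails, which triggers the rollback of both steps.
active = ["sc1"]
policy = {"name": "default"}
steps = [
    Command("add sc2", lambda: "sc2" not in active, lambda: active.append("sc2"),
            lambda: "sc2" in active, lambda: active.remove("sc2")),
    Command("set policy", lambda: True, lambda: policy.update(name="broken"),
            lambda: policy["name"] != "broken", lambda: policy.update(name="default")),
]
ok = run_with_rollback(steps, is_admin=True)
```

Because the second step fails its post-check, both steps are undone and the state returns to what it was before the run.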
The platform provides user-level permission management. It can flexibly manage permissions on the cluster's computing resources, controlling both how many computing resources each user may use and who is authorized to use each resource pool.
In the production environment, the platform divides resources according to defined usage rules and assigns the relevant permissions to the corresponding people or departments, which prevents users from maliciously submitting jobs to other people's resource pools. The platform also defines fine-grained operation permissions to prevent users from maliciously manipulating other people's jobs (for example, stopping them).
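As a rough illustration of these two kinds of checks, the sketch below models a resource pool with per-user quotas and a separate rule for who may stop a job. The data model, the vcore unit, and the function names are assumptions made for the example and are not taken from the platform.

```python
from dataclasses import dataclass, field
from typing import Dict, Set

@dataclass
class ResourcePool:
    name: str
    quota_vcores: Dict[str, int]  # per-user limit (hypothetical unit)
    used_vcores: Dict[str, int] = field(default_factory=dict)

def can_submit(pool: ResourcePool, user: str, vcores: int) -> bool:
    """Allow a submission only for authorized users who stay within quota."""
    if user not in pool.quota_vcores:
        return False  # user has no permission on this pool
    return pool.used_vcores.get(user, 0) + vcores <= pool.quota_vcores[user]

def can_stop_job(job_owner: str, user: str, admins: Set[str]) -> bool:
    """Only the job owner or an administrator may stop a job."""
    return user == job_owner or user in admins

pool = ResourcePool("ads", quota_vcores={"alice": 100}, used_vcores={"alice": 80})
```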
Previously, the big data platform consisted of multiple clusters. Each cluster had its own client, and each client had its own configuration files, which made operations troublesome and management difficult.
After the scheduling architecture was modified and upgraded, it can be understood logically as adding a layer of scheduling abstraction (the Router), which turns the original two-level scheduling into three-level scheduling. The new level is the choice of sub-cluster. The current job submission process is as follows:
1. The client first sends the job submission request to the Router.
2. The Router forwards the job request to the corresponding sub-cluster according to the configured scheduling information.
3. After the sub-cluster receives the job request, it schedules and runs the job.
In this way, every client uses the same set of configuration files. The client stays lightweight and no longer needs to distinguish between clusters as before. All scheduling policies and logic are encapsulated in the Router component. (We keep all scheduling policy and control information in a DBMS.)
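The three submission steps can be sketched as follows. This is a minimal model under stated assumptions: the `Router` and `SubCluster` classes are hypothetical stand-ins, and a plain dictionary plays the role of the policy information that the article says is kept in a DBMS.

```python
from typing import Dict, List, Tuple

class SubCluster:
    """Stand-in for a YARN sub-cluster that accepts and schedules jobs."""
    def __init__(self, name: str) -> None:
        self.name = name
        self.accepted: List[Tuple[str, str]] = []

    def submit(self, job_id: str, queue: str) -> str:
        self.accepted.append((job_id, queue))
        return f"{self.name}/{job_id}"

class Router:
    """Chooses the sub-cluster for a job, so the client never has to."""
    def __init__(self, policy: Dict[str, str], subclusters: Dict[str, SubCluster]) -> None:
        self.policy = policy            # queue name -> sub-cluster name
        self.subclusters = subclusters

    def submit(self, job_id: str, queue: str) -> str:
        target = self.policy[queue]     # step 2: look up the configured routing
        return self.subclusters[target].submit(job_id, queue)  # step 3

clusters = {"sc1": SubCluster("sc1"), "sc2": SubCluster("sc2")}
router = Router({"etl": "sc1", "adhoc": "sc2"}, clusters)
handle = router.submit("job-001", "adhoc")  # step 1: the client only knows the Router
```

The client configuration contains nothing about `sc1` or `sc2`; changing the policy table is enough to send the same queue to a different sub-cluster.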
We added dynamic cross-sub-cluster resource borrowing for jobs. At any time, we can control whether the jobs in a given queue may run across sub-clusters. This lets a single sub-cluster dynamically borrow resources from another, idle sub-cluster when its own resources are tight.
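One way to picture the borrowing decision is the sketch below. The load metric, the 0.8 threshold, and the function name are assumptions chosen for illustration; the article does not describe how the real system decides that a sub-cluster is tight or idle.

```python
from typing import Dict, List

def choose_subclusters(home: str, load: Dict[str, float],
                       borrow_enabled: bool, busy_threshold: float = 0.8) -> List[str]:
    """Return the sub-clusters a queue's jobs may use, home sub-cluster first."""
    targets = [home]
    # Borrow only when the per-queue switch is on and the home cluster is tight.
    if borrow_enabled and load[home] >= busy_threshold:
        others = sorted((l, sc) for sc, l in load.items() if sc != home)
        targets.extend(sc for l, sc in others if l < busy_threshold)
    return targets

current_load = {"sc1": 0.95, "sc2": 0.20, "sc3": 0.50}
with_borrowing = choose_subclusters("sc1", current_load, borrow_enabled=True)
without_borrowing = choose_subclusters("sc1", current_load, borrow_enabled=False)
```

Idle sub-clusters are listed from least to most loaded, and turning the switch off restricts the queue to its home sub-cluster again.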
We added the concept of a logical queue name. Users only need to care about their own logical queue name, not about which physical queue the job actually runs in. With this feature, the platform can control at any time which physical queue of which sub-cluster a logical queue really maps to, which enables rapid migration and disaster recovery.
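A logical queue can be thought of as a level of indirection that the platform is free to rebind. The table class and the queue names in this sketch are hypothetical and only illustrate the mapping.

```python
from typing import Dict, Tuple

PhysicalQueue = Tuple[str, str]  # (sub-cluster, physical queue name)

class LogicalQueueTable:
    """Maps the logical queue a user sees to the physical queue that runs the job."""
    def __init__(self) -> None:
        self._table: Dict[str, PhysicalQueue] = {}

    def bind(self, logical: str, subcluster: str, physical: str) -> None:
        self._table[logical] = (subcluster, physical)

    def resolve(self, logical: str) -> PhysicalQueue:
        return self._table[logical]

table = LogicalQueueTable()
table.bind("recommend", "sc1", "root.recommend")
before = table.resolve("recommend")
# Migration or disaster recovery: rebind on the platform side only.
table.bind("recommend", "sc2", "root.dr.recommend")
after = table.resolve("recommend")
```

The user keeps submitting to `recommend` throughout; only the platform-side binding changes.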
To guard against the unexpected loss or failure of a Router, we separately developed high availability and load balancing for the Router component. Multiple Router nodes are deployed across the whole cluster, with one or more Routers in each data center. A client request selects the most suitable Router among them according to load and distance. We also support taking a Router offline at any point in time: if the connection to a Router is unavailable, the client automatically switches to another active Router.
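Client-side Router selection with failover could be sketched like this. The `RouterInfo` fields, the addresses, and the choice to rank by distance first and load second are assumptions for illustration; the article only states that load and distance are both considered.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class RouterInfo:
    address: str
    load: float          # 0.0 (idle) to 1.0 (saturated), hypothetical metric
    distance: int        # 0 = same data center, larger = farther away
    reachable: bool = True

def pick_router(routers: List[RouterInfo]) -> RouterInfo:
    """Pick the nearest, least-loaded Router among those still reachable."""
    candidates = [r for r in routers if r.reachable]
    if not candidates:
        raise ConnectionError("no active Router available")
    return min(candidates, key=lambda r: (r.distance, r.load))

routers = [
    RouterInfo("router-a:8088", load=0.7, distance=0),
    RouterInfo("router-b:8088", load=0.2, distance=0),
    RouterInfo("router-c:8088", load=0.1, distance=1),
]
first_choice = pick_router(routers).address
routers[1].reachable = False      # router-b is taken offline
second_choice = pick_router(routers).address
```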
The logical block diagram of the architecture covers all of its components. Two new components are added: the Router and the State&Policy Store. The Router connects directly with the Client, hides the information about the backend RM sub-clusters, and provides job submission and job information queries. Multiple Router servers can be deployed to serve requests at the same time. The State&Policy Store saves the status of all current sub-clusters, the address of each Active RM, and the scheduling policies. (At regular intervals, each sub-cluster reports its current service status via heartbeat, and the status is stored in the StateStore.) We currently support a variety of scheduling policies to meet the scheduling needs of different scenarios.
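The heartbeat bookkeeping in the state store can be sketched as below. This is an in-memory illustration only: the class, the 30-second timeout, and the addresses are assumptions, and the real store is a persistent service rather than a Python dictionary.

```python
import time
from typing import Dict, List, Optional

class StateStore:
    """Keeps the last heartbeat and Active RM address of each sub-cluster."""
    def __init__(self, timeout_s: float = 30.0) -> None:
        self.timeout_s = timeout_s      # hypothetical liveness window
        self._state: Dict[str, Dict[str, object]] = {}

    def heartbeat(self, subcluster: str, active_rm: str,
                  now: Optional[float] = None) -> None:
        ts = time.time() if now is None else now
        self._state[subcluster] = {"active_rm": active_rm, "last_seen": ts}

    def live_subclusters(self, now: Optional[float] = None) -> List[str]:
        ts = time.time() if now is None else now
        return sorted(name for name, s in self._state.items()
                      if ts - float(s["last_seen"]) <= self.timeout_s)

    def active_rm(self, subcluster: str) -> str:
        return str(self._state[subcluster]["active_rm"])

store = StateStore(timeout_s=30.0)
store.heartbeat("sc1", "rm1.sc1:8032", now=100.0)
store.heartbeat("sc2", "rm2.sc2:8032", now=110.0)
alive_at_135 = store.live_subclusters(now=135.0)  # sc1 last reported 35 s ago
```

A Router that consults such a store would route only to sub-clusters whose heartbeats are recent, and would always see the current Active RM address after a failover.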
The specific submission process is as follows:
1. The Client submits the job to the Router.
2. The Router retrieves the scheduling policy of the logical queue.
3. The Router forwards the job submission request to the corresponding ResourceManager and saves the application information to the StateStore.
4. After receiving the request, the ResourceManager starts the AppMaster. Once started, the AppMaster sends a resource request to the AMRMProxy.
5. After receiving the request, the AMRMProxy reads the policy information from the State&Policy Store to determine whether the job needs to run across sub-clusters.
6. The AMRMProxy sends resource requests to the corresponding sub-clusters, and the AppMaster is responsible for launching the Containers that are allocated.
Optimization strategies and ideas for very large-scale Hadoop clusters
The native scheduler has many problems, and one of the most important is performance. We therefore developed a multi-way allocation strategy based on queue images, which greatly improves the performance of the ResourceManager scheduler and gives a single YARN sub-cluster the capacity to manage and schedule resources for more than 10,000 nodes.
We also enriched the scheduler's resource allocation logic by adding sorting and filtering rules along multiple dimensions. Rules can be combined, for example memory-based, load-based, and utilization-based rules.
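Combinable filtering and sorting rules might be composed as in the sketch below. The `Node` fields, the rule functions, and the thresholds are hypothetical; the article does not give the actual rules or their order.

```python
from dataclasses import dataclass
from typing import Callable, List, Sequence

@dataclass
class Node:
    name: str
    free_mem_mb: int
    load: float
    utilization: float

Filter = Callable[[Node], bool]
SortKey = Callable[[Node], float]

def min_free_memory(mb: int) -> Filter:
    """Memory-based filter: keep nodes with at least `mb` free."""
    return lambda n: n.free_mem_mb >= mb

def max_load(limit: float) -> Filter:
    """Load-based filter: keep nodes at or below `limit`."""
    return lambda n: n.load <= limit

def rank_nodes(nodes: Sequence[Node], filters: Sequence[Filter],
               sort_keys: Sequence[SortKey]) -> List[Node]:
    """Apply every filter, then order by the sort keys in priority order."""
    kept = [n for n in nodes if all(f(n) for f in filters)]
    return sorted(kept, key=lambda n: tuple(k(n) for k in sort_keys))

nodes = [
    Node("n1", 8192, 0.9, 0.7),
    Node("n2", 16384, 0.3, 0.5),
    Node("n3", 4096, 0.2, 0.1),
    Node("n4", 16384, 0.3, 0.2),
]
ranked = rank_nodes(nodes, [min_free_memory(8192), max_load(0.8)],
                    [lambda n: n.load, lambda n: n.utilization])
```

Here `n1` is dropped for high load and `n3` for low memory; the remaining nodes tie on load, so utilization decides the order.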
Other ResourceManager performance optimizations were made at the code level, such as simplifying the resource calculation process and splitting locks.
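The article gives no detail on which locks were split. As a generic illustration of the technique only, the sketch below guards each queue's counter with its own lock instead of one global lock, so updates to different queues do not contend. The class and queue names are hypothetical and unrelated to the actual ResourceManager code.

```python
import threading
from typing import Dict, List

class QueueUsage:
    """Per-queue usage counters, each protected by its own lock."""
    def __init__(self, queues: List[str]) -> None:
        self._locks: Dict[str, threading.Lock] = {q: threading.Lock() for q in queues}
        self._used: Dict[str, int] = {q: 0 for q in queues}

    def allocate(self, queue: str, amount: int) -> None:
        with self._locks[queue]:        # only this queue is blocked
            self._used[queue] += amount

    def used(self, queue: str) -> int:
        with self._locks[queue]:
            return self._used[queue]

def worker(usage: QueueUsage, queue: str, times: int) -> None:
    for _ in range(times):
        usage.allocate(queue, 1)

usage = QueueUsage(["etl", "adhoc"])
threads = [threading.Thread(target=worker, args=(usage, q, 1000))
           for q in ("etl", "etl", "adhoc", "adhoc")]
for t in threads:
    t.start()
for t in threads:
    t.join()
```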
For MapReduce, we optimized service performance and framework functionality, mainly in the Shuffle service.
Optimization, analysis, and testing tools
Benchmark
HiBench https://github.com/intel-hadoop/HiBench
Hadoop's built-in benchmarks
JVM analysis tools
http://gceasy.io/
http://fastthread.io
Linux performance analysis
perf
NMON
Google Tools
Prospects and expectations for the future
JD.com's big data platform practice offers a technical framework and implementation approach for reference. Going forward, the platform will continue to evolve toward an e-commerce-grade distributed architecture and technical solutions, and we have some new features coming online.
First, how to use resources within the group to save costs
In the past, machines for each year's big promotion had to be purchased based on the traffic of previous years, but after the promotion ended their utilization was low and a great deal of money was wasted. To solve this, the big data platform has been integrated with Archimedes, the group's internal private cloud, and can use cloud resources elastically through automatic scaling. In the future, Dacheng will use this feature to undertake some computing tasks.
Second, productization of the big data platform
JD.com has accumulated rich experience in big data processing and has built up some excellent middleware and service products. In the future, we will offer these products as cloud services to external users.
Author | Wu Yishun
Senior technical expert on JD.com's big data platform, specializing in the development and construction of resource management and scheduling systems for big data platforms. Currently focuses on the development and construction of the 10,000-node distributed scheduling system and the deep learning platform.