In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-01 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
The purpose of this article
Shows how Giraph uses ZooKeeper to synchronize between Master and Workers (not sure).
Environment
Two workers are started on a stand-alone machine (machine name: giraphx).
Giraph follows the single-Master multi-Workers structure, and BSPServiceMaster uses MasterThread threads for global synchronization. After each Worker starts successfully, it reports its own health status to the Master, so how does Master detect whether the Workers is started successfully?
Master creates two directories on ZooKeeper, _ workerHealthyDir and _ workerUnhealthyDir, to record Healthy Workers and UnHealthy Workers, respectively.
It is mainly done in the getAllWorkerInfos () method in the BspServiceMaster class, and its call relationship is as follows. Note that it is difficult to find the getAllWorkerInfos () to MasterThread.run () method call relationship.
Cdn.xitu.io/2019/7/26/16c2c19b1f13cc4f?w=640&h=147&f=png&s=108416 ">
The two directories created are as follows:
/ _ hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir / _ hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerUnhealthyDir
In setup (), each Worker calls the registerHealth () method to register its own state.
If you are Healthy, add a child node / wokerInfo.getHostNameId () in the _ workerHealthyDir directory, otherwise add it in the workerUnhealthyDir directory. WokerInfo.getHostNameId () is: Hostname+ "" + TaskId. The child nodes created by Task1 and Task2 (Task 0 is master) are as follows:
/ _ hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_1/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_2
Master in the checkWorkers () method, in the While endless loop (which actually has a timeout limit), gets the child nodes in the _ workerHealthyDir directory by calling the getAllWorkerInfos () method, and then compares whether the number of child nodes reaches maxWorkers (defined when starting job, the-w parameter).
If it is less than maxWorkers, call the getAllWorkerInfos () method for the next round of detection; if it is equal to maxWorker, exit the While loop and return healthyWorkersInfoList: [worker (hostname=giraphx, MRtaskID=1, port=30001), Worker (hostname=giraphx, MRtaskID=2, port=30002)].
Problem: because in a distributed environment, each Worker and Maste run in parallel, each other does not know how the other is running. In step 3 above, if there are any child nodes that have not been created, they have been called in the while dead loop to detect the getAllWorkerInfos () method detection, which is inefficient and, of course, stupid!
Giraph borrows ZooKeeper for efficient detection. The design concept is as follows:
When master acquires the child node, it registers the Watcher (which is the registry that triggers the corresponding event).
If a task creates a child node, the Watcher event is triggered.
If the number of child nodes is less than maxWorkers, the await () method of workerHealthRegistrationChanged is called to release the lock of the current thread and fall into a waiting state. There will be no useless tests.
Description: workerHealthRegistrationChanged is the PredicateLock type (implements BspEvent interface). PredicateLock uses reentrant locks ReentrantLock and Condition to control threads.
When a task creates a child node, the Watcher event is triggered.
Call the public final void Process (WatchedEvent event) event in BspService, which activates the corresponding BspEvent event based on the path of the event. The corresponding here is:
The experiment runs as follows:
S (926))-process: Got a new event, path = / _ hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir, type = NodeChildrenChanged, state = SyncConnected INFO bsp.BspService (BspService.java:process)-process: workerHealthRegistrationChanged (worker health reported-healthy/unhealthy)
This activates the master thread and starts the next round of detection.
Stop when the number of child nodes is equal to maxWorkers.
Summary: every time a child node is created, it will be detected once, which is more efficient!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.