In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
Author | Bai Song
There are nine chapters about Giraph, and the fifth chapter of this paper.
Environment: 2 workers started on a stand-alone machine (machine name: giraphx).
Input: SSSP folder, which contains two files, 1.txt and 2.txt.
1. After Worker reports his health to Master, he starts waiting for Master to create InputSplit.
Method: each Worker checks whether a Znode node exists, and sets Watcher on this Znode. If it does not exist, the lock of the current thread is released through the waitForever () method of BSPEvent and falls into a waiting state. Wait until master creates the znode. This step is located in the startSuperStep method in the BSPServiceWorker class, and the waiting code is as follows:
Cdn.xitu.io/2019/8/8/16c6f1c19ae23057?w=558&h=454&f=png&s=237620 ">
2. Master calls the createInputSplits () method to create an InputSplit.
In the generateInputSplits () method, the InputSplits is obtained according to the VertexInputFormat set by the user. The code is as follows:
Where minSplitCountHint is the minimum number of split created, and its values are as follows:
MinSplitCountHint = number of Workers * NUM_INPUT_THREADS
NUM_INPUT_THREADS represents the number of threads per Input split loading, and the default value is 1. It is verified that the minSplitCountHint parameter in the getSplits () method in the TextVertexValueInputFormat abstract class is ignored. The VertexInputFormat entered by the user inherits the TextVertexValueInputFormat abstract class.
If the resulting splits.size is less than minSplitCountHint, then some worker is not used.
Once you have the split information, write it to the Zookeeper for other workers to access. The split information obtained above is as follows:
[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]
Iterate through the splits List to create a Znode for each split with a value of split. If you create a Znode for split-0, the value is: hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66
/ _ hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0
Create a znode for split-1 (as follows). The value is: hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46
/ _ hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1
Finally, creating znode: / _ hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady means that all the splits has been created.
3. Master creates a Partitions based on splits. First determine the number of partition.
The MasterGraphPartitioner object in BSPServiceMaster defaults to HashMasterPartitioner. Its createInitialPartitionOwners () method is as follows:
In the above code, the number of Partition is calculated in the utility class PartitionUtils. The formula is as follows:
PartitionCount=PARTITION_COUNT_MULTIPLIER availableWorkerInfos.size () availableWorkerInfos.size (), where PARTITION_COUNT_MULTIPLIER represents Multiplier for the current workers squared, and the default value is 1.
As you can see, the partitionCount value is 4 (122). The created partitionOwnerList information is as follows:
[(id=0,cur=Worker (hostname=giraphx, MRtaskID=1, port=30001), prev=null,ckpt_file=null)
(id=1,cur=Worker (hostname=giraphx, MRtaskID=2, port=30002), prev=null,ckpt_file=null)
(id=2,cur=Worker (hostname=giraphx, MRtaskID=1, port=30001), prev=null,ckpt_file=null)
(id=3,cur=Worker (hostname=giraphx, MRtaskID=2, port=30002), prev=null,ckpt_file=null)]
4. Master creates a Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir for the following exchange partition.
5. Master is finally in the assignPartitionOwners () method.
Write masterinfo,chosenWorkerInfoList,partitionOwners and other information into the Znode (as the data of the Znode), and the path of the Znode is: / _ hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions.
Master calls the barrierOnWorkerList () method to start waiting for each Worker to finish loading the data. The invocation relationship is as follows:
Create a znode,path=/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir in barrierOnWorkerList. Then check whether the number of child nodes of the znode is equal to the number of workers, and if not, the thread falls into a waiting state. After a later worker finishes loading the data, a child node (such as / _ hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1) is created to activate the thread to continue judging.
6. When Master creates the znode for step 5, worker is activated.
Each worker reads from the znode that the data,data contains masterInfo,WorkerInfoList and partitionOwnerList, and then each worker begins to load the data.
Copy the partitionOwnerList to the partitionOwnerList variable of the workerGraphPartitioner (default type HashWorkerPartitioner) object in the BSPServiceWorker class, and each subsequent vertex gets its corresponding partitionOwner through the workerGraphPartitioner object according to vertexID.
Each Worker gets the child node from znode: / _ hadoopBsp/job_201404102333_0013/_vertexInputSplitDir and gets the inputSplitPathList, as shown below:
[/ _ hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1
/ _ hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]
Each Worker then creates N InputsCallable threads to read the data. N=Min (NUM_INPUT_THREADS,maxInputSplitThread), where the default value of NUM_INPUT_THREADS is 1MaxInputSplitThread = (InputSplitSize-1/maxWorkers + 1).
So, the default for each worker is to create a thread to load data.
In the reserveInputSplit () method in the InputSplitsHandler class, each worker traverses the inputSplitPathList, creating a znode to retain (identify the split to be processed). The code and comments are as follows:
When you get a znode with the reserveInputSplit () method, the loadInputSplit method of the loadSplitsCallable class starts to get the path information of its HDFS through the znode, and then reads the data and redistributes the data.
The readInputSplit () method of the VertexInputSplitsCallable class is as follows:
7. After each worker loads the data, call the waitForOtherWorkers () method to wait for the other workers to finish processing the split.
The strategy is as follows: each worker creates child nodes under the / _ hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir directory, followed by its own worker information. For example, the child nodes created by worker1 and worker2 are as follows:
/ _ hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1
/ _ hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2
Once created, wait for master to create / _ hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone.
8. From step 5, if master finds that the number of child nodes under / _ hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir is equal to the total number of workers, it will be created in the coordinateInputSplits () method
_ hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone, tell each worker that all worker has finished processing split.
9. Finally, global synchronization is done.
Master creates a znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir, and then calls the barrierOnWorkerList method to check whether the number of children of the znode is equal to the number of workers. If not, the thread falls into a waiting state. Wait for worker to create a child node to activate the thread to continue judging.
Each worker gets its own Partition Stats, enters the finishSuperStep method, and waits for all Request to be processed; send its own Aggregator information to master; to create child nodes, such as / _ hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data is the partitionStatsList and workerSentMessages statistics of the worker
Finally, the waitForOtherWorkers () method is called to wait for the master to create the / _ hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished node.
After master discovers that the number of child nodes of / _ hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir is equal to the number of workers, it collects the aggregator information sent by each worker according to the data on the / _ hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_ workerFinishedDir child node, which is summarized into globalStats.
Master sets globalStats.setHaltComputation (true) if it finds that (1) all vertices in the global information are voteHalt and there is no message delivery, or (2) reaches the maximum number of iterations. Tell works to end the iteration.
Master creates / _ hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished node, and data is globalStats. Tell all workers that the current super step is over.
After each Worker detects that the master creates a / _ hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished node, it reads the data of the znode, that is, global statistics. Then decide whether to proceed to the next iteration.
10. Start the next super step after synchronization.
11. Summary of synchronization process of master and workers.
(1) master creates znode A, and then detects whether the number of child nodes of An is equal to the number of workers, which is not equal to waiting. After a worker creates a child node, the master is awakened for detection.
(2) each worker does its own work, and when it is finished, create the child node A1 of A. Then wait for master to create znode B.
(3) if master detects that the number of child nodes in An is equal to the number of workers, create Znode B
(4) after master creates the B node, each worker is activated. After the synchronization ends, each worker can start the next super step.
In essence, global synchronization is done through znode B.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.