Giraph Source Code Analysis (3): Message Communication

2025-01-16 Update From: SLTechnology News&Howtos

We know from the previous article that each BSPServiceWorker has a WorkerServer object, which in turn holds a ServerData object that serves as the Worker's data entity. ServerData contains the Worker's partitionStore, edgeStore, incomingMessageStore, currentMessageStore, aggregator values, and so on. The incomingMessageStore object is of type MessageStoreByPartition (an interface), meaning that messages are stored by partition. The class diagram of the MessageStoreByPartition interface is as follows:

[Figure: class diagram of the MessageStoreByPartition interface]

The SimpleMessageStore abstract class has a field named map, of type ConcurrentMap, that stores the messages as a two-level mapping. The first level maps a partitionId to the messages sent to that partition; the second level maps a VertexID to the message queue sent to that vertex.
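The two-level mapping can be pictured with a minimal sketch. The class below is a hypothetical stand-in, not Giraph's SimpleMessageStore: the name TwoLevelMessageStore, the Long vertex IDs, and the String messages are all assumptions chosen to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical illustration of a partition -> vertex -> messages store. */
class TwoLevelMessageStore {
  // First level: partitionId -> per-partition map.
  // Second level: vertexId -> messages addressed to that vertex.
  private final ConcurrentMap<Integer, ConcurrentMap<Long, List<String>>> map =
      new ConcurrentHashMap<>();

  /** Appends a message to the queue of the given vertex, creating both levels on demand. */
  void addMessage(int partitionId, long vertexId, String message) {
    map.computeIfAbsent(partitionId, p -> new ConcurrentHashMap<>())
        .computeIfAbsent(vertexId, v -> Collections.synchronizedList(new ArrayList<>()))
        .add(message);
  }

  /** Returns a copy of the messages for one vertex, or an empty list if there are none. */
  List<String> getVertexMessages(int partitionId, long vertexId) {
    ConcurrentMap<Long, List<String>> partitionMap = map.get(partitionId);
    if (partitionMap == null) {
      return Collections.emptyList();
    }
    List<String> messages = partitionMap.get(vertexId);
    if (messages == null) {
      return Collections.emptyList();
    }
    synchronized (messages) {
      return new ArrayList<>(messages);
    }
  }
}
```

Looking up by partition first means all messages of one partition can be handed over or cleared together, which matches the "stored by partition" idea of MessageStoreByPartition.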

"Analysis of Giraph Communication Module": http://my.oschina.net/skyaugust/blog/95182

In the concrete implementation, the message list of each vertex is an ExtendedDataOutput, which extends the DataOutput interface and adds a few methods. Each message is written into the ExtendedDataOutput object as bytes.

Messages are sent using asynchronous communication.

With asynchronous communication, vertex computation runs concurrently with message communication. Messages can be sent while the computation is still in progress, so a large volume of messages is spread over different time periods and momentary network congestion is avoided. The cost is that the receiver needs extra space to hold the messages it has received but not yet processed, which amounts to trading space for time. With centralized communication, by contrast, vertex computation and message communication run serially: messages are sent together once the computation has finished. This is simple to control and implement, and it lets the sender optimize the messages as much as possible, but it easily causes momentary network congestion and increases the message storage overhead on the sender.

Message communication between different Workers uses RPC, implemented with Netty. Within the same Worker, messages passed between two consecutive iterations are copied directly into the Worker's incomingMessageStore through in-memory operations. The storage format and sending mechanism of messages are described in detail below.

Giraph caches outgoing messages and sends them in one batch once the cached size reaches a threshold.

Because messages are sent in bulk rather than one by one, the messages destined for vertices are stored as (vertex ID, message) pairs in a ByteArrayVertexIdData (actually of type ByteArrayVertexIdMessages). A brief introduction to org.apache.giraph.utils.ByteArrayVertexIdData:

Function: stores pairs in a byte array. It holds an ExtendedDataOutput object in which the data is kept.
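As a rough illustration of storing pairs in a single byte array, the sketch below serializes (vertex ID, message) pairs back to back and reads them out again in order. It is not Giraph code: PairByteBuffer is a hypothetical name, java.io.DataOutputStream stands in for ExtendedDataOutput, and the long IDs and double messages are assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration: (vertexId, message) pairs packed into one byte array. */
class PairByteBuffer {
  private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  // DataOutputStream plays the role that ExtendedDataOutput plays in the article.
  private final DataOutputStream out = new DataOutputStream(bytes);

  /** Appends one pair: 8 bytes of vertex ID followed by 8 bytes of message. */
  void add(long vertexId, double message) {
    try {
      out.writeLong(vertexId);
      out.writeDouble(message);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Number of bytes written so far; a cache can compare this against a threshold. */
  int sizeInBytes() {
    return bytes.size();
  }

  /** Walks the byte array from the start and decodes every pair in insertion order. */
  List<String> readAll() {
    List<String> pairs = new ArrayList<>();
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      while (in.available() > 0) {
        long vertexId = in.readLong();
        double message = in.readDouble();
        pairs.add(vertexId + "=" + message);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return pairs;
  }
}
```

Keeping the pairs as raw bytes avoids creating one object per message and makes the cached size trivially available, which is what a size-based sending threshold needs.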

There is also an inner class in this class: VertexIdDataIterator, which inherits from the VertexIdIterator class.

org.apache.giraph.comm.SendCache caches the data to be sent and then sends it in bulk. In Giraph, each Worker can hold multiple partitions. The caching threshold is computed per Worker, not per Partition.

SendCache has a ByteArrayVertexIdData[] dataCache array that stores the messages destined for each Partition, and an int[] dataSizes array that records the total size of the messages destined for each Worker. When that size exceeds MAX_MSG_REQUEST_SIZE (512KB by default), the cached messages of all Partitions on that Worker are sent to it together; the same rule applies to messages whose destination is the local Worker. An int[] initBufferSizes array records, for each Worker, the initial size of the ExtendedDataOutput inside the ByteArrayVertexIdData of each of its Partitions. All Partitions on the same Worker get the same initial value, which is an average. Let M be the value of MAX_MSG_REQUEST_SIZE (the message request size), P the number of partitions on the Worker, and A the value of ADDITIONAL_MSG_REQUEST_SIZE (the factor by which the buffer exceeds the average, 0.2f by default). Then the initial size of each Partition is: M * (1 + A) / P.
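The formula can be checked with a small worked example. The helper below is a hypothetical sketch, not a Giraph method; the truncation to int and the integer division are assumptions about how the value would be rounded.

```java
/** Hypothetical helper that evaluates M * (1 + A) / P for one Worker. */
class InitialBufferSize {
  /**
   * @param maxRequestSize      M, the message request size in bytes
   * @param additionalFactor    A, the fraction added on top of the average
   * @param partitionsOnWorker  P, the number of partitions on the Worker
   * @return the initial buffer size in bytes for each partition
   */
  static int initialPartitionBufferSize(
      int maxRequestSize, float additionalFactor, int partitionsOnWorker) {
    // Enlarge the request size by the additional factor, then truncate to whole bytes.
    int initialRequestSize = (int) (maxRequestSize * (1 + additionalFactor));
    // Split the enlarged size evenly across the Worker's partitions.
    return initialRequestSize / partitionsOnWorker;
  }
}
```

With the defaults M = 512KB = 524288 bytes and A = 0.2f, a Worker holding P = 4 partitions gets 629145 / 4 = 157286 bytes per partition, slightly more than the plain average of 131072 bytes.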

As we know from the previous article, each Worker has a NettyWorkerClientRequestProcessor for sending messages. This class holds a SendMessageCache object that caches outgoing messages. Its sendMessageRequest(I, M) method sends the message named message to the vertex destVertexId.

Method explanation: the method first looks up the partitionId and WorkerInfo for destVertexId, then adds the message to SendMessageCache, which returns workerMessageSize, the total size of the messages cached for the Worker that owns the vertex. If this value exceeds the default threshold of 512KB, the messages of all Partitions on that Worker are removed from SendMessageCache and assigned to workerMessages, a PairList whose key is the partitionId and whose value is the list of messages sent to that partition. Finally, the doRequest() method is called to send the messages.
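The steps in this explanation can be sketched as follows. This is a simplified, hypothetical model and not the Giraph implementation: WorkerSendCache, its fields, the partition-to-worker array, and the use of raw byte[] messages are all assumptions, and the threshold is a constructor argument so that small values can be tried.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical cache: messages are buffered per partition, sizes are tracked per worker. */
class WorkerSendCache {
  private final int[] partitionToWorker;  // assumed mapping: index = partitionId, value = workerId
  private final int maxRequestSize;       // plays the role of MAX_MSG_REQUEST_SIZE
  private final Map<Integer, List<byte[]>> partitionCache = new HashMap<>();
  private final int[] workerSizes;

  WorkerSendCache(int[] partitionToWorker, int workerCount, int maxRequestSize) {
    this.partitionToWorker = partitionToWorker.clone();
    this.workerSizes = new int[workerCount];
    this.maxRequestSize = maxRequestSize;
  }

  /** Caches a message and returns the bytes now cached for the destination worker. */
  int addMessage(int partitionId, byte[] message) {
    partitionCache.computeIfAbsent(partitionId, p -> new ArrayList<>()).add(message);
    int worker = partitionToWorker[partitionId];
    workerSizes[worker] += message.length;
    return workerSizes[worker];
  }

  /** Removes and returns the cached messages of every partition owned by the worker. */
  Map<Integer, List<byte[]>> removeWorkerMessages(int worker) {
    Map<Integer, List<byte[]>> removed = new HashMap<>();
    for (int p = 0; p < partitionToWorker.length; p++) {
      if (partitionToWorker[p] == worker && partitionCache.containsKey(p)) {
        removed.put(p, partitionCache.remove(p));
      }
    }
    workerSizes[worker] = 0;
    return removed;
  }

  /** Returns the batch to send when the threshold is exceeded, otherwise null. */
  Map<Integer, List<byte[]>> sendMessageRequest(int partitionId, byte[] message) {
    int workerMessageSize = addMessage(partitionId, message);
    if (workerMessageSize > maxRequestSize) {
      return removeWorkerMessages(partitionToWorker[partitionId]);
    }
    return null;
  }
}
```

Because the size is tracked per worker, a flush drains every partition buffer bound for that worker in one request, even the partitions that received only a few messages.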

When sending, doRequest() first checks whether the destination is the same Worker. If it is, the doRequest method of SendWorkerMessagesRequest is called to deliver the messages; otherwise they are sent through WorkerClient (which uses Netty underneath). The following focuses on the mechanism within the same Worker.
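The branch between local and remote delivery might look like the sketch below. Everything in it is hypothetical: RequestDispatcher is not a Giraph class, the two lists merely stand in for the local incomingMessageStore and for the Netty-backed WorkerClient, and a batch is reduced to a String.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical dispatcher that separates same-Worker delivery from network delivery. */
class RequestDispatcher {
  private final int localWorkerId;
  // Stand-in for the local Worker's incomingMessageStore.
  final List<String> localStore = new ArrayList<>();
  // Stand-in for requests handed to the network client.
  final List<String> networkQueue = new ArrayList<>();

  RequestDispatcher(int localWorkerId) {
    this.localWorkerId = localWorkerId;
  }

  /** Delivers a batch in memory when the destination is local, otherwise queues it for the network. */
  void doRequest(int destWorkerId, String batch) {
    if (destWorkerId == localWorkerId) {
      localStore.add(batch);
    } else {
      networkQueue.add(destWorkerId + ":" + batch);
    }
  }
}
```

The local path skips serialization to the wire and the network stack entirely, which is why the article treats it as a separate mechanism.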

The doRequest method of the org.apache.giraph.comm.requests.SendWorkerMessagesRequest class takes the Worker's ServerData as its parameter. The partitionVertexData in the code is the PairList workerMessages described above. The method traverses it and adds the messages of each partition to the incomingMessageStore in ServerData.

The addPartitionMessages() method of the ByteArrayMessagesPerVertexStore class performs the insertion, adding each incoming message to the message list of its destination vertex.

When the user supplies a Combiner, the incomingMessageStore is of type OneMessagePerVertexStore, which stores only one message per vertex instead of a message queue.

When a message is added, the message already stored for the vertex and the new message are merged by calling the combine() method, and the result is stored back as that vertex's single message. This is done in the addPartitionMessages() method of OneMessagePerVertexStore.
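A minimal sketch of the one-message-per-vertex idea follows. It is not the OneMessagePerVertexStore source: the class name, the Long vertex IDs, the Double messages, and the use of java.util.function.BinaryOperator in place of Giraph's Combiner are all assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BinaryOperator;

/** Hypothetical store that keeps a single combined message per vertex. */
class OneMessageStoreSketch {
  // partitionId -> (vertexId -> the one combined message for that vertex)
  private final ConcurrentMap<Integer, ConcurrentMap<Long, Double>> map =
      new ConcurrentHashMap<>();
  // Stand-in for the user's Combiner: merges the stored message with a new one.
  private final BinaryOperator<Double> combiner;

  OneMessageStoreSketch(BinaryOperator<Double> combiner) {
    this.combiner = combiner;
  }

  /** Stores the message if the vertex has none yet, otherwise replaces it with the combined value. */
  void addMessage(int partitionId, long vertexId, double message) {
    map.computeIfAbsent(partitionId, p -> new ConcurrentHashMap<>())
        .merge(vertexId, message, combiner);
  }

  /** Returns the combined message for the vertex, or null if none was received. */
  Double getMessage(int partitionId, long vertexId) {
    ConcurrentMap<Long, Double> partitionMap = map.get(partitionId);
    return partitionMap == null ? null : partitionMap.get(vertexId);
  }
}
```

For example, a shortest-path computation could pass Double::min as the combiner, so each vertex keeps only the smallest distance it has been sent and memory use no longer grows with the number of incoming messages.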

After the call() method of ComputeCallable has invoked computePartition(Partition) to compute all the vertices on a Partition, it calls the WorkerClientRequestProcessor.flush() method to send out all remaining messages.
