In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-04-04 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains "how bytom nodes receive messages from each other". The content of the explanation is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "how bytom nodes receive messages from each other".
How does the original node receive the message sent by the other party?
If we search for BlockRequestMessage in the code, we will find that it is only in the ProtocolReactor.Receive method that we respond to this information. Then the crux of the problem is how Bihara received the message sent by the other party and handed it over to ProtocolReactor.Receive.
When Bihara sends a message, it is finally written to MConnection.bufWriter; accordingly, MConnection also has a bufReader for reading data, which is also bound to net.Conn:
P2p/connection.go#L114-L118
Func NewMConnectionWithConfig (conn net.Conn, chDescs [] * ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config * MConnConfig) * MConnection {mconn: = & MConnection {conn: conn, bufReader: bufio.NewReaderSize (conn, minReadBufferSize), bufWriter: bufio.NewWriterSize (conn, minWriteBufferSize)
(where the value of minReadBufferSize is constant 1024)
Therefore, to read the message sent by the other party, be sure to read the bufReader. After a simple search, we found that it was also started in MConnection.Start:
P2p/connection.go#L152-L159
Func (c * MConnection) OnStart () error {/ /... Go c.sendRoutine () go c.recvRoutine () / /...}
C.recvRoutine () is what we focus on this time. The c.sendRoutine above is used to send, which is the focus of our attention in the previous article.
Continue c.recvRoutine ():
P2p/connection.go#L403-L502
Func (c * MConnection) recvRoutine () {/ /... For {c.recvMonitor.Limit (maxMsgPacketTotalSize, atomic.LoadInt64 (& c.config.RecvRate), true) / /. PktType: = wire.ReadByte (c.bufReader, & n, & err) c.recvMonitor.Update (int (n)) / /. Switch pktType {/ /... Case packetTypeMsg: pkt, n, err: = msgPacket {}, int (0), error (nil) wire.ReadBinaryPtr (& pkt, c.bufReader, maxMsgPacketTotalSize, & n, & err) c.recvMonitor.Update (int (n)) / /. Channel, ok: = c.channelsIdx [pkt.ChannelID] / /. MsgBytes, err: = channel.recvMsgPacket (pkt) / /. If msgBytes! = nil {/ /... C.onReceive (pkt.ChannelID, msgBytes)} / /...}} / /.}
After simplification, this method is divided into three parts:
The first block limits the receiving rate to prevent malicious nodes from suddenly sending a large amount of data to kill the node. Like sending, its limit is 500K/s
The second block is the type of packet that is read from the c.bufReader. At present, there are three values, two of which are related to the heartbeat: packetTypePing and packetTypePong, and the other indicates that it is the normal information data type packetTypeMsg, which is also something we need to pay attention to.
The third block is to continue to read the complete packet from the c.bufReader, and then find the corresponding channel to process it according to its ChannelID. ChannelID has two values, BlockchainChannel and PexChannel. Currently, we only need to focus on the former, and its corresponding reactor is ProtocolReactor. When c.onReceive (pkt.ChannelID, msgBytes) is finally called, the read binary data msgBytes will be processed by ProtocolReactor.Receive
Our focus is on the third piece of content. The first is channel.recvMsgPacket (pkt), that is, how does the channel read the corresponding binary data from the packet packet?
P2p/connection.go#L667-L682
Func (ch * Channel) recvMsgPacket (packet msgPacket) ([] byte, error) {/ /. Ch.recving = append (ch.recving, packet.Bytes...) If packet.EOF = = byte (0x01) {msgBytes: = ch.recving / /... Ch.recving = ch.recving [: 0] return msgBytes, nil} return nil, nil}
In this method, I removed some error checking and performance comments. Interested students can click on the source code above, which is ignored here.
This code mainly uses a channel called recving to add the byte array held in the packet to it, and then determine whether the packet represents the end of the whole information, and if so, return the contents of the ch.recving completely for the caller to deal with; otherwise, return a nil, indicating that it is not finished and cannot be processed for the time being. In the previous article, the place about sending data can correspond to here, but the sender is much more troublesome, it takes three channels sendQueue, sending and send to implement, and the receiver here is simple.
Then go back to the previous method MConnection.recvRoutine, and let's move on to the final c.onReceive call. This onReceive is actually a function assigned to the channel by someone else, and it is located where the MConnection is created:
P2p/peer.go#L292-L310
Func createMConnection (conn net.Conn, p * Peer, reactorsByCh map [byte] Reactor, chDescs [] * ChannelDescriptor, onPeerError func (* Peer, interface {}), config * MConnConfig) * MConnection {onReceive: = func (chID byte) MsgBytes [] byte) {reactor: = reactorsByCh [chID] if reactor = = nil {if chID = = PexChannel {return} else {cmn.PanicSanity (cmn.Fmt ("Unknown channel% X", chID))} reactor.Receive (chID, p MsgBytes)} onError: = func (r interface {}) {onPeerError (p, r)} return NewMConnectionWithConfig (conn, chDescs, onReceive, onError, config)}
The logic is also relatively simple, that is, when the previous c.onReceive (pkt.ChannelID, msgBytes) call, it will find the corresponding chID based on the incoming Reactor, and then execute its Receive method. For the purposes of this article, you will enter ProtocolReactor.Receive.
So let's move on to ProtocolReactor.Receive:
Netsync/protocol_reactor.go#L179-L247
Func (pr * ProtocolReactor) Receive (chID byte, src * p2p.Peer, msgBytes [] byte) {_, msg, err: = DecodeMessage (msgBytes) / /. Switch msg: = msg. (type) {case * BlockRequestMessage: / /...}
Among them, DecodeMessage (...) Is to deserialize the incoming binary data into a BlockchainMessage object, which is an interface without any content, which has multiple implementation types. We will continue to judge the object later, and if it is information of type BlockRequestMessage, we will continue to process it accordingly. I have omitted the code for processing here for the time being, because it belongs to the next small problem, so let's not think about it for a while.
It seems that before we know it, we have almost figured out the second half of the first little question. So what's the first half? As we said earlier, the starting point for reading bufReader code is in MConnection.Start, so the first half is: under what circumstances did you get to MConnection.Start step by step from startup?
Fortunately, we had a special discussion on the first half of the question in the previous article, "how to send the information requesting block data", so we won't talk about it here. If necessary, you can take a look at it again (you can first see the section of "summary" at the end).
Let's move on to the second small question:
What kind of message will be sent to each other after receiving the BlockRequestMessage?
Here is to continue to talk about the previous ProtocolReactor.Receive. First of all, let's post its more complete code:
Netsync/protocol_reactor.go#L179-L247
Func (pr * ProtocolReactor) Receive (chID byte, src * p2p.Peer, msgBytes [] byte) {_, msg, err: = DecodeMessage (msgBytes) / /. Switch msg: = msg. (type) {case * BlockRequestMessage: var block * types.Block var err error if msg.Height! = 0 {block, err = pr.chain.GetBlockByHeight (msg.Height)} else {block, err = pr.chain.GetBlockByHash (msg.GetHash ())} / /. Response, err: = NewBlockResponseMessage (block) / /. Src.TrySend (BlockchainChannel, struct {BlockchainMessage} {response}) / /...}
As you can see, the logic is relatively simple, that is, according to the height or hash information specified in the BlockRequestMessage sent by the other party, find the corresponding block in the local blockchain data and send it to the BlockResponseMessage.
Among them chain.GetBlockByHeight (...) And chain.GetBlockByHash (...) If it is specified in detail, we need to have a deep understanding of how the block chain data is saved in the original node. We will not talk about it in this article and wait for a special study later.
Here, I think we just need to know that we will query the chunk data and construct a BlockResponseMessage, and then send it through the BlockchainChannel channel.
The src.TrySend method is called in the last line of code, which sends the information to the other peer. (the src refers to the peer of the other party.)
So how on earth did it get sent? Let's move on to the last small question:
How is this BlockResponseMessage message sent?
Let's first look at the peer.TrySend code:
P2p/peer.go#L242-L247
Func (p * Peer) TrySend (chID byte, msg interface {}) bool {if! p.IsRunning () {return false} return p.mconn.TrySend (chID, msg)}
It will internally call the MConnection.TrySend method, where chID is BlockchainChannel, that is, its corresponding Reactor is ProtocolReactor.
Thank you for your reading, the above is the content of "how bytom nodes receive messages sent by each other". After the study of this article, I believe you have a deeper understanding of how bytom nodes receive messages sent by each other, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.