2025-02-23 Update From: SLTechnology News&Howtos
Author: freewind
Bytom project repository: https://github.com/Bytom/bytom
In the previous article, we said that when Bytom requests block data from other nodes, BlockKeeper sends a BlockRequestMessage that tells the peer which block height it wants, and puts the binary data for that message into the sendQueue channel belonging to ProtocolReactor, where it waits to be sent. Because the sending logic is fairly complex, we did not explain it in detail in the previous article and left it for this one.
Since sendQueue is a channel, BlockKeeper does not know who will take the data out after it is put in, or under what circumstances it will be sent. After searching through the code, we find that only one type directly monitors the data in sendQueue: MConnection, which appeared in earlier articles. An MConnection object monitors sendQueue from its OnStart method; when it finds data there, it takes the data out and puts it into a channel called sending.
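The hand-off described above can be sketched as follows. This is a minimal illustration only: it assumes both sendQueue and sending are plain Go channels of []byte, and the forward function is a hypothetical name, not something taken from Bytom's code.

```go
package main

import "fmt"

// forward watches sendQueue and moves each message to sending.
// It closes sending and returns once sendQueue is closed and drained.
// (Hypothetical helper; the real types in Bytom may differ.)
func forward(sendQueue <-chan []byte, sending chan<- []byte) {
	for msg := range sendQueue {
		sending <- msg
	}
	close(sending)
}

func main() {
	sendQueue := make(chan []byte, 1)
	sending := make(chan []byte)

	// The "monitor" that would be started when the connection starts.
	go forward(sendQueue, sending)

	// The producer only enqueues; it does not know who picks the data up.
	sendQueue <- []byte("block-request: height=100")
	close(sendQueue)

	for msg := range sending {
		fmt.Printf("picked up %d bytes: %s\n", len(msg), msg)
	}
}
```

The point of the design is the decoupling: the producer returns as soon as the message is queued, and whoever owns the connection decides when it is actually sent.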
Things are getting a little complicated here:
As we know from the previous articles, an MConnection corresponds to one connection with a peer, and connections between Bytom nodes can be established in several ways: for example, this node actively connects to another node, or another node actively connects to this one. As for the sending channel, we also need to know who monitors it and under what circumstances, and how the data taken out of sending is finally sent to the other node.
As before, when we run into a complex problem, we first break it into smaller problems following the "mutually exclusive, collectively exhaustive" principle, and then solve them one by one.
So the first thing we need to figure out is:
Under what circumstances is an MConnection object created and its OnStart method called?
(This tells us how the data in sendQueue comes to be monitored.)
After analysis, we find that an MConnection is started in only one place: the OnStart method of Peer. So the question becomes: under what circumstances is a Peer object created and its OnStart method called?
After some more digging, we can finally determine that in Bytom, the Peer.OnStart method ends up being called in the following four situations:
1. After the Bytom node starts, when it actively connects to the seed nodes specified in the configuration file and to the nodes saved in addrbook.json in the local data directory
2. When Bytom is listening on the local P2P port and another node connects to it
3. When PEXReactor is started and communicates with the nodes on the current connections using its own protocol
4. In the Switch.Connect2Switches method, which is not used anywhere (negligible)
The fourth situation can be ignored completely. In the third, PEXReactor uses a file sharing protocol similar to BitTorrent to share data with other nodes; it is logically independent and plays an auxiliary role, so we will set it aside for now. That leaves only the first two situations to analyze.
When the Bytom node starts, how does it actively connect to other nodes and finally call the MConnection.OnStart method?
First, let's quickly go to the SyncManager.Start method:
cmd/bytomd/main.go#L54
    func main() {
        cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir()))
        cmd.Execute()
    }
cmd/bytomd/commands/run_node.go#L41
    func runNode(cmd *cobra.Command, args []string) error {
        n := node.NewNode(config)
        if _, err := n.Start(); err != nil {
            // ...
        }
    }
node/node.go#L169
    func (n *Node) OnStart() error {
        // ...
        n.syncManager.Start()
        // ...
    }
netsync/handle.go#L141
    func (sm *SyncManager) Start() {
        go sm.netStart()
        // ...
    }
Next we enter the netStart() method. This is where Bytom actively connects to other nodes:
    func (sm *SyncManager) netStart() error {
        // ...
        if sm.config.P2P.Seeds != "" {
            // dial out
            seeds := strings.Split(sm.config.P2P.Seeds, ",")
            if err := sm.DialSeeds(seeds); err != nil {
                return err
            }
        }
        return nil
    }
The sm.config.P2P.Seeds used here corresponds to the seed nodes set under p2p.seeds in config.toml in the local data directory.
Each seed is then actively dialed through sm.DialSeeds:
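As a small illustration of how a comma-separated seeds setting turns into a list of addresses, here is a sketch built around the same strings.Split call. The splitSeeds helper and the addresses are hypothetical and only serve the example.

```go
package main

import (
	"fmt"
	"strings"
)

// splitSeeds mirrors the check-then-split shape seen in netStart:
// an empty setting means there are no seeds to dial.
// (Hypothetical helper, not part of Bytom.)
func splitSeeds(setting string) []string {
	if setting == "" {
		return nil
	}
	return strings.Split(setting, ",")
}

func main() {
	// Made-up addresses, for illustration only.
	seeds := splitSeeds("10.0.0.1:46656,10.0.0.2:46656")
	for i, s := range seeds {
		fmt.Println(i, s)
	}
}
```

Note that strings.Split does not trim whitespace, so a setting written with spaces after the commas would yield addresses with leading spaces.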
netsync/handle.go#L229-L231
    func (sm *SyncManager) DialSeeds(seeds []string) error {
        return sm.sw.DialSeeds(sm.addrBook, seeds)
    }
p2p/switch.go#L311-L340
    func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
        // ...
        for i := 0; i < len(perm)/2; i++ {
            j := perm[i]
            sw.dialSeed(netAddrs[j])
        }
        // ...
    }
p2p/switch.go#L342-L349
    func (sw *Switch) dialSeed(addr *NetAddress) {
        peer, err := sw.DialPeerWithAddress(addr, false)
        // ...
    }
p2p/switch.go#L351-L392
    func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
        // ...
        peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
        // ...
        err = sw.AddPeer(peer)
        // ...
    }
First a peer is created through newOutboundPeerWithConfig, and then it is added to sw (the Switch object).
p2p/switch.go#L226-L275
    func (sw *Switch) AddPeer(peer *Peer) error {
        // ...
        // Start peer
        if sw.IsRunning() {
            if err := sw.startInitPeer(peer); err != nil {
                return err
            }
        }
        // ...
    }
Inside sw.startInitPeer, peer.Start is called:
p2p/switch.go#L300-L308
    func (sw *Switch) startInitPeer(peer *Peer) error {
        peer.Start()
        // ...
    }
And peer.Start corresponds to Peer.OnStart, which finally brings us to:
p2p/peer.go#L207-L211
    func (p *Peer) OnStart() error {
        p.BaseService.OnStart()
        _, err := p.mconn.Start()
        return err
    }
As you can see, mconn.Start is called here. We have finally found it. To summarize:
Node.Start -> SyncManager.Start -> SyncManager.netStart -> Switch.DialSeeds -> Switch.AddPeer -> Switch.startInitPeer -> Peer.OnStart -> MConnection.OnStart
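The step from Start to OnStart can look like a jump if the pattern behind it is unfamiliar. The sketch below assumes that Start is provided by an embedded base type which then delegates to the concrete type's OnStart. The names baseService, connection, peer, and newPeer are hypothetical simplifications, not Bytom's actual definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// starter is the hook that a concrete service implements.
type starter interface {
	OnStart() error
}

// baseService is a simplified, hypothetical base type: Start guards against
// double starts and then delegates to the concrete type's OnStart.
type baseService struct {
	name    string
	started bool
	impl    starter
}

func (b *baseService) Start() (bool, error) {
	if b.started {
		return false, errors.New(b.name + " already started")
	}
	b.started = true
	return true, b.impl.OnStart()
}

// connection stands in for the multiplexed connection owned by a peer.
type connection struct{ baseService }

func (c *connection) OnStart() error {
	fmt.Println("connection.OnStart: send/receive goroutines would start here")
	return nil
}

type peer struct {
	baseService
	mconn *connection
}

// OnStart starts the underlying connection when the peer itself is started.
func (p *peer) OnStart() error {
	_, err := p.mconn.Start()
	return err
}

func newPeer() *peer {
	c := &connection{}
	c.baseService = baseService{name: "connection", impl: c}
	p := &peer{mconn: c}
	p.baseService = baseService{name: "peer", impl: p}
	return p
}

func main() {
	p := newPeer()
	if _, err := p.Start(); err != nil {
		fmt.Println("start failed:", err)
	}
}
```

With this shape, calling Start on the peer is enough to start everything beneath it, which is why the call chain ends at the connection's OnStart.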
That completes the analysis of the first situation, actively connecting to other nodes. Now for the second situation:
When another node connects to this node, how does Bytom get to the MConnection.OnStart method?
After the Bytom node starts, it listens on the local P2P port and waits for other nodes to connect. So what does that process look like?
Since the startup process of the Bytom node has already appeared many times, it will not be posted again here. Let's start directly with Switch.OnStart (the Switch is started when SyncManager starts):
p2p/switch.go#L186-L185
    func (sw *Switch) OnStart() error {
        // ...
        for _, peer := range sw.peers.List() {
            sw.startInitPeer(peer)
        }
        // Start listeners
        for _, listener := range sw.listeners {
            go sw.listenerRoutine(listener)
        }
        // ...
    }
After trimming, two pieces of code remain in this method: one calls startInitPeer(...), and the other calls sw.listenerRoutine(listener).
If you followed the previous section, you will notice that startInitPeer(...) calls Peer.Start right away. However, my analysis shows that this code actually has no effect, because at this moment sw.peers is always empty: no other code has had a chance to add a peer to it yet. So I think it could be deleted so as not to mislead readers. (I have filed an issue about this; see #902.)
The second piece of code, listenerRoutine, is, as you may remember, what listens on the local P2P port. It was explained in detail in the earlier article "how to listen to the P2P port".
Today we need to dig into it again to see how it gets to MConnection.OnStart:
p2p/switch.go#L498-L536
    func (sw *Switch) listenerRoutine(l Listener) {
        for {
            inConn, ok := ...
To summarize:
Node.Start -> SyncManager.Start -> SyncManager.netStart -> Switch.OnStart -> Switch.listenerRoutine -> Switch.addPeerWithConnectionAndConfig -> Switch.AddPeer -> Switch.startInitPeer -> Peer.OnStart -> MConnection.OnStart
So, we have also finished analyzing the second case.
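To make the inbound path more concrete, here is a generic sketch of a routine that receives incoming connections from a channel and hands each one to an "add peer" step. It is not Bytom's code: the inbound interface, fakeConn, errRejected, and this listenerRoutine signature are all assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// inbound is the minimal behaviour this sketch needs from a connection.
type inbound interface{ Close() error }

// fakeConn is an in-memory stand-in so the example needs no real network.
type fakeConn struct {
	name   string
	closed bool
}

func (f *fakeConn) Close() error { f.closed = true; return nil }

var errRejected = errors.New("peer rejected")

// listenerRoutine receives inbound connections until the channel is closed.
// Each connection is passed to addPeer; rejected connections are closed and
// the loop keeps going. It returns how many peers were added.
func listenerRoutine(conns <-chan inbound, addPeer func(inbound) error) int {
	added := 0
	for {
		inConn, ok := <-conns
		if !ok {
			break // the listener was shut down
		}
		if err := addPeer(inConn); err != nil {
			inConn.Close()
			continue
		}
		added++
	}
	return added
}

func main() {
	conns := make(chan inbound, 2)
	conns <- &fakeConn{name: "peer-a"}
	conns <- &fakeConn{name: "peer-b"}
	close(conns)

	added := listenerRoutine(conns, func(c inbound) error {
		if c.(*fakeConn).name == "peer-b" {
			return errRejected
		}
		fmt.Println("added", c.(*fakeConn).name)
		return nil
	})
	fmt.Println("peers added:", added)
}
```

In the chain summarized earlier, the "add peer" step is where the peer, and with it the connection, would be started.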
But so far we have only solved the first sub-problem: we finally know under what circumstances the Bytom code starts an MConnection, which then monitors the sendQueue channel and moves the message data waiting to be sent into the sending channel.
So, let's move on to the next sub-problem:
After the data is put into the sending channel, who picks it up?
After analysis, we find that sendQueue and sending both belong to the type Channel, but they serve different purposes. sendQueue stores complete messages waiting to be sent, while sending sits at a lower level: the data it holds may be split into several chunks for sending. With sendQueue alone, this kind of chunking would be hard to implement.
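The difference between "a complete message" and "the chunks actually sent" can be sketched as follows. The packet type, the nextPacket function, and the packet size of 4 bytes are hypothetical choices for the example, not values taken from Bytom.

```go
package main

import "fmt"

// packet is one piece of a message; eof marks the last piece.
type packet struct {
	data []byte
	eof  bool
}

// nextPacket cuts at most maxSize bytes off the front of sending and returns
// that packet together with the bytes that still remain to be sent.
// maxSize must be positive.
func nextPacket(sending []byte, maxSize int) (packet, []byte) {
	n := len(sending)
	if n > maxSize {
		n = maxSize
	}
	return packet{data: sending[:n], eof: n == len(sending)}, sending[n:]
}

func main() {
	sending := []byte("0123456789") // one complete message taken from the queue
	const maxSize = 4               // hypothetical packet size
	for {
		var p packet
		p, sending = nextPacket(sending, maxSize)
		fmt.Printf("%q eof=%v\n", p.data, p.eof)
		if p.eof {
			break
		}
	}
}
```

Keeping the queue of whole messages separate from the buffer being chunked lets the sender interleave pieces without the producer having to know about packet sizes.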
The sending field of Channel is used by MConnection. Fortunately, when we trace backwards, we arrive at MConnection.OnStart once again. In other words, what we are studying in this sub-problem is exactly what comes after the two chains above:
Node.Start -> SyncManager.Start -> SyncManager.netStart -> Switch.DialSeeds -> Switch.AddPeer -> Switch.startInitPeer -> Peer.OnStart -> MConnection.OnStart -> ?
Node.Start -> SyncManager.Start -> SyncManager.netStart -> Switch.OnStart -> Switch.listenerRoutine -> Switch.addPeerWithConnectionAndConfig -> Switch.AddPeer -> Switch.startInitPeer -> Peer.OnStart -> MConnection.OnStart -> ?
That is, the "?" part above.
So let's start directly with MConnection.OnStart:
p2p/connection.go#L152-L159
    func (c *MConnection) OnStart() error {
        // ...
        go c.sendRoutine()
        // ...
    }
The c.sendRoutine() method is what we are looking for. Once the MConnection starts, the send loop begins (waiting for data to arrive). Its code is as follows:
p2p/connection.go#L289-L343
    func (c *MConnection) sendRoutine() {
        // ...
        case ...
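The general shape of a send loop that is started from OnStart and then waits for data can be sketched like this. It is a stripped-down illustration under stated assumptions: the conn type, its send and quit channels, and the out slice that stands in for "the wire" are all hypothetical, and the real sendRoutine may handle more cases than the two shown here.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical, stripped-down connection: send delivers data to the
// loop, quit stops it, and out collects what was "written to the wire".
type conn struct {
	send chan []byte
	quit chan struct{}
	done sync.WaitGroup
	out  [][]byte
}

func newConn() *conn {
	// send is unbuffered, so a successful send means the loop has the message.
	return &conn{send: make(chan []byte), quit: make(chan struct{})}
}

// onStart launches the send loop in its own goroutine and returns immediately.
func (c *conn) onStart() {
	c.done.Add(1)
	go c.sendRoutine()
}

// sendRoutine blocks in select until there is data to send or the connection stops.
func (c *conn) sendRoutine() {
	defer c.done.Done()
	for {
		select {
		case msg := <-c.send:
			c.out = append(c.out, msg)
		case <-c.quit:
			return
		}
	}
}

// stop ends the loop and waits for it to exit.
func (c *conn) stop() {
	close(c.quit)
	c.done.Wait()
}

func main() {
	c := newConn()
	c.onStart()
	c.send <- []byte("ping")
	c.stop()
	fmt.Println("messages sent:", len(c.out))
}
```

Because the loop runs in its own goroutine, starting the connection never blocks the caller; the loop simply sleeps in select until something arrives.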