Case Analysis of RocketMQ Master-Slave synchronization and the principle of HA Mechanism

2025-03-31 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/02 Report--

This article presents a case analysis of RocketMQ master-slave synchronization and the principles of its HA mechanism, following the relevant source code step by step.

HA core class

The HA implementation lives in the ha directory of the store module. The core implementation classes are as follows:

HAService: the core implementation class of master-slave synchronization

HAService$AcceptSocketService: the class the master server uses to listen for slave connections

HAService$GroupTransferService: the master-slave synchronization notification class, used to implement synchronous and asynchronous replication

HAService$HAClient: the class the slave server uses to connect to the master server

HAConnection: the encapsulation of an HA connection on the master server. When the master accepts a connection from a slave, the connection is wrapped in an HAConnection object, which holds a read implementation and a write implementation for the socket:

HAConnection$ReadSocketService: the master server's read implementation class

HAConnection$WriteSocketService: the master server's write implementation class

The overall working mechanism of RocketMQ master-slave synchronization is roughly as follows:

The slave server actively establishes a TCP connection to the master server, and then every 5s sends the maximum offset of its commitLog file to the master server in order to pull messages that have not yet been synchronized.

The master server opens a listening port and waits for data from the slave. When it receives the slave's offset, it parses it, looks up the messages that have not yet been synchronized, and returns them to the slave.

After receiving the messages from the master server, the slave writes them to its commitLog file, updates its commitLog pull offset, and then continues to pull unsynchronized messages from the master server.
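The three steps above can be modeled in a few lines. The following is a minimal in-memory sketch, not RocketMQ code: the class HaPullSketch, its methods, and the byte-array "commitLog" are all hypothetical names introduced only to show the shape of the loop (report an offset, receive a batch starting at that offset, append, repeat).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical in-memory model of the pull loop; none of these names exist in RocketMQ.
class HaPullSketch {
    // The master's commitLog, modeled as a plain byte array.
    static final byte[] MASTER_LOG = "msg-1|msg-2|msg-3|".getBytes(StandardCharsets.UTF_8);

    // Master side: return at most batchSize bytes starting at the offset the slave reported.
    static byte[] fetchFrom(long slaveMaxOffset, int batchSize) {
        int from = (int) Math.min(slaveMaxOffset, MASTER_LOG.length);
        int to = Math.min(from + batchSize, MASTER_LOG.length);
        return Arrays.copyOfRange(MASTER_LOG, from, to);
    }

    // Slave side: report the current max offset, append the returned batch, repeat until caught up.
    // batchSize must be positive, otherwise the loop would never make progress.
    static byte[] syncSlave(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        ByteArrayOutputStream slaveLog = new ByteArrayOutputStream();
        while (slaveLog.size() < MASTER_LOG.length) {
            byte[] batch = fetchFrom(slaveLog.size(), batchSize);
            slaveLog.write(batch, 0, batch.length);
        }
        return slaveLog.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(new String(syncSlave(7), StandardCharsets.UTF_8));
    }
}
```

The real implementation does the same thing over a non-blocking socket, with the report and the batches travelling as binary frames.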

Slave -> Master process

As the HA logic above shows, synchronization can be divided into two processes: the slave server reporting its offset, and the master server sending unsynchronized messages to the slave server.

From the class list above, the logic for reporting the slave's offset to the master lives in the HAClient class. HAClient extends ServiceThread, so it is a thread service class. After the Broker starts, it starts this thread, which periodically reports the slave's offset to the master server.

org.apache.rocketmq.store.ha.HAService.HAClient#run:

    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                // actively connect to the master server and obtain the socketChannel
                if (this.connectMaster()) {
                    if (this.isTimeToReportOffset()) {
                        // report the slave's offset to the master server
                        boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                        if (!result) {
                            this.closeMaster();
                        }
                    }

                    // wait up to one second for I/O events
                    this.selector.select(1000);

                    // process messages sent by the master server
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        this.closeMaster();
                    }

                    // ...
                } else {
                    this.waitForRunning(1000 * 5);
                }
            } catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception.", e);
                this.waitForRunning(1000 * 5);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

This is the run method of the HAClient thread. In a loop, it connects to the master server if necessary, reports the offset to the master server, and processes the messages the master server sends back.

org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster:

    private boolean connectMaster() throws ClosedChannelException {
        if (null == socketChannel) {
            String addr = this.masterAddress.get();
            if (addr != null) {
                SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                if (socketAddress != null) {
                    this.socketChannel = RemotingUtil.connect(socketAddress);
                    if (this.socketChannel != null) {
                        this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                    }
                }
            }

            this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            this.lastWriteTimestamp = System.currentTimeMillis();
        }

        return this.socketChannel != null;
    }

This method contains the logic the slave uses to connect to the master server. Once the master address has been obtained and the connection made, the method holds a socketChannel object and records the current time as lastWriteTimestamp, which is used to calculate the master-slave synchronization interval. Note that if no master server address is configured, this method returns false, which means no master-slave replication is performed.

The method also calls DefaultMessageStore's getMaxPhyOffset() to get the maximum offset of the commitLog file and uses it as the offset to report.

org.apache.rocketmq.store.ha.HAService.HAClient#reportSlaveMaxOffset:

    private boolean reportSlaveMaxOffset(final long maxOffset) {
        this.reportOffset.position(0);
        this.reportOffset.limit(8);
        this.reportOffset.putLong(maxOffset);
        this.reportOffset.position(0);
        this.reportOffset.limit(8);

        for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
            try {
                this.socketChannel.write(this.reportOffset);
            } catch (IOException e) {
                log.error(this.getServiceName()
                    + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                return false;
            }
        }

        return !this.reportOffset.hasRemaining();
    }

This method reports the pulled offset to the master server. It sets the ByteBuffer's position to 0 and its limit to 8, which is the size of the long value maxOffset, and calls putLong() to write maxOffset into the ByteBuffer. It then resets position to 0 and limit to 8 so the bytes can be read back out; calling flip() would work just as well here. Finally, a for loop of at most three attempts writes the buffer to the network channel. The method returns the negation of hasRemaining(), which checks whether position is still less than limit, so the return value says whether every byte in the ByteBuffer was written to the channel.

Master -> Slave process
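Before turning to the master-side classes, it may help to see the 8-byte report from both ends. The sketch below is an illustration under stated assumptions: OffsetReportSketch and its method names are hypothetical, encode mirrors the buffer handling of reportSlaveMaxOffset (using flip() in place of the second position/limit pair), and latestOffset mirrors the index arithmetic that the master's ReadSocketService applies to pick the most recent complete 8-byte value out of its read buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical helper class; only the ByteBuffer calls themselves are real API.
class OffsetReportSketch {
    // Slave side: put maxOffset into an 8-byte buffer and leave it ready to be written out.
    static ByteBuffer encode(ByteBuffer reportOffset, long maxOffset) {
        reportOffset.position(0);
        reportOffset.limit(8);
        reportOffset.putLong(maxOffset); // position is now 8
        reportOffset.flip();             // limit = 8, position = 0
        return reportOffset;
    }

    // Master side: byteBufferRead.position() marks how many bytes have arrived so far.
    // Round that down to a multiple of 8 and read the last complete long before it.
    // Assumes at least 8 bytes have been received.
    static long latestOffset(ByteBuffer byteBufferRead) {
        int pos = byteBufferRead.position() - (byteBufferRead.position() % 8);
        return byteBufferRead.getLong(pos - 8);
    }

    public static void main(String[] args) {
        ByteBuffer report = ByteBuffer.allocate(8);
        ByteBuffer read = ByteBuffer.allocate(64);
        read.put(encode(report, 100L));
        read.put(encode(report, 250L));
        read.put((byte) 0); // first byte of a third, still incomplete report
        System.out.println(latestOffset(read));
    }
}
```

Because only the last complete value is read, older reports that arrived in the same batch are skipped, and a partially received report is ignored until the rest of its bytes arrive.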

org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run:

    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                Set<SelectionKey> selected = this.selector.selectedKeys();

                if (selected != null) {
                    for (SelectionKey k : selected) {
                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                            if (sc != null) {
                                HAService.log.info("HAService receive new connection, "
                                    + sc.socket().getRemoteSocketAddress());

                                try {
                                    HAConnection conn = new HAConnection(HAService.this, sc);
                                    conn.start();
                                    HAService.this.addConnection(conn);
                                } catch (Exception e) {
                                    log.error("new HAConnection exception", e);
                                    sc.close();
                                }
                            }
                        } else {
                            log.warn("Unexpected ops in select " + k.readyOps());
                        }
                    }

                    selected.clear();
                }
            } catch (Exception e) {
                log.error(this.getServiceName() + " service has exception.", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

When the master server accepts a connection from a slave, it wraps the socket channel in an HAConnection object. As mentioned earlier, HAConnection encapsulates the HA connection on the master side and contains a read implementation class and a write implementation class. Its start() method starts the read thread and the write thread:

org.apache.rocketmq.store.ha.HAConnection#start:

    public void start() {
        this.readSocketService.start();
        this.writeSocketService.start();
    }

org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent:

    private boolean processReadEvent() {
        int readSizeZeroTimes = 0;

        if (!this.byteBufferRead.hasRemaining()) {
            this.byteBufferRead.flip();
            this.processPostion = 0;
        }

        while (this.byteBufferRead.hasRemaining()) {
            try {
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    readSizeZeroTimes = 0;
                    this.lastReadTimestamp =
                        HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
                        int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                        // read the offset reported by the slave from the network channel
                        long readOffset = this.byteBufferRead.getLong(pos - 8);
                        this.processPostion = pos;

                        // the offset the slave has synchronized up to
                        HAConnection.this.slaveAckOffset = readOffset;
                        if (HAConnection.this.slaveRequestOffset < 0) {
                            HAConnection.this.slaveRequestOffset = readOffset;
                            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                        }

                        // after the slave has synchronized, wake the waiting message-sending threads;
                        // this is what lets replication behave synchronously or asynchronously
                        HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                    }
                } else if (readSize == 0) {
                    if (++readSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                    return false;
                }
            } catch (IOException e) {
                log.error("processReadEvent exception", e);
                return false;
            }
        }

        return true;
    }

As the source above shows, after the master server receives the offset reported by the slave, it mainly does two things:

It reads the offset reported by the slave.

It wakes the thread that is waiting to return the result of a synchronous message send. This is how the synchronous-replication mode of master-slave synchronization is implemented.

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

    public void run() {
        HAConnection.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                this.selector.select(1000);

                // slaveRequestOffset == -1: the read thread has not yet received
                // an offset from the slave, so keep waiting
                if (-1 == HAConnection.this.slaveRequestOffset) {
                    Thread.sleep(10);
                    continue;
                }

                // nextTransferFromWhere == -1: this thread is just starting to transfer data
                if (-1 == this.nextTransferFromWhere) {
                    // slaveRequestOffset == 0: the slave is reporting an offset for the first time
                    if (0 == HAConnection.this.slaveRequestOffset) {
                        // get the maximum offset of the last commitLog file
                        long masterOffset =
                            HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                        // round down to the initial offset of the last commitLog file
                        masterOffset = masterOffset
                            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore()
                                .getMessageStoreConfig().getMapedFileSizeCommitLog());

                        if (masterOffset < 0) {
                            masterOffset = 0;
                        }

                        // update nextTransferFromWhere
                        this.nextTransferFromWhere = masterOffset;
                    } else {
                        // slaveRequestOffset != 0: use it as nextTransferFromWhere
                        this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                    }

                    log.info("master transfer data from " + this.nextTransferFromWhere
                        + " to slave[" + HAConnection.this.clientAddr
                        + "], and slave request " + HAConnection.this.slaveRequestOffset);
                }

                // check whether the previous write finished completely
                if (this.lastWriteOver) {
                    // work out whether it is time to send a heartbeat
                    long interval =
                        HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
                            - this.lastWriteTimestamp;

                    // send a heartbeat to keep the long-lived connection alive
                    if (interval > HAConnection.this.haService.getDefaultMessageStore()
                        .getMessageStoreConfig().getHaSendHeartbeatInterval()) {

                        // Build Header
                        this.byteBufferHeader.position(0);
                        this.byteBufferHeader.limit(headerSize);
                        this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                        this.byteBufferHeader.putInt(0);
                        this.byteBufferHeader.flip();

                        this.lastWriteOver = this.transferData();
                        if (!this.lastWriteOver)
                            continue;
                    }
                } else {
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }

                // get the message data to synchronize
                SelectMappedBufferResult selectResult =
                    HAConnection.this.haService.getDefaultMessageStore()
                        .getCommitLogData(this.nextTransferFromWhere);
                if (selectResult != null) {
                    int size = selectResult.getSize();
                    if (size > HAConnection.this.haService.getDefaultMessageStore()
                        .getMessageStoreConfig().getHaTransferBatchSize()) {
                        size = HAConnection.this.haService.getDefaultMessageStore()
                            .getMessageStoreConfig().getHaTransferBatchSize();
                    }

                    long thisOffset = this.nextTransferFromWhere;
                    this.nextTransferFromWhere += size;

                    selectResult.getByteBuffer().limit(size);
                    this.selectMappedBufferResult = selectResult;

                    // Build Header
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(thisOffset);
                    this.byteBufferHeader.putInt(size);
                    this.byteBufferHeader.flip();

                    // transfer messages to the slave server
                    this.lastWriteOver = this.transferData();
                } else {
                    HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                }
            } catch (Exception e) {
                HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                break;
            }
        }

        if (this.selectMappedBufferResult != null) {
            this.selectMappedBufferResult.release();
        }

        this.makeStop();

        readSocketService.makeStop();

        haService.removeConnection(HAConnection.this);

        SelectionKey sk = this.socketChannel.keyFor(this.selector);
        if (sk != null) {
            sk.cancel();
        }

        try {
            this.selector.close();
            this.socketChannel.close();
        } catch (IOException e) {
            HAConnection.log.error("", e);
        }

        HAConnection.log.info(this.getServiceName() + " service end");
    }

The logic of the write implementation class is fairly long, but it mainly does the following things:

Compute the offset to transfer from. If the slave is pulling for the first time, synchronization starts from the initial offset of the last commitLog file.

Transfer messages to the slave server.

Send heartbeat packets to the slave server to keep the long-lived connection alive.
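The second and third tasks share one frame format: the header built with putLong and putInt in the run method, followed by a body that is empty for a heartbeat. The sketch below isolates that logic. It is a simplified illustration, not the RocketMQ implementation: TransferHeaderSketch and its method names are hypothetical, and HEADER_SIZE is assumed to be 12 bytes because the header holds one long and one int.

```java
import java.nio.ByteBuffer;

// Hypothetical helper class that isolates the header and heartbeat logic of the write loop.
class TransferHeaderSketch {
    // Assumption: one 8-byte offset plus one 4-byte body size.
    static final int HEADER_SIZE = 8 + 4;

    // Build a header announcing bodySize bytes that start at the given physical offset.
    static ByteBuffer buildHeader(long offset, int bodySize) {
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
        header.putLong(offset);
        header.putInt(bodySize);
        header.flip(); // ready to be written to the channel
        return header;
    }

    // A heartbeat is a header with body size 0, carrying the next offset to transfer.
    static ByteBuffer heartbeat(long nextTransferFromWhere) {
        return buildHeader(nextTransferFromWhere, 0);
    }

    // A heartbeat is due only when strictly more than intervalMillis has passed since the last write.
    static boolean heartbeatDue(long now, long lastWriteTimestamp, long intervalMillis) {
        return now - lastWriteTimestamp > intervalMillis;
    }

    // Never send more than the configured batch size in one frame.
    static int clampBatch(int available, int haTransferBatchSize) {
        return Math.min(available, haTransferBatchSize);
    }

    public static void main(String[] args) {
        ByteBuffer hb = heartbeat(1024L);
        System.out.println(hb.remaining() + " bytes, offset " + hb.getLong(0) + ", size " + hb.getInt(8));
    }
}
```

Reusing the data header for heartbeats means the slave needs only one parsing path: a frame whose size field is 0 simply carries no body.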

The first step deserves a closer look, because it answers a question I had wondered about before:

If the slave server of brokerA is removed and a new slave server is started and pointed at the brokerA master, will all of the master's messages be synchronized to the new slave?

org.apache.rocketmq.store.MappedFileQueue#getMaxOffset:

    public long getMaxOffset() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
        }
        return 0;
    }

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

    // round down to the initial offset of the last commitLog file
    masterOffset = masterOffset
        - (masterOffset % HAConnection.this.haService.getDefaultMessageStore()
            .getMessageStoreConfig().getMapedFileSizeCommitLog());

The logic above gives the answer. When a new slave server starts synchronizing from the master, synchronization begins at the initial offset of the last commitLog file, so the earlier commitLog files are not transferred.
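A worked example makes the rounding concrete. The sketch below reproduces only the arithmetic from WriteSocketService#run; the class and method names are hypothetical, and the 1 GiB file size used in main is an assumed value chosen for illustration.

```java
// Hypothetical helper reproducing only the start-offset arithmetic.
class StartOffsetSketch {
    // Decide where the master starts transferring for a newly connected slave.
    static long firstTransferOffset(long slaveRequestOffset, long masterMaxOffset, long mappedFileSize) {
        if (slaveRequestOffset != 0) {
            // The slave already has data: resume from the offset it reported.
            return slaveRequestOffset;
        }
        // The slave is empty: round the master's max offset down to a file boundary.
        long start = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(start, 0);
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024;         // assumed commitLog file size: 1 GiB
        long masterMax = 2 * fileSize + fileSize / 2; // master is halfway through its third file
        // An empty slave starts at the beginning of the third file, i.e. at 2 GiB.
        System.out.println(firstTransferOffset(0, masterMax, fileSize));
    }
}
```

In this example the first two files, holding 2 GiB of older messages, are never sent to the new slave.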

Going back to the run method of the HAClient thread that reports offsets, we can see that it does one more thing:

    // process the messages sent by the master server
    boolean ok = this.processReadEvent();

org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent:

    private boolean processReadEvent() {
        int readSizeZeroTimes = 0;
        while (this.byteBufferRead.hasRemaining()) {
            try {
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                    readSizeZeroTimes = 0;
                    // read the messages and write them to the commitLog file
                    boolean result = this.dispatchReadRequest();
                    if (!result) {
                        log.error("HAClient, dispatchReadRequest error");
                        return false;
                    }
                } else if (readSize == 0) {
                    if (++readSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    // TODO ERROR
                    log.info("HAClient, processReadEvent read socket < 0");
                    return false;
                }
            } catch (IOException e) {
                log.info("HAClient, processReadEvent read socket exception", e);
                return false;
            }
        }

        return true;
    }

This method handles the message data sent back by the master server. A while loop keeps reading from the socket channel into the byteBufferRead buffer, and after each successful read the dispatchReadRequest method is called to write the message data into the commitLog file, which completes the last step of master-slave replication.
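The body of dispatchReadRequest is not shown in this article, so the following is only a sketch of what such a step has to do, based on the frame layout the master writes (an 8-byte offset and a 4-byte size, then the body). FrameParseSketch, drainFrames, the in-memory "commitLog", and the offset consistency check are all assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical frame parser; it is not the RocketMQ dispatchReadRequest implementation.
class FrameParseSketch {
    static final int HEADER_SIZE = 12; // assumed: 8-byte offset + 4-byte body size

    // Consume every complete frame in `received` (a buffer in read mode), append each body to
    // commitLog, and return the next expected offset. An incomplete trailing frame is left unread.
    static long drainFrames(ByteBuffer received, ByteArrayOutputStream commitLog, long expectedOffset) {
        while (received.remaining() >= HEADER_SIZE) {
            received.mark();
            long masterOffset = received.getLong();
            int bodySize = received.getInt();
            if (masterOffset != expectedOffset) {
                // Assumed safety check: master and slave must agree on where the data belongs.
                throw new IllegalStateException(
                    "offset mismatch: master " + masterOffset + ", slave " + expectedOffset);
            }
            if (received.remaining() < bodySize) {
                received.reset(); // body not fully received yet; wait for more bytes
                break;
            }
            byte[] body = new byte[bodySize];
            received.get(body);
            commitLog.write(body, 0, bodySize);
            expectedOffset += bodySize;
        }
        return expectedOffset;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putLong(0L).putInt(3).put(new byte[] {1, 2, 3}); // one data frame
        buf.putLong(3L).putInt(0);                           // one heartbeat frame
        buf.flip();
        ByteArrayOutputStream log = new ByteArrayOutputStream();
        System.out.println(drainFrames(buf, log, 0L) + " " + log.size());
    }
}
```

Leaving an incomplete frame in the buffer is what makes this safe to call after every read: a frame split across two socket reads is parsed once the second half arrives.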
