2025-01-16 Update From: SLTechnology News&Howtos shulou
Shulou(Shulou.com)06/01 Report--
This article explains how the broker pulls messages from its message store in RocketMQ.
When a consumer pulls messages, the broker first locates the ConsumeQueue for the requested topic and queueId. Using the consume offset, it then reads the corresponding entry from the ConsumeQueue, which holds the message's physical offset in the commitlog, its size, and its tags hash code (offset/msgsize/tagscode). Finally, it uses that offset and size to read the message body from the commitlog.
The entry point for message pull in the broker storage layer is the DefaultMessageStore.getMessage method. The core logic is as follows:
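To make the index entry concrete, here is a minimal sketch of how one ConsumeQueue entry could be written and read back with a plain ByteBuffer. Only the 8 + 4 + 8 byte layout (commitlog offset, message size, tags hash code) comes from the code in this article; the class CqEntryDemo and its helper names are hypothetical and are not part of RocketMQ.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class; only the 8 + 4 + 8 byte layout is taken from the article.
class CqEntryDemo {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 (offset) + 4 (size) + 8 (tags hash code)

    // One decoded ConsumeQueue entry.
    static final class CqEntry {
        final long commitLogOffset;
        final int msgSize;
        final long tagsCode;

        CqEntry(long commitLogOffset, int msgSize, long tagsCode) {
            this.commitLogOffset = commitLogOffset;
            this.msgSize = msgSize;
            this.tagsCode = tagsCode;
        }
    }

    // Serialize one entry into a 20-byte buffer that is ready for reading.
    static ByteBuffer encode(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset).putInt(msgSize).putLong(tagsCode);
        buf.flip();
        return buf;
    }

    // Read the three fields in the same order the broker loop reads them.
    static CqEntry decode(ByteBuffer buf) {
        long commitLogOffset = buf.getLong();
        int msgSize = buf.getInt();
        long tagsCode = buf.getLong();
        return new CqEntry(commitLogOffset, msgSize, tagsCode);
    }

    public static void main(String[] args) {
        CqEntry e = decode(encode(4096L, 256, "TagA".hashCode()));
        System.out.println(e.commitLogOffset + " " + e.msgSize + " " + e.tagsCode);
    }
}
```

Because every entry has the same size, the broker never needs to scan the ConsumeQueue to find the n-th entry; it can jump straight to it.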
    // DefaultMessageStore.java
    public GetMessageResult getMessage(final String group, final String topic, final int queueId,
            final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
        // ...
        // 1. Locate the ConsumeQueue
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();
            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else if (offset == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else {
                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                }
            } else {
                // 2. Read the ConsumeQueue content starting at the consume offset
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;
                        int i = 0;
                        // Upper bound, in ConsumeQueue bytes, scanned by a single request
                        final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // commitlog offset, 8 bytes
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();     // message size, 4 bytes
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // tags hash code, 8 bytes
                            // ...
                            // 3. Fast filtering by tagsCode
                            if (messageFilter != null
                                    && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                continue;
                            }
                            // 4. Fetch the message body from the commitlog
                            SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                            if (null == selectResult) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }
                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                continue;
                            }
                            // 5. Filter by message body
                            if (messageFilter != null
                                    && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                // release...
                                selectResult.release();
                                continue;
                            }
                            this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                            // 6. Add the message to the result
                            getResult.addMessage(selectResult);
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                        }
                        // ...
                    } finally {
                        bufferConsumeQueue.release();
                    }
                } else {
                    // ...
                }
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }
        // ...
        return getResult;
    }

The ConsumeQueue stores fixed-length entries (20 bytes per message), so looking up an entry is simple:

    // ConsumeQueue.java
    public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        int mappedFileSize = this.mappedFileSize;
        long offset = startIndex * CQ_STORE_UNIT_SIZE; // consume offset * fixed 20-byte entry length
        if (offset >= this.getMinLogicOffset()) {
            // Locate the MappedFile that contains this byte offset
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {
                // Read the actual data from the MappedFile
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
                return result;
            }
        }
        return null;
    }
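The arithmetic behind getIndexBuffer can be shown with a few lines of standalone code. This is a sketch under stated assumptions: the file size of 300,000 entries per ConsumeQueue file is an assumed value chosen for illustration, the first file is assumed to start at byte 0, and the class CqLocateDemo and its method names are hypothetical.

```java
// Hypothetical demo class illustrating the fixed-length index arithmetic.
class CqLocateDemo {
    static final int CQ_STORE_UNIT_SIZE = 20;
    // Assumed file size for illustration: 300,000 entries per ConsumeQueue file.
    static final int MAPPED_FILE_SIZE = 300_000 * CQ_STORE_UNIT_SIZE;

    // Logical consume offset (entry index) -> byte offset in the whole queue.
    static long byteOffset(long consumeOffset) {
        return consumeOffset * CQ_STORE_UNIT_SIZE;
    }

    // Which file holds the entry, assuming the first file starts at byte 0.
    static long fileIndex(long consumeOffset) {
        return byteOffset(consumeOffset) / MAPPED_FILE_SIZE;
    }

    // Position of the entry inside that file (the offset % mappedFileSize step).
    static int positionInFile(long consumeOffset) {
        return (int) (byteOffset(consumeOffset) % MAPPED_FILE_SIZE);
    }

    public static void main(String[] args) {
        long consumeOffset = 300_005L;
        System.out.println(byteOffset(consumeOffset));     // 6000100
        System.out.println(fileIndex(consumeOffset));      // 1
        System.out.println(positionInFile(consumeOffset)); // 100
    }
}
```

With these assumed sizes, entry 300,005 is the sixth entry of the second file, which is why its in-file position is 5 * 20 = 100 bytes.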
After obtaining the message's commitlog offset and size from the ConsumeQueue, the message body is fetched as follows:

    // CommitLog.java
    public SelectMappedBufferResult getMessage(final long offset, final int size) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        // Locate the MappedFile where the message is stored
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            return mappedFile.selectMappedBuffer(pos, size); // read the message body from the MappedFile
        }
        return null;
    }
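The file lookup performed before reading the body can be sketched as follows. This is an illustrative simplification, not the RocketMQ implementation: the class CommitLogLocateDemo, the tiny 1024-byte file size, and the list of file start offsets are all assumptions made for the example. The 20-digit zero-padded file name is also stated here as an assumption about how commitlog files are named.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; sizes and names are assumptions for illustration.
class CommitLogLocateDemo {
    static final long FILE_SIZE = 1024L; // deliberately tiny for the demo

    // Returns {fileStartOffset, positionInFile}, or null when no retained file
    // covers the offset (for example, because the file was already deleted).
    static long[] locate(List<Long> fileStartOffsets, long offset) {
        if (fileStartOffsets.isEmpty()) {
            return null;
        }
        long first = fileStartOffsets.get(0);
        long last = fileStartOffsets.get(fileStartOffsets.size() - 1);
        if (offset < first || offset >= last + FILE_SIZE) {
            return null;
        }
        // Files are contiguous and equally sized, so the index is a subtraction.
        int index = (int) (offset / FILE_SIZE - first / FILE_SIZE);
        return new long[] { fileStartOffsets.get(index), offset % FILE_SIZE };
    }

    // Assumed naming scheme: the start offset, zero-padded to 20 digits.
    static String fileName(long fileStartOffset) {
        return String.format("%020d", fileStartOffset);
    }

    public static void main(String[] args) {
        // Older files (start offsets 0 and 1024) are assumed to have been deleted.
        List<Long> files = Arrays.asList(2048L, 3072L, 4096L);
        long[] hit = locate(files, 3100L);
        System.out.println(fileName(hit[0]) + " @ " + hit[1]);
        System.out.println(locate(files, 100L) == null);
    }
}
```

A null result here plays the same role as the null returned by commitLog.getMessage in the pull loop, where the broker reports MESSAGE_WAS_REMOVING and moves on to the next file.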
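The overall message pull process is therefore: locate the ConsumeQueue for the topic and queueId, read the 20-byte index entries starting at the consume offset, filter them by tagsCode, fetch each message body from the commitlog by its offset and size, filter by message body where needed, and add the matching messages to the result.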
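The pull flow described in this article can be sketched end to end with two in-memory buffers standing in for the commitlog and a single ConsumeQueue. This is a toy model under stated assumptions, not broker code: the class PullDemo, its put and pull methods, and the buffer sizes are hypothetical, and it ignores mapped files, offset correction, and body-level filtering.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy store: one commitlog buffer plus one ConsumeQueue buffer.
class PullDemo {
    static final int UNIT = 20; // 8 (offset) + 4 (size) + 8 (tags hash code)
    final ByteBuffer commitLog = ByteBuffer.allocate(1024);
    final ByteBuffer consumeQueue = ByteBuffer.allocate(UNIT * 16);

    // Append the body to the commitlog and its index entry to the ConsumeQueue.
    void put(String tag, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        long phyOffset = commitLog.position();
        commitLog.put(bytes);
        consumeQueue.putLong(phyOffset).putInt(bytes.length).putLong(tag.hashCode());
    }

    // Pull up to maxMsgNums messages starting at the given consume offset.
    // A null tag means "no filter".
    List<String> pull(long offset, int maxMsgNums, String tag) {
        List<String> result = new ArrayList<>();
        long maxOffset = consumeQueue.position() / UNIT;
        if (offset < 0 || offset >= maxOffset) {
            return result; // nothing to read at this offset
        }
        ByteBuffer cq = consumeQueue.duplicate();
        cq.flip();                          // limit = bytes written so far
        cq.position((int) (offset * UNIT)); // jump straight to the entry
        while (cq.remaining() >= UNIT && result.size() < maxMsgNums) {
            long phyOffset = cq.getLong();
            int size = cq.getInt();
            long tagsCode = cq.getLong();
            if (tag != null && tagsCode != tag.hashCode()) {
                continue; // fast filter on the hash code, body is never read
            }
            byte[] body = new byte[size];
            ByteBuffer log = commitLog.duplicate();
            log.position((int) phyOffset);
            log.get(body);                  // read the body from the commitlog
            result.add(new String(body, StandardCharsets.UTF_8));
        }
        return result;
    }

    public static void main(String[] args) {
        PullDemo store = new PullDemo();
        store.put("TagA", "hello");
        store.put("TagB", "world");
        store.put("TagA", "again");
        System.out.println(store.pull(0, 10, "TagA")); // [hello, again]
        System.out.println(store.pull(1, 10, null));   // [world, again]
    }
}
```

The sketch shows why the tagsCode stored in the index is useful: messages with a non-matching tag are skipped without touching the commitlog at all.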