In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
In view of how RocketMQ stores files, this article introduces the corresponding analysis and solutions in detail, hoping to help more partners who want to solve this problem to find a more simple and easy way.
The RocketMQ storage path defaults to ${ROCKRTMQ_HOME} / store, which mainly stores the index of messages, message queues corresponding to topics, and so on.
1. Overview
View its catalog file
Commitlog: the directory where the message is stored
Config: some configuration information during operation
Consumequeue: message consumption queue storage directory
Index: message index file storage directory
Abort: if there is an abort file that indicates that Broker is not normally closed, the file will be created when it is started by default and deleted when it exits normally.
Checkpoint: file detection point. Stores the time stamp of the last flush of commitlog files, the time of last flush of consumequeue, and the timestamp of the last flush of index index files.
2. File introduction 2. 1, commitlog file
The storage address of the commitlog file: $HOME\ store\ commitlog$ {fileName}. The default size of each file is 1G = 102410241024. The name name is fileName, with a length of 20 digits, zero padding on the left, and the rest is the starting offset. For example, 0000000000000000 represents the first file, the starting offset is 0, and the file size is 1G=1073741824. When this file is full, the second file name is 000000001073741824, the starting offset is 1073741824, and so on, the third file name is 00000000002147483648, the starting offset is 2147483648, the file is written sequentially when the message is stored, and when the file is full, the next file is written.
The files under the commitlog directory mainly store messages. The length of each message is different. Look at the logical view of its storage. The first four bytes of each message store the total length of the message.
The message unit storage details of the file
Numbered field abbreviation field size (in bytes) Field meaning 1msgSize4 represents the size of the message 2MAGICCODE4MAGICCODE = daa320a73BODY CRC4 message body BODY CRC when broker restarts recover 4queueId4 is checked
5flag4
The value of 6QUEUEOFFSET8 is an offset that is not a real consume queue, and can represent the number of messages in this consumeQueue queue or tranStateTable queue. If it is a non-transactional message or a commit transactional message, you can find the data in the consumeQueue by using this value. QUEUEOFFSET * 20 is the offset address. If it is a PREPARED or Rollback transaction, you can use this value to find data from tranStateTable that represents the physical starting address offset of the message in commitLog. 8SYSFLAG4 indicates that the message is the state of things and other message characteristics. The binary number is four bytes from right to left: when all four bytes are 0 (value 0), it means non-transactional message; when the first byte is 1 (value 1), the message is Compressed. Multiple messages (MultiTags) when the second byte is 1 (value 2), prepared message when the third byte is 1 (value 4), commit message when the fourth byte is 1 (value 8), and rollback message when the third / fourth byte is 1 (value 12) The timestamp of the non-transactional message 9BORNTIMESTAMP8 message generator (producer) when the third / fourth byte is 0. The 10BORNHOST8 message generator (producer) address (address:port) the 11STORETIMESTAMP8 message stored at the broker storage time the 12STOREHOSTADDRESS8 message stored to the broker address (address:port) the 13RECONSUMETIMES8 message was re-charged several times by a subscription group (counted independently between subscription groups) because the retry message was sent to the queue queueId=0 of topic name% retry%groupName Successful consumption is recorded as 0 14PreparedTransaction Offset8 represents transaction messages in prepared state 15messagebodyLength4 message body size value 16messagebodybodyLength message body content 17topicLength1topic name content size 18topictopicLengthtopic content value 19propertiesLength2 attribute value size 20propertiespropertiesLengthpropertiesLength size attribute data 2.2, consumequeue
RocketMQ implements the consumption of messages based on the topic subscription model, and consumers are concerned about all the messages under the topic. However, because the messages of different topics are not continuously stored in the commitlog file, you can imagine how slow it will be if you just retrieve the message file. In order to improve efficiency, the corresponding topic queue establishes an index file. In order to speed up message retrieval and save disk space, each consumequeue entry stores the offset, message length and tag hashcode value of the key information of the message in the commitog file.
View the directory structure:
A single consumequeue file contains 300000 entries by default, each with 20 bytes, so the size of each file is a fixed 20w x 20 bytes. A single consumequeue file can be thought of as an array, and the subscript is the logical offset. The offset of the consumption progress storage of the message is the logical offset.
2.3 、 IndexFile
IndexFile: used to provide access to the generated index file, querying the real physical content of the message through the message key value. On the actual physical storage, the file name is named after the timestamp at the time of creation, the size of a fixed single IndexFile file is about 400m, and an IndexFile can hold 2000W indexes.
2.3.1. IndexFile structure analysis
IndexHead data: beginTimestamp: the index file contains the minimum storage time of the message endTimestamp: the index file contains the maximum storage time of the message beginPhyoffset: the index file contains the minimum physical offset of the message (commitlog file offset) endPhyoffset: the index file contains the maximum physical offset of the message (commitlog file offset) hashSlotCount:hashslot number, not the number of hash slots used, it is not meaningful here, indexCount: the number of Index entries used
Hash slots: an IndexFile contains 500W Hash slots by default, and each Hash slot stores the index of the latest hashcode Index that falls in that Hash slot.
Hashcode phyoffset of Index entry list hashcode:key: physical offset of the message timedif: the difference between the storage time of the message and the timestamp of the first message. Less than 0 indicates that the message is invalid preIndexNo: the Index index of the previous record of the entry. When the hash conflicts, the linked list structure is constructed based on this value.
2.3.2, IndexFile entry storage
RocketMQ writes the mapping relationship between the message index key and the offset of the message into IndexFile, and its core implementation method is public boolean putKey (final String key, final long phyOffset, final long storeTimestamp). The meaning of the parameters are the index of the message, the physical offset of the message, and the storage time of the message.
Public boolean putKey (final String key, final long phyOffset, final long storeTimestamp) {/ / determine whether the current number of entries is greater than the maximum allowed number of entries if (this.indexHeader.getIndexCount () < this.indexNum) {/ / get the hash value of KEY (positive integer) int keyHash = indexKeyHashMethod (key); / / calculate the subscript int slotPos of hash slot = keyHash% this.hashSlotNum / / get the physical address of the hash slot int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try {/ / fileLock = this.fileChannel.lock (absSlotPos, hashSlotSize, / / false) / / get the data stored in the hash slot int slotValue = this.mappedByteBuffer.getInt (absSlotPos); / / determine whether the value is less than or equal to 0 or greater than the maximum entry of the current index file if (slotValue this.indexHeader.getIndexCount ()) {slotValue = invalidIndex } / / calculate the time difference between the current message storage time and the first message timestamp long timeDiff = storeTimestamp-this.indexHeader.getBeginTimestamp (); / / second timeDiff = timeDiff / 1000; if (this.indexHeader.getBeginTimestamp () Integer.MAX_VALUE) {timeDiff = Integer.MAX_VALUE } else if (timeDiff < 0) {timeDiff = 0 } / / physical address of calculated entry = size of index header (40 bytes) + size of hash slot (4 bytes) * number of slots (500w) + number of maximum entries of current index * size per index (20 bytes) int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount () * indexSize / / the hash value stored in key (4 bytes) + the physical offset of the message (8 bytes) + the timestamp difference between the message storage timestamp and the index file (4 bytes) + the value of the current hash slot (4 bytes) this.mappedByteBuffer.putInt (absIndexPos, keyHash); this.mappedByteBuffer.putLong (absIndexPos + 4, phyOffset) This.mappedByteBuffer.putInt (absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt (absIndexPos + 4 + 8 + 4, slotValue); / / Storage the number of entries contained in the current index into the hash slot, overwriting the value of the original hash slot this.mappedByteBuffer.putInt (absSlotPos, this.indexHeader.getIndexCount ()) If (this.indexHeader.getIndexCount () = begin) & & (timeRead
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 228
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.