2025-01-17 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article takes an in-depth look at the buffer pool mechanism of the Kafka Producer: how the pool is structured, how memory is allocated from it and returned to it, and what this means for GC behavior on the Producer side.
In-depth analysis of the buffer pool mechanism of Kafka Producer
The new version of the Kafka Producer includes a message buffer pool. When a Producer is created, a 32 MB buffer pool is created by default; the size can also be set through the buffer.memory parameter. The buffer pool is logically divided into multiple memory blocks. The size of each block is the batch.size passed when the Producer is created (16384 bytes by default), and each Batch holds one batch.size-sized memory block in which its messages are stored. The structure of the entire buffer pool is shown in the following figure:
The client appends each message to a Batch of the corresponding topic partition. If that Batch is full, the client creates a new Batch and requests a batch.size block of memory from the buffer pool (held by RecordAccumulator) to store the message.
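As a rough illustration of the default sizes mentioned above, the following sketch computes how many batch.size blocks fit into the default pool. The class and method names (BufferPoolSizing, producerProps, blocksInPool) are hypothetical and exist only for this example; only the configuration keys buffer.memory and batch.size come from Kafka, and no Kafka classes are used.

```java
import java.util.Properties;

class BufferPoolSizing {
    // Default values quoted in the text: buffer.memory = 32 MB, batch.size = 16384 bytes.
    static final long DEFAULT_BUFFER_MEMORY = 32L * 1024 * 1024;
    static final int DEFAULT_BATCH_SIZE = 16384;

    // Builds producer-style properties; only the two keys discussed here are set.
    static Properties producerProps(long bufferMemory, int batchSize) {
        Properties props = new Properties();
        props.setProperty("buffer.memory", Long.toString(bufferMemory));
        props.setProperty("batch.size", Integer.toString(batchSize));
        return props;
    }

    // Number of batch.size blocks that fit into the pool (hypothetical helper).
    static long blocksInPool(Properties props) {
        long bufferMemory = Long.parseLong(props.getProperty("buffer.memory"));
        int batchSize = Integer.parseInt(props.getProperty("batch.size"));
        return bufferMemory / batchSize;
    }

    public static void main(String[] args) {
        Properties props = producerProps(DEFAULT_BUFFER_MEMORY, DEFAULT_BATCH_SIZE);
        System.out.println(blocksInPool(props)); // 33554432 / 16384 = 2048
    }
}
```

With the defaults, the pool can therefore back at most 2048 batch-sized blocks at any one time.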
Once the messages in a Batch have been sent to the Broker, the Kafka Producer removes the Batch. Because the Batch holds a block of memory, this inevitably raises a GC question.
If memory were requested for every Batch and discarded after use, the result would be frequent GC and serious performance problems. So how does Kafka avoid frequent GC?
As mentioned earlier, the buffer pool is logically split into memory blocks of the same size. After a Batch has been sent, its block can simply be returned to the buffer pool instead of being left for the garbage collector.
The class that manages the buffer pool's memory is BufferPool. Let's first look at its members:
public class BufferPool {
    // total memory size
    private final long totalMemory;
    // size of each memory block, i.e. batch.size
    private final int poolableSize;
    // lock that synchronizes the methods for allocating and returning memory
    private final ReentrantLock lock;
    // idle memory blocks
    private final Deque<ByteBuffer> free;
    // Conditions of the threads waiting for memory
    private final Deque<Condition> waiters;
    /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */
    // unallocated free memory in the buffer pool; newly requested blocks draw their memory from this value
    private long nonPooledAvailableMemory;
    // ...
}
As the members of BufferPool show, the buffer pool is made up of ByteBuffer instances. BufferPool holds the idle memory blocks in the member free. The total size of free is bounded by totalMemory, while nonPooledAvailableMemory records how much memory in the buffer pool has not yet been allocated.
After the messages of a Batch have been sent, the memory block it holds is returned to free. When a later Batch requests a memory block, no new ByteBuffer is created; one is taken from free instead, so the block is never reclaimed by the JVM.
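The reuse idea described above can be sketched in a few lines. This is a deliberately reduced model, not Kafka's BufferPool: the class ReusePoolSketch and its methods take and giveBack are hypothetical, there is no locking, and no memory limit is enforced. It only shows that a returned block is handed out again instead of a new ByteBuffer being created.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class ReusePoolSketch {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    ReusePoolSketch(int poolableSize) {
        this.poolableSize = poolableSize;
    }

    // Take an idle block if one exists, otherwise create a new one.
    ByteBuffer take() {
        ByteBuffer pooled = free.pollFirst();
        return pooled != null ? pooled : ByteBuffer.allocate(poolableSize);
    }

    // Return a block to the pool so that a later caller can reuse it.
    void giveBack(ByteBuffer buffer) {
        buffer.clear();
        free.add(buffer);
    }

    public static void main(String[] args) {
        ReusePoolSketch pool = new ReusePoolSketch(16384);
        ByteBuffer first = pool.take();
        pool.giveBack(first);
        ByteBuffer second = pool.take();
        System.out.println(first == second); // true: the same object is handed out again
    }
}
```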
Next, let's analyze how memory is allocated and how it is returned.
1. Allocating memory
Entry point for allocating memory:
org.apache.kafka.clients.producer.internals.BufferPool#allocate
1) Sufficient memory
When memory is requested and there is an idle memory block in free, that block is taken directly:
if (size == poolableSize && !this.free.isEmpty()) {
    return this.free.pollFirst();
}
The size here is the requested memory size, which is equal to Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)).
That is, if the message is smaller than batchSize, the requested size is batchSize. The logic above therefore says: if the requested size equals batchSize and free is not empty, take a block directly from free.
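The size rule above can be illustrated with plain integers. In this sketch the parameter estimatedRecordSize stands in for the result of AbstractRecords.estimateSizeInBytesUpperBound; the class and method names are hypothetical and nothing here calls into Kafka.

```java
class AllocationSizeSketch {
    // Size requested from the pool: at least batchSize, more if the record needs it.
    static int requestedSize(int batchSize, int estimatedRecordSize) {
        return Math.max(batchSize, estimatedRecordSize);
    }

    // True when the requested size equals batchSize, i.e. the request is
    // eligible to be served by an idle block from the free list.
    static boolean eligibleForPooledBlock(int batchSize, int estimatedRecordSize) {
        return requestedSize(batchSize, estimatedRecordSize) == batchSize;
    }

    public static void main(String[] args) {
        System.out.println(requestedSize(16384, 1000));           // 16384
        System.out.println(eligibleForPooledBlock(16384, 1000));  // true
        System.out.println(requestedSize(16384, 20000));          // 20000
        System.out.println(eligibleForPooledBlock(16384, 20000)); // false
    }
}
```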
Let's think about it: why must the requested size equal batchSize before Kafka takes an idle block from free?
As mentioned earlier, the memory blocks in the buffer pool have a fixed size equal to batchSize. If the requested size is larger than batchSize, a single message needs more memory than one block can hold, so a block from free cannot satisfy the request. What happens in that case? Let's continue the analysis:
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
    // we have enough unallocated or pooled memory to immediately
    // satisfy the request, but need to allocate the buffer
    freeUp(size);
    this.nonPooledAvailableMemory -= size;
}
freeListSize: the total size of the idle memory blocks in free, that is, blocks that were allocated earlier and have since been returned.
nonPooledAvailableMemory: the unallocated free memory in the buffer pool, from which newly requested memory blocks draw their memory.
this.nonPooledAvailableMemory + freeListSize: the total free memory in the buffer pool.
If the total free memory in the buffer pool is at least the requested size, the freeUp(size) method is called, and the requested size is then subtracted from nonPooledAvailableMemory.
private void freeUp(int size) {
    while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
        this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}
freeUp is an interesting method. Its idea is as follows:
If the unallocated memory is smaller than the requested size, memory is reclaimed from the list of idle blocks, free: blocks are removed from free and their capacity is added to nonPooledAvailableMemory until nonPooledAvailableMemory is at least the requested size or free is empty.
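A worked example may make the accounting easier to follow. The sketch below mirrors the non-blocking branch and freeUp quoted above, but it is a standalone model: the class FreeUpSketch, its constructor and the method tryReserve are hypothetical, and locking is left out.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class FreeUpSketch {
    final int poolableSize;
    final Deque<ByteBuffer> free = new ArrayDeque<>();
    long nonPooledAvailableMemory;

    FreeUpSketch(int poolableSize, int idleBlocks, long nonPooledAvailableMemory) {
        this.poolableSize = poolableSize;
        this.nonPooledAvailableMemory = nonPooledAvailableMemory;
        for (int i = 0; i < idleBlocks; i++)
            free.add(ByteBuffer.allocate(poolableSize));
    }

    // Reserves `size` bytes if the pool can satisfy the request immediately.
    boolean tryReserve(int size) {
        long freeListSize = (long) free.size() * poolableSize;
        if (nonPooledAvailableMemory + freeListSize >= size) {
            freeUp(size);
            nonPooledAvailableMemory -= size;
            return true;
        }
        return false; // the real pool would block here
    }

    // Moves capacity from idle blocks into the unallocated counter until it covers `size`.
    private void freeUp(int size) {
        while (!free.isEmpty() && nonPooledAvailableMemory < size)
            nonPooledAvailableMemory += free.pollLast().capacity();
    }

    public static void main(String[] args) {
        // 3 idle blocks of 16384 bytes plus 10000 unallocated bytes
        FreeUpSketch pool = new FreeUpSketch(16384, 3, 10000);
        System.out.println(pool.tryReserve(40000));        // true
        System.out.println(pool.free.size());              // 1 (two blocks were dissolved)
        System.out.println(pool.nonPooledAvailableMemory); // 10000 + 2 * 16384 - 40000 = 2768
    }
}
```

Note that the two blocks removed from free are no longer referenced by the pool, so their ByteBuffer objects become eligible for garbage collection.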
2) Insufficient memory
In my article "Kafka Producer asynchronously sends messages can also block?", I also mentioned that when the memory blocks of the buffer pool are used up, the call that appends a message blocks until memory becomes free.
How is this blocking wait implemented?
// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
    this.waiters.addLast(moreMemory);
    // loop over and over until we have a buffer or have reserved
    // enough memory to allocate one
    while (accumulated < size) {
        long startWaitNs = time.nanoseconds();
        long timeNs;
        boolean waitingTimeElapsed;
        try {
            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
        } finally {
            long endWaitNs = time.nanoseconds();
            timeNs = Math.max(0L, endWaitNs - startWaitNs);
            recordWaitTime(timeNs);
        }
        if (waitingTimeElapsed) {
            throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
        }
        remainingTimeToBlockNs -= timeNs;
        // check if we can satisfy this request from the free list,
        // otherwise allocate memory
        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
            // just grab a buffer from the free list
            buffer = this.free.pollFirst();
            accumulated = size;
        } else {
            // we'll need to allocate memory, but we may only get
            // part of what we need on this iteration
            freeUp(size - accumulated);
            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
            this.nonPooledAvailableMemory -= got;
            accumulated += got;
        }
    }
    // ... (the excerpt ends here; the rest of the try block is not shown)
The general logic of the source code above is as follows:
First, a Condition for this waiting request is created and added to waiters, a Deque of Conditions (it is signalled later, when memory is returned). The while loop keeps collecting free memory and exits once the accumulated memory reaches the requested size. Inside the loop, Condition#await is called to block; the thread is woken when memory is returned, and a TimeoutException is thrown if the configured maximum blocking time elapses first. After waking up, the code checks whether the requested size equals batchSize. If it does, nothing has been accumulated yet and free is not empty, the returned block is taken directly from free. Otherwise, freeUp is called to move memory from the idle blocks in free into nonPooledAvailableMemory, and memory is accumulated from there, possibly over several iterations, until the requested size is reached.
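The wait-with-timeout pattern described above can be sketched with the standard java.util.concurrent.locks classes. This is a simplified model and differs from Kafka's code in several ways that should be kept in mind: it uses a single shared Condition instead of a queue of per-request Conditions, it wakes all waiters with signalAll, it does not accumulate memory piece by piece, and it reports a timeout by returning false rather than throwing. The class BlockingReserveSketch and its methods are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BlockingReserveSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition moreMemory = lock.newCondition();
    private long available;

    BlockingReserveSketch(long totalMemory) {
        this.available = totalMemory;
    }

    // Reserves `size` bytes, waiting up to maxWaitMs for memory to be released.
    // Returns false on timeout or interruption (an assumption of this sketch).
    boolean reserve(int size, long maxWaitMs) {
        lock.lock();
        try {
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
            while (available < size) {
                if (remainingNs <= 0)
                    return false; // waited long enough without success
                try {
                    // awaitNanos returns an estimate of the wait time still remaining
                    remainingNs = moreMemory.awaitNanos(remainingNs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            available -= size;
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Returns memory to the pool and wakes the waiting threads.
    void release(int size) {
        lock.lock();
        try {
            available += size;
            moreMemory.signalAll();
        } finally {
            lock.unlock();
        }
    }

    long available() {
        lock.lock();
        try {
            return available;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BlockingReserveSketch pool = new BlockingReserveSketch(100);
        System.out.println(pool.reserve(100, 10));   // pool is now exhausted
        System.out.println(pool.reserve(50, 50));    // nothing is released, so this times out
        new Thread(() -> pool.release(100)).start(); // another thread returns memory
        System.out.println(pool.reserve(50, 5000));  // succeeds once the release has happened
    }
}
```

Checking the condition in a loop around the await call matters: a thread can wake up without enough memory being available, and the loop simply waits again with the remaining time.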
2. Returning memory
Entry point for returning memory:
org.apache.kafka.clients.producer.internals.BufferPool#deallocate(java.nio.ByteBuffer, int)
public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            this.nonPooledAvailableMemory += size;
        }
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
The logic for returning memory blocks is relatively simple:
If the size of the returned memory block equals batchSize, the block is cleared and added to free, that is, it is returned to the buffer pool, which keeps the JVM GC from reclaiming it. What if the size is not equal? Then only the size is added back to nonPooledAvailableMemory; the ByteBuffer itself is not kept and is left for the JVM GC to reclaim. In both cases, the first thread waiting for free memory is woken up at the end.
The source code analysis above points to a problem that deserves attention, because it can have a serious performance impact on the Producer side:
If a message is larger than batchSize, its memory is not taken from free. Instead, a new ByteBuffer is created, and that ByteBuffer is not returned to the buffer pool afterwards (it is reclaimed by the JVM GC). In addition, if nonPooledAvailableMemory is smaller than the message, idle memory blocks in free are discarded (and reclaimed by the JVM GC) so that the buffer pool has enough memory to satisfy the request. All of these actions can lead to frequent GC.
Therefore, batch.size should be adjusted according to the size of the business messages to avoid frequent GC.
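To make the effect of batch.size concrete, the sketch below counts how many new ByteBuffer objects are created over several send cycles for a given batch size and message size. It is a reduced model under stated assumptions: the class OversizedPathSketch, the counter created and the helper newBuffersCreated are hypothetical, memory limits and locking are ignored, and each cycle allocates one buffer and returns it immediately.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class OversizedPathSketch {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    int created; // number of ByteBuffer objects newly allocated on the heap

    OversizedPathSketch(int poolableSize) {
        this.poolableSize = poolableSize;
    }

    ByteBuffer allocate(int recordSize) {
        int size = Math.max(poolableSize, recordSize);
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst(); // reuse an idle block
        created++;
        return ByteBuffer.allocate(size); // new object, later garbage unless pooled
    }

    void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.add(buffer); // batch-sized blocks go back to the pool
        }
        // oversized buffers are dropped and left to the garbage collector
    }

    // Runs `rounds` allocate/deallocate cycles and reports how many buffers were created.
    static int newBuffersCreated(int batchSize, int recordSize, int rounds) {
        OversizedPathSketch pool = new OversizedPathSketch(batchSize);
        for (int i = 0; i < rounds; i++)
            pool.deallocate(pool.allocate(recordSize));
        return pool.created;
    }

    public static void main(String[] args) {
        System.out.println(newBuffersCreated(16384, 20000, 5)); // 5: every message allocates
        System.out.println(newBuffersCreated(32768, 20000, 5)); // 1: one block is reused
    }
}
```

In this model, raising the batch size above the message size turns five allocations into one, which is the behavior the recommendation above relies on.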