Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Example Analysis of optimizing Zero copy of memory Pool in JDK buffer by Netty

2025-04-09 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly introduces Netty's example analysis of zero-copy optimization of memory pool in JDK buffer. It is very detailed and has certain reference value. Friends who are interested must finish it!

The buffer in NIO is the basis of data transmission, and JDK is implemented through ByteBuffer. Netty framework does not use JDK native ByteBuffer, but constructs ByteBuf.

ByteBuf has made a lot of optimizations for ByteBuffer, such as memory pool, zero copy, reference counting (independent of GC). This paper mainly analyzes these optimizations, learns these optimization ideas, and uses these optimization schemes and ideas for reference in practical engineering.

Direct memory and heap memory

First of all, let's talk about the basics that need to be used. In JVM, memory can be divided into two large blocks, one is heap memory, the other is direct memory. Here's a brief introduction.

Heap memory:

Heap memory is the memory managed by Jvm, compared to the method area, stack memory, heap memory is the largest. All object instance instances and arrays are allocated on the heap.

Java's garbage collector can collect garbage on the heap.

Direct memory:

JVM uses the Native function to allocate memory outside the heap, which is then manipulated through the DirectByteBuffer object in the Java heap as a reference to that memory. Direct memory is not limited by the Java heap, only by native memory.

Java's GC will only clean up obsolete objects in direct memory when the old age area is full of triggers for Full GC.

JDK native buffer ByteBuffer

In NIO, all data is processed with buffers. Reading and writing data is carried out in the buffer. A cache is essentially an array, usually using a byte buffer-- ByteBuffer.

Attributes:

Mode of use:

ByteBuffer can apply for two ways of memory, namely heap memory and direct memory. First, look at applying for heap memory.

/ / Application heap memory ByteBuffer HeapbyteBuffer = ByteBuffer.allocate (1024)

It's very simple, just one line of code, and then look at the allocate method.

Public static ByteBuffer allocate (int capacity) {if (capacity)

< 0) throw new IllegalArgumentException(); return new HeapByteBuffer(capacity, capacity); } 其实就是new一个HeapByteBuffer对象。这个 HeapByteBuffer继承自ByteBuffer,构造器采用了父类的构造器,如下所示: HeapByteBuffer(int cap, int lim) { // package-private super(-1, 0, lim, cap, new byte[cap], 0); /* hb = new byte[cap]; offset = 0; */ }//ByteBuffer构造器 ByteBuffer(int mark, int pos, int lim, int cap, // package-private byte[] hb, int offset) { super(mark, pos, lim, cap); this.hb = hb; this.offset = offset; } 结合ByteBuffer的四个属性,初始化的时候就可以赋值capaticy,limit,position,mark,至于byte[] hb, int offsef这两个属性,JDK文档给出的解释是 backing array , and array offset 。它是一个回滚数组,offset是数组的偏移值。 申请直接内存: // 申请直接内存 ByteBuffer DirectbyteBuffer = ByteBuffer.allocateDirect(1024); allocateDirect()实际上就是new的一个DirectByteBuffer对象,不过这个new 一个普通对象不一样。这里使用了Native函数来申请内存,在Java中就是调用unsafe对象 public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); } DirectByteBuffer(int cap) { // package-private super(-1, 0, cap, cap); boolean pa = VM.isDirectMemoryPageAligned(); int ps = Bits.pageSize(); long size = Math.max(1L, (long)cap + (pa ? ps : 0)); Bits.reserveMemory(size, cap); long base = 0; try { base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0); if (pa && (base % ps != 0)) { // Round up to page boundary address = base + ps - (base & (ps - 1)); } else { address = base; } cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; } View Code 申请方法不同的内存有不同的用法。接下来看一看ByteBuffer的常用方法与如何使用 ByteBuffer的常用方法与使用方式 Bytebuf的读和写是使用put()和get()方法实现的 // 读操作public byte get() { return hb[ix(nextGetIndex())]; }final int nextGetIndex() { if (position >

= limit) throw new BufferUnderflowException (); return position++;} / / write operation public ByteBuffer put (byte x) {hb [ix (nextPutIndex ())] = x; return this;} final int nextPutIndex () {if (position > = limit) throw new BufferOverflowException (); return position++;}

As you can see from the code, both read and write operations change the position property of ByteBuffer, and these two operations are common position properties. This will bring a problem, read and write operations will lead to data errors, ah, data location errors.

ByteBuffer provides the flip () method, which changes the positions of position and limit when switching between read and write modes. Take a look at how flip () works:

Public final Buffer flip () {/ / 1. Set limit to the current location limit = position; / / 2. Set position to 0 position = 0; mark =-1; return this;}

I won't focus on it here. Some details can be delved into by myself.

ByteBuf of Netty

Netty uses its own ByteBuf object for data transfer, which essentially uses the appearance pattern to encapsulate the ByteBuffer of JDK.

Compared to the native ByteBuffer,Netty ByteBuf does a lot of optimization, zero copy, memory pool acceleration, read and write indexes.

Why use memory pools?

First of all, it is important to understand that Netty's memory pool is not dependent on JVM's own GC.

Review the GC of direct memory:

As mentioned above, Java's GC will only clean up obsolete objects in direct memory when the seniority area is full of Full GC.

The direct memory in JVM, stored in the heap memory, is actually the DirectByteBuffer class, which is actually very small. The real memory is outside the heap. Here is the mapping relationship.

Every time you apply for direct memory, check to see if it exceeds the limit-the limit for direct memory is default (available-XX:MaxDirectMemorySize reset).

If the limit is exceeded, System.gc () is actively executed, which has an impact and the system interrupts 100ms. If direct memory is not successfully reclaimed and the direct memory limit is exceeded, an OOM-- memory overflow will be thrown.

Continue to analyze from the perspective of GC, DirectByteBuffer will enter the old age after several times of young gc. When the old age is over, Full GC will be triggered.

Because it is very small, it is difficult to fill the old age, so basically will not trigger Full GC, the consequence is that a large number of out-of-stack memory has been occupied, unable to carry out memory recovery.

A last resort is to rely on the system.gc () triggered when the application limit is exceeded, but as mentioned earlier, it interrupts the process 100ms, and if the system does not complete the GC in between the 100ms, it will still throw an OOM.

So this last option is not completely safe.

Netty uses reference counting to actively reclaim memory. Recycled objects include non-pooled direct memory and memory in the memory pool.

Memory leak detection for memory pools?

Netty uses the reference counting mechanism to manage resources, and ByteBuf actually implements the ReferenceCounted interface, adding 1 to the reference count when the ByteBuf object is instantiated.

When the application code maintains an object reference, the retain method is called to increase the count by 1, the object is released after use, and release is called to subtract the counter by 1.

When the reference count changes to 0, the object releases all resources and returns to the memory pool.

Netty memory leak detection level:

Disable (DISABLED)-completely disables leak detection. Not recommended.

Simple (SIMPLE)-tells us whether the 1% buffer sampled has been leaked. Default.

Advanced (ADVANCED)-tells us where 1% of the sampled buffer is leaked

Paranoia (PARANOID)-similar to the advanced option, but this option detects all buffers, not just the 1% sampled. This option is useful during the automated testing phase. If the build output contains LEAK, you can assume that the build failed or use the-Dio.netty.leakDetectionLevel option of JVM to specify the leak detection level.

Memory tracking

Allocate memory in the memory pool, and the resulting ByteBuf objects are encapsulated by the toLeakAwareBuffer () method, which counts references to the ByteBuf object and wraps the ByteBuf with SimpleLeakAwareByteBuf or AdvancedLeakAwareByteBuf. In addition, this method only detects memory leaks for direct memory in non-pool memory and memory in memory pool.

/ / decorator mode, using SimpleLeakAwareByteBuf or AdvancedLeakAwareByteBuf to wrap the original ByteBufprotected static ByteBuf toLeakAwareBuffer (ByteBuf buf) {ResourceLeakTracker leak; / / choose which decorator to use according to the set Level switch (ResourceLeakDetector.getLevel ()) {case SIMPLE:// to create a ResourcLeak object leak = AbstractByteBuf.leakDetector.track (buf) for tracking and representing content leaks If (leak! = null) {/ / only call ResourceLeak.record buf = new SimpleLeakAwareByteBuf (buf, leak) in the ByteBuf.order method;} break; case ADVANCED: case PARANOID: leak = AbstractByteBuf.leakDetector.track (buf) If (leak! = null) {/ / only call ResourceLeak.record buf = new AdvancedLeakAwareByteBuf (buf, leak) in the ByteBuf.order method;} break; default: break;} return buf;}

In fact, memory leak detection is done in AbstractByteBuf.leakDetector.track (buf), so let's take a look at the implementation of the track method.

/ * * Creates a new {@ link ResourceLeakTracker} which is expected to be closed via * {@ link ResourceLeakTracker#close (Object)} when the related resource is deallocated. * * @ return the {@ link ResourceLeakTracker} or {@ code null} * / @ SuppressWarnings ("unchecked") public final ResourceLeakTracker track (T obj) {return track0 (obj);} @ SuppressWarnings ("unchecked") private DefaultResourceLeak track0 (T obj) {Level level = ResourceLeakDetector.level; / / No memory tracking if (level = = Level.DISABLED) {return null } if (level.ordinal ()

< Level.PARANOID.ordinal()) { //如果监控级别低于PARANOID,在一定的采样频率下报告内存泄露 if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) { reportLeak(); return new DefaultResourceLeak(obj, refQueue, allLeaks); } return null; } //每次需要分配 ByteBuf 时,报告内存泄露情况 reportLeak(); return new DefaultResourceLeak(obj, refQueue, allLeaks); } 再来看看返回对象--DefaultResourceLeak,他的实现方式如下: private static final class DefaultResourceLeak extends WeakReference implements ResourceLeakTracker, ResourceLeak { 它继承了虚引用WeakReference,虚引用完全不影响目标对象的垃圾回收,但是会在目标对象被VM垃圾回收时加入到引用队列, 正常情况下ResourceLeak对象,会将监控的资源的引用计数为0时被清理掉。 但是当资源的引用计数失常,ResourceLeak对象也会被加入到引用队列. 存在着这样一种情况:没有成对调用ByteBuf的retain和relaease方法,导致ByteBuf没有被正常释放,当 ResourceLeak(引用队列) 中存在元素时,即表明有内存泄露。 Netty中的 reportLeak()方法来报告内存泄露情况,通过检查引用队列来判断是否有内存泄露,并报告跟踪情况. 方法代码如下: View Code Handler中的内存处理机制 Netty中有handler链,消息有本Handler传到下一个Handler。所以Netty引入了一个规则,谁是最后使用者,谁负责释放。 根据谁最后使用谁负责释放的原则,每个Handler对消息可能有三种处理方式 对原消息不做处理,调用 ctx.fireChannelRead(msg)把原消息往下传,那不用做什么释放。 将原消息转化为新的消息并调用 ctx.fireChannelRead(newMsg)往下传,那必须把原消息release掉。 如果已经不再调用ctx.fireChannelRead(msg)传递任何消息,那更要把原消息release掉。 假设每一个Handler都把消息往下传,Handler并也不知道谁是启动Netty时所设定的Handler链的最后一员,所以Netty在Handler链的最末补了一个TailHandler,如果此时消息仍然是ReferenceCounted类型就会被release掉。 总结: 1.Netty在不同的内存泄漏检测级别情况下,采样概率是不一样的,在Simple情况下出现了Leak,要设置"-Dio.netty.leakDetectionLevel=advanced"再跑一次代码,找到创建和访问的地方。 2.Netty中的内存泄露检测是通过对ByteBuf对象进行装饰,利用虚引用和引用计数来对非池中的直接内存和内存池中内存进行跟踪,判断是否发生内存泄露。 3.计数器基于 AtomicIntegerFieldUpdater,因为ByteBuf对象很多,如果都把int包一层AtomicInteger花销较大,而AtomicIntegerFieldUpdater只需要一个全局的静态变量。 Netty中的内存单位 Netty中将内存池分为五种不同的形态:Arena,ChunkList,Chunk,Page,SubPage. 首先来看Netty最大的内存单位PoolArena--连续的内存块。它是由多个PoolChunkList和两个SubPagePools(一个是tinySubPagePool,一个是smallSubPagePool)组成的。如下图所示: 1.PoolChunkList是一个双向的链表,PoolChunkList负责管理多个PoolChunk的生命周期。 2.PoolChunk中包含多个Page,Page的大小默认是8192字节,也可以设置系统变量io.netty.allocator.pageSize来改变页的大小。自定义页大小有如下限制:1.必须大于4096字节,2.必须是2的整次数幂。 3.块(PoolChunk)的大小是由页的大小和maxOrder算出来的,计算公式是: chunkSize = 2^{maxOrder} * pageSize。 maxOrder的默认值是11,也可以通过io.netty.allocator.maxOrder系统变量设置,只能是0-14的范围,所以chunksize的默认大小为:(2^11)*8192=16MB Page中包含多个SubPage。 PoolChunk内部维护了一个平衡二叉树,如下图所示: PoolSubPage 通常一个页(page)的大小就达到了10^13(8192字节),通常一次申请分配内存没有这么大,可能很小。 于是Netty将页(page)划分成更小的片段--SubPage Netty定义这样的内存单元是为了更好的分配内存,接下来看一下一个ByteBuf是如何在内存池中申请内存的。 Netty如何分配内存池中的内存? 分配原则: 内存池中的内存分配是在PoolArea中进行的。 申请小于PageSize(默认8192字节)的内存,会在SubPagePools中进行分配,如果申请内存小于512字节,则会在tingSubPagePools中进行分配,如果大于512小于PageSize字节,则会在smallSubPagePools进行分配。 申请大于PageSize的内存,则会在PoolChunkList中进行分配。 申请大于ChunkSize的内存,则不会在内存池中申请,而且也不会重用该内存。 应用中在内存池中申请内存的方法: // 在内存池中申请 直接内存 ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024); // 在内存池中申请 堆内存 ByteBuf heapByteBuf = ByteBufAllocator.DEFAULT.heapBuffer(1024); 接下来,一层一层的看下来,在Netty中申请内存是如何实现的。就拿申请直接内存举例,首先看directBuffer方法。 // directBuffer方法实现 @Override public ByteBuf directBuffer(int initialCapacity) { return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY); } // 校验申请大小,返回申请的直接内存 @Override public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); } //PooledByteBufAllocator类中的 newDirectBuffer方法的实现 @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { // Netty避免每个线程对内存池的竞争,在每个线程都提供了PoolThreadCache线程内的内存池 PoolThreadCache cache = threadCache.get(); PoolArena directArena = cache.directArena; // 如果缓存存在,则分配内存 final ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { // 缓存不存在,则分配非池内存 buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } // 通过toLeakAwareBuffer包装成内存泄漏检测的buffer return toLeakAwareBuffer(buf); } 一般情况下,内存都是在buf = directArena.allocate(cache, initialCapacity, maxCapacity)这行代码进行内存分配的,也就是说在内存的连续块PoolArena中进行的内存分配。 接下来,我们根据内存分配原则来进行内存研读PoolArena中的allocate方法。 1 PooledByteBuf allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { 2 PooledByteBuf buf = newByteBuf(maxCapacity); 3 allocate(cache, buf, reqCapacity); 4 return buf; 5 } 6 7 private void allocate(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity) { 8 final int normCapacity = normalizeCapacity(reqCapacity); 9 if (isTinyOrSmall(normCapacity)) { // capacity < pageSize10 int tableIdx;11 PoolSubpage[] table;12 boolean tiny = isTiny(normCapacity);13 if (tiny) { // < 51214 15 // 如果申请内存小于512字节,则会在tingSubPagePools中进行分配16 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {17 // was able to allocate out of the cache so move on18 return;19 }20 tableIdx = tinyIdx(normCapacity);21 table = tinySubpagePools;22 } else {23 // 如果大于512小于PageSize字节,则会在smallSubPagePools进行分配24 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {25 // was able to allocate out of the cache so move on26 return;27 }28 tableIdx = smallIdx(normCapacity);29 table = smallSubpagePools;30 }31 32 final PoolSubpage head = table[tableIdx];33 34 /** 35 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and 36 * {@link PoolChunk#free(long)} may modify the doubly linked list as well. 37 */38 synchronized (head) {39 final PoolSubpage s = head.next;40 if (s != head) {41 assert s.doNotDestroy && s.elemSize == normCapacity;42 long handle = s.allocate();43 assert handle >

= 0 allocateNormal 44 s.chunk.initBufWithSubpage (buf, handle, reqCapacity); 45 incTinySmallAllocation (tiny); 46 return;47} 48} 49 synchronized (this) {50 allocateNormal (buf, reqCapacity, normCapacity); 51} 52 53 incTinySmallAllocation (tiny) 54 return;55} 56 if (normCapacity

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report