前文传送门:bytebuf使用subpage级别内存分配
bytebuf回收
之前的章节我们提到过, 堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行bytebuf操作时, 使用完毕要对对象进行回收, 这一小节, 就以pooledunsafedirectbytebuf为例讲解有关内存分配的相关逻辑
pooledunsafedirectbytebuf中内存释放的入口方法是其父类abstractreferencecountedbytebuf中的release方法:
@override public boolean release() { return release0(1); }
这里调用了release0, 跟进去
private boolean release0(int decrement) { for (;;) { int refcnt = this.refcnt; if (refcnt < decrement) { throw new illegalreferencecountexception(refcnt, -decrement); } if (refcntupdater.compareandset(this, refcnt, refcnt - decrement)) { if (refcnt == decrement) { deallocate(); return true; } return false; } } }
if (refcnt == decrement) 中判断当前bytebuf是否没有被引用了, 如果没有被引用, 则通过deallocate()方法进行释放
因为我们是以pooledunsafedirectbytebuf为例, 所以这里会调用其父类pooledbytebuf的deallocate方法:
protected final void deallocate() { if (handle >= 0) { final long handle = this.handle; this.handle = -1; memory = null; chunk.arena.free(chunk, handle, maxlength, cache); recycle(); } }
this.handle = -1表示当前的bytebuf不再指向任何一块内存
memory = null这里将memory也设置为null
chunk.arena.free(chunk, handle, maxlength, cache)这一步是将bytebuf的内存进行释放
recycle()是将对象放入的对象回收站, 循环利用
我们首先分析free方法
void free(poolchunk<t> chunk, long handle, int normcapacity, poolthreadcache cache) { //是否为unpooled if (chunk.unpooled) { int size = chunk.chunksize(); destroychunk(chunk); activebyteshuge.add(-size); deallocationshuge.increment(); } else { //那种级别的size sizeclass sizeclass = sizeclass(normcapacity); //加到缓存里 if (cache != null && cache.add(this, chunk, handle, normcapacity, sizeclass)) { return; } //将缓存对象标记为未使用 freechunk(chunk, handle, sizeclass); } }
首先判断是不是unpooled, 我们这里是pooled, 所以会走到else块中:
sizeclass(normcapacity)计算是哪种级别的size, 我们按照tiny级别进行分析
cache.add(this, chunk, handle, normcapacity, sizeclass)是将当前当前bytebuf进行缓存
我们之前讲过, 再分配bytebuf时首先在缓存上分配, 而这步, 就是将其缓存的过程, 跟进去:
boolean add(poolarena<?> area, poolchunk chunk, long handle, int normcapacity, sizeclass sizeclass) { //拿到memoryregioncache节点 memoryregioncache<?> cache = cache(area, normcapacity, sizeclass); if (cache == null) { return false; } //将chunk, 和handle封装成实体加到queue里面 return cache.add(chunk, handle); }
首先根据根据类型拿到相关类型缓存节点, 这里会根据不同的内存规格去找不同的对象, 我们简单回顾一下, 每个缓存对象都包含一个queue, queue中每个节点是entry, 每一个entry中包含一个chunk和handle, 可以指向唯一的连续的内存
我们跟到cache中
private memoryregioncache<?> cache(poolarena<?> area, int normcapacity, sizeclass sizeclass) { switch (sizeclass) { case normal: return cachefornormal(area, normcapacity); case small: return cacheforsmall(area, normcapacity); case tiny: return cachefortiny(area, normcapacity); default: throw new error(); } }
假设我们是tiny类型, 这里就会走到cachefortiny(area, normcapacity)方法中, 跟进去:
private memoryregioncache<?> cachefortiny(poolarena<?> area, int normcapacity) { int idx = poolarena.tinyidx(normcapacity); if (area.isdirect()) { return cache(tinysubpagedirectcaches, idx); } return cache(tinysubpageheapcaches, idx); }
这个方法我们之前剖析过, 就是根据大小找到第几个缓存中的第几个缓存, 拿到下标之后, 通过cache去超相对应的缓存对象:
private static <t> memoryregioncache<t> cache(memoryregioncache<t>[] cache, int idx) { if (cache == null || idx > cache.length - 1) { return null; } return cache[idx]; }
我们这里看到, 是直接通过下标拿的缓存对象
回到add方法中
boolean add(poolarena<?> area, poolchunk chunk, long handle, int normcapacity, sizeclass sizeclass) { //拿到memoryregioncache节点 memoryregioncache<?> cache = cache(area, normcapacity, sizeclass); if (cache == null) { return false; } //将chunk, 和handle封装成实体加到queue里面 return cache.add(chunk, handle); }
这里的cache对象调用了一个add方法, 这个方法就是将chunk和handle封装成一个entry加到queue里面
我们跟到add方法中:
public final boolean add(poolchunk<t> chunk, long handle) { entry<t> entry = newentry(chunk, handle); boolean queued = queue.offer(entry); if (!queued) { entry.recycle(); } return queued; }
我们之前介绍过, 从在缓存中分配的时候从queue弹出一个entry, 会放到一个对象池里面, 而这里entry<t> entry = newentry(chunk, handle)就是从对象池里去取一个entry对象, 然后将chunk和handle进行赋值
然后通过queue.offer(entry)加到queue中
我们回到free方法中
void free(poolchunk<t> chunk, long handle, int normcapacity, poolthreadcache cache) { //是否为unpooled if (chunk.unpooled) { int size = chunk.chunksize(); destroychunk(chunk); activebyteshuge.add(-size); deallocationshuge.increment(); } else { //那种级别的size sizeclass sizeclass = sizeclass(normcapacity); //加到缓存里 if (cache != null && cache.add(this, chunk, handle, normcapacity, sizeclass)) { return; } freechunk(chunk, handle, sizeclass); } }
这里加到缓存之后, 如果成功, 就会return, 如果不成功, 就会调用freechunk(chunk, handle, sizeclass)方法, 这个方法的意义是, 将原先给bytebuf分配的内存区段标记为未使用
跟进freechunk简单分析下:
void freechunk(poolchunk<t> chunk, long handle, sizeclass sizeclass) { final boolean destroychunk; synchronized (this) { switch (sizeclass) { case normal: ++deallocationsnormal; break; case small: ++deallocationssmall; break; case tiny: ++deallocationstiny; break; default: throw new error(); } destroychunk = !chunk.parent.free(chunk, handle); } if (destroychunk) { destroychunk(chunk); } }
我们再跟到free方法中:
boolean free(poolchunk<t> chunk, long handle) { chunk.free(handle); if (chunk.usage() < minusage) { remove(chunk); return move0(chunk); } return true; }
chunk.free(handle)的意思是通过chunk释放一段连续的内存
再跟到free方法中:
void free(long handle) { int memorymapidx = memorymapidx(handle); int bitmapidx = bitmapidx(handle); if (bitmapidx != 0) { poolsubpage<t> subpage = subpages[subpageidx(memorymapidx)]; assert subpage != null && subpage.donotdestroy; poolsubpage<t> head = arena.findsubpagepoolhead(subpage.elemsize); synchronized (head) { if (subpage.free(head, bitmapidx & 0x3fffffff)) { return; } } } freebytes += runlength(memorymapidx); setvalue(memorymapidx, depth(memorymapidx)); updateparentsfree(memorymapidx); }
if (bitmapidx != 0)这 里判断是当前缓冲区分配的级别是page还是subpage, 如果是subpage, 则会找到相关的subpage将其位图标记为0
如果不是subpage, 这里通过分配内存的反向标记, 将该内存标记为未使用
这段逻辑可以读者自行分析, 如果之前分配相关的知识掌握扎实的话, 这里的逻辑也不是很难
回到pooledbytebuf的deallocate方法中:
protected final void deallocate() { if (handle >= 0) { final long handle = this.handle; this.handle = -1; memory = null; chunk.arena.free(chunk, handle, maxlength, cache); recycle(); } }
最后, 通过recycle()将释放的bytebuf放入对象回收站, 有关对象回收站的知识, 会在以后的章节进行剖析
以上就是内存回收的大概逻辑,更多关于netty分布式bytebuf使用回收的资料请关注其它相关文章!