Skip to content

Commit afafadd

Browse files
committed
[#5505] Enforce Recycler limit when recycling from different threads
Motivation: Currently, the recycler max capacity it's only enforced on the thread-local stack which is used when the recycling happens on the same thread that requested the object. When the recycling happens in a different thread, then the objects will be queued into a linked list (where each node holds N objects, default=16). These objects are then transfered into the stack when new objects are requested and the stack is empty. The problem is that the queue doesn't have a max capacity and that can lead to bad scenarios. Eg: - Allocate 1M object from recycler - Recycle all of them from different thread - Recycler WeakOrderQueue will contain 1M objects - Reference graph will be very long to traverse and GC timeseems to be negatively impacted - Size of the queue will never shrink after this Modifications: Add some shared counter which is used to manage capacity limits when recycle from different thread then the allocation thread. We modify the counter whenever we allocate a new Link to reduce the overhead of increment / decrement it. Result: More predictable number of objects mantained in the recycler pool.
1 parent 771cfae commit afafadd

2 files changed

Lines changed: 114 additions & 10 deletions

File tree

common/src/main/java/io/netty/util/Recycler.java

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
import java.util.WeakHashMap;
2929
import java.util.concurrent.atomic.AtomicInteger;
3030

31+
import static java.lang.Math.max;
32+
import static java.lang.Math.min;
33+
3134
/**
3235
* Light-weight object pool based on a thread-local stack.
3336
*
@@ -48,8 +51,10 @@ public void recycle(Object object) {
4851
private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
4952
// TODO: Some arbitrary large number - should adjust as we get more production experience.
5053
private static final int DEFAULT_INITIAL_MAX_CAPACITY = 262144;
54+
5155
private static final int DEFAULT_MAX_CAPACITY;
5256
private static final int INITIAL_CAPACITY;
57+
private static final int MAX_SHARED_CAPACITY_FACTOR;
5358
private static final int LINK_CAPACITY;
5459

5560
static {
@@ -60,30 +65,37 @@ public void recycle(Object object) {
6065
if (maxCapacity < 0) {
6166
maxCapacity = DEFAULT_INITIAL_MAX_CAPACITY;
6267
}
63-
6468
DEFAULT_MAX_CAPACITY = maxCapacity;
6569

70+
MAX_SHARED_CAPACITY_FACTOR = max(2,
71+
SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
72+
2));
73+
6674
LINK_CAPACITY = MathUtil.findNextPositivePowerOfTwo(
67-
Math.max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
75+
max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
6876

6977
if (logger.isDebugEnabled()) {
7078
if (DEFAULT_MAX_CAPACITY == 0) {
7179
logger.debug("-Dio.netty.recycler.maxCapacity: disabled");
80+
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
7281
logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
7382
} else {
7483
logger.debug("-Dio.netty.recycler.maxCapacity: {}", DEFAULT_MAX_CAPACITY);
84+
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
7585
logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
7686
}
7787
}
7888

79-
INITIAL_CAPACITY = Math.min(DEFAULT_MAX_CAPACITY, 256);
89+
INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY, 256);
8090
}
8191

8292
private final int maxCapacity;
93+
private final int maxSharedCapacityFactor;
94+
8395
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
8496
@Override
8597
protected Stack<T> initialValue() {
86-
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity);
98+
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor);
8799
}
88100
};
89101

@@ -92,7 +104,17 @@ protected Recycler() {
92104
}
93105

94106
protected Recycler(int maxCapacity) {
95-
this.maxCapacity = Math.max(0, maxCapacity);
107+
this(maxCapacity, MAX_SHARED_CAPACITY_FACTOR);
108+
}
109+
110+
protected Recycler(int maxCapacity, int maxSharedCapacityFactor) {
111+
if (maxCapacity <= 0) {
112+
this.maxCapacity = 0;
113+
this.maxSharedCapacityFactor = 1;
114+
} else {
115+
this.maxCapacity = maxCapacity;
116+
this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
117+
}
96118
}
97119

98120
@SuppressWarnings("unchecked")
@@ -201,6 +223,7 @@ private static final class Link extends AtomicInteger {
201223
private WeakOrderQueue next;
202224
private final WeakReference<Thread> owner;
203225
private final int id = ID_GENERATOR.getAndIncrement();
226+
private final Stack<?> stack;
204227

205228
WeakOrderQueue(Stack<?> stack, Thread thread) {
206229
head = tail = new Link();
@@ -209,6 +232,10 @@ private static final class Link extends AtomicInteger {
209232
next = stack.head;
210233
stack.head = this;
211234
}
235+
this.stack = stack;
236+
// We allocated a Link so reserve the space
237+
boolean reserved = stack.reserveSpace(LINK_CAPACITY);
238+
assert reserved;
212239
}
213240

214241
void add(DefaultHandle<?> handle) {
@@ -217,7 +244,13 @@ void add(DefaultHandle<?> handle) {
217244
Link tail = this.tail;
218245
int writeIndex;
219246
if ((writeIndex = tail.get()) == LINK_CAPACITY) {
247+
if (!stack.reserveSpace(LINK_CAPACITY)) {
248+
// Drop it.
249+
return;
250+
}
251+
// We allocate a Link so reserve the space
220252
this.tail = tail = tail.next = new Link();
253+
221254
writeIndex = tail.get();
222255
}
223256
tail.elements[writeIndex] = handle;
@@ -259,7 +292,7 @@ boolean transfer(Stack<?> dst) {
259292

260293
if (expectedCapacity > dst.elements.length) {
261294
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
262-
srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
295+
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
263296
}
264297

265298
if (srcStart != srcEnd) {
@@ -280,6 +313,9 @@ boolean transfer(Stack<?> dst) {
280313
dst.size = newDstSize;
281314

282315
if (srcEnd == LINK_CAPACITY && head.next != null) {
316+
// Add capacity back as the Link is GCed.
317+
stack.reclaimSpace(LINK_CAPACITY);
318+
283319
this.head = head.next;
284320
}
285321

@@ -303,15 +339,35 @@ static final class Stack<T> {
303339
private DefaultHandle<?>[] elements;
304340
private final int maxCapacity;
305341
private int size;
342+
private final AtomicInteger availableSharedCapacity;
306343

307344
private volatile WeakOrderQueue head;
308345
private WeakOrderQueue cursor, prev;
309346

310-
Stack(Recycler<T> parent, Thread thread, int maxCapacity) {
347+
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor) {
311348
this.parent = parent;
312349
this.thread = thread;
313350
this.maxCapacity = maxCapacity;
314-
elements = new DefaultHandle[Math.min(INITIAL_CAPACITY, maxCapacity)];
351+
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
352+
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
353+
}
354+
355+
boolean reserveSpace(int space) {
356+
assert space >= 0;
357+
for (;;) {
358+
int available = availableSharedCapacity.get();
359+
if (available < space) {
360+
return false;
361+
}
362+
if (availableSharedCapacity.compareAndSet(available, available - space)) {
363+
return true;
364+
}
365+
}
366+
}
367+
368+
void reclaimSpace(int space) {
369+
assert space >= 0;
370+
availableSharedCapacity.addAndGet(space);
315371
}
316372

317373
int increaseCapacity(int expectedCapacity) {
@@ -321,7 +377,7 @@ int increaseCapacity(int expectedCapacity) {
321377
newCapacity <<= 1;
322378
} while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
323379

324-
newCapacity = Math.min(newCapacity, maxCapacity);
380+
newCapacity = min(newCapacity, maxCapacity);
325381
if (newCapacity != elements.length) {
326382
elements = Arrays.copyOf(elements, newCapacity);
327383
}
@@ -421,7 +477,7 @@ void push(DefaultHandle<?> item) {
421477
return;
422478
}
423479
if (size == elements.length) {
424-
elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity));
480+
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
425481
}
426482

427483
elements[size] = item;

common/src/test/java/io/netty/util/RecyclerTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.junit.Test;
1919

2020
import java.util.Random;
21+
import java.util.concurrent.atomic.AtomicInteger;
2122

2223
import static org.hamcrest.CoreMatchers.*;
2324
import static org.junit.Assert.*;
@@ -199,6 +200,53 @@ public void run() {
199200
assertThat(recycler.threadLocalSize(), is(0));
200201
}
201202

203+
@Test
204+
public void testDiscardingExceedingElementsWithRecycleAtDifferentThread() throws Exception {
205+
final int maxCapacity = 32;
206+
final AtomicInteger instancesCount = new AtomicInteger(0);
207+
208+
final Recycler<HandledObject> recycler = new Recycler<HandledObject>(maxCapacity, 2) {
209+
@Override
210+
protected HandledObject newObject(Recycler.Handle<HandledObject> handle) {
211+
instancesCount.incrementAndGet();
212+
return new HandledObject(handle);
213+
}
214+
};
215+
216+
// Borrow 2 * maxCapacity objects.
217+
final HandledObject[] array = new HandledObject[maxCapacity * 2];
218+
for (int i = 0; i < array.length; i++) {
219+
array[i] = recycler.get();
220+
}
221+
222+
assertEquals(array.length, instancesCount.get());
223+
// Reset counter.
224+
instancesCount.set(0);
225+
226+
// Recycle from other thread.
227+
final Thread thread = new Thread() {
228+
@Override
229+
public void run() {
230+
for (HandledObject object: array) {
231+
object.recycle();
232+
}
233+
}
234+
};
235+
thread.start();
236+
thread.join();
237+
238+
assertEquals(0, instancesCount.get());
239+
240+
// Borrow 2 * maxCapacity objects. Half of them should come from
241+
// the recycler queue, the other half should be freshly allocated.
242+
for (int i = 0; i < array.length; i++) {
243+
recycler.get();
244+
}
245+
246+
// The implementation uses maxCapacity / 2 as limit per WeakOrderQueue
247+
assertEquals(array.length - maxCapacity / 2, instancesCount.get());
248+
}
249+
202250
static final class HandledObject {
203251
Recycler.Handle<HandledObject> handle;
204252

0 commit comments

Comments
 (0)