-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Closed
Labels
type/bugA general bugA general bug
Milestone
Description
It seems that Sinks.many().multicast() keeps subscribers even after cancelling subscription if subscription.dispose() executes concurrently.
We had this memory leak issue #3001 so we upgraded to 3.4.17 and it went much better. But there is still a memory leak in another emitter, SinkManySerialized. Here is a test which reproduces the issue:
@ParameterizedTest
@ValueSource(ints = {1,2,3,4,5})
public void testConcurrentUnsubscribe(int threadCount) {
int subsCount = 10;
Many<Object> emitter = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
List<Disposable> subscriptions = IntStream.range(0, subsCount)
.mapToObj(i -> emitter.asFlux().subscribe())
.toList();
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
List<? extends Future<?>> disposeFutures = subscriptions.stream()
.map(subscription -> executor.submit(subscription::dispose))
.toList();
List<?> disposals = disposeFutures.stream().map(this::getQuietly).toList();
Assertions.assertEquals(subsCount, disposals.size());
Assertions.assertEquals(0, emitter.currentSubscriberCount());
}
private <T> T getQuietly(Future<T> it) {
try {
return it.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}The test passes with a single thread executor and it is more likely to fail with 2-5 threads. But it is expected that there is no subscribers even if subscription.dispose() executes concurrently.
Workaround
As a temporary solution we had to do all the subscription.dispose() calls in a single thread executor.
Environment
- Reactor version: 3.4.17
- Spring-webflux, netty
- JVM corretto 17
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
type/bugA general bugA general bug