expandDeep uses unbounded queue internally which results in OutOfMemoryException when subscriber is not fast enough to keep up with publisher
Expected Behavior
not sure
Actual Behavior
OutOfMemoryException thrown
Steps to Reproduce
@Test
void reproCase() {
val genBytes = { Array(1024 * 1024 * 100) { 0 } }
val i = AtomicInteger(0)
Flux. just(genBytes()).expandDeep({
if (i.incrementAndGet() > 100)
Flux.empty()
else
Flux.just(genBytes())
}, 1)
.delayElements(Duration.ofMinutes(5))
.subscribe {
println("byte array processed")
}
}
Possible Solution
add variety of expand and expandDeep methods with bounded queue in order to limit memory usage
Your Environment
- Reactor version(s) used: 3.4.27
- Other relevant libraries versions (eg.
netty, ...):
- JVM version (
java -version): 11
- OS and version (eg
uname -a): Windows 10
expandDeep uses unbounded queue internally which results in OutOfMemoryException when subscriber is not fast enough to keep up with publisher
Expected Behavior
not sure
Actual Behavior
OutOfMemoryException thrown
Steps to Reproduce
@Test void reproCase() { val genBytes = { Array(1024 * 1024 * 100) { 0 } } val i = AtomicInteger(0) Flux. just(genBytes()).expandDeep({ if (i.incrementAndGet() > 100) Flux.empty() else Flux.just(genBytes()) }, 1) .delayElements(Duration.ofMinutes(5)) .subscribe { println("byte array processed") } }Possible Solution
add variety of expand and expandDeep methods with bounded queue in order to limit memory usage
Your Environment
netty, ...):java -version): 11uname -a): Windows 10