Skip to content

Commit 35f6ad1

Browse files
authored
Prevent ManualEventLoop to block on a racy task submission (#15937)
Motivation: ManualEventLoop can be blocked forever if not aware of task submissions before getting asleep Modification: Expands the ManualIoEventLoop API to make it aware of additional non-blocking conditions and correctly enqueue an empty task while shutting down to prevent a racy sleep to happen. Additionally, fix the blocking deadline math computation taking in account negativeness and whilst no deadline is set: for the latter, change the default previous deadline value of EpollIoHandler into NONE, to save performing timed wait, if no deadline is specified Result: ManualIoEventLoop correctly make progress if there's work to perform and terminate, while shutdown Fixes #15922
1 parent 2b29b5e commit 35f6ad1

File tree

5 files changed

+336
-47
lines changed

5 files changed

+336
-47
lines changed

pom.xml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1361,6 +1361,28 @@
13611361
<new>interface org.jboss.marshalling.UnmarshallingObjectInputFilter</new>
13621362
<justification>We need to build with upgraded JBoss Marshalling version. Otherwise, testsuite-jpms won't compile.</justification>
13631363
</item>
1364+
<item>
1365+
<ignore>true</ignore>
1366+
<code>java.class.modifierChanged</code>
1367+
<old>final class io.netty.channel.ManualIoEventLoop</old>
1368+
<new>class io.netty.channel.ManualIoEventLoop</new>
1369+
<justification>ManualIoEventLoop was made non-final to allow subclassing.</justification>
1370+
</item>
1371+
<item>
1372+
<ignore>true</ignore>
1373+
<code>java.method.nowFinal</code>
1374+
<classQualifiedName>io.netty.channel.ManualIoEventLoop</classQualifiedName>
1375+
<justification>Several methods were intentionally made final; these API changes are acceptable for
1376+
this class.
1377+
</justification>
1378+
</item>
1379+
<item>
1380+
<ignore>true</ignore>
1381+
<code>java.method.finalMethodAddedToNonFinalClass</code>
1382+
<classQualifiedName>io.netty.channel.ManualIoEventLoop</classQualifiedName>
1383+
<justification>Final methods added to an inheritable class are intentional and safe in this context.
1384+
</justification>
1385+
</item>
13641386
</differences>
13651387
</revapi.differences>
13661388
</analysisConfiguration>

transport-classes-epoll/src/main/java/io/netty/channel/epoll/EpollIoHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class EpollIoHandler implements IoHandler {
6161
}
6262

6363
// Pick a number that no task could have previously used.
64-
private long prevDeadlineNanos = nanoTime() - 1;
64+
private long prevDeadlineNanos = NONE;
6565
private FileDescriptor epollFd;
6666
private FileDescriptor eventFd;
6767
private FileDescriptor timerFd;
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
* Copyright 2025 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.channel.epoll;
17+
18+
import io.netty.channel.IoEventLoop;
19+
import io.netty.channel.IoEventLoopGroup;
20+
import io.netty.channel.IoHandlerFactory;
21+
import io.netty.channel.ManualIoEventLoop;
22+
import io.netty.channel.MultiThreadIoEventLoopGroup;
23+
import io.netty.util.concurrent.Future;
24+
import org.junit.jupiter.api.RepeatedTest;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.Timeout;
27+
28+
import java.util.Queue;
29+
import java.util.concurrent.ConcurrentLinkedQueue;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.CyclicBarrier;
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.Executor;
34+
import java.util.concurrent.RejectedExecutionException;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicLong;
37+
import java.util.function.Consumer;
38+
39+
@Timeout(5)
40+
public class ManualEventLoopTest {
41+
42+
@Test
43+
void firstRacySubmissionMissWakeupEpoll() throws Exception {
44+
racySubmissionMissWakeup(EpollIoHandler.newFactory(), 1);
45+
}
46+
47+
@Test
48+
void secondRacySubmissionMissWakeupEpoll() throws Exception {
49+
racySubmissionMissWakeup(EpollIoHandler.newFactory(), 2);
50+
}
51+
52+
@Test
53+
void firstRacyOtherSubmissionMissWakeupEpoll() throws Exception {
54+
racyOtherSubmissionMissWakeup(EpollIoHandler.newFactory(), 1);
55+
}
56+
57+
@Test
58+
void secondRacyOtherSubmissionMissWakeupEpoll() throws Exception {
59+
racyOtherSubmissionMissWakeup(EpollIoHandler.newFactory(), 2);
60+
}
61+
62+
private void racySubmissionMissWakeup(IoHandlerFactory handlerFactory, long canBlockAttempt)
63+
throws Exception {
64+
CyclicBarrier waitBeforeSubmittingTask = new CyclicBarrier(2);
65+
CountDownLatch taskSubmitted = new CountDownLatch(1);
66+
AtomicLong canBlock = new AtomicLong(0);
67+
ManualMultithreadedIoEventLoopGroup group = new ManualMultithreadedIoEventLoopGroup(handlerFactory) {
68+
@Override
69+
protected void beforeCanBlock(Executor executor) {
70+
if (canBlock.incrementAndGet() == canBlockAttempt) {
71+
try {
72+
waitBeforeSubmittingTask.await();
73+
} catch (Throwable ignore) {
74+
//
75+
}
76+
try {
77+
taskSubmitted.await();
78+
} catch (InterruptedException e) {
79+
throw new RuntimeException(e);
80+
}
81+
}
82+
}
83+
};
84+
waitBeforeSubmittingTask.await();
85+
// depending on canBlockAttempt this submission could observe an AWAKE event loop
86+
// or with a setup NONE deadline (e.g. Long.MAX_VALUE).
87+
// In the latter case, it can be already asleep or ready to do it.
88+
Future<?> submitted = group.submit(() -> {
89+
});
90+
// unblock canBlock
91+
taskSubmitted.countDown();
92+
submitted.get();
93+
group.shutdownGracefully(0, 0, TimeUnit.SECONDS).get();
94+
}
95+
96+
private void racyOtherSubmissionMissWakeup(IoHandlerFactory handlerFactory, long canBlockAttempt)
97+
throws Exception {
98+
CyclicBarrier waitBeforeSubmittingTask = new CyclicBarrier(2);
99+
CountDownLatch taskSubmitted = new CountDownLatch(1);
100+
AtomicLong canBlock = new AtomicLong(0);
101+
ManualMultithreadedIoEventLoopGroup group = new ManualMultithreadedIoEventLoopGroup(handlerFactory) {
102+
@Override
103+
protected void beforeCanBlock(Executor executor) {
104+
// this should be called when canBlock is called after setting the wakeup flag!
105+
if (canBlock.incrementAndGet() == canBlockAttempt) {
106+
try {
107+
waitBeforeSubmittingTask.await();
108+
} catch (Throwable ignore) {
109+
//
110+
}
111+
try {
112+
taskSubmitted.await();
113+
} catch (InterruptedException e) {
114+
throw new RuntimeException(e);
115+
}
116+
}
117+
}
118+
};
119+
waitBeforeSubmittingTask.await();
120+
CountDownLatch completed = new CountDownLatch(1);
121+
// depending on canBlockAttempt this submission could observe an AWAKE event loop
122+
// or with a setup NONE deadline (e.g. Long.MAX_VALUE).
123+
// In the latter case, it can be already asleep or ready to do it.
124+
group.ioEventLoopRunner.execute(completed::countDown);
125+
taskSubmitted.countDown();
126+
completed.await();
127+
group.shutdownGracefully(0, 0, TimeUnit.SECONDS).get();
128+
}
129+
130+
@RepeatedTest(value = 100, failureThreshold = 1)
131+
void testTightShutodownEpoll() throws InterruptedException, ExecutionException {
132+
ManualMultithreadedIoEventLoopGroup group = new ManualMultithreadedIoEventLoopGroup(
133+
EpollIoHandler.newFactory());
134+
group.shutdownGracefully(0, 0, TimeUnit.SECONDS).get();
135+
}
136+
137+
public static class ManualMultithreadedIoEventLoopGroup extends MultiThreadIoEventLoopGroup {
138+
139+
private ManualIoEventLoopRunner ioEventLoopRunner;
140+
private Consumer<ManualIoEventLoopRunner> beforeCanBlock;
141+
142+
public ManualMultithreadedIoEventLoopGroup(IoHandlerFactory ioHandlerFactory) {
143+
super(1, ioHandlerFactory);
144+
}
145+
146+
@Override
147+
protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory, Object... args) {
148+
this.ioEventLoopRunner = new ManualIoEventLoopRunner(this, ioHandlerFactory,
149+
executor, this::beforeCanBlock);
150+
return ioEventLoopRunner.ioEventLoop;
151+
}
152+
153+
protected void beforeCanBlock(Executor executor) {
154+
}
155+
156+
private static class ManualIoEventLoopRunner implements Executor {
157+
158+
private final ManualIoEventLoop ioEventLoop;
159+
private final Queue<Runnable> otherTasks = new ConcurrentLinkedQueue<>();
160+
161+
ManualIoEventLoopRunner(IoEventLoopGroup parent, IoHandlerFactory factory,
162+
Executor executor, Consumer<Executor> beforeCanBlock) {
163+
this.ioEventLoop = new ManualIoEventLoop(parent, null, factory) {
164+
@Override
165+
protected boolean canBlock() {
166+
if (beforeCanBlock != null) {
167+
beforeCanBlock.accept(ManualIoEventLoopRunner.this);
168+
}
169+
return otherTasks.isEmpty();
170+
}
171+
};
172+
CountDownLatch started = new CountDownLatch(1);
173+
executor.execute(() -> {
174+
ioEventLoop.setOwningThread(Thread.currentThread());
175+
// it would force a first init
176+
ioEventLoop.runNow();
177+
started.countDown();
178+
mainLoop();
179+
});
180+
try {
181+
started.await();
182+
} catch (InterruptedException e) {
183+
Thread.currentThread().interrupt();
184+
throw new RuntimeException(e);
185+
}
186+
}
187+
188+
private void mainLoop() {
189+
while (!ioEventLoop.isShuttingDown()) {
190+
ioEventLoop.run(0);
191+
Runnable otherTask = otherTasks.poll();
192+
if (otherTask != null) {
193+
safeExecute(otherTask);
194+
}
195+
}
196+
while (!ioEventLoop.isTerminated() || !otherTasks.isEmpty()) {
197+
ioEventLoop.runNow();
198+
Runnable otherTask = otherTasks.poll();
199+
if (otherTask != null) {
200+
safeExecute(otherTask);
201+
}
202+
}
203+
}
204+
205+
private void safeExecute(Runnable task) {
206+
try {
207+
task.run();
208+
} catch (Throwable ignore) {
209+
//
210+
}
211+
}
212+
213+
public void execute(Runnable otherTask) {
214+
otherTasks.add(otherTask);
215+
if (ioEventLoop.isShutdown()) {
216+
if (otherTasks.remove(otherTask)) {
217+
throw new RejectedExecutionException("Event loop shut down");
218+
}
219+
}
220+
ioEventLoop.wakeup();
221+
}
222+
}
223+
}
224+
}

0 commit comments

Comments
 (0)