Skip to content

Commit 8ef7001

Browse files
committed
[grid] Making EventBus more stable by ignoring events
Before, the EventBus was pulling and processing all events, no matter if the component was meant to listen to them or not. This caused issues when some messages were sent and not followed the expected structure. With this change, the component tells the bus to only process events it listens to. Helps with #10485
1 parent 7bbf24b commit 8ef7001

3 files changed

Lines changed: 33 additions & 27 deletions

File tree

java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,8 @@
1717

1818
package org.openqa.selenium.events.zeromq;
1919

20-
import static java.nio.charset.StandardCharsets.UTF_8;
21-
2220
import com.google.common.collect.EvictingQueue;
2321

24-
import dev.failsafe.Failsafe;
25-
import dev.failsafe.RetryPolicy;
26-
2722
import org.openqa.selenium.events.Event;
2823
import org.openqa.selenium.events.EventBus;
2924
import org.openqa.selenium.events.EventListener;
@@ -37,6 +32,9 @@
3732
import org.zeromq.ZContext;
3833
import org.zeromq.ZMQ;
3934

35+
import dev.failsafe.Failsafe;
36+
import dev.failsafe.RetryPolicy;
37+
4038
import java.net.Inet6Address;
4139
import java.net.InetAddress;
4240
import java.net.URI;
@@ -58,6 +56,8 @@
5856
import java.util.logging.Level;
5957
import java.util.logging.Logger;
6058

59+
import static java.nio.charset.StandardCharsets.UTF_8;
60+
6161
class UnboundZmqEventBus implements EventBus {
6262

6363
static final EventName REJECTED_EVENT = new EventName("selenium-rejected-event");
@@ -229,36 +229,41 @@ public void run() {
229229
ZMQ.Socket socket = poller.getSocket(i);
230230

231231
EventName eventName = new EventName(new String(socket.recv(), UTF_8));
232+
// Processing only events we are listening to
233+
if (!listeners.containsKey(eventName)) {
234+
// LOG.log(Level.SEVERE, "Ignoring {0}", eventName);
235+
continue;
236+
}
232237

233238
Secret eventSecret;
234239
String receivedEventSecret = new String(socket.recv(), UTF_8);
235240
try {
236241
eventSecret = JSON.toType(receivedEventSecret, Secret.class);
237-
} catch (JsonException e) {
242+
} catch (JsonException ignore) {
238243
rejectEvent(
239244
eventName,
240245
receivedEventSecret,
241-
"Could not parse event secret, rejecting event. " + e.getMessage());
242-
return;
246+
"Could not parse event secret, rejecting event.");
247+
continue;
243248
}
244249

245250
UUID id;
246251
String eventId = new String(socket.recv(), UTF_8);
247252
try {
248253
id = UUID.fromString(eventId);
249-
} catch (IllegalArgumentException e) {
254+
} catch (IllegalArgumentException ignore) {
250255
rejectEvent(
251256
eventName,
252-
receivedEventSecret,
253-
"Could not parse event id, rejecting event. " + e.getMessage());
254-
return;
257+
eventId,
258+
"Could not parse event id, rejecting event.");
259+
continue;
255260
}
256261

257262
String data = new String(socket.recv(), UTF_8);
258263

259264
// Don't bother doing more work if we've seen this message.
260265
if (recentMessages.contains(id)) {
261-
return;
266+
continue;
262267
}
263268

264269
Object converted = JSON.toType(data, Object.class);
@@ -268,7 +273,7 @@ public void run() {
268273

269274
if (!Secret.matches(secret, eventSecret)) {
270275
rejectEvent(eventName, data, "Rejecting message without a valid secret");
271-
return;
276+
continue;
272277
}
273278

274279
notifyListeners(eventName, event);

java/src/org/openqa/selenium/grid/node/local/LocalNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ private LocalNode(
214214
r -> {
215215
Thread thread = new Thread(r);
216216
thread.setDaemon(true);
217-
thread.setName("TempFile Cleanup Node " + externalUri);
217+
thread.setName("HeartBeat Node " + externalUri);
218218
return thread;
219219
});
220220
heartbeatNodeService.scheduleAtFixedRate(

java/src/org/openqa/selenium/netty/server/RequestConverter.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,6 @@
1717

1818
package org.openqa.selenium.netty.server;
1919

20-
import static io.netty.handler.codec.http.HttpMethod.DELETE;
21-
import static io.netty.handler.codec.http.HttpMethod.GET;
22-
import static io.netty.handler.codec.http.HttpMethod.HEAD;
23-
import static io.netty.handler.codec.http.HttpMethod.OPTIONS;
24-
import static io.netty.handler.codec.http.HttpMethod.POST;
25-
import static org.openqa.selenium.remote.http.Contents.memoize;
26-
2720
import com.google.common.io.ByteStreams;
2821

2922
import org.openqa.selenium.internal.Debug;
@@ -57,19 +50,26 @@
5750
import java.util.concurrent.Executors;
5851
import java.util.logging.Logger;
5952

53+
import static io.netty.handler.codec.http.HttpMethod.DELETE;
54+
import static io.netty.handler.codec.http.HttpMethod.GET;
55+
import static io.netty.handler.codec.http.HttpMethod.HEAD;
56+
import static io.netty.handler.codec.http.HttpMethod.OPTIONS;
57+
import static io.netty.handler.codec.http.HttpMethod.POST;
58+
import static org.openqa.selenium.remote.http.Contents.memoize;
59+
6060
class RequestConverter extends SimpleChannelInboundHandler<HttpObject> {
6161

6262
private static final Logger LOG = Logger.getLogger(RequestConverter.class.getName());
6363
private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
64-
private volatile PipedOutputStream out;
6564
private static final List<io.netty.handler.codec.http.HttpMethod> supportedMethods =
6665
Arrays.asList(DELETE, GET, POST, OPTIONS);
66+
private volatile PipedOutputStream out;
6767

6868
@Override
6969
protected void channelRead0(
7070
ChannelHandlerContext ctx,
7171
HttpObject msg) throws Exception {
72-
LOG.log(Debug.getDebugLogLevel(), "Incoming message: " + msg);
72+
LOG.fine("Incoming message: " + msg);
7373

7474
if (msg instanceof io.netty.handler.codec.http.HttpRequest) {
7575
LOG.log(Debug.getDebugLogLevel(), "Start of http request: " + msg);
@@ -121,7 +121,7 @@ protected void channelRead0(
121121
}
122122

123123
if (msg instanceof LastHttpContent) {
124-
LOG.log(Debug.getDebugLogLevel(), "Closing input pipe.");
124+
LOG.fine("Closing input pipe.");
125125
EXECUTOR.submit(() -> {
126126
try {
127127
out.close();
@@ -169,10 +169,11 @@ private HttpRequest createRequest(
169169
.filter(entry -> entry.getKey() != null)
170170
.forEach(entry -> req.addHeader(entry.getKey(), entry.getValue()));
171171
return req;
172-
} catch (Exception e) {
172+
} catch (Exception ignore) {
173173
ctx.writeAndFlush(
174174
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
175-
LOG.log(Debug.getDebugLogLevel(), "Not possible to decode parameters.", e);
175+
LOG.log(Debug.getDebugLogLevel(), "Not possible to decode parameters. {0}",
176+
nettyRequest.uri());
176177
return null;
177178
}
178179
}

0 commit comments

Comments
 (0)