
DelayedClientTransport can trigger RejectedExecutionException #6283

@ejona86

Description


We've seen the following exception (b/142475326):

java.util.concurrent.RejectedExecutionException
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2086)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:848)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1394)
        at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
        at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:13)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closedInternal(ClientCallImpl.java:696)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closed(ClientCallImpl.java:646)
        at io.grpc.internal.DelayedStream$DelayedStreamListener$5.run(DelayedStream.java:475)
        at io.grpc.internal.DelayedStream$DelayedStreamListener.drainPendingCallbacks(DelayedStream.java:500)
        at io.grpc.internal.DelayedStream.drainPendingCalls(DelayedStream.java:168)
        at io.grpc.internal.DelayedStream.setStream(DelayedStream.java:132)
        at io.grpc.internal.DelayedClientTransport$PendingStream.createRealStream(DelayedClientTransport.java:358)
        at io.grpc.internal.DelayedClientTransport$5.run(DelayedClientTransport.java:4)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
        at java.lang.Thread.run(Thread.java:919)

The application was using our default executor. That executor is shared and reference-counted, so a rejection means that either we scheduled work after the channel terminated (and released its reference), or the reference counting is broken.

When the channel terminates, it guarantees that it is done scheduling work on the executor, but not that all the scheduled work has completed. The problem here appears to be that some of that already-scheduled work is scheduling more work on the same executor.
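A minimal sketch of that failure mode, using a plain JDK `ThreadPoolExecutor` rather than gRPC's shared executor (the class and method names are illustrative only). `shutdown()` lets already-running work finish, but any new submission from that work is rejected by the default `AbortPolicy`, which is the same exception type as in the trace above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownRejectionDemo {
  /**
   * Runs one task that, after the executor has been shut down, tries to
   * schedule a follow-up task. Returns true if that follow-up was rejected.
   */
  public static boolean followUpRejectedAfterShutdown() {
    // Executors.newFixedThreadPool returns a ThreadPoolExecutor with AbortPolicy.
    ExecutorService executor = Executors.newFixedThreadPool(1);
    CountDownLatch taskStarted = new CountDownLatch(1);
    CountDownLatch shutdownDone = new CountDownLatch(1);
    AtomicBoolean rejected = new AtomicBoolean(false);

    executor.execute(() -> {
      taskStarted.countDown();
      try {
        // Stand-in for work that is still running when the owner shuts down.
        shutdownDone.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      try {
        // Already-scheduled work schedules more work on the same executor.
        executor.execute(() -> { });
      } catch (RejectedExecutionException e) {
        rejected.set(true);
      }
    });

    try {
      taskStarted.await();
      // The owner is "done scheduling" and shuts the executor down, but the
      // task above has not completed yet.
      executor.shutdown();
      shutdownDone.countDown();
      executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return rejected.get();
  }

  public static void main(String[] args) {
    System.out.println("follow-up rejected: " + followUpRejectedAfterShutdown());
  }
}
```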

Specifically, this runnable in DelayedClientTransport.reprocess() can produce more runnables:

```java
executor.execute(new Runnable() {
  @Override
  public void run() {
    stream.createRealStream(transport);
  }
});
```

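But while that runnable is running, 'stream' has likely already been removed from 'pendingStreams'. If the channel is shut down at that point, the transport will terminate abruptly, yet no real transport "owns" the call yet. The entire channel can therefore terminate, and release the default executor, before the runnable completes. When the runnable then drains the stream's buffered callbacks (DelayedStream.setStream() calling drainPendingCalls() in the trace above), ClientCallImpl tries to execute the listener callback on the already-shut-down executor and gets the RejectedExecutionException.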
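The accounting gap can be modeled deterministically with a toy transport. This is a hypothetical, single-threaded sketch, not grpc-java code: `EarlyTerminationModel`, its fields, and `raceScenario()` are invented for illustration, and the "executor" is just a queue that is run by hand so the interleaving is fixed. It shows termination being reported while the hand-off runnable is still outstanding.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Hypothetical single-threaded model of the bookkeeping gap; not grpc-java code. */
public class EarlyTerminationModel {
  // Runnables handed to the "executor"; run by hand so the ordering is deterministic.
  final Queue<Runnable> executorQueue = new ArrayDeque<>();
  final Set<String> pendingStreams = new HashSet<>();
  boolean shutdown;
  boolean terminated;

  /** Modeled on reprocess(): the stream leaves pendingStreams before its runnable finishes. */
  void reprocess(String stream, Runnable createRealStream) {
    executorQueue.add(createRealStream);
    pendingStreams.remove(stream);
  }

  /** Modeled on shutdown(): terminate as soon as no pending streams remain. */
  void shutdown() {
    shutdown = true;
    if (pendingStreams.isEmpty()) {
      terminated = true;
    }
  }

  /** A stream becomes ready, then the channel shuts down before its runnable has run. */
  static EarlyTerminationModel raceScenario() {
    EarlyTerminationModel transport = new EarlyTerminationModel();
    transport.pendingStreams.add("call-1");
    transport.reprocess("call-1", () -> System.out.println("createRealStream() runs"));
    transport.shutdown();
    return transport;
  }

  public static void main(String[] args) {
    EarlyTerminationModel transport = raceScenario();
    // Termination is already reported although the hand-off is still queued.
    System.out.println("terminated=" + transport.terminated
        + ", outstanding runnables=" + transport.executorQueue.size());
    // Only now does the runnable execute; in the real bug, its callbacks would
    // be submitted to an executor that has already been released.
    transport.executorQueue.poll().run();
  }
}
```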

We could either split the draining of start() out into a separate method in DelayedStream, so that newStream() + start() can be called directly within reprocess(), or keep a counter of the streams that are still draining and delay transport termination until it reaches zero.
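A hypothetical sketch of the second option, assuming the transport tracks streams as simple counts. `DrainingAwareTermination`, `beginDrain()`, and `endDrain()` are illustrative names, not grpc-java API. The idea is that reprocess() would call `beginDrain()` when it hands a stream to the executor, and the runnable would call `endDrain()` in a `finally` block after createRealStream() returns. Termination is reported only when the transport is shut down and both counts are zero.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: delay termination until no stream is pending or draining. */
public class DrainingAwareTermination {
  private final Object lock = new Object();
  private final Runnable onTerminated;
  private int pendingStreams;   // streams still waiting for a real transport
  private int drainingStreams;  // streams handed to the executor, not yet drained
  private boolean shutdown;
  private boolean terminated;

  public DrainingAwareTermination(Runnable onTerminated) {
    this.onTerminated = onTerminated;
  }

  public void addPendingStream() {
    synchronized (lock) { pendingStreams++; }
  }

  /** Called when a pending stream is handed off (e.g. from reprocess()). */
  public void beginDrain() {
    synchronized (lock) {
      pendingStreams--;
      drainingStreams++;
    }
  }

  /** Called at the end of the hand-off runnable, after draining completes. */
  public void endDrain() {
    boolean fire;
    synchronized (lock) {
      drainingStreams--;
      fire = maybeTerminateLocked();
    }
    if (fire) onTerminated.run();  // callback runs outside the lock
  }

  public void shutdown() {
    boolean fire;
    synchronized (lock) {
      shutdown = true;
      fire = maybeTerminateLocked();
    }
    if (fire) onTerminated.run();
  }

  public boolean isTerminated() {
    synchronized (lock) { return terminated; }
  }

  private boolean maybeTerminateLocked() {
    if (shutdown && !terminated && pendingStreams == 0 && drainingStreams == 0) {
      terminated = true;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    DrainingAwareTermination t = new DrainingAwareTermination(() -> events.add("terminated"));
    t.addPendingStream();
    t.beginDrain();  // reprocess() hands the stream to the executor
    t.shutdown();    // channel shuts down mid-drain
    events.add("after shutdown, terminated=" + t.isTerminated());
    t.endDrain();    // runnable finishes draining; termination fires now
    System.out.println(events);
  }
}
```

Invoking the termination callback outside the lock avoids running listener code while holding transport state, which is a common pattern for this kind of bookkeeping.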
