check pending stream completion at delayed transport lifecycle#7720
check pending stream completion at delayed transport lifecycle#7720YifeiZhuang merged 11 commits intogrpc:masterfrom
Conversation
| private Collection<PendingStream> pendingStreams = new LinkedHashSet<>(); | ||
| @GuardedBy("lock") | ||
| private Collection<PendingStream> toCheckCompletionStreams = new LinkedHashSet<>(); | ||
| private Runnable pollForStreamTransferCompletion = new Runnable() { |
| } | ||
|
|
||
| protected boolean isStreamTransferCompleted() { | ||
| return realStreamStarted.getCount() == 0; |
There was a problem hiding this comment.
Note that getCount() is typically used for debugging and testing purposes. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html#getCount()
Better avoid it in the main source if possible.
| } | ||
| pendingStreams.removeAll(toRemove); | ||
| pendingStreams.removeAll(newlyCreated); | ||
| toCheckCompletionStreams.addAll(newlyCreated); |
There was a problem hiding this comment.
I would prefer making toCheckCompletionStreams be either 'non-final and immutable' or 'final and mutable', but not 'non-final' and 'mutable'.
| @VisibleForTesting | ||
| final int getUncommittedStreamsCount() { | ||
| synchronized (lock) { | ||
| return toCheckCompletionStreams.size(); |
There was a problem hiding this comment.
What about using the same name for toCheckCompletionStreams and uncommittedStreams?
| public void run() { | ||
| ArrayList<PendingStream> savedToCheckCompletionStreams; | ||
| synchronized (lock) { | ||
| savedToCheckCompletionStreams = new ArrayList<>(toCheckCompletionStreams); |
There was a problem hiding this comment.
You don't need copy the list, just saving the reference should be sufficient.
| lastPicker = picker; | ||
| lastPickerVersion++; | ||
| if (picker == null || !hasPendingStreams()) { | ||
| if ((picker == null || !hasPendingStreams()) && !hasUncommittedStreams()) { |
There was a problem hiding this comment.
I think changing this line is not necessary. Regardless of hasUncommittedStreams, if return then stream.createRealStream() or drain() will not be called and will not cause trouble.
There was a problem hiding this comment.
This line is because it needs to take care another case: when shutdown() is called but there are still uncommittedStreams, the shutdown path would return, then it will never shutdown. It relied on reprocess() to trigger the termination. (Similar to cancel() takes care of the last item and then trigger termination callback). This is different in shudownNow() which would take care of waiting uncommitted streams and then finalize termination.
There was a problem hiding this comment.
hen shutdown() is called but there are still uncommittedStreams, the shutdown path would return, then it will never shutdown. It relied on reprocess() to trigger the termination.
uncommittedStreams can be considered as existing RPCs, so shutdown() should not terminate them. uncommittedStreams will complete transfer by themselves, and they don't rely on a second reprocess().
There was a problem hiding this comment.
shutdown() never terminates then existing RPCs. Do you mean we await() during shutdown()?
It seems it's hard to avoid that. Say, if there are both pendingStreams and uncommittedStreams when shutdown is called, so shutdown has to return. Then during next reprocess(), the newly created stream has not been drained , which would cause pendingStreams empty but uncommittedstream still has items in it, and we need a way to drain it. It looks reprocess() is that place, it would trigger in the next call or after idle timer.
There was a problem hiding this comment.
I think we should await() during shutdown(), because there is no "next" reprocess() after shutdown, so we can not rely on reprocess() for existing uncommittedStreams.
Another way is introducing an abstract method DelayedStream.onTransferComplete(), and implementing DelayedClientTransport.PendingStream.onTransferComplete() to managed decrement of `uncommittedStreams.
There was a problem hiding this comment.
Thanks. Indeed it appears that idle timeout is permanently cancelled immediately after shutdown is called so there is no next reprocess(). It looks like just awaitTransferCompletion() during shutdown is not enough, moreover, I believe generally we are not supposed to await() during shutdown.
| * Provides the place to define actions at the point when transfer is done. | ||
| * Call this method to trigger those transfer completion activities. No-op by default. | ||
| */ | ||
| public void onTransferComplete() { |
| } | ||
| } | ||
| boolean justRemovedAnElement = pendingStreams.remove(this); | ||
| if (!hasPendingStreams() && justRemovedAnElement && reportTransportTerminated != null) { |
There was a problem hiding this comment.
No need to check reportTransportTerminated != null after the change. Previously if reportTransportTerminated == null, pendingStream is guaranteed reset to empty and justRemovedAnElement will never happen.
There was a problem hiding this comment.
I was thinking that in time order: 1. super.cancel() 2.onTransferComplete(), reportTransportTerminated=null 3. this lock block might reportNotInUse, which won't happen previously.
There was a problem hiding this comment.
I still think the check for reportTransportTerminated != null is unnecessary and the (!hasPendingStreams() && justRemovedAnElement) is the canonical invariant for reportTransportNotInUse. But ether way seems working anyway.
…#7720) add onTransferComplete() at delayedStream and wait for all pending streams to complete transfer when shutting down delayedClientTransport
To fix #6283
Problem
The issue was that delayedClientTransport
reprocess()schedules workcreateRealStream()but does not track whether it is committed, i.e. the real stream done set to the delayed stream which completes the delegation. Because it does not track the completion, shutdown prematurely goes through and then executor was destroyed.The inflight work that has been scheduled happen to use the same executor, causing
RejectedExecutionExceptionReproduce
The problem can be reproduced by putting a pause at
createRealStream(), and then run a client call.grpc-java/core/src/main/java/io/grpc/internal/DelayedStream.java
Line 140 in ddaf1c8