-
Notifications
You must be signed in to change notification settings - Fork 69
fix: ensure that channel pool ref count never goes negative (take2) #2065
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -68,7 +68,7 @@ | |
| * <p>Package-private for internal use. | ||
| */ | ||
| class ChannelPool extends ManagedChannel { | ||
| private static final Logger LOG = Logger.getLogger(ChannelPool.class.getName()); | ||
| @VisibleForTesting static final Logger LOG = Logger.getLogger(ChannelPool.class.getName()); | ||
| private static final Duration REFRESH_PERIOD = Duration.ofMinutes(50); | ||
|
|
||
| private final ChannelPoolSettings settings; | ||
|
|
@@ -381,14 +381,14 @@ void refresh() { | |
| * Get and retain a Channel Entry. The returned Entry will have its rpc count incremented, | ||
| * preventing it from getting recycled. | ||
| */ | ||
| Entry getRetainedEntry(int affinity) { | ||
| RetainedEntry getRetainedEntry(int affinity) { | ||
| // The maximum number of concurrent calls to this method for any given time span is at most 2, | ||
| // so the loop can actually be 2 times. But going for 5 times for a safety margin for potential | ||
| // code evolving | ||
| for (int i = 0; i < 5; i++) { | ||
| Entry entry = getEntry(affinity); | ||
| if (entry.retain()) { | ||
| return entry; | ||
| return new RetainedEntry(entry); | ||
| } | ||
| } | ||
| // It is unlikely to reach here unless the pool code evolves to increase the maximum possible | ||
|
|
@@ -415,10 +415,37 @@ private Entry getEntry(int affinity) { | |
| return localEntries.get(index); | ||
| } | ||
|
|
||
| /** | ||
| * This represents the reserved refcount of a single RPC using a channel. It the responsibility of | ||
| * that RPC to call release exactly once when it completes to release the Channel. | ||
| */ | ||
| private static class RetainedEntry { | ||
| private final Entry entry; | ||
| private final AtomicBoolean wasReleased; | ||
|
|
||
| public RetainedEntry(Entry entry) { | ||
| this.entry = entry; | ||
| wasReleased = new AtomicBoolean(false); | ||
| } | ||
|
|
||
| void release() { | ||
| if (!wasReleased.compareAndSet(false, true)) { | ||
| Exception e = new IllegalStateException("Entry was already released"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is another stacktrace we don't want to log, can we just log the WARNING without a stacktrace?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We definitely need a stacktrace here, it will tell us how the refcount got negative. Otherwise we are no wiser than before.....we are just hiding the problem under the rug. And in this case alert firing woud be correct
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To me
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because throwing will bubble up to the enduser, who had nothing to do with the error. Instead I want to keep it localized to the double release and protect the rest of the application |
||
| LOG.log(Level.WARNING, e.getMessage(), e); | ||
| return; | ||
| } | ||
| entry.release(); | ||
igorbernstein2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| public Channel getChannel() { | ||
| return entry.channel; | ||
| } | ||
| } | ||
|
|
||
| /** Bundles a gRPC {@link ManagedChannel} with some usage accounting. */ | ||
| private static class Entry { | ||
| static class Entry { | ||
| private final ManagedChannel channel; | ||
| private final AtomicInteger outstandingRpcs = new AtomicInteger(0); | ||
| final AtomicInteger outstandingRpcs = new AtomicInteger(0); | ||
| private final AtomicInteger maxOutstanding = new AtomicInteger(); | ||
|
|
||
| // Flag that the channel should be closed once all of the outstanding RPC complete. | ||
|
|
@@ -511,18 +538,19 @@ public String authority() { | |
| public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall( | ||
| MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) { | ||
|
|
||
| Entry entry = getRetainedEntry(affinity); | ||
| RetainedEntry entry = getRetainedEntry(affinity); | ||
|
|
||
| return new ReleasingClientCall<>(entry.channel.newCall(methodDescriptor, callOptions), entry); | ||
| return new ReleasingClientCall<>( | ||
| entry.getChannel().newCall(methodDescriptor, callOptions), entry); | ||
| } | ||
| } | ||
|
|
||
| /** ClientCall wrapper that makes sure to decrement the outstanding RPC count on completion. */ | ||
| static class ReleasingClientCall<ReqT, RespT> extends SimpleForwardingClientCall<ReqT, RespT> { | ||
| @Nullable private CancellationException cancellationException; | ||
| final Entry entry; | ||
| final RetainedEntry entry; | ||
|
|
||
| public ReleasingClientCall(ClientCall<ReqT, RespT> delegate, Entry entry) { | ||
| public ReleasingClientCall(ClientCall<ReqT, RespT> delegate, RetainedEntry entry) { | ||
| super(delegate); | ||
| this.entry = entry; | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.