pubsub: fix race condition in streaming connection#2416
pubsub: fix race condition in streaming connection#2416pongad merged 8 commits intogoogleapis:masterfrom pongad:stream-race
Conversation
mdietz94
left a comment
There was a problem hiding this comment.
I would replace ReentrantLock with ReadWriteLock. I think it will simplify the concurrency model + we can allow almost all operations here to happen concurrently, except for recreating the stream so that we only call Ack on the new stream.
| private final AtomicLong channelReconnectBackoffMillis = | ||
| new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); | ||
|
|
||
| private final Lock lock = new ReentrantLock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
|
||
| lock.lock(); | ||
| try { | ||
| requestObserver.onError(Status.CANCELLED.asException()); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| StreamingSubscriberConnection.this.requestObserver = requestObserver; | ||
| requestObserver.disableAutoInboundFlowControl(); | ||
| thisRequestObserver = requestObserver; | ||
| lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| requestObserver.request(1); | ||
| lock.lock(); | ||
| try { | ||
| thisRequestObserver.request(1); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES); | ||
| for (StreamingPullRequest request : requests) { | ||
| requestObserver.onNext(request); | ||
| lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| StreamingPullRequest.newBuilder() | ||
| .setStreamAckDeadlineSeconds(newAckDeadlineSeconds) | ||
| .build()); | ||
| lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| requestObserver.request(1); | ||
| lock.lock(); | ||
| try { | ||
| thisRequestObserver.request(1); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES); | ||
| for (StreamingPullRequest request : requests) { | ||
| requestObserver.onNext(request); | ||
| lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| StreamingPullRequest.newBuilder() | ||
| .setStreamAckDeadlineSeconds(newAckDeadlineSeconds) | ||
| .build()); | ||
| lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
@mdietz94 PTAL |
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| thisRequestObserver.request(1); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
mdietz94
left a comment
There was a problem hiding this comment.
Still require a reader lock around request to avoid concurrent access with onNext/onError
|
Thinking about this more, I think I found more issues with streams. I'll follow up with you and David on email. |
|
@mdietz94 @davidcavazos Please let me know if this looks OK to you. The error handling is still obviously wrong. However, I think this is a big enough improvement that we can submit this as an intermediate state.
|
|
Did you mean @davidtorres ? |
|
Yes I did! Sorry for the spam! |
| // If errorFuture is done, the stream has either failed or hung up, | ||
| // and we don't need to request. | ||
| if (isAlive() && !errorFuture.isDone()) { | ||
| lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
…33.0 (#2416) * chore(deps): update dependency com.google.cloud:libraries-bom to v26.33.0 * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
* chore(main): release 1.139.3 * chore: generate libraries at Tue May 6 15:01:24 UTC 2025 --------- Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> Co-authored-by: cloud-java-bot <[email protected]>
* chore(main): release 1.139.3 * chore: generate libraries at Tue May 6 15:01:24 UTC 2025 --------- Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> Co-authored-by: cloud-java-bot <[email protected]>
No description provided.