Skip to content

Commit f808056

Browse files
feat: add explicit duplicate subscriber handling to SubscriberImpl for beam (#1088)
Beam needs a way to know that a <subscription, partition> has been relocated to another worker, and provides no in-model way to do this. We can reuse the duplicate connection error to detect occurrences of this and respect them.
1 parent 032fcd4 commit f808056

7 files changed

Lines changed: 121 additions & 37 deletions

File tree

google-cloud-pubsublite/clirr-ignored-differences.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,9 @@
66
<className>com/google/cloud/pubsublite/internal/**</className>
77
<method>*</method>
88
</difference>
9+
<difference>
10+
<differenceType>7013</differenceType>
11+
<className>com/google/cloud/pubsublite/internal/**</className>
12+
<method>*</method>
13+
</difference>
914
</differences>

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ExtractStatus.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
import com.google.api.gax.rpc.StatusCode.Code;
2323
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
2424
import com.google.common.util.concurrent.MoreExecutors;
25+
import com.google.protobuf.Any;
26+
import com.google.rpc.ErrorInfo;
27+
import com.google.rpc.Status;
28+
import io.grpc.protobuf.StatusProto;
2529
import java.util.Optional;
2630
import java.util.concurrent.ExecutionException;
2731
import java.util.function.BiConsumer;
@@ -49,6 +53,25 @@ public static CheckedApiException toCanonical(Throwable t) {
4953
return new CheckedApiException(t, Code.INTERNAL);
5054
}
5155

56+
public static String getErrorInfoReason(CheckedApiException error) {
57+
Status status = StatusProto.fromThrowable(error.underlying);
58+
if (status == null) {
59+
return "";
60+
}
61+
for (Any any : status.getDetailsList()) {
62+
if (any.is(ErrorInfo.class)) {
63+
try {
64+
ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
65+
if ("pubsublite.googleapis.com".equals(errorInfo.getDomain())) {
66+
return errorInfo.getReason();
67+
}
68+
} catch (Throwable unused) {
69+
}
70+
}
71+
}
72+
return "";
73+
}
74+
5275
public static <T> ApiFuture<T> toClientFuture(ApiFuture<T> source) {
5376
return ApiFutures.catchingAsync(
5477
source,

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ResetSignal.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,18 @@
1818

1919
import com.google.cloud.pubsublite.ErrorCodes;
2020
import com.google.cloud.pubsublite.internal.CheckedApiException;
21-
import com.google.protobuf.Any;
22-
import com.google.rpc.ErrorInfo;
23-
import com.google.rpc.Status;
24-
import io.grpc.protobuf.StatusProto;
21+
import com.google.cloud.pubsublite.internal.ExtractStatus;
2522

2623
// Pub/Sub Lite's stream RESET signal is sent by the server to instruct the client to reset the
2724
// stream state.
2825
public final class ResetSignal {
2926
private static final String REASON = "RESET";
30-
private static final String DOMAIN = "pubsublite.googleapis.com";
3127

3228
public static boolean isResetSignal(CheckedApiException checkedApiException) {
3329
if (!ErrorCodes.IsRetryableForStreams(checkedApiException.code())) {
3430
return false;
3531
}
36-
Status status = StatusProto.fromThrowable(checkedApiException.underlying);
37-
if (status == null) {
38-
return false;
39-
}
40-
for (Any any : status.getDetailsList()) {
41-
if (any.is(ErrorInfo.class)) {
42-
try {
43-
ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
44-
if (REASON.equals(errorInfo.getReason()) && DOMAIN.equals(errorInfo.getDomain())) {
45-
return true;
46-
}
47-
} catch (Throwable t) {
48-
}
49-
}
50-
}
51-
return false;
32+
return ExtractStatus.getErrorInfoReason(checkedApiException).equals(REASON);
5233
}
5334

5435
private ResetSignal() {}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ protected void doStop() {
118118
currentConnection.close();
119119
}
120120
} catch (Throwable t) {
121-
logger.atWarning().withCause(t).log(
121+
logger.atInfo().withCause(t).log(
122122
"Failed while terminating connection for %s", streamDescription());
123123
notifyFailed(t);
124124
return;
@@ -215,7 +215,7 @@ private void triggerReinitialize(CheckedApiException streamError) {
215215
try {
216216
observer.triggerReinitialize(streamError);
217217
} catch (Throwable t) {
218-
logger.atWarning().withCause(t).log("Error occurred in triggerReinitialize.");
218+
logger.atInfo().withCause(t).log("Error occurred in triggerReinitialize.");
219219
onError(t);
220220
}
221221
});

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberBuilder.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,13 @@ public abstract class SubscriberBuilder {
4343
// Optional parameters.
4444
abstract SubscriberResetHandler resetHandler();
4545

46+
// Whether to retry races when streams are created by other clients.
47+
abstract boolean retryStreamRaces();
48+
4649
public static Builder newBuilder() {
4750
return new AutoValue_SubscriberBuilder.Builder()
48-
.setResetHandler(SubscriberResetHandler::unhandled);
51+
.setResetHandler(SubscriberResetHandler::unhandled)
52+
.setRetryStreamRaces(true);
4953
}
5054

5155
@AutoValue.Builder
@@ -64,6 +68,9 @@ public abstract static class Builder {
6468
// Optional parameters.
6569
public abstract Builder setResetHandler(SubscriberResetHandler resetHandler);
6670

71+
// Whether to retry races when streams are created by other clients.
72+
public abstract Builder setRetryStreamRaces(boolean retryStreamRaces);
73+
6774
abstract SubscriberBuilder autoBuild();
6875

6976
@SuppressWarnings("CheckReturnValue")
@@ -80,7 +87,8 @@ public Subscriber build() throws ApiException {
8087
initialSubscribeRequest,
8188
autoBuilt.initialLocation(),
8289
autoBuilt.messageConsumer(),
83-
autoBuilt.resetHandler());
90+
autoBuilt.resetHandler(),
91+
autoBuilt.retryStreamRaces());
8492
}
8593
}
8694
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
package com.google.cloud.pubsublite.internal.wire;
1818

1919
import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkArgument;
20+
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
2021

2122
import com.google.api.gax.rpc.ApiException;
2223
import com.google.cloud.pubsublite.SequencedMessage;
2324
import com.google.cloud.pubsublite.internal.AlarmFactory;
2425
import com.google.cloud.pubsublite.internal.CheckedApiException;
2526
import com.google.cloud.pubsublite.internal.CloseableMonitor;
27+
import com.google.cloud.pubsublite.internal.ExtractStatus;
2628
import com.google.cloud.pubsublite.internal.ProxyService;
2729
import com.google.cloud.pubsublite.internal.SerialExecutor;
2830
import com.google.cloud.pubsublite.internal.wire.StreamFactories.SubscribeStreamFactory;
@@ -32,6 +34,7 @@
3234
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
3335
import com.google.cloud.pubsublite.proto.SubscribeRequest;
3436
import com.google.common.annotations.VisibleForTesting;
37+
import com.google.common.flogger.GoogleLogger;
3538
import java.time.Duration;
3639
import java.util.List;
3740
import java.util.Optional;
@@ -41,6 +44,8 @@
4144

4245
public class SubscriberImpl extends ProxyService
4346
implements Subscriber, RetryingConnectionObserver<List<SequencedMessage>> {
47+
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
48+
4449
private static final Duration FLOW_REQUESTS_FLUSH_INTERVAL = Duration.ofMillis(100);
4550

4651
private final AlarmFactory alarmFactory;
@@ -51,6 +56,9 @@ public class SubscriberImpl extends ProxyService
5156

5257
private final InitialSubscribeRequest baseInitialRequest;
5358

59+
// Whether to retry `DUPLICATE_SUBSCRIBER_CONNECTIONS` errors.
60+
private final boolean retryStreamRaces;
61+
5462
// Used to ensure messages are delivered to consumers in order.
5563
private final SerialExecutor messageDeliveryExecutor;
5664

@@ -82,12 +90,14 @@ public class SubscriberImpl extends ProxyService
8290
InitialSubscribeRequest baseInitialRequest,
8391
SeekRequest initialLocation,
8492
Consumer<List<SequencedMessage>> messageConsumer,
85-
SubscriberResetHandler resetHandler)
93+
SubscriberResetHandler resetHandler,
94+
boolean retryStreamRaces)
8695
throws ApiException {
8796
this.alarmFactory = alarmFactory;
8897
this.messageConsumer = messageConsumer;
8998
this.resetHandler = resetHandler;
9099
this.baseInitialRequest = baseInitialRequest;
100+
this.retryStreamRaces = retryStreamRaces;
91101
this.messageDeliveryExecutor = new SerialExecutor(SystemExecutors.getFuturesExecutor());
92102
this.initialLocation = initialLocation;
93103
this.connection =
@@ -100,7 +110,8 @@ public SubscriberImpl(
100110
InitialSubscribeRequest baseInitialRequest,
101111
SeekRequest initialLocation,
102112
Consumer<List<SequencedMessage>> messageConsumer,
103-
SubscriberResetHandler resetHandler)
113+
SubscriberResetHandler resetHandler,
114+
boolean retryStreamRaces)
104115
throws ApiException {
105116
this(
106117
streamFactory,
@@ -109,7 +120,8 @@ public SubscriberImpl(
109120
baseInitialRequest,
110121
initialLocation,
111122
messageConsumer,
112-
resetHandler);
123+
resetHandler,
124+
retryStreamRaces);
113125
}
114126

115127
// ProxyService implementation.
@@ -142,8 +154,7 @@ public void allowFlow(FlowControlRequest clientRequest) throws CheckedApiExcepti
142154
flowControlBatcher.onClientFlowRequest(clientRequest);
143155
if (flowControlBatcher.shouldExpediteBatchRequest()) {
144156
connection.modifyConnection(
145-
connectedSubscriber ->
146-
connectedSubscriber.ifPresent(subscriber -> flushBatchFlowRequest(subscriber)));
157+
connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest));
147158
}
148159
}
149160
}
@@ -172,6 +183,12 @@ public void reset() {
172183
@Override
173184
@SuppressWarnings("GuardedBy")
174185
public void triggerReinitialize(CheckedApiException streamError) {
186+
if (!retryStreamRaces
187+
&& ExtractStatus.getErrorInfoReason(streamError)
188+
.equals("DUPLICATE_SUBSCRIBER_CONNECTIONS")) {
189+
onPermanentError(streamError);
190+
return;
191+
}
175192
if (ResetSignal.isResetSignal(streamError)) {
176193
try {
177194
if (resetHandler.handleReset()) {
@@ -205,26 +222,32 @@ public void onClientResponse(List<SequencedMessage> messages) throws CheckedApiE
205222
if (shutdown) return;
206223
nextOffsetTracker.onMessages(messages);
207224
flowControlBatcher.onMessages(messages);
208-
messageDeliveryExecutor.execute(() -> messageConsumer.accept(messages));
225+
messageDeliveryExecutor.execute(
226+
() -> {
227+
try {
228+
messageConsumer.accept(messages);
229+
} catch (Throwable t) {
230+
logger.atWarning().withCause(t).log(
231+
"Consumer threw an exception- failing subscriber. %s", baseInitialRequest);
232+
onPermanentError(toCanonical(t));
233+
}
234+
});
209235
}
210236
}
211237

212238
private void processBatchFlowRequest() {
213239
try (CloseableMonitor.Hold h = monitor.enter()) {
214240
if (shutdown) return;
215241
connection.modifyConnection(
216-
connectedSubscriber ->
217-
connectedSubscriber.ifPresent(subscriber -> flushBatchFlowRequest(subscriber)));
242+
connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest));
218243
} catch (CheckedApiException e) {
219244
onPermanentError(e);
220245
}
221246
}
222247

223248
private void flushBatchFlowRequest(ConnectedSubscriber subscriber) {
224249
try (CloseableMonitor.Hold h = monitor.enter()) {
225-
flowControlBatcher
226-
.releasePendingRequest()
227-
.ifPresent(request -> subscriber.allowFlow(request));
250+
flowControlBatcher.releasePendingRequest().ifPresent(subscriber::allowFlow);
228251
}
229252
}
230253
}

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,11 @@
5353
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
5454
import com.google.cloud.pubsublite.proto.SubscribeRequest;
5555
import com.google.common.collect.ImmutableList;
56+
import com.google.protobuf.Any;
5657
import com.google.protobuf.util.Timestamps;
58+
import com.google.rpc.ErrorInfo;
59+
import com.google.rpc.Status;
60+
import io.grpc.protobuf.StatusProto;
5761
import java.util.List;
5862
import java.util.concurrent.CountDownLatch;
5963
import java.util.concurrent.Future;
@@ -80,6 +84,20 @@ public class SubscriberImplTest {
8084
private static final SeekRequest INITIAL_LOCATION =
8185
SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build();
8286

87+
private static final CheckedApiException DUPLICATE_CONNECTION_SIGNAL =
88+
new CheckedApiException(
89+
StatusProto.toStatusRuntimeException(
90+
Status.newBuilder()
91+
.setCode(Code.ABORTED.ordinal())
92+
.addDetails(
93+
Any.pack(
94+
ErrorInfo.newBuilder()
95+
.setReason("DUPLICATE_SUBSCRIBER_CONNECTIONS")
96+
.setDomain("pubsublite.googleapis.com")
97+
.build()))
98+
.build()),
99+
Code.ABORTED);
100+
83101
private static SubscribeRequest initialRequest() {
84102
return SubscribeRequest.newBuilder()
85103
.setInitial(BASE_INITIAL_SUBSCRIBE_REQUEST.toBuilder().setInitialLocation(INITIAL_LOCATION))
@@ -135,7 +153,8 @@ public void setUp() throws CheckedApiException {
135153
BASE_INITIAL_SUBSCRIBE_REQUEST,
136154
INITIAL_LOCATION,
137155
mockMessageConsumer,
138-
mockResetHandler);
156+
mockResetHandler,
157+
true);
139158
subscriber.startAsync().awaitRunning();
140159
}
141160

@@ -300,6 +319,31 @@ public void reinitialize_reconnectsToNextOffset() throws Exception {
300319
FlowControlRequest.newBuilder().setAllowedBytes(80).setAllowedMessages(98).build());
301320
}
302321

322+
@Test
323+
public void reinitialize_retriesDuplicateConnectionByDefault() {
324+
subscriber.triggerReinitialize(DUPLICATE_CONNECTION_SIGNAL);
325+
verify(mockSubscriberFactory, times(2)).New(any(), any(), eq(initialRequest()));
326+
}
327+
328+
@Test
329+
public void reinitialize_doesntRetryDuplicateConnectionIfDisabled() throws Exception {
330+
subscriber =
331+
new SubscriberImpl(
332+
unusedStreamFactory,
333+
mockSubscriberFactory,
334+
alarmFactory,
335+
BASE_INITIAL_SUBSCRIBE_REQUEST,
336+
INITIAL_LOCATION,
337+
mockMessageConsumer,
338+
mockResetHandler,
339+
false);
340+
Future<Void> failed = whenFailed(subscriber);
341+
subscriber.startAsync().awaitRunning();
342+
subscriber.triggerReinitialize(DUPLICATE_CONNECTION_SIGNAL);
343+
failed.get();
344+
assertThrowableMatches(subscriber.failureCause(), DUPLICATE_CONNECTION_SIGNAL.code());
345+
}
346+
303347
@Test
304348
public void reinitialize_handlesSuccessfulReset() throws Exception {
305349
subscriber.allowFlow(

0 commit comments

Comments
 (0)