Skip to content

Commit cec8e94

Browse files
committed
Cleanup before merge to main line
- Removing EnumSet from Status codes - Adding `@BetaApi` to `resumePublish`, and making it package private.
1 parent 33ebb40 commit cec8e94

1 file changed

Lines changed: 21 additions & 12 deletions

File tree

  • google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,13 @@ private Publisher(Builder builder) throws IOException {
157157
stubSettings
158158
.publishSettings()
159159
.setRetryableCodes(
160-
EnumSet.of(
161-
StatusCode.Code.ABORTED,
162-
StatusCode.Code.CANCELLED,
163-
StatusCode.Code.DEADLINE_EXCEEDED,
164-
StatusCode.Code.INTERNAL,
165-
StatusCode.Code.RESOURCE_EXHAUSTED,
166-
StatusCode.Code.UNKNOWN,
167-
StatusCode.Code.UNAVAILABLE))
160+
StatusCode.Code.ABORTED,
161+
StatusCode.Code.CANCELLED,
162+
StatusCode.Code.DEADLINE_EXCEEDED,
163+
StatusCode.Code.INTERNAL,
164+
StatusCode.Code.RESOURCE_EXHAUSTED,
165+
StatusCode.Code.UNKNOWN,
166+
StatusCode.Code.UNAVAILABLE)
168167
.setRetrySettings(retrySettingsBuilder.build())
169168
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
170169
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
@@ -241,7 +240,10 @@ public ApiFuture<String> publish(PubsubMessage message) {
241240
messagesWaiter.incrementPendingMessages(1);
242241

243242
if (!batchesToSend.isEmpty()) {
243+
// TODO: if this is not an ordering keys scenario, will this do anything?
244244
publishAllWithoutInflight();
245+
246+
// TODO: if this is an ordering keys scenario, is this safe without messagesBatchLock?
245247
for (final OutstandingBatch batch : batchesToSend) {
246248
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
247249
executor.execute(
@@ -264,7 +266,9 @@ public void run() {
264266
*
265267
* @param key The key for which to resume publishing.
266268
*/
267-
public void resumePublish(String key) {
269+
// TODO: make this public when Ordering keys is live
270+
@BetaApi
271+
void resumePublish(String key) {
268272
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");
269273
sequentialExecutor.resumePublish(key);
270274
}
@@ -338,6 +342,8 @@ private void publishAllWithoutInflight() {
338342
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
339343
// while this function is running. This locking mechanism needs to be improved if it
340344
// causes any performance degradation.
345+
346+
// TODO: Will this cause a performance problem for non-ordering keys scenarios?
341347
publishOutstandingBatch(batch.popOutstandingBatch());
342348
it.remove();
343349
}
@@ -389,20 +395,21 @@ public void onFailure(Throwable t) {
389395
}
390396
};
391397

398+
ApiFuture<PublishResponse> future;
392399
if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) {
393-
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
400+
future = publishCall(outstandingBatch);
394401
} else {
395402
// If ordering key is specified, publish the batch using the sequential executor.
396-
ApiFuture<PublishResponse> future =
403+
future =
397404
sequentialExecutor.submit(
398405
outstandingBatch.orderingKey,
399406
new Callable<ApiFuture<PublishResponse>>() {
400407
public ApiFuture<PublishResponse> call() {
401408
return publishCall(outstandingBatch);
402409
}
403410
});
404-
ApiFutures.addCallback(future, futureCallback, directExecutor());
405411
}
412+
ApiFutures.addCallback(future, futureCallback, directExecutor());
406413
}
407414

408415
private static final class OutstandingBatch {
@@ -673,6 +680,8 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
673680
}
674681

675682
/** Sets the message ordering option. */
683+
// TODO: make this public when Ordering keys is live
684+
@BetaApi
676685
Builder setEnableMessageOrdering(boolean enableMessageOrdering) {
677686
this.enableMessageOrdering = enableMessageOrdering;
678687
return this;

0 commit comments

Comments
 (0)