@@ -157,14 +157,13 @@ private Publisher(Builder builder) throws IOException {
157157 stubSettings
158158 .publishSettings ()
159159 .setRetryableCodes (
160- EnumSet .of (
161- StatusCode .Code .ABORTED ,
162- StatusCode .Code .CANCELLED ,
163- StatusCode .Code .DEADLINE_EXCEEDED ,
164- StatusCode .Code .INTERNAL ,
165- StatusCode .Code .RESOURCE_EXHAUSTED ,
166- StatusCode .Code .UNKNOWN ,
167- StatusCode .Code .UNAVAILABLE ))
160+ StatusCode .Code .ABORTED ,
161+ StatusCode .Code .CANCELLED ,
162+ StatusCode .Code .DEADLINE_EXCEEDED ,
163+ StatusCode .Code .INTERNAL ,
164+ StatusCode .Code .RESOURCE_EXHAUSTED ,
165+ StatusCode .Code .UNKNOWN ,
166+ StatusCode .Code .UNAVAILABLE )
168167 .setRetrySettings (retrySettingsBuilder .build ())
169168 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
170169 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
@@ -241,7 +240,10 @@ public ApiFuture<String> publish(PubsubMessage message) {
241240 messagesWaiter .incrementPendingMessages (1 );
242241
243242 if (!batchesToSend .isEmpty ()) {
243+ // TODO: if this is not an ordering keys scenario, will this do anything?
244244 publishAllWithoutInflight ();
245+
246+ // TODO: if this is an ordering keys scenario, is this safe without messagesBatchLock?
245247 for (final OutstandingBatch batch : batchesToSend ) {
246248 logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
247249 executor .execute (
@@ -264,7 +266,9 @@ public void run() {
264266 *
265267 * @param key The key for which to resume publishing.
266268 */
267- public void resumePublish (String key ) {
269+ // TODO: make this public when Ordering keys is live
270+ @ BetaApi
271+ void resumePublish (String key ) {
268272 Preconditions .checkState (!shutdown .get (), "Cannot publish on a shut-down publisher." );
269273 sequentialExecutor .resumePublish (key );
270274 }
@@ -338,6 +342,8 @@ private void publishAllWithoutInflight() {
338342 // it's released, the order of publishing cannot be guaranteed if `publish()` is called
339343 // while this function is running. This locking mechanism needs to be improved if it
340344 // causes any performance degradation.
345+
346+ // TODO: Will this cause a performance problem for non-ordering keys scenarios?
341347 publishOutstandingBatch (batch .popOutstandingBatch ());
342348 it .remove ();
343349 }
@@ -389,20 +395,21 @@ public void onFailure(Throwable t) {
389395 }
390396 };
391397
398+ ApiFuture <PublishResponse > future ;
392399 if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
393- ApiFutures . addCallback ( publishCall ( outstandingBatch ), futureCallback , directExecutor () );
400+ future = publishCall ( outstandingBatch );
394401 } else {
395402 // If ordering key is specified, publish the batch using the sequential executor.
396- ApiFuture < PublishResponse > future =
403+ future =
397404 sequentialExecutor .submit (
398405 outstandingBatch .orderingKey ,
399406 new Callable <ApiFuture <PublishResponse >>() {
400407 public ApiFuture <PublishResponse > call () {
401408 return publishCall (outstandingBatch );
402409 }
403410 });
404- ApiFutures .addCallback (future , futureCallback , directExecutor ());
405411 }
412+ ApiFutures .addCallback (future , futureCallback , directExecutor ());
406413 }
407414
408415 private static final class OutstandingBatch {
@@ -673,6 +680,8 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
673680 }
674681
675682 /** Sets the message ordering option. */
683+ // TODO: make this public when Ordering keys is live
684+ @ BetaApi
676685 Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
677686 this .enableMessageOrdering = enableMessageOrdering ;
678687 return this ;
0 commit comments