@@ -220,7 +220,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
220220
221221 final String orderingKey = message .getOrderingKey ();
222222 Preconditions .checkState (
223- orderingKey != null && ! orderingKey .isEmpty () && ! enableMessageOrdering ,
223+ orderingKey .isEmpty () || enableMessageOrdering ,
224224 "Cannot publish a message with an ordering key when message ordering is not enabled." );
225225
226226 final OutstandingPublish outstandingPublish =
@@ -357,7 +357,7 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
357357 @ Override
358358 public void onSuccess (PublishResponse result ) {
359359 try {
360- if (result .getMessageIdsCount () != outstandingBatch .size ()) {
360+ if (result == null || result .getMessageIdsCount () != outstandingBatch .size ()) {
361361 outstandingBatch .onFailure (
362362 new IllegalStateException (
363363 String .format (
@@ -387,15 +387,15 @@ public void onFailure(Throwable t) {
387387 ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
388388 } else {
389389 // If ordering key is specified, publish the batch using the sequential executor.
390- sequentialExecutor . submit (
391- outstandingBatch . orderingKey ,
392- new Callable < ApiFuture < PublishResponse >>() {
393- public ApiFuture <PublishResponse > call () {
394- ApiFuture <PublishResponse > future = publishCall ( outstandingBatch );
395- ApiFutures . addCallback ( future , futureCallback , directExecutor () );
396- return future ;
397- }
398- } );
390+ ApiFuture < PublishResponse > future =
391+ sequentialExecutor . submit (
392+ outstandingBatch . orderingKey ,
393+ new Callable < ApiFuture <PublishResponse >> () {
394+ public ApiFuture <PublishResponse > call () {
395+ return publishCall ( outstandingBatch );
396+ }
397+ });
398+ ApiFutures . addCallback ( future , futureCallback , directExecutor () );
399399 }
400400 }
401401
0 commit comments