@@ -192,9 +192,7 @@ public String getTopicNameString() {
192192 * @return the message ID wrapped in a future.
193193 */
194194 public ApiFuture <String > publish (PubsubMessage message ) {
195- if (shutdown .get ()) {
196- throw new IllegalStateException ("Cannot publish on a shut-down publisher." );
197- }
195+ Preconditions .checkState (!shutdown .get (), "Cannot publish on a shut-down publisher." );
198196
199197 final OutstandingPublish outstandingPublish =
200198 new OutstandingPublish (messageTransform .apply (message ));
@@ -288,23 +286,15 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
288286 public void onSuccess (PublishResponse result ) {
289287 try {
290288 if (result .getMessageIdsCount () != outstandingBatch .size ()) {
291- Throwable t =
289+ outstandingBatch . onFailure (
292290 new IllegalStateException (
293291 String .format (
294292 "The publish result count %s does not match "
295293 + "the expected %s results. Please contact Cloud Pub/Sub support "
296294 + "if this frequently occurs" ,
297- result .getMessageIdsCount (), outstandingBatch .size ()));
298- for (OutstandingPublish oustandingMessage : outstandingBatch .outstandingPublishes ) {
299- oustandingMessage .publishResult .setException (t );
300- }
301- return ;
302- }
303-
304- Iterator <OutstandingPublish > messagesResultsIt =
305- outstandingBatch .outstandingPublishes .iterator ();
306- for (String messageId : result .getMessageIdsList ()) {
307- messagesResultsIt .next ().publishResult .set (messageId );
295+ result .getMessageIdsCount (), outstandingBatch .size ())));
296+ } else {
297+ outstandingBatch .onSuccess (result .getMessageIdsList ());
308298 }
309299 } finally {
310300 messagesWaiter .incrementPendingMessages (-outstandingBatch .size ());
@@ -314,9 +304,7 @@ public void onSuccess(PublishResponse result) {
314304 @ Override
315305 public void onFailure (Throwable t ) {
316306 try {
317- for (OutstandingPublish outstandingPublish : outstandingBatch .outstandingPublishes ) {
318- outstandingPublish .publishResult .setException (t );
319- }
307+ outstandingBatch .onFailure (t );
320308 } finally {
321309 messagesWaiter .incrementPendingMessages (-outstandingBatch .size ());
322310 }
@@ -350,6 +338,19 @@ private List<PubsubMessage> getMessages() {
350338 }
351339 return results ;
352340 }
341+
342+ private void onFailure (Throwable t ) {
343+ for (OutstandingPublish outstandingPublish : outstandingPublishes ) {
344+ outstandingPublish .publishResult .setException (t );
345+ }
346+ }
347+
348+ private void onSuccess (Iterable <String > results ) {
349+ Iterator <OutstandingPublish > messagesResultsIt = outstandingPublishes .iterator ();
350+ for (String messageId : results ) {
351+ messagesResultsIt .next ().publishResult .set (messageId );
352+ }
353+ }
353354 }
354355
355356 private static final class OutstandingPublish {
@@ -376,10 +377,9 @@ public BatchingSettings getBatchingSettings() {
376377 * should be invoked prior to deleting the {@link Publisher} object in order to ensure that no
377378 * pending messages are lost.
378379 */
379- public void shutdown () throws Exception {
380- if (shutdown .getAndSet (true )) {
381- throw new IllegalStateException ("Cannot shut down a publisher already shut-down." );
382- }
380+ public void shutdown () {
381+ Preconditions .checkState (
382+ !shutdown .getAndSet (true ), "Cannot shut down a publisher already shut-down." );
383383 if (currentAlarmFuture != null && activeAlarm .getAndSet (false )) {
384384 currentAlarmFuture .cancel (false );
385385 }
0 commit comments