4848import com .google .pubsub .v1 .TopicNames ;
4949import java .io .IOException ;
5050import java .util .Collections ;
51- import java .util .EnumSet ;
52- import java .util .HashMap ;
5351import java .util .Iterator ;
5452import java .util .LinkedList ;
5553import java .util .List ;
56- import java .util .Map ;
57- import java .util .Set ;
58- import java .util .concurrent .Callable ;
5954import java .util .concurrent .ScheduledExecutorService ;
6055import java .util .concurrent .ScheduledFuture ;
6156import java .util .concurrent .TimeUnit ;
@@ -89,17 +84,15 @@ public class Publisher {
8984 private final String topicName ;
9085
9186 private final BatchingSettings batchingSettings ;
92- private final boolean enableMessageOrdering ;
9387
9488 private final Lock messagesBatchLock ;
95- private final Map < String , MessagesBatch > messagesBatches ;
89+ private MessagesBatch messagesBatch ;
9690
9791 private final AtomicBoolean activeAlarm ;
9892
9993 private final PublisherStub publisherStub ;
10094
10195 private final ScheduledExecutorService executor ;
102- private final SequentialExecutorService <PublishResponse > sequentialExecutor ;
10396 private final AtomicBoolean shutdown ;
10497 private final List <AutoCloseable > closeables ;
10598 private final MessageWaiter messagesWaiter ;
@@ -120,14 +113,12 @@ private Publisher(Builder builder) throws IOException {
120113 topicName = builder .topicName ;
121114
122115 this .batchingSettings = builder .batchingSettings ;
123- this .enableMessageOrdering = builder .enableMessageOrdering ;
124116 this .messageTransform = builder .messageTransform ;
125117
126- messagesBatches = new HashMap <> ();
118+ messagesBatch = new MessagesBatch ();
127119 messagesBatchLock = new ReentrantLock ();
128120 activeAlarm = new AtomicBoolean (false );
129121 executor = builder .executorProvider .getExecutor ();
130- sequentialExecutor = new SequentialExecutorService <>(executor );
131122 if (builder .executorProvider .shouldAutoClose ()) {
132123 closeables =
133124 Collections .<AutoCloseable >singletonList (new ExecutorAsBackgroundResource (executor ));
@@ -137,31 +128,9 @@ private Publisher(Builder builder) throws IOException {
137128
138129 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
139130 // We post-process this here to keep backward-compatibility.
140- // Also, if "message ordering" is enabled, the publisher should retry sending the failed
141- // message infinitely rather than sending the next one.
142- RetrySettings .Builder retrySettingsBuilder = builder .retrySettings .toBuilder ();
143- if (retrySettingsBuilder .getMaxAttempts () == 0 ) {
144- retrySettingsBuilder .setMaxAttempts (Integer .MAX_VALUE );
145- }
146- if (enableMessageOrdering ) {
147- retrySettingsBuilder
148- .setMaxAttempts (Integer .MAX_VALUE )
149- .setTotalTimeout (Duration .ofNanos (Long .MAX_VALUE ));
150- }
151-
152- Set <StatusCode .Code > retryCodes ;
153- if (enableMessageOrdering ) {
154- retryCodes = EnumSet .allOf (StatusCode .Code .class );
155- } else {
156- retryCodes =
157- EnumSet .of (
158- StatusCode .Code .ABORTED ,
159- StatusCode .Code .CANCELLED ,
160- StatusCode .Code .DEADLINE_EXCEEDED ,
161- StatusCode .Code .INTERNAL ,
162- StatusCode .Code .RESOURCE_EXHAUSTED ,
163- StatusCode .Code .UNKNOWN ,
164- StatusCode .Code .UNAVAILABLE );
131+ RetrySettings retrySettings = builder .retrySettings ;
132+ if (retrySettings .getMaxAttempts () == 0 ) {
133+ retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
165134 }
166135
167136 PublisherStubSettings .Builder stubSettings =
@@ -171,8 +140,15 @@ private Publisher(Builder builder) throws IOException {
171140 .setTransportChannelProvider (builder .channelProvider );
172141 stubSettings
173142 .publishSettings ()
174- .setRetryableCodes (retryCodes )
175- .setRetrySettings (retrySettingsBuilder .build ())
143+ .setRetryableCodes (
144+ StatusCode .Code .ABORTED ,
145+ StatusCode .Code .CANCELLED ,
146+ StatusCode .Code .DEADLINE_EXCEEDED ,
147+ StatusCode .Code .INTERNAL ,
148+ StatusCode .Code .RESOURCE_EXHAUSTED ,
149+ StatusCode .Code .UNKNOWN ,
150+ StatusCode .Code .UNAVAILABLE )
151+ .setRetrySettings (retrySettings )
176152 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
177153 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
178154
@@ -220,12 +196,6 @@ public ApiFuture<String> publish(PubsubMessage message) {
220196 throw new IllegalStateException ("Cannot publish on a shut-down publisher." );
221197 }
222198
223- final String orderingKey = message .getOrderingKey ();
224- if (orderingKey != null && !orderingKey .isEmpty () && !enableMessageOrdering ) {
225- throw new IllegalStateException (
226- "Cannot publish a message with an ordering key when message ordering is not enabled." );
227- }
228-
229199 message = messageTransform .apply (message );
230200 final int messageSize = message .getSerializedSize ();
231201 OutstandingBatch batchToSend = null ;
@@ -234,38 +204,30 @@ public ApiFuture<String> publish(PubsubMessage message) {
234204 messagesBatchLock .lock ();
235205 try {
236206 // Check if the next message makes the current batch exceed the max batch byte size.
237- MessagesBatch messageBatch = messagesBatches .get (orderingKey );
238- if (messageBatch == null ) {
239- messageBatch = new MessagesBatch (orderingKey );
240- messagesBatches .put (orderingKey , messageBatch );
241- }
242- if (!messageBatch .isEmpty ()
207+ if (!messagesBatch .isEmpty ()
243208 && hasBatchingBytes ()
244- && messageBatch .getBatchedBytes () + messageSize >= getMaxBatchBytes ()) {
245- batchToSend = messageBatch .popOutstandingBatch ();
209+ && messagesBatch .getBatchedBytes () + messageSize >= getMaxBatchBytes ()) {
210+ batchToSend = messagesBatch .popOutstandingBatch ();
246211 }
247212
248213 // Border case if the message to send is greater or equals to the max batch size then can't
249214 // be included in the current batch and instead sent immediately.
250215 if (!hasBatchingBytes () || messageSize < getMaxBatchBytes ()) {
251- messageBatch .addMessage (outstandingPublish , messageSize );
216+ messagesBatch .addMessage (outstandingPublish , messageSize );
217+
252218 // If after adding the message we have reached the batch max messages then we have a batch
253219 // to send.
254- if (messageBatch .getMessagesCount () == getBatchingSettings ().getElementCountThreshold ()) {
255- batchToSend = messageBatch .popOutstandingBatch ();
220+ if (messagesBatch .getMessagesCount () == getBatchingSettings ().getElementCountThreshold ()) {
221+ batchToSend = messagesBatch .popOutstandingBatch ();
256222 }
257223 }
258-
259224 // Setup the next duration based delivery alarm if there are messages batched.
260- if (!messageBatch .isEmpty ()) {
225+ if (!messagesBatch .isEmpty ()) {
261226 setupDurationBasedPublishAlarm ();
262- } else {
263- messagesBatches .remove (orderingKey );
264- if (currentAlarmFuture != null ) {
265- logger .log (Level .FINER , "Cancelling alarm, no more messages" );
266- if (activeAlarm .getAndSet (false )) {
267- currentAlarmFuture .cancel (false );
268- }
227+ } else if (currentAlarmFuture != null ) {
228+ logger .log (Level .FINER , "Cancelling alarm, no more messages" );
229+ if (activeAlarm .getAndSet (false )) {
230+ currentAlarmFuture .cancel (false );
269231 }
270232 }
271233 } finally {
@@ -276,18 +238,29 @@ && hasBatchingBytes()
276238
277239 if (batchToSend != null ) {
278240 logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
279- publishAllOutstanding ();
280- publishOutstandingBatch (batchToSend );
241+ final OutstandingBatch finalBatchToSend = batchToSend ;
242+ executor .execute (
243+ new Runnable () {
244+ @ Override
245+ public void run () {
246+ publishOutstandingBatch (finalBatchToSend );
247+ }
248+ });
281249 }
282250
283251 // If the message is over the size limit, it was not added to the pending messages and it will
284252 // be sent in its own batch immediately.
285253 if (hasBatchingBytes () && messageSize >= getMaxBatchBytes ()) {
286254 logger .log (
287255 Level .FINER , "Message exceeds the max batch bytes, scheduling it for immediate send." );
288- publishAllOutstanding ();
289- publishOutstandingBatch (
290- new OutstandingBatch (ImmutableList .of (outstandingPublish ), messageSize , orderingKey ));
256+ executor .execute (
257+ new Runnable () {
258+ @ Override
259+ public void run () {
260+ publishOutstandingBatch (
261+ new OutstandingBatch (ImmutableList .of (outstandingPublish ), messageSize ));
262+ }
263+ });
291264 }
292265
293266 return publishResult ;
@@ -319,33 +292,27 @@ public void run() {
319292 */
320293 public void publishAllOutstanding () {
321294 messagesBatchLock .lock ();
295+ OutstandingBatch batchToSend ;
322296 try {
323- for (MessagesBatch batch : messagesBatches .values ()) {
324- if (!batch .isEmpty ()) {
325- // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
326- // it's released, the order of publishing cannot be guaranteed if `publish()` is called
327- // while this function is running. This locking mechanism needs to be improved if it
328- // causes any performance degradation.
329- publishOutstandingBatch (batch .popOutstandingBatch ());
330- }
297+ if (messagesBatch .isEmpty ()) {
298+ return ;
331299 }
332- messagesBatches . clear ();
300+ batchToSend = messagesBatch . popOutstandingBatch ();
333301 } finally {
334302 messagesBatchLock .unlock ();
335303 }
304+ publishOutstandingBatch (batchToSend );
336305 }
337306
338- private ApiFuture publishCall (final OutstandingBatch outstandingBatch ) {
307+ private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
339308 PublishRequest .Builder publishRequest = PublishRequest .newBuilder ();
340309 publishRequest .setTopic (topicName );
341310 for (OutstandingPublish outstandingPublish : outstandingBatch .outstandingPublishes ) {
342311 publishRequest .addMessages (outstandingPublish .message );
343312 }
344- return publisherStub .publishCallable ().futureCall (publishRequest .build ());
345- }
346313
347- private void publishOutstandingBatch ( final OutstandingBatch outstandingBatch ) {
348- final ApiFutureCallback < PublishResponse > futureCallback =
314+ ApiFutures . addCallback (
315+ publisherStub . publishCallable (). futureCall ( publishRequest . build ()),
349316 new ApiFutureCallback <PublishResponse >() {
350317 @ Override
351318 public void onSuccess (PublishResponse result ) {
@@ -384,47 +351,21 @@ public void onFailure(Throwable t) {
384351 messagesWaiter .incrementPendingMessages (-outstandingBatch .size ());
385352 }
386353 }
387- };
388-
389- if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
390- // If ordering key is empty, publish the batch using the normal executor.
391- Runnable task =
392- new Runnable () {
393- public void run () {
394- ApiFutures .addCallback (
395- publishCall (outstandingBatch ), futureCallback , directExecutor ());
396- }
397- };
398- executor .execute (task );
399- } else {
400- // If ordering key is specified, publish the batch using the sequential executor.
401- Callable <ApiFuture > func =
402- new Callable <ApiFuture >() {
403- public ApiFuture call () {
404- return publishCall (outstandingBatch );
405- }
406- };
407- ApiFutures .addCallback (
408- sequentialExecutor .submit (outstandingBatch .orderingKey , func ),
409- futureCallback ,
410- directExecutor ());
411- }
354+ },
355+ directExecutor ());
412356 }
413357
414358 private static final class OutstandingBatch {
415359 final List <OutstandingPublish > outstandingPublishes ;
416360 final long creationTime ;
417361 int attempt ;
418362 int batchSizeBytes ;
419- final String orderingKey ;
420363
421- OutstandingBatch (
422- List <OutstandingPublish > outstandingPublishes , int batchSizeBytes , String orderingKey ) {
364+ OutstandingBatch (List <OutstandingPublish > outstandingPublishes , int batchSizeBytes ) {
423365 this .outstandingPublishes = outstandingPublishes ;
424366 attempt = 1 ;
425367 creationTime = System .currentTimeMillis ();
426368 this .batchSizeBytes = batchSizeBytes ;
427- this .orderingKey = orderingKey ;
428369 }
429370
430371 public int getAttempt () {
@@ -562,7 +503,7 @@ public static final class Builder {
562503 .setRpcTimeoutMultiplier (2 )
563504 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
564505 .build ();
565- static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false ;
506+
566507 private static final int THREADS_PER_CPU = 5 ;
567508 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
568509 InstantiatingExecutorProvider .newBuilder ()
@@ -576,8 +517,6 @@ public static final class Builder {
576517
577518 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
578519
579- boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING ;
580-
581520 TransportChannelProvider channelProvider =
582521 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
583522
@@ -672,12 +611,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
672611 return this ;
673612 }
674613
675- /** Sets the message ordering option. */
676- public Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
677- this .enableMessageOrdering = enableMessageOrdering ;
678- return this ;
679- }
680-
681614 /** Gives the ability to set a custom executor to be used by the library. */
682615 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
683616 this .executorProvider = Preconditions .checkNotNull (executorProvider );
@@ -703,14 +636,9 @@ public Publisher build() throws IOException {
703636 private static class MessagesBatch {
704637 private List <OutstandingPublish > messages = new LinkedList <>();
705638 private int batchedBytes ;
706- private String orderingKey ;
707-
708- private MessagesBatch (String orderingKey ) {
709- this .orderingKey = orderingKey ;
710- }
711639
712640 private OutstandingBatch popOutstandingBatch () {
713- OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes , orderingKey );
641+ OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
714642 reset ();
715643 return batch ;
716644 }
0 commit comments