4848import com .google .pubsub .v1 .TopicNames ;
4949import java .io .IOException ;
5050import java .util .Collections ;
51+ import java .util .EnumSet ;
52+ import java .util .HashMap ;
5153import java .util .Iterator ;
5254import java .util .LinkedList ;
5355import java .util .List ;
56+ import java .util .Map ;
57+ import java .util .Set ;
58+ import java .util .concurrent .Callable ;
5459import java .util .concurrent .ScheduledExecutorService ;
5560import java .util .concurrent .ScheduledFuture ;
5661import java .util .concurrent .TimeUnit ;
@@ -84,15 +89,17 @@ public class Publisher {
8489 private final String topicName ;
8590
8691 private final BatchingSettings batchingSettings ;
92+ private final boolean enableMessageOrdering ;
8793
8894 private final Lock messagesBatchLock ;
89- private MessagesBatch messagesBatch ;
95+ private final Map < String , MessagesBatch > messagesBatches ;
9096
9197 private final AtomicBoolean activeAlarm ;
9298
9399 private final PublisherStub publisherStub ;
94100
95101 private final ScheduledExecutorService executor ;
102+ private final SequentialExecutorService .CallbackExecutor sequentialExecutor ;
96103 private final AtomicBoolean shutdown ;
97104 private final List <AutoCloseable > closeables ;
98105 private final MessageWaiter messagesWaiter ;
@@ -113,12 +120,14 @@ private Publisher(Builder builder) throws IOException {
113120 topicName = builder .topicName ;
114121
115122 this .batchingSettings = builder .batchingSettings ;
123+ this .enableMessageOrdering = builder .enableMessageOrdering ;
116124 this .messageTransform = builder .messageTransform ;
117125
118- messagesBatch = new MessagesBatch ();
126+ messagesBatches = new HashMap <> ();
119127 messagesBatchLock = new ReentrantLock ();
120128 activeAlarm = new AtomicBoolean (false );
121129 executor = builder .executorProvider .getExecutor ();
130+ sequentialExecutor = new SequentialExecutorService .CallbackExecutor (executor );
122131 if (builder .executorProvider .shouldAutoClose ()) {
123132 closeables =
124133 Collections .<AutoCloseable >singletonList (new ExecutorAsBackgroundResource (executor ));
@@ -128,9 +137,31 @@ private Publisher(Builder builder) throws IOException {
128137
129138 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
130139 // We post-process this here to keep backward-compatibility.
131- RetrySettings retrySettings = builder .retrySettings ;
132- if (retrySettings .getMaxAttempts () == 0 ) {
133- retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
140+ // Also, if "message ordering" is enabled, the publisher should retry sending the failed
141+ // message infinitely rather than sending the next one.
142+ RetrySettings .Builder retrySettingsBuilder = builder .retrySettings .toBuilder ();
143+ if (retrySettingsBuilder .getMaxAttempts () == 0 ) {
144+ retrySettingsBuilder .setMaxAttempts (Integer .MAX_VALUE );
145+ }
146+ if (enableMessageOrdering ) {
147+ retrySettingsBuilder
148+ .setMaxAttempts (Integer .MAX_VALUE )
149+ .setTotalTimeout (Duration .ofNanos (Long .MAX_VALUE ));
150+ }
151+
152+ Set <StatusCode .Code > retryCodes ;
153+ if (enableMessageOrdering ) {
154+ retryCodes = EnumSet .allOf (StatusCode .Code .class );
155+ } else {
156+ retryCodes =
157+ EnumSet .of (
158+ StatusCode .Code .ABORTED ,
159+ StatusCode .Code .CANCELLED ,
160+ StatusCode .Code .DEADLINE_EXCEEDED ,
161+ StatusCode .Code .INTERNAL ,
162+ StatusCode .Code .RESOURCE_EXHAUSTED ,
163+ StatusCode .Code .UNKNOWN ,
164+ StatusCode .Code .UNAVAILABLE );
134165 }
135166
136167 PublisherStubSettings .Builder stubSettings =
@@ -140,15 +171,8 @@ private Publisher(Builder builder) throws IOException {
140171 .setTransportChannelProvider (builder .channelProvider );
141172 stubSettings
142173 .publishSettings ()
143- .setRetryableCodes (
144- StatusCode .Code .ABORTED ,
145- StatusCode .Code .CANCELLED ,
146- StatusCode .Code .DEADLINE_EXCEEDED ,
147- StatusCode .Code .INTERNAL ,
148- StatusCode .Code .RESOURCE_EXHAUSTED ,
149- StatusCode .Code .UNKNOWN ,
150- StatusCode .Code .UNAVAILABLE )
151- .setRetrySettings (retrySettings )
174+ .setRetryableCodes (retryCodes )
175+ .setRetrySettings (retrySettingsBuilder .build ())
152176 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
153177 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
154178
@@ -196,6 +220,12 @@ public ApiFuture<String> publish(PubsubMessage message) {
196220 throw new IllegalStateException ("Cannot publish on a shut-down publisher." );
197221 }
198222
223+ final String orderingKey = message .getOrderingKey ();
224+ if (orderingKey != null && !orderingKey .isEmpty () && !enableMessageOrdering ) {
225+ throw new IllegalStateException (
226+ "Cannot publish a message with an ordering key when message ordering is not enabled." );
227+ }
228+
199229 message = messageTransform .apply (message );
200230 final int messageSize = message .getSerializedSize ();
201231 OutstandingBatch batchToSend = null ;
@@ -204,30 +234,38 @@ public ApiFuture<String> publish(PubsubMessage message) {
204234 messagesBatchLock .lock ();
205235 try {
206236 // Check if the next message makes the current batch exceed the max batch byte size.
207- if (!messagesBatch .isEmpty ()
237+ MessagesBatch messageBatch = messagesBatches .get (orderingKey );
238+ if (messageBatch == null ) {
239+ messageBatch = new MessagesBatch (orderingKey );
240+ messagesBatches .put (orderingKey , messageBatch );
241+ }
242+ if (!messageBatch .isEmpty ()
208243 && hasBatchingBytes ()
209- && messagesBatch .getBatchedBytes () + messageSize >= getMaxBatchBytes ()) {
210- batchToSend = messagesBatch .popOutstandingBatch ();
244+ && messageBatch .getBatchedBytes () + messageSize >= getMaxBatchBytes ()) {
245+ batchToSend = messageBatch .popOutstandingBatch ();
211246 }
212247
213248 // Border case if the message to send is greater or equals to the max batch size then can't
214249 // be included in the current batch and instead sent immediately.
215250 if (!hasBatchingBytes () || messageSize < getMaxBatchBytes ()) {
216- messagesBatch .addMessage (outstandingPublish , messageSize );
217-
251+ messageBatch .addMessage (outstandingPublish , messageSize );
218252 // If after adding the message we have reached the batch max messages then we have a batch
219253 // to send.
220- if (messagesBatch .getMessagesCount () == getBatchingSettings ().getElementCountThreshold ()) {
221- batchToSend = messagesBatch .popOutstandingBatch ();
254+ if (messageBatch .getMessagesCount () == getBatchingSettings ().getElementCountThreshold ()) {
255+ batchToSend = messageBatch .popOutstandingBatch ();
222256 }
223257 }
258+
224259 // Setup the next duration based delivery alarm if there are messages batched.
225- if (!messagesBatch .isEmpty ()) {
260+ if (!messageBatch .isEmpty ()) {
226261 setupDurationBasedPublishAlarm ();
227- } else if (currentAlarmFuture != null ) {
228- logger .log (Level .FINER , "Cancelling alarm, no more messages" );
229- if (activeAlarm .getAndSet (false )) {
230- currentAlarmFuture .cancel (false );
262+ } else {
263+ messagesBatches .remove (orderingKey );
264+ if (currentAlarmFuture != null ) {
265+ logger .log (Level .FINER , "Cancelling alarm, no more messages" );
266+ if (activeAlarm .getAndSet (false )) {
267+ currentAlarmFuture .cancel (false );
268+ }
231269 }
232270 }
233271 } finally {
@@ -238,29 +276,18 @@ && hasBatchingBytes()
238276
239277 if (batchToSend != null ) {
240278 logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
241- final OutstandingBatch finalBatchToSend = batchToSend ;
242- executor .execute (
243- new Runnable () {
244- @ Override
245- public void run () {
246- publishOutstandingBatch (finalBatchToSend );
247- }
248- });
279+ publishAllOutstanding ();
280+ publishOutstandingBatch (batchToSend );
249281 }
250282
251283 // If the message is over the size limit, it was not added to the pending messages and it will
252284 // be sent in its own batch immediately.
253285 if (hasBatchingBytes () && messageSize >= getMaxBatchBytes ()) {
254286 logger .log (
255287 Level .FINER , "Message exceeds the max batch bytes, scheduling it for immediate send." );
256- executor .execute (
257- new Runnable () {
258- @ Override
259- public void run () {
260- publishOutstandingBatch (
261- new OutstandingBatch (ImmutableList .of (outstandingPublish ), messageSize ));
262- }
263- });
288+ publishAllOutstanding ();
289+ publishOutstandingBatch (
290+ new OutstandingBatch (ImmutableList .of (outstandingPublish ), messageSize , orderingKey ));
264291 }
265292
266293 return publishResult ;
@@ -292,16 +319,20 @@ public void run() {
292319 */
293320 public void publishAllOutstanding () {
294321 messagesBatchLock .lock ();
295- OutstandingBatch batchToSend ;
296322 try {
297- if (messagesBatch .isEmpty ()) {
298- return ;
323+ for (MessagesBatch batch : messagesBatches .values ()) {
324+ if (!batch .isEmpty ()) {
325+ // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
326+ // it's released, the order of publishing cannot be guaranteed if `publish()` is called
327+ // while this function is running. This locking mechanism needs to be improved if it
328+ // causes any performance degradation.
329+ publishOutstandingBatch (batch .popOutstandingBatch ());
330+ }
299331 }
300- batchToSend = messagesBatch . popOutstandingBatch ();
332+ messagesBatches . clear ();
301333 } finally {
302334 messagesBatchLock .unlock ();
303335 }
304- publishOutstandingBatch (batchToSend );
305336 }
306337
307338 private ApiFuture <PublishResponse > publishCall (OutstandingBatch outstandingBatch ) {
@@ -310,12 +341,11 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
310341 for (OutstandingPublish outstandingPublish : outstandingBatch .outstandingPublishes ) {
311342 publishRequest .addMessages (outstandingPublish .message );
312343 }
313-
314344 return publisherStub .publishCallable ().futureCall (publishRequest .build ());
315345 }
316346
317347 private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
318- ApiFutureCallback <PublishResponse > futureCallback =
348+ final ApiFutureCallback <PublishResponse > futureCallback =
319349 new ApiFutureCallback <PublishResponse >() {
320350 @ Override
321351 public void onSuccess (PublishResponse result ) {
@@ -356,20 +386,45 @@ public void onFailure(Throwable t) {
356386 }
357387 };
358388
359- ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
389+ if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
390+ // If ordering key is empty, publish the batch using the normal executor.
391+ Runnable task =
392+ new Runnable () {
393+ public void run () {
394+ ApiFutures .addCallback (
395+ publishCall (outstandingBatch ), futureCallback , directExecutor ());
396+ }
397+ };
398+ executor .execute (task );
399+ } else {
400+ // If ordering key is specified, publish the batch using the sequential executor.
401+ Callable <ApiFuture <PublishResponse >> func =
402+ new Callable <ApiFuture <PublishResponse >>() {
403+ public ApiFuture <PublishResponse > call () {
404+ return publishCall (outstandingBatch );
405+ }
406+ };
407+ ApiFutures .addCallback (
408+ sequentialExecutor .submit (outstandingBatch .orderingKey , func ),
409+ futureCallback ,
410+ directExecutor ());
411+ }
360412 }
361413
362414 private static final class OutstandingBatch {
363415 final List <OutstandingPublish > outstandingPublishes ;
364416 final long creationTime ;
365417 int attempt ;
366418 int batchSizeBytes ;
419+ final String orderingKey ;
367420
368- OutstandingBatch (List <OutstandingPublish > outstandingPublishes , int batchSizeBytes ) {
421+ OutstandingBatch (
422+ List <OutstandingPublish > outstandingPublishes , int batchSizeBytes , String orderingKey ) {
369423 this .outstandingPublishes = outstandingPublishes ;
370424 attempt = 1 ;
371425 creationTime = System .currentTimeMillis ();
372426 this .batchSizeBytes = batchSizeBytes ;
427+ this .orderingKey = orderingKey ;
373428 }
374429
375430 public int getAttempt () {
@@ -507,7 +562,7 @@ public static final class Builder {
507562 .setRpcTimeoutMultiplier (2 )
508563 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
509564 .build ();
510-
565+ static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false ;
511566 private static final int THREADS_PER_CPU = 5 ;
512567 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
513568 InstantiatingExecutorProvider .newBuilder ()
@@ -521,6 +576,8 @@ public static final class Builder {
521576
522577 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
523578
579+ boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING ;
580+
524581 TransportChannelProvider channelProvider =
525582 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
526583
@@ -615,6 +672,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
615672 return this ;
616673 }
617674
675+ /** Sets the message ordering option. */
676+ public Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
677+ this .enableMessageOrdering = enableMessageOrdering ;
678+ return this ;
679+ }
680+
618681 /** Gives the ability to set a custom executor to be used by the library. */
619682 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
620683 this .executorProvider = Preconditions .checkNotNull (executorProvider );
@@ -640,9 +703,14 @@ public Publisher build() throws IOException {
640703 private static class MessagesBatch {
641704 private List <OutstandingPublish > messages = new LinkedList <>();
642705 private int batchedBytes ;
706+ private String orderingKey ;
707+
708+ private MessagesBatch (String orderingKey ) {
709+ this .orderingKey = orderingKey ;
710+ }
643711
644712 private OutstandingBatch popOutstandingBatch () {
645- OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
713+ OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes , orderingKey );
646714 reset ();
647715 return batch ;
648716 }
0 commit comments