4848import java .io .IOException ;
4949import java .util .ArrayList ;
5050import java .util .Collections ;
51+ import java .util .EnumSet ;
52+ import java .util .HashMap ;
5153import java .util .Iterator ;
5254import java .util .LinkedList ;
5355import java .util .List ;
56+ import java .util .Map ;
57+ import java .util .Set ;
58+ import java .util .concurrent .Callable ;
5459import java .util .concurrent .ScheduledExecutorService ;
5560import java .util .concurrent .ScheduledFuture ;
5661import java .util .concurrent .TimeUnit ;
@@ -84,15 +89,17 @@ public class Publisher {
8489 private final String topicName ;
8590
8691 private final BatchingSettings batchingSettings ;
92+ private final boolean enableMessageOrdering ;
8793
8894 private final Lock messagesBatchLock ;
89- private MessagesBatch messagesBatch ;
95+ private final Map < String , MessagesBatch > messagesBatches ;
9096
9197 private final AtomicBoolean activeAlarm ;
9298
9399 private final PublisherStub publisherStub ;
94100
95101 private final ScheduledExecutorService executor ;
102+ private final SequentialExecutorService .CallbackExecutor sequentialExecutor ;
96103 private final AtomicBoolean shutdown ;
97104 private final List <AutoCloseable > closeables ;
98105 private final MessageWaiter messagesWaiter ;
@@ -113,12 +120,14 @@ private Publisher(Builder builder) throws IOException {
113120 topicName = builder .topicName ;
114121
115122 this .batchingSettings = builder .batchingSettings ;
123+ this .enableMessageOrdering = builder .enableMessageOrdering ;
116124 this .messageTransform = builder .messageTransform ;
117125
118- messagesBatch = new MessagesBatch ();
126+ messagesBatches = new HashMap <> ();
119127 messagesBatchLock = new ReentrantLock ();
120128 activeAlarm = new AtomicBoolean (false );
121129 executor = builder .executorProvider .getExecutor ();
130+ sequentialExecutor = new SequentialExecutorService .CallbackExecutor (executor );
122131 if (builder .executorProvider .shouldAutoClose ()) {
123132 closeables =
124133 Collections .<AutoCloseable >singletonList (new ExecutorAsBackgroundResource (executor ));
@@ -128,9 +137,31 @@ private Publisher(Builder builder) throws IOException {
128137
129138 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
130139 // We post-process this here to keep backward-compatibility.
131- RetrySettings retrySettings = builder .retrySettings ;
132- if (retrySettings .getMaxAttempts () == 0 ) {
133- retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
140+ // Also, if "message ordering" is enabled, the publisher should retry sending the failed
141+ // message infinitely rather than sending the next one.
142+ RetrySettings .Builder retrySettingsBuilder = builder .retrySettings .toBuilder ();
143+ if (retrySettingsBuilder .getMaxAttempts () == 0 ) {
144+ retrySettingsBuilder .setMaxAttempts (Integer .MAX_VALUE );
145+ }
146+ if (enableMessageOrdering ) {
147+ retrySettingsBuilder
148+ .setMaxAttempts (Integer .MAX_VALUE )
149+ .setTotalTimeout (Duration .ofNanos (Long .MAX_VALUE ));
150+ }
151+
152+ Set <StatusCode .Code > retryCodes ;
153+ if (enableMessageOrdering ) {
154+ retryCodes = EnumSet .allOf (StatusCode .Code .class );
155+ } else {
156+ retryCodes =
157+ EnumSet .of (
158+ StatusCode .Code .ABORTED ,
159+ StatusCode .Code .CANCELLED ,
160+ StatusCode .Code .DEADLINE_EXCEEDED ,
161+ StatusCode .Code .INTERNAL ,
162+ StatusCode .Code .RESOURCE_EXHAUSTED ,
163+ StatusCode .Code .UNKNOWN ,
164+ StatusCode .Code .UNAVAILABLE );
134165 }
135166
136167 PublisherStubSettings .Builder stubSettings =
@@ -140,15 +171,8 @@ private Publisher(Builder builder) throws IOException {
140171 .setTransportChannelProvider (builder .channelProvider );
141172 stubSettings
142173 .publishSettings ()
143- .setRetryableCodes (
144- StatusCode .Code .ABORTED ,
145- StatusCode .Code .CANCELLED ,
146- StatusCode .Code .DEADLINE_EXCEEDED ,
147- StatusCode .Code .INTERNAL ,
148- StatusCode .Code .RESOURCE_EXHAUSTED ,
149- StatusCode .Code .UNKNOWN ,
150- StatusCode .Code .UNAVAILABLE )
151- .setRetrySettings (retrySettings )
174+ .setRetryableCodes (retryCodes )
175+ .setRetrySettings (retrySettingsBuilder .build ())
152176 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
153177 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
154178
@@ -196,12 +220,23 @@ public ApiFuture<String> publish(PubsubMessage message) {
196220 throw new IllegalStateException ("Cannot publish on a shut-down publisher." );
197221 }
198222
223+ final String orderingKey = message .getOrderingKey ();
224+ if (orderingKey != null && !orderingKey .isEmpty () && !enableMessageOrdering ) {
225+ throw new IllegalStateException (
226+ "Cannot publish a message with an ordering key when message ordering is not enabled." );
227+ }
228+
199229 message = messageTransform .apply (message );
200230 List <OutstandingBatch > batchesToSend = new ArrayList <>();
201231 final OutstandingPublish outstandingPublish = new OutstandingPublish (message );
202232 messagesBatchLock .lock ();
203233 try {
204234 // Check if the next message makes the current batch exceed the max batch byte size.
235+ MessagesBatch messagesBatch = messagesBatches .get (orderingKey );
236+ if (messagesBatch == null ) {
237+ messagesBatch = new MessagesBatch (orderingKey );
238+ messagesBatches .put (orderingKey , messagesBatch );
239+ }
205240 if (!messagesBatch .isEmpty ()
206241 && hasBatchingBytes ()
207242 && messagesBatch .getBatchedBytes () + outstandingPublish .messageSize
@@ -219,6 +254,7 @@ && hasBatchingBytes()
219254 || messagesBatch .getMessagesCount () == getBatchingSettings ().getElementCountThreshold ()) {
220255 batchesToSend .add (messagesBatch .popOutstandingBatch ());
221256 }
257+
222258 // Setup the next duration based delivery alarm if there are messages batched.
223259 setupAlarm ();
224260 } finally {
@@ -227,7 +263,9 @@ && hasBatchingBytes()
227263
228264 messagesWaiter .incrementPendingMessages (1 );
229265
266+
230267 if (!batchesToSend .isEmpty ()) {
268+ publishAllOutstanding ();
231269 for (final OutstandingBatch batch : batchesToSend ) {
232270 logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
233271 executor .execute (
@@ -244,7 +282,7 @@ public void run() {
244282 }
245283
246284 private void setupAlarm () {
247- if (!messagesBatch .isEmpty ()) {
285+ if (!messagesBatches .isEmpty ()) {
248286 if (!activeAlarm .getAndSet (true )) {
249287 long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
250288 logger .log (Level .FINER , "Setting up alarm for the next {0} ms." , delayThresholdMs );
@@ -276,16 +314,20 @@ public void run() {
276314 */
277315 public void publishAllOutstanding () {
278316 messagesBatchLock .lock ();
279- OutstandingBatch batchToSend ;
280317 try {
281- if (messagesBatch .isEmpty ()) {
282- return ;
318+ for (MessagesBatch batch : messagesBatches .values ()) {
319+ if (!batch .isEmpty ()) {
320+ // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
321+ // it's released, the order of publishing cannot be guaranteed if `publish()` is called
322+ // while this function is running. This locking mechanism needs to be improved if it
323+ // causes any performance degradation.
324+ publishOutstandingBatch (batch .popOutstandingBatch ());
325+ }
283326 }
284- batchToSend = messagesBatch . popOutstandingBatch ();
327+ messagesBatches . clear ();
285328 } finally {
286329 messagesBatchLock .unlock ();
287330 }
288- publishOutstandingBatch (batchToSend );
289331 }
290332
291333 private ApiFuture <PublishResponse > publishCall (OutstandingBatch outstandingBatch ) {
@@ -294,12 +336,11 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
294336 for (OutstandingPublish outstandingPublish : outstandingBatch .outstandingPublishes ) {
295337 publishRequest .addMessages (outstandingPublish .message );
296338 }
297-
298339 return publisherStub .publishCallable ().futureCall (publishRequest .build ());
299340 }
300341
301342 private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
302- ApiFutureCallback <PublishResponse > futureCallback =
343+ final ApiFutureCallback <PublishResponse > futureCallback =
303344 new ApiFutureCallback <PublishResponse >() {
304345 @ Override
305346 public void onSuccess (PublishResponse result ) {
@@ -340,20 +381,36 @@ public void onFailure(Throwable t) {
340381 }
341382 };
342383
343- ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
384+ if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
385+ ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
386+ } else {
387+ // If ordering key is specified, publish the batch using the sequential executor.
388+ sequentialExecutor .submit (
389+ outstandingBatch .orderingKey ,
390+ new Callable <ApiFuture <PublishResponse >>() {
391+ public ApiFuture <PublishResponse > call () {
392+ ApiFuture <PublishResponse > future = publishCall (outstandingBatch );
393+ ApiFutures .addCallback (future , futureCallback , directExecutor ());
394+ return future ;
395+ }
396+ });
397+ }
344398 }
345399
346400 private static final class OutstandingBatch {
347401 final List <OutstandingPublish > outstandingPublishes ;
348402 final long creationTime ;
349403 int attempt ;
350404 int batchSizeBytes ;
405+ final String orderingKey ;
351406
352- OutstandingBatch (List <OutstandingPublish > outstandingPublishes , int batchSizeBytes ) {
407+ OutstandingBatch (
408+ List <OutstandingPublish > outstandingPublishes , int batchSizeBytes , String orderingKey ) {
353409 this .outstandingPublishes = outstandingPublishes ;
354410 attempt = 1 ;
355411 creationTime = System .currentTimeMillis ();
356412 this .batchSizeBytes = batchSizeBytes ;
413+ this .orderingKey = orderingKey ;
357414 }
358415
359416 int size () {
@@ -489,7 +546,7 @@ public static final class Builder {
489546 .setRpcTimeoutMultiplier (2 )
490547 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
491548 .build ();
492-
549+ static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false ;
493550 private static final int THREADS_PER_CPU = 5 ;
494551 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
495552 InstantiatingExecutorProvider .newBuilder ()
@@ -503,6 +560,8 @@ public static final class Builder {
503560
504561 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
505562
563+ boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING ;
564+
506565 TransportChannelProvider channelProvider =
507566 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
508567
@@ -597,6 +656,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
597656 return this ;
598657 }
599658
659+ /** Sets the message ordering option. */
660+ public Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
661+ this .enableMessageOrdering = enableMessageOrdering ;
662+ return this ;
663+ }
664+
600665 /** Gives the ability to set a custom executor to be used by the library. */
601666 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
602667 this .executorProvider = Preconditions .checkNotNull (executorProvider );
@@ -622,9 +687,14 @@ public Publisher build() throws IOException {
622687 private static class MessagesBatch {
623688 private List <OutstandingPublish > messages = new LinkedList <>();
624689 private int batchedBytes ;
690+ private String orderingKey ;
691+
692+ private MessagesBatch (String orderingKey ) {
693+ this .orderingKey = orderingKey ;
694+ }
625695
626696 private OutstandingBatch popOutstandingBatch () {
627- OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
697+ OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes , orderingKey );
628698 reset ();
629699 return batch ;
630700 }
0 commit comments