4949import com .google .pubsub .v1 .TopicNames ;
5050import java .io .IOException ;
5151import java .util .ArrayList ;
52+ import java .util .Collections ;
53+ import java .util .EnumSet ;
54+ import java .util .HashMap ;
5255import java .util .Iterator ;
5356import java .util .LinkedList ;
5457import java .util .List ;
58+ import java .util .Map ;
59+ import java .util .Set ;
60+ import java .util .concurrent .Callable ;
5561import java .util .concurrent .ScheduledExecutorService ;
5662import java .util .concurrent .ScheduledFuture ;
5763import java .util .concurrent .TimeUnit ;
@@ -85,15 +91,17 @@ public class Publisher {
8591 private final String topicName ;
8692
8793 private final BatchingSettings batchingSettings ;
94+ private final boolean enableMessageOrdering ;
8895
8996 private final Lock messagesBatchLock ;
90- private MessagesBatch messagesBatch ;
97+ private final Map < String , MessagesBatch > messagesBatches ;
9198
9299 private final AtomicBoolean activeAlarm ;
93100
94101 private final PublisherStub publisherStub ;
95102
96103 private final ScheduledExecutorService executor ;
104+ private final SequentialExecutorService .CallbackExecutor sequentialExecutor ;
97105 private final AtomicBoolean shutdown ;
98106 private final BackgroundResource backgroundResources ;
99107 private final MessageWaiter messagesWaiter ;
@@ -114,22 +122,46 @@ private Publisher(Builder builder) throws IOException {
114122 topicName = builder .topicName ;
115123
116124 this .batchingSettings = builder .batchingSettings ;
125+ this .enableMessageOrdering = builder .enableMessageOrdering ;
117126 this .messageTransform = builder .messageTransform ;
118127
119- messagesBatch = new MessagesBatch ( batchingSettings );
128+ messagesBatches = new HashMap <>( );
120129 messagesBatchLock = new ReentrantLock ();
121130 activeAlarm = new AtomicBoolean (false );
122131 executor = builder .executorProvider .getExecutor ();
132+ sequentialExecutor = new SequentialExecutorService .CallbackExecutor (executor );
123133 List <BackgroundResource > backgroundResourceList = new ArrayList <>();
124134 if (builder .executorProvider .shouldAutoClose ()) {
125135 backgroundResourceList .add (new ExecutorAsBackgroundResource (executor ));
126136 }
127137
128138 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
129139 // We post-process this here to keep backward-compatibility.
130- RetrySettings retrySettings = builder .retrySettings ;
131- if (retrySettings .getMaxAttempts () == 0 ) {
132- retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
140+ // Also, if "message ordering" is enabled, the publisher should retry sending the failed
141+ // message infinitely rather than sending the next one.
142+ RetrySettings .Builder retrySettingsBuilder = builder .retrySettings .toBuilder ();
143+ if (retrySettingsBuilder .getMaxAttempts () == 0 ) {
144+ retrySettingsBuilder .setMaxAttempts (Integer .MAX_VALUE );
145+ }
146+ if (enableMessageOrdering ) {
147+ retrySettingsBuilder
148+ .setMaxAttempts (Integer .MAX_VALUE )
149+ .setTotalTimeout (Duration .ofNanos (Long .MAX_VALUE ));
150+ }
151+
152+ Set <StatusCode .Code > retryCodes ;
153+ if (enableMessageOrdering ) {
154+ retryCodes = EnumSet .allOf (StatusCode .Code .class );
155+ } else {
156+ retryCodes =
157+ EnumSet .of (
158+ StatusCode .Code .ABORTED ,
159+ StatusCode .Code .CANCELLED ,
160+ StatusCode .Code .DEADLINE_EXCEEDED ,
161+ StatusCode .Code .INTERNAL ,
162+ StatusCode .Code .RESOURCE_EXHAUSTED ,
163+ StatusCode .Code .UNKNOWN ,
164+ StatusCode .Code .UNAVAILABLE );
133165 }
134166
135167 PublisherStubSettings .Builder stubSettings =
@@ -139,15 +171,8 @@ private Publisher(Builder builder) throws IOException {
139171 .setTransportChannelProvider (builder .channelProvider );
140172 stubSettings
141173 .publishSettings ()
142- .setRetryableCodes (
143- StatusCode .Code .ABORTED ,
144- StatusCode .Code .CANCELLED ,
145- StatusCode .Code .DEADLINE_EXCEEDED ,
146- StatusCode .Code .INTERNAL ,
147- StatusCode .Code .RESOURCE_EXHAUSTED ,
148- StatusCode .Code .UNKNOWN ,
149- StatusCode .Code .UNAVAILABLE )
150- .setRetrySettings (retrySettings )
174+ .setRetryableCodes (retryCodes )
175+ .setRetrySettings (retrySettingsBuilder .build ())
151176 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
152177 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
153178 backgroundResourceList .add (publisherStub );
@@ -196,13 +221,25 @@ public ApiFuture<String> publish(PubsubMessage message) {
196221 throw new IllegalStateException ("Cannot publish on a shut-down publisher." );
197222 }
198223
224+ final String orderingKey = message .getOrderingKey ();
225+ if (orderingKey != null && !orderingKey .isEmpty () && !enableMessageOrdering ) {
226+ throw new IllegalStateException (
227+ "Cannot publish a message with an ordering key when message ordering is not enabled." );
228+ }
229+
199230 final OutstandingPublish outstandingPublish =
200231 new OutstandingPublish (messageTransform .apply (message ));
201232 List <OutstandingBatch > batchesToSend ;
202233 messagesBatchLock .lock ();
203234 try {
235+ // Check if the next message makes the current batch exceed the max batch byte size.
236+ MessagesBatch messagesBatch = messagesBatches .get (orderingKey );
237+ if (messagesBatch == null ) {
238+ messagesBatch = new MessagesBatch (batchingSettings , orderingKey );
239+ messagesBatches .put (orderingKey , messagesBatch );
240+ }
241+
204242 batchesToSend = messagesBatch .add (outstandingPublish );
205- // Setup the next duration based delivery alarm if there are messages batched.
206243 setupAlarm ();
207244 } finally {
208245 messagesBatchLock .unlock ();
@@ -211,6 +248,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
211248 messagesWaiter .incrementPendingMessages (1 );
212249
213250 if (!batchesToSend .isEmpty ()) {
251+ publishAllOutstanding ();
214252 for (final OutstandingBatch batch : batchesToSend ) {
215253 logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
216254 executor .execute (
@@ -227,7 +265,7 @@ public void run() {
227265 }
228266
229267 private void setupAlarm () {
230- if (!messagesBatch .isEmpty ()) {
268+ if (!messagesBatches .isEmpty ()) {
231269 if (!activeAlarm .getAndSet (true )) {
232270 long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
233271 logger .log (Level .FINER , "Setting up alarm for the next {0} ms." , delayThresholdMs );
@@ -259,16 +297,20 @@ public void run() {
259297 */
260298 public void publishAllOutstanding () {
261299 messagesBatchLock .lock ();
262- OutstandingBatch batchToSend ;
263300 try {
264- if (messagesBatch .isEmpty ()) {
265- return ;
301+ for (MessagesBatch batch : messagesBatches .values ()) {
302+ if (!batch .isEmpty ()) {
303+ // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
304+ // it's released, the order of publishing cannot be guaranteed if `publish()` is called
305+ // while this function is running. This locking mechanism needs to be improved if it
306+ // causes any performance degradation.
307+ publishOutstandingBatch (batch .popOutstandingBatch ());
308+ }
266309 }
267- batchToSend = messagesBatch . popOutstandingBatch ();
310+ messagesBatches . clear ();
268311 } finally {
269312 messagesBatchLock .unlock ();
270313 }
271- publishOutstandingBatch (batchToSend );
272314 }
273315
274316 private ApiFuture <PublishResponse > publishCall (OutstandingBatch outstandingBatch ) {
@@ -282,7 +324,7 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
282324 }
283325
284326 private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
285- ApiFutureCallback <PublishResponse > futureCallback =
327+ final ApiFutureCallback <PublishResponse > futureCallback =
286328 new ApiFutureCallback <PublishResponse >() {
287329 @ Override
288330 public void onSuccess (PublishResponse result ) {
@@ -323,20 +365,36 @@ public void onFailure(Throwable t) {
323365 }
324366 };
325367
326- ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
368+ if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
369+ ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
370+ } else {
371+ // If ordering key is specified, publish the batch using the sequential executor.
372+ sequentialExecutor .submit (
373+ outstandingBatch .orderingKey ,
374+ new Callable <ApiFuture <PublishResponse >>() {
375+ public ApiFuture <PublishResponse > call () {
376+ ApiFuture <PublishResponse > future = publishCall (outstandingBatch );
377+ ApiFutures .addCallback (future , futureCallback , directExecutor ());
378+ return future ;
379+ }
380+ });
381+ }
327382 }
328383
329384 private static final class OutstandingBatch {
330385 final List <OutstandingPublish > outstandingPublishes ;
331386 final long creationTime ;
332387 int attempt ;
333388 int batchSizeBytes ;
389+ final String orderingKey ;
334390
335- OutstandingBatch (List <OutstandingPublish > outstandingPublishes , int batchSizeBytes ) {
391+ OutstandingBatch (
392+ List <OutstandingPublish > outstandingPublishes , int batchSizeBytes , String orderingKey ) {
336393 this .outstandingPublishes = outstandingPublishes ;
337394 attempt = 1 ;
338395 creationTime = System .currentTimeMillis ();
339396 this .batchSizeBytes = batchSizeBytes ;
397+ this .orderingKey = orderingKey ;
340398 }
341399
342400 int size () {
@@ -460,7 +518,7 @@ public static final class Builder {
460518 .setRpcTimeoutMultiplier (2 )
461519 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
462520 .build ();
463-
521+ static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false ;
464522 private static final int THREADS_PER_CPU = 5 ;
465523 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
466524 InstantiatingExecutorProvider .newBuilder ()
@@ -474,6 +532,8 @@ public static final class Builder {
474532
475533 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
476534
535+ boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING ;
536+
477537 TransportChannelProvider channelProvider =
478538 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
479539
@@ -568,6 +628,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
568628 return this ;
569629 }
570630
631+ /** Sets the message ordering option. */
632+ public Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
633+ this .enableMessageOrdering = enableMessageOrdering ;
634+ return this ;
635+ }
636+
571637 /** Gives the ability to set a custom executor to be used by the library. */
572638 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
573639 this .executorProvider = Preconditions .checkNotNull (executorProvider );
@@ -593,15 +659,17 @@ public Publisher build() throws IOException {
593659 private static class MessagesBatch {
594660 private List <OutstandingPublish > messages ;
595661 private int batchedBytes ;
662+ private String orderingKey ;
596663 private final BatchingSettings batchingSettings ;
597664
598- public MessagesBatch (BatchingSettings batchingSettings ) {
665+ private MessagesBatch (BatchingSettings batchingSettings , String orderingKey ) {
599666 this .batchingSettings = batchingSettings ;
667+ this .orderingKey = orderingKey ;
600668 reset ();
601669 }
602670
603671 private OutstandingBatch popOutstandingBatch () {
604- OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
672+ OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes , orderingKey );
605673 reset ();
606674 return batch ;
607675 }
0 commit comments