4949import com .google .pubsub .v1 .TopicNames ;
5050import java .io .IOException ;
5151import java .util .ArrayList ;
52+ import java .util .EnumSet ;
53+ import java .util .HashMap ;
5254import java .util .Iterator ;
5355import java .util .LinkedList ;
5456import java .util .List ;
57+ import java .util .Map ;
58+ import java .util .Set ;
59+ import java .util .concurrent .Callable ;
5560import java .util .concurrent .ScheduledExecutorService ;
5661import java .util .concurrent .ScheduledFuture ;
5762import java .util .concurrent .TimeUnit ;
@@ -85,15 +90,17 @@ public class Publisher {
8590 private final String topicName ;
8691
8792 private final BatchingSettings batchingSettings ;
93+ private final boolean enableMessageOrdering ;
8894
8995 private final Lock messagesBatchLock ;
90- private MessagesBatch messagesBatch ;
96+ final Map < String , MessagesBatch > messagesBatches ;
9197
9298 private final AtomicBoolean activeAlarm ;
9399
94100 private final PublisherStub publisherStub ;
95101
96102 private final ScheduledExecutorService executor ;
103+ final SequentialExecutorService .CallbackExecutor sequentialExecutor ;
97104 private final AtomicBoolean shutdown ;
98105 private final BackgroundResource backgroundResources ;
99106 private final MessageWaiter messagesWaiter ;
@@ -114,22 +121,46 @@ private Publisher(Builder builder) throws IOException {
114121 topicName = builder .topicName ;
115122
116123 this .batchingSettings = builder .batchingSettings ;
124+ this .enableMessageOrdering = builder .enableMessageOrdering ;
117125 this .messageTransform = builder .messageTransform ;
118126
119- messagesBatch = new MessagesBatch ( batchingSettings );
127+ messagesBatches = new HashMap <>( );
120128 messagesBatchLock = new ReentrantLock ();
121129 activeAlarm = new AtomicBoolean (false );
122130 executor = builder .executorProvider .getExecutor ();
131+ sequentialExecutor = new SequentialExecutorService .CallbackExecutor (executor );
123132 List <BackgroundResource > backgroundResourceList = new ArrayList <>();
124133 if (builder .executorProvider .shouldAutoClose ()) {
125134 backgroundResourceList .add (new ExecutorAsBackgroundResource (executor ));
126135 }
127136
128137 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
129138 // We post-process this here to keep backward-compatibility.
130- RetrySettings retrySettings = builder .retrySettings ;
131- if (retrySettings .getMaxAttempts () == 0 ) {
132- retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
139+ // Also, if "message ordering" is enabled, the publisher should retry sending the failed
140+ // message infinitely rather than sending the next one.
141+ RetrySettings .Builder retrySettingsBuilder = builder .retrySettings .toBuilder ();
142+ if (retrySettingsBuilder .getMaxAttempts () == 0 ) {
143+ retrySettingsBuilder .setMaxAttempts (Integer .MAX_VALUE );
144+ }
145+ if (enableMessageOrdering ) {
146+ retrySettingsBuilder
147+ .setMaxAttempts (Integer .MAX_VALUE )
148+ .setTotalTimeout (Duration .ofNanos (Long .MAX_VALUE ));
149+ }
150+
151+ Set <StatusCode .Code > retryCodes ;
152+ if (enableMessageOrdering ) {
153+ retryCodes = EnumSet .allOf (StatusCode .Code .class );
154+ } else {
155+ retryCodes =
156+ EnumSet .of (
157+ StatusCode .Code .ABORTED ,
158+ StatusCode .Code .CANCELLED ,
159+ StatusCode .Code .DEADLINE_EXCEEDED ,
160+ StatusCode .Code .INTERNAL ,
161+ StatusCode .Code .RESOURCE_EXHAUSTED ,
162+ StatusCode .Code .UNKNOWN ,
163+ StatusCode .Code .UNAVAILABLE );
133164 }
134165
135166 PublisherStubSettings .Builder stubSettings =
@@ -139,15 +170,8 @@ private Publisher(Builder builder) throws IOException {
139170 .setTransportChannelProvider (builder .channelProvider );
140171 stubSettings
141172 .publishSettings ()
142- .setRetryableCodes (
143- StatusCode .Code .ABORTED ,
144- StatusCode .Code .CANCELLED ,
145- StatusCode .Code .DEADLINE_EXCEEDED ,
146- StatusCode .Code .INTERNAL ,
147- StatusCode .Code .RESOURCE_EXHAUSTED ,
148- StatusCode .Code .UNKNOWN ,
149- StatusCode .Code .UNAVAILABLE )
150- .setRetrySettings (retrySettings )
173+ .setRetryableCodes (retryCodes )
174+ .setRetrySettings (retrySettingsBuilder .build ())
151175 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
152176 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
153177 backgroundResourceList .add (publisherStub );
@@ -194,13 +218,27 @@ public String getTopicNameString() {
194218 public ApiFuture <String > publish (PubsubMessage message ) {
195219 Preconditions .checkState (!shutdown .get (), "Cannot publish on a shut-down publisher." );
196220
221+ final String orderingKey = message .getOrderingKey ();
222+ Preconditions .checkState (
223+ orderingKey .isEmpty () || enableMessageOrdering ,
224+ "Cannot publish a message with an ordering key when message ordering is not enabled." );
225+
197226 final OutstandingPublish outstandingPublish =
198227 new OutstandingPublish (messageTransform .apply (message ));
199228 List <OutstandingBatch > batchesToSend ;
200229 messagesBatchLock .lock ();
201230 try {
231+ // Check if the next message makes the current batch exceed the max batch byte size.
232+ MessagesBatch messagesBatch = messagesBatches .get (orderingKey );
233+ if (messagesBatch == null ) {
234+ messagesBatch = new MessagesBatch (batchingSettings , orderingKey );
235+ messagesBatches .put (orderingKey , messagesBatch );
236+ }
237+
202238 batchesToSend = messagesBatch .add (outstandingPublish );
203- // Setup the next duration based delivery alarm if there are messages batched.
239+ if (!batchesToSend .isEmpty () && messagesBatch .isEmpty ()) {
240+ messagesBatches .remove (orderingKey );
241+ }
204242 setupAlarm ();
205243 } finally {
206244 messagesBatchLock .unlock ();
@@ -209,6 +247,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
209247 messagesWaiter .incrementPendingMessages (1 );
210248
211249 if (!batchesToSend .isEmpty ()) {
250+ publishAllWithoutInflight ();
212251 for (final OutstandingBatch batch : batchesToSend ) {
213252 logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
214253 executor .execute (
@@ -225,7 +264,7 @@ public void run() {
225264 }
226265
227266 private void setupAlarm () {
228- if (!messagesBatch .isEmpty ()) {
267+ if (!messagesBatches .isEmpty ()) {
229268 if (!activeAlarm .getAndSet (true )) {
230269 long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
231270 logger .log (Level .FINER , "Setting up alarm for the next {0} ms." , delayThresholdMs );
@@ -236,7 +275,7 @@ private void setupAlarm() {
236275 public void run () {
237276 logger .log (Level .FINER , "Sending messages based on schedule." );
238277 activeAlarm .getAndSet (false );
239- publishAllOutstanding ();
278+ publishAllWithoutInflight ();
240279 }
241280 },
242281 delayThresholdMs ,
@@ -257,16 +296,49 @@ public void run() {
257296 */
258297 public void publishAllOutstanding () {
259298 messagesBatchLock .lock ();
260- OutstandingBatch batchToSend ;
261299 try {
262- if (messagesBatch .isEmpty ()) {
263- return ;
300+ for (MessagesBatch batch : messagesBatches .values ()) {
301+ if (!batch .isEmpty ()) {
302+ // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
303+ // it's released, the order of publishing cannot be guaranteed if `publish()` is called
304+ // while this function is running. This locking mechanism needs to be improved if it
305+ // causes any performance degradation.
306+ publishOutstandingBatch (batch .popOutstandingBatch ());
307+ }
308+ }
309+ messagesBatches .clear ();
310+ } finally {
311+ messagesBatchLock .unlock ();
312+ }
313+ }
314+
315+ /**
316+ * Publish any outstanding batches if non-empty and there are no other batches in flight. This
317+ * method sends buffered messages, but does not wait for the send operations to complete. To wait
318+ * for messages to send, call {@code get} on the futures returned from {@code publish}.
319+ */
320+ private void publishAllWithoutInflight () {
321+ messagesBatchLock .lock ();
322+ try {
323+ Iterator <Map .Entry <String , MessagesBatch >> it = messagesBatches .entrySet ().iterator ();
324+ while (it .hasNext ()) {
325+ Map .Entry <String , MessagesBatch > entry = it .next ();
326+ MessagesBatch batch = entry .getValue ();
327+ String key = entry .getKey ();
328+ if (batch .isEmpty ()) {
329+ it .remove ();
330+ } else if (key .isEmpty () || !sequentialExecutor .hasTasksInflight (key )) {
331+ // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
332+ // it's released, the order of publishing cannot be guaranteed if `publish()` is called
333+ // while this function is running. This locking mechanism needs to be improved if it
334+ // causes any performance degradation.
335+ publishOutstandingBatch (batch .popOutstandingBatch ());
336+ it .remove ();
337+ }
264338 }
265- batchToSend = messagesBatch .popOutstandingBatch ();
266339 } finally {
267340 messagesBatchLock .unlock ();
268341 }
269- publishOutstandingBatch (batchToSend );
270342 }
271343
272344 private ApiFuture <PublishResponse > publishCall (OutstandingBatch outstandingBatch ) {
@@ -280,12 +352,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
280352 }
281353
282354 private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
283- ApiFutureCallback <PublishResponse > futureCallback =
355+ final ApiFutureCallback <PublishResponse > futureCallback =
284356 new ApiFutureCallback <PublishResponse >() {
285357 @ Override
286358 public void onSuccess (PublishResponse result ) {
287359 try {
288- if (result .getMessageIdsCount () != outstandingBatch .size ()) {
360+ if (result == null || result .getMessageIdsCount () != outstandingBatch .size ()) {
289361 outstandingBatch .onFailure (
290362 new IllegalStateException (
291363 String .format (
@@ -311,20 +383,36 @@ public void onFailure(Throwable t) {
311383 }
312384 };
313385
314- ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
386+ if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
387+ ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
388+ } else {
389+ // If ordering key is specified, publish the batch using the sequential executor.
390+ ApiFuture <PublishResponse > future =
391+ sequentialExecutor .submit (
392+ outstandingBatch .orderingKey ,
393+ new Callable <ApiFuture <PublishResponse >>() {
394+ public ApiFuture <PublishResponse > call () {
395+ return publishCall (outstandingBatch );
396+ }
397+ });
398+ ApiFutures .addCallback (future , futureCallback , directExecutor ());
399+ }
315400 }
316401
317402 private static final class OutstandingBatch {
318403 final List <OutstandingPublish > outstandingPublishes ;
319404 final long creationTime ;
320405 int attempt ;
321406 int batchSizeBytes ;
407+ final String orderingKey ;
322408
323- OutstandingBatch (List <OutstandingPublish > outstandingPublishes , int batchSizeBytes ) {
409+ OutstandingBatch (
410+ List <OutstandingPublish > outstandingPublishes , int batchSizeBytes , String orderingKey ) {
324411 this .outstandingPublishes = outstandingPublishes ;
325412 attempt = 1 ;
326413 creationTime = System .currentTimeMillis ();
327414 this .batchSizeBytes = batchSizeBytes ;
415+ this .orderingKey = orderingKey ;
328416 }
329417
330418 int size () {
@@ -468,7 +556,7 @@ public static final class Builder {
468556 .setRpcTimeoutMultiplier (2 )
469557 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
470558 .build ();
471-
559+ static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false ;
472560 private static final int THREADS_PER_CPU = 5 ;
473561 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
474562 InstantiatingExecutorProvider .newBuilder ()
@@ -482,6 +570,8 @@ public static final class Builder {
482570
483571 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
484572
573+ private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING ;
574+
485575 private TransportChannelProvider channelProvider =
486576 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
487577
@@ -576,6 +666,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
576666 return this ;
577667 }
578668
669+ /** Sets the message ordering option. */
670+ public Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
671+ this .enableMessageOrdering = enableMessageOrdering ;
672+ return this ;
673+ }
674+
579675 /** Gives the ability to set a custom executor to be used by the library. */
580676 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
581677 this .executorProvider = Preconditions .checkNotNull (executorProvider );
@@ -601,15 +697,17 @@ public Publisher build() throws IOException {
601697 private static class MessagesBatch {
602698 private List <OutstandingPublish > messages ;
603699 private int batchedBytes ;
700+ private String orderingKey ;
604701 private final BatchingSettings batchingSettings ;
605702
606- public MessagesBatch (BatchingSettings batchingSettings ) {
703+ private MessagesBatch (BatchingSettings batchingSettings , String orderingKey ) {
607704 this .batchingSettings = batchingSettings ;
705+ this .orderingKey = orderingKey ;
608706 reset ();
609707 }
610708
611709 private OutstandingBatch popOutstandingBatch () {
612- OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
710+ OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes , orderingKey );
613711 reset ();
614712 return batch ;
615713 }
0 commit comments