4949import com .google .pubsub .v1 .TopicNames ;
5050import java .io .IOException ;
5151import java .util .ArrayList ;
52+ import java .util .EnumSet ;
53+ import java .util .HashMap ;
5254import java .util .Iterator ;
5355import java .util .LinkedList ;
5456import java .util .List ;
57+ import java .util .Map ;
58+ import java .util .concurrent .Callable ;
5559import java .util .concurrent .ScheduledExecutorService ;
5660import java .util .concurrent .ScheduledFuture ;
5761import java .util .concurrent .TimeUnit ;
@@ -85,15 +89,17 @@ public class Publisher {
8589 private final String topicName ;
8690
8791 private final BatchingSettings batchingSettings ;
92+ private final boolean enableMessageOrdering ;
8893
8994 private final Lock messagesBatchLock ;
90- private MessagesBatch messagesBatch ;
95+ final Map < String , MessagesBatch > messagesBatches ;
9196
9297 private final AtomicBoolean activeAlarm ;
9398
9499 private final PublisherStub publisherStub ;
95100
96101 private final ScheduledExecutorService executor ;
102+ final SequentialExecutorService .CallbackExecutor sequentialExecutor ;
97103 private final AtomicBoolean shutdown ;
98104 private final BackgroundResource backgroundResources ;
99105 private final MessageWaiter messagesWaiter ;
@@ -114,22 +120,33 @@ private Publisher(Builder builder) throws IOException {
114120 topicName = builder .topicName ;
115121
116122 this .batchingSettings = builder .batchingSettings ;
123+ this .enableMessageOrdering = builder .enableMessageOrdering ;
117124 this .messageTransform = builder .messageTransform ;
118125
119- messagesBatch = new MessagesBatch ( batchingSettings );
126+ messagesBatches = new HashMap <>( );
120127 messagesBatchLock = new ReentrantLock ();
121128 activeAlarm = new AtomicBoolean (false );
122129 executor = builder .executorProvider .getExecutor ();
130+ sequentialExecutor = new SequentialExecutorService .CallbackExecutor (executor );
123131 List <BackgroundResource > backgroundResourceList = new ArrayList <>();
124132 if (builder .executorProvider .shouldAutoClose ()) {
125133 backgroundResourceList .add (new ExecutorAsBackgroundResource (executor ));
126134 }
127135
128136 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
129137 // We post-process this here to keep backward-compatibility.
130- RetrySettings retrySettings = builder .retrySettings ;
131- if (retrySettings .getMaxAttempts () == 0 ) {
132- retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
138+ // Also, if "message ordering" is enabled, the publisher should retry sending the failed
139+ // message infinitely rather than sending the next one.
140+ RetrySettings .Builder retrySettingsBuilder = builder .retrySettings .toBuilder ();
141+ if (retrySettingsBuilder .getMaxAttempts () == 0 ) {
142+ retrySettingsBuilder .setMaxAttempts (Integer .MAX_VALUE );
143+ }
144+ if (enableMessageOrdering ) {
145+ // TODO: is there a way to have the default retry settings for requests without an ordering
146+ // key?
147+ retrySettingsBuilder
148+ .setMaxAttempts (Integer .MAX_VALUE )
149+ .setTotalTimeout (Duration .ofNanos (Long .MAX_VALUE ));
133150 }
134151
135152 PublisherStubSettings .Builder stubSettings =
@@ -147,7 +164,7 @@ private Publisher(Builder builder) throws IOException {
147164 StatusCode .Code .RESOURCE_EXHAUSTED ,
148165 StatusCode .Code .UNKNOWN ,
149166 StatusCode .Code .UNAVAILABLE )
150- .setRetrySettings (retrySettings )
167+ .setRetrySettings (retrySettingsBuilder . build () )
151168 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
152169 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
153170 backgroundResourceList .add (publisherStub );
@@ -194,13 +211,27 @@ public String getTopicNameString() {
194211 public ApiFuture <String > publish (PubsubMessage message ) {
195212 Preconditions .checkState (!shutdown .get (), "Cannot publish on a shut-down publisher." );
196213
214+ final String orderingKey = message .getOrderingKey ();
215+ Preconditions .checkState (
216+ orderingKey .isEmpty () || enableMessageOrdering ,
217+ "Cannot publish a message with an ordering key when message ordering is not enabled." );
218+
197219 final OutstandingPublish outstandingPublish =
198220 new OutstandingPublish (messageTransform .apply (message ));
199221 List <OutstandingBatch > batchesToSend ;
200222 messagesBatchLock .lock ();
201223 try {
224+ // Check if the next message makes the current batch exceed the max batch byte size.
225+ MessagesBatch messagesBatch = messagesBatches .get (orderingKey );
226+ if (messagesBatch == null ) {
227+ messagesBatch = new MessagesBatch (batchingSettings , orderingKey );
228+ messagesBatches .put (orderingKey , messagesBatch );
229+ }
230+
202231 batchesToSend = messagesBatch .add (outstandingPublish );
203- // Setup the next duration based delivery alarm if there are messages batched.
232+ if (!batchesToSend .isEmpty () && messagesBatch .isEmpty ()) {
233+ messagesBatches .remove (orderingKey );
234+ }
204235 setupAlarm ();
205236 } finally {
206237 messagesBatchLock .unlock ();
@@ -209,6 +240,10 @@ public ApiFuture<String> publish(PubsubMessage message) {
209240 messagesWaiter .incrementPendingMessages (1 );
210241
211242 if (!batchesToSend .isEmpty ()) {
243+ // TODO: if this is not an ordering keys scenario, will this do anything?
244+ publishAllWithoutInflight ();
245+
246+ // TODO: if this is an ordering keys scenario, is this safe without messagesBatchLock?
212247 for (final OutstandingBatch batch : batchesToSend ) {
213248 logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
214249 executor .execute (
@@ -224,8 +259,22 @@ public void run() {
224259 return outstandingPublish .publishResult ;
225260 }
226261
262+ /**
263+ * There may be non-recoverable problems with a request for an ordering key. In that case, all
264+ * subsequent requests will fail until this method is called. If the key is not currently paused,
265+ * calling this method will be a no-op.
266+ *
267+ * @param key The key for which to resume publishing.
268+ */
269+ // TODO: make this public when Ordering keys is live
270+ @ BetaApi
271+ void resumePublish (String key ) {
272+ Preconditions .checkState (!shutdown .get (), "Cannot publish on a shut-down publisher." );
273+ sequentialExecutor .resumePublish (key );
274+ }
275+
227276 private void setupAlarm () {
228- if (!messagesBatch .isEmpty ()) {
277+ if (!messagesBatches .isEmpty ()) {
229278 if (!activeAlarm .getAndSet (true )) {
230279 long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
231280 logger .log (Level .FINER , "Setting up alarm for the next {0} ms." , delayThresholdMs );
@@ -236,7 +285,7 @@ private void setupAlarm() {
236285 public void run () {
237286 logger .log (Level .FINER , "Sending messages based on schedule." );
238287 activeAlarm .getAndSet (false );
239- publishAllOutstanding ();
288+ publishAllWithoutInflight ();
240289 }
241290 },
242291 delayThresholdMs ,
@@ -257,16 +306,51 @@ public void run() {
257306 */
258307 public void publishAllOutstanding () {
259308 messagesBatchLock .lock ();
260- OutstandingBatch batchToSend ;
261309 try {
262- if (messagesBatch .isEmpty ()) {
263- return ;
310+ for (MessagesBatch batch : messagesBatches .values ()) {
311+ if (!batch .isEmpty ()) {
312+ // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
313+ // it's released, the order of publishing cannot be guaranteed if `publish()` is called
314+ // while this function is running. This locking mechanism needs to be improved if it
315+ // causes any performance degradation.
316+ publishOutstandingBatch (batch .popOutstandingBatch ());
317+ }
318+ }
319+ messagesBatches .clear ();
320+ } finally {
321+ messagesBatchLock .unlock ();
322+ }
323+ }
324+
325+ /**
326+ * Publish any outstanding batches if non-empty and there are no other batches in flight. This
327+ * method sends buffered messages, but does not wait for the send operations to complete. To wait
328+ * for messages to send, call {@code get} on the futures returned from {@code publish}.
329+ */
330+ private void publishAllWithoutInflight () {
331+ messagesBatchLock .lock ();
332+ try {
333+ Iterator <Map .Entry <String , MessagesBatch >> it = messagesBatches .entrySet ().iterator ();
334+ while (it .hasNext ()) {
335+ Map .Entry <String , MessagesBatch > entry = it .next ();
336+ MessagesBatch batch = entry .getValue ();
337+ String key = entry .getKey ();
338+ if (batch .isEmpty ()) {
339+ it .remove ();
340+ } else if (key .isEmpty () || !sequentialExecutor .hasTasksInflight (key )) {
341+ // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
342+ // it's released, the order of publishing cannot be guaranteed if `publish()` is called
343+ // while this function is running. This locking mechanism needs to be improved if it
344+ // causes any performance degradation.
345+
346+ // TODO: Will this cause a performance problem for non-ordering keys scenarios?
347+ publishOutstandingBatch (batch .popOutstandingBatch ());
348+ it .remove ();
349+ }
264350 }
265- batchToSend = messagesBatch .popOutstandingBatch ();
266351 } finally {
267352 messagesBatchLock .unlock ();
268353 }
269- publishOutstandingBatch (batchToSend );
270354 }
271355
272356 private ApiFuture <PublishResponse > publishCall (OutstandingBatch outstandingBatch ) {
@@ -280,12 +364,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
280364 }
281365
282366 private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
283- ApiFutureCallback <PublishResponse > futureCallback =
367+ final ApiFutureCallback <PublishResponse > futureCallback =
284368 new ApiFutureCallback <PublishResponse >() {
285369 @ Override
286370 public void onSuccess (PublishResponse result ) {
287371 try {
288- if (result .getMessageIdsCount () != outstandingBatch .size ()) {
372+ if (result == null || result .getMessageIdsCount () != outstandingBatch .size ()) {
289373 outstandingBatch .onFailure (
290374 new IllegalStateException (
291375 String .format (
@@ -311,20 +395,37 @@ public void onFailure(Throwable t) {
311395 }
312396 };
313397
314- ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
398+ ApiFuture <PublishResponse > future ;
399+ if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
400+ future = publishCall (outstandingBatch );
401+ } else {
402+ // If ordering key is specified, publish the batch using the sequential executor.
403+ future =
404+ sequentialExecutor .submit (
405+ outstandingBatch .orderingKey ,
406+ new Callable <ApiFuture <PublishResponse >>() {
407+ public ApiFuture <PublishResponse > call () {
408+ return publishCall (outstandingBatch );
409+ }
410+ });
411+ }
412+ ApiFutures .addCallback (future , futureCallback , directExecutor ());
315413 }
316414
317415 private static final class OutstandingBatch {
318416 final List <OutstandingPublish > outstandingPublishes ;
319417 final long creationTime ;
320418 int attempt ;
321419 int batchSizeBytes ;
420+ final String orderingKey ;
322421
323- OutstandingBatch (List <OutstandingPublish > outstandingPublishes , int batchSizeBytes ) {
422+ OutstandingBatch (
423+ List <OutstandingPublish > outstandingPublishes , int batchSizeBytes , String orderingKey ) {
324424 this .outstandingPublishes = outstandingPublishes ;
325425 attempt = 1 ;
326426 creationTime = System .currentTimeMillis ();
327427 this .batchSizeBytes = batchSizeBytes ;
428+ this .orderingKey = orderingKey ;
328429 }
329430
330431 int size () {
@@ -468,7 +569,7 @@ public static final class Builder {
468569 .setRpcTimeoutMultiplier (2 )
469570 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
470571 .build ();
471-
572+ static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false ;
472573 private static final int THREADS_PER_CPU = 5 ;
473574 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
474575 InstantiatingExecutorProvider .newBuilder ()
@@ -482,6 +583,8 @@ public static final class Builder {
482583
483584 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
484585
586+ private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING ;
587+
485588 private TransportChannelProvider channelProvider =
486589 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
487590
@@ -576,6 +679,14 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
576679 return this ;
577680 }
578681
682+ /** Sets the message ordering option. */
683+ // TODO: make this public when Ordering keys is live
684+ @ BetaApi
685+ Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
686+ this .enableMessageOrdering = enableMessageOrdering ;
687+ return this ;
688+ }
689+
579690 /** Gives the ability to set a custom executor to be used by the library. */
580691 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
581692 this .executorProvider = Preconditions .checkNotNull (executorProvider );
@@ -601,15 +712,17 @@ public Publisher build() throws IOException {
601712 private static class MessagesBatch {
602713 private List <OutstandingPublish > messages ;
603714 private int batchedBytes ;
715+ private String orderingKey ;
604716 private final BatchingSettings batchingSettings ;
605717
606- public MessagesBatch (BatchingSettings batchingSettings ) {
718+ private MessagesBatch (BatchingSettings batchingSettings , String orderingKey ) {
607719 this .batchingSettings = batchingSettings ;
720+ this .orderingKey = orderingKey ;
608721 reset ();
609722 }
610723
611724 private OutstandingBatch popOutstandingBatch () {
612- OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
725+ OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes , orderingKey );
613726 reset ();
614727 return batch ;
615728 }
0 commit comments