4949import com .google .pubsub .v1 .TopicNames ;
5050import java .io .IOException ;
5151import java .util .ArrayList ;
52+ import java .util .HashMap ;
5253import java .util .Iterator ;
5354import java .util .LinkedList ;
5455import java .util .List ;
56+ import java .util .Map ;
57+ import java .util .concurrent .Callable ;
5558import java .util .concurrent .ScheduledExecutorService ;
5659import java .util .concurrent .ScheduledFuture ;
5760import java .util .concurrent .TimeUnit ;
@@ -85,15 +88,17 @@ public class Publisher {
8588 private final String topicName ;
8689
8790 private final BatchingSettings batchingSettings ;
91+ private final boolean enableMessageOrdering ;
8892
8993 private final Lock messagesBatchLock ;
90- private MessagesBatch messagesBatch ;
94+ final Map < String , MessagesBatch > messagesBatches ;
9195
9296 private final AtomicBoolean activeAlarm ;
9397
9498 private final PublisherStub publisherStub ;
9599
96100 private final ScheduledExecutorService executor ;
101+ final SequentialExecutorService .CallbackExecutor sequentialExecutor ;
97102 private final AtomicBoolean shutdown ;
98103 private final BackgroundResource backgroundResources ;
99104 private final MessageWaiter messagesWaiter ;
@@ -114,22 +119,33 @@ private Publisher(Builder builder) throws IOException {
114119 topicName = builder .topicName ;
115120
116121 this .batchingSettings = builder .batchingSettings ;
122+ this .enableMessageOrdering = builder .enableMessageOrdering ;
117123 this .messageTransform = builder .messageTransform ;
118124
119- messagesBatch = new MessagesBatch ( batchingSettings );
125+ messagesBatches = new HashMap <>( );
120126 messagesBatchLock = new ReentrantLock ();
121127 activeAlarm = new AtomicBoolean (false );
122128 executor = builder .executorProvider .getExecutor ();
129+ sequentialExecutor = new SequentialExecutorService .CallbackExecutor (executor );
123130 List <BackgroundResource > backgroundResourceList = new ArrayList <>();
124131 if (builder .executorProvider .shouldAutoClose ()) {
125132 backgroundResourceList .add (new ExecutorAsBackgroundResource (executor ));
126133 }
127134
128135 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
129136 // We post-process this here to keep backward-compatibility.
130- RetrySettings retrySettings = builder .retrySettings ;
131- if (retrySettings .getMaxAttempts () == 0 ) {
132- retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
137+ // Also, if "message ordering" is enabled, the publisher should retry sending the failed
138+ // message infinitely rather than sending the next one.
139+ RetrySettings .Builder retrySettingsBuilder = builder .retrySettings .toBuilder ();
140+ if (retrySettingsBuilder .getMaxAttempts () == 0 ) {
141+ retrySettingsBuilder .setMaxAttempts (Integer .MAX_VALUE );
142+ }
143+ if (enableMessageOrdering ) {
144+ // TODO: is there a way to have the default retry settings for requests without an ordering
145+ // key?
146+ retrySettingsBuilder
147+ .setMaxAttempts (Integer .MAX_VALUE )
148+ .setTotalTimeout (Duration .ofNanos (Long .MAX_VALUE ));
133149 }
134150
135151 PublisherStubSettings .Builder stubSettings =
@@ -147,7 +163,7 @@ private Publisher(Builder builder) throws IOException {
147163 StatusCode .Code .RESOURCE_EXHAUSTED ,
148164 StatusCode .Code .UNKNOWN ,
149165 StatusCode .Code .UNAVAILABLE )
150- .setRetrySettings (retrySettings )
166+ .setRetrySettings (retrySettingsBuilder . build () )
151167 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
152168 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
153169 backgroundResourceList .add (publisherStub );
@@ -194,13 +210,27 @@ public String getTopicNameString() {
194210 public ApiFuture <String > publish (PubsubMessage message ) {
195211 Preconditions .checkState (!shutdown .get (), "Cannot publish on a shut-down publisher." );
196212
213+ final String orderingKey = message .getOrderingKey ();
214+ Preconditions .checkState (
215+ orderingKey .isEmpty () || enableMessageOrdering ,
216+ "Cannot publish a message with an ordering key when message ordering is not enabled." );
217+
197218 final OutstandingPublish outstandingPublish =
198219 new OutstandingPublish (messageTransform .apply (message ));
199220 List <OutstandingBatch > batchesToSend ;
200221 messagesBatchLock .lock ();
201222 try {
223+ // Check if the next message makes the current batch exceed the max batch byte size.
224+ MessagesBatch messagesBatch = messagesBatches .get (orderingKey );
225+ if (messagesBatch == null ) {
226+ messagesBatch = new MessagesBatch (batchingSettings , orderingKey );
227+ messagesBatches .put (orderingKey , messagesBatch );
228+ }
229+
202230 batchesToSend = messagesBatch .add (outstandingPublish );
203- // Setup the next duration based delivery alarm if there are messages batched.
231+ if (!batchesToSend .isEmpty () && messagesBatch .isEmpty ()) {
232+ messagesBatches .remove (orderingKey );
233+ }
204234 setupAlarm ();
205235 } finally {
206236 messagesBatchLock .unlock ();
@@ -209,6 +239,10 @@ public ApiFuture<String> publish(PubsubMessage message) {
209239 messagesWaiter .incrementPendingMessages (1 );
210240
211241 if (!batchesToSend .isEmpty ()) {
242+ // TODO: if this is not an ordering keys scenario, will this do anything?
243+ publishAllWithoutInflight ();
244+
245+ // TODO: if this is an ordering keys scenario, is this safe without messagesBatchLock?
212246 for (final OutstandingBatch batch : batchesToSend ) {
213247 logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
214248 executor .execute (
@@ -224,8 +258,22 @@ public void run() {
224258 return outstandingPublish .publishResult ;
225259 }
226260
261+ /**
262+ * There may be non-recoverable problems with a request for an ordering key. In that case, all
263+ * subsequent requests will fail until this method is called. If the key is not currently paused,
264+ * calling this method will be a no-op.
265+ *
266+ * @param key The key for which to resume publishing.
267+ */
268+ // TODO: make this public when Ordering keys is live
269+ @ BetaApi
270+ void resumePublish (String key ) {
271+ Preconditions .checkState (!shutdown .get (), "Cannot publish on a shut-down publisher." );
272+ sequentialExecutor .resumePublish (key );
273+ }
274+
227275 private void setupAlarm () {
228- if (!messagesBatch .isEmpty ()) {
276+ if (!messagesBatches .isEmpty ()) {
229277 if (!activeAlarm .getAndSet (true )) {
230278 long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
231279 logger .log (Level .FINER , "Setting up alarm for the next {0} ms." , delayThresholdMs );
@@ -236,7 +284,7 @@ private void setupAlarm() {
236284 public void run () {
237285 logger .log (Level .FINER , "Sending messages based on schedule." );
238286 activeAlarm .getAndSet (false );
239- publishAllOutstanding ();
287+ publishAllWithoutInflight ();
240288 }
241289 },
242290 delayThresholdMs ,
@@ -257,16 +305,51 @@ public void run() {
257305 */
258306 public void publishAllOutstanding () {
259307 messagesBatchLock .lock ();
260- OutstandingBatch batchToSend ;
261308 try {
262- if (messagesBatch .isEmpty ()) {
263- return ;
309+ for (MessagesBatch batch : messagesBatches .values ()) {
310+ if (!batch .isEmpty ()) {
311+ // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
312+ // it's released, the order of publishing cannot be guaranteed if `publish()` is called
313+ // while this function is running. This locking mechanism needs to be improved if it
314+ // causes any performance degradation.
315+ publishOutstandingBatch (batch .popOutstandingBatch ());
316+ }
317+ }
318+ messagesBatches .clear ();
319+ } finally {
320+ messagesBatchLock .unlock ();
321+ }
322+ }
323+
324+ /**
325+ * Publish any outstanding batches if non-empty and there are no other batches in flight. This
326+ * method sends buffered messages, but does not wait for the send operations to complete. To wait
327+ * for messages to send, call {@code get} on the futures returned from {@code publish}.
328+ */
329+ private void publishAllWithoutInflight () {
330+ messagesBatchLock .lock ();
331+ try {
332+ Iterator <Map .Entry <String , MessagesBatch >> it = messagesBatches .entrySet ().iterator ();
333+ while (it .hasNext ()) {
334+ Map .Entry <String , MessagesBatch > entry = it .next ();
335+ MessagesBatch batch = entry .getValue ();
336+ String key = entry .getKey ();
337+ if (batch .isEmpty ()) {
338+ it .remove ();
339+ } else if (key .isEmpty () || !sequentialExecutor .hasTasksInflight (key )) {
340+ // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
341+ // it's released, the order of publishing cannot be guaranteed if `publish()` is called
342+ // while this function is running. This locking mechanism needs to be improved if it
343+ // causes any performance degradation.
344+
345+ // TODO: Will this cause a performance problem for non-ordering keys scenarios?
346+ publishOutstandingBatch (batch .popOutstandingBatch ());
347+ it .remove ();
348+ }
264349 }
265- batchToSend = messagesBatch .popOutstandingBatch ();
266350 } finally {
267351 messagesBatchLock .unlock ();
268352 }
269- publishOutstandingBatch (batchToSend );
270353 }
271354
272355 private ApiFuture <PublishResponse > publishCall (OutstandingBatch outstandingBatch ) {
@@ -280,12 +363,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
280363 }
281364
282365 private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
283- ApiFutureCallback <PublishResponse > futureCallback =
366+ final ApiFutureCallback <PublishResponse > futureCallback =
284367 new ApiFutureCallback <PublishResponse >() {
285368 @ Override
286369 public void onSuccess (PublishResponse result ) {
287370 try {
288- if (result .getMessageIdsCount () != outstandingBatch .size ()) {
371+ if (result == null || result .getMessageIdsCount () != outstandingBatch .size ()) {
289372 outstandingBatch .onFailure (
290373 new IllegalStateException (
291374 String .format (
@@ -311,20 +394,37 @@ public void onFailure(Throwable t) {
311394 }
312395 };
313396
314- ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
397+ ApiFuture <PublishResponse > future ;
398+ if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
399+ future = publishCall (outstandingBatch );
400+ } else {
401+ // If ordering key is specified, publish the batch using the sequential executor.
402+ future =
403+ sequentialExecutor .submit (
404+ outstandingBatch .orderingKey ,
405+ new Callable <ApiFuture <PublishResponse >>() {
406+ public ApiFuture <PublishResponse > call () {
407+ return publishCall (outstandingBatch );
408+ }
409+ });
410+ }
411+ ApiFutures .addCallback (future , futureCallback , directExecutor ());
315412 }
316413
317414 private static final class OutstandingBatch {
318415 final List <OutstandingPublish > outstandingPublishes ;
319416 final long creationTime ;
320417 int attempt ;
321418 int batchSizeBytes ;
419+ final String orderingKey ;
322420
323- OutstandingBatch (List <OutstandingPublish > outstandingPublishes , int batchSizeBytes ) {
421+ OutstandingBatch (
422+ List <OutstandingPublish > outstandingPublishes , int batchSizeBytes , String orderingKey ) {
324423 this .outstandingPublishes = outstandingPublishes ;
325424 attempt = 1 ;
326425 creationTime = System .currentTimeMillis ();
327426 this .batchSizeBytes = batchSizeBytes ;
427+ this .orderingKey = orderingKey ;
328428 }
329429
330430 int size () {
@@ -468,7 +568,7 @@ public static final class Builder {
468568 .setRpcTimeoutMultiplier (2 )
469569 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
470570 .build ();
471-
571+ static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false ;
472572 private static final int THREADS_PER_CPU = 5 ;
473573 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
474574 InstantiatingExecutorProvider .newBuilder ()
@@ -482,6 +582,8 @@ public static final class Builder {
482582
483583 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
484584
585+ private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING ;
586+
485587 private TransportChannelProvider channelProvider =
486588 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
487589
@@ -576,6 +678,14 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
576678 return this ;
577679 }
578680
681+ /** Sets the message ordering option. */
682+ // TODO: make this public when Ordering keys is live
683+ @ BetaApi
684+ Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
685+ this .enableMessageOrdering = enableMessageOrdering ;
686+ return this ;
687+ }
688+
579689 /** Gives the ability to set a custom executor to be used by the library. */
580690 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
581691 this .executorProvider = Preconditions .checkNotNull (executorProvider );
@@ -601,15 +711,17 @@ public Publisher build() throws IOException {
601711 private static class MessagesBatch {
602712 private List <OutstandingPublish > messages ;
603713 private int batchedBytes ;
714+ private String orderingKey ;
604715 private final BatchingSettings batchingSettings ;
605716
606- public MessagesBatch (BatchingSettings batchingSettings ) {
717+ private MessagesBatch (BatchingSettings batchingSettings , String orderingKey ) {
607718 this .batchingSettings = batchingSettings ;
719+ this .orderingKey = orderingKey ;
608720 reset ();
609721 }
610722
611723 private OutstandingBatch popOutstandingBatch () {
612- OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
724+ OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes , orderingKey );
613725 reset ();
614726 return batch ;
615727 }
0 commit comments