4949import com .google .pubsub .v1 .TopicNames ;
5050import java .io .IOException ;
5151import java .util .ArrayList ;
52+ import java .util .HashMap ;
5253import java .util .Iterator ;
5354import java .util .LinkedList ;
5455import java .util .List ;
56+ import java .util .Map ;
57+ import java .util .concurrent .Callable ;
5558import java .util .concurrent .ScheduledExecutorService ;
5659import java .util .concurrent .ScheduledFuture ;
5760import java .util .concurrent .TimeUnit ;
@@ -85,15 +88,17 @@ public class Publisher {
8588 private final String topicName ;
8689
8790 private final BatchingSettings batchingSettings ;
91+ private final boolean enableMessageOrdering ;
8892
8993 private final Lock messagesBatchLock ;
90- private MessagesBatch messagesBatch ;
94+ final Map < String , MessagesBatch > messagesBatches ;
9195
9296 private final AtomicBoolean activeAlarm ;
9397
9498 private final PublisherStub publisherStub ;
9599
96100 private final ScheduledExecutorService executor ;
101+ final SequentialExecutorService .CallbackExecutor sequentialExecutor ;
97102 private final AtomicBoolean shutdown ;
98103 private final BackgroundResource backgroundResources ;
99104 private final MessageWaiter messagesWaiter ;
@@ -114,22 +119,33 @@ private Publisher(Builder builder) throws IOException {
114119 topicName = builder .topicName ;
115120
116121 this .batchingSettings = builder .batchingSettings ;
122+ this .enableMessageOrdering = builder .enableMessageOrdering ;
117123 this .messageTransform = builder .messageTransform ;
118124
119- messagesBatch = new MessagesBatch ( batchingSettings );
125+ messagesBatches = new HashMap <>( );
120126 messagesBatchLock = new ReentrantLock ();
121127 activeAlarm = new AtomicBoolean (false );
122128 executor = builder .executorProvider .getExecutor ();
129+ sequentialExecutor = new SequentialExecutorService .CallbackExecutor (executor );
123130 List <BackgroundResource > backgroundResourceList = new ArrayList <>();
124131 if (builder .executorProvider .shouldAutoClose ()) {
125132 backgroundResourceList .add (new ExecutorAsBackgroundResource (executor ));
126133 }
127134
128135 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
129136 // We post-process this here to keep backward-compatibility.
130- RetrySettings retrySettings = builder .retrySettings ;
131- if (retrySettings .getMaxAttempts () == 0 ) {
132- retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
137+ // Also, if "message ordering" is enabled, the publisher should retry sending the failed
138+ // message infinitely rather than sending the next one.
139+ RetrySettings .Builder retrySettingsBuilder = builder .retrySettings .toBuilder ();
140+ if (retrySettingsBuilder .getMaxAttempts () == 0 ) {
141+ retrySettingsBuilder .setMaxAttempts (Integer .MAX_VALUE );
142+ }
143+ if (enableMessageOrdering ) {
144+ // TODO: is there a way to have the default retry settings for requests without an ordering
145+ // key?
146+ retrySettingsBuilder
147+ .setMaxAttempts (Integer .MAX_VALUE )
148+ .setTotalTimeout (Duration .ofNanos (Long .MAX_VALUE ));
133149 }
134150
135151 PublisherStubSettings .Builder stubSettings =
@@ -147,7 +163,7 @@ private Publisher(Builder builder) throws IOException {
147163 StatusCode .Code .RESOURCE_EXHAUSTED ,
148164 StatusCode .Code .UNKNOWN ,
149165 StatusCode .Code .UNAVAILABLE )
150- .setRetrySettings (retrySettings )
166+ .setRetrySettings (retrySettingsBuilder . build () )
151167 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
152168 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
153169 backgroundResourceList .add (publisherStub );
@@ -194,38 +210,61 @@ public String getTopicNameString() {
194210 public ApiFuture <String > publish (PubsubMessage message ) {
195211 Preconditions .checkState (!shutdown .get (), "Cannot publish on a shut-down publisher." );
196212
213+ final String orderingKey = message .getOrderingKey ();
214+ Preconditions .checkState (
215+ orderingKey .isEmpty () || enableMessageOrdering ,
216+ "Cannot publish a message with an ordering key when message ordering is not enabled." );
217+
197218 final OutstandingPublish outstandingPublish =
198219 new OutstandingPublish (messageTransform .apply (message ));
199220 List <OutstandingBatch > batchesToSend ;
200221 messagesBatchLock .lock ();
201222 try {
223+ MessagesBatch messagesBatch = messagesBatches .get (orderingKey );
224+ if (messagesBatch == null ) {
225+ messagesBatch = new MessagesBatch (batchingSettings , orderingKey );
226+ messagesBatches .put (orderingKey , messagesBatch );
227+ }
228+
202229 batchesToSend = messagesBatch .add (outstandingPublish );
203- // Setup the next duration based delivery alarm if there are messages batched.
230+ if (!batchesToSend .isEmpty () && messagesBatch .isEmpty ()) {
231+ messagesBatches .remove (orderingKey );
232+ }
204233 setupAlarm ();
234+ if (!batchesToSend .isEmpty ()) {
235+ // TODO: if this is not an ordering keys scenario, will this do anything?
236+ publishAllWithoutInflight ();
237+
238+ // TODO: if this is an ordering keys scenario, is this safe without messagesBatchLock?
239+ for (final OutstandingBatch batch : batchesToSend ) {
240+ logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
241+ publishOutstandingBatch (batch );
242+ }
243+ }
205244 } finally {
206245 messagesBatchLock .unlock ();
207246 }
208247
209248 messagesWaiter .incrementPendingMessages (1 );
210-
211- if (!batchesToSend .isEmpty ()) {
212- for (final OutstandingBatch batch : batchesToSend ) {
213- logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
214- executor .execute (
215- new Runnable () {
216- @ Override
217- public void run () {
218- publishOutstandingBatch (batch );
219- }
220- });
221- }
222- }
223-
224249 return outstandingPublish .publishResult ;
225250 }
226251
252+ /**
253+ * There may be non-recoverable problems with a request for an ordering key. In that case, all
254+ * subsequent requests will fail until this method is called. If the key is not currently paused,
255+ * calling this method will be a no-op.
256+ *
257+ * @param key The key for which to resume publishing.
258+ */
259+ // TODO: make this public when Ordering keys is live
260+ @ BetaApi
261+ void resumePublish (String key ) {
262+ Preconditions .checkState (!shutdown .get (), "Cannot publish on a shut-down publisher." );
263+ sequentialExecutor .resumePublish (key );
264+ }
265+
227266 private void setupAlarm () {
228- if (!messagesBatch .isEmpty ()) {
267+ if (!messagesBatches .isEmpty ()) {
229268 if (!activeAlarm .getAndSet (true )) {
230269 long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
231270 logger .log (Level .FINER , "Setting up alarm for the next {0} ms." , delayThresholdMs );
@@ -236,7 +275,7 @@ private void setupAlarm() {
236275 public void run () {
237276 logger .log (Level .FINER , "Sending messages based on schedule." );
238277 activeAlarm .getAndSet (false );
239- publishAllOutstanding ();
278+ publishAllWithoutInflight ();
240279 }
241280 },
242281 delayThresholdMs ,
@@ -257,16 +296,46 @@ public void run() {
257296 */
258297 public void publishAllOutstanding () {
259298 messagesBatchLock .lock ();
260- OutstandingBatch batchToSend ;
261299 try {
262- if (messagesBatch .isEmpty ()) {
263- return ;
300+ for (MessagesBatch batch : messagesBatches .values ()) {
301+ if (!batch .isEmpty ()) {
302+ // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
303+ // it's released, the order of publishing cannot be guaranteed if `publish()` is called
304+ // while this function is running. This locking mechanism needs to be improved if it
305+ // causes any performance degradation.
306+ publishOutstandingBatch (batch .popOutstandingBatch ());
307+ }
308+ }
309+ messagesBatches .clear ();
310+ } finally {
311+ messagesBatchLock .unlock ();
312+ }
313+ }
314+
315+ /**
316+ * Publish any outstanding batches if non-empty and there are no other batches in flight. This
317+ * method sends buffered messages, but does not wait for the send operations to complete. To wait
318+ * for messages to send, call {@code get} on the futures returned from {@code publish}.
319+ */
320+ private void publishAllWithoutInflight () {
321+ messagesBatchLock .lock ();
322+ try {
323+ Iterator <Map .Entry <String , MessagesBatch >> it = messagesBatches .entrySet ().iterator ();
324+ while (it .hasNext ()) {
325+ Map .Entry <String , MessagesBatch > entry = it .next ();
326+ MessagesBatch batch = entry .getValue ();
327+ String key = entry .getKey ();
328+ if (batch .isEmpty ()) {
329+ it .remove ();
330+ } else if (key .isEmpty () || !sequentialExecutor .hasTasksInflight (key )) {
331+ // TODO: Will this cause a performance problem for non-ordering keys scenarios?
332+ publishOutstandingBatch (batch .popOutstandingBatch ());
333+ it .remove ();
334+ }
264335 }
265- batchToSend = messagesBatch .popOutstandingBatch ();
266336 } finally {
267337 messagesBatchLock .unlock ();
268338 }
269- publishOutstandingBatch (batchToSend );
270339 }
271340
272341 private ApiFuture <PublishResponse > publishCall (OutstandingBatch outstandingBatch ) {
@@ -280,12 +349,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
280349 }
281350
282351 private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
283- ApiFutureCallback <PublishResponse > futureCallback =
352+ final ApiFutureCallback <PublishResponse > futureCallback =
284353 new ApiFutureCallback <PublishResponse >() {
285354 @ Override
286355 public void onSuccess (PublishResponse result ) {
287356 try {
288- if (result .getMessageIdsCount () != outstandingBatch .size ()) {
357+ if (result == null || result .getMessageIdsCount () != outstandingBatch .size ()) {
289358 outstandingBatch .onFailure (
290359 new IllegalStateException (
291360 String .format (
@@ -311,20 +380,37 @@ public void onFailure(Throwable t) {
311380 }
312381 };
313382
314- ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
383+ ApiFuture <PublishResponse > future ;
384+ if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
385+ future = publishCall (outstandingBatch );
386+ } else {
387+ // If ordering key is specified, publish the batch using the sequential executor.
388+ future =
389+ sequentialExecutor .submit (
390+ outstandingBatch .orderingKey ,
391+ new Callable <ApiFuture <PublishResponse >>() {
392+ public ApiFuture <PublishResponse > call () {
393+ return publishCall (outstandingBatch );
394+ }
395+ });
396+ }
397+ ApiFutures .addCallback (future , futureCallback , directExecutor ());
315398 }
316399
317400 private static final class OutstandingBatch {
318401 final List <OutstandingPublish > outstandingPublishes ;
319402 final long creationTime ;
320403 int attempt ;
321404 int batchSizeBytes ;
405+ final String orderingKey ;
322406
323- OutstandingBatch (List <OutstandingPublish > outstandingPublishes , int batchSizeBytes ) {
407+ OutstandingBatch (
408+ List <OutstandingPublish > outstandingPublishes , int batchSizeBytes , String orderingKey ) {
324409 this .outstandingPublishes = outstandingPublishes ;
325410 attempt = 1 ;
326411 creationTime = System .currentTimeMillis ();
327412 this .batchSizeBytes = batchSizeBytes ;
413+ this .orderingKey = orderingKey ;
328414 }
329415
330416 int size () {
@@ -468,7 +554,7 @@ public static final class Builder {
468554 .setRpcTimeoutMultiplier (2 )
469555 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
470556 .build ();
471-
557+ static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false ;
472558 private static final int THREADS_PER_CPU = 5 ;
473559 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
474560 InstantiatingExecutorProvider .newBuilder ()
@@ -482,6 +568,8 @@ public static final class Builder {
482568
483569 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
484570
571+ private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING ;
572+
485573 private TransportChannelProvider channelProvider =
486574 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
487575
@@ -576,6 +664,14 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
576664 return this ;
577665 }
578666
667+ /** Sets the message ordering option. */
668+ // TODO: make this public when Ordering keys is live
669+ @ BetaApi
670+ Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
671+ this .enableMessageOrdering = enableMessageOrdering ;
672+ return this ;
673+ }
674+
579675 /** Gives the ability to set a custom executor to be used by the library. */
580676 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
581677 this .executorProvider = Preconditions .checkNotNull (executorProvider );
@@ -601,15 +697,17 @@ public Publisher build() throws IOException {
601697 private static class MessagesBatch {
602698 private List <OutstandingPublish > messages ;
603699 private int batchedBytes ;
700+ private String orderingKey ;
604701 private final BatchingSettings batchingSettings ;
605702
606- public MessagesBatch (BatchingSettings batchingSettings ) {
703+ private MessagesBatch (BatchingSettings batchingSettings , String orderingKey ) {
607704 this .batchingSettings = batchingSettings ;
705+ this .orderingKey = orderingKey ;
608706 reset ();
609707 }
610708
611709 private OutstandingBatch popOutstandingBatch () {
612- OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
710+ OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes , orderingKey );
613711 reset ();
614712 return batch ;
615713 }
0 commit comments