4848import com .google .pubsub .v1 .TopicNames ;
4949import java .io .IOException ;
5050import java .util .Collections ;
51- import java .util .EnumSet ;
52- import java .util .HashMap ;
5351import java .util .Iterator ;
5452import java .util .LinkedList ;
5553import java .util .List ;
56- import java .util .Map ;
57- import java .util .Set ;
58- import java .util .concurrent .Callable ;
5954import java .util .concurrent .ScheduledExecutorService ;
6055import java .util .concurrent .ScheduledFuture ;
6156import java .util .concurrent .TimeUnit ;
@@ -89,17 +84,15 @@ public class Publisher {
8984 private final String topicName ;
9085
9186 private final BatchingSettings batchingSettings ;
92- private final boolean enableMessageOrdering ;
9387
9488 private final Lock messagesBatchLock ;
95- private final Map < String , MessagesBatch > messagesBatches ;
89+ private MessagesBatch messagesBatch ;
9690
9791 private final AtomicBoolean activeAlarm ;
9892
9993 private final PublisherStub publisherStub ;
10094
10195 private final ScheduledExecutorService executor ;
102- private final SequentialExecutorService .CallbackExecutor sequentialExecutor ;
10396 private final AtomicBoolean shutdown ;
10497 private final List <AutoCloseable > closeables ;
10598 private final MessageWaiter messagesWaiter ;
@@ -120,14 +113,12 @@ private Publisher(Builder builder) throws IOException {
120113 topicName = builder .topicName ;
121114
122115 this .batchingSettings = builder .batchingSettings ;
123- this .enableMessageOrdering = builder .enableMessageOrdering ;
124116 this .messageTransform = builder .messageTransform ;
125117
126- messagesBatches = new HashMap <> ();
118+ messagesBatch = new MessagesBatch ();
127119 messagesBatchLock = new ReentrantLock ();
128120 activeAlarm = new AtomicBoolean (false );
129121 executor = builder .executorProvider .getExecutor ();
130- sequentialExecutor = new SequentialExecutorService .CallbackExecutor (executor );
131122 if (builder .executorProvider .shouldAutoClose ()) {
132123 closeables =
133124 Collections .<AutoCloseable >singletonList (new ExecutorAsBackgroundResource (executor ));
@@ -137,31 +128,9 @@ private Publisher(Builder builder) throws IOException {
137128
138129 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
139130 // We post-process this here to keep backward-compatibility.
140- // Also, if "message ordering" is enabled, the publisher should retry sending the failed
141- // message infinitely rather than sending the next one.
142- RetrySettings .Builder retrySettingsBuilder = builder .retrySettings .toBuilder ();
143- if (retrySettingsBuilder .getMaxAttempts () == 0 ) {
144- retrySettingsBuilder .setMaxAttempts (Integer .MAX_VALUE );
145- }
146- if (enableMessageOrdering ) {
147- retrySettingsBuilder
148- .setMaxAttempts (Integer .MAX_VALUE )
149- .setTotalTimeout (Duration .ofNanos (Long .MAX_VALUE ));
150- }
151-
152- Set <StatusCode .Code > retryCodes ;
153- if (enableMessageOrdering ) {
154- retryCodes = EnumSet .allOf (StatusCode .Code .class );
155- } else {
156- retryCodes =
157- EnumSet .of (
158- StatusCode .Code .ABORTED ,
159- StatusCode .Code .CANCELLED ,
160- StatusCode .Code .DEADLINE_EXCEEDED ,
161- StatusCode .Code .INTERNAL ,
162- StatusCode .Code .RESOURCE_EXHAUSTED ,
163- StatusCode .Code .UNKNOWN ,
164- StatusCode .Code .UNAVAILABLE );
131+ RetrySettings retrySettings = builder .retrySettings ;
132+ if (retrySettings .getMaxAttempts () == 0 ) {
133+ retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
165134 }
166135
167136 PublisherStubSettings .Builder stubSettings =
@@ -171,8 +140,15 @@ private Publisher(Builder builder) throws IOException {
171140 .setTransportChannelProvider (builder .channelProvider );
172141 stubSettings
173142 .publishSettings ()
174- .setRetryableCodes (retryCodes )
175- .setRetrySettings (retrySettingsBuilder .build ())
143+ .setRetryableCodes (
144+ StatusCode .Code .ABORTED ,
145+ StatusCode .Code .CANCELLED ,
146+ StatusCode .Code .DEADLINE_EXCEEDED ,
147+ StatusCode .Code .INTERNAL ,
148+ StatusCode .Code .RESOURCE_EXHAUSTED ,
149+ StatusCode .Code .UNKNOWN ,
150+ StatusCode .Code .UNAVAILABLE )
151+ .setRetrySettings (retrySettings )
176152 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
177153 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
178154
@@ -220,23 +196,12 @@ public ApiFuture<String> publish(PubsubMessage message) {
220196 throw new IllegalStateException ("Cannot publish on a shut-down publisher." );
221197 }
222198
223- final String orderingKey = message .getOrderingKey ();
224- if (orderingKey != null && !orderingKey .isEmpty () && !enableMessageOrdering ) {
225- throw new IllegalStateException (
226- "Cannot publish a message with an ordering key when message ordering is not enabled." );
227- }
228-
229199 message = messageTransform .apply (message );
230200 OutstandingBatch batchToSend = null ;
231201 final OutstandingPublish outstandingPublish = new OutstandingPublish (message );
232202 messagesBatchLock .lock ();
233203 try {
234204 // Check if the next message makes the current batch exceed the max batch byte size.
235- MessagesBatch messagesBatch = messagesBatches .get (orderingKey );
236- if (messagesBatch == null ) {
237- messagesBatch = new MessagesBatch (orderingKey );
238- messagesBatches .put (orderingKey , messagesBatch );
239- }
240205 if (!messagesBatch .isEmpty ()
241206 && hasBatchingBytes ()
242207 && messagesBatch .getBatchedBytes () + outstandingPublish .messageSize
@@ -255,7 +220,6 @@ && hasBatchingBytes()
255220 batchToSend = messagesBatch .popOutstandingBatch ();
256221 }
257222 }
258-
259223 // Setup the next duration based delivery alarm if there are messages batched.
260224 setupAlarm ();
261225 } finally {
@@ -266,26 +230,37 @@ && hasBatchingBytes()
266230
267231 if (batchToSend != null ) {
268232 logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
269- publishAllOutstanding ();
270- publishOutstandingBatch (batchToSend );
233+ final OutstandingBatch finalBatchToSend = batchToSend ;
234+ executor .execute (
235+ new Runnable () {
236+ @ Override
237+ public void run () {
238+ publishOutstandingBatch (finalBatchToSend );
239+ }
240+ });
271241 }
272242
273243 // If the message is over the size limit, it was not added to the pending messages and it will
274244 // be sent in its own batch immediately.
275245 if (hasBatchingBytes () && outstandingPublish .messageSize >= getMaxBatchBytes ()) {
276246 logger .log (
277247 Level .FINER , "Message exceeds the max batch bytes, scheduling it for immediate send." );
278- publishAllOutstanding ();
279- publishOutstandingBatch (
280- new OutstandingBatch (
281- ImmutableList .of (outstandingPublish ), outstandingPublish .messageSize , orderingKey ));
248+ executor .execute (
249+ new Runnable () {
250+ @ Override
251+ public void run () {
252+ publishOutstandingBatch (
253+ new OutstandingBatch (
254+ ImmutableList .of (outstandingPublish ), outstandingPublish .messageSize ));
255+ }
256+ });
282257 }
283258
284259 return outstandingPublish .publishResult ;
285260 }
286261
287262 private void setupAlarm () {
288- if (!messagesBatches .isEmpty ()) {
263+ if (!messagesBatch .isEmpty ()) {
289264 if (!activeAlarm .getAndSet (true )) {
290265 long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
291266 logger .log (Level .FINER , "Setting up alarm for the next {0} ms." , delayThresholdMs );
@@ -317,20 +292,16 @@ public void run() {
317292 */
318293 public void publishAllOutstanding () {
319294 messagesBatchLock .lock ();
295+ OutstandingBatch batchToSend ;
320296 try {
321- for (MessagesBatch batch : messagesBatches .values ()) {
322- if (!batch .isEmpty ()) {
323- // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
324- // it's released, the order of publishing cannot be guaranteed if `publish()` is called
325- // while this function is running. This locking mechanism needs to be improved if it
326- // causes any performance degradation.
327- publishOutstandingBatch (batch .popOutstandingBatch ());
328- }
297+ if (messagesBatch .isEmpty ()) {
298+ return ;
329299 }
330- messagesBatches . clear ();
300+ batchToSend = messagesBatch . popOutstandingBatch ();
331301 } finally {
332302 messagesBatchLock .unlock ();
333303 }
304+ publishOutstandingBatch (batchToSend );
334305 }
335306
336307 private ApiFuture <PublishResponse > publishCall (OutstandingBatch outstandingBatch ) {
@@ -339,11 +310,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
339310 for (OutstandingPublish outstandingPublish : outstandingBatch .outstandingPublishes ) {
340311 publishRequest .addMessages (outstandingPublish .message );
341312 }
313+
342314 return publisherStub .publishCallable ().futureCall (publishRequest .build ());
343315 }
344316
345317 private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
346- final ApiFutureCallback <PublishResponse > futureCallback =
318+ ApiFutureCallback <PublishResponse > futureCallback =
347319 new ApiFutureCallback <PublishResponse >() {
348320 @ Override
349321 public void onSuccess (PublishResponse result ) {
@@ -384,36 +356,20 @@ public void onFailure(Throwable t) {
384356 }
385357 };
386358
387- if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
388- ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
389- } else {
390- // If ordering key is specified, publish the batch using the sequential executor.
391- sequentialExecutor .submit (
392- outstandingBatch .orderingKey ,
393- new Callable <ApiFuture <PublishResponse >>() {
394- public ApiFuture <PublishResponse > call () {
395- ApiFuture <PublishResponse > future = publishCall (outstandingBatch );
396- ApiFutures .addCallback (future , futureCallback , directExecutor ());
397- return future ;
398- }
399- });
400- }
359+ ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
401360 }
402361
403362 private static final class OutstandingBatch {
404363 final List <OutstandingPublish > outstandingPublishes ;
405364 final long creationTime ;
406365 int attempt ;
407366 int batchSizeBytes ;
408- final String orderingKey ;
409367
410- OutstandingBatch (
411- List <OutstandingPublish > outstandingPublishes , int batchSizeBytes , String orderingKey ) {
368+ OutstandingBatch (List <OutstandingPublish > outstandingPublishes , int batchSizeBytes ) {
412369 this .outstandingPublishes = outstandingPublishes ;
413370 attempt = 1 ;
414371 creationTime = System .currentTimeMillis ();
415372 this .batchSizeBytes = batchSizeBytes ;
416- this .orderingKey = orderingKey ;
417373 }
418374
419375 public int getAttempt () {
@@ -553,7 +509,7 @@ public static final class Builder {
553509 .setRpcTimeoutMultiplier (2 )
554510 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
555511 .build ();
556- static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false ;
512+
557513 private static final int THREADS_PER_CPU = 5 ;
558514 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
559515 InstantiatingExecutorProvider .newBuilder ()
@@ -567,8 +523,6 @@ public static final class Builder {
567523
568524 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
569525
570- boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING ;
571-
572526 TransportChannelProvider channelProvider =
573527 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
574528
@@ -663,12 +617,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
663617 return this ;
664618 }
665619
666- /** Sets the message ordering option. */
667- public Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
668- this .enableMessageOrdering = enableMessageOrdering ;
669- return this ;
670- }
671-
672620 /** Gives the ability to set a custom executor to be used by the library. */
673621 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
674622 this .executorProvider = Preconditions .checkNotNull (executorProvider );
@@ -694,14 +642,9 @@ public Publisher build() throws IOException {
694642 private static class MessagesBatch {
695643 private List <OutstandingPublish > messages = new LinkedList <>();
696644 private int batchedBytes ;
697- private String orderingKey ;
698-
699- private MessagesBatch (String orderingKey ) {
700- this .orderingKey = orderingKey ;
701- }
702645
703646 private OutstandingBatch popOutstandingBatch () {
704- OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes , orderingKey );
647+ OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
705648 reset ();
706649 return batch ;
707650 }
0 commit comments