4848import java .io .IOException ;
4949import java .util .ArrayList ;
5050import java .util .Collections ;
51- import java .util .EnumSet ;
52- import java .util .HashMap ;
5351import java .util .Iterator ;
5452import java .util .LinkedList ;
5553import java .util .List ;
56- import java .util .Map ;
57- import java .util .Set ;
58- import java .util .concurrent .Callable ;
5954import java .util .concurrent .ScheduledExecutorService ;
6055import java .util .concurrent .ScheduledFuture ;
6156import java .util .concurrent .TimeUnit ;
@@ -89,17 +84,15 @@ public class Publisher {
8984 private final String topicName ;
9085
9186 private final BatchingSettings batchingSettings ;
92- private final boolean enableMessageOrdering ;
9387
9488 private final Lock messagesBatchLock ;
95- private final Map < String , MessagesBatch > messagesBatches ;
89+ private MessagesBatch messagesBatch ;
9690
9791 private final AtomicBoolean activeAlarm ;
9892
9993 private final PublisherStub publisherStub ;
10094
10195 private final ScheduledExecutorService executor ;
102- private final SequentialExecutorService .CallbackExecutor sequentialExecutor ;
10396 private final AtomicBoolean shutdown ;
10497 private final List <AutoCloseable > closeables ;
10598 private final MessageWaiter messagesWaiter ;
@@ -120,14 +113,12 @@ private Publisher(Builder builder) throws IOException {
120113 topicName = builder .topicName ;
121114
122115 this .batchingSettings = builder .batchingSettings ;
123- this .enableMessageOrdering = builder .enableMessageOrdering ;
124116 this .messageTransform = builder .messageTransform ;
125117
126- messagesBatches = new HashMap <> ();
118+ messagesBatch = new MessagesBatch ();
127119 messagesBatchLock = new ReentrantLock ();
128120 activeAlarm = new AtomicBoolean (false );
129121 executor = builder .executorProvider .getExecutor ();
130- sequentialExecutor = new SequentialExecutorService .CallbackExecutor (executor );
131122 if (builder .executorProvider .shouldAutoClose ()) {
132123 closeables =
133124 Collections .<AutoCloseable >singletonList (new ExecutorAsBackgroundResource (executor ));
@@ -137,31 +128,9 @@ private Publisher(Builder builder) throws IOException {
137128
138129 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
139130 // We post-process this here to keep backward-compatibility.
140- // Also, if "message ordering" is enabled, the publisher should retry sending the failed
141- // message infinitely rather than sending the next one.
142- RetrySettings .Builder retrySettingsBuilder = builder .retrySettings .toBuilder ();
143- if (retrySettingsBuilder .getMaxAttempts () == 0 ) {
144- retrySettingsBuilder .setMaxAttempts (Integer .MAX_VALUE );
145- }
146- if (enableMessageOrdering ) {
147- retrySettingsBuilder
148- .setMaxAttempts (Integer .MAX_VALUE )
149- .setTotalTimeout (Duration .ofNanos (Long .MAX_VALUE ));
150- }
151-
152- Set <StatusCode .Code > retryCodes ;
153- if (enableMessageOrdering ) {
154- retryCodes = EnumSet .allOf (StatusCode .Code .class );
155- } else {
156- retryCodes =
157- EnumSet .of (
158- StatusCode .Code .ABORTED ,
159- StatusCode .Code .CANCELLED ,
160- StatusCode .Code .DEADLINE_EXCEEDED ,
161- StatusCode .Code .INTERNAL ,
162- StatusCode .Code .RESOURCE_EXHAUSTED ,
163- StatusCode .Code .UNKNOWN ,
164- StatusCode .Code .UNAVAILABLE );
131+ RetrySettings retrySettings = builder .retrySettings ;
132+ if (retrySettings .getMaxAttempts () == 0 ) {
133+ retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
165134 }
166135
167136 PublisherStubSettings .Builder stubSettings =
@@ -171,8 +140,15 @@ private Publisher(Builder builder) throws IOException {
171140 .setTransportChannelProvider (builder .channelProvider );
172141 stubSettings
173142 .publishSettings ()
174- .setRetryableCodes (retryCodes )
175- .setRetrySettings (retrySettingsBuilder .build ())
143+ .setRetryableCodes (
144+ StatusCode .Code .ABORTED ,
145+ StatusCode .Code .CANCELLED ,
146+ StatusCode .Code .DEADLINE_EXCEEDED ,
147+ StatusCode .Code .INTERNAL ,
148+ StatusCode .Code .RESOURCE_EXHAUSTED ,
149+ StatusCode .Code .UNKNOWN ,
150+ StatusCode .Code .UNAVAILABLE )
151+ .setRetrySettings (retrySettings )
176152 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
177153 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
178154
@@ -220,23 +196,12 @@ public ApiFuture<String> publish(PubsubMessage message) {
220196 throw new IllegalStateException ("Cannot publish on a shut-down publisher." );
221197 }
222198
223- final String orderingKey = message .getOrderingKey ();
224- if (orderingKey != null && !orderingKey .isEmpty () && !enableMessageOrdering ) {
225- throw new IllegalStateException (
226- "Cannot publish a message with an ordering key when message ordering is not enabled." );
227- }
228-
229199 message = messageTransform .apply (message );
230200 List <OutstandingBatch > batchesToSend = new ArrayList <>();
231201 final OutstandingPublish outstandingPublish = new OutstandingPublish (message );
232202 messagesBatchLock .lock ();
233203 try {
234204 // Check if the next message makes the current batch exceed the max batch byte size.
235- MessagesBatch messagesBatch = messagesBatches .get (orderingKey );
236- if (messagesBatch == null ) {
237- messagesBatch = new MessagesBatch (orderingKey );
238- messagesBatches .put (orderingKey , messagesBatch );
239- }
240205 if (!messagesBatch .isEmpty ()
241206 && hasBatchingBytes ()
242207 && messagesBatch .getBatchedBytes () + outstandingPublish .messageSize
@@ -254,7 +219,6 @@ && hasBatchingBytes()
254219 || messagesBatch .getMessagesCount () == getBatchingSettings ().getElementCountThreshold ()) {
255220 batchesToSend .add (messagesBatch .popOutstandingBatch ());
256221 }
257-
258222 // Setup the next duration based delivery alarm if there are messages batched.
259223 setupAlarm ();
260224 } finally {
@@ -263,9 +227,7 @@ && hasBatchingBytes()
263227
264228 messagesWaiter .incrementPendingMessages (1 );
265229
266-
267230 if (!batchesToSend .isEmpty ()) {
268- publishAllOutstanding ();
269231 for (final OutstandingBatch batch : batchesToSend ) {
270232 logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
271233 executor .execute (
@@ -282,7 +244,7 @@ public void run() {
282244 }
283245
284246 private void setupAlarm () {
285- if (!messagesBatches .isEmpty ()) {
247+ if (!messagesBatch .isEmpty ()) {
286248 if (!activeAlarm .getAndSet (true )) {
287249 long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
288250 logger .log (Level .FINER , "Setting up alarm for the next {0} ms." , delayThresholdMs );
@@ -314,20 +276,16 @@ public void run() {
314276 */
315277 public void publishAllOutstanding () {
316278 messagesBatchLock .lock ();
279+ OutstandingBatch batchToSend ;
317280 try {
318- for (MessagesBatch batch : messagesBatches .values ()) {
319- if (!batch .isEmpty ()) {
320- // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
321- // it's released, the order of publishing cannot be guaranteed if `publish()` is called
322- // while this function is running. This locking mechanism needs to be improved if it
323- // causes any performance degradation.
324- publishOutstandingBatch (batch .popOutstandingBatch ());
325- }
281+ if (messagesBatch .isEmpty ()) {
282+ return ;
326283 }
327- messagesBatches . clear ();
284+ batchToSend = messagesBatch . popOutstandingBatch ();
328285 } finally {
329286 messagesBatchLock .unlock ();
330287 }
288+ publishOutstandingBatch (batchToSend );
331289 }
332290
333291 private ApiFuture <PublishResponse > publishCall (OutstandingBatch outstandingBatch ) {
@@ -336,11 +294,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
336294 for (OutstandingPublish outstandingPublish : outstandingBatch .outstandingPublishes ) {
337295 publishRequest .addMessages (outstandingPublish .message );
338296 }
297+
339298 return publisherStub .publishCallable ().futureCall (publishRequest .build ());
340299 }
341300
342301 private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
343- final ApiFutureCallback <PublishResponse > futureCallback =
302+ ApiFutureCallback <PublishResponse > futureCallback =
344303 new ApiFutureCallback <PublishResponse >() {
345304 @ Override
346305 public void onSuccess (PublishResponse result ) {
@@ -381,36 +340,20 @@ public void onFailure(Throwable t) {
381340 }
382341 };
383342
384- if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
385- ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
386- } else {
387- // If ordering key is specified, publish the batch using the sequential executor.
388- sequentialExecutor .submit (
389- outstandingBatch .orderingKey ,
390- new Callable <ApiFuture <PublishResponse >>() {
391- public ApiFuture <PublishResponse > call () {
392- ApiFuture <PublishResponse > future = publishCall (outstandingBatch );
393- ApiFutures .addCallback (future , futureCallback , directExecutor ());
394- return future ;
395- }
396- });
397- }
343+ ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
398344 }
399345
400346 private static final class OutstandingBatch {
401347 final List <OutstandingPublish > outstandingPublishes ;
402348 final long creationTime ;
403349 int attempt ;
404350 int batchSizeBytes ;
405- final String orderingKey ;
406351
407- OutstandingBatch (
408- List <OutstandingPublish > outstandingPublishes , int batchSizeBytes , String orderingKey ) {
352+ OutstandingBatch (List <OutstandingPublish > outstandingPublishes , int batchSizeBytes ) {
409353 this .outstandingPublishes = outstandingPublishes ;
410354 attempt = 1 ;
411355 creationTime = System .currentTimeMillis ();
412356 this .batchSizeBytes = batchSizeBytes ;
413- this .orderingKey = orderingKey ;
414357 }
415358
416359 int size () {
@@ -546,7 +489,7 @@ public static final class Builder {
546489 .setRpcTimeoutMultiplier (2 )
547490 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
548491 .build ();
549- static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false ;
492+
550493 private static final int THREADS_PER_CPU = 5 ;
551494 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
552495 InstantiatingExecutorProvider .newBuilder ()
@@ -560,8 +503,6 @@ public static final class Builder {
560503
561504 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
562505
563- boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING ;
564-
565506 TransportChannelProvider channelProvider =
566507 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
567508
@@ -656,12 +597,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
656597 return this ;
657598 }
658599
659- /** Sets the message ordering option. */
660- public Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
661- this .enableMessageOrdering = enableMessageOrdering ;
662- return this ;
663- }
664-
665600 /** Gives the ability to set a custom executor to be used by the library. */
666601 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
667602 this .executorProvider = Preconditions .checkNotNull (executorProvider );
@@ -687,14 +622,9 @@ public Publisher build() throws IOException {
687622 private static class MessagesBatch {
688623 private List <OutstandingPublish > messages = new LinkedList <>();
689624 private int batchedBytes ;
690- private String orderingKey ;
691-
692- private MessagesBatch (String orderingKey ) {
693- this .orderingKey = orderingKey ;
694- }
695625
696626 private OutstandingBatch popOutstandingBatch () {
697- OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes , orderingKey );
627+ OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
698628 reset ();
699629 return batch ;
700630 }
0 commit comments