4848import com .google .pubsub .v1 .TopicNames ;
4949import java .io .IOException ;
5050import java .util .Collections ;
51- import java .util .EnumSet ;
52- import java .util .HashMap ;
5351import java .util .Iterator ;
5452import java .util .LinkedList ;
5553import java .util .List ;
56- import java .util .Map ;
57- import java .util .Set ;
58- import java .util .concurrent .Callable ;
5954import java .util .concurrent .ScheduledExecutorService ;
6055import java .util .concurrent .ScheduledFuture ;
6156import java .util .concurrent .TimeUnit ;
@@ -89,17 +84,15 @@ public class Publisher {
8984 private final String topicName ;
9085
9186 private final BatchingSettings batchingSettings ;
92- private final boolean enableMessageOrdering ;
9387
9488 private final Lock messagesBatchLock ;
95- private final Map < String , MessagesBatch > messagesBatches ;
89+ private MessagesBatch messagesBatch ;
9690
9791 private final AtomicBoolean activeAlarm ;
9892
9993 private final PublisherStub publisherStub ;
10094
10195 private final ScheduledExecutorService executor ;
102- private final SequentialExecutorService .CallbackExecutor sequentialExecutor ;
10396 private final AtomicBoolean shutdown ;
10497 private final List <AutoCloseable > closeables ;
10598 private final MessageWaiter messagesWaiter ;
@@ -120,14 +113,12 @@ private Publisher(Builder builder) throws IOException {
120113 topicName = builder .topicName ;
121114
122115 this .batchingSettings = builder .batchingSettings ;
123- this .enableMessageOrdering = builder .enableMessageOrdering ;
124116 this .messageTransform = builder .messageTransform ;
125117
126- messagesBatches = new HashMap <> ();
118+ messagesBatch = new MessagesBatch ();
127119 messagesBatchLock = new ReentrantLock ();
128120 activeAlarm = new AtomicBoolean (false );
129121 executor = builder .executorProvider .getExecutor ();
130- sequentialExecutor = new SequentialExecutorService .CallbackExecutor (executor );
131122 if (builder .executorProvider .shouldAutoClose ()) {
132123 closeables =
133124 Collections .<AutoCloseable >singletonList (new ExecutorAsBackgroundResource (executor ));
@@ -137,31 +128,9 @@ private Publisher(Builder builder) throws IOException {
137128
138129 // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
139130 // We post-process this here to keep backward-compatibility.
140- // Also, if "message ordering" is enabled, the publisher should retry sending the failed
141- // message infinitely rather than sending the next one.
142- RetrySettings .Builder retrySettingsBuilder = builder .retrySettings .toBuilder ();
143- if (retrySettingsBuilder .getMaxAttempts () == 0 ) {
144- retrySettingsBuilder .setMaxAttempts (Integer .MAX_VALUE );
145- }
146- if (enableMessageOrdering ) {
147- retrySettingsBuilder
148- .setMaxAttempts (Integer .MAX_VALUE )
149- .setTotalTimeout (Duration .ofNanos (Long .MAX_VALUE ));
150- }
151-
152- Set <StatusCode .Code > retryCodes ;
153- if (enableMessageOrdering ) {
154- retryCodes = EnumSet .allOf (StatusCode .Code .class );
155- } else {
156- retryCodes =
157- EnumSet .of (
158- StatusCode .Code .ABORTED ,
159- StatusCode .Code .CANCELLED ,
160- StatusCode .Code .DEADLINE_EXCEEDED ,
161- StatusCode .Code .INTERNAL ,
162- StatusCode .Code .RESOURCE_EXHAUSTED ,
163- StatusCode .Code .UNKNOWN ,
164- StatusCode .Code .UNAVAILABLE );
131+ RetrySettings retrySettings = builder .retrySettings ;
132+ if (retrySettings .getMaxAttempts () == 0 ) {
133+ retrySettings = retrySettings .toBuilder ().setMaxAttempts (Integer .MAX_VALUE ).build ();
165134 }
166135
167136 PublisherStubSettings .Builder stubSettings =
@@ -171,8 +140,15 @@ private Publisher(Builder builder) throws IOException {
171140 .setTransportChannelProvider (builder .channelProvider );
172141 stubSettings
173142 .publishSettings ()
174- .setRetryableCodes (retryCodes )
175- .setRetrySettings (retrySettingsBuilder .build ())
143+ .setRetryableCodes (
144+ StatusCode .Code .ABORTED ,
145+ StatusCode .Code .CANCELLED ,
146+ StatusCode .Code .DEADLINE_EXCEEDED ,
147+ StatusCode .Code .INTERNAL ,
148+ StatusCode .Code .RESOURCE_EXHAUSTED ,
149+ StatusCode .Code .UNKNOWN ,
150+ StatusCode .Code .UNAVAILABLE )
151+ .setRetrySettings (retrySettings )
176152 .setBatchingSettings (BatchingSettings .newBuilder ().setIsEnabled (false ).build ());
177153 this .publisherStub = GrpcPublisherStub .create (stubSettings .build ());
178154
@@ -220,42 +196,30 @@ public ApiFuture<String> publish(PubsubMessage message) {
220196 throw new IllegalStateException ("Cannot publish on a shut-down publisher." );
221197 }
222198
223- final String orderingKey = message .getOrderingKey ();
224- if (orderingKey != null && !orderingKey .isEmpty () && !enableMessageOrdering ) {
225- throw new IllegalStateException (
226- "Cannot publish a message with an ordering key when message ordering is not enabled." );
227- }
228-
229199 message = messageTransform .apply (message );
230- final int messageSize = message .getSerializedSize ();
231200 OutstandingBatch batchToSend = null ;
232- SettableApiFuture <String > publishResult = SettableApiFuture .<String >create ();
233- final OutstandingPublish outstandingPublish = new OutstandingPublish (publishResult , message );
201+ final OutstandingPublish outstandingPublish = new OutstandingPublish (message );
234202 messagesBatchLock .lock ();
235203 try {
236204 // Check if the next message makes the current batch exceed the max batch byte size.
237- MessagesBatch messagesBatch = messagesBatches .get (orderingKey );
238- if (messagesBatch == null ) {
239- messagesBatch = new MessagesBatch (orderingKey );
240- messagesBatches .put (orderingKey , messagesBatch );
241- }
242205 if (!messagesBatch .isEmpty ()
243206 && hasBatchingBytes ()
244- && messagesBatch .getBatchedBytes () + messageSize >= getMaxBatchBytes ()) {
207+ && messagesBatch .getBatchedBytes () + outstandingPublish .messageSize
208+ >= getMaxBatchBytes ()) {
245209 batchToSend = messagesBatch .popOutstandingBatch ();
246210 }
247211
248212 // Border case if the message to send is greater or equals to the max batch size then can't
249213 // be included in the current batch and instead sent immediately.
250- if (!hasBatchingBytes () || messageSize < getMaxBatchBytes ()) {
251- messagesBatch .addMessage (outstandingPublish , messageSize );
214+ if (!hasBatchingBytes () || outstandingPublish .messageSize < getMaxBatchBytes ()) {
215+ messagesBatch .addMessage (outstandingPublish , outstandingPublish .messageSize );
216+
252217 // If after adding the message we have reached the batch max messages then we have a batch
253218 // to send.
254219 if (messagesBatch .getMessagesCount () == getBatchingSettings ().getElementCountThreshold ()) {
255220 batchToSend = messagesBatch .popOutstandingBatch ();
256221 }
257222 }
258-
259223 // Setup the next duration based delivery alarm if there are messages batched.
260224 setupAlarm ();
261225 } finally {
@@ -266,25 +230,37 @@ && hasBatchingBytes()
266230
267231 if (batchToSend != null ) {
268232 logger .log (Level .FINER , "Scheduling a batch for immediate sending." );
269- publishAllOutstanding ();
270- publishOutstandingBatch (batchToSend );
233+ final OutstandingBatch finalBatchToSend = batchToSend ;
234+ executor .execute (
235+ new Runnable () {
236+ @ Override
237+ public void run () {
238+ publishOutstandingBatch (finalBatchToSend );
239+ }
240+ });
271241 }
272242
273243 // If the message is over the size limit, it was not added to the pending messages and it will
274244 // be sent in its own batch immediately.
275- if (hasBatchingBytes () && messageSize >= getMaxBatchBytes ()) {
245+ if (hasBatchingBytes () && outstandingPublish . messageSize >= getMaxBatchBytes ()) {
276246 logger .log (
277247 Level .FINER , "Message exceeds the max batch bytes, scheduling it for immediate send." );
278- publishAllOutstanding ();
279- publishOutstandingBatch (
280- new OutstandingBatch (ImmutableList .of (outstandingPublish ), messageSize , orderingKey ));
248+ executor .execute (
249+ new Runnable () {
250+ @ Override
251+ public void run () {
252+ publishOutstandingBatch (
253+ new OutstandingBatch (
254+ ImmutableList .of (outstandingPublish ), outstandingPublish .messageSize ));
255+ }
256+ });
281257 }
282258
283- return publishResult ;
259+ return outstandingPublish . publishResult ;
284260 }
285261
286262 private void setupAlarm () {
287- if (!messagesBatches .isEmpty ()) {
263+ if (!messagesBatch .isEmpty ()) {
288264 if (!activeAlarm .getAndSet (true )) {
289265 long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
290266 logger .log (Level .FINER , "Setting up alarm for the next {0} ms." , delayThresholdMs );
@@ -316,20 +292,16 @@ public void run() {
316292 */
317293 public void publishAllOutstanding () {
318294 messagesBatchLock .lock ();
295+ OutstandingBatch batchToSend ;
319296 try {
320- for (MessagesBatch batch : messagesBatches .values ()) {
321- if (!batch .isEmpty ()) {
322- // TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
323- // it's released, the order of publishing cannot be guaranteed if `publish()` is called
324- // while this function is running. This locking mechanism needs to be improved if it
325- // causes any performance degradation.
326- publishOutstandingBatch (batch .popOutstandingBatch ());
327- }
297+ if (messagesBatch .isEmpty ()) {
298+ return ;
328299 }
329- messagesBatches . clear ();
300+ batchToSend = messagesBatch . popOutstandingBatch ();
330301 } finally {
331302 messagesBatchLock .unlock ();
332303 }
304+ publishOutstandingBatch (batchToSend );
333305 }
334306
335307 private ApiFuture <PublishResponse > publishCall (OutstandingBatch outstandingBatch ) {
@@ -338,11 +310,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
338310 for (OutstandingPublish outstandingPublish : outstandingBatch .outstandingPublishes ) {
339311 publishRequest .addMessages (outstandingPublish .message );
340312 }
313+
341314 return publisherStub .publishCallable ().futureCall (publishRequest .build ());
342315 }
343316
344317 private void publishOutstandingBatch (final OutstandingBatch outstandingBatch ) {
345- final ApiFutureCallback <PublishResponse > futureCallback =
318+ ApiFutureCallback <PublishResponse > futureCallback =
346319 new ApiFutureCallback <PublishResponse >() {
347320 @ Override
348321 public void onSuccess (PublishResponse result ) {
@@ -383,36 +356,20 @@ public void onFailure(Throwable t) {
383356 }
384357 };
385358
386- if (outstandingBatch .orderingKey == null || outstandingBatch .orderingKey .isEmpty ()) {
387- ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
388- } else {
389- // If ordering key is specified, publish the batch using the sequential executor.
390- sequentialExecutor .submit (
391- outstandingBatch .orderingKey ,
392- new Callable <ApiFuture <PublishResponse >>() {
393- public ApiFuture <PublishResponse > call () {
394- ApiFuture <PublishResponse > future = publishCall (outstandingBatch );
395- ApiFutures .addCallback (future , futureCallback , directExecutor ());
396- return future ;
397- }
398- });
399- }
359+ ApiFutures .addCallback (publishCall (outstandingBatch ), futureCallback , directExecutor ());
400360 }
401361
402362 private static final class OutstandingBatch {
403363 final List <OutstandingPublish > outstandingPublishes ;
404364 final long creationTime ;
405365 int attempt ;
406366 int batchSizeBytes ;
407- final String orderingKey ;
408367
409- OutstandingBatch (
410- List <OutstandingPublish > outstandingPublishes , int batchSizeBytes , String orderingKey ) {
368+ OutstandingBatch (List <OutstandingPublish > outstandingPublishes , int batchSizeBytes ) {
411369 this .outstandingPublishes = outstandingPublishes ;
412370 attempt = 1 ;
413371 creationTime = System .currentTimeMillis ();
414372 this .batchSizeBytes = batchSizeBytes ;
415- this .orderingKey = orderingKey ;
416373 }
417374
418375 public int getAttempt () {
@@ -425,12 +382,14 @@ public int size() {
425382 }
426383
427384 private static final class OutstandingPublish {
428- SettableApiFuture <String > publishResult ;
429- PubsubMessage message ;
385+ final SettableApiFuture <String > publishResult ;
386+ final PubsubMessage message ;
387+ final int messageSize ;
430388
431- OutstandingPublish (SettableApiFuture < String > publishResult , PubsubMessage message ) {
432- this .publishResult = publishResult ;
389+ OutstandingPublish (PubsubMessage message ) {
390+ this .publishResult = SettableApiFuture . create () ;
433391 this .message = message ;
392+ this .messageSize = message .getSerializedSize ();
434393 }
435394 }
436395
@@ -550,7 +509,7 @@ public static final class Builder {
550509 .setRpcTimeoutMultiplier (2 )
551510 .setMaxRpcTimeout (DEFAULT_RPC_TIMEOUT )
552511 .build ();
553- static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false ;
512+
554513 private static final int THREADS_PER_CPU = 5 ;
555514 static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
556515 InstantiatingExecutorProvider .newBuilder ()
@@ -564,8 +523,6 @@ public static final class Builder {
564523
565524 RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS ;
566525
567- boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING ;
568-
569526 TransportChannelProvider channelProvider =
570527 TopicAdminSettings .defaultGrpcTransportProviderBuilder ().setChannelsPerCpu (1 ).build ();
571528
@@ -660,12 +617,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
660617 return this ;
661618 }
662619
663- /** Sets the message ordering option. */
664- public Builder setEnableMessageOrdering (boolean enableMessageOrdering ) {
665- this .enableMessageOrdering = enableMessageOrdering ;
666- return this ;
667- }
668-
669620 /** Gives the ability to set a custom executor to be used by the library. */
670621 public Builder setExecutorProvider (ExecutorProvider executorProvider ) {
671622 this .executorProvider = Preconditions .checkNotNull (executorProvider );
@@ -691,14 +642,9 @@ public Publisher build() throws IOException {
691642 private static class MessagesBatch {
692643 private List <OutstandingPublish > messages = new LinkedList <>();
693644 private int batchedBytes ;
694- private String orderingKey ;
695-
696- private MessagesBatch (String orderingKey ) {
697- this .orderingKey = orderingKey ;
698- }
699645
700646 private OutstandingBatch popOutstandingBatch () {
701- OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes , orderingKey );
647+ OutstandingBatch batch = new OutstandingBatch (messages , batchedBytes );
702648 reset ();
703649 return batch ;
704650 }
0 commit comments