Skip to content

Commit 7e258af

Browse files
[fix][client] Move sequence ID acquisition into the synchronized block (#17836)
### Motivation When a producer sends messages from multiple threads, a message with a smaller sequence ID can be pushed later than a message with a larger sequence ID. `internalSendWithTxnAsync` calls `internalSendAsync` asynchronously when `txn != null`: https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L409 In addition, the step in `sendAsync` that acquires the sequence ID is not inside the same synchronized block as `serializeAndSendMessage`: https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L490 https://github.com/apache/pulsar/blob/aeb4503be59f9a9450dfd47cf5dfcb375735d064/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L555-L560 For example: we send 4 messages (msg1, msg2, msg3, msg4) to the broker, and they may be sent with the sequence IDs (1, 3, 2, 4), which is out of order because acquiring the sequence ID and sending the message do not happen in the same synchronized block. As a result, msg3 with sequence ID 2 will never be persisted successfully. ### Modification Add a method that updates `sequenceId` and call it inside the synchronized block. Per #16196, message metadata should still be updated before computing the message size, so only the sequence ID assignment is moved.
1 parent 4a0b775 commit 7e258af

File tree

2 files changed

+68
-16
lines changed

2 files changed

+68
-16
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import java.util.Arrays;
2828
import java.util.List;
2929
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Executors;
3033
import java.util.concurrent.TimeUnit;
3134
import java.util.stream.Collectors;
3235
import lombok.extern.slf4j.Slf4j;
@@ -372,4 +375,51 @@ public void testKeyBasedBatchingOrder() throws Exception {
372375
consumer.close();
373376
producer.close();
374377
}
378+
379+
@Test
380+
public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
381+
final String topic = "persistent://my-property/my-ns/testUpdateSequenceIdInSyncCodeSegment";
382+
int totalMessage = 200;
383+
int threadSize = 5;
384+
String topicName = "subscription";
385+
ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
386+
conf.setBrokerDeduplicationEnabled(true);
387+
388+
//build producer/consumer
389+
Producer<byte[]> producer = pulsarClient.newProducer()
390+
.topic(topic)
391+
.producerName("producer")
392+
.sendTimeout(0, TimeUnit.SECONDS)
393+
.create();
394+
395+
Consumer<byte[]> consumer = pulsarClient.newConsumer()
396+
.topic(topic)
397+
.subscriptionType(SubscriptionType.Exclusive)
398+
.subscriptionName(topicName)
399+
.subscribe();
400+
401+
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
402+
//Send messages in multiple-thread
403+
for (int i = 0; i < threadSize; i++) {
404+
executorService.submit(() -> {
405+
try {
406+
for (int j = 0; j < totalMessage; j++) {
407+
//The message will be sent with out-of-order sequence ID.
408+
producer.newMessage().sendAsync();
409+
}
410+
} catch (Exception e) {
411+
log.error("Failed to send/ack messages with transaction.", e);
412+
} finally {
413+
countDownLatch.countDown();
414+
}
415+
});
416+
}
417+
//wait the all send op is executed and store its futures in the arraylist.
418+
countDownLatch.await();
419+
420+
for (int i = 0; i < threadSize * totalMessage; i++) {
421+
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
422+
assertNotNull(msg);
423+
}
424+
}
375425
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
101101
// Producer id, used to identify a producer within a single connection
102102
protected final long producerId;
103103

104-
// Variable is used through the atomic updater
104+
// Variable is updated in a synchronized block
105105
private volatile long msgIdGenerator;
106106

107107
private final OpSendMsgQueue pendingMessages;
@@ -169,10 +169,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
169169

170170
private boolean errorState;
171171

172-
@SuppressWarnings("rawtypes")
173-
private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
174-
.newUpdater(ProducerImpl.class, "msgIdGenerator");
175-
176172
public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
177173
CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
178174
ProducerInterceptors interceptors, Optional<String> overrideProducerName) {
@@ -487,7 +483,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
487483

488484
// Update the message metadata before computing the payload chunk size to avoid a large message cannot be split
489485
// into chunks.
490-
final long sequenceId = updateMessageMetadata(msgMetadata, uncompressedSize);
486+
updateMessageMetadata(msgMetadata, uncompressedSize);
491487

492488
// send in chunks
493489
int totalChunks;
@@ -527,7 +523,6 @@ public void sendAsync(Message<?> message, SendCallback callback) {
527523

528524
try {
529525
int readStartIndex = 0;
530-
String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
531526
ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
532527
byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
533528
? msg.getMessageBuilder().getSchemaVersion() : null;
@@ -555,6 +550,11 @@ public void sendAsync(Message<?> message, SendCallback callback) {
555550
return;
556551
}
557552
synchronized (this) {
553+
// Update the message metadata before computing the payload chunk size
554+
// to avoid a large message cannot be split into chunks.
555+
final long sequenceId = updateMessageMetadataSequenceId(msgMetadata);
556+
String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
557+
558558
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
559559
readStartIndex, payloadChunkSize, compressedPayload, compressed,
560560
compressedPayload.readableBytes(), callback, chunkedMessageCtx, messageId);
@@ -577,15 +577,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
577577
* @param uncompressedSize
578578
* @return the sequence id
579579
*/
580-
private long updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
581-
final long sequenceId;
582-
if (!msgMetadata.hasSequenceId()) {
583-
sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
584-
msgMetadata.setSequenceId(sequenceId);
585-
} else {
586-
sequenceId = msgMetadata.getSequenceId();
587-
}
588-
580+
private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
589581
if (!msgMetadata.hasPublishTime()) {
590582
msgMetadata.setPublishTime(client.getClientClock().millis());
591583

@@ -599,6 +591,16 @@ private long updateMessageMetadata(final MessageMetadata msgMetadata, final int
599591
}
600592
msgMetadata.setUncompressedSize(uncompressedSize);
601593
}
594+
}
595+
596+
private long updateMessageMetadataSequenceId(final MessageMetadata msgMetadata) {
597+
final long sequenceId;
598+
if (!msgMetadata.hasSequenceId()) {
599+
sequenceId = msgIdGenerator++;
600+
msgMetadata.setSequenceId(sequenceId);
601+
} else {
602+
sequenceId = msgMetadata.getSequenceId();
603+
}
602604
return sequenceId;
603605
}
604606

0 commit comments

Comments
 (0)