@RobertIndie
Member

@RobertIndie RobertIndie commented Oct 18, 2021

Master Issue: #12402

Motivation

This is an implementation for the PIP: #12402

Modifications

  • Introduce a new message ID type: the chunk message ID. ChunkMessageIdImpl inherits from MessageIdImpl and adds two new methods: getFirstChunkMessageId and getLastChunkMessageId. All other methods delegate to the last chunk message ID, which keeps the new type compatible with much of the existing business logic.

  • Return the chunk message-id to the user when the Producer produces the chunk message or when the consumer consumes the chunk message.

  • In consumer.seek, use the first chunk message ID of the chunk message ID. This solves the problem caused by seeking to chunked messages. This is also how this PIP affects the original business logic.
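
To make the shape of the new type concrete, here is a minimal sketch of the idea in the list above. `SimpleMessageId` and `SimpleChunkMessageId` are hypothetical, simplified stand-ins for MessageIdImpl and ChunkMessageIdImpl; the real classes carry more state, so treat this as an illustration rather than the merged implementation.

```java
import java.util.Objects;

// Hypothetical stand-ins; not the actual Pulsar classes.
final class ChunkMessageIdSketch {

    /** Minimal (ledgerId, entryId, partitionIndex) identifier. */
    static class SimpleMessageId {
        final long ledgerId;
        final long entryId;
        final int partitionIndex;

        SimpleMessageId(long ledgerId, long entryId, int partitionIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.partitionIndex = partitionIndex;
        }

        long getLedgerId() { return ledgerId; }
        long getEntryId() { return entryId; }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId + ":" + partitionIndex;
        }
    }

    /** Behaves like the last chunk's id and additionally remembers the first chunk's id. */
    static final class SimpleChunkMessageId extends SimpleMessageId {
        private final SimpleMessageId firstChunk;

        SimpleChunkMessageId(SimpleMessageId firstChunk, SimpleMessageId lastChunk) {
            // Inherited accessors see the last chunk's coordinates.
            super(lastChunk.ledgerId, lastChunk.entryId, lastChunk.partitionIndex);
            this.firstChunk = Objects.requireNonNull(firstChunk, "firstChunk");
        }

        SimpleMessageId getFirstChunkMessageId() { return firstChunk; }

        SimpleMessageId getLastChunkMessageId() {
            return new SimpleMessageId(ledgerId, entryId, partitionIndex);
        }
    }

    public static void main(String[] args) {
        SimpleChunkMessageId id = new SimpleChunkMessageId(
                new SimpleMessageId(0, 1, -1), new SimpleMessageId(0, 3, -1));
        System.out.println("as a plain message id: " + id);
        System.out.println("seek target (first chunk): " + id.getFirstChunkMessageId());
    }
}
```

Code that only knows about the base type keeps seeing the last chunk's coordinates, while seek can ask for the first chunk explicitly.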

Verifying this change

This change is already covered by existing tests, such as ChunkMessageIdImplTest and testSeekChunkMessages.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (yes)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • doc-required

@eolivelli
Contributor

@RobertIndie:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@github-actions

github-actions bot commented Nov 1, 2021

@RobertIndie:Thanks for providing doc info!

@github-actions github-actions bot added doc-required Your PR changes impact docs and you will update later. and removed doc-label-missing labels Nov 1, 2021
@RobertIndie
Member Author

/pulsarbot run-failure-checks

Signed-off-by: Zike Yang <[email protected]>
@RobertIndie RobertIndie marked this pull request as ready for review November 4, 2021 09:11
optional int32 batch_size = 6;

// For the chunk message id, we need to specify the first chunk message id.
optional MessageIdData first_chunk_message_id = 7;

Contributor

Do we need this change? The chunk message ID is a new client-side type, so it looks like we do not need to pass it to the server side.

Member Author

Since ChunkMessageID serialization and deserialization depend on the MessageIdData defined by the proto, we need to change it here even if it does not need to be passed to the server-side.

Contributor

This seems to be a fundamental issue, and it doesn't solve the actual problem. You need the messageId (entryId, ledgerId) of the first chunk to be present in every chunk of the message, so that you know the first messageId among all chunks when you seek to the message. In this PR you are trying to set the first chunk's messageId (entryId, ledgerId) into every chunk in ProducerImpl::ackReceived. That doesn't work because it assumes the best case, in which you receive the first chunk's ack before you send the other chunks, and that is not guaranteed. If you have 10 chunks, which are individual messages, and you publish them asynchronously, you may get the ack for the first chunk only after you have published the other 9 chunks. In that case none of the chunks will have first_chunk_message_id, and the feature will not work the way Pulsar claims it does.
This is tricky to solve on the client side because we cannot update message metadata based on the broker's ack of a previous message, unless we block publishing the other 9 chunks until we receive the ack for the first chunk. That would heavily impact publish performance and make the feature unusable.

Member Author

@RobertIndie RobertIndie Nov 10, 2021

When we send chunk messages, each chunk is sent in order, so I don't think there is a case where the last chunk will be acked before the first chunk.

In the original logic, we determine whether the whole message is sent successfully by receiving the ack of the last chunk, and then return the last chunk message id to the user instead of returning message ids of all chunks to the user.
When we receive the ack of the last chunk, it actually proves that the whole message has been sent successfully and the message id of the first chunk will not be null. In this PR, the first chunk message id and last chunk message id will be packaged into ChunkMessageIdImpl and returned to the user only when the ack of the last chunk is received.

Contributor

each chunk is sent in order, so I don't think there is a case where the last chunk will be acked before the first chunk.

The issue is not the ordering of acks but their timing. If you don't receive the ack of the first chunk while publishing the other chunks, then you can't set first_chunk_message_id in the rest of the chunks. If the chunks don't have first_chunk_message_id, then when the consumer receives the chunked message, those chunks won't have firstChunkMessageId in the metadata and the consumer won't be able to seek to the first messageId, which defeats the purpose of this PR.

Contributor

The Consumer does not access the first_chunk_message_id in the MessageIdData during this process.

Please check my comment below on the ConsumerImpl class, where it is accessed. The purpose of this PR is to access first_chunk_message_id and seek to that messageId, but it's not guaranteed that first_chunk_message_id will be present, so Pulsar may claim this feature incorrectly.

Contributor

@rdhabalia This one is only for the MessageId.toByteArray() method. It is a little tricky that it is added to the wire protocol definition, but we currently use MessageIdData to generate the byte array for users. It will not be passed to the broker side to let the broker know which one is the first chunk message ID.

Users can store the byte array of the chunk message ID so that they can later recover the chunk message ID from it. If they want to seek to this chunk message and continue consuming in inclusive mode, they don't need to do a lot of extra work.

And introducing the new chunk message ID will allow us to peek the message from the topic if the application recorded the chunk message ID.
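
The store-and-recover flow described in this comment can be sketched as a byte-array round trip. The fixed-width layout and the `toBytes`/`fromBytes` helpers below are hypothetical and exist only to show the idea; the actual client serializes through the protobuf MessageIdData (via MessageId.toByteArray() and MessageId.fromByteArray()), not through this layout.

```java
import java.nio.ByteBuffer;

final class ChunkIdBytesSketch {

    // Hypothetical fixed-width layout, NOT Pulsar's protobuf encoding:
    // firstLedgerId, firstEntryId, lastLedgerId, lastEntryId (8 bytes each).
    static byte[] toBytes(long firstLedgerId, long firstEntryId, long lastLedgerId, long lastEntryId) {
        return ByteBuffer.allocate(4 * Long.BYTES)
                .putLong(firstLedgerId)
                .putLong(firstEntryId)
                .putLong(lastLedgerId)
                .putLong(lastEntryId)
                .array();
    }

    // Returns {firstLedgerId, firstEntryId, lastLedgerId, lastEntryId}.
    static long[] fromBytes(byte[] data) {
        if (data.length != 4 * Long.BYTES) {
            throw new IllegalArgumentException("unexpected length: " + data.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(data);
        return new long[] {buf.getLong(), buf.getLong(), buf.getLong(), buf.getLong()};
    }

    public static void main(String[] args) {
        // The application persists the bytes somewhere, then restores them later.
        byte[] stored = toBytes(0, 1, 0, 3);
        long[] restored = fromBytes(stored);
        System.out.println("seek target (first chunk): " + restored[0] + ":" + restored[1]);
    }
}
```

Because the first chunk's coordinates travel inside the serialized ID, an application that restores the bytes has everything it needs to seek without asking the broker.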

Contributor

@merlimat Please help confirm this part.

Contributor

And introducing the new chunk message ID will allow us to peek the message from the topic if the application recorded the chunk message ID.

@codelipenghui The issue is not with introducing the chunk message ID or the changes to MessageId. The fundamental issue is that this entire change relies on getting the messageId of the first chunk into the last chunk's data, and that's not guaranteed. So what's the point of changes that are built on a wrong assumption?

Contributor

@rdhabalia No, we don't want to set the first chunk message ID into the last chunk's data; we just return a chunk message ID to users. Users can persist the chunk message ID to state storage, and if they want to start consuming from the stored chunk message, we are able to provide that ability. Otherwise, we will always skip this message and start from the next chunked or normal message.

@RobertIndie
Member Author

/pulsarbot run-failure-checks

seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
} else if(messageId instanceof ChunkMessageIdImpl){
ChunkMessageIdImpl msgId = (ChunkMessageIdImpl) messageId;
seek = Commands.newSeek(consumerId, requestId, msgId.getFirstChunkMessageId().getLedgerId(), msgId.getFirstChunkMessageId().getEntryId(), new long[0]);

Contributor

This change, and the new message ID type ChunkMessageIdImpl, are based on the assumption that firstChunkMessageId is present, but it will be absent most of the time. So this will not work as expected.

Member Author

Because chunk messages are sent with guaranteed order for each chunk, the first chunk message id here will not be null.

Contributor

Because chunk messages are sent with guaranteed order for each chunk

Yes, ordering is guaranteed. but it's not guaranteed that you will get publish ACK of first chunk before you publish last chunk. It seems I am explaining the same thing again and again. Am I really missing anything here?

Member Author

@RobertIndie RobertIndie Nov 25, 2021

but it's not guaranteed that you will get publish ACK of first chunk before you publish last chunk.

As I said earlier, we don't need that guarantee. I think this is the confusing point of the discussion. The firstChunkMessageId that the consumer uses at seek time is not given by the producer; it is calculated on the consumer side.
I think my previous comments would help understand: #12403 (comment)

Note (here I reply to your previous comment):

and when we are processing the last chunk, the first chunk message id must be present.

it's not guaranteed.

By "when we are processing the last chunk" I mean processing the message ID of the last chunk, not sending the last chunk. I think that is guaranteed. But it is indeed not guaranteed that we will get the ACK of the first chunk before we publish the last chunk.

The Consumer does not access the first_chunk_message_id in the MessageIdData during this process.

Please check my comment below on the ConsumerImpl class, where it is accessed. The purpose of this PR is to access first_chunk_message_id and seek to that messageId, but it's not guaranteed that first_chunk_message_id will be present, so Pulsar may claim this feature incorrectly.

The producer does not tell the firstChunkMessageId to the consumer. The Consumer receives chunk messages in order, so it is guaranteed that the firstChunkMessageId exists after the last chunk message is received.

Contributor

The fundamental assumption on the consumer side is that msgId.getFirstChunkMessageId().getLedgerId() will be present, and this assumption is incorrect. msg -> getFirstChunkMessageId() can be null because of what I explained earlier: you can't guarantee that firstChunkMessageId is set in any chunk of the message.

Member Author

As I explained earlier (here and here), the consumer does not depend on the first chunk message ID set by the producer. Here is the comment showing where the consumer sets the chunk message ID. Is there something wrong with my explanation?

you can't guarantee that firstChunkMessageId is set in any chunk of the message.

Why do we need that guarantee? I think this is the key point of our discussion. The consumer does not rely on this guarantee.

Contributor

If the consumer doesn't depend on the first chunk ID, then why do we have the seek method in ConsumerImpl, which uses firstChunkMessageId on this line?

seek = Commands.newSeek(consumerId, requestId, msgId.getFirstChunkMessageId().getLedgerId(), msgId.getFirstChunkMessageId().getEntryId(), new long[0]);

I'm not sure what we are trying to achieve in this PR.

Member Author

The consumer doesn't depend on the first chunk id set by the producer, but does depend on the first chunk id calculated by the consumer itself. If you go through the code, you can find that this first chunk id here is either calculated by the producer or by the consumer. If this chunk msg id is from the consumer.receive, then the first chunk id here is calculated by the consumer but not the producer.

Comment on lines +1220 to +1221
msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0],
chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);

Member Author

@rdhabalia Here is where the consumer sets the first chunk message id and the last chunk message-id. It does not depend on the first chunk message-id set by the producer.
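
The consumer-side construction described here can be sketched as follows. `ChunkCtx` and `onChunk` are hypothetical names for a simplified context that records each chunk's (ledgerId, entryId) as it arrives and yields the (first, last) pair once the final chunk is seen. Rejecting an out-of-order chunk with an exception is a simplification made for this sketch only.

```java
final class ConsumerChunkCtxSketch {

    /** Hypothetical per-message context kept by the consumer while chunks arrive. */
    static final class ChunkCtx {
        private final long[][] chunkIds; // [chunkId] -> {ledgerId, entryId}
        private int received;

        ChunkCtx(int totalChunks) {
            this.chunkIds = new long[totalChunks][];
        }

        /**
         * Records one chunk. Returns {firstLedgerId, firstEntryId, lastLedgerId, lastEntryId}
         * once all chunks have been seen, and null before that.
         */
        long[] onChunk(int chunkId, long ledgerId, long entryId) {
            if (chunkId != received) {
                throw new IllegalStateException("chunk out of order: " + chunkId);
            }
            chunkIds[chunkId] = new long[] {ledgerId, entryId};
            received++;
            if (received < chunkIds.length) {
                return null;
            }
            long[] first = chunkIds[0];
            long[] last = chunkIds[chunkIds.length - 1];
            return new long[] {first[0], first[1], last[0], last[1]};
        }
    }

    public static void main(String[] args) {
        ChunkCtx ctx = new ChunkCtx(3);
        ctx.onChunk(0, 0L, 1L);
        ctx.onChunk(1, 0L, 2L);
        long[] id = ctx.onChunk(2, 0L, 3L);
        System.out.println("first=" + id[0] + ":" + id[1] + " last=" + id[2] + ":" + id[3]);
    }
}
```

Nothing in this path reads a field written by the producer: the first chunk's ID comes from the entry the consumer itself received.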

Contributor

Can we remove all producer-side changes then? The producer can't guarantee that it sets the messageId (ledgerId, entryId) of the chunk.

Contributor

@rdhabalia The producer will not set the message ID of the chunk. The producer-side change just returns a chunk message ID (start, end) to users.

@RobertIndie
Member Author

/pulsarbot run-failure-checks

Contributor

@gaoran10 gaoran10 left a comment

LGTM

Contributor

@BewareMyPower BewareMyPower left a comment

Left some comments for code style first. I'm not sure why the checkstyle plugin doesn't check blanks.

@BewareMyPower
Contributor

This PR overall LGTM. I saw the debate about the producer side changes and here are my points.

I saw the following comment from @rdhabalia.

but it's not guaranteed that you will get publish ACK of first chunk before you publish last chunk.

See following code in ProducerImpl#ackReceived.

        op.setMessageId(ledgerId, entryId, partitionIndex);
        if (op.totalChunks > 1 && op.chunkedMessageCtx != null) {
            if (op.chunkId == 0) { // [1]
                op.chunkedMessageCtx.firstChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);
            } else if (op.chunkId == op.totalChunks - 1) { // [2]
                op.chunkedMessageCtx.lastChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);
                op.setMessageId(op.chunkedMessageCtx.getChunkMessageId());
            }
        }

Are you concerned about [2] happening before [1]? In that case, the firstChunkMessageId field of ChunkMessageIdImpl will be null and the seek operation won't work if we pass the message ID returned by Producer#send to Consumer#seek.

IMO, it should be guaranteed that [2] happens after [1], but I'm not very sure. If it's not guaranteed, then I agree with @rdhabalia that the producer-side changes are meaningless. Please also help confirm it. @codelipenghui

I also saw the comment from @RobertIndie.

The consumer doesn't depend on the first chunk id set by the producer, but does depend on the first chunk id calculated by the consumer itself.

I think not. Because in #12402, the motivation is from the following code

var msgId = producer.send(...); // eg. return 0:1:-1

var otherMsg = producer.send(...); // return 0:2:-1

consumer.seek(msgId); // inclusive seek

var receiveMsgId = consumer.receive().getMessageId(); // it may skip the first message and return something like 0:2:-1

Assert.assertEquals(msgId, receiveMsgId); // fail

The consumer seek must be able to work with the message ID returned by the producer.
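
The skipping behaviour in this scenario can be reproduced with a small self-contained model. The sketch below is not Pulsar code: `Entry` and `firstMessageFrom` are hypothetical, and the model assumes that a consumer can only assemble a chunked message if it starts reading at or before that message's first chunk.

```java
import java.util.Arrays;
import java.util.List;

final class ChunkSeekSketch {

    /** Hypothetical log entry: one chunk of a message stored at entryId. */
    static final class Entry {
        final long entryId;
        final String messageKey;
        final int chunkId;
        final int totalChunks;

        Entry(long entryId, String messageKey, int chunkId, int totalChunks) {
            this.entryId = entryId;
            this.messageKey = messageKey;
            this.chunkId = chunkId;
            this.totalChunks = totalChunks;
        }
    }

    /** Returns the key of the first message fully assembled when reading from startEntryId (inclusive). */
    static String firstMessageFrom(List<Entry> log, long startEntryId) {
        String assembling = null;
        int nextChunk = 0;
        for (Entry e : log) {
            if (e.entryId < startEntryId) {
                continue;
            }
            if (e.chunkId == 0) {
                assembling = e.messageKey;
                nextChunk = 0;
            }
            // A chunk whose first chunk was never seen cannot be assembled, so it is skipped.
            if (!e.messageKey.equals(assembling) || e.chunkId != nextChunk) {
                continue;
            }
            nextChunk++;
            if (nextChunk == e.totalChunks) {
                return e.messageKey;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Entry> log = Arrays.asList(
                new Entry(1, "A", 0, 3), new Entry(2, "A", 1, 3), new Entry(3, "A", 2, 3),
                new Entry(4, "B", 0, 1));
        System.out.println("seek to last chunk of A: " + firstMessageFrom(log, 3));
        System.out.println("seek to first chunk of A: " + firstMessageFrom(log, 1));
    }
}
```

In this model, seeking with the last chunk's entry delivers B first, while seeking with the first chunk's entry delivers A, which is the behaviour an inclusive seek is expected to have.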

@RobertIndie
Member Author

RobertIndie commented Dec 25, 2021

Hi, @BewareMyPower . Thanks for your review.

I saw the following comment from @rdhabalia.

but it's not guaranteed that you will get publish ACK of first chunk before you publish last chunk.

See following code in ProducerImpl#ackReceived.

        op.setMessageId(ledgerId, entryId, partitionIndex);
        if (op.totalChunks > 1 && op.chunkedMessageCtx != null) {
            if (op.chunkId == 0) { // [1]
                op.chunkedMessageCtx.firstChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);
            } else if (op.chunkId == op.totalChunks - 1) { // [2]
                op.chunkedMessageCtx.lastChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);
                op.setMessageId(op.chunkedMessageCtx.getChunkMessageId());
            }
        }

Are you concerned about [2] happening before [1]? In that case, the firstChunkMessageId field of ChunkMessageIdImpl will be null and the seek operation won't work if we pass the message ID returned by Producer#send to Consumer#seek.

IMO, it should be guaranteed that [2] happens after [1], but I'm not very sure. If it's not guaranteed, then I agree with @rdhabalia that the producer-side changes are meaningless. Please also help confirm it. @codelipenghui

Are you talking about the ordering guarantee? If so, I don't think that is what we were discussing. Whether sending a message or receiving the publish ack, the order is guaranteed. I think we're discussing the possibility of not being able to receive the PUBLISH ack of the first chunk before publishing the last chunk.

I also saw the comment from @RobertIndie.

The consumer doesn't depend on the first chunk id set by the producer, but does depend on the first chunk id calculated by the consumer itself.

I think not. Because in #12402, the motivation is from the following code

var msgId = producer.send(...); // eg. return 0:1:-1

var otherMsg = producer.send(...); // return 0:2:-1

consumer.seek(msgId); // inclusive seek

var receiveMsgId = consumer.receive().getMessageId(); // it may skip the first message and return something like 0:2:-1

Assert.assertEquals(msgId, receiveMsgId); // fail

The consumer seek must be able to work with the message ID returned by the producer.

According to the previous context, we were discussing the construction of the chunk message ID on the consumer side. So the exact meaning of this statement is "The construction of the chunk message ID on the consumer side doesn't depend on the first chunk ID set by the producer, but does depend on the first chunk ID calculated by the consumer itself." If you read the full content of this comment, it should be clearer: #12403 (comment)

The consumer doesn't depend on the first chunk id set by the producer, but does depend on the first chunk id calculated by the consumer itself. If you go through the code, you can find that this first chunk id here is either calculated by the producer or by the consumer. If this chunk msg id is from the consumer.receive, then the first chunk id here is calculated by the consumer but not the producer.

@BewareMyPower
Contributor

I think we're discussing the possibility of not being able to receive the PUBLISH ack of the first chunk before publishing the last chunk.

Does it affect anything? I see when the last chunk was published, only ackReceived can modify the firstChunkMessageId as the code shows.

@RobertIndie
Member Author

I think we're discussing the possibility of not being able to receive the PUBLISH ack of the first chunk before publishing the last chunk.

Does it affect anything? I see when the last chunk was published, only ackReceived can modify the firstChunkMessageId as the code shows.

IMO. No. I don't think we need to handle this case.

@BewareMyPower
Contributor

I think we're discussing the possibility of not being able to receive the PUBLISH ack of the first chunk before publishing the last chunk.

Does it affect anything? I see when the last chunk was published, only ackReceived can modify the firstChunkMessageId as the code shows.

IMO. No. I don't think we need to handle this case.

I don't think it affects anything either. Assuming

  1. receive the PUBLISH ack means ackReceived is called.
  2. publishing means writeAndFlush for the chunk is called.

Then I added some logs and a test.

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 38a7d4b8ab2..1d242081201 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -84,6 +84,19 @@ public class MessageChunkingTest extends ProducerConsumerBase {
         return new Object[][] { { true }, { false } };
     }
 
+    @Test
+    public void test() throws Exception {
+        this.conf.setMaxMessageSize(5);
+        final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic("test")
+                .enableBatching(false)
+                .enableChunking(true)
+                .create();
+        final MessageId id = producer.send("1234567890abcde".getBytes());
+        log.info("XYZ Send to {}", id);
+        producer.close();
+    }
+
     @Test
     public void testInvalidConfig() throws Exception {
         final String topicName = "persistent://my-property/my-ns/my-topic1";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 618b509a68d..653dc688d32 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -844,6 +844,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             }
 
             try {
+                log.info("CHUNK [{}] before writeAndFlush", op.chunkId);
                 cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
                 op.updateSentTimestamp();
             } finally {
@@ -1034,6 +1035,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, getHighestSequenceId(finalOp)));
         op.setMessageId(ledgerId, entryId, partitionIndex);
         if (op.totalChunks > 1 && op.chunkedMessageCtx != null) {
+            log.info("CHUNK [{}] before setting chunkMessageId", op.chunkId);
             if (op.chunkId == 0) {
                 op.chunkedMessageCtx.firstChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);

Here are the logs:

2021-12-26T00:18:56,734+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [0] before writeAndFlush
2021-12-26T00:18:56,735+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [1] before writeAndFlush
2021-12-26T00:18:56,736+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [2] before writeAndFlush
2021-12-26T00:18:56,756+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [0] before setting chunkMessageId
2021-12-26T00:18:56,756+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [1] before setting chunkMessageId
2021-12-26T00:18:56,756+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [2] before setting chunkMessageId

We can see that when the last chunk is published, the PUBLISH ack of the first chunk has not been received yet, and everything still works. Actually, this condition is guaranteed. I added a sleep call before writeAndFlush.

            try {
                Thread.sleep(1000 * op.chunkId);
                log.info("CHUNK [{}] before writeAndFlush", op.chunkId);
                cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
                op.updateSentTimestamp();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                recycle();
            }

The logs became

2021-12-26T00:22:25,340+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [0] before writeAndFlush
2021-12-26T00:22:26,344+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [1] before writeAndFlush
2021-12-26T00:22:28,350+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [2] before writeAndFlush
2021-12-26T00:22:28,351+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [0] before setting chunkMessageId
2021-12-26T00:22:28,352+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [1] before setting chunkMessageId
2021-12-26T00:22:28,353+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [2] before setting chunkMessageId

@RobertIndie
Member Author

We can see that when the last chunk is published, the PUBLISH ack of the first chunk has not been received yet.

It may actually happen that the ack of the first chunk is received before the last chunk is published. I think you should not call Thread.sleep in the IO thread; otherwise, the IO thread cannot process the acks of earlier chunks until it has published all the other chunks. For example, I add the Thread.sleep here:

Thread.sleep(1000);
cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));

Then we can get the log like:

2021-12-28T10:32:37,528+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [0] before writeAndFlush
2021-12-28T10:32:42,448+0800 [main] INFO  org.apache.pulsar.client.impl.ProducerImpl - Sleep
2021-12-28T10:32:42,449+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [1] before writeAndFlush
2021-12-28T10:32:44,929+0800 [main] INFO  org.apache.pulsar.client.impl.ProducerImpl - Sleep
2021-12-28T10:32:49,956+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [0] before setting chunkMessageId
2021-12-28T10:32:49,956+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [2] before writeAndFlush
2021-12-28T10:32:51,450+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [1] before setting chunkMessageId
2021-12-28T10:32:52,041+0800 [pulsar-client-io-32-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - CHUNK [2] before setting chunkMessageId

But anyway, these test results don't seem to have an impact on this PR. The producer does not rely on this guarantee. The Producer only needs to ensure that the ack of the other chunks is received before the ack of the last chunk is received. And this is guaranteed. The same is true for the consumer. But the logic of processing chunk message ids in producer and consumer do not interfere with each other.

Signed-off-by: Zike Yang <[email protected]>
@BewareMyPower
Contributor

@RobertIndie I have also tried it before. I just realized the following expression is wrong (or opposite).

not being able to receive the PUBLISH ack of the first chunk before publishing the last chunk.

Because it's true in most cases. Your example just shows the opposite case, in which the PUBLISH ack is received before publishing the last chunk because of the sleep. Generally, the PUBLISH ack arrives after the last chunk has been published.

Since the shared ChunkedMessageCtx is created before publishing the first chunk, I think the only thing that matters is the order of the ACKs, not the order among the ackReceived() calls and the writeAndFlush() calls.
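
This point can be illustrated with a small replay of the two interleavings seen in the logs above. The sketch is a simplified model, not the real ProducerImpl: `Ctx`, `ackReceived`, and `replay` are hypothetical, entry IDs are invented, and acks are assumed to arrive in chunk order for a message with more than one chunk.

```java
import java.util.Arrays;

final class ProducerAckSketch {

    /** Hypothetical stand-in for the shared per-message context. */
    static final class Ctx {
        long[] first; // {ledgerId, entryId} of chunk 0, set when its ack arrives
        long[] last;  // {ledgerId, entryId} of the final chunk
    }

    /** Returns {firstLedgerId, firstEntryId, lastLedgerId, lastEntryId} on the last chunk's ack, else null. */
    static long[] ackReceived(Ctx ctx, int chunkId, int totalChunks, long ledgerId, long entryId) {
        if (chunkId == 0) {
            ctx.first = new long[] {ledgerId, entryId};
        } else if (chunkId == totalChunks - 1) {
            ctx.last = new long[] {ledgerId, entryId};
            // Relies on in-order acks: chunk 0 has already been acked at this point.
            return new long[] {ctx.first[0], ctx.first[1], ctx.last[0], ctx.last[1]};
        }
        return null;
    }

    /** Replays "P<n>" (publish chunk n) and "A<n>" (ack for chunk n) events in the given order. */
    static long[] replay(int totalChunks, String... events) {
        Ctx ctx = new Ctx(); // created before the first chunk is published
        long[] result = null;
        for (String event : events) {
            int chunkId = Integer.parseInt(event.substring(1));
            if (event.charAt(0) == 'A') {
                long[] id = ackReceived(ctx, chunkId, totalChunks, 0L, 10L + chunkId);
                if (id != null) {
                    result = id;
                }
            }
            // Publish events never touch the context, so their timing does not change the result.
        }
        return result;
    }

    public static void main(String[] args) {
        long[] allPublishedFirst = replay(3, "P0", "P1", "P2", "A0", "A1", "A2");
        long[] ackBeforeLastPublish = replay(3, "P0", "P1", "A0", "P2", "A1", "A2");
        System.out.println(Arrays.equals(allPublishedFirst, ackBeforeLastPublish));
    }
}
```

Both interleavings produce the same (first, last) pair in this model because only the ack handler writes to the shared context.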

@RobertIndie
Member Author

Since the shared ChunkedMessageCtx is created before publishing the first chunk, I think the only thing matters is the order of ACKs, not the order among the ackReceived() calls and writeAndFlush() calls.

I think the order is guaranteed. For the producer before this PR change, it still needs to rely on this ordering guarantee.

I have fixed all the comments. PTAL. @BewareMyPower

@RobertIndie
Member Author

/pulsarbot run-failure-checks

@codelipenghui codelipenghui merged commit 862e120 into apache:master Dec 29, 2021
wuzhanpeng pushed a commit to wuzhanpeng/pulsar that referenced this pull request Jan 5, 2022
@Anonymitaet Anonymitaet added doc-complete Your PR changes impact docs and the related docs have been already added. and removed doc-required Your PR changes impact docs and you will update later. labels Jan 24, 2022
Labels

doc-complete Your PR changes impact docs and the related docs have been already added. type/PIP
