[PIP 107][Client] Introduce chunk message ID #12403
Conversation
Signed-off-by: Zike Yang <[email protected]>
@RobertIndie: Thanks for your contribution. For this PR, do we need to update docs?
@RobertIndie: Thanks for providing doc info!
```proto
optional int32 batch_size = 6;

// For the chunk message id, we need to specify the first chunk message id.
optional MessageIdData first_chunk_message_id = 7;
```
Do we need this change? The chunk message ID is a new type on the client side; it looks like we do not need to pass it to the server side.
Since ChunkMessageID serialization and deserialization depend on the MessageIdData defined in the proto, we need to change it here even though it does not need to be passed to the server side.
This seems like a fundamental issue, and it doesn't solve the actual problem. You need the message ID (ledgerId, entryId) of the first chunk to be present in every chunk of the entire message, so that you can know the first message ID among all chunks when you seek the message. In this PR you are trying to set that first chunk's message ID (ledgerId, entryId) into every chunk in ProducerImpl::ackReceived, which doesn't work, because you are assuming the best case where you receive the first chunk's ack before you send the other chunks, and that is not guaranteed. If you have 10 chunks, published asynchronously as individual messages, and the ack of the first chunk arrives only after all the other 9 chunks have been published, then none of the chunks will carry first_chunk_message_id, and the feature will not work the way Pulsar would claim it works.
This is tricky to solve on the client side, because we cannot update message metadata based on a previous message's ack from the broker unless we block the publishing of the other 9 chunks until the first chunk's ack arrives, and that would severely hurt publish performance and make the feature unusable.
When we send chunked messages, each chunk is sent in order, so I don't think there is a case where the last chunk is acked before the first chunk.
In the original logic, we determine whether the whole message was sent successfully by receiving the ack of the last chunk, and then return the last chunk's message ID to the user instead of the message IDs of all chunks.
When we receive the ack of the last chunk, it proves that the whole message has been sent successfully, and the message ID of the first chunk cannot be null at that point. In this PR, the first chunk message ID and the last chunk message ID are packaged into a ChunkMessageIdImpl and returned to the user only when the ack of the last chunk is received.
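The shape being described here can be sketched as a minimal, self-contained model. Note that `MessageIdImpl` below is a stripped-down stand-in for the real Pulsar class (ledgerId/entryId only, no partition index or batching), so this is an illustration of the delegation idea, not the actual implementation:

```java
// Minimal stand-in for Pulsar's MessageIdImpl: just (ledgerId, entryId).
class MessageIdImpl {
    private final long ledgerId;
    private final long entryId;

    MessageIdImpl(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    long getLedgerId() { return ledgerId; }
    long getEntryId() { return entryId; }
}

// Sketch of a chunk message ID: it extends MessageIdImpl and delegates to the
// last chunk's ID, so existing logic that treats it as a plain message ID
// keeps working, while the first chunk's ID is kept around for seek.
class ChunkMessageIdImpl extends MessageIdImpl {
    private final MessageIdImpl firstChunkMsgId;

    ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl lastChunkMsgId) {
        super(lastChunkMsgId.getLedgerId(), lastChunkMsgId.getEntryId());
        this.firstChunkMsgId = firstChunkMsgId;
    }

    MessageIdImpl getFirstChunkMessageId() { return firstChunkMsgId; }

    MessageIdImpl getLastChunkMessageId() { return this; }
}
```

Because the chunk ID behaves as the last chunk's ID by default, acknowledging it acknowledges the whole message, while seek can still reach back to the first chunk.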
each chunk is sent in order, so I don't think there is a case where the last chunk will be acked before the first chunk.
The issue is not the ordering of acks but their timing. If you don't receive the ack of the first chunk while publishing the other chunks, you can't set first_chunk_message_id in the rest of the chunks. If the chunks don't carry first_chunk_message_id, then when the consumer receives the chunked message, those chunks won't have firstChunkMessageId in their metadata and the consumer won't be able to seek to the first message ID, which defeats the purpose of this PR.
The Consumer does not access the first_chunk_message_id in the MessageIdData during this process.
Please check my comment below in the ConsumerImpl class, where it is accessed; that is the purpose of this PR, to access first_chunk_message_id and seek to that message ID. But it's not guaranteed that first_chunk_message_id will be present, so Pulsar could end up claiming this feature incorrectly.
@rdhabalia This one is only for the MessageId.toByteArray() method. It is a little tricky to add it to the wire protocol, but currently we use MessageIdData to generate the byte array for users; it will not be passed to the broker side to let the broker know which one is the first chunk message ID.
Users can store the byte array of the chunk message ID, so that they can later recover the chunk message ID from it. If they want to seek to this chunked message and continue consuming data in inclusive mode, they don't need to do a lot of extra work.
And introducing the new chunk message ID will allow us to peek the message from the topic if the application has recorded the chunk message ID.
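The persist-and-recover flow described here can be illustrated with a toy codec. The real client serializes through the protobuf `MessageIdData`; this sketch only shows why the byte array must carry both the first and last chunk coordinates, and `ChunkIdCodec` is a hypothetical name:

```java
import java.nio.ByteBuffer;

// Toy round trip: pack the first and last chunk (ledgerId, entryId) pairs
// into a byte[] that an application could store in external state storage,
// then unpack them later to rebuild the chunk message ID for a seek.
class ChunkIdCodec {
    static byte[] toByteArray(long firstLedger, long firstEntry,
                              long lastLedger, long lastEntry) {
        return ByteBuffer.allocate(4 * Long.BYTES)
                .putLong(firstLedger).putLong(firstEntry)
                .putLong(lastLedger).putLong(lastEntry)
                .array();
    }

    // Returns {firstLedger, firstEntry, lastLedger, lastEntry}.
    static long[] fromByteArray(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new long[] { buf.getLong(), buf.getLong(), buf.getLong(), buf.getLong() };
    }
}
```

The point is that a plain (ledgerId, entryId) pair is no longer enough once chunking is involved: without the first chunk's coordinates in the stored bytes, an inclusive seek back to the stored message cannot start at the right entry.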
@merlimat Please help confirm this part.
And introducing the new chunk message ID will allow us to peek the message from the topic if the application recorded the chunk message ID.
@codelipenghui The issue is not with introducing the chunk message ID or the changes to MessageId. The fundamental issue is that this entire change is about getting the message ID of the first chunk into the last chunk's data, and that is not guaranteed. So what is the point of changes built on a wrong assumption?
@rdhabalia No, we don't want to set the first chunk message ID into the last chunk's data; we just return a chunk message ID to users. Users can persist the chunk message ID to state storage, and if they want to start consuming from the stored chunked message, we are able to provide that ability. Otherwise, we would always skip this message and start from the next chunked or normal message.
```java
    seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
} else if (messageId instanceof ChunkMessageIdImpl) {
    ChunkMessageIdImpl msgId = (ChunkMessageIdImpl) messageId;
    seek = Commands.newSeek(consumerId, requestId, msgId.getFirstChunkMessageId().getLedgerId(),
            msgId.getFirstChunkMessageId().getEntryId(), new long[0]);
```
This change and the new message type ChunkMessageIdImpl are built on the assumption that firstChunkMessageId will be present, and it will not be present most of the time. So this will not work as expected.
Because chunked messages are sent with a guaranteed order for each chunk, the first chunk message ID here will not be null.
Because chunk messages are sent with guaranteed order for each chunk
Yes, ordering is guaranteed, but it is not guaranteed that you will get the publish ack of the first chunk before you publish the last chunk. It seems I am explaining the same thing again and again. Am I really missing something here?
but it's not guaranteed that you will get publish ACK of first chunk before you publish last chunk.
As I said earlier, we don't need that guarantee. I think this is the confusing point of the discussion. The firstChunkMessageId that the consumer uses at seek time is not given by the producer; it is calculated on the consumer side. I think my previous comments would help understanding: #12403 (comment)
Noted (here I reply to your previous comment):
and when we processing to the last chunk, the first chunk message id must be present.
it's not guaranteed.
What I mean by "when we are processing the last chunk" is processing the message ID of the last chunk, not sending the last chunk; that part is guaranteed. It is indeed not guaranteed that we will get the ack of the first chunk before we publish the last chunk.
The Consumer does not access the first_chunk_message_id in the MessageIdData during this process.
Please check my below comment in ConsumerImpl class where it accesses it and that's the purpose of this PR to access first_chunk_message_id and seek to that messageId. but it's not guaranteed that first_chunk_message_id will be present and pulsar can claim this feature incorrectly.
The producer does not tell the firstChunkMessageId to the consumer. The consumer receives the chunks in order, so it is guaranteed that the firstChunkMessageId exists once the last chunk of the message has been received.
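The consumer-side construction being argued for here — record each chunk's ID as it arrives in order, then build the (first, last) pair once the final chunk completes the message — can be sketched with a simplified context object. The names below mirror the discussion but are a hypothetical model, not the actual ConsumerImpl code:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified per-message context: accumulates each received chunk's
// (ledgerId, entryId) in arrival order, which is also broker order.
class ChunkedMessageCtx {
    final List<long[]> chunkedMessageIds = new ArrayList<>();

    void onChunkReceived(long ledgerId, long entryId) {
        chunkedMessageIds.add(new long[] { ledgerId, entryId });
    }

    // Called when the last chunk arrives: both the first and the last chunk
    // IDs are known locally, with no dependency on anything the producer
    // stamped into the message metadata.
    long[][] buildChunkMessageId() {
        return new long[][] {
                chunkedMessageIds.get(0),
                chunkedMessageIds.get(chunkedMessageIds.size() - 1)
        };
    }
}
```

This is why the producer-side ack timing debate does not affect the consumer: by the time reassembly finishes, index 0 of the accumulated list is necessarily the first chunk's ID.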
The fundamental assumption on the consumer side is that msgId.getFirstChunkMessageId().getLedgerId() will be present, and this assumption is incorrect: getFirstChunkMessageId() can be null because, as I explained earlier, you can't guarantee that firstChunkMessageId is set in any chunk of the message.
As I explained earlier (here and here), the consumer does not depend on the first chunk message ID set by the producer. Here is the comment that illustrates where the consumer sets the chunk message ID. Is there something wrong with my explanation?
you can't give guarantee to set firstChunkMessageId in any chunk of the message.
Why do we need that guarantee? I think this is the key point of our discussion. The consumer does not rely on it.
If the consumer doesn't depend on the first chunk ID, then why do we have the seek method in ConsumerImpl on this line, which uses firstChunkMessageId?
seek = Commands.newSeek(consumerId, requestId, msgId.getFirstChunkMessageId().getLedgerId(), msgId.getFirstChunkMessageId().getEntryId(), new long[0]);
I'm not sure what we are trying to achieve in this PR.
The consumer doesn't depend on the first chunk ID set by the producer; it depends on the first chunk ID calculated by the consumer itself. If you go through the code, you can see that the first chunk ID here is calculated either by the producer or by the consumer. If this chunk message ID comes from consumer.receive, then the first chunk ID was calculated by the consumer, not the producer.
```java
msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0],
        chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);
```
@rdhabalia Here is where the consumer sets the first and last chunk message IDs. It does not depend on the first chunk message ID set by the producer.
Can we remove all the producer-side changes then, since the producer can't guarantee setting the message ID (ledgerId, entryId) of the first chunk?
@rdhabalia The producer does not set the message ID of the chunk; the producer-side change just returns a chunk message ID (start, end) to users.
gaoran10 left a comment:
LGTM
BewareMyPower left a comment:
Left some comments on code style first. I'm not sure why the checkstyle plugin doesn't check blank lines.
This PR overall LGTM. I saw the debate about the producer-side changes, and here are my points. I saw the following comment from @rdhabalia. See the following code in ProducerImpl:

```java
op.setMessageId(ledgerId, entryId, partitionIndex);
if (op.totalChunks > 1 && op.chunkedMessageCtx != null) {
    if (op.chunkId == 0) { // [1]
        op.chunkedMessageCtx.firstChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);
    } else if (op.chunkId == op.totalChunks - 1) { // [2]
        op.chunkedMessageCtx.lastChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);
        op.setMessageId(op.chunkedMessageCtx.getChunkMessageId());
    }
}
```

Do you have concerns about …? IMO, it should be guaranteed that … I also saw the comment from @RobertIndie.
I think not. Because in #12402, the motivation is from the following code … The consumer seek must be able to work with the message ID returned by the producer.
Hi @BewareMyPower, thanks for your review.
Are you talking about the ordering guarantee? If so, I don't think we are talking about the same thing. Whether sending a message or receiving the publish ack, the order is guaranteed. What we are discussing is the possibility of not receiving the PUBLISH ack of the first chunk before publishing the last chunk.
Given the previous context, we were discussing the construction of the chunk message ID on the consumer side. So the exact meaning of that statement is: "The construction of the chunk message ID on the consumer side doesn't depend on the first chunk ID set by the producer, but does depend on the first chunk ID calculated by the consumer itself." If you read the full comment, it should be clearer: #12403 (comment)
Does it affect anything? I see that when the last chunk is published, only …
IMO, no. I don't think we need to handle this case.
I don't think it affects anything as well, assuming …

Then I added some logs and a test:

```diff
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 38a7d4b8ab2..1d242081201 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -84,6 +84,19 @@ public class MessageChunkingTest extends ProducerConsumerBase {
         return new Object[][] { { true }, { false } };
     }

+    @Test
+    public void test() throws Exception {
+        this.conf.setMaxMessageSize(5);
+        final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic("test")
+                .enableBatching(false)
+                .enableChunking(true)
+                .create();
+        final MessageId id = producer.send("1234567890abcde".getBytes());
+        log.info("XYZ Send to {}", id);
+        producer.close();
+    }
+
     @Test
     public void testInvalidConfig() throws Exception {
         final String topicName = "persistent://my-property/my-ns/my-topic1";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 618b509a68d..653dc688d32 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -844,6 +844,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
         try {
+            log.info("CHUNK [{}] before writeAndFlush", op.chunkId);
             cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
             op.updateSentTimestamp();
         } finally {
@@ -1034,6 +1035,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, getHighestSequenceId(finalOp)));
             op.setMessageId(ledgerId, entryId, partitionIndex);
             if (op.totalChunks > 1 && op.chunkedMessageCtx != null) {
+                log.info("CHUNK [{}] before setting chunkMessageId", op.chunkId);
                 if (op.chunkId == 0) {
                     op.chunkedMessageCtx.firstChunkMessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex);
```

Here are the logs: we can see that when the last chunk is published, the PUBLISH ack of the first chunk has not yet been received, and it still works well. Actually, this condition is guaranteed. I added a sleep to verify:

```java
try {
    Thread.sleep(1000 * op.chunkId);
    log.info("CHUNK [{}] before writeAndFlush", op.chunkId);
    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
    op.updateSentTimestamp();
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    recycle();
}
```

The logs became …
It may actually happen that the ack of the first chunk is received before the last chunk is published. I think you should not make the …

```java
Thread.sleep(1000);
cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
```

Then we can get a log like: … But anyway, these test results don't seem to have an impact on this PR. The producer does not rely on this guarantee. The producer only needs to ensure that the acks of the other chunks are received before the ack of the last chunk, and that is guaranteed. The same is true for the consumer. And the chunk-message-ID processing logic in the producer and the consumer do not interfere with each other.
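The ordering argument both sides eventually agree on — acks for the chunks of one message are delivered in send order over a single connection, so by the time the last chunk's ack is processed, the first chunk's ID has already been recorded — can be modeled with a toy FIFO. This is an illustrative sketch under that single-connection assumption, not ProducerImpl itself:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model: chunks are "sent" into a FIFO and their acks are processed in
// the same order, mimicking ack delivery over one broker connection.
class ChunkAckModel {
    Long firstChunkEntryId; // recorded when chunk 0's ack is processed
    Long lastChunkEntryId;  // recorded when the final chunk's ack is processed

    void processAcks(int totalChunks, long baseEntryId) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
            pending.add(chunkId); // send order
        }
        while (!pending.isEmpty()) {
            int chunkId = pending.poll(); // ack order == send order (FIFO)
            long entryId = baseEntryId + chunkId;
            if (chunkId == 0) {
                firstChunkEntryId = entryId;
            } else if (chunkId == totalChunks - 1) {
                // Chunk 0's ack was necessarily processed earlier, so the
                // (first, last) pair can be assembled here.
                lastChunkEntryId = entryId;
            }
        }
    }
}
```

The timing of each ack relative to the *publishing* of later chunks is unconstrained in this model, which is exactly the point: only the relative order of ack processing matters for assembling the chunk message ID.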
@RobertIndie I have also tried it before. I just realized the following expression was wrong (or the opposite).
Because it is true in most cases. Your example just shows the opposite case, where the PUBLISH ack is received before the last chunk is published, by sleeping for a while. Generally, the PUBLISH ack happens after the last chunk has been published, since the shared …
I think the order is guaranteed. Even before this PR's change, the producer still relies on this ordering guarantee. I have fixed all the comments. PTAL @BewareMyPower
Master Issue: #12402

Motivation

This is an implementation of the PIP: #12402

Modifications

Introduce a new message ID type: chunk message ID. The chunk message ID inherits from MessageIdImpl and adds two new methods: getFirstChunkMessageId and getLastChunkMessageId. The other method implementations delegate directly to the last chunk message ID, which keeps it compatible with much of the existing business logic.
Return the chunk message ID to the user when the producer produces a chunked message and when the consumer consumes one.
In consumer.seek, use the first chunk message ID of the chunk message ID. This solves the problem of seeking to chunked messages, and it is the only impact of this PIP on the original business logic.
Verifying this change

This change is already covered by existing tests, such as ChunkMessageIdImplTest and testSeekChunkMessages.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes.

Documentation

doc-required