Skip to content

Conversation

@RobertIndie
Copy link
Member

Motivation

Currently, when we send chunked messages, the producer returns the message-id of the last chunk. This can cause some problems. For example, when we use this message-id to seek, it will cause the consumer to consume from the position of the last chunk, and the consumer will mistakenly think that the previous chunks are lost and choose to skip the current message. If we use the inclusive seek, the consumer may skip the first message, which brings the wrong behavior.

Here is the simple code used to demonstrate the problem.

var msgId = producer.send(...); // eg. return 0:1:-1

var otherMsg = producer.send(...); // return 0:2:-1

consumer.seek(msgId); // inclusive seek

var receiveMsgId = consumer.receive().getMessageId(); // it may skip the
first message and return like 0:2:-1

Assert.assertEquals(msgId, receiveMsgId); // fail

Modifications

When the producer sends a chunked message, the message-id of the first chunk is stored temporarily in the producer until all chunks have been sent, and then it is returned to the user after all chunks has been sent successfully.

Verifying this change

This PR is still working in progress, still need to add some tests.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (yes) It affects the behavior of sending chunked messages.
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • no-need-doc

@Anonymitaet Anonymitaet added the doc-not-needed Your PR changes do not impact docs label Sep 26, 2021
} else if (op.chunkId == op.totalChunks - 1) {
MessageIdImpl firstChunkMsgId = chunkMessageIds.get(op.msg.getMessageBuilder().getUuid());
if (firstChunkMsgId != null) {
op.setMessageId(firstChunkMsgId.ledgerId, firstChunkMsgId.entryId, firstChunkMsgId.partitionIndex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the challenge here. last chunk is already published and persist at server side. this is at ackReceived method, means you have received the ack for already published message and updating messageId will not reflect any change to message which is already persist at server side.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is correct. But I think we don't need to reflect any change to the message on the server-side.

@RobertIndie
Copy link
Member Author

This has been fixed in #12403.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants