-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[PIP 107][Client] Introduce chunk message ID #12403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
38c9642
Introduce chunk message ID.
RobertIndie f6dd531
Add serialization for ChunkMessageIdImpl.
RobertIndie af9c11a
Implement chunk message id in producer and consumer.
RobertIndie 14e8fa1
Fix unAckedChunkedMessageIdSequenceMap issue.
RobertIndie e995dfb
Add tests.
RobertIndie b6a50eb
Only create ChunkMessageCtx when chunk is enabled.
RobertIndie 4f507aa
Merge remote-tracking branch 'apache/master' into chunk-msgid
RobertIndie f00ed27
Remove unused changes.
RobertIndie d71a6d7
Merge branch 'master' into chunk-msgid
RobertIndie c1d009c
Merge master
RobertIndie 8c5a46d
Merge remote-tracking branch 'origin/chunk-msgid' into chunk-msgid
RobertIndie cee7d36
Fix some comments.
RobertIndie 0563a4a
Remove redundant check.
RobertIndie 09587e7
Remove redundant null check
BewareMyPower File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
75 changes: 75 additions & 0 deletions
75
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pulsar.client.impl; | ||
|
|
||
| import io.netty.buffer.ByteBuf; | ||
| import io.netty.buffer.Unpooled; | ||
| import java.util.Objects; | ||
| import org.apache.pulsar.client.api.MessageId; | ||
| import org.apache.pulsar.common.api.proto.MessageIdData; | ||
|
|
||
| public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId { | ||
| private final MessageIdImpl firstChunkMsgId; | ||
|
|
||
| public ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl lastChunkMsgId) { | ||
| super(lastChunkMsgId.getLedgerId(), lastChunkMsgId.getEntryId(), lastChunkMsgId.getPartitionIndex()); | ||
| this.firstChunkMsgId = firstChunkMsgId; | ||
| } | ||
|
|
||
| public MessageIdImpl getFirstChunkMessageId() { | ||
| return firstChunkMsgId; | ||
| } | ||
|
|
||
| public MessageIdImpl getLastChunkMessageId() { | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return firstChunkMsgId.toString() + ';' + super.toString(); | ||
| } | ||
|
|
||
| @Override | ||
| public byte[] toByteArray() { | ||
|
|
||
| // write last chunk message id | ||
| MessageIdData msgId = super.writeMessageIdData(null, -1, 0); | ||
|
|
||
| // write first chunk message id | ||
| msgId.setFirstChunkMessageId(); | ||
| firstChunkMsgId.writeMessageIdData(msgId.getFirstChunkMessageId(), -1, 0); | ||
|
|
||
| int size = msgId.getSerializedSize(); | ||
| ByteBuf serialized = Unpooled.buffer(size, size); | ||
| msgId.writeTo(serialized); | ||
|
|
||
| return serialized.array(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| return super.equals(o); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(super.hashCode(), firstChunkMsgId.hashCode()); | ||
| } | ||
|
|
||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdhabalia Here is where the consumer sets the first chunk message id and the last chunk message-id. It does not depend on the first chunk message-id set by the producer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we remove all producer-side changes then because the producer can't give a guarantee to set messageId(ledgerId,entryId) of the chunk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdhabalia The producer will not set the message ID of the chunk, the producer side change just return a chunk message ID(start, end) to users.