[Feature] Introduce continuous offset for pulsar #9039
Conversation
/pulsarbot run-failure-checks
/**
 * Find position by sequenceId.
 * */
CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
Suggested change:
- CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
+ CompletableFuture<Position> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
fixed
/**
 * Find position by sequenceId.
 * */
CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
CompletableFuture<PositionImpl> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);
Can we use `java.util.function.Predicate`?
The condition of `OpFindNewest` is of type `com.google.common.base.Predicate`, so the parameter type here should also be `com.google.common.base.Predicate`.
I'd recommend using `java.util.function.Predicate` in the interface. You can just write a wrapper to convert a `java.util.function.Predicate` to a Guava `Predicate`.
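The suggested wrapper could look like the following minimal sketch. `GuavaStylePredicate` here is a hypothetical stand-in for `com.google.common.base.Predicate` so the example stays dependency-free; both types are single-abstract-method interfaces taking a `T` and returning `boolean`, so a method reference is enough to adapt between them:

```java
import java.util.function.Predicate;

// Hypothetical stand-in for com.google.common.base.Predicate, used here only
// to keep the sketch dependency-free: same shape (one abstract method taking
// T, returning boolean) as Guava's Predicate.
interface GuavaStylePredicate<T> {
    boolean apply(T input);
}

class PredicateAdapter {
    // Adapt a standard-library Predicate to the Guava-style one. Since both
    // are functional interfaces with the same shape, p::test fits directly.
    static <T> GuavaStylePredicate<T> toGuavaStyle(Predicate<T> p) {
        return p::test;
    }
}
```

With the real Guava type, the adapter body is identical (`p::test`); since Guava 21, Guava's `Predicate` even extends `java.util.function.Predicate`, making the conversion in the other direction implicit.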
@Override
public Position addEntry(byte[] data, int batchSize) throws InterruptedException, ManagedLedgerException {
    return addEntry(data, batchSize,0, data.length);
Suggested change:
- return addEntry(data, batchSize,0, data.length);
+ return addEntry(data, batchSize, 0, data.length);
fixed
addOperation.failed(
    new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed."));
ReferenceCountUtil.release(addOperation.data);
log.error("[{}] Failed to interceptor entry before add to bookie.", name, e);
Suggested change:
- log.error("[{}] Failed to interceptor entry before add to bookie.", name, e);
+ log.error("[{}] Failed to intercept adding an entry to bookie.", name, e);
fixed
/**
 * Interceptor for ManagedLedger.
 * */
public interface ManagedLedgerInterceptor {
Can you add the interface annotations to the new interface? See https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification
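The requested PIP-72 tagging could look like the following sketch. `LimitedPrivate` and `Evolving` are hypothetical stand-ins declared locally so the fragment compiles on its own; the real annotations live in Pulsar's `org.apache.pulsar.common.classification` package:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Hypothetical stand-ins for the PIP-72 classification annotations, declared
// locally so this sketch is self-contained. RUNTIME retention lets callers
// inspect the classification reflectively.
@Documented @Retention(RetentionPolicy.RUNTIME) @interface LimitedPrivate {}
@Documented @Retention(RetentionPolicy.RUNTIME) @interface Evolving {}

// The new interface, tagged with audience and stability per PIP-72.
@LimitedPrivate
@Evolving
interface ManagedLedgerInterceptor {
}
```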
fixed
 * Interceptor for ManagedLedger.
 * */
public interface ManagedLedgerInterceptor {
    OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize);
Can you add javadoc to the methods?
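A javadoc sketch for the method above could read as follows (hypothetical wording; `OpAddEntry` is stubbed here so the fragment compiles on its own):

```java
// Stub standing in for the real OpAddEntry (hypothetical, for compilation).
class OpAddEntry {
}

interface ManagedLedgerInterceptor {
    /**
     * Intercepts an entry before it is added to the ManagedLedger, e.g. to
     * attach broker entry metadata such as a continuous index.
     *
     * @param op        the pending add-entry operation
     * @param batchSize the number of messages contained in the entry
     * @return the (possibly modified) operation to proceed with
     */
    OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize);
}
```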
fixed
public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class);
    private static final String OFFSET = "offset";
The Kafka-style "offset" is not a very good term for describing the index of a message within a log stream. Instead, I would suggest calling it index or logIndex, similar to the term used in the Raft algorithm (https://cs.stackexchange.com/questions/97542/raft-algorithm-whats-the-meaning-of-concept-index).
Modified `offset` to `index`.
if (appendBrokerEntryMetadata(headersAndPayload, publishContext)) {
    ledger.asyncAddEntry(headersAndPayload, this, publishContext);
}
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
How is the batchSize passed to asyncAddEntry? I fail to see how you did that in this pull request.
At the same time, I think batchSize is not a good term. If I understand this correctly, it should be numberOfMessages, correct?
fixed
@aloyszhang thanks for your great work. Shall the changes be documented in the Pulsar docs? If so, could you please help add the docs accordingly? Then you can ping me to review, thanks.
### Motivation

When the Pulsar cluster enables broker entry metadata, there are sometimes corrupted entries. See streamnative/kop#442 for example. It happens because the broker entry metadata has been added twice. This bug might have been introduced by #9039 https://github.com/apache/pulsar/blob/9b7c3275c904ac1e6a8ef67487a10a0506bb2c58/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1516-L1518 It happened during a managed ledger's rollover while there were pending `OpAddEntry`s: in `updateLedgersIdsComplete`, only the ledger id should be updated and the data of `OpAddEntry` should not be modified.

### Modifications

Only call `beforeAddEntry` once, at the beginning of `internalAsyncAddEntry`.
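The shape of the fix can be sketched as follows (hypothetical, heavily simplified names; the real change lives in `ManagedLedgerImpl`): the interceptor touches the payload exactly once when the entry enters the pipeline, and a rollover only re-targets the ledger without re-intercepting pending data, so metadata cannot be prepended twice:

```java
// Heavily simplified sketch (hypothetical names) of the single-interception
// fix: beforeAddEntry runs once when the entry enters the pipeline, and a
// rollover must not run it again on pending operations.
class AddEntryPipeline {
    interface Interceptor {
        byte[] beforeAddEntry(byte[] data);
    }

    private final Interceptor interceptor;

    AddEntryPipeline(Interceptor interceptor) {
        this.interceptor = interceptor;
    }

    // The only place the interceptor touches the payload.
    byte[] internalAsyncAddEntry(byte[] data) {
        return interceptor.beforeAddEntry(data);
    }

    // On rollover, pending data is resubmitted as-is; intercepting it here
    // again would duplicate the broker entry metadata.
    byte[] updateLedgersIdsComplete(byte[] pendingData) {
        return pendingData;
    }
}
```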
Fixes #9038
Motivation
As described in PIP-70.
One of the use cases for broker entry metadata is providing a continuous message sequence id for messages in one topic-partition, which is useful for protocol handlers like KoP.
This PR enables Pulsar to support a continuous offset for messages based on broker entry metadata.
Modifications
- Introduce a new field for broker entry metadata named `offset`;
- Introduce a new interceptor type `ManagedLedgerInterceptor` which intercepts entries in `ManagedLedger`;
- Each partition will be assigned a `ManagedLedgerInterceptor` when its `ManagedLedger` is created;
- Each entry will be intercepted to add a monotonically increasing offset to the broker entry metadata; the offset is advanced by the batchSize of the entry;
- Support finding a position by a given offset.
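The per-partition offset assignment described above can be sketched as follows (hypothetical names; the real logic lives in `ManagedLedgerInterceptorImpl`): a counter advances by the number of messages in each entry, and the entry is stamped with the index of its last message, yielding a continuous sequence across entries:

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch (hypothetical names) of the continuous-offset idea: one
// counter per partition, advanced by the number of messages in each entry.
class IndexInterceptor {
    private final AtomicLong nextIndex = new AtomicLong(0);

    // Returns the index to record in the broker entry metadata for an entry
    // carrying numberOfMessages messages: the index of its last message.
    long beforeAddEntry(int numberOfMessages) {
        return nextIndex.addAndGet(numberOfMessages) - 1;
    }
}
```

For example, entries carrying 1, 3, and 2 messages would be stamped with indexes 0, 3, and 5, so every individual message has a unique, gap-free sequence number.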