Skip to content

Commit 9d44617

Browse files
Avoid adding duplicated BrokerEntryMetadata (#12018)
### Motivation When the Pulsar cluster enables broker entry metadata, sometimes there're some corrupted entries. See streamnative/kop#442 for example. It's because the broker entry metadata has been added twice. This bug might be introduced from #9039 https://github.com/apache/pulsar/blob/9b7c3275c904ac1e6a8ef67487a10a0506bb2c58/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1516-L1518 It happened during a managed ledger's rollover while there're some pending `OpAddEntry`s in `updateLedgersIdsComplete`, only the ledger id should be updated and the data of `OpAddEntry` should not be modified. ### Modifications Only call `beforeAddEntry` for once at the beginning of `internalAsyncAddEntry`.
1 parent dbd09d7 commit 9d44617

File tree

1 file changed

+5
-7
lines changed

1 file changed

+5
-7
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,9 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
717717
}
718718

719719
private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
720+
if (!beforeAddEntry(addOperation)) {
721+
return;
722+
}
720723
pendingAddEntries.add(addOperation);
721724
final State state = STATE_UPDATER.get(this);
722725
if (state == State.Fenced) {
@@ -779,10 +782,7 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
779782
addOperation.setCloseWhenDone(true);
780783
STATE_UPDATER.set(this, State.ClosingLedger);
781784
}
782-
// interceptor entry before add to bookie
783-
if (beforeAddEntry(addOperation)) {
784-
addOperation.initiate();
785-
}
785+
addOperation.initiate();
786786
}
787787
}
788788

@@ -1513,9 +1513,7 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
15131513
ReferenceCountUtil.release(existsOp.data);
15141514
}
15151515
existsOp.setLedger(currentLedger);
1516-
if (beforeAddEntry(existsOp)) {
1517-
pendingAddEntries.add(existsOp);
1518-
}
1516+
pendingAddEntries.add(existsOp);
15191517
}
15201518
} while (existsOp != null && --pendingSize > 0);
15211519

0 commit comments

Comments
 (0)