Skip to content

Commit 281163b

Browse files
authored
[Transaction] Add the batch size in transaction ack command. (#8659)
## Motivation Now in Failover sub we ack with transaction will not get the batch size from consumer pendingAcks, so we should ack request carry the batch size. ## implement We ack with transaction will carry the bath size for individual ack delete the consumer pendingAcks.
1 parent 1b790ba commit 281163b

File tree

21 files changed

+294
-114
lines changed

21 files changed

+294
-114
lines changed

conf/broker.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1171,5 +1171,5 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
11711171
### --- Transaction config variables --- ###
11721172

11731173
# Enable transaction coordinator in broker
1174-
transactionCoordinatorEnabled=true
1174+
transactionCoordinatorEnabled=false
11751175
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1849,7 +1849,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
18491849
category = CATEGORY_TRANSACTION,
18501850
doc = "Enable transaction coordinator in broker"
18511851
)
1852-
private boolean transactionCoordinatorEnabled = true;
1852+
private boolean transactionCoordinatorEnabled = false;
18531853

18541854
@FieldContext(
18551855
category = CATEGORY_TRANSACTION,

pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,9 @@ public void start() throws Exception {
319319
});
320320
broker.start();
321321

322-
broker.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
322+
if (config.isTransactionCoordinatorEnabled()) {
323+
broker.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
324+
}
323325

324326
final String cluster = config.getClusterName();
325327

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -355,9 +355,6 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
355355
private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,Long> properties) {
356356
List<Position> positionsAcked = new ArrayList<>();
357357
List<PositionImpl> checkBatchPositions = null;
358-
if (isTransactionEnabled()) {
359-
checkBatchPositions = new ArrayList<>();
360-
}
361358
for (int i = 0; i < ack.getMessageIdCount(); i++) {
362359
MessageIdData msgId = ack.getMessageId(i);
363360
PositionImpl position;
@@ -366,21 +363,9 @@ private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,L
366363
SafeCollectionUtils.longListToArray(msgId.getAckSetList()));
367364
if (isTransactionEnabled()) {
368365
//sync the batch position bit set point, in order to delete the position in pending acks
369-
checkBatchPositions.add(position);
370-
LongPair batchSizePair = this.pendingAcks.get(msgId.getLedgerId(), msgId.getEntryId());
371-
if (batchSizePair == null) {
372-
String error = "Batch position [" + position + "] could not find " +
373-
"it's batch size from consumer pendingAcks!";
374-
log.warn(error);
375-
return FutureUtil.failedFuture(
376-
new BrokerServiceException.NotAllowedException(error));
377-
}
378-
((PersistentSubscription) subscription)
379-
.syncBatchPositionBitSetForPendingAck(new MutablePair<>(position, batchSizePair.first));
380-
//check if the position can remove from the consumer pending acks.
381-
// the bit set is empty in pending ack handle.
382-
if (((PersistentSubscription) subscription).checkIsCanDeleteConsumerPendingAck(position)) {
383-
removePendingAcks(position);
366+
if (Subscription.isIndividualAckMode(subType)) {
367+
((PersistentSubscription) subscription)
368+
.syncBatchPositionBitSetForPendingAck(position);
384369
}
385370
}
386371
} else {
@@ -393,14 +378,28 @@ private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,L
393378
checkAckValidationError(ack, position);
394379
}
395380
subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
396-
return CompletableFuture.completedFuture(null);
381+
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
382+
completableFuture.complete(null);
383+
if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) {
384+
completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> {
385+
//check if the position can remove from the consumer pending acks.
386+
// the bit set is empty in pending ack handle.
387+
if (((PositionImpl) position).getAckSet() != null) {
388+
if (((PersistentSubscription) subscription)
389+
.checkIsCanDeleteConsumerPendingAck((PositionImpl) position)) {
390+
removePendingAcks((PositionImpl) position);
391+
}
392+
}
393+
}));
394+
}
395+
return completableFuture;
397396
}
398397

399398

400399
//this method is for individual ack carry the transaction
401400
private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
402401
// Individual ack
403-
List<MutablePair<PositionImpl, Long>> positionsAcked = new ArrayList<>();
402+
List<MutablePair<PositionImpl, Integer>> positionsAcked = new ArrayList<>();
404403

405404
if (!isTransactionEnabled()) {
406405
return FutureUtil.failedFuture(
@@ -417,15 +416,11 @@ private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
417416
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
418417
}
419418

420-
LongPair batchSizePair = this.pendingAcks.get(msgId.getLedgerId(), msgId.getEntryId());
421-
if (batchSizePair == null) {
422-
String error = "Batch position [" + position + "] could not find " +
423-
"it's batch size from consumer pendingAcks!";
424-
log.error(error);
425-
return FutureUtil.failedFuture(
426-
new BrokerServiceException.NotAllowedException(error));
419+
if (msgId.hasBatchIndex()) {
420+
positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
421+
} else {
422+
positionsAcked.add(new MutablePair<>(position, 0));
427423
}
428-
positionsAcked.add(new MutablePair<>(position, batchSizePair.first));
429424

430425
checkCanRemovePendingAcksAndHandle(position, msgId);
431426

@@ -434,12 +429,17 @@ private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
434429

435430
CompletableFuture<Void> completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(),
436431
ack.getTxnidLeastBits(), positionsAcked);
437-
positionsAcked.forEach(positionLongMutablePair -> {
438-
if (((PersistentSubscription) subscription)
439-
.checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) {
440-
removePendingAcks(positionLongMutablePair.left);
441-
}
442-
});
432+
if (Subscription.isIndividualAckMode(subType)) {
433+
completableFuture.whenComplete((v, e) ->
434+
positionsAcked.forEach(positionLongMutablePair -> {
435+
if (positionLongMutablePair.getLeft().getAckSet() != null) {
436+
if (((PersistentSubscription) subscription)
437+
.checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) {
438+
removePendingAcks(positionLongMutablePair.left);
439+
}
440+
}
441+
}));
442+
}
443443
return completableFuture;
444444
}
445445

@@ -465,7 +465,7 @@ private boolean isTransactionEnabled() {
465465
private CompletableFuture<Void> transactionIndividualAcknowledge(
466466
long txnidMostBits,
467467
long txnidLeastBits,
468-
List<MutablePair<PositionImpl, Long>> positionList) {
468+
List<MutablePair<PositionImpl, Integer>> positionList) {
469469
if (subscription instanceof PersistentSubscription) {
470470
TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
471471
return ((PersistentSubscription) subscription).transactionIndividualAcknowledge(txnID, positionList);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ protected void readMoreEntries(Consumer consumer) {
467467
return;
468468
}
469469

470-
if (messagesToRedeliver.size() > 0) {
470+
if (messagesToRedeliver != null && messagesToRedeliver.size() > 0) {
471471
if (log.isDebugEnabled()) {
472472
log.debug("[{}] Schedule replay of {} messages", name, messagesToRedeliver.size());
473473
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
5454
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
5555
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
56+
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
5657
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
5758
import org.apache.pulsar.client.api.transaction.TxnID;
5859
import org.apache.pulsar.broker.service.Consumer;
@@ -71,7 +72,6 @@
7172
import org.apache.pulsar.common.protocol.Commands;
7273
import org.apache.pulsar.common.protocol.Markers;
7374
import org.apache.pulsar.common.util.FutureUtil;
74-
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
7575
import org.slf4j.Logger;
7676
import org.slf4j.LoggerFactory;
7777

@@ -133,7 +133,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
133133
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
134134
this.pendingAckHandle = new PendingAckHandleImpl(this);
135135
} else {
136-
this.pendingAckHandle = null;
136+
this.pendingAckHandle = new PendingAckHandleDisabled();
137137
}
138138
IS_FENCED_UPDATER.set(this, FALSE);
139139
}
@@ -388,21 +388,11 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
388388
}
389389

390390
public CompletableFuture<Void> transactionIndividualAcknowledge(TxnID txnId,
391-
List<MutablePair<PositionImpl, Long>> positions) {
392-
if (pendingAckHandle == null) {
393-
return FutureUtil.failedFuture(
394-
new TransactionConflictException("Broker does't support Transaction pending ack!"));
395-
}
396-
391+
List<MutablePair<PositionImpl, Integer>> positions) {
397392
return pendingAckHandle.individualAcknowledgeMessage(txnId, positions);
398393
}
399394

400395
public CompletableFuture<Void> transactionCumulativeAcknowledge(TxnID txnId, List<PositionImpl> positions) {
401-
if (pendingAckHandle == null) {
402-
return FutureUtil.failedFuture(
403-
new TransactionConflictException("Broker does't support Transaction pending ack!"));
404-
}
405-
406396
return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions);
407397
}
408398

@@ -1027,9 +1017,6 @@ public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapsho
10271017

10281018
@Override
10291019
public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction) {
1030-
if (pendingAckHandle == null) {
1031-
return FutureUtil.failedFuture(new Exception("Broker does't support Transaction pending ack!"));
1032-
}
10331020
TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
10341021
if (TxnAction.COMMIT.getNumber() == txnAction) {
10351022
return pendingAckHandle.commitTxn(txnID, Collections.emptyMap());
@@ -1050,7 +1037,7 @@ public ManagedCursor getCursor() {
10501037
return cursor;
10511038
}
10521039

1053-
public void syncBatchPositionBitSetForPendingAck(MutablePair<PositionImpl, Long> position) {
1040+
public void syncBatchPositionBitSetForPendingAck(PositionImpl position) {
10541041
this.pendingAckHandle.syncBatchPositionAckSetForTransaction(position);
10551042
}
10561043

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public interface PendingAckHandle {
5454
* @throws NotAllowedException if Use this method incorrectly eg. not use
5555
* PositionImpl or cumulative ack with a list of positions.
5656
*/
57-
CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePair<PositionImpl, Long>> positions);
57+
CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions);
5858

5959
/**
6060
* Acknowledge message(s) for an ongoing transaction.
@@ -103,7 +103,7 @@ public interface PendingAckHandle {
103103
*
104104
* @param position {@link Position} which position need to sync and carry it batch size
105105
*/
106-
void syncBatchPositionAckSetForTransaction(MutablePair<PositionImpl, Long> position);
106+
void syncBatchPositionAckSetForTransaction(PositionImpl position);
107107

108108
/**
109109
* Judge the all ack set point have acked by normal ack and transaction pending ack.
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.transaction.pendingack.impl;
20+
21+
import org.apache.bookkeeper.mledger.Position;
22+
import org.apache.bookkeeper.mledger.impl.PositionImpl;
23+
import org.apache.commons.lang3.tuple.MutablePair;
24+
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
25+
import org.apache.pulsar.broker.service.Consumer;
26+
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
27+
import org.apache.pulsar.client.api.transaction.TxnID;
28+
import org.apache.pulsar.common.util.FutureUtil;
29+
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.concurrent.CompletableFuture;
33+
34+
/**
35+
* The disabled implementation of {@link PendingAckHandle}.
36+
*/
37+
public class PendingAckHandleDisabled implements PendingAckHandle {
38+
39+
@Override
40+
public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
41+
List<MutablePair<PositionImpl, Integer>> positions) {
42+
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
43+
}
44+
45+
@Override
46+
public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions) {
47+
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
48+
}
49+
50+
@Override
51+
public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties) {
52+
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
53+
}
54+
55+
@Override
56+
public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer) {
57+
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
58+
}
59+
60+
@Override
61+
public void syncBatchPositionAckSetForTransaction(PositionImpl position) {
62+
//no operation
63+
}
64+
65+
@Override
66+
public boolean checkIsCanDeleteConsumerPendingAck(PositionImpl position) {
67+
return false;
68+
}
69+
70+
@Override
71+
public void clearIndividualPosition(Position position) {
72+
//no operation
73+
}
74+
}

0 commit comments

Comments
 (0)