Skip to content

Commit 88b47e5

Browse files
authored
[broker][monitoring] add message ack rate metric for consumer (#15674)
1 parent 8d8a19f commit 88b47e5

File tree

18 files changed

+151
-5
lines changed

18 files changed

+151
-5
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public class Consumer {
8484
private final Rate msgRedeliver;
8585
private final LongAdder msgOutCounter;
8686
private final LongAdder bytesOutCounter;
87+
private final Rate messageAckRate;
8788

8889
private long lastConsumedTimestamp;
8990
private long lastAckedTimestamp;
@@ -159,6 +160,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
159160
this.msgRedeliver = new Rate();
160161
this.bytesOutCounter = new LongAdder();
161162
this.msgOutCounter = new LongAdder();
163+
this.messageAckRate = new Rate();
162164
this.appId = appId;
163165

164166
// Ensure we start from compacted view
@@ -363,6 +365,8 @@ public void doUnsubscribe(final long requestId) {
363365
}
364366

365367
public CompletableFuture<Void> messageAcked(CommandAck ack) {
368+
CompletableFuture<Void> future;
369+
366370
this.lastAckedTimestamp = System.currentTimeMillis();
367371
Map<String, Long> properties = Collections.emptyMap();
368372
if (ack.getPropertiesCount() > 0) {
@@ -396,20 +400,27 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
396400
}
397401
if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
398402
List<PositionImpl> positionsAcked = Collections.singletonList(position);
399-
return transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
403+
future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
400404
ack.getTxnidLeastBits(), positionsAcked);
401405
} else {
402406
List<Position> positionsAcked = Collections.singletonList(position);
403407
subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
404-
return CompletableFuture.completedFuture(null);
408+
future = CompletableFuture.completedFuture(null);
405409
}
406410
} else {
407411
if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) {
408-
return individualAckWithTransaction(ack);
412+
future = individualAckWithTransaction(ack);
409413
} else {
410-
return individualAckNormal(ack, properties);
414+
future = individualAckNormal(ack, properties);
411415
}
412416
}
417+
418+
return future
419+
.whenComplete((__, t) -> {
420+
if (t == null) {
421+
this.messageAckRate.recordEvent(ack.getMessageIdsCount());
422+
}
423+
});
413424
}
414425

415426
//this method is for individual ack not carry the transaction
@@ -739,7 +750,10 @@ public void updateRates() {
739750
msgOut.calculateRate();
740751
chunkedMessageRate.calculateRate();
741752
msgRedeliver.calculateRate();
753+
messageAckRate.calculateRate();
754+
742755
stats.msgRateOut = msgOut.getRate();
756+
stats.messageAckRate = messageAckRate.getRate();
743757
stats.msgThroughputOut = msgOut.getValueRate();
744758
stats.msgRateRedeliver = msgRedeliver.getRate();
745759
stats.chunkedMessageRate = chunkedMessageRate.getRate();
@@ -752,7 +766,7 @@ public void updateStats(ConsumerStatsImpl consumerStats) {
752766
lastAckedTimestamp = consumerStats.lastAckedTimestamp;
753767
lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
754768
MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
755-
if (log.isDebugEnabled()){
769+
if (log.isDebugEnabled()) {
756770
log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer {}", topicName,
757771
subscription, consumerStats.availablePermits, consumerId);
758772
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,7 @@ ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {
616616
.setConnectedSince(consumerStats.getConnectedSince())
617617
.setMsgBacklog(subscription.getNumberOfEntriesInBacklog(false))
618618
.setMsgRateExpired(subscription.getExpiredMessageRate())
619+
.setMessageAckRate(consumerStats.messageAckRate)
619620
.setType(subscription.getTypeString());
620621

621622
return Commands.serializeWithSize(cmd);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public static void writeConsumerStats(StatsOutputStream statsStream, CommandSubs
6565
statsStream.writePair("msgThroughputOut", stats.msgThroughputOut);
6666
statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver);
6767
statsStream.writePair("avgMessagesPerEntry", stats.avgMessagesPerEntry);
68+
statsStream.writePair("messageAckRate", stats.messageAckRate);
6869

6970
if (Subscription.isIndividualAckMode(subType)) {
7071
statsStream.writePair("unackedMessages", stats.unackedMessages);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,7 @@ public NonPersistentSubscriptionStatsImpl getStats() {
456456
ConsumerStatsImpl consumerStats = consumer.getStats();
457457
subStats.consumers.add(consumerStats);
458458
subStats.msgRateOut += consumerStats.msgRateOut;
459+
subStats.messageAckRate += consumerStats.messageAckRate;
459460
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
460461
subStats.bytesOutCounter += consumerStats.bytesOutCounter;
461462
subStats.msgOutCounter += consumerStats.msgOutCounter;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
717717
double subMsgRateOut = 0;
718718
double subMsgThroughputOut = 0;
719719
double subMsgRateRedeliver = 0;
720+
double subMsgAckRate = 0;
720721

721722
// Start subscription name & consumers
722723
try {
@@ -731,6 +732,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
731732

732733
ConsumerStatsImpl consumerStats = consumer.getStats();
733734
subMsgRateOut += consumerStats.msgRateOut;
735+
subMsgAckRate += consumerStats.messageAckRate;
736+
734737
subMsgThroughputOut += consumerStats.msgThroughputOut;
735738
subMsgRateRedeliver += consumerStats.msgRateRedeliver;
736739

@@ -745,6 +748,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
745748
topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false));
746749
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
747750
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
751+
topicStatsStream.writePair("messageAckRate", subMsgAckRate);
748752
topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
749753
topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
750754
topicStatsStream.writePair("type", subscription.getTypeString());

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -966,6 +966,7 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
966966
subStats.bytesOutCounter += consumerStats.bytesOutCounter;
967967
subStats.msgOutCounter += consumerStats.msgOutCounter;
968968
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
969+
subStats.messageAckRate += consumerStats.messageAckRate;
969970
subStats.chunkedMessageRate += consumerStats.chunkedMessageRate;
970971
subStats.unackedMessages += consumerStats.unackedMessages;
971972
subStats.lastConsumedTimestamp =

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1717,6 +1717,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
17171717
double subMsgRateOut = 0;
17181718
double subMsgThroughputOut = 0;
17191719
double subMsgRateRedeliver = 0;
1720+
double subMsgAckRate = 0;
17201721

17211722
// Start subscription name & consumers
17221723
try {
@@ -1730,6 +1731,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
17301731

17311732
ConsumerStatsImpl consumerStats = consumer.getStats();
17321733
subMsgRateOut += consumerStats.msgRateOut;
1734+
subMsgAckRate += consumerStats.messageAckRate;
17331735
subMsgThroughputOut += consumerStats.msgThroughputOut;
17341736
subMsgRateRedeliver += consumerStats.msgRateRedeliver;
17351737

@@ -1744,6 +1746,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
17441746
subscription.getNumberOfEntriesInBacklog(true));
17451747
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
17461748
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
1749+
topicStatsStream.writePair("messageAckRate", subMsgAckRate);
17471750
topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
17481751
topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
17491752
topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage",

pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public class AggregatedConsumerStats {
2828

2929
public double msgRateOut;
3030

31+
public double msgAckRate;
32+
3133
public double msgThroughputOut;
3234

3335
public long availablePermits;

pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class AggregatedNamespaceStats {
3333
public double throughputIn;
3434
public double throughputOut;
3535

36+
public long messageAckRate;
3637
public long bytesInCounter;
3738
public long msgInCounter;
3839
public long bytesOutCounter;
@@ -122,6 +123,7 @@ void updateStats(TopicStats stats) {
122123
consumerStats.blockedSubscriptionOnUnackedMsgs = v.blockedSubscriptionOnUnackedMsgs;
123124
consumerStats.msgRateRedeliver += v.msgRateRedeliver;
124125
consumerStats.unackedMessages += v.unackedMessages;
126+
messageAckRate += v.msgAckRate;
125127
});
126128
});
127129

pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class AggregatedSubscriptionStats {
3636

3737
public double msgRateOut;
3838

39+
public double messageAckRate;
40+
3941
public double msgThroughputOut;
4042

4143
public long msgDelayed;

0 commit comments

Comments
 (0)