Skip to content

Commit 333888a

Browse files
authored
[Issue 6168] Fix Unacked Message Tracker by Using Time Partition on C++ (#6391)
### Motivation Fix #6168 . >On C++ lib, like the following log, unacked messages are redelivered after about 2 * unAckedMessagesTimeout. ### Modifications As same #3118, by using TimePartition, fixed ` UnackedMessageTracker` . - Add `TickDurationInMs` - Add `redeliverUnacknowledgedMessages` which require `MessageIds` to `ConsumerImpl`, `MultiTopicsConsumerImpl` and `PartitionedConsumerImpl`.
1 parent 330e782 commit 333888a

14 files changed

+152
-50
lines changed

pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ class PULSAR_PUBLIC ConsumerConfiguration {
155155
*/
156156
long getUnAckedMessagesTimeoutMs() const;
157157

158+
void setTickDurationInMs(const uint64_t milliSeconds);
159+
160+
long getTickDurationInMs() const;
161+
158162
/**
159163
* Set the delay to wait before re-delivering messages that have failed to be process.
160164
* <p>

pulsar-client-cpp/include/pulsar/c/producer_configuration.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,4 +178,4 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_property(pulsar_producer_co
178178

179179
#ifdef __cplusplus
180180
}
181-
#endif
181+
#endif

pulsar-client-cpp/lib/ConsumerConfiguration.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
9898
impl_->unAckedMessagesTimeoutMs = milliSeconds;
9999
}
100100

101+
long ConsumerConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; }
102+
103+
void ConsumerConfiguration::setTickDurationInMs(const uint64_t milliSeconds) {
104+
impl_->tickDurationInMs = milliSeconds;
105+
}
106+
101107
void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) {
102108
impl_->negativeAckRedeliveryDelayMs = redeliveryDelayMillis;
103109
}

pulsar-client-cpp/lib/ConsumerConfigurationImpl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ namespace pulsar {
2727
struct ConsumerConfigurationImpl {
2828
SchemaInfo schemaInfo;
2929
long unAckedMessagesTimeoutMs;
30+
long tickDurationInMs;
3031

3132
long negativeAckRedeliveryDelayMs;
3233
ConsumerType consumerType;
@@ -45,6 +46,7 @@ struct ConsumerConfigurationImpl {
4546
ConsumerConfigurationImpl()
4647
: schemaInfo(),
4748
unAckedMessagesTimeoutMs(0),
49+
tickDurationInMs(1000),
4850
negativeAckRedeliveryDelayMs(60000),
4951
consumerType(ConsumerExclusive),
5052
messageListener(),

pulsar-client-cpp/lib/ConsumerImpl.cc

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,13 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
6666
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
6767
consumerStr_ = consumerStrStream.str();
6868
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
69-
unAckedMessageTrackerPtr_.reset(
70-
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
69+
if (conf.getTickDurationInMs() > 0) {
70+
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
71+
conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
72+
} else {
73+
unAckedMessageTrackerPtr_.reset(
74+
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
75+
}
7176
} else {
7277
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
7378
}
@@ -953,6 +958,18 @@ Result ConsumerImpl::resumeMessageListener() {
953958
void ConsumerImpl::redeliverUnacknowledgedMessages() {
954959
static std::set<MessageId> emptySet;
955960
redeliverMessages(emptySet);
961+
unAckedMessageTrackerPtr_->clear();
962+
}
963+
964+
void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
965+
if (messageIds.empty()) {
966+
return;
967+
}
968+
if (config_.getConsumerType() != ConsumerShared && config_.getConsumerType() != ConsumerKeyShared) {
969+
redeliverUnacknowledgedMessages();
970+
return;
971+
}
972+
redeliverMessages(messageIds);
956973
}
957974

958975
void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {

pulsar-client-cpp/lib/ConsumerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ class ConsumerImpl : public ConsumerImplBase,
9898
virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType);
9999

100100
virtual void redeliverMessages(const std::set<MessageId>& messageIds);
101+
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds);
101102
virtual void negativeAcknowledge(const MessageId& msgId);
102103

103104
virtual void closeAsync(ResultCallback callback);

pulsar-client-cpp/lib/ConsumerImplBase.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class ConsumerImplBase {
4848
virtual Result pauseMessageListener() = 0;
4949
virtual Result resumeMessageListener() = 0;
5050
virtual void redeliverUnacknowledgedMessages() = 0;
51+
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) = 0;
5152
virtual const std::string& getName() const = 0;
5253
virtual int getNumOfPrefetchedMessages() const = 0;
5354
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;

pulsar-client-cpp/lib/LogUtils.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,4 @@ std::string LogUtils::getLoggerName(const std::string& path) {
5555
return path.substr(startIdx + 1, endIdx - startIdx - 1);
5656
}
5757

58-
} // namespace pulsar
58+
} // namespace pulsar

pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,13 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
4545
consumerStr_ = consumerStrStream.str();
4646

4747
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
48-
unAckedMessageTrackerPtr_.reset(
49-
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
48+
if (conf.getTickDurationInMs() > 0) {
49+
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
50+
conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
51+
} else {
52+
unAckedMessageTrackerPtr_.reset(
53+
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
54+
}
5055
} else {
5156
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
5257
}
@@ -653,6 +658,22 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() {
653658
consumer++) {
654659
(consumer->second)->redeliverUnacknowledgedMessages();
655660
}
661+
unAckedMessageTrackerPtr_->clear();
662+
}
663+
664+
void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
665+
if (messageIds.empty()) {
666+
return;
667+
}
668+
if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) {
669+
redeliverUnacknowledgedMessages();
670+
return;
671+
}
672+
LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
673+
for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
674+
consumer++) {
675+
(consumer->second)->redeliverUnacknowledgedMessages(messageIds);
676+
}
656677
}
657678

658679
int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }

pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
6969
virtual Result pauseMessageListener();
7070
virtual Result resumeMessageListener();
7171
virtual void redeliverUnacknowledgedMessages();
72+
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds);
7273
virtual int getNumOfPrefetchedMessages() const;
7374
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
7475
void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,

0 commit comments

Comments
 (0)