Skip to content

Commit 67f8cf3

Browse files
authored
[pulsar-client-cpp] Fix Redelivery of Messages on UnackedMessageTracker When Ack Messages . (#6498)
### Motivation Because of #6391 , acked messages were counted as unacked messages. Although messages from brokers were acknowledged, the following log was output. ``` 2020-03-06 19:44:51.790 INFO ConsumerImpl:174 | [persistent://public/default/t1, sub1, 0] Created consumer on broker [127.0.0.1:58860 -> 127.0.0.1:6650] my-message-0: Fri Mar 6 19:45:05 2020 my-message-1: Fri Mar 6 19:45:05 2020 my-message-2: Fri Mar 6 19:45:05 2020 2020-03-06 19:45:15.818 INFO UnAckedMessageTrackerEnabled:53 | [persistent://public/default/t1, sub1, 0] : 3 Messages were not acked within 10000 time ``` This behavior happened on master branch.
1 parent 5285c68 commit 67f8cf3

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,10 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long
9090
bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
9191
std::lock_guard<std::mutex> acquire(lock_);
9292
if (messageIdPartitionMap.count(m) == 0) {
93-
bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second;
94-
return insert && timePartitions.back().insert(m).second;
93+
std::set<MessageId>& partition = timePartitions.back();
94+
bool emplace = messageIdPartitionMap.emplace(m, partition).second;
95+
bool insert = partition.insert(m).second;
96+
return emplace && insert;
9597
}
9698
return false;
9799
}
@@ -104,7 +106,8 @@ bool UnAckedMessageTrackerEnabled::isEmpty() {
104106
bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
105107
std::lock_guard<std::mutex> acquire(lock_);
106108
bool removed = false;
107-
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(m);
109+
110+
std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(m);
108111
if (exist != messageIdPartitionMap.end()) {
109112
removed = exist->second.erase(m);
110113
}
@@ -121,7 +124,7 @@ void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
121124
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
122125
MessageId msgIdInMap = it->first;
123126
if (msgIdInMap < msgId) {
124-
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgId);
127+
std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(msgId);
125128
if (exist != messageIdPartitionMap.end()) {
126129
exist->second.erase(msgId);
127130
}
@@ -135,7 +138,8 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic)
135138
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
136139
MessageId msgIdInMap = it->first;
137140
if (msgIdInMap.getTopicName().compare(topic) == 0) {
138-
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgIdInMap);
141+
std::map<MessageId, std::set<MessageId>&>::iterator exist =
142+
messageIdPartitionMap.find(msgIdInMap);
139143
if (exist != messageIdPartitionMap.end()) {
140144
exist->second.erase(msgIdInMap);
141145
}

pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
#include <mutex>
2424

2525
namespace pulsar {
26-
2726
class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
2827
public:
2928
~UnAckedMessageTrackerEnabled();
@@ -41,7 +40,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
4140
void timeoutHandlerHelper();
4241
bool isEmpty();
4342
long size();
44-
std::map<MessageId, std::set<MessageId>> messageIdPartitionMap;
43+
std::map<MessageId, std::set<MessageId>&> messageIdPartitionMap;
4544
std::deque<std::set<MessageId>> timePartitions;
4645
std::mutex lock_;
4746
DeadlineTimerPtr timer_;

0 commit comments

Comments
 (0)