Skip to content

Commit 853fdfe

Browse files
committed
Clean cached messages on destroy kafka consumer
The callchain of the kafka consumer is very tricky, so for the sake of common sense let's just clean the messages on moving out consumer (and in dtor, but this is just to keep that two code path in sync). (Also reported by @filimonov) Signed-off-by: Azat Khuzhin <[email protected]>
1 parent b3d6caf commit 853fdfe

File tree

1 file changed

+2
-0
lines changed

1 file changed

+2
-0
lines changed

src/Storages/Kafka/KafkaConsumer.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ void KafkaConsumer::createConsumer(cppkafka::Configuration consumer_config)
153153

154154
ConsumerPtr && KafkaConsumer::moveConsumer()
155155
{
156+
cleanUnprocessed();
156157
if (!consumer->get_subscription().empty())
157158
{
158159
try
@@ -173,6 +174,7 @@ KafkaConsumer::~KafkaConsumer()
173174
if (!consumer)
174175
return;
175176

177+
cleanUnprocessed();
176178
try
177179
{
178180
if (!consumer->get_subscription().empty())

0 commit comments

Comments
 (0)