Skip to content

Commit dcaa0a6

Browse files
Backport #80795 to 25.4: Properly wait consumers before shutting down Kafka engine
1 parent efc7098 commit dcaa0a6

File tree

7 files changed

+102
-13
lines changed

7 files changed

+102
-13
lines changed

src/Storages/Kafka/StorageKafka.cpp

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ StorageKafka::StorageKafka(
220220
const auto & table = getStorageID().getTableName();
221221
const auto & thread_name = std::string("KfkCln:") + table;
222222
setThreadName(thread_name.c_str(), /*truncate=*/ true);
223-
cleanConsumers();
223+
cleanConsumersByTTL();
224224
});
225225
}
226226

@@ -318,11 +318,10 @@ void StorageKafka::shutdown(bool)
318318
}
319319

320320
{
321-
std::lock_guard lock(mutex);
322321
LOG_TRACE(log, "Closing {} consumers", consumers.size());
323322
Stopwatch watch;
324-
consumers.clear();
325-
LOG_TRACE(log, "Consumers closed. Took {} ms.", watch.elapsedMilliseconds());
323+
cleanConsumers();
324+
LOG_TRACE(log, "Consumers closed in {} ms.", watch.elapsedMilliseconds());
326325
}
327326

328327
{
@@ -333,6 +332,40 @@ void StorageKafka::shutdown(bool)
333332
}
334333
}
335334

335+
void StorageKafka::cleanConsumers()
336+
{
337+
/// We need to clear the cppkafka::Consumer separately from KafkaConsumer, since cppkafka::Consumer holds a weak_ptr to the KafkaConsumer (for logging callback)
338+
/// So if we removed the cppkafka::Consumer from the KafkaConsumer destructor, librdkafka would invoke the logging callback again from within the destructor, leading to a deadlock
339+
std::vector<ConsumerPtr> consumers_to_close;
340+
341+
{
342+
std::unique_lock lock(mutex);
343+
/// Wait until all consumers are released
344+
cv.wait(lock, [&, this]()
345+
{
346+
auto it = std::find_if(consumers.begin(), consumers.end(), [](const auto & ptr)
347+
{
348+
return ptr->isInUse();
349+
});
350+
return it == consumers.end();
351+
});
352+
353+
for (const auto & consumer : consumers)
354+
{
355+
if (!consumer->hasConsumer())
356+
continue;
357+
consumers_to_close.push_back(consumer->moveConsumer());
358+
}
359+
}
360+
361+
/// First close cppkafka::Consumer (it can use KafkaConsumer object via stat callback)
362+
consumers_to_close.clear();
363+
364+
{
365+
std::unique_lock lock(mutex);
366+
consumers.clear();
367+
}
368+
}
336369

337370
void StorageKafka::pushConsumer(KafkaConsumerPtr consumer)
338371
{
@@ -342,17 +375,13 @@ void StorageKafka::pushConsumer(KafkaConsumerPtr consumer)
342375
CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1);
343376
}
344377

345-
346-
KafkaConsumerPtr StorageKafka::popConsumer()
347-
{
348-
return popConsumer(std::chrono::milliseconds::zero());
349-
}
350-
351-
352378
KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout)
353379
{
354380
std::unique_lock lock(mutex);
355381

382+
if (shutdown_called)
383+
throw Exception(ErrorCodes::ABORTED, "Table is detached");
384+
356385
KafkaConsumerPtr ret_consumer_ptr;
357386
std::optional<size_t> closed_consumer_index;
358387
for (size_t i = 0; i < consumers.size(); ++i)
@@ -457,7 +486,7 @@ cppkafka::Configuration StorageKafka::getProducerConfiguration()
457486
return KafkaConfigLoader::getProducerConfiguration(*this, params);
458487
}
459488

460-
void StorageKafka::cleanConsumers()
489+
void StorageKafka::cleanConsumersByTTL()
461490
{
462491
UInt64 ttl_usec = (*kafka_settings)[KafkaSetting::kafka_consumers_pool_ttl_ms] * 1'000;
463492

src/Storages/Kafka/StorageKafka.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ class StorageKafka final : public IStorage, WithContext
7777
bool prefersLargeBlocks() const override { return false; }
7878

7979
void pushConsumer(KafkaConsumerPtr consumer);
80-
KafkaConsumerPtr popConsumer();
8180
KafkaConsumerPtr popConsumer(std::chrono::milliseconds timeout);
8281

8382
const auto & getFormatName() const { return format_name; }
@@ -164,6 +163,7 @@ class StorageKafka final : public IStorage, WithContext
164163

165164
bool streamToViews();
166165

166+
void cleanConsumersByTTL();
167167
void cleanConsumers();
168168
};
169169

tests/queries/0_stateless/03522_storage_kafka_shutdown_smoke.reference

Whitespace-only changes.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
-- Tags: no-fasttest
2+
-- Tag no-fasttest -- requires Kafka
3+
4+
-- Regression test for proper StorageKafka shutdown
5+
-- https://github.com/ClickHouse/ClickHouse/issues/80674
6+
7+
-- librdkafka may print some errors/warnings:
8+
---
9+
-- <Warning> StorageKafka (test_8g8g0dlf.kafka_test): sasl.kerberos.kinit.cmd configuration parameter is ignored.
10+
-- <Error> StorageKafka (test_8g8g0dlf.kafka_test): [client.id:ClickHouse-localhost-test_8g8g0dlf-kafka_test] [rdk:ERROR] [thrd:app]: ClickHouse-localhost-test_8g8g0dlf-kafka_test#consumer-1: 0.0.0.0:9092/bootstrap: Connect to ipv4#0.0.0.0:9092 failed: Connection refused (after 0ms in state CONNECT)
11+
-- <Error> StorageKafka (test_8g8g0dlf.kafka_test): [client.id:ClickHouse-localhost-test_8g8g0dlf-kafka_test] [rdk:ERROR] [thrd:app]: ClickHouse-localhost-test_8g8g0dlf-kafka_test#consumer-1: 0.0.0.0:9092/bootstrap: Connect to ipv4#0.0.0.0:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
12+
-- <Warning> StorageKafka (test_8g8g0dlf.kafka_test): Can't get assignment. Will keep trying.
13+
SET send_logs_level='fatal';
14+
15+
DROP TABLE IF EXISTS kafka_test;
16+
CREATE TABLE kafka_test
17+
(
18+
`raw_message` String
19+
)
20+
ENGINE = Kafka
21+
SETTINGS kafka_broker_list = '0.0.0.0:9092', kafka_group_name = 'test', kafka_topic_list = 'kafka_test', kafka_format = 'RawBLOB', kafka_consumers_pool_ttl_ms=500;
22+
23+
SELECT * FROM kafka_test LIMIT 1 settings stream_like_engine_allow_direct_select=1;
24+
DROP TABLE kafka_test SYNC;

tests/queries/0_stateless/03523_storage_kafka_shutdown_smoke_local.reference

Whitespace-only changes.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#!/usr/bin/env bash
2+
# Tags: no-fasttest
3+
# Tag no-fasttest -- requires Kafka
4+
5+
# Regression test for proper StorageKafka shutdown
6+
# https://github.com/ClickHouse/ClickHouse/issues/80674
7+
#
8+
# NOTE: this test differs from 03522_storage_kafka_shutdown_smoke, since it creates a topic w/o a consumer group (using named collections)
9+
10+
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
11+
# shellcheck source=../shell_config.sh
12+
. "$CUR_DIR"/../shell_config.sh
13+
14+
$CLICKHOUSE_LOCAL -nm -q "
15+
CREATE NAMED COLLECTION kafka_config AS kafka_broker_list = '0.0.0.0:9092';
16+
17+
CREATE TABLE dummy
18+
(
19+
raw_message String
20+
)
21+
ENGINE = Kafka(kafka_config)
22+
SETTINGS kafka_topic_list = 'dummy', kafka_format = 'RawBLOB', kafka_consumers_pool_ttl_ms=500;
23+
24+
SELECT * FROM dummy LIMIT 1 settings stream_like_engine_allow_direct_select=1; -- { serverError 1001 }
25+
DROP TABLE dummy;
26+
"

tests/tsan_ignorelist.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,13 @@
1111
[thread]
1212
# https://github.com/ClickHouse/ClickHouse/issues/55629
1313
fun:rd_kafka_broker_set_nodename
14+
# cJSON is used in two libraries (at least) in ClickHouse:
15+
# - librdkafka
16+
# - aws-c-common
17+
#
18+
# Both libraries have their own hooks, passed to cJSON_InitHooks, but they are
19+
# compatible, and eventually they simply call malloc()/free().
20+
# So let's suppress this warning until it is fixed properly.
21+
#
22+
# See https://github.com/ClickHouse/ClickHouse/issues/80866 for more details.
23+
fun:cJSON_InitHooks

0 commit comments

Comments
 (0)