Skip to content

Commit e7592c1

Browse files
committed
Create consumers for Kafka tables on the fly (but keep them for 1 min since last used)
The pool of consumers created a problem for librdkafka internal statistics: you need to read from the statistics queue constantly, while in ClickHouse consumers were created regardless of whether there were any readers or not (attached materialized views or direct SELECTs). Otherwise, these statistics messages got queued and never released, which: - creates a live memory leak - and also makes destruction very slow, due to librdkafka internals (it moves entries from this queue into another linked list, but with sorting, which is incredibly slow for linked lists). So the idea is simple: let's create consumers only when they are required, and destroy them after some timeout (right now it is 60 seconds) if nobody uses them; that way this problem should be gone. This should also reduce the number of internal librdkafka threads when nobody reads from Kafka tables. Signed-off-by: Azat Khuzhin <[email protected]>
1 parent 51d4f58 commit e7592c1

File tree

5 files changed

+140
-45
lines changed

5 files changed

+140
-45
lines changed

src/Storages/Kafka/KafkaConsumer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,7 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const
604604
.exceptions_buffer = [&](){std::lock_guard<std::mutex> lock(exception_mutex);
605605
return exceptions_buffer;}(),
606606
.in_use = in_use.load(),
607+
.last_used_usec = last_used_usec.load(),
607608
.rdkafka_stat = [&](){std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
608609
return rdkafka_stat;}(),
609610
};

src/Storages/Kafka/KafkaConsumer.h

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class KafkaConsumer
5757
UInt64 num_rebalance_revocations;
5858
KafkaConsumer::ExceptionsBuffer exceptions_buffer;
5959
bool in_use;
60+
UInt64 last_used_usec;
6061
std::string rdkafka_stat;
6162
};
6263

@@ -113,11 +114,20 @@ class KafkaConsumer
113114
rdkafka_stat = stat_json_string;
114115
}
115116
void inUse() { in_use = true; }
116-
void notInUse() { in_use = false; }
117+
void notInUse()
118+
{
119+
in_use = false;
120+
last_used_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
121+
}
117122

118123
// For system.kafka_consumers
119124
Stat getStat() const;
120125

126+
bool isInUse() const { return in_use; }
127+
UInt64 getLastUsedUsec() const { return last_used_usec; }
128+
129+
std::string getMemberId() const;
130+
121131
private:
122132
using Messages = std::vector<cppkafka::Message>;
123133
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers};
@@ -168,6 +178,8 @@ class KafkaConsumer
168178
std::atomic<UInt64> num_rebalance_assignments = 0;
169179
std::atomic<UInt64> num_rebalance_revocations = 0;
170180
std::atomic<bool> in_use = 0;
181+
/// Last used time (for TTL)
182+
std::atomic<UInt64> last_used_usec = 0;
171183

172184
mutable std::mutex rdkafka_stat_mutex;
173185
std::string rdkafka_stat;
@@ -178,8 +190,6 @@ class KafkaConsumer
178190
/// Return number of messages with an error.
179191
size_t filterMessageErrors();
180192
ReadBufferPtr getNextMessage();
181-
182-
std::string getMemberId() const;
183193
};
184194

185195
}

src/Storages/Kafka/StorageKafka.cpp

Lines changed: 120 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <Storages/StorageMaterializedView.h>
2828
#include <Storages/NamedCollectionsHelpers.h>
2929
#include <base/getFQDNOrHostName.h>
30+
#include <Common/Stopwatch.h>
3031
#include <Common/logger_useful.h>
3132
#include <boost/algorithm/string/replace.hpp>
3233
#include <boost/algorithm/string/split.hpp>
@@ -76,6 +77,7 @@ namespace ErrorCodes
7677
extern const int BAD_ARGUMENTS;
7778
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
7879
extern const int QUERY_NOT_ALLOWED;
80+
extern const int ABORTED;
7981
}
8082

8183
struct StorageKafkaInterceptors
@@ -262,7 +264,6 @@ StorageKafka::StorageKafka(
262264
, schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info))
263265
, num_consumers(kafka_settings->kafka_num_consumers.value)
264266
, log(&Poco::Logger::get("StorageKafka (" + table_id_.table_name + ")"))
265-
, semaphore(0, static_cast<int>(num_consumers))
266267
, intermediate_commit(kafka_settings->kafka_commit_every_batch.value)
267268
, settings_adjustments(createSettingsAdjustments())
268269
, thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
@@ -343,8 +344,8 @@ Pipe StorageKafka::read(
343344
size_t /* max_block_size */,
344345
size_t /* num_streams */)
345346
{
346-
if (all_consumers.empty())
347-
return {};
347+
if (shutdown_called)
348+
throw Exception(ErrorCodes::ABORTED, "Table is detached");
348349

349350
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
350351
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED,
@@ -357,12 +358,12 @@ Pipe StorageKafka::read(
357358

358359
/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
359360
Pipes pipes;
360-
pipes.reserve(all_consumers.size());
361+
pipes.reserve(num_consumers);
361362
auto modified_context = Context::createCopy(local_context);
362363
modified_context->applySettingsChanges(settings_adjustments);
363364

364365
// Claim as many consumers as requested, but don't block
365-
for (size_t i = 0; i < all_consumers.size(); ++i)
366+
for (size_t i = 0; i < num_consumers; ++i)
366367
{
367368
/// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
368369
/// TODO: probably that leads to awful performance.
@@ -412,19 +413,7 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr &
412413

413414
void StorageKafka::startup()
414415
{
415-
for (size_t i = 0; i < num_consumers; ++i)
416-
{
417-
try
418-
{
419-
auto consumer = createConsumer(i);
420-
pushConsumer(consumer);
421-
all_consumers.push_back(consumer);
422-
}
423-
catch (const cppkafka::Exception &)
424-
{
425-
tryLogCurrentException(log);
426-
}
427-
}
416+
all_consumers.resize(num_consumers);
428417

429418
// Start the reader thread
430419
for (auto & task : tasks)
@@ -438,21 +427,34 @@ void StorageKafka::shutdown(bool)
438427
{
439428
shutdown_called = true;
440429

441-
for (auto & task : tasks)
442430
{
443-
// Interrupt streaming thread
444-
task->stream_cancelled = true;
431+
LOG_TRACE(log, "Waiting for streaming jobs");
432+
Stopwatch watch;
433+
for (auto & task : tasks)
434+
{
435+
// Interrupt streaming thread
436+
task->stream_cancelled = true;
445437

446-
LOG_TRACE(log, "Waiting for cleanup");
447-
task->holder->deactivate();
438+
LOG_TEST(log, "Waiting for cleanup of a task");
439+
task->holder->deactivate();
440+
}
441+
LOG_TRACE(log, "Streaming jobs finished in {} ms.", watch.elapsedMilliseconds());
448442
}
449443

450-
LOG_TRACE(log, "Closing consumers");
451-
for (size_t i = 0; i < all_consumers.size(); ++i)
452-
auto consumer = popConsumer();
453-
LOG_TRACE(log, "Consumers closed");
444+
{
445+
std::lock_guard lock(mutex);
446+
LOG_TRACE(log, "Closing {} consumers", consumers.size());
447+
Stopwatch watch;
448+
consumers.clear();
449+
LOG_TRACE(log, "Consumers closed. Took {} ms.", watch.elapsedMilliseconds());
450+
}
454451

455-
rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS);
452+
{
453+
LOG_TRACE(log, "Waiting for final cleanup");
454+
Stopwatch watch;
455+
rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS);
456+
LOG_TRACE(log, "Final cleanup finished in {} ms (timeout {} ms).", watch.elapsedMilliseconds(), CLEANUP_TIMEOUT_MS);
457+
}
456458
}
457459

458460

@@ -461,7 +463,7 @@ void StorageKafka::pushConsumer(KafkaConsumerPtr consumer)
461463
std::lock_guard lock(mutex);
462464
consumer->notInUse();
463465
consumers.push_back(consumer);
464-
semaphore.set();
466+
cv.notify_one();
465467
CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1);
466468
}
467469

@@ -474,22 +476,48 @@ KafkaConsumerPtr StorageKafka::popConsumer()
474476

475477
KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout)
476478
{
477-
// Wait for the first free buffer
478-
if (timeout == std::chrono::milliseconds::zero())
479-
semaphore.wait();
479+
std::unique_lock lock(mutex);
480+
481+
KafkaConsumerPtr consumer_ptr;
482+
483+
/// 1. There is consumer available. Return one of them.
484+
if (!consumers.empty())
485+
{
486+
consumer_ptr = consumers.back();
487+
consumers.pop_back();
488+
}
480489
else
481490
{
482-
if (!semaphore.tryWait(timeout.count()))
483-
return nullptr;
491+
auto expired_consumer = std::find_if(all_consumers.begin(), all_consumers.end(), [](const auto & consumer_weak_ptr)
492+
{
493+
return consumer_weak_ptr.expired();
494+
});
495+
496+
/// 2. There is no consumer, but we can create a new one.
497+
if (expired_consumer != all_consumers.end())
498+
{
499+
size_t consumer_number = std::distance(all_consumers.begin(), expired_consumer);
500+
/// It should be OK to create consumer under lock, since it should be fast (without subscribing).
501+
consumer_ptr = createConsumer(consumer_number);
502+
*expired_consumer = consumer_ptr;
503+
}
504+
/// 3. There is no consumer and num_consumers already created, waiting @timeout.
505+
else
506+
{
507+
if (cv.wait_for(lock, timeout, [&]() { return !consumers.empty(); }))
508+
{
509+
consumer_ptr = consumers.back();
510+
consumers.pop_back();
511+
}
512+
}
484513
}
485514

486-
// Take the first available buffer from the list
487-
std::lock_guard lock(mutex);
488-
auto consumer = consumers.back();
489-
consumers.pop_back();
490-
CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1);
491-
consumer->inUse();
492-
return consumer;
515+
if (consumer_ptr)
516+
{
517+
CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1);
518+
consumer_ptr->inUse();
519+
}
520+
return consumer_ptr;
493521
}
494522

495523

@@ -545,10 +573,59 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
545573
{
546574
kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
547575
}
576+
LOG_TRACE(log, "Created #{} consumer", consumer_number);
577+
548578
*consumer_weak_ptr_ptr = kafka_consumer_ptr;
549579
return kafka_consumer_ptr;
550580
}
551581

582+
void StorageKafka::cleanConsumers()
583+
{
584+
static const UInt64 CONSUMER_TTL_USEC = 60'000'000;
585+
UInt64 now_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
586+
587+
/// Copy consumers for closing to a new vector to close them without a lock
588+
std::vector<KafkaConsumerPtr> consumers_to_close;
589+
590+
{
591+
std::lock_guard lock(mutex);
592+
593+
for (auto it = consumers.begin(); it != consumers.end();)
594+
{
595+
auto & consumer_ptr = *it;
596+
597+
UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec();
598+
chassert(consumer_last_used_usec <= now_usec);
599+
if (now_usec - consumer_last_used_usec > CONSUMER_TTL_USEC)
600+
{
601+
/// We need this only to get the consumer number.
602+
auto weak_it = std::find_if(all_consumers.begin(), all_consumers.end(), [&](const auto & consume_weak_ptr)
603+
{
604+
return consumer_ptr == consume_weak_ptr.lock();
605+
});
606+
chassert(weak_it != all_consumers.end());
607+
size_t consumer_number = std::distance(all_consumers.begin(), weak_it);
608+
609+
LOG_TRACE(log, "Closing #{} consumer (id: {})", consumer_number, consumer_ptr->getMemberId());
610+
611+
consumers_to_close.push_back(std::move(consumer_ptr));
612+
it = consumers.erase(it);
613+
}
614+
else
615+
++it;
616+
}
617+
}
618+
619+
if (!consumers_to_close.empty())
620+
{
621+
Stopwatch watch;
622+
size_t closed = consumers_to_close.size();
623+
consumers_to_close.clear();
624+
LOG_TRACE(log, "{} consumers had been closed (due to {} usec timeout). Took {} ms.",
625+
closed, CONSUMER_TTL_USEC, watch.elapsedMilliseconds());
626+
}
627+
}
628+
552629
size_t StorageKafka::getMaxBlockSize() const
553630
{
554631
return kafka_settings->kafka_max_block_size.changed
@@ -806,6 +883,8 @@ void StorageKafka::threadFunc(size_t idx)
806883

807884
mv_attached.store(false);
808885

886+
cleanConsumers();
887+
809888
// Wait for attached views
810889
if (!task->stream_cancelled)
811890
task->holder->scheduleAfter(RESCHEDULE_MS);

src/Storages/Kafka/StorageKafka.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <Poco/Semaphore.h>
1111

12+
#include <condition_variable>
1213
#include <mutex>
1314
#include <list>
1415
#include <atomic>
@@ -102,7 +103,6 @@ class StorageKafka final : public IStorage, WithContext
102103
const String schema_name;
103104
const size_t num_consumers; /// total number of consumers
104105
Poco::Logger * log;
105-
Poco::Semaphore semaphore;
106106
const bool intermediate_commit;
107107
const SettingsChanges settings_adjustments;
108108

@@ -112,6 +112,7 @@ class StorageKafka final : public IStorage, WithContext
112112
std::vector<KafkaConsumerWeakPtr> all_consumers; /// busy (belong to a KafkaSource) and vacant consumers
113113

114114
std::mutex mutex;
115+
std::condition_variable cv;
115116

116117
// Stream thread
117118
struct TaskContext
@@ -157,6 +158,7 @@ class StorageKafka final : public IStorage, WithContext
157158
bool streamToViews();
158159
bool checkDependencies(const StorageID & table_id);
159160

161+
void cleanConsumers();
160162
};
161163

162164
}

src/Storages/System/StorageSystemKafkaConsumers.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ NamesAndTypesList StorageSystemKafkaConsumers::getNamesAndTypes()
4141
{"num_rebalance_revocations", std::make_shared<DataTypeUInt64>()},
4242
{"num_rebalance_assignments", std::make_shared<DataTypeUInt64>()},
4343
{"is_currently_used", std::make_shared<DataTypeUInt8>()},
44+
{"last_used", std::make_shared<DataTypeDateTime64>(6)},
4445
{"rdkafka_stat", std::make_shared<DataTypeString>()},
4546
};
4647
return names_and_types;
@@ -78,6 +79,7 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
7879
auto & num_rebalance_revocations = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
7980
auto & num_rebalance_assigments = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
8081
auto & is_currently_used = assert_cast<ColumnUInt8 &>(*res_columns[index++]);
82+
auto & last_used = assert_cast<ColumnDateTime64 &>(*res_columns[index++]);
8183
auto & rdkafka_stat = assert_cast<ColumnString &>(*res_columns[index++]);
8284

8385
const auto access = context->getAccess();
@@ -144,6 +146,7 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
144146
num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments);
145147

146148
is_currently_used.insert(consumer_stat.in_use);
149+
last_used.insert(consumer_stat.last_used_usec);
147150

148151
rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size());
149152
}

0 commit comments

Comments
 (0)