2727#include < Storages/StorageMaterializedView.h>
2828#include < Storages/NamedCollectionsHelpers.h>
2929#include < base/getFQDNOrHostName.h>
30+ #include < Common/Stopwatch.h>
3031#include < Common/logger_useful.h>
3132#include < boost/algorithm/string/replace.hpp>
3233#include < boost/algorithm/string/split.hpp>
@@ -76,6 +77,7 @@ namespace ErrorCodes
7677 extern const int BAD_ARGUMENTS;
7778 extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
7879 extern const int QUERY_NOT_ALLOWED;
80+ extern const int ABORTED;
7981}
8082
8183struct StorageKafkaInterceptors
@@ -262,7 +264,6 @@ StorageKafka::StorageKafka(
262264 , schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info))
263265 , num_consumers(kafka_settings->kafka_num_consumers.value)
264266 , log(&Poco::Logger::get (" StorageKafka (" + table_id_.table_name + " )" ))
265- , semaphore(0 , static_cast <int >(num_consumers))
266267 , intermediate_commit(kafka_settings->kafka_commit_every_batch.value)
267268 , settings_adjustments(createSettingsAdjustments())
268269 , thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
@@ -343,8 +344,8 @@ Pipe StorageKafka::read(
343344 size_t /* max_block_size */ ,
344345 size_t /* num_streams */ )
345346{
346- if (all_consumers. empty () )
347- return {} ;
347+ if (shutdown_called )
348+ throw Exception (ErrorCodes::ABORTED, " Table is detached " ) ;
348349
349350 if (!local_context->getSettingsRef ().stream_like_engine_allow_direct_select )
350351 throw Exception (ErrorCodes::QUERY_NOT_ALLOWED,
@@ -357,12 +358,12 @@ Pipe StorageKafka::read(
357358
358359 // / Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
359360 Pipes pipes;
360- pipes.reserve (all_consumers. size () );
361+ pipes.reserve (num_consumers );
361362 auto modified_context = Context::createCopy (local_context);
362363 modified_context->applySettingsChanges (settings_adjustments);
363364
364365 // Claim as many consumers as requested, but don't block
365- for (size_t i = 0 ; i < all_consumers. size () ; ++i)
366+ for (size_t i = 0 ; i < num_consumers ; ++i)
366367 {
367368 // / Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
368369 // / TODO: probably that leads to awful performance.
@@ -412,19 +413,7 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr &
412413
413414void StorageKafka::startup ()
414415{
415- for (size_t i = 0 ; i < num_consumers; ++i)
416- {
417- try
418- {
419- auto consumer = createConsumer (i);
420- pushConsumer (consumer);
421- all_consumers.push_back (consumer);
422- }
423- catch (const cppkafka::Exception &)
424- {
425- tryLogCurrentException (log);
426- }
427- }
416+ all_consumers.resize (num_consumers);
428417
429418 // Start the reader thread
430419 for (auto & task : tasks)
@@ -438,21 +427,34 @@ void StorageKafka::shutdown(bool)
438427{
439428 shutdown_called = true ;
440429
441- for (auto & task : tasks)
442430 {
443- // Interrupt streaming thread
444- task->stream_cancelled = true ;
431+ LOG_TRACE (log, " Waiting for streaming jobs" );
432+ Stopwatch watch;
433+ for (auto & task : tasks)
434+ {
435+ // Interrupt streaming thread
436+ task->stream_cancelled = true ;
445437
446- LOG_TRACE (log, " Waiting for cleanup" );
447- task->holder ->deactivate ();
438+ LOG_TEST (log, " Waiting for cleanup of a task" );
439+ task->holder ->deactivate ();
440+ }
441+ LOG_TRACE (log, " Streaming jobs finished in {} ms." , watch.elapsedMilliseconds ());
448442 }
449443
450- LOG_TRACE (log, " Closing consumers" );
451- for (size_t i = 0 ; i < all_consumers.size (); ++i)
452- auto consumer = popConsumer ();
453- LOG_TRACE (log, " Consumers closed" );
444+ {
445+ std::lock_guard lock (mutex);
446+ LOG_TRACE (log, " Closing {} consumers" , consumers.size ());
447+ Stopwatch watch;
448+ consumers.clear ();
449+ LOG_TRACE (log, " Consumers closed. Took {} ms." , watch.elapsedMilliseconds ());
450+ }
454451
455- rd_kafka_wait_destroyed (CLEANUP_TIMEOUT_MS);
452+ {
453+ LOG_TRACE (log, " Waiting for final cleanup" );
454+ Stopwatch watch;
455+ rd_kafka_wait_destroyed (CLEANUP_TIMEOUT_MS);
456+ LOG_TRACE (log, " Final cleanup finished in {} ms (timeout {} ms)." , watch.elapsedMilliseconds (), CLEANUP_TIMEOUT_MS);
457+ }
456458}
457459
458460
@@ -461,7 +463,7 @@ void StorageKafka::pushConsumer(KafkaConsumerPtr consumer)
461463 std::lock_guard lock (mutex);
462464 consumer->notInUse ();
463465 consumers.push_back (consumer);
464- semaphore. set ();
466+ cv. notify_one ();
465467 CurrentMetrics::sub (CurrentMetrics::KafkaConsumersInUse, 1 );
466468}
467469
@@ -474,22 +476,48 @@ KafkaConsumerPtr StorageKafka::popConsumer()
474476
475477KafkaConsumerPtr StorageKafka::popConsumer (std::chrono::milliseconds timeout)
476478{
477- // Wait for the first free buffer
478- if (timeout == std::chrono::milliseconds::zero ())
479- semaphore.wait ();
479+ std::unique_lock lock (mutex);
480+
481+ KafkaConsumerPtr consumer_ptr;
482+
483+ // / 1. There is consumer available. Return one of them.
484+ if (!consumers.empty ())
485+ {
486+ consumer_ptr = consumers.back ();
487+ consumers.pop_back ();
488+ }
480489 else
481490 {
482- if (!semaphore.tryWait (timeout.count ()))
483- return nullptr ;
491+ auto expired_consumer = std::find_if (all_consumers.begin (), all_consumers.end (), [](const auto & consumer_weak_ptr)
492+ {
493+ return consumer_weak_ptr.expired ();
494+ });
495+
496+ // / 2. There is no consumer, but we can create a new one.
497+ if (expired_consumer != all_consumers.end ())
498+ {
499+ size_t consumer_number = std::distance (all_consumers.begin (), expired_consumer);
500+ // / It should be OK to create consumer under lock, since it should be fast (without subscribing).
501+ consumer_ptr = createConsumer (consumer_number);
502+ *expired_consumer = consumer_ptr;
503+ }
504+ // / 3. There is no consumer and num_consumers already created, waiting @timeout.
505+ else
506+ {
507+ if (cv.wait_for (lock, timeout, [&]() { return !consumers.empty (); }))
508+ {
509+ consumer_ptr = consumers.back ();
510+ consumers.pop_back ();
511+ }
512+ }
484513 }
485514
486- // Take the first available buffer from the list
487- std::lock_guard lock (mutex);
488- auto consumer = consumers.back ();
489- consumers.pop_back ();
490- CurrentMetrics::add (CurrentMetrics::KafkaConsumersInUse, 1 );
491- consumer->inUse ();
492- return consumer;
515+ if (consumer_ptr)
516+ {
517+ CurrentMetrics::add (CurrentMetrics::KafkaConsumersInUse, 1 );
518+ consumer_ptr->inUse ();
519+ }
520+ return consumer_ptr;
493521}
494522
495523
@@ -545,10 +573,59 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
545573 {
546574 kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize (), getPollTimeoutMillisecond (), intermediate_commit, tasks.back ()->stream_cancelled , topics);
547575 }
576+ LOG_TRACE (log, " Created #{} consumer" , consumer_number);
577+
548578 *consumer_weak_ptr_ptr = kafka_consumer_ptr;
549579 return kafka_consumer_ptr;
550580}
551581
582+ void StorageKafka::cleanConsumers ()
583+ {
584+ static const UInt64 CONSUMER_TTL_USEC = 60'000'000 ;
585+ UInt64 now_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now ().time_since_epoch ()).count ();
586+
587+ // / Copy consumers for closing to a new vector to close them without a lock
588+ std::vector<KafkaConsumerPtr> consumers_to_close;
589+
590+ {
591+ std::lock_guard lock (mutex);
592+
593+ for (auto it = consumers.begin (); it != consumers.end ();)
594+ {
595+ auto & consumer_ptr = *it;
596+
597+ UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec ();
598+ chassert (consumer_last_used_usec <= now_usec);
599+ if (now_usec - consumer_last_used_usec > CONSUMER_TTL_USEC)
600+ {
601+ // / We need this only to get the consumer number.
602+ auto weak_it = std::find_if (all_consumers.begin (), all_consumers.end (), [&](const auto & consume_weak_ptr)
603+ {
604+ return consumer_ptr == consume_weak_ptr.lock ();
605+ });
606+ chassert (weak_it != all_consumers.end ());
607+ size_t consumer_number = std::distance (all_consumers.begin (), weak_it);
608+
609+ LOG_TRACE (log, " Closing #{} consumer (id: {})" , consumer_number, consumer_ptr->getMemberId ());
610+
611+ consumers_to_close.push_back (std::move (consumer_ptr));
612+ it = consumers.erase (it);
613+ }
614+ else
615+ ++it;
616+ }
617+ }
618+
619+ if (!consumers_to_close.empty ())
620+ {
621+ Stopwatch watch;
622+ size_t closed = consumers_to_close.size ();
623+ consumers_to_close.clear ();
624+ LOG_TRACE (log, " {} consumers had been closed (due to {} usec timeout). Took {} ms." ,
625+ closed, CONSUMER_TTL_USEC, watch.elapsedMilliseconds ());
626+ }
627+ }
628+
552629size_t StorageKafka::getMaxBlockSize () const
553630{
554631 return kafka_settings->kafka_max_block_size .changed
@@ -806,6 +883,8 @@ void StorageKafka::threadFunc(size_t idx)
806883
807884 mv_attached.store (false );
808885
886+ cleanConsumers ();
887+
809888 // Wait for attached views
810889 if (!task->stream_cancelled )
811890 task->holder ->scheduleAfter (RESCHEDULE_MS);
0 commit comments