Skip to content

Commit ebad1bf

Browse files
committed
Move StorageKafka::createConsumer() into KafkaConsumer
Signed-off-by: Azat Khuzhin <[email protected]>
1 parent 0321820 commit ebad1bf

File tree

4 files changed

+21
-29
lines changed

4 files changed

+21
-29
lines changed

src/Storages/Kafka/KafkaConsumer.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
#include <Common/CurrentMetrics.h>
1515
#include <Common/ProfileEvents.h>
16+
#include <base/defines.h>
1617

1718
namespace CurrentMetrics
1819
{
@@ -63,9 +64,22 @@ KafkaConsumer::KafkaConsumer(
6364
{
6465
}
6566

66-
void KafkaConsumer::setConsumer(const ConsumerPtr & consumer_)
67+
void KafkaConsumer::createConsumer(cppkafka::Configuration consumer_config)
6768
{
68-
consumer = consumer_;
69+
chassert(!consumer.get());
70+
71+
/// Using this should be safe, since cppkafka::Consumer can poll messages
72+
/// (including statistics, which will trigger the callback below) only via
73+
/// KafkaConsumer.
74+
if (consumer_config.get("statistics.interval.ms") != "0")
75+
{
76+
consumer_config.set_stats_callback([this](cppkafka::KafkaHandleBase &, const std::string & stat_json)
77+
{
78+
setRDKafkaStat(stat_json);
79+
});
80+
}
81+
consumer = std::make_shared<cppkafka::Consumer>(consumer_config);
82+
consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
6983

7084
// called (synchronously, during poll) when we enter the consumer group
7185
consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)

src/Storages/Kafka/KafkaConsumer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class KafkaConsumer
7272

7373
~KafkaConsumer();
7474

75-
void setConsumer(const ConsumerPtr & consumer);
75+
void createConsumer(cppkafka::Configuration consumer_config);
7676
bool hasConsumer() const { return consumer.get() != nullptr; }
7777
ConsumerPtr && moveConsumer() { return std::move(consumer); }
7878

src/Storages/Kafka/StorageKafka.cpp

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -530,8 +530,11 @@ KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout)
530530
else if (!ret_consumer_ptr && closed_consumer_index.has_value())
531531
{
532532
ret_consumer_ptr = consumers[*closed_consumer_index];
533+
534+
cppkafka::Configuration consumer_config = getConsumerConfiguration(*closed_consumer_index);
533535
/// It should be OK to create consumer under lock, since it should be fast (without subscribing).
534-
ret_consumer_ptr->setConsumer(createConsumer(*ret_consumer_ptr, *closed_consumer_index));
536+
ret_consumer_ptr->createConsumer(consumer_config);
537+
LOG_TRACE(log, "Created #{} consumer", *closed_consumer_index);
535538
}
536539
/// 3. There is no free consumer and num_consumers already created, waiting @timeout.
537540
else
@@ -576,29 +579,6 @@ KafkaConsumerPtr StorageKafka::createKafkaConsumer(size_t consumer_number)
576579
return kafka_consumer_ptr;
577580
}
578581

579-
ConsumerPtr StorageKafka::createConsumer(KafkaConsumer & kafka_consumer, size_t consumer_number)
580-
{
581-
cppkafka::Configuration consumer_config = getConsumerConfiguration(consumer_number);
582-
583-
/// Using KafkaConsumer by reference should be safe, since
584-
/// cppkafka::Consumer can poll messages (including statistics, which will
585-
/// trigger the callback below) only via KafkaConsumer.
586-
if (consumer_config.get("statistics.interval.ms") != "0")
587-
{
588-
consumer_config.set_stats_callback([&kafka_consumer](cppkafka::KafkaHandleBase &, const std::string & stat_json)
589-
{
590-
kafka_consumer.setRDKafkaStat(stat_json);
591-
});
592-
}
593-
594-
auto consumer_ptr = std::make_shared<cppkafka::Consumer>(consumer_config);
595-
consumer_ptr->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
596-
597-
LOG_TRACE(log, "Created #{} consumer", consumer_number);
598-
599-
return consumer_ptr;
600-
}
601-
602582
cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_number)
603583
{
604584
cppkafka::Configuration conf;

src/Storages/Kafka/StorageKafka.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,6 @@ class StorageKafka final : public IStorage, WithContext
130130
SettingsChanges createSettingsAdjustments();
131131
/// Creates KafkaConsumer object without real consumer (cppkafka::Consumer)
132132
KafkaConsumerPtr createKafkaConsumer(size_t consumer_number);
133-
/// Creates real cppkafka::Consumer object
134-
ConsumerPtr createConsumer(KafkaConsumer & kafka_consumer, size_t consumer_number);
135133
/// Returns consumer configuration with all changes that had been overwritten in config
136134
cppkafka::Configuration getConsumerConfiguration(size_t consumer_number);
137135

0 commit comments

Comments
 (0)