Skip to content

Commit 0fd0711

Browse files
Merge pull request #10910 from filimonov/kafka_drop_hang_fix
Fix for the hang during deletion of engine=Kafka
2 parents 0d76091 + 791f3a6 commit 0fd0711

File tree

4 files changed

+30
-28
lines changed

4 files changed

+30
-28
lines changed

src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -80,22 +80,9 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
8080
});
8181
}
8282

83-
ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
84-
{
85-
/// NOTE: see https://github.com/edenhill/librdkafka/issues/2077
86-
try
87-
{
88-
if (!consumer->get_subscription().empty())
89-
consumer->unsubscribe();
90-
if (!assignment.empty())
91-
consumer->unassign();
92-
while (consumer->get_consumer_queue().next_event(100ms));
93-
}
94-
catch (const cppkafka::HandleException & e)
95-
{
96-
LOG_ERROR(log, "Exception from ReadBufferFromKafkaConsumer destructor: " << e.what());
97-
}
98-
}
83+
// NOTE on removed destructor: There is no need to unsubscribe prior to calling rd_kafka_consumer_close().
84+
// check: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#termination
85+
// manual destruction was a source of weird errors (hangs during dropping of a Kafka table, etc.)
9986

10087
void ReadBufferFromKafkaConsumer::commit()
10188
{
@@ -226,8 +213,13 @@ void ReadBufferFromKafkaConsumer::unsubscribe()
226213
// it should not raise exception as used in destructor
227214
try
228215
{
229-
if (!consumer->get_subscription().empty())
230-
consumer->unsubscribe();
216+
// From docs: Any previous subscription will be unassigned and unsubscribed first.
217+
consumer->subscribe(topics);
218+
219+
// I wanted to avoid explicit unsubscribe as it requires draining the messages
220+
// to close the consumer safely after unsubscribe
221+
// see https://github.com/edenhill/librdkafka/issues/2077
222+
// https://github.com/confluentinc/confluent-kafka-go/issues/189 etc.
231223
}
232224
catch (const cppkafka::HandleException & e)
233225
{

src/Storages/Kafka/ReadBufferFromKafkaConsumer.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
2828
const std::atomic<bool> & stopped_,
2929
const Names & _topics
3030
);
31-
~ReadBufferFromKafkaConsumer() override;
3231

3332
void allowNext() { allowed = true; } // Allow to read next message.
3433
void commit(); // Commit all processed messages.
@@ -64,10 +63,13 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
6463

6564
const std::atomic<bool> & stopped;
6665

66+
// order is important, need to be destructed before consumer
6767
Messages messages;
6868
Messages::const_iterator current;
6969

7070
bool rebalance_happened = false;
71+
72+
// order is important, need to be destructed before consumer
7173
cppkafka::TopicPartitionList assignment;
7274
const Names topics;
7375

src/Storages/Kafka/StorageKafka.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,14 +235,19 @@ ProducerBufferPtr StorageKafka::createWriteBuffer(const Block & header)
235235
ConsumerBufferPtr StorageKafka::createReadBuffer()
236236
{
237237
cppkafka::Configuration conf;
238+
238239
conf.set("metadata.broker.list", brokers);
239240
conf.set("group.id", group);
240241
conf.set("client.id", VERSION_FULL);
242+
241243
conf.set("auto.offset.reset", "smallest"); // If no offset stored for this group, read all messages from the start
244+
245+
updateConfiguration(conf);
246+
247+
// those settings should not be changed by users.
242248
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
243249
conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once.
244250
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
245-
updateConfiguration(conf);
246251

247252
// Create a consumer and subscribe to topics
248253
auto consumer = std::make_shared<cppkafka::Consumer>(conf);

tests/integration/test_storage_kafka/test.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ def test_kafka_settings_new_syntax(kafka_cluster):
198198
kafka_check_result(result, True)
199199

200200

201-
@pytest.mark.skip(reason="https://github.com/edenhill/librdkafka/issues/2077")
202201
@pytest.mark.timeout(180)
203202
def test_kafka_consumer_hang(kafka_cluster):
204203

@@ -219,7 +218,7 @@ def test_kafka_consumer_hang(kafka_cluster):
219218
CREATE MATERIALIZED VIEW test.consumer TO test.view AS SELECT * FROM test.kafka;
220219
''')
221220

222-
time.sleep(12)
221+
time.sleep(10)
223222
instance.query('SELECT * FROM test.view')
224223

225224
# This should trigger heartbeat fail,
@@ -229,19 +228,23 @@ def test_kafka_consumer_hang(kafka_cluster):
229228
time.sleep(0.5)
230229
kafka_cluster.unpause_container('kafka1')
231230

231+
# print("Attempt to drop")
232232
instance.query('DROP TABLE test.kafka')
233233

234+
#kafka_cluster.open_bash_shell('instance')
235+
234236
instance.query('''
235237
DROP TABLE test.consumer;
236238
DROP TABLE test.view;
237239
''')
238240

239-
log = '/var/log/clickhouse-server/stderr.log'
240-
instance.exec_in_container(['grep', '-q', 'BROKERFAIL', log])
241-
instance.exec_in_container(['grep', '-q', '|ASSIGN|', log])
242-
instance.exec_in_container(['grep', '-q', 'Heartbeat failed: REBALANCE_IN_PROGRESS: group is rebalancing', log])
243-
instance.exec_in_container(['grep', '-q', 'Group "consumer_hang": waiting for rebalance_cb', log])
241+
# original problem appearance was a sequence of the following messages in librdkafka logs:
242+
# BROKERFAIL -> |ASSIGN| -> REBALANCE_IN_PROGRESS -> "waiting for rebalance_cb" (repeated forever)
243+
# so it was waiting forever while the application will execute queued rebalance callback
244244

245+
# from a user perspective: we expect no hanging 'drop' queries
246+
# 'dr'||'op' to avoid self matching
247+
assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
245248

246249
@pytest.mark.timeout(180)
247250
def test_kafka_csv_with_delimiter(kafka_cluster):
@@ -1234,7 +1237,7 @@ def test_exception_from_destructor(kafka_cluster):
12341237
DROP TABLE test.kafka;
12351238
''')
12361239

1237-
kafka_cluster.open_bash_shell('instance')
1240+
#kafka_cluster.open_bash_shell('instance')
12381241
assert TSV(instance.query('SELECT 1')) == TSV('1')
12391242

12401243

0 commit comments

Comments
 (0)