Skip to content

Commit 73a0ef6

Browse files
authored
Fix for the hang during deletion of engine=Kafka (one more time) (#11145)
* Added flag for safer rdkafka destruction, but more testing detected another hang case, which has no straight solution and can currently be worked around only by draining the consumer queue, so the destructor is back * After review fixes * After review fixes2 * After review fixes3
1 parent 30d0e77 commit 73a0ef6

File tree

5 files changed

+129
-5
lines changed

5 files changed

+129
-5
lines changed

contrib/cppkafka

src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ namespace ErrorCodes
1515

1616
using namespace std::chrono_literals;
1717
const auto MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS = 15000;
18+
const auto DRAIN_TIMEOUT_MS = 5000ms;
1819

1920

2021
ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
@@ -80,9 +81,72 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
8081
});
8182
}
8283

83-
// NOTE on removed destructor: There is no need to unsubscribe prior to calling rd_kafka_consumer_close().
84-
// check: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#termination
85-
// manual destruction was a source of weird errors (hangs during dropping a kafka table, etc.)
84+
/// Gracefully shuts the consumer down before cppkafka destroys the underlying
/// rd_kafka handle: unsubscribe first, then drain the pending message/callback
/// queue — otherwise rd_kafka_destroy() may hang waiting for unprocessed
/// rebalance callbacks (see drain() below for the upstream issue links).
ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
{
    try
    {
        if (!consumer->get_subscription().empty())
        {
            try
            {
                consumer->unsubscribe();
            }
            catch (const cppkafka::HandleException & e)
            {
                /// Best effort: a failed unsubscribe must not prevent draining.
                LOG_ERROR(log, "Error during unsubscribe: " << e.what());
            }
            drain();
        }
    }
    catch (const cppkafka::HandleException & e)
    {
        LOG_ERROR(log, "Error while destructing consumer: " << e.what());
    }
    catch (...)
    {
        /// A destructor must never let an exception escape — that would call
        /// std::terminate. Anything not covered by HandleException is swallowed
        /// here; the consumer is being torn down anyway.
        LOG_ERROR(log, "Unknown error while destructing consumer");
    }
}
107+
108+
// Needed to drain rest of the messages / queued callback calls from the consumer
109+
// after unsubscribe, otherwise consumer will hang on destruction
110+
// see https://github.com/edenhill/librdkafka/issues/2077
111+
// https://github.com/confluentinc/confluent-kafka-go/issues/189 etc.
112+
void ReadBufferFromKafkaConsumer::drain()
113+
{
114+
auto start_time = std::chrono::steady_clock::now();
115+
cppkafka::Error last_error(RD_KAFKA_RESP_ERR_NO_ERROR);
116+
117+
while (true)
118+
{
119+
auto msg = consumer->poll(100ms);
120+
if (!msg)
121+
break;
122+
123+
auto error = msg.get_error();
124+
125+
if (error)
126+
{
127+
if (msg.is_eof() || error == last_error)
128+
{
129+
break;
130+
}
131+
else
132+
{
133+
LOG_ERROR(log, "Error during draining: " << error);
134+
}
135+
}
136+
137+
// i don't stop draining on first error,
138+
// only if it repeats once again sequentially
139+
last_error = error;
140+
141+
auto ts = std::chrono::steady_clock::now();
142+
if (std::chrono::duration_cast<std::chrono::milliseconds>(ts-start_time) > DRAIN_TIMEOUT_MS)
143+
{
144+
LOG_ERROR(log, "Timeout during draining.");
145+
break;
146+
}
147+
}
148+
}
149+
86150

87151
void ReadBufferFromKafkaConsumer::commit()
88152
{

src/Storages/Kafka/ReadBufferFromKafkaConsumer.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
2828
const std::atomic<bool> & stopped_,
2929
const Names & _topics
3030
);
31-
31+
~ReadBufferFromKafkaConsumer() override;
3232
void allowNext() { allowed = true; } // Allow to read next message.
3333
void commit(); // Commit all processed messages.
3434
void subscribe(); // Subscribe internal consumer to topics.
@@ -75,6 +75,8 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
7575
cppkafka::TopicPartitionList assignment;
7676
const Names topics;
7777

78+
void drain();
79+
7880
bool nextImpl() override;
7981
};
8082

src/Storages/Kafka/StorageKafka.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ ConsumerBufferPtr StorageKafka::createReadBuffer()
293293

294294
// Create a consumer and subscribe to topics
295295
auto consumer = std::make_shared<cppkafka::Consumer>(conf);
296+
consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
296297

297298
// Limit the number of batched messages to allow early cancellations
298299
const Settings & settings = global_context.getSettingsRef();

tests/integration/test_storage_kafka/test.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,50 @@ def test_kafka_consumer_hang(kafka_cluster):
246246
# 'dr'||'op' to avoid self matching
247247
assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
248248

249+
@pytest.mark.timeout(180)
def test_kafka_consumer_hang2(kafka_cluster):
    # Two Kafka tables over the same topic and the same consumer group:
    # the second consumer joining forces a rebalance of the first one.
    instance.query('''
        DROP TABLE IF EXISTS test.kafka;

        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'consumer_hang2',
                     kafka_group_name = 'consumer_hang2',
                     kafka_format = 'JSONEachRow';

        CREATE TABLE test.kafka2 (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'consumer_hang2',
                     kafka_group_name = 'consumer_hang2',
                     kafka_format = 'JSONEachRow';
        ''')

    # first consumer subscribe the topic, try to poll some data, and go to rest
    instance.query('SELECT * FROM test.kafka')

    # second consumer do the same leading to rebalance in the first
    # consumer, try to poll some data
    instance.query('SELECT * FROM test.kafka2')

    # first consumer has pending rebalance callback unprocessed (no poll after select)
    # one of those queries was failing because of
    # https://github.com/edenhill/librdkafka/issues/2077
    # https://github.com/edenhill/librdkafka/issues/2898
    instance.query('DROP TABLE test.kafka')
    instance.query('DROP TABLE test.kafka2')

    # from a user perspective: we expect no hanging 'drop' queries
    # 'dr'||'op' to avoid self matching
    assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
291+
292+
249293
@pytest.mark.timeout(180)
250294
def test_kafka_csv_with_delimiter(kafka_cluster):
251295
instance.query('''
@@ -1130,13 +1174,26 @@ def produce():
11301174

11311175
print(instance.query('SELECT count(), uniqExact(key), max(key) + 1 FROM test.destination'))
11321176

1177+
# Some queries to debug...
11331178
# SELECT * FROM test.destination where key in (SELECT key FROM test.destination group by key having count() <> 1)
11341179
# select number + 1 as key from numbers(4141) left join test.destination using (key) where test.destination.key = 0;
11351180
# SELECT * FROM test.destination WHERE key between 2360 and 2370 order by key;
11361181
# select _partition from test.destination group by _partition having count() <> max(_offset) + 1;
11371182
# select toUInt64(0) as _partition, number + 1 as _offset from numbers(400) left join test.destination using (_partition,_offset) where test.destination.key = 0 order by _offset;
11381183
# SELECT * FROM test.destination WHERE _partition = 0 and _offset between 220 and 240 order by _offset;
11391184

1185+
# CREATE TABLE test.reference (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092',
1186+
# kafka_topic_list = 'topic_with_multiple_partitions',
1187+
# kafka_group_name = 'rebalance_test_group_reference',
1188+
# kafka_format = 'JSONEachRow',
1189+
# kafka_max_block_size = 100000;
1190+
#
1191+
# CREATE MATERIALIZED VIEW test.reference_mv Engine=Log AS
1192+
# SELECT key, value, _topic,_key,_offset, _partition, _timestamp, 'reference' as _consumed_by
1193+
# FROM test.reference;
1194+
#
1195+
# select * from test.reference_mv left join test.destination using (key,_topic,_offset,_partition) where test.destination._consumed_by = '';
1196+
11401197
result = int(instance.query('SELECT count() == uniqExact(key) FROM test.destination'))
11411198

11421199
for consumer_index in range(NUMBER_OF_CONSURRENT_CONSUMERS):

0 commit comments

Comments
 (0)