Skip to content

Commit 46f5268

Browse files
committed
Fix incorrect WriteBuffer interface usage in message brokers producers
WriteBufferToKafkaProducer and WriteBufferToRabbitMQProducer uses WriteBuffer::set(nullptr, 0) which will leave the WriteBuffer in the invalid state, since after this available() is 0 and next() will not call nextImpl(). Stacktrace: 2021.02.02 05:27:16.248056 [ 97 ] {} <Fatal> BaseDaemon: ######################################## 2021.02.02 05:27:16.248697 [ 97 ] {} <Fatal> BaseDaemon: (version 21.3.1.5880, build id: 03F265087EF651DC4D8E569775FE9E91FFA5DE76) (from thread 93) (query_id: 8c34c220-f66a-45d2-8f4f-01193cede98b) Received signal Segmentation fault (11) 2021.02.02 05:27:16.249030 [ 97 ] {} <Fatal> BaseDaemon: Address: NULL pointer. Access: write. Address not mapped to object. 2021.02.02 05:27:16.249623 [ 97 ] {} <Fatal> BaseDaemon: Stack trace: 0x12f3dbeb 0x12efd075 0x12efc90e 0x12ebb0e9 0x12ebe0cb 0x12e1da18 0x12a8c857 0x1219aa62 0x1218fc82 0x1218fdaa 0x11e3f120 0x12dfa3df 0x12df42a4 0x12e00fa8 0x158cdc63 0x158ce38f 0x15a36b72 0x15a35110 0x15a33918 0x8adf12d 0x7fb5e0fe3609 0x7fb5e0f0a293 2021.02.02 05:27:16.408985 [ 97 ] {} <Fatal> BaseDaemon: 5. ./obj-x86_64-linux-gnu/../src/IO/WriteHelpers.h:64: DB::JSONEachRowRowOutputFormat::writeRowStartDelimiter() @ 0x12f3dbeb in /usr/bin/clickhouse 2021.02.02 05:27:16.534797 [ 97 ] {} <Fatal> BaseDaemon: 6. ./obj-x86_64-linux-gnu/../src/Processors/Formats/IRowOutputFormat.cpp:80: DB::IRowOutputFormat::write(std::__1::vector<COW<DB::IColumn>::immutable_ptr<DB::IColumn>, std::__1::allocator<COW<DB::IColumn>::immutable_ptr<DB::IColumn> > > const&, unsigned long) @ 0x12efd075 in /usr/bin/clickhouse 2021.02.02 05:27:16.599210 [ 97 ] {} <Fatal> BaseDaemon: 7. ./obj-x86_64-linux-gnu/../src/Processors/Formats/IRowOutputFormat.cpp:0: DB::IRowOutputFormat::consume(DB::Chunk) @ 0x12efc90e in /usr/bin/clickhouse 2021.02.02 05:27:16.680733 [ 97 ] {} <Fatal> BaseDaemon: 8. ./obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:3211: DB::IOutputFormat::write(DB::Block const&) @ 0x12ebb0e9 in /usr/bin/clickhouse 2021.02.02 05:27:16.798768 [ 97 ] {} <Fatal> BaseDaemon: 9. ./obj-x86_64-linux-gnu/../src/Processors/Formats/OutputStreamToOutputFormat.cpp:15: DB::OutputStreamToOutputFormat::write(DB::Block const&) @ 0x12ebe0cb in /usr/bin/clickhouse 2021.02.02 05:27:16.981593 [ 97 ] {} <Fatal> BaseDaemon: 10. ./obj-x86_64-linux-gnu/../src/DataStreams/MaterializingBlockOutputStream.h:0: DB::MaterializingBlockOutputStream::write(DB::Block const&) @ 0x12e1da18 in /usr/bin/clickhouse 2021.02.02 05:27:17.090383 [ 97 ] {} <Fatal> BaseDaemon: 11. ./obj-x86_64-linux-gnu/../src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp:61: DB::RabbitMQBlockOutputStream::write(DB::Block const&) @ 0x12a8c857 in /usr/bin/clickhouse 2021.02.02 05:27:17.140425 [ 97 ] {} <Fatal> BaseDaemon: 12. ./obj-x86_64-linux-gnu/../src/DataStreams/AddingDefaultBlockOutputStream.cpp:0: DB::AddingDefaultBlockOutputStream::write(DB::Block const&) @ 0x1219aa62 in /usr/bin/clickhouse 2021.02.02 05:27:17.190282 [ 97 ] {} <Fatal> BaseDaemon: 13. ./obj-x86_64-linux-gnu/../src/DataStreams/SquashingBlockOutputStream.cpp:0: DB::SquashingBlockOutputStream::finalize() @ 0x1218fc82 in /usr/bin/clickhouse 2021.02.02 05:27:17.240052 [ 97 ] {} <Fatal> BaseDaemon: 14. ./obj-x86_64-linux-gnu/../contrib/libcxx/include/memory:2844: DB::SquashingBlockOutputStream::writeSuffix() @ 0x1218fdaa in /usr/bin/clickhouse 2021.02.02 05:27:17.294527 [ 97 ] {} <Fatal> BaseDaemon: 15. ./obj-x86_64-linux-gnu/../src/DataStreams/CountingBlockOutputStream.h:37: DB::CountingBlockOutputStream::writeSuffix() @ 0x11e3f120 in /usr/bin/clickhouse 2021.02.02 05:27:17.405442 [ 97 ] {} <Fatal> BaseDaemon: 16. ./obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:521: DB::TCPHandler::processInsertQuery(DB::Settings const&) @ 0x12dfa3df in /usr/bin/clickhouse 2021.02.02 05:27:17.475822 [ 97 ] {} <Fatal> BaseDaemon: 17. ./obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:0: DB::TCPHandler::runImpl() @ 0x12df42a4 in /usr/bin/clickhouse 2021.02.02 05:27:17.609845 [ 97 ] {} <Fatal> BaseDaemon: 18. ./obj-x86_64-linux-gnu/../src/Server/TCPHandler.cpp:1419: DB::TCPHandler::run() @ 0x12e00fa8 in /usr/bin/clickhouse 2021.02.02 05:27:17.695292 [ 97 ] {} <Fatal> BaseDaemon: 19. ./obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerConnection.cpp:57: Poco::Net::TCPServerConnection::start() @ 0x158cdc63 in /usr/bin/clickhouse 2021.02.02 05:27:17.833612 [ 97 ] {} <Fatal> BaseDaemon: 20. ./obj-x86_64-linux-gnu/../contrib/poco/Net/src/TCPServerDispatcher.cpp:0: Poco::Net::TCPServerDispatcher::run() @ 0x158ce38f in /usr/bin/clickhouse 2021.02.02 05:27:17.907391 [ 97 ] {} <Fatal> BaseDaemon: 21. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/include/Poco/ScopedLock.h:36: Poco::PooledThread::run() @ 0x15a36b72 in /usr/bin/clickhouse 2021.02.02 05:27:18.033949 [ 97 ] {} <Fatal> BaseDaemon: 22. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/src/Thread.cpp:56: Poco::(anonymous namespace)::RunnableHolder::run() @ 0x15a35110 in /usr/bin/clickhouse 2021.02.02 05:27:18.122761 [ 97 ] {} <Fatal> BaseDaemon: 23. ./obj-x86_64-linux-gnu/../contrib/poco/Foundation/include/Poco/SharedPtr.h:277: Poco::ThreadImpl::runnableEntry(void*) @ 0x15a33918 in /usr/bin/clickhouse 2021.02.02 05:27:18.283663 [ 97 ] {} <Fatal> BaseDaemon: 24. __tsan_thread_start_func @ 0x8adf12d in /usr/bin/clickhouse 2021.02.02 05:27:18.284153 [ 97 ] {} <Fatal> BaseDaemon: 25. start_thread @ 0x9609 in /usr/lib/x86_64-linux-gnu/libpthread-2.31.so 2021.02.02 05:27:18.284544 [ 97 ] {} <Fatal> BaseDaemon: 26. __clone @ 0x122293 in /usr/lib/x86_64-linux-gnu/libc-2.31.so 2021.02.02 05:27:21.675458 [ 97 ] {} <Fatal> BaseDaemon: Calculated checksum of the binary: B53B58DB6CF8186EEC9EA9273F135E44. There is no information about the reference checksum. 2021.02.02 05:27:28.078805 [ 1 ] {} <Fatal> Application: Child process was terminated by signal 11.
1 parent fabace3 commit 46f5268

File tree

4 files changed

+28
-7
lines changed

4 files changed

+28
-7
lines changed

src/Storages/Kafka/WriteBufferToKafkaProducer.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
4242
timestamp_column_index = column_index;
4343
}
4444
}
45+
46+
reinitializeChunks();
4547
}
4648

4749
WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
@@ -108,9 +110,7 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren
108110
break;
109111
}
110112

111-
rows = 0;
112-
chunks.clear();
113-
set(nullptr, 0);
113+
reinitializeChunks();
114114
}
115115
}
116116

@@ -141,4 +141,14 @@ void WriteBufferToKafkaProducer::nextImpl()
141141
set(chunks.back().data(), chunk_size);
142142
}
143143

144+
void WriteBufferToKafkaProducer::reinitializeChunks()
145+
{
146+
rows = 0;
147+
chunks.clear();
148+
/// We cannot leave the buffer in the undefined state (i.e. without any
149+
/// underlying buffer), since in this case the WriteBuffeR::next() will
150+
/// not call our nextImpl() (due to available() == 0)
151+
nextImpl();
152+
}
153+
144154
}

src/Storages/Kafka/WriteBufferToKafkaProducer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class WriteBufferToKafkaProducer : public WriteBuffer
3030

3131
private:
3232
void nextImpl() override;
33+
void reinitializeChunks();
3334

3435
ProducerPtr producer;
3536
const std::string topic;

src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
5555
, max_rows(rows_per_message)
5656
, chunk_size(chunk_size_)
5757
{
58-
5958
loop = std::make_unique<uv_loop_t>();
6059
uv_loop_init(loop.get());
6160
event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log);
@@ -85,6 +84,8 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
8584
key_arguments[matching[0]] = matching[1];
8685
}
8786
}
87+
88+
reinitializeChunks();
8889
}
8990

9091

@@ -122,9 +123,7 @@ void WriteBufferToRabbitMQProducer::countRow()
122123

123124
payload.append(last_chunk, 0, last_chunk_size);
124125

125-
rows = 0;
126-
chunks.clear();
127-
set(nullptr, 0);
126+
reinitializeChunks();
128127

129128
++payload_counter;
130129
payloads.push(std::make_pair(payload_counter, payload));
@@ -331,6 +330,15 @@ void WriteBufferToRabbitMQProducer::nextImpl()
331330
chunks.back().resize(chunk_size);
332331
set(chunks.back().data(), chunk_size);
333332
}
333+
void WriteBufferToRabbitMQProducer::reinitializeChunks()
334+
{
335+
rows = 0;
336+
chunks.clear();
337+
/// We cannot leave the buffer in the undefined state (i.e. without any
338+
/// underlying buffer), since in this case the WriteBuffeR::next() will
339+
/// not call our nextImpl() (due to available() == 0)
340+
nextImpl();
341+
}
334342

335343

336344
void WriteBufferToRabbitMQProducer::iterateEventLoop()

src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class WriteBufferToRabbitMQProducer : public WriteBuffer
4141

4242
private:
4343
void nextImpl() override;
44+
void reinitializeChunks();
45+
4446
void iterateEventLoop();
4547
void writingFunc();
4648
bool setupConnection(bool reconnecting);

0 commit comments

Comments
 (0)