Skip to content

Commit dbfe932

Browse files
committed
Use shared ThreadPoolCallbackRunnerFast in ParquetBlockInputFormat
1 parent 7d5a0a8 commit dbfe932

File tree

3 files changed

+34
-29
lines changed

3 files changed

+34
-29
lines changed

src/Common/threadPoolCallbackRunner.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ void ThreadPoolCallbackRunnerFast::initThreadPool(ThreadPool & pool_, size_t max
2121
thread_name = thread_name_;
2222
thread_group = thread_group_;
2323

24-
/// TODO [parquet]: Maybe adjust number of threads dynamically based on queue size.
24+
/// We could dynamically add and remove threads based on load, but it's not clear whether it's
25+
/// worth the added complexity.
2526
threads = max_threads;
2627
for (size_t i = 0; i < max_threads; ++i)
2728
pool->scheduleOrThrowOnError([this] { threadFunction(); });

src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <Formats/FormatFactory.h>
99
#include <Formats/SchemaInferenceUtils.h>
1010
#include <IO/ReadBufferFromMemory.h>
11+
#include <IO/SharedThreadPools.h>
1112
#include <IO/copyData.h>
1213
#include <arrow/api.h>
1314
#include <arrow/io/api.h>
@@ -32,6 +33,7 @@
3233
#include <Interpreters/convertFieldToType.h>
3334
#include <Storages/MergeTree/KeyCondition.h>
3435

36+
#include <shared_mutex>
3537
#include <boost/algorithm/string/case_conv.hpp>
3638

3739
namespace ProfileEvents
@@ -46,10 +48,6 @@ namespace CurrentMetrics
4648
extern const Metric IOThreads;
4749
extern const Metric IOThreadsActive;
4850
extern const Metric IOThreadsScheduled;
49-
50-
extern const Metric FormatParsingThreads;
51-
extern const Metric FormatParsingThreadsActive;
52-
extern const Metric FormatParsingThreadsScheduled;
5351
}
5452

5553
namespace DB
@@ -583,20 +581,23 @@ ParquetBlockInputFormat::ParquetBlockInputFormat(
583581
, pending_chunks(PendingChunk::Compare { .row_group_first = format_settings_.parquet.preserve_order })
584582
, previous_block_missing_values(getPort().getHeader().columns())
585583
{
586-
size_t max_decoding_threads = parser_group->getParsingThreadsPerReader();
587-
if (max_decoding_threads > 1)
588-
pool = std::make_unique<ThreadPool>(CurrentMetrics::FormatParsingThreads, CurrentMetrics::FormatParsingThreadsActive, CurrentMetrics::FormatParsingThreadsScheduled, max_decoding_threads);
584+
use_thread_pool = parser_group->max_parsing_threads > 1;
589585

590-
size_t max_io_threads = parser_group->getIOThreadsPerReader();
591-
if (supportPrefetch(max_decoding_threads, max_io_threads))
592-
io_pool = std::make_shared<ThreadPool>(CurrentMetrics::IOThreads, CurrentMetrics::IOThreadsActive, CurrentMetrics::IOThreadsScheduled, max_io_threads);
586+
bool row_group_prefetch =
587+
!use_thread_pool && parser_group->max_io_threads > 0 &&
588+
format_settings.parquet.enable_row_group_prefetch &&
589+
!format_settings.parquet.use_native_reader;
590+
if (row_group_prefetch)
591+
io_pool = std::make_shared<ThreadPool>(
592+
CurrentMetrics::IOThreads, CurrentMetrics::IOThreadsActive, CurrentMetrics::IOThreadsScheduled,
593+
parser_group->getIOThreadsPerReader());
593594
}
594595

595596
ParquetBlockInputFormat::~ParquetBlockInputFormat()
596597
{
597598
is_stopped = true;
598-
if (pool)
599-
pool->wait();
599+
if (use_thread_pool)
600+
shutdown->shutdown();
600601
if (io_pool)
601602
io_pool->wait();
602603
}
@@ -609,6 +610,10 @@ void ParquetBlockInputFormat::initializeIfNeeded()
609610
std::call_once(parser_group->init_flag, [&]
610611
{
611612
parser_group->initKeyCondition(getPort().getHeader());
613+
614+
if (use_thread_pool)
615+
parser_group->parsing_runner.initThreadPool(
616+
getFormatParsingThreadPool().get(), parser_group->max_parsing_threads, "ParquetDecoder", CurrentThread::getGroup());
612617
});
613618

614619
// Create arrow file adapter.
@@ -889,13 +894,15 @@ void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_batch_idx)
889894

890895
status = RowGroupBatchState::Status::Running;
891896

892-
pool->scheduleOrThrowOnError(
893-
[this, row_group_batch_idx, thread_group = CurrentThread::getGroup()]()
897+
parser_group->parsing_runner(
898+
[this, row_group_batch_idx, shutdown_ = shutdown]()
894899
{
900+
std::shared_lock shutdown_lock(*shutdown_, std::try_to_lock);
901+
if (!shutdown_lock.owns_lock())
902+
return;
903+
895904
try
896905
{
897-
ThreadGroupSwitcher switcher(thread_group, "ParquetDecoder");
898-
899906
threadFunction(row_group_batch_idx);
900907
}
901908
catch (...)
@@ -928,11 +935,6 @@ void ParquetBlockInputFormat::threadFunction(size_t row_group_batch_idx)
928935
return;
929936
}
930937
}
931-
bool ParquetBlockInputFormat::supportPrefetch(size_t max_decoding_threads, size_t max_io_threads) const
932-
{
933-
return max_decoding_threads == 1 && max_io_threads > 0 && format_settings.parquet.enable_row_group_prefetch && !format_settings.parquet.use_native_reader;
934-
}
935-
936938
std::shared_ptr<arrow::RecordBatchReader> ParquetBlockInputFormat::RowGroupPrefetchIterator::nextRowGroupReader()
937939
{
938940
if (prefetched_row_groups.empty()) return nullptr;
@@ -1069,7 +1071,7 @@ void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional<size_t> row
10691071
++row_group_batches_completed;
10701072
}
10711073

1072-
if (pool)
1074+
if (use_thread_pool)
10731075
{
10741076
size_t max_decoding_threads = parser_group->getParsingThreadsPerReader();
10751077
while (row_group_batches_started - row_group_batches_completed < max_decoding_threads &&
@@ -1132,7 +1134,7 @@ Chunk ParquetBlockInputFormat::read()
11321134
if (row_group_batches_completed == row_group_batches.size())
11331135
return {};
11341136

1135-
if (pool)
1137+
if (use_thread_pool)
11361138
condvar.wait(lock);
11371139
else
11381140
decodeOneChunk(row_group_batches_completed, lock);
@@ -1142,8 +1144,11 @@ Chunk ParquetBlockInputFormat::read()
11421144
void ParquetBlockInputFormat::resetParser()
11431145
{
11441146
is_stopped = true;
1145-
if (pool)
1146-
pool->wait();
1147+
if (use_thread_pool)
1148+
{
1149+
shutdown->shutdown();
1150+
shutdown = std::make_shared<ShutdownHelper>();
1151+
}
11471152

11481153
arrow_file.reset();
11491154
metadata.reset();

src/Processors/Formats/Impl/ParquetBlockInputFormat.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,6 @@ class ParquetBlockInputFormat : public IInputFormat
8989

9090
void threadFunction(size_t row_group_batch_idx);
9191

92-
inline bool supportPrefetch(size_t max_decoding_threads, size_t max_io_threads) const;
93-
9492
// Data layout in the file:
9593
//
9694
// row group 0
@@ -327,7 +325,8 @@ class ParquetBlockInputFormat : public IInputFormat
327325

328326
// These are only used when max_decoding_threads > 1.
329327
size_t row_group_batches_started = 0;
330-
std::unique_ptr<ThreadPool> pool;
328+
bool use_thread_pool = false;
329+
std::shared_ptr<ShutdownHelper> shutdown = std::make_shared<ShutdownHelper>();
331330
std::shared_ptr<ThreadPool> io_pool;
332331

333332
BlockMissingValues previous_block_missing_values;

0 commit comments

Comments
 (0)