88#include < Formats/FormatFactory.h>
99#include < Formats/SchemaInferenceUtils.h>
1010#include < IO/ReadBufferFromMemory.h>
11+ #include < IO/SharedThreadPools.h>
1112#include < IO/copyData.h>
1213#include < arrow/api.h>
1314#include < arrow/io/api.h>
3233#include < Interpreters/convertFieldToType.h>
3334#include < Storages/MergeTree/KeyCondition.h>
3435
36+ #include < shared_mutex>
3537#include < boost/algorithm/string/case_conv.hpp>
3638
3739namespace ProfileEvents
@@ -46,10 +48,6 @@ namespace CurrentMetrics
4648 extern const Metric IOThreads;
4749 extern const Metric IOThreadsActive;
4850 extern const Metric IOThreadsScheduled;
49-
50- extern const Metric FormatParsingThreads;
51- extern const Metric FormatParsingThreadsActive;
52- extern const Metric FormatParsingThreadsScheduled;
5351}
5452
5553namespace DB
@@ -583,20 +581,23 @@ ParquetBlockInputFormat::ParquetBlockInputFormat(
583581 , pending_chunks(PendingChunk::Compare { .row_group_first = format_settings_.parquet .preserve_order })
584582 , previous_block_missing_values(getPort().getHeader().columns())
585583{
586- size_t max_decoding_threads = parser_group->getParsingThreadsPerReader ();
587- if (max_decoding_threads > 1 )
588- pool = std::make_unique<ThreadPool>(CurrentMetrics::FormatParsingThreads, CurrentMetrics::FormatParsingThreadsActive, CurrentMetrics::FormatParsingThreadsScheduled, max_decoding_threads);
584+ use_thread_pool = parser_group->max_parsing_threads > 1 ;
589585
590- size_t max_io_threads = parser_group->getIOThreadsPerReader ();
591- if (supportPrefetch (max_decoding_threads, max_io_threads))
592- io_pool = std::make_shared<ThreadPool>(CurrentMetrics::IOThreads, CurrentMetrics::IOThreadsActive, CurrentMetrics::IOThreadsScheduled, max_io_threads);
586+ bool row_group_prefetch =
587+ !use_thread_pool && parser_group->max_io_threads > 0 &&
588+ format_settings.parquet .enable_row_group_prefetch &&
589+ !format_settings.parquet .use_native_reader ;
590+ if (row_group_prefetch)
591+ io_pool = std::make_shared<ThreadPool>(
592+ CurrentMetrics::IOThreads, CurrentMetrics::IOThreadsActive, CurrentMetrics::IOThreadsScheduled,
593+ parser_group->getIOThreadsPerReader ());
593594}
594595
595596ParquetBlockInputFormat::~ParquetBlockInputFormat ()
596597{
597598 is_stopped = true ;
598- if (pool )
599- pool-> wait ();
599+ if (use_thread_pool )
600+ shutdown-> shutdown ();
600601 if (io_pool)
601602 io_pool->wait ();
602603}
@@ -609,6 +610,10 @@ void ParquetBlockInputFormat::initializeIfNeeded()
609610 std::call_once (parser_group->init_flag , [&]
610611 {
611612 parser_group->initKeyCondition (getPort ().getHeader ());
613+
614+ if (use_thread_pool)
615+ parser_group->parsing_runner .initThreadPool (
616+ getFormatParsingThreadPool ().get (), parser_group->max_parsing_threads , " ParquetDecoder" , CurrentThread::getGroup ());
612617 });
613618
614619 // Create arrow file adapter.
@@ -889,13 +894,15 @@ void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_batch_idx)
889894
890895 status = RowGroupBatchState::Status::Running;
891896
892- pool-> scheduleOrThrowOnError (
893- [this , row_group_batch_idx, thread_group = CurrentThread::getGroup () ]()
897+ parser_group-> parsing_runner (
898+ [this , row_group_batch_idx, shutdown_ = shutdown ]()
894899 {
900+ std::shared_lock shutdown_lock (*shutdown_, std::try_to_lock);
901+ if (!shutdown_lock.owns_lock ())
902+ return ;
903+
895904 try
896905 {
897- ThreadGroupSwitcher switcher (thread_group, " ParquetDecoder" );
898-
899906 threadFunction (row_group_batch_idx);
900907 }
901908 catch (...)
@@ -928,11 +935,6 @@ void ParquetBlockInputFormat::threadFunction(size_t row_group_batch_idx)
928935 return ;
929936 }
930937}
931- bool ParquetBlockInputFormat::supportPrefetch (size_t max_decoding_threads, size_t max_io_threads) const
932- {
933- return max_decoding_threads == 1 && max_io_threads > 0 && format_settings.parquet .enable_row_group_prefetch && !format_settings.parquet .use_native_reader ;
934- }
935-
936938std::shared_ptr<arrow::RecordBatchReader> ParquetBlockInputFormat::RowGroupPrefetchIterator::nextRowGroupReader ()
937939{
938940 if (prefetched_row_groups.empty ()) return nullptr ;
@@ -1069,7 +1071,7 @@ void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional<size_t> row
10691071 ++row_group_batches_completed;
10701072 }
10711073
1072- if (pool )
1074+ if (use_thread_pool )
10731075 {
10741076 size_t max_decoding_threads = parser_group->getParsingThreadsPerReader ();
10751077 while (row_group_batches_started - row_group_batches_completed < max_decoding_threads &&
@@ -1132,7 +1134,7 @@ Chunk ParquetBlockInputFormat::read()
11321134 if (row_group_batches_completed == row_group_batches.size ())
11331135 return {};
11341136
1135- if (pool )
1137+ if (use_thread_pool )
11361138 condvar.wait (lock);
11371139 else
11381140 decodeOneChunk (row_group_batches_completed, lock);
@@ -1142,8 +1144,11 @@ Chunk ParquetBlockInputFormat::read()
11421144void ParquetBlockInputFormat::resetParser ()
11431145{
11441146 is_stopped = true ;
1145- if (pool)
1146- pool->wait ();
1147+ if (use_thread_pool)
1148+ {
1149+ shutdown->shutdown ();
1150+ shutdown = std::make_shared<ShutdownHelper>();
1151+ }
11471152
11481153 arrow_file.reset ();
11491154 metadata.reset ();
0 commit comments