Skip to content

Commit 2268961

Browse files
authored
Merge branch 'project-antalya-24.12.2' into project-antalya-24.12.2-s3cluster-hive
2 parents a238931 + 5a4f3bf commit 2268961

File tree

17 files changed

+247
-11
lines changed

17 files changed

+247
-11
lines changed

src/Common/ProfileEvents.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -914,8 +914,10 @@ The server successfully detected this situation and will download merged part fr
914914
M(MemoryWorkerRun, "Number of runs done by MemoryWorker in background", ValueType::Number) \
915915
M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work", ValueType::Microseconds) \
916916
\
917-
M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \
918-
917+
M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \
918+
\
919+
M(ParquetMetaDataCacheHits, "Number of times the parquet file metadata cache was hit.", ValueType::Number) \
920+
M(ParquetMetaDataCacheMisses, "Number of times the parquet file metadata cache was missed.", ValueType::Number) \
919921

920922
#ifdef APPLY_FOR_EXTERNAL_EVENTS
921923
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)

src/Core/FormatFactorySettings.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1251,7 +1251,7 @@ Set the quoting rule for identifiers in SHOW CREATE query
12511251
DECLARE(IdentifierQuotingStyle, show_create_query_identifier_quoting_style, IdentifierQuotingStyle::Backticks, R"(
12521252
Set the quoting style for identifiers in SHOW CREATE query
12531253
)", 0) \
1254-
1254+
DECLARE(Bool, input_format_parquet_use_metadata_cache, false, R"(Enable parquet file metadata caching)", 0) \
12551255
// End of FORMAT_FACTORY_SETTINGS
12561256

12571257
#define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \

src/Core/ServerSettings.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,8 @@ namespace DB
212212
DECLARE(UInt64, threadpool_writer_queue_size, 1000000, "Number of tasks which is possible to push into background pool for write requests to object storages", 0) \
213213
DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, "Size of background pool for iceberg catalog", 0) \
214214
DECLARE(UInt64, iceberg_catalog_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into iceberg catalog pool", 0) \
215-
DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \
216-
217-
215+
DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \
216+
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
218217
// clang-format on
219218

220219
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below

src/Interpreters/ClusterProxy/executeQuery.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,7 @@ void executeQuery(
442442
not_optimized_cluster->getName());
443443

444444
read_from_remote->setStepDescription("Read from remote replica");
445+
read_from_remote->setIsRemoteFunction(is_remote_function);
445446
plan->addStep(std::move(read_from_remote));
446447
plan->addInterpreterContext(new_context);
447448
plans.emplace_back(std::move(plan));

src/Interpreters/Context.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2702,8 +2702,11 @@ void Context::setCurrentQueryId(const String & query_id)
27022702

27032703
client_info.current_query_id = query_id_to_set;
27042704

2705-
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
2705+
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
2706+
&& (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty()))
2707+
{
27062708
client_info.initial_query_id = client_info.current_query_id;
2709+
}
27072710
}
27082711

27092712
void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation)

src/Processors/Formats/IInputFormat.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ class IInputFormat : public SourceWithKeyCondition
6767

6868
void needOnlyCount() { need_only_count = true; }
6969

70+
/// Set additional info/key/id related to underlying storage of the ReadBuffer
71+
virtual void setStorageRelatedUniqueKey(const ServerSettings & /* server_settings */, const Settings & /*settings*/, const String & /*key*/) {}
72+
7073
protected:
7174
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
7275

src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33

44
#if USE_PARQUET
55

6+
#include <Core/Settings.h>
7+
#include <Core/ServerSettings.h>
8+
#include <Common/ProfileEvents.h>
69
#include <Common/logger_useful.h>
710
#include <Common/ThreadPool.h>
811
#include <Formats/FormatFactory.h>
@@ -33,6 +36,8 @@
3336
namespace ProfileEvents
3437
{
3538
extern const Event ParquetFetchWaitTimeMicroseconds;
39+
extern const Event ParquetMetaDataCacheHits;
40+
extern const Event ParquetMetaDataCacheMisses;
3641
}
3742

3843
namespace CurrentMetrics
@@ -49,6 +54,16 @@ namespace CurrentMetrics
4954
namespace DB
5055
{
5156

57+
namespace Setting
58+
{
59+
extern const SettingsBool input_format_parquet_use_metadata_cache;
60+
}
61+
62+
namespace ServerSetting
63+
{
64+
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
65+
}
66+
5267
namespace ErrorCodes
5368
{
5469
extern const int BAD_ARGUMENTS;
@@ -507,6 +522,58 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
507522
return hyperrectangle;
508523
}
509524

525+
ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes)
526+
: CacheBase(max_size_bytes) {}
527+
528+
ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes)
529+
{
530+
static ParquetFileMetaDataCache instance(max_size_bytes);
531+
return &instance;
532+
}
533+
534+
std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
535+
{
536+
createArrowFileIfNotCreated();
537+
return parquet::ReadMetaData(arrow_file);
538+
}
539+
540+
std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
541+
{
542+
// in-memory cache is not implemented for local file operations, only for remote files
543+
// there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation
544+
// and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key
545+
if (!metadata_cache.use_cache || metadata_cache.key.empty())
546+
{
547+
return readMetadataFromFile();
548+
}
549+
550+
auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes)->getOrSet(
551+
metadata_cache.key,
552+
[&]()
553+
{
554+
return readMetadataFromFile();
555+
}
556+
);
557+
if (loaded)
558+
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
559+
else
560+
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
561+
return parquet_file_metadata;
562+
}
563+
564+
void ParquetBlockInputFormat::createArrowFileIfNotCreated()
565+
{
566+
if (arrow_file)
567+
{
568+
return;
569+
}
570+
571+
// Create arrow file adapter.
572+
// TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
573+
// we'll need to read (which we know in advance). Use max_download_threads for that.
574+
arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
575+
}
576+
510577
std::unordered_set<std::size_t> getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn)
511578
{
512579
std::unordered_set<std::size_t> column_keys;
@@ -606,7 +673,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
606673
if (is_stopped)
607674
return;
608675

609-
metadata = parquet::ReadMetaData(arrow_file);
676+
metadata = getFileMetaData();
610677
const bool prefetch_group = supportPrefetch();
611678

612679
std::shared_ptr<arrow::Schema> schema;
@@ -706,6 +773,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
706773
}
707774
}
708775

776+
bool has_row_groups_to_read = false;
777+
709778
auto skip_row_group_based_on_filters = [&](int row_group)
710779
{
711780
if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down)
@@ -755,9 +824,23 @@ void ParquetBlockInputFormat::initializeIfNeeded()
755824
row_group_batches.back().total_bytes_compressed += row_group_size;
756825
auto rows = adaptive_chunk_size(row_group);
757826
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
827+
828+
has_row_groups_to_read = true;
829+
}
830+
831+
if (has_row_groups_to_read)
832+
{
833+
createArrowFileIfNotCreated();
758834
}
759835
}
760836

837+
void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_)
838+
{
839+
metadata_cache.key = key_;
840+
metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
841+
metadata_cache.max_size_bytes = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size];
842+
}
843+
761844
void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
762845
{
763846
const bool row_group_prefetch = supportPrefetch();

src/Processors/Formats/Impl/ParquetBlockInputFormat.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "config.h"
33
#if USE_PARQUET
44

5+
#include <Common/CacheBase.h>
56
#include <Processors/Formats/IInputFormat.h>
67
#include <Processors/Formats/ISchemaReader.h>
78
#include <Formats/FormatSettings.h>
@@ -72,6 +73,8 @@ class ParquetBlockInputFormat : public IInputFormat
7273

7374
size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }
7475

76+
void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override;
77+
7578
private:
7679
Chunk read() override;
7780

@@ -90,6 +93,11 @@ class ParquetBlockInputFormat : public IInputFormat
9093

9194
void threadFunction(size_t row_group_batch_idx);
9295

96+
void createArrowFileIfNotCreated();
97+
std::shared_ptr<parquet::FileMetaData> readMetadataFromFile();
98+
99+
std::shared_ptr<parquet::FileMetaData> getFileMetaData();
100+
93101
inline bool supportPrefetch() const;
94102

95103
// Data layout in the file:
@@ -338,6 +346,12 @@ class ParquetBlockInputFormat : public IInputFormat
338346
std::exception_ptr background_exception = nullptr;
339347
std::atomic<int> is_stopped{0};
340348
bool is_initialized = false;
349+
struct Cache
350+
{
351+
String key;
352+
bool use_cache = false;
353+
UInt64 max_size_bytes{0};
354+
} metadata_cache;
341355
};
342356

343357
class ParquetSchemaReader : public ISchemaReader
@@ -356,6 +370,16 @@ class ParquetSchemaReader : public ISchemaReader
356370
std::shared_ptr<parquet::FileMetaData> metadata;
357371
};
358372

373+
class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
374+
{
375+
public:
376+
static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes);
377+
void clear() {}
378+
379+
private:
380+
ParquetFileMetaDataCache(UInt64 max_size_bytes);
381+
};
382+
359383
}
360384

361385
#endif

src/Processors/QueryPlan/ReadFromRemote.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
167167
my_main_table = main_table, my_table_func_ptr = table_func_ptr,
168168
my_scalars = scalars, my_external_tables = external_tables,
169169
my_stage = stage, local_delay = shard.local_delay,
170-
add_agg_info, add_totals, add_extremes, async_read, async_query_sending]() mutable
170+
add_agg_info, add_totals, add_extremes, async_read, async_query_sending,
171+
my_is_remote_function = is_remote_function]() mutable
171172
-> QueryPipelineBuilder
172173
{
173174
auto current_settings = my_context->getSettingsRef();
@@ -221,6 +222,8 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
221222
{DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
222223
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
223224
std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, my_stage);
225+
remote_query_executor->setRemoteFunction(my_is_remote_function);
226+
remote_query_executor->setShardCount(my_shard_count);
224227

225228
auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending);
226229
QueryPipelineBuilder builder;
@@ -304,6 +307,8 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
304307
priority_func);
305308
remote_query_executor->setLogger(log);
306309
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
310+
remote_query_executor->setRemoteFunction(is_remote_function);
311+
remote_query_executor->setShardCount(shard_count);
307312

308313
if (!table_func_ptr)
309314
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
@@ -320,6 +325,8 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
320325
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
321326
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
322327
remote_query_executor->setLogger(log);
328+
remote_query_executor->setRemoteFunction(is_remote_function);
329+
remote_query_executor->setShardCount(shard_count);
323330

324331
if (context->canUseTaskBasedParallelReplicas())
325332
{

src/Processors/QueryPlan/ReadFromRemote.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class ReadFromRemote final : public ISourceStep
4242

4343
void enableMemoryBoundMerging();
4444
void enforceAggregationInOrder();
45+
void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
4546

4647
private:
4748
ClusterProxy::SelectStreamFactory::Shards shards;
@@ -57,6 +58,7 @@ class ReadFromRemote final : public ISourceStep
5758
UInt32 shard_count;
5859
const String cluster_name;
5960
std::optional<GetPriorityForLoadBalancing> priority_func_factory;
61+
bool is_remote_function = false;
6062

6163
void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
6264
void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);

0 commit comments

Comments
 (0)