Skip to content

Commit 2310269

Browse files
committed
Add async metadata prefetch and staleness for Iceberg
1 parent c3ce959 commit 2310269

File tree

19 files changed

+614
-95
lines changed

19 files changed

+614
-95
lines changed

src/Common/CurrentMetrics.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,8 @@
211211
M(IcebergCatalogThreads, "Number of threads in the IcebergCatalog thread pool.") \
212212
M(IcebergCatalogThreadsActive, "Number of threads in the IcebergCatalog thread pool running a task.") \
213213
M(IcebergCatalogThreadsScheduled, "Number of queued or active jobs in the IcebergCatalog thread pool.") \
214+
M(IcebergSchedulePoolTask, "Number of tasks in the background schedule pool for Iceberg tables.") \
215+
M(IcebergSchedulePoolSize, "Limit on number of tasks in the background schedule pool for Iceberg tables.") \
214216
M(ParallelWithQueryThreads, "Number of threads in the threadpool for processing PARALLEL WITH queries.") \
215217
M(ParallelWithQueryActiveThreads, "Number of active threads in the threadpool for processing PARALLEL WITH queries.") \
216218
M(ParallelWithQueryScheduledThreads, "Number of queued or active jobs in the threadpool for processing PARALLEL WITH queries.") \

src/Common/ProfileEvents.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
M(PrimaryIndexCacheMisses, "Number of times an entry has not been found in the primary index cache, so we had to load a index file in memory, which is a costly operation, adding to query latency.", ValueType::Number) \
9898
M(IcebergMetadataFilesCacheHits, "Number of times iceberg metadata files have been found in the cache.", ValueType::Number) \
9999
M(IcebergMetadataFilesCacheMisses, "Number of times iceberg metadata files have not been found in the iceberg metadata cache and had to be read from (remote) disk.", ValueType::Number) \
100+
M(IcebergMetadataFilesCacheStaleMisses, "Number of times iceberg metadata files have been found in the cache, but were considered stale and had to be read from (remote) disk.", ValueType::Number) \
100101
M(IcebergMetadataFilesCacheWeightLost, "Approximate number of bytes evicted from the iceberg metadata cache.", ValueType::Number) \
101102
M(IcebergMetadataReadWaitTimeMicroseconds, "Total time data readers spend waiting for iceberg metadata files to be read and parsed, summed across all reader threads.", ValueType::Microseconds) \
102103
M(ParquetMetadataCacheHits, "Number of times parquet metadata has been found in the cache.", ValueType::Number) \

src/Common/setThreadName.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ namespace DB
6666
M(HASHED_DICT_LOAD, "HashedDictLoad") \
6767
M(HTTP_HANDLER, "HTTPHandler") \
6868
M(ICEBERG_ITERATOR, "IcebergIter") \
69+
M(ICEBERG_SCHEDULE_POOL, "IcebergSchPool") \
6970
M(INTERSERVER_HANDLER, "IntersrvHandler") \
7071
M(IO_URING_MONITOR, "IoUringMonitr") \
7172
M(KEEPER_HANDLER, "KeeperHandler") \

src/Core/Defines.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ static constexpr auto DEFAULT_ICEBERG_METADATA_CACHE_POLICY = "SLRU";
119119
static constexpr auto DEFAULT_ICEBERG_METADATA_CACHE_MAX_SIZE = 1_GiB;
120120
static constexpr auto DEFAULT_ICEBERG_METADATA_CACHE_SIZE_RATIO = 0.5;
121121
static constexpr auto DEFAULT_ICEBERG_METADATA_CACHE_MAX_ENTRIES = 1000;
122+
static constexpr auto DEFAULT_ICEBERG_METADATA_ASYNC_REFRESH_PERIOD = 10'000;
122123
static constexpr auto DEFAULT_PARQUET_METADATA_CACHE_POLICY = "SLRU";
123124
static constexpr auto DEFAULT_PARQUET_METADATA_CACHE_MAX_SIZE = 512_MiB;
124125
static constexpr auto DEFAULT_PARQUET_METADATA_CACHE_SIZE_RATIO = 0.5;

src/Core/ServerSettings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,6 +1159,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
11591159
DECLARE(UInt64, threadpool_writer_queue_size, 10000, R"(Number of tasks which is possible to push into background pool for write requests to object storages)", 0) \
11601160
DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, R"(Size of background pool for iceberg catalog)", 0) \
11611161
DECLARE(UInt64, iceberg_catalog_threadpool_queue_size, 10000, R"(Number of tasks which is possible to push into iceberg catalog pool)", 0) \
1162+
DECLARE(UInt64, iceberg_background_schedule_pool_size, 10, "Size of thread pool to asynchronously fetch the latest metadata from a remote iceberg catalog; the pool is shared by all the active tables.", 0) \
11621163
DECLARE(UInt64, drop_distributed_cache_pool_size, 8, R"(The size of the threadpool used for dropping distributed cache.)", 0) \
11631164
DECLARE(UInt64, drop_distributed_cache_queue_size, 1000, R"(The queue size of the threadpool used for dropping distributed cache.)", 0) \
11641165
DECLARE(Bool, distributed_cache_apply_throttling_settings_from_client, true, R"(Whether cache server should apply throttling settings received from client.)", 0) \

src/Core/Settings.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5202,6 +5202,10 @@ Possible values:
52025202
52035203
- 0 - Disabled
52045204
- 1 - Enabled
5205+
)", 0) \
5206+
DECLARE(UInt64, iceberg_metadata_staleness_seconds, 0, R"(
5207+
If non-zero, skip fetching iceberg metadata from remote catalog if the cached metadata was refreshed within the given period.
5208+
Zero means to always fetch the latest metadata. Setting this a non-zero trades metadata freshness to much lower latency.
52055209
)", 0) \
52065210
DECLARE(Bool, use_parquet_metadata_cache, true, R"(
52075211
If turned on, parquet format may utilize the parquet metadata cache.
@@ -5211,7 +5215,6 @@ Possible values:
52115215
- 0 - Disabled
52125216
- 1 - Enabled
52135217
)", 0) \
5214-
\
52155218
DECLARE(Bool, use_query_cache, false, R"(
52165219
If turned on, `SELECT` queries may utilize the [query cache](../query-cache.md). Parameters [enable_reads_from_query_cache](#enable_reads_from_query_cache)
52175220
and [enable_writes_to_query_cache](#enable_writes_to_query_cache) control in more detail how the cache is used.

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
5050
{"use_partition_pruning", true, true, "New setting controlling whether MergeTree uses partition key for pruning. 'use_partition_key' is an alias for this setting."},
5151
{"use_partition_key", true, true, "Alias for setting 'use_partition_pruning'."},
5252
{"type_json_allow_duplicated_key_with_literal_and_nested_object", false, true, "Allow duplicated paths in JSON type with literal and nested object by default"},
53+
{"iceberg_metadata_staleness_seconds", 0, 0, "New setting to allow reading of stale iceberg metadata without accessing remote catalog"},
5354
{"webassembly_udf_max_fuel", 100'000, 100'000, "New setting to limit CPU instructions (fuel) per WebAssembly UDF instance execution."},
5455
{"webassembly_udf_max_memory", 128_MiB, 128_MiB, "New setting to limit memory per WebAssembly UDF instance."},
5556
{"webassembly_udf_max_input_block_size", 0, 0, "New setting to limit input block size for WebAssembly UDFs."},

src/Interpreters/Context.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,8 @@ namespace CurrentMetrics
223223
extern const Metric BackgroundFetchesPoolSize;
224224
extern const Metric BackgroundCommonPoolTask;
225225
extern const Metric BackgroundCommonPoolSize;
226+
extern const Metric IcebergSchedulePoolTask;
227+
extern const Metric IcebergSchedulePoolSize;
226228
extern const Metric MarksLoaderThreads;
227229
extern const Metric MarksLoaderThreadsActive;
228230
extern const Metric MarksLoaderThreadsScheduled;
@@ -353,6 +355,7 @@ namespace ServerSetting
353355
extern const ServerSettingsFloat background_merges_mutations_concurrency_ratio;
354356
extern const ServerSettingsString background_merges_mutations_scheduling_policy;
355357
extern const ServerSettingsUInt64 background_message_broker_schedule_pool_size;
358+
extern const ServerSettingsUInt64 iceberg_background_schedule_pool_size;
356359
extern const ServerSettingsUInt64 background_move_pool_size;
357360
extern const ServerSettingsUInt64 background_pool_size;
358361
extern const ServerSettingsUInt64 background_schedule_pool_size;
@@ -577,6 +580,8 @@ struct ContextSharedPart : boost::noncopyable
577580
mutable BackgroundSchedulePoolPtr distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
578581
OnceFlag message_broker_schedule_pool_initialized;
579582
mutable BackgroundSchedulePoolPtr message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
583+
OnceFlag iceberg_schedule_pool_initialized;
584+
mutable BackgroundSchedulePoolPtr iceberg_schedule_pool; /// A thread pool that runs background metadata refresh for all active Iceberg tables
580585

581586
mutable OnceFlag readers_initialized;
582587
mutable std::unique_ptr<IAsynchronousReader> asynchronous_remote_fs_reader;
@@ -933,6 +938,7 @@ struct ContextSharedPart : boost::noncopyable
933938
BackgroundSchedulePoolPtr delete_schedule_pool;
934939
BackgroundSchedulePoolPtr delete_distributed_schedule_pool;
935940
BackgroundSchedulePoolPtr delete_message_broker_schedule_pool;
941+
BackgroundSchedulePoolPtr delete_iceberg_schedule_pool;
936942

937943
std::unique_ptr<AccessControl> delete_access_control;
938944

@@ -1011,6 +1017,7 @@ struct ContextSharedPart : boost::noncopyable
10111017
delete_schedule_pool = std::move(schedule_pool);
10121018
delete_distributed_schedule_pool = std::move(distributed_schedule_pool);
10131019
delete_message_broker_schedule_pool = std::move(message_broker_schedule_pool);
1020+
delete_iceberg_schedule_pool = std::move(iceberg_schedule_pool);
10141021

10151022
delete_access_control = std::move(access_control);
10161023

@@ -1067,6 +1074,7 @@ struct ContextSharedPart : boost::noncopyable
10671074
join_background_pool(std::move(delete_schedule_pool));
10681075
join_background_pool(std::move(delete_distributed_schedule_pool));
10691076
join_background_pool(std::move(delete_message_broker_schedule_pool));
1077+
join_background_pool(std::move(delete_iceberg_schedule_pool));
10701078

10711079
delete_access_control.reset();
10721080

@@ -4609,6 +4617,20 @@ BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
46094617
return *shared->message_broker_schedule_pool;
46104618
}
46114619

4620+
BackgroundSchedulePool & Context::getIcebergSchedulePool() const
4621+
{
4622+
callOnce(shared->iceberg_schedule_pool_initialized, [&] {
4623+
shared->iceberg_schedule_pool = BackgroundSchedulePool::create(
4624+
shared->server_settings[ServerSetting::iceberg_background_schedule_pool_size],
4625+
/*max_parallel_tasks_per_type*/ 0,
4626+
CurrentMetrics::IcebergSchedulePoolTask,
4627+
CurrentMetrics::IcebergSchedulePoolSize,
4628+
DB::ThreadName::ICEBERG_SCHEDULE_POOL);
4629+
});
4630+
4631+
return *shared->iceberg_schedule_pool;
4632+
}
4633+
46124634
void Context::configureServerWideThrottling()
46134635
{
46144636
if (shared->application_type == ApplicationType::LOCAL || shared->application_type == ApplicationType::SERVER || shared->application_type == ApplicationType::DISKS)

src/Interpreters/Context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1459,6 +1459,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
14591459
BackgroundSchedulePool & getSchedulePool() const;
14601460
BackgroundSchedulePool & getMessageBrokerSchedulePool() const;
14611461
BackgroundSchedulePool & getDistributedSchedulePool() const;
1462+
BackgroundSchedulePool & getIcebergSchedulePool() const;
14621463

14631464
/// Has distributed_ddl configuration or not.
14641465
bool hasDistributedDDL() const;

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -398,10 +398,10 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
398398
}
399399

400400
private:
401-
DataLakeMetadataPtr current_metadata;
402-
LoggerPtr log = getLogger("DataLakeConfiguration");
403401
const DataLakeStorageSettingsPtr settings;
404402
ObjectStoragePtr ready_object_storage;
403+
DataLakeMetadataPtr current_metadata;
404+
LoggerPtr log = getLogger("DataLakeConfiguration");
405405

406406
void assertLocalPathCorrect(ObjectStoragePtr object_storage, ContextPtr local_context)
407407
{

0 commit comments

Comments
 (0)