Skip to content

Commit b649fb6

Browse files
authored
Merge branch 'antalya-25.8' into fix/antalya-25.8/add-local-data-lake-guard
2 parents 51bec47 + 120dd78 commit b649fb6

File tree

13 files changed

+382
-40
lines changed

13 files changed

+382
-40
lines changed

docs/en/engines/table-engines/mergetree-family/part_export.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ In case a table function is used as the destination, the schema can be omitted a
8484
- **Default**: `true`
8585
- **Description**: If set to true, throws if pending patch parts exist for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affect part/partition x, all the other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.
8686

87+
### export_merge_tree_part_filename_pattern
88+
89+
- **Type**: `String`
90+
- **Default**: `{part_name}_{checksum}`
91+
- **Description**: Pattern for the filename of the exported merge tree part. The `part_name` and `checksum` are calculated and replaced on the fly. Additional macros are supported.
92+
8793
## Examples
8894

8995
### Basic Export to S3

docs/en/engines/table-engines/mergetree-family/partition_export.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ TO TABLE [destination_database.]destination_table
8282
- **Default**: `true`
8383
- **Description**: If set to true, throws if pending patch parts exist for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affect part/partition x, all the other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.
8484

85+
### export_merge_tree_part_filename_pattern
86+
87+
- **Type**: `String`
88+
- **Default**: `{part_name}_{checksum}`
89+
- **Description**: Pattern for the filename of the exported merge tree part. The `part_name` and `checksum` are calculated and replaced on the fly. Additional macros are supported.
90+
8591
## Examples
8692

8793
### Basic Export to S3

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6937,6 +6937,9 @@ Serialize String values during aggregation with zero byte at the end. Enable to
69376937
)", 0) \
69386938
DECLARE(Bool, allow_local_data_lakes, false, R"(
69396939
Allow using local data lake engines and table functions (IcebergLocal, DeltaLakeLocal, etc.).
6940+
)", 0) \
6941+
DECLARE(String, export_merge_tree_part_filename_pattern, "{part_name}_{checksum}", R"(
6942+
Pattern for the filename of the exported merge tree part. The `part_name` and `checksum` are calculated and replaced on the fly. Additional macros are supported.
69406943
)", 0) \
69416944
DECLARE(Timezone, iceberg_partition_timezone, "", R"(
69426945
Time zone by which partitioning of Iceberg tables was performed.

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7272
{"iceberg_partition_timezone", "", "", "New setting."},
7373
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
7474
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
75+
{"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting"},
7576
});
7677
addSettingsChanges(settings_changes_history, "25.8",
7778
{

src/Storages/ExportReplicatedMergeTreePartitionManifest.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ struct ExportReplicatedMergeTreePartitionManifest
116116
size_t max_bytes_per_file;
117117
size_t max_rows_per_file;
118118
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
119+
String filename_pattern;
119120

120121
std::string toJsonString() const
121122
{
@@ -141,6 +142,7 @@ struct ExportReplicatedMergeTreePartitionManifest
141142
json.set("create_time", create_time);
142143
json.set("max_retries", max_retries);
143144
json.set("ttl_seconds", ttl_seconds);
145+
json.set("filename_pattern", filename_pattern);
144146
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
145147
oss.exceptions(std::ios::failbit);
146148
Poco::JSON::Stringifier::stringify(json, oss);
@@ -173,6 +175,7 @@ struct ExportReplicatedMergeTreePartitionManifest
173175
manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");
174176
manifest.max_bytes_per_file = json->getValue<size_t>("max_bytes_per_file");
175177
manifest.max_rows_per_file = json->getValue<size_t>("max_rows_per_file");
178+
manifest.filename_pattern = json->getValue<String>("filename_pattern");
176179
if (json->has("file_already_exists_policy"))
177180
{
178181
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313
#include <Processors/QueryPlan/QueryPlan.h>
1414
#include <Processors/QueryPlan/ExpressionStep.h>
1515
#include <QueryPipeline/QueryPipelineBuilder.h>
16+
#include <Common/Macros.h>
1617
#include <Common/Exception.h>
1718
#include <Common/ProfileEventsScope.h>
19+
#include <Databases/DatabaseReplicated.h>
1820
#include <Storages/MergeTree/ExportList.h>
1921
#include <Formats/FormatFactory.h>
2022
#include <Databases/enableAllExperimentalSettings.h>
23+
#include <boost/algorithm/string/replace.hpp>
2124

2225
namespace ProfileEvents
2326
{
@@ -43,6 +46,7 @@ namespace Setting
4346
extern const SettingsUInt64 min_bytes_to_use_direct_io;
4447
extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file;
4548
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
49+
extern const SettingsString export_merge_tree_part_filename_pattern;
4650
}
4751

4852
namespace
@@ -80,6 +84,33 @@ namespace
8084
plan_for_part.addStep(std::move(expression_step));
8185
}
8286
}
87+
88+
/// Builds the destination file name for an exported part.
///
/// Starts from the `export_merge_tree_part_filename_pattern` setting stored in
/// the manifest, substitutes the `{part_name}` and `{checksum}` placeholders
/// with the part's name and total checksum (hex), and then passes the result
/// through the context's macro expansion.
///
/// @param manifest       Export manifest; provides the settings and the data part.
/// @param storage_id     ID of the source table; used as the table id for macro expansion
///                       and to look up the owning database.
/// @param local_context  Context whose macros are used to expand the pattern.
/// @return The expanded file name.
String buildDestinationFilename(
89+
const MergeTreePartExportManifest & manifest,
90+
const StorageID & storage_id,
91+
const ContextPtr & local_context)
92+
{
93+
// Take a copy of the pattern: it is modified in place below.
auto filename = manifest.settings[Setting::export_merge_tree_part_filename_pattern].value;
94+
95+
// Part-specific placeholders are substituted here, before the macro expansion below.
boost::replace_all(filename, "{part_name}", manifest.data_part->name);
96+
boost::replace_all(filename, "{checksum}", manifest.data_part->checksums.getTotalChecksumHex());
97+
98+
Macros::MacroExpansionInfo macro_info;
99+
macro_info.table_id = storage_id;
100+
101+
// Shard and replica names are filled in only when the database exists and is a DatabaseReplicated.
if (auto database = DatabaseCatalog::instance().tryGetDatabase(storage_id.database_name))
102+
{
103+
if (const auto replicated = dynamic_cast<const DatabaseReplicated *>(database.get()))
104+
{
105+
macro_info.shard = replicated->getShardName();
106+
macro_info.replica = replicated->getReplicaName();
107+
}
108+
}
109+
110+
// Expand the remaining macros using the context's macro set.
// NOTE(review): behaviour for unknown macros depends on Macros::expand — confirm.
filename = local_context->getMacros()->expand(filename, macro_info);
111+
112+
return filename;
113+
}
83114
}
84115

85116
ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_)
@@ -136,8 +167,10 @@ bool ExportPartTask::executeStep()
136167

137168
try
138169
{
170+
const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context);
171+
139172
sink = destination_storage->import(
140-
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
173+
filename,
141174
block_with_partition_values,
142175
new_file_path_callback,
143176
manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite,

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ namespace
3535
context_copy->setSetting("export_merge_tree_part_throw_on_pending_mutations", false);
3636
context_copy->setSetting("export_merge_tree_part_throw_on_pending_patch_parts", false);
3737

38+
context_copy->setSetting("export_merge_tree_part_filename_pattern", manifest.filename_pattern);
39+
3840
return context_copy;
3941
}
4042
}

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ namespace Setting
206206
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
207207
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
208208
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
209+
extern const SettingsString export_merge_tree_part_filename_pattern;
209210
}
210211

211212
namespace MergeTreeSetting
@@ -8277,6 +8278,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
82778278
manifest.max_rows_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_rows_per_file];
82788279

82798280
manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value;
8281+
manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value;
82808282

82818283
ops.emplace_back(zkutil::makeCreateRequest(
82828284
fs::path(partition_exports_path) / "metadata.json",
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
<clickhouse>
2+
<macros>
3+
<shard>shard1</shard>
4+
<replica>replica1</replica>
5+
</macros>
6+
</clickhouse>
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
<clickhouse>
2+
<macros>
3+
<shard>shard2</shard>
4+
<replica>replica1</replica>
5+
</macros>
6+
</clickhouse>

0 commit comments

Comments
 (0)