Skip to content

Commit 783cf36

Browse files
authored
Merge pull request #1320 from Altinity/accept_table_function_as_destination_for_part_export
Accept table function as destination for part export
2 parents f2260bb + 52dc5b9 commit 783cf36

17 files changed

+250
-44
lines changed

docs/en/engines/table-engines/mergetree-family/part_export.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@ SETTINGS allow_experimental_export_merge_tree_part = 1
2121
[, setting_name = value, ...]
2222
```
2323

24+
## Syntax with table function
25+
26+
```sql
27+
ALTER TABLE [database.]table_name
28+
EXPORT PART 'part_name'
29+
TO TABLE FUNCTION s3(s3_conn, filename='table_function', partition_strategy...)
30+
SETTINGS allow_experimental_export_merge_tree_part = 1
31+
[, setting_name = value, ...]
32+
```
33+
2434
### Parameters
2535

2636
- **`table_name`**: The source MergeTree table containing the part to export
@@ -34,6 +44,8 @@ Source and destination tables must be 100% compatible:
3444
1. **Identical schemas** - same columns, types, and order
3545
2. **Matching partition keys** - partition expressions must be identical
3646

47+
In case a table function is used as the destination, the schema can be omitted and it will be inferred from the source table.
48+
3749
## Settings
3850

3951
### `allow_experimental_export_merge_tree_part` (Required)
@@ -95,6 +107,20 @@ ALTER TABLE mt_table EXPORT PART '2021_2_2_0' TO TABLE s3_table
95107
SETTINGS allow_experimental_export_merge_tree_part = 1;
96108
```
97109

110+
### Table function export
111+
112+
```sql
113+
-- Create source and destination tables
114+
CREATE TABLE mt_table (id UInt64, year UInt16)
115+
ENGINE = MergeTree() PARTITION BY year ORDER BY tuple();
116+
117+
-- Insert and export
118+
INSERT INTO mt_table VALUES (1, 2020), (2, 2020), (3, 2021);
119+
120+
ALTER TABLE mt_table EXPORT PART '2020_1_1_0' TO TABLE FUNCTION s3(s3_conn, filename='table_function', format=Parquet, partition_strategy='hive') PARTITION BY year
121+
SETTINGS allow_experimental_export_merge_tree_part = 1;
122+
```
123+
98124
## Monitoring
99125

100126
### Active Exports

src/Interpreters/InterpreterAlterQuery.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,9 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
542542
case ASTAlterCommand::EXPORT_PART:
543543
{
544544
required_access.emplace_back(AccessType::ALTER_EXPORT_PART, database, table);
545-
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
545+
/// For table functions, access control is handled by the table function itself
546+
if (!command.to_table_function)
547+
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
546548
break;
547549
}
548550
case ASTAlterCommand::EXPORT_PARTITION:

src/Parsers/ASTAlterQuery.cpp

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ ASTPtr ASTAlterCommand::clone() const
6565
res->sql_security = res->children.emplace_back(sql_security->clone()).get();
6666
if (rename_to)
6767
res->rename_to = res->children.emplace_back(rename_to->clone()).get();
68+
if (to_table_function)
69+
res->to_table_function = res->children.emplace_back(to_table_function->clone()).get();
70+
if (partition_by_expr)
71+
res->partition_by_expr = res->children.emplace_back(partition_by_expr->clone()).get();
6872

6973
return res;
7074
}
@@ -367,11 +371,23 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
367371
{
368372
case DataDestinationType::TABLE:
369373
ostr << "TABLE ";
370-
if (!to_database.empty())
374+
if (to_table_function)
375+
{
376+
ostr << "FUNCTION ";
377+
to_table_function->format(ostr, settings, state, frame);
378+
if (partition_by_expr)
379+
{
380+
ostr << " PARTITION BY ";
381+
partition_by_expr->format(ostr, settings, state, frame);
382+
}
383+
}
384+
else
371385
{
372-
ostr << backQuoteIfNeed(to_database) << ".";
386+
if (!to_database.empty())
387+
ostr << backQuoteIfNeed(to_database) << ".";
388+
389+
ostr << backQuoteIfNeed(to_table);
373390
}
374-
ostr << backQuoteIfNeed(to_table);
375391
return;
376392
default:
377393
break;
@@ -584,6 +600,8 @@ void ASTAlterCommand::forEachPointerToChild(std::function<void(void**)> f)
584600
f(reinterpret_cast<void **>(&select));
585601
f(reinterpret_cast<void **>(&sql_security));
586602
f(reinterpret_cast<void **>(&rename_to));
603+
f(reinterpret_cast<void **>(&to_table_function));
604+
f(reinterpret_cast<void **>(&partition_by_expr));
587605
}
588606

589607

src/Parsers/ASTAlterQuery.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,9 @@ class ASTAlterCommand : public IAST
220220
/// MOVE PARTITION partition TO TABLE db.table
221221
String to_database;
222222
String to_table;
223+
/// EXPORT PART/PARTITION to TABLE FUNCTION (e.g., s3())
224+
IAST * to_table_function = nullptr;
225+
IAST * partition_by_expr = nullptr;
223226

224227
String snapshot_name;
225228
IAST * snapshot_desc;

src/Parsers/ParserAlterQuery.cpp

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
9393
ParserKeyword s_unfreeze(Keyword::UNFREEZE);
9494
ParserKeyword s_unlock_snapshot(Keyword::UNLOCK_SNAPSHOT);
9595
ParserKeyword s_partition(Keyword::PARTITION);
96+
ParserKeyword s_partition_by(Keyword::PARTITION_BY);
9697

9798
ParserKeyword s_first(Keyword::FIRST);
9899
ParserKeyword s_after(Keyword::AFTER);
@@ -107,6 +108,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
107108
ParserKeyword s_to_volume(Keyword::TO_VOLUME);
108109
ParserKeyword s_to_table(Keyword::TO_TABLE);
109110
ParserKeyword s_to_shard(Keyword::TO_SHARD);
111+
ParserKeyword s_function(Keyword::FUNCTION);
110112

111113
ParserKeyword s_delete(Keyword::DELETE);
112114
ParserKeyword s_update(Keyword::UPDATE);
@@ -176,6 +178,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
176178
ASTPtr command_rename_to;
177179
ASTPtr command_sql_security;
178180
ASTPtr command_snapshot_desc;
181+
ASTPtr export_table_function;
182+
ASTPtr export_table_function_partition_by_expr;
179183

180184
if (with_round_bracket)
181185
{
@@ -550,9 +554,27 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
550554
return false;
551555
}
552556

553-
if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
554-
return false;
555-
command->move_destination_type = DataDestinationType::TABLE;
557+
if (s_function.ignore(pos, expected))
558+
{
559+
ParserFunction table_function_parser(/*allow_function_parameters=*/true, /*is_table_function=*/true);
560+
561+
if (!table_function_parser.parse(pos, export_table_function, expected))
562+
return false;
563+
564+
if (s_partition_by.ignore(pos, expected))
565+
if (!parser_exp_elem.parse(pos, export_table_function_partition_by_expr, expected))
566+
return false;
567+
568+
command->to_table_function = export_table_function.get();
569+
command->partition_by_expr = export_table_function_partition_by_expr.get();
570+
command->move_destination_type = DataDestinationType::TABLE;
571+
}
572+
else
573+
{
574+
if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
575+
return false;
576+
command->move_destination_type = DataDestinationType::TABLE;
577+
}
556578
}
557579
else if (s_export_partition.ignore(pos, expected))
558580
{
@@ -1061,6 +1083,10 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
10611083
command->rename_to = command->children.emplace_back(std::move(command_rename_to)).get();
10621084
if (command_snapshot_desc)
10631085
command->snapshot_desc = command->children.emplace_back(std::move(command_snapshot_desc)).get();
1086+
if (export_table_function)
1087+
command->to_table_function = command->children.emplace_back(std::move(export_table_function)).get();
1088+
if (export_table_function_partition_by_expr)
1089+
command->partition_by_expr = command->children.emplace_back(std::move(export_table_function_partition_by_expr)).get();
10641090

10651091
return true;
10661092
}

src/Processors/Formats/Impl/Parquet/ReadManager.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,10 @@ ReadManager::ReadResult ReadManager::read()
904904
{
905905
/// Pump the manual executor.
906906
lock.unlock();
907-
if (!parser_shared_resources->parsing_runner.runTaskInline())
907+
/// Note: the executor can be shared among multiple files, so we may execute someone
908+
/// else's task, and someone else may execute our task.
909+
/// Hence the thread_pool_was_idle check.
910+
if (!parser_shared_resources->parsing_runner.runTaskInline() && thread_pool_was_idle)
908911
throw Exception(ErrorCodes::LOGICAL_ERROR, "Deadlock in Parquet::ReadManager (single-threaded)");
909912
lock.lock();
910913
}

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -110,19 +110,12 @@ bool ExportPartTask::executeStep()
110110
block_with_partition_values = manifest.data_part->minmax_idx->getBlock(storage);
111111
}
112112

113-
auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, local_context);
114-
if (!destination_storage)
115-
{
116-
std::lock_guard inner_lock(storage.export_manifests_mutex);
117-
118-
const auto destination_storage_id_name = manifest.destination_storage_id.getNameForLogs();
119-
storage.export_manifests.erase(manifest);
120-
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name);
121-
}
113+
const auto & destination_storage = manifest.destination_storage_ptr;
114+
const auto destination_storage_id = destination_storage->getStorageID();
122115

123116
auto exports_list_entry = storage.getContext()->getExportsList().insert(
124117
getStorageID(),
125-
manifest.destination_storage_id,
118+
destination_storage_id,
126119
manifest.data_part->getBytesOnDisk(),
127120
manifest.data_part->name,
128121
std::vector<std::string>{},

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <Storages/MergeTree/ExportList.h>
55
#include <Access/AccessControl.h>
6+
#include <TableFunctions/TableFunctionFactory.h>
67
#include <AggregateFunctions/AggregateFunctionCount.h>
78
#include <Analyzer/QueryTreeBuilder.h>
89
#include <Analyzer/Utils.h>
@@ -6215,9 +6216,45 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP
62156216

62166217
const auto part_name = command.partition->as<ASTLiteral &>().value.safeGet<String>();
62176218

6218-
const auto database_name = query_context->resolveDatabase(command.to_database);
6219+
if (!command.to_table_function)
6220+
{
6221+
const auto database_name = query_context->resolveDatabase(command.to_database);
6222+
exportPartToTable(part_name, StorageID{database_name, command.to_table}, generateSnowflakeIDString(), query_context);
6223+
6224+
return;
6225+
}
6226+
6227+
auto table_function_ast = command.to_table_function;
6228+
auto table_function_ptr = TableFunctionFactory::instance().get(command.to_table_function, query_context);
6229+
6230+
if (table_function_ptr->needStructureHint())
6231+
{
6232+
const auto source_metadata_ptr = getInMemoryMetadataPtr();
6233+
6234+
/// Grab only the readable columns from the source metadata to skip ephemeral columns
6235+
const auto readable_columns = ColumnsDescription(source_metadata_ptr->getColumns().getReadable());
6236+
table_function_ptr->setStructureHint(readable_columns);
6237+
}
6238+
6239+
if (command.partition_by_expr)
6240+
{
6241+
table_function_ptr->setPartitionBy(command.partition_by_expr);
6242+
}
6243+
6244+
auto dest_storage = table_function_ptr->execute(
6245+
table_function_ast,
6246+
query_context,
6247+
table_function_ptr->getName(),
6248+
/* cached_columns */ {},
6249+
/* use_global_context */ false,
6250+
/* is_insert_query */ true);
6251+
6252+
if (!dest_storage)
6253+
{
6254+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failed to reconstruct destination storage");
6255+
}
62196256

6220-
exportPartToTable(part_name, StorageID{database_name, command.to_table}, generateSnowflakeIDString(), query_context);
6257+
exportPartToTable(part_name, dest_storage, generateSnowflakeIDString(), query_context);
62216258
}
62226259

62236260
void MergeTreeData::exportPartToTable(
@@ -6235,6 +6272,17 @@ void MergeTreeData::exportPartToTable(
62356272
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Exporting to the same table is not allowed");
62366273
}
62376274

6275+
exportPartToTable(part_name, dest_storage, transaction_id, query_context, allow_outdated_parts, completion_callback);
6276+
}
6277+
6278+
void MergeTreeData::exportPartToTable(
6279+
const std::string & part_name,
6280+
const StoragePtr & dest_storage,
6281+
const String & transaction_id,
6282+
ContextPtr query_context,
6283+
bool allow_outdated_parts,
6284+
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback)
6285+
{
62386286
if (!dest_storage->supportsImport())
62396287
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Destination storage {} does not support MergeTree parts or uses unsupported partitioning", dest_storage->getName());
62406288

@@ -6304,7 +6352,7 @@ void MergeTreeData::exportPartToTable(
63046352
{
63056353
const auto format_settings = getFormatSettings(query_context);
63066354
MergeTreePartExportManifest manifest(
6307-
dest_storage->getStorageID(),
6355+
dest_storage,
63086356
part,
63096357
transaction_id,
63106358
query_context->getCurrentQueryId(),
@@ -6317,8 +6365,7 @@ void MergeTreeData::exportPartToTable(
63176365

63186366
if (!export_manifests.emplace(std::move(manifest)).second)
63196367
{
6320-
throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'",
6321-
part_name, dest_storage->getStorageID().getFullTableName());
6368+
throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported", part_name);
63226369
}
63236370
}
63246371

@@ -8751,8 +8798,9 @@ std::vector<MergeTreeExportStatus> MergeTreeData::getExportsStatus() const
87518798

87528799
status.source_database = source_database;
87538800
status.source_table = source_table;
8754-
status.destination_database = manifest.destination_storage_id.database_name;
8755-
status.destination_table = manifest.destination_storage_id.table_name;
8801+
const auto destination_storage_id = manifest.destination_storage_ptr->getStorageID();
8802+
status.destination_database = destination_storage_id.database_name;
8803+
status.destination_table = destination_storage_id.table_name;
87568804
status.create_time = manifest.create_time;
87578805
status.part_name = manifest.data_part->name;
87588806

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,14 @@ class MergeTreeData : public IStorage, public WithMutableContext
984984

985985
void exportPartToTable(const PartitionCommand & command, ContextPtr query_context);
986986

987+
void exportPartToTable(
988+
const std::string & part_name,
989+
const StoragePtr & destination_storage,
990+
const String & transaction_id,
991+
ContextPtr query_context,
992+
bool allow_outdated_parts = false,
993+
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback = {});
994+
987995
void exportPartToTable(
988996
const std::string & part_name,
989997
const StorageID & destination_storage_id,

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <QueryPipeline/QueryPipeline.h>
77
#include <optional>
88
#include <Core/Settings.h>
9+
#include <Storages/IStorage.h>
910

1011
namespace DB
1112
{
@@ -43,15 +44,15 @@ struct MergeTreePartExportManifest
4344
};
4445

4546
MergeTreePartExportManifest(
46-
const StorageID & destination_storage_id_,
47+
const StoragePtr destination_storage_ptr_,
4748
const DataPartPtr & data_part_,
4849
const String & transaction_id_,
4950
const String & query_id_,
5051
FileAlreadyExistsPolicy file_already_exists_policy_,
5152
const Settings & settings_,
5253
const StorageMetadataPtr & metadata_snapshot_,
5354
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
54-
: destination_storage_id(destination_storage_id_),
55+
: destination_storage_ptr(destination_storage_ptr_),
5556
data_part(data_part_),
5657
transaction_id(transaction_id_),
5758
query_id(query_id_),
@@ -61,7 +62,7 @@ struct MergeTreePartExportManifest
6162
completion_callback(completion_callback_),
6263
create_time(time(nullptr)) {}
6364

64-
StorageID destination_storage_id;
65+
StoragePtr destination_storage_ptr;
6566
DataPartPtr data_part;
6667
/// Used for killing the export.
6768
String transaction_id;
@@ -81,20 +82,12 @@ struct MergeTreePartExportManifest
8182

8283
bool operator<(const MergeTreePartExportManifest & rhs) const
8384
{
84-
// Lexicographic comparison: first compare destination storage, then part name
85-
auto lhs_storage = destination_storage_id.getQualifiedName();
86-
auto rhs_storage = rhs.destination_storage_id.getQualifiedName();
87-
88-
if (lhs_storage != rhs_storage)
89-
return lhs_storage < rhs_storage;
90-
9185
return data_part->name < rhs.data_part->name;
9286
}
9387

9488
bool operator==(const MergeTreePartExportManifest & rhs) const
9589
{
96-
return destination_storage_id.getQualifiedName() == rhs.destination_storage_id.getQualifiedName()
97-
&& data_part->name == rhs.data_part->name;
90+
return data_part->name == rhs.data_part->name;
9891
}
9992
};
10093

0 commit comments

Comments
 (0)