Skip to content

Commit 7267d7c

Browse files
committed
simplify reading in order of sorting key
1 parent d6e6921 commit 7267d7c

File tree

2 files changed

+6
-41
lines changed

2 files changed

+6
-41
lines changed

src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp

Lines changed: 5 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -639,25 +639,16 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
639639
}
640640
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && query_info.input_order_info)
641641
{
642-
size_t prefix_size = query_info.input_order_info->order_key_prefix_descr.size();
643-
auto order_key_prefix_ast = data.getSortingKey().expression_list_ast->clone();
644-
order_key_prefix_ast->children.resize(prefix_size);
645-
646-
auto syntax_result = SyntaxAnalyzer(context).analyze(order_key_prefix_ast, data.getColumns().getAllPhysical());
647-
auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActions(false);
648-
649642
res = spreadMarkRangesAmongStreamsWithOrder(
650643
std::move(parts_with_ranges),
651644
num_streams,
652645
column_names_to_read,
653646
max_block_size,
654647
settings.use_uncompressed_cache,
655648
query_info,
656-
sorting_key_prefix_expr,
657649
virt_column_names,
658650
settings,
659-
reader_settings,
660-
result_projection);
651+
reader_settings);
661652
}
662653
else
663654
{
@@ -848,11 +839,9 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
848839
UInt64 max_block_size,
849840
bool use_uncompressed_cache,
850841
const SelectQueryInfo & query_info,
851-
const ExpressionActionsPtr & sorting_key_prefix_expr,
852842
const Names & virt_columns,
853843
const Settings & settings,
854-
const MergeTreeReaderSettings & reader_settings,
855-
ExpressionActionsPtr & out_projection) const
844+
const MergeTreeReaderSettings & reader_settings) const
856845
{
857846
size_t sum_marks = 0;
858847
const InputOrderInfoPtr & input_order_info = query_info.input_order_info;
@@ -945,8 +934,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
945934
{
946935
size_t need_marks = min_marks_per_stream;
947936

948-
Pipes pipes;
949-
950937
/// Loop over parts.
951938
/// We will iteratively take part or some subrange of a part from the back
952939
/// and assign a stream to read from it.
@@ -1003,43 +990,23 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
1003990

1004991
if (input_order_info->direction == 1)
1005992
{
1006-
pipes.emplace_back(std::make_shared<MergeTreeSelectProcessor>(
993+
res.emplace_back(std::make_shared<MergeTreeSelectProcessor>(
1007994
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
1008995
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
1009996
use_uncompressed_cache, query_info.prewhere_info, true, reader_settings,
1010997
virt_columns, part.part_index_in_query));
1011998
}
1012999
else
10131000
{
1014-
pipes.emplace_back(std::make_shared<MergeTreeReverseSelectProcessor>(
1001+
res.emplace_back(std::make_shared<MergeTreeReverseSelectProcessor>(
10151002
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
10161003
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
10171004
use_uncompressed_cache, query_info.prewhere_info, true, reader_settings,
10181005
virt_columns, part.part_index_in_query));
10191006

1020-
pipes.back().addSimpleTransform(std::make_shared<ReverseTransform>(pipes.back().getHeader()));
1007+
res.back().addSimpleTransform(std::make_shared<ReverseTransform>(res.back().getHeader()));
10211008
}
10221009
}
1023-
1024-
if (pipes.size() > 1)
1025-
{
1026-
SortDescription sort_description;
1027-
for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j)
1028-
sort_description.emplace_back(data.getSortingKey().column_names[j],
1029-
input_order_info->direction, 1);
1030-
1031-
/// Drop temporary columns, added by 'sorting_key_prefix_expr'
1032-
out_projection = createProjection(pipes.back(), data);
1033-
for (auto & pipe : pipes)
1034-
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), sorting_key_prefix_expr));
1035-
1036-
auto merging_sorted = std::make_shared<MergingSortedTransform>(
1037-
pipes.back().getHeader(), pipes.size(), sort_description, max_block_size);
1038-
1039-
res.emplace_back(std::move(pipes), std::move(merging_sorted));
1040-
}
1041-
else
1042-
res.emplace_back(std::move(pipes.front()));
10431010
}
10441011

10451012
return res;

src/Storages/MergeTree/MergeTreeDataSelectExecutor.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,9 @@ class MergeTreeDataSelectExecutor
6565
UInt64 max_block_size,
6666
bool use_uncompressed_cache,
6767
const SelectQueryInfo & query_info,
68-
const ExpressionActionsPtr & sorting_key_prefix_expr,
6968
const Names & virt_columns,
7069
const Settings & settings,
71-
const MergeTreeReaderSettings & reader_settings,
72-
ExpressionActionsPtr & out_projection) const;
70+
const MergeTreeReaderSettings & reader_settings) const;
7371

7472
Pipes spreadMarkRangesAmongStreamsFinal(
7573
RangesInDataParts && parts,

0 commit comments

Comments
 (0)