Skip to content

Commit 801c1c1

Browse files
authored
Merge pull request #11205 from ClickHouse/fix-view-totals
Fix totals and extremes header for view.
2 parents bd795c8 + 8074e20 commit 801c1c1

File tree

6 files changed

+46
-14
lines changed

6 files changed

+46
-14
lines changed

src/Processors/Pipe.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class Pipe
2323
/// Will connect pipes outputs with transform inputs automatically.
2424
Pipe(Pipes && pipes, ProcessorPtr transform);
2525
/// Create pipe from output port. If pipe was created that way, it possibly will not have tree shape.
26-
Pipe(OutputPort * port);
26+
explicit Pipe(OutputPort * port);
2727

2828
Pipe(const Pipe & other) = delete;
2929
Pipe(Pipe && other) = default;

src/Processors/QueryPipeline.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,11 @@ void QueryPipeline::initRowsBeforeLimit()
758758
Pipe QueryPipeline::getPipe() &&
759759
{
760760
resize(1);
761+
return std::move(std::move(*this).getPipes()[0]);
762+
}
763+
764+
Pipes QueryPipeline::getPipes() &&
765+
{
761766
Pipe pipe(std::move(processors), streams.at(0), totals_having_port, extremes_port);
762767
pipe.max_parallel_streams = streams.maxParallelStreams();
763768

@@ -776,7 +781,13 @@ Pipe QueryPipeline::getPipe() &&
776781
if (extremes_port)
777782
pipe.setExtremesPort(extremes_port);
778783

779-
return pipe;
784+
Pipes pipes;
785+
pipes.emplace_back(std::move(pipe));
786+
787+
for (size_t i = 1; i < streams.size(); ++i)
788+
pipes.emplace_back(Pipe(streams[i]));
789+
790+
return pipes;
780791
}
781792

782793
PipelineExecutorPtr QueryPipeline::execute()

src/Processors/QueryPipeline.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,9 @@ class QueryPipeline
168168
/// Set upper limit for the recommend number of threads
169169
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
170170

171-
/// Convert query pipeline to single pipe.
171+
/// Convert query pipeline to single or several pipes.
172172
Pipe getPipe() &&;
173+
Pipes getPipes() &&;
173174

174175
private:
175176
/// Destruction order: processors, header, locks, temporary storages, local contexts

src/Storages/StorageView.cpp

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include <Processors/Sources/SourceFromInputStream.h>
2020
#include <Processors/Transforms/MaterializingTransform.h>
2121
#include <Processors/Transforms/ConvertingTransform.h>
22+
#include <DataStreams/MaterializingBlockInputStream.h>
23+
#include <DataStreams/ConvertingBlockInputStream.h>
2224

2325

2426
namespace DB
@@ -62,29 +64,42 @@ Pipes StorageView::read(
6264
if (context.getSettings().enable_optimize_predicate_expression)
6365
current_inner_query = getRuntimeViewQuery(*query_info.query->as<const ASTSelectQuery>(), context);
6466

65-
QueryPipeline pipeline;
6667
InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names);
6768
/// FIXME res may implicitly use some objects owned be pipeline, but them will be destructed after return
6869
if (query_info.force_tree_shaped_pipeline)
6970
{
71+
QueryPipeline pipeline;
7072
BlockInputStreams streams = interpreter.executeWithMultipleStreams(pipeline);
73+
74+
for (auto & stream : streams)
75+
{
76+
stream = std::make_shared<MaterializingBlockInputStream>(stream);
77+
stream = std::make_shared<ConvertingBlockInputStream>(stream, getSampleBlockForColumns(column_names),
78+
ConvertingBlockInputStream::MatchColumnsMode::Name);
79+
}
80+
7181
for (auto & stream : streams)
7282
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
7383
}
7484
else
75-
/// TODO: support multiple streams here. Need more general interface than pipes.
76-
pipes.emplace_back(interpreter.executeWithProcessors().getPipe());
77-
78-
/// It's expected that the columns read from storage are not constant.
79-
/// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery.
80-
for (auto & pipe : pipes)
8185
{
82-
pipe.addSimpleTransform(std::make_shared<MaterializingTransform>(pipe.getHeader()));
86+
auto pipeline = interpreter.executeWithProcessors();
87+
88+
/// It's expected that the columns read from storage are not constant.
89+
/// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery.
90+
pipeline.addSimpleTransform([](const Block & header)
91+
{
92+
return std::make_shared<MaterializingTransform>(header);
93+
});
8394

8495
/// And also convert to expected structure.
85-
pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(
86-
pipe.getHeader(), getSampleBlockForColumns(column_names),
87-
ConvertingTransform::MatchColumnsMode::Name));
96+
pipeline.addSimpleTransform([&](const Block & header)
97+
{
98+
return std::make_shared<ConvertingTransform>(header, getSampleBlockForColumns(column_names),
99+
ConvertingTransform::MatchColumnsMode::Name);
100+
});
101+
102+
pipes = std::move(pipeline).getPipes();
88103
}
89104

90105
return pipes;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
World
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
drop table if exists view_bug_const;
2+
CREATE VIEW view_bug_const AS SELECT 'World' AS hello FROM (SELECT number FROM system.numbers LIMIT 1) AS n1 JOIN (SELECT number FROM system.numbers LIMIT 1) AS n2 USING (number);
3+
select * from view_bug_const;
4+
drop table if exists view_bug_const;

0 commit comments

Comments
 (0)