|
19 | 19 | #include <Processors/Sources/SourceFromInputStream.h> |
20 | 20 | #include <Processors/Transforms/MaterializingTransform.h> |
21 | 21 | #include <Processors/Transforms/ConvertingTransform.h> |
| 22 | +#include <DataStreams/MaterializingBlockInputStream.h> |
| 23 | +#include <DataStreams/ConvertingBlockInputStream.h> |
22 | 24 |
|
23 | 25 |
|
24 | 26 | namespace DB |
@@ -62,29 +64,42 @@ Pipes StorageView::read( |
62 | 64 | if (context.getSettings().enable_optimize_predicate_expression) |
63 | 65 | current_inner_query = getRuntimeViewQuery(*query_info.query->as<const ASTSelectQuery>(), context); |
64 | 66 |
|
65 | | - QueryPipeline pipeline; |
66 | 67 | InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names); |
67 | 68 | /// FIXME res may implicitly use some objects owned be pipeline, but them will be destructed after return |
68 | 69 | if (query_info.force_tree_shaped_pipeline) |
69 | 70 | { |
| 71 | + QueryPipeline pipeline; |
70 | 72 | BlockInputStreams streams = interpreter.executeWithMultipleStreams(pipeline); |
| 73 | + |
| 74 | + for (auto & stream : streams) |
| 75 | + { |
| 76 | + stream = std::make_shared<MaterializingBlockInputStream>(stream); |
| 77 | + stream = std::make_shared<ConvertingBlockInputStream>(stream, getSampleBlockForColumns(column_names), |
| 78 | + ConvertingBlockInputStream::MatchColumnsMode::Name); |
| 79 | + } |
| 80 | + |
71 | 81 | for (auto & stream : streams) |
72 | 82 | pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream))); |
73 | 83 | } |
74 | 84 | else |
75 | | - /// TODO: support multiple streams here. Need more general interface than pipes. |
76 | | - pipes.emplace_back(interpreter.executeWithProcessors().getPipe()); |
77 | | - |
78 | | - /// It's expected that the columns read from storage are not constant. |
79 | | - /// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery. |
80 | | - for (auto & pipe : pipes) |
81 | 85 | { |
82 | | - pipe.addSimpleTransform(std::make_shared<MaterializingTransform>(pipe.getHeader())); |
| 86 | + auto pipeline = interpreter.executeWithProcessors(); |
| 87 | + |
| 88 | + /// It's expected that the columns read from storage are not constant. |
| 89 | + /// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery. |
| 90 | + pipeline.addSimpleTransform([](const Block & header) |
| 91 | + { |
| 92 | + return std::make_shared<MaterializingTransform>(header); |
| 93 | + }); |
83 | 94 |
|
84 | 95 | /// And also convert to expected structure. |
85 | | - pipe.addSimpleTransform(std::make_shared<ConvertingTransform>( |
86 | | - pipe.getHeader(), getSampleBlockForColumns(column_names), |
87 | | - ConvertingTransform::MatchColumnsMode::Name)); |
| 96 | + pipeline.addSimpleTransform([&](const Block & header) |
| 97 | + { |
| 98 | + return std::make_shared<ConvertingTransform>(header, getSampleBlockForColumns(column_names), |
| 99 | + ConvertingTransform::MatchColumnsMode::Name); |
| 100 | + }); |
| 101 | + |
| 102 | + pipes = std::move(pipeline).getPipes(); |
88 | 103 | } |
89 | 104 |
|
90 | 105 | return pipes; |
|
0 commit comments