Conversation
Fix MergeTreeReader. Fix MergeTreeBaseSelectProcessor. Better exception message for TreeExecutor. Added header_without_virtual_columns to MergeTreeBaseSelectProcessor. Fix MergeTreeReverseSelectProcessor. Fix MergeTreeDataSelectExecutor.
4e53d59 to
d4f11af
Compare
alesapin
left a comment
There was a problem hiding this comment.
I don't see any switch were we can disable new behavior. Is it expected?
For my point of view, this transition code looks very complex :( Maybe previously we need to rewrite some basic classes like ExpressionActions? Also it's not clear, why we pack processors into two dimensional array (Pipes) and then flatten them back sometimes. Constuctions like pipe.back()->getInputs().front() are not convinient.
I need some clarification on my questions. After that I can make review again.
| void ExecutionSpeedLimits::throttle(size_t read_rows, size_t read_bytes, size_t total_rows, UInt64 total_elapsed_microseconds) | ||
| { | ||
| if ((min_execution_speed || max_execution_speed || min_execution_speed_bytes || | ||
| max_execution_speed_bytes || (total_rows && timeout_before_checking_execution_speed != 0)) && |
There was a problem hiding this comment.
better to compare all integer variables with zero for consistency.
|
|
||
| if (elapsed_seconds > 0) | ||
| { | ||
| if (min_execution_speed && read_rows / elapsed_seconds < min_execution_speed) |
There was a problem hiding this comment.
I'd preferer to store read_rows / elapsed_seconds and read_bytes / elapsed in separate variables, like rows_per_second.
| } | ||
| } | ||
|
|
||
| void ExecutionSpeedLimits::throttle(size_t read_rows, size_t read_bytes, size_t total_rows, UInt64 total_elapsed_microseconds) |
There was a problem hiding this comment.
total_rows -- not clear, either total rows already read or total rows to read in future?
| class ExecutionSpeedLimits | ||
| { | ||
| public: | ||
| size_t min_execution_speed = 0; |
| { | ||
|
|
||
| /// Limits for query execution speed. | ||
| /// In rows per second. |
| for (size_t i = 0; i < sort_columns_size; ++i) | ||
| sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); | ||
|
|
||
| auto streams_to_merge = [&]() |
There was a problem hiding this comment.
looks like we need only to_merge and pipes here.
| auto it = to_merge.begin(); | ||
| for (auto & input : merged_processor->getInputs()) | ||
| { | ||
| connect(**it, input); |
There was a problem hiding this comment.
It seems like the building of n to n connections is a common operation. Maybe create a separate function?
| } | ||
|
|
||
| Processors result; | ||
| result.reserve(2 * pipes.size() + 1); |
There was a problem hiding this comment.
Why do we need to put processors and pipes and then flatten them?
| extern const int MEMORY_LIMIT_EXCEEDED; | ||
| } | ||
|
|
||
| static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & data_part) |
| block.insert({result.columns[pos], name_and_type->type, name_and_type->name}); | ||
|
|
||
| if (alias_actions) | ||
| alias_actions->execute(block); |
There was a problem hiding this comment.
In the future we will execute all actions on header and columns?
There was a problem hiding this comment.
Yes, I suppose so.
Yes. I just don't like to have two versions of code now. |
alesapin
left a comment
There was a problem hiding this comment.
Pipe, TreeExecutorBlockInputStream and Transforms interfaces look good to me. Changes in SelectExecutor and high-level streams (Select and Sequential) also seem reasonable.
Diff in low-level readers (MergeTreeReader and MergeTreeRangeReader) seems too complicated. We just replacing Block interface with Columns, but had to rewrite about 70% of code in these classes :(
Also, we need to add comments about the temporary code.
| bool has_columns = false; | ||
| for (auto & column : columns) | ||
| if (column) | ||
| has_columns = true; |
| { | ||
| IProcessor * node = stack.top(); | ||
|
|
||
| auto status = prepare_processor(node); |
There was a problem hiding this comment.
May right this code here without lambda? Seems like it used in one place.
There was a problem hiding this comment.
I think with lambda it's more readable.
| return columns; | ||
| } | ||
|
|
||
| columns.resize(merge_tree_reader->getColumns().size()); |
There was a problem hiding this comment.
Slightly confusing. Previously we haven't got Columns, and getColumns() method had looked definitely.
| auto pipes = MergeTreeDataSelectExecutor(part->storage).readFromParts( | ||
| {part}, column_names, query_info, context, max_block_size, num_streams); | ||
|
|
||
| BlockInputStreams streams; |
There was a problem hiding this comment.
Comments about temporary code?
| for (size_t i = 0; i < sort_columns_size; ++i) | ||
| sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); | ||
|
|
||
| auto streams_to_merge = [&pipes]() |
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
For changelog. Remove if this is non-significant change.
Category (leave one):
Short description (up to few sentences):
Support for processors in
MergeTree.