Skip to content

Processors and storages#7181

Merged
KochetovNicolai merged 39 commits intomasterfrom
processors-and-storages
Nov 1, 2019
Merged

Processors and storages#7181
KochetovNicolai merged 39 commits intomasterfrom
processors-and-storages

Conversation

@KochetovNicolai
Copy link
Copy Markdown
Member

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

For changelog. Remove if this is non-significant change.

Category (leave one):

  • Improvement

Short description (up to few sentences):
Support for processors in MergeTree.

@KochetovNicolai KochetovNicolai added the pr-improvement Pull request with some product improvements label Oct 3, 2019
Fix MergeTreeReader.

Fix MergeTreeBaseSelectProcessor.

Better exception message for TreeExecutor.

Added header_without_virtual_columns to MergeTreeBaseSelectProcessor.

Fix MergeTreeReverseSelectProcessor.

Fix MergeTreeDataSelectExecutor.
@KochetovNicolai KochetovNicolai force-pushed the processors-and-storages branch from 4e53d59 to d4f11af Compare October 4, 2019 17:50
@KochetovNicolai KochetovNicolai marked this pull request as ready for review October 10, 2019 14:42
Copy link
Copy Markdown
Member

@alesapin alesapin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any switch were we can disable new behavior. Is it expected?

For my point of view, this transition code looks very complex :( Maybe previously we need to rewrite some basic classes like ExpressionActions? Also it's not clear, why we pack processors into two dimensional array (Pipes) and then flatten them back sometimes. Constuctions like pipe.back()->getInputs().front() are not convinient.

I need some clarification on my questions. After that I can make review again.

void ExecutionSpeedLimits::throttle(size_t read_rows, size_t read_bytes, size_t total_rows, UInt64 total_elapsed_microseconds)
{
if ((min_execution_speed || max_execution_speed || min_execution_speed_bytes ||
max_execution_speed_bytes || (total_rows && timeout_before_checking_execution_speed != 0)) &&
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to compare all integer variables with zero for consistency.


if (elapsed_seconds > 0)
{
if (min_execution_speed && read_rows / elapsed_seconds < min_execution_speed)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd preferer to store read_rows / elapsed_seconds and read_bytes / elapsed in separate variables, like rows_per_second.

}
}

void ExecutionSpeedLimits::throttle(size_t read_rows, size_t read_bytes, size_t total_rows, UInt64 total_elapsed_microseconds)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

total_rows -- not clear, either total rows already read or total rows to read in future?

class ExecutionSpeedLimits
{
public:
size_t min_execution_speed = 0;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_rows?

{

/// Limits for query execution speed.
/// In rows per second.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and bytes?

for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);

auto streams_to_merge = [&]()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like we need only to_merge and pipes here.

auto it = to_merge.begin();
for (auto & input : merged_processor->getInputs())
{
connect(**it, input);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the building of n to n connections is a common operation. Maybe create a separate function?

}

Processors result;
result.reserve(2 * pipes.size() + 1);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to put processors and pipes and then flatten them?

extern const int MEMORY_LIMIT_EXCEEDED;
}

static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & data_part)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seen this before.

block.insert({result.columns[pos], name_and_type->type, name_and_type->name});

if (alias_actions)
alias_actions->execute(block);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future we will execute all actions on header and columns?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I suppose so.

@KochetovNicolai
Copy link
Copy Markdown
Member Author

I don't see any switch were we can disable new behavior. Is it expected?

Yes. I just don't like to have two versions of code now.
There is still a switch for processors (experimental-use_processors flag), but the part with storages will work over wrapper.

@github-actions github-actions bot added the comp-message-queues Message queue integrations (Kafka, RabbitMQ, NATS table engines for stream ingestion/egress). label Oct 21, 2019
Copy link
Copy Markdown
Member

@alesapin alesapin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pipe, TreeExecutorBlockInputStream and Transforms interfaces look good to me. Changes in SelectExecutor and high-level streams (Select and Sequential) also seem reasonable.

Diff in low-level readers (MergeTreeReader and MergeTreeRangeReader) seems too complicated. We just replacing Block interface with Columns, but had to rewrite about 70% of code in these classes :(

Also, we need to add comments about the temporary code.

bool has_columns = false;
for (auto & column : columns)
if (column)
has_columns = true;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

break

{
IProcessor * node = stack.top();

auto status = prepare_processor(node);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May right this code here without lambda? Seems like it used in one place.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think with lambda it's more readable.

return columns;
}

columns.resize(merge_tree_reader->getColumns().size());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly confusing. Previously we haven't got Columns, and getColumns() method had looked definitely.

auto pipes = MergeTreeDataSelectExecutor(part->storage).readFromParts(
{part}, column_names, query_info, context, max_block_size, num_streams);

BlockInputStreams streams;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments about temporary code?

for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);

auto streams_to_merge = [&pipes]()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temporary code. Need comment.

Copy link
Copy Markdown
Member

@alesapin alesapin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

comp-message-queues Message queue integrations (Kafka, RabbitMQ, NATS table engines for stream ingestion/egress). pr-improvement Pull request with some product improvements

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants