Merged
22 commits
1748853
Parallelize query processing right after reading FROM ...
devcrafter Apr 12, 2023
ac54033
Let's see blast radius w/o parallelization after numbers()
devcrafter Apr 12, 2023
ba1adeb
Fix fast tests
devcrafter Apr 12, 2023
5ed85d9
Fix 00109_shard_totals_after_having.sql
devcrafter Apr 13, 2023
bb60f10
Fix 02231_buffer_aggregate_states_leak.sql
devcrafter Apr 13, 2023
fcd2eae
Disable 'Dictionary' storage due to count() can return incorrect result
devcrafter Apr 13, 2023
eacbd2b
Fix test_storage_mysql/test_settings_connection_wait_timeout
devcrafter Apr 13, 2023
39df2bd
Automatic style fix
robot-clickhouse Apr 13, 2023
a01efbd
Fix flaky check
devcrafter Apr 13, 2023
fa63460
Merge remote-tracking branch 'origin/master' into parallel-processing…
devcrafter Apr 13, 2023
780e4f9
Fix flaky check: 00109_shard_totals_after_having.sql
devcrafter Apr 14, 2023
7c84dc4
Better way to define for which storage output is parallelized
devcrafter Apr 14, 2023
4dfad9e
Try to fix flaky integration test
devcrafter Apr 14, 2023
9e92c26
Fix test_storage_mysql/test.py::test_settings_connection_wait_timeout
devcrafter Apr 14, 2023
60dbb7b
Merge remote-tracking branch 'origin/master' into parallel-processing…
devcrafter Apr 14, 2023
6c03b2e
Automatic style fix
robot-clickhouse Apr 14, 2023
8603807
Use generic way to parallelize output for file()
devcrafter Apr 15, 2023
cdd9aef
Merge remote-tracking branch 'origin/master' into parallel-processing…
devcrafter Apr 15, 2023
908ad29
Do not parallelize output for zeroes()
devcrafter Apr 15, 2023
2455334
Merge remote-tracking branch 'origin/master' into parallel-processing…
devcrafter Apr 18, 2023
d5eb65b
Remove redundant narrowPipe()
devcrafter Apr 18, 2023
8a92eb0
Update src/Storages/IStorage.h
alexey-milovidov Apr 21, 2023
7 changes: 7 additions & 0 deletions src/Storages/IStorage.cpp
@@ -133,6 +133,13 @@ void IStorage::read(
size_t num_streams)
{
auto pipe = read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);

/// parallelize processing if not yet
Member
  1. Should we do it here, or is it better to do it inside InterpreterSelectQuery?
  2. Are there any potential troubles with mutations and StorageFromMergeTreeDataPart?

Member Author

  1. Should we do it here, or is it better to do it inside InterpreterSelectQuery?

I think it's OK to do it here, with the following considerations:

num_streams is provided by InterpreterSelectQuery as a recommendation, i.e. how many threads are available for data processing. The reading step then has the following choices:

  • (a) it knows the amount of data it can read, and it's not much data, so it creates only the necessary number of data streams based on the parameters passed to IStorage::read(), i.e. max_block_size/storage_limits. In this case, we don't want to adjust the number of streams, and parallelizeOutputAfterReading() can return false

  • (b) the amount of data is either unknown, or known but large enough to utilize all available threads -> in both cases the output is parallelized into num_streams streams

  2. Are there any potential troubles with mutations and StorageFromMergeTreeDataPart?

The generic part of this change affects only storages that use the default plan step to read from storage, ReadFromStorageStep. Sophisticated engines use specialized steps to read from their storage, like ReadFromMergeTree in the MergeTree case.

StorageFromMergeTreeDataPart is not affected since it uses the ReadFromMergeTree step, which overrides the read() method where this resize() is added.

const size_t output_ports = pipe.numOutputPorts();
const auto storage_name = getName();
if (parallelizeOutputAfterReading() && output_ports > 0 && output_ports < num_streams)
pipe.resize(num_streams);

readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, context, getName());
}
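The review discussion above boils down to a small decision rule: widen the pipe only when the storage opts in, produced at least one stream, and produced fewer streams than the recommended num_streams. A minimal sketch of that rule in Python (hypothetical names modeled on the C++ hunk above, not ClickHouse code):

```python
# Hypothetical model of the decision added in IStorage::read():
# mirrors `parallelizeOutputAfterReading() && output_ports > 0
# && output_ports < num_streams -> pipe.resize(num_streams)`.
def streams_after_read(output_ports: int, num_streams: int,
                       parallelize_output_after_reading: bool) -> int:
    """Return the number of output streams after the optional resize."""
    if parallelize_output_after_reading and 0 < output_ports < num_streams:
        return num_streams  # pipe.resize(num_streams): fan out, case (b)
    return output_ports     # keep the storage's own stream count, case (a)

# One stream from file() with 8 worker threads available -> fan out.
assert streams_after_read(1, 8, True) == 8
# system.numbers opts out -> stays sequential.
assert streams_after_read(1, 8, False) == 1
# Empty glob produced no streams -> nothing to widen.
assert streams_after_read(0, 8, True) == 0
```

Note that the rule never narrows a pipe: a storage that already produced num_streams or more streams is left untouched.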

9 changes: 9 additions & 0 deletions src/Storages/IStorage.h
@@ -368,6 +368,15 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
size_t /*max_block_size*/,
size_t /*num_streams*/);

/// Should we process blocks of data returned by the storage in parallel
/// even when the storage returned only one stream of data for reading?
/// It is beneficial, for example, when you read from a file quickly,
/// but then do heavy computations on returned blocks.
/// This is enabled by default, but in some cases shouldn't be done.
/// For example, when you read from system.numbers instead of system.numbers_mt,
/// you still expect the data to be processed sequentially.
virtual bool parallelizeOutputAfterReading() const { return true; }

public:
/// Other version of read which adds reading step to query plan.
/// Default implementation creates ReadFromStorageStep and uses usual read.
4 changes: 4 additions & 0 deletions src/Storages/StorageDictionary.h
@@ -74,6 +74,10 @@ friend class TableFunctionDictionary;
size_t max_block_size,
size_t threads) override;

/// FIXME: processing after reading from dictionaries is not parallelized due to a bug:
/// count() can return a wrong result, see test_dictionaries_redis/test_long.py::test_redis_dict_long
bool parallelizeOutputAfterReading() const override { return false; }

std::shared_ptr<const IDictionary> getDictionary() const;

static NamesAndTypesList getNamesAndTypes(const DictionaryStructure & dictionary_structure);
10 changes: 1 addition & 9 deletions src/Storages/StorageFile.cpp
@@ -791,15 +791,7 @@ Pipe StorageFile::read(
std::move(read_buffer)));
}

Pipe pipe = Pipe::unitePipes(std::move(pipes));
/// Parallelize output as much as possible
/// Note: number of streams can be 0 if paths is empty
/// It happens if globs in file(path, ...) expands to empty set i.e. no files to process
if (num_streams > 0 && num_streams < max_num_streams)
{
pipe.resize(max_num_streams);
}
return pipe;
return Pipe::unitePipes(std::move(pipes));
}


2 changes: 2 additions & 0 deletions src/Storages/StorageNull.h
@@ -42,6 +42,8 @@ class StorageNull final : public IStorage
std::make_shared<NullSource>(storage_snapshot->getSampleBlockForColumns(column_names)));
}

bool parallelizeOutputAfterReading() const override { return false; }

bool supportsParallelInsert() const override { return true; }

SinkToStoragePtr write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr) override
6 changes: 1 addition & 5 deletions src/Storages/StorageS3.cpp
@@ -44,7 +44,6 @@
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <QueryPipeline/narrowPipe.h>

#include <QueryPipeline/QueryPipelineBuilder.h>

@@ -1076,10 +1075,7 @@ Pipe StorageS3::read(
max_download_threads));
}

auto pipe = Pipe::unitePipes(std::move(pipes));

narrowPipe(pipe, num_streams);
return pipe;
return Pipe::unitePipes(std::move(pipes));
}

SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemNumbers.h
@@ -40,6 +40,8 @@ class StorageSystemNumbers final : public IStorage
size_t max_block_size,
size_t num_streams) override;

bool parallelizeOutputAfterReading() const override { return false; }

bool hasEvenlyDistributedRead() const override { return true; }
bool isSystemStorage() const override { return true; }
bool supportsTransactions() const override { return true; }
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemOne.h
@@ -30,6 +30,8 @@ class StorageSystemOne final : public IStorage
size_t max_block_size,
size_t num_streams) override;

bool parallelizeOutputAfterReading() const override { return false; }

bool isSystemStorage() const override { return true; }

bool supportsTransactions() const override { return true; }
2 changes: 2 additions & 0 deletions src/Storages/System/StorageSystemZeros.h
@@ -31,6 +31,8 @@ class StorageSystemZeros final : public IStorage
size_t max_block_size,
size_t num_streams) override;

bool parallelizeOutputAfterReading() const override { return false; }

bool hasEvenlyDistributedRead() const override { return true; }
bool isSystemStorage() const override { return true; }
bool supportsTransactions() const override { return true; }
16 changes: 14 additions & 2 deletions tests/integration/test_storage_mysql/test.py
@@ -519,21 +519,33 @@ def test_settings_connection_wait_timeout(started_cluster):
)
)

worker_started_event = threading.Event()

def worker():
node1.query("SELECT sleepEachRow(1) FROM {}".format(table_name))
worker_started_event.set()
node1.query(
"SELECT 1, sleepEachRow(1) FROM {} SETTINGS max_threads=1".format(
table_name
)
)

worker_thread = threading.Thread(target=worker)
worker_thread.start()

# ensure that first query started in worker_thread
assert worker_started_event.wait(10)
time.sleep(1)

started = time.time()
with pytest.raises(
QueryRuntimeException,
match=r"Exception: mysqlxx::Pool is full \(connection_wait_timeout is exceeded\)",
):
node1.query("SELECT sleepEachRow(1) FROM {}".format(table_name))
node1.query(
"SELECT 2, sleepEachRow(1) FROM {} SETTINGS max_threads=1".format(
table_name
)
)
ended = time.time()
assert (ended - started) >= wait_timeout
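The fix above relies on two things: the worker signals via a threading.Event before it occupies the only pool connection, so the main thread waits for that signal instead of racing it, and max_threads=1 keeps the now-parallelized pipeline from opening extra MySQL connections. The event pattern in isolation looks like this (a generic sketch, not ClickHouse test code):

```python
import threading
import time

# The worker announces it has started before doing the long operation,
# and the main thread blocks on the event with a timeout instead of
# sleeping for an arbitrary amount of time.
started = threading.Event()
result = []

def worker():
    started.set()        # announce before the long-running operation
    time.sleep(0.2)      # stands in for the slow query holding the connection
    result.append("done")

t = threading.Thread(target=worker)
t.start()
assert started.wait(10)  # returns True as soon as the worker sets the event
t.join()
assert result == ["done"]
```

Event.wait() returns False only if the timeout expires first, which is why the test asserts on its return value rather than sleeping unconditionally.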

8 changes: 4 additions & 4 deletions tests/queries/0_stateless/00109_shard_totals_after_having.sql
@@ -10,15 +10,15 @@ CREATE TABLE numbers500k (number UInt32) ENGINE = TinyLog;
INSERT INTO numbers500k SELECT number FROM system.numbers LIMIT 500000;

SET totals_mode = 'after_having_auto';
SELECT intDiv(number, 2) AS k, count(), argMax(toString(number), number) FROM remote('127.0.0.{2,3}', currentDatabase(), numbers500k) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;
SELECT intDiv(number, 2) AS k, count(), argMax(toString(number), number) FROM (SELECT * FROM remote('127.0.0.{2,3}', currentDatabase(), numbers500k) ORDER BY number) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;

SET totals_mode = 'after_having_inclusive';
SELECT intDiv(number, 2) AS k, count(), argMax(toString(number), number) FROM remote('127.0.0.{2,3}', currentDatabase(), numbers500k) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;
SELECT intDiv(number, 2) AS k, count(), argMax(toString(number), number) FROM (SELECT * FROM remote('127.0.0.{2,3}', currentDatabase(), numbers500k) ORDER BY number) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;

SET totals_mode = 'after_having_exclusive';
SELECT intDiv(number, 2) AS k, count(), argMax(toString(number), number) FROM remote('127.0.0.{2,3}', currentDatabase(), numbers500k) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;
SELECT intDiv(number, 2) AS k, count(), argMax(toString(number), number) FROM (SELECT * FROM remote('127.0.0.{2,3}', currentDatabase(), numbers500k) ORDER BY number) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;

SET totals_mode = 'before_having';
SELECT intDiv(number, 2) AS k, count(), argMax(toString(number), number) FROM remote('127.0.0.{2,3}', currentDatabase(), numbers500k) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;
SELECT intDiv(number, 2) AS k, count(), argMax(toString(number), number) FROM (SELECT * FROM remote('127.0.0.{2,3}', currentDatabase(), numbers500k) ORDER BY number) GROUP BY k WITH TOTALS ORDER BY k LIMIT 10;

DROP TABLE numbers500k;
@@ -28,7 +28,7 @@ create materialized view mv_02231 to buffer_02231 as select
from in_02231
group by key;

insert into in_02231 select * from numbers(10e6) settings max_memory_usage='300Mi';
insert into in_02231 select * from numbers(10e6) settings max_memory_usage='310Mi', max_threads=1;

drop table buffer_02231;
drop table out_02231;
Expand Down