Merged
13 changes: 13 additions & 0 deletions docs/en/operations/settings/settings.md
@@ -3290,6 +3290,19 @@ Possible values:

Default value: `16`.

## max_insert_delayed_streams_for_parallel_write {#max-insert-delayed-streams-for-parallel-write}

The maximum number of streams (columns) for which the final part flush is delayed.

It makes a difference only if the underlying storage supports parallel write (for example, S3); otherwise it gives no benefit.

Possible values:

- Positive integer.
- 0 or 1 — Disabled.

Default value: `1000` for S3 and `0` otherwise.
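
For example, the limit can be lowered per query to cap memory usage during an INSERT into an S3-backed table. The sketch below uses hypothetical table and storage policy names, not taken from this PR:

```sql
-- Hypothetical table on an S3-backed storage policy.
CREATE TABLE events (key UInt32, value UInt64)
ENGINE = MergeTree
ORDER BY key
SETTINGS storage_policy = 's3';

-- Cap the number of delayed streams so fewer column streams stay open at once,
-- trading some parallel-write throughput for lower INSERT memory usage.
INSERT INTO events SELECT number, number FROM numbers(1000000)
SETTINGS max_insert_delayed_streams_for_parallel_write = 100;
```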

## opentelemetry_start_trace_probability {#opentelemetry-start-trace-probability}

Sets the probability that ClickHouse can start a trace for executed queries (if no parent [trace context](https://www.w3.org/TR/trace-context/) is supplied).
1 change: 1 addition & 0 deletions src/Core/Settings.h
@@ -44,6 +44,7 @@ class IColumn;
M(UInt64, min_insert_block_size_bytes_for_materialized_views, 0, "Like min_insert_block_size_bytes, but applied only during pushing to MATERIALIZED VIEW (default: min_insert_block_size_bytes)", 0) \
M(UInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, "Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.", 0) \
M(UInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. Values 0 or 1 means that INSERT SELECT is not run in parallel. Higher values will lead to higher memory usage. Parallel INSERT SELECT has effect only if the SELECT part is run on parallel, see 'max_threads' setting.", 0) \
M(UInt64, max_insert_delayed_streams_for_parallel_write, 0, "The maximum number of streams (columns) to delay final part flush. Default - auto (1000 if the underlying storage supports parallel write, for example S3; disabled otherwise)", 0) \
M(UInt64, max_final_threads, 16, "The maximum number of threads to read from table with FINAL.", 0) \
M(MaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \
M(UInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
4 changes: 4 additions & 0 deletions src/Disks/IDisk.h
@@ -248,6 +248,10 @@ class IDisk : public Space
/// Overridden in remote fs disks.
virtual bool supportZeroCopyReplication() const = 0;

/// Whether this disk supports parallel write.
/// Overridden in remote fs disks.
virtual bool supportParallelWrite() const { return false; }
Member:
btw looks like this never worked, because remote disks are wrapped into DiskDecorator for DiskRestartProxy and DiskCacheWrapper (the old cache version, which is still turned on by default), and for DiskDecorator this method was not overridden: #38792

Member:
@azat the test added in this PR does not work after the issue I described above is fixed (see #38792 checks). If this test does not actually check anything now, I will remove it for now, agree?

azat (Member, Author), Jul 5, 2022:
The test uses explicit max_insert_delayed_streams_for_parallel_write so it works correctly.

As for the failure in #38792, you can set max_insert_delayed_streams_for_parallel_write=0 explicitly for the INSERT without this option, and then the behavior will not depend on whether the disk is remote.

Also I would recommend adding another test that ensures S3 does use parallel writes (via MEMORY_LIMIT_EXCEEDED during INSERT, or via thread_ids from query_log).
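
A rough sketch of such a check via the query log (the threshold and filters are illustrative, not part of this PR):

```sql
-- After an INSERT into an S3-backed table, verify that the write used
-- multiple threads by inspecting the query log (illustrative sketch).
SYSTEM FLUSH LOGS;
SELECT length(thread_ids) > 1 AS used_parallel_write
FROM system.query_log
WHERE current_database = currentDatabase()
  AND type = 'QueryFinish'
  AND query_kind = 'Insert'
ORDER BY event_time_microseconds DESC
LIMIT 1;
```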


virtual bool isReadOnly() const { return false; }

/// Check if disk is broken. Broken disks will have 0 space and not be used.
1 change: 0 additions & 1 deletion src/Disks/IDiskRemote.cpp
@@ -4,7 +4,6 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
2 changes: 2 additions & 0 deletions src/Disks/S3/DiskS3.h
@@ -103,6 +103,8 @@ class DiskS3 final : public IDiskRemote

bool supportZeroCopyReplication() const override { return true; }

bool supportParallelWrite() const override { return true; }

void shutdown() override;

void startup() override;
3 changes: 3 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.h
@@ -39,6 +39,9 @@
namespace DB
{

/// The number of streams is not the number of parts, but the number of parts * files, hence 1000.
const size_t DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE = 1000;

class AlterCommands;
class MergeTreePartsMover;
class MergeTreeDataMergerMutator;
32 changes: 30 additions & 2 deletions src/Storages/MergeTree/MergeTreeSink.cpp
@@ -52,7 +52,14 @@ void MergeTreeSink::consume(Chunk chunk)
auto block = getHeader().cloneWithColumns(chunk.detachColumns());

auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
std::vector<MergeTreeSink::DelayedChunk::Partition> partitions;

using DelayedPartitions = std::vector<MergeTreeSink::DelayedChunk::Partition>;
DelayedPartitions partitions;

const Settings & settings = context->getSettingsRef();
size_t streams = 0;
bool support_parallel_write = false;

for (auto & current_block : part_blocks)
{
Stopwatch watch;
@@ -67,9 +74,12 @@
if (!temp_part.part)
continue;

if (!support_parallel_write && temp_part.part->volume->getDisk()->supportParallelWrite())
support_parallel_write = true;

if (storage.getDeduplicationLog())
{
const String & dedup_token = context->getSettingsRef().insert_deduplication_token;
const String & dedup_token = settings.insert_deduplication_token;
if (!dedup_token.empty())
{
/// multiple blocks can be inserted within the same insert query
@@ -79,6 +89,24 @@
}
}

size_t max_insert_delayed_streams_for_parallel_write = DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE;
if (!support_parallel_write || settings.max_insert_delayed_streams_for_parallel_write.changed)
max_insert_delayed_streams_for_parallel_write = settings.max_insert_delayed_streams_for_parallel_write;

/// In case of too many columns/parts in the block, flush explicitly.
streams += temp_part.streams.size();
if (streams > max_insert_delayed_streams_for_parallel_write)
{
finishDelayedChunk();
delayed_chunk = std::make_unique<MergeTreeSink::DelayedChunk>();
delayed_chunk->partitions = std::move(partitions);
finishDelayedChunk();

streams = 0;
support_parallel_write = false;
partitions = DelayedPartitions{};
}

partitions.emplace_back(MergeTreeSink::DelayedChunk::Partition
{
.temp_part = std::move(temp_part),
34 changes: 30 additions & 4 deletions src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
@@ -150,9 +150,14 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
if (quorum)
checkQuorumPrecondition(zookeeper);

const Settings & settings = context->getSettingsRef();
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
std::vector<ReplicatedMergeTreeSink::DelayedChunk::Partition> partitions;
String block_dedup_token;

using DelayedPartitions = std::vector<ReplicatedMergeTreeSink::DelayedChunk::Partition>;
DelayedPartitions partitions;

size_t streams = 0;
bool support_parallel_write = false;

for (auto & current_block : part_blocks)
{
@@ -171,17 +176,20 @@

if (deduplicate)
{
String block_dedup_token;

/// We add the hash from the data and partition identifier to deduplication ID.
/// That is, do not insert the same data to the same partition twice.

const String & dedup_token = context->getSettingsRef().insert_deduplication_token;
const String & dedup_token = settings.insert_deduplication_token;
if (!dedup_token.empty())
{
/// multiple blocks can be inserted within the same insert query
/// an ordinal number is added to dedup token to generate a distinctive block id for each block
block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
++chunk_dedup_seqnum;
}

block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
}
@@ -192,6 +200,24 @@

UInt64 elapsed_ns = watch.elapsed();

size_t max_insert_delayed_streams_for_parallel_write = DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE;
if (!support_parallel_write || settings.max_insert_delayed_streams_for_parallel_write.changed)
max_insert_delayed_streams_for_parallel_write = settings.max_insert_delayed_streams_for_parallel_write;

/// In case of too many columns/parts in the block, flush explicitly.
streams += temp_part.streams.size();
if (streams > max_insert_delayed_streams_for_parallel_write)
{
finishDelayedChunk(zookeeper);
delayed_chunk = std::make_unique<ReplicatedMergeTreeSink::DelayedChunk>();
delayed_chunk->partitions = std::move(partitions);
finishDelayedChunk(zookeeper);

streams = 0;
support_parallel_write = false;
partitions = DelayedPartitions{};
}

partitions.emplace_back(ReplicatedMergeTreeSink::DelayedChunk::Partition{
.temp_part = std::move(temp_part),
.elapsed_ns = elapsed_ns,
@@ -207,7 +233,7 @@
/// value for `last_block_is_duplicate`, which is possible only after the part is committed.
/// Otherwise we can delay commit.
/// TODO: we can also delay commit if there is no MVs.
if (!context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views)
if (!settings.deduplicate_blocks_in_dependent_materialized_views)
finishDelayedChunk(zookeeper);
}

5 changes: 4 additions & 1 deletion tests/queries/0_stateless/02051_read_settings.sql.j2
@@ -1,4 +1,7 @@
-- Tags: long
-- Tags: long, no-tsan, no-parallel, no-random-settings
-- Tag: no-tsan -- too slow under TSan (~5-6min)
-- Tag: no-random-settings -- to avoid settings overlaps
-- Tag: no-parallel -- to reduce test time
--
-- Test for testing various read settings.

Empty file.
15 changes: 15 additions & 0 deletions tests/queries/0_stateless/02228_merge_tree_insert_memory_usage.sql
@@ -0,0 +1,15 @@
-- Tags: long, no-parallel

-- regression for MEMORY_LIMIT_EXCEEDED error because of deferred final part flush

drop table if exists data_02228;
create table data_02228 (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by key1 % 1024;
insert into data_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024;
insert into data_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=10000000; -- { serverError MEMORY_LIMIT_EXCEEDED }
drop table data_02228;

drop table if exists data_rep_02228;
create table data_rep_02228 (key1 UInt32, sign Int8, s UInt64) engine = ReplicatedCollapsingMergeTree('/clickhouse/{database}', 'r1', sign) order by (key1) partition by key1 % 1024;
insert into data_rep_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024;
insert into data_rep_02228 select number, 1, number from numbers_mt(100e3) settings max_memory_usage='300Mi', max_partitions_per_insert_block=1024, max_insert_delayed_streams_for_parallel_write=10000000; -- { serverError MEMORY_LIMIT_EXCEEDED }
drop table data_rep_02228;
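
To observe the effect of the setting on this test's INSERTs, one can compare their peak memory usage from the query log. This is only a sketch, assuming query logging is enabled; it is not part of the test itself:

```sql
-- Compare peak memory of the two INSERTs above: the one with the raised
-- max_insert_delayed_streams_for_parallel_write is expected to use much more.
SYSTEM FLUSH LOGS;
SELECT
    formatReadableSize(memory_usage) AS peak_memory,
    type,
    query
FROM system.query_log
WHERE current_database = currentDatabase()
  AND type IN ('QueryFinish', 'ExceptionWhileProcessing')
  AND query ILIKE 'insert into data_02228%'
ORDER BY event_time_microseconds DESC;
```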