Skip to content

Add pool to WriteBufferFromS3#33291

Merged
KochetovNicolai merged 20 commits intomasterfrom
add-pool-to-s3-write-buffer
Jan 31, 2022
Merged

Add pool to WriteBufferFromS3#33291
KochetovNicolai merged 20 commits intomasterfrom
add-pool-to-s3-write-buffer

Conversation

@KochetovNicolai
Copy link
Copy Markdown
Member

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
For inserts and merges into S3, write files in parallel whenever possible.

@robot-clickhouse robot-clickhouse added the pr-performance Pull request with some performance improvements label Dec 29, 2021
@alexey-milovidov
Copy link
Copy Markdown
Member

How is it related to #16503?

@KochetovNicolai
Copy link
Copy Markdown
Member Author

I suppose here is the thread pool part which is the same.
However, in this pr I've added a logic that we can send written to s3 files in parallel also (in case of insert and merge).
ThreadPool itself did not improve insert select time.
At current stage, I see speed up from 24MB/s to ~ 200MB/s

@alesapin alesapin self-assigned this Jan 24, 2022
@KochetovNicolai KochetovNicolai marked this pull request as ready for review January 24, 2022 16:04
Copy link
Copy Markdown
Member

@alesapin alesapin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not finished yet (replicated sink part), but small clarification in comments required. In general idea is clear and the code should work as expected.

impl->preFinalize();
impl->finalize();

// LOG_TRACE(&Poco::Logger::get("WritingToCacheWriteBuffer"), "count {}\n{}", impl->count(), StackTrace().toString());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove?

if (!is_prefinalized)
preFinalize();

// LOG_TRACE(&Poco::Logger::get("WritingToCacheWriteBuffer"), "{}", StackTrace().toString());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

{
WriteBufferFromFileDecorator::finalizeImpl();

// LOG_TRACE(&Poco::Logger::get("WriteIndirectBufferFromRemoteFS"), "{}\n{}", count(), StackTrace().toString());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

if (thread_group)
CurrentThread::attachTo(thread_group);

SCOPE_EXIT_SAFE(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How it differs from SCOPE_EXIT? The code inside should be exception safe?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quite the opposite, SAVE has additional try/catch to log an exception if there is one.

void load(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path);
void store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
void store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is returned and what user should do with it? It's unclear, need to add a comment?


}

void MergeTreeDataWriter::TemporaryPart::finalize()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are switching between finalize and finish multiple times on different levels. It's confusing. Maybe better to add some suffix to these names, so it became more clear... for example here it can be finilizeAllStreamsForPart...

void load(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path);
void store(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const;
void store(const Names & column_names, const DataTypes & data_types, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const;
[[nodiscard]] WrittenFiles store(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add a comment -- what to do with them.

else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
{
std::string tags_str;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fmt::join?

Copy link
Copy Markdown
Member

@alesapin alesapin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool!

@KochetovNicolai KochetovNicolai merged commit 321fa4a into master Jan 31, 2022
@KochetovNicolai KochetovNicolai deleted the add-pool-to-s3-write-buffer branch January 31, 2022 16:37
azat added a commit to azat/ClickHouse that referenced this pull request Feb 18, 2022
There are two possible cases for execution merges/mutations:
1) from background thread
2) from OPTIMIZE TABLE query

1) is pretty simple, it's memory tracking structure is as follow:

    current_thread::memory_tracker = level=Thread / description="(for thread)" ==
      background_thread_memory_tracker = level=Thread / description="(for thread)"
    current_thread::memory_tracker.parent = level=Global / description="(total)"

  So as you can see it is pretty simple and MemoryTrackerThreadSwitcher
  does not do anything icky for this case.

2) is complex, it's memory tracking structure is as follow:

    current_thread::memory_tracker = level=Thread / description="(for thread)"
    current_thread::memory_tracker.parent = level=Process / description="(for query)" ==
      background_thread_memory_tracker = level=Process / description="(for query)"

  Before this patch to track memory (and related things, like sampling,
  profiling and so on) for OPTIMIZE TABLE query dirty hacks was done to
  do this, since current_thread memory_tracker was of Thread scope, that
  does not have any limits.

  And so if will change parent for it to Merge/Mutate memory tracker
  (which also does not have some of settings) it will not be correctly
  tracked.

  To address this Merge/Mutate was set as parent not to the
  current_thread memory_tracker but to it's parent, since it's scope is
  Process with all settings.

  But that parent's memory_tracker is the memory_tracker of the
  thread_group, and so if you will have nested ThreadPool inside
  merge/mutate (this is the case for s3 async writes, which has been
  added in ClickHouse#33291) you may get use-after-free of memory_tracker.

  Consider the following example:

    MemoryTrackerThreadSwitcher()
      thread_group.memory_tracker.parent = merge_list_entry->memory_tracker
      (see also background_thread_memory_tracker above)

    CurrentThread::attachTo()
      current_thread.memory_tracker.parent = thread_group.memory_tracker

    CurrentThread::detachQuery()
      current_thread.memory_tracker.parent = thread_group.memory_tracker.parent
      # and this is equal to merge_list_entry->memory_tracker

    ~MemoryTrackerThreadSwitcher()
      thread_group.memory_tracker = thread_group.memory_tracker.parent

  So after the following we will get incorrect memory_tracker (from the
  mege_list_entry) when the next job in that ThreadPool will not have
  thread_group, since in this case it will not try to update the
  current_thread.memory_tracker.parent and use-after-free will happens.

So to address the (2) issue, settings from the parent memory_tracker
should be copied to the merge_list_entry->memory_tracker, to avoid
playing with parent memory tracker.

Note, that settings from the query (OPTIMIZE TABLE) is not available at
that time, so it cannot be used (instead of parent's memory tracker
settings).

v2: remove memory_tracker.setOrRaiseHardLimit() from settings

Signed-off-by: Azat Khuzhin <[email protected]>
KochetovNicolai pushed a commit that referenced this pull request Feb 18, 2022
There are two possible cases for execution merges/mutations:
1) from background thread
2) from OPTIMIZE TABLE query

1) is pretty simple, it's memory tracking structure is as follow:

    current_thread::memory_tracker = level=Thread / description="(for thread)" ==
      background_thread_memory_tracker = level=Thread / description="(for thread)"
    current_thread::memory_tracker.parent = level=Global / description="(total)"

  So as you can see it is pretty simple and MemoryTrackerThreadSwitcher
  does not do anything icky for this case.

2) is complex, it's memory tracking structure is as follow:

    current_thread::memory_tracker = level=Thread / description="(for thread)"
    current_thread::memory_tracker.parent = level=Process / description="(for query)" ==
      background_thread_memory_tracker = level=Process / description="(for query)"

  Before this patch to track memory (and related things, like sampling,
  profiling and so on) for OPTIMIZE TABLE query dirty hacks was done to
  do this, since current_thread memory_tracker was of Thread scope, that
  does not have any limits.

  And so if will change parent for it to Merge/Mutate memory tracker
  (which also does not have some of settings) it will not be correctly
  tracked.

  To address this Merge/Mutate was set as parent not to the
  current_thread memory_tracker but to it's parent, since it's scope is
  Process with all settings.

  But that parent's memory_tracker is the memory_tracker of the
  thread_group, and so if you will have nested ThreadPool inside
  merge/mutate (this is the case for s3 async writes, which has been
  added in #33291) you may get use-after-free of memory_tracker.

  Consider the following example:

    MemoryTrackerThreadSwitcher()
      thread_group.memory_tracker.parent = merge_list_entry->memory_tracker
      (see also background_thread_memory_tracker above)

    CurrentThread::attachTo()
      current_thread.memory_tracker.parent = thread_group.memory_tracker

    CurrentThread::detachQuery()
      current_thread.memory_tracker.parent = thread_group.memory_tracker.parent
      # and this is equal to merge_list_entry->memory_tracker

    ~MemoryTrackerThreadSwitcher()
      thread_group.memory_tracker = thread_group.memory_tracker.parent

  So after the following we will get incorrect memory_tracker (from the
  mege_list_entry) when the next job in that ThreadPool will not have
  thread_group, since in this case it will not try to update the
  current_thread.memory_tracker.parent and use-after-free will happens.

So to address the (2) issue, settings from the parent memory_tracker
should be copied to the merge_list_entry->memory_tracker, to avoid
playing with parent memory tracker.

Note, that settings from the query (OPTIMIZE TABLE) is not available at
that time, so it cannot be used (instead of parent's memory tracker
settings).

v2: remove memory_tracker.setOrRaiseHardLimit() from settings

Signed-off-by: Azat Khuzhin <[email protected]>
(cherry picked from commit 65e9b48)
azat added a commit to azat/ClickHouse that referenced this pull request Mar 8, 2022
In ClickHouse#33291 final part commit had been defered, and now it can take
significantly more time, that may lead to "Part directory doesn't exist"
error during INSERT:

    2022.02.21 18:18:06.979881 [ 11329 ] {insert} <Debug> executeQuery: (from 127.1:24572, user: default) INSERT INTO db.table (...) VALUES
    2022.02.21 20:58:03.933593 [ 11329 ] {insert} <Trace> db.table: Renaming temporary part tmp_insert_20220214_18044_18044_0 to 20220214_270654_270654_0.
    2022.02.21 21:16:50.961917 [ 11329 ] {insert} <Trace> db.table: Renaming temporary part tmp_insert_20220214_18197_18197_0 to 20220214_270689_270689_0.
    ...
    2022.02.22 21:16:57.632221 [ 64878 ] {} <Warning> db.table: Removing temporary directory /clickhouse/data/db/table/tmp_insert_20220214_18232_18232_0/
    ...
    2022.02.23 12:23:56.277480 [ 11329 ] {insert} <Trace> db.table: Renaming temporary part tmp_insert_20220214_18232_18232_0 to 20220214_273459_273459_0.
    2022.02.23 12:23:56.299218 [ 11329 ] {insert} <Error> executeQuery: Code: 107. DB::Exception: Part directory /clickhouse/data/db/table/tmp_insert_20220214_18232_18232_0/ doesn't exist. Most likely it is a logical error. (FILE_DOESNT_EXIST) (version 22.2.1.1) (from 127.1:24572) (in query: INSERT INTO db.table (...) VALUES), Stack trace (when copying this message, always include the lines below):

Follow-up for: ClickHouse#28760
Refs: ClickHouse#33291

Signed-off-by: Azat Khuzhin <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pr-performance Pull request with some performance improvements

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants