Skip to content

Commit 65e9b48

Browse files
committed
Fix possible memory_tracker use-after-free for merges/mutations
There are two possible cases for execution merges/mutations: 1) from background thread 2) from OPTIMIZE TABLE query 1) is pretty simple, it's memory tracking structure is as follow: current_thread::memory_tracker = level=Thread / description="(for thread)" == background_thread_memory_tracker = level=Thread / description="(for thread)" current_thread::memory_tracker.parent = level=Global / description="(total)" So as you can see it is pretty simple and MemoryTrackerThreadSwitcher does not do anything icky for this case. 2) is complex, it's memory tracking structure is as follow: current_thread::memory_tracker = level=Thread / description="(for thread)" current_thread::memory_tracker.parent = level=Process / description="(for query)" == background_thread_memory_tracker = level=Process / description="(for query)" Before this patch to track memory (and related things, like sampling, profiling and so on) for OPTIMIZE TABLE query dirty hacks was done to do this, since current_thread memory_tracker was of Thread scope, that does not have any limits. And so if will change parent for it to Merge/Mutate memory tracker (which also does not have some of settings) it will not be correctly tracked. To address this Merge/Mutate was set as parent not to the current_thread memory_tracker but to it's parent, since it's scope is Process with all settings. But that parent's memory_tracker is the memory_tracker of the thread_group, and so if you will have nested ThreadPool inside merge/mutate (this is the case for s3 async writes, which has been added in #33291) you may get use-after-free of memory_tracker. Consider the following example: MemoryTrackerThreadSwitcher() thread_group.memory_tracker.parent = merge_list_entry->memory_tracker (see also background_thread_memory_tracker above) CurrentThread::attachTo() current_thread.memory_tracker.parent = thread_group.memory_tracker CurrentThread::detachQuery() current_thread.memory_tracker.parent = thread_group.memory_tracker.parent # and this is equal to merge_list_entry->memory_tracker ~MemoryTrackerThreadSwitcher() thread_group.memory_tracker = thread_group.memory_tracker.parent So after the following we will get incorrect memory_tracker (from the mege_list_entry) when the next job in that ThreadPool will not have thread_group, since in this case it will not try to update the current_thread.memory_tracker.parent and use-after-free will happens. So to address the (2) issue, settings from the parent memory_tracker should be copied to the merge_list_entry->memory_tracker, to avoid playing with parent memory tracker. Note, that settings from the query (OPTIMIZE TABLE) is not available at that time, so it cannot be used (instead of parent's memory tracker settings). v2: remove memory_tracker.setOrRaiseHardLimit() from settings Signed-off-by: Azat Khuzhin <[email protected]>
1 parent c204ac7 commit 65e9b48

File tree

8 files changed

+41
-53
lines changed

8 files changed

+41
-53
lines changed

src/Storages/MergeTree/MergeFromLogEntryTask.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,7 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MergeFromLogEntryT
187187
merge_mutate_entry = storage.getContext()->getMergeList().insert(
188188
storage.getStorageID(),
189189
future_merged_part,
190-
settings.memory_profiler_step,
191-
settings.memory_profiler_sample_probability,
192-
settings.max_untracked_memory);
190+
settings);
193191

194192
transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage);
195193
stopwatch_ptr = std::make_unique<Stopwatch>();

src/Storages/MergeTree/MergeList.cpp

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,8 @@ MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MergeListEntry & merge_
1616
{
1717
// Each merge is executed into separate background processing pool thread
1818
background_thread_memory_tracker = CurrentThread::getMemoryTracker();
19-
if (background_thread_memory_tracker)
20-
{
21-
/// From the query context it will be ("for thread") memory tracker with VariableContext::Thread level,
22-
/// which does not have any limits and sampling settings configured.
23-
/// And parent for this memory tracker should be ("(for query)") with VariableContext::Process level,
24-
/// that has limits and sampling configured.
25-
MemoryTracker * parent;
26-
if (background_thread_memory_tracker->level == VariableContext::Thread &&
27-
(parent = background_thread_memory_tracker->getParent()) &&
28-
parent != &total_memory_tracker)
29-
{
30-
background_thread_memory_tracker = parent;
31-
}
32-
33-
background_thread_memory_tracker_prev_parent = background_thread_memory_tracker->getParent();
34-
background_thread_memory_tracker->setParent(&merge_list_entry->memory_tracker);
35-
}
19+
background_thread_memory_tracker_prev_parent = background_thread_memory_tracker->getParent();
20+
background_thread_memory_tracker->setParent(&merge_list_entry->memory_tracker);
3621

3722
prev_untracked_memory_limit = current_thread->untracked_memory_limit;
3823
current_thread->untracked_memory_limit = merge_list_entry->max_untracked_memory;
@@ -50,9 +35,7 @@ MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MergeListEntry & merge_
5035
MemoryTrackerThreadSwitcher::~MemoryTrackerThreadSwitcher()
5136
{
5237
// Unplug memory_tracker from current background processing pool thread
53-
54-
if (background_thread_memory_tracker)
55-
background_thread_memory_tracker->setParent(background_thread_memory_tracker_prev_parent);
38+
background_thread_memory_tracker->setParent(background_thread_memory_tracker_prev_parent);
5639

5740
current_thread->untracked_memory_limit = prev_untracked_memory_limit;
5841

@@ -65,16 +48,14 @@ MemoryTrackerThreadSwitcher::~MemoryTrackerThreadSwitcher()
6548
MergeListElement::MergeListElement(
6649
const StorageID & table_id_,
6750
FutureMergedMutatedPartPtr future_part,
68-
UInt64 memory_profiler_step,
69-
UInt64 memory_profiler_sample_probability,
70-
UInt64 max_untracked_memory_)
51+
const Settings & settings)
7152
: table_id{table_id_}
7253
, partition_id{future_part->part_info.partition_id}
7354
, result_part_name{future_part->name}
7455
, result_part_path{future_part->path}
7556
, result_part_info{future_part->part_info}
7657
, num_parts{future_part->parts.size()}
77-
, max_untracked_memory(max_untracked_memory_)
58+
, max_untracked_memory(settings.max_untracked_memory)
7859
, query_id(table_id.getShortName() + "::" + result_part_name)
7960
, thread_id{getThreadId()}
8061
, merge_type{future_part->merge_type}
@@ -97,8 +78,33 @@ MergeListElement::MergeListElement(
9778
}
9879

9980
memory_tracker.setDescription("Mutate/Merge");
100-
memory_tracker.setProfilerStep(memory_profiler_step);
101-
memory_tracker.setSampleProbability(memory_profiler_sample_probability);
81+
/// MemoryTracker settings should be set here, because
82+
/// later (see MemoryTrackerThreadSwitcher)
83+
/// parent memory tracker will be changed, and if merge executed from the
84+
/// query (OPTIMIZE TABLE), all settings will be lost (since
85+
/// current_thread::memory_tracker will have Thread level MemoryTracker,
86+
/// which does not have any settings itself, it relies on the settings of the
87+
/// thread_group::memory_tracker, but MemoryTrackerThreadSwitcher will reset parent).
88+
memory_tracker.setProfilerStep(settings.memory_profiler_step);
89+
memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
90+
memory_tracker.setSoftLimit(settings.max_guaranteed_memory_usage);
91+
if (settings.memory_tracker_fault_probability)
92+
memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);
93+
94+
/// Let's try to copy memory related settings from the query,
95+
/// since settings that we have here is not from query, but global, from the table.
96+
///
97+
/// NOTE: Remember, that Thread level MemoryTracker does not have any settings,
98+
/// so it's parent is required.
99+
MemoryTracker * query_memory_tracker = CurrentThread::getMemoryTracker();
100+
MemoryTracker * parent_query_memory_tracker;
101+
if (query_memory_tracker->level == VariableContext::Thread &&
102+
(parent_query_memory_tracker = query_memory_tracker->getParent()) &&
103+
parent_query_memory_tracker != &total_memory_tracker)
104+
{
105+
memory_tracker.setOrRaiseHardLimit(parent_query_memory_tracker->getHardLimit());
106+
}
107+
102108
}
103109

104110
MergeInfo MergeListElement::getInfo() const

src/Storages/MergeTree/MergeList.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ using FutureMergedMutatedPartPtr = std::shared_ptr<FutureMergedMutatedPart>;
5858
struct MergeListElement;
5959
using MergeListEntry = BackgroundProcessListEntry<MergeListElement, MergeInfo>;
6060

61+
struct Settings;
62+
6163

6264
/**
6365
* Since merge is executed with multiple threads, this class
@@ -127,9 +129,7 @@ struct MergeListElement : boost::noncopyable
127129
MergeListElement(
128130
const StorageID & table_id_,
129131
FutureMergedMutatedPartPtr future_part,
130-
UInt64 memory_profiler_step,
131-
UInt64 memory_profiler_sample_probability,
132-
UInt64 max_untracked_memory_);
132+
const Settings & settings);
133133

134134
MergeInfo getInfo() const;
135135

src/Storages/MergeTree/MergePlainMergeTreeTask.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,7 @@ void MergePlainMergeTreeTask::prepare()
8181
merge_list_entry = storage.getContext()->getMergeList().insert(
8282
storage.getStorageID(),
8383
future_part,
84-
settings.memory_profiler_step,
85-
settings.memory_profiler_sample_probability,
86-
settings.max_untracked_memory);
84+
settings);
8785

8886
write_part_log = [this] (const ExecutionStatus & execution_status)
8987
{

src/Storages/MergeTree/MergeTask.cpp

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -585,12 +585,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
585585
projection_future_part,
586586
projection.metadata,
587587
global_ctx->merge_entry,
588-
std::make_unique<MergeListElement>(
589-
(*global_ctx->merge_entry)->table_id,
590-
projection_future_part,
591-
settings.memory_profiler_step,
592-
settings.memory_profiler_sample_probability,
593-
settings.max_untracked_memory),
588+
std::make_unique<MergeListElement>((*global_ctx->merge_entry)->table_id, projection_future_part, settings),
594589
global_ctx->time_of_merge,
595590
global_ctx->context,
596591
global_ctx->space_reservation,

src/Storages/MergeTree/MutateFromLogEntryTask.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,7 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MutateFromLogEntry
116116
merge_mutate_entry = storage.getContext()->getMergeList().insert(
117117
storage.getStorageID(),
118118
future_mutated_part,
119-
settings.memory_profiler_step,
120-
settings.memory_profiler_sample_probability,
121-
settings.max_untracked_memory);
119+
settings);
122120

123121
stopwatch_ptr = std::make_unique<Stopwatch>();
124122

src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,7 @@ void MutatePlainMergeTreeTask::prepare()
3232
merge_list_entry = storage.getContext()->getMergeList().insert(
3333
storage.getStorageID(),
3434
future_part,
35-
settings.memory_profiler_step,
36-
settings.memory_profiler_sample_probability,
37-
settings.max_untracked_memory);
35+
settings);
3836

3937
stopwatch = std::make_unique<Stopwatch>();
4038

src/Storages/MergeTree/MutateTask.cpp

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -633,12 +633,7 @@ class MergeProjectionPartsTask : public IExecutableTask
633633
projection_future_part,
634634
projection.metadata,
635635
ctx->mutate_entry,
636-
std::make_unique<MergeListElement>(
637-
(*ctx->mutate_entry)->table_id,
638-
projection_future_part,
639-
settings.memory_profiler_step,
640-
settings.memory_profiler_sample_probability,
641-
settings.max_untracked_memory),
636+
std::make_unique<MergeListElement>((*ctx->mutate_entry)->table_id, projection_future_part, settings),
642637
*ctx->holder,
643638
ctx->time_of_mutation,
644639
ctx->context,

0 commit comments

Comments
 (0)