Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docker/test/fasttest/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ TESTS_TO_SKIP=(
00974_query_profiler

# Look at DistributedFilesToInsert, so cannot run in parallel.
01460_DistributedFilesToInsert
01457_DistributedFilesToInsert

01541_max_memory_usage_for_user

# Require python libraries like scipy, pandas and numpy
01322_ttest_scipy
Expand Down
2 changes: 1 addition & 1 deletion programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
Poco::Logger * log = &logger();
UseSSL use_ssl;

ThreadStatus thread_status;
MainThreadStatus::getInstance();

registerFunctions();
registerAggregateFunctions();
Expand Down
69 changes: 52 additions & 17 deletions src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,24 @@
#include <random>
#include <cstdlib>

namespace
{

/// Picks the memory tracker that CurrentMemoryTracker::alloc()/free() account against.
///
/// Returns the tracker reported by DB::CurrentThread::getMemoryTracker() when there is one.
/// Otherwise falls back to total_memory_tracker, but only once DB::MainThreadStatus::get()
/// is non-null: by the time the main thread is initialized, total_memory_tracker is
/// initialized too and can be used (MainThreadStatus is required for profiling).
/// Returns nullptr when neither is available.
MemoryTracker * getMemoryTracker()
{
    MemoryTracker * tracker = DB::CurrentThread::getMemoryTracker();

    if (tracker == nullptr && DB::MainThreadStatus::get() != nullptr)
        tracker = &total_memory_tracker;

    return tracker;
}

}

namespace DB
{
Expand Down Expand Up @@ -192,14 +210,15 @@ void MemoryTracker::free(Int64 size)
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), -size);
}

Int64 accounted_size = size;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't understand this change.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

if (level == VariableContext::Thread)
{
/// Could become negative if memory allocated in this thread is freed in another one
amount.fetch_sub(size, std::memory_order_relaxed);
amount.fetch_sub(accounted_size, std::memory_order_relaxed);
}
else
{
Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;
Int64 new_amount = amount.fetch_sub(accounted_size, std::memory_order_relaxed) - accounted_size;

/** Sometimes, query could free some data, that was allocated outside of query context.
* Example: cache eviction.
Expand All @@ -210,15 +229,15 @@ void MemoryTracker::free(Int64 size)
if (unlikely(new_amount < 0))
{
amount.fetch_sub(new_amount);
size += new_amount;
accounted_size += new_amount;
}
}

if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->free(size);

if (metric != CurrentMetrics::end())
CurrentMetrics::sub(metric, size);
CurrentMetrics::sub(metric, accounted_size);
}


Expand Down Expand Up @@ -270,16 +289,24 @@ namespace CurrentMemoryTracker

void alloc(Int64 size)
{
if (auto * memory_tracker = DB::CurrentThread::getMemoryTracker())
if (auto * memory_tracker = getMemoryTracker())
{
current_thread->untracked_memory += size;
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
if (current_thread)
{
current_thread->untracked_memory += size;
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
memory_tracker->alloc(tmp);
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
memory_tracker->alloc(tmp);
memory_tracker->alloc(size);
}
}
}
Expand All @@ -292,13 +319,21 @@ namespace CurrentMemoryTracker

void free(Int64 size)
{
if (auto * memory_tracker = DB::CurrentThread::getMemoryTracker())
if (auto * memory_tracker = getMemoryTracker())
{
current_thread->untracked_memory -= size;
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
if (current_thread)
{
current_thread->untracked_memory -= size;
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
{
memory_tracker->free(-current_thread->untracked_memory);
current_thread->untracked_memory = 0;
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->free(-current_thread->untracked_memory);
current_thread->untracked_memory = 0;
memory_tracker->free(size);
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions src/Common/ThreadStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace ErrorCodes


thread_local ThreadStatus * current_thread = nullptr;
thread_local ThreadStatus * main_thread = nullptr;


ThreadStatus::ThreadStatus()
Expand Down Expand Up @@ -115,4 +116,20 @@ void ThreadStatus::onFatalError()
fatal_error_callback();
}

/// Pointer published by the MainThreadStatus constructor and cleared by its destructor; nullptr otherwise.
ThreadStatus * MainThreadStatus::main_thread = nullptr;
/// Returns the single MainThreadStatus object.
/// It is a function-local static, so it is constructed on the first call.
MainThreadStatus & MainThreadStatus::getInstance()
{
static MainThreadStatus thread_status;
return thread_status;
}
/// Records the value of current_thread as the main thread's status.
/// NOTE(review): presumably the ThreadStatus base constructor has already pointed
/// current_thread at this object by the time the body runs — confirm.
MainThreadStatus::MainThreadStatus()
: ThreadStatus()
{
main_thread = current_thread;
}
/// Clears the published pointer, so MainThreadStatus::get() returns nullptr again.
MainThreadStatus::~MainThreadStatus()
{
main_thread = nullptr;
}

}
18 changes: 18 additions & 0 deletions src/Common/ThreadStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,22 @@ class ThreadStatus : public boost::noncopyable
void setupState(const ThreadGroupStatusPtr & thread_group_);
};

/**
 * ThreadStatus for the main thread, handed out as a single shared instance.
 *
 * getInstance() creates the object on its first call; get() returns the recorded
 * main-thread ThreadStatus pointer, which is nullptr while no instance is alive.
 */
class MainThreadStatus : public ThreadStatus
{
public:
/// Returns the shared instance, constructing it on the first call.
static MainThreadStatus & getInstance();
/// Returns the recorded main-thread status pointer, or nullptr if none is recorded.
static ThreadStatus * get() { return main_thread; }
/// True when the caller's current_thread equals the recorded main-thread pointer.
/// NOTE(review): this is also true when both pointers are nullptr.
static bool isMainThread() { return main_thread == current_thread; }

~MainThreadStatus();

private:
/// Private: the only way to obtain an instance is getInstance().
MainThreadStatus();

/// Set by the constructor, reset to nullptr by the destructor.
static ThreadStatus * main_thread;
};

}
16 changes: 13 additions & 3 deletions src/Common/TraceCollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,20 @@ void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trac
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);

StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
StringRef query_id;
UInt64 thread_id;

auto thread_id = CurrentThread::get().thread_id;
if (CurrentThread::isInitialized())
{
query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);

thread_id = CurrentThread::get().thread_id;
}
else
{
thread_id = MainThreadStatus::get()->thread_id;
}

writeChar(false, out); /// true if requested to stop the collecting thread.
writeStringBinary(query_id, out);
Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/ThreadStatusExt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
performance_counters.setParent(&ProfileEvents::global_counters);
memory_tracker.reset();

/// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below.
memory_tracker.setParent(nullptr);
/// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below (will reset to its parent).
memory_tracker.setParent(thread_group->memory_tracker.getParent());

query_id.clear();
query_context = nullptr;
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<yandex>
<!-- The asynchronous metrics update also syncs MemoryTracking with RSS; effectively disable that sync by setting the update period to 1 day. -->
<asynchronous_metrics_update_period_s>86400</asynchronous_metrics_update_period_s>
</yandex>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<yandex>
<metric_log remove="remove"/>
<query_masking_rules remove="remove"/>
<query_thread_log remove="remove"/>
<text_log remove="remove"/>
<trace_log remove="remove"/>
</yandex>
95 changes: 95 additions & 0 deletions tests/integration/test_MemoryTracking/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long

import logging
import pytest
from helpers.cluster import ClickHouseCluster

# Cluster object shared by every test in this module.
cluster = ClickHouseCluster(__file__)

# The only instance added to the cluster; it is given two extra server config files from configs/.
node = cluster.add_instance('node', main_configs=[
'configs/no_system_log.xml',
'configs/asynchronous_metrics_update_period_s.xml',
])

# Let INFO records through the root logger and attach a StreamHandler,
# so the summary logged by check_memory() is emitted.
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())

@pytest.fixture(scope='module', autouse=True)
def start_cluster():
    """Module-scoped, auto-used fixture: start the cluster, hand it to the tests, always shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Runs even if start() or a test raised.
        cluster.shutdown()

# Settings applied to every request made through query() / http_query().
query_settings = {
'max_threads': 1,
# NOTE(review): presumably a period of 0 turns the query profilers off — confirm
'query_profiler_real_time_period_ns': 0,
'query_profiler_cpu_time_period_ns': 0,
'log_queries': 0,
}
# Query that the tests below run repeatedly between MemoryTracking samples.
sample_query = "SELECT groupArray(repeat('a', 1000)) FROM numbers(10000) GROUP BY number%10 FORMAT JSON"

def query(*args, **kwargs):
    """Call `node.query` with `query_settings` enforced.

    Any `settings` supplied by the caller are merged with `query_settings`;
    on a key conflict the value from `query_settings` wins.

    The merge is done into a fresh dict, so the caller's dict is not mutated
    and the shared module-level `query_settings` object is never passed on.
    A caller-supplied `settings=None` is treated as "no extra settings".
    """
    caller_settings = kwargs.get('settings') or {}
    kwargs['settings'] = {**caller_settings, **query_settings}
    return node.query(*args, **kwargs)
def http_query(*args, **kwargs):
    """Call `node.http_query` with `query_settings` enforced as request params.

    Any `params` supplied by the caller are merged with `query_settings`;
    on a key conflict the value from `query_settings` wins.

    The merge is done into a fresh dict, so the caller's dict is not mutated
    and the shared module-level `query_settings` object is never passed on.
    A caller-supplied `params=None` is treated as "no extra params".
    """
    caller_params = kwargs.get('params') or {}
    kwargs['params'] = {**caller_params, **query_settings}
    return node.http_query(*args, **kwargs)

def get_MemoryTracking():
    """Return the current `MemoryTracking` value from system.metrics, as an int (fetched via http_query)."""
    response = http_query("SELECT value FROM system.metrics WHERE metric = 'MemoryTracking'")
    return int(response)

def check_memory(memory):
    """Assert that the MemoryTracking samples in `memory` took only a few distinct values.

    The limit is the larger of 3 and 10% of the number of samples (truncated
    to an int); the number of distinct values must be strictly below it.
    Raises AssertionError otherwise.
    """
    # The floor is 3, since:
    # - it is still low enough to detect inaccuracy
    # - memory can go like X/X+N due to some background allocations
    # - memory can go like X/X+N/X, so at least 2 changes
    min_allowed = 3
    # With enough samples, 10% of them is allowed instead
    # (most of the time only a few values are seen; 10% is there to avoid flakiness).
    sample_count = len(memory)
    changes_allowed = max(int(sample_count * 0.1), min_allowed)

    changed = len(set(memory))
    logging.info('Changes: allowed=%s, actual=%s, sample=%s',
                 changes_allowed, changed, sample_count)
    assert changed < changes_allowed

def test_http():
    """Run `sample_query` 100 times through http_query(), sampling MemoryTracking once up front and after every run."""
    samples = [get_MemoryTracking()]
    for _ in range(100):
        http_query(sample_query)
        samples.append(get_MemoryTracking())
    check_memory(samples)

def test_tcp_multiple_sessions():
    """Run `sample_query` 100 times through separate query() calls, sampling MemoryTracking once up front and after every run."""
    samples = [get_MemoryTracking()]
    for _ in range(100):
        query(sample_query)
        samples.append(get_MemoryTracking())
    check_memory(samples)

def test_tcp_single_session():
    """Send 100 (sample query, MemoryTracking query) pairs in one query() call and check the reported values.

    The samples handed to check_memory() are the value columns, kept as
    strings, of the output lines that start with 'MemoryTracking'.
    """
    # The returned value is not used: the samples checked below come only from the multi-query output.
    get_MemoryTracking()

    statements = [
        sample_query,
        "SELECT metric, value FROM system.metrics WHERE metric = 'MemoryTracking'",
    ] * 100
    output = query(';'.join(statements))

    samples = [
        line.split('\t')[1]
        for line in output.split('\n')
        if line.startswith('MemoryTracking')
    ]
    check_memory(samples)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
HTTP
TCP_ONE_SESSION
TCP
OK
KILL sleep
Loading