Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ IncludeCategories:
ReflowComments: false
AlignEscapedNewlinesLeft: false
AlignEscapedNewlines: DontAlign
AlignTrailingComments: true

# Not changed:
AccessModifierOffset: -4
AlignConsecutiveAssignments: false
AlignOperands: false
AlignTrailingComments: false
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false
Expand Down
60 changes: 40 additions & 20 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
#include <cstdlib>

#include "MemoryTracker.h"
#include <common/likely.h>
#include <common/logger_useful.h>

#include <IO/WriteHelpers.h>
#include "Common/TraceCollector.h"
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>
#include <common/likely.h>
#include <common/logger_useful.h>
#include <ext/singleton.h>

#include <atomic>
#include <cmath>
#include <cstdlib>


namespace DB
Expand Down Expand Up @@ -73,15 +78,16 @@ void MemoryTracker::alloc(Int64 size)
return;

/** Using memory_order_relaxed means that if allocations are done simultaneously,
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* we allow exception about memory limit exceeded to be thrown only on next allocation.
* So, we allow over-allocations.
*/
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);

if (metric != CurrentMetrics::end())
CurrentMetrics::add(metric, size);

Int64 current_limit = limit.load(std::memory_order_relaxed);
Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);

/// Using non-thread-safe random number generator. Joint distribution in different threads would not be uniform.
/// In this case, it doesn't matter.
Expand All @@ -98,12 +104,19 @@ void MemoryTracker::alloc(Int64 size)
message << " " << description;
message << ": fault injected. Would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);

throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}

if (unlikely(current_limit && will_be > current_limit))
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
auto no_track = blocker.cancel();
ext::Singleton<DB::TraceCollector>()->collect(size);
setOrRaiseProfilerLimit(current_profiler_limit + Int64(std::ceil((will_be - current_profiler_limit) / profiler_step)) * profiler_step);
}

if (unlikely(current_hard_limit && will_be > current_hard_limit))
{
free(size);

Expand All @@ -116,7 +129,7 @@ void MemoryTracker::alloc(Int64 size)
message << " " << description;
message << " exceeded: would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);

throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
Expand Down Expand Up @@ -174,7 +187,8 @@ void MemoryTracker::resetCounters()
{
amount.store(0, std::memory_order_relaxed);
peak.store(0, std::memory_order_relaxed);
limit.store(0, std::memory_order_relaxed);
hard_limit.store(0, std::memory_order_relaxed);
profiler_limit.store(0, std::memory_order_relaxed);
}


Expand All @@ -187,11 +201,20 @@ void MemoryTracker::reset()
}


void MemoryTracker::setOrRaiseLimit(Int64 value)
void MemoryTracker::setOrRaiseHardLimit(Int64 value)
{
    /// Atomic "store maximum": the hard limit is replaced by `value` only while `value`
    /// is greater than the limit currently stored; it is never lowered here.
    /// On a failed (possibly spurious) compare-exchange `observed` is refreshed with the
    /// stored limit, so the comparison is repeated against the latest value.
    Int64 observed = hard_limit.load(std::memory_order_relaxed);
    while (observed < value)
    {
        if (hard_limit.compare_exchange_weak(observed, value))
            break;
    }
}


void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
{
/// This is just atomic set to maximum.
Int64 old_value = limit.load(std::memory_order_relaxed);
while (old_value < value && !limit.compare_exchange_weak(old_value, value))
Int64 old_value = profiler_limit.load(std::memory_order_relaxed);
while (old_value < value && !profiler_limit.compare_exchange_weak(old_value, value))
;
}

Expand All @@ -207,7 +230,7 @@ namespace CurrentMemoryTracker
if (untracked > untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be usefull for enlarge Exception message in rethrow logic.
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = untracked;
untracked = 0;
memory_tracker->alloc(tmp);
Expand All @@ -218,10 +241,7 @@ namespace CurrentMemoryTracker
void realloc(Int64 old_size, Int64 new_size)
{
Int64 addition = new_size - old_size;
if (addition > 0)
alloc(addition);
else
free(-addition);
addition > 0 ? alloc(addition) : free(-addition);
}

void free(Int64 size)
Expand Down
19 changes: 11 additions & 8 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ class MemoryTracker
{
std::atomic<Int64> amount {0};
std::atomic<Int64> peak {0};
std::atomic<Int64> limit {0};
std::atomic<Int64> hard_limit {0};
std::atomic<Int64> profiler_limit {0};

Int64 profiler_step = 0;

/// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
double fault_probability = 0;
Expand All @@ -32,7 +35,6 @@ class MemoryTracker

public:
MemoryTracker(VariableContext level_ = VariableContext::Thread) : level(level_) {}
MemoryTracker(Int64 limit_, VariableContext level_ = VariableContext::Thread) : limit(limit_), level(level_) {}
MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread) : parent(parent_), level(level_) {}

~MemoryTracker();
Expand Down Expand Up @@ -66,21 +68,22 @@ class MemoryTracker
return peak.load(std::memory_order_relaxed);
}

/// Unconditionally overwrites the stored limit with limit_ (relaxed atomic store).
/// Unlike setOrRaiseLimit, the new value is not compared with the previous one.
void setLimit(Int64 limit_)
{
limit.store(limit_, std::memory_order_relaxed);
}

/** Set limit if it was not set.
* Otherwise, set limit to new value, if new value is greater than previous limit.
*/
void setOrRaiseLimit(Int64 value);
void setOrRaiseHardLimit(Int64 value);
void setOrRaiseProfilerLimit(Int64 value);

/// Sets the probability with which the tracker throws an exception on a memory allocation.
/// Intended for testing exception safety of calling code.
void setFaultProbability(double value)
{
fault_probability = value;
}

/// Sets the step used by alloc() to raise the profiler limit once that limit is exceeded.
/// The value is kept in a plain (non-atomic) member.
/// NOTE(review): alloc() divides by this value and it defaults to 0 - presumably callers
/// always set a positive step before a non-zero profiler limit; confirm.
void setProfilerStep(Int64 value)
{
profiler_step = value;
}

/// next should be changed only once: from nullptr to some value.
/// NOTE: It is not true in MergeListElement
void setParent(MemoryTracker * elem)
Expand Down
92 changes: 19 additions & 73 deletions dbms/src/Common/QueryProfiler.cpp
Original file line number Diff line number Diff line change
@@ -1,92 +1,38 @@
#include "QueryProfiler.h"

#include <random>
#include <common/phdr_cache.h>
#include <common/config_common.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/PipeFDs.h>
#include <Common/StackTrace.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>
#include <Common/Exception.h>
#include <Common/StackTrace.h>
#include <Common/TraceCollector.h>
#include <Common/thread_local_rng.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptorDiscardOnFailure.h>
#include <common/StringRef.h>
#include <common/config_common.h>
#include <common/logger_useful.h>
#include <common/phdr_cache.h>
#include <ext/singleton.h>

#include <random>

namespace ProfileEvents
{
extern const Event QueryProfilerSignalOverruns;
}

namespace DB
{

extern LazyPipeFDs trace_pipe;

namespace
{
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;

#if defined(OS_LINUX)
thread_local size_t write_trace_iteration = 0;
#endif

void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * info, void * context)
void writeTraceInfo(TraceType trace_type, int /* sig */, siginfo_t * info, void * context)
{
int overrun_count = 0;
#if defined(OS_LINUX)
/// Quickly drop if signal handler is called too frequently.
/// Otherwise we may end up infinitely processing signals instead of doing any useful work.
++write_trace_iteration;
if (info && info->si_overrun > 0)
{
/// But pass with some frequency to avoid drop of all traces.
if (write_trace_iteration % info->si_overrun == 0)
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun);
}
else
{
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun + 1);
return;
}
}
if (info)
overrun_count = info->si_overrun;
#else
UNUSED(info);
#endif

constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TimerType) + // timer type
sizeof(UInt64); // thread_id
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(trace_pipe.fds_rw[1], buf_size, buffer);

StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);

UInt64 thread_id = CurrentThread::get().thread_id;

const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
const StackTrace stack_trace(signal_context);

writeChar(false, out);
writeStringBinary(query_id, out);

size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);

writePODBinary(timer_type, out);
writePODBinary(thread_id, out);
out.next();
ext::Singleton<TraceCollector>()->collect(trace_type, stack_trace, overrun_count);
}

[[maybe_unused]] const UInt32 TIMER_PRECISION = 1e9;
Expand Down Expand Up @@ -135,11 +81,11 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const
sev.sigev_notify = SIGEV_THREAD_ID;
sev.sigev_signo = pause_signal;

#if defined(__FreeBSD__)
# if defined(__FreeBSD__)
sev._sigev_un._threadid = thread_id;
#else
# else
sev._sigev_un._tid = thread_id;
#endif
# endif
if (timer_create(clock_type, &sev, &timer_id))
{
/// In Google Cloud Run, the function "timer_create" is implemented incorrectly as of 2020-01-25.
Expand Down Expand Up @@ -206,7 +152,7 @@ QueryProfilerReal::QueryProfilerReal(const UInt64 thread_id, const UInt32 period

void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Real, sig, info, context);
writeTraceInfo(TraceType::REAL_TIME, sig, info, context);
}

QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)
Expand All @@ -215,7 +161,7 @@ QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)

void QueryProfilerCpu::signalHandler(int sig, siginfo_t * info, void * context)
{
writeTraceInfo(TimerType::Cpu, sig, info, context);
writeTraceInfo(TraceType::CPU_TIME, sig, info, context);
}

}
6 changes: 0 additions & 6 deletions dbms/src/Common/QueryProfiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ namespace Poco
namespace DB
{

/// Tells which query profiler timer a trace sample came from.
/// Stored in a single UInt8.
enum class TimerType : UInt8
{
/// Used by QueryProfilerReal::signalHandler.
Real,
/// Used by QueryProfilerCpu::signalHandler.
Cpu,
};

/**
* Query profiler implementation for selected thread.
*
Expand Down
Loading