Skip to content

Commit 4117df8

Browse files
committed
Sanitize places that can have big allocations w/ ignored MEMORY_LIMIT_EXCEEDED
- Add MemoryTrackerDebugBlockerInThread to some internal places that should not throw.
- Add a TrackedString implementation, a std::string that can throw MEMORY_LIMIT_EXCEEDED, and use it in places where we can have significant memory usage.
- Use AllocatorWithMemoryTracking in RadixSort as well (it can create big allocations).

I found this in the CI log [1].

[1]: https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=87035&sha=cf9779523fb72328052b0f541fef6cbd1cf1cf45&name_0=PR&name_1=Stateless%20tests%20%28amd_debug%2C%20AsyncInsert%2C%20s3%20storage%2C%20parallel%29
1 parent b62f682 commit 4117df8

File tree

12 files changed

+89
-40
lines changed

12 files changed

+89
-40
lines changed

src/AggregateFunctions/QuantileTDigest.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ class QuantileTDigest
119119
static constexpr size_t PART_SIZE_BITS = 8;
120120

121121
using Transform = RadixSortFloatTransform<KeyBits>;
122-
using Allocator = RadixSortAllocator;
123122

124123
/// The function to get the key from an array element.
125124
static Key & extractKey(Element & elem) { return elem.mean; }

src/Common/RadixSort.h

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#pragma once
22

3-
#include <string.h>
43
#if !defined(OS_DARWIN) && !defined(OS_FREEBSD)
54
#include <malloc.h>
65
#endif
@@ -16,6 +15,7 @@
1615
#include <base/extended_types.h>
1716
#include <base/sort.h>
1817

18+
#include <Common/AllocatorWithMemoryTracking.h>
1919
#include <Common/TargetSpecific.h>
2020

2121
/** Radix sort, has the following functionality:
@@ -33,22 +33,6 @@
3333
*/
3434

3535

36-
/** Used as a template parameter. See below.
37-
*/
38-
struct RadixSortAllocator // stateless: both members are static; removed by this commit
39-
{
40-
static void * allocate(size_t size) // returns raw storage of `size` bytes
41-
{
42-
return ::operator new(size); // plain global operator new
43-
}
44-
45-
static void deallocate(void * ptr, size_t size) // releases storage obtained from allocate()
46-
{
47-
::operator delete(ptr, size); // sized global operator delete
48-
}
49-
};
50-
51-
5236
/** A transformation that transforms the bit representation of a key into an unsigned integer number,
5337
* that the order relation over the keys will match the order relation over the obtained unsigned numbers.
5438
* For floats this conversion does the following:
@@ -97,11 +81,6 @@ struct RadixSortFloatTraits
9781
/// Converting a key into KeyBits is such that the order relation over the key corresponds to the order relation over KeyBits.
9882
using Transform = RadixSortFloatTransform<KeyBits>;
9983

100-
/// An object with the functions allocate and deallocate.
101-
/// Can be used, for example, to allocate memory for a temporary array on the stack.
102-
/// To do this, the allocator itself is created on the stack.
103-
using Allocator = RadixSortAllocator;
104-
10584
/// The function to get the key from an array element.
10685
static Key & extractKey(Element & elem) { return elem; }
10786

@@ -144,7 +123,6 @@ struct RadixSortUIntTraits
144123
static constexpr size_t PART_SIZE_BITS = 8;
145124

146125
using Transform = RadixSortIdentityTransform<KeyBits>;
147-
using Allocator = RadixSortAllocator;
148126

149127
static Key & extractKey(Element & elem) { return elem; }
150128
static Result & extractResult(Element & elem) { return elem; }
@@ -183,7 +161,6 @@ struct RadixSortIntTraits
183161
static constexpr size_t PART_SIZE_BITS = 8;
184162

185163
using Transform = RadixSortSignedTransform<KeyBits>;
186-
using Allocator = RadixSortAllocator;
187164

188165
static Key & extractKey(Element & elem) { return elem; }
189166
static Result & extractResult(Element & elem) { return elem; }
@@ -289,10 +266,10 @@ struct RadixSort
289266
/// For each of the NUM_PASSES bit ranges of the key, consider how many times each value of this bit range met.
290267
std::unique_ptr<CountType[]> histograms{new CountType[HISTOGRAM_SIZE * NUM_PASSES]{}};
291268

292-
typename Traits::Allocator allocator;
269+
AllocatorWithMemoryTracking<typename Traits::Element> allocator;
293270

294271
/// We will do several passes through the array. On each pass, the data is transferred to another array. Let's allocate this temporary array.
295-
Element * swap_buffer = reinterpret_cast<Element *>(allocator.allocate(size * sizeof(Element)));
272+
Element * swap_buffer = allocator.allocate(size);
296273

297274
/// Transform the array and calculate the histogram.
298275
/// NOTE This is slightly suboptimal. Look at https://github.com/powturbo/TurboHist
@@ -436,7 +413,7 @@ struct RadixSort
436413
std::reverse(arr, arr + size);
437414
}
438415

439-
allocator.deallocate(swap_buffer, size * sizeof(Element));
416+
allocator.deallocate(swap_buffer, size);
440417
}
441418

442419

src/Common/SystemLogBase.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <Interpreters/FilesystemCacheLog.h>
1616
#include <Interpreters/ObjectStorageQueueLog.h>
1717
#include <Interpreters/IcebergMetadataLog.h>
18+
#include <Common/MemoryTrackerDebugBlockerInThread.h>
1819
#if CLICKHOUSE_CLOUD
1920
#include <Interpreters/DistributedCacheLog.h>
2021
#include <Interpreters/DistributedCacheServerLog.h>
@@ -200,6 +201,8 @@ void SystemLogQueue<LogElement>::confirm(SystemLogQueue<LogElement>::Index last_
200201
template <typename LogElement>
201202
typename SystemLogQueue<LogElement>::PopResult SystemLogQueue<LogElement>::pop()
202203
{
204+
[[maybe_unused]] MemoryTrackerDebugBlockerInThread blocker;
205+
203206
PopResult result;
204207
size_t prev_ignored_logs = 0;
205208

@@ -308,6 +311,7 @@ void SystemLogBase<LogElement>::stopFlushThread()
308311
template <typename LogElement>
309312
void SystemLogBase<LogElement>::add(LogElement element) // takes the element by value and moves it into the queue
310313
{
314+
[[maybe_unused]] MemoryTrackerDebugBlockerInThread blocker; // scoped guard for the whole call. NOTE(review): per the commit message it marks an internal place that should not throw - confirm exact semantics
311315
queue->push(std::move(element)); // hand ownership of the element to the queue
312316
}
313317

src/Common/TrackedString.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#pragma once
2+
3+
#include <string>
4+
#include <Common/AllocatorWithMemoryTracking.h>
5+
6+
/// String that can throw MEMORY_LIMIT_EXCEEDED
7+
/// Use it when the string may require significant memory.
8+
class TrackedString : public std::basic_string<char, std::char_traits<char>, AllocatorWithMemoryTracking<char>> // std::basic_string<char> whose allocator is AllocatorWithMemoryTracking<char>
9+
{
10+
using std::basic_string<char, std::char_traits<char>, AllocatorWithMemoryTracking<char>>::basic_string; // inherit the base std::basic_string constructors
11+
};

src/Common/ZooKeeper/ZooKeeperImpl.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <Common/logger_useful.h>
2727
#include <Common/setThreadName.h>
2828
#include <Common/thread_local_rng.h>
29+
#include <Common/MemoryTrackerDebugBlockerInThread.h>
2930
#include <Core/Settings.h>
3031
#include <Core/ServerSettings.h>
3132

@@ -771,6 +772,8 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
771772

772773
void ZooKeeper::sendThread()
773774
{
775+
[[maybe_unused]] MemoryTrackerDebugBlockerInThread blocker;
776+
774777
setThreadName("ZooKeeperSend");
775778

776779
scope_guard os_thread_nice_value_guard;
@@ -860,6 +863,8 @@ void ZooKeeper::sendThread()
860863

861864
void ZooKeeper::receiveThread()
862865
{
866+
[[maybe_unused]] MemoryTrackerDebugBlockerInThread blocker;
867+
863868
setThreadName("ZooKeeperRecv");
864869

865870
scope_guard os_thread_nice_value_guard;
@@ -1817,6 +1822,8 @@ void ZooKeeper::setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_)
18171822
#ifdef ZOOKEEPER_LOG
18181823
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize, UInt64 elapsed_microseconds)
18191824
{
1825+
[[maybe_unused]] MemoryTrackerDebugBlockerInThread blocker;
1826+
18201827
auto maybe_zk_log = std::atomic_load(&zk_log);
18211828
if (!maybe_zk_log)
18221829
return;
src/IO/WriteBufferFromTrackedString.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#include <Common/Exception.h>
2+
#include <IO/WriteBufferFromTrackedString.h>
3+
4+
namespace DB
5+
{
6+
7+
/// It is safe to make them autofinalizable.
8+
WriteBufferFromTrackedString::~WriteBufferFromTrackedString() // finalizes the buffer on destruction if that has not happened yet
9+
{
10+
try
11+
{
12+
if (!this->finalized && !this->canceled) // nothing to do when already finalized or canceled
13+
this->finalize();
14+
}
15+
catch (...) // nothing may escape the destructor
16+
{
17+
tryLogCurrentException(__PRETTY_FUNCTION__); // hand the exception to tryLogCurrentException, tagged with this function's name
18+
}
19+
}
20+
21+
}
src/IO/WriteBufferFromTrackedString.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include <IO/WriteBufferFromVector.h>
4+
#include <Common/TrackedString.h>
5+
6+
namespace DB
7+
{
8+
9+
class WriteBufferFromTrackedString final : public WriteBufferFromVectorImpl<TrackedString> // WriteBufferFromVectorImpl instantiated for TrackedString
10+
{
11+
using Base = WriteBufferFromVectorImpl; // injected-class-name, i.e. WriteBufferFromVectorImpl<TrackedString>
12+
public:
13+
explicit WriteBufferFromTrackedString(TrackedString & vector_) // forwards the target string to the base constructor
14+
: Base(vector_)
15+
{
16+
}
17+
18+
WriteBufferFromTrackedString(TrackedString & vector_, AppendModeTag tag_) // same, additionally forwarding the AppendModeTag
19+
: Base(vector_, tag_)
20+
{
21+
}
22+
~WriteBufferFromTrackedString() override; // declared here, defined out of line
23+
};
24+
25+
}

src/Interpreters/AsynchronousInsertQueue.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <IO/ConcatReadBuffer.h>
1111
#include <IO/LimitReadBuffer.h>
1212
#include <IO/ReadBufferFromString.h>
13+
#include <IO/WriteBufferFromTrackedString.h>
1314
#include <IO/copyData.h>
1415
#include <Interpreters/ExpressionActions.h>
1516
#include <Interpreters/AsynchronousInsertLog.h>
@@ -399,7 +400,7 @@ AsynchronousInsertQueue::pushQueryWithInlinedData(ASTPtr query, ContextPtr query
399400
}
400401
preprocessInsertQuery(query, query_context);
401402

402-
String bytes;
403+
TrackedString bytes;
403404
{
404405
/// Read at most 'async_insert_max_data_size' bytes of data.
405406
/// If limit is exceeded we will fallback to synchronous insert
@@ -424,7 +425,7 @@ AsynchronousInsertQueue::pushQueryWithInlinedData(ASTPtr query, ContextPtr query
424425
}
425426

426427
{
427-
WriteBufferFromString write_buf(bytes);
428+
WriteBufferFromTrackedString write_buf(bytes);
428429
copyData(limit_buf, write_buf);
429430
}
430431

src/Interpreters/AsynchronousInsertQueue.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <Common/MemoryTrackerSwitcher.h>
77
#include <Common/SettingsChanges.h>
88
#include <Common/ThreadPool.h>
9+
#include <Common/TrackedString.h>
910
#include <Interpreters/AsynchronousInsertQueueDataKind.h>
1011

1112
#include <future>
@@ -92,9 +93,9 @@ class AsynchronousInsertQueue : public WithContext
9293
};
9394

9495
private:
95-
struct DataChunk : public std::variant<String, Block>
96+
struct DataChunk : public std::variant<TrackedString, Block>
9697
{
97-
using std::variant<String, Block>::variant;
98+
using std::variant<TrackedString, Block>::variant;
9899

99100
size_t byteSize() const
100101
{
@@ -125,7 +126,7 @@ class AsynchronousInsertQueue : public WithContext
125126
}, *this);
126127
}
127128

128-
const String * asString() const { return std::get_if<String>(this); }
129+
const TrackedString * asString() const { return std::get_if<TrackedString>(this); }
129130
const Block * asBlock() const { return std::get_if<Block>(this); }
130131
};
131132

src/Interpreters/TraceCollector.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <Interpreters/TraceLog.h>
88
#include <Common/MemoryTrackerBlockerInThread.h>
99
#include <Common/Exception.h>
10+
#include <Common/MemoryTrackerDebugBlockerInThread.h>
1011
#include <Common/TraceSender.h>
1112
#include <Common/ProfileEvents.h>
1213
#include <Common/setThreadName.h>
@@ -99,6 +100,8 @@ TraceCollector::~TraceCollector()
99100

100101
void TraceCollector::run()
101102
{
103+
[[maybe_unused]] MemoryTrackerDebugBlockerInThread blocker;
104+
102105
setThreadName("TraceCollector");
103106

104107
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);

0 commit comments

Comments
 (0)