Skip to content

Commit fce211f

Browse files
Backport #87140 to 25.7: fix: fix server level max temporary size limit
1 parent abb0542 commit fce211f

File tree

13 files changed

+139
-20
lines changed

13 files changed

+139
-20
lines changed

src/Interpreters/Aggregator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1762,7 +1762,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
17621762
auto & out_stream = [this, max_temp_file_size]() -> TemporaryBlockStreamHolder &
17631763
{
17641764
std::lock_guard lk(tmp_files_mutex);
1765-
return tmp_files.emplace_back(std::make_shared<const Block>(getHeader(false)), tmp_data.get(), max_temp_file_size);
1765+
return tmp_files.emplace_back(std::make_shared<const Block>(getHeader(false)), tmp_data, max_temp_file_size);
17661766
}();
17671767

17681768
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);

src/Interpreters/GraceHashJoin.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,8 +386,8 @@ void GraceHashJoin::addBuckets(const size_t bucket_count)
386386
for (size_t i = 0; i < bucket_count; ++i)
387387
try
388388
{
389-
TemporaryBlockStreamHolder left_file(left_sample_block, tmp_data.get());
390-
TemporaryBlockStreamHolder right_file(std::make_shared<const Block>(prepareRightBlock(*right_sample_block)), tmp_data.get());
389+
TemporaryBlockStreamHolder left_file(left_sample_block, tmp_data);
390+
TemporaryBlockStreamHolder right_file(std::make_shared<const Block>(prepareRightBlock(*right_sample_block)), tmp_data);
391391

392392
BucketPtr new_bucket = std::make_shared<FileBucket>(current_size + i, std::move(left_file), std::move(right_file), log);
393393
tmp_buckets.emplace_back(std::move(new_bucket));

src/Interpreters/HashJoin/HashJoin.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,7 @@ bool HashJoin::addBlockToJoin(ScatteredBlock & source_block, bool check_limits)
655655
|| (max_rows_in_join && getTotalRowCount() + block_to_save.rows() >= max_rows_in_join)))
656656
{
657657
if (!tmp_stream)
658-
tmp_stream.emplace(std::make_shared<const Block>(right_sample_block), tmp_data.get());
658+
tmp_stream.emplace(std::make_shared<const Block>(right_sample_block), tmp_data);
659659

660660
chassert(!source_block.wasScattered()); /// We don't run parallel_hash for cross join
661661
tmp_stream.value()->write(block_to_save);

src/Interpreters/SortedBlocksWriter.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ void updateProfileEvents(TemporaryDataBuffer::Stat stat)
4040

4141
TemporaryBlockStreamHolder flushBlockToFile(const TemporaryDataOnDiskScopePtr & tmp_data, const Block & block)
4242
{
43-
TemporaryBlockStreamHolder stream_holder(std::make_shared<const Block>(block.cloneEmpty()), tmp_data.get());
43+
TemporaryBlockStreamHolder stream_holder(std::make_shared<const Block>(block.cloneEmpty()), tmp_data);
4444
stream_holder->write(block);
4545

4646
auto stat = stream_holder.finishWriting();
@@ -52,7 +52,7 @@ TemporaryBlockStreamHolder flushBlockToFile(const TemporaryDataOnDiskScopePtr &
5252

5353
TemporaryBlockStreamHolder flushToFile(const TemporaryDataOnDiskScopePtr & tmp_data, const Block & header, QueryPipelineBuilder pipeline)
5454
{
55-
TemporaryBlockStreamHolder stream_holder(std::make_shared<const Block>(header), tmp_data.get());
55+
TemporaryBlockStreamHolder stream_holder(std::make_shared<const Block>(header), tmp_data);
5656

5757
auto exec_pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline));
5858
PullingPipelineExecutor executor(exec_pipeline);

src/Interpreters/TemporaryDataOnDisk.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ bool TemporaryDataReadBuffer::nextImpl()
219219
return true;
220220
}
221221

222-
TemporaryDataBuffer::TemporaryDataBuffer(TemporaryDataOnDiskScope * parent_, size_t reserve_size)
222+
TemporaryDataBuffer::TemporaryDataBuffer(std::shared_ptr<TemporaryDataOnDiskScope> parent_, size_t reserve_size)
223223
: WriteBuffer(nullptr, 0)
224224
, parent(parent_)
225225
, file_holder(parent->file_provider(reserve_size))
@@ -313,6 +313,15 @@ void TemporaryDataBuffer::updateAllocAndCheck()
313313
stat.uncompressed_size = new_uncompressed_size;
314314
}
315315

316+
317+
void TemporaryDataBuffer::freeAlloc()
318+
{
319+
if (parent)
320+
parent->deltaAllocAndCheck(-stat.compressed_size, -stat.uncompressed_size);
321+
stat.compressed_size = 0;
322+
stat.uncompressed_size = 0;
323+
}
324+
316325
void TemporaryDataOnDiskScope::deltaAllocAndCheck(ssize_t compressed_delta, ssize_t uncompressed_delta)
317326
{
318327
if (parent)
@@ -334,7 +343,7 @@ void TemporaryDataOnDiskScope::deltaAllocAndCheck(ssize_t compressed_delta, ssiz
334343
stat.uncompressed_size += uncompressed_delta;
335344
}
336345

337-
TemporaryBlockStreamHolder::TemporaryBlockStreamHolder(SharedHeader header_, TemporaryDataOnDiskScope * parent_, size_t reserve_size)
346+
TemporaryBlockStreamHolder::TemporaryBlockStreamHolder(SharedHeader header_, std::shared_ptr<TemporaryDataOnDiskScope> parent_, size_t reserve_size)
338347
: WrapperGuard(std::make_unique<TemporaryDataBuffer>(parent_, reserve_size), DBMS_TCP_PROTOCOL_VERSION, header_)
339348
{
340349
/// Constant columns must be avoided since they are not supported in (de/)serialization, but we have to keep lazy columns
@@ -363,5 +372,6 @@ TemporaryDataBuffer::~TemporaryDataBuffer()
363372
{
364373
if (!finalized)
365374
cancel();
375+
freeAlloc();
366376
}
367377
}

src/Interpreters/TemporaryDataOnDisk.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <atomic>
4+
#include <memory>
45
#include <mutex>
56
#include <boost/noncopyable.hpp>
67

@@ -184,7 +185,7 @@ class TemporaryDataBuffer : public WriteBuffer
184185
size_t uncompressed_size = 0;
185186
};
186187

187-
explicit TemporaryDataBuffer(TemporaryDataOnDiskScope * parent_, size_t reserve_size = 0);
188+
explicit TemporaryDataBuffer(std::shared_ptr<TemporaryDataOnDiskScope> parent_, size_t reserve_size = 0);
188189
~TemporaryDataBuffer() override;
189190

190191
void nextImpl() override;
@@ -198,8 +199,9 @@ class TemporaryDataBuffer : public WriteBuffer
198199

199200
private:
200201
void updateAllocAndCheck();
202+
void freeAlloc();
201203

202-
TemporaryDataOnDiskScope * parent;
204+
std::shared_ptr<TemporaryDataOnDiskScope> parent;
203205
std::unique_ptr<TemporaryFileHolder> file_holder;
204206
WrapperGuard<CompressedWriteBuffer, WriteBuffer> out_compressed_buf;
205207
std::once_flag write_finished;
@@ -214,7 +216,7 @@ using TemporaryBlockStreamReaderHolder = WrapperGuard<NativeReader, ReadBuffer>;
214216
class TemporaryBlockStreamHolder : public WrapperGuard<NativeWriter, TemporaryDataBuffer>
215217
{
216218
public:
217-
TemporaryBlockStreamHolder(SharedHeader header_, TemporaryDataOnDiskScope * parent_, size_t reserve_size = 0);
219+
TemporaryBlockStreamHolder(SharedHeader header_, std::shared_ptr<TemporaryDataOnDiskScope> parent_, size_t reserve_size = 0);
218220

219221
TemporaryBlockStreamReaderHolder getReadStream() const;
220222

src/Interpreters/tests/gtest_filecache.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,13 +1015,13 @@ try
10151015

10161016

10171017
{
1018-
TemporaryBlockStreamHolder stream(std::make_shared<const Block>(generateBlock()), tmp_data_scope.get());
1018+
TemporaryBlockStreamHolder stream(std::make_shared<const Block>(generateBlock()), tmp_data_scope);
10191019
ASSERT_TRUE(stream);
10201020
/// Do nothing with stream, just create it and destroy.
10211021
}
10221022

10231023
{
1024-
TemporaryBlockStreamHolder stream(std::make_shared<const Block>(generateBlock()), tmp_data_scope.get());
1024+
TemporaryBlockStreamHolder stream(std::make_shared<const Block>(generateBlock()), tmp_data_scope);
10251025
ASSERT_GT(stream->write(generateBlock(100)), 0);
10261026

10271027
ASSERT_GT(file_cache.getUsedCacheSize(), 0);
@@ -1041,7 +1041,7 @@ try
10411041

10421042
{
10431043
size_t before_used_size = file_cache.getUsedCacheSize();
1044-
auto write_buf_stream = std::make_unique<TemporaryDataBuffer>(tmp_data_scope.get());
1044+
auto write_buf_stream = std::make_unique<TemporaryDataBuffer>(tmp_data_scope);
10451045

10461046
write_buf_stream->write("1234567890", 10);
10471047
write_buf_stream->write("abcde", 5);
@@ -1058,7 +1058,7 @@ try
10581058
}
10591059

10601060
{
1061-
TemporaryBlockStreamHolder stream(std::make_shared<const Block>(generateBlock()), tmp_data_scope.get());
1061+
TemporaryBlockStreamHolder stream(std::make_shared<const Block>(generateBlock()), tmp_data_scope);
10621062

10631063
ASSERT_GT(stream->write(generateBlock(100)), 0);
10641064

@@ -1193,7 +1193,7 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
11931193
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(&file_cache, TemporaryDataOnDiskSettings{});
11941194

11951195
auto block = generateBlock(/*size=*/3);
1196-
TemporaryBlockStreamHolder stream(std::make_shared<const Block>(block), tmp_data_scope.get());
1196+
TemporaryBlockStreamHolder stream(std::make_shared<const Block>(block), tmp_data_scope);
11971197

11981198
stream->write(block);
11991199
auto stat = stream.finishWriting();
@@ -1219,7 +1219,7 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
12191219
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(volume, TemporaryDataOnDiskSettings{});
12201220

12211221
auto block = generateBlock(/*size=*/3);
1222-
TemporaryBlockStreamHolder stream(std::make_shared<const Block>(block), tmp_data_scope.get());
1222+
TemporaryBlockStreamHolder stream(std::make_shared<const Block>(block), tmp_data_scope);
12231223
stream->write(block);
12241224
auto stat = stream.finishWriting();
12251225

src/Processors/Transforms/MergeSortingTransform.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ void MergeSortingTransform::consume(Chunk chunk)
243243
/// If there's less free disk space than reserve_size, an exception will be thrown
244244
size_t reserve_size = sum_bytes_in_blocks + min_free_disk_space;
245245
SharedHeader shared_header_without_constants = std::make_shared<const Block>(header_without_constants);
246-
TemporaryBlockStreamHolder tmp_stream(shared_header_without_constants, tmp_data.get(), reserve_size);
246+
TemporaryBlockStreamHolder tmp_stream(shared_header_without_constants, tmp_data, reserve_size);
247247
size_t max_merged_block_size = this->max_merged_block_size;
248248
if (max_block_bytes > 0 && sum_rows_in_blocks > 0 && sum_bytes_in_blocks > 0)
249249
{

src/Server/HTTPHandler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ void HTTPHandler::processQuery(
384384
auto tmp_data = server.context()->getTempDataOnDisk();
385385
cascade_buffers_lazy.emplace_back([tmp_data](const WriteBufferPtr &) -> WriteBufferPtr
386386
{
387-
return std::make_unique<TemporaryDataBuffer>(tmp_data.get());
387+
return std::make_unique<TemporaryDataBuffer>(tmp_data);
388388
});
389389
}
390390
else

src/Storages/MergeTree/MergeTask.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ class RowsSourcesTemporaryFile : public ITemporaryFileLookup
192192
if (tmp_data_buffer)
193193
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary file was already requested for writing, there musto be only one writer");
194194

195-
tmp_data_buffer = std::make_unique<TemporaryDataBuffer>(temporary_data_on_disk.get());
195+
tmp_data_buffer = std::make_unique<TemporaryDataBuffer>(temporary_data_on_disk);
196196
return *tmp_data_buffer;
197197
}
198198

0 commit comments

Comments
 (0)