Skip to content

Commit 837f4ea

Browse files
committed
Add ability to throttle merges/mutations
Main motivation was to have the ability to throttle background tasks, to avoid affecting queries. Two new server settings have been added for this: - max_mutations_bandwidth_for_server - max_merges_bandwidth_for_server Note that they limit only reading, since usually you will not write more data than you read, but sometimes it is possible in case of ALTER UPDATE. For now, to keep things simple, I decided to limit this with only 2 settings instead of 4. Note that if write throttling is ever needed, it can reuse the same settings — just create a new throttler for writes. Signed-off-by: Azat Khuzhin <[email protected]>
1 parent 6ed9b53 commit 837f4ea

File tree

9 files changed

+88
-3
lines changed

9 files changed

+88
-3
lines changed

src/Core/ServerSettings.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ namespace DB
2626
M(UInt64, max_active_parts_loading_thread_pool_size, 64, "The number of threads to load active set of data parts (Active ones) at startup.", 0) \
2727
M(UInt64, max_outdated_parts_loading_thread_pool_size, 32, "The number of threads to load inactive set of data parts (Outdated ones) at startup.", 0) \
2828
M(UInt64, max_parts_cleaning_thread_pool_size, 128, "The number of threads for concurrent removal of inactive data parts.", 0) \
29+
M(UInt64, max_mutations_bandwidth_for_server, 0, "The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.", 0) \
30+
M(UInt64, max_merges_bandwidth_for_server, 0, "The maximum read speed of all merges on server in bytes per second. Zero means unlimited.", 0) \
2931
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.", 0) \
3032
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.", 0) \
3133
M(UInt64, max_remote_read_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for read. Zero means unlimited.", 0) \

src/Interpreters/Context.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,9 @@ struct ContextSharedPart : boost::noncopyable
325325

326326
mutable ThrottlerPtr backups_server_throttler; /// A server-wide throttler for BACKUPs
327327

328+
mutable ThrottlerPtr mutations_throttler; /// A server-wide throttler for mutations
329+
mutable ThrottlerPtr merges_throttler; /// A server-wide throttler for merges
330+
328331
MultiVersion<Macros> macros; /// Substitutions extracted from config.
329332
std::unique_ptr<DDLWorker> ddl_worker TSA_GUARDED_BY(mutex); /// Process ddl commands from zk.
330333
LoadTaskPtr ddl_worker_startup_task; /// To postpone `ddl_worker->startup()` after all tables startup
@@ -733,6 +736,12 @@ struct ContextSharedPart : boost::noncopyable
733736

734737
if (auto bandwidth = server_settings.max_backup_bandwidth_for_server)
735738
backups_server_throttler = std::make_shared<Throttler>(bandwidth);
739+
740+
if (auto bandwidth = server_settings.max_mutations_bandwidth_for_server)
741+
mutations_throttler = std::make_shared<Throttler>(bandwidth);
742+
743+
if (auto bandwidth = server_settings.max_merges_bandwidth_for_server)
744+
merges_throttler = std::make_shared<Throttler>(bandwidth);
736745
}
737746
};
738747

@@ -2994,6 +3003,16 @@ ThrottlerPtr Context::getBackupsThrottler() const
29943003
return throttler;
29953004
}
29963005

3006+
ThrottlerPtr Context::getMutationsThrottler() const
3007+
{
3008+
return shared->mutations_throttler;
3009+
}
3010+
3011+
ThrottlerPtr Context::getMergesThrottler() const
3012+
{
3013+
return shared->merges_throttler;
3014+
}
3015+
29973016
bool Context::hasDistributedDDL() const
29983017
{
29993018
return getConfigRef().has("distributed_ddl");

src/Interpreters/Context.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1324,6 +1324,9 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
13241324

13251325
ThrottlerPtr getBackupsThrottler() const;
13261326

1327+
ThrottlerPtr getMutationsThrottler() const;
1328+
ThrottlerPtr getMergesThrottler() const;
1329+
13271330
/// Kitchen sink
13281331
using ContextData::KitchenSink;
13291332
using ContextData::kitchen_sink;

src/Interpreters/MutationsInterpreter.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,6 +1280,7 @@ void MutationsInterpreter::Source::read(
12801280
VirtualColumns virtual_columns(std::move(required_columns), part);
12811281

12821282
createReadFromPartStep(
1283+
MergeTreeSequentialSourceType::Mutation,
12831284
plan, *data, storage_snapshot, part,
12841285
std::move(virtual_columns.columns_to_read),
12851286
apply_deleted_mask_, filter, context_,

src/Storages/MergeTree/MergeTask.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
566566
for (size_t part_num = 0; part_num < global_ctx->future_part->parts.size(); ++part_num)
567567
{
568568
Pipe pipe = createMergeTreeSequentialSource(
569+
MergeTreeSequentialSourceType::Merge,
569570
*global_ctx->data,
570571
global_ctx->storage_snapshot,
571572
global_ctx->future_part->parts[part_num],
@@ -920,6 +921,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
920921
for (const auto & part : global_ctx->future_part->parts)
921922
{
922923
Pipe pipe = createMergeTreeSequentialSource(
924+
MergeTreeSequentialSourceType::Merge,
923925
*global_ctx->data,
924926
global_ctx->storage_snapshot,
925927
part,

src/Storages/MergeTree/MergeTreeSequentialSource.cpp

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class MergeTreeSequentialSource : public ISource
3030
{
3131
public:
3232
MergeTreeSequentialSource(
33+
MergeTreeSequentialSourceType type,
3334
const MergeTreeData & storage_,
3435
const StorageSnapshotPtr & storage_snapshot_,
3536
MergeTreeData::DataPartPtr data_part_,
@@ -85,6 +86,7 @@ class MergeTreeSequentialSource : public ISource
8586

8687

8788
MergeTreeSequentialSource::MergeTreeSequentialSource(
89+
MergeTreeSequentialSourceType type,
8890
const MergeTreeData & storage_,
8991
const StorageSnapshotPtr & storage_snapshot_,
9092
MergeTreeData::DataPartPtr data_part_,
@@ -152,6 +154,17 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
152154
read_settings.local_fs_method = LocalFSReadMethod::pread;
153155
if (read_with_direct_io)
154156
read_settings.direct_io_threshold = 1;
157+
/// Configure throttling
158+
switch (type)
159+
{
160+
case Mutation:
161+
read_settings.local_throttler = context->getMutationsThrottler();
162+
break;
163+
case Merge:
164+
read_settings.local_throttler = context->getMergesThrottler();
165+
break;
166+
}
167+
read_settings.remote_throttler = read_settings.local_throttler;
155168

156169
MergeTreeReaderSettings reader_settings =
157170
{
@@ -244,6 +257,7 @@ MergeTreeSequentialSource::~MergeTreeSequentialSource() = default;
244257

245258

246259
Pipe createMergeTreeSequentialSource(
260+
MergeTreeSequentialSourceType type,
247261
const MergeTreeData & storage,
248262
const StorageSnapshotPtr & storage_snapshot,
249263
MergeTreeData::DataPartPtr data_part,
@@ -264,7 +278,7 @@ Pipe createMergeTreeSequentialSource(
264278
if (need_to_filter_deleted_rows && !has_filter_column)
265279
columns_to_read.emplace_back(filter_column.name);
266280

267-
auto column_part_source = std::make_shared<MergeTreeSequentialSource>(
281+
auto column_part_source = std::make_shared<MergeTreeSequentialSource>(type,
268282
storage, storage_snapshot, data_part, columns_to_read, std::move(mark_ranges),
269283
/*apply_deleted_mask=*/ false, read_with_direct_io, take_column_types_from_storage, quiet);
270284

@@ -292,6 +306,7 @@ class ReadFromPart final : public ISourceStep
292306
{
293307
public:
294308
ReadFromPart(
309+
MergeTreeSequentialSourceType type_,
295310
const MergeTreeData & storage_,
296311
const StorageSnapshotPtr & storage_snapshot_,
297312
MergeTreeData::DataPartPtr data_part_,
@@ -301,6 +316,7 @@ class ReadFromPart final : public ISourceStep
301316
ContextPtr context_,
302317
Poco::Logger * log_)
303318
: ISourceStep(DataStream{.header = storage_snapshot_->getSampleBlockForColumns(columns_to_read_)})
319+
, type(type_)
304320
, storage(storage_)
305321
, storage_snapshot(storage_snapshot_)
306322
, data_part(std::move(data_part_))
@@ -337,7 +353,7 @@ class ReadFromPart final : public ISourceStep
337353
}
338354
}
339355

340-
auto source = createMergeTreeSequentialSource(
356+
auto source = createMergeTreeSequentialSource(type,
341357
storage,
342358
storage_snapshot,
343359
data_part,
@@ -353,6 +369,7 @@ class ReadFromPart final : public ISourceStep
353369
}
354370

355371
private:
372+
MergeTreeSequentialSourceType type;
356373
const MergeTreeData & storage;
357374
StorageSnapshotPtr storage_snapshot;
358375
MergeTreeData::DataPartPtr data_part;
@@ -364,6 +381,7 @@ class ReadFromPart final : public ISourceStep
364381
};
365382

366383
void createReadFromPartStep(
384+
MergeTreeSequentialSourceType type,
367385
QueryPlan & plan,
368386
const MergeTreeData & storage,
369387
const StorageSnapshotPtr & storage_snapshot,
@@ -374,7 +392,7 @@ void createReadFromPartStep(
374392
ContextPtr context,
375393
Poco::Logger * log)
376394
{
377-
auto reading = std::make_unique<ReadFromPart>(
395+
auto reading = std::make_unique<ReadFromPart>(type,
378396
storage, storage_snapshot, std::move(data_part),
379397
std::move(columns_to_read), apply_deleted_mask,
380398
filter, std::move(context), log);

src/Storages/MergeTree/MergeTreeSequentialSource.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,16 @@
88
namespace DB
99
{
1010

11+
enum MergeTreeSequentialSourceType
12+
{
13+
Mutation,
14+
Merge,
15+
};
16+
1117
/// Create stream for reading single part from MergeTree.
1218
/// If the part has lightweight delete mask then the deleted rows are filtered out.
1319
Pipe createMergeTreeSequentialSource(
20+
MergeTreeSequentialSourceType type,
1421
const MergeTreeData & storage,
1522
const StorageSnapshotPtr & storage_snapshot,
1623
MergeTreeData::DataPartPtr data_part,
@@ -25,6 +32,7 @@ Pipe createMergeTreeSequentialSource(
2532
class QueryPlan;
2633

2734
void createReadFromPartStep(
35+
MergeTreeSequentialSourceType type,
2836
QueryPlan & plan,
2937
const MergeTreeData & storage,
3038
const StorageSnapshotPtr & storage_snapshot,

tests/integration/test_throttling/configs/static_overrides.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,7 @@
3131
<allowed_disk>default</allowed_disk>
3232
<allowed_path>/backups/</allowed_path>
3333
</backups>
34+
35+
<max_mutations_bandwidth_for_server>1000000</max_mutations_bandwidth_for_server> <!-- 1M -->
36+
<max_merges_bandwidth_for_server>1000000</max_merges_bandwidth_for_server> <!-- 1M -->
3437
</clickhouse>

tests/integration/test_throttling/test.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,3 +430,32 @@ def test_write_throttling(policy, mode, setting, value, should_took):
430430
)
431431
_, took = elapsed(node.query, f"insert into data select * from numbers(1e6)")
432432
assert_took(took, should_took)
433+
434+
435+
def test_max_mutations_bandwidth_for_server():
436+
node.query(
437+
"""
438+
drop table if exists data;
439+
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9;
440+
"""
441+
)
442+
node.query("insert into data select * from numbers(1e6)")
443+
_, took = elapsed(
444+
node.query,
445+
"alter table data update key = -key where 1 settings mutations_sync = 1",
446+
)
447+
# reading 1e6*8 bytes with 1M/s bandwith should take (8-1)/1=7 seconds
448+
assert_took(took, 7)
449+
450+
451+
def test_max_merges_bandwidth_for_server():
452+
node.query(
453+
"""
454+
drop table if exists data;
455+
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9;
456+
"""
457+
)
458+
node.query("insert into data select * from numbers(1e6)")
459+
_, took = elapsed(node.query, "optimize table data final")
460+
# reading 1e6*8 bytes with 1M/s bandwith should take (8-1)/1=7 seconds
461+
assert_took(took, 7)

0 commit comments

Comments (0)