Skip to content

Commit ef3b191

Browse files
authored
Merge pull request #58036 from MikhailBurdukov/backoff_for_failed_mutations
Backoff policy for failed mutation.
2 parents 83841c2 + fa5747a commit ef3b191

File tree

13 files changed

+350
-5
lines changed

13 files changed

+350
-5
lines changed

docker/test/upgrade/run.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ rm /etc/clickhouse-server/config.d/merge_tree.xml
8888
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
8989
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
9090
rm /etc/clickhouse-server/config.d/storage_conf_02963.xml
91+
rm /etc/clickhouse-server/config.d/backoff_failed_mutation.xml
9192
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
9293
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
9394
rm /etc/clickhouse-server/users.d/replicated_ddl_entry.xml
@@ -134,6 +135,7 @@ rm /etc/clickhouse-server/config.d/merge_tree.xml
134135
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
135136
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
136137
rm /etc/clickhouse-server/config.d/storage_conf_02963.xml
138+
rm /etc/clickhouse-server/config.d/backoff_failed_mutation.xml
137139
rm /etc/clickhouse-server/config.d/block_number.xml
138140
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
139141
rm /etc/clickhouse-server/users.d/s3_cache_new.xml

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@
3535
#include <Storages/extractKeyExpressionList.h>
3636
#include <Storages/PartitionCommands.h>
3737
#include <Interpreters/PartLog.h>
38+
#include <Poco/Timestamp.h>
3839
#include <Common/threadPoolCallbackRunner.h>
3940

40-
4141
#include <boost/multi_index_container.hpp>
4242
#include <boost/multi_index/ordered_index.hpp>
4343
#include <boost/multi_index/global_fun.hpp>
@@ -1353,6 +1353,93 @@ class MergeTreeData : public IStorage, public WithMutableContext
13531353
const MergeListEntry * merge_entry,
13541354
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters);
13551355

1356+
class PartMutationBackoffPolicy
1357+
{
1358+
struct PartMutationInfo
1359+
{
1360+
size_t retry_count;
1361+
size_t latest_fail_time_us;
1362+
size_t max_postpone_time_ms;
1363+
size_t max_postpone_power;
1364+
1365+
PartMutationInfo(size_t max_postpone_time_ms_)
1366+
: retry_count(0ull)
1367+
, latest_fail_time_us(static_cast<size_t>(Poco::Timestamp().epochMicroseconds()))
1368+
, max_postpone_time_ms(max_postpone_time_ms_)
1369+
, max_postpone_power((max_postpone_time_ms_) ? (static_cast<size_t>(std::log2(max_postpone_time_ms_))) : (0ull))
1370+
{}
1371+
1372+
1373+
size_t getNextMinExecutionTimeUsResolution() const
1374+
{
1375+
if (max_postpone_time_ms == 0)
1376+
return static_cast<size_t>(Poco::Timestamp().epochMicroseconds());
1377+
size_t current_backoff_interval_us = (1 << retry_count) * 1000ul;
1378+
return latest_fail_time_us + current_backoff_interval_us;
1379+
}
1380+
1381+
void addPartFailure()
1382+
{
1383+
if (max_postpone_time_ms == 0)
1384+
return;
1385+
retry_count = std::min(max_postpone_power, retry_count + 1);
1386+
latest_fail_time_us = static_cast<size_t>(Poco::Timestamp().epochMicroseconds());
1387+
}
1388+
1389+
bool partCanBeMutated()
1390+
{
1391+
if (max_postpone_time_ms == 0)
1392+
return true;
1393+
1394+
auto current_time_us = static_cast<size_t>(Poco::Timestamp().epochMicroseconds());
1395+
return current_time_us >= getNextMinExecutionTimeUsResolution();
1396+
}
1397+
};
1398+
1399+
using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>;
1400+
DataPartsWithRetryInfo failed_mutation_parts;
1401+
mutable std::mutex parts_info_lock;
1402+
1403+
public:
1404+
1405+
void resetMutationFailures()
1406+
{
1407+
std::unique_lock _lock(parts_info_lock);
1408+
failed_mutation_parts.clear();
1409+
}
1410+
1411+
void removePartFromFailed(const String & part_name)
1412+
{
1413+
std::unique_lock _lock(parts_info_lock);
1414+
failed_mutation_parts.erase(part_name);
1415+
}
1416+
1417+
void addPartMutationFailure (const String& part_name, size_t max_postpone_time_ms_)
1418+
{
1419+
std::unique_lock _lock(parts_info_lock);
1420+
auto part_info_it = failed_mutation_parts.find(part_name);
1421+
if (part_info_it == failed_mutation_parts.end())
1422+
{
1423+
auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo(max_postpone_time_ms_));
1424+
std::swap(it, part_info_it);
1425+
}
1426+
auto& part_info = part_info_it->second;
1427+
part_info.addPartFailure();
1428+
}
1429+
1430+
bool partCanBeMutated(const String& part_name)
1431+
{
1432+
1433+
std::unique_lock _lock(parts_info_lock);
1434+
auto iter = failed_mutation_parts.find(part_name);
1435+
if (iter == failed_mutation_parts.end())
1436+
return true;
1437+
return iter->second.partCanBeMutated();
1438+
}
1439+
};
1440+
/// Controls postponing logic for failed mutations.
1441+
PartMutationBackoffPolicy mutation_backoff_policy;
1442+
13561443
/// If part is assigned to merge or mutation (possibly replicated)
13571444
/// Should be overridden by children, because they can have different
13581445
/// mechanisms for parts locking

src/Storages/MergeTree/MergeTreeSettings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ struct Settings;
146146
M(UInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * 8192, "Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm.", 0) \
147147
M(UInt64, vertical_merge_algorithm_min_bytes_to_activate, 0, "Minimal (approximate) uncompressed size in bytes in merging parts to activate Vertical merge algorithm.", 0) \
148148
M(UInt64, vertical_merge_algorithm_min_columns_to_activate, 11, "Minimal amount of non-PK columns to activate Vertical merge algorithm.", 0) \
149+
M(UInt64, max_postpone_time_for_failed_mutations_ms, 5ULL * 60 * 1000, "The maximum postpone time for failed mutations.", 0) \
149150
\
150151
/** Compatibility settings */ \
151152
M(Bool, allow_suspicious_indices, false, "Reject primary/secondary indexes and sorting keys with identical expressions", 0) \

src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <Storages/MergeTree/ReplicatedMergeMutateTaskBase.h>
22

33
#include <Storages/StorageReplicatedMergeTree.h>
4+
#include <Storages/MergeTree/MergeTreeData.h>
45
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
56
#include <Common/ProfileEventsScope.h>
67

@@ -110,11 +111,14 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
110111
auto mutations_end_it = in_partition->second.upper_bound(result_data_version);
111112
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
112113
{
114+
auto & src_part = log_entry->source_parts.at(0);
113115
ReplicatedMergeTreeQueue::MutationStatus & status = *it->second;
114-
status.latest_failed_part = log_entry->source_parts.at(0);
116+
status.latest_failed_part = src_part;
115117
status.latest_failed_part_info = source_part_info;
116118
status.latest_fail_time = time(nullptr);
117119
status.latest_fail_reason = getExceptionMessage(saved_exception, false);
120+
if (result_data_version == it->first)
121+
storage.mutation_backoff_policy.addPartMutationFailure(src_part, storage.getSettings()->max_postpone_time_for_failed_mutations_ms);
118122
}
119123
}
120124
}
@@ -142,6 +146,12 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
142146
{
143147
storage.queue.removeProcessedEntry(storage.getZooKeeper(), selected_entry->log_entry);
144148
state = State::SUCCESS;
149+
150+
auto & log_entry = selected_entry->log_entry;
151+
if (log_entry->type == ReplicatedMergeTreeLogEntryData::MUTATE_PART)
152+
{
153+
storage.mutation_backoff_policy.removePartFromFailed(log_entry->source_parts.at(0));
154+
}
145155
}
146156
catch (...)
147157
{

src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <base/sort.h>
1414

1515
#include <ranges>
16+
#include <Poco/Timestamp.h>
1617

1718
namespace DB
1819
{
@@ -1353,9 +1354,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
13531354
sum_parts_size_in_bytes += part_in_memory->block.bytes();
13541355
else
13551356
sum_parts_size_in_bytes += part->getBytesOnDisk();
1357+
1358+
if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name))
1359+
{
1360+
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
1361+
"because recently it has failed. According to exponential backoff policy, put aside this log entry.";
1362+
1363+
LOG_DEBUG(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(), entry.new_part_name);
1364+
return false;
1365+
}
13561366
}
13571367
}
1358-
13591368
if (merger_mutator.merges_blocker.isCancelled())
13601369
{
13611370
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";

src/Storages/StorageMergeTree.cpp

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <optional>
66
#include <ranges>
77

8+
#include <Poco/Timestamp.h>
89
#include <base/sort.h>
910
#include <Backups/BackupEntriesCollector.h>
1011
#include <Databases/IDatabase.h>
@@ -538,6 +539,8 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
538539

539540
Int64 sources_data_version = result_part->parts.at(0)->info.getDataVersion();
540541
Int64 result_data_version = result_part->part_info.getDataVersion();
542+
auto & failed_part = result_part->parts.at(0);
543+
541544
if (sources_data_version != result_data_version)
542545
{
543546
std::lock_guard lock(currently_processing_in_background_mutex);
@@ -555,14 +558,21 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
555558
entry.latest_failed_part_info = MergeTreePartInfo();
556559
entry.latest_fail_time = 0;
557560
entry.latest_fail_reason.clear();
561+
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
562+
mutation_backoff_policy.removePartFromFailed(failed_part->name);
558563
}
559564
}
560565
else
561566
{
562-
entry.latest_failed_part = result_part->parts.at(0)->name;
563-
entry.latest_failed_part_info = result_part->parts.at(0)->info;
567+
entry.latest_failed_part = failed_part->name;
568+
entry.latest_failed_part_info = failed_part->info;
564569
entry.latest_fail_time = time(nullptr);
565570
entry.latest_fail_reason = exception_message;
571+
572+
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
573+
{
574+
mutation_backoff_policy.addPartMutationFailure(failed_part->name, getSettings()->max_postpone_time_for_failed_mutations_ms);
575+
}
566576
}
567577
}
568578
}
@@ -833,6 +843,8 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
833843
}
834844
}
835845

846+
mutation_backoff_policy.resetMutationFailures();
847+
836848
if (!to_kill)
837849
return CancellationCode::NotFound;
838850

@@ -1217,6 +1229,12 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
12171229
TransactionID first_mutation_tid = mutations_begin_it->second.tid;
12181230
MergeTreeTransactionPtr txn;
12191231

1232+
if (!mutation_backoff_policy.partCanBeMutated(part->name))
1233+
{
1234+
LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name);
1235+
continue;
1236+
}
1237+
12201238
if (!first_mutation_tid.isPrehistoric())
12211239
{
12221240

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7460,6 +7460,7 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
74607460
Int64 block_number = pair.second;
74617461
getContext()->getMergeList().cancelPartMutations(getStorageID(), partition_id, block_number);
74627462
}
7463+
mutation_backoff_policy.resetMutationFailures();
74637464
return CancellationCode::CancelSent;
74647465
}
74657466

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
<clickhouse>
2+
<merge_tree>
3+
<max_postpone_time_for_failed_mutations_ms>200</max_postpone_time_for_failed_mutations_ms>
4+
</merge_tree>
5+
</clickhouse>

tests/config/install.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/
3030
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
3131
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
3232
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/
33+
ln -sf $SRC_PATH/config.d/backoff_failed_mutation.xml $DEST_SERVER_PATH/config.d/
3334
ln -sf $SRC_PATH/config.d/merge_tree_old_dirs_cleanup.xml $DEST_SERVER_PATH/config.d/
3435
ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/
3536
ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/

tests/integration/test_failed_mutations/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)