|
35 | 35 | #include <Storages/extractKeyExpressionList.h> |
36 | 36 | #include <Storages/PartitionCommands.h> |
37 | 37 | #include <Interpreters/PartLog.h> |
| 38 | +#include <Poco/Timestamp.h> |
38 | 39 | #include <Common/threadPoolCallbackRunner.h> |
39 | 40 |
|
40 | | - |
41 | 41 | #include <boost/multi_index_container.hpp> |
42 | 42 | #include <boost/multi_index/ordered_index.hpp> |
43 | 43 | #include <boost/multi_index/global_fun.hpp> |
@@ -1353,6 +1353,93 @@ class MergeTreeData : public IStorage, public WithMutableContext |
1353 | 1353 | const MergeListEntry * merge_entry, |
1354 | 1354 | std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters); |
1355 | 1355 |
|
| 1356 | + class PartMutationBackoffPolicy |
| 1357 | + { |
| 1358 | + struct PartMutationInfo |
| 1359 | + { |
| 1360 | + size_t retry_count; |
| 1361 | + size_t latest_fail_time_us; |
| 1362 | + size_t max_postpone_time_ms; |
| 1363 | + size_t max_postpone_power; |
| 1364 | + |
| 1365 | + PartMutationInfo(size_t max_postpone_time_ms_) |
| 1366 | + : retry_count(0ull) |
| 1367 | + , latest_fail_time_us(static_cast<size_t>(Poco::Timestamp().epochMicroseconds())) |
| 1368 | + , max_postpone_time_ms(max_postpone_time_ms_) |
| 1369 | + , max_postpone_power((max_postpone_time_ms_) ? (static_cast<size_t>(std::log2(max_postpone_time_ms_))) : (0ull)) |
| 1370 | + {} |
| 1371 | + |
| 1372 | + |
| 1373 | + size_t getNextMinExecutionTimeUsResolution() const |
| 1374 | + { |
| 1375 | + if (max_postpone_time_ms == 0) |
| 1376 | + return static_cast<size_t>(Poco::Timestamp().epochMicroseconds()); |
| 1377 | + size_t current_backoff_interval_us = (1 << retry_count) * 1000ul; |
| 1378 | + return latest_fail_time_us + current_backoff_interval_us; |
| 1379 | + } |
| 1380 | + |
| 1381 | + void addPartFailure() |
| 1382 | + { |
| 1383 | + if (max_postpone_time_ms == 0) |
| 1384 | + return; |
| 1385 | + retry_count = std::min(max_postpone_power, retry_count + 1); |
| 1386 | + latest_fail_time_us = static_cast<size_t>(Poco::Timestamp().epochMicroseconds()); |
| 1387 | + } |
| 1388 | + |
| 1389 | + bool partCanBeMutated() |
| 1390 | + { |
| 1391 | + if (max_postpone_time_ms == 0) |
| 1392 | + return true; |
| 1393 | + |
| 1394 | + auto current_time_us = static_cast<size_t>(Poco::Timestamp().epochMicroseconds()); |
| 1395 | + return current_time_us >= getNextMinExecutionTimeUsResolution(); |
| 1396 | + } |
| 1397 | + }; |
| 1398 | + |
| 1399 | + using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>; |
| 1400 | + DataPartsWithRetryInfo failed_mutation_parts; |
| 1401 | + mutable std::mutex parts_info_lock; |
| 1402 | + |
| 1403 | + public: |
| 1404 | + |
| 1405 | + void resetMutationFailures() |
| 1406 | + { |
| 1407 | + std::unique_lock _lock(parts_info_lock); |
| 1408 | + failed_mutation_parts.clear(); |
| 1409 | + } |
| 1410 | + |
| 1411 | + void removePartFromFailed(const String & part_name) |
| 1412 | + { |
| 1413 | + std::unique_lock _lock(parts_info_lock); |
| 1414 | + failed_mutation_parts.erase(part_name); |
| 1415 | + } |
| 1416 | + |
| 1417 | + void addPartMutationFailure (const String& part_name, size_t max_postpone_time_ms_) |
| 1418 | + { |
| 1419 | + std::unique_lock _lock(parts_info_lock); |
| 1420 | + auto part_info_it = failed_mutation_parts.find(part_name); |
| 1421 | + if (part_info_it == failed_mutation_parts.end()) |
| 1422 | + { |
| 1423 | + auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo(max_postpone_time_ms_)); |
| 1424 | + std::swap(it, part_info_it); |
| 1425 | + } |
| 1426 | + auto& part_info = part_info_it->second; |
| 1427 | + part_info.addPartFailure(); |
| 1428 | + } |
| 1429 | + |
| 1430 | + bool partCanBeMutated(const String& part_name) |
| 1431 | + { |
| 1432 | + |
| 1433 | + std::unique_lock _lock(parts_info_lock); |
| 1434 | + auto iter = failed_mutation_parts.find(part_name); |
| 1435 | + if (iter == failed_mutation_parts.end()) |
| 1436 | + return true; |
| 1437 | + return iter->second.partCanBeMutated(); |
| 1438 | + } |
| 1439 | + }; |
| 1440 | + /// Controls postponing logic for failed mutations. |
| 1441 | + PartMutationBackoffPolicy mutation_backoff_policy; |
| 1442 | + |
1356 | 1443 | /// If part is assigned to merge or mutation (possibly replicated) |
1357 | 1444 | /// Should be overridden by children, because they can have different |
1358 | 1445 | /// mechanisms for parts locking |
|
0 commit comments