Skip to content

Commit 0410155

Browse files
authored
Merge pull request #15537 from ClickHouse/mutation_hangs_after_replace_partition
Fix mutation may hang after REPLACE/DROP PARTITION
2 parents 5f7aedd + 767a525 commit 0410155

File tree

5 files changed

+85
-16
lines changed

5 files changed

+85
-16
lines changed

src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,16 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
715715
for (const String & produced_part_name : queue_entry->getVirtualPartNames())
716716
{
717717
auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version);
718+
719+
/// Oddly enough, getVirtualPartNames() may return _virtual_ part name.
720+
/// Such parts do not exist and will never appear, so we should not add virtual parts to parts_to_do list.
721+
/// Fortunately, it's easy to distinguish virtual parts from normal parts by part level.
722+
/// See StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(...)
723+
auto max_level = MergeTreePartInfo::MAX_LEVEL; /// DROP/DETACH PARTITION
724+
auto another_max_level = std::numeric_limits<decltype(part_info.level)>::max(); /// REPLACE/MOVE PARTITION
725+
if (part_info.level == max_level || part_info.level == another_max_level)
726+
continue;
727+
718728
auto it = entry->block_numbers.find(part_info.partition_id);
719729
if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion())
720730
mutation.parts_to_do.add(produced_part_name);

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4053,6 +4053,7 @@ Pipe StorageReplicatedMergeTree::alterPartition(
40534053

40544054

40554055
/// If new version returns ordinary name, else returns part name containing the first and last month of the month
4056+
/// NOTE: use it in pair with getFakePartCoveringAllPartsInPartition(...)
40564057
static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info)
40574058
{
40584059
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
@@ -4068,7 +4069,7 @@ static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version,
40684069
return part_info.getPartName();
40694070
}
40704071

4071-
bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info)
4072+
bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition)
40724073
{
40734074
/// Even if there is no data in the partition, you still need to mark the range for deletion.
40744075
/// - Because before executing DETACH, tasks for downloading parts to this partition can be executed.
@@ -4091,14 +4092,21 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
40914092
mutation_version = queue.getCurrentMutationVersion(partition_id, right);
40924093
}
40934094

4094-
/// Empty partition.
4095-
if (right == 0)
4096-
return false;
4095+
/// REPLACE PARTITION uses different max level and does not decrement max_block of DROP_RANGE for unknown (probably historical) reason.
4096+
auto max_level = std::numeric_limits<decltype(part_info.level)>::max();
4097+
if (!for_replace_partition)
4098+
{
4099+
max_level = MergeTreePartInfo::MAX_LEVEL;
4100+
4101+
/// Empty partition.
4102+
if (right == 0)
4103+
return false;
40974104

4098-
--right;
4105+
--right;
4106+
}
40994107

41004108
/// Artificial high level is chosen, to make this part "covering" all parts inside.
4101-
part_info = MergeTreePartInfo(partition_id, left, right, MergeTreePartInfo::MAX_LEVEL, mutation_version);
4109+
part_info = MergeTreePartInfo(partition_id, left, right, max_level, mutation_version);
41024110
return true;
41034111
}
41044112

@@ -5305,11 +5313,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
53055313
/// Firstly, generate last block number and compute drop_range
53065314
/// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block.
53075315
/// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop.
5316+
/// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true?
53085317
MergeTreePartInfo drop_range;
5309-
drop_range.partition_id = partition_id;
5310-
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
5311-
drop_range.min_block = replace ? 0 : drop_range.max_block;
5312-
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
5318+
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
5319+
if (!replace)
5320+
drop_range.min_block = drop_range.max_block;
53135321

53145322
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
53155323

@@ -5388,6 +5396,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
53885396
}
53895397

53905398
/// We are almost ready to commit changes, remove fetches and merges from drop range
5399+
/// FIXME it's unsafe to remove queue entries before we actually commit REPLACE_RANGE to replication log
53915400
queue.removePartProducingOpsInRange(zookeeper, drop_range, entry);
53925401

53935402
/// Remove deduplication block_ids of replacing parts
@@ -5502,11 +5511,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
55025511
/// A range for log entry to remove parts from the source table (myself).
55035512

55045513
MergeTreePartInfo drop_range;
5505-
drop_range.partition_id = partition_id;
5506-
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
5507-
drop_range.min_block = 0;
5508-
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
5509-
5514+
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
55105515
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
55115516

55125517
if (drop_range.getBlocksCount() > 1)
@@ -5561,6 +5566,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
55615566
drop_range_dest.max_block = drop_range.max_block;
55625567
drop_range_dest.min_block = drop_range.max_block;
55635568
drop_range_dest.level = drop_range.level;
5569+
drop_range_dest.mutation = drop_range.mutation;
55645570

55655571
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
55665572
entry.source_replica = dest_table_storage->replica_name;

src/Storages/StorageReplicatedMergeTree.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ class StorageReplicatedMergeTree final : public ext::shared_ptr_helper<StorageRe
526526

527527
/// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
528528
/// Returns false if the partition doesn't exist yet.
529-
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info);
529+
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition = false);
530530

531531
/// Check for a node in ZK. If it is, remember this information, and then immediately answer true.
532532
std::unordered_set<std::string> existing_nodes_cache;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
1 1
2+
2 2
3+
3 3
4+
4 4
5+
mt 0 0_1_1_0 2
6+
rmt 0 0_0_0_0 2
7+
1 1
8+
2 2
9+
mt 0 0_1_1_0 2
10+
rmt 0 0_3_3_0 2
11+
0000000000 UPDATE s = concat(\'s\', toString(n)) WHERE 1 [] 0 1
12+
0000000001 DROP COLUMN s [] 0 1
13+
3
14+
4
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
drop table if exists mt;
2+
drop table if exists rmt sync;
3+
4+
create table mt (n UInt64, s String) engine = MergeTree partition by intDiv(n, 10) order by n;
5+
insert into mt values (3, '3'), (4, '4');
6+
7+
create table rmt (n UInt64, s String) engine = ReplicatedMergeTree('/clickhouse/test_01149/rmt', 'r1') partition by intDiv(n, 10) order by n;
8+
insert into rmt values (1,'1'), (2, '2');
9+
10+
select * from rmt;
11+
select * from mt;
12+
select table, partition_id, name, rows from system.parts where database=currentDatabase() and table in ('mt', 'rmt') and active=1 order by table, name;
13+
14+
alter table rmt update s = 's'||toString(n) where 1;
15+
16+
select * from rmt;
17+
alter table rmt replace partition '0' from mt;
18+
19+
select table, partition_id, name, rows from system.parts where database=currentDatabase() and table in ('mt', 'rmt') and active=1 order by table, name;
20+
21+
alter table rmt drop column s;
22+
23+
select mutation_id, command, parts_to_do_names, parts_to_do, is_done from system.mutations where database=currentDatabase() and table='rmt';
24+
select * from rmt;
25+
26+
drop table rmt sync;
27+
28+
set replication_alter_partitions_sync=0;
29+
create table rmt (n UInt64, s String) engine = ReplicatedMergeTree('/clickhouse/test_01149/rmt', 'r1') partition by intDiv(n, 10) order by n;
30+
insert into rmt values (1,'1'), (2, '2');
31+
32+
alter table rmt update s = 's'||toString(n) where 1;
33+
alter table rmt drop partition '0';
34+
35+
set replication_alter_partitions_sync=1;
36+
alter table rmt drop column s;
37+
38+
drop table mt;
39+
drop table rmt sync;

0 commit comments

Comments
 (0)