Skip to content

Commit 25f40db

Browse files
authored
Merge pull request #17499 from ClickHouse/concurrent_mutation_and_random_kill
Fix kill mutation on concurrent alter queries
2 parents e9795ac + 6567796 commit 25f40db

File tree

5 files changed

+154
-0
lines changed

5 files changed

+154
-0
lines changed

src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,11 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
655655
{
656656
LOG_DEBUG(log, "Removing killed mutation {} from local state.", entry.znode_name);
657657
some_active_mutations_were_killed = true;
658+
if (entry.isAlterMutation())
659+
{
660+
LOG_DEBUG(log, "Removed alter {} because mutation {} were killed.", entry.alter_version, entry.znode_name);
661+
alter_sequence.finishDataAlter(entry.alter_version, state_lock);
662+
}
658663
}
659664
else
660665
LOG_DEBUG(log, "Removing obsolete mutation {} from local state.", entry.znode_name);
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
CREATE TABLE default.concurrent_mutate_kill\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593/concurrent_mutate_kill\', \'1\')\nPARTITION BY key % 100\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
2+
499999500000
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#!/usr/bin/env bash
2+
3+
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
4+
. "$CURDIR"/../shell_config.sh
5+
6+
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_mutate_kill"
7+
8+
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_mutate_kill (key UInt64, value String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_01593/concurrent_mutate_kill', '1') ORDER BY key PARTITION BY key % 100 SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000"
9+
10+
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_mutate_kill SELECT number, toString(number) FROM numbers(1000000)"
11+
12+
# Endlessly flip the type of column `value` in concurrent_mutate_kill:
# String -> UInt64 when it is currently String, anything else -> String.
# Never returns on its own; the script runs it under `timeout`.
alter_thread() {
    local current_type
    while :; do
        current_type=$($CLICKHOUSE_CLIENT --query "SELECT type FROM system.columns WHERE table='concurrent_mutate_kill' and database='${CLICKHOUSE_DATABASE}' and name='value'")
        case "$current_type" in
            String)
                $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_mutate_kill MODIFY COLUMN value UInt64 SETTINGS replication_alter_partitions_sync=2"
                ;;
            *)
                $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_mutate_kill MODIFY COLUMN value String SETTINGS replication_alter_partitions_sync=2"
                ;;
        esac
    done
}
23+
24+
# Repeatedly pick one unfinished mutation of concurrent_mutate_kill and kill it.
# Never returns on its own; the script runs it under `timeout`.
kill_mutation_thread() {
    local mutation_id
    while true; do
        mutation_id=$($CLICKHOUSE_CLIENT --query "SELECT mutation_id FROM system.mutations WHERE is_done=0 and database='${CLICKHOUSE_DATABASE}' and table='concurrent_mutate_kill' LIMIT 1")
        if [ -n "$mutation_id" ]; then
            # Scope the KILL to this test's table and database: mutation ids are
            # not unique across tables, so filtering by mutation_id alone could
            # also kill mutations of unrelated tables in tests running in parallel.
            $CLICKHOUSE_CLIENT --query "KILL MUTATION WHERE mutation_id='$mutation_id' and table='concurrent_mutate_kill' and database='${CLICKHOUSE_DATABASE}'" 1> /dev/null
            # Give the alter thread a chance to enqueue the next mutation.
            sleep 1
        fi
    done
}
34+
35+
36+
export -f alter_thread;
37+
export -f kill_mutation_thread;
38+
39+
TIMEOUT=30
40+
41+
timeout $TIMEOUT bash -c alter_thread 2> /dev/null &
42+
timeout $TIMEOUT bash -c kill_mutation_thread 2> /dev/null &
43+
44+
wait
45+
46+
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_mutate_kill"
47+
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_mutate_kill MODIFY COLUMN value Int64 SETTINGS replication_alter_partitions_sync=2"
48+
$CLICKHOUSE_CLIENT --query "SHOW CREATE TABLE concurrent_mutate_kill"
49+
$CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE concurrent_mutate_kill FINAL"
50+
$CLICKHOUSE_CLIENT --query "SELECT sum(value) FROM concurrent_mutate_kill"
51+
52+
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_mutate_kill"
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
499999500000
2+
499999500000
3+
499999500000
4+
499999500000
5+
499999500000
6+
Metadata version on replica 1 equal with first replica, OK
7+
CREATE TABLE default.concurrent_kill_1\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593_concurrent_kill\', \'1\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
8+
Metadata version on replica 2 equal with first replica, OK
9+
CREATE TABLE default.concurrent_kill_2\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593_concurrent_kill\', \'2\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
10+
Metadata version on replica 3 equal with first replica, OK
11+
CREATE TABLE default.concurrent_kill_3\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593_concurrent_kill\', \'3\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
12+
Metadata version on replica 4 equal with first replica, OK
13+
CREATE TABLE default.concurrent_kill_4\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593_concurrent_kill\', \'4\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
14+
Metadata version on replica 5 equal with first replica, OK
15+
CREATE TABLE default.concurrent_kill_5\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/test_01593_concurrent_kill\', \'5\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
16+
499999500000
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#!/usr/bin/env bash
2+
3+
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
4+
. "$CURDIR"/../shell_config.sh
5+
6+
REPLICAS=5
7+
8+
for i in $(seq $REPLICAS); do
9+
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_kill_$i"
10+
done
11+
12+
for i in $(seq $REPLICAS); do
13+
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_kill_$i (key UInt64, value String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_01593_concurrent_kill', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000"
14+
done
15+
16+
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_kill_1 SELECT number, toString(number) FROM numbers(1000000)"
17+
18+
for i in $(seq $REPLICAS); do
19+
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_kill_$i"
20+
$CLICKHOUSE_CLIENT --query "SELECT sum(toUInt64(value)) FROM concurrent_kill_$i"
21+
done
22+
23+
# Endlessly pick a random replica (1..5) and flip the type of its column
# `value`: String -> UInt64 when it is currently String, anything else -> String.
# Never returns on its own; the script runs it under `timeout`.
alter_thread() {
    local replica current_type
    while :; do
        replica=$((RANDOM % 5 + 1))
        current_type=$($CLICKHOUSE_CLIENT --query "SELECT type FROM system.columns WHERE table='concurrent_kill_$replica' and database='${CLICKHOUSE_DATABASE}' and name='value'")
        case "$current_type" in
            String)
                $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_kill_$replica MODIFY COLUMN value UInt64 SETTINGS replication_alter_partitions_sync=2"
                ;;
            *)
                $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_kill_$replica MODIFY COLUMN value String SETTINGS replication_alter_partitions_sync=2"
                ;;
        esac
    done
}
35+
36+
# Repeatedly pick one unfinished mutation of any concurrent_kill_* replica table
# and kill it. Never returns on its own; the script runs it under `timeout`.
kill_mutation_thread() {
    local mutation_id
    while true; do
        mutation_id=$($CLICKHOUSE_CLIENT --query "SELECT mutation_id FROM system.mutations WHERE is_done = 0 and table like 'concurrent_kill_%' and database='${CLICKHOUSE_DATABASE}' LIMIT 1")
        if [ -n "$mutation_id" ]; then
            # Scope the KILL to this test's tables and database: mutation ids are
            # not unique across tables, so filtering by mutation_id alone could
            # also kill mutations of unrelated tables in tests running in parallel.
            $CLICKHOUSE_CLIENT --query "KILL MUTATION WHERE mutation_id='$mutation_id' and table like 'concurrent_kill_%' and database='${CLICKHOUSE_DATABASE}'" 1> /dev/null
            # Give the alter thread a chance to enqueue the next mutation.
            sleep 1
        fi
    done
}
46+
47+
export -f alter_thread;
48+
export -f kill_mutation_thread;
49+
50+
TIMEOUT=30
51+
52+
timeout $TIMEOUT bash -c alter_thread 2> /dev/null &
53+
timeout $TIMEOUT bash -c kill_mutation_thread 2> /dev/null &
54+
55+
wait
56+
57+
# Let every replica catch up before the final schema change.
for i in $(seq $REPLICAS); do
    $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_kill_$i"
done

# Final ALTER to a known type. The target replica is named explicitly (the last
# one, as before) instead of relying on the loop variable leaking out of the
# loop above.
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_kill_$REPLICAS MODIFY COLUMN value Int64 SETTINGS replication_alter_partitions_sync=2"

# Baseline is replica 1, which is what the messages below claim to compare against.
metadata_version=$($CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/test_01593_concurrent_kill/replicas/1/' and name = 'metadata_version'")
for i in $(seq $REPLICAS); do
    replica_metadata_version=$($CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/test_01593_concurrent_kill/replicas/$i/' and name = 'metadata_version'")
    if [ "$metadata_version" != "$replica_metadata_version" ]; then
        echo "Metadata version on replica $i differs from the first replica, FAIL"
    else
        echo "Metadata version on replica $i equal with first replica, OK"
    fi

    $CLICKHOUSE_CLIENT --query "SHOW CREATE TABLE concurrent_kill_$i"
done
74+
75+
$CLICKHOUSE_CLIENT --query "SELECT sum(value) FROM concurrent_kill_1"
76+
77+
for i in $(seq $REPLICAS); do
78+
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_kill_$i"
79+
done

0 commit comments

Comments
 (0)