Skip to content

Commit 1171630

Browse files
authored
Merge pull request #13670 from ClickHouse/backport/20.4/13626
Backport #13626 to 20.4: Fix deadlock and logical error in replication queue
2 parents 922de6e + 8e75533 commit 1171630

File tree

4 files changed

+79
-22
lines changed

4 files changed

+79
-22
lines changed

src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ struct ReplicatedMergeTreeLogEntryData
161161

162162
/// Access under queue_mutex, see ReplicatedMergeTreeQueue.
163163
bool currently_executing = false; /// Whether the action is executing now.
164+
bool removed_by_other_entry = false;
164165
/// These several fields are informational only (for viewing by the user using system tables).
165166
/// Access under queue_mutex, see ReplicatedMergeTreeQueue.
166167
size_t num_tries = 0; /// The number of attempts to perform the action (since the server started, including the running one).

src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp

Lines changed: 65 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -343,47 +343,60 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
343343

344344
void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry)
345345
{
346-
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name);
347-
348-
if (code)
349-
LOG_ERROR(log, "Couldn't remove " << replica_path << "/queue/" << entry->znode_name << ": "
350-
<< zkutil::ZooKeeper::error2string(code) << ". This shouldn't happen often.");
351-
352346
std::optional<time_t> min_unprocessed_insert_time_changed;
353347
std::optional<time_t> max_processed_insert_time_changed;
354348

355349
bool found = false;
350+
bool need_remove_from_zk = true;
356351
size_t queue_size = 0;
357352

358353
{
359354
std::unique_lock lock(state_mutex);
360-
361-
/// Remove the job from the queue in the RAM.
362-
/// You can not just refer to a pre-saved iterator, because someone else might be able to delete the task.
363-
/// Why do we view the queue from the end?
364-
/// - because the task for execution first is moved to the end of the queue, so that in case of failure it remains at the end.
365-
for (Queue::iterator it = queue.end(); it != queue.begin();)
355+
if (entry->removed_by_other_entry)
356+
{
357+
need_remove_from_zk = false;
358+
queue_size = queue.size();
359+
}
360+
else
366361
{
367-
--it;
368-
if (*it == entry)
362+
/// Remove the job from the queue in the RAM.
363+
/// You can not just refer to a pre-saved iterator, because someone else might be able to delete the task.
364+
/// Why do we view the queue from the end?
365+
/// - because the task for execution first is moved to the end of the queue, so that in case of failure it remains at the end.
366+
for (Queue::iterator it = queue.end(); it != queue.begin();)
369367
{
370-
found = true;
371-
updateStateOnQueueEntryRemoval(
372-
entry, /* is_successful = */ true,
373-
min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
368+
--it;
374369

375-
queue.erase(it);
376-
queue_size = queue.size();
377-
break;
370+
if (*it == entry)
371+
{
372+
found = true;
373+
updateStateOnQueueEntryRemoval(
374+
entry, /* is_successful = */ true,
375+
min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
376+
377+
queue.erase(it);
378+
queue_size = queue.size();
379+
break;
380+
}
378381
}
379382
}
380383
}
381384

382-
if (!found)
385+
if (!found && need_remove_from_zk)
383386
throw Exception("Can't find " + entry->znode_name + " in the memory queue. It is a bug", ErrorCodes::LOGICAL_ERROR);
384387

385388
notifySubscribers(queue_size);
386389

390+
391+
if (!need_remove_from_zk)
392+
return;
393+
394+
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name);
395+
396+
if (code)
397+
LOG_ERROR(log, "Couldn't remove " << replica_path << "/queue/" << entry->znode_name << ": "
398+
<< zkutil::ZooKeeper::error2string(code) << ". This shouldn't happen often.");
399+
387400
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
388401
}
389402

@@ -845,13 +858,18 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
845858
const MergeTreePartInfo & part_info,
846859
const ReplicatedMergeTreeLogEntryData & current)
847860
{
861+
/// TODO is it possible to simplify it?
848862
Queue to_wait;
849863
size_t removed_entries = 0;
850864
std::optional<time_t> min_unprocessed_insert_time_changed;
851865
std::optional<time_t> max_processed_insert_time_changed;
852866

853867
/// Remove operations with parts, contained in the range to be deleted, from the queue.
854868
std::unique_lock lock(state_mutex);
869+
870+
[[maybe_unused]] bool called_from_alter_query_directly = current.replace_range_entry && current.replace_range_entry->columns_version < 0;
871+
assert(currently_executing_drop_or_replace_range || called_from_alter_query_directly);
872+
855873
for (Queue::iterator it = queue.begin(); it != queue.end();)
856874
{
857875
auto type = (*it)->type;
@@ -863,13 +881,18 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
863881
if ((*it)->currently_executing)
864882
to_wait.push_back(*it);
865883
auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
884+
/// FIXME it's probably unsafe to remove entries non-atomically
885+
/// when this method is called directly from an ALTER query (not from a replication queue task),
886+
/// because entries will be lost if ALTER fails.
866887
if (code)
867888
LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": "
868889
<< zkutil::ZooKeeper::error2string(code));
869890

870891
updateStateOnQueueEntryRemoval(
871892
*it, /* is_successful = */ false,
872893
min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
894+
895+
(*it)->removed_by_other_entry = true;
873896
queue.erase(it++);
874897
++removed_entries;
875898
}
@@ -1105,6 +1128,16 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
11051128
}
11061129
}
11071130

1131+
if (entry.type == LogEntry::DROP_RANGE || entry.type == LogEntry::REPLACE_RANGE)
1132+
{
1133+
/// DROP_RANGE and REPLACE_RANGE entries remove other entries, which produce parts in the range.
1134+
/// If such part-producing operations are currently executing, then DROP/REPLACE RANGE waits for them to finish.
1135+
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait for each other.
1136+
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
1137+
if (currently_executing_drop_or_replace_range)
1138+
return false;
1139+
}
1140+
11081141
return true;
11091142
}
11101143

@@ -1135,6 +1168,11 @@ Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const String & partiti
11351168
ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_)
11361169
: entry(entry_), queue(queue_)
11371170
{
1171+
if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE)
1172+
{
1173+
assert(!queue.currently_executing_drop_or_replace_range);
1174+
queue.currently_executing_drop_or_replace_range = true;
1175+
}
11381176
entry->currently_executing = true;
11391177
++entry->num_tries;
11401178
entry->last_attempt_time = time(nullptr);
@@ -1168,6 +1206,11 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
11681206
{
11691207
std::lock_guard lock(queue.state_mutex);
11701208

1209+
if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE)
1210+
{
1211+
assert(queue.currently_executing_drop_or_replace_range);
1212+
queue.currently_executing_drop_or_replace_range = false;
1213+
}
11711214
entry->currently_executing = false;
11721215
entry->execution_complete.notify_all();
11731216

src/Storages/MergeTree/ReplicatedMergeTreeQueue.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ class ReplicatedMergeTreeQueue
8787
/// Index of the first log entry that we didn't see yet.
8888
Int64 log_pointer = 0;
8989

90+
/// Avoid parallel execution of queue entries, which may remove other entries from the queue.
91+
bool currently_executing_drop_or_replace_range = false;
92+
9093
/** What will be the set of active parts after executing all log entries up to log_pointer.
9194
* Used to determine which merges can be assigned (see ReplicatedMergeTreeMergePredicate)
9295
*/

tests/queries/0_stateless/01035_concurrent_move_partition_from_table_zookeeper.sh

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,28 @@ function thread4()
4343
done
4444
}
4545

46+
function thread5()
47+
{
48+
while true;
49+
do
50+
$CLICKHOUSE_CLIENT --query="ALTER TABLE src MOVE PARTITION 1 TO TABLE dst;" --query_id=query5
51+
done
52+
}
53+
4654
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
4755
export -f thread1;
4856
export -f thread2;
4957
export -f thread3;
5058
export -f thread4;
59+
export -f thread5;
5160

5261
TIMEOUT=30
5362

5463
timeout $TIMEOUT bash -c thread1 2> /dev/null &
5564
timeout $TIMEOUT bash -c thread2 2> /dev/null &
5665
timeout $TIMEOUT bash -c thread3 2> /dev/null &
5766
timeout $TIMEOUT bash -c thread4 2> /dev/null &
67+
timeout $TIMEOUT bash -c thread5 2> /dev/null &
5868

5969
wait
6070

0 commit comments

Comments
 (0)