@@ -343,47 +343,60 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
343343
/// NOTE(review): this is a diff hunk, so removed (`-`) and added (`+`) lines of removeProcessedEntry are interleaved below.
/// New behaviour (added lines): erase `entry` from the in-memory queue under state_mutex, notify subscribers with the
/// resulting queue size, and only then remove the entry's znode from ZooKeeper and update the insert times there.
/// If entry->removed_by_other_entry is set, the in-memory erase, the ZooKeeper removal and updateTimesInZooKeeper are all skipped.
/// Throws LOGICAL_ERROR when the entry should be in the memory queue but is not found.
344344void ReplicatedMergeTreeQueue::removeProcessedEntry (zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry)
345345{
/// Old ordering (removed lines): the znode was deleted from ZooKeeper before the in-memory queue was touched.
346- auto code = zookeeper->tryRemove (replica_path + " /queue/" + entry->znode_name );
347-
348- if (code)
349- LOG_ERROR (log, " Couldn't remove " << replica_path << " /queue/" << entry->znode_name << " : "
350- << zkutil::ZooKeeper::error2string (code) << " . This shouldn't happen often." );
351-
352346 std::optional<time_t > min_unprocessed_insert_time_changed;
353347 std::optional<time_t > max_processed_insert_time_changed;
354348
355349 bool found = false ;
350+ bool need_remove_from_zk = true ;
356351 size_t queue_size = 0 ;
357352
358353 {
359354 std::unique_lock lock (state_mutex);
360-
361- // / Remove the job from the queue in the RAM.
362- // / You can not just refer to a pre-saved iterator, because someone else might be able to delete the task.
363- // / Why do we view the queue from the end?
364- // / - because the task for execution first is moved to the end of the queue, so that in case of failure it remains at the end.
365- for (Queue::iterator it = queue. end (); it != queue. begin ();)
/// The flag is read under state_mutex; removePartProducingOpsInRange sets it (also under state_mutex) right before it erases the entry itself.
/// In that case only the current queue size is captured for the subscriber notification.
355+ if (entry-> removed_by_other_entry )
356+ {
357+ need_remove_from_zk = false ;
358+ queue_size = queue. size ();
359+ }
360+ else
366361 {
367- --it;
368- if (*it == entry)
362+ // / Remove the job from the queue in the RAM.
363+ // / You can not just refer to a pre-saved iterator, because someone else might be able to delete the task.
364+ // / Why do we view the queue from the end?
365+ // / - because the task for execution first is moved to the end of the queue, so that in case of failure it remains at the end.
366+ for (Queue::iterator it = queue.end (); it != queue.begin ();)
369367 {
370- found = true ;
371- updateStateOnQueueEntryRemoval (
372- entry, /* is_successful = */ true ,
373- min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
368+ --it;
374369
375- queue.erase (it);
376- queue_size = queue.size ();
377- break ;
370+ if (*it == entry)
371+ {
372+ found = true ;
373+ updateStateOnQueueEntryRemoval (
374+ entry, /* is_successful = */ true ,
375+ min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
376+
377+ queue.erase (it);
378+ queue_size = queue.size ();
379+ break ;
380+ }
378381 }
379382 }
380383 }
381384
/// A missing entry is a logical error only when this call was expected to erase it (i.e. it was not already removed by another entry).
382- if (!found)
385+ if (!found && need_remove_from_zk )
383386 throw Exception (" Can't find " + entry->znode_name + " in the memory queue. It is a bug" , ErrorCodes::LOGICAL_ERROR);
384387
385388 notifySubscribers (queue_size);
386389
390+
/// Early return: nothing is removed from ZooKeeper here and updateTimesInZooKeeper is not called either.
/// NOTE(review): presumably safe because removePartProducingOpsInRange already issued tryRemove for this znode — confirm.
391+ if (!need_remove_from_zk)
392+ return ;
393+
394+ auto code = zookeeper->tryRemove (replica_path + " /queue/" + entry->znode_name );
395+
/// A failed znode removal is only logged, it does not throw.
396+ if (code)
397+ LOG_ERROR (log, " Couldn't remove " << replica_path << " /queue/" << entry->znode_name << " : "
398+ << zkutil::ZooKeeper::error2string (code) << " . This shouldn't happen often." );
399+
387400 updateTimesInZooKeeper (zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
388401}
389402
@@ -845,13 +858,18 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
845858 const MergeTreePartInfo & part_info,
846859 const ReplicatedMergeTreeLogEntryData & current)
847860{
861+ // / TODO is it possible to simplify it?
848862 Queue to_wait;
849863 size_t removed_entries = 0 ;
850864 std::optional<time_t > min_unprocessed_insert_time_changed;
851865 std::optional<time_t > max_processed_insert_time_changed;
852866
853867 // / Remove operations with parts, contained in the range to be deleted, from the queue.
854868 std::unique_lock lock (state_mutex);
869+
870+ [[maybe_unused]] bool called_from_alter_query_directly = current.replace_range_entry && current.replace_range_entry ->columns_version < 0 ;
871+ assert (currently_executing_drop_or_replace_range || called_from_alter_query_directly);
872+
855873 for (Queue::iterator it = queue.begin (); it != queue.end ();)
856874 {
857875 auto type = (*it)->type ;
@@ -863,13 +881,18 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
863881 if ((*it)->currently_executing )
864882 to_wait.push_back (*it);
865883 auto code = zookeeper->tryRemove (replica_path + " /queue/" + (*it)->znode_name );
884+ // / FIXME it's probably unsafe to remove entries non-atomically
885+ // / when this method is called directly from an ALTER query (not from a replication queue task),
886+ // / because entries will be lost if ALTER fails.
866887 if (code)
867888 LOG_INFO (log, " Couldn't remove " << replica_path + " /queue/" + (*it)->znode_name << " : "
868889 << zkutil::ZooKeeper::error2string (code));
869890
870891 updateStateOnQueueEntryRemoval (
871892 *it, /* is_successful = */ false ,
872893 min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
894+
895+ (*it)->removed_by_other_entry = true ;
873896 queue.erase (it++);
874897 ++removed_entries;
875898 }
@@ -1105,6 +1128,16 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
11051128 }
11061129 }
11071130
1131+ if (entry.type == LogEntry::DROP_RANGE || entry.type == LogEntry::REPLACE_RANGE)
1132+ {
1133+ // / DROP_RANGE and REPLACE_RANGE entries remove other entries, which produce parts in the range.
1134+ // / If such part-producing operations are currently executing, DROP/REPLACE RANGE waits for them to finish.
1135+ // / Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait for each other.
1136+ // / See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
1137+ if (currently_executing_drop_or_replace_range)
1138+ return false ;
1139+ }
1140+
11081141 return true ;
11091142}
11101143
@@ -1135,6 +1168,11 @@ Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const String & partiti
11351168ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting (const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_)
11361169 : entry(entry_), queue(queue_)
11371170{
1171+ if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE)
1172+ {
1173+ assert (!queue.currently_executing_drop_or_replace_range );
1174+ queue.currently_executing_drop_or_replace_range = true ;
1175+ }
11381176 entry->currently_executing = true ;
11391177 ++entry->num_tries ;
11401178 entry->last_attempt_time = time (nullptr );
@@ -1168,6 +1206,11 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
11681206{
11691207 std::lock_guard lock (queue.state_mutex );
11701208
1209+ if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE)
1210+ {
1211+ assert (queue.currently_executing_drop_or_replace_range );
1212+ queue.currently_executing_drop_or_replace_range = false ;
1213+ }
11711214 entry->currently_executing = false ;
11721215 entry->execution_complete .notify_all ();
11731216
0 commit comments