Skip to content

Commit 74cd25a

Browse files
Backport #86089 to 25.8: Fix deadlock in PipelineExecutor downscaling logic
1 parent 290cc14 commit 74cd25a

File tree

8 files changed

+240
-18
lines changed

8 files changed

+240
-18
lines changed

src/Common/ISlotControl.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class ISlotLease : public IAcquiredSlot
6262

6363
/// This method is for CPU consumption only.
6464
/// It should be called from a thread that started using the slot.
65-
/// Required for obtainting CPU time for the thread, because ctor is called in another thread.
65+
/// Required for obtaining CPU time for the thread, because ctor is called in another thread.
6666
virtual void startConsumption() = 0;
6767

6868
/// Renew the slot. This method should be called periodically.
@@ -81,6 +81,9 @@ class ISlotAllocation : public std::enable_shared_from_this<ISlotAllocation>, bo
8181
public:
8282
virtual ~ISlotAllocation() = default;
8383

84+
/// Free the allocated slots, cancel slot requests and wake up preempted threads.
85+
virtual void free() {}
86+
8487
/// Take one already granted slot if available.
8588
[[nodiscard]] virtual AcquiredSlotPtr tryAcquire() = 0;
8689

src/Common/Scheduler/CPULeaseAllocation.cpp

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,29 @@ CPULeaseAllocation::CPULeaseAllocation(SlotCount max_threads_, ResourceLink mast
207207
}
208208

209209
CPULeaseAllocation::~CPULeaseAllocation()
210+
{
211+
free();
212+
}
213+
214+
void CPULeaseAllocation::free()
210215
{
211216
std::unique_lock lock{mutex};
217+
218+
if (shutdown)
219+
return;
220+
212221
shutdown = true;
213222
acquirable.store(false, std::memory_order_relaxed);
214223

224+
// Wake up all preempted threads
225+
while (true)
226+
{
227+
if (size_t thread_num = threads.preempted.find_first(); thread_num != boost::dynamic_bitset<>::npos)
228+
resetPreempted(thread_num);
229+
else
230+
break; // No preempted threads, we are done
231+
}
232+
215233
// Properly cancel pending resource request (if any)
216234
requests.cancel(lock);
217235

@@ -433,13 +451,21 @@ bool CPULeaseAllocation::renew(Lease & lease)
433451
}
434452

435453
std::unique_lock lock{mutex};
454+
436455
if (exception)
437456
throw Exception(ErrorCodes::RESOURCE_ACCESS_DENIED, "CPU Resource request failed: {}", getExceptionMessage(exception, /* with_stacktrace = */ false));
438457

439458
consume(lock, delta_ns);
440459

441460
report_span.reset();
442461

462+
if (shutdown) // Allocation is being destroyed, worker thread should stop
463+
{
464+
downscale(lease.slot_id);
465+
lease.reset();
466+
return false;
467+
}
468+
443469
// Check if we need to decrease number of running threads (i.e. `acquired`).
444470
// We want number of `acquired` slots to be less than number of `allocated` slots.
445471
// Difference `allocated - acquired` equals `granted`. But we allow `granted == -1` for two reasons:
@@ -477,14 +503,17 @@ bool CPULeaseAllocation::renew(Lease & lease)
477503
CurrentMetrics::Increment preempted_increment(CurrentMetrics::ConcurrencyControlPreempted);
478504
acquired_increment.sub(1);
479505

480-
if (!waitForGrant(lock, thread_num))
506+
if (!waitForGrant(lock, thread_num) || shutdown)
481507
{
482-
// Timeout - worker thread should stop, but query continues
508+
// Timeout or exception or shutdown - worker thread should stop
483509
downscale(thread_num);
484510
lease.reset();
485511
return false;
486512
}
487513

514+
if (settings.on_resume)
515+
settings.on_resume(thread_num);
516+
488517
if (exception) // Stop the query
489518
throw Exception(ErrorCodes::RESOURCE_ACCESS_DENIED, "CPU Resource request failed: {}", getExceptionMessage(exception, /* with_stacktrace = */ false));
490519

@@ -503,9 +532,26 @@ bool CPULeaseAllocation::waitForGrant(std::unique_lock<std::mutex> & lock, size_
503532

504533
auto predicate = [this, thread_num]
505534
{
506-
return !threads.preempted[thread_num] || exception;
535+
return !threads.preempted[thread_num] || exception || shutdown;
507536
};
508537

538+
// It is important to call on_preempt w/o lock to avoid deadlock due to recursive locking:
539+
// renew() -> ExecutorTasks::preempt() -> ExecutorTasks::finish() -> free()
540+
if (settings.on_preempt)
541+
{
542+
lock.unlock();
543+
try
544+
{
545+
settings.on_preempt(thread_num);
546+
}
547+
catch (...)
548+
{
549+
lock.lock();
550+
throw;
551+
}
552+
lock.lock();
553+
}
554+
509555
if (timeout == std::chrono::milliseconds::max())
510556
{
511557
threads.wake[thread_num].wait(lock, predicate);

src/Common/Scheduler/CPULeaseAllocation.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <condition_variable>
1616
#include <mutex>
1717
#include <chrono>
18+
#include <functional>
1819

1920
namespace DB
2021
{
@@ -34,6 +35,12 @@ struct CPULeaseSettings
3435
/// Timeout after which preempted thread should exit
3536
std::chrono::milliseconds preemption_timeout = default_preemption_timeout;
3637

38+
/// Callback to be invoked when a thread is preempted
39+
std::function<void(size_t slot_id)> on_preempt;
40+
41+
/// Callback to be invoked when a thread is resumed
42+
std::function<void(size_t slot_id)> on_resume;
43+
3744
/// For debugging purposes, not used in production
3845
String workload;
3946

@@ -149,6 +156,9 @@ class CPULeaseAllocation final : public ISlotAllocation
149156
CPULeaseSettings settings = {});
150157
~CPULeaseAllocation() override;
151158

159+
/// Free all resources held by this allocation.
160+
void free() override;
161+
152162
/// Take one already granted slot if available. Never blocks or waits for slots.
153163
/// Should be used before spawning worker threads for a query.
154164
[[nodiscard]] AcquiredSlotPtr tryAcquire() override;

src/Processors/Executors/ExecutorTasks.cpp

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,26 @@ void ExecutorTasks::finish()
1818
async_task_queue.finish();
1919
}
2020

21+
freeCPU();
22+
2123
std::lock_guard guard(executor_contexts_mutex);
2224

2325
for (auto & context : executor_contexts)
2426
context->wakeUp();
2527
}
2628

29+
void ExecutorTasks::freeCPU()
30+
{
31+
SlotAllocationPtr slots;
32+
{
33+
std::lock_guard lock(mutex);
34+
slots = std::exchange(cpu_slots, nullptr);
35+
}
36+
if (!slots)
37+
return;
38+
slots->free();
39+
}
40+
2741
void ExecutorTasks::rethrowFirstThreadException()
2842
{
2943
for (auto & executor_context : executor_contexts)
@@ -177,14 +191,19 @@ ExecutorTasks::SpawnStatus ExecutorTasks::pushTasks(Queue & queue, Queue & async
177191
return DO_NOT_SPAWN; // No new tasks -- no need for new threads
178192
}
179193

180-
void ExecutorTasks::init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback)
194+
void ExecutorTasks::init(size_t num_threads_, size_t use_threads_, const SlotAllocationPtr & cpu_slots_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback)
181195
{
182196
num_threads = num_threads_;
183197
use_threads = use_threads_;
184198
threads_queue.init(num_threads);
185199
task_queue.init(num_threads);
186200
fast_task_queue.init(num_threads);
187201

202+
{
203+
std::lock_guard lock(mutex); // In case finish() is executed concurrently with init() due to exception
204+
cpu_slots = cpu_slots_;
205+
}
206+
188207
// Initialize slot counters with zeros up to max_threads
189208
slot_count.resize(num_threads, 0);
190209

@@ -246,12 +265,11 @@ ExecutorTasks::SpawnStatus ExecutorTasks::upscale(size_t slot_id)
246265

247266
void ExecutorTasks::downscale(size_t slot_id)
248267
{
249-
std::unique_lock lock(mutex);
268+
std::lock_guard lock(mutex);
250269

251270
if (slot_id >= slot_count.size() || slot_count[slot_id] == 0)
252271
return;
253272
--slot_count[slot_id];
254-
--total_slots;
255273

256274
if (slot_id + 1 == use_threads)
257275
{
@@ -265,16 +283,34 @@ void ExecutorTasks::downscale(size_t slot_id)
265283
}
266284
}
267285
}
286+
}
268287

269-
// We should make sure that downscaled thread has no local task inside context.
270-
// It is allowed to have tasks in `task_queue` or `fast_task_queue` because they can be stolen by other threads.
288+
void ExecutorTasks::preempt(size_t slot_id)
289+
{
290+
std::unique_lock lock(mutex);
291+
--total_slots;
292+
293+
/// We should make sure that preempted thread has no local task inside context.
294+
/// It is allowed to have tasks in `task_queue` or `fast_task_queue` because they can be stolen by other threads.
271295
auto & context = executor_contexts[slot_id];
272296
if (auto * task = context->popTask())
273297
{
274298
task_queue.push(task, slot_id);
275299
/// Wake up at least one thread to avoid deadlocks (all other threads maybe idle)
276-
tryWakeUpAnyOtherThreadWithTasks(*context, lock);
300+
tryWakeUpAnyOtherThreadWithTasks(*context, lock); // this releases the lock if it wakes up a thread
277301
}
302+
else if (task_queue.empty() && fast_task_queue.empty() && async_task_queue.empty() && threads_queue.size() == total_slots)
303+
{
304+
/// Finish the pipeline if the preempted thread was the last non-idle thread and executed the last task of the whole pipeline
305+
lock.unlock();
306+
finish();
307+
}
308+
}
309+
310+
void ExecutorTasks::resume(size_t)
311+
{
312+
std::lock_guard lock(mutex);
313+
++total_slots;
278314
}
279315

280316
void ExecutorTasks::processAsyncTasks()

src/Processors/Executors/ExecutorTasks.h

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <Processors/Executors/PollingQueue.h>
55
#include <Processors/Executors/ThreadsQueue.h>
66
#include <Processors/Executors/TasksQueue.h>
7+
#include <Common/ISlotControl.h>
78
#include <stack>
89

910
namespace DB
@@ -46,12 +47,15 @@ class ExecutorTasks
4647
/// Reference counters for thread CPU slots to handle race conditions between upscale/downscale.
4748
std::vector<size_t> slot_count;
4849

49-
/// Total number of slots (sum of all slot_count).
50+
/// Total number of non-preempted slots.
5051
size_t total_slots = 0;
5152

5253
/// A set of currently waiting threads.
5354
ThreadsQueue threads_queue;
5455

56+
/// CPU slots for each thread.
57+
SlotAllocationPtr cpu_slots;
58+
5559
/// Threshold found by rolling dice.
5660
const static size_t TOO_MANY_IDLE_THRESHOLD = 4;
5761

@@ -94,18 +98,27 @@ class ExecutorTasks
9498
// If non-local tasks were added, wake up one thread to process them.
9599
SpawnStatus pushTasks(Queue & queue, Queue & async_queue, ExecutionThreadContext & context);
96100

97-
void init(size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback);
101+
void init(size_t num_threads_, size_t use_threads_, const SlotAllocationPtr & cpu_slots_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback);
98102
void fill(Queue & queue, Queue & async_queue);
99103

104+
/// Release CPU slots
105+
void freeCPU();
106+
100107
/// Upscale to include slot_id. Updates use_threads to max(use_threads, slot_id + 1)
101108
/// Returns spawn status indicating if more threads should be spawned
102109
SpawnStatus upscale(size_t slot_id);
103110

104-
void processAsyncTasks();
105-
106111
/// Downscale by removing slot_id from active slots. Updates use_threads to highest active slot + 1
107112
void downscale(size_t slot_id);
108113

114+
/// Temporarily release slot_id without downscale. Later either downscale() or resume() is called.
115+
void preempt(size_t slot_id);
116+
117+
/// Resume execution of a previously preempted slot.
118+
void resume(size_t slot_id);
119+
120+
void processAsyncTasks();
121+
109122
ExecutionThreadContext & getThreadContext(size_t thread_num) { return *executor_contexts[thread_num]; }
110123

111124
String dump();

src/Processors/Executors/PipelineExecutor.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ void PipelineExecutor::setReadProgressCallback(ReadProgressCallbackPtr callback)
226226
void PipelineExecutor::finalizeExecution()
227227
{
228228
single_thread_cpu_slot.reset();
229+
tasks.freeCPU();
229230
{
230231
std::lock_guard lock(spawn_mutex);
231232
cpu_slots.reset();
@@ -408,6 +409,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, IAcquiredSlot * cpu_sl
408409
try
409410
{
410411
// Preemption point. Renewal could block execution due to CPU overload.
412+
// It may trigger callbacks to tasks.preempt() and tasks.resume()
411413
if (!cpu_helper.renew())
412414
{
413415
tasks.downscale(cpu_helper.id());
@@ -437,7 +439,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, IAcquiredSlot * cpu_sl
437439
}
438440

439441
/// Properly allocate CPU slots or lease for the thread pool
440-
static SlotAllocationPtr allocateCPU(size_t num_threads, bool concurrency_control, bool trace_cpu_scheduling)
442+
SlotAllocationPtr PipelineExecutor::allocateCPU(size_t num_threads, bool concurrency_control)
441443
{
442444
// The first thread is called master thread.
443445
// It is NOT the thread that handles async tasks (unless query has max_threads=1).
@@ -494,6 +496,8 @@ static SlotAllocationPtr allocateCPU(size_t num_threads, bool concurrency_contro
494496
.quantum_ns = static_cast<ResourceCost>(quantum_ns),
495497
.report_ns = static_cast<ResourceCost>(quantum_ns / 10),
496498
.preemption_timeout = std::chrono::milliseconds(query_context->getCPUSlotPreemptionTimeout()),
499+
.on_preempt = [this](size_t slot_id) { tasks.preempt(slot_id); },
500+
.on_resume = [this](size_t slot_id) { tasks.resume(slot_id); },
497501
.workload = query_context->getSettingsRef()[Setting::workload],
498502
.trace_cpu_scheduling = trace_cpu_scheduling,
499503
});
@@ -521,7 +525,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
521525
is_execution_initialized = true;
522526
tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing);
523527

524-
cpu_slots = allocateCPU(num_threads, concurrency_control, trace_cpu_scheduling);
528+
cpu_slots = allocateCPU(num_threads, concurrency_control);
525529

526530
Queue queue;
527531
Queue async_queue;
@@ -530,7 +534,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
530534
/// use_threads should reflect number of thread spawned and can grow with tasks.upscale(...).
531535
/// Starting from 1 instead of 0 is to tackle the single thread scenario, where no upscale() will
532536
/// be invoked but actually 1 thread used.
533-
tasks.init(num_threads, 1, profile_processors, trace_processors, read_progress_callback.get());
537+
tasks.init(num_threads, 1, cpu_slots, profile_processors, trace_processors, read_progress_callback.get());
534538
tasks.fill(queue, async_queue);
535539

536540
if (num_threads > 1)

src/Processors/Executors/PipelineExecutor.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ class PipelineExecutor
111111

112112
void initializeExecution(size_t num_threads, bool concurrency_control); /// Initialize executor contexts and task_queue.
113113
void finalizeExecution(); /// Check all processors are finished.
114-
void spawnThreads(AcquiredSlotPtr slot) TSA_REQUIRES(spawn_mutex);
115114

116115
/// Methods connected to execution.
117116
void executeImpl(size_t num_threads, bool concurrency_control);
@@ -120,6 +119,10 @@ class PipelineExecutor
120119
void finish();
121120
void cancel(ExecutionStatus reason);
122121

122+
// Methods for CPU scheduling
123+
SlotAllocationPtr allocateCPU(size_t num_threads, bool concurrency_control);
124+
void spawnThreads(AcquiredSlotPtr slot) TSA_REQUIRES(spawn_mutex);
125+
123126
/// If execution_status == from, change it to desired.
124127
bool tryUpdateExecutionStatus(ExecutionStatus expected, ExecutionStatus desired);
125128

0 commit comments

Comments
 (0)