@@ -18,12 +18,26 @@ void ExecutorTasks::finish()
1818 async_task_queue.finish ();
1919 }
2020
21+ freeCPU ();
22+
2123 std::lock_guard guard (executor_contexts_mutex);
2224
2325 for (auto & context : executor_contexts)
2426 context->wakeUp ();
2527}
2628
29+ void ExecutorTasks::freeCPU ()
30+ {
31+ SlotAllocationPtr slots;
32+ {
33+ std::lock_guard lock (mutex);
34+ slots = std::exchange (cpu_slots, nullptr );
35+ }
36+ if (!slots)
37+ return ;
38+ slots->free ();
39+ }
40+
2741void ExecutorTasks::rethrowFirstThreadException ()
2842{
2943 for (auto & executor_context : executor_contexts)
@@ -177,14 +191,19 @@ ExecutorTasks::SpawnStatus ExecutorTasks::pushTasks(Queue & queue, Queue & async
177191 return DO_NOT_SPAWN; // No new tasks -- no need for new threads
178192}
179193
180- void ExecutorTasks::init (size_t num_threads_, size_t use_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback)
194+ void ExecutorTasks::init (size_t num_threads_, size_t use_threads_, const SlotAllocationPtr & cpu_slots_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback)
181195{
182196 num_threads = num_threads_;
183197 use_threads = use_threads_;
184198 threads_queue.init (num_threads);
185199 task_queue.init (num_threads);
186200 fast_task_queue.init (num_threads);
187201
202+ {
203+ std::lock_guard lock (mutex); // In case finish() is executed concurrently with init() due to exception
204+ cpu_slots = cpu_slots_;
205+ }
206+
188207 // Initialize slot counters with zeros up to max_threads
189208 slot_count.resize (num_threads, 0 );
190209
@@ -246,12 +265,11 @@ ExecutorTasks::SpawnStatus ExecutorTasks::upscale(size_t slot_id)
246265
247266void ExecutorTasks::downscale (size_t slot_id)
248267{
249- std::unique_lock lock (mutex);
268+ std::lock_guard lock (mutex);
250269
251270 if (slot_id >= slot_count.size () || slot_count[slot_id] == 0 )
252271 return ;
253272 --slot_count[slot_id];
254- --total_slots;
255273
256274 if (slot_id + 1 == use_threads)
257275 {
@@ -265,16 +283,34 @@ void ExecutorTasks::downscale(size_t slot_id)
265283 }
266284 }
267285 }
286+ }
268287
269- // We should make sure that downscaled thread has no local task inside context.
270- // It is allowed to have tasks in `task_queue` or `fast_task_queue` because they can be stealed by other threads.
/// Temporarily withdraw the CPU slot `slot_id` from scheduling: decrements
/// `total_slots` and evacuates any task left in the slot's local context so it
/// can be picked up by the remaining threads.
/// NOTE(review): presumably called by the thread owning `slot_id` when its CPU
/// slot is preempted by the scheduler — confirm against callers.
void ExecutorTasks::preempt(size_t slot_id)
{
    std::unique_lock lock(mutex);
    --total_slots;

    /// We should make sure that preempted thread has no local task inside context.
    /// It is allowed to have tasks in `task_queue` or `fast_task_queue` because they can be stealed by other threads.
    auto & context = executor_contexts[slot_id];
    if (auto * task = context->popTask())
    {
        /// Move the stranded local task into the shared queue, keyed by slot for locality.
        task_queue.push(task, slot_id);
        /// Wake up at least one thread to avoid deadlocks (all other threads maybe idle)
        tryWakeUpAnyOtherThreadWithTasks(*context, lock); // this releases the lock if it wakes up a thread
    }
    else if (task_queue.empty() && fast_task_queue.empty() && async_task_queue.empty() && threads_queue.size() == total_slots)
    {
        /// Finish pipeline if preempted thread was the last non-idle thread executed the last task of the whole pipeline
        /// Unlock first: finish() takes its own locks, so calling it under `mutex` could deadlock.
        lock.unlock();
        finish();
    }
}
309+
310+ void ExecutorTasks::resume (size_t )
311+ {
312+ std::lock_guard lock (mutex);
313+ ++total_slots;
278314}
279315
280316void ExecutorTasks::processAsyncTasks ()
0 commit comments