-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathnanothread.h
More file actions
573 lines (506 loc) · 20.8 KB
/
nanothread.h
File metadata and controls
573 lines (506 loc) · 20.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
/*
nanothread/nanothread.h -- Simple thread pool with a task-based API
Copyright (c) 2021 Wenzel Jakob <[email protected]>
All rights reserved. Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file.
*/
#pragma once
#include <stdint.h>
#include <stddef.h>
#include <stdio.h>
#if defined(__cplusplus)
# include <type_traits>
#else
# include <stdbool.h>
#endif
#if defined(NANOTHREAD_STATIC)
# define NANOTHREAD_EXPORT
#else
# if defined(_MSC_VER)
# if defined(NANOTHREAD_BUILD)
# define NANOTHREAD_EXPORT __declspec(dllexport)
# else
# define NANOTHREAD_EXPORT __declspec(dllimport)
# endif
# else
# define NANOTHREAD_EXPORT __attribute__ ((visibility("default")))
# endif
#endif
#if defined(__cplusplus)
# define NANOTHREAD_DEF(x) = x
#else
# define NANOTHREAD_DEF(x)
#endif
#define NANOTHREAD_AUTO ((uint32_t) -1)
typedef struct Pool Pool;
typedef struct Task Task;
#if defined(__cplusplus)
#define NANOTHREAD_THROW noexcept(false)
extern "C" {
#else
#define NANOTHREAD_THROW
#endif
/**
* \brief Create a new thread pool
*
* \param size
* Specifies the desired number of threads available for parallel
* work, including the calling thread (which also participates in
* execution inside \ref task_wait()). The default value of
* \c NANOTHREAD_AUTO uses \ref performance_core_count(). Passing
* \c size = 0 or \c size = 1 disables worker threads entirely; the
* calling thread then performs all parallel work during
* \ref task_wait(). Otherwise, \c size - 1 worker threads are spawned.
*
* \param ftz
* Should denormalized floating point numbers be flushed to zero?
* The pool workers will initialize their floating point control
* registers accordingly.
*/
extern NANOTHREAD_EXPORT Pool *
pool_create(uint32_t size NANOTHREAD_DEF(NANOTHREAD_AUTO),
int ftz NANOTHREAD_DEF(1));
/**
* \brief Destroy the thread pool and discard remaining unfinished work.
*
* It is undefined behavior to destroy the thread pool while other threads
* are waiting for the completion of scheduled work via \ref task_wait().
*
* \param pool
* The thread pool to destroy. \c nullptr refers to the default pool.
*/
extern NANOTHREAD_EXPORT void pool_destroy(Pool *pool NANOTHREAD_DEF(0));
/// Returns the number of available CPU cores.
extern NANOTHREAD_EXPORT uint32_t core_count();
/**
* \brief Returns the number of performance cores available to the process.
*
* Some processors split their cores into performance-oriented and
* efficiency-oriented groups. This function reports the size of the
* performance group; on processors without such a split, it is
* equivalent to \ref core_count().
*
* The default thread pool size is derived from this value: using the
* efficiency cores tends to be a net loss, since the slower efficiency
* cores become stragglers that delay barrier-synchronized parallel
* regions.
*/
extern NANOTHREAD_EXPORT uint32_t performance_core_count();
/**
* \brief Return the number of threads available for parallel work.
*
* The returned count includes the calling thread, which participates
* in task execution inside \ref task_wait(). A return value of 1 means
* that no worker threads have been spawned and all parallel work runs
* on the calling thread.
*
* \param pool
* The thread pool to query. \c nullptr refers to the default pool.
*/
extern NANOTHREAD_EXPORT uint32_t pool_size(Pool *pool NANOTHREAD_DEF(0));
/**
* \brief Resize the thread pool.
*
* The \c size parameter counts the calling thread, which participates
* in task execution inside \ref task_wait(). Passing \c size = 0 or
* \c size = 1 disables worker threads entirely. Otherwise, \c size - 1
* worker threads are spawned.
*
* \param pool
* The thread pool to resize. \c nullptr refers to the default pool.
*/
extern NANOTHREAD_EXPORT void pool_set_size(Pool *pool, uint32_t size);
/**
* \brief Enable/disable time profiling
*
* Profiling must be enabled to use the \ref task_time() function.
*
* \param value
* A nonzero value indicates that profiling should be enabled.
*/
extern NANOTHREAD_EXPORT void pool_set_profile(int value);
/// Check whether time profiling is enabled (global setting)
extern NANOTHREAD_EXPORT int pool_profile();
/**
* \brief Return a unique number identifying the current worker thread
*
* When called from a thread pool worker (e.g. while executing a parallel
* task), this function returns a unique identifying number between 1 and the
* pool's total thread count.
*
* The IDs of separate thread pools overlap. When the current thread is not a
* thread pool worker, the function returns zero.
*/
extern NANOTHREAD_EXPORT uint32_t pool_thread_id();
/** \brief Process work available within the pool until a stopping criterion is
* satisfied.
*
* This function repeatedly fetches work from ``pool`` until the stopping
* criterion ``stopping_criterion(payload)`` evaluates to ``true``, at which
* point the function returns.
*
* It provides a way for an ordinary thread to temporarily join the thread
* pool. A function being called by a worker thread that needs to wait for an
* event to take place can also call this function to avoid starvation issues.
*/
extern NANOTHREAD_EXPORT void
pool_work_until(Pool *pool, bool (*stopping_criterion)(void *), void *payload);
/*
* \brief Submit a new task to a thread pool
*
* This function submits a new task consisting of \c size work units to the
* thread pool \c pool.
*
* <b>Callback</b>: The task callback \c func will be invoked \c size times by
* the various thread pool workers. Its first argument will range from
* <tt>0</tt> to \c <tt>size - 1</tt>, and the second argument refers to a
* payload memory region specified via the \c payload parameter.
*
* <b>Parents</b>: The \c parent and \c parent_count parameters can be used to
* specify parent tasks that must be completed before execution of this task
* can commence. If the task does not depend on any other tasks (e.g.
* <tt>parent_count == 0</tt> and <tt>parent == nullptr</tt>), or when all of
* those other tasks have already finished executing, then it will be
* immediately appended to the end of the task queue. Otherwise, the task will
* be scheduled once all parent tasks have finished executing.
*
* <b>Payload storage</b>: The callback payload is handled using one of two
* possible modes:
*
* <ol>
* <li>When <tt>size == 0</tt> or <tt>payload_deleter != nullptr</tt>, the
* value of the \c payload parameter is simply forwarded to the callback \c
* func. In the latter case, <tt>payload_deleter(payload)</tt> is invoked
* following completion of the task, which can carry out additional cleanup
* operations if needed. In both cases, the memory region targeted by \c
* payload may be accessed asynchronously and must remain valid until the
* task is done.</li>
*
* <li>Otherwise, the function will internally create a copy of the payload
* and free it following completion of the task. In this case, it is fine to
* delete the memory region targeted by \c payload right after the
* function call.</li>
* </ol>
*
* The function returns a task handle that can be used to schedule other
* dependent tasks, and to wait for task completion if desired. This handle
* must eventually be released using \ref task_release() or \ref
* task_wait_and_release(). A failure to do so will result in memory leaks.
*
* <b>Small task optimization</b>: If desired, small tasks can be executed
* right away without using the thread pool. This happens under the following
* conditions:
*
* <ol>
* <li>The task is "small" (\c size == 1).</li>
* <li>The task does not depend on any parent tasks.</li>
* <li>The \c always_async parameter is set to 0</li>
* </ol>
*
* \remark
* Barriers and similar dependency relations can be encoded via
* artificial tasks using <tt>size == 0</tt> and <tt>func == nullptr</tt>
* along with a set of parent tasks.
*
* \param pool
* The thread pool that should execute the specified task. \c nullptr
* refers to the default pool.
*
* \param parent
* List of parents of size \c parent_count. \c nullptr-valued elements
* are ignored
*
* \param parent_count
* Number of parent tasks
*
* \param size
* Total number of work units; the callback \c func will be called this
* many times if provided.
*
* \param func
* Callback function that will be invoked to perform the actual computation.
* If set to \c nullptr, the callback is ignored. This can be used to create
* artificial tasks that only encode dependencies.
*
* \param payload
* Optional payload that is passed to the function \c func
*
* \param payload_size
* When \c payload_deleter is equal to \c nullptr and when \c size is
* nonzero, a temporary copy of the payload will be made. This parameter is
* necessary to specify the payload size in that case.
*
* \param payload_deleter
* Optional callback that will be invoked to free the payload
*
* \param always_async
* If set to a nonzero value, execution will always happen asynchronously,
* even in cases where the task being scheduled has no parents, and
* when it only encodes a small amount of work (\c size == 1). Otherwise
* it will be executed synchronously, and the function will return \c nullptr.
*
* \param profile
* If set to a nonzero value, nanothread will keep track of the start and
* end time of the task. This behavior will also be enabled if
* \ref pool_set_profile() is used to enable profiling globally.
*
*
* \return
* A task handle that must eventually be released via \ref task_release()
* or \ref task_wait_and_release(). The function returns \c nullptr when
* no task was generated (e.g. when there are no parent tasks, and either
* <tt>size==0</tt>, or when <tt>size==1</tt> and the task was executed
* synchronously.)
*/
extern NANOTHREAD_EXPORT
Task *task_submit_dep(Pool *pool,
const Task * const *parent,
uint32_t parent_count,
uint32_t size NANOTHREAD_DEF(1),
void (*func)(uint32_t, void *) NANOTHREAD_DEF(0),
void *payload NANOTHREAD_DEF(0),
uint32_t payload_size NANOTHREAD_DEF(0),
void (*payload_deleter)(void *) NANOTHREAD_DEF(0),
int always_async NANOTHREAD_DEF(0),
int profile NANOTHREAD_DEF(0));
/*
* \brief Release a task handle so that it can eventually be reused
*
* Releasing a task handle does not impact the tasks's execution, which could
* be in one of three states: waiting, running, or complete. This operation is
* important because it frees internal resources that would otherwise leak.
*
* Following a call to \ref task_release(), the associated task can no
* longer be used as a direct parent of other tasks, and it is no longer
* possible to wait for its completion using an operation like \ref
* task_wait().
*
* \param pool
* The thread pool containing the task. \c nullptr refers to the default pool.
*
* \param task
* The task in question. When equal to \c nullptr, the operation is a no-op.
*/
extern NANOTHREAD_EXPORT void task_release(Task *task);
/*
* \brief Wait for the completion of the specified task
*
* This function causes the calling thread to sleep until all work units of
* 'task' have been completed.
*
* If an exception was caught during parallel execution of 'task', the
* function \ref task_wait() will re-raise this exception in the context of the
* caller. Note that if a parallel task raises many exceptions, only a single
* one of them will be captured in this way.
*
* \param task
* The task in question. When equal to \c nullptr, the operation is a no-op.
*/
extern NANOTHREAD_EXPORT void task_wait(Task *task) NANOTHREAD_THROW;
/*
* \brief Wait for the completion of the specified task and release its handle
*
* This function is equivalent to calling \ref task_wait() followed by \ref
* task_release().
*
* If an exception was caught during parallel execution of 'task', the
* function \ref task_wait_and_release() will perform the release step and then
* re-raise this exception in the context of the caller. Note that if a
* parallel task raises many exceptions, only a single one of them will be
* captured in this way.
*
* \param task
* The task in question. When equal to \c nullptr, the operation is a no-op.
*/
extern NANOTHREAD_EXPORT void task_wait_and_release(Task *task) NANOTHREAD_THROW;
/**
* \brief Query whether a task has completed without blocking
*
* This function checks if all work units of the specified task have been
* completed. Unlike \ref task_wait(), this function returns immediately
* without blocking.
*
* \param task
* The task in question. When equal to \c nullptr, the function returns true.
*
* \return
* true if the task has completed (or is nullptr), false otherwise.
*/
extern NANOTHREAD_EXPORT bool task_query(Task *task);
/**
* \brief Return the time consumed by the task in milliseconds
*
* To use this function, the underlying task must have been launched via \ref
* task_submit_dep() with the ``profile`` argument set to a nonzero value.
* Alternatively, you may call \ref pool_set_profile() before launching tasks
* to automatically enable profiling globally.
*
* The task must have finished previously, use \ref task_wait() if in doubt.
*/
extern NANOTHREAD_EXPORT double task_time(Task *task) NANOTHREAD_THROW;
/**
* \brief Return the difference between the start time of two different tasks
*
* To use this function, the underlying task2 must have been launched via \ref
* task_submit_dep() with the ``profile`` argument set to a nonzero value.
* Alternatively, you may call \ref pool_set_profile() before launching tasks
* to automatically enable profiling globally.
*
* Both tasks must have finished previously, use \ref task_wait() if in doubt.
*/
extern NANOTHREAD_EXPORT double task_time_rel(Task *task_1, Task *task_2) NANOTHREAD_THROW;
/*
* \brief Increase the reference count of a task
*
* In advanced use cases, it may be helpful if multiple parts of the system can
* hold references to a task (and e.g. query timing information or
* completeness). The \c task_retain operation enables this by increasing an
* internal reference counter so that \ref task_release() must be called
* multiple times before the task is actually released.
*
* \param task
* The task in question. When equal to \c nullptr, the operation is a no-op.
*/
extern NANOTHREAD_EXPORT void task_retain(Task *task);
/// Convenience wrapper around task_submit_dep(), but without dependencies
static inline
Task *task_submit(Pool *pool,
                  uint32_t size NANOTHREAD_DEF(1),
                  void (*func)(uint32_t, void *) NANOTHREAD_DEF(0),
                  void *payload NANOTHREAD_DEF(0),
                  uint32_t payload_size NANOTHREAD_DEF(0),
                  void (*payload_deleter)(void *) NANOTHREAD_DEF(0),
                  int always_async NANOTHREAD_DEF(0)) {
    /* Forward to the general entry point with an empty parent list and
       profiling disabled */
    Task *handle = task_submit_dep(pool, /* parent */ 0, /* parent_count */ 0,
                                   size, func, payload, payload_size,
                                   payload_deleter, always_async,
                                   /* profile */ 0);
    return handle;
}
/// Convenience wrapper around task_submit(), but fully synchronous
static inline
void task_submit_and_wait(Pool *pool,
                          uint32_t size NANOTHREAD_DEF(1),
                          void (*func)(uint32_t, void *) NANOTHREAD_DEF(0),
                          void *payload NANOTHREAD_DEF(0)) {
    /* Schedule the work, then block until it has finished and release
       the task handle in one step */
    task_wait_and_release(
        task_submit(pool, size, func, payload,
                    /* payload_size */ 0, /* payload_deleter */ 0,
                    /* always_async */ 0));
}
#if defined(__cplusplus)
}
#include <utility>
namespace drjit {
/**
 * \brief Half-open integer interval <tt>[begin, end)</tt> that is
 * partitioned into chunks of \c block_size elements for parallel
 * processing (analogous to TBB's blocked_range).
 */
template <typename Int> struct blocked_range {
public:
    /// Construct a range covering [begin, end), split into blocks of
    /// 'block_size' elements; the final block may be smaller.
    blocked_range(Int begin, Int end, Int block_size = 1)
        : m_begin(begin), m_end(end), m_block_size(block_size) { }

    /// Minimal iterator interface enumerating the underlying integers
    struct iterator {
        Int value;

        iterator(Int value) : value(value) { }

        Int operator*() const { return value; }
        operator Int() const { return value; }

        /// Pre-increment; returns *this so increments can be chained
        /// (the original returned void)
        iterator &operator++() { ++value; return *this; }

        // Comparisons are const-qualified so that they also work on
        // const iterators (the original versions were not const)
        bool operator==(const iterator &it) const { return value == it.value; }
        bool operator!=(const iterator &it) const { return value != it.value; }
    };

    /// Number of blocks needed to cover the range (rounds up)
    uint32_t blocks() const {
        return (uint32_t) ((m_end - m_begin + m_block_size - 1) / m_block_size);
    }

    iterator begin() const { return iterator(m_begin); }
    iterator end() const { return iterator(m_end); }
    Int block_size() const { return m_block_size; }

private:
    Int m_begin;      // first element of the range
    Int m_end;        // one past the last element
    Int m_block_size; // granularity of one work unit
};
/**
 * \brief Execute 'func' over the given range in parallel and wait for
 * completion.
 *
 * The range is split into blocks; each work unit invokes the callable
 * with the sub-range it should process. The context lives on the stack,
 * which is safe because the call is fully synchronous.
 */
template <typename Int, typename Func>
void parallel_for(const blocked_range<Int> &range, Func &&func,
                  Pool *pool = nullptr) {
    // Per-task state: pointer to the callable plus the range parameters
    struct Context {
        Func *callable;
        Int start, stop, chunk;
    };

    Context ctx { &func, range.begin(), range.end(), range.block_size() };

    auto trampoline = [](uint32_t unit, void *opaque) {
        Context *c = (Context *) opaque;
        Int lo = c->start + c->chunk * (Int) unit;
        Int hi = lo + c->chunk;
        if (hi > c->stop)
            hi = c->stop; // final block may be partial
        (*c->callable)(blocked_range<Int>(lo, hi));
    };

    // Synchronous: 'ctx' remains valid for the task's entire lifetime
    task_submit_and_wait(pool, range.blocks(), trampoline, &ctx);
}
/**
 * \brief Asynchronous variant of \ref parallel_for()
 *
 * Submits the parallelized loop as a task depending on 'parents' and
 * returns immediately. The callable is captured by value: trivially
 * copyable/destructible callables are copied into internal task storage,
 * while all others are heap-allocated and freed via a payload deleter
 * once the task completes.
 */
template <typename Int, typename Func>
Task *parallel_for_async(const blocked_range<Int> &range, Func &&func,
                         const Task * const *parents,
                         size_t parent_count,
                         Pool *pool = nullptr) {
    using Callable = typename std::decay<Func>::type;

    // Captured state: the callable itself plus the range parameters
    struct Context {
        Callable fn;
        Int start, stop, chunk;
    };

    auto trampoline = [](uint32_t unit, void *opaque) {
        Context *c = (Context *) opaque;
        Int lo = c->start + c->chunk * (Int) unit,
            hi = lo + c->chunk;
        if (hi > c->stop)
            hi = c->stop; // final block may be partial
        c->fn(blocked_range<Int>(lo, hi));
    };

    bool trivially_storable =
        std::is_trivially_copyable<Callable>::value &&
        std::is_trivially_destructible<Callable>::value;

    if (trivially_storable) {
        // Safe to bit-copy: let the pool make an internal copy
        Context ctx { std::forward<Func>(func), range.begin(),
                      range.end(), range.block_size() };
        return task_submit_dep(pool, parents,
                               (uint32_t) parent_count, range.blocks(),
                               trampoline, &ctx, sizeof(Context), nullptr, 1);
    } else {
        // Heap-allocate; the deleter runs after the task finishes
        Context *ctx = new Context { std::forward<Func>(func), range.begin(),
                                     range.end(), range.block_size() };
        auto free_ctx = [](void *opaque) { delete (Context *) opaque; };
        return task_submit_dep(pool, parents,
                               (uint32_t) parent_count, range.blocks(),
                               trampoline, ctx, 0, free_ctx, 1);
    }
}
/// Overload of parallel_for_async() accepting a brace-enclosed parent list
template <typename Int, typename Func>
Task *parallel_for_async(const blocked_range<Int> &range, Func &&func,
                         std::initializer_list<const Task *> parents = { },
                         Pool *pool = nullptr) {
    // Forward 'func' so that rvalue callables are moved into the task
    // payload instead of copied (the original passed an lvalue, which
    // silently forced a copy of the functor).
    return parallel_for_async(range, std::forward<Func>(func),
                              parents.begin(), parents.size(), pool);
}
/**
 * \brief Submit a nullary callable as a single asynchronous task
 *
 * The callable is captured by value: trivially copyable/destructible
 * callables are copied into internal task storage, while all others are
 * heap-allocated and released via a payload deleter when the task is done.
 */
template <typename Func>
Task *do_async(Func &&func, const Task * const *parents, size_t parent_count,
               Pool *pool = nullptr) {
    using Callable = typename std::decay<Func>::type;

    // Wrapper holding the captured callable
    struct Context {
        Callable fn;
    };

    auto trampoline = [](uint32_t, void *opaque) {
        ((Context *) opaque)->fn();
    };

    if (std::is_trivially_copyable<Callable>::value &&
        std::is_trivially_destructible<Callable>::value) {
        // Safe to bit-copy: let the pool make an internal copy
        Context ctx { std::forward<Func>(func) };
        return task_submit_dep(pool, parents,
                               (uint32_t) parent_count, 1, trampoline,
                               &ctx, sizeof(Context), nullptr, 1);
    } else {
        // Heap-allocate; the deleter runs after the task finishes
        Context *ctx = new Context { std::forward<Func>(func) };
        auto free_ctx = [](void *opaque) { delete (Context *) opaque; };
        return task_submit_dep(pool, parents,
                               (uint32_t) parent_count, 1, trampoline,
                               ctx, 0, free_ctx, 1);
    }
}
/// Overload of do_async() accepting a brace-enclosed list of parent tasks
template <typename Func>
Task *do_async(Func &&func, std::initializer_list<const Task *> parents = {},
               Pool *pool = nullptr) {
    // Forward 'func' so that rvalue callables are moved into the task
    // payload instead of copied (the original passed an lvalue, which
    // silently forced a copy of the functor).
    return do_async(std::forward<Func>(func), parents.begin(), parents.size(),
                    pool);
}
}
#endif