Small & Fast synchronization primitives for Zig
const std = @import("std"); | |
const system = std.os.system; | |
// https://vorbrodt.blog/2019/02/27/advanced-thread-pool/ | |
pub fn main() !void { | |
return benchPool(DistributedPool); | |
} | |
const REPS = 1; | |
const SPREAD = 100; | |
const COUNT = 10_000_000; | |
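// Benchmark layout: a single Root task fans out into SPREAD spawner tasks, each of
// which schedules an equal share of the COUNT leaf tasks. Each leaf task only burns a
// little CPU through a PRNG, so the pools below are stressed mostly on scheduling
// overhead rather than on the task work itself.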
fn benchPool(comptime Pool: type) !void { | |
const Task = struct { | |
index: usize, | |
runnable: Pool.Runnable = Pool.Runnable.init(run), | |
fn run(runnable: *Pool.Runnable) void { | |
const self = @fieldParentPtr(@This(), "runnable", runnable); | |
var prng = std.rand.DefaultPrng.init(self.index); | |
const rng = &prng.random; | |
var x: usize = undefined; | |
var reps: usize = REPS + (REPS * rng.uintLessThan(usize, 5)); | |
while (reps > 0) : (reps -= 1) { | |
x = self.index + rng.int(usize); | |
} | |
var keep: usize = undefined; | |
@atomicStore(usize, &keep, x, .SeqCst); | |
} | |
}; | |
const Spawner = struct { | |
index: usize, | |
tasks: []Task, | |
runnable: Pool.Runnable = Pool.Runnable.init(run), | |
fn run(runnable: *Pool.Runnable) void { | |
const self = @fieldParentPtr(@This(), "runnable", runnable); | |
for (self.tasks) |*task, offset| { | |
task.* = Task{ .index = self.index + offset }; | |
Pool.schedule(&task.runnable); | |
} | |
} | |
}; | |
const Root = struct { | |
tasks: []Task, | |
spawners: []Spawner, | |
runnable: Pool.Runnable = Pool.Runnable.init(run), | |
fn run(runnable: *Pool.Runnable) void { | |
const self = @fieldParentPtr(@This(), "runnable", runnable); | |
var offset: usize = 0; | |
const chunk = self.tasks.len / self.spawners.len; | |
for (self.spawners) |*spawner, index| { | |
spawner.* = Spawner{ | |
.index = index, | |
.tasks = self.tasks[offset..][0..chunk], | |
}; | |
offset += chunk; | |
Pool.schedule(&spawner.runnable); | |
} | |
} | |
}; | |
const allocator = std.heap.page_allocator; | |
const tasks = try allocator.alloc(Task, COUNT); | |
defer allocator.free(tasks); | |
const spawners = try allocator.alloc(Spawner, SPREAD); | |
defer allocator.free(spawners); | |
var root = Root{ | |
.tasks = tasks, | |
.spawners = spawners, | |
}; | |
try Pool.run(&root.runnable); | |
} | |
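// BasicPool: single-threaded baseline. schedule() pushes onto a thread-local intrusive
// LIFO stack and run() drains it on the calling thread, with no synchronization at all.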
const BasicPool = struct { | |
run_queue: ?*Runnable = null, | |
const Runnable = struct { | |
next: ?*Runnable = null, | |
callback: fn(*Runnable) void, | |
fn init(callback: fn(*Runnable) void) Runnable { | |
return .{ .callback = callback }; | |
} | |
}; | |
var tls_pool: ?*BasicPool = null; | |
fn run(runnable: *Runnable) !void { | |
var pool = BasicPool{}; | |
pool.push(runnable); | |
const old = tls_pool; | |
tls_pool = &pool; | |
defer tls_pool = old; | |
while (pool.pop()) |next| | |
(next.callback)(next); | |
} | |
fn schedule(runnable: *Runnable) void { | |
tls_pool.?.push(runnable); | |
} | |
fn push(self: *BasicPool, runnable: *Runnable) void { | |
runnable.next = self.run_queue; | |
self.run_queue = runnable; | |
} | |
fn pop(self: *BasicPool) ?*Runnable { | |
const runnable = self.run_queue orelse return null; | |
self.run_queue = runnable.next; | |
return runnable; | |
} | |
}; | |
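// SharedPool: one global intrusive run queue protected by a Mutex + Condvar and drained
// by one worker thread per CPU. `running` doubles as a countdown used to detect when
// every worker is idle and the pool can shut down.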
const SharedPool = struct { | |
mutex: Mutex, | |
cond: Condvar, | |
run_queue: ?*Runnable, | |
running: usize, | |
const Runnable = struct { | |
next: ?*Runnable = null, | |
callback: fn(*Runnable) void, | |
fn init(callback: fn(*Runnable) void) Runnable { | |
return .{ .callback = callback }; | |
} | |
}; | |
var tls_pool: ?*SharedPool = null; | |
fn run(runnable: *Runnable) !void { | |
var self = SharedPool{ | |
.mutex = Mutex.init(), | |
.cond = Condvar.init(), | |
.run_queue = runnable, | |
.running = std.math.max(1, std.Thread.cpuCount() catch 1), | |
}; | |
const old_pool = tls_pool; | |
tls_pool = &self; | |
defer tls_pool = old_pool; | |
defer { | |
self.mutex.deinit(); | |
self.cond.deinit(); | |
} | |
const allocator = std.heap.page_allocator; | |
const threads = try allocator.alloc(*std.Thread, self.running - 1); | |
defer allocator.free(threads); | |
for (threads) |*thread| | |
thread.* = try std.Thread.spawn(&self, runWorker); | |
defer for (threads) |thread| | |
thread.wait(); | |
self.runWorker(); | |
} | |
fn schedule(runnable: *Runnable) void { | |
const self = tls_pool.?; | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
runnable.next = self.run_queue; | |
self.run_queue = runnable; | |
self.cond.signal(); | |
} | |
fn runWorker(self: *SharedPool) void { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
while (true) { | |
if (self.run_queue) |runnable| { | |
self.run_queue = runnable.next; | |
self.mutex.unlock(); | |
(runnable.callback)(runnable); | |
self.mutex.lock(); | |
continue; | |
} | |
self.running -= 1; | |
if (self.running == 0) { | |
self.cond.broadcast(); | |
return; | |
} | |
self.cond.wait(&self.mutex); | |
if (self.running == 0) { | |
break; | |
} else { | |
self.running += 1; | |
} | |
} | |
} | |
}; | |
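// DistributedPool: a work-stealing scheduler. Each worker owns a fixed-size bounded
// ring buffer plus an unbounded overflow queue; tasks scheduled from a worker go to
// that worker's queues, while idle workers steal from their peers or from the
// pool-wide injector queue (`run_queue`). Sleeping and waking are coordinated through
// the packed `idle` word and a single semaphore.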
const DistributedPool = struct { | |
idle: usize = 0, | |
workers: []Worker, | |
run_queue: UnboundedQueue, | |
semaphore: Semaphore = Semaphore.init(0), | |
const Runnable = struct { | |
next: ?*Runnable = null, | |
callback: fn(*Runnable) void, | |
fn init(callback: fn(*Runnable) void) Runnable { | |
return .{ .callback = callback }; | |
} | |
}; | |
fn run(runnable: *Runnable) !void { | |
const threads = std.math.max(1, std.Thread.cpuCount() catch 1); | |
const allocator = std.heap.page_allocator; | |
var self = DistributedPool{ | |
.workers = try allocator.alloc(Worker, threads), | |
.run_queue = UnboundedQueue.init(), | |
}; | |
defer { | |
self.semaphore.deinit(); | |
self.run_queue.deinit(); | |
allocator.free(self.workers); | |
} | |
for (self.workers) |*worker| | |
worker.* = Worker.init(&self); | |
defer for (self.workers) |*worker| | |
worker.deinit(); | |
self.run_queue.pushFront(Batch.from(runnable)); | |
for (self.workers[1..]) |*worker| | |
worker.thread = try std.Thread.spawn(worker, Worker.run); | |
defer for (self.workers[1..]) |*worker| | |
worker.thread.wait(); | |
self.workers[0].run(); | |
} | |
fn schedule(runnable: *Runnable) void { | |
const worker = Worker.current.?; | |
if (worker.run_queue.push(runnable)) |overflowed| | |
worker.run_queue_overflow.pushFront(overflowed); | |
worker.pool.notify(false); | |
} | |
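// Idle packs the notification state machine (low 2 bits) and the number of sleeping
// workers (remaining bits) into a single usize, updated with CAS in wait()/notify()
// below. This lets notify() decide with one atomic read-modify-write whether to wake a
// sleeper, hand off the "waking" token, or do nothing.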
const Idle = struct { | |
state: State, | |
waiting: usize, | |
const State = enum(u2){ | |
pending = 0, | |
notified, | |
waking, | |
signalled, | |
}; | |
fn pack(self: Idle) usize { | |
return @enumToInt(self.state) | (self.waiting << 2); | |
} | |
fn unpack(value: usize) Idle { | |
return Idle{ | |
.state = @intToEnum(State, @truncate(u2, value)), | |
.waiting = value >> 2, | |
}; | |
} | |
}; | |
const Wait = enum { | |
retry, | |
waking, | |
shutdown, | |
}; | |
fn wait(self: *DistributedPool, is_waking: bool) Wait { | |
var idle = Idle.unpack(@atomicLoad(usize, &self.idle, .SeqCst)); | |
while (true) { | |
if (idle.waiting == self.workers.len - 1) { | |
self.semaphore.post(); | |
return Wait.shutdown; | |
} | |
const notified = switch (idle.state) { | |
.notified => true, | |
.signalled => is_waking, | |
else => false, | |
}; | |
var new_idle = idle; | |
if (notified) { | |
new_idle.state = if (is_waking) .waking else .pending; | |
} else { | |
new_idle.waiting += 1; | |
new_idle.state = if (is_waking) .notified else idle.state; | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.idle, | |
idle.pack(), | |
new_idle.pack(), | |
.SeqCst, | |
.SeqCst, | |
)) |updated| { | |
idle = Idle.unpack(updated); | |
continue; | |
} | |
if (notified and is_waking) | |
return Wait.waking; | |
if (notified) | |
return Wait.retry; | |
self.semaphore.wait(); | |
return Wait.waking; | |
} | |
} | |
fn notify(self: *DistributedPool, is_waking: bool) void { | |
var idle = Idle.unpack(@atomicLoad(usize, &self.idle, .SeqCst)); | |
while (true) { | |
if (!is_waking and (idle.state == .notified or idle.state == .signalled)) | |
return; | |
var new_idle = idle; | |
if (idle.waiting > 0 and (is_waking or idle.state == .pending)) { | |
new_idle.waiting -= 1; | |
new_idle.state = .waking; | |
} else if (!is_waking and idle.state == .waking) { | |
new_idle.state = .signalled; | |
} else { | |
new_idle.state = .notified; | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.idle, | |
idle.pack(), | |
new_idle.pack(), | |
.SeqCst, | |
.SeqCst, | |
)) |updated| { | |
idle = Idle.unpack(updated); | |
continue; | |
} | |
if (idle.waiting > new_idle.waiting) | |
self.semaphore.post(); | |
return; | |
} | |
} | |
const Worker = struct { | |
run_queue: BoundedQueue, | |
run_queue_overflow: UnboundedQueue, | |
thread: *std.Thread = undefined, | |
pool: *DistributedPool, | |
threadlocal var current: ?*Worker = null; | |
fn init(pool: *DistributedPool) Worker { | |
return Worker{ | |
.pool = pool, | |
.run_queue = BoundedQueue.init(), | |
.run_queue_overflow = UnboundedQueue.init(), | |
}; | |
} | |
fn deinit(self: *Worker) void { | |
self.run_queue.deinit(); | |
self.run_queue_overflow.deinit(); | |
} | |
fn run(self: *Worker) void { | |
const old = current; | |
current = self; | |
defer current = old; | |
var waking = false; | |
var tick = @ptrToInt(self); | |
var prng = @truncate(u32, tick >> @sizeOf(usize)); | |
while (true) { | |
if (self.poll(.{ | |
.tick = tick, | |
.rand = &prng, | |
})) |runnable| { | |
if (waking) | |
self.pool.notify(waking); | |
tick +%= 1; | |
waking = false; | |
(runnable.callback)(runnable); | |
continue; | |
} | |
waking = switch (self.pool.wait(waking)) { | |
.retry => false, | |
.waking => true, | |
.shutdown => break, | |
}; | |
} | |
} | |
fn poll(self: *Worker, args: anytype) ?*Runnable { | |
if (args.tick % 256 == 0) { | |
if (self.steal(args)) |runnable| | |
return runnable; | |
} | |
if (args.tick % 128 == 0) { | |
if (self.run_queue.stealUnbounded(&self.pool.run_queue)) |runnable| | |
return runnable; | |
} | |
if (args.tick % 64 == 0) { | |
if (self.run_queue.stealUnbounded(&self.run_queue_overflow)) |runnable| | |
return runnable; | |
} | |
if (self.run_queue.pop()) |runnable| | |
return runnable; | |
if (self.run_queue.stealUnbounded(&self.run_queue_overflow)) |runnable| | |
return runnable; | |
var attempts: u8 = 32; | |
while (attempts > 0) : (attempts -= 1) { | |
if (self.steal(args)) |runnable| | |
return runnable; | |
if (self.run_queue.stealUnbounded(&self.pool.run_queue)) |runnable| | |
return runnable; | |
std.os.sched_yield() catch spinLoopHint(); | |
} | |
return null; | |
} | |
fn steal(self: *Worker, args: anytype) ?*Runnable { | |
var workers = self.pool.workers; | |
var index = blk: { | |
var x = args.rand.*; | |
x ^= x << 13; | |
x ^= x >> 17; | |
x ^= x << 5; | |
args.rand.* = x; | |
break :blk x % workers.len; | |
}; | |
var iter = workers.len; | |
while (iter > 0) : (iter -= 1) { | |
const worker = &workers[index]; | |
index += 1; | |
if (index == workers.len) | |
index = 0; | |
if (worker == self) | |
continue; | |
if (self.run_queue.stealBounded(&worker.run_queue)) |runnable| | |
return runnable; | |
if (self.run_queue.stealUnbounded(&worker.run_queue_overflow)) |runnable| | |
return runnable; | |
} | |
return null; | |
} | |
}; | |
const Batch = struct { | |
size: usize = 0, | |
head: *Runnable = undefined, | |
tail: *Runnable = undefined, | |
fn isEmpty(self: Batch) bool { | |
return self.size == 0; | |
} | |
fn from(runnable: *Runnable) Batch { | |
runnable.next = null; | |
return Batch{ | |
.size = 1, | |
.head = runnable, | |
.tail = runnable, | |
}; | |
} | |
fn pushBack(self: *Batch, batch: Batch) void { | |
if (batch.isEmpty()) | |
return; | |
if (self.isEmpty()) { | |
self.* = batch; | |
} else { | |
self.tail.next = batch.head; | |
self.tail = batch.tail; | |
self.size += batch.size; | |
} | |
} | |
fn pushFront(self: *Batch, batch: Batch) void { | |
if (batch.isEmpty()) | |
return; | |
if (self.isEmpty()) { | |
self.* = batch; | |
} else { | |
batch.tail.next = self.head; | |
self.head = batch.head; | |
self.size += batch.size; | |
} | |
} | |
fn popFront(self: *Batch) ?*Runnable { | |
if (self.isEmpty()) | |
return null; | |
const runnable = self.head; | |
self.head = runnable.next orelse undefined; | |
self.size -= 1; | |
return runnable; | |
} | |
}; | |
const UnboundedQueue = struct { | |
mutex: Mutex, | |
batch: Batch = .{}, | |
size: usize = 0, | |
fn init() UnboundedQueue { | |
return UnboundedQueue{ .mutex = Mutex.init() }; | |
} | |
fn deinit(self: *UnboundedQueue) void { | |
self.mutex.deinit(); | |
self.* = undefined; | |
} | |
fn pushBack(self: *UnboundedQueue, batch: Batch) void { | |
self.push(batch, .back); | |
} | |
fn pushFront(self: *UnboundedQueue, batch: Batch) void { | |
self.push(batch, .front); | |
} | |
fn push(self: *UnboundedQueue, batch: Batch, side: enum{ front, back }) void { | |
if (batch.isEmpty()) | |
return; | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
switch (side) { | |
.front => self.batch.pushFront(batch), | |
.back => self.batch.pushBack(batch), | |
} | |
@atomicStore(usize, &self.size, self.batch.size, .Release); | |
} | |
fn tryAcquireConsumer(self: *UnboundedQueue) ?Consumer { | |
if (@atomicLoad(usize, &self.size, .Acquire) == 0) | |
return null; | |
self.mutex.lock(); | |
if (self.size == 0) { | |
self.mutex.unlock(); | |
return null; | |
} | |
return Consumer{ | |
.queue = self, | |
}; | |
} | |
const Consumer = struct { | |
queue: *UnboundedQueue, | |
fn release(self: Consumer) void { | |
@atomicStore(usize, &self.queue.size, self.queue.batch.size, .Release); | |
self.queue.mutex.unlock(); | |
} | |
fn pop(self: *Consumer) ?*Runnable { | |
return self.queue.batch.popFront(); | |
} | |
}; | |
}; | |
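// BoundedQueue: a ring buffer of 256 task pointers. Only the owning worker fills it
// (push / stealUnbounded / stealBounded write at the tail); both the owner's pop() and
// stealing workers consume from the head with a CAS. When the buffer fills, push()
// migrates half of it into a Batch that the caller moves to the overflow UnboundedQueue.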
const BoundedQueue = struct { | |
head: usize = 0, | |
tail: usize = 0, | |
buffer: [256]*Runnable = undefined, | |
fn init() BoundedQueue { | |
return .{}; | |
} | |
fn deinit(self: *BoundedQueue) void { | |
self.* = undefined; | |
} | |
fn push(self: *BoundedQueue, runnable: *Runnable) ?Batch { | |
var tail = self.tail; | |
var head = @atomicLoad(usize, &self.head, .Monotonic); | |
while (true) { | |
if (tail -% head < self.buffer.len) { | |
@atomicStore(*Runnable, &self.buffer[tail % self.buffer.len], runnable, .Unordered); | |
@atomicStore(usize, &self.tail, tail +% 1, .Release); | |
return null; | |
} | |
const new_head = head +% (self.buffer.len / 2); | |
if (@cmpxchgWeak( | |
usize, | |
&self.head, | |
head, | |
new_head, | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
head = updated; | |
continue; | |
} | |
var batch = Batch{}; | |
while (head != new_head) : (head +%= 1) | |
batch.pushBack(Batch.from(self.buffer[head % self.buffer.len])); | |
batch.pushBack(Batch.from(runnable)); | |
return batch; | |
} | |
} | |
fn pop(self: *BoundedQueue) ?*Runnable { | |
var tail = self.tail; | |
var head = @atomicLoad(usize, &self.head, .Monotonic); | |
while (tail != head) { | |
head = @cmpxchgWeak( | |
usize, | |
&self.head, | |
head, | |
head +% 1, | |
.Acquire, | |
.Monotonic, | |
) orelse return self.buffer[head % self.buffer.len]; | |
} | |
return null; | |
} | |
fn stealUnbounded(self: *BoundedQueue, target: *UnboundedQueue) ?*Runnable { | |
var consumer = target.tryAcquireConsumer() orelse return null; | |
defer consumer.release(); | |
const first_runnable = consumer.pop(); | |
const head = @atomicLoad(usize, &self.head, .Monotonic); | |
const tail = self.tail; | |
var new_tail = tail; | |
while (new_tail -% head < self.buffer.len) { | |
const runnable = consumer.pop() orelse break; | |
@atomicStore(*Runnable, &self.buffer[new_tail % self.buffer.len], runnable, .Unordered); | |
new_tail +%= 1; | |
} | |
if (new_tail != tail) | |
@atomicStore(usize, &self.tail, new_tail, .Release); | |
return first_runnable; | |
} | |
fn stealBounded(self: *BoundedQueue, target: *BoundedQueue) ?*Runnable { | |
if (self == target) | |
return self.pop(); | |
const head = @atomicLoad(usize, &self.head, .Monotonic); | |
const tail = self.tail; | |
if (tail != head) | |
return self.pop(); | |
var target_head = @atomicLoad(usize, &target.head, .Monotonic); | |
while (true) { | |
const target_tail = @atomicLoad(usize, &target.tail, .Acquire); | |
const target_size = target_tail -% target_head; | |
if (target_size == 0) | |
return null; | |
var steal = target_size - (target_size / 2); | |
if (steal > target.buffer.len / 2) { | |
spinLoopHint(); | |
target_head = @atomicLoad(usize, &target.head, .Monotonic); | |
continue; | |
} | |
const first_runnable = @atomicLoad(*Runnable, &target.buffer[target_head % target.buffer.len], .Unordered); | |
var new_target_head = target_head +% 1; | |
var new_tail = tail; | |
steal -= 1; | |
while (steal > 0) : (steal -= 1) { | |
const runnable = @atomicLoad(*Runnable, &target.buffer[new_target_head % target.buffer.len], .Unordered); | |
new_target_head +%= 1; | |
@atomicStore(*Runnable, &self.buffer[new_tail % self.buffer.len], runnable, .Unordered); | |
new_tail +%= 1; | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&target.head, | |
target_head, | |
new_target_head, | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
target_head = updated; | |
continue; | |
} | |
if (new_tail != tail) | |
@atomicStore(usize, &self.tail, new_tail, .Release); | |
return first_runnable; | |
} | |
} | |
}; | |
}; | |
pub const Semaphore = struct { | |
mutex: Mutex, | |
cond: Condvar, | |
permits: usize, | |
pub fn init(permits: usize) Semaphore { | |
return .{ | |
.mutex = Mutex.init(), | |
.cond = Condvar.init(), | |
.permits = permits, | |
}; | |
} | |
pub fn deinit(self: *Semaphore) void { | |
self.mutex.deinit(); | |
self.cond.deinit(); | |
self.* = undefined; | |
} | |
pub fn wait(self: *Semaphore) void { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
while (self.permits == 0) | |
self.cond.wait(&self.mutex); | |
self.permits -= 1; | |
if (self.permits > 0) | |
self.cond.signal(); | |
} | |
pub fn post(self: *Semaphore) void { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
self.permits += 1; | |
self.cond.signal(); | |
} | |
}; | |
pub const Mutex = if (std.builtin.os.tag == .windows) | |
struct { | |
srwlock: SRWLOCK, | |
pub fn init() Mutex { | |
return .{ .srwlock = SRWLOCK_INIT }; | |
} | |
pub fn deinit(self: *Mutex) void { | |
self.* = undefined; | |
} | |
pub fn tryLock(self: *Mutex) bool { | |
return TryAcquireSRWLockExclusive(&self.srwlock) != system.FALSE; | |
} | |
pub fn lock(self: *Mutex) void { | |
AcquireSRWLockExclusive(&self.srwlock); | |
} | |
pub fn unlock(self: *Mutex) void { | |
ReleaseSRWLockExclusive(&self.srwlock); | |
} | |
const SRWLOCK = usize; | |
const SRWLOCK_INIT: SRWLOCK = 0; | |
extern "kernel32" fn TryAcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) system.BOOL; | |
extern "kernel32" fn AcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void; | |
extern "kernel32" fn ReleaseSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void; | |
} | |
else if (std.builtin.link_libc) | |
struct { | |
mutex: if (std.builtin.link_libc) std.c.pthread_mutex_t else void, | |
pub fn init() Mutex { | |
return .{ .mutex = std.c.PTHREAD_MUTEX_INITIALIZER }; | |
} | |
pub fn deinit(self: *Mutex) void { | |
const safe_rc = switch (std.builtin.os.tag) { | |
.dragonfly, .netbsd => std.os.EAGAIN, | |
else => 0, | |
}; | |
const rc = std.c.pthread_mutex_destroy(&self.mutex); | |
std.debug.assert(rc == 0 or rc == safe_rc); | |
self.* = undefined; | |
} | |
pub fn tryLock(self: *Mutex) bool { | |
return pthread_mutex_trylock(&self.mutex) == 0; | |
} | |
pub fn lock(self: *Mutex) void { | |
const rc = std.c.pthread_mutex_lock(&self.mutex); | |
if (rc != 0) | |
std.debug.panic("pthread_mutex_lock() = {}\n", .{rc}); | |
} | |
pub fn unlock(self: *Mutex) void { | |
const rc = std.c.pthread_mutex_unlock(&self.mutex); | |
if (rc != 0) | |
std.debug.panic("pthread_mutex_unlock() = {}\n", .{rc}); | |
} | |
extern "c" fn pthread_mutex_trylock(m: *std.c.pthread_mutex_t) callconv(.C) c_int; | |
} | |
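// With no libc available on Linux, fall back to a futex-based mutex with the classic
// three states (unlocked / locked / locked-with-waiters), so unlock only makes a futex
// wake syscall when someone is actually sleeping.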
else if (std.builtin.os.tag == .linux) | |
struct { | |
state: State, | |
const State = enum(i32) { | |
unlocked, | |
locked, | |
waiting, | |
}; | |
pub fn init() Mutex { | |
return .{ .state = .unlocked }; | |
} | |
pub fn deinit(self: *Mutex) void { | |
self.* = undefined; | |
} | |
pub fn tryLock(self: *Mutex) bool { | |
return @cmpxchgStrong( | |
State, | |
&self.state, | |
.unlocked, | |
.locked, | |
.Acquire, | |
.Monotonic, | |
) == null; | |
} | |
pub fn lock(self: *Mutex) void { | |
switch (@atomicRmw(State, &self.state, .Xchg, .locked, .Acquire)) { | |
.unlocked => {}, | |
else => |s| self.lockSlow(s), | |
} | |
} | |
fn lockSlow(self: *Mutex, current_state: State) void { | |
@setCold(true); | |
var new_state = current_state; | |
var spin: u8 = 0; | |
while (spin < 100) : (spin += 1) { | |
const state = @cmpxchgWeak( | |
State, | |
&self.state, | |
.unlocked, | |
new_state, | |
.Acquire, | |
.Monotonic, | |
) orelse return; | |
switch (state) { | |
.unlocked => {}, | |
.locked => {}, | |
.waiting => break, | |
} | |
var iter = std.math.min(32, spin + 1); | |
while (iter > 0) : (iter -= 1) | |
spinLoopHint(); | |
} | |
new_state = .waiting; | |
while (true) { | |
switch (@atomicRmw(State, &self.state, .Xchg, new_state, .Acquire)) { | |
.unlocked => return, | |
else => {}, | |
} | |
Futex.wait( | |
@ptrCast(*const i32, &self.state), | |
@enumToInt(new_state), | |
); | |
} | |
} | |
pub fn unlock(self: *Mutex) void { | |
switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .Release)) { | |
.unlocked => unreachable, | |
.locked => {}, | |
.waiting => self.unlockSlow(), | |
} | |
} | |
fn unlockSlow(self: *Mutex) void { | |
@setCold(true); | |
Futex.wake(@ptrCast(*const i32, &self.state)); | |
} | |
} | |
else | |
struct { | |
is_locked: bool, | |
pub fn init() Mutex { | |
return .{ .is_locked = false }; | |
} | |
pub fn deinit(self: *Mutex) void { | |
self.* = undefined; | |
} | |
pub fn tryLock(self: *Mutex) bool { | |
return @atomicRmw(bool, &self.is_locked, .Xchg, true, .Acquire) == false; | |
} | |
pub fn lock(self: *Mutex) void { | |
while (!self.tryLock()) | |
spinLoopHint(); | |
} | |
pub fn unlock(self: *Mutex) void { | |
@atomicStore(bool, &self.is_locked, false, .Release); | |
} | |
}; | |
pub const Condvar = if (std.builtin.os.tag == .windows) | |
struct { | |
cond: CONDITION_VARIABLE, | |
pub fn init() Condvar { | |
return .{ .cond = CONDITION_VARIABLE_INIT }; | |
} | |
pub fn deinit(self: *Condvar) void { | |
self.* = undefined; | |
} | |
pub fn wait(self: *Condvar, mutex: *Mutex) void { | |
const rc = SleepConditionVariableSRW( | |
&self.cond, | |
&mutex.srwlock, | |
system.INFINITE, | |
@as(system.ULONG, 0), | |
); | |
std.debug.assert(rc != system.FALSE); | |
} | |
pub fn signal(self: *Condvar) void { | |
WakeConditionVariable(&self.cond); | |
} | |
pub fn broadcast(self: *Condvar) void { | |
WakeAllConditionVariable(&self.cond); | |
} | |
const SRWLOCK = usize; | |
const CONDITION_VARIABLE = usize; | |
const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = 0; | |
extern "kernel32" fn WakeAllConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void; | |
extern "kernel32" fn WakeConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void; | |
extern "kernel32" fn SleepConditionVariableSRW( | |
c: *CONDITION_VARIABLE, | |
s: *SRWLOCK, | |
t: system.DWORD, | |
f: system.ULONG, | |
) callconv(system.WINAPI) system.BOOL; | |
} | |
else if (std.builtin.link_libc) | |
struct { | |
cond: if (std.builtin.link_libc) std.c.pthread_cond_t else void, | |
pub fn init() Condvar { | |
return .{ .cond = std.c.PTHREAD_COND_INITIALIZER }; | |
} | |
pub fn deinit(self: *Condvar) void { | |
const safe_rc = switch (std.builtin.os.tag) { | |
.dragonfly, .netbsd => std.os.EAGAIN, | |
else => 0, | |
}; | |
const rc = std.c.pthread_cond_destroy(&self.cond); | |
std.debug.assert(rc == 0 or rc == safe_rc); | |
self.* = undefined; | |
} | |
pub fn wait(self: *Condvar, mutex: *Mutex) void { | |
const rc = std.c.pthread_cond_wait(&self.cond, &mutex.mutex); | |
std.debug.assert(rc == 0); | |
} | |
pub fn signal(self: *Condvar) void { | |
const rc = std.c.pthread_cond_signal(&self.cond); | |
std.debug.assert(rc == 0); | |
} | |
pub fn broadcast(self: *Condvar) void { | |
const rc = std.c.pthread_cond_broadcast(&self.cond); | |
std.debug.assert(rc == 0); | |
} | |
} | |
else | |
struct { | |
pending: bool, | |
queue_mutex: Mutex, | |
queue_list: std.SinglyLinkedList(struct { | |
futex: i32 = 0, | |
fn wait(self: *@This()) void { | |
while (@atomicLoad(i32, &self.futex, .Acquire) == 0) { | |
if (@hasDecl(Futex, "wait")) { | |
Futex.wait(&self.futex, 0); | |
} else { | |
spinLoopHint(); | |
} | |
} | |
} | |
fn notify(self: *@This()) void { | |
@atomicStore(i32, &self.futex, 1, .Release); | |
if (@hasDecl(Futex, "wake")) | |
Futex.wake(&self.futex); | |
} | |
}), | |
pub fn init() Condvar { | |
return .{ | |
.pending = false, | |
.queue_mutex = Mutex.init(), | |
.queue_list = .{}, | |
}; | |
} | |
pub fn deinit(self: *Condvar) void { | |
self.queue_mutex.deinit(); | |
self.* = undefined; | |
} | |
pub fn wait(self: *Condvar, mutex: *Mutex) void { | |
var waiter = @TypeOf(self.queue_list).Node{ .data = .{} }; | |
{ | |
self.queue_mutex.lock(); | |
defer self.queue_mutex.unlock(); | |
self.queue_list.prepend(&waiter); | |
@atomicStore(bool, &self.pending, true, .SeqCst); | |
} | |
mutex.unlock(); | |
waiter.data.wait(); | |
mutex.lock(); | |
} | |
pub fn signal(self: *Condvar) void { | |
if (@atomicLoad(bool, &self.pending, .SeqCst) == false) | |
return; | |
const maybe_waiter = blk: { | |
self.queue_mutex.lock(); | |
defer self.queue_mutex.unlock(); | |
const maybe_waiter = self.queue_list.popFirst(); | |
@atomicStore(bool, &self.pending, self.queue_list.first != null, .SeqCst); | |
break :blk maybe_waiter; | |
}; | |
if (maybe_waiter) |waiter| | |
waiter.data.notify(); | |
} | |
pub fn broadcast(self: *Condvar) void { | |
if (@atomicLoad(bool, &self.pending, .SeqCst) == false) | |
return; | |
@atomicStore(bool, &self.pending, false, .SeqCst); | |
var waiters = blk: { | |
self.queue_mutex.lock(); | |
defer self.queue_mutex.unlock(); | |
const waiters = self.queue_list; | |
self.queue_list = .{}; | |
break :blk waiters; | |
}; | |
while (waiters.popFirst()) |waiter| | |
waiter.data.notify(); | |
} | |
}; | |
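// Thin wrappers over the Linux futex syscall (FUTEX_WAIT / FUTEX_WAKE with the private
// flag); on other targets Futex is `void` and callers check with @hasDecl before using it.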
const Futex = switch (std.builtin.os.tag) { | |
.linux => struct { | |
fn wait(ptr: *const i32, cmp: i32) void { | |
switch (system.getErrno(system.futex_wait( | |
ptr, | |
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT, | |
cmp, | |
null, | |
))) { | |
0 => {}, | |
std.os.EINTR => {}, | |
std.os.EAGAIN => {}, | |
else => unreachable, | |
} | |
} | |
fn wake(ptr: *const i32) void { | |
switch (system.getErrno(system.futex_wake( | |
ptr, | |
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE, | |
@as(i32, 1), | |
))) { | |
0 => {}, | |
std.os.EFAULT => {}, | |
else => unreachable, | |
} | |
} | |
}, | |
else => void, | |
}; | |
fn spinLoopHint() void { | |
switch (std.builtin.arch) { | |
.i386, .x86_64 => asm volatile("pause" ::: "memory"), | |
.arm, .aarch64 => asm volatile("yield" ::: "memory"), | |
else => {}, | |
} | |
} |
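// ParkingLot: an address-keyed wait queue in the spirit of WebKit's WTF::ParkingLot and
// Rust's parking_lot. Waiters hash their address into a fixed table of buckets; within a
// bucket, per-address FIFO queues are kept in a treap ordered by address so that
// park/unpark only ever touch one bucket lock. Note this version is still a sketch:
// Queue.find() and the treap rotate()/Direction helpers are left unimplemented.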
pub fn ParkingLot(comptime Config: type) type { | |
return struct { | |
pub const Lock: type = Config.Lock; | |
pub const Event: type = Config.Event; | |
pub const nanotime: fn() u64 = switch (@hasDecl(Config, "nanotime")) { | |
true => Config.nanotime, | |
else => struct { | |
fn stub() u64 { | |
return 0; | |
} | |
}.stub, | |
}; | |
const bucket_count: usize = switch (@hasDecl(Config, "bucket_count")) { | |
true => Config.bucket_count, | |
else => std.meta.bitCount(usize) << 2, | |
}; | |
const tick_frequency_range: usize = switch (@hasDecl(Config, "tick_frequency_range")) { | |
true => Config.tick_frequency_range, | |
else => 1 * std.time.ns_per_ms, | |
}; | |
const Bucket = struct { | |
lock: Lock = .{}, | |
treap: usize = 0, | |
waiters: usize = 0, | |
next_tick: u64 = 0, | |
var table = [_]Bucket{Bucket{}} ** std.math.max(1, bucket_count); | |
fn from(address: usize) *Bucket { | |
return &table[address % table.len]; | |
} | |
}; | |
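// Queue: a view of one address's wait list inside a bucket. The bucket's `treap` field is
// pointer-tagged: when the treap is empty its low bit is set and the upper bits cache the
// PRNG state, otherwise it holds the root Waiter pointer (whose `prng` field then carries
// the PRNG state).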
const Queue = struct { | |
head: ?*Waiter, | |
parent: ?*Waiter, | |
address: usize, | |
bucket: *Bucket, | |
fn find(bucket: *Bucket, address: usize) Queue { | |
@compileError("TODO: binary-search the bucket for the address"); | |
} | |
fn link(self: *Queue, head: *Waiter) void { | |
self.head = head; | |
head.address = self.address; | |
head.ticket = self.rand(usize) | 1; | |
head.left = null; | |
head.right = null; | |
head.parent = self.parent; | |
if (self.parent) |parent| { | |
if (parent.address < head.address) { | |
parent.right = head; | |
} else { | |
parent.left = head; | |
} | |
} else { | |
if (self.bucket.treap & 1 != 0) { | |
head.prng = @truncate(u16, self.bucket.treap >> 16);
} | |
self.bucket.treap = @ptrToInt(head); | |
} | |
while (head.parent) |parent| { | |
if (parent.ticket > head.ticket) { | |
if (parent.left == head) { | |
self.rotate(parent, .right); | |
} else if (parent.right == head) { | |
self.rotate(parent, .left); | |
} else { | |
unreachable; | |
} | |
} else { | |
break; | |
} | |
} | |
} | |
fn relink(self: *Queue, noalias head: *Waiter, noalias new_head: *Waiter) void { | |
new_head.prng = head.prng; | |
new_head.ticket = head.ticket; | |
new_head.left = head.left;
new_head.right = head.right;
new_head.parent = head.parent; | |
if (head.left) |left| { | |
left.parent = new_head; | |
} | |
if (head.right) |right| { | |
right.parent = new_head; | |
} | |
if (head.parent) |parent| { | |
if (parent.address < head.address) { | |
parent.right = new_head; | |
} else { | |
parent.left = new_head; | |
} | |
} else {
self.bucket.treap = @ptrToInt(new_head);
}
} | |
fn unlink(self: *Queue, head: *Waiter) void { | |
assert(self.head == head); | |
self.head = null; | |
while ((head.right orelse head.left) != null) { | |
var direction = Direction.left;
defer self.rotate(head, direction); | |
if (head.right) |right| { | |
if (head.left) |left| { | |
if (left.ticket < right.ticket) {
direction = .right; | |
} | |
} | |
} else { | |
direction = .right; | |
} | |
} | |
if (head.parent) |parent| { | |
if (parent.left == head) { | |
parent.left = null; | |
} else { | |
parent.right = null; | |
} | |
} else { | |
assert(self.bucket.treap & 1 == 0); | |
assert(self.bucket.treap != 0); | |
self.bucket.treap = (@as(usize, head.prng) << 16) | 1; | |
} | |
} | |
fn isEmpty(self: Queue) bool { | |
return self.head == null; | |
} | |
fn isWaiting(waiter: *Waiter) bool { | |
return waiter.token != 0; | |
} | |
fn push(self: *Queue, waiter: *Waiter) void { | |
waiter.prev = null; | |
waiter.next = null; | |
waiter.ticket = 1; | |
const head = self.head orelse { | |
waiter.len = 1; | |
waiter.tail = waiter;
self.link(waiter); | |
return; | |
}; | |
if (head.tail) |tail| { | |
waiter.prev = tail; | |
tail.next = waiter; | |
} | |
head.tail = waiter; | |
head.len += 1; | |
} | |
fn pop(self: *Queue) ?*Waiter { | |
const waiter = self.head orelse return null; | |
self.remove(waiter); | |
return waiter; | |
} | |
fn remove(self: *Queue, waiter: *Waiter) void { | |
assert(isWaiting(waiter)); | |
assert(!self.isEmpty()); | |
if (waiter.next) |next| { | |
next.prev = waiter.prev; | |
} | |
if (waiter.prev) |prev| { | |
prev.next = waiter.next; | |
} | |
const head = self.head orelse unreachable; | |
head.len -= 1; | |
if (waiter == head) { | |
if (waiter.next) |new_head| { | |
new_head.len = head.len; | |
new_head.tail = head.tail; | |
self.relink(head, new_head); | |
} else { | |
self.unlink(head); | |
} | |
} else if (waiter == head.tail) { | |
head.tail = waiter.prev; | |
} | |
waiter.prev = null; | |
waiter.next = null; | |
waiter.tail = null; | |
waiter.ticket = 0; | |
} | |
fn steal(noalias self: *Queue, noalias target: *Queue) void { | |
assert(!target.isEmpty()); | |
const target_head = target.head orelse unreachable; | |
const target_len = target_head.len; | |
target.unlink(target_head); | |
if (self.head) |head| { | |
if (head.tail) |tail| { | |
tail.next = target_head; | |
target_head.prev = tail; | |
} | |
head.tail = target_head.tail; | |
head.len += target_len; | |
} else { | |
self.link(target_head); | |
} | |
if (target.bucket != self.bucket) { | |
_ = atomic.fetchSub(&target.bucket.waiters, target_len, .Relaxed); | |
_ = atomic.fetchAdd(&self.bucket.waiters, target_len, .Relaxed); | |
} | |
} | |
fn iter(self: *Queue) Iter {
return Iter{ .waiter = self.head }; | |
} | |
const Iter = struct { | |
waiter: ?*Waiter, | |
fn isEmpty(self: Iter) bool { | |
return self.waiter == null; | |
} | |
fn next(self: *Iter) ?*Waiter { | |
const waiter = self.waiter orelse return null; | |
self.waiter = waiter.next; | |
return waiter; | |
} | |
}; | |
fn pollTick(self: *Queue) bool {
const expires_ptr = &self.bucket.next_tick; | |
const expires = expires_ptr.*; | |
const timestamp: u64 = nanotime(); | |
if (expires > 0 and (timestamp < expires)) { | |
return false; | |
} | |
const min_delay = tick_frequency_range - (tick_frequency_range / 2); | |
const timeout = min_delay + (self.rand(u64) % min_delay);
expires_ptr.* = timestamp + timeout; | |
return true; | |
} | |
fn rand(self: *Queue, comptime Int: type) Int { | |
const prng = blk: { | |
const bucket = self.bucket; | |
assert(bucket.treap != 0); | |
if (bucket.treap & 1 == 0) { | |
break :blk &@intToPtr(*Waiter, bucket.treap).prng; | |
} else { | |
break :blk &@ptrCast([*]u16, &bucket.treap)[1]; | |
} | |
}; | |
if (prng.* == 0) { | |
var seed: u16 = 0; | |
prng.* = @truncate(u16, @ptrToInt(&seed) >> @sizeOf(u16)) | 1; | |
} | |
var value: Int = 0; | |
for (@ptrCast([*]u16, &value)[0..(@sizeOf(Int) / @sizeOf(u16))]) |*chunk| {
var xorshift = prng.*; | |
xorshift ^= xorshift << 7; | |
xorshift ^= xorshift >> 9; | |
xorshift ^= xorshift << 8; | |
prng.* = xorshift; | |
chunk.* = xorshift; | |
} | |
return value; | |
} | |
}; | |
const Waiter = struct { | |
prev: ?*Waiter, | |
next: ?*Waiter, | |
tail: ?*Waiter, | |
left: ?*Waiter, | |
right: ?*Waiter, | |
parent: ?*Waiter, | |
address: usize,
ticket: usize, | |
prng: u16, | |
len: usize, | |
token: usize, | |
event: Event, | |
}; | |
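// park(): validate-under-lock then sleep. The caller's context.onValidate() runs while
// holding the bucket lock; if it still wants to wait, the waiter is queued and the thread
// blocks on its per-waiter Event. A cancelled wait re-takes the bucket lock to dequeue
// itself, unless an unpark already removed it, in which case it must still wait for the
// pending notification before returning.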
pub fn park( | |
address: usize, | |
cancellation: ?Event.Cancellation, | |
context: anytype, | |
) error{Invalidated, Cancelled}!usize { | |
var waiter: Waiter = undefined; | |
var held: Lock.Held = undefined; | |
const bucket = Bucket.from(address); | |
{ | |
bucket.lock.acquire(&held); | |
defer bucket.lock.release(&held); | |
_ = atomic.fetchAdd(&bucket.waiters, 1, .Relaxed); | |
waiter.token = context.onValidate() orelse { | |
_ = atomic.fetchSub(&bucket.waiters, 1, .Relaxed); | |
return error.Invalidated; | |
}; | |
var queue = Queue.find(bucket, address); | |
queue.push(&waiter); | |
waiter.event.init(); | |
} | |
_ = context.onBeforeWait(); | |
var wait_result: bool = waiter.event.wait(cancellation); | |
var is_cancelled = wait_result == false; | |
if (is_cancelled) { | |
{ | |
bucket.lock.acquire(&held); | |
defer bucket.lock.release(&held); | |
is_cancelled = Queue.isWaiting(&waiter);
if (is_cancelled) { | |
var queue = Queue.find(bucket, address); | |
queue.remove(&waiter); | |
context.onTimeout(!queue.isEmpty()); | |
_ = atomic.fetchSub(&bucket.waiters, 1, .Relaxed); | |
} | |
} | |
if (!is_cancelled) { | |
wait_result = waiter.event.wait(null); | |
assert(wait_result); | |
} | |
} | |
waiter.event.deinit(); | |
if (is_cancelled) return error.Cancelled; | |
return waiter.token; | |
} | |
pub fn unpark( | |
address: usize, | |
requeue: ?usize, | |
context: anytype, | |
) void { | |
const bucket = Bucket.from(address); | |
if (atomic.load(&bucket.waiters, .Relaxed) == 0) { | |
return; | |
} | |
var unparked = blk: { | |
var held: Lock.Held = undefined; | |
bucket.lock.acquire(&held); | |
defer bucket.lock.release(&held); | |
var unparked = UnparkList{}; | |
if (atomic.load(&bucket.waiters, .Relaxed) > 0) { | |
var queue = Queue.find(bucket, address); | |
if (requeue) |requeue_address| { | |
RequeueOp.apply(&queue, &unparked, requeue_address, context); | |
} else { | |
FilterOp.apply(&queue, &unparked, context); | |
} | |
} | |
if (unparked.len > 0) { | |
_ = atomic.fetchSub(&bucket.waiters, unparked.len, .Relaxed); | |
} | |
break :blk unparked; | |
}; | |
while (unparked.pop()) |waiter| { | |
waiter.event.set(); | |
} | |
} | |
const UnparkList = struct { | |
head: *Waiter = undefined, | |
tail: *Waiter = undefined, | |
len: usize = 0, | |
fn push(self: *UnparkList, waiter: *Waiter) void { | |
switch (self.len) { | |
0 => self.head = waiter, | |
else => self.tail.next = waiter, | |
} | |
waiter.next = null; | |
self.tail = waiter; | |
self.len += 1; | |
} | |
fn pop(self: *UnparkList) ?*Waiter { | |
if (self.len == 0) return null; | |
const waiter = self.head; | |
self.head = waiter.next orelse undefined; | |
self.len -= 1; | |
return waiter; | |
} | |
}; | |
pub const RequeueOp = enum{ | |
Abort, | |
UnparkOneLeaveRest, | |
UnparkOneRequeueRest, | |
RequeueOneLeaveRest, | |
RequeueAll, | |
fn apply( | |
queue: *Queue, | |
unparked: *UnparkList, | |
requeue_address: usize, | |
context: anytype, | |
) void { | |
const requeue_bucket = Bucket.from(requeue_address); | |
var requeue_held: Lock.Held = undefined; | |
requeue_bucket.lock.acquire(&requeue_held); | |
defer requeue_bucket.lock.release(&requeue_held); | |
const op: RequeueOp = context.onRequeue(); | |
defer _ = context.onBeforeWake(); | |
switch (op) { | |
.Abort => {}, | |
.UnparkOneLeaveRest => { | |
if (queue.pop()) |waiter| { | |
unparked.push(waiter); | |
} | |
}, | |
.UnparkOneRequeueRest => { | |
if (queue.pop()) |waiter| { | |
unparked.push(waiter); | |
} | |
if (!queue.isEmpty()) { | |
var target_queue = Queue.find(requeue_bucket, requeue_address); | |
target_queue.steal(queue); | |
} | |
}, | |
.RequeueOneLeaveRest => { | |
if (queue.pop()) |waiter| { | |
var target_queue = Queue.find(requeue_bucket, requeue_address); | |
target_queue.push(waiter); | |
} | |
}, | |
.RequeueAll => { | |
if (!queue.isEmpty()) { | |
var target_queue = Queue.find(requeue_bucket, requeue_address); | |
target_queue.steal(queue); | |
} | |
}, | |
} | |
} | |
}; | |
pub const FilterOp = union(enum) { | |
Stop, | |
Skip, | |
Unpark: usize, | |
pub const Context = struct { | |
token: usize, | |
has_more: bool, | |
queue: *Queue, | |
pub fn hasMore(self: Context) bool { | |
return self.has_more; | |
} | |
pub fn getToken(self: Context) usize { | |
return self.token; | |
} | |
pub fn didTick(self: Context) bool { | |
return self.queue.pollTick();
} | |
}; | |
fn apply( | |
queue: *Queue, | |
unparked: *UnparkList, | |
context: anytype, | |
) void { | |
defer _ = context.onBeforeWake(); | |
var iter = queue.iter(); | |
while (iter.next()) |waiter| { | |
const op: FilterOp = context.onFilter(FilterOp.Context{ | |
.token = waiter.token, | |
.has_more = !iter.isEmpty(), | |
.queue = queue, | |
}); | |
switch (op) { | |
.Stop => break, | |
.Skip => continue, | |
.Unpark => |unpark_token| { | |
waiter.token = unpark_token; | |
queue.remove(waiter); | |
unparked.push(waiter); | |
}, | |
} | |
} | |
} | |
}; | |
}; | |
} |
// SPDX-License-Identifier: MIT | |
// Copyright (c) 2015-2020 Zig Contributors | |
// This file is part of [zig](https://ziglang.org/), which is MIT licensed. | |
// The MIT license requires this copyright notice to be included in all copies | |
// and substantial portions of the software. | |
const std = @import("std"); | |
const builtin = std.builtin; | |
const system = std.os.system; | |
const assert = std.debug.assert; | |
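// Support file: C11-style wrappers over Zig's builtin atomics, a monotonic clock, an
// Event, a Lock, and a parking lot built on top of them.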
pub const atomic = struct { | |
pub fn spinLoopHint() void { | |
switch (builtin.arch) { | |
.i386, .x86_64 => asm volatile("pause"), | |
.arm, .aarch64 => asm volatile("yield"), | |
else => {}, | |
} | |
} | |
pub const Ordering = enum { | |
unordered, | |
relaxed, | |
consume, | |
acquire, | |
release, | |
acq_rel, | |
seq_cst, | |
fn toBuiltin(comptime self: Ordering) builtin.AtomicOrder { | |
return switch (self) { | |
.unordered => .Unordered, | |
.relaxed => .Monotonic, | |
.consume => .Acquire, | |
.acquire => .Acquire, | |
.release => .Release, | |
.acq_rel => .AcqRel, | |
.seq_cst => .SeqCst, | |
}; | |
} | |
}; | |
pub fn fence(comptime ordering: Ordering) void { | |
@fence(comptime ordering.toBuiltin()); | |
} | |
pub fn compilerFence(comptime ordering: Ordering) void { | |
switch (ordering) { | |
.unordered => @compileError("Unordered memory ordering can only be on atomic variables"), | |
.relaxed => @compileError("Relaxed memory ordering can only be on atomic variables"), | |
.consume => @compileError("Consume memory ordering can only be on atomic variables"), | |
else => asm volatile("" ::: "memory"), | |
} | |
} | |
pub fn load(ptr: anytype, comptime ordering: Ordering) @TypeOf(ptr.*) { | |
return @atomicLoad(@TypeOf(ptr.*), ptr, comptime ordering.toBuiltin()); | |
} | |
pub fn store(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) void { | |
return @atomicStore(@TypeOf(ptr.*), ptr, value, comptime ordering.toBuiltin()); | |
} | |
pub fn swap(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) @TypeOf(ptr.*) { | |
return atomicRmw(@TypeOf(ptr.*), ptr, .Xchg, value, ordering); | |
} | |
pub fn fetchAdd(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) @TypeOf(ptr.*) { | |
return atomicRmw(@TypeOf(ptr.*), ptr, .Add, value, ordering); | |
} | |
pub fn fetchSub(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) @TypeOf(ptr.*) { | |
return atomicRmw(@TypeOf(ptr.*), ptr, .Sub, value, ordering); | |
} | |
pub fn fetchAnd(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) @TypeOf(ptr.*) { | |
return atomicRmw(@TypeOf(ptr.*), ptr, .And, value, ordering); | |
} | |
pub fn fetchOr(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) @TypeOf(ptr.*) { | |
return atomicRmw(@TypeOf(ptr.*), ptr, .Or, value, ordering); | |
} | |
inline fn atomicRmw(comptime T: type, ptr: *T, comptime op: builtin.AtomicRmwOp, value: T, comptime ordering: Ordering) T { | |
return @atomicRmw(T, ptr, op, value, comptime ordering.toBuiltin()); | |
} | |
pub fn compareAndSwap(ptr: anytype, cmp: @TypeOf(ptr.*), xchg: @TypeOf(ptr.*), comptime success: Ordering, comptime failure: Ordering) ?@TypeOf(ptr.*) { | |
return @cmpxchgStrong(@TypeOf(ptr.*), ptr, cmp, xchg, comptime success.toBuiltin(), comptime failure.toBuiltin()); | |
} | |
pub fn tryCompareAndSwap(ptr: anytype, cmp: @TypeOf(ptr.*), xchg: @TypeOf(ptr.*), comptime success: Ordering, comptime failure: Ordering) ?@TypeOf(ptr.*) { | |
return @cmpxchgStrong(@TypeOf(ptr.*), ptr, cmp, xchg, comptime success.toBuiltin(), comptime failure.toBuiltin()); | |
} | |
}; | |
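// Monotonic clock: on Windows this reads InterruptTime straight out of the always-mapped
// KUSER_SHARED_DATA page at 0x7FFE0000 (100ns units, hence the `* 100`), retrying until
// the high word is stable; Darwin scales mach_absolute_time by the timebase, and
// everything else uses CLOCK_MONOTONIC.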
pub fn nanotime() u64 { | |
if (builtin.os.tag == .windows) { | |
while (true) { | |
const now = @intToPtr(*volatile u64, 0x7FFE0000 + 0x8).*; | |
const high = @intToPtr(*volatile u32, 0x7FFE0000 + 0x8 + 8).*; | |
if (high == @truncate(u32, now >> 32)) | |
return now * 100; | |
} | |
} | |
if (comptime std.Target.current.isDarwin()) { | |
var freq: system.mach_timebase_info_data = undefined; | |
system.mach_timebase_info(&freq); | |
var now = system.mach_absolute_time(); | |
if (freq.numer != 1) | |
now *= freq.numer; | |
if (freq.denom != 1) | |
now /= freq.denom;
return now; | |
} | |
var ts: system.timespec = undefined; | |
std.os.clock_gettime(system.CLOCK_MONOTONIC, &ts) catch unreachable; | |
return @intCast(u64, ts.tv_sec) * @as(u64, std.time.ns_per_s) + @intCast(u64, ts.tv_nsec); | |
} | |
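// Event: a one-shot, single-waiter signal used by Lock and parking_lot below. Backed by
// NT keyed events on Windows, a futex where the OS provides one, a pthread mutex+condvar
// when libc is linked, and a spin loop otherwise.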
pub const Event = | |
if (comptime builtin.os.tag == .windows) | |
struct { | |
key: u8 align(4), | |
pub fn init(self: *Event) void {} | |
pub fn deinit(self: *Event) void {} | |
pub fn reset(self: *Event) void {} | |
pub fn wait(self: *Event, deadline: ?u64) error{TimedOut}!void { | |
var timeout: system.LARGE_INTEGER = undefined; | |
var timeout_ptr: ?*const @TypeOf(timeout) = null; | |
if (deadline) |deadline_ns| { | |
const now = nanotime(); | |
if (now > deadline_ns) | |
return error.TimedOut; | |
timeout_ptr = &timeout; | |
timeout = -@intCast(system.LARGE_INTEGER, @divFloor(deadline_ns - now, 100)); | |
} | |
const key = @ptrCast(*align(4) const c_void, &self.key); | |
return switch (NtWaitForKeyedEvent(null, key, system.FALSE, timeout_ptr)) { | |
.SUCCESS => {}, | |
.TIMEOUT => error.TimedOut, | |
else => unreachable, | |
}; | |
} | |
pub fn notify(self: *Event) void { | |
const key = @ptrCast(*align(4) const c_void, &self.key); | |
const status = NtReleaseKeyedEvent(null, key, system.FALSE, null); | |
assert(status == .SUCCESS); | |
} | |
pub fn yield(iteration: ?usize) bool { | |
const iter = iteration orelse { | |
system.kernel32.Sleep(0); | |
return false; | |
}; | |
const max_iter = 4000; | |
if (iter > max_iter) | |
return false; | |
if (iter < 2000) { | |
atomic.spinLoopHint(); | |
} else if (iter < 3000) { | |
_ = system.kernel32.SwitchToThread(); | |
} else { | |
system.kernel32.Sleep(0); | |
} | |
return true; | |
} | |
extern "NtDll" fn NtWaitForKeyedEvent( | |
handle: ?system.HANDLE, | |
key: ?*align(4) const c_void, | |
alertable: system.BOOLEAN, | |
timeout: ?*const system.LARGE_INTEGER, | |
) callconv(system.WINAPI) system.NTSTATUS; | |
extern "NtDll" fn NtReleaseKeyedEvent( | |
handle: ?system.HANDLE, | |
key: ?*align(4) const c_void, | |
alertable: system.BOOLEAN, | |
timeout: ?*const system.LARGE_INTEGER, | |
) callconv(system.WINAPI) system.NTSTATUS; | |
} | |
else if (Futex.uses_os) | |
struct { | |
state: enum(u32) { | |
empty = 0, | |
waiting, | |
notified, | |
}, | |
pub fn init(self: *Event) void { | |
self.reset(); | |
} | |
pub fn deinit(self: *Event) void { | |
self.* = undefined; | |
} | |
pub fn reset(self: *Event) void { | |
self.state = .empty; | |
} | |
pub fn wait(self: *Event, deadline: ?u64) error{TimedOut}!void { | |
switch (atomic.swap(&self.state, .waiting, .acquire)) { | |
.empty => {}, | |
.waiting => unreachable, | |
.notified => return, | |
} | |
while (true) { | |
var timeout: ?u64 = null; | |
if (deadline) |deadline_ns| { | |
const now = nanotime(); | |
if (now > deadline_ns) { | |
return switch (atomic.swap(&self.state, .waiting, .acquire)) { | |
.empty => unreachable, | |
.waiting => error.TimedOut, | |
.notified => {}, | |
}; | |
} else { | |
timeout = deadline_ns - now; | |
} | |
} | |
Futex.wait( | |
@ptrCast(*const u32, &self.state), | |
@enumToInt(@TypeOf(self.state).waiting), | |
timeout, | |
) catch {}; | |
switch (atomic.load(&self.state, .acquire)) { | |
.empty => unreachable, | |
.waiting => {}, | |
.notified => return, | |
} | |
} | |
} | |
pub fn notify(self: *Event) void { | |
switch (atomic.swap(&self.state, .notified, .release)) { | |
.empty => return, | |
.waiting => {}, | |
.notified => unreachable, | |
} | |
const notify_all = true; | |
Futex.wake( | |
@ptrCast(*const u32, &self.state), | |
notify_all, | |
); | |
} | |
pub fn yield(iteration: ?usize) bool { | |
return Futex.yield(iteration); | |
} | |
} | |
else if (builtin.link_libc) | |
struct { | |
state: enum{ empty, waiting, notified }, | |
cond: if (!builtin.link_libc) void else system.pthread_cond_t, | |
mutex: if (!builtin.link_libc) void else system.pthread_mutex_t, | |
pub fn init(self: *Event) void { | |
self.* = .{ | |
.state = .empty, | |
.cond = system.PTHREAD_COND_INITIALIZER, | |
.mutex = system.PTHREAD_MUTEX_INITIALIZER, | |
}; | |
} | |
pub fn deinit(self: *Event) void { | |
const ok_err = switch (builtin.os.tag) { | |
.dragonfly, .openbsd => system.EINVAL, | |
else => 0, | |
}; | |
const ret_cond = system.pthread_cond_destroy(&self.cond); | |
assert(ret_cond == 0 or ret_cond == ok_err); | |
const ret_mutex = system.pthread_mutex_destroy(&self.mutex); | |
assert(ret_mutex == 0 or ret_mutex == ok_err); | |
} | |
pub fn reset(self: *Event) void { | |
self.state = .empty; | |
} | |
pub fn wait(self: *Event, deadline: ?u64) error{TimedOut}!void { | |
assert(system.pthread_mutex_lock(&self.mutex) == 0); | |
defer assert(system.pthread_mutex_unlock(&self.mutex) == 0); | |
switch (self.state) { | |
.empty => self.state = .waiting, | |
.waiting => unreachable, | |
.notified => return, | |
} | |
while (true) { | |
switch (self.state) { | |
.empty => unreachable, | |
.waiting => {}, | |
.notified => return, | |
} | |
const deadline_ns = deadline orelse { | |
assert(system.pthread_cond_wait(&self.cond, &self.mutex) == 0); | |
continue; | |
}; | |
const now = nanotime(); | |
if (now > deadline_ns) | |
return error.TimedOut; | |
var timeout = deadline_ns - now; | |
if (comptime std.Target.current.isDarwin()) { | |
var tv: system.timeval = undefined; | |
assert(system.gettimeofday(&tv, null) == 0); | |
timeout += @intCast(u64, tv.tv_sec) * std.time.ns_per_s; | |
timeout += @intCast(u64, tv.tv_usec) * std.time.ns_per_us; | |
} else { | |
var ts: system.timespec = undefined; | |
std.os.clock_gettime(system.CLOCK_REALTIME, &ts) catch unreachable; | |
timeout += @intCast(u64, ts.tv_sec) * std.time.ns_per_s; | |
timeout += @intCast(u64, ts.tv_nsec); | |
} | |
var ts: system.timespec = undefined; | |
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout, std.time.ns_per_s)); | |
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout, std.time.ns_per_s)); | |
switch (system.pthread_cond_timedwait(&self.cond, &self.mutex, &ts)) { | |
0 => {}, | |
system.ETIMEDOUT => {}, | |
system.EPERM => unreachable, | |
system.EINVAL => unreachable, | |
else => unreachable, | |
} | |
} | |
} | |
pub fn notify(self: *Event) void { | |
assert(system.pthread_mutex_lock(&self.mutex) == 0); | |
defer assert(system.pthread_mutex_unlock(&self.mutex) == 0); | |
switch (self.state) { | |
.empty => self.state = .notified, | |
.waiting => { | |
self.state = .notified; | |
assert(system.pthread_cond_signal(&self.cond) == 0); | |
}, | |
.notified => unreachable, | |
} | |
} | |
pub fn yield(iteration: ?usize) bool { | |
const iter = iteration orelse { | |
_ = system.sched_yield(); | |
return false; | |
}; | |
if (iter < 100) { | |
atomic.spinLoopHint(); | |
return true; | |
} | |
return false; | |
} | |
} | |
else | |
struct { | |
notified: bool, | |
pub fn init(self: *Event) void { | |
self.reset(); | |
} | |
pub fn deinit(self: *Event) void { | |
self.* = undefined; | |
} | |
pub fn reset(self: *Event) void { | |
self.notified = false; | |
} | |
pub fn wait(self: *Event, deadline: ?u64) error{TimedOut}!void { | |
while (!atomic.load(&self.notified, .acquire)) | |
atomic.spinLoopHint(); | |
} | |
pub fn notify(self: *Event) void { | |
atomic.store(&self.notified, true, .release); | |
} | |
pub fn yield(iteration: usize) bool { | |
atomic.spinLoopHint(); | |
return false; | |
} | |
}; | |
pub const Lock = | |
if (std.builtin.os.tag == .windows) | |
struct { | |
srwlock: if (std.builtin.os.tag == .windows) SRWLOCK else void = SRWLOCK_INIT, | |
pub fn acquire(self: *Lock) void { | |
AcquireSRWLockExclusive(&self.srwlock); | |
} | |
pub fn release(self: *Lock) void { | |
ReleaseSRWLockExclusive(&self.srwlock); | |
} | |
const SRWLOCK = ?system.PVOID; | |
const SRWLOCK_INIT: SRWLOCK = null; | |
extern "kernel32" fn AcquireSRWLockExclusive( | |
srwlock: *SRWLOCK, | |
) callconv(system.WINAPI) void; | |
extern "kernel32" fn ReleaseSRWLockExclusive( | |
srwlock: *SRWLOCK, | |
) callconv(system.WINAPI) void; | |
} | |
else if (comptime std.Target.current.isDarwin()) | |
struct { | |
lock: os_unfair_lock = OS_UNFAIR_LOCK_INIT, | |
pub fn acquire(self: *Lock) void { | |
os_unfair_lock_lock(&self.lock); | |
} | |
pub fn release(self: *Lock) void { | |
os_unfair_lock_unlock(&self.lock); | |
} | |
const os_unfair_lock = u32; | |
const os_unfair_lock_t = *os_unfair_lock; | |
const OS_UNFAIR_LOCK_INIT: os_unfair_lock = 0; | |
extern "c" fn os_unfair_lock_lock( | |
lock: os_unfair_lock_t, | |
) callconv(.C) void; | |
extern "c" fn os_unfair_lock_unlock( | |
lock: os_unfair_lock_t, | |
) callconv(.C) void; | |
} | |
else if (Futex.uses_os) | |
struct { | |
state: enum(u32){ | |
unlocked = 0, | |
locked, | |
waiting, | |
} = .unlocked, | |
pub fn acquire(self: *Lock) void { | |
const state = atomic.swap(&self.state, .locked, .acquire); | |
if (state != .unlocked) | |
self.acquireSlow(state); | |
} | |
fn acquireSlow(self: *Lock, current_state: @TypeOf(@as(Lock, undefined).state)) void { | |
@setCold(true); | |
var spin_iter: usize = 0; | |
var lock_state = current_state; | |
while (true) { | |
while (true) { | |
switch (atomic.load(&self.state, .relaxed)) { | |
.unlocked => _ = atomic.tryCompareAndSwap( | |
&self.state, | |
.unlocked, | |
lock_state, | |
.acquire, | |
.relaxed, | |
) orelse return, | |
.locked => {}, | |
.waiting => break, | |
} | |
if (Futex.yield(spin_iter)) { | |
spin_iter +%= 1; | |
} else { | |
break; | |
} | |
} | |
const state = atomic.swap(&self.state, .waiting, .acquire); | |
if (state == .unlocked) | |
return; | |
spin_iter = 0; | |
lock_state = .waiting; | |
Futex.wait( | |
@ptrCast(*const u32, &self.state), | |
@enumToInt(lock_state), | |
null, | |
) catch {}; | |
} | |
} | |
pub fn release(self: *Lock) void { | |
switch (atomic.swap(&self.state, .unlocked, .release)) { | |
.unlocked => unreachable, | |
.locked => {}, | |
.waiting => self.releaseSlow(), | |
} | |
} | |
fn releaseSlow(self: *Lock) void { | |
@setCold(true); | |
const notify_all = false; | |
Futex.wake( | |
@ptrCast(*const u32, &self.state), | |
notify_all, | |
); | |
} | |
} | |
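// Generic fallback lock (a word lock): the usize state packs the LOCKED bit, a WAKING bit
// used to serialize wakeups, and, in the remaining high bits, a pointer to an intrusive
// stack of Waiters blocked on their Events.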
else | |
struct { | |
state: usize = UNLOCKED, | |
const UNLOCKED = 0; | |
const LOCKED = 1; | |
const WAKING = 1 << 8; | |
const WAITING = ~@as(usize, (1 << 9) - 1); | |
const Waiter = struct { | |
prev: ?*Waiter align(std.math.max(@alignOf(usize), ~WAITING + 1)), | |
next: ?*Waiter, | |
tail: ?*Waiter, | |
event: Event, | |
}; | |
inline fn tryAcquire(self: *Lock) bool { | |
return switch (builtin.arch) { | |
.i386, .x86_64 => asm volatile( | |
"lock btsw $0, %[ptr]" | |
: [ret] "={@ccc}" (-> u8), | |
: [ptr] "*m" (&self.state) | |
: "cc", "memory" | |
) == 0, | |
else => atomic.swap( | |
@ptrCast(*u8, &self.state),
LOCKED, | |
.acquire, | |
) == UNLOCKED, | |
}; | |
} | |
pub fn acquire(self: *Lock) void { | |
if (!self.tryAcquire()) | |
self.acquireSlow(); | |
} | |
pub fn release(self: *Lock) void { | |
atomic.store(@ptrCast(*u8, &self.state), UNLOCKED, .release); | |
const state = atomic.load(&self.state, .relaxed); | |
if (state & WAITING != 0) | |
self.releaseSlow(); | |
} | |
fn acquireSlow(self: *Lock) void { | |
@setCold(true); | |
var waiter: Waiter = undefined; | |
var has_event = false; | |
defer if (has_event) | |
waiter.event.deinit(); | |
var spin_iter: usize = 0; | |
var state = atomic.load(&self.state, .relaxed); | |
while (true) { | |
if (state & LOCKED == 0) { | |
if (self.tryAcquire()) | |
return; | |
std.os.sched_yield() catch unreachable; | |
state = atomic.load(&self.state, .relaxed); | |
continue; | |
} | |
const head = @intToPtr(?*Waiter, state & WAITING); | |
if (head == null and Event.yield(spin_iter)) { | |
spin_iter +%= 1; | |
state = atomic.load(&self.state, .relaxed); | |
continue; | |
} | |
waiter.prev = null; | |
waiter.next = head; | |
waiter.tail = if (head == null) &waiter else null; | |
if (!has_event) { | |
has_event = true; | |
waiter.event.init(); | |
} | |
state = atomic.tryCompareAndSwap( | |
&self.state, | |
state, | |
(state & ~WAITING) | @ptrToInt(&waiter), | |
.release, | |
.relaxed, | |
) orelse blk: { | |
waiter.event.wait(null) catch unreachable; | |
waiter.event.reset(); | |
spin_iter = 0; | |
break :blk atomic.load(&self.state, .relaxed); | |
}; | |
} | |
} | |
fn releaseSlow(self: *Lock) void { | |
@setCold(true); | |
var state = atomic.load(&self.state, .relaxed); | |
while (true) { | |
if ((state & WAITING == 0) or (state & (LOCKED | WAKING) != 0)) | |
return; | |
state = atomic.tryCompareAndSwap( | |
&self.state, | |
state, | |
state | WAKING, | |
.acquire, | |
.relaxed, | |
) orelse break; | |
} | |
state |= WAKING; | |
while (true) { | |
const head = @intToPtr(*Waiter, state & WAITING); | |
const tail = head.tail orelse blk: { | |
var current = head; | |
while (true) { | |
const next = current.next orelse unreachable; | |
next.prev = current; | |
current = next; | |
if (current.tail) |tail| { | |
head.tail = tail; | |
break :blk tail; | |
} | |
} | |
}; | |
if (state & LOCKED != 0) { | |
state = atomic.tryCompareAndSwap( | |
&self.state, | |
state, | |
state & ~@as(usize, WAKING), | |
.release, | |
.acquire, | |
) orelse return; | |
continue; | |
} | |
if (tail.prev) |new_tail| { | |
head.tail = new_tail; | |
_ = atomic.fetchAnd(&self.state, ~@as(usize, WAKING), .release); | |
} else if (atomic.tryCompareAndSwap( | |
&self.state, | |
state, | |
UNLOCKED, | |
.release, | |
.acquire, | |
)) |updated| { | |
state = updated; | |
continue; | |
} | |
tail.event.notify(); | |
return; | |
} | |
} | |
}; | |
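// Hedged usage sketch (not part of the original gist): the bucket Lock above | |
// should support an uncontended acquire()/release() pair without ever taking | |
// the slow path, regardless of which branch was selected at comptime. | |
test "Lock uncontended acquire and release (sketch)" { | |
var lock = Lock{}; | |
lock.acquire(); | |
lock.release(); | |
} | |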
pub const parking_lot = struct { | |
const Bucket = struct { | |
lock: Lock = .{}, | |
tree: Tree = .{}, | |
fairness: Fairness = .{}, | |
var array = [_]Bucket{Bucket{}} ** 256; | |
pub fn get(address: usize) *Bucket { | |
const seed = @truncate(usize, 0x9E3779B97F4A7C15); | |
const max = @popCount(usize, ~@as(usize, 0)); | |
const bits = @ctz(usize, array.len); | |
const index = (address *% seed) >> (max - bits); | |
return &array[index]; | |
} | |
}; | |
const Fairness = struct { | |
xorshift: u32 = 0, | |
times_out: u64 = 0, | |
const interval = 1 * std.time.ns_per_ms; | |
pub fn expired(self: *Fairness) bool { | |
if (self.xorshift == 0) | |
self.xorshift = @truncate(u32, @ptrToInt(self) >> @sizeOf(u32)); | |
const now = nanotime(); | |
if (now < self.times_out) | |
return false; | |
self.xorshift ^= self.xorshift << 13; | |
self.xorshift ^= self.xorshift >> 17; | |
self.xorshift ^= self.xorshift << 5; | |
self.times_out = now + (self.xorshift % interval); | |
return true; | |
} | |
}; | |
const Tree = struct { | |
tree_head: ?*Waiter = null, | |
pub fn insert(self: *Tree, address: usize, waiter: *Waiter) void { | |
waiter.address = address; | |
waiter.tree_next = null; | |
waiter.next = null; | |
waiter.tail = waiter; | |
if (self.lookup(address, &waiter.tree_prev)) |head| { | |
const tail = head.tail orelse unreachable; | |
tail.next = waiter; | |
waiter.prev = tail; | |
head.tail = waiter; | |
return; | |
} | |
waiter.prev = null; | |
if (waiter.tree_prev) |prev| { | |
prev.tree_next = waiter; | |
} else { | |
self.tree_head = waiter; | |
} | |
} | |
pub fn iter(self: *Tree, address: usize) Iter { | |
const head = self.lookup(address, null); | |
return .{ | |
.head = head, | |
.iter = head, | |
.tree = self, | |
}; | |
} | |
fn lookup(self: *Tree, address: usize, parent: ?*?*Waiter) ?*Waiter { | |
var waiter = self.tree_head; | |
if (parent) |p| | |
p.* = waiter; | |
while (true) { | |
const head = waiter orelse return null; | |
if (head.address == address) | |
return head; | |
waiter = head.tree_next; | |
if (parent) |p| | |
p.* = head; | |
} | |
} | |
fn replace(self: *Tree, waiter: *Waiter, new_waiter: *Waiter) void { | |
new_waiter.tree_next = waiter.tree_next; | |
new_waiter.tree_prev = waiter.tree_prev; | |
if (new_waiter.tree_prev) |prev| | |
prev.tree_next = new_waiter; | |
if (new_waiter.tree_next) |next| | |
next.tree_prev = new_waiter; | |
if (self.tree_head == waiter) | |
self.tree_head = new_waiter; | |
} | |
fn remove(self: *Tree, waiter: *Waiter) void { | |
if (waiter.tree_next) |next| | |
next.tree_prev = waiter.tree_prev; | |
if (waiter.tree_prev) |prev| | |
prev.tree_next = waiter.tree_next; | |
if (self.tree_head == waiter) | |
self.tree_head = waiter.tree_next; | |
} | |
}; | |
const Iter = struct { | |
head: ?*Waiter, | |
iter: ?*Waiter, | |
tree: *Tree, | |
pub fn isEmpty(self: Iter) bool { | |
return self.iter == null; | |
} | |
pub fn next(self: *Iter) ?*Waiter { | |
const waiter = self.iter orelse return null; | |
self.iter = waiter.next; | |
return waiter; | |
} | |
pub fn isQueueEmpty(self: Iter) bool { | |
return self.head == null; | |
} | |
pub fn tryQueueRemove(self: *Iter, waiter: *Waiter) bool { | |
const head = self.head orelse return false; | |
if (waiter.tail == null) | |
return false; | |
if (self.iter == waiter) | |
self.iter = waiter.next; | |
if (waiter.prev) |p| | |
p.next = waiter.next; | |
if (waiter.next) |n| | |
n.prev = waiter.prev; | |
if (waiter == head) { | |
self.head = waiter.next; | |
} else if (waiter == head.tail) { | |
head.tail = waiter.prev orelse unreachable; | |
} | |
if (waiter == head) { | |
if (self.head) |new_head| { | |
new_head.tail = waiter.tail; | |
self.tree.replace(waiter, new_head); | |
} else { | |
self.tree.remove(waiter); | |
} | |
} | |
waiter.tail = null; | |
return true; | |
} | |
}; | |
const Waiter = struct { | |
tree_prev: ?*Waiter, | |
tree_next: ?*Waiter, | |
prev: ?*Waiter, | |
next: ?*Waiter, | |
tail: ?*Waiter, | |
address: usize, | |
token: usize, | |
event: Event, | |
}; | |
pub const UnparkContext = struct { | |
token: *usize, | |
iter: *Iter, | |
fairness: *Fairness, | |
pub fn getToken(self: UnparkContext) usize { | |
return self.token.*; | |
} | |
pub fn hasMore(self: UnparkContext) bool { | |
return !self.iter.isEmpty(); | |
} | |
pub fn beFair(self: UnparkContext) bool { | |
return self.fairness.expired(); | |
} | |
}; | |
pub fn parkConditionally(address: usize, deadline: ?u64, context: anytype) ?usize { | |
var bucket = Bucket.get(address); | |
bucket.lock.acquire(); | |
const token: usize = context.onValidate() orelse { | |
bucket.lock.release(); | |
return null; | |
}; | |
var waiter: Waiter = undefined; | |
waiter.token = token; | |
bucket.tree.insert(address, &waiter); | |
waiter.event.init(); | |
defer waiter.event.deinit(); | |
bucket.lock.release(); | |
var timed_out = false; | |
context.onBeforeWait(); | |
waiter.event.wait(deadline) catch { | |
timed_out = true; | |
}; | |
if (!timed_out) | |
return waiter.token; | |
bucket = Bucket.get(address); | |
bucket.lock.acquire(); | |
var iter = bucket.tree.iter(address); | |
if (iter.tryQueueRemove(&waiter)) { | |
context.onTimeout(!iter.isEmpty()); | |
bucket.lock.release(); | |
return null; | |
} | |
bucket.lock.release(); | |
waiter.event.wait(null) catch unreachable; | |
return waiter.token; | |
} | |
pub const UnparkFilter = union(enum) { | |
stop: void, | |
skip: void, | |
unpark: usize, | |
}; | |
pub fn unparkFilter(address: usize, context: anytype) void { | |
const bucket = Bucket.get(address); | |
bucket.lock.acquire(); | |
var wake_list: ?*Waiter = null; | |
var iter = bucket.tree.iter(address); | |
while (iter.next()) |waiter| { | |
switch (@as(UnparkFilter, context.onFilter(UnparkContext{ | |
.token = &waiter.token, | |
.iter = &iter, | |
.fairness = &bucket.fairness, | |
}))) { | |
.stop => break, | |
.skip => continue, | |
.unpark => |new_token| { | |
assert(iter.tryQueueRemove(waiter)); | |
waiter.token = new_token; | |
waiter.next = wake_list; | |
wake_list = waiter; | |
}, | |
} | |
} | |
context.onBeforeWake(); | |
bucket.lock.release(); | |
while (true) { | |
const waiter = wake_list orelse break; | |
wake_list = waiter.next; | |
waiter.event.notify(); | |
} | |
} | |
pub const UnparkResult = struct { | |
token: ?usize = null, | |
has_more: bool = false, | |
be_fair: bool = false, | |
}; | |
pub fn unparkOne(address: usize, context: anytype) void { | |
const Context = @TypeOf(context); | |
const Filter = struct { | |
ctx: Context, | |
called_unpark: bool = false, | |
pub fn onFilter(this: *@This(), unpark_context: UnparkContext) UnparkFilter { | |
if (this.called_unpark) | |
return .stop; | |
const unpark_token: usize = this.ctx.onUnpark(UnparkResult{ | |
.token = unpark_context.getToken(), | |
.has_more = unpark_context.hasMore(), | |
.be_fair = unpark_context.beFair(), | |
}); | |
this.called_unpark = true; | |
return .{ .unpark = unpark_token }; | |
} | |
pub fn onBeforeWake(this: @This()) void { | |
if (!this.called_unpark) { | |
_ = this.ctx.onUnpark(UnparkResult{}); | |
} | |
} | |
}; | |
var filter = Filter{ .ctx = context }; | |
return unparkFilter(address, &filter); | |
} | |
pub fn unparkAll(address: usize) void { | |
const Filter = struct { | |
pub fn onBeforeWake(this: @This()) void {} | |
pub fn onFilter(this: @This(), _: UnparkContext) UnparkFilter { | |
return .{ .unpark = 0 }; | |
} | |
}; | |
var filter = Filter{}; | |
return unparkFilter(address, filter); | |
} | |
}; | |
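// Hedged sketch of the parking_lot callback protocol above (not part of the | |
// original gist): onValidate() returning null aborts the park without blocking, | |
// and unparkOne() still invokes onUnpark() with an empty result when no waiter | |
// is queued. Everything here is single-threaded, so nothing ever sleeps. | |
test "parking_lot callback protocol (sketch)" { | |
var value: u32 = 0; | |
const Parker = struct { | |
pub fn onValidate(this: @This()) ?usize { | |
return null; // nothing to wait for, so abort the park | |
} | |
pub fn onBeforeWait(this: @This()) void {} | |
pub fn onTimeout(this: @This(), has_more: bool) void {} | |
}; | |
const token = parking_lot.parkConditionally(@ptrToInt(&value), null, Parker{}); | |
std.debug.assert(token == null); | |
const Unparker = struct { | |
pub fn onUnpark(this: @This(), result: parking_lot.UnparkResult) usize { | |
std.debug.assert(result.token == null); // no waiter was parked here | |
return 0; | |
} | |
}; | |
parking_lot.unparkOne(@ptrToInt(&value), Unparker{}); | |
} | |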
pub const Futex = | |
if (std.builtin.os.tag == .linux) | |
struct { | |
pub const uses_os = true; | |
pub fn wait(ptr: *const u32, cmp: u32, timeout: ?u64) error{TimedOut}!void { | |
var ts: system.timespec = undefined; | |
var ts_ptr: ?*system.timespec = null; | |
if (timeout) |timeout_ns| { | |
ts_ptr = &ts; | |
ts.tv_sec = @intCast(isize, timeout_ns / std.time.ns_per_s); | |
ts.tv_nsec = @intCast(isize, timeout_ns % std.time.ns_per_s); | |
} | |
return switch (system.getErrno(std.os.linux.futex_wait( | |
@ptrCast(*const i32, ptr), | |
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT, | |
@bitCast(i32, cmp), | |
ts_ptr, | |
))) { | |
0 => {}, | |
system.EINTR => {}, | |
system.EAGAIN => {}, | |
system.EFAULT => {}, | |
system.ETIMEDOUT => error.TimedOut, | |
else => unreachable, | |
}; | |
} | |
pub fn wake(ptr: *const u32, notify_all: bool) void { | |
return switch (system.getErrno(std.os.linux.futex_wake( | |
@ptrCast(*const i32, ptr), | |
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE, | |
if (notify_all) std.math.maxInt(i32) else 1, | |
))) { | |
0 => {}, | |
system.EINVAL => {}, | |
else => unreachable, | |
}; | |
} | |
pub fn yield(iteration: ?usize) bool { | |
const cpu_yield = 4; | |
const thread_yield = 1; | |
const iter = iteration orelse cpu_yield; | |
if (iter < cpu_yield) { | |
var spin: u8 = 30; | |
while (spin > 0) : (spin -= 1) | |
atomic.spinLoopHint(); | |
return true; | |
} | |
if (iter < cpu_yield + thread_yield) { | |
_ = system.sched_yield(); | |
return true; | |
} | |
return false; | |
} | |
} | |
else if (comptime std.Target.current.isDarwin()) | |
struct { | |
pub const uses_os = true; | |
pub fn wait(ptr: *const u32, cmp: u32, timeout: ?u64) error{TimedOut}!void { | |
var timeout_us: u32 = std.math.maxInt(u32); | |
if (timeout) |timeout_ns| | |
timeout_us = @intCast(u32, @divFloor(timeout_ns, std.time.ns_per_us)); | |
const ret = __ulock_wait( | |
UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, | |
@ptrCast(*c_void, ptr), | |
@as(u64, cmp), | |
timeout_us, | |
); | |
if (ret < 0) { | |
switch (-ret) { | |
system.EINTR => {}, | |
system.EFAULT => {}, | |
system.ETIMEDOUT => return error.TimedOut, | |
else => unreachable, | |
} | |
} | |
} | |
pub fn wake(ptr: *const u32, notify_all: bool) void { | |
var operation: u32 = UL_COMPARE_AND_WAIT | ULF_NO_ERRNO; | |
if (notify_all) | |
operation |= ULF_WAKE_ALL; | |
while (true) { | |
const ret = __ulock_wake( | |
operation, | |
@ptrCast(*c_void, ptr), | |
@as(u64, 0), | |
); | |
if (ret < 0) { | |
switch (-ret) { | |
system.ENOENT => {}, | |
system.EINTR => continue, | |
else => unreachable, | |
} | |
} | |
return; | |
} | |
} | |
pub fn yield(iteration: ?usize) bool { | |
@compileError("TODO"); | |
} | |
const UL_COMPARE_AND_WAIT = 1; | |
const ULF_WAKE_ALL = 0x100; | |
const ULF_NO_ERRNO = 0x1000000; | |
extern "c" fn __ulock_wait( | |
operation: u32, | |
address: ?*c_void, | |
value: u64, | |
timeout_us: u32, | |
) callconv(.C) c_int; | |
extern "c" fn __ulock_wake( | |
operation: u32, | |
address: ?*c_void, | |
wake_value: u64, | |
) callconv(.C) c_int; | |
} | |
else | |
struct { | |
pub const uses_os = false; | |
pub fn wait(ptr: *const u32, cmp: u32, timeout: ?u64) error{TimedOut}!void { | |
const Parker = struct { | |
pointer: *const u32, | |
compare: u32, | |
timed_out: bool = false, | |
pub fn onValidate(this: @This()) ?usize { | |
if (atomic.load(this.pointer, .seq_cst) != this.compare) | |
return null; | |
return 0; | |
} | |
pub fn onBeforeWait(this: @This()) void {} | |
pub fn onTimeout(this: *@This(), has_more: bool) void { | |
this.timed_out = true; | |
} | |
}; | |
var parker = Parker{ | |
.pointer = ptr, | |
.compare = cmp, | |
}; | |
const deadline = if (timeout) |timeout_ns| nanotime() + timeout_ns else null; | |
_ = parking_lot.parkConditionally(@ptrToInt(ptr), deadline, &parker); | |
if (parker.timed_out) | |
return error.TimedOut; | |
} | |
pub fn wake(ptr: *const u32, notify_all: bool) void { | |
if (notify_all) | |
return parking_lot.unparkAll(@ptrToInt(ptr)); | |
const Unparker = struct { | |
pub fn onUnpark(this: @This(), result: parking_lot.UnparkResult) usize { | |
return 0; | |
} | |
}; | |
parking_lot.unparkOne(@ptrToInt(ptr), Unparker{}); | |
} | |
pub fn yield(iteration: ?usize) bool { | |
return Event.yield(iteration); | |
} | |
}; | |
pub const ResetEvent = struct { | |
is_set: bool = false, | |
pub fn isSet(self: *const ResetEvent) bool { | |
return atomic.load(&self.is_set, .seq_cst); | |
} | |
pub fn reset(self: *ResetEvent) void { | |
return atomic.store(&self.is_set, false, .seq_cst); | |
} | |
pub fn wait(self: *ResetEvent) void { | |
self.waitInner(null) catch unreachable; | |
} | |
pub fn tryWaitFor(self: *ResetEvent, duration: u64) error{TimedOut}!void { | |
return self.tryWaitUntil(nanotime() + duration); | |
} | |
pub fn tryWaitUntil(self: *ResetEvent, deadline: u64) error{TimedOut}!void { | |
return self.waitInner(deadline); | |
} | |
fn waitInner(self: *ResetEvent, deadline: ?u64) error{TimedOut}!void { | |
const Parker = struct { | |
event: *ResetEvent, | |
timed_out: bool = false, | |
pub fn onValidate(this: @This()) ?usize { | |
if (this.event.isSet()) | |
return null; | |
return 0; | |
} | |
pub fn onBeforeWait(this: @This()) void {} | |
pub fn onTimeout(this: *@This(), has_more: bool) void { | |
this.timed_out = true; | |
} | |
}; | |
var parker = Parker{ .event = self }; | |
while (true) { | |
if (self.isSet()) | |
return; | |
if (parker.timed_out) | |
return error.TimedOut; | |
_ = parking_lot.parkConditionally(@ptrToInt(self), deadline, &parker); | |
} | |
} | |
pub fn set(self: *ResetEvent) void { | |
atomic.store(&self.is_set, true, .seq_cst); | |
parking_lot.unparkAll(@ptrToInt(self)); | |
} | |
}; | |
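// Hedged single-threaded sketch (not from the original gist): once set, wait() | |
// returns without parking, and reset() re-arms the event. | |
test "ResetEvent basic usage (sketch)" { | |
var event = ResetEvent{}; | |
std.debug.assert(!event.isSet()); | |
event.set(); | |
std.debug.assert(event.isSet()); | |
event.wait(); // already set, so this returns immediately | |
event.reset(); | |
std.debug.assert(!event.isSet()); | |
} | |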
pub const Mutex = struct { | |
state: u8 = UNLOCKED, | |
const UNLOCKED = 0; | |
const LOCKED = 1; | |
const PARKED = 2; | |
const RETRY = 0; | |
const HANDOFF = 1; | |
const TIMEOUT = 2; | |
const is_x86 = switch (builtin.arch) { | |
.i386, .x86_64 => true, | |
else => false, | |
}; | |
/// Try to acquire the mutex by setting the lock bit using x86 specific optimizations. | |
inline fn tryAcquirex86(self: *Mutex) bool { | |
// "lock btsl" has a smaller instruction-cache hit than "lock cmpxchg" as below. | |
// Its safe to use a dword instruction to operate on byte memory as, on x86, | |
// if any part of the address is valid, then the instruction succeeds AFAIK. | |
return asm volatile( | |
"lock btsl $0, %[ptr]" | |
: [ret] "={@ccc}" (-> u8), | |
: [ptr] "*m" (&self.state) | |
: "cc", "memory" | |
) == 0; | |
} | |
pub fn tryAcquire(self: *Mutex) bool { | |
if (is_x86) | |
return self.tryAcquirex86(); | |
var state = atomic.load(&self.state, .relaxed); | |
while (true) { | |
if (state & LOCKED != 0) | |
return false; | |
state = atomic.tryCompareAndSwap( | |
&self.state, | |
state, | |
state | LOCKED, | |
.acquire, | |
.relaxed, | |
) orelse return true; | |
} | |
} | |
pub fn acquire(self: *Mutex) void { | |
self.acquireInner(null) catch unreachable; | |
} | |
pub fn tryAcquireFor(self: *Mutex, duration: u64) error{TimedOut}!void { | |
return self.tryAcquireUntil(nanotime() + duration); | |
} | |
pub fn tryAcquireUntil(self: *Mutex, deadline: u64) error{TimedOut}!void { | |
return self.acquireInner(deadline); | |
} | |
pub inline fn release(self: *Mutex) void { | |
self.releaseInner(false); | |
} | |
pub inline fn releaseFair(self: *Mutex) void { | |
self.releaseInner(true); | |
} | |
inline fn acquireInner(self: *Mutex, deadline: ?u64) error{TimedOut}!void { | |
if (is_x86) { | |
if (self.tryAcquirex86()) | |
return; | |
} else if (atomic.tryCompareAndSwap( | |
&self.state, | |
UNLOCKED, | |
LOCKED, | |
.acquire, | |
.relaxed, | |
) == null) { | |
return; | |
} | |
return self.acquireSlow(deadline); | |
} | |
inline fn releaseInner(self: *Mutex, be_fair: bool) void { | |
if (atomic.tryCompareAndSwap( | |
&self.state, | |
LOCKED, | |
UNLOCKED, | |
.release, | |
.relaxed, | |
)) |_| { | |
self.releaseSlow(be_fair); | |
} | |
} | |
fn acquireSlow(self: *Mutex, deadline: ?u64) error{TimedOut}!void { | |
@setCold(true); | |
var spin_iter: usize = 0; | |
var state = atomic.load(&self.state, .relaxed); | |
while (true) { | |
if (state & LOCKED == 0) { | |
if (is_x86) { | |
if (self.tryAcquirex86()) | |
return; | |
} else if (atomic.tryCompareAndSwap( | |
&self.state, | |
state, | |
state | LOCKED, | |
.acquire, | |
.relaxed, | |
) == null) { | |
return; | |
} | |
_ = Event.yield(null); | |
state = atomic.load(&self.state, .relaxed); | |
continue; | |
} | |
if (state & PARKED == 0) { | |
if (Event.yield(spin_iter)) { | |
spin_iter +%= 1; | |
state = atomic.load(&self.state, .relaxed); | |
continue; | |
} | |
if (atomic.tryCompareAndSwap( | |
&self.state, | |
state, | |
state | PARKED, | |
.relaxed, | |
.relaxed, | |
)) |updated| { | |
state = updated; | |
continue; | |
} | |
} | |
const Parker = struct { | |
mutex: *Mutex, | |
timed_out: bool = false, | |
pub fn onValidate(this: @This()) ?usize { | |
if (atomic.load(&this.mutex.state, .relaxed) != (LOCKED | PARKED)) | |
return null; | |
return 0; | |
} | |
pub fn onBeforeWait(this: @This()) void {} | |
pub fn onTimeout(this: *@This(), has_more: bool) void { | |
this.timed_out = true; | |
if (has_more) { | |
_ = atomic.fetchAnd(&this.mutex.state, ~@as(u8, PARKED), .relaxed); | |
} | |
} | |
}; | |
var parker = Parker{ .mutex = self }; | |
var token = parking_lot.parkConditionally(@ptrToInt(self), deadline, &parker); | |
if (parker.timed_out) | |
token = TIMEOUT; | |
switch (token orelse RETRY) { | |
RETRY => {}, | |
TIMEOUT => return error.TimedOut, | |
HANDOFF => return, | |
else => unreachable, | |
} | |
spin_iter = 0; | |
state = atomic.load(&self.state, .relaxed); | |
} | |
} | |
fn releaseSlow(self: *Mutex, force_fair: bool) void { | |
@setCold(true); | |
var state = atomic.load(&self.state, .relaxed); | |
while (state == LOCKED) { | |
state = atomic.tryCompareAndSwap( | |
&self.state, | |
LOCKED, | |
UNLOCKED, | |
.release, | |
.relaxed, | |
) orelse return; | |
} | |
const Unparker = struct { | |
mutex: *Mutex, | |
force_fair: bool, | |
pub fn onUnpark(this: @This(), result: parking_lot.UnparkResult) usize { | |
if (result.token != null and (this.force_fair or result.be_fair)) { | |
if (!result.has_more) | |
atomic.store(&this.mutex.state, LOCKED, .relaxed); | |
return HANDOFF; | |
} | |
const new_state = if (result.token == null) @as(u8, UNLOCKED) else PARKED; | |
atomic.store(&this.mutex.state, new_state, .relaxed); | |
return RETRY; | |
} | |
}; | |
parking_lot.unparkOne(@ptrToInt(self), Unparker{ | |
.mutex = self, | |
.force_fair = force_fair, | |
}); | |
} | |
}; | |
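// Hedged sketch (not from the original gist) of the parking-lot Mutex fast path: | |
// an uncontended tryAcquire() succeeds, a second attempt fails until release(). | |
test "Mutex uncontended usage (sketch)" { | |
var mutex = Mutex{}; | |
std.debug.assert(mutex.tryAcquire()); | |
std.debug.assert(!mutex.tryAcquire()); | |
mutex.release(); | |
mutex.acquire(); | |
mutex.releaseFair(); | |
} | |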
pub const Condvar = struct { | |
has_waiters: bool = false, | |
pub fn wait(self: *Condvar, mutex: anytype) void { | |
return self.waitInner(mutex, null) catch unreachable; | |
} | |
pub fn tryWaitFor(self: *Condvar, mutex: anytype, duration: u64) error{TimedOut}!void { | |
return self.tryWaitUntil(mutex, nanotime() + duration); | |
} | |
pub fn tryWaitUntil(self: *Condvar, mutex: anytype, deadline: u64) error{TimedOut}!void { | |
return self.waitInner(mutex, deadline); | |
} | |
fn waitInner(self: *Condvar, mutex: anytype, deadline: ?u64) error{TimedOut}!void { | |
const CondMutex = @TypeOf(mutex); | |
const Parker = struct { | |
cond: *Condvar, | |
cond_mutex: CondMutex, | |
timed_out: bool = false, | |
pub fn onValidate(this: @This()) ?usize { | |
atomic.store(&this.cond.has_waiters, true, .seq_cst); | |
return 0; | |
} | |
pub fn onBeforeWait(this: @This()) void { | |
this.cond_mutex.release(); | |
} | |
pub fn onTimeout(this: *@This(), has_more: bool) void { | |
this.timed_out = true; | |
} | |
}; | |
var parker = Parker{ | |
.cond = self, | |
.cond_mutex = mutex, | |
}; | |
_ = parking_lot.parkConditionally(@ptrToInt(self), deadline, &parker); | |
mutex.acquire(); | |
if (parker.timed_out) | |
return error.TimedOut; | |
} | |
pub fn notifyOne(self: *Condvar) void { | |
if (!atomic.load(&self.has_waiters, .seq_cst)) | |
return; | |
const Unparker = struct { | |
cond: *Condvar, | |
pub fn onUnpark(this: @This(), result: parking_lot.UnparkResult) usize { | |
atomic.store(&this.cond.has_waiters, result.has_more, .seq_cst); | |
return 0; | |
} | |
}; | |
parking_lot.unparkOne(@ptrToInt(self), Unparker{ .cond = self }); | |
} | |
pub fn notifyAll(self: *Condvar) void { | |
if (!atomic.load(&self.has_waiters, .seq_cst)) | |
return; | |
atomic.store(&self.has_waiters, false, .seq_cst); | |
parking_lot.unparkAll(@ptrToInt(self)); | |
} | |
}; | |
pub const WaitGroup = struct { | |
counter: usize = 0, | |
pub fn init(amount: usize) WaitGroup { | |
return .{ .counter = amount }; | |
} | |
pub fn tryBegin(self: *WaitGroup, amount: usize) bool { | |
return self.apply(true, amount); | |
} | |
pub fn begin(self: *WaitGroup, amount: usize) void { | |
assert(self.tryBegin(amount)); | |
} | |
pub fn tryEnd(self: *WaitGroup, amount: usize) bool { | |
return self.apply(false, amount); | |
} | |
pub fn end(self: *WaitGroup, amount: usize) void { | |
assert(self.tryEnd(amount)); | |
} | |
pub fn tryUpdate(self: *WaitGroup, amount: isize) bool { | |
const is_add = amount > 0; | |
const value = @intCast(usize, if (is_add) amount else -amount); | |
return self.apply(is_add, value); | |
} | |
pub fn update(self: *WaitGroup, amount: isize) void { | |
assert(self.tryUpdate(amount)); | |
} | |
fn apply(self: *WaitGroup, is_add: bool, amount: usize) bool { | |
const max = std.math.maxInt(usize); | |
if (amount == 0) | |
return true; | |
var counter = atomic.load(&self.counter, .seq_cst); | |
while (true) { | |
var new_counter: usize = undefined; | |
if (is_add) { | |
if (counter > max - amount) | |
return false; | |
new_counter = counter + amount; | |
} else { | |
if (amount > counter) | |
return false; | |
new_counter = counter - amount; | |
} | |
counter = atomic.tryCompareAndSwap( | |
&self.counter, | |
counter, | |
new_counter, | |
.seq_cst, | |
.seq_cst, | |
) orelse { | |
if (new_counter == 0) | |
parking_lot.unparkAll(@ptrToInt(self)); | |
return true; | |
}; | |
} | |
} | |
pub fn tryWait(self: *WaitGroup) bool { | |
return atomic.load(&self.counter, .relaxed) == 0; | |
} | |
pub fn wait(self: *WaitGroup) void { | |
return self.waitInner(null) catch unreachable; | |
} | |
pub fn tryWaitFor(self: *WaitGroup, duration: u64) error{TimedOut}!void { | |
return self.tryWaitUntil(nanotime() + duration); | |
} | |
pub fn tryWaitUntil(self: *WaitGroup, deadline: u64) error{TimedOut}!void { | |
return self.waitInner(deadline); | |
} | |
fn waitInner(self: *WaitGroup, deadline: ?u64) error{TimedOut}!void { | |
const Parker = struct { | |
wg: *WaitGroup, | |
timed_out: bool = false, | |
pub fn onValidate(this: @This()) ?usize { | |
if (atomic.load(&this.wg.counter, .seq_cst) == 0) | |
return null; | |
return 0; | |
} | |
pub fn onBeforeWait(this: @This()) void {} | |
pub fn onTimeout(this: *@This(), has_more: bool) void { | |
this.timed_out = true; | |
} | |
}; | |
while (true) { | |
if (atomic.load(&self.counter, .seq_cst) == 0) | |
return; | |
var parker = Parker{ .wg = self }; | |
_ = parking_lot.parkConditionally(@ptrToInt(self), deadline, &parker); | |
if (parker.timed_out) | |
return error.TimedOut; | |
} | |
} | |
}; | |
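// Hedged sketch (not from the original gist) of WaitGroup counting: begin() and | |
// end() adjust the counter, and wait() returns once it reaches zero. | |
test "WaitGroup counter (sketch)" { | |
var wg = WaitGroup.init(1); | |
wg.begin(2); | |
wg.end(3); | |
wg.wait(); // counter is zero, so this returns immediately | |
std.debug.assert(!wg.tryEnd(1)); // the counter cannot go below zero | |
} | |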
pub const Semaphore = struct { | |
permits: usize = 0, | |
pub fn init(permits: usize) Semaphore { | |
return .{ .permits = permits }; | |
} | |
pub fn tryAcquire(self: *Semaphore, permits: usize) bool { | |
var perms = atomic.load(&self.permits, .seq_cst); | |
while (true) { | |
if (perms < permits) | |
return false; | |
perms = atomic.tryCompareAndSwap( | |
&self.permits, | |
perms, | |
perms - permits, | |
.seq_cst, | |
.seq_cst, | |
) orelse return true; | |
} | |
} | |
pub fn acquire(self: *Semaphore, permits: usize) void { | |
self.acquireInner(permits, null) catch unreachable; | |
} | |
pub fn tryAcquireFor(self: *Semaphore, permits: usize, duration: u64) error{TimedOut}!void { | |
return self.tryAcquireUntil(permits, nanotime() + duration); | |
} | |
pub fn tryAcquireUntil(self: *Semaphore, permits: usize, deadline: u64) error{TimedOut}!void { | |
return self.acquireInner(permits, deadline); | |
} | |
fn acquireInner(self: *Semaphore, permits: usize, deadline: ?u64) error{TimedOut}!void { | |
const Parker = struct { | |
sema: *Semaphore, | |
perms: usize, | |
timed_out: bool = false, | |
pub fn onValidate(this: @This()) ?usize { | |
if (atomic.load(&this.sema.permits, .seq_cst) >= this.perms) | |
return null; | |
return this.perms; | |
} | |
pub fn onBeforeWait(this: @This()) void {} | |
pub fn onTimeout(this: *@This(), has_more: bool) void { | |
this.timed_out = true; | |
} | |
}; | |
var parker = Parker{ | |
.sema = self, | |
.perms = permits, | |
}; | |
while (true) { | |
if (self.tryAcquire(permits)) | |
return; | |
if (parker.timed_out) | |
return error.TimedOut; | |
_ = parking_lot.parkConditionally(@ptrToInt(self), deadline, &parker); | |
} | |
} | |
pub fn tryRelease(self: *Semaphore, permits: usize) bool { | |
var perms = atomic.load(&self.permits, .seq_cst); | |
while (true) { | |
if (perms > std.math.maxInt(usize) - permits) | |
return false; | |
perms = atomic.tryCompareAndSwap( | |
&self.permits, | |
perms, | |
perms + permits, | |
.seq_cst, | |
.seq_cst, | |
) orelse break; | |
} | |
const Filter = struct { | |
sema: *Semaphore, | |
consumed: usize = 0, | |
pub fn onBeforeWake(this: @This()) void {} | |
pub fn onFilter(this: *@This(), unpark_context: parking_lot.UnparkContext) parking_lot.UnparkFilter { | |
const waiter_perms = unpark_context.getToken(); | |
const perms = atomic.load(&this.sema.permits, .seq_cst); | |
if ( | |
(perms < this.consumed) or | |
((perms - this.consumed) < waiter_perms) or | |
(this.consumed > (std.math.maxInt(usize) - waiter_perms)) | |
) { | |
return .stop; | |
} | |
this.consumed += waiter_perms; | |
return .{ .unpark = 0 }; | |
} | |
}; | |
var filter = Filter{ .sema = self }; | |
parking_lot.unparkFilter(@ptrToInt(self), &filter); | |
return true; | |
} | |
pub fn release(self: *Semaphore, permits: usize) void { | |
assert(self.tryRelease(permits)); | |
} | |
}; | |
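// Hedged sketch (not from the original gist) of Semaphore permit accounting: | |
// acquire() consumes permits, release() returns them, and tryAcquire() fails | |
// once they run out. | |
test "Semaphore permits (sketch)" { | |
var sema = Semaphore.init(2); | |
sema.acquire(1); | |
std.debug.assert(sema.tryAcquire(1)); | |
std.debug.assert(!sema.tryAcquire(1)); | |
sema.release(2); | |
std.debug.assert(sema.tryAcquire(2)); | |
} | |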
pub const RwLock = struct { | |
state: usize = 0, | |
// reader: | |
// - fast path: 0 -> ONE_READER | |
// - set has_parked & wait for !(has_parked | has_writer) | |
// - always handoff to read bit already added | |
// | |
// unlock: | |
// - dec ONE_READER, if readers != 0 or !has_parked, ret | |
// - unlock_common | |
// | |
// writer: | |
// - fast path: 0 -> HAS_WRITER | |
// - set has_parked & wait for 0 | |
// - always handoff to HAS_WRITER | ?(HAS_PARKED) | |
// | |
// unlock: | |
// - dec HAS_WRITER, if 0, ret | |
// - unlock_common | |
// | |
// | |
// unlock_common: | |
// - if | |
const HAS_WRITER = 1 << 0; | |
const HAS_PARKED = 1 << 1; | |
const ONE_READER = 1 << 2; | |
const READ_SHIFT = @ctz(usize, ONE_READER); | |
pub fn tryAcquireReader(self: *RwLock) bool { | |
var state = atomic.load(&self.state, .relaxed); | |
while (true) { | |
if (state & (HAS_PARKED | HAS_WRITER) != 0) | |
return false; | |
if ((state >> READ_SHIFT) == (std.math.maxInt(usize) >> READ_SHIFT)) | |
return false; | |
state = atomic.tryCompareAndSwap( | |
&self.state, | |
state, | |
state + ONE_READER, | |
.acquire, | |
.relaxed, | |
) orelse return true; | |
} | |
} | |
pub fn acquireReader(self: *RwLock) void { | |
self.acquireReaderInner(null) catch unreachable; | |
} | |
pub fn tryAcquireReaderFor(self: *RwLock, duration: u64) error{TimedOut}!void { | |
return self.tryAcquireReaderUntil(nanotime() + duration); | |
} | |
pub fn tryAcquireReaderUntil(self: *RwLock, deadline: u64) error{TimedOut}!void { | |
return self.acquireReaderInner(deadline); | |
} | |
fn acquireReaderInner(self: *RwLock, deadline: ?u64) error{TimedOut}!void { | |
@compileError("TODO"); | |
} | |
pub fn releaseReader(self: *RwLock) void { | |
@compileError("TODO"); | |
} | |
pub fn tryAcquireWriter(self: *RwLock) bool { | |
@compileError("TODO"); | |
} | |
pub fn acquireWriter(self: *RwLock) void { | |
@compileError("TODO"); | |
} | |
pub fn tryAcquireWriterFor(self: *RwLock, duration: u64) error{TimedOut}!void { | |
@compileError("TODO"); | |
} | |
pub fn tryAcquireWriterUntil(self: *RwLock, deadline: u64) error{TimedOut}!void { | |
@compileError("TODO"); | |
} | |
pub fn releaseWriter(self: *RwLock) void { | |
@compileError("TODO"); | |
} | |
}; | |
pub const ThreadPool = struct { | |
lock: Mutex = .{}, | |
cond: Condvar = .{}, | |
allocator: *std.mem.Allocator, | |
is_running: bool = true, | |
spawned: usize = 0, | |
threads: []*std.Thread = &[_]*std.Thread{}, | |
run_queue: std.SinglyLinkedList(fn(usize) void) = .{}, | |
pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void { | |
self.* = .{ .allocator = allocator }; | |
errdefer self.deinit(); | |
const num_threads = std.math.max(1, std.Thread.cpuCount() catch 1); | |
self.threads = try allocator.alloc(*std.Thread, num_threads); | |
for (self.threads) |*thread| { | |
thread.* = try std.Thread.spawn(self, runWorker); | |
self.spawned += 1; | |
} | |
} | |
pub fn deinit(self: *ThreadPool) void { | |
{ | |
self.lock.acquire(); | |
defer self.lock.release(); | |
self.is_running = false; | |
self.cond.notifyAll(); | |
} | |
defer self.allocator.free(self.threads); | |
for (self.threads[0..self.spawned]) |thread| | |
thread.wait(); | |
while (self.run_queue.popFirst()) |run_node| | |
(run_node.data)(@ptrToInt(run_node)); | |
} | |
pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { | |
const Args = @TypeOf(args); | |
const RunNode = @TypeOf(self.run_queue).Node; | |
const Closure = struct { | |
arguments: Args, | |
pool: *ThreadPool, | |
run_node: RunNode = .{ .data = run }, | |
fn run(ptr: usize) void { | |
const run_node = @intToPtr(*RunNode, ptr); | |
const closure = @fieldParentPtr(@This(), "run_node", run_node); | |
const result = @call(.{}, func, closure.arguments); | |
closure.pool.lock.acquire(); | |
defer closure.pool.lock.release(); | |
closure.pool.allocator.destroy(closure); | |
} | |
}; | |
self.lock.acquire(); | |
defer self.lock.release(); | |
const closure = try self.allocator.create(Closure); | |
closure.* = Closure{ | |
.arguments = args, | |
.pool = self, | |
}; | |
self.run_queue.prepend(&closure.run_node); | |
self.cond.notifyOne(); | |
} | |
fn runWorker(self: *ThreadPool) void { | |
self.lock.acquire(); | |
defer self.lock.release(); | |
while (self.is_running) { | |
const run_node = self.run_queue.popFirst() orelse { | |
self.cond.wait(&self.lock); | |
continue; | |
}; | |
self.lock.release(); | |
(run_node.data)(@ptrToInt(run_node)); | |
self.lock.acquire(); | |
} | |
} | |
}; |
pub const ThreadPool = struct { | |
state: usize = 0, | |
spawned: usize = 0, | |
run_queue: Queue, | |
idle_semaphore: Semaphore, | |
allocator: *std.mem.Allocator, | |
workers: []Worker = &[_]Worker{}, | |
pub const InitConfig = struct { | |
allocator: ?*std.mem.Allocator = null, | |
max_threads: ?usize = null, | |
var default_gpa = std.heap.GeneralPurposeAllocator(.{}){}; | |
var default_allocator = &default_gpa.allocator; | |
}; | |
pub fn init(self: *ThreadPool, config: InitConfig) !void { | |
self.* = ThreadPool{ | |
.run_queue = Queue.init(), | |
.idle_semaphore = Semaphore.init(0), | |
.allocator = config.allocator orelse InitConfig.default_allocator, | |
}; | |
errdefer self.deinit(); | |
const num_workers = std.math.max(1, config.max_threads orelse std.Thread.cpuCount() catch 1); | |
self.workers = try self.allocator.alloc(Worker, num_workers); | |
for (self.workers) |*worker| { | |
try worker.init(self); | |
@atomicStore(usize, &self.spawned, self.spawned + 1, .SeqCst); | |
} | |
} | |
pub fn deinit(self: *ThreadPool) void { | |
self.shutdown(); | |
for (self.workers[0..self.spawned]) |*worker| | |
worker.deinit(); | |
while (self.run_queue.pop()) |run_node| | |
(run_node.data.runFn)(&run_node.data); | |
self.allocator.free(self.workers); | |
self.idle_semaphore.deinit(); | |
self.run_queue.deinit(); | |
self.* = undefined; | |
} | |
pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void { | |
const Args = @TypeOf(args); | |
const Closure = struct { | |
func_args: Args, | |
allocator: *std.mem.Allocator, | |
run_node: RunNode = .{ .data = .{ .runFn = runFn } }, | |
fn runFn(runnable: *Runnable) void { | |
const run_node = @fieldParentPtr(RunNode, "data", runnable); | |
const closure = @fieldParentPtr(@This(), "run_node", run_node); | |
const result = @call(.{}, func, closure.func_args); | |
closure.allocator.destroy(closure); | |
} | |
}; | |
const allocator = self.allocator; | |
const closure = try allocator.create(Closure); | |
errdefer allocator.destroy(closure); | |
closure.* = Closure{ | |
.func_args = args, | |
.allocator = allocator, | |
}; | |
const run_node = &closure.run_node; | |
if (Worker.current) |worker| { | |
worker.run_queue.push(run_node); | |
} else { | |
self.run_queue.push(run_node); | |
} | |
self.notify(); | |
} | |
const State = struct { | |
is_shutdown: bool = false, | |
is_notified: bool = false, | |
idle_workers: usize = 0, | |
fn pack(self: State) usize { | |
return ( | |
(@as(usize, @boolToInt(self.is_shutdown)) << 0) | | |
(@as(usize, @boolToInt(self.is_notified)) << 1) | | |
(self.idle_workers << 2) | |
); | |
} | |
fn unpack(value: usize) State { | |
return State{ | |
.is_shutdown = value & (1 << 0) != 0, | |
.is_notified = value & (1 << 1) != 0, | |
.idle_workers = value >> 2, | |
}; | |
} | |
}; | |
fn wait(self: *ThreadPool) error{Shutdown}!void { | |
var state = State.unpack(@atomicLoad(usize, &self.state, .SeqCst)); | |
while (true) { | |
if (state.is_shutdown) | |
return error.Shutdown; | |
var new_state = state; | |
if (state.is_notified) { | |
new_state.is_notified = false; | |
} else { | |
new_state.idle_workers += 1; | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.state, | |
state.pack(), | |
new_state.pack(), | |
.SeqCst, | |
.SeqCst, | |
)) |updated| { | |
state = State.unpack(updated); | |
continue; | |
} | |
if (!state.is_notified) | |
self.idle_semaphore.wait(); | |
return; | |
} | |
} | |
fn notify(self: *ThreadPool) void { | |
var state = State.unpack(@atomicLoad(usize, &self.state, .SeqCst)); | |
while (true) { | |
if (state.is_shutdown) | |
return; | |
var new_state = state; | |
if (state.is_notified) { | |
return; | |
} else if (state.idle_workers == 0) { | |
new_state.is_notified = true; | |
} else { | |
new_state.idle_workers -= 1; | |
} | |
if (@cmpxchgWeak( | |
usize, | |
&self.state, | |
state.pack(), | |
new_state.pack(), | |
.SeqCst, | |
.SeqCst, | |
)) |updated| { | |
state = State.unpack(updated); | |
continue; | |
} | |
if (!new_state.is_notified) | |
self.idle_semaphore.post(); | |
return; | |
} | |
} | |
fn shutdown(self: *ThreadPool) void { | |
var state = State.unpack(@atomicRmw( | |
usize, | |
&self.state, | |
.Xchg, | |
(State{ .is_shutdown = true }).pack(), | |
.SeqCst, | |
)); | |
while (state.idle_workers > 0) : (state.idle_workers -= 1) | |
self.idle_semaphore.post(); | |
} | |
const Worker = struct { | |
thread: *std.Thread, | |
run_queue: Queue, | |
fn init(self: *Worker, pool: *ThreadPool) !void { | |
self.* = Worker{ | |
.thread = undefined, | |
.run_queue = Queue.init(), | |
}; | |
self.thread = std.Thread.spawn(RunConfig{ | |
.worker = self, | |
.pool = pool, | |
}, Worker.run) catch |err| { | |
self.run_queue.deinit(); | |
return err; | |
}; | |
} | |
fn deinit(self: *Worker) void { | |
self.thread.wait(); | |
self.run_queue.deinit(); | |
self.* = undefined; | |
} | |
threadlocal var current: ?*Worker = null; | |
const RunConfig = struct { | |
worker: *Worker, | |
pool: *ThreadPool, | |
}; | |
fn run(config: RunConfig) void { | |
const self = config.worker; | |
const pool = config.pool; | |
const old_current = current; | |
current = self; | |
defer current = old_current; | |
var tick = @ptrToInt(self); | |
var prng = std.rand.DefaultPrng.init(tick); | |
while (true) { | |
const run_node = self.poll(tick, pool, &prng.random) orelse { | |
pool.wait() catch break; | |
continue; | |
}; | |
tick +%= 1; | |
(run_node.data.runFn)(&run_node.data); | |
} | |
} | |
fn poll(self: *Worker, tick: usize, pool: *ThreadPool, rand: *std.rand.Random) ?*RunNode { | |
if (tick % 128 == 0) { | |
if (self.steal(pool, rand, .fair)) |run_node| | |
return run_node; | |
} | |
if (tick % 64 == 0) { | |
if (self.run_queue.steal(&pool.run_queue, .fair)) |run_node| | |
return run_node; | |
} | |
if (self.run_queue.pop()) |run_node| | |
return run_node; | |
var attempts: usize = 8; | |
while (attempts > 0) : (attempts -= 1) { | |
if (self.steal(pool, rand, .unfair)) |run_node| { | |
return run_node; | |
} else { | |
std.os.sched_yield() catch spinLoopHint(); | |
} | |
} | |
if (self.run_queue.steal(&pool.run_queue, .unfair)) |run_node| | |
return run_node; | |
return null; | |
} | |
fn steal(self: *Worker, pool: *ThreadPool, rand: *std.rand.Random, mode: anytype) ?*RunNode { | |
const spawned = @atomicLoad(usize, &pool.spawned, .SeqCst); | |
if (spawned < 2) | |
return null; | |
var index = rand.uintLessThan(usize, spawned); | |
var iter = spawned; | |
while (iter > 0) : (iter -= 1) { | |
const target = &pool.workers[index]; | |
index += 1; | |
if (index == spawned) | |
index = 0; | |
if (target == self) | |
continue; | |
if (self.run_queue.steal(&target.run_queue, mode)) |run_node| | |
return run_node; | |
} | |
return null; | |
} | |
}; | |
const Queue = struct { | |
mutex: Mutex, | |
size: usize, | |
list: List, | |
fn init() Queue { | |
return Queue{ | |
.mutex = Mutex.init(), | |
.size = 0, | |
.list = .{}, | |
}; | |
} | |
fn deinit(self: *Queue) void { | |
self.mutex.deinit(); | |
self.* = undefined; | |
} | |
fn push(self: *Queue, node: *List.Node) void { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
self.list.prepend(node); | |
@atomicStore(usize, &self.size, self.size + 1, .SeqCst); | |
} | |
fn pop(self: *Queue) ?*List.Node { | |
return self.popFrom(.head); | |
} | |
fn steal(self: *Queue, target: *Queue, mode: enum { fair, unfair }) ?*RunNode { | |
return target.popFrom(switch (mode) { | |
.fair => .tail, | |
.unfair => .head, | |
}); | |
} | |
fn popFrom(self: *Queue, side: enum { head, tail }) ?*RunNode { | |
if (@atomicLoad(usize, &self.size, .SeqCst) == 0) | |
return null; | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
// potential deadlock when all pops are fair.. | |
const run_node = switch (side) { | |
.head => self.list.popFirst(), | |
.tail => self.list.pop(), | |
}; | |
if (run_node != null) | |
@atomicStore(usize, &self.size, self.size - 1, .SeqCst); | |
return run_node; | |
} | |
}; | |
const List = std.TailQueue(Runnable); | |
const RunNode = List.Node; | |
const Runnable = struct { | |
runFn: fn(*Runnable) void, | |
}; | |
}; | |
pub fn Channel( | |
comptime T: type, | |
comptime buffer_type: std.fifo.LinearFifoBufferType, | |
) type { | |
return struct { | |
mutex: Mutex, | |
putters: Condvar, | |
getters: Condvar, | |
buffer: Buffer, | |
is_closed: bool, | |
const Self = @This(); | |
const Buffer = std.fifo.LinearFifo(T, buffer_type); | |
pub usingnamespace switch (buffer_type) { | |
.Static => struct { | |
pub fn init() Self { | |
return Self.withBuffer(Buffer.init()); | |
} | |
}, | |
.Slice => struct { | |
pub fn init(buf: []T) Self { | |
return Self.withBuffer(Buffer.init(buf)); | |
} | |
}, | |
.Dynamic => struct { | |
pub fn init(allocator: *std.mem.Allocator) Self { | |
return Self.withBuffer(Buffer.init(allocator)); | |
} | |
}, | |
}; | |
fn withBuffer(buffer: Buffer) Self { | |
return Self{ | |
.mutex = Mutex.init(), | |
.putters = Condvar.init(), | |
.getters = Condvar.init(), | |
.buffer = buffer, | |
.is_closed = false, | |
}; | |
} | |
pub fn deinit(self: *Self) void { | |
self.mutex.deinit(); | |
self.putters.deinit(); | |
self.getters.deinit(); | |
self.buffer.deinit(); | |
self.* = undefined; | |
} | |
pub fn close(self: *Self) void { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
if (self.is_closed) | |
return; | |
self.is_closed = true; | |
self.putters.broadcast(); | |
self.getters.broadcast(); | |
} | |
pub fn tryWriteItem(self: *Self, item: T) !bool { | |
const wrote = try self.write(&[1]T{item}); | |
return wrote == 1; | |
} | |
pub fn writeItem(self: *Self, item: T) !void { | |
return self.writeAll(&[1]T{item}); | |
} | |
pub fn write(self: *Self, items: []const T) !usize { | |
return self.writeItems(items, false); | |
} | |
pub fn tryReadItem(self: *Self) !?T { | |
var items: [1]T = undefined; | |
if ((try self.read(&items)) != 1) | |
return null; | |
return items[0]; | |
} | |
pub fn readItem(self: *Self) !T { | |
var items: [1]T = undefined; | |
try self.readAll(&items); | |
return items[0]; | |
} | |
pub fn read(self: *Self, items: []T) !usize { | |
return self.readItems(items, false); | |
} | |
pub fn writeAll(self: *Self, items: []const T) !void { | |
std.debug.assert((try self.writeItems(items, true)) == items.len); | |
} | |
pub fn readAll(self: *Self, items: []T) !void { | |
std.debug.assert((try self.readItems(items, true)) == items.len); | |
} | |
fn writeItems(self: *Self, items: []const T, should_block: bool) !usize { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
var pushed: usize = 0; | |
while (pushed < items.len) { | |
const did_push = blk: { | |
if (self.is_closed) | |
return error.Closed; | |
self.buffer.writeItem(items[pushed]) catch |err| { | |
if (buffer_type == .Dynamic) | |
return err; | |
break :blk false; | |
}; | |
self.getters.signal(); | |
break :blk true; | |
}; | |
if (did_push) { | |
pushed += 1; | |
} else if (should_block) { | |
self.putters.wait(&self.mutex); | |
} else { | |
break; | |
} | |
} | |
return pushed; | |
} | |
fn readItems(self: *Self, items: []T, should_block: bool) !usize { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
var popped: usize = 0; | |
while (popped < items.len) { | |
const new_item = blk: { | |
if (self.buffer.readItem()) |item| { | |
self.putters.signal(); | |
break :blk item; | |
} | |
if (self.is_closed) | |
return error.Closed; | |
break :blk null; | |
}; | |
if (new_item) |item| { | |
items[popped] = item; | |
popped += 1; | |
} else if (should_block) { | |
self.getters.wait(&self.mutex); | |
} else { | |
break; | |
} | |
} | |
return popped; | |
} | |
}; | |
} | |
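// Hedged sketch (not from the original gist) of a bounded Channel in | |
// single-threaded use: the non-blocking tryWriteItem()/tryReadItem() pair | |
// round-trips one value through a statically sized buffer. | |
test "Channel static buffer round trip (sketch)" { | |
var channel = Channel(u32, std.fifo.LinearFifoBufferType{ .Static = 4 }).init(); | |
defer channel.deinit(); | |
std.debug.assert(try channel.tryWriteItem(42)); | |
std.debug.assert((try channel.tryReadItem()).? == 42); | |
channel.close(); | |
} | |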
pub const RwLock = if (std.builtin.os.tag != .windows and std.builtin.link_libc) | |
struct { | |
rwlock: if (std.builtin.os.tag != .windows) pthread_rwlock_t else void, | |
pub fn init() RwLock { | |
return .{ .rwlock = PTHREAD_RWLOCK_INITIALIZER }; | |
} | |
pub fn deinit(self: *RwLock) void { | |
const safe_rc = switch (std.builtin.os.tag) { | |
.dragonfly, .netbsd => std.os.EAGAIN, | |
else => 0, | |
}; | |
const rc = std.c.pthread_rwlock_destroy(&self.rwlock); | |
std.debug.assert(rc == 0 or rc == safe_rc); | |
self.* = undefined; | |
} | |
pub fn tryLock(self: *RwLock) bool { | |
return pthread_rwlock_trywrlock(&self.rwlock) == 0; | |
} | |
pub fn lock(self: *RwLock) void { | |
const rc = pthread_rwlock_wrlock(&self.rwlock); | |
std.debug.assert(rc == 0); | |
} | |
pub fn unlock(self: *RwLock) void { | |
const rc = pthread_rwlock_unlock(&self.rwlock); | |
std.debug.assert(rc == 0); | |
} | |
pub fn tryLockShared(self: *RwLock) bool { | |
return pthread_rwlock_tryrdlock(&self.rwlock) == 0; | |
} | |
pub fn lockShared(self: *RwLock) void { | |
const rc = pthread_rwlock_rdlock(&self.rwlock); | |
std.debug.assert(rc == 0); | |
} | |
pub fn unlockShared(self: *RwLock) void { | |
const rc = pthread_rwlock_unlock(&self.rwlock); | |
std.debug.assert(rc == 0); | |
} | |
const PTHREAD_RWLOCK_INITIALIZER = pthread_rwlock_t{}; | |
const pthread_rwlock_t = switch (std.builtin.os.tag) { | |
.macos, .ios, .watchos, .tvos => extern struct { | |
__sig: c_long = 0x2DA8B3B4, | |
__opaque: [192]u8 = [_]u8{0} ** 192, | |
}, | |
.linux => switch (std.builtin.abi) { | |
.android => switch (@sizeOf(usize)) { | |
4 => extern struct { | |
lock: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER, | |
cond: std.c.pthread_cond_t = std.c.PTHREAD_COND_INITIALIZER, | |
numLocks: c_int = 0, | |
writerThreadId: c_int = 0, | |
pendingReaders: c_int = 0, | |
pendingWriters: c_int = 0, | |
attr: i32 = 0, | |
__reserved: [12]u8 = [_]u8{0} ** 12, | |
}, | |
8 => extern struct { | |
numLocks: c_int = 0, | |
writerThreadId: c_int = 0, | |
pendingReaders: c_int = 0, | |
pendingWriters: c_int = 0, | |
attr: i32 = 0, | |
__reserved: [36]u8 = [_]u8{0} ** 36, | |
}, | |
else => unreachable, | |
}, | |
else => extern struct { | |
size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56, | |
}, | |
}, | |
.fuchsia => extern struct { | |
size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56, | |
}, | |
.emscripten => extern struct { | |
size: [32]u8 align(4) = [_]u8{0} ** 32, | |
}, | |
.netbsd => extern struct { | |
ptr_magic: c_uint = 0x99990009, | |
ptr_interlock: switch (std.builtin.arch) { | |
.aarch64, .sparc, .x86_64, .i386 => u8, | |
.arm, .powerpc => c_int, | |
else => unreachable, | |
} = 0, | |
ptr_rblocked_first: ?*u8 = null, | |
ptr_rblocked_last: ?*u8 = null, | |
ptr_wblocked_first: ?*u8 = null, | |
ptr_wblocked_last: ?*u8 = null, | |
ptr_nreaders: c_uint = 0, | |
ptr_owner: std.c.pthread_t = null, | |
ptr_private: ?*c_void = null, | |
}, | |
.haiku => extern struct { | |
flags: u32 = 0, | |
owner: i32 = -1, | |
lock_sem: i32 = 0, | |
lock_count: i32 = 0, | |
reader_count: i32 = 0, | |
writer_count: i32 = 0, | |
waiters: [2]?*c_void = [_]?*c_void{null, null}, | |
}, | |
.kfreebsd, .freebsd, .openbsd => extern struct { | |
ptr: ?*c_void = null, | |
}, | |
.hermit => extern struct { | |
ptr: usize = std.math.maxInt(usize), | |
}, | |
else => @compileError("pthread_rwlock_t not implemented for this platform"), | |
}; | |
extern "c" fn pthread_rwlock_destroy(p: *pthread_rwlock_t) callconv(.C) c_int; | |
extern "c" fn pthread_rwlock_rdlock(p: *pthread_rwlock_t) callconv(.C) c_int; | |
extern "c" fn pthread_rwlock_wrlock(p: *pthread_rwlock_t) callconv(.C) c_int; | |
extern "c" fn pthread_rwlock_tryrdlock(p: *pthread_rwlock_t) callconv(.C) c_int; | |
extern "c" fn pthread_rwlock_trywrlock(p: *pthread_rwlock_t) callconv(.C) c_int; | |
extern "c" fn pthread_rwlock_unlock(p: *pthread_rwlock_t) callconv(.C) c_int; | |
} | |
else | |
struct { | |
/// https://github.com/bloomberg/rwl-bench/blob/master/bench11.cpp | |
state: usize, | |
mutex: Mutex, | |
semaphore: Semaphore, | |
const IS_WRITING: usize = 1; | |
const WRITER: usize = 1 << 1; | |
const READER: usize = 1 << (1 + std.meta.bitCount(Count)); | |
const WRITER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, WRITER); | |
const READER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, READER); | |
const Count = std.meta.Int(.unsigned, @divFloor(std.meta.bitCount(usize) - 1, 2)); | |
pub fn init() RwLock { | |
return .{ | |
.state = 0, | |
.mutex = Mutex.init(), | |
.semaphore = Semaphore.init(0), | |
}; | |
} | |
pub fn deinit(self: *RwLock) void { | |
self.semaphore.deinit(); | |
self.mutex.deinit(); | |
self.* = undefined; | |
} | |
pub fn tryLock(self: *RwLock) bool { | |
if (self.mutex.tryLock()) { | |
const state = @atomicLoad(usize, &self.state, .SeqCst); | |
if (state & READER_MASK == 0) { | |
_ = @atomicRmw(usize, &self.state, .Or, IS_WRITING, .SeqCst); | |
return true; | |
} | |
self.mutex.unlock(); | |
} | |
return false; | |
} | |
pub fn lock(self: *RwLock) void { | |
_ = @atomicRmw(usize, &self.state, .Add, WRITER, .SeqCst); | |
self.mutex.lock(); | |
const state = @atomicRmw(usize, &self.state, .Or, IS_WRITING, .SeqCst); | |
if (state & READER_MASK != 0) | |
self.semaphore.wait(); | |
} | |
pub fn unlock(self: *RwLock) void { | |
_ = @atomicRmw(usize, &self.state, .And, ~IS_WRITING, .SeqCst); | |
self.mutex.unlock(); | |
} | |
pub fn tryLockShared(self: *RwLock) bool { | |
const state = @atomicLoad(usize, &self.state, .SeqCst); | |
if (state & (IS_WRITING | WRITER_MASK) == 0) { | |
_ = @cmpxchgStrong( | |
usize, | |
&self.state, | |
state, | |
state + READER, | |
.SeqCst, | |
.SeqCst, | |
) orelse return true; | |
} | |
if (self.mutex.tryLock()) { | |
_ = @atomicRmw(usize, &self.state, .Add, READER, .SeqCst); | |
self.mutex.unlock(); | |
return true; | |
} | |
return false; | |
} | |
pub fn lockShared(self: *RwLock) void { | |
var state = @atomicLoad(usize, &self.state, .SeqCst); | |
while (state & (IS_WRITING | WRITER_MASK) == 0) { | |
state = @cmpxchgWeak( | |
usize, | |
&self.state, | |
state, | |
state + READER, | |
.SeqCst, | |
.SeqCst, | |
) orelse return; | |
} | |
self.mutex.lock(); | |
_ = @atomicRmw(usize, &self.state, .Add, READER, .SeqCst); | |
self.mutex.unlock(); | |
} | |
pub fn unlockShared(self: *RwLock) void { | |
const state = @atomicRmw(usize, &self.state, .Sub, READER, .SeqCst); | |
if ((state & READER_MASK == READER) and (state & IS_WRITING != 0)) | |
self.semaphore.post(); | |
} | |
}; | |
pub const WaitGroup = struct { | |
mutex: Mutex, | |
cond: Condvar, | |
active: usize, | |
pub fn init() WaitGroup { | |
return .{ | |
.mutex = Mutex.init(), | |
.cond = Condvar.init(), | |
.active = 0, | |
}; | |
} | |
pub fn deinit(self: *WaitGroup) void { | |
self.mutex.deinit(); | |
self.cond.deinit(); | |
self.* = undefined; | |
} | |
pub fn add(self: *WaitGroup) void { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
self.active += 1; | |
} | |
pub fn done(self: *WaitGroup) void { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
self.active -= 1; | |
if (self.active == 0) | |
self.cond.signal(); | |
} | |
pub fn wait(self: *WaitGroup) void { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
while (self.active != 0) | |
self.cond.wait(&self.mutex); | |
} | |
}; | |
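// Hedged sketch (not from the original gist): a balanced add()/done() pair | |
// leaves the counter at zero, so wait() returns immediately. | |
test "WaitGroup add and done (sketch)" { | |
var wg = WaitGroup.init(); | |
defer wg.deinit(); | |
wg.add(); | |
wg.done(); | |
wg.wait(); | |
} | |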
pub const Semaphore = struct { | |
mutex: Mutex, | |
cond: Condvar, | |
permits: usize, | |
pub fn init(permits: usize) Semaphore { | |
return .{ | |
.mutex = Mutex.init(), | |
.cond = Condvar.init(), | |
.permits = permits, | |
}; | |
} | |
pub fn deinit(self: *Semaphore) void { | |
self.mutex.deinit(); | |
self.cond.deinit(); | |
self.* = undefined; | |
} | |
pub fn wait(self: *Semaphore) void { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
while (self.permits == 0) | |
self.cond.wait(&self.mutex); | |
self.permits -= 1; | |
if (self.permits > 0) | |
self.cond.signal(); | |
} | |
pub fn post(self: *Semaphore) void { | |
self.mutex.lock(); | |
defer self.mutex.unlock(); | |
self.permits += 1; | |
self.cond.signal(); | |
} | |
}; | |
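// Hedged sketch (not from the original gist): with one initial permit nothing | |
// here blocks; post() replenishes the permit consumed by the first wait(). | |
test "Semaphore post and wait (sketch)" { | |
var sema = Semaphore.init(1); | |
defer sema.deinit(); | |
sema.wait(); // consumes the initial permit | |
sema.post(); | |
sema.wait(); | |
} | |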
pub const Mutex = if (std.builtin.os.tag == .windows) | |
struct { | |
srwlock: SRWLOCK, | |
pub fn init() Mutex { | |
return .{ .srwlock = SRWLOCK_INIT }; | |
} | |
pub fn deinit(self: *Mutex) void { | |
self.* = undefined; | |
} | |
pub fn tryLock(self: *Mutex) bool { | |
return TryAcquireSRWLockExclusive(&self.srwlock) != system.FALSE; | |
} | |
pub fn lock(self: *Mutex) void { | |
AcquireSRWLockExclusive(&self.srwlock); | |
} | |
pub fn unlock(self: *Mutex) void { | |
ReleaseSRWLockExclusive(&self.srwlock); | |
} | |
const SRWLOCK = usize; | |
const SRWLOCK_INIT: SRWLOCK = 0; | |
extern "kernel32" fn TryAcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) system.BOOL; | |
extern "kernel32" fn AcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void; | |
extern "kernel32" fn ReleaseSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void; | |
} | |
else if (std.builtin.link_libc) | |
struct { | |
mutex: if (std.builtin.link_libc) std.c.pthread_mutex_t else void, | |
pub fn init() Mutex { | |
return .{ .mutex = std.c.PTHREAD_MUTEX_INITIALIZER }; | |
} | |
pub fn deinit(self: *Mutex) void { | |
const safe_rc = switch (std.builtin.os.tag) { | |
.dragonfly, .netbsd => std.os.EAGAIN, | |
else => 0, | |
}; | |
const rc = std.c.pthread_mutex_destroy(&self.mutex); | |
std.debug.assert(rc == 0 or rc == safe_rc); | |
self.* = undefined; | |
} | |
pub fn tryLock(self: *Mutex) bool { | |
return pthread_mutex_trylock(&self.mutex) == 0; | |
} | |
pub fn lock(self: *Mutex) void { | |
const rc = std.c.pthread_mutex_lock(&self.mutex); | |
std.debug.assert(rc == 0); | |
} | |
pub fn unlock(self: *Mutex) void { | |
const rc = std.c.pthread_mutex_unlock(&self.mutex); | |
std.debug.assert(rc == 0); | |
} | |
extern "c" fn pthread_mutex_trylock(m: *std.c.pthread_mutex_t) callconv(.C) c_int; | |
} | |
else if (std.builtin.os.tag == .linux) | |
struct { | |
state: State, | |
const State = enum(i32) { | |
unlocked, | |
locked, | |
waiting, | |
}; | |
pub fn init() Mutex { | |
return .{ .state = .unlocked }; | |
} | |
pub fn deinit(self: *Mutex) void { | |
self.* = undefined; | |
} | |
pub fn tryLock(self: *Mutex) bool { | |
return @cmpxchgStrong( | |
State, | |
&self.state, | |
.unlocked, | |
.locked, | |
.Acquire, | |
.Monotonic, | |
) == null; | |
} | |
pub fn lock(self: *Mutex) void { | |
switch (@atomicRmw(State, &self.state, .Xchg, .locked, .Acquire)) { | |
.unlocked => {}, | |
else => |s| self.lockSlow(s), | |
} | |
} | |
fn lockSlow(self: *Mutex, current_state: State) void { | |
@setCold(true); | |
var new_state = current_state; | |
while (true) { | |
var spin: u8 = 0; | |
while (spin < 100) : (spin += 1) { | |
const state = @cmpxchgWeak( | |
State, | |
&self.state, | |
.unlocked, | |
new_state, | |
.Acquire, | |
.Monotonic, | |
) orelse return; | |
switch (state) { | |
.unlocked => {}, | |
.locked => {}, | |
.waiting => break, | |
} | |
var iter = spin + 1; | |
while (iter > 0) : (iter -= 1) | |
spinLoopHint(); | |
} | |
new_state = .waiting; | |
switch (@atomicRmw(State, &self.state, .Xchg, new_state, .Acquire)) { | |
.unlocked => return, | |
else => {}, | |
} | |
Futex.wait( | |
@ptrCast(*const i32, &self.state), | |
@enumToInt(new_state), | |
); | |
} | |
} | |
pub fn unlock(self: *Mutex) void { | |
switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .Release)) { | |
.unlocked => unreachable, | |
.locked => {}, | |
.waiting => self.unlockSlow(), | |
} | |
} | |
fn unlockSlow(self: *Mutex) void { | |
@setCold(true); | |
Futex.wake(@ptrCast(*const i32, &self.state)); | |
} | |
} | |
else | |
struct { | |
is_locked: bool, | |
pub fn init() Mutex { | |
return .{ .is_locked = false }; | |
} | |
pub fn deinit(self: *Mutex) void { | |
self.* = undefined; | |
} | |
pub fn tryLock(self: *Mutex) bool { | |
return @atomicRmw(bool, &self.is_locked, .Xchg, true, .Acquire) == false; | |
} | |
pub fn lock(self: *Mutex) void { | |
while (!self.tryLock()) | |
spinLoopHint(); | |
} | |
pub fn unlock(self: *Mutex) void { | |
@atomicStore(bool, &self.is_locked, false, .Release); | |
} | |
}; | |
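// Hedged sketch (not from the original gist): the SRW, pthread, futex, and spin | |
// based Mutex variants above all share this lock()/tryLock()/unlock() surface. | |
test "Mutex lock and unlock (sketch)" { | |
var mutex = Mutex.init(); | |
defer mutex.deinit(); | |
std.debug.assert(mutex.tryLock()); | |
mutex.unlock(); | |
mutex.lock(); | |
mutex.unlock(); | |
} | |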
pub const Condvar = if (std.builtin.os.tag == .windows) | |
struct { | |
cond: CONDITION_VARIABLE, | |
pub fn init() Condvar { | |
return .{ .cond = CONDITION_VARIABLE_INIT }; | |
} | |
pub fn deinit(self: *Condvar) void { | |
self.* = undefined; | |
} | |
pub fn wait(self: *Condvar, mutex: *Mutex) void { | |
const rc = SleepConditionVariableSRW( | |
&self.cond, | |
&mutex.srwlock, | |
system.INFINITE, | |
@as(system.ULONG, 0), | |
); | |
std.debug.assert(rc != system.FALSE); | |
} | |
pub fn signal(self: *Condvar) void { | |
WakeConditionVariable(&self.cond); | |
} | |
pub fn broadcast(self: *Condvar) void { | |
WakeAllConditionVariable(&self.cond); | |
} | |
const SRWLOCK = usize; | |
const CONDITION_VARIABLE = usize; | |
const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = 0; | |
extern "kernel32" fn WakeAllConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void; | |
extern "kernel32" fn WakeConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void; | |
extern "kernel32" fn SleepConditionVariableSRW( | |
c: *CONDITION_VARIABLE, | |
s: *SRWLOCK, | |
t: system.DWORD, | |
f: system.ULONG, | |
) callconv(system.WINAPI) system.BOOL; | |
} | |
else if (std.builtin.link_libc) | |
struct { | |
cond: if (std.builtin.link_libc) std.c.pthread_cond_t else void, | |
pub fn init() Condvar { | |
return .{ .cond = std.c.PTHREAD_COND_INITIALIZER }; | |
} | |
pub fn deinit(self: *Condvar) void { | |
const safe_rc = switch (std.builtin.os.tag) { | |
.dragonfly, .netbsd => std.os.EAGAIN, | |
else => 0, | |
}; | |
const rc = std.c.pthread_cond_destroy(&self.cond); | |
std.debug.assert(rc == 0 or rc == safe_rc); | |
self.* = undefined; | |
} | |
pub fn wait(self: *Condvar, mutex: *Mutex) void { | |
const rc = std.c.pthread_cond_wait(&self.cond, &mutex.mutex); | |
std.debug.assert(rc == 0); | |
} | |
pub fn signal(self: *Condvar) void { | |
const rc = std.c.pthread_cond_signal(&self.cond); | |
std.debug.assert(rc == 0); | |
} | |
pub fn broadcast(self: *Condvar) void { | |
const rc = std.c.pthread_cond_broadcast(&self.cond); | |
std.debug.assert(rc == 0); | |
} | |
} | |
else | |
struct { | |
mutex: Mutex, | |
notified: bool, | |
waiters: std.SinglyLinkedList(Event), | |
pub fn init() Condvar { | |
return .{ | |
.mutex = Mutex.init(), | |
.notified = false, | |
.waiters = .{}, | |
}; | |
} | |
pub fn deinit(self: *Condvar) void { | |
self.mutex.deinit(); | |
self.* = undefined; | |
} | |
pub fn wait(self: *Condvar, mutex: *Mutex) void { | |
self.mutex.lock(); | |
if (self.notified) { | |
self.notified = false; | |
self.mutex.unlock(); | |
return; | |
} | |
var wait_node = @TypeOf(self.waiters).Node{ .data = .{} }; | |
self.waiters.prepend(&wait_node); | |
self.mutex.unlock(); | |
mutex.unlock(); | |
wait_node.data.wait(); | |
mutex.lock(); | |
} | |
pub fn signal(self: *Condvar) void { | |
self.mutex.lock(); | |
const maybe_wait_node = self.waiters.popFirst(); | |
if (maybe_wait_node == null) | |
self.notified = true; | |
self.mutex.unlock(); | |
if (maybe_wait_node) |wait_node| | |
wait_node.data.set(); | |
} | |
pub fn broadcast(self: *Condvar) void { | |
self.mutex.lock(); | |
var waiters = self.waiters; | |
self.waiters = .{}; | |
self.notified = true; | |
self.mutex.unlock(); | |
while (waiters.popFirst()) |wait_node| | |
wait_node.data.set(); | |
} | |
const Event = struct { | |
futex: i32 = 0, | |
fn wait(self: *Event) void { | |
while (@atomicLoad(i32, &self.futex, .Acquire) == 0) { | |
if (@hasDecl(Futex, "wait")) { | |
Futex.wait(&self.futex, 0); | |
} else { | |
spinLoopHint(); | |
} | |
} | |
} | |
fn set(self: *Event) void { | |
@atomicStore(i32, &self.futex, 1, .Release); | |
if (@hasDecl(Futex, "wake")) | |
Futex.wake(&self.futex); | |
} | |
}; | |
}; | |
const Futex = switch (std.builtin.os.tag) { | |
.linux => struct { | |
fn wait(ptr: *const i32, cmp: i32) void { | |
switch (system.getErrno(system.futex_wait( | |
ptr, | |
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT, | |
cmp, | |
null, | |
))) { | |
0 => {}, | |
std.os.EINTR => {}, | |
std.os.EAGAIN => {}, | |
else => unreachable, | |
} | |
} | |
fn wake(ptr: *const i32) void { | |
switch (system.getErrno(system.futex_wake( | |
ptr, | |
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE, | |
@as(i32, 1), | |
))) { | |
0 => {}, | |
std.os.EFAULT => {}, | |
else => unreachable, | |
} | |
} | |
}, | |
else => void, | |
}; | |
fn spinLoopHint() void { | |
switch (std.builtin.arch) { | |
.i386, .x86_64 => asm volatile("pause" ::: "memory"), | |
.arm, .aarch64 => asm volatile("yield" ::: "memory"), | |
else => {}, | |
} | |
} |