
High(er) performance async Queue #2885

Merged
djspiewak merged 46 commits into typelevel:series/3.x from djspiewak:feature/high-perf-queue
Jun 18, 2022

Conversation

@djspiewak djspiewak (Member) commented Mar 19, 2022

First off, one thing I discovered as part of this is that the existing Queue is really a lot faster than you probably think. But we can do better.

This relies on the runtime typecase trick on the typeclass to see if the GenConcurrent is secretly actually an Async. When it is, and when the bound is > 1, we transparently swap to this more efficient implementation. Benchmarks to follow, but it's somewhere between 2x and 10x faster, depending on the scenario and the number of physical threads. Implementation is based on jctools for the bounded part, and all credit is due to @viktorklang for the unbounded part. Bugs are probably my fault.
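The dispatch being described can be pictured with a stripped-down sketch. The traits and the string results below are hypothetical stand-ins for cats-effect's real `GenConcurrent`/`Async` and queue constructors; only the shape of the match is meant to be faithful:

```scala
// Hypothetical, simplified stand-ins for the real typeclasses.
trait GenConcurrent[F[_]]
trait Async[F[_]] extends GenConcurrent[F]

// Runtime typecase: if the GenConcurrent in scope is secretly an Async
// (and the bound is > 1), transparently pick the specialized path.
def bounded[F[_]](capacity: Int)(implicit F: GenConcurrent[F]): String =
  F match {
    case _: Async[F] if capacity > 1 => "async-optimized implementation"
    case _                           => "portable fallback implementation"
  }
```

The real code does the same match but returns the specialized `Queue`; callers never see the downcast.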

Lots of room to improve here! A non-exhaustive list:

  • We can support unbounded without too much trouble, just by building on a pair of UnboundedUnsafes
  • circular is also pretty easy with what we have
  • synchronous is going to require a brand new structure and a lot of thought. That's a very tricky case
  • Specialized versions for ScalaJS would technically be optimal
  • The jctools bounded queue does have support for a takeAll-like operation, meaning we can override tryTakeN and it will be much better than the naive version, which should in turn make Fs2 Channel vastly better
  • Right now, it starts failing if you enqueue more than Long.MaxValue elements. This is easily fixable; I was just lazy

An even more substantial optimization will be striping consumers. While this is an mpmc queue, the far-and-away most common scenario is mpsc, which would allow us to optimize it a lot. We can take advantage of that by striping sc queues so long as we don't care as much about fairness in that case, and since we know our consuming threads are bounded by the physical threads, we also know our stripes will be strictly bounded.
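For illustration only, a toy version of the striping idea. Everything here (stripe selection by thread id, round-robin producers, stealing from neighbor stripes) is an assumption of the sketch, not the PR's design:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

// Toy striped queue: several independent queues, each intended to see
// roughly one consumer, trading strict FIFO fairness for less contention.
final class StripedSketch[A](stripes: Int) {
  private val queues = Array.fill(stripes)(new ConcurrentLinkedQueue[A])
  private val ticket = new AtomicLong(0)

  // producers spread elements round-robin across stripes
  def put(a: A): Unit = {
    val i = (ticket.getAndIncrement() % stripes).toInt
    queues(i).add(a)
    ()
  }

  // each consumer thread starts at a "home" stripe, stealing from the
  // others only when its own stripe is empty
  def take(): Option[A] = {
    val home = (Thread.currentThread().getId % stripes).toInt
    var k = 0
    while (k < stripes) {
      val a = queues((home + k) % stripes).poll()
      if (a != null) return Some(a)
      k += 1
    }
    None
  }
}
```

Note how FIFO ordering only holds per stripe, which is exactly the fairness trade-off mentioned above.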

Anyway, lots of room for future excitement.

Closes #2771

@djspiewak djspiewak (Member Author) commented:

Auspicious…

@durban durban (Contributor) left a comment

A general comment: none of these queues seem lock-free (which is not necessarily a problem; it's just an observation).

```scala
// manually complete our own callback
// note that we could have a race condition here where we're already completed
// async will deduplicate these calls for us
// additionally, the continuation (below) is held until the registration completes
```
@durban (Contributor) commented on this code:

This ("the continuation is held") seems important here. Is this guaranteed by any Async?

@djspiewak (Member Author) replied:

It is! And I agree it would be worth spelling it out more. We use this trick a lot though.

```diff
-def bounded[F[_], A](capacity: Int)(implicit F: GenConcurrent[F, _]): F[Queue[F, A]] = {
+def bounded[F[_], A](capacity: Int)(implicit F: GenConcurrent[F, _]): F[Queue[F, A]] =
+  F match {
+    case f0: Async[F] =>
```
@durban (Contributor) commented on this code:

This is not strictly related to this PR, but what's stopping someone implementing Cont from doing the same, and recovering an Async[G] from the MonadCancel[G, Throwable]?

@djspiewak (Member Author) replied:

Doing so and then setting up weird forking scenarios would result in runtime errors. So, caveat emptor.

```scala
def offer(a: A): F[Unit] = F defer {
  try {
    // attempt to put into the buffer; if the buffer is full, it will raise an exception
    buffer.put(a)
```
@durban (Contributor) commented on this code:

Since offer and take both first try to use buffer I think it can happen that a waiter is bypassed. For example, initially the queue is empty, and there is one taker waiting. Then, offer and take are racing, and take immediately removes the item inserted by offer, thus bypassing the waiter, which was earlier than take. (Although this whole thing might be unobservable. It seems a "fairness" issue, and not necessarily a "semantics" issue.)

@djspiewak (Member Author) replied:

You're correct! There are actually a couple cases like this. Strict fairness is not guaranteed in highly contended races.

@djspiewak djspiewak (Member Author) commented:

They're definitely all lock-free. They aren't contention-free, though.

@durban durban (Contributor) commented Mar 21, 2022

Regarding lock freedom:

Let's look at UnsafeUnbounded: in take an interesting case is when taken is null, but last.get() ne null. In this case what it does is: take() // Waiting for prevLast.set(cell), so recurse. It waits on another thread (put), by spinning. This is... well, it's a spinlock. Acquired by put with last.getAndSet(cell), and released either with first.set(cell) or with prevLast.set(cell). And take is waiting (by spinning) on this lock with the recursive call.

There is another similar case (spinwaiting on another thread) in take after the long comment.
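The pattern being described here (getAndSet to swing the tail, then a separate write to publish the link, with take spin-waiting in the gap) can be modeled with a minimal Vyukov-style MPSC linked queue. This is a hypothetical simplification, not the actual UnsafeUnbounded:

```scala
import java.util.concurrent.atomic.AtomicReference

// The AtomicReference a Node extends is its `next` link.
final class Node[A](val value: A) extends AtomicReference[Node[A]]

final class MpscSketch[A] {
  private val stub = new Node[A](null.asInstanceOf[A]) // sentinel; its value is never read
  private val tail = new AtomicReference[Node[A]](stub) // producers swing this first
  private var head: Node[A] = stub                      // owned by the single consumer

  def put(a: A): Unit = {
    val node = new Node(a)
    val prev = tail.getAndSet(node) // "acquire": node is now logically enqueued...
    prev.set(node)                  // "release": ...but only reachable once linked
  }

  // None when empty; spins only in the tiny window between a producer's
  // getAndSet and its prev.set -- the "spinlock" under discussion
  @annotation.tailrec
  def take(): Option[A] = {
    val next = head.get()
    if (next ne null) { head = next; Some(next.value) }
    else if (tail.get() eq head) None // truly empty
    else take()                       // put in flight: spin until the link appears
  }
}
```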

In UnsafeBounded there is something similar going on (if I understand correctly): put acquires a spinlock with tail.compareAndSet(currentTail, currentTail + 1), writes the data into buffer while holding the lock, then releases it with sequenceBuffer.incrementAndGet(project(currentTail)). take waits on this lock by spinning until seq == currentHead + 1.
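The protocol outlined above is essentially the classic sequence-buffer design (in the style of Vyukov's bounded MPMC queue, which jctools also builds on). A minimal sketch with hypothetical names; where the real structure spins in the contended windows, this sketch just gives up and returns false/None:

```scala
import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray}

// Toy sequence-buffer queue: every slot carries a sequence number that
// tells producers and consumers whether the slot is ready for them.
final class BoundedSketch[A](capacity: Int) {
  private val buffer = new Array[Any](capacity)
  private val seq    = new AtomicLongArray(capacity)
  locally { var i = 0; while (i < capacity) { seq.set(i, i.toLong); i += 1 } }
  private val head = new AtomicLong(0) // next slot to take
  private val tail = new AtomicLong(0) // next slot to put

  def put(a: A): Boolean = {
    val t = tail.get()
    val i = (t % capacity).toInt
    if (seq.get(i) != t) false                     // slot not yet released: full
    else if (!tail.compareAndSet(t, t + 1)) put(a) // lost the race: retry
    else {
      buffer(i) = a
      seq.set(i, t + 1) // "release": a take waiting on seq == t + 1 may proceed
      true
    }
  }

  def take(): Option[A] = {
    val h = head.get()
    val i = (h % capacity).toInt
    if (seq.get(i) != h + 1) None                  // empty, or a put is mid-flight
    else if (!head.compareAndSet(h, h + 1)) take() // lost the race: retry
    else {
      val a = buffer(i).asInstanceOf[A]
      buffer(i) = null
      seq.set(i, h + capacity) // release the slot to the producer one lap ahead
      Some(a)
    }
  }
}
```

The window between winning the CAS and publishing via `seq.set` is exactly the interval other threads must wait out, which is the "spinlock" characterization above.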

@viktorklang viktorklang (Contributor) commented Mar 22, 2022 via email

@djspiewak djspiewak (Member Author) commented:

Depends on #3000

@djspiewak djspiewak (Member Author) commented:

```
[info] Benchmark                                                  (size)   Mode  Cnt      Score     Error    Units
[info] QueueBenchmark.boundedAsyncEnqueueDequeueContended         100000  thrpt   10  29510.955 ± 206.937  ops/min
[info] QueueBenchmark.boundedAsyncEnqueueDequeueMany              100000  thrpt   10   6105.196 ±  27.218  ops/min
[info] QueueBenchmark.boundedAsyncEnqueueDequeueOne               100000  thrpt   10   6261.579 ±   5.845  ops/min
[info] QueueBenchmark.boundedConcurrentEnqueueDequeueContended    100000  thrpt   10  10060.914 ±  44.569  ops/min
[info] QueueBenchmark.boundedConcurrentEnqueueDequeueMany         100000  thrpt   10   4157.941 ±   7.719  ops/min
[info] QueueBenchmark.boundedConcurrentEnqueueDequeueOne          100000  thrpt   10   4278.601 ±  16.290  ops/min
[info] QueueBenchmark.unboundedAsyncEnqueueDequeueContended       100000  thrpt   10  30820.903 ±  93.492  ops/min
[info] QueueBenchmark.unboundedAsyncEnqueueDequeueMany            100000  thrpt   10  16245.842 ±  47.942  ops/min
[info] QueueBenchmark.unboundedAsyncEnqueueDequeueOne             100000  thrpt   10  16594.351 ± 126.705  ops/min
[info] QueueBenchmark.unboundedConcurrentEnqueueDequeueContended  100000  thrpt   10  10073.977 ±  76.811  ops/min
[info] QueueBenchmark.unboundedConcurrentEnqueueDequeueMany       100000  thrpt   10   4155.995 ±   5.937  ops/min
[info] QueueBenchmark.unboundedConcurrentEnqueueDequeueOne        100000  thrpt   10   4274.642 ±   6.468  ops/min
```

Plenty more room to improve, but I'll take a 2-4x speedup as a starting point.

@djspiewak djspiewak merged commit d04b2e8 into typelevel:series/3.x Jun 18, 2022