Skip to content

Harden queue cancelation semantics#3000

Merged
djspiewak merged 12 commits intotypelevel:series/3.xfrom
djspiewak:feature/hardened-queue
Jun 17, 2022
Merged

Harden queue cancelation semantics#3000
djspiewak merged 12 commits intotypelevel:series/3.xfrom
djspiewak:feature/hardened-queue

Conversation

@djspiewak
Copy link
Copy Markdown
Member

This revises Queue to ensure that the values are always passed through the backing value queue, rather than being passed directly to the taker. This ensures that values can never be "lost" even if take is canceled. Note that I would really like to iterate the unit test a lot more times, but it starts timing out rather quickly.

Closes #2920

@djspiewak
Copy link
Copy Markdown
Member Author

Oh interestingly… I think this test isn't really demonstrating the issue, since Dequeue hasn't been fixed yet and still exhibits the issue, but the test passes.

@djspiewak djspiewak marked this pull request as draft May 22, 2022 00:24
@djspiewak djspiewak marked this pull request as ready for review May 29, 2022 18:19
@djspiewak
Copy link
Copy Markdown
Member Author

Confirmed by reverting that this new test does indeed reproduce the non-atomic take semantics. As it turns out, the non-BoundedQueues are already hardened.

@djspiewak djspiewak added this to the v3.4.0 milestone May 29, 2022
@durban
Copy link
Copy Markdown
Contributor

durban commented May 30, 2022

I think there is a chance of lost wakeups here: offer can wake up a take fiber which already have been cancelled; this causes other takers not to be waked. I wrote a test which (I think) demonstrates this:

import scala.concurrent.duration._

import munit.FunSuite

import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.IORuntime

final class CatsQueueTest2 extends FunSuite {

  @volatile
  private[this] var taken1: String = null

  test("Cancel") {
    val tsk = for {
      q <- Queue.bounded[IO, String](capacity = 64)
      take1 <- IO.uncancelable { poll =>
        poll(q.take).flatTap { taken =>
          IO { this.taken1 = taken }
        }
      }.start
      take2 <- q.take.start
      _ <- IO.sleep(500.millis)
      _ <- IO.println("Starting race...")
      _ <- IO.both(q.offer("foo"), take1.cancel)
      _ <- IO.println("End race.")
      t1 <- IO { this.taken1 }
      _ <- if (t1 ne null) {
        IO.println("Too late") <* take2.cancel
      } else {
        IO.println("OK, joining...") <* take2.join
      }
    } yield ()
    (tsk.replicateA_(16)).unsafeRunSync()(IORuntime.global)
  }
}

This test occasionally deadlocks. I honestly don't know, which is worse: deadlocks like this, or losing elements (which is fixed by this PR).

@djspiewak
Copy link
Copy Markdown
Member Author

I think there is a chance of lost wakeups here: offer can wake up a take fiber which already have been cancelled; this causes other takers not to be waked

Nice catch. I actually have a solve for this in the Async version which I think can be ported to the Concurrent one fairly easily.

@djspiewak djspiewak merged commit 5dbfd01 into typelevel:series/3.x Jun 17, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Harden Queue#take's cancelation properties

2 participants