Harden queue cancelation semantics#3000
Conversation
|
Oh interestingly… I think this test isn't really demonstrating the issue, since |
|
Confirmed by reverting that this new test does indeed reproduce the non-atomic |
|
I think there is a chance of lost wakeups here: import scala.concurrent.duration._
import munit.FunSuite
import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.IORuntime
final class CatsQueueTest2 extends FunSuite {
@volatile
private[this] var taken1: String = null
test("Cancel") {
val tsk = for {
q <- Queue.bounded[IO, String](capacity = 64)
take1 <- IO.uncancelable { poll =>
poll(q.take).flatTap { taken =>
IO { this.taken1 = taken }
}
}.start
take2 <- q.take.start
_ <- IO.sleep(500.millis)
_ <- IO.println("Starting race...")
_ <- IO.both(q.offer("foo"), take1.cancel)
_ <- IO.println("End race.")
t1 <- IO { this.taken1 }
_ <- if (t1 ne null) {
IO.println("Too late") <* take2.cancel
} else {
IO.println("OK, joining...") <* take2.join
}
} yield ()
(tsk.replicateA_(16)).unsafeRunSync()(IORuntime.global)
}
}This test occasionally deadlocks. I honestly don't know, which is worse: deadlocks like this, or losing elements (which is fixed by this PR). |
Nice catch. I actually have a solve for this in the |
This revises
Queueto ensure that the values are always passed through the backing value queue, rather than being passed directly to the taker. This ensures that values can never be "lost" even iftakeis canceled. Note that I would really like to iterate the unit test a lot more times, but it starts timing out rather quickly.Closes #2920