Stream.fromBlockingIterator is never interruptible #2903
Closed
Description
fs2 version 3.2.7 (this should also affect all earlier recent versions).
There is no way to create an interruptible stream from a Scala iterator. Stream.fromBlockingIterator is not sufficient:
import cats.effect._
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

// Each call to next() blocks the calling thread for 10 seconds.
val iter = Iterator.continually {
  Thread.sleep(10_000)
  System.currentTimeMillis()
}
val stream = fs2.Stream.fromBlockingIterator[IO](iter, 1)

// Start the stream on a fiber, cancel it after 1 second, and measure
// how long the cancellation takes to complete.
val res = for {
  startTime <- IO.monotonic
  runStream <- stream.compile.count.start
  _ <- IO.sleep(1.second)
  _ <- runStream.cancel
  stopTime <- IO.monotonic
} yield stopTime - startTime
// prints 10 seconds, should print closer to 1
println(res.unsafeRunSync())
I guess the fix would be to allow creating a stream from an interruptible iterator, but I would assume that lowers performance.
The real example where I'm hitting this is interop with JDBC and a long-running query that needs to be cancellable before even the first iterator result is received.
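A rough sketch of what an interruptible variant might look like, assuming cats-effect 3.3 or later (where `IO.interruptible` takes a single thunk). The helper name `fromInterruptibleIterator` is hypothetical and not part of fs2. It pulls one element per interruptible block, so it gives up the chunking that `fromBlockingIterator` offers, and it only helps when the blocking call actually responds to thread interruption (`Thread.sleep` does; a JDBC driver may not, in which case something like `Statement.cancel()` would probably still be needed).

```scala
import cats.effect.IO
import fs2.Stream

// Hypothetical helper, not part of fs2: evaluates hasNext/next() inside
// IO.interruptible so that cancelling the fiber interrupts the blocked thread.
def fromInterruptibleIterator[A](iter: Iterator[A]): Stream[IO, A] =
  Stream.unfoldEval[IO, Iterator[A], A](iter) { it =>
    IO.interruptible {
      // Emit the next element and carry the iterator forward as state,
      // or end the stream when the iterator is exhausted.
      if (it.hasNext) Some((it.next(), it)) else None
    }
  }
```

Swapping this in for `fs2.Stream.fromBlockingIterator[IO](iter, 1)` in the reproduction above should let `runStream.cancel` return without waiting for the current `next()` call to finish, at the cost of one blocking-pool hop per element.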