Stream.fromBlockingIterator is never interruptible #2903

@vlushn

fs2 version 3.2.7 (but this should also affect all recent earlier versions).

There is no way to create an interruptible stream from a Scala iterator. Stream.fromBlockingIterator is not sufficient, because cancelling the stream has no effect until the pending blocking call returns:

import cats.effect._
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

val iter = Iterator.continually {
  Thread.sleep(10_000)
  System.currentTimeMillis()
}

val stream = fs2.Stream.fromBlockingIterator[IO](iter, 1)

val res = for {
  startTime <- IO.monotonic
  runStream <- stream.compile.count.start
  _ <- IO.sleep(1.second)
  _ <- runStream.cancel
  stopTime <- IO.monotonic
} yield stopTime - startTime

// prints about 10 seconds; should be closer to 1 second
println(res.unsafeRunSync())

I guess the fix would be to allow creating a stream from an interruptible iterator, though I assume that would lower performance.
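A minimal sketch of that idea, assuming cats-effect 3.3 or later (where `IO.interruptible` takes a single thunk). `fromInterruptibleIterator` is a hypothetical helper name, not an existing fs2 API:

```scala
import cats.effect.IO
import fs2.Stream

// Hypothetical helper (not part of fs2): pulls one element at a time,
// running each hasNext/next() pair inside IO.interruptible so that
// cancellation interrupts the blocked thread instead of waiting for it.
def fromInterruptibleIterator[A](iter: Iterator[A]): Stream[IO, A] =
  Stream.unfoldEval[IO, Iterator[A], A](iter) { it =>
    IO.interruptible {
      if (it.hasNext) Some((it.next(), it)) else None
    }
  }
```

This only helps when the blocking call actually responds to thread interruption (Thread.sleep does; a JDBC driver may not). It also shifts to the blocking pool for every element rather than once per chunk, which is probably where the performance cost would come from.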

The real case where I'm hitting this is JDBC interop: a long-running query needs to be cancellable even before the first result is received from the iterator.
