Skip to content

Commit f0ac9a7

Browse files
committed
Pull-Compile- Run trait: remove end parameter
1 parent 11691a5 commit f0ac9a7

File tree

1 file changed

+43
-46
lines changed

1 file changed

+43
-46
lines changed

core/shared/src/main/scala/fs2/Pull.scala

Lines changed: 43 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -904,22 +904,22 @@ object Pull extends PullLowPriority {
904904

905905
}
906906

907-
trait Run[-G[_], -X, +End] {
908-
def done(scope: Scope[F]): End
909-
def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): End
910-
def interrupted(inter: Interrupted): End
911-
def fail(e: Throwable): End
907+
trait Run[-G[_], -X] {
908+
def done(scope: Scope[F]): F[B]
909+
def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[B]
910+
def interrupted(inter: Interrupted): F[B]
911+
def fail(e: Throwable): F[B]
912912
}
913913

914-
def go[G[_], X, End](
914+
def go[G[_], X](
915915
scope: Scope[F],
916916
extendedTopLevelScope: Option[Scope[F]],
917917
translation: G ~> F,
918-
runner: Run[G, X, F[End]],
918+
runner: Run[G, X],
919919
stream: Pull[G, X, Unit]
920-
): F[End] = {
920+
): F[B] = {
921921

922-
def interruptGuard(scope: Scope[F], view: Cont[INothing, G, X])(next: => F[End]): F[End] =
922+
def interruptGuard(scope: Scope[F], view: Cont[INothing, G, X])(next: => F[B]): F[B] =
923923
scope.isInterrupted.flatMap {
924924
case None => next
925925
case Some(outcome) =>
@@ -931,18 +931,18 @@ object Pull extends PullLowPriority {
931931
go(scope, extendedTopLevelScope, translation, runner, view(result))
932932
}
933933

934-
def goErr(err: Throwable, view: Cont[Nothing, G, X]): F[End] =
934+
def goErr(err: Throwable, view: Cont[Nothing, G, X]): F[B] =
935935
go(scope, extendedTopLevelScope, translation, runner, view(Fail(err)))
936936

937-
class ViewRunner(val view: Cont[Unit, G, X]) extends Run[G, X, F[End]] {
937+
class ViewRunner(val view: Cont[Unit, G, X]) extends Run[G, X] {
938938
private val prevRunner = runner
939939

940-
def done(doneScope: Scope[F]): F[End] =
940+
def done(doneScope: Scope[F]): F[B] =
941941
go(doneScope, extendedTopLevelScope, translation, prevRunner, view(unit))
942942

943-
def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[End] = {
943+
def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[B] = {
944944
@tailrec
945-
def outLoop(acc: Pull[G, X, Unit], pred: Run[G, X, F[End]]): F[End] =
945+
def outLoop(acc: Pull[G, X, Unit], pred: Run[G, X]): F[B] =
946946
// bit of an ugly hack to avoid a stack overflow when these accummulate
947947
pred match {
948948
case vrun: ViewRunner => outLoop(bindView(acc, vrun.view), vrun.prevRunner)
@@ -951,40 +951,40 @@ object Pull extends PullLowPriority {
951951
outLoop(tail, this)
952952
}
953953

954-
def interrupted(inter: Interrupted): F[End] =
954+
def interrupted(inter: Interrupted): F[B] =
955955
go(scope, extendedTopLevelScope, translation, prevRunner, view(inter))
956956

957-
def fail(e: Throwable): F[End] = goErr(e, view)
957+
def fail(e: Throwable): F[B] = goErr(e, view)
958958
}
959959

960-
class TranslateRunner[H[_]](fk: H ~> G, view: Cont[Unit, G, X]) extends Run[H, X, F[End]] {
961-
def done(doneScope: Scope[F]): F[End] =
960+
class TranslateRunner[H[_]](fk: H ~> G, view: Cont[Unit, G, X]) extends Run[H, X] {
961+
def done(doneScope: Scope[F]): F[B] =
962962
go(doneScope, extendedTopLevelScope, translation, runner, view(unit))
963-
def out(head: Chunk[X], scope: Scope[F], tail: Pull[H, X, Unit]): F[End] = {
963+
def out(head: Chunk[X], scope: Scope[F], tail: Pull[H, X, Unit]): F[B] = {
964964
val next = bindView(Translate(tail, fk), view)
965965
runner.out(head, scope, next)
966966
}
967-
def interrupted(inter: Interrupted): F[End] =
967+
def interrupted(inter: Interrupted): F[B] =
968968
go(scope, extendedTopLevelScope, translation, runner, view(inter))
969-
def fail(e: Throwable): F[End] = runner.fail(e)
969+
def fail(e: Throwable): F[B] = runner.fail(e)
970970
}
971971

972-
abstract class StepRunR[Y, S](view: Cont[Option[S], G, X]) extends Run[G, Y, F[End]] {
973-
def done(scope: Scope[F]): F[End] =
972+
abstract class StepRunR[Y, S](view: Cont[Option[S], G, X]) extends Run[G, Y] {
973+
def done(scope: Scope[F]): F[B] =
974974
interruptGuard(scope, view) {
975975
go(scope, extendedTopLevelScope, translation, runner, view(Succeeded(None)))
976976
}
977977

978-
def interrupted(inter: Interrupted): F[End] =
978+
def interrupted(inter: Interrupted): F[B] =
979979
go(scope, extendedTopLevelScope, translation, runner, view(inter))
980980

981-
def fail(e: Throwable): F[End] = goErr(e, view)
981+
def fail(e: Throwable): F[B] = goErr(e, view)
982982
}
983983

984984
class UnconsRunR[Y](view: Cont[Option[(Chunk[Y], Pull[G, Y, Unit])], G, X])
985985
extends StepRunR[Y, (Chunk[Y], Pull[G, Y, Unit])](view) {
986986

987-
def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] =
987+
def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[B] =
988988
// For a Uncons, we continue in same Scope at which we ended compilation of inner stream
989989
interruptGuard(outScope, view) {
990990
val result = Succeeded(Some((head, tail)))
@@ -995,7 +995,7 @@ object Pull extends PullLowPriority {
995995
class StepLegRunR[Y](view: Cont[Option[Stream.StepLeg[G, Y]], G, X])
996996
extends StepRunR[Y, Stream.StepLeg[G, Y]](view) {
997997

998-
def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] =
998+
def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[B] =
999999
// StepLeg: we shift back to the scope at which we were
10001000
// before we started to interpret the Leg's inner stream.
10011001
interruptGuard(scope, view) {
@@ -1004,8 +1004,7 @@ object Pull extends PullLowPriority {
10041004
}
10051005
}
10061006

1007-
class FlatMapR[Y](view: Cont[Unit, G, X], fun: Y => Pull[G, X, Unit])
1008-
extends Run[G, Y, F[End]] {
1007+
class FlatMapR[Y](view: Cont[Unit, G, X], fun: Y => Pull[G, X, Unit]) extends Run[G, Y] {
10091008
private[this] def unconsed(chunk: Chunk[Y], tail: Pull[G, Y, Unit]): Pull[G, X, Unit] =
10101009
if (chunk.size == 1 && tail.isInstanceOf[Succeeded[_]])
10111010
// nb: If tl is Pure, there's no need to propagate flatMap through the tail. Hence, we
@@ -1031,23 +1030,23 @@ object Pull extends PullLowPriority {
10311030
go(0)
10321031
}
10331032

1034-
def done(scope: Scope[F]): F[End] =
1033+
def done(scope: Scope[F]): F[B] =
10351034
interruptGuard(scope, view) {
10361035
go(scope, extendedTopLevelScope, translation, runner, view(unit))
10371036
}
10381037

1039-
def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] = {
1038+
def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[B] = {
10401039
val next = bindView(unconsed(head, tail), view)
10411040
go(outScope, extendedTopLevelScope, translation, runner, next)
10421041
}
10431042

1044-
def interrupted(inter: Interrupted): F[End] =
1043+
def interrupted(inter: Interrupted): F[B] =
10451044
go(scope, extendedTopLevelScope, translation, runner, view(inter))
10461045

1047-
def fail(e: Throwable): F[End] = goErr(e, view)
1046+
def fail(e: Throwable): F[B] = goErr(e, view)
10481047
}
10491048

1050-
def goEval[V](eval: Eval[G, V], view: Cont[V, G, X]): F[End] =
1049+
def goEval[V](eval: Eval[G, V], view: Cont[V, G, X]): F[B] =
10511050
scope.interruptibleEval(translation(eval.value)).flatMap { eitherOutcome =>
10521051
val result = eitherOutcome match {
10531052
case Right(r) => Succeeded(r)
@@ -1058,7 +1057,7 @@ object Pull extends PullLowPriority {
10581057
go(scope, extendedTopLevelScope, translation, runner, view(result))
10591058
}
10601059

1061-
def goAcquire[R](acquire: Acquire[G, R], view: Cont[R, G, X]): F[End] = {
1060+
def goAcquire[R](acquire: Acquire[G, R], view: Cont[R, G, X]): F[B] = {
10621061
val onScope = scope.acquireResource[R](
10631062
poll =>
10641063
if (acquire.cancelable) poll(translation(acquire.resource))
@@ -1080,7 +1079,7 @@ object Pull extends PullLowPriority {
10801079
def goInterruptWhen(
10811080
haltOnSignal: F[Either[Throwable, Unit]],
10821081
view: Cont[Unit, G, X]
1083-
): F[End] = {
1082+
): F[B] = {
10841083
val onScope = scope.acquireResource(
10851084
_ => scope.interruptWhen(haltOnSignal),
10861085
(f: Fiber[F, Throwable, Unit], _: ExitCase) => f.cancel
@@ -1101,7 +1100,7 @@ object Pull extends PullLowPriority {
11011100
stream: Pull[G, X, Unit],
11021101
useInterruption: Boolean,
11031102
view: Cont[Unit, G, X]
1104-
): F[End] = {
1103+
): F[B] = {
11051104
def endScope(scopeId: Unique.Token, result: Terminal[Unit]): Pull[G, X, Unit] =
11061105
result match {
11071106
case Succeeded(_) => SucceedScope(scopeId)
@@ -1128,7 +1127,7 @@ object Pull extends PullLowPriority {
11281127
interruptGuard(scope, view)(tail)
11291128
}
11301129

1131-
def goCloseScope(close: CloseScope, view: Cont[Unit, G, X]): F[End] = {
1130+
def goCloseScope(close: CloseScope, view: Cont[Unit, G, X]): F[B] = {
11321131
def addError(err: Throwable, res: Terminal[Unit]): Terminal[Unit] = res match {
11331132
case Succeeded(_) => Fail(err)
11341133
case Fail(err0) => Fail(CompositeFailure(err, err0, Nil))
@@ -1188,9 +1187,9 @@ object Pull extends PullLowPriority {
11881187

11891188
(viewL(stream): @unchecked) match { // unchecked b/c scala 3 erroneously reports exhaustiveness warning
11901189
case tst: Translate[h, G, _] @unchecked => // y = Unit
1191-
val translateRunner: Run[h, X, F[End]] = new TranslateRunner(tst.fk, getCont[Unit, G, X])
1190+
val translateRunner: Run[h, X] = new TranslateRunner(tst.fk, getCont[Unit, G, X])
11921191
val composed: h ~> F = translation.compose[h](tst.fk)
1193-
go[h, X, End](scope, extendedTopLevelScope, composed, translateRunner, tst.stream)
1192+
go[h, X](scope, extendedTopLevelScope, composed, translateRunner, tst.stream)
11941193

11951194
case output: Output[_] =>
11961195
val view = getCont[Unit, G, X]
@@ -1231,7 +1230,7 @@ object Pull extends PullLowPriority {
12311230

12321231
val initFk: F ~> F = cats.arrow.FunctionK.id[F]
12331232

1234-
class OuterRun(initB: B) extends Run[F, O, F[B]] { self =>
1233+
class OuterRun(initB: B) extends Run[F, O] { self =>
12351234
private[this] var accB: B = initB
12361235

12371236
override def done(scope: Scope[F]): F[B] = F.pure(accB)
@@ -1244,21 +1243,19 @@ object Pull extends PullLowPriority {
12441243
override def out(head: Chunk[O], scope: Scope[F], tail: Pull[F, O, Unit]): F[B] =
12451244
try {
12461245
accB = foldChunk(accB, head)
1247-
go[F, O, B](scope, None, initFk, self, tail)
1246+
go(scope, None, initFk, self, tail)
12481247
} catch {
12491248
case NonFatal(e) =>
12501249
viewL(tail) match {
1251-
case _: Action[F, O, _] =>
1252-
val v = contP.asInstanceOf[ContP[Unit, F, O, Unit]]
1253-
go[F, O, B](scope, None, initFk, self, v(Fail(e)))
1250+
case _: Action[F, O, _] => go(scope, None, initFk, self, getCont(Fail(e)))
12541251
case Succeeded(_) => F.raiseError(e)
12551252
case Fail(e2) => F.raiseError(CompositeFailure(e2, e))
12561253
case Interrupted(_, err) => F.raiseError(err.fold(e)(t => CompositeFailure(e, t)))
12571254
}
12581255
}
12591256
}
12601257

1261-
go[F, O, B](initScope, None, initFk, new OuterRun(init), stream)
1258+
go[F, O](initScope, None, initFk, new OuterRun(init), stream)
12621259
}
12631260

12641261
private[fs2] def flatMapOutput[F[_], F2[x] >: F[x], O, O2](

0 commit comments

Comments
 (0)