Skip to content

Commit d6817ff

Browse files
authored
Merge pull request #2239 from djspiewak/bug/short-circuit-par
2 parents 6e45e80 + 29a9f46 commit d6817ff

File tree

3 files changed

+124
-23
lines changed

3 files changed

+124
-23
lines changed

kernel/shared/src/main/scala/cats/effect/kernel/instances/GenSpawnInstances.scala

Lines changed: 90 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,57 @@ trait GenSpawnInstances {
5858
fiberA <- F.start(ParallelF.value(fa))
5959
fiberB <- F.start(ParallelF.value(fb))
6060

61+
// start a pair of supervisors to ensure that the opposite is canceled on error
62+
_ <- F start {
63+
fiberB.join flatMap {
64+
case Outcome.Succeeded(_) => F.unit
65+
case _ => fiberA.cancel
66+
}
67+
}
68+
69+
_ <- F start {
70+
fiberA.join flatMap {
71+
case Outcome.Succeeded(_) => F.unit
72+
case _ => fiberB.cancel
73+
}
74+
}
75+
6176
a <- F
6277
.onCancel(poll(fiberA.join), F.both(fiberA.cancel, fiberB.cancel).void)
6378
.flatMap[A] {
64-
case Outcome.Succeeded(fa) => fa
65-
case Outcome.Errored(e) => fiberB.cancel *> F.raiseError(e)
66-
case Outcome.Canceled() => fiberB.cancel *> poll(F.canceled *> F.never)
79+
case Outcome.Succeeded(fa) =>
80+
fa
81+
82+
case Outcome.Errored(e) =>
83+
fiberB.cancel *> F.raiseError(e)
84+
85+
case Outcome.Canceled() =>
86+
fiberB.cancel *> poll {
87+
fiberB.join flatMap {
88+
case Outcome.Succeeded(_) | Outcome.Canceled() =>
89+
F.canceled *> F.never
90+
case Outcome.Errored(e) =>
91+
F.raiseError(e)
92+
}
93+
}
6794
}
6895

6996
z <- F.onCancel(poll(fiberB.join), fiberB.cancel).flatMap[Z] {
70-
case Outcome.Succeeded(fb) => fb.map(b => f(a, b))
71-
case Outcome.Errored(e) => F.raiseError(e)
72-
case Outcome.Canceled() => poll(F.canceled *> F.never)
97+
case Outcome.Succeeded(fb) =>
98+
fb.map(b => f(a, b))
99+
100+
case Outcome.Errored(e) =>
101+
F.raiseError(e)
102+
103+
case Outcome.Canceled() =>
104+
poll {
105+
fiberA.join flatMap {
106+
case Outcome.Succeeded(_) | Outcome.Canceled() =>
107+
F.canceled *> F.never
108+
case Outcome.Errored(e) =>
109+
F.raiseError(e)
110+
}
111+
}
73112
}
74113
} yield z
75114
}
@@ -84,18 +123,57 @@ trait GenSpawnInstances {
84123
fiberA <- F.start(ParallelF.value(fa))
85124
fiberB <- F.start(ParallelF.value(fb.value))
86125

126+
// start a pair of supervisors to ensure that the opposite is canceled on error
127+
_ <- F start {
128+
fiberB.join flatMap {
129+
case Outcome.Succeeded(_) => F.unit
130+
case _ => fiberA.cancel
131+
}
132+
}
133+
134+
_ <- F start {
135+
fiberA.join flatMap {
136+
case Outcome.Succeeded(_) => F.unit
137+
case _ => fiberB.cancel
138+
}
139+
}
140+
87141
a <- F
88142
.onCancel(poll(fiberA.join), F.both(fiberA.cancel, fiberB.cancel).void)
89143
.flatMap[A] {
90-
case Outcome.Succeeded(fa) => fa
91-
case Outcome.Errored(e) => fiberB.cancel *> F.raiseError(e)
92-
case Outcome.Canceled() => fiberB.cancel *> poll(F.canceled *> F.never)
144+
case Outcome.Succeeded(fa) =>
145+
fa
146+
147+
case Outcome.Errored(e) =>
148+
fiberB.cancel *> F.raiseError(e)
149+
150+
case Outcome.Canceled() =>
151+
fiberB.cancel *> poll {
152+
fiberB.join flatMap {
153+
case Outcome.Succeeded(_) | Outcome.Canceled() =>
154+
F.canceled *> F.never
155+
case Outcome.Errored(e) =>
156+
F.raiseError(e)
157+
}
158+
}
93159
}
94160

95161
z <- F.onCancel(poll(fiberB.join), fiberB.cancel).flatMap[Z] {
96-
case Outcome.Succeeded(fb) => fb.map(b => f(a, b))
97-
case Outcome.Errored(e) => F.raiseError(e)
98-
case Outcome.Canceled() => poll(F.canceled *> F.never)
162+
case Outcome.Succeeded(fb) =>
163+
fb.map(b => f(a, b))
164+
165+
case Outcome.Errored(e) =>
166+
F.raiseError(e)
167+
168+
case Outcome.Canceled() =>
169+
poll {
170+
fiberA.join flatMap {
171+
case Outcome.Succeeded(_) | Outcome.Canceled() =>
172+
F.canceled *> F.never
173+
case Outcome.Errored(e) =>
174+
F.raiseError(e)
175+
}
176+
}
99177
}
100178
} yield z
101179
}

laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,20 @@ class PureConcSpec extends Specification with Discipline with BaseSpec {
3333
implicit def exec(fb: TimeT[PureConc[Int, *], Boolean]): Prop =
3434
Prop(pure.run(TimeT.run(fb)).fold(false, _ => false, _.getOrElse(false)))
3535

36+
"parallel utilities" should {
37+
import cats.effect.kernel.{GenConcurrent, Outcome}
38+
import cats.effect.kernel.implicits._
39+
import cats.syntax.all._
40+
41+
type F[A] = PureConc[Int, A]
42+
val F = GenConcurrent[F]
43+
44+
"short-circuit on error" in {
45+
pure.run((F.never[Unit], F.raiseError[Unit](42)).parTupled) mustEqual Outcome.Errored(42)
46+
pure.run((F.raiseError[Unit](42), F.never[Unit]).parTupled) mustEqual Outcome.Errored(42)
47+
}
48+
}
49+
3650
checkAll(
3751
"TimeT[PureConc]",
3852
GenTemporalTests[TimeT[PureConc[Int, *], *], Int].temporal[Int, Int, Int](10.millis)

tests/shared/src/test/scala/cats/effect/IOSpec.scala

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,6 +1065,26 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
10651065

10661066
}
10671067

1068+
"parallel" should {
1069+
"run parallel actually in parallel" in real {
1070+
val x = IO.sleep(2.seconds) >> IO.pure(1)
1071+
val y = IO.sleep(2.seconds) >> IO.pure(2)
1072+
1073+
List(x, y).parSequence.timeout(3.seconds).flatMap { res =>
1074+
IO {
1075+
res mustEqual List(1, 2)
1076+
}
1077+
}
1078+
}
1079+
1080+
"short-circuit on error" in ticked { implicit ticker =>
1081+
case object TestException extends RuntimeException
1082+
1083+
(IO.never[Unit], IO.raiseError[Unit](TestException)).parTupled.void must failAs(TestException)
1084+
(IO.raiseError[Unit](TestException), IO.never[Unit]).parTupled.void must failAs(TestException)
1085+
}
1086+
}
1087+
10681088
"miscellaneous" should {
10691089

10701090
"round trip non-canceled through s.c.Future" in ticked { implicit ticker =>
@@ -1081,17 +1101,6 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
10811101
test must completeAs(42)
10821102
}
10831103

1084-
"run parallel actually in parallel" in real {
1085-
val x = IO.sleep(2.seconds) >> IO.pure(1)
1086-
val y = IO.sleep(2.seconds) >> IO.pure(2)
1087-
1088-
List(x, y).parSequence.timeout(3.seconds).flatMap { res =>
1089-
IO {
1090-
res mustEqual List(1, 2)
1091-
}
1092-
}
1093-
}
1094-
10951104
"run a synchronous IO" in ticked { implicit ticker =>
10961105
val ioa = IO(1).map(_ + 2)
10971106
val test = IO.fromFuture(IO(ioa.unsafeToFuture()))

0 commit comments

Comments
 (0)