Skip to content

Commit 2aa1e8b

Browse files
authored
feat: Add CompletionStages util. (#2060) (#2069)
(cherry picked from commit db92fad)
1 parent e6d4dcf commit 2aa1e8b

File tree

6 files changed

+412
-9
lines changed

6 files changed

+412
-9
lines changed
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.dispatch;
19+
20+
import com.google.common.collect.Lists;
21+
import org.junit.Assert;
22+
import org.junit.Test;
23+
import scala.concurrent.Await;
24+
import scala.concurrent.Future;
25+
import scala.concurrent.duration.Duration;
26+
27+
import java.util.*;
28+
import java.util.concurrent.*;
29+
30+
public class CompletionStagesTests {
31+
private final Duration timeout = Duration.create(5, TimeUnit.SECONDS);
32+
33+
@Test
34+
@SuppressWarnings("deprecation")
35+
public void testAsScala() throws Exception {
36+
final CompletionStage<Integer> successCs = CompletableFuture.completedFuture(42);
37+
Future<Integer> scalaFuture = CompletionStages.asScala(successCs);
38+
Assert.assertEquals(42, Await.result(scalaFuture, timeout).intValue());
39+
//failed
40+
Assert.assertThrows("Simulated failure", RuntimeException.class, () -> {
41+
final CompletionStage<Integer> failedCs = Futures.failedCompletionStage(new RuntimeException(
42+
"Simulated failure"));
43+
Await.result(CompletionStages.asScala(failedCs), timeout);
44+
});
45+
}
46+
47+
@Test
48+
public void testFind() throws Exception {
49+
final List<CompletionStage<Integer>> stages = new ArrayList<>();
50+
for (int i = 0; i < 5; i++) {
51+
stages.add(CompletableFuture.completedFuture(i));
52+
}
53+
final CompletionStage<Optional<Integer>> found = CompletionStages.find(stages, i -> i == 3);
54+
Assert.assertEquals(Optional.of(3), found.toCompletableFuture().get(3, TimeUnit.SECONDS));
55+
final CompletionStage<Optional<Integer>> notFound = CompletionStages.find(stages, i -> i == 42);
56+
Assert.assertEquals(Optional.empty(), notFound.toCompletableFuture().get(3, TimeUnit.SECONDS));
57+
}
58+
59+
@Test
60+
public void testFirstCompletedOf() throws Exception {
61+
final List<CompletionStage<Integer>> stages = new ArrayList<>();
62+
for (int i = 0; i < 5; i++) {
63+
stages.add(new CompletableFuture<>());
64+
}
65+
ForkJoinPool.commonPool().submit(() -> {
66+
try {
67+
Thread.sleep(200);
68+
} catch (InterruptedException e) {
69+
//ignore
70+
}
71+
((CompletableFuture<Integer>) stages.get(3)).complete(42);
72+
});
73+
final CompletionStage<Integer> first = CompletionStages.firstCompletedOf(stages);
74+
Assert.assertEquals(42, first.toCompletableFuture().get(3, TimeUnit.SECONDS).intValue());
75+
}
76+
77+
@Test
78+
public void testFold() throws Exception {
79+
final List<CompletionStage<Integer>> stages = new ArrayList<>();
80+
for (int i = 1; i <= 5; i++) {
81+
stages.add(CompletableFuture.completedFuture(i));
82+
}
83+
final CompletionStage<Integer> folded = CompletionStages.fold(0, stages, Integer::sum);
84+
Assert.assertEquals(15, folded.toCompletableFuture().get(3, TimeUnit.SECONDS).intValue());
85+
}
86+
87+
@Test
88+
public void testReduce() throws Exception {
89+
final List<CompletionStage<Integer>> stages = new ArrayList<>();
90+
for (int i = 1; i <= 5; i++) {
91+
stages.add(CompletableFuture.completedFuture(i));
92+
}
93+
final CompletionStage<Integer> reduced = CompletionStages.reduce(stages, Integer::sum);
94+
Assert.assertEquals(15, reduced.toCompletableFuture().get(3, TimeUnit.SECONDS).intValue());
95+
//reduce empty list
96+
final List<CompletionStage<Integer>> empty = new ArrayList<>();
97+
final CompletionStage<Integer> reducedEmpty = CompletionStages.reduce(empty, Integer::sum);
98+
try {
99+
reducedEmpty.toCompletableFuture().get(3, TimeUnit.SECONDS);
100+
} catch (Exception e) {
101+
Assert.assertTrue(e.getCause() instanceof NoSuchElementException);
102+
Assert.assertEquals("reduce of an empty iterable of CompletionStages", e.getCause().getMessage());
103+
}
104+
}
105+
106+
@Test
107+
public void testSequence() throws Exception {
108+
testSequence0(null);
109+
testSequence0(ForkJoinPool.commonPool());
110+
}
111+
112+
private void testSequence0(final Executor executor) throws Exception {
113+
final List<CompletionStage<Integer>> stages = new ArrayList<>();
114+
for (int i = 1; i <= 5; i++) {
115+
stages.add(CompletableFuture.completedFuture(i));
116+
}
117+
final CompletionStage<List<Integer>> sequenced = CompletionStages.sequence(stages, executor);
118+
Assert.assertEquals(Lists.newArrayList(1, 2, 3, 4, 5),
119+
sequenced.toCompletableFuture().get(3, TimeUnit.SECONDS));
120+
//sequence empty list
121+
final List<CompletionStage<Integer>> empty = new ArrayList<>();
122+
final CompletionStage<List<Integer>> sequencedEmpty = CompletionStages.sequence(empty, executor);
123+
Assert.assertEquals(Collections.emptyList(), sequencedEmpty.toCompletableFuture().get(3, TimeUnit.SECONDS));
124+
}
125+
126+
@Test
127+
public void testTraverse() throws Exception {
128+
testTraverse0(null);
129+
testTraverse0(ForkJoinPool.commonPool());
130+
}
131+
132+
private void testTraverse0(final Executor executor) throws Exception {
133+
final List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
134+
final CompletionStage<List<Integer>> traversed = CompletionStages.traverse(values,
135+
CompletableFuture::completedFuture, executor);
136+
Assert.assertEquals(Lists.newArrayList(1, 2, 3, 4, 5),
137+
traversed.toCompletableFuture().get(3, TimeUnit.SECONDS));
138+
//traverse empty list
139+
final List<Integer> empty = Collections.emptyList();
140+
final CompletionStage<List<Integer>> traversedEmpty = CompletionStages.traverse(empty,
141+
CompletableFuture::completedFuture, executor);
142+
Assert.assertEquals(Collections.emptyList(), traversedEmpty.toCompletableFuture().get(3, TimeUnit.SECONDS));
143+
}
144+
145+
146+
}

actor-tests/src/test/java/org/apache/pekko/dispatch/JavaFutureTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.Callable;
3131
import java.util.LinkedList;
3232
import java.lang.Iterable;
33+
import java.util.concurrent.CompletableFuture;
3334
import java.util.concurrent.CountDownLatch;
3435
import java.util.concurrent.TimeUnit;
3536
import static org.apache.pekko.japi.Util.classTag;
@@ -46,6 +47,13 @@ public class JavaFutureTests extends JUnitSuite {
4647
private final ActorSystem system = actorSystemResource.getSystem();
4748
private final Duration timeout = Duration.create(5, TimeUnit.SECONDS);
4849

50+
@Test
51+
public void mustBeAbleToCreateAJavaCompletionStage() throws Exception {
52+
Future<Integer> f = Futures.successful(42);
53+
CompletableFuture<Integer> cs = Futures.asJava(f).toCompletableFuture();
54+
assertEquals(42, cs.get(3, TimeUnit.SECONDS).intValue());
55+
}
56+
4957
@Test
5058
public void mustBeAbleToMapAFuture() throws Exception {
5159

@@ -192,6 +200,7 @@ public Boolean apply(String r) {
192200

193201
// TODO: Improve this test, perhaps with an Actor
194202
@Test
203+
@SuppressWarnings("deprecation")
195204
public void mustSequenceAFutureList() throws Exception {
196205
LinkedList<Future<String>> listFutures = new LinkedList<>();
197206
LinkedList<String> listExpected = new LinkedList<>();
@@ -215,6 +224,7 @@ public String call() {
215224

216225
// TODO: Improve this test, perhaps with an Actor
217226
@Test
227+
@SuppressWarnings("deprecation")
218228
public void foldForJavaApiMustWork() throws Exception {
219229
LinkedList<Future<String>> listFutures = new LinkedList<>();
220230
StringBuilder expected = new StringBuilder();
@@ -246,6 +256,7 @@ public String apply(String r, String t) {
246256
}
247257

248258
@Test
259+
@SuppressWarnings("deprecation")
249260
public void reduceForJavaApiMustWork() throws Exception {
250261
LinkedList<Future<String>> listFutures = new LinkedList<>();
251262
StringBuilder expected = new StringBuilder();
@@ -276,6 +287,7 @@ public String apply(String r, String t) {
276287
}
277288

278289
@Test
290+
@SuppressWarnings("deprecation")
279291
public void traverseForJavaApiMustWork() throws Exception {
280292
LinkedList<String> listStrings = new LinkedList<>();
281293
LinkedList<String> expectedStrings = new LinkedList<>();
@@ -305,6 +317,7 @@ public String call() {
305317
}
306318

307319
@Test
320+
@SuppressWarnings("deprecation")
308321
public void findForJavaApiMustWork() throws Exception {
309322
LinkedList<Future<Integer>> listFutures = new LinkedList<>();
310323
for (int i = 0; i < 10; i++) {

actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ public void testCSRetry() throws Exception {
319319
}
320320

321321
@Test(expected = IllegalStateException.class)
322+
@SuppressWarnings("deprecation")
322323
public void testAfterFailedCallable() throws Exception {
323324
Callable<Future<String>> failedCallable =
324325
() -> Futures.failed(new IllegalStateException("Illegal!"));
@@ -335,6 +336,7 @@ public void testAfterFailedCallable() throws Exception {
335336
}
336337

337338
@Test(expected = IllegalStateException.class)
339+
@SuppressWarnings("deprecation")
338340
public void testAfterFailedFuture() throws Exception {
339341

340342
Future<String> delayedFuture =
@@ -349,6 +351,7 @@ public void testAfterFailedFuture() throws Exception {
349351
}
350352

351353
@Test
354+
@SuppressWarnings("deprecation")
352355
public void testAfterSuccessfulCallable() throws Exception {
353356
final String expected = "Hello";
354357

@@ -366,6 +369,7 @@ public void testAfterSuccessfulCallable() throws Exception {
366369
}
367370

368371
@Test
372+
@SuppressWarnings("deprecation")
369373
public void testAfterSuccessfulFuture() throws Exception {
370374
final String expected = "Hello";
371375

@@ -383,6 +387,7 @@ public void testAfterSuccessfulFuture() throws Exception {
383387
}
384388

385389
@Test
390+
@SuppressWarnings("deprecation")
386391
public void testAfterFiniteDuration() throws Exception {
387392
final String expected = "Hello";
388393

0 commit comments

Comments
 (0)