Skip to content

Commit 94562e9

Browse files
authored
[ISSUE #3267] Refactor RequestContext (#3268)
* [ISSUE #3267] Refactor RequestContext * polish code
1 parent 7ec90de commit 94562e9

File tree

2 files changed

+32
-31
lines changed

2 files changed

+32
-31
lines changed

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/RequestContext.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,23 @@
1919

2020
import org.apache.eventmesh.common.protocol.tcp.Package;
2121

22-
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.TimeoutException;
2326

2427
import lombok.extern.slf4j.Slf4j;
2528

2629
@Slf4j
2730
public class RequestContext {
2831

29-
private transient Object key;
30-
private transient Package request;
31-
private transient Package response;
32-
private transient CountDownLatch latch;
32+
private Object key;
33+
private Package request;
34+
private final CompletableFuture<Package> future = new CompletableFuture<>();
3335

34-
public RequestContext(final Object key, final Package request, final CountDownLatch latch) {
36+
public RequestContext(final Object key, final Package request) {
3537
this.key = key;
3638
this.request = request;
37-
this.latch = latch;
3839
}
3940

4041
public Object getKey() {
@@ -53,33 +54,28 @@ public void setRequest(final Package request) {
5354
this.request = request;
5455
}
5556

56-
public Package getResponse() {
57-
return response;
57+
public CompletableFuture<Package> future() {
58+
return this.future;
5859
}
5960

60-
public void setResponse(final Package response) {
61-
this.response = response;
61+
public Package getResponse(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
62+
return this.future.get(timeout, timeUnit);
6263
}
6364

64-
public CountDownLatch getLatch() {
65-
return latch;
66-
}
67-
68-
public void setLatch(final CountDownLatch latch) {
69-
this.latch = latch;
65+
public Package getResponse(long timeout) throws ExecutionException, InterruptedException, TimeoutException {
66+
return this.future.get(timeout, TimeUnit.MILLISECONDS);
7067
}
7168

7269
public void finish(final Package msg) {
73-
this.response = msg;
74-
latch.countDown();
70+
this.future.complete(msg);
7571
}
7672

77-
public static RequestContext context(final Object key, final Package request, final CountDownLatch latch) throws Exception {
78-
final RequestContext c = new RequestContext(key, request, latch);
73+
public static RequestContext context(final Object key, final Package request) throws Exception {
74+
final RequestContext context = new RequestContext(key, request);
7975
if (log.isInfoEnabled()) {
8076
log.info("_RequestContext|create|key={}", key);
8177
}
82-
return c;
78+
return context;
8379
}
8480

8581

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@
2828
import java.io.Closeable;
2929
import java.net.InetSocketAddress;
3030
import java.util.Random;
31+
import java.util.concurrent.CompletableFuture;
3132
import java.util.concurrent.ConcurrentHashMap;
32-
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.ExecutionException;
3334
import java.util.concurrent.ScheduledExecutorService;
3435
import java.util.concurrent.ScheduledFuture;
3536
import java.util.concurrent.TimeUnit;
3637
import java.util.concurrent.TimeoutException;
38+
import java.util.function.Supplier;
3739

3840
import io.netty.bootstrap.Bootstrap;
3941
import io.netty.buffer.PooledByteBufAllocator;
@@ -176,20 +178,23 @@ protected void send(Package msg) throws Exception {
176178

177179
protected Package io(Package msg, long timeout) throws Exception {
178180
Object key = RequestContext.key(msg);
179-
CountDownLatch latch = new CountDownLatch(1);
180-
RequestContext c = RequestContext.context(key, msg, latch);
181-
if (!contexts.contains(c)) {
182-
contexts.put(key, c);
181+
RequestContext context = RequestContext.context(key, msg);
182+
if (!contexts.contains(context)) {
183+
contexts.put(key, context);
183184
} else {
184185
if (log.isInfoEnabled()) {
185186
log.info("duplicate key : {}", key);
186187
}
187188
}
188189
send(msg);
189-
if (!c.getLatch().await(timeout, TimeUnit.MILLISECONDS)) {
190-
throw new TimeoutException("operation timeout, context.key=" + c.getKey());
191-
}
192-
return c.getResponse();
190+
Supplier<Package> supplier = () -> {
191+
try {
192+
return context.getResponse(timeout);
193+
} catch (ExecutionException | InterruptedException | TimeoutException exception) {
194+
throw new RuntimeException(exception);
195+
}
196+
};
197+
return CompletableFuture.supplyAsync(supplier).get();
193198
}
194199

195200
// todo: remove hello

0 commit comments

Comments
 (0)