1919
2020import org .apache .eventmesh .common .protocol .tcp .Package ;
2121
22- import java .util .concurrent .CountDownLatch ;
22+ import java .util .concurrent .CompletableFuture ;
23+ import java .util .concurrent .ExecutionException ;
24+ import java .util .concurrent .TimeUnit ;
25+ import java .util .concurrent .TimeoutException ;
2326
2427import lombok .extern .slf4j .Slf4j ;
2528
2629@ Slf4j
2730public class RequestContext {
2831
29- private transient Object key ;
30- private transient Package request ;
31- private transient Package response ;
32- private transient CountDownLatch latch ;
32+ private Object key ;
33+ private Package request ;
34+ private final CompletableFuture <Package > future = new CompletableFuture <>();
3335
34- public RequestContext (final Object key , final Package request , final CountDownLatch latch ) {
36+ public RequestContext (final Object key , final Package request ) {
3537 this .key = key ;
3638 this .request = request ;
37- this .latch = latch ;
3839 }
3940
4041 public Object getKey () {
@@ -53,33 +54,28 @@ public void setRequest(final Package request) {
5354 this .request = request ;
5455 }
5556
56- public Package getResponse () {
57- return response ;
57+ public CompletableFuture < Package > future () {
58+ return this . future ;
5859 }
5960
60- public void setResponse ( final Package response ) {
61- this .response = response ;
61+ public Package getResponse ( long timeout , TimeUnit timeUnit ) throws ExecutionException , InterruptedException , TimeoutException {
62+ return this .future . get ( timeout , timeUnit ) ;
6263 }
6364
64- public CountDownLatch getLatch () {
65- return latch ;
66- }
67-
68- public void setLatch (final CountDownLatch latch ) {
69- this .latch = latch ;
65+ public Package getResponse (long timeout ) throws ExecutionException , InterruptedException , TimeoutException {
66+ return this .future .get (timeout , TimeUnit .MILLISECONDS );
7067 }
7168
7269 public void finish (final Package msg ) {
73- this .response = msg ;
74- latch .countDown ();
70+ this .future .complete (msg );
7571 }
7672
77- public static RequestContext context (final Object key , final Package request , final CountDownLatch latch ) throws Exception {
78- final RequestContext c = new RequestContext (key , request , latch );
73+ public static RequestContext context (final Object key , final Package request ) throws Exception {
74+ final RequestContext context = new RequestContext (key , request );
7975 if (log .isInfoEnabled ()) {
8076 log .info ("_RequestContext|create|key={}" , key );
8177 }
82- return c ;
78+ return context ;
8379 }
8480
8581
0 commit comments