88import java .util .Iterator ;
99import java .util .List ;
1010import java .util .Map ;
11+ import java .util .concurrent .CompletableFuture ;
12+ import java .util .concurrent .CompletionStage ;
1113import java .util .concurrent .ConcurrentHashMap ;
1214import java .util .concurrent .CountDownLatch ;
1315import java .util .concurrent .ExecutionException ;
@@ -31,7 +33,7 @@ public class TarantoolClientImpl extends TarantoolBase<Future<List<?>>> implemen
3133 protected volatile Exception thumbstone ;
3234 protected volatile CountDownLatch alive ;
3335
34- protected Map <Long , FutureImpl <List <?>>> futures ;
36+ protected Map <Long , CompletableFuture <List <?>>> futures ;
3537 protected AtomicInteger wait = new AtomicInteger ();
3638 /**
3739 * Write properties
@@ -49,6 +51,7 @@ public class TarantoolClientImpl extends TarantoolBase<Future<List<?>>> implemen
4951 */
5052 protected SyncOps syncOps ;
5153 protected FireAndForgetOps fireAndForgetOps ;
54+ protected ComposableAsyncOps composableAsyncOps ;
5255
5356 /**
5457 * Inner
@@ -77,7 +80,7 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient
7780 this .alive = new CountDownLatch (1 );
7881 this .socketProvider = socketProvider ;
7982 this .stats = new TarantoolClientStats ();
80- this .futures = new ConcurrentHashMap <Long , FutureImpl < List <?>> >(config .predictedFutures );
83+ this .futures = new ConcurrentHashMap <>(config .predictedFutures );
8184 this .sharedBuffer = ByteBuffer .allocateDirect (config .sharedBufferSize );
8285 this .writerBuffer = ByteBuffer .allocateDirect (sharedBuffer .capacity ());
8386 this .connector .setDaemon (true );
@@ -197,21 +200,22 @@ protected void configureThreads(String threadName) {
197200 }
198201
199202
200- public Future <List <?>> exec (Code code , Object ... args ) {
203+ public CompletableFuture <List <?>> exec (Code code , Object ... args ) {
201204 validateArgs (args );
202- FutureImpl <List <?>> q = new FutureImpl <List <?>>(syncId .incrementAndGet ());
205+ long sid = syncId .incrementAndGet ();
206+ CompletableFuture <List <?>> q = new CompletableFuture <>();
203207 if (isDead (q )) {
204208 return q ;
205209 }
206- futures .put (q . getId () , q );
210+ futures .put (sid , q );
207211 if (isDead (q )) {
208- futures .remove (q . getId () );
212+ futures .remove (sid );
209213 return q ;
210214 }
211215 try {
212- write (code , q . getId () , null , args );
216+ write (code , sid , null , args );
213217 } catch (Exception e ) {
214- futures .remove (q . getId () );
218+ futures .remove (sid );
215219 fail (q , e );
216220 }
217221 return q ;
@@ -225,11 +229,11 @@ protected synchronized void die(String message, Exception cause) {
225229 this .thumbstone = new CommunicationException (message , cause );
226230 this .alive = new CountDownLatch (1 );
227231 while (!futures .isEmpty ()) {
228- Iterator <Map .Entry <Long , FutureImpl <List <?>>>> iterator = futures .entrySet ().iterator ();
232+ Iterator <Map .Entry <Long , CompletableFuture <List <?>>>> iterator = futures .entrySet ().iterator ();
229233 while (iterator .hasNext ()) {
230- Map .Entry <Long , FutureImpl <List <?>>> elem = iterator .next ();
234+ Map .Entry <Long , CompletableFuture <List <?>>> elem = iterator .next ();
231235 if (elem != null ) {
232- FutureImpl <List <?>> future = elem .getValue ();
236+ CompletableFuture <List <?>> future = elem .getValue ();
233237 fail (future , cause );
234238 }
235239 iterator .remove ();
@@ -333,7 +337,7 @@ protected void readThread() {
333337 readPacket (is );
334338 code = (Long ) headers .get (Key .CODE .getId ());
335339 Long syncId = (Long ) headers .get (Key .SYNC .getId ());
336- FutureImpl <List <?>> future = futures .remove (syncId );
340+ CompletableFuture <List <?>> future = futures .remove (syncId );
337341 stats .received ++;
338342 wait .decrementAndGet ();
339343 complete (code , future );
@@ -381,14 +385,14 @@ protected void writeThread() {
381385 }
382386
383387
384- protected void fail (FutureImpl <List <?>> q , Exception e ) {
385- q .setError (e );
388+ protected void fail (CompletableFuture <List <?>> q , Exception e ) {
389+ q .completeExceptionally (e );
386390 }
387391
388- protected void complete (long code , FutureImpl <List <?>> q ) {
392+ protected void complete (long code , CompletableFuture <List <?>> q ) {
389393 if (q != null ) {
390394 if (code == 0 ) {
391- q .setValue ((List ) body .get (Key .DATA .getId ()));
395+ q .complete ((List ) body .get (Key .DATA .getId ()));
392396 } else {
393397 Object error = body .get (Key .ERROR .getId ());
394398 fail (q , serverError (code , error ));
@@ -488,6 +492,11 @@ public TarantoolClientOps<Integer, List<?>, Object, Future<List<?>>> asyncOps()
488492 return this ;
489493 }
490494
495+ @ Override
496+ public TarantoolClientOps <Integer , List <?>, Object , CompletionStage <List <?>>> composableAsyncOps () {
497+ return composableAsyncOps ;
498+ }
499+
491500 @ Override
492501 public TarantoolClientOps <Integer , List <?>, Object , Long > fireAndForgetOps () {
493502 return fireAndForgetOps ;
@@ -529,7 +538,19 @@ public void close() {
529538 }
530539 }
531540
532- protected boolean isDead (FutureImpl <List <?>> q ) {
541+ protected class ComposableAsyncOps extends AbstractTarantoolOps <Integer , List <?>, Object , CompletionStage <List <?>>> {
542+ @ Override
543+ public CompletionStage <List <?>> exec (Code code , Object ... args ) {
544+ return TarantoolClientImpl .this .exec (code , args );
545+ }
546+
547+ @ Override
548+ public void close () {
549+ TarantoolClientImpl .this .close ();
550+ }
551+ }
552+
553+ protected boolean isDead (CompletableFuture <List <?>> q ) {
533554 if (TarantoolClientImpl .this .thumbstone != null ) {
534555 fail (q , new CommunicationException ("Connection is dead" , thumbstone ));
535556 return true ;
0 commit comments