@@ -99,6 +99,10 @@ public class StreamWriter implements AutoCloseable {
9999 private final Lock appendAndRefreshAppendLock ;
100100 private final MessagesBatch messagesBatch ;
101101
102+ // Indicates if a stream has some non recoverable exception happened.
103+ private final Lock exceptionLock ;
104+ private Throwable streamException ;
105+
102106 private BackgroundResource backgroundResources ;
103107 private List <BackgroundResource > backgroundResourceList ;
104108
@@ -145,10 +149,13 @@ private StreamWriter(Builder builder)
145149
146150 this .batchingSettings = builder .batchingSettings ;
147151 this .retrySettings = builder .retrySettings ;
148- this .messagesBatch = new MessagesBatch (batchingSettings , this .streamName );
152+ this .messagesBatch = new MessagesBatch (batchingSettings , this .streamName , this );
149153 messagesBatchLock = new ReentrantLock ();
150154 appendAndRefreshAppendLock = new ReentrantLock ();
151155 activeAlarm = new AtomicBoolean (false );
156+ this .exceptionLock = new ReentrantLock ();
157+ this .streamException = null ;
158+
152159 executor = builder .executorProvider .getExecutor ();
153160 backgroundResourceList = new ArrayList <>();
154161 if (builder .executorProvider .shouldAutoClose ()) {
@@ -212,6 +219,14 @@ public Boolean expired() {
212219 return createTime .plus (streamTTL ).compareTo (Instant .now ()) < 0 ;
213220 }
214221
222+ private void setException (Throwable t ) {
223+ exceptionLock .lock ();
224+ if (this .streamException == null ) {
225+ this .streamException = t ;
226+ }
227+ exceptionLock .unlock ();
228+ }
229+
215230 /**
216231 * Schedules the writing of a message. The write of the message may occur immediately or be
217232 * delayed based on the writer batching options.
@@ -265,6 +280,33 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
265280 return outstandingAppend .appendResult ;
266281 }
267282
283+ /**
284+ * This is the general flush method for asynchronise append operation. When you have outstanding
285+ * append requests, calling flush will make sure all outstanding append requests completed and
286+ * successful. Otherwise there will be an exception thrown.
287+ *
288+ * @throws Exception
289+ */
290+ public void flushAll (long timeoutMillis ) throws Exception {
291+ appendAndRefreshAppendLock .lock ();
292+ try {
293+ writeAllOutstanding ();
294+ synchronized (messagesWaiter ) {
295+ messagesWaiter .waitComplete (timeoutMillis );
296+ }
297+ } finally {
298+ appendAndRefreshAppendLock .unlock ();
299+ }
300+ exceptionLock .lock ();
301+ try {
302+ if (streamException != null ) {
303+ throw new Exception (streamException );
304+ }
305+ } finally {
306+ exceptionLock .unlock ();
307+ }
308+ }
309+
268310 /**
269311 * Flush the rows on a BUFFERED stream, up to the specified offset. After flush, rows will be
270312 * available for read. If no exception is thrown, it means the flush happened.
@@ -411,14 +453,15 @@ private static final class InflightBatch {
411453 private long expectedOffset ;
412454 private Boolean attachSchema ;
413455 private String streamName ;
414-
415456 private final AtomicBoolean failed ;
457+ private final StreamWriter streamWriter ;
416458
417459 InflightBatch (
418460 List <AppendRequestAndFutureResponse > inflightRequests ,
419461 long batchSizeBytes ,
420462 String streamName ,
421- Boolean attachSchema ) {
463+ Boolean attachSchema ,
464+ StreamWriter streamWriter ) {
422465 this .inflightRequests = inflightRequests ;
423466 this .offsetList = new ArrayList <Long >(inflightRequests .size ());
424467 for (AppendRequestAndFutureResponse request : inflightRequests ) {
@@ -435,6 +478,7 @@ private static final class InflightBatch {
435478 this .attachSchema = attachSchema ;
436479 this .streamName = streamName ;
437480 this .failed = new AtomicBoolean (false );
481+ this .streamWriter = streamWriter ;
438482 }
439483
440484 int count () {
@@ -482,7 +526,9 @@ private void onFailure(Throwable t) {
482526 return ;
483527 } else {
484528 LOG .info ("Setting " + t .toString () + " on response" );
529+ this .streamWriter .setException (t );
485530 }
531+
486532 for (AppendRequestAndFutureResponse request : inflightRequests ) {
487533 request .appendResult .setException (t );
488534 }
@@ -552,8 +598,12 @@ protected void shutdown() {
552598 currentAlarmFuture .cancel (false );
553599 }
554600 writeAllOutstanding ();
555- synchronized (messagesWaiter ) {
556- messagesWaiter .waitComplete ();
601+ try {
602+ synchronized (messagesWaiter ) {
603+ messagesWaiter .waitComplete (0 );
604+ }
605+ } catch (InterruptedException e ) {
606+ LOG .warning ("Failed to wait for messages to return " + e .toString ());
557607 }
558608 if (clientStream .isSendReady ()) {
559609 clientStream .closeSend ();
@@ -820,14 +870,14 @@ public void onStart(StreamController controller) {
820870 private void abortInflightRequests (Throwable t ) {
821871 synchronized (this .inflightBatches ) {
822872 while (!this .inflightBatches .isEmpty ()) {
823- this .inflightBatches
824- . poll ()
825- . onFailure (
826- new AbortedException (
827- "Request aborted due to previous failures" ,
828- t ,
829- GrpcStatusCode . of ( Status . Code . ABORTED ),
830- true ));
873+ InflightBatch inflightBatch = this .inflightBatches . poll ();
874+ inflightBatch . onFailure (
875+ new AbortedException (
876+ "Request aborted due to previous failures" ,
877+ t ,
878+ GrpcStatusCode . of ( Status . Code . ABORTED ) ,
879+ true ));
880+ streamWriter . messagesWaiter . release ( inflightBatch . getByteSize ( ));
831881 }
832882 }
833883 }
@@ -850,13 +900,15 @@ public void onResponse(AppendRowsResponse response) {
850900 streamWriter .getOnSchemaUpdateRunnable (), 0L , TimeUnit .MILLISECONDS );
851901 }
852902 }
853- // TODO: Deal with in stream errors .
903+ // Currently there is nothing retryable. If the error is already exists, then ignore it .
854904 if (response .hasError ()) {
855- StatusRuntimeException exception =
856- new StatusRuntimeException (
857- Status .fromCodeValue (response .getError ().getCode ())
858- .withDescription (response .getError ().getMessage ()));
859- inflightBatch .onFailure (exception );
905+ if (response .getError ().getCode () != 6 /* ALREADY_EXISTS */ ) {
906+ StatusRuntimeException exception =
907+ new StatusRuntimeException (
908+ Status .fromCodeValue (response .getError ().getCode ())
909+ .withDescription (response .getError ().getMessage ()));
910+ inflightBatch .onFailure (exception );
911+ }
860912 }
861913 if (inflightBatch .getExpectedOffset () > 0
862914 && response .getOffset () != inflightBatch .getExpectedOffset ()) {
@@ -907,30 +959,25 @@ public void onError(Throwable t) {
907959 }
908960 } else {
909961 inflightBatch .onFailure (t );
962+ abortInflightRequests (t );
910963 synchronized (streamWriter .currentRetries ) {
911964 streamWriter .currentRetries = 0 ;
912965 }
913966 }
914967 } catch (IOException | InterruptedException e ) {
915968 LOG .info ("Got exception while retrying." );
916969 inflightBatch .onFailure (e );
970+ abortInflightRequests (e );
917971 synchronized (streamWriter .currentRetries ) {
918972 streamWriter .currentRetries = 0 ;
919973 }
920974 }
921975 } else {
922976 inflightBatch .onFailure (t );
977+ abortInflightRequests (t );
923978 synchronized (streamWriter .currentRetries ) {
924979 streamWriter .currentRetries = 0 ;
925980 }
926- try {
927- if (!streamWriter .shutdown .get ()) {
928- // Establish a new connection.
929- streamWriter .refreshAppend ();
930- }
931- } catch (IOException | InterruptedException e ) {
932- LOG .info ("Failed to establish a new connection" );
933- }
934981 }
935982 } finally {
936983 streamWriter .messagesWaiter .release (inflightBatch .getByteSize ());
@@ -945,17 +992,21 @@ private static class MessagesBatch {
945992 private final BatchingSettings batchingSettings ;
946993 private Boolean attachSchema = true ;
947994 private final String streamName ;
995+ private final StreamWriter streamWriter ;
948996
949- private MessagesBatch (BatchingSettings batchingSettings , String streamName ) {
997+ private MessagesBatch (
998+ BatchingSettings batchingSettings , String streamName , StreamWriter streamWriter ) {
950999 this .batchingSettings = batchingSettings ;
9511000 this .streamName = streamName ;
1001+ this .streamWriter = streamWriter ;
9521002 reset ();
9531003 }
9541004
9551005 // Get all the messages out in a batch.
9561006 private InflightBatch popBatch () {
9571007 InflightBatch batch =
958- new InflightBatch (messages , batchedBytes , this .streamName , this .attachSchema );
1008+ new InflightBatch (
1009+ messages , batchedBytes , this .streamName , this .attachSchema , this .streamWriter );
9591010 this .attachSchema = false ;
9601011 reset ();
9611012 return batch ;
0 commit comments