3232import com .google .api .gax .grpc .testing .LocalChannelProvider ;
3333import com .google .api .gax .grpc .testing .MockGrpcService ;
3434import com .google .api .gax .grpc .testing .MockServiceHelper ;
35- import com .google .api .gax .retrying . RetrySettings ;
35+ import com .google .api .gax .rpc . AbortedException ;
3636import com .google .api .gax .rpc .DataLossException ;
3737import com .google .cloud .bigquery .storage .test .Test .FooType ;
3838import com .google .common .base .Strings ;
@@ -510,42 +510,6 @@ public void testFlowControlBehaviorException() throws Exception {
510510 }
511511 }
512512
513- @ Test
514- public void testStreamReconnectionTransient () throws Exception {
515- StreamWriter writer =
516- getTestStreamWriterBuilder ()
517- .setBatchingSettings (
518- StreamWriter .Builder .DEFAULT_BATCHING_SETTINGS
519- .toBuilder ()
520- .setDelayThreshold (Duration .ofSeconds (100000 ))
521- .setElementCountThreshold (1L )
522- .setFlowControlSettings (
523- StreamWriter .Builder .DEFAULT_FLOW_CONTROL_SETTINGS
524- .toBuilder ()
525- .setMaxOutstandingElementCount (1L )
526- .setLimitExceededBehavior (FlowController .LimitExceededBehavior .Block )
527- .build ())
528- .build ())
529- .build ();
530-
531- testBigQueryWrite .addResponse (
532- AppendRowsResponse .newBuilder ()
533- .setAppendResult (
534- AppendRowsResponse .AppendResult .newBuilder ().setOffset (Int64Value .of (0 )).build ())
535- .build ());
536- testBigQueryWrite .addException (new StatusRuntimeException (Status .UNAVAILABLE ));
537- testBigQueryWrite .addResponse (
538- AppendRowsResponse .newBuilder ()
539- .setAppendResult (
540- AppendRowsResponse .AppendResult .newBuilder ().setOffset (Int64Value .of (1 )).build ())
541- .build ());
542- ApiFuture <AppendRowsResponse > future1 = sendTestMessage (writer , new String [] {"m1" });
543- ApiFuture <AppendRowsResponse > future2 = sendTestMessage (writer , new String [] {"m1" });
544- assertEquals (0L , future1 .get ().getAppendResult ().getOffset ().getValue ());
545- assertEquals (1L , future2 .get ().getAppendResult ().getOffset ().getValue ());
546- writer .close ();
547- }
548-
549513 @ Test
550514 public void testStreamReconnectionPermanant () throws Exception {
551515 StreamWriter writer =
@@ -569,36 +533,6 @@ public void testStreamReconnectionPermanant() throws Exception {
569533 writer .close ();
570534 }
571535
572- @ Test
573- public void testStreamReconnectionExceedRetry () throws Exception {
574- StreamWriter writer =
575- getTestStreamWriterBuilder ()
576- .setBatchingSettings (
577- StreamWriter .Builder .DEFAULT_BATCHING_SETTINGS
578- .toBuilder ()
579- .setDelayThreshold (Duration .ofSeconds (100000 ))
580- .setElementCountThreshold (1L )
581- .build ())
582- .setRetrySettings (
583- RetrySettings .newBuilder ()
584- .setMaxRetryDelay (Duration .ofMillis (100 ))
585- .setMaxAttempts (1 )
586- .build ())
587- .build ();
588- assertEquals (1 , writer .getRetrySettings ().getMaxAttempts ());
589- StatusRuntimeException transientError = new StatusRuntimeException (Status .UNAVAILABLE );
590- testBigQueryWrite .addException (transientError );
591- testBigQueryWrite .addException (transientError );
592- ApiFuture <AppendRowsResponse > future3 = sendTestMessage (writer , new String [] {"toomanyretry" });
593- try {
594- future3 .get ();
595- Assert .fail ("This should fail." );
596- } catch (ExecutionException e ) {
597- assertEquals (transientError .toString (), e .getCause ().getCause ().toString ());
598- }
599- writer .close ();
600- }
601-
602536 @ Test
603537 public void testOffset () throws Exception {
604538 try (StreamWriter writer =
@@ -665,7 +599,7 @@ public void testOffsetMismatch() throws Exception {
665599
666600 @ Test
667601 public void testErrorPropagation () throws Exception {
668- try ( StreamWriter writer =
602+ StreamWriter writer =
669603 getTestStreamWriterBuilder ()
670604 .setExecutorProvider (SINGLE_THREAD_EXECUTOR )
671605 .setBatchingSettings (
@@ -674,13 +608,23 @@ public void testErrorPropagation() throws Exception {
674608 .setElementCountThreshold (1L )
675609 .setDelayThreshold (Duration .ofSeconds (5 ))
676610 .build ())
677- .build ()) {
678- testBigQueryWrite .addException (Status .DATA_LOSS .asException ());
679- sendTestMessage (writer , new String [] {"A" }).get ();
611+ .build ();
612+ testBigQueryWrite .addException (Status .DATA_LOSS .asException ());
613+ testBigQueryWrite .addException (Status .DATA_LOSS .asException ());
614+ ApiFuture <AppendRowsResponse > future1 = sendTestMessage (writer , new String [] {"A" });
615+ ApiFuture <AppendRowsResponse > future2 = sendTestMessage (writer , new String [] {"B" });
616+ try {
617+ future1 .get ();
680618 fail ("should throw exception" );
681619 } catch (ExecutionException e ) {
682620 assertThat (e .getCause ()).isInstanceOf (DataLossException .class );
683621 }
622+ try {
623+ future2 .get ();
624+ fail ("should throw exception" );
625+ } catch (ExecutionException e ) {
626+ assertThat (e .getCause ()).isInstanceOf (AbortedException .class );
627+ }
684628 }
685629
686630 @ Test
@@ -957,43 +901,6 @@ public void testFlushAll() throws Exception {
957901 writer .close ();
958902 }
959903
960- @ Test
961- public void testFlushAllFailed () throws Exception {
962- StreamWriter writer =
963- getTestStreamWriterBuilder ()
964- .setBatchingSettings (
965- StreamWriter .Builder .DEFAULT_BATCHING_SETTINGS
966- .toBuilder ()
967- .setElementCountThreshold (2L )
968- .setDelayThreshold (Duration .ofSeconds (100000 ))
969- .build ())
970- .build ();
971-
972- testBigQueryWrite .addException (Status .DATA_LOSS .asException ());
973- testBigQueryWrite .addException (Status .DATA_LOSS .asException ());
974- testBigQueryWrite .addException (Status .DATA_LOSS .asException ());
975-
976- ApiFuture <AppendRowsResponse > appendFuture1 = sendTestMessage (writer , new String [] {"A" });
977- ApiFuture <AppendRowsResponse > appendFuture2 = sendTestMessage (writer , new String [] {"B" });
978- ApiFuture <AppendRowsResponse > appendFuture3 = sendTestMessage (writer , new String [] {"C" });
979-
980- assertFalse (appendFuture3 .isDone ());
981- try {
982- writer .flushAll (100000 );
983- fail ("Should have thrown an Exception" );
984- } catch (Exception expected ) {
985- if (expected .getCause () instanceof com .google .api .gax .rpc .DataLossException ) {
986- LOG .info ("got: " + expected .toString ());
987- } else {
988- fail ("Unexpected exception:" + expected .toString ());
989- }
990- }
991-
992- assertTrue (appendFuture3 .isDone ());
993-
994- writer .close ();
995- }
996-
997904 @ Test
998905 public void testDatasetTraceId () throws Exception {
999906 StreamWriter writer =
@@ -1032,10 +939,12 @@ public void testShutdownWithConnectionError() throws Exception {
1032939 AppendRowsResponse .AppendResult .newBuilder ().setOffset (Int64Value .of (1 )).build ())
1033940 .build ());
1034941 testBigQueryWrite .addException (Status .DATA_LOSS .asException ());
942+ testBigQueryWrite .addException (Status .DATA_LOSS .asException ());
1035943 testBigQueryWrite .setResponseDelay (Duration .ofSeconds (10 ));
1036944
1037945 ApiFuture <AppendRowsResponse > appendFuture1 = sendTestMessage (writer , new String [] {"A" });
1038946 ApiFuture <AppendRowsResponse > appendFuture2 = sendTestMessage (writer , new String [] {"B" });
947+ ApiFuture <AppendRowsResponse > appendFuture3 = sendTestMessage (writer , new String [] {"B" });
1039948 Thread .sleep (5000L );
1040949 // Move the needle for responses to be sent.
1041950 fakeExecutor .advanceTime (Duration .ofSeconds (20 ));
@@ -1044,9 +953,15 @@ public void testShutdownWithConnectionError() throws Exception {
1044953 assertEquals (1 , appendFuture1 .get ().getAppendResult ().getOffset ().getValue ());
1045954 try {
1046955 appendFuture2 .get ();
1047- fail ("Should fail with exception" );
1048- } catch (java .util .concurrent .ExecutionException e ) {
1049- assertEquals ("Request aborted due to previous failures" , e .getCause ().getMessage ());
956+ fail ("Should fail with exception future2" );
957+ } catch (ExecutionException e ) {
958+ assertThat (e .getCause ()).isInstanceOf (DataLossException .class );
959+ }
960+ try {
961+ appendFuture3 .get ();
962+ fail ("Should fail with exception future3" );
963+ } catch (ExecutionException e ) {
964+ assertThat (e .getCause ()).isInstanceOf (AbortedException .class );
1050965 }
1051966 }
1052967}
0 commit comments