@@ -159,8 +159,11 @@ func TestIntegration_ManagedWriter(t *testing.T) {
159159 // Don't run this in parallel, we only want to collect stats from this subtest.
160160 testInstrumentation (ctx , t , mwClient , bqClient , dataset )
161161 })
162- t .Run ("TestLargeInsert" , func (t * testing.T ) {
163- testLargeInsert (ctx , t , mwClient , bqClient , dataset )
162+ t .Run ("TestLargeInsertNoRetry" , func (t * testing.T ) {
163+ testLargeInsertNoRetry (ctx , t , mwClient , bqClient , dataset )
164+ })
165+ t .Run ("TestLargeInsertWithRetry" , func (t * testing.T ) {
166+ testLargeInsertWithRetry (ctx , t , mwClient , bqClient , dataset )
164167 })
165168 })
166169}
@@ -596,7 +599,73 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
596599 withExactRowCount (int64 (len (testSimpleData ))))
597600}
598601
599- func testLargeInsert (ctx context.Context , t * testing.T , mwClient * Client , bqClient * bigquery.Client , dataset * bigquery.Dataset ) {
602+ func testLargeInsertNoRetry (ctx context.Context , t * testing.T , mwClient * Client , bqClient * bigquery.Client , dataset * bigquery.Dataset ) {
603+ testTable := dataset .Table (tableIDs .New ())
604+ if err := testTable .Create (ctx , & bigquery.TableMetadata {Schema : testdata .SimpleMessageSchema }); err != nil {
605+ t .Fatalf ("failed to create test table %s: %v" , testTable .FullyQualifiedName (), err )
606+ }
607+
608+ m := & testdata.SimpleMessageProto2 {}
609+ descriptorProto := protodesc .ToDescriptorProto (m .ProtoReflect ().Descriptor ())
610+
611+ ms , err := mwClient .NewManagedStream (ctx ,
612+ WithDestinationTable (TableParentFromParts (testTable .ProjectID , testTable .DatasetID , testTable .TableID )),
613+ WithType (CommittedStream ),
614+ WithSchemaDescriptor (descriptorProto ),
615+ )
616+ if err != nil {
617+ t .Fatalf ("NewManagedStream: %v" , err )
618+ }
619+ validateTableConstraints (ctx , t , bqClient , testTable , "before send" ,
620+ withExactRowCount (0 ))
621+
622+ // Construct a Very Large request.
623+ var data [][]byte
624+ targetSize := 11 * 1024 * 1024 // 11 MB
625+ b , err := proto .Marshal (testSimpleData [0 ])
626+ if err != nil {
627+ t .Errorf ("failed to marshal message: %v" , err )
628+ }
629+
630+ numRows := targetSize / len (b )
631+ data = make ([][]byte , numRows )
632+
633+ for i := 0 ; i < numRows ; i ++ {
634+ data [i ] = b
635+ }
636+
637+ result , err := ms .AppendRows (ctx , data , WithOffset (0 ))
638+ if err != nil {
639+ t .Errorf ("single append failed: %v" , err )
640+ }
641+ _ , err = result .GetResult (ctx )
642+ if err != nil {
643+ apiErr , ok := apierror .FromError (err )
644+ if ! ok {
645+ t .Errorf ("GetResult error was not an instance of ApiError" )
646+ }
647+ status := apiErr .GRPCStatus ()
648+ if status .Code () != codes .InvalidArgument {
649+ t .Errorf ("expected InvalidArgument status, got %v" , status )
650+ }
651+ }
652+ // our next append should fail (we don't have retries enabled).
653+ if _ , err = ms .AppendRows (ctx , [][]byte {b }); err == nil {
654+ t .Fatalf ("expected second append to fail, got success: %v" , err )
655+ }
656+
657+ // The send failure triggers reconnect, so an additional append will succeed.
658+ result , err = ms .AppendRows (ctx , [][]byte {b })
659+ if err != nil {
660+ t .Fatalf ("third append expected to succeed, got error: %v" , err )
661+ }
662+ _ , err = result .GetResult (ctx )
663+ if err != nil {
664+ t .Errorf ("failure result from third append: %v" , err )
665+ }
666+ }
667+
668+ func testLargeInsertWithRetry (ctx context.Context , t * testing.T , mwClient * Client , bqClient * bigquery.Client , dataset * bigquery.Dataset ) {
600669 testTable := dataset .Table (tableIDs .New ())
601670 if err := testTable .Create (ctx , & bigquery.TableMetadata {Schema : testdata .SimpleMessageSchema }); err != nil {
602671 t .Fatalf ("failed to create test table %s: %v" , testTable .FullyQualifiedName (), err )
@@ -609,6 +678,7 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie
609678 WithDestinationTable (TableParentFromParts (testTable .ProjectID , testTable .DatasetID , testTable .TableID )),
610679 WithType (CommittedStream ),
611680 WithSchemaDescriptor (descriptorProto ),
681+ EnableWriteRetries (true ),
612682 )
613683 if err != nil {
614684 t .Fatalf ("NewManagedStream: %v" , err )
@@ -646,15 +716,19 @@ func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClie
646716 t .Errorf ("expected InvalidArgument status, got %v" , status )
647717 }
648718 }
649- // send a subsequent append as verification we can proceed.
719+
720+ // The second append will succeed, but internally will show a retry.
650721 result , err = ms .AppendRows (ctx , [][]byte {b })
651722 if err != nil {
652- t .Fatalf ("subsequent append failed : %v" , err )
723+ t .Fatalf ("second append expected to succeed, got error : %v" , err )
653724 }
654725 _ , err = result .GetResult (ctx )
655726 if err != nil {
656727 t .Errorf ("failure result from second append: %v" , err )
657728 }
729+ if attempts , _ := result .TotalAttempts (ctx ); attempts != 2 {
730+ t .Errorf ("expected 2 attempts, got %d" , attempts )
731+ }
658732}
659733
660734func testInstrumentation (ctx context.Context , t * testing.T , mwClient * Client , bqClient * bigquery.Client , dataset * bigquery.Dataset ) {
0 commit comments