@@ -18,6 +18,7 @@ import (
1818 "context"
1919 "fmt"
2020 "math"
21+ "strings"
2122 "sync"
2223 "testing"
2324 "time"
@@ -27,9 +28,11 @@ import (
2728 "cloud.google.com/go/bigquery/storage/managedwriter/testdata"
2829 "cloud.google.com/go/internal/testutil"
2930 "cloud.google.com/go/internal/uid"
31+ "github.com/googleapis/gax-go/v2/apierror"
3032 "go.opencensus.io/stats/view"
3133 "google.golang.org/api/option"
3234 storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
35+ "google.golang.org/grpc/codes"
3336 "google.golang.org/protobuf/encoding/protojson"
3437 "google.golang.org/protobuf/proto"
3538 "google.golang.org/protobuf/reflect/protodesc"
@@ -42,7 +45,7 @@ import (
4245var (
4346 datasetIDs = uid .NewSpace ("managedwriter_test_dataset" , & uid.Options {Sep : '_' , Time : time .Now ()})
4447 tableIDs = uid .NewSpace ("table" , & uid.Options {Sep : '_' , Time : time .Now ()})
45- defaultTestTimeout = 30 * time .Second
48+ defaultTestTimeout = 45 * time .Second
4649)
4750
4851// our test data has cardinality 5 for names, 3 for values
@@ -153,6 +156,9 @@ func TestIntegration_ManagedWriter(t *testing.T) {
153156 // Don't run this in parallel, we only want to collect stats from this subtest.
154157 testInstrumentation (ctx , t , mwClient , bqClient , dataset )
155158 })
159+ t .Run ("TestLargeInsert" , func (t * testing.T ) {
160+ testLargeInsert (ctx , t , mwClient , bqClient , dataset )
161+ })
156162 })
157163}
158164
@@ -469,6 +475,75 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
469475 withExactRowCount (int64 (len (testSimpleData ))))
470476}
471477
478+ func testLargeInsert (ctx context.Context , t * testing.T , mwClient * Client , bqClient * bigquery.Client , dataset * bigquery.Dataset ) {
479+ testTable := dataset .Table (tableIDs .New ())
480+ if err := testTable .Create (ctx , & bigquery.TableMetadata {Schema : testdata .SimpleMessageSchema }); err != nil {
481+ t .Fatalf ("failed to create test table %s: %v" , testTable .FullyQualifiedName (), err )
482+ }
483+
484+ m := & testdata.SimpleMessageProto2 {}
485+ descriptorProto := protodesc .ToDescriptorProto (m .ProtoReflect ().Descriptor ())
486+
487+ ms , err := mwClient .NewManagedStream (ctx ,
488+ WithDestinationTable (TableParentFromParts (testTable .ProjectID , testTable .DatasetID , testTable .TableID )),
489+ WithType (CommittedStream ),
490+ WithSchemaDescriptor (descriptorProto ),
491+ )
492+ if err != nil {
493+ t .Fatalf ("NewManagedStream: %v" , err )
494+ }
495+ validateTableConstraints (ctx , t , bqClient , testTable , "before send" ,
496+ withExactRowCount (0 ))
497+
498+ // Construct a Very Large request.
499+ var data [][]byte
500+ targetSize := 11 * 1024 * 1024 // 11 MB
501+ b , err := proto .Marshal (testSimpleData [0 ])
502+ if err != nil {
503+ t .Errorf ("failed to marshal message: %v" , err )
504+ }
505+
506+ numRows := targetSize / len (b )
507+ data = make ([][]byte , numRows )
508+
509+ for i := 0 ; i < numRows ; i ++ {
510+ data [i ] = b
511+ }
512+
513+ result , err := ms .AppendRows (ctx , data , WithOffset (0 ))
514+ if err != nil {
515+ t .Errorf ("single append failed: %v" , err )
516+ }
517+ _ , err = result .GetResult (ctx )
518+ if err != nil {
519+ apiErr , ok := apierror .FromError (err )
520+ if ! ok {
521+ t .Errorf ("GetResult error was not an instance of ApiError" )
522+ }
523+ if status := apiErr .GRPCStatus (); status .Code () != codes .InvalidArgument {
524+ t .Errorf ("expected InvalidArgument status, got %v" , status )
525+ }
526+
527+ details := apiErr .Details ()
528+ if details .DebugInfo == nil {
529+ t .Errorf ("expected DebugInfo to be populated, was nil" )
530+ }
531+ wantSubstring := "Message size exceed the limitation of byte based flow control."
532+ if detail := details .DebugInfo .GetDetail (); ! strings .Contains (detail , wantSubstring ) {
533+ t .Errorf ("detail missing desired substring: %s" , detail )
534+ }
535+ }
536+ // send a subsequent append as verification we can proceed.
537+ result , err = ms .AppendRows (ctx , [][]byte {b })
538+ if err != nil {
539+ t .Fatalf ("subsequent append failed: %v" , err )
540+ }
541+ _ , err = result .GetResult (ctx )
542+ if err != nil {
543+ t .Errorf ("failure result from second append: %v" , err )
544+ }
545+ }
546+
472547func testInstrumentation (ctx context.Context , t * testing.T , mwClient * Client , bqClient * bigquery.Client , dataset * bigquery.Dataset ) {
473548 testedViews := []* view.View {
474549 AppendRequestsView ,
0 commit comments