Skip to content

Commit 085a038

Browse files
authored
fix(bigquery/storage/managedwriter): improve network reconnection (#6338)
* fix(bigquery/storage/managedwriter): improve network reconnection. Issuing a sufficiently large single append request is enough to trigger the server backend to close an existing gRPC stream. This PR addresses the problem by allowing a failed request to signal that subsequent requests should request a new gRPC stream connection. This PR also adds an integration test that induces the failure by issuing a large request, and ensures subsequent requests succeed. Towards: #6321
1 parent 89a049a commit 085a038

2 files changed

Lines changed: 93 additions & 3 deletions

File tree

bigquery/storage/managedwriter/integration_test.go

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"fmt"
2020
"math"
21+
"strings"
2122
"sync"
2223
"testing"
2324
"time"
@@ -27,9 +28,11 @@ import (
2728
"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
2829
"cloud.google.com/go/internal/testutil"
2930
"cloud.google.com/go/internal/uid"
31+
"github.com/googleapis/gax-go/v2/apierror"
3032
"go.opencensus.io/stats/view"
3133
"google.golang.org/api/option"
3234
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
35+
"google.golang.org/grpc/codes"
3336
"google.golang.org/protobuf/encoding/protojson"
3437
"google.golang.org/protobuf/proto"
3538
"google.golang.org/protobuf/reflect/protodesc"
@@ -42,7 +45,7 @@ import (
4245
var (
4346
datasetIDs = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})
4447
tableIDs = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})
45-
defaultTestTimeout = 30 * time.Second
48+
defaultTestTimeout = 45 * time.Second
4649
)
4750

4851
// our test data has cardinality 5 for names, 3 for values
@@ -153,6 +156,9 @@ func TestIntegration_ManagedWriter(t *testing.T) {
153156
// Don't run this in parallel, we only want to collect stats from this subtest.
154157
testInstrumentation(ctx, t, mwClient, bqClient, dataset)
155158
})
159+
t.Run("TestLargeInsert", func(t *testing.T) {
160+
testLargeInsert(ctx, t, mwClient, bqClient, dataset)
161+
})
156162
})
157163
}
158164

@@ -469,6 +475,75 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
469475
withExactRowCount(int64(len(testSimpleData))))
470476
}
471477

478+
func testLargeInsert(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
479+
testTable := dataset.Table(tableIDs.New())
480+
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
481+
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
482+
}
483+
484+
m := &testdata.SimpleMessageProto2{}
485+
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
486+
487+
ms, err := mwClient.NewManagedStream(ctx,
488+
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
489+
WithType(CommittedStream),
490+
WithSchemaDescriptor(descriptorProto),
491+
)
492+
if err != nil {
493+
t.Fatalf("NewManagedStream: %v", err)
494+
}
495+
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
496+
withExactRowCount(0))
497+
498+
// Construct a Very Large request.
499+
var data [][]byte
500+
targetSize := 11 * 1024 * 1024 // 11 MB
501+
b, err := proto.Marshal(testSimpleData[0])
502+
if err != nil {
503+
t.Errorf("failed to marshal message: %v", err)
504+
}
505+
506+
numRows := targetSize / len(b)
507+
data = make([][]byte, numRows)
508+
509+
for i := 0; i < numRows; i++ {
510+
data[i] = b
511+
}
512+
513+
result, err := ms.AppendRows(ctx, data, WithOffset(0))
514+
if err != nil {
515+
t.Errorf("single append failed: %v", err)
516+
}
517+
_, err = result.GetResult(ctx)
518+
if err != nil {
519+
apiErr, ok := apierror.FromError(err)
520+
if !ok {
521+
t.Errorf("GetResult error was not an instance of ApiError")
522+
}
523+
if status := apiErr.GRPCStatus(); status.Code() != codes.InvalidArgument {
524+
t.Errorf("expected InvalidArgument status, got %v", status)
525+
}
526+
527+
details := apiErr.Details()
528+
if details.DebugInfo == nil {
529+
t.Errorf("expected DebugInfo to be populated, was nil")
530+
}
531+
wantSubstring := "Message size exceed the limitation of byte based flow control."
532+
if detail := details.DebugInfo.GetDetail(); !strings.Contains(detail, wantSubstring) {
533+
t.Errorf("detail missing desired substring: %s", detail)
534+
}
535+
}
536+
// send a subsequent append as verification we can proceed.
537+
result, err = ms.AppendRows(ctx, [][]byte{b})
538+
if err != nil {
539+
t.Fatalf("subsequent append failed: %v", err)
540+
}
541+
_, err = result.GetResult(ctx)
542+
if err != nil {
543+
t.Errorf("failure result from second append: %v", err)
544+
}
545+
}
546+
472547
func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
473548
testedViews := []*view.View{
474549
AppendRequestsView,

bigquery/storage/managedwriter/managed_stream.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package managedwriter
1616

1717
import (
1818
"context"
19+
"errors"
1920
"fmt"
2021
"io"
2122
"sync"
@@ -84,6 +85,7 @@ type ManagedStream struct {
8485

8586
mu sync.Mutex
8687
arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection
88+
reconnect bool // Request a reconnect before issuing another send.
8789
err error // terminal error
8890
pending chan *pendingWrite // writes awaiting status
8991
streamSetup *sync.Once // handles amending the first request in a new stream
@@ -184,13 +186,21 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient
184186
return nil, nil, ms.err
185187
}
186188

189+
// Previous activity on the stream indicated it is not healthy, so propagate that as a reconnect.
190+
if ms.reconnect {
191+
forceReconnect = true
192+
ms.reconnect = false
193+
}
187194
// Always return the retained ARC if the arg differs.
188195
if arc != ms.arc && !forceReconnect {
189196
return ms.arc, ms.pending, nil
190197
}
191198
if arc != ms.arc && forceReconnect && ms.arc != nil {
192-
// In this case, we're forcing a close to apply changes to the stream
193-
// that currently can't be modified on an established connection.
199+
// In this case, we're forcing a close on the existing stream.
200+
// This is due to either needing to reconnect to satisfy the needs of
201+
// the current request (e.g. to signal a schema change), or because
202+
// a previous request on the stream yielded a transient error and we
203+
// want to reconnect before issuing a subsequent request.
194204
//
195205
// TODO: clean this up once internal issue 205756033 is resolved.
196206
(*ms.arc).CloseSend()
@@ -297,6 +307,11 @@ func (ms *ManagedStream) lockingAppend(requestCtx context.Context, pw *pendingWr
297307
err = (*arc).Send(pw.request)
298308
}
299309
if err != nil {
310+
// Transient connection loss. If we got io.EOF from a send, we want subsequent appends to
311+
// reconnect the network connection for the stream.
312+
if errors.Is(err, io.EOF) {
313+
ms.reconnect = true
314+
}
300315
return 0, err
301316
}
302317
// Compute numRows, once we pass ownership to the channel the request may be

0 commit comments

Comments
 (0)