Skip to content

bigquery: storage/managedwriter: wrong error at the wrong place for large batch inserts #6321

@maffka123

Description

@maffka123

Client

go/bigquery/storage/managedwriter

Environment

MacOS Monterey 12.3
Windows 10

Go Environment

$ go version: go1.18 darwin/amd64 or go1.17.5 windows/amd64

Code

e.g.

package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
)

// main reproduces the reported behavior: it sends every row produced by
// prepRows in a single AppendRows call on the stream from prepBQStream, waits
// for that append's result, then finalizes the stream and commits it with
// BatchCommitWriteStreams. Any failure terminates the program via log.Fatalf.
func main() {
	ctx := context.Background()
	stream, bqClient := prepBQStream()

	// One append carrying the whole batch.
	appendResult, err := stream.AppendRows(ctx, prepRows())
	if err != nil {
		log.Fatalf("AppendRows call error: %v", err)
	}
	if _, err = appendResult.GetResult(ctx); err != nil {
		log.Fatalf("append returned error: %v", err)
	}
	if _, err = stream.Finalize(ctx); err != nil {
		log.Fatalf("error during Finalize: %v", err)
	}

	// Commit the finalized stream against its parent table.
	commitReq := &storagepb.BatchCommitWriteStreamsRequest{
		Parent:       managedwriter.TableParentFromStreamName(stream.StreamName()),
		WriteStreams: []string{stream.StreamName()},
	}
	commitResp, err := bqClient.BatchCommitWriteStreams(ctx, commitReq)
	if err != nil {
		log.Fatalf("client.BatchCommit: %v", err)
	}
	if len(commitResp.GetStreamErrors()) > 0 {
		log.Fatalf("stream errors present: %v", commitResp.GetStreamErrors())
	}
	bqClient.Close()

}

// prepRows builds the large batch used by the reproduction. A single Row
// message is populated and serialized once, and the same encoded bytes are
// placed in every entry of the returned slice. A marshal failure terminates
// the program via log.Fatalf.
func prepRows() [][]byte {
	// Number of identical rows in the batch.
	const rowCount = 1000000

	a := "akkkkkdkd"
	b := "bwefdffdgfg"
	g := "gsrertrthh"
	datetime := proto.String(time.Now().Format("2006-01-02 15:04:05"))
	q := 567865432.74

	row := Row{Col1: &a, Col2: &b, Col3: &g, Date: datetime, Qty: &q}
	encoded, err := proto.Marshal(&row)
	if err != nil {
		log.Fatalf("could not marshal proto %v", err)
	}

	// Size the slice once up front instead of growing it through a million
	// appends; every entry refers to the same encoded message.
	rows := make([][]byte, rowCount)
	for i := range rows {
		rows[i] = encoded
	}
	return rows
}

func prepBQStream() (*managedwriter.ManagedStream, *managedwriter.Client) {
	ctx := context.Background()
	client, err := managedwriter.NewClient(ctx, "globus-datahub-dev")
	if err != nil {
		log.Fatalf("managedwriter.NewClient: %v", err)
	}

	pendingStream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: "projects/globus-datahub-dev/datasets/Mashas_sandbox/tables/test",
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_PENDING,
		},
	})
	if err != nil {
		log.Fatalf("CreateWriteStream: %v", err)
	}

	pf := &Row{}
	descriptorProto := protodesc.ToDescriptorProto(pf.ProtoReflect().Descriptor())

	managedStream, err := client.NewManagedStream(ctx, managedwriter.WithStreamName(pendingStream.GetName()),
		managedwriter.WithSchemaDescriptor(descriptorProto))
	if err != nil {
		log.Fatalf("NewManagedStream: %v", err)
	}
	return managedStream, client
}
}

Expected behavior

data is loaded to BQ

Actual behavior

rpc error: code = InvalidArgument desc = Request contains an invalid argument.

which comes from

	// Waiting on the append result is where the InvalidArgument error surfaces.
	_, err = res.GetResult(ctx)
	if err != nil {
		log.Fatalf("append returned error: %v", err)
	}

It happens because the list of rows is too big (with more complex data the limit is 100k rows; when I have fewer rows it all works). So:

  1. The error is not clear at all
  2. The error only comes when I ask for the result, which in turn blocks the insert of the next batch. E.g. if I have the code below instead of the single append in the example, the insert gets stuck forever on the second step, without any error or anything (but, funny enough, only if I append the results).
	// Issue five appends back to back, keeping each AppendResult instead of
	// waiting on it right away.
	results := []*managedwriter.AppendResult{}
	for i := 0; i < 5; i++ {
		p := prepRows()
		res, err := ms.AppendRows(ctx, p)
		if err != nil {
			log.Fatalf("AppendRows call error: %v", err)
		}
		results = append(results, res)
	}

	// Only after all appends have been issued, wait on each result in order.
	for _, res := range results {
		_, err := res.GetResult(ctx)
		if err != nil {
			log.Fatalf("append returned error: %v", err)
		}
	}

Metadata

Metadata

Assignees

Labels

api: bigquery (Issues related to the BigQuery API.) · priority: p2 (Moderately-important priority. Fix may not be included in next release.) · type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions