Client
go/bigquery/storage/managedwriter
Environment
MacOS Monterey 12.3
Windows 10
Go Environment
$ go version: go1.18 darwin/amd64 or go1.17.5 windows/amd64
Code
e.g.
package main
import (
"cloud.google.com/go/bigquery/storage/managedwriter"
"context"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"log"
)
func main() {
ctx := context.Background()
ms, client := prepBQStream()
p := prepRows()
res, err := ms.AppendRows(ctx, p)
if err != nil {
log.Fatalf("AppendRows call error: %v", err)
}
_, err = res.GetResult(ctx)
if err != nil {
log.Fatalf("append returned error: %v", err)
}
_, err = ms.Finalize(ctx)
if err != nil {
log.Fatalf("error during Finalize: %v", err)
}
req := &storagepb.BatchCommitWriteStreamsRequest{
Parent: managedwriter.TableParentFromStreamName(ms.StreamName()),
WriteStreams: []string{ms.StreamName()},
}
resp, err := client.BatchCommitWriteStreams(ctx, req)
if err != nil {
log.Fatalf("client.BatchCommit: %v", err)
}
if len(resp.GetStreamErrors()) > 0 {
log.Fatalf("stream errors present: %v", resp.GetStreamErrors())
}
client.Close()
}
func prepRows() [][]byte {
rows := [][]byte{}
a := "akkkkkdkd"
b := "bwefdffdgfg"
g := "gsrertrthh"
datetime := proto.String(time.Now().Format("2006-01-02 15:04:05"))
q := 567865432.74
row := Row{Col1: &a, Col2: &b, Col3: &g, Date: datetime, Qty: &q}
l, err := proto.Marshal(&row)
if err != nil {
log.Fatalf("could not marshal proto %v", err)
}
for i := 0; i < 1000000; i++ {
rows = append(rows, l)
}
return rows
}
func prepBQStream() (*managedwriter.ManagedStream, *managedwriter.Client) {
ctx := context.Background()
client, err := managedwriter.NewClient(ctx, "globus-datahub-dev")
if err != nil {
log.Fatalf("managedwriter.NewClient: %v", err)
}
pendingStream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
Parent: "projects/globus-datahub-dev/datasets/Mashas_sandbox/tables/test",
WriteStream: &storagepb.WriteStream{
Type: storagepb.WriteStream_PENDING,
},
})
if err != nil {
log.Fatalf("CreateWriteStream: %v", err)
}
pf := &Row{}
descriptorProto := protodesc.ToDescriptorProto(pf.ProtoReflect().Descriptor())
managedStream, err := client.NewManagedStream(ctx, managedwriter.WithStreamName(pendingStream.GetName()),
managedwriter.WithSchemaDescriptor(descriptorProto))
if err != nil {
log.Fatalf("NewManagedStream: %v", err)
}
return managedStream, client
}
}
Expected behavior
data is loaded to BQ
Actual behavior
rpc error: code = InvalidArgument desc = Request contains an invalid argument.
which comes from
_, err = res.GetResult(ctx)
if err != nil {
log.Fatalf("append returned error: %v", err)
}
It happend because the list of rows is too big (with more complex data the limit is 100k, when I have less rows it all works). So:
- The error is not clear at all
- The error comes when I want to see a result, which in turn blocks the insert of the next bach, e.g. if I would have this code, as in the example, on the second step insert gets stuck forever, without any error or anything (but funny enough only if I append results).
results := []*managedwriter.AppendResult{}
for i := 0; i < 5; i++ {
p := prepRows()
res, err := ms.AppendRows(ctx, p)
if err != nil {
log.Fatalf("AppendRows call error: %v", err)
}
results = append(results, res)
}
for _, res := range results {
_, err := res.GetResult(ctx)
if err != nil {
log.Fatalf("append returned error: %v", err)
}
}
Client
go/bigquery/storage/managedwriter
Environment
MacOS Monterey 12.3
Windows 10
Go Environment
$ go version: go1.18 darwin/amd64 or go1.17.5 windows/amd64
Code
e.g.
Expected behavior
data is loaded to BQ
Actual behavior
rpc error: code = InvalidArgument desc = Request contains an invalid argument.which comes from
It happend because the list of rows is too big (with more complex data the limit is 100k, when I have less rows it all works). So: