Skip to content

Data race in managedwriter.(*connection).lockingAppend() #9301

@sstemmle

Description

@sstemmle

Client

bigquery/storage (v1.58.0)

Environment

Any

Go Environment

$ go version
1.21.6
$ go env
GO111MODULE=''
GOARCH='amd64'
GOBIN=''
GOCACHE='/root/.cache/go-build'
GOENV='/root/.config/go/env'
GOEXE=''
GOEXPERIMENT=''
GOFLAGS=''
GOHOSTARCH='amd64'
GOHOSTOS='linux'
GOINSECURE=''
GOMODCACHE='/go/pkg/mod'
GONOPROXY=''
GONOSUMDB=''
GOOS='linux'
GOPATH='/go'
GOPRIVATE=''
GOPROXY='https://proxy.golang.org,direct'
GOROOT='/usr/local/go'
GOSUMDB='sum.golang.org'
GOTMPDIR=''
GOTOOLCHAIN='local'
GOTOOLDIR='/usr/local/go/pkg/tool/linux_amd64'
GOVCS=''
GOVERSION='go1.21.6'
GCCGO='gccgo'
GOAMD64='v1'
AR='ar'
CC='gcc'
CXX='g++'
CGO_ENABLED='1'
GOMOD='/dev/null'
GOWORK=''
CGO_CFLAGS='-O2 -g'
CGO_CPPFLAGS=''
CGO_CXXFLAGS='-O2 -g'
CGO_FFLAGS='-O2 -g'
CGO_LDFLAGS='-O2 -g'
PKG_CONFIG='pkg-config'
GOGCCFLAGS='-fPIC -m64 -pthread -Wl,--no-gc-sections -fmessage-length=0 -ffile-prefix-map=/tmp/go-build1646458025=/tmp/go-build -gno-record-gcc-switches'

Code

So far this has happened only once, in a larger code base. The snippet below is a simplified version of that code; I was not able to reproduce the race with the snippet itself, but the stack trace should make clear what happened.
e.g.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protodesc"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/descriptorpb"
	"google.golang.org/protobuf/types/dynamicpb"
)

// Settings used by setupStream to reach BigQuery.
// NOTE(review): the values look like placeholders — presumably they must be
// replaced with real ones before the program can run; confirm.
const (
	projectID   = "project"      // passed to managedwriter.NewClient and used in the table path
	dataset     = "dataset"      // dataset segment of the destination table path
	tableName   = "test_table"   // table segment of the destination table path
	keyfilePath = "keyfile.json" // file handed to option.WithCredentialsFile
)

// setupStream creates a managedwriter client and opens a managed stream of
// type DefaultStream on the table addressed by projectID, dataset and
// tableName, using msgDesc as the schema descriptor.
//
// On success it returns the stream together with the client's Close method;
// the caller must invoke that function once it is done with the stream.
// On failure the stream and the cleanup function are both nil. If the client
// was created but the stream could not be opened, the client is closed here so
// that nothing is leaked when the caller bails out on the error.
func setupStream(msgDesc *descriptorpb.DescriptorProto) (*managedwriter.ManagedStream, func() error, error) {
	ctx := context.Background()

	client, err := managedwriter.NewClient(ctx, projectID, option.WithCredentialsFile(keyfilePath))
	if err != nil {
		return nil, nil, fmt.Errorf("creating managedwriter client: %w", err)
	}

	tableStr := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, dataset, tableName)

	managedStream, err := client.NewManagedStream(ctx,
		managedwriter.WithDestinationTable(tableStr),
		managedwriter.WithType(managedwriter.DefaultStream),
		managedwriter.WithSchemaDescriptor(msgDesc))
	if err != nil {
		// Release the client ourselves: callers that abort on error never
		// get a chance to run a cleanup function. errors.Join drops a nil
		// Close error, so the result is just the wrapped stream error then.
		return nil, nil, errors.Join(
			fmt.Errorf("opening managed stream on %s: %w", tableStr, err),
			client.Close(),
		)
	}

	return managedStream, client.Close, nil
}

// main builds the message descriptor, opens a managed stream and then starts
// nWriters goroutines that each append nMsg single-row batches to the shared
// stream, waiting for the result of every append. Any error aborts the
// program with a panic.
func main() {
	descrProto, msgDescr, err := getDescriptor()
	if err != nil {
		panic(err)
	}

	stream, cleanup, err := setupStream(descrProto)
	if err != nil {
		panic(err)
	}

	// The error from cleanup is deliberately ignored: the program is
	// exiting at this point.
	defer cleanup()

	const (
		nWriters = 5
		nMsg     = 10000
	)

	var wg sync.WaitGroup
	wg.Add(nWriters)

	for i := 0; i < nWriters; i++ {
		go func() {
			// Deferred so the WaitGroup is released on every exit path.
			defer wg.Done()

			for j := 0; j < nMsg; j++ {
				msg, err := getMessage(msgDescr, int64(j))
				if err != nil {
					// Do not append a payload that failed to serialize.
					panic(err)
				}

				res, err := stream.AppendRows(context.Background(), [][]byte{msg})
				if err != nil {
					panic(err)
				}

				if _, err = res.GetResult(context.Background()); err != nil {
					panic(err)
				}

				// Pause for five seconds after every 100th message
				// (including the very first one, j == 0).
				if j%100 == 0 {
					time.Sleep(5 * time.Second)
				}
			}
		}()
	}

	wg.Wait()
}

// getDescriptor builds the schema for a message named "record" with a single
// optional int64 field "c" (field number 1). The message is registered in a
// proto3 file "protos/record.proto" under package "main" and then looked up
// again by its full name.
//
// It returns the raw DescriptorProto and the resolved MessageDescriptor for
// the same message. An error is returned if the file set cannot be built, the
// lookup fails, or the descriptor found is not a message descriptor.
func getDescriptor() (*descriptorpb.DescriptorProto, protoreflect.MessageDescriptor, error) {
	field := &descriptorpb.FieldDescriptorProto{
		Name:     p("c"),
		Number:   p(int32(1)),
		Label:    p(descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL),
		Type:     p(descriptorpb.FieldDescriptorProto_TYPE_INT64),
		JsonName: p("c"),
	}
	record := &descriptorpb.DescriptorProto{
		Name:  p("record"),
		Field: []*descriptorpb.FieldDescriptorProto{field},
	}
	file := &descriptorpb.FileDescriptorProto{
		Name:        p("protos/record.proto"),
		Package:     p("main"),
		MessageType: []*descriptorpb.DescriptorProto{record},
		Syntax:      p("proto3"),
	}

	registry, err := protodesc.NewFiles(&descriptorpb.FileDescriptorSet{
		File: []*descriptorpb.FileDescriptorProto{file},
	})
	if err != nil {
		return nil, nil, err
	}

	found, err := registry.FindDescriptorByName(protoreflect.FullName("main.record"))
	if err != nil {
		return nil, nil, err
	}

	if resolved, isMsg := found.(protoreflect.MessageDescriptor); isMsg {
		return record, resolved, nil
	}

	return nil, nil, errors.New("unable to assert into MessageDescriptor")
}

// getMessage creates a dynamic message for msgDesc, sets its field "c" to the
// given value and returns the message serialized to protobuf wire format.
//
// An error is returned if msgDesc has no field named "c" or if marshaling
// fails; in both cases the returned slice is nil.
func getMessage(msgDesc protoreflect.MessageDescriptor, c int64) ([]byte, error) {
	rqst := dynamicpb.NewMessage(msgDesc)

	fd := rqst.Descriptor().Fields().ByName(protoreflect.Name("c"))
	if fd == nil {
		// ByName yields nil for an unknown field; handing that to Set would
		// panic, so report it as an ordinary error instead.
		return nil, fmt.Errorf("field %q not found in message %s", "c", msgDesc.FullName())
	}
	rqst.Set(fd, protoreflect.ValueOfInt64(c))

	data, err := proto.Marshal(rqst)
	if err != nil {
		return nil, err
	}

	return data, nil
}

// p returns a pointer to a fresh copy of t. It exists so that optional
// pointer-typed struct fields can be filled in from literals and other
// non-addressable values in a single expression.
func p[T any](t T) *T {
	// t is already the callee's own copy of the argument, so its address
	// never aliases the caller's variable.
	return &t
}

We ran this with Go's race detector enabled (the `-race` flag).

Expected behavior

No data race is detected.

Actual behavior

Go reports the following data race:

==================
WARNING: DATA RACE
Write at 0x00c00278a928 by goroutine 3552:
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).getStream()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:485 +0x76b
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).lockingAppend()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:404 +0x851
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).appendWithRetry()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:210 +0x2ca
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).processRetry()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:368 +0x1a8
  cloud.google.com/go/bigquery/storage/managedwriter.connRecvProcessor()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:537 +0x6d2
  cloud.google.com/go/bigquery/storage/managedwriter.(*connectionPool).openWithRetry.func1()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:172 +0xdc

Previous read at 0x00c00278a928 by goroutine 3550:
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).lockingAppend.func2()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:447 +0x5e
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).lockingAppend.func1()
      /.../cloud.google.com/go/bigquery/storage/managedwriter/connection.go:367 +0x81
  runtime.deferreturn()
      /usr/local/go/src/runtime/panic.go:477 +0x30
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).appendWithRetry()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:210 +0x2ca
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).AppendRows.func1()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:322 +0xcb

Goroutine 3552 (running) created at:
  cloud.google.com/go/bigquery/storage/managedwriter.(*connectionPool).openWithRetry()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:172 +0xc3d
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).getStream()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:493 +0x965
  cloud.google.com/go/bigquery/storage/managedwriter.(*connection).lockingAppend()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/connection.go:404 +0x851
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).appendWithRetry()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:210 +0x2ca
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).AppendRows.func1()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:322 +0xcb

Goroutine 3550 (finished) created at:
  cloud.google.com/go/bigquery/storage/managedwriter.(*ManagedStream).AppendRows()
      /.../vendor/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream.go:320 +0x7db

See
https://github.com/googleapis/google-cloud-go/blob/bigquery/v1.58.0/bigquery/storage/managedwriter/connection.go#L485
https://github.com/googleapis/google-cloud-go/blob/bigquery/v1.58.0/bigquery/storage/managedwriter/connection.go#L447

Metadata

Metadata

Assignees

Labels

api: bigquery — Issues related to the BigQuery API.
priority: p3 — Desirable enhancement or fix. May not be included in next release.
type: bug — Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions