Skip to content

Commit c8ae2e2

Browse files
authored
fix(bigtable): refactor bigtable.go into separate file path per interfaces (#13818)
1 parent 1127141 commit c8ae2e2

7 files changed

Lines changed: 1380 additions & 1215 deletions

File tree

bigtable/apply_bulk.go

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bigtable
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
"io"
22+
23+
btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
24+
"cloud.google.com/go/internal/trace"
25+
gax "github.com/googleapis/gax-go/v2"
26+
"google.golang.org/grpc/codes"
27+
"google.golang.org/grpc/metadata"
28+
"google.golang.org/grpc/status"
29+
"google.golang.org/protobuf/proto"
30+
)
31+
32+
// ApplyBulk applies multiple Mutations.
33+
// Each mutation is individually applied atomically,
34+
// but the set of mutations may be applied in any order.
35+
//
36+
// Two types of failures may occur. If the entire process
37+
// fails, (nil, err) will be returned. If specific mutations
38+
// fail to apply, ([]err, nil) will be returned, and the errors
39+
// will correspond to the relevant rowKeys/muts arguments.
40+
//
41+
// Conditional mutations cannot be applied in bulk and providing one will result in an error.
42+
func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) {
43+
ctx = mergeOutgoingMetadata(ctx, t.md)
44+
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk")
45+
defer func() { trace.EndSpan(ctx, err) }()
46+
47+
if len(rowKeys) != len(muts) {
48+
return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts))
49+
}
50+
51+
origEntries := make([]*entryErr, len(rowKeys))
52+
for i, key := range rowKeys {
53+
mut := muts[i]
54+
if mut.isConditional {
55+
return nil, errors.New("conditional mutations cannot be applied in bulk")
56+
}
57+
origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}}
58+
}
59+
60+
var firstGroupErr error
61+
numFailed := 0
62+
groups := groupEntries(origEntries, maxMutations)
63+
for _, group := range groups {
64+
err := t.applyGroup(ctx, group, opts...)
65+
if err != nil {
66+
if firstGroupErr == nil {
67+
firstGroupErr = err
68+
}
69+
numFailed++
70+
}
71+
}
72+
73+
if numFailed == len(groups) {
74+
return nil, firstGroupErr
75+
}
76+
77+
// All the errors are accumulated into an array and returned, interspersed with nils for successful
78+
// entries. The absence of any errors means we should return nil.
79+
var foundErr bool
80+
for _, entry := range origEntries {
81+
if entry.Err == nil && entry.TopLevelErr != nil {
82+
// Populate per mutation error if top level error is not nil
83+
entry.Err = entry.TopLevelErr
84+
}
85+
if entry.Err != nil {
86+
foundErr = true
87+
}
88+
errs = append(errs, entry.Err)
89+
}
90+
if foundErr {
91+
return errs, nil
92+
}
93+
return nil, nil
94+
}
95+
96+
func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...ApplyOption) (err error) {
97+
attrMap := make(map[string]interface{})
98+
mt := t.newBuiltinMetricsTracer(ctx, true)
99+
defer mt.recordOperationCompletion()
100+
101+
err = gaxInvokeWithRecorder(ctx, mt, "MutateRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
102+
attrMap["rowCount"] = len(group)
103+
trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk")
104+
err := t.doApplyBulk(ctx, group, headerMD, trailerMD, opts...)
105+
if err != nil {
106+
// We want to retry the entire request with the current group
107+
return err
108+
}
109+
// Get the entries that need to be retried
110+
group = t.getApplyBulkRetries(group)
111+
if len(group) > 0 && len(idempotentRetryCodes) > 0 {
112+
// We have at least one mutation that needs to be retried.
113+
// Return an arbitrary error that is retryable according to callOptions.
114+
return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk")
115+
}
116+
return nil
117+
}, t.c.retryOption)
118+
119+
statusCode, statusErr := convertToGrpcStatusErr(err)
120+
mt.setCurrOpStatus(statusCode)
121+
return statusErr
122+
}
123+
124+
// getApplyBulkRetries returns the entries that need to be retried
125+
func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr {
126+
var retryEntries []*entryErr
127+
for _, entry := range entries {
128+
err := entry.Err
129+
if err != nil && isIdempotentRetryCode[status.Code(err)] && mutationsAreRetryable(entry.Entry.Mutations) {
130+
// There was an error and the entry is retryable.
131+
retryEntries = append(retryEntries, entry)
132+
}
133+
}
134+
return retryEntries
135+
}
136+
137+
// doApplyBulk does the work of a single ApplyBulk invocation
138+
func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD, trailerMD *metadata.MD, opts ...ApplyOption) error {
139+
after := func(res proto.Message) {
140+
for _, o := range opts {
141+
o.after(res)
142+
}
143+
}
144+
145+
var topLevelErr error
146+
defer func() {
147+
populateTopLevelError(entryErrs, topLevelErr)
148+
}()
149+
150+
entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs))
151+
for i, entryErr := range entryErrs {
152+
entries[i] = entryErr.Entry
153+
}
154+
req := &btpb.MutateRowsRequest{
155+
AppProfileId: t.c.appProfile,
156+
Entries: entries,
157+
}
158+
if t.authorizedView == "" {
159+
req.TableName = t.c.fullTableName(t.table)
160+
} else {
161+
req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
162+
}
163+
164+
stream, err := t.c.client.MutateRows(ctx, req)
165+
if err != nil {
166+
_, topLevelErr = convertToGrpcStatusErr(err)
167+
return err
168+
}
169+
170+
// Ignore error since header is only being used to record builtin metrics
171+
// Failure to record metrics should not fail the operation
172+
*headerMD, _ = stream.Header()
173+
for {
174+
res, err := stream.Recv()
175+
if err == io.EOF {
176+
*trailerMD = stream.Trailer()
177+
break
178+
}
179+
if err != nil {
180+
*trailerMD = stream.Trailer()
181+
_, topLevelErr = convertToGrpcStatusErr(err)
182+
return err
183+
}
184+
185+
for _, entry := range res.Entries {
186+
s := entry.Status
187+
if s.Code == int32(codes.OK) {
188+
entryErrs[entry.Index].Err = nil
189+
} else {
190+
entryErrs[entry.Index].Err = status.Error(codes.Code(s.Code), s.Message)
191+
}
192+
}
193+
after(res)
194+
}
195+
return nil
196+
}
197+
198+
func populateTopLevelError(entries []*entryErr, topLevelErr error) {
199+
for _, entry := range entries {
200+
entry.TopLevelErr = topLevelErr
201+
}
202+
}
203+
204+
// groupEntries groups entries into groups of a specified size without breaking up
205+
// individual entries.
206+
func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr {
207+
var (
208+
res [][]*entryErr
209+
start int
210+
gmuts int
211+
)
212+
addGroup := func(end int) {
213+
if end-start > 0 {
214+
res = append(res, entries[start:end])
215+
start = end
216+
gmuts = 0
217+
}
218+
}
219+
for i, e := range entries {
220+
emuts := len(e.Entry.Mutations)
221+
if gmuts+emuts > maxSize {
222+
addGroup(i)
223+
}
224+
gmuts += emuts
225+
}
226+
addGroup(len(entries))
227+
return res
228+
}
229+
230+
// entryErr is a container that combines an entry with the error that was returned for it.
231+
// Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed.
232+
type entryErr struct {
233+
Entry *btpb.MutateRowsRequest_Entry
234+
Err error
235+
236+
// TopLevelErr is the error received either from
237+
// 1. client.MutateRows
238+
// 2. stream.Recv
239+
TopLevelErr error
240+
}

0 commit comments

Comments
 (0)