Skip to content

Commit eedc632

Browse files
authored
feat(bigquery): add trace instrumentation support for individual rpcs (#6493)
* feat(bigquery): add trace instrumentation support for individual rpcs
1 parent e42772c commit eedc632

File tree

8 files changed

+156
-2
lines changed

8 files changed

+156
-2
lines changed

bigquery/bigquery.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"cloud.google.com/go/bigquery/internal"
2727
cloudinternal "cloud.google.com/go/internal"
2828
"cloud.google.com/go/internal/detect"
29+
"cloud.google.com/go/internal/trace"
2930
"cloud.google.com/go/internal/version"
3031
gax "github.com/googleapis/gax-go/v2"
3132
bq "google.golang.org/api/bigquery/v2"
@@ -119,7 +120,9 @@ func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader) (*
119120
var res *bq.Job
120121
var err error
121122
invoke := func() error {
123+
sCtx := trace.StartSpan(ctx, "bigquery.jobs.insert")
122124
res, err = call.Do()
125+
trace.EndSpan(sCtx, err)
123126
return err
124127
}
125128
// A job with a client-generated ID can be retried; the presence of the
@@ -149,7 +152,9 @@ func (c *Client) runQuery(ctx context.Context, queryRequest *bq.QueryRequest) (*
149152
var res *bq.QueryResponse
150153
var err error
151154
invoke := func() error {
155+
sCtx := trace.StartSpan(ctx, "bigquery.jobs.query")
152156
res, err = call.Do()
157+
trace.EndSpan(sCtx, err)
153158
return err
154159
}
155160

bigquery/dataset.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,12 @@ func (d *Dataset) deleteInternal(ctx context.Context, deleteContents bool) (err
216216

217217
call := d.c.bqs.Datasets.Delete(d.ProjectID, d.DatasetID).Context(ctx).DeleteContents(deleteContents)
218218
setClientHeader(call.Header())
219-
return call.Do()
219+
return runWithRetry(ctx, func() (err error) {
220+
sCtx := trace.StartSpan(ctx, "bigquery.datasets.delete")
221+
err = call.Do()
222+
trace.EndSpan(sCtx, err)
223+
return err
224+
})
220225
}
221226

222227
// Metadata fetches the metadata for the dataset.
@@ -228,7 +233,9 @@ func (d *Dataset) Metadata(ctx context.Context) (md *DatasetMetadata, err error)
228233
setClientHeader(call.Header())
229234
var ds *bq.Dataset
230235
if err := runWithRetry(ctx, func() (err error) {
236+
sCtx := trace.StartSpan(ctx, "bigquery.datasets.get")
231237
ds, err = call.Do()
238+
trace.EndSpan(sCtx, err)
232239
return err
233240
}); err != nil {
234241
return nil, err
@@ -284,7 +291,9 @@ func (d *Dataset) Update(ctx context.Context, dm DatasetMetadataToUpdate, etag s
284291
}
285292
var ds2 *bq.Dataset
286293
if err := runWithRetry(ctx, func() (err error) {
294+
sCtx := trace.StartSpan(ctx, "bigquery.datasets.patch")
287295
ds2, err = call.Do()
296+
trace.EndSpan(sCtx, err)
288297
return err
289298
}); err != nil {
290299
return nil, err
@@ -391,7 +400,9 @@ var listTables = func(it *TableIterator, pageSize int, pageToken string) (*bq.Ta
391400
}
392401
var res *bq.TableList
393402
err := runWithRetry(it.ctx, func() (err error) {
403+
sCtx := trace.StartSpan(it.ctx, "bigquery.tables.list")
394404
res, err = call.Do()
405+
trace.EndSpan(sCtx, err)
395406
return err
396407
})
397408
return res, err
@@ -476,7 +487,9 @@ var listModels = func(it *ModelIterator, pageSize int, pageToken string) (*bq.Li
476487
}
477488
var res *bq.ListModelsResponse
478489
err := runWithRetry(it.ctx, func() (err error) {
490+
sCtx := trace.StartSpan(it.ctx, "bigquery.models.list")
479491
res, err = call.Do()
492+
trace.EndSpan(sCtx, err)
480493
return err
481494
})
482495
return res, err
@@ -563,7 +576,9 @@ var listRoutines = func(it *RoutineIterator, pageSize int, pageToken string) (*b
563576
}
564577
var res *bq.ListRoutinesResponse
565578
err := runWithRetry(it.ctx, func() (err error) {
579+
sCtx := trace.StartSpan(it.ctx, "bigquery.routines.list")
566580
res, err = call.Do()
581+
trace.EndSpan(sCtx, err)
567582
return err
568583
})
569584
return res, err
@@ -667,7 +682,9 @@ var listDatasets = func(it *DatasetIterator, pageSize int, pageToken string) (*b
667682
}
668683
var res *bq.DatasetList
669684
err := runWithRetry(it.ctx, func() (err error) {
685+
sCtx := trace.StartSpan(it.ctx, "bigquery.datasets.list")
670686
res, err = call.Do()
687+
trace.EndSpan(sCtx, err)
671688
return err
672689
})
673690
return res, err

bigquery/inserter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,9 @@ func (u *Inserter) putMulti(ctx context.Context, src []ValueSaver) error {
182182
setClientHeader(call.Header())
183183
var res *bq.TableDataInsertAllResponse
184184
err = runWithRetry(ctx, func() (err error) {
185+
ctx = trace.StartSpan(ctx, "bigquery.tabledata.insertAll")
185186
res, err = call.Do()
187+
trace.EndSpan(ctx, err)
186188
return err
187189
})
188190
if err != nil {

bigquery/job.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,9 @@ func (j *Job) Cancel(ctx context.Context) error {
240240
Context(ctx)
241241
setClientHeader(call.Header())
242242
return runWithRetry(ctx, func() error {
243+
sCtx := trace.StartSpan(ctx, "bigquery.jobs.cancel")
243244
_, err := call.Do()
245+
trace.EndSpan(sCtx, err)
244246
return err
245247
})
246248
}
@@ -257,7 +259,9 @@ func (j *Job) Delete(ctx context.Context) (err error) {
257259
setClientHeader(call.Header())
258260

259261
return runWithRetry(ctx, func() (err error) {
262+
sCtx := trace.StartSpan(ctx, "bigquery.jobs.delete")
260263
err = call.Do()
264+
trace.EndSpan(sCtx, err)
261265
return err
262266
})
263267
}
@@ -343,7 +347,9 @@ func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint6
343347
}
344348
var res *bq.GetQueryResultsResponse
345349
err := internal.Retry(ctx, backoff, func() (stop bool, err error) {
350+
sCtx := trace.StartSpan(ctx, "bigquery.jobs.getQueryResults")
346351
res, err = call.Do()
352+
trace.EndSpan(sCtx, err)
347353
if err != nil {
348354
return !retryableError(err, jobRetryReasons), err
349355
}
@@ -837,7 +843,14 @@ func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
837843
if it.ParentJobID != "" {
838844
req.ParentJobId(it.ParentJobID)
839845
}
840-
res, err := req.Do()
846+
var res *bq.JobList
847+
err := runWithRetry(it.ctx, func() (err error) {
848+
sCtx := trace.StartSpan(it.ctx, "bigquery.jobs.list")
849+
res, err = req.Do()
850+
trace.EndSpan(sCtx, err)
851+
return err
852+
})
853+
841854
if err != nil {
842855
return "", err
843856
}
@@ -870,7 +883,9 @@ func (c *Client) getJobInternal(ctx context.Context, jobID, location, projectID
870883
}
871884
setClientHeader(call.Header())
872885
err := runWithRetry(ctx, func() (err error) {
886+
sCtx := trace.StartSpan(ctx, "bigquery.jobs.get")
873887
job, err = call.Do()
888+
trace.EndSpan(sCtx, err)
874889
return err
875890
})
876891
if err != nil {

bigquery/model.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ func (m *Model) Metadata(ctx context.Context) (mm *ModelMetadata, err error) {
8686
setClientHeader(req.Header())
8787
var model *bq.Model
8888
err = runWithRetry(ctx, func() (err error) {
89+
ctx = trace.StartSpan(ctx, "bigquery.models.get")
8990
model, err = req.Do()
91+
trace.EndSpan(ctx, err)
9092
return err
9193
})
9294
if err != nil {
@@ -111,7 +113,9 @@ func (m *Model) Update(ctx context.Context, mm ModelMetadataToUpdate, etag strin
111113
}
112114
var res *bq.Model
113115
if err := runWithRetry(ctx, func() (err error) {
116+
ctx = trace.StartSpan(ctx, "bigquery.models.patch")
114117
res, err = call.Do()
118+
trace.EndSpan(ctx, err)
115119
return err
116120
}); err != nil {
117121
return nil, err

bigquery/routine.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ func (r *Routine) Metadata(ctx context.Context) (rm *RoutineMetadata, err error)
9797
setClientHeader(req.Header())
9898
var routine *bq.Routine
9999
err = runWithRetry(ctx, func() (err error) {
100+
ctx = trace.StartSpan(ctx, "bigquery.routines.get")
100101
routine, err = req.Do()
102+
trace.EndSpan(ctx, err)
101103
return err
102104
})
103105
if err != nil {
@@ -129,7 +131,9 @@ func (r *Routine) Update(ctx context.Context, upd *RoutineMetadataToUpdate, etag
129131
}
130132
var res *bq.Routine
131133
if err := runWithRetry(ctx, func() (err error) {
134+
ctx = trace.StartSpan(ctx, "bigquery.routines.update")
132135
res, err = call.Do()
136+
trace.EndSpan(ctx, err)
133137
return err
134138
}); err != nil {
135139
return nil, err

bigquery/table.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,9 @@ func (t *Table) Create(ctx context.Context, tm *TableMetadata) (err error) {
581581
req := t.c.bqs.Tables.Insert(t.ProjectID, t.DatasetID, table).Context(ctx)
582582
setClientHeader(req.Header())
583583
return runWithRetry(ctx, func() (err error) {
584+
ctx = trace.StartSpan(ctx, "bigquery.tables.insert")
584585
_, err = req.Do()
586+
trace.EndSpan(ctx, err)
585587
return err
586588
})
587589
}
@@ -716,7 +718,9 @@ func (t *Table) Metadata(ctx context.Context, opts ...TableMetadataOption) (md *
716718
setClientHeader(tgc.call.Header())
717719
var res *bq.Table
718720
if err := runWithRetry(ctx, func() (err error) {
721+
sCtx := trace.StartSpan(ctx, "bigquery.tables.get")
719722
res, err = tgc.call.Do()
723+
trace.EndSpan(sCtx, err)
720724
return err
721725
}); err != nil {
722726
return nil, err
@@ -783,7 +787,9 @@ func (t *Table) Delete(ctx context.Context) (err error) {
783787
setClientHeader(call.Header())
784788

785789
return runWithRetry(ctx, func() (err error) {
790+
ctx = trace.StartSpan(ctx, "bigquery.tables.delete")
786791
err = call.Do()
792+
trace.EndSpan(ctx, err)
787793
return err
788794
})
789795
}
@@ -841,7 +847,9 @@ func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag strin
841847
}
842848
var res *bq.Table
843849
if err := runWithRetry(ctx, func() (err error) {
850+
ctx = trace.StartSpan(ctx, "bigquery.tables.patch")
844851
res, err = tpc.call.Do()
852+
trace.EndSpan(ctx, err)
845853
return err
846854
}); err != nil {
847855
return nil, err

bigquery/trace_integration_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bigquery
16+
17+
import (
18+
"context"
19+
"strings"
20+
"testing"
21+
"time"
22+
23+
"go.opencensus.io/trace"
24+
)
25+
26+
// testExporter is a testing exporter for validating captured spans.
27+
type testExporter struct {
28+
spans []*trace.SpanData
29+
}
30+
31+
func (te *testExporter) ExportSpan(s *trace.SpanData) {
32+
te.spans = append(te.spans, s)
33+
}
34+
35+
// hasSpans checks that the exporter has all the span names
36+
// specified in the slice. It returns the unmatched names.
37+
func (te *testExporter) hasSpans(names []string) []string {
38+
matches := make(map[string]struct{})
39+
for _, n := range names {
40+
matches[n] = struct{}{}
41+
}
42+
for _, s := range te.spans {
43+
delete(matches, s.Name)
44+
}
45+
var unmatched []string
46+
for k := range matches {
47+
unmatched = append(unmatched, k)
48+
}
49+
return unmatched
50+
}
51+
52+
func TestIntegration_Tracing(t *testing.T) {
53+
if client == nil {
54+
t.Skip("Integration tests skipped")
55+
}
56+
57+
ctx := context.Background()
58+
59+
for _, tc := range []struct {
60+
description string
61+
callF func(ctx context.Context)
62+
wantSpans []string
63+
}{
64+
{
65+
description: "fast path query",
66+
callF: func(ctx context.Context) {
67+
client.Query("SELECT SESSION_USER()").Read(ctx)
68+
},
69+
wantSpans: []string{"bigquery.jobs.query", "cloud.google.com/go/bigquery.Query.Run"},
70+
},
71+
{
72+
description: "slow path query",
73+
callF: func(ctx context.Context) {
74+
q := client.Query("SELECT SESSION_USER()")
75+
q.JobTimeout = time.Hour
76+
q.Read(ctx)
77+
},
78+
wantSpans: []string{"bigquery.jobs.insert", "bigquery.jobs.getQueryResults", "cloud.google.com/go/bigquery.Job.Read", "cloud.google.com/go/bigquery.Query.Run"},
79+
},
80+
{
81+
description: "table metadata",
82+
callF: func(ctx context.Context) {
83+
client.DatasetInProject("bigquery-public-data", "samples").Table("shakespeare").Metadata(ctx)
84+
},
85+
wantSpans: []string{"bigquery.tables.get", "cloud.google.com/go/bigquery.Table.Metadata"},
86+
},
87+
} {
88+
exporter := &testExporter{}
89+
trace.RegisterExporter(exporter)
90+
traceCtx, span := trace.StartSpan(ctx, "testspan", trace.WithSampler(trace.AlwaysSample()))
91+
tc.callF(traceCtx)
92+
span.End()
93+
trace.UnregisterExporter(exporter)
94+
95+
if unmatched := exporter.hasSpans(tc.wantSpans); len(unmatched) > 0 {
96+
t.Errorf("case (%s): unmatched spans: %s", tc.description, strings.Join(unmatched, ","))
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)