Skip to content

Commit 9af047f

Browse files
authored
feat(bigtable): add pacemaker in connpool (#13862)
1 parent 6b69ee3 commit 9af047f

3 files changed

Lines changed: 215 additions & 0 deletions

File tree

bigtable/internal/transport/connpool.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,11 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop
434434
} else {
435435
btopt.Debugf(pool.logger, "bigtable_connpool: failed to create metrics reporter: %v\n", err)
436436
}
437+
438+
// Initialize and register the Pacemaker
439+
pacemaker := NewPacemaker(pool.meterProvider, pool.logger)
440+
pool.monitors = append(pool.monitors, pacemaker)
441+
437442
pool.startMonitors()
438443

439444
// record the client startup time
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package internal
16+
17+
import (
18+
"context"
19+
"log"
20+
"time"
21+
22+
btopt "cloud.google.com/go/bigtable/internal/option"
23+
"go.opentelemetry.io/otel/attribute"
24+
"go.opentelemetry.io/otel/metric"
25+
)
26+
27+
// Pacemaker monitors the runtime scheduling delay
28+
// It measures the time difference between when a ticker was scheduled to fire
29+
// and when the ticker actually fires.
30+
type Pacemaker struct {
31+
meterProvider metric.MeterProvider
32+
logger *log.Logger
33+
histogram metric.Float64Histogram
34+
attrs metric.MeasurementOption
35+
}
36+
37+
// NewPacemaker creates a new Pacemaker and initializes its metrics.
38+
func NewPacemaker(mp metric.MeterProvider, logger *log.Logger) *Pacemaker {
39+
pm := &Pacemaker{
40+
meterProvider: mp,
41+
logger: logger,
42+
attrs: metric.WithAttributes(attribute.String("executor", "goroutine")),
43+
}
44+
45+
if mp == nil {
46+
return pm
47+
}
48+
49+
// create meter
50+
meter := mp.Meter(clientMeterName)
51+
var err error
52+
// Buckets in microseconds (us).
53+
// Ranges cover: 0us, 100us, 500us, 1ms(1k), 2ms(2k), 5ms(5k), 10ms(10k),
54+
// 50ms(50k), 100ms(100k), 500ms(500k), 1s(1M).
55+
bounds := []float64{0, 100, 500, 1000, 2000, 5000, 10000, 50000, 100000, 500000, 1000000}
56+
57+
pm.histogram, err = meter.Float64Histogram(
58+
"pacemaker_delays",
59+
metric.WithDescription("Distribution of delays between the scheduled time and actual execution time of the pacemaker goroutine."),
60+
metric.WithUnit("us"),
61+
metric.WithExplicitBucketBoundaries(bounds...),
62+
)
63+
if err != nil {
64+
btopt.Debugf(logger, "bigtable_connpool: failed to create pacemaker metric: %v", err)
65+
}
66+
67+
return pm
68+
}
69+
70+
// Start begins the pacemaker ticker.
71+
func (p *Pacemaker) Start(ctx context.Context) {
72+
if p.histogram == nil {
73+
btopt.Debugf(p.logger, "bigtable_connpool: Pacemaker skipped (no histogram initialized)")
74+
return
75+
}
76+
77+
go func() {
78+
interval := 100 * time.Millisecond
79+
ticker := time.NewTicker(interval)
80+
defer ticker.Stop()
81+
82+
lastTick := time.Now()
83+
84+
for {
85+
select {
86+
case t := <-ticker.C:
87+
actualInterval := t.Sub(lastTick)
88+
89+
delay := actualInterval - interval
90+
if delay < 0 {
91+
delay = 0
92+
}
93+
94+
delayUs := float64(delay.Nanoseconds()) / 1e3
95+
p.histogram.Record(ctx, delayUs, p.attrs)
96+
97+
lastTick = t
98+
99+
case <-ctx.Done():
100+
return
101+
}
102+
}
103+
}()
104+
}
105+
106+
// Stop acts as a cleanup method. no-op
107+
func (p *Pacemaker) Stop() {
108+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package internal
16+
17+
import (
18+
"context"
19+
"io"
20+
"log"
21+
"testing"
22+
"time"
23+
24+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
25+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
26+
)
27+
28+
// helper to find a metric in the collected resource metrics
29+
func findPacemakerMetric(rm metricdata.ResourceMetrics, name string) (metricdata.Metrics, bool) {
30+
for _, sm := range rm.ScopeMetrics {
31+
for _, m := range sm.Metrics {
32+
if m.Name == name {
33+
return m, true
34+
}
35+
}
36+
}
37+
return metricdata.Metrics{}, false
38+
}
39+
40+
func TestPacemakerExporting(t *testing.T) {
41+
reader := sdkmetric.NewManualReader()
42+
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
43+
44+
logger := log.New(io.Discard, "", 0)
45+
pm := NewPacemaker(provider, logger)
46+
47+
ctx, cancel := context.WithCancel(context.Background())
48+
defer cancel()
49+
50+
pm.Start(ctx)
51+
52+
// 4. Wait for ticks
53+
// The pacemaker ticks every 100ms. Waiting 250ms ensures we capture at least 2 ticks.
54+
time.Sleep(250 * time.Millisecond)
55+
56+
var rm metricdata.ResourceMetrics
57+
if err := reader.Collect(ctx, &rm); err != nil {
58+
t.Fatalf("Failed to collect metrics: %v", err)
59+
}
60+
61+
metric, ok := findPacemakerMetric(rm, "pacemaker_delays")
62+
if !ok {
63+
t.Fatalf("Metric 'pacemaker_delays' not found in exported metrics")
64+
}
65+
66+
if metric.Unit != "us" {
67+
t.Errorf("Metric unit mismatch: got %q, want 'us'", metric.Unit)
68+
}
69+
70+
hist, ok := metric.Data.(metricdata.Histogram[float64])
71+
if !ok {
72+
t.Fatalf("Metric data type mismatch: expected Histogram[float64], got %T", metric.Data)
73+
}
74+
75+
// 9. Verify Data Points
76+
// We expect the total count of recorded values to be at least 1
77+
var totalCount uint64
78+
for _, dp := range hist.DataPoints {
79+
totalCount += dp.Count
80+
// Check for the "executor" attribute
81+
foundExecutor := false
82+
for _, attr := range dp.Attributes.ToSlice() {
83+
if attr.Key == "executor" {
84+
if attr.Value.AsString() == "goroutine" {
85+
foundExecutor = true
86+
} else {
87+
t.Errorf("Unexpected attribute value for 'executor': got %q, want 'goroutine'", attr.Value.AsString())
88+
}
89+
}
90+
}
91+
if !foundExecutor {
92+
t.Errorf("Data point missing 'executor' attribute")
93+
}
94+
}
95+
96+
if totalCount < 1 {
97+
t.Errorf("Expected at least 1 recorded data points, got %d", totalCount)
98+
}
99+
100+
// 10. Cleanup
101+
pm.Stop()
102+
}

0 commit comments

Comments
 (0)