Skip to content

Commit 6b69ee3

Browse files
authored
feat(bigtable): add preemptive connection recycler (#13860)
1 parent 1f49e87 commit 6b69ee3

4 files changed

Lines changed: 288 additions & 3 deletions

File tree

bigtable/client.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type Client struct {
4949
enableDirectAccess bool
5050
featureFlagsMD metadata.MD // Pre-computed feature flags metadata to be sent with each request.
5151
dynamicScaleMonitor *btransport.DynamicScaleMonitor
52+
connsRecycler *btransport.ConnectionRecycler
5253
}
5354

5455
// ClientConfig has configurations for the client.
@@ -64,8 +65,13 @@ type ClientConfig struct {
6465
// TODO: support user provided meter provider
6566
MetricsProvider MetricsProvider
6667

67-
// If true, enable dynamic channel pool
68-
EnableDynamicChannelPool bool
68+
// DisableDynamicChannelPool disables the dynamic channel resizing based on load
69+
// Dynamic channel resizing is enabled by default to resize based on load and avoid queuing of requests.
70+
DisableDynamicChannelPool bool
71+
72+
// DisableConnectionRecycler disables the automatic preemptive refresh of connection.
73+
// Preemptive connection is default to true
74+
DisableConnectionRecycler bool
6975
}
7076

7177
// MetricsProvider is a wrapper for built in metrics meter provider
@@ -153,6 +159,8 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
153159
var connPool gtransport.ConnPool
154160
var connPoolErr error
155161
var dsm *btransport.DynamicScaleMonitor
162+
var connRecycler *btransport.ConnectionRecycler
163+
156164
enableBigtableConnPool := btopt.EnableBigtableConnectionPool()
157165
if enableBigtableConnPool {
158166
fullInstanceName := fmt.Sprintf("projects/%s/instances/%s", project, instance)
@@ -182,14 +190,20 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
182190
connPool = btPool
183191

184192
// Validate dynamic config early if enabled
185-
if config.EnableDynamicChannelPool {
193+
if !config.DisableDynamicChannelPool {
186194
if err := btransport.ValidateDynamicConfig(btopt.DefaultDynamicChannelPoolConfig(), defaultBigtableConnPoolSize); err != nil {
187195
return nil, fmt.Errorf("invalid DynamicChannelPoolConfig: %w", err)
188196
}
189197

190198
dsm = btransport.NewDynamicScaleMonitor(btopt.DefaultDynamicChannelPoolConfig(), btPool)
191199
dsm.Start(ctx) // Start the monitor's background goroutine
192200
}
201+
// connection recyler.
202+
if !config.DisableConnectionRecycler {
203+
connRecycler = btransport.NewConnectionRecycler(btopt.DefaultConnectionRecycleConfig(), btPool)
204+
connRecycler.Start(ctx) // Start the monitor's background goroutine
205+
}
206+
193207
}
194208

195209
} else {
@@ -214,6 +228,7 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
214228
enableDirectAccess: enableDirectAccess,
215229
featureFlagsMD: ffMD,
216230
dynamicScaleMonitor: dsm,
231+
connsRecycler: connRecycler,
217232
}, nil
218233
}
219234

@@ -225,6 +240,9 @@ func (c *Client) Close() error {
225240
if c.metricsTracerFactory != nil {
226241
c.metricsTracerFactory.shutdown()
227242
}
243+
if c.connsRecycler != nil {
244+
c.connsRecycler.Stop()
245+
}
228246
return c.connPool.Close()
229247
}
230248

bigtable/internal/option/option.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,3 +271,23 @@ func DefaultMetricsReporterConfig() MetricsReporterConfig {
271271
ReportingInterval: 1 * time.Minute,
272272
}
273273
}
274+
275+
// ConnectionRecycleConfig controls the behavior of the connection recycler.
276+
type ConnectionRecycleConfig struct {
277+
// MaxAge is the base lifespan of a connection.
278+
MaxAge time.Duration
279+
// Jitter is the random buffer added to MaxAge which can allow for connection to be recycled.
280+
MaxJitter time.Duration
281+
// RunFrequency determines how often the recycler checks for expired connections.
282+
RunFrequency time.Duration
283+
}
284+
285+
// DefaultConnectionRecycleConfig returns the default configuration:
286+
// MaxAge: 45 minutes, Jitter: 5 minutes, RunFrequency: 1 minute, Enabled: true.
287+
func DefaultConnectionRecycleConfig() ConnectionRecycleConfig {
288+
return ConnectionRecycleConfig{
289+
MaxAge: 45 * time.Minute,
290+
MaxJitter: 5 * time.Minute,
291+
RunFrequency: 1 * time.Minute,
292+
}
293+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package internal
16+
17+
import (
18+
"context"
19+
"math/rand"
20+
"sync"
21+
"time"
22+
23+
btopt "cloud.google.com/go/bigtable/internal/option"
24+
)
25+
26+
// maxRecyclePerBatch limits the number of connections we replace in a single pass.
27+
const maxRecyclePerBatch = 2
28+
29+
// ConnectionRecycler monitors connection age and recycles them to prevent long-lived connections.
30+
type ConnectionRecycler struct {
31+
pool *BigtableChannelPool
32+
config btopt.ConnectionRecycleConfig
33+
ticker *time.Ticker
34+
done chan struct{}
35+
stopOnce sync.Once
36+
rng *rand.Rand
37+
}
38+
39+
// NewConnectionRecycler creates a new recycler with the provided configuration.
40+
func NewConnectionRecycler(config btopt.ConnectionRecycleConfig, pool *BigtableChannelPool) *ConnectionRecycler {
41+
return &ConnectionRecycler{
42+
pool: pool,
43+
config: config,
44+
done: make(chan struct{}),
45+
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
46+
}
47+
}
48+
49+
// Start begins the periodic monitoring.
50+
func (cr *ConnectionRecycler) Start(ctx context.Context) {
51+
btopt.Debugf(cr.pool.logger, "bigtable_connpool: ConnectionRecyler starting...")
52+
53+
// default to 1 minute
54+
freq := cr.config.RunFrequency
55+
if freq < 1*time.Minute {
56+
freq = 1 * time.Minute
57+
}
58+
59+
// at least once per MaxAge interval.
60+
if cr.config.MaxAge > 0 && freq > cr.config.MaxAge {
61+
freq = cr.config.MaxAge
62+
}
63+
64+
cr.ticker = time.NewTicker(freq)
65+
go func() {
66+
defer cr.ticker.Stop()
67+
for {
68+
select {
69+
case <-cr.ticker.C:
70+
cr.checkRecycle()
71+
case <-cr.done:
72+
return
73+
case <-ctx.Done():
74+
return
75+
}
76+
}
77+
}()
78+
}
79+
80+
// Stop terminates the ConnectionRecycler.
81+
func (cr *ConnectionRecycler) Stop() {
82+
cr.stopOnce.Do(func() {
83+
close(cr.done)
84+
})
85+
}
86+
87+
// background period task
88+
func (cr *ConnectionRecycler) checkRecycle() {
89+
conns := cr.pool.getConns()
90+
recycledCount := 0
91+
92+
hasJitter := cr.config.MaxJitter > 0
93+
jitterVal := int64(cr.config.MaxJitter)
94+
95+
for _, entry := range conns {
96+
if recycledCount >= maxRecyclePerBatch {
97+
btopt.Debugf(cr.pool.logger, "bigtable_connpool: Hit max recycle cap (%d) for this round", maxRecyclePerBatch)
98+
break
99+
}
100+
101+
createdAt := time.UnixMilli(entry.createdAt())
102+
age := time.Since(createdAt)
103+
104+
var currentJitter time.Duration
105+
if hasJitter {
106+
currentJitter = time.Duration(cr.rng.Int63n(jitterVal))
107+
}
108+
109+
if age > cr.config.MaxAge+currentJitter {
110+
btopt.Debugf(cr.pool.logger, "bigtable_connpool: Recycling connection age %v > %v + %v", age, cr.config.MaxAge, currentJitter)
111+
cr.pool.replaceConnection(entry)
112+
recycledCount++
113+
}
114+
}
115+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package internal
16+
17+
import (
18+
"context"
19+
"testing"
20+
"time"
21+
22+
btopt "cloud.google.com/go/bigtable/internal/option"
23+
)
24+
25+
func TestConnectionRecycler_CheckRecycle(t *testing.T) {
26+
fake := &fakeService{}
27+
addr := setupTestServer(t, fake)
28+
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
29+
ctx := context.Background()
30+
31+
setAge := func(entry *connEntry, age time.Duration) {
32+
entry.conn.createdAt.Store(time.Now().Add(-age).UnixMilli())
33+
}
34+
35+
t.Run("RecycleOldConnection", func(t *testing.T) {
36+
config := btopt.ConnectionRecycleConfig{
37+
MaxAge: 10 * time.Minute,
38+
MaxJitter: 0,
39+
}
40+
41+
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now())
42+
if err != nil {
43+
t.Fatalf("Failed to create pool: %v", err)
44+
}
45+
defer pool.Close()
46+
47+
recycler := NewConnectionRecycler(config, pool)
48+
49+
conns := pool.getConns()
50+
if len(conns) != 1 {
51+
t.Fatalf("Expected 1 connection, got %d", len(conns))
52+
}
53+
originalEntry := conns[0]
54+
originalConnPtr := originalEntry.conn
55+
56+
// maxAge > 20m
57+
setAge(originalEntry, 20*time.Minute)
58+
recycler.checkRecycle()
59+
60+
// recycled fast as it does not have any pending rpcs
61+
newConns := pool.getConns()
62+
if newConns[0].conn == originalConnPtr {
63+
t.Error("Connection was older than MaxAge but was NOT recycled")
64+
}
65+
})
66+
67+
t.Run("DoesNotReplaceIfConnWithinMaxAge", func(t *testing.T) {
68+
config := btopt.ConnectionRecycleConfig{
69+
MaxAge: 10 * time.Minute,
70+
MaxJitter: 0,
71+
}
72+
73+
pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, time.Now())
74+
if err != nil {
75+
t.Fatalf("Failed to create pool: %v", err)
76+
}
77+
defer pool.Close()
78+
79+
recycler := NewConnectionRecycler(config, pool)
80+
81+
entry := pool.getConns()[0]
82+
originalConnPtr := entry.conn
83+
84+
// < 10mins
85+
setAge(entry, 5*time.Minute)
86+
87+
// recycled fast as it does not have any pending rpcs
88+
recycler.checkRecycle()
89+
90+
if pool.getConns()[0].conn != originalConnPtr {
91+
t.Error("Connection WAS recycled unexpectedly")
92+
}
93+
})
94+
95+
t.Run("RespectsMaxRecyclePerBatch", func(t *testing.T) {
96+
config := btopt.ConnectionRecycleConfig{
97+
MaxAge: 10 * time.Minute,
98+
MaxJitter: 0,
99+
}
100+
// 5 conns
101+
poolSize := 5
102+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, time.Now())
103+
if err != nil {
104+
t.Fatalf("Failed to create pool: %v", err)
105+
}
106+
defer pool.Close()
107+
108+
recycler := NewConnectionRecycler(config, pool)
109+
110+
// force age to be old
111+
conns := pool.getConns()
112+
originalConns := make(map[*BigtableConn]bool)
113+
for _, e := range conns {
114+
setAge(e, 60*time.Minute)
115+
originalConns[e.conn] = true
116+
}
117+
118+
// Trigger recycle
119+
recycler.checkRecycle()
120+
121+
currentConns := pool.getConns()
122+
changedCount := 0
123+
for _, e := range currentConns {
124+
if !originalConns[e.conn] {
125+
changedCount++
126+
}
127+
}
128+
if changedCount != maxRecyclePerBatch {
129+
t.Errorf("Expected exactly %d recycled connections (batch limit), but got %d", maxRecyclePerBatch, changedCount)
130+
}
131+
})
132+
}

0 commit comments

Comments
 (0)