Skip to content

Commit 7ac72cc

Browse files
authored
feat(bigtable): make downscaling less aggressive (#14123)
The pool now downscales only after three consecutive low-load evaluation runs (the default ContinuousDownscaleRunsThreshold).
1 parent b5f7bf2 commit 7ac72cc

3 files changed

Lines changed: 107 additions & 50 deletions

File tree

bigtable/internal/option/option.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -231,27 +231,29 @@ func Debugf(logger *log.Logger, format string, v ...interface{}) {
231231

232232
// DynamicChannelPoolConfig holds the parameters for dynamic channel pool scaling.
233233
type DynamicChannelPoolConfig struct {
234-
Enabled bool // Whether dynamic scaling is enabled.
235-
MinConns int // Minimum conns allowed
236-
MaxConns int // Maximum conns allowed.
237-
AvgLoadHighThreshold float64 // Average weighted load per connection to trigger scale-up.
238-
AvgLoadLowThreshold float64 // Average weighted load per connection to trigger scale-down.
239-
MinScalingInterval time.Duration // Minimum time between scaling operations (both up and down).
240-
CheckInterval time.Duration // How often to check if scaling is needed.
241-
MaxRemoveConns int // Maximum number of connections to remove at once.
234+
Enabled bool // Whether dynamic scaling is enabled.
235+
MinConns int // Minimum conns allowed
236+
MaxConns int // Maximum conns allowed.
237+
AvgLoadHighThreshold float64 // Average weighted load per connection to trigger scale-up.
238+
AvgLoadLowThreshold float64 // Average weighted load per connection to trigger scale-down.
239+
MinScalingInterval time.Duration // Minimum time between scaling operations (both up and down).
240+
CheckInterval time.Duration // How often to check if scaling is needed.
241+
MaxRemoveConns int // Maximum number of connections to remove at once.
242+
ContinuousDownscaleRunsThreshold int // Number of consecutive downscale signals required before a downscale actually occurs.
242243
}
243244

244245
// DefaultDynamicChannelPoolConfig is default settings for dynamic channel pool
245246
func DefaultDynamicChannelPoolConfig() DynamicChannelPoolConfig {
246247
return DynamicChannelPoolConfig{
247-
Enabled: true, // Enabled by default
248-
MinConns: 10,
249-
MaxConns: 200,
250-
AvgLoadHighThreshold: 50,
251-
AvgLoadLowThreshold: 5,
252-
MinScalingInterval: 1 * time.Minute,
253-
CheckInterval: 30 * time.Second,
254-
MaxRemoveConns: 2, // Only Cap for removals
248+
Enabled: true, // Enabled by default
249+
MinConns: 10,
250+
MaxConns: 200,
251+
AvgLoadHighThreshold: 50,
252+
AvgLoadLowThreshold: 5,
253+
MinScalingInterval: 1 * time.Minute,
254+
CheckInterval: 30 * time.Second,
255+
MaxRemoveConns: 2, // Only Cap for removals
256+
ContinuousDownscaleRunsThreshold: 3,
255257
}
256258
}
257259

bigtable/internal/transport/dynamic_scale_monitor.go

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,24 @@ import (
2727
// DynamicScaleMonitor manages upscale and downscale of the connection pool.
2828
// Owner: It is owned by BigtableClient
2929
type DynamicScaleMonitor struct {
30-
config btopt.DynamicChannelPoolConfig
31-
pool *BigtableChannelPool
32-
lastScalingTime time.Time
33-
mu sync.Mutex
34-
ticker *time.Ticker
35-
done chan struct{}
36-
stopOnce sync.Once
37-
perConnTargetLoad float64 // target load per conn
30+
config btopt.DynamicChannelPoolConfig
31+
pool *BigtableChannelPool
32+
lastScalingTime time.Time
33+
mu sync.Mutex
34+
ticker *time.Ticker
35+
done chan struct{}
36+
stopOnce sync.Once
37+
perConnTargetLoad float64 // target load per conn
38+
continuousDownscaleRuns int // avoid downscaling in one run
3839

3940
}
4041

4142
// NewDynamicScaleMonitor creates a new DynamicScaleMonitor.
4243
func NewDynamicScaleMonitor(config btopt.DynamicChannelPoolConfig, pool *BigtableChannelPool) *DynamicScaleMonitor {
44+
// Fall back to a default threshold of 3 if 0 is specified.
45+
if config.ContinuousDownscaleRunsThreshold == 0 {
46+
config.ContinuousDownscaleRunsThreshold = 3
47+
}
4348

4449
perConnTargetLoad := math.Floor(config.AvgLoadLowThreshold+config.AvgLoadHighThreshold) / 2.0
4550
if perConnTargetLoad < 1.0 {
@@ -110,10 +115,28 @@ func (dsm *DynamicScaleMonitor) evaluateAndScale() {
110115
}
111116
currentAvgLoadPerConn := float64(currentLoadSum) / float64(currentConnsCount)
112117

113-
if currentAvgLoadPerConn >= dsm.config.AvgLoadHighThreshold {
114-
dsm.scaleUp(currentLoadSum, currentConnsCount)
115-
} else if currentAvgLoadPerConn <= dsm.config.AvgLoadLowThreshold {
116-
dsm.scaleDown(currentLoadSum, currentConnsCount)
118+
btopt.Debugf(dsm.pool.logger, "bigtable_connpool: evaluateAndScale currentLoadSum: %d, currentChannel: %d, avgLoad: %.2f\n", currentLoadSum, currentConnsCount, currentAvgLoadPerConn)
119+
120+
if currentAvgLoadPerConn <= dsm.config.AvgLoadLowThreshold {
121+
dsm.continuousDownscaleRuns++
122+
123+
btopt.Debugf(dsm.pool.logger, "bigtable_connpool: Low load detected. Downscale streak: %d/%d\n", dsm.continuousDownscaleRuns, dsm.config.ContinuousDownscaleRunsThreshold)
124+
125+
if dsm.continuousDownscaleRuns >= dsm.config.ContinuousDownscaleRunsThreshold {
126+
dsm.scaleDown(currentLoadSum, currentConnsCount)
127+
dsm.continuousDownscaleRuns = 0
128+
}
129+
} else {
130+
// Reset the downscale streak
131+
if dsm.continuousDownscaleRuns > 0 {
132+
btopt.Debugf(dsm.pool.logger, "bigtable_connpool: Load above low threshold. Resetting downscale streak from %d to 0.\n", dsm.continuousDownscaleRuns)
133+
dsm.continuousDownscaleRuns = 0
134+
}
135+
136+
// Proceed to check if we need to scale up
137+
if currentAvgLoadPerConn >= dsm.config.AvgLoadHighThreshold {
138+
dsm.scaleUp(currentLoadSum, currentConnsCount)
139+
}
117140
}
118141
}
119142

@@ -140,6 +163,11 @@ func ValidateDynamicConfig(config btopt.DynamicChannelPoolConfig, connPoolSize i
140163
if config.MaxRemoveConns <= 0 {
141164
return fmt.Errorf("bigtable_connpool: DynamicChannelPoolConfig.MaxRemoveConns must be positive")
142165
}
166+
167+
// Validate the new config field
168+
if config.ContinuousDownscaleRunsThreshold < 0 {
169+
return fmt.Errorf("bigtable_connpool: DynamicChannelPoolConfig.ContinuousDownscaleRunsThreshold cannot be negative")
170+
}
143171
return nil
144172
}
145173

bigtable/internal/transport/dynamic_scale_monitor_test.go

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,23 @@ func TestDynamicChannelScaling(t *testing.T) {
2929
dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
3030

3131
baseConfig := btopt.DynamicChannelPoolConfig{
32-
Enabled: true,
33-
MinConns: 2,
34-
MaxConns: 10,
35-
AvgLoadHighThreshold: 10.0, // Scale up if avg load >= 10
36-
AvgLoadLowThreshold: 3.0, // Scale down if avg load <= 3
37-
MinScalingInterval: 0, // Disable time throttling for most tests
38-
CheckInterval: 10 * time.Second, // Not directly used by calling evaluateAndScale
39-
MaxRemoveConns: 3,
32+
Enabled: true,
33+
MinConns: 2,
34+
MaxConns: 10,
35+
AvgLoadHighThreshold: 10.0, // Scale up if avg load >= 10
36+
AvgLoadLowThreshold: 3.0, // Scale down if avg load <= 3
37+
MinScalingInterval: 0, // Disable time throttling for most tests
38+
CheckInterval: 10 * time.Second, // Not directly used by calling evaluateAndScale
39+
MaxRemoveConns: 3,
40+
ContinuousDownscaleRunsThreshold: 3,
4041
}
4142
tests := []struct {
42-
name string
43-
initialSize int
44-
configOpt func(*btopt.DynamicChannelPoolConfig)
45-
setLoad func(conns []*connEntry)
46-
wantSize int
43+
name string
44+
initialSize int
45+
configOpt func(*btopt.DynamicChannelPoolConfig)
46+
setLoad func(conns []*connEntry)
47+
wantSize int
48+
evaluateCalls int
4749
}{
4850
{
4951
name: "ScaleUp",
@@ -52,7 +54,8 @@ func TestDynamicChannelScaling(t *testing.T) {
5254
setConnLoads(conns, 12, 0) // Avg load 12 > 10
5355
},
5456
// Total load = 3 * 12 = 36. Desired = ceil(36 / 6.5) = 6
55-
wantSize: 6,
57+
wantSize: 6,
58+
evaluateCalls: 1,
5659
},
5760
{
5861
name: "ScaleUpCappedAtMax",
@@ -61,7 +64,8 @@ func TestDynamicChannelScaling(t *testing.T) {
6164
setConnLoads(conns, 20, 0) // Avg load 20 > 10
6265
},
6366
// Total load = 8 * 20 = 160. Desired = ceil(160 / 6.5) = 25. Capped at MaxConns = 10
64-
wantSize: 10,
67+
wantSize: 10,
68+
evaluateCalls: 1,
6569
},
6670
{
6771
name: "ScaleDown",
@@ -70,7 +74,8 @@ func TestDynamicChannelScaling(t *testing.T) {
7074
setConnLoads(conns, 1, 0) // Avg load 1 < 3
7175
},
7276
// Total load = 9 * 1 = 9. Desired = ceil(9 / 6.5) = 2.
73-
wantSize: 6,
77+
wantSize: 6,
78+
evaluateCalls: 3,
7479
},
7580
{
7681
name: "ScaleDownCappedAtMin",
@@ -79,7 +84,8 @@ func TestDynamicChannelScaling(t *testing.T) {
7984
setConnLoads(conns, 1, 0) // Avg load 1 < 3
8085
},
8186
// Total load = 3 * 1 = 3. Desired = ceil(3 / 6.5) = 1. Capped at MinConns = 2
82-
wantSize: 2,
87+
wantSize: 2,
88+
evaluateCalls: 3,
8389
},
8490
{
8591
name: "ScaleDownLimitedByMaxRemove",
@@ -91,23 +97,26 @@ func TestDynamicChannelScaling(t *testing.T) {
9197
setConnLoads(conns, 0, 0) // Avg load 0 < 3
9298
},
9399
// Total load = 0. Desired = 2 (MinConns). removeCount = 10 - 2 = 8. Limited by MaxRemoveConns = 2.
94-
wantSize: 10 - 2,
100+
wantSize: 10 - 2,
101+
evaluateCalls: 3,
95102
},
96103
{
97104
name: "NoScaleUp",
98105
initialSize: 5,
99106
setLoad: func(conns []*connEntry) {
100107
setConnLoads(conns, 7, 0) // 3 < Avg load 7 < 10
101108
},
102-
wantSize: 5,
109+
wantSize: 5,
110+
evaluateCalls: 1,
103111
},
104112
{
105113
name: "NoScaleDown",
106114
initialSize: 5,
107115
setLoad: func(conns []*connEntry) {
108116
setConnLoads(conns, 5, 1) // Weighted load 5*1 + 1*2 = 7. 3 < 7 < 10
109117
},
110-
wantSize: 5,
118+
wantSize: 5,
119+
evaluateCalls: 3,
111120
},
112121
{
113122
name: "ScaleUpAddAtLeastOne",
@@ -116,7 +125,17 @@ func TestDynamicChannelScaling(t *testing.T) {
116125
setConnLoads(conns, 10, 0) // Avg load 10, right at threshold.
117126
},
118127
// Total load = 20. Desired = ceil(20 / 6.5) = 4. Add 2.
119-
wantSize: 4,
128+
wantSize: 4,
129+
evaluateCalls: 1,
130+
},
131+
{
132+
name: "NoScaleDownWithEvaluations<ContinuousDownscaleRunsThreshold",
133+
initialSize: 6,
134+
setLoad: func(conns []*connEntry) {
135+
setConnLoads(conns, 1, 0) // Avg load 1 < 3 (Low load)
136+
},
137+
wantSize: 6,
138+
evaluateCalls: 2,
120139
},
121140
}
122141

@@ -139,7 +158,15 @@ func TestDynamicChannelScaling(t *testing.T) {
139158
tc.setLoad(pool.getConns())
140159
}
141160

142-
dsm.evaluateAndScale()
161+
calls := tc.evaluateCalls
162+
if calls == 0 {
163+
calls = 1
164+
}
165+
166+
// Simulate the ticker calling evaluateAndScale
167+
for i := 0; i < calls; i++ {
168+
dsm.evaluateAndScale()
169+
}
143170
time.Sleep(50 * time.Millisecond) // Allow add/remove goroutines to potentially run
144171

145172
if gotSize := pool.Num(); gotSize != tc.wantSize {

0 commit comments

Comments
 (0)