Skip to content

Commit a814153

Browse files
jameshartigjackc
authored and committed
pgxpool: health check should avoid going below minConns
1 parent 37c3f15 commit a814153

File tree

4 files changed

+212
-45
lines changed

4 files changed

+212
-45
lines changed

pgxpool/conn.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package pgxpool
22

33
import (
44
"context"
5-
"time"
5+
"sync/atomic"
66

77
"github.com/jackc/pgconn"
88
"github.com/jackc/pgx/v4"
@@ -26,9 +26,23 @@ func (c *Conn) Release() {
2626
res := c.res
2727
c.res = nil
2828

29-
now := time.Now()
30-
if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' || (now.Sub(res.CreationTime()) > c.p.maxConnLifetime) {
29+
if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' {
3130
res.Destroy()
31+
// Signal to the health check to run since we just destroyed a connection
32+
// and we might be below minConns now
33+
c.p.triggerHealthCheck()
34+
return
35+
}
36+
37+
// If the pool is consistently being used, we might never get to check the
38+
// lifetime of a connection since we only check idle connections in checkConnsHealth
39+
// so we also check the lifetime here and force a health check
40+
if c.p.isExpired(res) {
41+
atomic.AddInt64(&c.p.lifetimeDestroyCount, 1)
42+
res.Destroy()
43+
// Signal to the health check to run since we just destroyed a connection
44+
// and we might be below minConns now
45+
c.p.triggerHealthCheck()
3246
return
3347
}
3448

@@ -42,6 +56,9 @@ func (c *Conn) Release() {
4256
res.Release()
4357
} else {
4458
res.Destroy()
59+
// Signal to the health check to run since we just destroyed a connection
60+
// and we might be below minConns now
61+
c.p.triggerHealthCheck()
4562
}
4663
}()
4764
}

pgxpool/pool.go

Lines changed: 134 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package pgxpool
33
import (
44
"context"
55
"fmt"
6+
"math/rand"
67
"runtime"
78
"strconv"
89
"sync"
10+
"sync/atomic"
911
"time"
1012

1113
"github.com/jackc/pgconn"
@@ -70,16 +72,23 @@ func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
7072

7173
// Pool allows for connection reuse.
7274
type Pool struct {
73-
p *puddle.Pool
74-
config *Config
75-
beforeConnect func(context.Context, *pgx.ConnConfig) error
76-
afterConnect func(context.Context, *pgx.Conn) error
77-
beforeAcquire func(context.Context, *pgx.Conn) bool
78-
afterRelease func(*pgx.Conn) bool
79-
minConns int32
80-
maxConnLifetime time.Duration
81-
maxConnIdleTime time.Duration
82-
healthCheckPeriod time.Duration
75+
p *puddle.Pool
76+
config *Config
77+
beforeConnect func(context.Context, *pgx.ConnConfig) error
78+
afterConnect func(context.Context, *pgx.Conn) error
79+
beforeAcquire func(context.Context, *pgx.Conn) bool
80+
afterRelease func(*pgx.Conn) bool
81+
minConns int32
82+
maxConns int32
83+
maxConnLifetime time.Duration
84+
maxConnLifetimeJitter time.Duration
85+
maxConnIdleTime time.Duration
86+
healthCheckPeriod time.Duration
87+
healthCheckChan chan struct{}
88+
89+
newConnsCount int64
90+
lifetimeDestroyCount int64
91+
idleDestroyCount int64
8392

8493
closeOnce sync.Once
8594
closeChan chan struct{}
@@ -109,14 +118,19 @@ type Config struct {
109118
// MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
110119
MaxConnLifetime time.Duration
111120

121+
// MaxConnLifetimeJitter is the duration after MaxConnLifetime to randomly decide to close a connection.
122+
// This helps prevent all connections from being closed at the exact same time, starving the pool.
123+
MaxConnLifetimeJitter time.Duration
124+
112125
// MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
113126
MaxConnIdleTime time.Duration
114127

115128
// MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU().
116129
MaxConns int32
117130

118-
// MinConns is the minimum size of the pool. The health check will increase the number of connections to this
119-
// amount if it had dropped below.
131+
// MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low
132+
// number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance
133+
// to create new connections.
120134
MinConns int32
121135

122136
// HealthCheckPeriod is the duration between checks of the health of idle connections.
@@ -164,16 +178,19 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
164178
}
165179

166180
p := &Pool{
167-
config: config,
168-
beforeConnect: config.BeforeConnect,
169-
afterConnect: config.AfterConnect,
170-
beforeAcquire: config.BeforeAcquire,
171-
afterRelease: config.AfterRelease,
172-
minConns: config.MinConns,
173-
maxConnLifetime: config.MaxConnLifetime,
174-
maxConnIdleTime: config.MaxConnIdleTime,
175-
healthCheckPeriod: config.HealthCheckPeriod,
176-
closeChan: make(chan struct{}),
181+
config: config,
182+
beforeConnect: config.BeforeConnect,
183+
afterConnect: config.AfterConnect,
184+
beforeAcquire: config.BeforeAcquire,
185+
afterRelease: config.AfterRelease,
186+
minConns: config.MinConns,
187+
maxConns: config.MaxConns,
188+
maxConnLifetime: config.MaxConnLifetime,
189+
maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
190+
maxConnIdleTime: config.MaxConnIdleTime,
191+
healthCheckPeriod: config.HealthCheckPeriod,
192+
healthCheckChan: make(chan struct{}, 1),
193+
closeChan: make(chan struct{}),
177194
}
178195

179196
p.p = puddle.NewPool(
@@ -223,7 +240,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
223240
)
224241

225242
if !config.LazyConnect {
226-
if err := p.createIdleResources(ctx, int(p.minConns)); err != nil {
243+
if err := p.checkMinConns(); err != nil {
227244
// Couldn't create resources for minpool size. Close unhealthy pool.
228245
p.Close()
229246
return nil, err
@@ -251,6 +268,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
251268
// pool_max_conn_lifetime: duration string
252269
// pool_max_conn_idle_time: duration string
253270
// pool_health_check_period: duration string
271+
// pool_max_conn_lifetime_jitter: duration string
254272
//
255273
// See Config for definitions of these arguments.
256274
//
@@ -331,6 +349,15 @@ func ParseConfig(connString string) (*Config, error) {
331349
config.HealthCheckPeriod = defaultHealthCheckPeriod
332350
}
333351

352+
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime_jitter"]; ok {
353+
delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime_jitter")
354+
d, err := time.ParseDuration(s)
355+
if err != nil {
356+
return nil, fmt.Errorf("invalid pool_max_conn_lifetime_jitter: %w", err)
357+
}
358+
config.MaxConnLifetimeJitter = d
359+
}
360+
334361
return config, nil
335362
}
336363

@@ -343,44 +370,105 @@ func (p *Pool) Close() {
343370
})
344371
}
345372

373+
func (p *Pool) isExpired(res *puddle.Resource) bool {
374+
now := time.Now()
375+
// Small optimization to avoid rand. If it's over lifetime AND jitter, immediately
376+
// return true.
377+
if now.Sub(res.CreationTime()) > p.maxConnLifetime+p.maxConnLifetimeJitter {
378+
return true
379+
}
380+
if p.maxConnLifetimeJitter == 0 {
381+
return false
382+
}
383+
jitterSecs := rand.Float64() * p.maxConnLifetimeJitter.Seconds()
384+
return now.Sub(res.CreationTime()) > p.maxConnLifetime+(time.Duration(jitterSecs)*time.Second)
385+
}
386+
387+
func (p *Pool) triggerHealthCheck() {
388+
go func() {
389+
// Destroy is asynchronous so we give it time to actually remove itself from
390+
// the pool, otherwise we might try to check the pool size too soon
391+
time.Sleep(500 * time.Millisecond)
392+
select {
393+
case p.healthCheckChan <- struct{}{}:
394+
default:
395+
}
396+
}()
397+
}
398+
346399
func (p *Pool) backgroundHealthCheck() {
347400
ticker := time.NewTicker(p.healthCheckPeriod)
348-
401+
defer ticker.Stop()
349402
for {
350403
select {
351404
case <-p.closeChan:
352-
ticker.Stop()
353405
return
406+
case <-p.healthCheckChan:
407+
p.checkHealth()
354408
case <-ticker.C:
355-
p.checkIdleConnsHealth()
356-
p.checkMinConns()
409+
p.checkHealth()
357410
}
358411
}
359412
}
360413

361-
func (p *Pool) checkIdleConnsHealth() {
362-
resources := p.p.AcquireAllIdle()
414+
func (p *Pool) checkHealth() {
415+
for {
416+
// If checkMinConns failed we don't destroy any connections since we couldn't
417+
// even get to minConns
418+
if err := p.checkMinConns(); err != nil {
419+
// Should we log this error somewhere?
420+
break
421+
}
422+
if !p.checkConnsHealth() {
423+
// Since we didn't destroy any connections we can stop looping
424+
break
425+
}
426+
// Technically Destroy is asynchronous but 500ms should be enough for it to
427+
// remove it from the underlying pool
428+
select {
429+
case <-p.closeChan:
430+
return
431+
case <-time.After(500 * time.Millisecond):
432+
}
433+
}
434+
}
363435

364-
now := time.Now()
436+
// checkConnsHealth will check all idle connections, destroy a connection if
437+
// it's idle or too old, and returns true if any were destroyed
438+
func (p *Pool) checkConnsHealth() bool {
439+
var destroyed bool
440+
totalConns := p.Stat().TotalConns()
441+
resources := p.p.AcquireAllIdle()
365442
for _, res := range resources {
366-
if now.Sub(res.CreationTime()) > p.maxConnLifetime {
443+
// We're okay going under minConns if the lifetime is up
444+
if p.isExpired(res) && totalConns >= p.minConns {
445+
atomic.AddInt64(&p.lifetimeDestroyCount, 1)
367446
res.Destroy()
368-
} else if res.IdleDuration() > p.maxConnIdleTime {
447+
destroyed = true
448+
// Since Destroy is async we manually decrement totalConns.
449+
totalConns--
450+
} else if res.IdleDuration() > p.maxConnIdleTime && totalConns > p.minConns {
451+
atomic.AddInt64(&p.idleDestroyCount, 1)
369452
res.Destroy()
453+
destroyed = true
454+
// Since Destroy is async we manually decrement totalConns.
455+
totalConns--
370456
} else {
371457
res.ReleaseUnused()
372458
}
373459
}
460+
return destroyed
374461
}
375462

376-
func (p *Pool) checkMinConns() {
377-
for i := p.minConns - p.Stat().TotalConns(); i > 0; i-- {
378-
go func() {
379-
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
380-
defer cancel()
381-
p.p.CreateResource(ctx)
382-
}()
463+
func (p *Pool) checkMinConns() error {
464+
// TotalConns can include ones that are being destroyed but we should have
465+
// sleep(500ms) around all of the destroys to help prevent that from throwing
466+
// off this check
467+
toCreate := p.minConns - p.Stat().TotalConns()
468+
if toCreate > 0 {
469+
return p.createIdleResources(context.Background(), int(toCreate))
383470
}
471+
return nil
384472
}
385473

386474
func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
@@ -391,6 +479,7 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in
391479

392480
for i := 0; i < targetResources; i++ {
393481
go func() {
482+
atomic.AddInt64(&p.newConnsCount, 1)
394483
err := p.p.CreateResource(ctx)
395484
errs <- err
396485
}()
@@ -460,7 +549,12 @@ func (p *Pool) Config() *Config { return p.config.Copy() }
460549

461550
// Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics.
462551
func (p *Pool) Stat() *Stat {
463-
return &Stat{s: p.p.Stat()}
552+
return &Stat{
553+
s: p.p.Stat(),
554+
newConnsCount: atomic.LoadInt64(&p.newConnsCount),
555+
lifetimeDestroyCount: atomic.LoadInt64(&p.lifetimeDestroyCount),
556+
idleDestroyCount: atomic.LoadInt64(&p.idleDestroyCount),
557+
}
464558
}
465559

466560
// Exec acquires a connection from the Pool and executes the given SQL.

0 commit comments

Comments
 (0)