@@ -3,9 +3,11 @@ package pgxpool
33import (
44 "context"
55 "fmt"
6+ "math/rand"
67 "runtime"
78 "strconv"
89 "sync"
10+ "sync/atomic"
911 "time"
1012
1113 "github.com/jackc/pgconn"
@@ -70,16 +72,23 @@ func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
7072
7173// Pool allows for connection reuse.
7274type Pool struct {
73- p * puddle.Pool
74- config * Config
75- beforeConnect func (context.Context , * pgx.ConnConfig ) error
76- afterConnect func (context.Context , * pgx.Conn ) error
77- beforeAcquire func (context.Context , * pgx.Conn ) bool
78- afterRelease func (* pgx.Conn ) bool
79- minConns int32
80- maxConnLifetime time.Duration
81- maxConnIdleTime time.Duration
82- healthCheckPeriod time.Duration
75+ p * puddle.Pool
76+ config * Config
77+ beforeConnect func (context.Context , * pgx.ConnConfig ) error
78+ afterConnect func (context.Context , * pgx.Conn ) error
79+ beforeAcquire func (context.Context , * pgx.Conn ) bool
80+ afterRelease func (* pgx.Conn ) bool
81+ minConns int32
82+ maxConns int32
83+ maxConnLifetime time.Duration
84+ maxConnLifetimeJitter time.Duration
85+ maxConnIdleTime time.Duration
86+ healthCheckPeriod time.Duration
87+ healthCheckChan chan struct {}
88+
89+ newConnsCount int64
90+ lifetimeDestroyCount int64
91+ idleDestroyCount int64
8392
8493 closeOnce sync.Once
8594 closeChan chan struct {}
@@ -109,14 +118,19 @@ type Config struct {
109118 // MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
110119 MaxConnLifetime time.Duration
111120
121+ // MaxConnLifetimeJitter is the duration after MaxConnLifetime to randomly decide to close a connection.
122+ // This helps prevent all connections from being closed at the exact same time, starving the pool.
123+ MaxConnLifetimeJitter time.Duration
124+
112125 // MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
113126 MaxConnIdleTime time.Duration
114127
115128 // MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU().
116129 MaxConns int32
117130
118- // MinConns is the minimum size of the pool. The health check will increase the number of connections to this
119- // amount if it had dropped below.
131+ // MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low
132+ // number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance
133+ // to create new connections.
120134 MinConns int32
121135
122136 // HealthCheckPeriod is the duration between checks of the health of idle connections.
@@ -164,16 +178,19 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
164178 }
165179
166180 p := & Pool {
167- config : config ,
168- beforeConnect : config .BeforeConnect ,
169- afterConnect : config .AfterConnect ,
170- beforeAcquire : config .BeforeAcquire ,
171- afterRelease : config .AfterRelease ,
172- minConns : config .MinConns ,
173- maxConnLifetime : config .MaxConnLifetime ,
174- maxConnIdleTime : config .MaxConnIdleTime ,
175- healthCheckPeriod : config .HealthCheckPeriod ,
176- closeChan : make (chan struct {}),
181+ config : config ,
182+ beforeConnect : config .BeforeConnect ,
183+ afterConnect : config .AfterConnect ,
184+ beforeAcquire : config .BeforeAcquire ,
185+ afterRelease : config .AfterRelease ,
186+ minConns : config .MinConns ,
187+ maxConns : config .MaxConns ,
188+ maxConnLifetime : config .MaxConnLifetime ,
189+ maxConnLifetimeJitter : config .MaxConnLifetimeJitter ,
190+ maxConnIdleTime : config .MaxConnIdleTime ,
191+ healthCheckPeriod : config .HealthCheckPeriod ,
192+ healthCheckChan : make (chan struct {}, 1 ),
193+ closeChan : make (chan struct {}),
177194 }
178195
179196 p .p = puddle .NewPool (
@@ -223,7 +240,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
223240 )
224241
225242 if ! config .LazyConnect {
226- if err := p .createIdleResources ( ctx , int ( p . minConns ) ); err != nil {
243+ if err := p .checkMinConns ( ); err != nil {
227244 // Couldn't create resources for minpool size. Close unhealthy pool.
228245 p .Close ()
229246 return nil , err
@@ -251,6 +268,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
251268// pool_max_conn_lifetime: duration string
252269// pool_max_conn_idle_time: duration string
253270// pool_health_check_period: duration string
271+ // pool_max_conn_lifetime_jitter: duration string
254272//
255273// See Config for definitions of these arguments.
256274//
@@ -331,6 +349,15 @@ func ParseConfig(connString string) (*Config, error) {
331349 config .HealthCheckPeriod = defaultHealthCheckPeriod
332350 }
333351
352+ if s , ok := config .ConnConfig .Config .RuntimeParams ["pool_max_conn_lifetime_jitter" ]; ok {
353+ delete (connConfig .Config .RuntimeParams , "pool_max_conn_lifetime_jitter" )
354+ d , err := time .ParseDuration (s )
355+ if err != nil {
356+ return nil , fmt .Errorf ("invalid pool_max_conn_lifetime_jitter: %w" , err )
357+ }
358+ config .MaxConnLifetimeJitter = d
359+ }
360+
334361 return config , nil
335362}
336363
@@ -343,44 +370,105 @@ func (p *Pool) Close() {
343370 })
344371}
345372
373+ func (p * Pool ) isExpired (res * puddle.Resource ) bool {
374+ now := time .Now ()
375+ // Small optimization to avoid rand. If it's over lifetime AND jitter, immediately
376+ // return true.
377+ if now .Sub (res .CreationTime ()) > p .maxConnLifetime + p .maxConnLifetimeJitter {
378+ return true
379+ }
380+ if p .maxConnLifetimeJitter == 0 {
381+ return false
382+ }
383+ jitterSecs := rand .Float64 () * p .maxConnLifetimeJitter .Seconds ()
384+ return now .Sub (res .CreationTime ()) > p .maxConnLifetime + (time .Duration (jitterSecs )* time .Second )
385+ }
386+
387+ func (p * Pool ) triggerHealthCheck () {
388+ go func () {
389+ // Destroy is asynchronous so we give it time to actually remove itself from
390+ // the pool otherwise we might try to check the pool size too soon
391+ time .Sleep (500 * time .Millisecond )
392+ select {
393+ case p .healthCheckChan <- struct {}{}:
394+ default :
395+ }
396+ }()
397+ }
398+
346399func (p * Pool ) backgroundHealthCheck () {
347400 ticker := time .NewTicker (p .healthCheckPeriod )
348-
401+ defer ticker . Stop ()
349402 for {
350403 select {
351404 case <- p .closeChan :
352- ticker .Stop ()
353405 return
406+ case <- p .healthCheckChan :
407+ p .checkHealth ()
354408 case <- ticker .C :
355- p .checkIdleConnsHealth ()
356- p .checkMinConns ()
409+ p .checkHealth ()
357410 }
358411 }
359412}
360413
361- func (p * Pool ) checkIdleConnsHealth () {
362- resources := p .p .AcquireAllIdle ()
414+ func (p * Pool ) checkHealth () {
415+ for {
416+ // If checkMinConns failed we don't destroy any connections since we couldn't
417+ // even get to minConns
418+ if err := p .checkMinConns (); err != nil {
419+ // Should we log this error somewhere?
420+ break
421+ }
422+ if ! p .checkConnsHealth () {
423+ // Since we didn't destroy any connections we can stop looping
424+ break
425+ }
426+ // Technically Destroy is asynchronous but 500ms should be enough for it to
427+ // remove it from the underlying pool
428+ select {
429+ case <- p .closeChan :
430+ return
431+ case <- time .After (500 * time .Millisecond ):
432+ }
433+ }
434+ }
363435
364- now := time .Now ()
436+ // checkConnsHealth will check all idle connections, destroy a connection if
437+ // it's idle or too old, and returns true if any were destroyed
438+ func (p * Pool ) checkConnsHealth () bool {
439+ var destroyed bool
440+ totalConns := p .Stat ().TotalConns ()
441+ resources := p .p .AcquireAllIdle ()
365442 for _ , res := range resources {
366- if now .Sub (res .CreationTime ()) > p .maxConnLifetime {
443+ // We're okay going under minConns if the lifetime is up
444+ if p .isExpired (res ) && totalConns >= p .minConns {
445+ atomic .AddInt64 (& p .lifetimeDestroyCount , 1 )
367446 res .Destroy ()
368- } else if res .IdleDuration () > p .maxConnIdleTime {
447+ destroyed = true
448+ // Since Destroy is async we manually decrement totalConns.
449+ totalConns --
450+ } else if res .IdleDuration () > p .maxConnIdleTime && totalConns > p .minConns {
451+ atomic .AddInt64 (& p .idleDestroyCount , 1 )
369452 res .Destroy ()
453+ destroyed = true
454+ // Since Destroy is async we manually decrement totalConns.
455+ totalConns --
370456 } else {
371457 res .ReleaseUnused ()
372458 }
373459 }
460+ return destroyed
374461}
375462
376- func (p * Pool ) checkMinConns () {
377- for i := p . minConns - p . Stat (). TotalConns (); i > 0 ; i -- {
378- go func () {
379- ctx , cancel := context . WithTimeout ( context . Background (), time . Minute )
380- defer cancel ()
381- p . p . CreateResource ( ctx )
382- }( )
463+ func (p * Pool ) checkMinConns () error {
464+ // TotalConns can include ones that are being destroyed but we should have
465+ // sleep(500ms) around all of the destroys to help prevent that from throwing
466+ // off this check
467+ toCreate := p . minConns - p . Stat (). TotalConns ()
468+ if toCreate > 0 {
469+ return p . createIdleResources ( context . Background (), int ( toCreate ) )
383470 }
471+ return nil
384472}
385473
386474func (p * Pool ) createIdleResources (parentCtx context.Context , targetResources int ) error {
@@ -391,6 +479,7 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in
391479
392480 for i := 0 ; i < targetResources ; i ++ {
393481 go func () {
482+ atomic .AddInt64 (& p .newConnsCount , 1 )
394483 err := p .p .CreateResource (ctx )
395484 errs <- err
396485 }()
@@ -460,7 +549,12 @@ func (p *Pool) Config() *Config { return p.config.Copy() }
460549
461550// Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics.
462551func (p * Pool ) Stat () * Stat {
463- return & Stat {s : p .p .Stat ()}
552+ return & Stat {
553+ s : p .p .Stat (),
554+ newConnsCount : atomic .LoadInt64 (& p .newConnsCount ),
555+ lifetimeDestroyCount : atomic .LoadInt64 (& p .lifetimeDestroyCount ),
556+ idleDestroyCount : atomic .LoadInt64 (& p .idleDestroyCount ),
557+ }
464558}
465559
466560// Exec acquires a connection from the Pool and executes the given SQL.
0 commit comments