Skip to content

Commit 2e84dcc

Browse files
committed
*Pipeline.getResults should close pipeline on error
Otherwise, it might be possible to panic when closing the pipeline if it tries to read a connection that should be closed but still has a fatal error on the wire. #1920
1 parent d149d3f commit 2e84dcc

File tree

2 files changed

+85
-0
lines changed

2 files changed

+85
-0
lines changed

pgconn/pgconn.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2094,6 +2094,8 @@ func (p *Pipeline) getResults() (results any, err error) {
20942094
for {
20952095
msg, err := p.conn.receiveMessage()
20962096
if err != nil {
2097+
p.closed = true
2098+
p.err = err
20972099
p.conn.asyncClose()
20982100
return nil, normalizeTimeoutError(p.ctx, err)
20992101
}

pgconn/pgconn_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3389,3 +3389,86 @@ func TestSNISupport(t *testing.T) {
33893389
})
33903390
}
33913391
}
3392+
3393+
// https://github.com/jackc/pgx/issues/1920
3394+
func TestFatalErrorReceivedInPipelineMode(t *testing.T) {
3395+
t.Parallel()
3396+
3397+
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
3398+
defer cancel()
3399+
3400+
steps := pgmock.AcceptUnauthenticatedConnRequestSteps()
3401+
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{}))
3402+
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{}))
3403+
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{}))
3404+
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{}))
3405+
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{}))
3406+
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{}))
3407+
steps = append(steps, pgmock.SendMessage(&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
3408+
{Name: []byte("mock")},
3409+
}}))
3410+
steps = append(steps, pgmock.SendMessage(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01"}))
3411+
// We shouldn't get anything after the first fatal error. But the reported issue was with PgBouncer so maybe that
3412+
// causes the issue. Anyway, a FATAL error after the connection had already been killed could cause a panic.
3413+
steps = append(steps, pgmock.SendMessage(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01"}))
3414+
3415+
script := &pgmock.Script{Steps: steps}
3416+
3417+
ln, err := net.Listen("tcp", "127.0.0.1:")
3418+
require.NoError(t, err)
3419+
defer ln.Close()
3420+
3421+
serverKeepAlive := make(chan struct{})
3422+
defer close(serverKeepAlive)
3423+
3424+
serverErrChan := make(chan error, 1)
3425+
go func() {
3426+
defer close(serverErrChan)
3427+
3428+
conn, err := ln.Accept()
3429+
if err != nil {
3430+
serverErrChan <- err
3431+
return
3432+
}
3433+
defer conn.Close()
3434+
3435+
err = conn.SetDeadline(time.Now().Add(59 * time.Second))
3436+
if err != nil {
3437+
serverErrChan <- err
3438+
return
3439+
}
3440+
3441+
err = script.Run(pgproto3.NewBackend(conn, conn))
3442+
if err != nil {
3443+
serverErrChan <- err
3444+
return
3445+
}
3446+
3447+
<-serverKeepAlive
3448+
}()
3449+
3450+
parts := strings.Split(ln.Addr().String(), ":")
3451+
host := parts[0]
3452+
port := parts[1]
3453+
connStr := fmt.Sprintf("sslmode=disable host=%s port=%s", host, port)
3454+
3455+
ctx, cancel = context.WithTimeout(ctx, 59*time.Second)
3456+
defer cancel()
3457+
conn, err := pgconn.Connect(ctx, connStr)
3458+
require.NoError(t, err)
3459+
3460+
pipeline := conn.StartPipeline(ctx)
3461+
pipeline.SendPrepare("s1", "select 1", nil)
3462+
pipeline.SendPrepare("s2", "select 2", nil)
3463+
pipeline.SendPrepare("s3", "select 3", nil)
3464+
err = pipeline.Sync()
3465+
require.NoError(t, err)
3466+
3467+
_, err = pipeline.GetResults()
3468+
require.NoError(t, err)
3469+
_, err = pipeline.GetResults()
3470+
require.Error(t, err)
3471+
3472+
err = pipeline.Close()
3473+
require.Error(t, err)
3474+
}

0 commit comments

Comments
 (0)