Skip to content

Commit 63a7d8a

Browse files
committed
fix pusher concurrent close channel
Signed-off-by: rongfu.leng <[email protected]>
1 parent 8abee9c commit 63a7d8a

2 files changed

Lines changed: 11 additions & 14 deletions

File tree

remotes/docker/pusher.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"net/http"
2525
"net/url"
2626
"strings"
27+
"sync"
2728
"time"
2829

2930
"github.com/containerd/containerd/content"
@@ -320,9 +321,10 @@ type pushWriter struct {
320321

321322
pipe *io.PipeWriter
322323

323-
pipeC chan *io.PipeWriter
324-
respC chan *http.Response
325-
errC chan error
324+
pipeC chan *io.PipeWriter
325+
respC chan *http.Response
326+
closeOnce sync.Once
327+
errC chan error
326328

327329
isManifest bool
328330

@@ -398,14 +400,9 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
398400
func (pw *pushWriter) Close() error {
399401
// Ensure pipeC is closed but handle `Close()` being
400402
// called multiple times without panicking
401-
select {
402-
case _, ok := <-pw.pipeC:
403-
if ok {
404-
close(pw.pipeC)
405-
}
406-
default:
403+
pw.closeOnce.Do(func() {
407404
close(pw.pipeC)
408-
}
405+
})
409406
if pw.pipe != nil {
410407
status, err := pw.tracker.GetStatus(pw.ref)
411408
if err == nil && !status.Committed {

remotes/docker/pusher_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ func Test_dockerPusher_push(t *testing.T) {
293293
dp dockerPusher
294294
dockerBaseObject string
295295
args args
296-
checkerFunc func(writer pushWriter) bool
296+
checkerFunc func(writer *pushWriter) bool
297297
wantErr error
298298
}{
299299
{
@@ -306,7 +306,7 @@ func Test_dockerPusher_push(t *testing.T) {
306306
ref: fmt.Sprintf("manifest-%s", manifestContentDigest.String()),
307307
unavailableOnFail: false,
308308
},
309-
checkerFunc: func(writer pushWriter) bool {
309+
checkerFunc: func(writer *pushWriter) bool {
310310
select {
311311
case resp := <-writer.respC:
312312
// 201 should be the response code when uploading a new manifest
@@ -340,7 +340,7 @@ func Test_dockerPusher_push(t *testing.T) {
340340
ref: fmt.Sprintf("layer-%s", layerContentDigest.String()),
341341
unavailableOnFail: false,
342342
},
343-
checkerFunc: func(writer pushWriter) bool {
343+
checkerFunc: func(writer *pushWriter) bool {
344344
select {
345345
case resp := <-writer.respC:
346346
// 201 should be the response code when uploading a new blob
@@ -379,7 +379,7 @@ func Test_dockerPusher_push(t *testing.T) {
379379
}
380380

381381
// test whether a proper response has been received after the push operation
382-
assert.True(t, test.checkerFunc(*pw))
382+
assert.True(t, test.checkerFunc(pw))
383383

384384
})
385385
}

0 commit comments

Comments
 (0)