Skip to content

Commit ac1fbc1

Browse files
lengrongfuakhilerm
authored andcommitted
fix pusher concurrent close channel
Signed-off-by: rongfu.leng <[email protected]> (cherry picked from commit 63a7d8a) Signed-off-by: Akhil Mohan <[email protected]>
1 parent 77a2554 commit ac1fbc1

2 files changed

Lines changed: 11 additions & 14 deletions

File tree

remotes/docker/pusher.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"net/http"
2424
"net/url"
2525
"strings"
26+
"sync"
2627
"time"
2728

2829
"github.com/containerd/containerd/content"
@@ -317,9 +318,10 @@ type pushWriter struct {
317318

318319
pipe *io.PipeWriter
319320

320-
pipeC chan *io.PipeWriter
321-
respC chan *http.Response
322-
errC chan error
321+
pipeC chan *io.PipeWriter
322+
respC chan *http.Response
323+
closeOnce sync.Once
324+
errC chan error
323325

324326
isManifest bool
325327

@@ -395,14 +397,9 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
395397
func (pw *pushWriter) Close() error {
396398
// Ensure pipeC is closed but handle `Close()` being
397399
// called multiple times without panicking
398-
select {
399-
case _, ok := <-pw.pipeC:
400-
if ok {
401-
close(pw.pipeC)
402-
}
403-
default:
400+
pw.closeOnce.Do(func() {
404401
close(pw.pipeC)
405-
}
402+
})
406403
if pw.pipe != nil {
407404
status, err := pw.tracker.GetStatus(pw.ref)
408405
if err == nil && !status.Committed {

remotes/docker/pusher_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ func Test_dockerPusher_push(t *testing.T) {
293293
dp dockerPusher
294294
dockerBaseObject string
295295
args args
296-
checkerFunc func(writer pushWriter) bool
296+
checkerFunc func(writer *pushWriter) bool
297297
wantErr error
298298
}{
299299
{
@@ -306,7 +306,7 @@ func Test_dockerPusher_push(t *testing.T) {
306306
ref: fmt.Sprintf("manifest-%s", manifestContentDigest.String()),
307307
unavailableOnFail: false,
308308
},
309-
checkerFunc: func(writer pushWriter) bool {
309+
checkerFunc: func(writer *pushWriter) bool {
310310
select {
311311
case resp := <-writer.respC:
312312
// 201 should be the response code when uploading a new manifest
@@ -340,7 +340,7 @@ func Test_dockerPusher_push(t *testing.T) {
340340
ref: fmt.Sprintf("layer-%s", layerContentDigest.String()),
341341
unavailableOnFail: false,
342342
},
343-
checkerFunc: func(writer pushWriter) bool {
343+
checkerFunc: func(writer *pushWriter) bool {
344344
select {
345345
case resp := <-writer.respC:
346346
// 201 should be the response code when uploading a new blob
@@ -381,7 +381,7 @@ func Test_dockerPusher_push(t *testing.T) {
381381
}
382382

383383
// test whether a proper response has been received after the push operation
384-
assert.True(t, test.checkerFunc(*pw))
384+
assert.True(t, test.checkerFunc(pw))
385385

386386
})
387387
}

0 commit comments

Comments
 (0)