@@ -335,10 +335,12 @@ type pushWriter struct {
335335
336336 pipe * io.PipeWriter
337337
338- pipeC chan * io.PipeWriter
339- respC chan * http.Response
338+ done chan struct {}
340339 closeOnce sync.Once
341- errC chan error
340+
341+ pipeC chan * io.PipeWriter
342+ respC chan * http.Response
343+ errC chan error
342344
343345 isManifest bool
344346
@@ -356,19 +358,30 @@ func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker S
356358 pipeC : make (chan * io.PipeWriter , 1 ),
357359 respC : make (chan * http.Response , 1 ),
358360 errC : make (chan error , 1 ),
361+ done : make (chan struct {}),
359362 isManifest : isManifest ,
360363 }
361364}
362365
363366func (pw * pushWriter ) setPipe (p * io.PipeWriter ) {
364- pw .pipeC <- p
367+ select {
368+ case <- pw .done :
369+ case pw .pipeC <- p :
370+ }
365371}
366372
367373func (pw * pushWriter ) setError (err error ) {
368- pw .errC <- err
374+ select {
375+ case <- pw .done :
376+ case pw .errC <- err :
377+ }
369378}
379+
370380func (pw * pushWriter ) setResponse (resp * http.Response ) {
371- pw .respC <- resp
381+ select {
382+ case <- pw .done :
383+ case pw .respC <- resp :
384+ }
372385}
373386
374387func (pw * pushWriter ) Write (p []byte ) (n int , err error ) {
@@ -378,22 +391,26 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
378391 }
379392
380393 if pw .pipe == nil {
381- p , ok := <- pw . pipeC
382- if ! ok {
394+ select {
395+ case <- pw . done :
383396 return 0 , io .ErrClosedPipe
397+ case p := <- pw .pipeC :
398+ pw .pipe = p
384399 }
385- pw .pipe = p
386400 } else {
387401 select {
388- case p , ok := <- pw .pipeC :
389- if ! ok {
390- return 0 , io .ErrClosedPipe
391- }
402+ case <- pw .done :
403+ return 0 , io .ErrClosedPipe
404+ case p := <- pw .pipeC :
392405 pw .pipe .CloseWithError (content .ErrReset )
393406 pw .pipe = p
394407
395408 // If content has already been written, the bytes
396- // cannot be written and the caller must reset
409+ // cannot be written again and the caller must reset
410+ status , err := pw .tracker .GetStatus (pw .ref )
411+ if err != nil {
412+ return 0 , err
413+ }
397414 status .Offset = 0
398415 status .UpdatedAt = time .Now ()
399416 pw .tracker .SetStatus (pw .ref , status )
@@ -422,7 +439,7 @@ func (pw *pushWriter) Close() error {
422439 // Ensure pipeC is closed but handle `Close()` being
423440 // called multiple times without panicking
424441 pw .closeOnce .Do (func () {
425- close (pw .pipeC )
442+ close (pw .done )
426443 })
427444 if pw .pipe != nil {
428445 status , err := pw .tracker .GetStatus (pw .ref )
@@ -462,17 +479,16 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
462479 // TODO: timeout waiting for response
463480 var resp * http.Response
464481 select {
482+ case <- pw .done :
483+ return io .ErrClosedPipe
465484 case err := <- pw .errC :
466485 return err
467486 case resp = <- pw .respC :
468487 defer resp .Body .Close ()
469- case p , ok := <- pw .pipeC :
488+ case p := <- pw .pipeC :
470489 // check whether the pipe has changed in the commit, because sometimes Write
471490 // can complete successfully, but the pipe may have changed. In that case, the
472491 // content needs to be reset.
473- if ! ok {
474- return io .ErrClosedPipe
475- }
476492 pw .pipe .CloseWithError (content .ErrReset )
477493 pw .pipe = p
478494
0 commit comments