@@ -384,6 +384,27 @@ func (pw *pushWriter) setResponse(resp *http.Response) {
384384 }
385385}
386386
387+ func (pw * pushWriter ) replacePipe (p * io.PipeWriter ) error {
388+ if pw .pipe == nil {
389+ pw .pipe = p
390+ return nil
391+ }
392+
393+ pw .pipe .CloseWithError (content .ErrReset )
394+ pw .pipe = p
395+
396+ // If content has already been written, the bytes
397+ // cannot be written again and the caller must reset
398+ status , err := pw .tracker .GetStatus (pw .ref )
399+ if err != nil {
400+ return err
401+ }
402+ status .Offset = 0
403+ status .UpdatedAt = time .Now ()
404+ pw .tracker .SetStatus (pw .ref , status )
405+ return content .ErrReset
406+ }
407+
387408func (pw * pushWriter ) Write (p []byte ) (n int , err error ) {
388409 status , err := pw .tracker .GetStatus (pw .ref )
389410 if err != nil {
@@ -395,26 +416,14 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
395416 case <- pw .done :
396417 return 0 , io .ErrClosedPipe
397418 case p := <- pw .pipeC :
398- pw .pipe = p
419+ pw .replacePipe ( p )
399420 }
400421 } else {
401422 select {
402423 case <- pw .done :
403424 return 0 , io .ErrClosedPipe
404425 case p := <- pw .pipeC :
405- pw .pipe .CloseWithError (content .ErrReset )
406- pw .pipe = p
407-
408- // If content has already been written, the bytes
409- // cannot be written again and the caller must reset
410- status , err := pw .tracker .GetStatus (pw .ref )
411- if err != nil {
412- return 0 , err
413- }
414- status .Offset = 0
415- status .UpdatedAt = time .Now ()
416- pw .tracker .SetStatus (pw .ref , status )
417- return 0 , content .ErrReset
426+ return 0 , pw .replacePipe (p )
418427 default :
419428 }
420429 }
@@ -427,19 +436,7 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
427436 case <- pw .done :
428437 case err = <- pw .errC :
429438 case p := <- pw .pipeC :
430- pw .pipe .CloseWithError (content .ErrReset )
431- pw .pipe = p
432-
433- // If content has already been written, the bytes
434- // cannot be written again and the caller must reset
435- status , err := pw .tracker .GetStatus (pw .ref )
436- if err != nil {
437- return 0 , err
438- }
439- status .Offset = 0
440- status .UpdatedAt = time .Now ()
441- pw .tracker .SetStatus (pw .ref , status )
442- return 0 , content .ErrReset
439+ return 0 , pw .replacePipe (p )
443440 }
444441 }
445442 status .Offset += int64 (n )
@@ -502,19 +499,7 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
502499 // check whether the pipe has changed in the commit, because sometimes Write
503500 // can complete successfully, but the pipe may have changed. In that case, the
504501 // content needs to be reset.
505- pw .pipe .CloseWithError (content .ErrReset )
506- pw .pipe = p
507-
508- // If content has already been written, the bytes
509- // cannot be written again and the caller must reset
510- status , err := pw .tracker .GetStatus (pw .ref )
511- if err != nil {
512- return err
513- }
514- status .Offset = 0
515- status .UpdatedAt = time .Now ()
516- pw .tracker .SetStatus (pw .ref , status )
517- return content .ErrReset
502+ return pw .replacePipe (p )
518503 }
519504
520505 // 201 is specified return status, some registries return
0 commit comments