@@ -15,14 +15,15 @@ import (
1515 "os/exec"
1616 "path/filepath"
1717 "runtime"
18+ "runtime/debug"
1819 "strconv"
1920 "strings"
21+ "sync/atomic"
2022 "syscall"
2123 "time"
2224
2325 "github.com/containerd/log"
2426 "github.com/docker/docker/pkg/idtools"
25- "github.com/docker/docker/pkg/ioutils"
2627 "github.com/docker/docker/pkg/pools"
2728 "github.com/docker/docker/pkg/system"
2829 "github.com/klauspost/compress/zstd"
@@ -215,11 +216,22 @@ func gzDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
215216 return cmdStream (exec .CommandContext (ctx , unpigzPath , "-d" , "-c" ), buf )
216217}
217218
218- func wrapReadCloser (readBuf io.ReadCloser , cancel context.CancelFunc ) io.ReadCloser {
219- return ioutils .NewReadCloserWrapper (readBuf , func () error {
220- cancel ()
221- return readBuf .Close ()
222- })
219+ type readCloserWrapper struct {
220+ io.Reader
221+ closer func () error
222+ closed atomic.Bool
223+ }
224+
225+ func (r * readCloserWrapper ) Close () error {
226+ if ! r .closed .CompareAndSwap (false , true ) {
227+ log .G (context .TODO ()).Error ("subsequent attempt to close readCloserWrapper" )
228+ if log .GetLevel () >= log .DebugLevel {
229+ log .G (context .TODO ()).Errorf ("stack trace: %s" , string (debug .Stack ()))
230+ }
231+
232+ return nil
233+ }
234+ return r .closer ()
223235}
224236
225237// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
@@ -237,11 +249,26 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
237249 return nil , err
238250 }
239251
252+ wrapReader := func (r io.Reader , cancel context.CancelFunc ) io.ReadCloser {
253+ return & readCloserWrapper {
254+ Reader : r ,
255+ closer : func () error {
256+ if cancel != nil {
257+ cancel ()
258+ }
259+ if readCloser , ok := r .(io.ReadCloser ); ok {
260+ readCloser .Close ()
261+ }
262+ p .Put (buf )
263+ return nil
264+ },
265+ }
266+ }
267+
240268 compression := DetectCompression (bs )
241269 switch compression {
242270 case Uncompressed :
243- readBufWrapper := p .NewReadCloserWrapper (buf , buf )
244- return readBufWrapper , nil
271+ return wrapReader (buf , nil ), nil
245272 case Gzip :
246273 ctx , cancel := context .WithCancel (context .Background ())
247274
@@ -250,12 +277,10 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
250277 cancel ()
251278 return nil , err
252279 }
253- readBufWrapper := p .NewReadCloserWrapper (buf , gzReader )
254- return wrapReadCloser (readBufWrapper , cancel ), nil
280+ return wrapReader (gzReader , cancel ), nil
255281 case Bzip2 :
256282 bz2Reader := bzip2 .NewReader (buf )
257- readBufWrapper := p .NewReadCloserWrapper (buf , bz2Reader )
258- return readBufWrapper , nil
283+ return wrapReader (bz2Reader , nil ), nil
259284 case Xz :
260285 ctx , cancel := context .WithCancel (context .Background ())
261286
@@ -264,15 +289,13 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
264289 cancel ()
265290 return nil , err
266291 }
267- readBufWrapper := p .NewReadCloserWrapper (buf , xzReader )
268- return wrapReadCloser (readBufWrapper , cancel ), nil
292+ return wrapReader (xzReader , cancel ), nil
269293 case Zstd :
270294 zstdReader , err := zstd .NewReader (buf )
271295 if err != nil {
272296 return nil , err
273297 }
274- readBufWrapper := p .NewReadCloserWrapper (buf , zstdReader )
275- return readBufWrapper , nil
298+ return wrapReader (zstdReader , nil ), nil
276299 default :
277300 return nil , fmt .Errorf ("Unsupported compression format %s" , (& compression ).Extension ())
278301 }
@@ -1424,11 +1447,14 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) {
14241447 close (done )
14251448 }()
14261449
1427- return ioutils .NewReadCloserWrapper (pipeR , func () error {
1428- // Close pipeR, and then wait for the command to complete before returning. We have to close pipeR first, as
1429- // cmd.Wait waits for any non-file stdout/stderr/stdin to close.
1430- err := pipeR .Close ()
1431- <- done
1432- return err
1433- }), nil
1450+ return & readCloserWrapper {
1451+ Reader : pipeR ,
1452+ closer : func () error {
1453+ // Close pipeR, and then wait for the command to complete before returning. We have to close pipeR first, as
1454+ // cmd.Wait waits for any non-file stdout/stderr/stdin to close.
1455+ err := pipeR .Close ()
1456+ <- done
1457+ return err
1458+ },
1459+ }, nil
14341460}
0 commit comments