Skip to content

Commit ba90fd8

Browse files
committed
Decouple pkg/archive from pkg/ioutils
Signed-off-by: Derek McGowan <[email protected]>
1 parent 7faa4ec commit ba90fd8

3 files changed

Lines changed: 65 additions & 42 deletions

File tree

pkg/archive/archive.go

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ import (
1515
"os/exec"
1616
"path/filepath"
1717
"runtime"
18+
"runtime/debug"
1819
"strconv"
1920
"strings"
21+
"sync/atomic"
2022
"syscall"
2123
"time"
2224

2325
"github.com/containerd/log"
2426
"github.com/docker/docker/pkg/idtools"
25-
"github.com/docker/docker/pkg/ioutils"
2627
"github.com/docker/docker/pkg/pools"
2728
"github.com/docker/docker/pkg/system"
2829
"github.com/klauspost/compress/zstd"
@@ -215,11 +216,22 @@ func gzDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
215216
return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf)
216217
}
217218

218-
func wrapReadCloser(readBuf io.ReadCloser, cancel context.CancelFunc) io.ReadCloser {
219-
return ioutils.NewReadCloserWrapper(readBuf, func() error {
220-
cancel()
221-
return readBuf.Close()
222-
})
219+
type readCloserWrapper struct {
220+
io.Reader
221+
closer func() error
222+
closed atomic.Bool
223+
}
224+
225+
func (r *readCloserWrapper) Close() error {
226+
if !r.closed.CompareAndSwap(false, true) {
227+
log.G(context.TODO()).Error("subsequent attempt to close readCloserWrapper")
228+
if log.GetLevel() >= log.DebugLevel {
229+
log.G(context.TODO()).Errorf("stack trace: %s", string(debug.Stack()))
230+
}
231+
232+
return nil
233+
}
234+
return r.closer()
223235
}
224236

225237
// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
@@ -237,11 +249,26 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
237249
return nil, err
238250
}
239251

252+
wrapReader := func(r io.Reader, cancel context.CancelFunc) io.ReadCloser {
253+
return &readCloserWrapper{
254+
Reader: r,
255+
closer: func() error {
256+
if cancel != nil {
257+
cancel()
258+
}
259+
if readCloser, ok := r.(io.ReadCloser); ok {
260+
readCloser.Close()
261+
}
262+
p.Put(buf)
263+
return nil
264+
},
265+
}
266+
}
267+
240268
compression := DetectCompression(bs)
241269
switch compression {
242270
case Uncompressed:
243-
readBufWrapper := p.NewReadCloserWrapper(buf, buf)
244-
return readBufWrapper, nil
271+
return wrapReader(buf, nil), nil
245272
case Gzip:
246273
ctx, cancel := context.WithCancel(context.Background())
247274

@@ -250,12 +277,10 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
250277
cancel()
251278
return nil, err
252279
}
253-
readBufWrapper := p.NewReadCloserWrapper(buf, gzReader)
254-
return wrapReadCloser(readBufWrapper, cancel), nil
280+
return wrapReader(gzReader, cancel), nil
255281
case Bzip2:
256282
bz2Reader := bzip2.NewReader(buf)
257-
readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader)
258-
return readBufWrapper, nil
283+
return wrapReader(bz2Reader, nil), nil
259284
case Xz:
260285
ctx, cancel := context.WithCancel(context.Background())
261286

@@ -264,15 +289,13 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
264289
cancel()
265290
return nil, err
266291
}
267-
readBufWrapper := p.NewReadCloserWrapper(buf, xzReader)
268-
return wrapReadCloser(readBufWrapper, cancel), nil
292+
return wrapReader(xzReader, cancel), nil
269293
case Zstd:
270294
zstdReader, err := zstd.NewReader(buf)
271295
if err != nil {
272296
return nil, err
273297
}
274-
readBufWrapper := p.NewReadCloserWrapper(buf, zstdReader)
275-
return readBufWrapper, nil
298+
return wrapReader(zstdReader, nil), nil
276299
default:
277300
return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension())
278301
}
@@ -1424,11 +1447,14 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) {
14241447
close(done)
14251448
}()
14261449

1427-
return ioutils.NewReadCloserWrapper(pipeR, func() error {
1428-
// Close pipeR, and then wait for the command to complete before returning. We have to close pipeR first, as
1429-
// cmd.Wait waits for any non-file stdout/stderr/stdin to close.
1430-
err := pipeR.Close()
1431-
<-done
1432-
return err
1433-
}), nil
1450+
return &readCloserWrapper{
1451+
Reader: pipeR,
1452+
closer: func() error {
1453+
// Close pipeR, and then wait for the command to complete before returning. We have to close pipeR first, as
1454+
// cmd.Wait waits for any non-file stdout/stderr/stdin to close.
1455+
err := pipeR.Close()
1456+
<-done
1457+
return err
1458+
},
1459+
}, nil
14341460
}

pkg/archive/archive_test.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"time"
1818

1919
"github.com/docker/docker/pkg/idtools"
20-
"github.com/docker/docker/pkg/ioutils"
2120
"github.com/moby/sys/userns"
2221
"gotest.tools/v3/assert"
2322
is "gotest.tools/v3/assert/cmp"
@@ -1443,30 +1442,27 @@ func TestDisablePigz(t *testing.T) {
14431442
t.Setenv("MOBY_DISABLE_PIGZ", "true")
14441443

14451444
r := testDecompressStream(t, "gz", "gzip -f")
1446-
// For the bufio pool
1447-
outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper)
1448-
// For the context canceller
1449-
contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper)
14501445

1451-
assert.Equal(t, reflect.TypeOf(contextReaderCloserWrapper.Reader), reflect.TypeOf(&gzip.Reader{}))
1446+
// wrapped in closer to cancel contex and release buffer to pool
1447+
wrapper := r.(*readCloserWrapper)
1448+
1449+
assert.Equal(t, reflect.TypeOf(wrapper.Reader), reflect.TypeOf(&gzip.Reader{}))
14521450
}
14531451

14541452
func TestPigz(t *testing.T) {
14551453
r := testDecompressStream(t, "gz", "gzip -f")
1456-
// For the bufio pool
1457-
outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper)
1458-
// For the context canceller
1459-
contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper)
1454+
// wrapper for buffered reader and context cancel
1455+
wrapper := r.(*readCloserWrapper)
14601456

14611457
_, err := exec.LookPath("unpigz")
14621458
if err == nil {
14631459
t.Log("Tested whether Pigz is used, as it installed")
14641460
// For the command wait wrapper
1465-
cmdWaitCloserWrapper := contextReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper)
1461+
cmdWaitCloserWrapper := wrapper.Reader.(*readCloserWrapper)
14661462
assert.Equal(t, reflect.TypeOf(cmdWaitCloserWrapper.Reader), reflect.TypeOf(&io.PipeReader{}))
14671463
} else {
14681464
t.Log("Tested whether Pigz is not used, as it not installed")
1469-
assert.Equal(t, reflect.TypeOf(contextReaderCloserWrapper.Reader), reflect.TypeOf(&gzip.Reader{}))
1465+
assert.Equal(t, reflect.TypeOf(wrapper.Reader), reflect.TypeOf(&gzip.Reader{}))
14701466
}
14711467
}
14721468

pkg/archive/diff_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"path/filepath"
88
"reflect"
99
"testing"
10-
11-
"github.com/docker/docker/pkg/ioutils"
1210
)
1311

1412
func TestApplyLayerInvalidFilenames(t *testing.T) {
@@ -337,11 +335,14 @@ func makeTestLayer(paths []string) (rc io.ReadCloser, err error) {
337335
if err != nil {
338336
return
339337
}
340-
return ioutils.NewReadCloserWrapper(archive, func() error {
341-
err := archive.Close()
342-
os.RemoveAll(tmpDir)
343-
return err
344-
}), nil
338+
return &readCloserWrapper{
339+
Reader: archive,
340+
closer: func() error {
341+
err := archive.Close()
342+
os.RemoveAll(tmpDir)
343+
return err
344+
},
345+
}, nil
345346
}
346347

347348
func readDirContents(root string) ([]string, error) {

0 commit comments

Comments
 (0)