Skip to content

Commit fd35494

Browse files
author
Sargun Dhillon
committed
Make image (layer) downloads faster by using pigz
The Golang built-in gzip library is serialized, and fairly slow at decompressing. It also only decompresses on demand, versus pipelining decompression. This change switches to using the pigz external command for gzip decompression, as opposed to using the built-in golang one. This code is not vendored, but will be used if it autodetected as part of the OS. This also switches to using context, versus a manually managed channel to manage cancellations, and synchronization. There is a little bit of weirdness around manually having to cancel in the error cases. Signed-off-by: Sargun Dhillon <[email protected]>
1 parent be14665 commit fd35494

10 files changed

Lines changed: 109 additions & 21 deletions

File tree

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ RUN apt-get update && apt-get install -y \
6262
libudev-dev \
6363
mercurial \
6464
net-tools \
65+
pigz \
6566
pkg-config \
6667
protobuf-compiler \
6768
protobuf-c-compiler \

Dockerfile.aarch64

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ RUN apt-get update && apt-get install -y \
5252
libudev-dev \
5353
mercurial \
5454
net-tools \
55+
pigz \
5556
pkg-config \
5657
protobuf-compiler \
5758
protobuf-c-compiler \

Dockerfile.armhf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ RUN apt-get update && apt-get install -y \
4545
libtool \
4646
libudev-dev \
4747
mercurial \
48+
pigz \
4849
pkg-config \
4950
python-backports.ssl-match-hostname \
5051
python-dev \

Dockerfile.e2e

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ RUN apk add --update \
4747
g++ \
4848
git \
4949
iptables \
50+
pigz \
5051
tar \
5152
xz \
5253
&& rm -rf /var/cache/apk/*

Dockerfile.ppc64le

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ RUN apt-get update && apt-get install -y \
4646
libtool \
4747
libudev-dev \
4848
mercurial \
49+
pigz \
4950
pkg-config \
5051
python-backports.ssl-match-hostname \
5152
python-dev \

Dockerfile.s390x

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ RUN apt-get update && apt-get install -y \
4242
libtool \
4343
libudev-dev \
4444
mercurial \
45+
pigz \
4546
pkg-config \
4647
python-backports.ssl-match-hostname \
4748
python-dev \

Dockerfile.simple

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
2828
e2fsprogs \
2929
iptables \
3030
pkg-config \
31+
pigz \
3132
procps \
3233
xfsprogs \
3334
xz-utils \

pkg/archive/archive.go

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ import (
66
"bytes"
77
"compress/bzip2"
88
"compress/gzip"
9+
"context"
910
"fmt"
1011
"io"
1112
"io/ioutil"
1213
"os"
1314
"os/exec"
1415
"path/filepath"
1516
"runtime"
17+
"strconv"
1618
"strings"
1719
"syscall"
1820

@@ -24,6 +26,17 @@ import (
2426
"github.com/sirupsen/logrus"
2527
)
2628

29+
var unpigzPath string
30+
31+
func init() {
32+
if path, err := exec.LookPath("unpigz"); err != nil {
33+
logrus.Debug("unpigz binary not found in PATH, falling back to go gzip library")
34+
} else {
35+
logrus.Debugf("Using unpigz binary found at path %s", path)
36+
unpigzPath = path
37+
}
38+
}
39+
2740
type (
2841
// Compression is the state represents if compressed or not.
2942
Compression int
@@ -136,10 +149,34 @@ func DetectCompression(source []byte) Compression {
136149
return Uncompressed
137150
}
138151

139-
func xzDecompress(archive io.Reader) (io.ReadCloser, <-chan struct{}, error) {
152+
func xzDecompress(ctx context.Context, archive io.Reader) (io.ReadCloser, error) {
140153
args := []string{"xz", "-d", "-c", "-q"}
141154

142-
return cmdStream(exec.Command(args[0], args[1:]...), archive)
155+
return cmdStream(exec.CommandContext(ctx, args[0], args[1:]...), archive)
156+
}
157+
158+
func gzDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
159+
if unpigzPath == "" {
160+
return gzip.NewReader(buf)
161+
}
162+
163+
disablePigzEnv := os.Getenv("MOBY_DISABLE_PIGZ")
164+
if disablePigzEnv != "" {
165+
if disablePigz, err := strconv.ParseBool(disablePigzEnv); err != nil {
166+
return nil, err
167+
} else if disablePigz {
168+
return gzip.NewReader(buf)
169+
}
170+
}
171+
172+
return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf)
173+
}
174+
175+
func wrapReadCloser(readBuf io.ReadCloser, cancel context.CancelFunc) io.ReadCloser {
176+
return ioutils.NewReadCloserWrapper(readBuf, func() error {
177+
cancel()
178+
return readBuf.Close()
179+
})
143180
}
144181

145182
// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
@@ -163,26 +200,29 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
163200
readBufWrapper := p.NewReadCloserWrapper(buf, buf)
164201
return readBufWrapper, nil
165202
case Gzip:
166-
gzReader, err := gzip.NewReader(buf)
203+
ctx, cancel := context.WithCancel(context.Background())
204+
205+
gzReader, err := gzDecompress(ctx, buf)
167206
if err != nil {
207+
cancel()
168208
return nil, err
169209
}
170210
readBufWrapper := p.NewReadCloserWrapper(buf, gzReader)
171-
return readBufWrapper, nil
211+
return wrapReadCloser(readBufWrapper, cancel), nil
172212
case Bzip2:
173213
bz2Reader := bzip2.NewReader(buf)
174214
readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader)
175215
return readBufWrapper, nil
176216
case Xz:
177-
xzReader, chdone, err := xzDecompress(buf)
217+
ctx, cancel := context.WithCancel(context.Background())
218+
219+
xzReader, err := xzDecompress(ctx, buf)
178220
if err != nil {
221+
cancel()
179222
return nil, err
180223
}
181224
readBufWrapper := p.NewReadCloserWrapper(buf, xzReader)
182-
return ioutils.NewReadCloserWrapper(readBufWrapper, func() error {
183-
<-chdone
184-
return readBufWrapper.Close()
185-
}), nil
225+
return wrapReadCloser(readBufWrapper, cancel), nil
186226
default:
187227
return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension())
188228
}
@@ -1163,8 +1203,7 @@ func remapIDs(idMappings *idtools.IDMappings, hdr *tar.Header) error {
11631203
// cmdStream executes a command, and returns its stdout as a stream.
11641204
// If the command fails to run or doesn't complete successfully, an error
11651205
// will be returned, including anything written on stderr.
1166-
func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, error) {
1167-
chdone := make(chan struct{})
1206+
func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) {
11681207
cmd.Stdin = input
11691208
pipeR, pipeW := io.Pipe()
11701209
cmd.Stdout = pipeW
@@ -1173,7 +1212,7 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{},
11731212

11741213
// Run the command and return the pipe
11751214
if err := cmd.Start(); err != nil {
1176-
return nil, nil, err
1215+
return nil, err
11771216
}
11781217

11791218
// Copy stdout to the returned pipe
@@ -1183,10 +1222,9 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{},
11831222
} else {
11841223
pipeW.Close()
11851224
}
1186-
close(chdone)
11871225
}()
11881226

1189-
return pipeR, chdone, nil
1227+
return pipeR, nil
11901228
}
11911229

11921230
// NewTempArchive reads the content of src into a temporary file, and returns the contents

pkg/archive/archive_test.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package archive
33
import (
44
"archive/tar"
55
"bytes"
6+
"compress/gzip"
67
"fmt"
78
"io"
89
"io/ioutil"
@@ -15,6 +16,7 @@ import (
1516
"time"
1617

1718
"github.com/docker/docker/pkg/idtools"
19+
"github.com/docker/docker/pkg/ioutils"
1820
"github.com/stretchr/testify/assert"
1921
"github.com/stretchr/testify/require"
2022
)
@@ -87,7 +89,7 @@ func TestIsArchivePathTar(t *testing.T) {
8789
}
8890
}
8991

90-
func testDecompressStream(t *testing.T, ext, compressCommand string) {
92+
func testDecompressStream(t *testing.T, ext, compressCommand string) io.Reader {
9193
cmd := exec.Command("sh", "-c",
9294
fmt.Sprintf("touch /tmp/archive && %s /tmp/archive", compressCommand))
9395
output, err := cmd.CombinedOutput()
@@ -111,6 +113,8 @@ func testDecompressStream(t *testing.T, ext, compressCommand string) {
111113
if err = r.Close(); err != nil {
112114
t.Fatalf("Failed to close the decompressed stream: %v ", err)
113115
}
116+
117+
return r
114118
}
115119

116120
func TestDecompressStreamGzip(t *testing.T) {
@@ -206,7 +210,7 @@ func TestExtensionXz(t *testing.T) {
206210

207211
func TestCmdStreamLargeStderr(t *testing.T) {
208212
cmd := exec.Command("sh", "-c", "dd if=/dev/zero bs=1k count=1000 of=/dev/stderr; echo hello")
209-
out, _, err := cmdStream(cmd, nil)
213+
out, err := cmdStream(cmd, nil)
210214
if err != nil {
211215
t.Fatalf("Failed to start command: %s", err)
212216
}
@@ -231,7 +235,7 @@ func TestCmdStreamBad(t *testing.T) {
231235
t.Skip("Failing on Windows CI machines")
232236
}
233237
badCmd := exec.Command("sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1")
234-
out, _, err := cmdStream(badCmd, nil)
238+
out, err := cmdStream(badCmd, nil)
235239
if err != nil {
236240
t.Fatalf("Failed to start command: %s", err)
237241
}
@@ -246,7 +250,7 @@ func TestCmdStreamBad(t *testing.T) {
246250

247251
func TestCmdStreamGood(t *testing.T) {
248252
cmd := exec.Command("sh", "-c", "echo hello; exit 0")
249-
out, _, err := cmdStream(cmd, nil)
253+
out, err := cmdStream(cmd, nil)
250254
if err != nil {
251255
t.Fatal(err)
252256
}
@@ -1318,3 +1322,38 @@ func readFileFromArchive(t *testing.T, archive io.ReadCloser, name string, expec
13181322
assert.NoError(t, err)
13191323
return string(content)
13201324
}
1325+
1326+
func TestDisablePigz(t *testing.T) {
1327+
_, err := exec.LookPath("unpigz")
1328+
if err != nil {
1329+
t.Log("Test will not check full path when Pigz not installed")
1330+
}
1331+
1332+
os.Setenv("MOBY_DISABLE_PIGZ", "true")
1333+
defer os.Unsetenv("MOBY_DISABLE_PIGZ")
1334+
1335+
r := testDecompressStream(t, "gz", "gzip -f")
1336+
// For the bufio pool
1337+
outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper)
1338+
// For the context canceller
1339+
contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper)
1340+
1341+
assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader)
1342+
}
1343+
1344+
func TestPigz(t *testing.T) {
1345+
r := testDecompressStream(t, "gz", "gzip -f")
1346+
// For the bufio pool
1347+
outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper)
1348+
// For the context canceller
1349+
contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper)
1350+
1351+
_, err := exec.LookPath("unpigz")
1352+
if err == nil {
1353+
t.Log("Tested whether Pigz is used, as it installed")
1354+
assert.IsType(t, &io.PipeReader{}, contextReaderCloserWrapper.Reader)
1355+
} else {
1356+
t.Log("Tested whether Pigz is not used, as it not installed")
1357+
assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader)
1358+
}
1359+
}

pkg/ioutils/readers.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,22 @@ import (
88
"golang.org/x/net/context"
99
)
1010

11-
type readCloserWrapper struct {
11+
// ReadCloserWrapper wraps an io.Reader, and implements an io.ReadCloser
12+
// It calls the given callback function when closed. It should be constructed
13+
// with NewReadCloserWrapper
14+
type ReadCloserWrapper struct {
1215
io.Reader
1316
closer func() error
1417
}
1518

16-
func (r *readCloserWrapper) Close() error {
19+
// Close calls back the passed closer function
20+
func (r *ReadCloserWrapper) Close() error {
1721
return r.closer()
1822
}
1923

2024
// NewReadCloserWrapper returns a new io.ReadCloser.
2125
func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
22-
return &readCloserWrapper{
26+
return &ReadCloserWrapper{
2327
Reader: r,
2428
closer: closer,
2529
}

0 commit comments

Comments
 (0)