Skip to content

Commit 4d7d63f

Browse files
committed
Improve layer decompression speed by using pigz
Signed-off-by: Maksym Pavlenko <[email protected]>
1 parent d09a1c6 commit 4d7d63f

File tree

3 files changed

+6155
-4
lines changed

3 files changed

+6155
-4
lines changed

archive/compression/compression.go

+83-2
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,15 @@ import (
2020
"bufio"
2121
"bytes"
2222
"compress/gzip"
23+
"context"
2324
"fmt"
2425
"io"
26+
"os"
27+
"os/exec"
28+
"strconv"
2529
"sync"
30+
31+
"github.com/containerd/containerd/log"
2632
)
2733

2834
type (
@@ -37,6 +43,16 @@ const (
3743
Gzip
3844
)
3945

46+
// disablePigzEnv is the name of the environment variable consulted by
// detectPigz; when it parses as a true boolean, pigz is not used.
const disablePigzEnv = "CONTAINERD_DISABLE_PIGZ"
47+
48+
// unpigzPath holds the unpigz executable path assigned from detectPigz in
// init. An empty value makes gzipDecompress use compress/gzip instead.
var unpigzPath string
49+
50+
func init() {
51+
if unpigzPath = detectPigz(); unpigzPath != "" {
52+
log.L.Debug("using pigz for decompression")
53+
}
54+
}
55+
4056
var (
4157
bufioReader32KPool = &sync.Pool{
4258
New: func() interface{} { return bufio.NewReaderSize(nil, 32*1024) },
@@ -120,11 +136,18 @@ func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
120136
readBufWrapper := &readCloserWrapper{buf, compression, closer}
121137
return readBufWrapper, nil
122138
case Gzip:
123-
gzReader, err := gzip.NewReader(buf)
139+
ctx, cancel := context.WithCancel(context.Background())
140+
gzReader, err := gzipDecompress(ctx, buf)
124141
if err != nil {
142+
cancel()
125143
return nil, err
126144
}
127-
readBufWrapper := &readCloserWrapper{gzReader, compression, closer}
145+
146+
readBufWrapper := &readCloserWrapper{gzReader, compression, func() error {
147+
cancel()
148+
return closer()
149+
}}
150+
128151
return readBufWrapper, nil
129152
default:
130153
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
@@ -151,3 +174,61 @@ func (compression *Compression) Extension() string {
151174
}
152175
return ""
153176
}
177+
178+
func gzipDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
179+
if unpigzPath == "" {
180+
return gzip.NewReader(buf)
181+
}
182+
183+
return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf)
184+
}
185+
186+
// cmdStream starts cmd with in as its stdin and returns a reader over the
// command's stdout.
//
// Stderr is collected in memory. A goroutine waits for the command to exit:
// on success the stream ends normally; on failure the reader reports an error
// of the form "<wait error>: <stderr contents>". If the command cannot be
// started, the start error is returned and no reader is produced.
func cmdStream(cmd *exec.Cmd, in io.Reader) (io.ReadCloser, error) {
	var stderr bytes.Buffer
	pr, pw := io.Pipe()

	cmd.Stdin, cmd.Stdout, cmd.Stderr = in, pw, &stderr

	if err := cmd.Start(); err != nil {
		return nil, err
	}

	go func() {
		waitErr := cmd.Wait()
		if waitErr == nil {
			pw.Close()
			return
		}
		// Surface the exit status together with whatever the command
		// printed on stderr to whoever is reading the stream.
		pw.CloseWithError(fmt.Errorf("%s: %s", waitErr, stderr.String()))
	}()

	return pr, nil
}
209+
210+
func detectPigz() string {
211+
path, err := exec.LookPath("unpigz")
212+
if err != nil {
213+
log.L.WithError(err).Debug("unpigz not found, falling back to go gzip")
214+
return ""
215+
}
216+
217+
// Check if pigz disabled via CONTAINERD_DISABLE_PIGZ env variable
218+
value := os.Getenv(disablePigzEnv)
219+
if value == "" {
220+
return path
221+
}
222+
223+
disable, err := strconv.ParseBool(value)
224+
if err != nil {
225+
log.L.WithError(err).Warnf("could not parse %s: %s", disablePigzEnv, value)
226+
return path
227+
}
228+
229+
if disable {
230+
return ""
231+
}
232+
233+
return path
234+
}

archive/compression/compression_test.go

+170-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,15 @@ package compression
1818

1919
import (
2020
"bytes"
21+
"compress/gzip"
22+
"crypto/md5"
23+
"io"
2124
"io/ioutil"
2225
"math/rand"
26+
"os"
27+
"os/exec"
28+
"path/filepath"
29+
"runtime"
2330
"testing"
2431
)
2532

@@ -42,7 +49,7 @@ func generateData(t *testing.T, size int) []byte {
4249
return append(part0Data, append(part1Data, part2Data...)...)
4350
}
4451

45-
func testCompressDecompress(t *testing.T, size int, compression Compression) {
52+
func testCompressDecompress(t *testing.T, size int, compression Compression) DecompressReadCloser {
4653
orig := generateData(t, size)
4754
var b bytes.Buffer
4855
compressor, err := CompressStream(&b, compression)
@@ -72,12 +79,173 @@ func testCompressDecompress(t *testing.T, size int, compression Compression) {
7279
if !bytes.Equal(orig, decompressed) {
7380
t.Fatal("strange decompressed data")
7481
}
82+
83+
return decompressor
7584
}
7685

7786
func TestCompressDecompressGzip(t *testing.T) {
78-
testCompressDecompress(t, 1024*1024, Gzip)
87+
oldUnpigzPath := unpigzPath
88+
unpigzPath = ""
89+
defer func() { unpigzPath = oldUnpigzPath }()
90+
91+
decompressor := testCompressDecompress(t, 1024*1024, Gzip)
92+
wrapper := decompressor.(*readCloserWrapper)
93+
_, ok := wrapper.Reader.(*gzip.Reader)
94+
if !ok {
95+
t.Fatalf("unexpected compressor type: %T", wrapper.Reader)
96+
}
97+
}
98+
99+
func TestCompressDecompressPigz(t *testing.T) {
100+
if _, err := exec.LookPath("unpigz"); err != nil {
101+
t.Skip("pigz not installed")
102+
}
103+
104+
decompressor := testCompressDecompress(t, 1024*1024, Gzip)
105+
wrapper := decompressor.(*readCloserWrapper)
106+
_, ok := wrapper.Reader.(*io.PipeReader)
107+
if !ok {
108+
t.Fatalf("unexpected compressor type: %T", wrapper.Reader)
109+
}
79110
}
80111

81112
func TestCompressDecompressUncompressed(t *testing.T) {
82113
testCompressDecompress(t, 1024*1024, Uncompressed)
83114
}
115+
116+
func TestDetectPigz(t *testing.T) {
117+
// Create fake PATH with unpigz executable, make sure detectPigz can find it
118+
tempPath, err := ioutil.TempDir("", "containerd_temp_")
119+
if err != nil {
120+
t.Fatal(err)
121+
}
122+
123+
filename := "unpigz"
124+
if runtime.GOOS == "windows" {
125+
filename = "unpigz.exe"
126+
}
127+
128+
fullPath := filepath.Join(tempPath, filename)
129+
130+
if err := ioutil.WriteFile(fullPath, []byte(""), 0111); err != nil {
131+
t.Fatal(err)
132+
}
133+
134+
defer os.RemoveAll(tempPath)
135+
136+
oldPath := os.Getenv("PATH")
137+
os.Setenv("PATH", tempPath)
138+
defer os.Setenv("PATH", oldPath)
139+
140+
if pigzPath := detectPigz(); pigzPath == "" {
141+
t.Fatal("failed to detect pigz path")
142+
} else if pigzPath != fullPath {
143+
t.Fatalf("wrong pigz found: %s != %s", pigzPath, fullPath)
144+
}
145+
146+
os.Setenv(disablePigzEnv, "1")
147+
defer os.Unsetenv(disablePigzEnv)
148+
149+
if pigzPath := detectPigz(); pigzPath != "" {
150+
t.Fatalf("disable via %s doesn't work", disablePigzEnv)
151+
}
152+
}
153+
154+
func TestCmdStream(t *testing.T) {
155+
out, err := cmdStream(exec.Command("sh", "-c", "echo hello; exit 0"), nil)
156+
if err != nil {
157+
t.Fatal(err)
158+
}
159+
160+
buf, err := ioutil.ReadAll(out)
161+
if err != nil {
162+
t.Fatalf("failed to read from stdout: %s", err)
163+
}
164+
165+
if string(buf) != "hello\n" {
166+
t.Fatalf("unexpected command output ('%s' != '%s')", string(buf), "hello\n")
167+
}
168+
}
169+
170+
func TestCmdStreamBad(t *testing.T) {
171+
out, err := cmdStream(exec.Command("sh", "-c", "echo hello; echo >&2 bad result; exit 1"), nil)
172+
if err != nil {
173+
t.Fatalf("failed to start command: %v", err)
174+
}
175+
176+
if buf, err := ioutil.ReadAll(out); err == nil {
177+
t.Fatal("command should have failed")
178+
} else if err.Error() != "exit status 1: bad result\n" {
179+
t.Fatalf("wrong error: %s", err.Error())
180+
} else if string(buf) != "hello\n" {
181+
t.Fatalf("wrong output: %s", string(buf))
182+
}
183+
}
184+
185+
func generateCompressedData(b *testing.B, sizeInMb int) []byte {
186+
sizeInBytes := sizeInMb * 1024 * 1024
187+
data, _ := ioutil.ReadFile("testdata/test.json")
188+
189+
for len(data) < sizeInBytes {
190+
data = append(data, data...)
191+
}
192+
193+
b.SetBytes(int64(len(data)))
194+
195+
var buf bytes.Buffer
196+
compressor, err := CompressStream(&buf, Gzip)
197+
if err != nil {
198+
b.Fatal(err)
199+
}
200+
201+
if n, err := compressor.Write(data); err != nil || n != len(data) {
202+
b.Fatal(err)
203+
}
204+
205+
compressor.Close()
206+
return buf.Bytes()
207+
}
208+
209+
func benchmarkDecompression(sizeInMb int) func(*testing.B) {
210+
buf := make([]byte, 32*1024)
211+
return func(b *testing.B) {
212+
compressed := generateCompressedData(b, sizeInMb)
213+
hash := md5.New()
214+
215+
b.ResetTimer()
216+
for n := 0; n < b.N; n++ {
217+
decompressor, err := DecompressStream(bytes.NewReader(compressed))
218+
if err != nil {
219+
b.Fatal(err)
220+
}
221+
222+
if _, err = io.CopyBuffer(hash, decompressor, buf); err != nil {
223+
b.Fatal(err)
224+
}
225+
226+
decompressor.Close()
227+
}
228+
}
229+
}
230+
231+
func BenchmarkGzipDecompression(b *testing.B) {
232+
oldUnpigzPath := unpigzPath
233+
unpigzPath = ""
234+
defer func() { unpigzPath = oldUnpigzPath }()
235+
236+
b.Run("gzip-32mb", benchmarkDecompression(32))
237+
b.Run("gzip-64mb", benchmarkDecompression(64))
238+
b.Run("gzip-128mb", benchmarkDecompression(128))
239+
b.Run("gzip-256mb", benchmarkDecompression(256))
240+
}
241+
242+
func BenchmarkPigzDecompression(b *testing.B) {
243+
if _, err := exec.LookPath("unpigz"); err != nil {
244+
b.Skip("pigz not installed")
245+
}
246+
247+
b.Run("pigz-32mb", benchmarkDecompression(32))
248+
b.Run("pigz-64mb", benchmarkDecompression(64))
249+
b.Run("pigz-128mb", benchmarkDecompression(128))
250+
b.Run("pigz-256mb", benchmarkDecompression(256))
251+
}

0 commit comments

Comments
 (0)