Skip to content

Commit b38b442

Browse files
Merge pull request #2640 from mxpv/pgzip
Support parallel decompression (pigz)
2 parents 7141ea3 + e8fac24 commit b38b442

File tree

2 files changed

+195
-4
lines changed

2 files changed

+195
-4
lines changed

archive/compression/compression.go

+86-2
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,15 @@ import (
2020
"bufio"
2121
"bytes"
2222
"compress/gzip"
23+
"context"
2324
"fmt"
2425
"io"
26+
"os"
27+
"os/exec"
28+
"strconv"
2529
"sync"
30+
31+
"github.com/containerd/containerd/log"
2632
)
2733

2834
type (
@@ -37,6 +43,13 @@ const (
3743
Gzip
3844
)
3945

46+
const disablePigzEnv = "CONTAINERD_DISABLE_PIGZ"
47+
48+
var (
49+
initPigz sync.Once
50+
unpigzPath string
51+
)
52+
4053
var (
4154
bufioReader32KPool = &sync.Pool{
4255
New: func() interface{} { return bufio.NewReaderSize(nil, 32*1024) },
@@ -120,11 +133,18 @@ func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
120133
readBufWrapper := &readCloserWrapper{buf, compression, closer}
121134
return readBufWrapper, nil
122135
case Gzip:
123-
gzReader, err := gzip.NewReader(buf)
136+
ctx, cancel := context.WithCancel(context.Background())
137+
gzReader, err := gzipDecompress(ctx, buf)
124138
if err != nil {
139+
cancel()
125140
return nil, err
126141
}
127-
readBufWrapper := &readCloserWrapper{gzReader, compression, closer}
142+
143+
readBufWrapper := &readCloserWrapper{gzReader, compression, func() error {
144+
cancel()
145+
return closer()
146+
}}
147+
128148
return readBufWrapper, nil
129149
default:
130150
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
@@ -151,3 +171,67 @@ func (compression *Compression) Extension() string {
151171
}
152172
return ""
153173
}
174+
175+
func gzipDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
176+
initPigz.Do(func() {
177+
if unpigzPath = detectPigz(); unpigzPath != "" {
178+
log.L.Debug("using pigz for decompression")
179+
}
180+
})
181+
182+
if unpigzPath == "" {
183+
return gzip.NewReader(buf)
184+
}
185+
186+
return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf)
187+
}
188+
189+
func cmdStream(cmd *exec.Cmd, in io.Reader) (io.ReadCloser, error) {
190+
reader, writer := io.Pipe()
191+
192+
cmd.Stdin = in
193+
cmd.Stdout = writer
194+
195+
var errBuf bytes.Buffer
196+
cmd.Stderr = &errBuf
197+
198+
if err := cmd.Start(); err != nil {
199+
return nil, err
200+
}
201+
202+
go func() {
203+
if err := cmd.Wait(); err != nil {
204+
writer.CloseWithError(fmt.Errorf("%s: %s", err, errBuf.String()))
205+
} else {
206+
writer.Close()
207+
}
208+
}()
209+
210+
return reader, nil
211+
}
212+
213+
func detectPigz() string {
214+
path, err := exec.LookPath("unpigz")
215+
if err != nil {
216+
log.L.WithError(err).Debug("unpigz not found, falling back to go gzip")
217+
return ""
218+
}
219+
220+
// Check if pigz disabled via CONTAINERD_DISABLE_PIGZ env variable
221+
value := os.Getenv(disablePigzEnv)
222+
if value == "" {
223+
return path
224+
}
225+
226+
disable, err := strconv.ParseBool(value)
227+
if err != nil {
228+
log.L.WithError(err).Warnf("could not parse %s: %s", disablePigzEnv, value)
229+
return path
230+
}
231+
232+
if disable {
233+
return ""
234+
}
235+
236+
return path
237+
}

archive/compression/compression_test.go

+109-2
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,25 @@ package compression
1818

1919
import (
2020
"bytes"
21+
"compress/gzip"
22+
"context"
23+
"io"
2124
"io/ioutil"
2225
"math/rand"
26+
"os"
27+
"os/exec"
28+
"path/filepath"
29+
"runtime"
30+
"strings"
2331
"testing"
2432
)
2533

34+
func TestMain(m *testing.M) {
35+
// Force initPigz to be called, so tests start with the same initial state
36+
gzipDecompress(context.Background(), strings.NewReader(""))
37+
os.Exit(m.Run())
38+
}
39+
2640
// generateData generates data that composed of 2 random parts
2741
// and single zero-filled part within them.
2842
// Typically, the compression ratio would be about 67%.
@@ -42,7 +56,7 @@ func generateData(t *testing.T, size int) []byte {
4256
return append(part0Data, append(part1Data, part2Data...)...)
4357
}
4458

45-
func testCompressDecompress(t *testing.T, size int, compression Compression) {
59+
func testCompressDecompress(t *testing.T, size int, compression Compression) DecompressReadCloser {
4660
orig := generateData(t, size)
4761
var b bytes.Buffer
4862
compressor, err := CompressStream(&b, compression)
@@ -72,12 +86,105 @@ func testCompressDecompress(t *testing.T, size int, compression Compression) {
7286
if !bytes.Equal(orig, decompressed) {
7387
t.Fatal("strange decompressed data")
7488
}
89+
90+
return decompressor
7591
}
7692

7793
func TestCompressDecompressGzip(t *testing.T) {
78-
testCompressDecompress(t, 1024*1024, Gzip)
94+
oldUnpigzPath := unpigzPath
95+
unpigzPath = ""
96+
defer func() { unpigzPath = oldUnpigzPath }()
97+
98+
decompressor := testCompressDecompress(t, 1024*1024, Gzip)
99+
wrapper := decompressor.(*readCloserWrapper)
100+
_, ok := wrapper.Reader.(*gzip.Reader)
101+
if !ok {
102+
t.Fatalf("unexpected compressor type: %T", wrapper.Reader)
103+
}
104+
}
105+
106+
func TestCompressDecompressPigz(t *testing.T) {
107+
if _, err := exec.LookPath("unpigz"); err != nil {
108+
t.Skip("pigz not installed")
109+
}
110+
111+
decompressor := testCompressDecompress(t, 1024*1024, Gzip)
112+
wrapper := decompressor.(*readCloserWrapper)
113+
_, ok := wrapper.Reader.(*io.PipeReader)
114+
if !ok {
115+
t.Fatalf("unexpected compressor type: %T", wrapper.Reader)
116+
}
79117
}
80118

81119
func TestCompressDecompressUncompressed(t *testing.T) {
82120
testCompressDecompress(t, 1024*1024, Uncompressed)
83121
}
122+
123+
func TestDetectPigz(t *testing.T) {
124+
// Create fake PATH with unpigz executable, make sure detectPigz can find it
125+
tempPath, err := ioutil.TempDir("", "containerd_temp_")
126+
if err != nil {
127+
t.Fatal(err)
128+
}
129+
130+
filename := "unpigz"
131+
if runtime.GOOS == "windows" {
132+
filename = "unpigz.exe"
133+
}
134+
135+
fullPath := filepath.Join(tempPath, filename)
136+
137+
if err := ioutil.WriteFile(fullPath, []byte(""), 0111); err != nil {
138+
t.Fatal(err)
139+
}
140+
141+
defer os.RemoveAll(tempPath)
142+
143+
oldPath := os.Getenv("PATH")
144+
os.Setenv("PATH", tempPath)
145+
defer os.Setenv("PATH", oldPath)
146+
147+
if pigzPath := detectPigz(); pigzPath == "" {
148+
t.Fatal("failed to detect pigz path")
149+
} else if pigzPath != fullPath {
150+
t.Fatalf("wrong pigz found: %s != %s", pigzPath, fullPath)
151+
}
152+
153+
os.Setenv(disablePigzEnv, "1")
154+
defer os.Unsetenv(disablePigzEnv)
155+
156+
if pigzPath := detectPigz(); pigzPath != "" {
157+
t.Fatalf("disable via %s doesn't work", disablePigzEnv)
158+
}
159+
}
160+
161+
func TestCmdStream(t *testing.T) {
162+
out, err := cmdStream(exec.Command("sh", "-c", "echo hello; exit 0"), nil)
163+
if err != nil {
164+
t.Fatal(err)
165+
}
166+
167+
buf, err := ioutil.ReadAll(out)
168+
if err != nil {
169+
t.Fatalf("failed to read from stdout: %s", err)
170+
}
171+
172+
if string(buf) != "hello\n" {
173+
t.Fatalf("unexpected command output ('%s' != '%s')", string(buf), "hello\n")
174+
}
175+
}
176+
177+
func TestCmdStreamBad(t *testing.T) {
178+
out, err := cmdStream(exec.Command("sh", "-c", "echo hello; echo >&2 bad result; exit 1"), nil)
179+
if err != nil {
180+
t.Fatalf("failed to start command: %v", err)
181+
}
182+
183+
if buf, err := ioutil.ReadAll(out); err == nil {
184+
t.Fatal("command should have failed")
185+
} else if err.Error() != "exit status 1: bad result\n" {
186+
t.Fatalf("wrong error: %s", err.Error())
187+
} else if string(buf) != "hello\n" {
188+
t.Fatalf("wrong output: %s", string(buf))
189+
}
190+
}

0 commit comments

Comments
 (0)