@@ -6,13 +6,15 @@ import (
66 "bytes"
77 "compress/bzip2"
88 "compress/gzip"
9+ "context"
910 "fmt"
1011 "io"
1112 "io/ioutil"
1213 "os"
1314 "os/exec"
1415 "path/filepath"
1516 "runtime"
17+ "strconv"
1618 "strings"
1719 "syscall"
1820
@@ -24,6 +26,17 @@ import (
2426 "github.com/sirupsen/logrus"
2527)
2628
29+ var unpigzPath string
30+
31+ func init () {
32+ if path , err := exec .LookPath ("unpigz" ); err != nil {
33+ logrus .Debug ("unpigz binary not found in PATH, falling back to go gzip library" )
34+ } else {
35+ logrus .Debugf ("Using unpigz binary found at path %s" , path )
36+ unpigzPath = path
37+ }
38+ }
39+
2740type (
2841 // Compression is the state represents if compressed or not.
2942 Compression int
@@ -136,10 +149,34 @@ func DetectCompression(source []byte) Compression {
136149 return Uncompressed
137150}
138151
139- func xzDecompress (archive io.Reader ) (io.ReadCloser , <- chan struct {} , error ) {
152+ func xzDecompress (ctx context. Context , archive io.Reader ) (io.ReadCloser , error ) {
140153 args := []string {"xz" , "-d" , "-c" , "-q" }
141154
142- return cmdStream (exec .Command (args [0 ], args [1 :]... ), archive )
155+ return cmdStream (exec .CommandContext (ctx , args [0 ], args [1 :]... ), archive )
156+ }
157+
158+ func gzDecompress (ctx context.Context , buf io.Reader ) (io.ReadCloser , error ) {
159+ if unpigzPath == "" {
160+ return gzip .NewReader (buf )
161+ }
162+
163+ disablePigzEnv := os .Getenv ("MOBY_DISABLE_PIGZ" )
164+ if disablePigzEnv != "" {
165+ if disablePigz , err := strconv .ParseBool (disablePigzEnv ); err != nil {
166+ return nil , err
167+ } else if disablePigz {
168+ return gzip .NewReader (buf )
169+ }
170+ }
171+
172+ return cmdStream (exec .CommandContext (ctx , unpigzPath , "-d" , "-c" ), buf )
173+ }
174+
175+ func wrapReadCloser (readBuf io.ReadCloser , cancel context.CancelFunc ) io.ReadCloser {
176+ return ioutils .NewReadCloserWrapper (readBuf , func () error {
177+ cancel ()
178+ return readBuf .Close ()
179+ })
143180}
144181
145182// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
@@ -163,26 +200,29 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
163200 readBufWrapper := p .NewReadCloserWrapper (buf , buf )
164201 return readBufWrapper , nil
165202 case Gzip :
166- gzReader , err := gzip .NewReader (buf )
203+ ctx , cancel := context .WithCancel (context .Background ())
204+
205+ gzReader , err := gzDecompress (ctx , buf )
167206 if err != nil {
207+ cancel ()
168208 return nil , err
169209 }
170210 readBufWrapper := p .NewReadCloserWrapper (buf , gzReader )
171- return readBufWrapper , nil
211+ return wrapReadCloser ( readBufWrapper , cancel ) , nil
172212 case Bzip2 :
173213 bz2Reader := bzip2 .NewReader (buf )
174214 readBufWrapper := p .NewReadCloserWrapper (buf , bz2Reader )
175215 return readBufWrapper , nil
176216 case Xz :
177- xzReader , chdone , err := xzDecompress (buf )
217+ ctx , cancel := context .WithCancel (context .Background ())
218+
219+ xzReader , err := xzDecompress (ctx , buf )
178220 if err != nil {
221+ cancel ()
179222 return nil , err
180223 }
181224 readBufWrapper := p .NewReadCloserWrapper (buf , xzReader )
182- return ioutils .NewReadCloserWrapper (readBufWrapper , func () error {
183- <- chdone
184- return readBufWrapper .Close ()
185- }), nil
225+ return wrapReadCloser (readBufWrapper , cancel ), nil
186226 default :
187227 return nil , fmt .Errorf ("Unsupported compression format %s" , (& compression ).Extension ())
188228 }
@@ -1163,8 +1203,7 @@ func remapIDs(idMappings *idtools.IDMappings, hdr *tar.Header) error {
11631203// cmdStream executes a command, and returns its stdout as a stream.
11641204// If the command fails to run or doesn't complete successfully, an error
11651205// will be returned, including anything written on stderr.
1166- func cmdStream (cmd * exec.Cmd , input io.Reader ) (io.ReadCloser , <- chan struct {}, error ) {
1167- chdone := make (chan struct {})
1206+ func cmdStream (cmd * exec.Cmd , input io.Reader ) (io.ReadCloser , error ) {
11681207 cmd .Stdin = input
11691208 pipeR , pipeW := io .Pipe ()
11701209 cmd .Stdout = pipeW
@@ -1173,7 +1212,7 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{},
11731212
11741213 // Run the command and return the pipe
11751214 if err := cmd .Start (); err != nil {
1176- return nil , nil , err
1215+ return nil , err
11771216 }
11781217
11791218 // Copy stdout to the returned pipe
@@ -1183,10 +1222,9 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{},
11831222 } else {
11841223 pipeW .Close ()
11851224 }
1186- close (chdone )
11871225 }()
11881226
1189- return pipeR , chdone , nil
1227+ return pipeR , nil
11901228}
11911229
11921230// NewTempArchive reads the content of src into a temporary file, and returns the contents
0 commit comments