Skip to content

Commit df60d32

Browse files
authored
Merge pull request #2687 from dmcgowan/fix-pigz-panic
Fix panic when bufio Reader called in 2 goroutines
2 parents 75d7d6e + db358a9 commit df60d32

1 file changed

Lines changed: 43 additions & 14 deletions

File tree

archive/compression/compression.go

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,36 @@ func (w *writeCloserWrapper) Close() error {
9292
return nil
9393
}
9494

95+
type bufferedReader struct {
96+
buf *bufio.Reader
97+
}
98+
99+
func newBufferedReader(r io.Reader) *bufferedReader {
100+
buf := bufioReader32KPool.Get().(*bufio.Reader)
101+
buf.Reset(r)
102+
return &bufferedReader{buf}
103+
}
104+
105+
func (r *bufferedReader) Read(p []byte) (n int, err error) {
106+
if r.buf == nil {
107+
return 0, io.EOF
108+
}
109+
n, err = r.buf.Read(p)
110+
if err == io.EOF {
111+
r.buf.Reset(nil)
112+
bufioReader32KPool.Put(r.buf)
113+
r.buf = nil
114+
}
115+
return
116+
}
117+
118+
func (r *bufferedReader) Peek(n int) ([]byte, error) {
119+
if r.buf == nil {
120+
return nil, io.EOF
121+
}
122+
return r.buf.Peek(n)
123+
}
124+
95125
// DetectCompression detects the compression algorithm of the source.
96126
func DetectCompression(source []byte) Compression {
97127
for compression, m := range map[Compression][]byte{
@@ -110,8 +140,7 @@ func DetectCompression(source []byte) Compression {
110140

111141
// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
112142
func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
113-
buf := bufioReader32KPool.Get().(*bufio.Reader)
114-
buf.Reset(archive)
143+
buf := newBufferedReader(archive)
115144
bs, err := buf.Peek(10)
116145
if err != nil && err != io.EOF {
117146
// Note: we'll ignore any io.EOF error because there are some odd
@@ -123,15 +152,12 @@ func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
123152
return nil, err
124153
}
125154

126-
closer := func() error {
127-
buf.Reset(nil)
128-
bufioReader32KPool.Put(buf)
129-
return nil
130-
}
131155
switch compression := DetectCompression(bs); compression {
132156
case Uncompressed:
133-
readBufWrapper := &readCloserWrapper{buf, compression, closer}
134-
return readBufWrapper, nil
157+
return &readCloserWrapper{
158+
Reader: buf,
159+
compression: compression,
160+
}, nil
135161
case Gzip:
136162
ctx, cancel := context.WithCancel(context.Background())
137163
gzReader, err := gzipDecompress(ctx, buf)
@@ -140,12 +166,15 @@ func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
140166
return nil, err
141167
}
142168

143-
readBufWrapper := &readCloserWrapper{gzReader, compression, func() error {
144-
cancel()
145-
return closer()
146-
}}
169+
return &readCloserWrapper{
170+
Reader: gzReader,
171+
compression: compression,
172+
closer: func() error {
173+
cancel()
174+
return gzReader.Close()
175+
},
176+
}, nil
147177

148-
return readBufWrapper, nil
149178
default:
150179
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
151180
}

0 commit comments

Comments
 (0)