Skip to content

Commit db358a9

Browse files
committed
Fix panic when bufio Reader called in 2 goroutines
A panic was seen related to the buffer being reset in one goroutine while being read in another. In the case of pigz an early cancellation will cause the reader to close, resetting the buffer and signaling the process to shut down, but races since the process must stop reading before the reset otherwise the a panic may occur. This fix guarantees that the bufio is always reset and returned to the pool on the same goroutine that is doing the read. If a buffer is not fully read the buffered reader should just be discarded and not returned back to the pool. Signed-off-by: Derek McGowan <[email protected]>
1 parent 4b1d56e commit db358a9

File tree

1 file changed

+43
-14
lines changed

1 file changed

+43
-14
lines changed

archive/compression/compression.go

+43-14
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,36 @@ func (w *writeCloserWrapper) Close() error {
9292
return nil
9393
}
9494

95+
type bufferedReader struct {
96+
buf *bufio.Reader
97+
}
98+
99+
func newBufferedReader(r io.Reader) *bufferedReader {
100+
buf := bufioReader32KPool.Get().(*bufio.Reader)
101+
buf.Reset(r)
102+
return &bufferedReader{buf}
103+
}
104+
105+
func (r *bufferedReader) Read(p []byte) (n int, err error) {
106+
if r.buf == nil {
107+
return 0, io.EOF
108+
}
109+
n, err = r.buf.Read(p)
110+
if err == io.EOF {
111+
r.buf.Reset(nil)
112+
bufioReader32KPool.Put(r.buf)
113+
r.buf = nil
114+
}
115+
return
116+
}
117+
118+
func (r *bufferedReader) Peek(n int) ([]byte, error) {
119+
if r.buf == nil {
120+
return nil, io.EOF
121+
}
122+
return r.buf.Peek(n)
123+
}
124+
95125
// DetectCompression detects the compression algorithm of the source.
96126
func DetectCompression(source []byte) Compression {
97127
for compression, m := range map[Compression][]byte{
@@ -110,8 +140,7 @@ func DetectCompression(source []byte) Compression {
110140

111141
// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
112142
func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
113-
buf := bufioReader32KPool.Get().(*bufio.Reader)
114-
buf.Reset(archive)
143+
buf := newBufferedReader(archive)
115144
bs, err := buf.Peek(10)
116145
if err != nil && err != io.EOF {
117146
// Note: we'll ignore any io.EOF error because there are some odd
@@ -123,15 +152,12 @@ func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
123152
return nil, err
124153
}
125154

126-
closer := func() error {
127-
buf.Reset(nil)
128-
bufioReader32KPool.Put(buf)
129-
return nil
130-
}
131155
switch compression := DetectCompression(bs); compression {
132156
case Uncompressed:
133-
readBufWrapper := &readCloserWrapper{buf, compression, closer}
134-
return readBufWrapper, nil
157+
return &readCloserWrapper{
158+
Reader: buf,
159+
compression: compression,
160+
}, nil
135161
case Gzip:
136162
ctx, cancel := context.WithCancel(context.Background())
137163
gzReader, err := gzipDecompress(ctx, buf)
@@ -140,12 +166,15 @@ func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
140166
return nil, err
141167
}
142168

143-
readBufWrapper := &readCloserWrapper{gzReader, compression, func() error {
144-
cancel()
145-
return closer()
146-
}}
169+
return &readCloserWrapper{
170+
Reader: gzReader,
171+
compression: compression,
172+
closer: func() error {
173+
cancel()
174+
return gzReader.Close()
175+
},
176+
}, nil
147177

148-
return readBufWrapper, nil
149178
default:
150179
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
151180
}

0 commit comments

Comments
 (0)