Skip to content

Commit f42ed03

Browse files
authored
Optimized bstream reader used by XORChunk iterator (#7390)
* Optimized bstream reader Signed-off-by: Marco Pracucci <[email protected]> * Fixed linter Signed-off-by: Marco Pracucci <[email protected]> * Added license to new file Signed-off-by: Marco Pracucci <[email protected]> * Fixed type cast Signed-off-by: Marco Pracucci <[email protected]> * Changed comments Signed-off-by: Marco Pracucci <[email protected]> * Improved comments and rolledback no-op changes Signed-off-by: Marco Pracucci <[email protected]> * Fixed race condition Signed-off-by: Marco Pracucci <[email protected]>
1 parent 268b4c2 commit f42ed03

File tree

3 files changed

+225
-67
lines changed

3 files changed

+225
-67
lines changed

tsdb/chunkenc/bstream.go

Lines changed: 104 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,6 @@ type bstream struct {
4949
count uint8 // how many bits are valid in current byte
5050
}
5151

52-
func newBReader(b []byte) bstream {
53-
return bstream{stream: b, count: 8}
54-
}
55-
5652
// bytes returns the stream's underlying byte slice (the slice itself, not a copy).
func (b *bstream) bytes() []byte {
5753
return b.stream
5854
}
@@ -111,90 +107,141 @@ func (b *bstream) writeBits(u uint64, nbits int) {
111107
}
112108
}
113109

114-
func (b *bstream) readBit() (bit, error) {
115-
if len(b.stream) == 0 {
116-
return false, io.EOF
117-
}
110+
// bstreamReader reads bits out of a byte slice. Bytes are copied from stream
// into buffer (up to 8 at a time) and bits are then served from buffer.
type bstreamReader struct {
111+
stream []byte
112+
streamOffset int // The offset from which to read the next byte from the stream.
118113

119-
if b.count == 0 {
120-
b.stream = b.stream[1:]
114+
buffer uint64 // The current buffer, filled from the stream, holding up to 8 bytes from which bits are read.
115+
valid uint8 // The number of unread bits in the buffer: they are its lowest `valid` bits, consumed most-significant first.
116+
}
117+
118+
func newBReader(b []byte) bstreamReader {
119+
return bstreamReader{
120+
stream: b,
121+
}
122+
}
121123

122-
if len(b.stream) == 0 {
124+
func (b *bstreamReader) readBit() (bit, error) {
125+
if b.valid == 0 {
126+
if !b.loadNextBuffer(1) {
123127
return false, io.EOF
124128
}
125-
b.count = 8
126129
}
127130

128-
d := (b.stream[0] << (8 - b.count)) & 0x80
129-
b.count--
130-
return d != 0, nil
131-
}
132-
133-
func (b *bstream) ReadByte() (byte, error) {
134-
return b.readByte()
131+
return b.readBitFast()
135132
}
136133

137-
func (b *bstream) readByte() (byte, error) {
138-
if len(b.stream) == 0 {
139-
return 0, io.EOF
134+
// readBitFast is like readBit but can return io.EOF if the internal buffer is empty.
135+
// If it returns io.EOF, the caller should retry reading bits calling readBit().
136+
// This function must be kept small and a leaf in order to help the compiler inlining it
137+
// and further improve performances.
138+
func (b *bstreamReader) readBitFast() (bit, error) {
139+
if b.valid == 0 {
140+
return false, io.EOF
140141
}
141142

142-
if b.count == 0 {
143-
b.stream = b.stream[1:]
143+
b.valid--
144+
bitmask := uint64(1) << b.valid
145+
return (b.buffer & bitmask) != 0, nil
146+
}
144147

145-
if len(b.stream) == 0 {
148+
func (b *bstreamReader) readBits(nbits uint8) (uint64, error) {
149+
if b.valid == 0 {
150+
if !b.loadNextBuffer(nbits) {
146151
return 0, io.EOF
147152
}
148-
return b.stream[0], nil
149153
}
150154

151-
if b.count == 8 {
152-
b.count = 0
153-
return b.stream[0], nil
155+
if nbits <= b.valid {
156+
return b.readBitsFast(nbits)
154157
}
155158

156-
byt := b.stream[0] << (8 - b.count)
157-
b.stream = b.stream[1:]
159+
// We have to read all remaining valid bits from the current buffer and a part from the next one.
160+
bitmask := (uint64(1) << b.valid) - 1
161+
nbits -= b.valid
162+
v := (b.buffer & bitmask) << nbits
163+
b.valid = 0
158164

159-
if len(b.stream) == 0 {
165+
if !b.loadNextBuffer(nbits) {
160166
return 0, io.EOF
161167
}
162168

163-
// We just advanced the stream and can assume the shift to be 0.
164-
byt |= b.stream[0] >> b.count
169+
bitmask = (uint64(1) << nbits) - 1
170+
v = v | ((b.buffer >> (b.valid - nbits)) & bitmask)
171+
b.valid -= nbits
165172

166-
return byt, nil
173+
return v, nil
167174
}
168175

169-
func (b *bstream) readBits(nbits int) (uint64, error) {
170-
var u uint64
176+
// readBitsFast is like readBits but can return io.EOF if the internal buffer is empty.
177+
// If it returns io.EOF, the caller should retry reading bits calling readBits().
178+
// This function must be kept small and a leaf in order to help the compiler inlining it
179+
// and further improve performances.
180+
func (b *bstreamReader) readBitsFast(nbits uint8) (uint64, error) {
181+
if nbits > b.valid {
182+
return 0, io.EOF
183+
}
171184

172-
for nbits >= 8 {
173-
byt, err := b.readByte()
174-
if err != nil {
175-
return 0, err
176-
}
185+
bitmask := (uint64(1) << nbits) - 1
186+
b.valid -= nbits
177187

178-
u = (u << 8) | uint64(byt)
179-
nbits -= 8
188+
return (b.buffer >> b.valid) & bitmask, nil
189+
}
190+
191+
func (b *bstreamReader) ReadByte() (byte, error) {
192+
v, err := b.readBits(8)
193+
if err != nil {
194+
return 0, err
180195
}
196+
return byte(v), nil
197+
}
181198

182-
if nbits == 0 {
183-
return u, nil
199+
// loadNextBuffer loads the next bytes from the stream into the internal buffer.
200+
// The input nbits is the minimum number of bits that must be read, but the implementation
201+
// can read more (if possible) to improve performances.
202+
func (b *bstreamReader) loadNextBuffer(nbits uint8) bool {
203+
if b.streamOffset >= len(b.stream) {
204+
return false
184205
}
185206

186-
if nbits > int(b.count) {
187-
u = (u << uint(b.count)) | uint64((b.stream[0]<<(8-b.count))>>(8-b.count))
188-
nbits -= int(b.count)
189-
b.stream = b.stream[1:]
207+
// Handle the case there are more then 8 bytes in the buffer (most common case)
208+
// in a optimized way. It's guaranteed that this branch will never read from the
209+
// very last byte of the stream (which suffers race conditions due to concurrent
210+
// writes).
211+
if b.streamOffset+8 < len(b.stream) {
212+
// This is ugly, but significantly faster.
213+
b.buffer =
214+
((uint64(b.stream[b.streamOffset])) << 56) |
215+
((uint64(b.stream[b.streamOffset+1])) << 48) |
216+
((uint64(b.stream[b.streamOffset+2])) << 40) |
217+
((uint64(b.stream[b.streamOffset+3])) << 32) |
218+
((uint64(b.stream[b.streamOffset+4])) << 24) |
219+
((uint64(b.stream[b.streamOffset+5])) << 16) |
220+
((uint64(b.stream[b.streamOffset+6])) << 8) |
221+
uint64(b.stream[b.streamOffset+7])
222+
223+
b.streamOffset += 8
224+
b.valid = 64
225+
return true
226+
}
190227

191-
if len(b.stream) == 0 {
192-
return 0, io.EOF
193-
}
194-
b.count = 8
228+
// We're here if the are 8 or less bytes left in the stream. Since this reader needs
229+
// to handle race conditions with concurrent writes happening on the very last byte
230+
// we make sure to never over more than the minimum requested bits (rounded up to
231+
// the next byte). The following code is slower but called less frequently.
232+
nbytes := int((nbits / 8) + 1)
233+
if b.streamOffset+nbytes > len(b.stream) {
234+
nbytes = len(b.stream) - b.streamOffset
235+
}
236+
237+
buffer := uint64(0)
238+
for i := 0; i < nbytes; i++ {
239+
buffer = buffer | (uint64(b.stream[b.streamOffset+i]) << uint(8*(nbytes-i-1)))
195240
}
196241

197-
u = (u << uint(nbits)) | uint64((b.stream[0]<<(8-b.count))>>(8-uint(nbits)))
198-
b.count -= uint8(nbits)
199-
return u, nil
242+
b.buffer = buffer
243+
b.streamOffset += nbytes
244+
b.valid = uint8(nbytes * 8)
245+
246+
return true
200247
}

tsdb/chunkenc/bstream_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright 2017 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
// The code in this file was largely written by Damian Gryski as part of
15+
// https://github.com/dgryski/go-tsz and published under the license below.
16+
// It received minor modifications to suit Prometheus's needs.
17+
18+
// Copyright (c) 2015,2016 Damian Gryski <[email protected]>
19+
// All rights reserved.
20+
21+
// Redistribution and use in source and binary forms, with or without
22+
// modification, are permitted provided that the following conditions are met:
23+
24+
// * Redistributions of source code must retain the above copyright notice,
25+
// this list of conditions and the following disclaimer.
26+
//
27+
// * Redistributions in binary form must reproduce the above copyright notice,
28+
// this list of conditions and the following disclaimer in the documentation
29+
// and/or other materials provided with the distribution.
30+
//
31+
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
32+
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
33+
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
34+
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
35+
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
36+
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
37+
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38+
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
39+
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
40+
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41+
42+
package chunkenc
43+
44+
import (
45+
"testing"
46+
47+
"github.com/prometheus/prometheus/util/testutil"
48+
)
49+
50+
func TestBstreamReader(t *testing.T) {
51+
// Write to the bit stream.
52+
w := bstream{}
53+
for _, bit := range []bit{true, false} {
54+
w.writeBit(bit)
55+
}
56+
for nbits := 1; nbits <= 64; nbits++ {
57+
w.writeBits(uint64(nbits), nbits)
58+
}
59+
for v := 1; v < 10000; v += 123 {
60+
w.writeBits(uint64(v), 29)
61+
}
62+
63+
// Read back.
64+
r := newBReader(w.bytes())
65+
for _, bit := range []bit{true, false} {
66+
v, err := r.readBitFast()
67+
if err != nil {
68+
v, err = r.readBit()
69+
}
70+
testutil.Ok(t, err)
71+
testutil.Equals(t, bit, v)
72+
}
73+
for nbits := uint8(1); nbits <= 64; nbits++ {
74+
v, err := r.readBitsFast(nbits)
75+
if err != nil {
76+
v, err = r.readBits(nbits)
77+
}
78+
testutil.Ok(t, err)
79+
testutil.Equals(t, uint64(nbits), v, "nbits=%d", nbits)
80+
}
81+
for v := 1; v < 10000; v += 123 {
82+
actual, err := r.readBitsFast(29)
83+
if err != nil {
84+
actual, err = r.readBits(29)
85+
}
86+
testutil.Ok(t, err)
87+
testutil.Equals(t, uint64(v), actual, "v=%d", v)
88+
}
89+
}

0 commit comments

Comments
 (0)