Skip to content

Commit 7136e99

Browse files
authored
credentials/alts: Pool write buffers (#8919)
Fixes: #8906 This change removes the fixed-size read buffer used by each ALTS connection. A new, internal "dirty" buffer pool is introduced to allow fetching buffers without the overhead of clearing them first. This PR also addresses a few comments from #8910 that were not pushed before that PR was merged. ## Results A new micro-benchmark was added to measure memory usage by a `conn`, demonstrating a ~40% reduction in memory usage. Existing performance benchmark shows no regressions. ``` goos: linux goarch: amd64 pkg: google.golang.org/grpc/credentials/alts/internal/conn cpu: Intel(R) Xeon(R) CPU @ 2.60GHz │ old.txt │ new.txt │ │ sec/op │ sec/op vs base │ LargeMessage-48 80.81m ± ∞ ¹ 79.96m ± ∞ ¹ ~ (p=0.548 n=5) MemoryUsage-48 25.45µ ± ∞ ¹ 14.29µ ± ∞ ¹ -43.85% (p=0.008 n=5) geomean 1.434m 1.069m -25.46% ¹ need >= 6 samples for confidence interval at level 0.95 │ old.txt │ new.txt │ │ B/op │ B/op vs base │ LargeMessage-48 4.582Mi ± ∞ ¹ 4.578Mi ± ∞ ¹ -0.08% (p=0.008 n=5) MemoryUsage-48 147.16Ki ± ∞ ¹ 83.33Ki ± ∞ ¹ -43.38% (p=0.008 n=5) geomean 830.9Ki 625.0Ki -24.78% ¹ need >= 6 samples for confidence interval at level 0.95 │ old.txt │ new.txt │ │ allocs/op │ allocs/op vs base │ LargeMessage-48 2.000 ± ∞ ¹ 2.000 ± ∞ ¹ ~ (p=1.000 n=5) ² MemoryUsage-48 20.00 ± ∞ ¹ 18.00 ± ∞ ¹ -10.00% (p=0.008 n=5) geomean 6.325 6.000 -5.13% ¹ need >= 6 samples for confidence interval at level 0.95 ² all samples are equal ``` RELEASE NOTES: * credentials/alts: Pool write buffers to reduce memory usage
1 parent 46a31de commit 7136e99

5 files changed

Lines changed: 200 additions & 31 deletions

File tree

credentials/alts/internal/conn/record.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"net"
2828

2929
core "google.golang.org/grpc/credentials/alts/internal"
30+
"google.golang.org/grpc/internal/mem"
3031
)
3132

3233
// ALTSRecordCrypto is the interface for gRPC ALTS record protocol.
@@ -62,8 +63,6 @@ const (
6263
altsRecordDefaultLength = 4 * 1024 // 4KiB
6364
// Message type value included in ALTS record framing.
6465
altsRecordMsgType = uint32(0x06)
65-
// The initial write buffer size.
66-
altsWriteBufferInitialSize = 32 * 1024 // 32KiB
6766
// The maximum write buffer size. This *must* be multiple of
6867
// altsRecordDefaultLength.
6968
altsWriteBufferMaxSize = 512 * 1024 // 512KiB
@@ -74,9 +73,26 @@ const (
7473
)
7574

7675
var (
77-
protocols = make(map[string]ALTSRecordFunc)
76+
protocols = make(map[string]ALTSRecordFunc)
77+
writeBufPool *mem.BinaryTieredBufferPool
7878
)
7979

80+
func init() {
81+
pool, err := mem.NewDirtyBinaryTieredBufferPool(
82+
8,
83+
12, // 4KB, typical OS page size
84+
14, // 16KB (max HTTP/2 frame size used by gRPC)
85+
15, // 32KB (default buffer size for gRPC)
86+
16, // 64KB
87+
17, // 128KB
88+
19, // 512KB, max write buffer size
89+
)
90+
if err != nil {
91+
panic(fmt.Sprintf("Failed to create write buffer pool: %v", err))
92+
}
93+
writeBufPool = pool
94+
}
95+
8096
// RegisterProtocol registers an ALTS record encryption protocol.
8197
func RegisterProtocol(protocol string, f ALTSRecordFunc) error {
8298
if _, ok := protocols[protocol]; ok {
@@ -97,9 +113,6 @@ type conn struct {
97113
// protected holds data read from the network but has not yet been
98114
// decrypted. This data might not compose a complete frame.
99115
protected []byte
100-
// writeBuf is a buffer used to contain encrypted frames before being
101-
// written to the network.
102-
writeBuf []byte
103116
// nextFrame stores the next frame (in protected buffer) info.
104117
nextFrame []byte
105118
// overhead is the calculated overhead of each frame.
@@ -132,7 +145,6 @@ func NewConn(c net.Conn, side core.Side, recordProtocol string, key []byte, prot
132145
crypto: crypto,
133146
payloadLengthLimit: payloadLengthLimit,
134147
protected: protectedBuf,
135-
writeBuf: make([]byte, altsWriteBufferInitialSize),
136148
nextFrame: protectedBuf,
137149
overhead: overhead,
138150
}
@@ -233,16 +245,16 @@ func (p *conn) Write(b []byte) (n int, err error) {
233245
// Calculate the output buffer size with framing and encryption overhead.
234246
numOfFrames := int(math.Ceil(float64(len(b)) / float64(p.payloadLengthLimit)))
235247
size := len(b) + numOfFrames*p.overhead
236-
// If writeBuf is too small, increase its size up to the maximum size.
237248
partialBSize := len(b)
238249
if size > altsWriteBufferMaxSize {
239250
size = altsWriteBufferMaxSize
240251
const numOfFramesInMaxWriteBuf = altsWriteBufferMaxSize / altsRecordDefaultLength
241252
partialBSize = numOfFramesInMaxWriteBuf * p.payloadLengthLimit
242253
}
243-
if len(p.writeBuf) < size {
244-
p.writeBuf = make([]byte, size)
245-
}
254+
// Get a writeBuf of the required length.
255+
bufHandle := writeBufPool.Get(size)
256+
defer writeBufPool.Put(bufHandle)
257+
writeBuf := *bufHandle
246258

247259
for partialBStart := 0; partialBStart < len(b); partialBStart += partialBSize {
248260
partialBEnd := partialBStart + partialBSize
@@ -263,7 +275,7 @@ func (p *conn) Write(b []byte) (n int, err error) {
263275
// if any.
264276

265277
// 1. Fill in type field.
266-
msg := p.writeBuf[writeBufIndex+MsgLenFieldSize:]
278+
msg := writeBuf[writeBufIndex+MsgLenFieldSize:]
267279
binary.LittleEndian.PutUint32(msg, altsRecordMsgType)
268280

269281
// 2. Encrypt the payload and create a tag if any.
@@ -273,12 +285,12 @@ func (p *conn) Write(b []byte) (n int, err error) {
273285
}
274286

275287
// 3. Fill in the size field.
276-
binary.LittleEndian.PutUint32(p.writeBuf[writeBufIndex:], uint32(len(msg)))
288+
binary.LittleEndian.PutUint32(writeBuf[writeBufIndex:], uint32(len(msg)))
277289

278290
// 4. Increase writeBufIndex.
279291
writeBufIndex += len(buf) + p.overhead
280292
}
281-
nn, err := p.Conn.Write(p.writeBuf[:writeBufIndex])
293+
nn, err := p.Conn.Write(writeBuf[:writeBufIndex])
282294
if err != nil {
283295
// We need to calculate the actual data size that was
284296
// written. This means we need to remove header,

credentials/alts/internal/conn/record_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,3 +369,17 @@ func (s) TestProtectedBuffer(t *testing.T) {
369369
testProtectedBuffer(t, rp)
370370
}
371371
}
372+
373+
// BenchmarkMemoryUsage measures the allocations per ALTS connection.
374+
// Run this with: go test -bench=BenchmarkMemoryUsage -benchmem
375+
func BenchmarkMemoryUsage(b *testing.B) {
376+
b.ReportAllocs()
377+
378+
for i := 0; i < b.N; i++ {
379+
c, _ := newConnPair(rekeyRecordProtocol, nil, nil)
380+
381+
if _, err := c.Write([]byte("d")); err != nil {
382+
b.Fatalf("Write failed: %v", err)
383+
}
384+
}
385+
}
Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
*
1717
*/
1818

19-
// Package mem provides a tiered buffer pool implementation for efficient memory management.
19+
// Package mem provides utilities that facilitate memory reuse in byte slices
20+
// that are used as buffers.
2021
package mem
2122

2223
import (
@@ -46,7 +47,8 @@ type bufferPool interface {
4647
Put(*[]byte)
4748
}
4849

49-
// BinaryTieredBufferPool is a buffer pool that uses multiple sub-pools with power-of-two sizes.
50+
// BinaryTieredBufferPool is a buffer pool that uses multiple sub-pools with
51+
// power-of-two sizes.
5052
type BinaryTieredBufferPool struct {
5153
// exponentToNextLargestPoolMap maps a power-of-two exponent (e.g., 12 for
5254
// 4KB) to the index of the next largest sizedBufferPool. This is used by
@@ -69,12 +71,27 @@ type BinaryTieredBufferPool struct {
6971
// of 2), not the raw byte sizes. For example, to create a pool of 16KB buffers
7072
// (2^14 bytes), pass 14 as the argument.
7173
func NewBinaryTieredBufferPool(powerOfTwoExponents ...uint8) (*BinaryTieredBufferPool, error) {
74+
return newBinaryTiered(func(size int) bufferPool {
75+
return newSizedBufferPool(size, true)
76+
}, &simpleBufferPool{shouldZero: true}, powerOfTwoExponents...)
77+
}
78+
79+
// NewDirtyBinaryTieredBufferPool returns a BufferPool backed by multiple
80+
// sub-pools. It is similar to NewBinaryTieredBufferPool but it does not
81+
// initialize the buffers before returning them.
82+
func NewDirtyBinaryTieredBufferPool(powerOfTwoExponents ...uint8) (*BinaryTieredBufferPool, error) {
83+
return newBinaryTiered(func(size int) bufferPool {
84+
return newSizedBufferPool(size, false)
85+
}, &simpleBufferPool{shouldZero: false}, powerOfTwoExponents...)
86+
}
87+
88+
func newBinaryTiered(sizedPoolFactory func(int) bufferPool, fallbackPool bufferPool, powerOfTwoExponents ...uint8) (*BinaryTieredBufferPool, error) {
7289
slices.Sort(powerOfTwoExponents)
7390
powerOfTwoExponents = slices.Compact(powerOfTwoExponents)
7491

7592
// Determine the maximum exponent we need to support. This depends on the
7693
// word size (32-bit vs 64-bit).
77-
maxExponent := uintSize - 1
94+
maxExponent := uintSize - 2
7895
indexOfNextLargestBit := slices.Repeat([]int{-1}, maxExponent+1)
7996
indexOfPreviousLargestBit := slices.Repeat([]int{-1}, maxExponent+1)
8097

@@ -88,7 +105,7 @@ func NewBinaryTieredBufferPool(powerOfTwoExponents ...uint8) (*BinaryTieredBuffe
88105
return nil, fmt.Errorf("mem: allocating slice of size 2^%d is not possible", exp)
89106
}
90107
tierSize := 1 << exp
91-
pools = append(pools, newSizedBufferPool(tierSize))
108+
pools = append(pools, sizedPoolFactory(tierSize))
92109
maxTier = max(maxTier, tierSize)
93110

94111
// Map the exact power of 2 to this pool index.
@@ -117,7 +134,7 @@ func NewBinaryTieredBufferPool(powerOfTwoExponents ...uint8) (*BinaryTieredBuffe
117134
exponentToPreviousLargestPoolMap: indexOfPreviousLargestBit,
118135
sizedPools: pools,
119136
maxPoolCap: maxTier,
120-
fallbackPool: &simpleBufferPool{},
137+
fallbackPool: fallbackPool,
121138
}, nil
122139
}
123140

@@ -203,6 +220,7 @@ func (NopBufferPool) Put(*[]byte) {
203220
type sizedBufferPool struct {
204221
pool sync.Pool
205222
defaultSize int
223+
shouldZero bool
206224
}
207225

208226
func (p *sizedBufferPool) Get(size int) *[]byte {
@@ -212,7 +230,9 @@ func (p *sizedBufferPool) Get(size int) *[]byte {
212230
return &buf
213231
}
214232
b := *buf
215-
clear(b[:cap(b)])
233+
if p.shouldZero {
234+
clear(b[:cap(b)])
235+
}
216236
*buf = b[:size]
217237
return buf
218238
}
@@ -227,9 +247,10 @@ func (p *sizedBufferPool) Put(buf *[]byte) {
227247
p.pool.Put(buf)
228248
}
229249

230-
func newSizedBufferPool(size int) *sizedBufferPool {
250+
func newSizedBufferPool(size int, zero bool) *sizedBufferPool {
231251
return &sizedBufferPool{
232252
defaultSize: size,
253+
shouldZero: zero,
233254
}
234255
}
235256

@@ -246,10 +267,11 @@ func NewTieredBufferPool(poolSizes ...int) *TieredBufferPool {
246267
sort.Ints(poolSizes)
247268
pools := make([]*sizedBufferPool, len(poolSizes))
248269
for i, s := range poolSizes {
249-
pools[i] = newSizedBufferPool(s)
270+
pools[i] = newSizedBufferPool(s, true)
250271
}
251272
return &TieredBufferPool{
252-
sizedPools: pools,
273+
sizedPools: pools,
274+
fallbackPool: simpleBufferPool{shouldZero: true},
253275
}
254276
}
255277

@@ -280,13 +302,16 @@ func (p *TieredBufferPool) getPool(size int) bufferPool {
280302
// acquire a buffer from the pool but if that buffer is too small, it returns it
281303
// to the pool and creates a new one.
282304
type simpleBufferPool struct {
283-
pool sync.Pool
305+
pool sync.Pool
306+
shouldZero bool
284307
}
285308

286309
func (p *simpleBufferPool) Get(size int) *[]byte {
287310
bs, ok := p.pool.Get().(*[]byte)
288311
if ok && cap(*bs) >= size {
289-
clear((*bs)[:cap(*bs)])
312+
if p.shouldZero {
313+
clear((*bs)[:cap(*bs)])
314+
}
290315
*bs = (*bs)[:size]
291316
return bs
292317
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
*
3+
* Copyright 2026 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package mem_test
20+
21+
import (
22+
"testing"
23+
"unsafe"
24+
25+
"google.golang.org/grpc/internal/grpctest"
26+
"google.golang.org/grpc/internal/mem"
27+
)
28+
29+
type s struct {
30+
grpctest.Tester
31+
}
32+
33+
func Test(t *testing.T) {
34+
grpctest.RunSubTests(t, s{})
35+
}
36+
37+
func (s) TestBufferPool_Clears(t *testing.T) {
38+
poolConfigs := []struct {
39+
name string
40+
factory func() (*mem.BinaryTieredBufferPool, error)
41+
wantCleared bool
42+
bufferSize int
43+
}{
44+
{
45+
name: "regular_sized",
46+
factory: func() (*mem.BinaryTieredBufferPool, error) {
47+
return mem.NewBinaryTieredBufferPool(3) // 8 bytes
48+
},
49+
bufferSize: 8,
50+
wantCleared: true,
51+
},
52+
{
53+
name: "regular_fallback",
54+
factory: func() (*mem.BinaryTieredBufferPool, error) {
55+
return mem.NewBinaryTieredBufferPool(3)
56+
},
57+
bufferSize: 10,
58+
wantCleared: true,
59+
},
60+
{
61+
name: "dirty_sized",
62+
factory: func() (*mem.BinaryTieredBufferPool, error) {
63+
return mem.NewDirtyBinaryTieredBufferPool(3)
64+
},
65+
bufferSize: 8,
66+
wantCleared: false,
67+
},
68+
{
69+
name: "dirty_fallback",
70+
factory: func() (*mem.BinaryTieredBufferPool, error) {
71+
return mem.NewDirtyBinaryTieredBufferPool(3)
72+
},
73+
bufferSize: 10,
74+
wantCleared: false,
75+
},
76+
}
77+
78+
for _, tc := range poolConfigs {
79+
t.Run(tc.name, func(t *testing.T) {
80+
pool, err := tc.factory()
81+
if err != nil {
82+
t.Fatalf("Failed to create pool: %v", err)
83+
}
84+
85+
for {
86+
buf1 := pool.Get(tc.bufferSize)
87+
// Mark the buffer with data.
88+
for i := range *buf1 {
89+
(*buf1)[i] = 0xAA
90+
}
91+
pool.Put(buf1)
92+
93+
buf2 := pool.Get(tc.bufferSize)
94+
// Check if we got the same underlying array.
95+
if unsafe.SliceData(*buf1) != unsafe.SliceData(*buf2) {
96+
pool.Put(buf2)
97+
continue
98+
}
99+
100+
// We have a reused buffer. Check if it's cleared.
101+
gotCleared := true
102+
for _, b := range *buf2 {
103+
if b != 0 {
104+
gotCleared = false
105+
break
106+
}
107+
}
108+
109+
if tc.wantCleared != gotCleared {
110+
t.Fatalf("buffer cleared state mismatch: want %t, got %v", tc.wantCleared, gotCleared)
111+
}
112+
113+
pool.Put(buf2)
114+
break
115+
}
116+
})
117+
}
118+
}

0 commit comments

Comments
 (0)