Skip to content

Commit 304559f

Browse files
committed
kgo: support MaxBufferedBytes
Closes #544.
1 parent 01651af commit 304559f

File tree

5 files changed

+69
-8
lines changed

5 files changed

+69
-8
lines changed

pkg/kgo/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,8 @@ func (cl *Client) OptValues(opt any) []any {
298298
return []any{cfg.maxRecordBatchBytes}
299299
case namefn(MaxBufferedRecords):
300300
return []any{cfg.maxBufferedRecords}
301+
case namefn(MaxBufferedBytes):
302+
return []any{cfg.maxBufferedBytes}
301303
case namefn(RecordPartitioner):
302304
return []any{cfg.partitioner}
303305
case namefn(ProduceRequestTimeout):

pkg/kgo/config.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ type cfg struct {
114114
defaultProduceTopic string
115115
maxRecordBatchBytes int32
116116
maxBufferedRecords int64
117+
maxBufferedBytes int64
117118
produceTimeout time.Duration
118119
recordRetries int64
119120
maxUnknownFailures int64
@@ -293,6 +294,7 @@ func (cfg *cfg) validate() error {
293294

294295
// Some random producer settings.
295296
{name: "max buffered records", v: cfg.maxBufferedRecords, allowed: 1, badcmp: i64lt},
297+
{name: "max buffered bytes", v: cfg.maxBufferedBytes, allowed: 0, badcmp: i64lt},
296298
{name: "linger", v: int64(cfg.linger), allowed: int64(time.Minute), badcmp: i64gt, durs: true},
297299
{name: "produce timeout", v: int64(cfg.produceTimeout), allowed: int64(100 * time.Millisecond), badcmp: i64lt, durs: true},
298300
{name: "record timeout", v: int64(cfg.recordTimeout), allowed: int64(time.Second), badcmp: func(l, r int64) (bool, string) {
@@ -948,6 +950,23 @@ func MaxBufferedRecords(n int) ProducerOpt {
948950
return producerOpt{func(cfg *cfg) { cfg.maxBufferedRecords = int64(n) }}
949951
}
950952

953+
// MaxBufferedBytes sets the max amount of bytes that the client will buffer
954+
// while producing, blocking produce calls until buffered records are finished
955+
// if this limit is reached. This overrides the unlimited default.
956+
//
957+
// Note that this option does _not_ apply for consuming: the client cannot
958+
// limit bytes buffered for consuming because of decompression. You can roughly
959+
// control consuming memory by using [MaxConcurrentFetches], [FetchMaxBytes],
960+
// and [FetchMaxPartitionBytes].
961+
//
962+
// If you produce a record that is larger than n, the record is immediately
963+
// failed with kerr.MessageTooLarge.
964+
//
965+
// Note that this limit is checked after the [MaxBufferedRecords] limit.
966+
func MaxBufferedBytes(n int) ProducerOpt {
967+
return producerOpt{func(cfg *cfg) { cfg.maxBufferedBytes = int64(n) }}
968+
}
969+
951970
// RecordPartitioner uses the given partitioner to partition records, overriding
952971
// the default UniformBytesPartitioner(64KiB, true, true, nil).
953972
func RecordPartitioner(partitioner Partitioner) ProducerOpt {
@@ -1047,8 +1066,8 @@ func ProducerLinger(linger time.Duration) ProducerOpt {
10471066
// ManualFlushing disables auto-flushing when producing. While you can still
10481067
// set lingering, it would be useless to do so.
10491068
//
1050-
// With manual flushing, producing while MaxBufferedRecords have already been
1051-
// produced and not flushed will return ErrMaxBuffered.
1069+
// With manual flushing, producing while MaxBufferedRecords or MaxBufferedBytes
1070+
// have already been produced and not flushed will return ErrMaxBuffered.
10521071
func ManualFlushing() ProducerOpt {
10531072
return producerOpt{func(cfg *cfg) { cfg.manualFlushing = true }}
10541073
}

pkg/kgo/group_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ func TestGroupETL(t *testing.T) {
3939
getSeedBrokers(),
4040
WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)),
4141
MaxBufferedRecords(10000),
42+
MaxBufferedBytes(50000),
4243
UnknownTopicRetries(-1), // see txn_test comment
4344
)
4445
defer cl.Close()
@@ -124,6 +125,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
124125
ConsumeTopics(c.consumeFrom),
125126
Balancers(c.balancer),
126127
MaxBufferedRecords(10000),
128+
MaxBufferedBytes(50000),
127129
ConsumePreferringLagFn(PreferLagAt(1)),
128130
BlockRebalanceOnPoll(),
129131

pkg/kgo/producer.go

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
type producer struct {
1717
bufferedRecords atomicI64
18+
bufferedBytes atomicI64
1819
inflight atomicI64 // high 16: # waiters, low 48: # inflight
1920

2021
cl *Client
@@ -313,8 +314,9 @@ func (f *FirstErrPromise) Err() error {
313314
}
314315

315316
// TryProduce is similar to Produce, but rather than blocking if the client
316-
// currently has MaxBufferedRecords buffered, this fails immediately with
317-
// ErrMaxBuffered. See the Produce documentation for more details.
317+
// currently has MaxBufferedRecords or MaxBufferedBytes buffered, this fails
318+
// immediately with ErrMaxBuffered. See the Produce documentation for more
319+
// details.
318320
func (cl *Client) TryProduce(
319321
ctx context.Context,
320322
r *Record,
@@ -387,6 +389,21 @@ func (cl *Client) produce(
387389
}
388390
}
389391

392+
var (
393+
userSize = r.userSize()
394+
bufRecs = p.bufferedRecords.Add(1)
395+
bufBytes = p.bufferedBytes.Add(userSize)
396+
overMaxRecs = bufRecs > cl.cfg.maxBufferedRecords
397+
overMaxBytes bool
398+
)
399+
if cl.cfg.maxBufferedBytes > 0 {
400+
if userSize > cl.cfg.maxBufferedBytes {
401+
p.promiseRecord(promisedRec{ctx, promise, r}, kerr.MessageTooLarge)
402+
return
403+
}
404+
overMaxBytes = bufBytes > cl.cfg.maxBufferedBytes
405+
}
406+
390407
if r.Topic == "" {
391408
p.promiseRecord(promisedRec{ctx, promise, r}, errNoTopic)
392409
return
@@ -396,7 +413,11 @@ func (cl *Client) produce(
396413
return
397414
}
398415

399-
if p.bufferedRecords.Add(1) > cl.cfg.maxBufferedRecords {
416+
if overMaxRecs || overMaxBytes {
417+
cl.cfg.logger.Log(LogLevelDebug, "blocking Produce because we are either over max buffered records or max buffered bytes",
418+
"over_max_records", overMaxRecs,
419+
"over_max_bytes", overMaxBytes,
420+
)
400421
// If the client ctx cancels or the produce ctx cancels, we
401422
// need to un-count our buffering of this record. We also need
402423
// to drain a slot from the waitBuffer chan, which could be
@@ -411,11 +432,14 @@ func (cl *Client) produce(
411432
}
412433
select {
413434
case <-p.waitBuffer:
435+
cl.cfg.logger.Log(LogLevelDebug, "Produce block signaled, continuing to produce")
414436
case <-cl.ctx.Done():
415437
drainBuffered(ErrClientClosed)
438+
cl.cfg.logger.Log(LogLevelDebug, "client ctx canceled while blocked in Produce, returning")
416439
return
417440
case <-ctx.Done():
418441
drainBuffered(ctx.Err())
442+
cl.cfg.logger.Log(LogLevelDebug, "produce ctx canceled while blocked in Produce, returning")
419443
return
420444
}
421445
}
@@ -478,15 +502,21 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error) {
478502
}
479503
}
480504

505+
// Capture user size before potential modification by the promise.
506+
userSize := pr.userSize()
507+
nowBufBytes := p.bufferedBytes.Add(-userSize)
508+
nowBufRecs := p.bufferedRecords.Add(-1)
509+
wasOverMaxRecs := nowBufRecs >= cl.cfg.maxBufferedRecords
510+
wasOverMaxBytes := cl.cfg.maxBufferedBytes > 0 && nowBufBytes+userSize > cl.cfg.maxBufferedBytes
511+
481512
// We call the promise before finishing the record; this allows users
482513
// of Flush to know that all buffered records are completely done
483514
// before Flush returns.
484515
pr.promise(pr.Record, err)
485516

486-
buffered := p.bufferedRecords.Add(-1)
487-
if buffered >= cl.cfg.maxBufferedRecords {
517+
if wasOverMaxRecs || wasOverMaxBytes {
488518
p.waitBuffer <- struct{}{}
489-
} else if buffered == 0 && p.flushing.Load() > 0 {
519+
} else if nowBufRecs == 0 && p.flushing.Load() > 0 {
490520
p.mu.Lock()
491521
p.mu.Unlock() //nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe.
492522
p.c.Broadcast()

pkg/kgo/record_and_fetch.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,14 @@ type Record struct {
151151
Context context.Context
152152
}
153153

154+
func (r *Record) userSize() int64 {
155+
s := len(r.Key) + len(r.Value)
156+
for _, h := range r.Headers {
157+
s += len(h.Key) + len(h.Value)
158+
}
159+
return int64(s)
160+
}
161+
154162
// When buffering records, we calculate the length and tsDelta ahead of time
155163
// (also because number width affects encoding length). We repurpose the Offset
156164
// field to save space.

0 commit comments

Comments
 (0)