Skip to content

Commit c80d6f4

Browse files
committed
kgo: add Buffered{Fetch,Produce}Bytes
This slows down fetching a little bit. If it is egregious, we can fix the perf by tracking the size buffered when processing the fetch itself, and then adding a new field to batch-untrack the size. That's left as an exercise for a person that cares. This is now done since the prior commit introduces buffered produce bytes, and we may as well add it while fetching for both-sizes consistency, and we may as well expose it.
1 parent 304559f commit c80d6f4

File tree

3 files changed

+23
-1
lines changed

3 files changed

+23
-1
lines changed

pkg/kgo/consumer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ func (o Offset) At(at int64) Offset {
162162

163163
type consumer struct {
164164
bufferedRecords atomicI64
165+
bufferedBytes atomicI64
165166

166167
cl *Client
167168

@@ -285,6 +286,13 @@ func (cl *Client) BufferedFetchRecords() int64 {
285286
return cl.consumer.bufferedRecords.Load()
286287
}
287288

289+
// BufferedFetchBytes returns the number of bytes currently buffered from
290+
// fetching within the client. This is the sum of all keys, values, and header
291+
// keys/values. See the related [BufferedFetchRecords] for more information.
292+
func (cl *Client) BufferedFetchBytes() int64 {
293+
return cl.consumer.bufferedBytes.Load()
294+
}
295+
288296
type usedCursors map[*cursor]struct{}
289297

290298
func (u *usedCursors) use(c *cursor) {

pkg/kgo/producer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,13 @@ func (cl *Client) BufferedProduceRecords() int64 {
8888
return cl.producer.bufferedRecords.Load()
8989
}
9090

91+
// BufferedProduceBytes returns the number of bytes currently buffered for
92+
// producing within the client. This is the sum of all keys, values, and header
93+
// keys/values. See the related [BufferedProduceRecords] for more information.
94+
func (cl *Client) BufferedProduceBytes() int64 {
95+
return cl.producer.bufferedBytes.Load()
96+
}
97+
9198
type unknownTopicProduces struct {
9299
buffered []promisedRec
93100
wait chan error // retryable errors

pkg/kgo/source.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,16 +323,23 @@ func (s *source) hook(f *Fetch, buffered, polled bool) {
323323
})
324324

325325
var nrecs int
326+
var nbytes int64
326327
for i := range f.Topics {
327328
t := &f.Topics[i]
328329
for j := range t.Partitions {
329-
nrecs += len(t.Partitions[j].Records)
330+
p := &t.Partitions[j]
331+
nrecs += len(p.Records)
332+
for k := range p.Records {
333+
nbytes += p.Records[k].userSize()
334+
}
330335
}
331336
}
332337
if buffered {
333338
s.cl.consumer.bufferedRecords.Add(int64(nrecs))
339+
s.cl.consumer.bufferedBytes.Add(nbytes)
334340
} else {
335341
s.cl.consumer.bufferedRecords.Add(-int64(nrecs))
342+
s.cl.consumer.bufferedBytes.Add(-nbytes)
336343
}
337344
}
338345

0 commit comments

Comments
 (0)