@@ -15,6 +15,7 @@ import (
1515
1616type producer struct {
1717 bufferedRecords atomicI64
18+ bufferedBytes atomicI64
1819 inflight atomicI64 // high 16: # waiters, low 48: # inflight
1920
2021 cl * Client
@@ -313,8 +314,9 @@ func (f *FirstErrPromise) Err() error {
313314}
314315
315316// TryProduce is similar to Produce, but rather than blocking if the client
316- // currently has MaxBufferedRecords buffered, this fails immediately with
317- // ErrMaxBuffered. See the Produce documentation for more details.
317+ // currently has MaxBufferedRecords or MaxBufferedBytes buffered, this fails
318+ // immediately with ErrMaxBuffered. See the Produce documentation for more
319+ // details.
318320func (cl * Client ) TryProduce (
319321 ctx context.Context ,
320322 r * Record ,
@@ -387,6 +389,21 @@ func (cl *Client) produce(
387389 }
388390 }
389391
392+ var (
393+ userSize = r .userSize ()
394+ bufRecs = p .bufferedRecords .Add (1 )
395+ bufBytes = p .bufferedBytes .Add (userSize )
396+ overMaxRecs = bufRecs > cl .cfg .maxBufferedRecords
397+ overMaxBytes bool
398+ )
399+ if cl .cfg .maxBufferedBytes > 0 {
400+ if userSize > cl .cfg .maxBufferedBytes {
401+ p .promiseRecord (promisedRec {ctx , promise , r }, kerr .MessageTooLarge )
402+ return
403+ }
404+ overMaxBytes = bufBytes > cl .cfg .maxBufferedBytes
405+ }
406+
390407 if r .Topic == "" {
391408 p .promiseRecord (promisedRec {ctx , promise , r }, errNoTopic )
392409 return
@@ -396,7 +413,11 @@ func (cl *Client) produce(
396413 return
397414 }
398415
399- if p .bufferedRecords .Add (1 ) > cl .cfg .maxBufferedRecords {
416+ if overMaxRecs || overMaxBytes {
417+ cl .cfg .logger .Log (LogLevelDebug , "blocking Produce because we are either over max buffered records or max buffered bytes" ,
418+ "over_max_records" , overMaxRecs ,
419+ "over_max_bytes" , overMaxBytes ,
420+ )
400421 // If the client ctx cancels or the produce ctx cancels, we
401422 // need to un-count our buffering of this record. We also need
402423 // to drain a slot from the waitBuffer chan, which could be
@@ -411,11 +432,14 @@ func (cl *Client) produce(
411432 }
412433 select {
413434 case <- p .waitBuffer :
435+ cl .cfg .logger .Log (LogLevelDebug , "Produce block signaled, continuing to produce" )
414436 case <- cl .ctx .Done ():
415437 drainBuffered (ErrClientClosed )
438+ cl .cfg .logger .Log (LogLevelDebug , "client ctx canceled while blocked in Produce, returning" )
416439 return
417440 case <- ctx .Done ():
418441 drainBuffered (ctx .Err ())
442+ cl .cfg .logger .Log (LogLevelDebug , "produce ctx canceled while blocked in Produce, returning" )
419443 return
420444 }
421445 }
@@ -478,15 +502,21 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error) {
478502 }
479503 }
480504
505+ // Capture user size before potential modification by the promise.
506+ userSize := pr .userSize ()
507+ nowBufBytes := p .bufferedBytes .Add (- userSize )
508+ nowBufRecs := p .bufferedRecords .Add (- 1 )
509+ wasOverMaxRecs := nowBufRecs >= cl .cfg .maxBufferedRecords
510+ wasOverMaxBytes := cl .cfg .maxBufferedBytes > 0 && nowBufBytes + userSize > cl .cfg .maxBufferedBytes
511+
481512 // We call the promise before finishing the record; this allows users
482513 // of Flush to know that all buffered records are completely done
483514 // before Flush returns.
484515 pr .promise (pr .Record , err )
485516
486- buffered := p .bufferedRecords .Add (- 1 )
487- if buffered >= cl .cfg .maxBufferedRecords {
517+ if wasOverMaxRecs || wasOverMaxBytes {
488518 p .waitBuffer <- struct {}{}
489- } else if buffered == 0 && p .flushing .Load () > 0 {
519+ } else if nowBufRecs == 0 && p .flushing .Load () > 0 {
490520 p .mu .Lock ()
491521 p .mu .Unlock () //nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe.
492522 p .c .Broadcast ()
0 commit comments