Producer.produce() leaks memory when it is called with a non-None headers parameter and the function raises BufferError (queue full) or RuntimeError (producer closed). The leak can be observed by repeatedly calling produce() with headers while the producer's queue is full or after closing the producer (see examples below).
The leak is caused by Producer_produce failing to free rd_headers on its early-exit paths, and after rd_kafka_producev returns an error (in which case rd_kafka_producev does not take ownership of the headers, so the caller must still free them).
Reproduction script 1 (BufferError):
import itertools
import resource
from confluent_kafka import Producer
p = Producer({"bootstrap.servers": "127.0.0.1:9200"}) # non-existent server
data = b"hello world"
headers = {}
for i in itertools.count():
    try:
        p.produce("mytopic", value=data, headers=headers)
    except BufferError:
        pass
    if i % 100_000 == 0:
        rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024
        print(f"RSS: {rss_mb} MB")
        if rss_mb > 200:
            print("Memory usage exceeded limit, exiting.")
            break
Reproduction script 2 (RuntimeError):
import itertools
import resource
from confluent_kafka import Producer
p = Producer({"bootstrap.servers": "127.0.0.1:9200"})
p.close()
data = b"hello world"
headers = {}
for i in itertools.count():
    try:
        p.produce("mytopic", value=data, headers=headers)
    except RuntimeError:
        pass
    if i % 100_000 == 0:
        rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024
        print(f"RSS: {rss_mb} MB")
        if rss_mb > 200:
            print("Memory usage exceeded limit, exiting.")
            break
Valgrind excerpt:
==1283== 4,800,000 bytes in 100,000 blocks are definitely lost in loss record 48 of 48
==1283== at 0x48417B4: malloc (vg_replace_malloc.c:381)
==1283== by 0x5931D42: rd_malloc (rd.h:146)
==1283== by 0x5931D42: rd_kafka_headers_new (rdkafka_header.c:44)
==1283== by 0x5810674: py_headers_dict_to_c (confluent_kafka.c:1777)
==1283== by 0x5810674: py_headers_to_c (confluent_kafka.c:1802)
==1283== by 0x580CF08: Producer_produce (Producer.c:287)
Tested confluent-kafka-python v2.13.0 on Linux with Python 3.13.5.