Memory leak when produce is called with headers and raises exception #2167

@tdryer

Producer.produce() leaks memory when it is called with a non-None headers parameter and the function raises BufferError (queue full) or RuntimeError (producer closed). The leak can be observed by repeatedly calling produce() with headers while the producer's queue is full or after closing the producer (see examples below).

The leak occurs because Producer_produce does not free rd_headers on its error paths: when it exits early, or when rd_kafka_producev returns an error. In the latter case rd_kafka_producev does not take ownership of the headers, so the caller remains responsible for freeing them.
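The ownership rule behind this bug can be illustrated with a small pure-Python model. This is only a sketch: FakeHeaders, fake_producev, produce_leaky, produce_fixed and count_leaked are hypothetical names invented for illustration, not part of confluent-kafka-python or librdkafka, and the real fix would be made in the C code of Producer_produce.

```python
# Toy model of the ownership rule. None of these names are real
# confluent-kafka-python or librdkafka APIs; they are illustrative only.


class FakeHeaders:
    """Stands in for a heap-allocated headers object (like rd_headers)."""

    live = 0  # number of objects allocated but not yet destroyed

    def __init__(self):
        FakeHeaders.live += 1
        self._destroyed = False

    def destroy(self):
        if not self._destroyed:
            self._destroyed = True
            FakeHeaders.live -= 1


def fake_producev(headers, queue_full):
    """Return True on success, False on failure.

    On success the callee takes ownership of `headers` (modelled here by
    destroying them immediately). On failure ownership stays with the caller.
    """
    if queue_full:
        return False
    headers.destroy()
    return True


def produce_leaky(queue_full):
    headers = FakeHeaders()
    if not fake_producev(headers, queue_full):
        # Error path raises without destroying the headers: this is the leak.
        raise BufferError("queue full")


def produce_fixed(queue_full):
    headers = FakeHeaders()
    if not fake_producev(headers, queue_full):
        # The caller still owns the headers on failure, so free them first.
        headers.destroy()
        raise BufferError("queue full")


def count_leaked(produce, calls=1000):
    """Call `produce` repeatedly against a full queue; return live objects."""
    FakeHeaders.live = 0
    for _ in range(calls):
        try:
            produce(queue_full=True)
        except BufferError:
            pass
    return FakeHeaders.live
```

In this model, count_leaked(produce_leaky) returns 1000 while count_leaked(produce_fixed) returns 0, which mirrors the one-lost-block-per-failed-call pattern in the Valgrind excerpt.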

Reproduction script 1 (BufferError):

import itertools
import resource
from confluent_kafka import Producer
p = Producer({"bootstrap.servers": "127.0.0.1:9200"})  # non-existent server
data = b"hello world"
headers = {}
for i in itertools.count():
    try:
        p.produce("mytopic", value=data, headers=headers)
    except BufferError:
        pass
    if i % 100_000 == 0:
        # ru_maxrss is the peak resident set size, reported in KiB on Linux
        rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024
        print(f"RSS: {rss_mb} MB")
        if rss_mb > 200:
            print("Memory usage exceeded limit, exiting.")
            break

Reproduction script 2 (RuntimeError):

import itertools
import resource
from confluent_kafka import Producer
p = Producer({"bootstrap.servers": "127.0.0.1:9200"})
p.close()
data = b"hello world"
headers = {}
for i in itertools.count():
    try:
        p.produce("mytopic", value=data, headers=headers)
    except RuntimeError:
        pass
    if i % 100_000 == 0:
        # ru_maxrss is the peak resident set size, reported in KiB on Linux
        rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024
        print(f"RSS: {rss_mb} MB")
        if rss_mb > 200:
            print("Memory usage exceeded limit, exiting.")
            break

Valgrind excerpt:

==1283== 4,800,000 bytes in 100,000 blocks are definitely lost in loss record 48 of 48
==1283==    at 0x48417B4: malloc (vg_replace_malloc.c:381)
==1283==    by 0x5931D42: rd_malloc (rd.h:146)
==1283==    by 0x5931D42: rd_kafka_headers_new (rdkafka_header.c:44)
==1283==    by 0x5810674: py_headers_dict_to_c (confluent_kafka.c:1777)
==1283==    by 0x5810674: py_headers_to_c (confluent_kafka.c:1802)
==1283==    by 0x580CF08: Producer_produce (Producer.c:287)

Tested confluent-kafka-python v2.13.0 on Linux with Python 3.13.5.
