NONJAVACLI-4237 Fix memory leak in Producer.produce() when headers are provided and exceptions occur#2170
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
8e5993d to
094d0c7
Compare
There was a problem hiding this comment.
Pull request overview
This PR fixes a memory leak in Producer.produce() that occurs when headers are provided and the method raises an exception (either BufferError when the queue is full, or RuntimeError when the producer is closed). The leak was caused by allocated header memory not being freed before returning from these error paths.
Key Changes
- Added
rd_kafka_headers_destroy()calls in two error paths withinProducer.produce()to properly free allocated headers before raising exceptions - Both fixes are properly guarded with
#ifdef RD_KAFKA_V_HEADERSto maintain compatibility
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Build is failing. Maybe a transient error. Reran. |
|
Looks fine. Please add CHANGELOG. |
|
Can you reproduce the issue and add valgrind logs to confirm that we are not hitting memory leak anymore? |
|
Please fix the following
|
19b654e to
41d0fc7
Compare
|




Summary
Fixes a memory leak in
Producer.produce()that occurs when the method is called with a non-Noneheadersparameter and raisesBufferError(queue full) orRuntimeError(producer closed).Fixes #2167
Problem
When
Producer.produce()is called with headers and an error occurs, the allocatedrd_headersmemory is not freed before the function returns, causing a memory leak. This can be observed by repeatedly callingproduce()with headers while the producer's queue is full or after closing the producer.Root Cause
The
Producer_produce()function allocatesrd_headersviapy_headers_to_c()at line 287. However, in two error paths, the function returns without freeing this memory:!self->rk), the function returns early without freeingrd_headersrd_kafka_producev()returns an error, the function cleans upmsgstatebut doesn't freerd_headersbefore returningWhen
rd_kafka_producev()returns an error, it doesn't take ownership of the headers, so they must be freed manually by the caller.Solution
Added
rd_kafka_headers_destroy(rd_headers)calls in both error paths before returning:rd_headerswhen producer is closed before raisingRuntimeErrorrd_headerswhenProducer_producev()returns an error before raisingBufferErrororKafkaExceptionTesting
Reproduction Scripts
test_memory_leak_buffererror.py
Test Results
Before Fix:
After Fix:
Log
Before
After
References
produceis called withheadersand raises exception #2167