-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Memory issues with 0.7 branch in multithreaded program #109
Description
I am trying to use the 0.7 producer api inside a multithreaded c++ program. However the program aborts after the produce() call with some memory problems.
Here is the code I used.
Kafka kafka;
char *broker = "localhost:9092";
char* topic="test";
int partition = 0;
if (!(kafka.setHandle(RD_KAFKA_PRODUCER, broker, NULL))) {
perror("kafka_new producer");
exit(1);
}
char* testString = "Hello";
int len = 5;
kafka.produce(topic, partition,
RD_KAFKA_OP_F_FREE, testString, len);
The program aborts with the following error message :
*** glibc detected *** /home/y/bin64/ult_producer: munmap_chunk(): invalid pointer: 0x000000000043b791 ***
Running the program under valgrind shows the following:
Thread 2:
==15004== Invalid free() / delete / delete[] / realloc()
==15004== at 0x4A063F0: free (vg_replace_malloc.c:446)
==15004== by 0x5D79F36: rd_kafka_op_destroy (rdkafka.c:400)
==15004== by 0x5D7BA8A: rd_kafka_thread_main (rdkafka.c:907)
==15004== by 0x3C2E6079D0: start_thread (in /lib64/libpthread-2.12.so)
==15004== by 0x3C2E2E8B6C: clone (in /lib64/libc-2.12.so)
==15004== Address 0x43b791 is not stack'd, malloc'd or (recently) free'd
Am I missing something in the code, which is leading to this issue? Please help.
Also, how can I send message randomly to some partition, rather than sending to specific partition?