Description
Using a client with rd_kafka_queue_io_event_enable that is a consumer using rd_kafka_subscribe.
This works fine until metadata is requested via rd_kafka_metadata.
If the metadata is requested any time after making the subscription, the poll returns a message with error set to UNKNOWN_TOPIC_OR_PART (offset -1001) for every topic subscribed to (although the metadata returns correct results and the subscription continues getting messages).
If the metadata is requested prior to making any subscription, I dont get an error message from the poll.
How to reproduce
example client program (given topic exists called topic1 with partition 0, broker localhost:9092) purely to show the error message.
gcc main.c -lrdkafka -lz -lpthread -lssl
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <librdkafka/rdkafka.h>
int main(int argc,char** argv){
rd_kafka_t *rk=0;
rd_kafka_conf_t *conf=0;
char *brokers = "localhost:9092";
rd_kafka_queue_t *queue=0;
rd_kafka_resp_err_t err=0;
rd_kafka_topic_partition_list_t *t_partition;
rd_kafka_message_t *msg;
char errstr[512];
int spair[2];
if(pipe(spair)==-1){
fprintf(stderr, "pipe fail\n");
exit(1);
}
conf = rd_kafka_conf_new();
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
if (rd_kafka_conf_set(conf, "group.id","0", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
if (rd_kafka_conf_set(conf, "debug","all", errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,sizeof(errstr)))) {
fprintf(stderr,"Failed to create new consumer: %s\n",errstr);
exit(1);
}
err=rd_kafka_poll_set_consumer(rk);
if(RD_KAFKA_RESP_ERR_NO_ERROR != err){
fprintf(stderr,"rd_kafka_poll_set_consumer err %s\n",rd_kafka_err2str(err));
exit(1);
}
queue = rd_kafka_queue_get_consumer(rk);
if (!queue){
fprintf(stderr, "queue fail\n");
exit(1);
}
rd_kafka_queue_io_event_enable(queue,spair[1],"X",1);
rd_kafka_set_log_queue(rk,NULL);
///////// subscribe
printf("Subscribing...\n");
t_partition = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(t_partition,"test1",RD_KAFKA_PARTITION_UA);
if(RD_KAFKA_RESP_ERR_NO_ERROR != (err= rd_kafka_subscribe(rk, t_partition))){
fprintf(stderr,"rd_kafka_subscribe err %s\n",rd_kafka_err2str(err));
exit(1);
}
rd_kafka_topic_partition_list_destroy(t_partition);
//////// metadata (comment out this section to prevent error)
{
printf("Get metadata...\n");
const struct rd_kafka_metadata *metadata;
err = rd_kafka_metadata(rk, 1, 0, &metadata,5000);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
fprintf(stderr,"Failed to acquire metadata: %s\n",rd_kafka_err2str(err));
exit(1);
}
rd_kafka_metadata_destroy(metadata);
}
///////// poll
printf("Poll...\n");
while((msg= rd_kafka_consumer_poll(rk, 10000))) {
if (msg->err)
fprintf(stderr,"Got error msg %d (%s) [topic %s offset %lld]\n",msg->err,rd_kafka_err2name(msg->err),rd_kafka_topic_name(msg->rkt),msg->offset);
else
fprintf(stderr,"Got msg [topic %s offset %lld]\n",rd_kafka_topic_name(msg->rkt),msg->offset);
rd_kafka_message_destroy(msg);
}
printf("Finished...\n");
return 1;
}
outputs
Subscribing...
Get metadata...
Poll...
Got error msg 3 (UNKNOWN_TOPIC_OR_PART) [topic test1 offset -1001]
Finished...
if you edit the code to put the metadata request before the subscription, the error msg no longer appears.
The debug log gets a TOPICERR entry only when metadata requested after subscription.
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
Description
Using a client with
rd_kafka_queue_io_event_enablethat is a consumer usingrd_kafka_subscribe.This works fine until metadata is requested via
rd_kafka_metadata.If the metadata is requested any time after making the subscription, the poll returns a message with error set to UNKNOWN_TOPIC_OR_PART (offset -1001) for every topic subscribed to (although the metadata returns correct results and the subscription continues getting messages).
If the metadata is requested prior to making any subscription, I dont get an error message from the poll.
How to reproduce
example client program (given topic exists called
topic1with partition0, brokerlocalhost:9092) purely to show the error message.gcc main.c -lrdkafka -lz -lpthread -lssl
outputs
if you edit the code to put the metadata request before the subscription, the error msg no longer appears.
The debug log gets a
TOPICERRentry only when metadata requested after subscription.Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
2.3.07.0.1see example aboveOSX(x64)>debug=..as necessary) from librdkafka