ListOffsets KIP : 396#4225
Conversation
cbd499c to
9f89db5
Compare
6b35ec6 to
3689716
Compare
Emanuele Sabellico (emasab)
left a comment
There was a problem hiding this comment.
First review comments about general checks and rdkafka.h additions.
09524c4 to
6fcce61
Compare
| RD_EXPORT | ||
| rd_kafka_error_t *rd_kafka_abort_transaction(rd_kafka_t *rk, int timeout_ms); | ||
|
|
||
| typedef enum rd_kafka_OffsetSpec_s { |
There was a problem hiding this comment.
Missing to move the section.
You can also put it after DeleteConsumerGroupOffsets, before Admin API - User SCRAM credentials
Emanuele Sabellico (emasab)
left a comment
There was a problem hiding this comment.
There are some things to change to keep backward compatibility for consumer rd_kafka_query_watermark_offsets and rd_kafka_offsets_for_times functions and to upgrade internal consumer calls too.
rd_kafka_buf_set_maker and upgrade to version 7
8f43390 to
f97979d
Compare
rd_kafka_query_watermark_offsets if partition leader changes
|
|
| if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { | ||
| /* Remove its cache in case the topic isn't a known topic. */ | ||
| rd_kafka_wrlock(rk); | ||
| rd_kafka_metadata_cache_delete_by_name(rk, state->topic); |
There was a problem hiding this comment.
I think we can check here also that if cache is hinted for this topic, and in that case, do nothing as a refresh is already ongoing.
There was a problem hiding this comment.
Same for ListOffsets
There was a problem hiding this comment.
This is already ensured by rd_kafka_metadata_cache_delete_by_name because it does a rd_kafka_metadata_cache_find with valid equal to 1.
Milind L (milindl)
left a comment
There was a problem hiding this comment.
It mostly looks great, thanks for the PR mahajanadhitya and Emanuele Sabellico (@emasab) !
Some minor comments about the conventions and declarations of the variables
Two comments for the tests.
Good fix for the not leader issue, let's slowly (in other PRs) see if we can extend it to other admin functions which have similar retriable errors.
| rd_usleep(100000, 0); | ||
| } else if (err) { | ||
| TEST_FAIL("Failed with error: %s", | ||
| rd_kafka_err2name(err)); |
There was a problem hiding this comment.
nit: break the outer loop here, it's cleaner that way without the else
| goto err; | ||
| } | ||
|
|
||
| rd_list_t *topic_partitions_sorted = rd_list_new( |
There was a problem hiding this comment.
Move declaration up
|
|
||
| rd_list_t *topic_partitions_sorted = rd_list_new( | ||
| topic_partitions->cnt, rd_kafka_topic_partition_destroy_free); | ||
| for (i = 0; i < topic_partitions->cnt; i++) { |
There was a problem hiding this comment.
| for (i = 0; i < topic_partitions->cnt; i++) { | |
| for (i = 0; i < topic_partitions->cnt; i++) |
| rd_list_add( | ||
| topic_partitions_sorted, | ||
| rd_kafka_topic_partition_copy(&topic_partitions->elems[i])); | ||
| } |
There was a problem hiding this comment.
| } | |
| err = rd_kafka_event_error(event); | ||
| if (err == RD_KAFKA_RESP_ERR__NOENT) { | ||
| /* Still looking for the leader */ | ||
| rd_usleep(100000, 0); |
There was a problem hiding this comment.
nit: continue the loop here, avoids if elses
| RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */ | ||
| RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */ | ||
| /** ListOffsets */ | ||
| RD_KAFKA_ADMIN_OP_LISTOFFSETS, |
There was a problem hiding this comment.
| RD_KAFKA_ADMIN_OP_LISTOFFSETS, | |
| RD_KAFKA_ADMIN_OP_LISTOFFSETS, /**< ListOffsets */ |
| RD_KAFKA_ADMIN_OP_DESCRIBETOPICS, /**< DescribeTopics */ | ||
| RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */ | ||
| RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */ | ||
| /** ListOffsets */ |
There was a problem hiding this comment.
| /** ListOffsets */ |
|
|
||
| rd_kafka_event_destroy(rkev); | ||
|
|
||
| rd_kafka_mock_broker_push_request_error_rtts( |
There was a problem hiding this comment.
Similar to the watermarks test - replace push of errors with rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); instead and let the mock handler introduce the error
|
|
||
| TEST_SAY("Testing offset %" PRId64 "\n", test_fixture.query); | ||
|
|
||
| rd_kafka_topic_partition_list_t *topic_partitions_copy = |
There was a problem hiding this comment.
declaration might need to be moved up
Emanuele Sabellico (emasab)
left a comment
There was a problem hiding this comment.
I approve. Given there are my changes too, let's wait for a second approval by Milind L (@milindl)
Milind L (milindl)
left a comment
There was a problem hiding this comment.
Thanks for these changes!
rd_kafka_query_watermark_offsets metadata refresh bug
… queue implemented