Skip to content

ListOffsets KIP : 396#4225

Merged
Emanuele Sabellico (emasab) merged 22 commits intomasterfrom
feature/listOffsets-AdminClient
Oct 17, 2023
Merged

ListOffsets KIP : 396#4225
Emanuele Sabellico (emasab) merged 22 commits intomasterfrom
feature/listOffsets-AdminClient

Conversation

@mahajanadhitya
Copy link
Copy Markdown
Contributor

… queue implemented

@mahajanadhitya mahajanadhitya force-pushed the feature/listOffsets-AdminClient branch from cbd499c to 9f89db5 Compare May 26, 2023 11:42
@mahajanadhitya mahajanadhitya force-pushed the feature/listOffsets-AdminClient branch from 6b35ec6 to 3689716 Compare July 18, 2023 09:58
@cla-assistant
Copy link
Copy Markdown

cla-assistant Bot commented Aug 21, 2023

CLA assistant check
All committers have signed the CLA.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First review comments about general checks and rdkafka.h additions.

Comment thread examples/list_offsets.c
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread examples/list_offsets.c Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
@mahajanadhitya mahajanadhitya force-pushed the feature/listOffsets-AdminClient branch 3 times, most recently from 09524c4 to 6fcce61 Compare September 28, 2023 06:16
Comment thread examples/list_offsets Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
RD_EXPORT
rd_kafka_error_t *rd_kafka_abort_transaction(rd_kafka_t *rk, int timeout_ms);

typedef enum rd_kafka_OffsetSpec_s {
Copy link
Copy Markdown
Contributor

@emasab Emanuele Sabellico (emasab) Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing to move the section.
You can also put it after DeleteConsumerGroupOffsets, before Admin API - User SCRAM credentials

Comment thread src/rdkafka_admin.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h Outdated
Comment thread src/rdkafka.h
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some things to change to keep backward compatibility for consumer rd_kafka_query_watermark_offsets and rd_kafka_offsets_for_times functions and to upgrade internal consumer calls too.

Comment thread src/rdkafka_admin.c Outdated
Comment thread src/rdkafka_admin.h Outdated
Comment thread src/rdkafka_admin.h Outdated
Comment thread src/rdkafka_request.c Outdated
Comment thread src/rdkafka_request.c Outdated
Comment thread src/rdkafka_request.c Outdated
Comment thread src/rdkafka_request.c Outdated
Comment thread src/rdkafka_request.c Outdated
Comment thread src/rdkafka_request.c
Comment thread src/rdkafka_request.c Outdated
@mahajanadhitya mahajanadhitya force-pushed the feature/listOffsets-AdminClient branch from 8f43390 to f97979d Compare October 11, 2023 09:20
@emasab
Copy link
Copy Markdown
Contributor

rd_kafka_offsets_for_times needs to be fixed as well but will be in a different PR

Comment thread src/rdkafka.c
if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
/* Remove its cache in case the topic isn't a known topic. */
rd_kafka_wrlock(rk);
rd_kafka_metadata_cache_delete_by_name(rk, state->topic);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can check here also that if cache is hinted for this topic, and in that case, do nothing as a refresh is already ongoing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for ListOffsets

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already ensured by rd_kafka_metadata_cache_delete_by_name because it does a rd_kafka_metadata_cache_find with valid equal to 1.

Copy link
Copy Markdown
Contributor

@milindl Milind L (milindl) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It mostly looks great, thanks for the PR mahajanadhitya and Emanuele Sabellico (@emasab) !
Some minor comments about the conventions and declarations of the variables
Two comments for the tests.
Good fix for the not leader issue, let's slowly (in other PRs) see if we can extend it to other admin functions which have similar retriable errors.

Comment thread CHANGELOG.md Outdated
Comment thread tests/0081-admin.c
rd_usleep(100000, 0);
} else if (err) {
TEST_FAIL("Failed with error: %s",
rd_kafka_err2name(err));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: break the outer loop here, it's cleaner that way without the else

Comment thread src/rdkafka_admin.c Outdated
goto err;
}

rd_list_t *topic_partitions_sorted = rd_list_new(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move declaration up

Comment thread src/rdkafka_admin.c Outdated

rd_list_t *topic_partitions_sorted = rd_list_new(
topic_partitions->cnt, rd_kafka_topic_partition_destroy_free);
for (i = 0; i < topic_partitions->cnt; i++) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for (i = 0; i < topic_partitions->cnt; i++) {
for (i = 0; i < topic_partitions->cnt; i++)

Comment thread src/rdkafka_admin.c Outdated
rd_list_add(
topic_partitions_sorted,
rd_kafka_topic_partition_copy(&topic_partitions->elems[i]));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}

Comment thread tests/0081-admin.c
err = rd_kafka_event_error(event);
if (err == RD_KAFKA_RESP_ERR__NOENT) {
/* Still looking for the leader */
rd_usleep(100000, 0);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: continue the loop here, avoids if elses

Comment thread src/rdkafka.h Outdated
RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */
RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */
/** ListOffsets */
RD_KAFKA_ADMIN_OP_LISTOFFSETS,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
RD_KAFKA_ADMIN_OP_LISTOFFSETS,
RD_KAFKA_ADMIN_OP_LISTOFFSETS, /**< ListOffsets */

Comment thread src/rdkafka.h Outdated
RD_KAFKA_ADMIN_OP_DESCRIBETOPICS, /**< DescribeTopics */
RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */
RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */
/** ListOffsets */
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** ListOffsets */

Comment thread tests/0138-admin_mock.c Outdated

rd_kafka_event_destroy(rkev);

rd_kafka_mock_broker_push_request_error_rtts(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the watermarks test - replace push of errors with rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); instead and let the mock handler introduce the error

Comment thread tests/0081-admin.c Outdated

TEST_SAY("Testing offset %" PRId64 "\n", test_fixture.query);

rd_kafka_topic_partition_list_t *topic_partitions_copy =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

declaration might need to be moved up

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I approve. Given there are my changes too, let's wait for a second approval by Milind L (@milindl)

Copy link
Copy Markdown
Contributor

@milindl Milind L (milindl) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for these changes!

rd_kafka_query_watermark_offsets metadata refresh bug
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants