[KIP-714] Additional consumer metrics #4808
Emanuele Sabellico (emasab) merged 2 commits into master
Conversation
Emanuele Sabellico (emasab)
left a comment
First round of comments
    rd_kafka_app_poll_blocking(rk);
    ...
    rd_kafka_yield_thread = 0;
    rd_ts_t now = rd_clock();
There are many ways the user can call poll; see where rd_kafka_app_poll_blocking is called — those call sites aren't considered at the moment. You can convert rd_kafka_app_poll_blocking to rd_kafka_app_poll_start with a parameter that says whether it's blocking, which corresponds to timeout_ms, and do the calculation there.
Given we're in the hot path, let's reduce system calls: get the now value here, pass it to rd_timeout_init0, and then pass it to rd_kafka_app_poll_start too.
Can you explain this further? Since we're only refactoring, how does changing the order reduce system calls in the hot path?
The system call here is the one that reads the monotonic clock; we want to do it only once to reduce the number of these calls (it's already done in rd_timeout_init).
    rd_kafka_yield_thread = 0;
    rd_ts_t now = rd_clock();
    if (rk->rk_telemetry.ts_fetch_last != -1) {
It can be useful for something other than telemetry, so we can call it rk->rk_ts_last_poll_start. Also, the check should be for non-zero, given that it's initialized to zero.
    rd_avg_destroy(
        &rk->rk_telemetry.rk_avg_rollover.rk_avg_poll_idle_ratio);
    rd_avg_rollover(
        &rk->rk_telemetry.rk_avg_current.rk_avg_poll_idle_ratio,
        &rk->rk_telemetry.rk_avg_rollover.rk_avg_poll_idle_ratio);
This is to be done only if rk->rk_type == RD_KAFKA_CONSUMER; also add the same instructions for rk_avg_rebalance_latency and rk_avg_commit_latency.
    rk->rk_telemetry.ts_fetch_last = -1;
    rk->rk_telemetry.ts_fetch_cb_last = -1;
These two (renamed) don't need to be reset on push; they just need to be zero-initialized by default and used afterwards.
    "consumer coordinator.",
    .unit = "ms",
    .is_int = rd_true,
    .is_per_broker = rd_true,
Per broker is rd_false everywhere except for node.request.latency; it's independent of where we store the values — see the KIP labels.
    .is_per_broker = rd_true,
    .type = RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM},
    [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_FETCH_MANAGER_FETCH_LATENCY_AVG] =
        {.name = "consumer.fetch.manager.fetch.latency.avg ",
Remove these trailing spaces from the metric names throughout this file:
    - {.name = "consumer.fetch.manager.fetch.latency.avg ",
    + {.name = "consumer.fetch.manager.fetch.latency.avg",
Emanuele Sabellico (emasab)
left a comment
Comments after the first changes, and about unit and mock tests
    switch ((int)rko->rko_type) {
    case RD_KAFKA_OP_FETCH:
            rk->rk_telemetry.ts_fetch_cb_last = rd_clock();
This line was left with the previous name and is causing a compilation error.
    if (rkbuf->rkbuf_reqhdr.ApiKey == RD_KAFKAP_Fetch) {
            rd_avg_add(&rkb->rkb_rk->rk_telemetry.rd_avg_current
                            .rk_avg_fetch_latency,
Fetch latency needs to stay per broker, as it's useful to have this information per broker — unlike rk_avg_commit_latency, which is only ever increased by a single broker at a time (the group coordinator).
But it has per_broker set to false in the KIP table itself.
I will move it from rk to rkb
    if (join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
            rd_avg_add(&rkcg->rkcg_curr_coord->rkb_telemetry.rd_avg_current
                            .rkb_avg_rebalance_latency,
This avg is in rk now
            .rkb_avg_rebalance_latency,
            rd_clock() - rkcg->rkcg_ts_rebalance_start);
    }
    switch ((int)rkcg->rkcg_join_state) {
You can change the switch here and join it with the previous condition in an if ... else if, since they're on different variables: join_state in one case and rkcg->rkcg_join_state in the other.
    /* Synchronize state */
    rd_kafka_wrlock(rk);
    rd_kafka_wrunlock(rk);
    if (rk->rk_type == RD_KAFKA_CONSUMER) {
There's already an if condition for the consumer later; add these instructions there.
     * successful PushTelemetry requests.
     * See `requests_expected` for the detailed expected flow.
     */
    void do_test_telemetry_get_subscription_push_telemetry_consumer(void) {
Please add static to all functions in this file except the main one.
This should be the existing do_test_telemetry_get_subscription_push_telemetry, but taking rd_kafka_type_t type as a parameter, so we call the same function first with the producer and then with the consumer.
Testing with both the producer and the consumer should be done for all the tests in this file; that's the next thing you should work on.
These are not specific to the new metrics; I need to discuss them.
    mcluster = test_mock_cluster_new(1, &bootstraps);
    rd_kafka_mock_telemetry_set_requested_metrics(mcluster,
                                                  expected_metrics, 1);
    rd_kafka_mock_telemetry_set_push_interval(mcluster, push_interval);
    rd_kafka_mock_start_request_tracking(mcluster);
This part can be refactored into a create_mcluster function and used in all the tests.
    test_conf_init(&conf, NULL, 30);
    test_conf_set(conf, "bootstrap.servers", bootstraps);
    test_conf_set(conf, "debug", "telemetry");
    consumer = test_create_handle(RD_KAFKA_CONSUMER, conf);
Creating the handle can also be refactored and reused in all the tests. The consumer must subscribe to a topic that is created and pre-populated in advance, so we can see metrics in the logs of the consumer tests.
    /* Poll for enough time for two pushes to be triggered, and a little
     * extra, so 2.5 x push interval. */
    test_poll_timeout(consumer, push_interval * 2.5);
test_poll_timeout should call test_consumer_poll_timeout with the consumer, or test_produce_msgs with the producer, so we can see non-zero metrics in the logs and check that they're correct. A later refactor could check the metric values automatically, but for this step that is enough.
     * extra, so 2.5 x push interval. */
    test_poll_timeout(consumer, push_interval * 2.5);

    requests = rd_kafka_mock_get_requests(mcluster, &request_cnt);
requests can be fetched and destroyed directly in test_telemetry_check_protocol_request_times, so we can just pass mcluster to it and remove this repeated code from all the subtests.
Since these depend on all the tests, I'll come back to them in office hours!
Librdkafka KIP-714 New Metrics -> Addition of Telemetry Metrics. Formatting done via,
    RD_KAFKA_TELEMETRY_METRIC_CONSUMER_POLL_IDLE_RATIO_AVG,
    RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_AVG,
    RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_MAX};
    for (int i = 0; i < 6; i++) {
Initialize the variable before using it in the loop; this is causing a CI failure.
Anchit Jain (anchitj)
left a comment
Overall looks good, some minor changes. Please also fix the CI.
     * between buf_enq0
     * and writing to socket
     */
    rd_avg_t rd_avg_fetch_latency; /**< Current fetch
This should be named rkb_avg_fetch_latency, since it is at the broker level.
        rkb_avg_throttle; /**< Rolled over throttle avg */
    rd_avg_t rkb_avg_outbuf_latency; /**< Rolled over outbuf
                                      * latency avg */
    rd_avg_t rd_avg_fetch_latency; /**< Rolled over fetch
rkb_avg_fetch_latency
    int *matched_metrics;
    size_t matched_metrics_cnt;

    rd_kafka_mock_request_t **requests_actual,
    size_t actual_cnt,
    rd_kafka_mock_cluster_t *mcluster,
    rd_kafka_telemetry_expected_request_t *requests_expected,
    size_t expected_cnt) {
            size_t actual_cnt;
            rd_kafka_mock_request_t **requests_actual =
                rd_kafka_mock_get_requests(mcluster, &actual_cnt);
Why was this changed? We should either also remove rd_kafka_mock_get_requests from the methods, or keep this as it was.
Please also resolve the earlier comments if they've been fixed.
🎉 All Contributor License Agreements have been signed. Ready to merge. |
Force-pushed 6de0833 to f55f3ec
add missing consumer metrics described in the KIP:
* consumer.coordinator.rebalance.latency.avg
* consumer.coordinator.rebalance.latency.max
* consumer.coordinator.rebalance.latency.total
* consumer.fetch.manager.fetch.latency.avg
* consumer.fetch.manager.fetch.latency.max
* consumer.poll.idle.ratio.avg
* consumer.coordinator.commit.latency.avg
* consumer.coordinator.commit.latency.max

additionally:
* add unit tests for all the metrics
* add integration tests with the producer or consumer while they're active
* make group initial rebalance delay ms configurable so integration tests are reusable with both producer and consumer

---------

Co-authored-by: Anchit Jain <[email protected]>
Co-authored-by: mahajanadhitya <[email protected]>
No description provided.