Add Kafka Connect as a built-in JMX metrics target #15561
laurit merged 36 commits into open-telemetry:main
Conversation
@SylvainJuge could you review this?
SylvainJuge left a comment:
Hi @aaaugustine29, thanks for opening this!
There are quite a lot of metrics added here, which makes it quite challenging to review them all.
I don't have any expertise in Kafka Connect, so you are probably more knowledgeable here.
I would suggest:
- implement tests with a real instance of the target system, ideally both the Apache and Confluent variants
- as a first step, focus on the "essential" metrics and do not include everything that is available; this is where your knowledge might be useful
- try to simplify as much as possible by using metric attributes to provide a breakdown when the metrics represent a partition of some dimension (for example state).
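To make the last point concrete, a consolidated rule could collapse several per-state metrics into one metric with a state attribute. The sketch below is illustrative only: the bean name, attribute names, and state value lists are placeholders modeled on typical Kafka Connect MBeans, not the final ruleset.

```yaml
rules:
  - bean: kafka.connect:type=connector-metrics,connector=*
    metricAttribute:
      kafka.connect.connector: param(connector)
    mapping:
      status:
        metric: kafka.connect.connector.status
        type: state
        unit: "{connector}"
        desc: The status of the connector, broken down by state.
        # State values mapped onto a single attribute instead of
        # one metric per state (value lists are assumptions).
        mapping:
          running: [running, RUNNING]
          paused: [paused, PAUSED]
          failed: [failed, FAILED]
          unknown: "*"
```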
@SylvainJuge Thanks for your help and guidance. At this point, the metrics have been reduced to the minimum set without losing any information. That said, it doesn't mean we need to keep everything: your previous comment brings up the opportunity to consolidate some of them with metric attributes, though there would be a loss of info for a niche and advanced group of users. What's your guidance on this? To clarify your comment about testing: tests that actually instantiate a Kafka Connect cluster would be very heavy. I could emulate what the Apache JMX server would produce instead; would that be sufficient?
Hi @SylvainJuge, I hope you had a good holiday season. I've tried to thematically include metric attributes and YAML tags where possible. Do you have any more comments or concerns?

Hi @SylvainJuge, I was wondering if you had any time to take a look at the current state of the PR?

Hi @SylvainJuge, I've also removed some derivable metrics, please give it a look whenever you get a chance. Thanks!

Hi @SylvainJuge, just checking in to see if you have had any time to review this. I think it should be in a good state.

Hi @aaaugustine29, I'm currently waiting for #16212 to be merged first. I think it provides a few hints and general recommendations that would be worth following for this type of PR that adds new JMX metrics.
```yaml
metric: task.count
type: updowncounter
unit: "{task}"
desc: The number of tasks run in this worker.
```
This description is a bit unclear to me. Is it the total number of tasks that were run since startup? If so, then maybe it can be calculated by the backend as a sum of the data points of kafka.connect.worker.task.startup, and in that case the type should be counter.
If this is the number of currently running tasks, then the description should reflect that clearly.
It's the number of tasks (processes related to a type of connector) running on a worker (node). The number of tasks can go up and down depending on the rebalancing of that worker, but actually seeing that number go up or down in a useful way is a pretty advanced use case of Kafka Connect.
So shouldn't this description look like this?
"The number of tasks running in this worker."
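Putting the thread's conclusion together, the rule under discussion would then read roughly as follows (a sketch of the suggested wording, not necessarily the merged version):

```yaml
metric: task.count
type: updowncounter
unit: "{task}"
desc: The number of tasks currently running in this worker.
```

Since the value rises and falls with rebalancing rather than accumulating since startup, updowncounter (rather than counter) matches the semantics.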
Hi @robsunday, I really appreciate you taking the time to thoroughly review this PR. I have updated everything according to your comments, other than the two that I left responses to. I'm happy to change those after your response, though. Thank you again!
```java
MetricsVerifier verifier = MetricsVerifier.create().disableStrictMode();
for (String metricName : metricNames) {
  verifier.add(metricName, metric -> {});
}
```
I assume you are going to fill this method with metrics verification, like it is done for other targets.
If needed, you will soon also be able to verify the values of retrieved metrics (see this PR).
```yaml
restarting: [restarting, RESTARTING]
destroyed: [destroyed, DESTROYED]
unknown: "*"
# kafka.connect.task.class
```
```yaml
# kafka.connect.worker.rebalance.protocol
connect-protocol:
  metric: protocol
  type: updowncounter
```
Is updowncounter a good type for protocol? I think gauge is more appropriate.
The question is whether this metric is really needed. Possibly it is just a configuration setting that will not change, but this is just my guess.
Couldn't find `type: state` used in any of the existing YAMLs. updowncounter vs gauge can often be decided by thinking about whether adding up the values for different attributes makes sense.
@laurit you are right, there is currently no usage of state metrics in the metric definitions that are included in this project.
I haven't looked closely, but having this defined as a state metric means that the value could change over the lifetime of the process.
Also worth asking is the relevance of this metric, for example what it provides in terms of information about the health or state of the system. The description seems to indicate it's more a property or the result of the cluster configuration; in that case I don't think it would be relevant to capture it as a metric, as it would be constant most of the time.
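If the protocol metric were kept at all, the discussion above suggests a gauge would be the closer fit. A minimal sketch of that variant (the unit and description here are assumptions, not taken from the PR):

```yaml
# kafka.connect.worker.rebalance.protocol
connect-protocol:
  metric: protocol
  type: gauge
  unit: "1"
  desc: The rebalance protocol currently used by this worker.
```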
```yaml
# kafka.connect.connector.status
status:
  metric: status
  type: updowncounter
```
I think it should be a gauge.
```yaml
metricAttribute:
  kafka.connect.connector: param(connector)
mapping:
  # kafka.connect.connector.status
```
```yaml
type: gauge
unit: "1"
desc: The fraction of time this task has spent in the running state.
# kafka.connect.task.status
status:
  metric: status
  type: updowncounter
```
```java
rule -> {
  assertThat(rule.getMetricType())
      .isNotEqualTo(
          io.opentelemetry.instrumentation.jmx.internal.engine.MetricInfo.Type.STATE);
```
You could import `io.opentelemetry.instrumentation.jmx.internal.engine.MetricInfo` here.
```java
@Test
void kafkaConnectRulesUseBasicMetricTypes() throws Exception {
  io.opentelemetry.instrumentation.jmx.internal.yaml.JmxConfig config = loadKafkaConnectConfig();
```
You could import `io.opentelemetry.instrumentation.jmx.internal.yaml.JmxConfig` here.
```java
try {
  stream.close();
} catch (IOException ignored) {
  // best effort cleanup
}
```
You could try using try-with-resources for the stream to avoid handling close here.
```java
  });
}

private static HttpResponseData sendRequest(String method, String url, String body)
```
Alternatively, you could consider using the Armeria HTTP client that is used in other tests. It could be a bit easier than using HttpURLConnection:
```java
import io.opentelemetry.testing.internal.armeria.client.WebClient;
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest;
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse;
import io.opentelemetry.testing.internal.armeria.common.HttpMethod;
import io.opentelemetry.testing.internal.armeria.common.MediaType;

private static final WebClient client = WebClient.of();

private static void createConnector(String connectUrl, String connectorConfigJson) {
  AggregatedHttpResponse response =
      sendRequest(HttpMethod.POST, connectUrl + "/connectors", connectorConfigJson);
  assertThat(response.status().code()).isIn(200, 201, 409);
}

private static void awaitConnectorRunning(String connectUrl, String connectorName) {
  await()
      .atMost(Duration.ofMinutes(2))
      .pollInterval(Duration.ofSeconds(1))
      .untilAsserted(
          () -> {
            AggregatedHttpResponse response =
                sendRequest(
                    HttpMethod.GET, connectUrl + "/connectors/" + connectorName + "/status", null);
            assertThat(response.status().code()).isEqualTo(200);
            assertThat(response.contentUtf8()).contains("\"state\":\"RUNNING\"");
          });
}

private static AggregatedHttpResponse sendRequest(HttpMethod method, String url, String body) {
  AggregatedHttpRequest request =
      body != null
          ? AggregatedHttpRequest.of(method, url, MediaType.JSON, body)
          : AggregatedHttpRequest.of(method, url);
  return client.execute(request).aggregate().join();
}
```

```java
assertThat(metric.getMetricType())
    .isNotEqualTo(
        io.opentelemetry.instrumentation.jmx.internal.engine.MetricInfo.Type.STATE));
```
Curious why using state metrics would be a problem?
Overview:
This change introduces Kafka Connect as a first‑class JMX target system in the JMX metrics library. It adds a ruleset and documentation that cover both Apache Kafka Connect and Confluent Platform variants from the outset, so users can enable Kafka Connect monitoring without custom YAML.
Details:
Added kafka-connect.yaml JMX rules that map worker, rebalance, connector, task, source/sink task, and task-error MBeans into OpenTelemetry metrics, including Apache‑only metrics (e.g., worker rebalance protocol, per‑connector task counts, predicate/transform metadata, converter metadata, source transaction sizes, sink record lag max).
Defined connector and task status as state metrics using the superset of status values across Apache and Confluent, to avoid vendor‑specific enum mismatches.
Documented the new target in kafka-connect.md, including metric groups, attributes, and the dual-vendor compatibility model (no renames; the Apache metric list is treated as a superset of the Confluent documentation).
Added self‑contained tests for the Kafka Connect rules that load the YAML, build metric definitions, and validate key state mappings and metric presence, ensuring the new target is ready to consume from day one.
Testing:
```
./gradlew -Dorg.gradle.configuration-cache.parallel=false instrumentation:jmx-metrics:library:test
```