
Add Kafka Connect as a built‑in JMX metrics target#15561

Merged
laurit merged 36 commits into open-telemetry:main from aaaugustine29:main
Mar 10, 2026

Conversation

@aaaugustine29
Contributor

Overview:
This change introduces Kafka Connect as a first‑class JMX target system in the JMX metrics library. It adds a ruleset and documentation that cover both Apache Kafka Connect and Confluent Platform variants from the outset, so users can enable Kafka Connect monitoring without custom YAML.

Details:
Added kafka-connect.yaml JMX rules that map worker, rebalance, connector, task, source/sink task, and task-error MBeans into OpenTelemetry metrics, including Apache‑only metrics (e.g., worker rebalance protocol, per‑connector task counts, predicate/transform metadata, converter metadata, source transaction sizes, sink record lag max).
Defined connector and task status as state metrics using the superset of status values across Apache and Confluent, to avoid vendor‑specific enum mismatches.
Documented the new target in kafka-connect.md, including metric groups, attributes, and the dual‑vendor compatibility model (no renames; Apache list as a superset of Confluent docs).
Added self‑contained tests for the Kafka Connect rules that load the YAML, build metric definitions, and validate key state mappings and metric presence, ensuring the new target is ready to consume from day one.
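For readers unfamiliar with the jmx-metrics YAML rule format, here is a minimal sketch of what such a ruleset looks like. The bean name and mapping shown are illustrative only, not the exact shipped rules; see kafka-connect.yaml in this PR for the real definitions.

```yaml
# Illustrative excerpt only; field names follow the jmx-metrics rule format.
rules:
  - bean: kafka.connect:type=connect-worker-metrics
    prefix: kafka.connect.worker.
    mapping:
      task-count:
        metric: task.count
        type: updowncounter
        unit: "{task}"
        desc: The number of tasks running in this worker.
```

Each rule matches an MBean ObjectName pattern and maps its JMX attributes (here `task-count`) to OpenTelemetry metrics with a type, unit, and description.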

Testing:
./gradlew -Dorg.gradle.configuration-cache.parallel=false instrumentation:jmx-metrics:library:test

@aaaugustine29 requested a review from a team as a code owner December 6, 2025 18:08
@linux-foundation-easycla

linux-foundation-easycla Bot commented Dec 6, 2025

CLA Signed

The committers listed above are authorized under a signed CLA.

@laurit
Contributor

laurit commented Dec 8, 2025

@SylvainJuge could you review this?

Contributor

@SylvainJuge left a comment


Hi @aaaugustine29, thanks for opening this!

There are quite a lot of metrics added here, which makes it quite challenging to review them all.

I don't have any expertise in Kafka Connect, so you are probably more knowledgeable here.

I would suggest:

  • implementing tests with a real instance of the target system, ideally both the Apache and Confluent variants
  • as a first step, focusing on the "essential" metrics rather than including everything that is available; this is where your knowledge might be useful
  • simplifying as much as possible by using metric attributes to provide a breakdown where the metrics represent a partition (for example on state).
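A sketch of that consolidation, using the attribute-mapping style quoted elsewhere in this thread. The bean, attribute names, and state keys are illustrative assumptions about the library's state-metric syntax, not the PR's actual rules:

```yaml
# Illustrative: one status metric broken down by a state attribute, with
# value lists covering both Apache and Confluent spellings, instead of a
# separate metric per status value.
rules:
  - bean: kafka.connect:type=connector-metrics,connector=*
    metricAttribute:
      kafka.connect.connector: param(connector)
    mapping:
      status:
        metric: kafka.connect.connector.status
        type: state
        desc: The status of the connector.
        metricAttribute:
          kafka.connect.connector.state:
            running: [running, RUNNING]
            failed: [failed, FAILED]
            unknown: "*"
```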

Five outdated comment threads on instrumentation/jmx-metrics/library/kafka-connect.md
@aaaugustine29
Contributor Author

aaaugustine29 commented Dec 8, 2025

@SylvainJuge Thanks for your help and guidance. At this point, the metrics have been reduced to the minimum set without losing any information. That said, it doesn't mean we need to keep everything. In particular, your previous comment points to an opportunity to consolidate some of them with metric attributes, though that would lose information for a niche, advanced group of users. What's your guidance on this?

And to clarify your comment about testing: tests that actually instantiate a Kafka Connect cluster would be very heavy. I could emulate what the Apache JMX server would produce; would that be sufficient?

@aaaugustine29
Contributor Author

Hi @SylvainJuge, I hope you had a good holiday season. I've tried to thematically include metric attributes and yaml tags where possible. Do you have any more comments or concerns?

@aaaugustine29
Contributor Author

Hi @SylvainJuge, I was wondering if you had any time to take a look at the current state of the PR?

@aaaugustine29
Contributor Author

(Quoting @SylvainJuge's review:)

Hi @aaaugustine29, sorry I hadn't had time to dedicate to reviewing this PR.

To properly validate the metrics, this PR is still lacking a real end-to-end test with a Kafka Connect instance. Without this we have no guarantee that any of the metrics described here will actually be properly captured when deployed on a real instance, which makes it very hard to approve for anyone without significant Kafka (and Kafka Connect) knowledge and experience.

Another thing here is that there are lots of metrics that are "derived" from other metrics; we should not capture these, as they should be computed on the backend. I would suggest re-evaluating the relevance of all the metrics with .max, .avg, .rate, or .min in their names. The goal of these pre-defined metrics is to capture the essential metrics for a given system, not all the possible metrics exposed; otherwise this will create lots of noise and will likely require users to discard most metrics or maintain their own metric definitions.

Hi @SylvainJuge,
Testing has been updated to instantiate a container that can emit the metrics. Only a few metrics can't be tested without instantiating multiple heavier containers; I can do that if you'd prefer.

I've also removed some derivable metrics, please give it a look whenever you get a chance. Thanks!

@aaaugustine29
Contributor Author

Hi @SylvainJuge, just checking in to see if you have had any time to review this. I think it should be in a good state.

@SylvainJuge
Contributor

Hi @aaaugustine29, I'm currently waiting for #16212 to be merged first. I think it provides a few hints and general recommendations that would be worth following for this type of PR that adds new JMX metrics.

@laurit laurit added this to the v2.26.0 milestone Feb 27, 2026
metric: task.count
type: updowncounter
unit: "{task}"
desc: The number of tasks run in this worker.
Contributor


This description is a bit unclear to me. Is it the total number of tasks that were run since startup? If so, then maybe it can be calculated by the backend as a sum of data points of kafka.connect.worker.task.startup? Also, in that case the type should be counter.

If this is the number of currently running tasks, then the description should reflect that clearly.

Contributor Author


It's the number of tasks (processes related to a type of connector) running on a worker (node). The number of tasks can go up and down depending on the rebalancing of that worker, but actually seeing that number go up or down in a useful way is a pretty advanced use case of Kafka Connect.

Contributor


So shouldn't this description look like this?
The number of tasks running in this worker.

@aaaugustine29
Contributor Author

Hi @robsunday, I really appreciate you taking the time to thoroughly review this PR. I have updated everything according to your comments, other than the two that I left responses to. I'm happy to change those after your response though. Thank you again!



MetricsVerifier verifier = MetricsVerifier.create().disableStrictMode();
for (String metricName : metricNames) {
  verifier.add(metricName, metric -> {});
Contributor


I assume you are going to fill this method with metrics verification, like it is done for other targets.
If needed, you will soon also be able to verify the values of retrieved metrics (see this PR).

restarting: [restarting, RESTARTING]
destroyed: [destroyed, DESTROYED]
unknown: "*"
# kafka.connect.task.class
Contributor


Please remove it

# kafka.connect.worker.rebalance.protocol
connect-protocol:
metric: protocol
type: updowncounter
Contributor


Is updowncounter a good type for protocol? I think gauge is more appropriate.
The question is whether this metric is really needed. Possibly it is just a configuration setting that will not change, but this is just my guess.

Contributor

@laurit Mar 4, 2026


@SylvainJuge should this be a state metric?

Contributor


Couldn't find type: state used in any of the existing yamls. Updowncounter vs gauge can often be decided by asking whether adding up the values for different attributes makes sense.
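That heuristic can be illustrated with two hypothetical mappings (not the PR's actual rules): task counts add up meaningfully across connectors, while a time-fraction does not:

```yaml
# Hypothetical mapping entries, for illustration of the heuristic only.
mapping:
  connector-total-task-count:
    metric: task.count          # summing across connectors is meaningful
    type: updowncounter
    unit: "{task}"
  running-ratio:
    metric: task.running.ratio  # summing fractions is meaningless
    type: gauge
    unit: "1"
```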

Contributor


@laurit you are right, there is currently no usage of state metrics in the metrics definitions that are included in this project.

I haven't looked closely, but having this defined as a state metric means that the value could change over the lifetime of the process.

Also worth asking about is the relevance of this metric: for example, what information does it provide about the health or state of the system? The description seems to indicate it's more a property or result of the cluster configuration; in that case I don't think it would be relevant to capture it as a metric, as it would be constant most of the time.

# kafka.connect.connector.status
status:
metric: status
type: updowncounter
Contributor


I think it should be gauge

metricAttribute:
kafka.connect.connector: param(connector)
mapping:
# kafka.connect.connector.status
Contributor


What is the unit here?

type: gauge
unit: "1"
desc: The fraction of time this task has spent in the running state.
# kafka.connect.task.status
Contributor


Please specify unit.

# kafka.connect.task.status
status:
metric: status
type: updowncounter
Contributor


Rather gauge?

rule -> {
  assertThat(rule.getMetricType())
      .isNotEqualTo(
          io.opentelemetry.instrumentation.jmx.internal.engine.MetricInfo.Type.STATE);
Contributor


could import io.opentelemetry.instrumentation.jmx.internal.engine.MetricInfo


@Test
void kafkaConnectRulesUseBasicMetricTypes() throws Exception {
io.opentelemetry.instrumentation.jmx.internal.yaml.JmxConfig config = loadKafkaConnectConfig();
Contributor


could import io.opentelemetry.instrumentation.jmx.internal.yaml.JmxConfig

Comment on lines +394 to +398
try {
  stream.close();
} catch (IOException ignored) {
  // best effort cleanup
}
Contributor


you could try using try-with-resources for the stream to avoid handling close here
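A minimal sketch of the suggestion, with a hypothetical readAll helper standing in for the test code (not the PR's actual method names). The try-with-resources statement closes the stream automatically, so the explicit close() inside a catch block is no longer needed:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class TryWithResourcesDemo {

  // The stream is closed automatically when the try block exits,
  // even if readAllBytes() throws, so no manual cleanup is required.
  public static String readAll(InputStream in) throws IOException {
    try (InputStream stream = in) {
      return new String(stream.readAllBytes(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws IOException {
    String content =
        readAll(new ByteArrayInputStream("rules: []".getBytes(StandardCharsets.UTF_8)));
    System.out.println(content);
  }
}
```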

});
}

private static HttpResponseData sendRequest(String method, String url, String body)
Contributor


Alternatively, you could consider using the armeria http client that is used in other tests. It could be a bit easier than using the HTTP URL connection.

import io.opentelemetry.testing.internal.armeria.client.WebClient;
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest;
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse;
import io.opentelemetry.testing.internal.armeria.common.HttpMethod;
import io.opentelemetry.testing.internal.armeria.common.MediaType;

private static final WebClient client = WebClient.of();

  private static void createConnector(String connectUrl, String connectorConfigJson) {
    AggregatedHttpResponse response =
        sendRequest(HttpMethod.POST, connectUrl + "/connectors", connectorConfigJson);
    assertThat(response.status().code()).isIn(200, 201, 409);
  }

  private static void awaitConnectorRunning(String connectUrl, String connectorName) {
    await()
        .atMost(Duration.ofMinutes(2))
        .pollInterval(Duration.ofSeconds(1))
        .untilAsserted(
            () -> {
              AggregatedHttpResponse response =
                  sendRequest(HttpMethod.GET, connectUrl + "/connectors/" + connectorName + "/status", null);
              assertThat(response.status().code()).isEqualTo(200);
              assertThat(response.contentUtf8()).contains("\"state\":\"RUNNING\"");
            });
  }

  private static AggregatedHttpResponse sendRequest(HttpMethod method, String url, String body) {
    AggregatedHttpRequest request =
        body != null
            ? AggregatedHttpRequest.of(method, url, MediaType.JSON, body)
            : AggregatedHttpRequest.of(method, url);
    return client.execute(request).aggregate().join();
  }

assertThat(metric.getMetricType())
    .isNotEqualTo(
        io.opentelemetry.instrumentation.jmx.internal.engine.MetricInfo.Type.STATE));
Contributor


curious why using state metrics would be a problem?

@laurit laurit merged commit 21432cf into open-telemetry:main Mar 10, 2026
93 checks passed
@otelbot
Contributor

otelbot Bot commented Mar 10, 2026

Thank you for your contribution @aaaugustine29! 🎉 We would like to hear from you about your experience contributing to OpenTelemetry by taking a few minutes to fill out this survey.

