Skip to content

Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer#4989

Open
Marcin Krystianc (marcin-krystianc) wants to merge 1 commit intoconfluentinc:masterfrom
marcin-krystianc:dev-20250310-inflight
Open

Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer#4989
Marcin Krystianc (marcin-krystianc) wants to merge 1 commit intoconfluentinc:masterfrom
marcin-krystianc:dev-20250310-inflight

Conversation

@marcin-krystianc
Copy link
Copy Markdown
Contributor

Currently, the librdkafka implementation sends only one produce request at a time, regardless of the configured value for the max.in.flight.requests.per.connection parameter. This limitation prevents the producer from fully leveraging the potential of concurrent requests, which can impact throughput and performance.

With the proposed changes, the max.in.flight.requests.per.connection parameter will be respected, allowing the number of concurrent requests to be aligned with the specified value. This enhancement aims to optimize the idempotent producer's performance by enabling multiple produce requests to be sent simultaneously, thereby improving overall latency.

To validate my findings, I conducted tests on a network with a simulated delay of 1000 ms, configured using the tc command (tc qdisc add dev eth0 root netem delay 1000ms). I have also attached pcap files for your review.

Relevant producer parameters:

--config enable.idempotence=true
--config max.in.flight.requests.per.connection=5
--config linger.ms=200
--config socket.nagle.disable=true
418	13:01:07.506672	172.18.0.4	172.18.0.3	59686,19092	Kafka Produce v10 Request
432	13:01:08.508093	172.18.0.3	172.18.0.4	19092,59686	Kafka Produce v10 Response
454	13:01:08.508619	172.18.0.4	172.18.0.3	59686,19092	Kafka Produce v10 Request
459	13:01:09.509790	172.18.0.3	172.18.0.4	19092,59686	Kafka Produce v10 Response
467	13:01:09.510013	172.18.0.4	172.18.0.3	59686,19092	Kafka Produce v10 Request
474	13:01:10.511038	172.18.0.3	172.18.0.4	19092,59686	Kafka Produce v10 Response
482	13:01:10.511223	172.18.0.4	172.18.0.3	59686,19092	Kafka Produce v10 Request
487	13:01:11.512288	172.18.0.3	172.18.0.4	19092,59686	Kafka Produce v10 Response
495	13:01:11.512462	172.18.0.4	172.18.0.3	59686,19092	Kafka Produce v10 Request
500	13:01:12.513293	172.18.0.3	172.18.0.4	19092,59686	Kafka Produce v10 Response
508	13:01:12.513462	172.18.0.4	172.18.0.3	59686,19092	Kafka Produce v10 Request
237	12:58:32.288766	172.18.0.4	172.18.0.3	60190,19092	Kafka Produce v10 Request
239	12:58:32.288771	172.18.0.4	172.18.0.3	60190,19092	Kafka Produce v10 Request
246	12:58:32.329058	172.18.0.4	172.18.0.3	60190,19092	Kafka Produce v10 Request
247	12:58:32.329061	172.18.0.4	172.18.0.3	60190,19092	Kafka Produce v10 Request
250	12:58:32.788187	172.18.0.4	172.18.0.3	60190,19092	Kafka Produce v10 Request
260	12:58:33.301184	172.18.0.3	172.18.0.4	19092,60190	Kafka Produce v10 Response
271	12:58:33.301766	172.18.0.4	172.18.0.3	60190,19092	Kafka Produce v10 Request
272	12:58:33.302767	172.18.0.3	172.18.0.4	19092,60190	Kafka Produce v10 Response
275	12:58:33.330077	172.18.0.3	172.18.0.4	19092,60190	Kafka Produce v10 Response
277	12:58:33.330703	172.18.0.3	172.18.0.4	19092,60190	Kafka Produce v10 Response

@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
✅ marcin-krystianc
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Comment thread src/rdkafka_broker.c
/* Limit the number of in-flight requests (per partition)
* to the broker's sequence de-duplication window. */
max_requests = RD_MIN(max_requests,
RD_KAFKA_IDEMP_MAX_INFLIGHT - inflight);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inflight variable holds the number of inflight messages (rd_atomic32_get(&rktp->rktp_msgs_inflight)) instead of the number of inflight requests.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Marcin Krystianc (@marcin-krystianc) for spotting this! I'll check if other changes are needed but it doesn't seem at the moment.

Marcin Krystianc (marcin-krystianc) added a commit to marcin-krystianc/librdkafka that referenced this pull request Apr 29, 2025
 - confluentinc#4972 (Avoid unnecessary producer epoch bumps)
 - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5055 (Add missing wrlock around rd_kafka_metadata_cache_hint call)
Marcin Krystianc (marcin-krystianc) added a commit to marcin-krystianc/librdkafka that referenced this pull request Apr 29, 2025
 - confluentinc#4972 (Avoid unnecessary producer epoch bumps)
 - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5055 (Add missing wrlock around rd_kafka_metadata_cache_hint call)
Marcin Krystianc (marcin-krystianc) added a commit to marcin-krystianc/librdkafka that referenced this pull request Apr 29, 2025
 - confluentinc#4972 (Avoid unnecessary producer epoch bumps)
 - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5055 (Add missing wrlock around rd_kafka_metadata_cache_hint call)
Marcin Krystianc (marcin-krystianc) added a commit to G-Research/librdkafka that referenced this pull request Apr 30, 2025
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
 - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5055 (Add missing wrlock around rd_kafka_metadata_cache_hint call)
Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Jul 31, 2025
- G-Research#3 (CI/CD script)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
Mark Wadham (m4rkw) pushed a commit to G-Research/librdkafka that referenced this pull request Jul 31, 2025
* Hotfix Release: v2.11.0-gr - collective changes

- #3 (CI/CD script)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)

* Fix line endings

* Make style checks CI job work

It will fail because of some style issues from upstream, but at least it should complete instead of hanging forever.

* Build for arm64 linux without emulation
@marcin-krystianc
Copy link
Copy Markdown
Contributor Author

Hi Emanuele Sabellico (@emasab) , do you have a rough timeline for when you anticipate being able to merge this change?

Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Sep 8, 2025
- G-Research#3 (CI/CD script)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#4972 (Avoid unnecessary producer epoch bumps)
 - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Sep 8, 2025
- G-Research#3 (CI/CD script)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
Jonathan Giannuzzi (jgiannuzzi) added a commit to G-Research/librdkafka that referenced this pull request Sep 8, 2025
- #3 (CI/CD script)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Oct 9, 2025
- G-Research#3 (CI/CD script)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Oct 9, 2025
- G-Research#3 (CI/CD script)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Oct 9, 2025
- G-Research#3 (CI/CD script)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Oct 9, 2025
- G-Research#3 (CI/CD script)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Oct 22, 2025
- G-Research#3 (CI/CD script)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
Jonathan Giannuzzi (jgiannuzzi) added a commit to G-Research/librdkafka that referenced this pull request Oct 22, 2025
- #3 (CI/CD script)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
 - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
Comment thread src/rdkafka_broker.c Outdated
* to the broker's sequence de-duplication window. */
max_requests = RD_MIN(max_requests,
RD_KAFKA_IDEMP_MAX_INFLIGHT - inflight);
max_requests = rkb->rkb_rk->rk_conf.max_inflight - rd_kafka_bufq_cnt(&rkb->rkb_waitresps);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even with the fix it should still respect the max_requests variable that corresponds to queue.buffering.backpressure.threshold and defaults to 1 and when idempotency is enabled cannot be changed, see here. I think this was before max.in.flight was increased from 1 to 5 for the idempotent producer and we could give an error in case it's lower than 5 but by changing that configuration check.

At the moment it won't limit the in-flight requests to one once we have this fix, but only the output buffers to 1

Suggested change
max_requests = rkb->rkb_rk->rk_conf.max_inflight - rd_kafka_bufq_cnt(&rkb->rkb_waitresps);
max_requests = RD_MIN(max_requests,
RD_KAFKA_IDEMP_MAX_INFLIGHT - rd_kafka_bufq_cnt(&rkb->rkb_waitresps));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I've tested locally whether there is any observable difference in network traffic after your suggestion, and I didn't spot any change (thus it still works as expected).

@emasab
Copy link
Copy Markdown
Contributor

Emanuele Sabellico (emasab) commented Jan 14, 2026

Thanks Marcin Krystianc (@marcin-krystianc) ! Verified again this one and I still agree with the change, I just I request a small edit.
About the fix, even if queue.buffering.backpressure.threshold is 1 by default as you outlined it's likely there are more than 1 message per buffer and in that case it stops enqueuing until all message responses are received ('rd_atomic32_get(&rktp->rktp_msgs_inflight)') , not until the are no buffers in the output queue rkb_outbufs, actually limiting inflight requests to 1 instead of 5.

@emasab
Copy link
Copy Markdown
Contributor

/sem-approve

Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Jan 16, 2026
- G-Research#3 (CI/CD script)
- confluentinc#4972 (Avoid
unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize
the max.in.flight.requests.per.connection parameter on the idempotent
producer)
 - confluentinc#5168 (Use
system-provided cyrus-sasl/libsasl2 at runtime)
Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Jan 16, 2026
- G-Research#3 (CI/CD script)
- confluentinc#4972 (Avoid
unnecessary producer epoch bumps)
- confluentinc#4989 (Fully utilize
the max.in.flight.requests.per.connection parameter on the idempotent
producer)
 - confluentinc#5168 (Use
system-provided cyrus-sasl/libsasl2 at runtime)
Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Jan 16, 2026
Jonathan Giannuzzi (jgiannuzzi) added a commit to G-Research/librdkafka that referenced this pull request Jan 16, 2026
* CI/CD script

* Avoid unnecessary producer epoch bumps

confluentinc#4972

* Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer

confluentinc#4989

* Use system-provided cyrus-sasl/libsasl2 at runtime

confluentinc#5168

* Update changelog
@emasab
Copy link
Copy Markdown
Contributor

/sem-approve

Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Mar 6, 2026
Jonathan Giannuzzi (jgiannuzzi) added a commit to G-Research/librdkafka that referenced this pull request Mar 8, 2026
* CI/CD script

* Avoid unnecessary producer epoch bumps

confluentinc#4972

* Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer

confluentinc#4989

* Use system-provided cyrus-sasl/libsasl2 at runtime

confluentinc#5168

* Update changelog
Jonathan Giannuzzi (jgiannuzzi) added a commit to jgiannuzzi/librdkafka that referenced this pull request Apr 9, 2026
Jonathan Giannuzzi (jgiannuzzi) added a commit to G-Research/librdkafka that referenced this pull request Apr 9, 2026
* CI/CD script

* Avoid unnecessary producer epoch bumps

confluentinc#4972

* Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer

confluentinc#4989

* Use system-provided cyrus-sasl/libsasl2 at runtime

confluentinc#5168

* Update changelog
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants