Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer#4989
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
| /* Limit the number of in-flight requests (per partition) | ||
| * to the broker's sequence de-duplication window. */ | ||
| max_requests = RD_MIN(max_requests, | ||
| RD_KAFKA_IDEMP_MAX_INFLIGHT - inflight); |
There was a problem hiding this comment.
The inflight variable holds the number of inflight messages (rd_atomic32_get(&rktp->rktp_msgs_inflight)) instead of the number of inflight requests.
There was a problem hiding this comment.
Thanks Marcin Krystianc (@marcin-krystianc) for spotting this! I'll check if other changes are needed but it doesn't seem at the moment.
- CI/CD script (See #3) - confluentinc#4989 - confluentinc#4972 - confluentinc#4905 - confluentinc#4864
- confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5055 (Add missing wrlock around rd_kafka_metadata_cache_hint call)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5055 (Add missing wrlock around rd_kafka_metadata_cache_hint call)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5055 (Add missing wrlock around rd_kafka_metadata_cache_hint call)
- confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5055 (Add missing wrlock around rd_kafka_metadata_cache_hint call)
- G-Research#3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer)
* Hotfix Release: v2.11.0-gr - collective changes - #3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) * Fix line endings * Make style checks CI job work It will fail because of some style issues from upstream, but at least it should complete instead of hanging forever. * Build for arm64 linux without emulation
|
Hi Emanuele Sabellico (@emasab) , do you have a rough timeline for when you anticipate being able to merge this change? |
- G-Research#3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
- G-Research#3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
- #3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
- G-Research#3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
- G-Research#3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
- G-Research#3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
- G-Research#3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
- G-Research#3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
- #3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
| * to the broker's sequence de-duplication window. */ | ||
| max_requests = RD_MIN(max_requests, | ||
| RD_KAFKA_IDEMP_MAX_INFLIGHT - inflight); | ||
| max_requests = rkb->rkb_rk->rk_conf.max_inflight - rd_kafka_bufq_cnt(&rkb->rkb_waitresps); |
There was a problem hiding this comment.
Even with the fix it should still respect the max_requests variable that corresponds to queue.buffering.backpressure.threshold and defaults to 1 and when idempotency is enabled cannot be changed, see here. I think this was before max.in.flight was increased from 1 to 5 for the idempotent producer and we could give an error in case it's lower than 5 but by changing that configuration check.
At the moment it won't limit the in-flight requests to one once we have this fix, but only the output buffers to 1
| max_requests = rkb->rkb_rk->rk_conf.max_inflight - rd_kafka_bufq_cnt(&rkb->rkb_waitresps); | |
| max_requests = RD_MIN(max_requests, | |
| RD_KAFKA_IDEMP_MAX_INFLIGHT - rd_kafka_bufq_cnt(&rkb->rkb_waitresps)); |
There was a problem hiding this comment.
Thanks, I've tested locally whether there is any observable difference in network traffic after your suggestion, and I didn't spot any change (thus it still works as expected).
|
Thanks Marcin Krystianc (@marcin-krystianc) ! Verified again this one and I still agree with the change, I just I request a small edit. |
|
/sem-approve |
402cd13 to
704b784
Compare
…he broker's sequence de-duplication window.
704b784 to
98199cc
Compare
- G-Research#3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
- G-Research#3 (CI/CD script) - confluentinc#4972 (Avoid unnecessary producer epoch bumps) - confluentinc#4989 (Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer) - confluentinc#5168 (Use system-provided cyrus-sasl/libsasl2 at runtime)
…the idempotent producer confluentinc#4989
* CI/CD script * Avoid unnecessary producer epoch bumps confluentinc#4972 * Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer confluentinc#4989 * Use system-provided cyrus-sasl/libsasl2 at runtime confluentinc#5168 * Update changelog
|
/sem-approve |
…the idempotent producer confluentinc#4989
* CI/CD script * Avoid unnecessary producer epoch bumps confluentinc#4972 * Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer confluentinc#4989 * Use system-provided cyrus-sasl/libsasl2 at runtime confluentinc#5168 * Update changelog
…the idempotent producer confluentinc#4989
* CI/CD script * Avoid unnecessary producer epoch bumps confluentinc#4972 * Fully utilize the max.in.flight.requests.per.connection parameter on the idempotent producer confluentinc#4989 * Use system-provided cyrus-sasl/libsasl2 at runtime confluentinc#5168 * Update changelog
Currently, the librdkafka implementation sends only one produce request at a time, regardless of the configured value for the
max.in.flight.requests.per.connectionparameter. This limitation prevents the producer from fully leveraging the potential of concurrent requests, which can impact throughput and performance.With the proposed changes, the
max.in.flight.requests.per.connectionparameter will be respected, allowing the number of concurrent requests to be aligned with the specified value. This enhancement aims to optimize the idempotent producer's performance by enabling multiple produce requests to be sent simultaneously, thereby improving overall latency.To validate my findings, I conducted tests on a network with a simulated delay of 1000 ms, configured using the
tccommand (tc qdisc add dev eth0 root netem delay 1000ms). I have also attached pcap files for your review.Relevant producer parameters: