Merged
17 changes: 16 additions & 1 deletion CHANGELOG.md
@@ -2,6 +2,7 @@

librdkafka v2.11.0 is a feature release:

* [KIP-1102](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1102%3A+Enable+clients+to+rebootstrap+based+on+timeout+or+error+code) Enable clients to rebootstrap based on timeout or error code (#4981).
* Fix for poll ratio calculation in case the queues are forwarded (#5017).


@@ -124,8 +125,22 @@ librdkafka v2.10.0 is a feature release:
> The [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) consumer is currently in **Preview** and should not be used in production environments. The implementation is feature complete, but the contract could have minor changes before General Availability.


## Upgrade considerations


Starting from this version, brokers not reported in a Metadata RPC response are
removed along with their threads. Brokers and their threads are added back
when they appear in a Metadata RPC response again. When no brokers are left,
or none of them are reachable, the client by default starts a re-bootstrap
sequence. This is controlled by `metadata.recovery.strategy`, which defaults
to `rebootstrap`. Setting `metadata.recovery.strategy` to `none` disables
re-bootstrapping and leaves only the brokers received in the last successful
metadata response.
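For reference, the default behavior described above corresponds to the following configuration fragment (both property names and the 300000 ms default are taken from this PR's CONFIGURATION.md changes; set `metadata.recovery.strategy=none` to opt out):

```
metadata.recovery.strategy=rebootstrap
metadata.recovery.rebootstrap.trigger.ms=300000
```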


## Enhancements and Fixes

* [KIP-899](https://cwiki.apache.org/confluence/display/KAFKA/KIP-899%3A+Allow+producer+and+consumer+clients+to+rebootstrap) Allow producer and consumer clients to rebootstrap
Comment thread:

Member: We should be upgrading the release notes as well. How should we do that?

Contributor Author: You mean the release in GitHub? I can modify it after merging.

Member: Yes.

* Identify brokers only by broker id (#4557, @mfleming)
* Remove unavailable brokers and their threads (#4557, @mfleming)
* Commits during a cooperative incremental rebalance aren't causing
@@ -175,7 +190,7 @@ librdkafka v2.10.0 is a feature release:
and connection.
Happens since 1.x (#4557, @mfleming).
* Issues: #4557
Remove brokers not reported in a metadata call, along with their thread.
Remove brokers not reported in a metadata call, along with their threads.
Prevents unavailable brokers from being selected for a new connection when
none are available. We cannot tell whether a broker was removed
temporarily or permanently, so we always remove it and it will be added back when
3 changes: 2 additions & 1 deletion CONFIGURATION.md
@@ -12,7 +12,8 @@ message.copy.max.bytes | * | 0 .. 1000000000 | 65535
receive.message.max.bytes | * | 1000 .. 2147483647 | 100000000 | medium | Maximum Kafka protocol response message size. This serves as a safety precaution to avoid memory exhaustion in case of protocol hiccups. This value must be at least `fetch.max.bytes` + 512 to allow for protocol overhead; the value is adjusted automatically unless the configuration property is explicitly set. <br>*Type: integer*
max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
metadata.recovery.strategy | * | none, rebootstrap | rebootstrap | low | Controls how the client recovers when none of the brokers known to it is available. If set to `none`, the client fails with a fatal error. If set to `rebootstrap`, the client repeats the bootstrap process using `bootstrap.servers` and brokers added through `rd_kafka_brokers_add()`. Rebootstrapping is useful when a client communicates with brokers so infrequently that the set of brokers may change entirely before the client refreshes metadata. Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. <br>*Type: enum value*
metadata.recovery.strategy | * | none, rebootstrap | rebootstrap | low | Controls how the client recovers when none of the brokers known to it is available. If set to `none`, the client doesn't re-bootstrap. If set to `rebootstrap`, the client repeats the bootstrap process using `bootstrap.servers` and brokers added through `rd_kafka_brokers_add()`. Rebootstrapping is useful when a client communicates with brokers so infrequently that the set of brokers may change entirely before the client refreshes metadata. Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously, when the client cannot refresh metadata within `metadata.recovery.rebootstrap.trigger.ms`, or when it's requested in a metadata response. <br>*Type: enum value*
metadata.recovery.rebootstrap.trigger.ms | * | 0 .. 2147483647 | 300000 | low | If a client configured to rebootstrap using `metadata.recovery.strategy=rebootstrap` is unable to obtain metadata from any of the brokers for this interval, the client repeats the bootstrap process using the `bootstrap.servers` configuration and brokers added through `rd_kafka_brokers_add()`. <br>*Type: integer*
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s. <br>*Type: integer*
metadata.max.age.ms | * | 1 .. 86400000 | 900000 | low | Metadata cache max age. Defaults to topic.metadata.refresh.interval.ms * 3 <br>*Type: integer*
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 100 | low | When a topic loses its leader a new metadata request will be enqueued immediately and then with this initial interval, exponentially increasing upto `retry.backoff.max.ms`, until the topic metadata has been refreshed. If not set explicitly, it will be defaulted to `retry.backoff.ms`. This is used to recover quickly from transitioning leader brokers. <br>*Type: integer*
5 changes: 3 additions & 2 deletions INTRODUCTION.md
@@ -2132,10 +2132,11 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-735 - Increase default consumer session timeout | 3.0.0 | Supported |
| KIP-768 - SASL/OAUTHBEARER OIDC support | 3.0 | Supported |
| KIP-881 - Rack-aware Partition Assignment for Kafka Consumers | 3.5.0 | Supported |
| KIP-848 - The Next Generation of the Consumer Rebalance Protocol | 4.0.0 | Preview |
| KIP-899 - Allow producer and consumer clients to rebootstrap | 3.8.0 | Supported |
| KIP-951 - Leader discovery optimisations for the client | 3.7.0 | Supported |
| KIP-1082 - Require Client-Generated IDs over the ConsumerGroupHeartbeat | 4.0.0 | Supported |
| KIP-1102 - Enable clients to rebootstrap based on timeout or error code | 4.0.0 | Supported |



@@ -2153,7 +2154,7 @@ release of librdkafka.
| 0 | Produce | 12 | 10 |
| 1 | Fetch | 17 | 16 |
| 2 | ListOffsets | 10 | 7 |
| 3 | Metadata | 13 | 12 |
| 3 | Metadata | 13 | 13 |
| 8 | OffsetCommit | 9 | 9 |
| 9 | OffsetFetch | 9 | 9 |
| 10 | FindCoordinator | 6 | 2 |
34 changes: 26 additions & 8 deletions src/rdkafka.c
@@ -720,7 +720,7 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
"maximum size the broker will accept"),
_ERR_DESC(RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED,
"Broker: Client metadata is stale, "
"client should rebootstrap to obtain new metadata."),
"client should rebootstrap to obtain new metadata"),
_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};


@@ -2077,15 +2077,13 @@ static void rd_kafka_rebootstrap_tmr_cb(rd_kafka_timers_t *rkts, void *arg) {
/* Avoid re-bootstrapping while terminating */
return;

rd_dassert(rk->rk_conf.metadata_recovery_strategy !=
RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE);
if (rk->rk_conf.metadata_recovery_strategy ==
RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE) {
rd_kafka_set_fatal_error(
rk, RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED, "%s",
"Lost connection to broker(s) "
"and metadata recovery with re-bootstrap "
"is disabled");
RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE)
/* This function should not be called in this case.
 * This is just a fail-safe. */
return;
}

rd_kafka_dbg(rk, ALL, "REBOOTSTRAP", "Starting re-bootstrap sequence");

@@ -2801,11 +2799,31 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
* Schedules a rebootstrap of the cluster immediately.
*/
void rd_kafka_rebootstrap(rd_kafka_t *rk) {
if (rk->rk_conf.metadata_recovery_strategy ==
RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE)
return;

rd_kafka_timer_start_oneshot(&rk->rk_timers, &rk->rebootstrap_tmr,
rd_true /*restart*/, 0,
rd_kafka_rebootstrap_tmr_cb, NULL);
}

/**
* Restarts rebootstrap timer with the configured interval.
*
* @locks none
* @locality any
*/
void rd_kafka_rebootstrap_tmr_restart(rd_kafka_t *rk) {
if (rk->rk_conf.metadata_recovery_strategy ==
RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE)
return;

rd_kafka_timer_start_oneshot(
&rk->rk_timers, &rk->rebootstrap_tmr, rd_true /*restart*/,
rk->rk_conf.metadata_recovery_rebootstrap_trigger_ms * 1000LL,
rd_kafka_rebootstrap_tmr_cb, NULL);
}
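Outside the diff, the interplay of the two functions above can be sketched as a small, self-contained model (illustrative names only, not librdkafka code): `rd_kafka_rebootstrap()` arms the one-shot timer to fire immediately, `rd_kafka_rebootstrap_tmr_restart()` re-arms it with the configured trigger interval, and both are no-ops when `metadata.recovery.strategy=none`.

```c
/* Simplified model (not librdkafka code) of the guard-and-arm logic in
 * rd_kafka_rebootstrap() and rd_kafka_rebootstrap_tmr_restart(). */
typedef enum { STRATEGY_NONE, STRATEGY_REBOOTSTRAP } recovery_strategy_t;

typedef struct {
        recovery_strategy_t strategy; /* metadata.recovery.strategy */
        int trigger_ms;         /* metadata.recovery.rebootstrap.trigger.ms */
        long long next_fire_us; /* -1 = timer not armed */
} client_model_t;

/* Model of rd_kafka_rebootstrap(): schedule the one-shot timer to fire
 * now, unless metadata recovery is disabled. */
static void model_rebootstrap(client_model_t *c) {
        if (c->strategy == STRATEGY_NONE)
                return;
        c->next_fire_us = 0;
}

/* Model of rd_kafka_rebootstrap_tmr_restart(): re-arm the timer with the
 * configured interval, e.g. after each successful metadata response. */
static void model_tmr_restart(client_model_t *c) {
        if (c->strategy == STRATEGY_NONE)
                return;
        c->next_fire_us = (long long)c->trigger_ms * 1000LL;
}
```

This mirrors the design choice visible in the diff: every successful metadata response pushes the re-bootstrap deadline out by the trigger interval, so the timer only ever fires after a full interval without metadata.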

/**
* Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with
17 changes: 15 additions & 2 deletions src/rdkafka_conf.c
@@ -441,18 +441,31 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
{_RK_GLOBAL, "metadata.recovery.strategy", _RK_C_S2I,
_RK(metadata_recovery_strategy),
"Controls how the client recovers when none of the brokers known to it "
"is available. If set to `none`, the client fails with a fatal error. "
"is available. If set to `none`, the client doesn't re-bootstrap. "
"If set to `rebootstrap`, the client repeats the bootstrap process "
"using `bootstrap.servers` and brokers added through "
"`rd_kafka_brokers_add()`. Rebootstrapping is useful when a client "
"communicates with brokers so infrequently that the set of brokers "
"may change entirely before the client refreshes metadata. "
"Metadata recovery is triggered when all last-known brokers appear "
"unavailable simultaneously, when the client cannot refresh metadata "
"within `metadata.recovery.rebootstrap.trigger.ms`, or when it's "
"requested in a metadata response.",
.vdef = RD_KAFKA_METADATA_RECOVERY_STRATEGY_REBOOTSTRAP,
.s2i = {{RD_KAFKA_METADATA_RECOVERY_STRATEGY_NONE, "none"},
{RD_KAFKA_METADATA_RECOVERY_STRATEGY_REBOOTSTRAP, "rebootstrap"},
{0, NULL}}},
{_RK_GLOBAL, "metadata.recovery.rebootstrap.trigger.ms", _RK_C_INT,
_RK(metadata_recovery_rebootstrap_trigger_ms),
"If a client configured to rebootstrap using "
"`metadata.recovery.strategy=rebootstrap` "
"is unable to obtain metadata from any "
"of the brokers for this interval, "
"the client repeats the bootstrap process using "
"the `bootstrap.servers` configuration "
"and brokers added through "
"`rd_kafka_brokers_add()`.",
0, INT_MAX, 300000},
{_RK_GLOBAL | _RK_DEPRECATED | _RK_HIDDEN, "metadata.request.timeout.ms",
_RK_C_INT, _RK(metadata_request_timeout_ms), "Not used.", 10, 900 * 1000,
10},
1 change: 1 addition & 0 deletions src/rdkafka_conf.h
@@ -207,6 +207,7 @@ struct rd_kafka_conf_s {
int msg_copy_max_size;
int recv_max_msg_size;
int max_inflight;
int metadata_recovery_rebootstrap_trigger_ms;
int metadata_request_timeout_ms;
int metadata_refresh_interval_ms;
int metadata_refresh_fast_cnt;
15 changes: 11 additions & 4 deletions src/rdkafka_int.h
@@ -1088,6 +1088,14 @@ int rd_kafka_set_fatal_error0(rd_kafka_t *rk,

rd_kafka_error_t *rd_kafka_get_fatal_error(rd_kafka_t *rk);

#define rd_kafka_producer_can_have_fatal_errors(rk) \
(rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence)

#define rd_kafka_consumer_can_have_fatal_errors(rk) \
(rk->rk_type == RD_KAFKA_CONSUMER && \
(rk->rk_conf.group_instance_id || \
rk->rk_conf.group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER))

static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
rd_kafka_fatal_error_code(rd_kafka_t *rk) {
/* This is an optimization to avoid an atomic read which are costly
@@ -1097,10 +1105,8 @@ rd_kafka_fatal_error_code(rd_kafka_t *rk) {
* 2) static consumers (group.instance.id)
* 3) Group using consumer protocol (Introduced in KIP-848). See exact
* errors in rd_kafka_cgrp_handle_ConsumerGroupHeartbeat() */
if ((rk->rk_type == RD_KAFKA_PRODUCER && rk->rk_conf.eos.idempotence) ||
(rk->rk_type == RD_KAFKA_CONSUMER &&
(rk->rk_conf.group_instance_id ||
rk->rk_conf.group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER)))
if (rd_kafka_producer_can_have_fatal_errors(rk) ||
rd_kafka_consumer_can_have_fatal_errors(rk))
return rd_atomic32_get(&rk->rk_fatal.err);

return RD_KAFKA_RESP_ERR_NO_ERROR;
@@ -1241,5 +1247,6 @@ rd_kafka_resp_err_t rd_kafka_background_thread_create(rd_kafka_t *rk,

void rd_kafka_rebootstrap(rd_kafka_t *rk);

void rd_kafka_rebootstrap_tmr_restart(rd_kafka_t *rk);

#endif /* _RDKAFKA_INT_H_ */
15 changes: 15 additions & 0 deletions src/rdkafka_metadata.c
@@ -572,6 +572,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
int broker_changes = 0;
int cache_changes = 0;
int cgrp_subscription_version = -1;
int16_t ErrorCode = 0;

/* If client rack is present, the metadata cache (topic or full) needs
* to contain the partition to rack map. */
@@ -872,8 +873,21 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
ClusterAuthorizedOperations;
}

if (ApiVersion >= 13) {
rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
}

rd_kafka_buf_skip_tags(rkbuf);

if (ErrorCode) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
"Metadata response: received top level "
"error code %" PRId16 ": %s",
ErrorCode, rd_kafka_err2str(ErrorCode));
err = ErrorCode;
goto err;
}

/* Entire Metadata response now parsed without errors:
* update our internal state according to the response. */

@@ -1025,6 +1039,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
rd_kafka_wrlock(rkb->rkb_rk);

rkb->rkb_rk->rk_ts_metadata = rd_clock();
rd_kafka_rebootstrap_tmr_restart(rkb->rkb_rk);

/* Update cached cluster id. */
if (RD_KAFKAP_STR_LEN(&cluster_id) > 0 &&
16 changes: 11 additions & 5 deletions src/rdkafka_request.c
@@ -2638,12 +2638,18 @@ static void rd_kafka_handle_Metadata(rd_kafka_t *rk,
goto done;

err:
actions = rd_kafka_err_action(rkb, err, request,
actions = rd_kafka_err_action(
rkb, err, request,

RD_KAFKA_ERR_ACTION_RETRY,
RD_KAFKA_RESP_ERR__PARTIAL,
RD_KAFKA_ERR_ACTION_SPECIAL, RD_KAFKA_RESP_ERR_REBOOTSTRAP_REQUIRED,

RD_KAFKA_ERR_ACTION_END);
RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__PARTIAL,

RD_KAFKA_ERR_ACTION_END);

if (actions & RD_KAFKA_ERR_ACTION_SPECIAL) {
rd_kafka_rebootstrap(rk);
}

if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
/* In case it's a brokers full refresh call,
@@ -2765,7 +2771,7 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
int *full_incr = NULL;
void *handler_arg = NULL;
rd_kafka_resp_cb_t *handler_cb = rd_kafka_handle_Metadata;
int16_t metadata_max_version = 12;
int16_t metadata_max_version = 13;
rd_kafka_replyq_t use_replyq = replyq;

/* In case we want cluster authorized operations in the Metadata
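The `rd_kafka_handle_Metadata()` change above maps the new `REBOOTSTRAP_REQUIRED` error (KIP-1102) to the special action that triggers `rd_kafka_rebootstrap()`, while `__PARTIAL` keeps its retry action. Stripped of librdkafka internals, the dispatch can be modeled like this (error codes and names here are illustrative placeholders, not the real librdkafka values):

```c
/* Simplified model of the error-to-action table built by
 * rd_kafka_err_action() in the updated Metadata handler:
 * __PARTIAL is retried, REBOOTSTRAP_REQUIRED takes the special
 * (re-bootstrap) path, anything else gets no action here. */
enum { ERR_PARTIAL = 1, ERR_REBOOTSTRAP_REQUIRED = 2 };
enum { ACTION_NONE = 0, ACTION_RETRY = 1 << 0, ACTION_SPECIAL = 1 << 1 };

static int metadata_err_actions(int err) {
        switch (err) {
        case ERR_PARTIAL:
                return ACTION_RETRY;
        case ERR_REBOOTSTRAP_REQUIRED:
                return ACTION_SPECIAL;
        default:
                return ACTION_NONE;
        }
}
```

Together with the bump of `metadata_max_version` to 13, this is the client half of KIP-1102: a broker can now answer a Metadata v13 request with a top-level error code that asks the client to re-bootstrap.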