Skip to content

Commit 84020bf

Browse files
badaiaqrandistaijuma
authored andcommitted
KAFKA-9313: Set use_all_dns_ips as the new default for client.dns.lookup (KIP-602) (#8644)
This applies to the producer, consumer, admin client, connect worker and inter broker communication. `ClientDnsLookup.DEFAULT` has been deprecated and a warning will be logged if it's explicitly set in a client config. Reviewers: Mickael Maison <[email protected]>, Ismael Juma <[email protected]>
1 parent 66709c0 commit 84020bf

18 files changed

Lines changed: 73 additions & 26 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public enum ClientDnsLookup {
2424
USE_ALL_DNS_IPS("use_all_dns_ips"),
2525
RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY("resolve_canonical_bootstrap_servers_only");
2626

27-
private String clientDnsLookup;
27+
private final String clientDnsLookup;
2828

2929
ClientDnsLookup(String clientDnsLookup) {
3030
this.clientDnsLookup = clientDnsLookup;

clients/src/main/java/org/apache/kafka/clients/ClientUtils.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,25 @@ public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time ti
108108

109109
static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
110110
InetAddress[] addresses = InetAddress.getAllByName(host);
111-
if (ClientDnsLookup.USE_ALL_DNS_IPS == clientDnsLookup) {
112-
return filterPreferredAddresses(addresses);
113-
} else {
114-
return Collections.singletonList(addresses[0]);
111+
112+
switch (clientDnsLookup) {
113+
case DEFAULT:
114+
return Collections.singletonList(addresses[0]);
115+
case USE_ALL_DNS_IPS:
116+
case RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY:
117+
return filterPreferredAddresses(addresses);
115118
}
119+
120+
throw new IllegalStateException("Unhandled ClientDnsLookup instance: " + clientDnsLookup);
116121
}
117122

123+
/**
124+
* Return a list containing the first address in `allAddresses` and subsequent addresses
125+
* that are a subtype of the first address.
126+
*
127+
* The outcome is that all returned addresses are either IPv4 or IPv6 (InetAddress has two
128+
* subclasses: Inet4Address and Inet6Address).
129+
*/
118130
static List<InetAddress> filterPreferredAddresses(InetAddress[] allAddresses) {
119131
List<InetAddress> preferredAddresses = new ArrayList<>();
120132
Class<? extends InetAddress> clazz = null;

clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,18 @@ public class CommonClientConfigs {
4242
+ "servers (you may want more than one, though, in case a server is down).";
4343

4444
public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
45-
public static final String CLIENT_DNS_LOOKUP_DOC = "Controls how the client uses DNS lookups. If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname,"
46-
+ " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers."
47-
+ " If the value is <code>resolve_canonical_bootstrap_servers_only</code> each entry will be resolved and expanded into a list of canonical names.";
45+
public static final String CLIENT_DNS_LOOKUP_DOC = "Controls how the client uses DNS lookups. "
46+
+ "If set to <code>use_all_dns_ips</code>, connect to each returned IP "
47+
+ "address in sequence until a successful connection is established. "
48+
+ "After a disconnection, the next IP is used. Once all IPs have been "
49+
+ "used once, the client resolves the IP(s) from the hostname again "
50+
+ "(both the JVM and the OS cache DNS name lookups, however). "
51+
+ "If set to <code>resolve_canonical_bootstrap_servers_only</code>, "
52+
+ "resolve each bootstrap address into a list of canonical names. After "
53+
+ "the bootstrap phase, this behaves the same as <code>use_all_dns_ips</code>. "
54+
+ "If set to <code>default</code> (deprecated), attempt to connect to the "
55+
+ "first IP address returned by the lookup, even if the lookup returns multiple "
56+
+ "IP addresses.";
4857

4958
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
5059
public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
@@ -167,4 +176,13 @@ public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractCon
167176
}
168177
return rval;
169178
}
179+
180+
public static void warnIfDeprecatedDnsLookupValue(AbstractConfig config) {
181+
String clientDnsLookupValue = config.getString(CLIENT_DNS_LOOKUP_CONFIG);
182+
if (clientDnsLookupValue.equals(ClientDnsLookup.DEFAULT.toString()))
183+
log.warn("Configuration '{}' with value '{}' is deprecated and will be removed in " +
184+
"future version. Please use '{}' or another non-deprecated value.",
185+
CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
186+
ClientDnsLookup.USE_ALL_DNS_IPS);
187+
}
170188
}

clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public class AdminClientConfig extends AbstractConfig {
182182
METRICS_RECORDING_LEVEL_DOC)
183183
.define(CLIENT_DNS_LOOKUP_CONFIG,
184184
Type.STRING,
185-
ClientDnsLookup.DEFAULT.toString(),
185+
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
186186
in(ClientDnsLookup.DEFAULT.toString(),
187187
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
188188
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
@@ -205,6 +205,7 @@ public class AdminClientConfig extends AbstractConfig {
205205

206206
@Override
207207
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
208+
CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
208209
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
209210
}
210211

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ public class ConsumerConfig extends AbstractConfig {
314314
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
315315
.define(CLIENT_DNS_LOOKUP_CONFIG,
316316
Type.STRING,
317-
ClientDnsLookup.DEFAULT.toString(),
317+
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
318318
in(ClientDnsLookup.DEFAULT.toString(),
319319
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
320320
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
@@ -543,6 +543,7 @@ public class ConsumerConfig extends AbstractConfig {
543543

544544
@Override
545545
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
546+
CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
546547
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
547548
maybeOverrideClientId(refinedConfigs);
548549
return refinedConfigs;

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public class ProducerConfig extends AbstractConfig {
279279
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
280280
.define(CLIENT_DNS_LOOKUP_CONFIG,
281281
Type.STRING,
282-
ClientDnsLookup.DEFAULT.toString(),
282+
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
283283
in(ClientDnsLookup.DEFAULT.toString(),
284284
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
285285
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
@@ -413,6 +413,7 @@ public class ProducerConfig extends AbstractConfig {
413413

414414
@Override
415415
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
416+
CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
416417
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
417418
maybeOverrideEnableIdempotence(refinedConfigs);
418419
maybeOverrideClientId(refinedConfigs);

clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,21 +97,29 @@ public void testFilterPreferredAddresses() throws UnknownHostException {
9797

9898
@Test(expected = UnknownHostException.class)
9999
public void testResolveUnknownHostException() throws UnknownHostException {
100-
ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.DEFAULT);
100+
ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS);
101101
}
102102

103103
@Test
104104
public void testResolveDnsLookup() throws UnknownHostException {
105-
assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.DEFAULT).size());
105+
// Note that kafka.apache.org resolves to 2 IP addresses
106+
assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT).size());
106107
}
107108

108109
@Test
109110
public void testResolveDnsLookupAllIps() throws UnknownHostException {
111+
// Note that kafka.apache.org resolves to 2 IP addresses
110112
assertEquals(2, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size());
111113
}
112114

115+
@Test
116+
public void testResolveDnsLookupResolveCanonicalBootstrapServers() throws UnknownHostException {
117+
// Note that kafka.apache.org resolves to 2 IP addresses
118+
assertEquals(2, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY).size());
119+
}
120+
113121
private List<InetSocketAddress> checkWithoutLookup(String... url) {
114-
return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT);
122+
return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.USE_ALL_DNS_IPS);
115123
}
116124

117125
private List<InetSocketAddress> checkWithLookup(List<String> url) {

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ private static Cluster mockCluster(int numNodes, int controllerIndex) {
272272

273273
private static Cluster mockBootstrapCluster() {
274274
return Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(
275-
singletonList("localhost:8121"), ClientDnsLookup.DEFAULT));
275+
singletonList("localhost:8121"), ClientDnsLookup.USE_ALL_DNS_IPS));
276276
}
277277

278278
private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {

clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2040,7 +2040,7 @@ public void testQuotaMetrics() {
20402040
Cluster cluster = TestUtils.singletonCluster("test", 1);
20412041
Node node = cluster.nodes().get(0);
20422042
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
2043-
1000, 1000, 64 * 1024, 64 * 1024, 1000, ClientDnsLookup.DEFAULT,
2043+
1000, 1000, 64 * 1024, 64 * 1024, 1000, ClientDnsLookup.USE_ALL_DNS_IPS,
20442044
time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
20452045

20462046
ByteBuffer buffer = ApiVersionsResponse.
@@ -3487,7 +3487,7 @@ private void testGetOffsetsForTimesWithError(Errors errorForP0,
34873487
TopicPartition t2p0 = new TopicPartition(topicName2, 0);
34883488
// Expect a metadata refresh.
34893489
metadata.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"),
3490-
ClientDnsLookup.DEFAULT));
3490+
ClientDnsLookup.USE_ALL_DNS_IPS));
34913491

34923492
Map<String, Integer> partitionNumByTopic = new HashMap<>();
34933493
partitionNumByTopic.put(topicName, 2);

clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ public void testQuotaMetrics() {
262262
Cluster cluster = TestUtils.singletonCluster("test", 1);
263263
Node node = cluster.nodes().get(0);
264264
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
265-
1000, 1000, 64 * 1024, 64 * 1024, 1000, ClientDnsLookup.DEFAULT,
265+
1000, 1000, 64 * 1024, 64 * 1024, 1000, ClientDnsLookup.USE_ALL_DNS_IPS,
266266
time, true, new ApiVersions(), throttleTimeSensor, logContext);
267267

268268
ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).

0 commit comments

Comments
 (0)