Skip to content

Commit 131d475

Browse files
authored
KAFKA-12193: Re-resolve IPs after a client disconnects (apache#9902)
This patch changes the NetworkClient behavior to resolve the target node's hostname after disconnecting from an established connection, rather than waiting until the previously-resolved addresses are exhausted. This is to handle the scenario when the node's IP addresses have changed during the lifetime of the connection, and means that the client does not have to try to connect to invalid IP addresses until it has tried each address. Reviewers: Mickael Maison <[email protected]>, Satish Duggana <[email protected]>, David Jacot <[email protected]>
1 parent 1c56aba commit 131d475

10 files changed

Lines changed: 400 additions & 69 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/ClientUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,9 @@ public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time ti
106106
clientSaslMechanism, time, true, logContext);
107107
}
108108

109-
static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
110-
InetAddress[] addresses = InetAddress.getAllByName(host);
109+
static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup,
110+
HostResolver hostResolver) throws UnknownHostException {
111+
InetAddress[] addresses = hostResolver.resolve(host);
111112

112113
switch (clientDnsLookup) {
113114
case DEFAULT:

clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,14 @@ final class ClusterConnectionStates {
4343
final static double CONNECTION_SETUP_TIMEOUT_JITTER = 0.2;
4444
private final Map<String, NodeConnectionState> nodeState;
4545
private final Logger log;
46+
private final HostResolver hostResolver;
4647
private Set<String> connectingNodes;
4748
private ExponentialBackoff reconnectBackoff;
4849
private ExponentialBackoff connectionSetupTimeout;
4950

5051
public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
5152
long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
52-
LogContext logContext) {
53+
LogContext logContext, HostResolver hostResolver) {
5354
this.log = logContext.logger(ClusterConnectionStates.class);
5455
this.reconnectBackoff = new ExponentialBackoff(
5556
reconnectBackoffMs,
@@ -63,6 +64,7 @@ public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMax
6364
CONNECTION_SETUP_TIMEOUT_JITTER);
6465
this.nodeState = new HashMap<>();
6566
this.connectingNodes = new HashSet<>();
67+
this.hostResolver = hostResolver;
6668
}
6769

6870
/**
@@ -156,7 +158,8 @@ public void connecting(String id, long now, String host, ClientDnsLookup clientD
156158
// Create a new NodeConnectionState if nodeState does not already contain one
157159
// for the specified id or if the hostname associated with the node id changed.
158160
nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
159-
reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host, clientDnsLookup));
161+
reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host,
162+
clientDnsLookup, hostResolver));
160163
connectingNodes.add(id);
161164
}
162165

@@ -183,6 +186,11 @@ public void disconnected(String id, long now) {
183186
connectingNodes.remove(id);
184187
} else {
185188
resetConnectionSetupTimeout(nodeState);
189+
if (nodeState.state.isConnected()) {
190+
// If a connection had previously been established, clear the addresses to trigger a new DNS resolution
191+
// because the node IPs may have changed
192+
nodeState.clearAddresses();
193+
}
186194
}
187195
nodeState.state = ConnectionState.DISCONNECTED;
188196
}
@@ -470,9 +478,11 @@ private static class NodeConnectionState {
470478
private int addressIndex;
471479
private final String host;
472480
private final ClientDnsLookup clientDnsLookup;
481+
private final HostResolver hostResolver;
473482

474483
private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
475-
long connectionSetupTimeoutMs, String host, ClientDnsLookup clientDnsLookup) {
484+
long connectionSetupTimeoutMs, String host, ClientDnsLookup clientDnsLookup,
485+
HostResolver hostResolver) {
476486
this.state = state;
477487
this.addresses = Collections.emptyList();
478488
this.addressIndex = -1;
@@ -484,6 +494,7 @@ private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long
484494
this.throttleUntilTimeMs = 0;
485495
this.host = host;
486496
this.clientDnsLookup = clientDnsLookup;
497+
this.hostResolver = hostResolver;
487498
}
488499

489500
public String host() {
@@ -498,7 +509,7 @@ public String host() {
498509
private InetAddress currentAddress() throws UnknownHostException {
499510
if (addresses.isEmpty()) {
500511
// (Re-)initialize list
501-
addresses = ClientUtils.resolve(host, clientDnsLookup);
512+
addresses = ClientUtils.resolve(host, clientDnsLookup, hostResolver);
502513
addressIndex = 0;
503514
}
504515

@@ -518,6 +529,13 @@ private void moveToNextAddress() {
518529
addresses = Collections.emptyList(); // Exhausted list. Re-resolve on next currentAddress() call
519530
}
520531

532+
/**
533+
* Clears the resolved addresses in order to trigger re-resolving on the next {@link #currentAddress()} call.
534+
*/
535+
private void clearAddresses() {
536+
addresses = Collections.emptyList();
537+
}
538+
521539
public String toString() {
522540
return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ", " + throttleUntilTimeMs + ")";
523541
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.clients;
19+
20+
import java.net.InetAddress;
21+
import java.net.UnknownHostException;
22+
23+
public class DefaultHostResolver implements HostResolver {
24+
25+
@Override
26+
public InetAddress[] resolve(String host) throws UnknownHostException {
27+
return InetAddress.getAllByName(host);
28+
}
29+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.clients;
19+
20+
import java.net.InetAddress;
21+
import java.net.UnknownHostException;
22+
23+
public interface HostResolver {
24+
25+
InetAddress[] resolve(String host) throws UnknownHostException;
26+
}

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,8 @@ public NetworkClient(Selectable selector,
146146
boolean discoverBrokerVersions,
147147
ApiVersions apiVersions,
148148
LogContext logContext) {
149-
this(null,
149+
this(selector,
150150
metadata,
151-
selector,
152151
clientId,
153152
maxInFlightRequestsPerConnection,
154153
reconnectBackoffMs,
@@ -167,22 +166,22 @@ public NetworkClient(Selectable selector,
167166
}
168167

169168
public NetworkClient(Selectable selector,
170-
Metadata metadata,
171-
String clientId,
172-
int maxInFlightRequestsPerConnection,
173-
long reconnectBackoffMs,
174-
long reconnectBackoffMax,
175-
int socketSendBuffer,
176-
int socketReceiveBuffer,
177-
int defaultRequestTimeoutMs,
178-
long connectionSetupTimeoutMs,
179-
long connectionSetupTimeoutMaxMs,
180-
ClientDnsLookup clientDnsLookup,
181-
Time time,
182-
boolean discoverBrokerVersions,
183-
ApiVersions apiVersions,
184-
Sensor throttleTimeSensor,
185-
LogContext logContext) {
169+
Metadata metadata,
170+
String clientId,
171+
int maxInFlightRequestsPerConnection,
172+
long reconnectBackoffMs,
173+
long reconnectBackoffMax,
174+
int socketSendBuffer,
175+
int socketReceiveBuffer,
176+
int defaultRequestTimeoutMs,
177+
long connectionSetupTimeoutMs,
178+
long connectionSetupTimeoutMaxMs,
179+
ClientDnsLookup clientDnsLookup,
180+
Time time,
181+
boolean discoverBrokerVersions,
182+
ApiVersions apiVersions,
183+
Sensor throttleTimeSensor,
184+
LogContext logContext) {
186185
this(null,
187186
metadata,
188187
selector,
@@ -200,7 +199,8 @@ public NetworkClient(Selectable selector,
200199
discoverBrokerVersions,
201200
apiVersions,
202201
throttleTimeSensor,
203-
logContext);
202+
logContext,
203+
new DefaultHostResolver());
204204
}
205205

206206
public NetworkClient(Selectable selector,
@@ -236,27 +236,29 @@ public NetworkClient(Selectable selector,
236236
discoverBrokerVersions,
237237
apiVersions,
238238
null,
239-
logContext);
239+
logContext,
240+
new DefaultHostResolver());
240241
}
241242

242-
private NetworkClient(MetadataUpdater metadataUpdater,
243-
Metadata metadata,
244-
Selectable selector,
245-
String clientId,
246-
int maxInFlightRequestsPerConnection,
247-
long reconnectBackoffMs,
248-
long reconnectBackoffMax,
249-
int socketSendBuffer,
250-
int socketReceiveBuffer,
251-
int defaultRequestTimeoutMs,
252-
long connectionSetupTimeoutMs,
253-
long connectionSetupTimeoutMaxMs,
254-
ClientDnsLookup clientDnsLookup,
255-
Time time,
256-
boolean discoverBrokerVersions,
257-
ApiVersions apiVersions,
258-
Sensor throttleTimeSensor,
259-
LogContext logContext) {
243+
public NetworkClient(MetadataUpdater metadataUpdater,
244+
Metadata metadata,
245+
Selectable selector,
246+
String clientId,
247+
int maxInFlightRequestsPerConnection,
248+
long reconnectBackoffMs,
249+
long reconnectBackoffMax,
250+
int socketSendBuffer,
251+
int socketReceiveBuffer,
252+
int defaultRequestTimeoutMs,
253+
long connectionSetupTimeoutMs,
254+
long connectionSetupTimeoutMaxMs,
255+
ClientDnsLookup clientDnsLookup,
256+
Time time,
257+
boolean discoverBrokerVersions,
258+
ApiVersions apiVersions,
259+
Sensor throttleTimeSensor,
260+
LogContext logContext,
261+
HostResolver hostResolver) {
260262
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
261263
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
262264
* super constructor is invoked.
@@ -273,7 +275,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
273275
this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
274276
this.connectionStates = new ClusterConnectionStates(
275277
reconnectBackoffMs, reconnectBackoffMax,
276-
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext);
278+
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext, hostResolver);
277279
this.socketSendBuffer = socketSendBuffer;
278280
this.socketReceiveBuffer = socketReceiveBuffer;
279281
this.correlation = 0;
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients;
18+
19+
import java.net.InetAddress;
20+
21+
class AddressChangeHostResolver implements HostResolver {
22+
private boolean useNewAddresses;
23+
private InetAddress[] initialAddresses;
24+
private InetAddress[] newAddresses;
25+
private int resolutionCount = 0;
26+
27+
public AddressChangeHostResolver(InetAddress[] initialAddresses, InetAddress[] newAddresses) {
28+
this.initialAddresses = initialAddresses;
29+
this.newAddresses = newAddresses;
30+
}
31+
32+
@Override
33+
public InetAddress[] resolve(String host) {
34+
++resolutionCount;
35+
return useNewAddresses ? newAddresses : initialAddresses;
36+
}
37+
38+
public void changeAddresses() {
39+
useNewAddresses = true;
40+
}
41+
42+
public boolean useNewAddresses() {
43+
return useNewAddresses;
44+
}
45+
46+
public int resolutionCount() {
47+
return resolutionCount;
48+
}
49+
}

clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
public class ClientUtilsTest {
3535

36+
private HostResolver hostResolver = new DefaultHostResolver();
3637

3738
@Test
3839
public void testParseAndValidateAddresses() throws UnknownHostException {
@@ -102,25 +103,25 @@ public void testFilterPreferredAddresses() throws UnknownHostException {
102103
@Test
103104
public void testResolveUnknownHostException() {
104105
assertThrows(UnknownHostException.class,
105-
() -> ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS));
106+
() -> ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver));
106107
}
107108

108109
@Test
109110
public void testResolveDnsLookup() throws UnknownHostException {
110111
// Note that kafka.apache.org resolves to at least 2 IP addresses
111-
assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT).size());
112+
assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT, hostResolver).size());
112113
}
113114

114115
@Test
115116
public void testResolveDnsLookupAllIps() throws UnknownHostException {
116117
// Note that kafka.apache.org resolves to at least 2 IP addresses
117-
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
118+
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver).size() > 1);
118119
}
119120

120121
@Test
121122
public void testResolveDnsLookupResolveCanonicalBootstrapServers() throws UnknownHostException {
122123
// Note that kafka.apache.org resolves to at least 2 IP addresses
123-
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY).size() > 1);
124+
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, hostResolver).size() > 1);
124125
}
125126

126127
private List<InetSocketAddress> checkWithoutLookup(String... url) {

0 commit comments

Comments
 (0)