Skip to content

Commit 2b7c039

Browse files
KAFKA-18440: Convert AuthorizationException to fatal error in AdminClient (#18435)
Reviewers: Divij Vaidya <[email protected]>
1 parent c6f2276 commit 2b7c039

5 files changed

Lines changed: 111 additions & 24 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,10 @@
2323
import org.apache.kafka.common.Node;
2424
import org.apache.kafka.common.errors.ApiException;
2525
import org.apache.kafka.common.errors.AuthenticationException;
26-
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
27-
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
2826
import org.apache.kafka.common.errors.UnsupportedVersionException;
2927
import org.apache.kafka.common.requests.MetadataResponse;
3028
import org.apache.kafka.common.requests.RequestHeader;
29+
import org.apache.kafka.common.requests.RequestUtils;
3130
import org.apache.kafka.common.utils.LogContext;
3231

3332
import org.slf4j.Logger;
@@ -277,23 +276,21 @@ public void updateFailed(Throwable exception) {
277276
// We depend on pending calls to request another metadata update
278277
this.state = State.QUIESCENT;
279278

280-
if (exception instanceof AuthenticationException) {
281-
log.warn("Metadata update failed due to authentication error", exception);
282-
this.fatalException = (ApiException) exception;
283-
} else if (exception instanceof MismatchedEndpointTypeException) {
284-
log.warn("Metadata update failed due to mismatched endpoint type error", exception);
285-
this.fatalException = (ApiException) exception;
286-
} else if (exception instanceof UnsupportedEndpointTypeException) {
287-
log.warn("Metadata update failed due to unsupported endpoint type error", exception);
288-
this.fatalException = (ApiException) exception;
289-
} else if (exception instanceof UnsupportedVersionException) {
290-
if (usingBootstrapControllers) {
291-
log.warn("The remote node is not a CONTROLLER that supports the KIP-919 " +
292-
"DESCRIBE_CLUSTER api.", exception);
293-
} else {
294-
log.warn("The remote node is not a BROKER that supports the METADATA api.", exception);
279+
if (RequestUtils.isFatalException(exception)) {
280+
log.warn("Fatal error during metadata update", exception);
281+
// avoid unchecked/unconfirmed cast to ApiException
282+
if (exception instanceof ApiException) {
283+
this.fatalException = (ApiException) exception;
284+
}
285+
286+
if (exception instanceof UnsupportedVersionException) {
287+
if (usingBootstrapControllers) {
288+
log.warn("The remote node is not a CONTROLLER that supports the KIP-919 " +
289+
"DESCRIBE_CLUSTER api.", exception);
290+
} else {
291+
log.warn("The remote node is not a BROKER that supports the METADATA api.", exception);
292+
}
295293
}
296-
this.fatalException = (ApiException) exception;
297294
} else {
298295
log.info("Metadata update failed", exception);
299296
}

clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@
1616
*/
1717
package org.apache.kafka.common.requests;
1818

19+
import org.apache.kafka.common.errors.AuthenticationException;
20+
import org.apache.kafka.common.errors.AuthorizationException;
21+
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
22+
import org.apache.kafka.common.errors.SecurityDisabledException;
23+
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
24+
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
25+
import org.apache.kafka.common.errors.UnsupportedVersionException;
1926
import org.apache.kafka.common.message.ProduceRequestData;
2027
import org.apache.kafka.common.protocol.ByteBufferAccessor;
2128
import org.apache.kafka.common.protocol.Message;
@@ -77,4 +84,14 @@ public static ByteBuffer serialize(
7784
writable.flip();
7885
return writable.buffer();
7986
}
87+
88+
public static boolean isFatalException(Throwable e) {
89+
return e instanceof AuthenticationException ||
90+
e instanceof AuthorizationException ||
91+
e instanceof MismatchedEndpointTypeException ||
92+
e instanceof SecurityDisabledException ||
93+
e instanceof UnsupportedVersionException ||
94+
e instanceof UnsupportedEndpointTypeException ||
95+
e instanceof UnsupportedForMessageFormatException;
96+
}
8097
}

clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.common.Cluster;
2121
import org.apache.kafka.common.Node;
2222
import org.apache.kafka.common.errors.AuthenticationException;
23+
import org.apache.kafka.common.errors.AuthorizationException;
2324
import org.apache.kafka.common.utils.LogContext;
2425
import org.apache.kafka.common.utils.MockTime;
2526

@@ -98,6 +99,16 @@ public void testAuthenticationFailure() {
9899
assertTrue(mgr.isReady());
99100
}
100101

102+
@Test
103+
public void testAuthorizationFailure() {
104+
mgr.transitionToUpdatePending(time.milliseconds());
105+
mgr.updateFailed(new AuthorizationException("Authorization failed"));
106+
assertEquals(refreshBackoffMs, mgr.metadataFetchDelayMs(time.milliseconds()));
107+
assertThrows(AuthorizationException.class, mgr::isReady);
108+
mgr.update(mockCluster(), time.milliseconds());
109+
assertTrue(mgr.isReady());
110+
}
111+
101112
@Test
102113
public void testNeedsRebootstrap() {
103114
long rebootstrapTriggerMs = 1000;
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.common.requests;
19+
20+
import org.apache.kafka.common.errors.AuthenticationException;
21+
import org.apache.kafka.common.errors.AuthorizationException;
22+
import org.apache.kafka.common.errors.DisconnectException;
23+
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
24+
import org.apache.kafka.common.errors.SecurityDisabledException;
25+
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
26+
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
27+
import org.apache.kafka.common.errors.UnsupportedVersionException;
28+
29+
import org.junit.jupiter.api.Test;
30+
31+
import static org.junit.jupiter.api.Assertions.assertFalse;
32+
import static org.junit.jupiter.api.Assertions.assertTrue;
33+
34+
public class RequestUtilsTest {
35+
@Test
36+
public void testIsFatalException() {
37+
assertTrue(RequestUtils.isFatalException(new AuthenticationException("")));
38+
assertTrue(RequestUtils.isFatalException(new AuthorizationException("")));
39+
assertTrue(RequestUtils.isFatalException(new MismatchedEndpointTypeException("")));
40+
assertTrue(RequestUtils.isFatalException(new SecurityDisabledException("")));
41+
assertTrue(RequestUtils.isFatalException(new UnsupportedEndpointTypeException("")));
42+
assertTrue(RequestUtils.isFatalException(new UnsupportedForMessageFormatException("")));
43+
assertTrue(RequestUtils.isFatalException(new UnsupportedVersionException("")));
44+
45+
// retriable exceptions
46+
assertFalse(RequestUtils.isFatalException(new DisconnectException("")));
47+
}
48+
}

test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.kafka.clients.admin.Admin;
2121
import org.apache.kafka.clients.admin.AdminClientConfig;
2222
import org.apache.kafka.clients.admin.Config;
23-
import org.apache.kafka.clients.admin.DescribeAclsOptions;
2423
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
2524
import org.apache.kafka.clients.admin.NewTopic;
2625
import org.apache.kafka.clients.consumer.Consumer;
@@ -35,7 +34,6 @@
3534
import org.apache.kafka.common.config.SaslConfigs;
3635
import org.apache.kafka.common.errors.ClusterAuthorizationException;
3736
import org.apache.kafka.common.errors.SaslAuthenticationException;
38-
import org.apache.kafka.common.errors.TimeoutException;
3937
import org.apache.kafka.common.errors.TopicAuthorizationException;
4038
import org.apache.kafka.common.security.auth.SecurityProtocol;
4139
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -464,12 +462,12 @@ public void testSaslPlaintext(ClusterInstance clusterInstance) throws Cancellati
464462
}
465463
)
466464
public void testSaslPlaintextWithController(ClusterInstance clusterInstance) throws CancellationException, ExecutionException, InterruptedException {
467-
// test with admin
465+
// default ClusterInstance#admin helper with admin credentials
468466
try (Admin admin = clusterInstance.admin(Map.of(), true)) {
469467
admin.describeAcls(AclBindingFilter.ANY).values().get();
470468
}
471469

472-
// test with non-admin
470+
// client with non-admin credentials
473471
Map<String, Object> nonAdminConfig = Map.of(
474472
SaslConfigs.SASL_JAAS_CONFIG,
475473
String.format(
@@ -480,9 +478,25 @@ public void testSaslPlaintextWithController(ClusterInstance clusterInstance) thr
480478
try (Admin admin = clusterInstance.admin(nonAdminConfig, true)) {
481479
ExecutionException exception = assertThrows(
482480
ExecutionException.class,
483-
() -> admin.describeAcls(AclBindingFilter.ANY, new DescribeAclsOptions().timeoutMs(5000)).values().get()
481+
() -> admin.describeAcls(AclBindingFilter.ANY).values().get()
482+
);
483+
assertInstanceOf(ClusterAuthorizationException.class, exception.getCause());
484+
}
485+
486+
// client with unknown credentials
487+
Map<String, Object> unknownUserConfig = Map.of(
488+
SaslConfigs.SASL_JAAS_CONFIG,
489+
String.format(
490+
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
491+
"unknown", "unknown"
492+
)
493+
);
494+
try (Admin admin = clusterInstance.admin(unknownUserConfig)) {
495+
ExecutionException exception = assertThrows(
496+
ExecutionException.class,
497+
() -> admin.describeAcls(AclBindingFilter.ANY).values().get()
484498
);
485-
assertInstanceOf(TimeoutException.class, exception.getCause());
499+
assertInstanceOf(SaslAuthenticationException.class, exception.getCause());
486500
}
487501
}
488502
}

0 commit comments

Comments
 (0)