7272import org .apache .pulsar .common .api .proto .FeatureFlags ;
7373import org .apache .pulsar .common .api .proto .ProtocolVersion ;
7474import org .apache .pulsar .common .api .proto .ServerError ;
75+ import org .apache .pulsar .common .configuration .anonymizer .DefaultAuthenticationRoleLoggingAnonymizer ;
7576import org .apache .pulsar .common .protocol .Commands ;
7677import org .apache .pulsar .common .protocol .PulsarHandler ;
7778import org .apache .pulsar .common .util .Runnables ;
@@ -120,6 +121,7 @@ public class ProxyConnection extends PulsarHandler {
120121 private int protocolVersionToAdvertise ;
121122 private String proxyToBrokerUrl ;
122123 private HAProxyMessage haProxyMessage ;
124+ private final DefaultAuthenticationRoleLoggingAnonymizer authenticationRoleLoggingAnonymizer ;
123125
124126 protected static final Integer SPLICE_BYTES = 1024 * 1024 * 1024 ;
125127 private static final byte [] EMPTY_CREDENTIALS = new byte [0 ];
@@ -161,6 +163,8 @@ public ProxyConnection(ProxyService proxyService, DnsAddressResolverGroup dnsAdd
161163 this .state = State .Init ;
162164 this .brokerProxyValidator = service .getBrokerProxyValidator ();
163165 this .connectionController = proxyService .getConnectionController ();
166+ this .authenticationRoleLoggingAnonymizer = new DefaultAuthenticationRoleLoggingAnonymizer (
167+ proxyService .getConfiguration ().getAuthenticationRoleLoggingAnonymizer ());
164168 }
165169
166170 @ Override
@@ -343,16 +347,17 @@ protected static boolean isTlsChannel(Channel channel) {
343347
344348 private synchronized void completeConnect () throws PulsarClientException {
345349 checkArgument (state == State .Connecting );
350+ String maybeAnonymizedClientAuthRole = authenticationRoleLoggingAnonymizer .anonymize (clientAuthRole );
346351 LOG .info ("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}" ,
347- remoteAddress , authMethod , clientAuthRole , hasProxyToBrokerUrl );
352+ remoteAddress , authMethod , maybeAnonymizedClientAuthRole , hasProxyToBrokerUrl );
348353 if (hasProxyToBrokerUrl ) {
349354 // Optimize proxy connection to fail-fast if the target broker isn't active
350355 // Pulsar client will retry connecting after a back off timeout
351356 if (service .getConfiguration ().isCheckActiveBrokers ()
352357 && !isBrokerActive (proxyToBrokerUrl )) {
353358 state = State .Closing ;
354359 LOG .warn ("[{}] Target broker '{}' isn't available. authenticated with {} role {}." ,
355- remoteAddress , proxyToBrokerUrl , authMethod , clientAuthRole );
360+ remoteAddress , proxyToBrokerUrl , authMethod , maybeAnonymizedClientAuthRole );
356361 final ByteBuf msg = Commands .newError (-1 ,
357362 ServerError .ServiceNotReady , "Target broker isn't available." );
358363 writeAndFlushAndClose (msg );
@@ -371,10 +376,11 @@ private synchronized void completeConnect() throws PulsarClientException {
371376
372377 LOG .warn ("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}." ,
373378 remoteAddress , proxyToBrokerUrl , targetAddressDeniedException .getMessage (),
374- authMethod , clientAuthRole );
379+ authMethod , maybeAnonymizedClientAuthRole );
375380 } else {
376381 LOG .error ("[{}] Error validating target broker '{}'. authenticated with {} role {}." ,
377- remoteAddress , proxyToBrokerUrl , authMethod , clientAuthRole , throwable );
382+ remoteAddress , proxyToBrokerUrl , authMethod , maybeAnonymizedClientAuthRole ,
383+ throwable );
378384 }
379385 final ByteBuf msg = Commands .newError (-1 , ServerError .ServiceNotReady ,
380386 "Target broker cannot be validated." );
@@ -401,7 +407,7 @@ private synchronized void completeConnect() throws PulsarClientException {
401407 Optional .of (dnsAddressResolverGroup .getResolver (service .getWorkerGroup ().next ())), null );
402408 } else {
403409 LOG .error ("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}" ,
404- remoteAddress , state , clientAuthRole );
410+ remoteAddress , state , maybeAnonymizedClientAuthRole );
405411 }
406412
407413 state = State .ProxyLookupRequests ;
@@ -488,7 +494,7 @@ protected void authChallengeSuccessCallback(AuthData authChallenge) {
488494 clientAuthRole = authState .getAuthRole ();
489495 if (LOG .isDebugEnabled ()) {
490496 LOG .debug ("[{}] Client successfully authenticated with {} role {}" ,
491- remoteAddress , authMethod , clientAuthRole );
497+ remoteAddress , authMethod , authenticationRoleLoggingAnonymizer . anonymize ( clientAuthRole ) );
492498 }
493499
494500 // First connection
0 commit comments