Enterprise Java

Kafka Security with SASL/PLAIN Authentication

Kafka is a distributed streaming platform widely used for building real-time data pipelines and streaming applications. Security is a critical aspect when deploying Kafka in production environments. One of the simplest ways to secure Kafka is by using SASL/PLAIN authentication, which provides username-password based authentication between clients and Kafka brokers. Let us delve into understanding Kafka authentication using SASL/PLAIN.

1. What is Kafka? What is SASL/PLAIN Authentication?

Kafka is an open-source distributed event streaming platform capable of handling trillions of events a day. It allows producers to send messages to topics, and consumers to read these messages in real time or batch mode. Kafka provides high throughput, fault tolerance, and scalability.

SASL (Simple Authentication and Security Layer) is a framework that adds authentication support to connection-based protocols. SASL/PLAIN is one mechanism under SASL that uses a simple username and password for client authentication. Though not encrypted by itself, SASL/PLAIN is often used in conjunction with TLS to protect credentials over the network. Using SASL/PLAIN with Kafka, the client authenticates with the broker using a configured username and password, enabling controlled access to Kafka topics and operations.
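Concretely, a SASL/PLAIN client only needs three extra settings on top of its usual configuration. The sketch below builds them with plain `java.util.Properties`; the host, port, and credentials are placeholders that must match your broker's SASL listener and JAAS file:

```java
import java.util.Properties;

public class SaslClientProps {

    // Minimal client-side settings for SASL/PLAIN (placeholder host/credentials)
    public static Properties saslPlainProps() {
        Properties props = new Properties();
        // Broker address; the SASL listener port is deployment-specific
        props.put("bootstrap.servers", "localhost:9093");
        // SASL over an unencrypted channel; use SASL_SSL in production so
        // credentials are protected by TLS on the wire
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Inline JAAS entry carrying this client's username and password
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"alice\" password=\"alice-secret\";");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(saslPlainProps().getProperty("sasl.mechanism"));
    }
}
```

These same three keys appear again later in the Docker and Spring Boot configuration; only the way they are supplied changes.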

2. Code Example

2.1 Setting Up Kafka on Docker with SASL/PLAIN

Below is a detailed guide and configuration for running Kafka and Zookeeper using Docker Compose with SASL/PLAIN authentication enabled. This setup enables Kafka brokers to require username-password authentication, improving the security of Kafka communications.

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
      KAFKA_SUPER_USERS: User:admin
      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
    volumes:
      - ./kafka_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf
    depends_on:
      - zookeeper

This Docker Compose configuration defines two services: zookeeper and kafka. The zookeeper service uses the wurstmeister/zookeeper:3.4.6 image and exposes port 2181, which Kafka uses to coordinate broker metadata and cluster state. The kafka service uses the wurstmeister/kafka:2.12-2.2.1 image and connects to the Zookeeper service at zookeeper:2181.

The broker is configured with two listeners: a PLAINTEXT listener on port 9092 and a SASL_PLAINTEXT listener on port 9093, with inter-broker communication secured by the SASL/PLAIN mechanism. Access control uses the SimpleAclAuthorizer with an open policy (everyone is allowed if no ACL is found), and the user admin is designated as a superuser. The Java Authentication and Authorization Service (JAAS) configuration is mounted into the container at runtime via a volume mount from the local file kafka_server_jaas.conf. Finally, the depends_on entry ensures Zookeeper starts before Kafka, so the broker can rely on it for distributed coordination. Together, this yields a Kafka broker with SASL/PLAIN authentication running alongside Zookeeper in Docker containers.
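The KAFKA_LISTENER_SECURITY_PROTOCOL_MAP value is just a comma-separated list of listenerName:protocol pairs. As a small illustration of how such a value decomposes (plain string handling in Java, not Kafka's actual parser):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ListenerMap {

    // Split "NAME:PROTOCOL,NAME:PROTOCOL" into an ordered listener-name -> protocol map
    static Map<String, String> parse(String value) {
        Map<String, String> map = new LinkedHashMap<>();
        for (String pair : value.split(",")) {
            String[] kv = pair.trim().split(":");
            map.put(kv[0], kv[1]);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<String, String> m = parse("PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT");
        System.out.println(m.get("SASL_PLAINTEXT")); // SASL_PLAINTEXT
    }
}
```

In this setup the listener names happen to equal the protocol names, but the map also lets you give listeners custom names (for example INTERNAL or EXTERNAL) mapped to whichever security protocol each should use.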

2.1.1 Server Configuration

Create a file named kafka_server_jaas.conf with the following content:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret";
};

This kafka_server_jaas.conf file configures the broker's Java Authentication and Authorization Service (JAAS) settings for SASL/PLAIN. The KafkaServer section uses the PlainLoginModule. The username and password entries are the credentials the broker itself presents for inter-broker connections (here admin with password admin-secret), while each user_<name>="<password>" entry declares a valid client account: admin with password admin-secret and alice with password alice-secret. Clients presenting one of these username-password pairs are allowed to connect to the broker over the SASL/PLAIN listener.
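Each user_<name> entry in the server file corresponds to a username-password pair a client may present, and the client carries its pair in a sasl.jaas.config string of a fixed shape. A small helper that assembles that string (illustrative only; it assumes the username and password contain no characters that would need escaping):

```java
public class JaasEntry {

    // Build the client-side sasl.jaas.config value for the given credentials
    static String plainLogin(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + username + "\" "
            + "password=\"" + password + "\";";
    }

    public static void main(String[] args) {
        // alice is one of the users declared in kafka_server_jaas.conf above
        System.out.println(plainLogin("alice", "alice-secret"));
    }
}
```

Note the trailing semicolon: JAAS entries are terminated by one, and omitting it is a common cause of authentication setup failures.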

2.1.2 How to Run the Kafka and Zookeeper Docker Setup?

Open a terminal in the directory containing the docker-compose.yml file and run this command to start the containers in the background:

docker-compose up -d

This command will launch the Zookeeper service on port 2181 and the Kafka broker on ports 9092 (PLAINTEXT) and 9093 (SASL_PLAINTEXT), applying the SASL/PLAIN authentication and authorization configurations as defined in the Docker Compose file and JAAS configuration file, thereby preparing a secured Kafka environment ready for client connections.

2.1.3 How to Create a Kafka Topic?

Kafka topics are named channels to which producers send messages and from which consumers receive them. Before you can send or receive messages, the topic must exist on the broker. Open a shell inside the Kafka container and run the following command:

kafka-topics.sh --create \
  --topic test-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

2.2 Implement Kafka with SASL/PLAIN Authentication in Spring Boot

Below is a detailed Spring Boot example demonstrating a Kafka producer and consumer configured to use SASL/PLAIN authentication.

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String jaasConfig;

    // Producer configuration
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Enable SASL/PLAIN authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig);

        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Consumer configuration
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Enable SASL/PLAIN authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig);

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

This Spring configuration class KafkaConfig sets up both a Kafka producer and a consumer with SASL/PLAIN authentication. It reads the bootstrap server address and the JAAS configuration string from application properties. The producer properties include the bootstrap servers, string serializers, and the SASL/PLAIN security settings (security.protocol, sasl.mechanism, and sasl.jaas.config); from these, a ProducerFactory and a KafkaTemplate bean are created for producing messages. The consumer properties mirror this setup with string deserializers, a group ID, an earliest auto-offset-reset policy, and the same SASL/PLAIN security settings, and back a ConsumerFactory and a concurrent listener container factory bean for consuming messages. With this configuration, both producers and consumers connect to the brokers as authenticated SASL/PLAIN clients.

2.3 Setting up the configuration

Define the JAAS configuration in application.properties:

spring.kafka.bootstrap-servers=localhost:9093

spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";

This snippet configures the Kafka client in the application.properties file of the Spring Boot application. spring.kafka.bootstrap-servers points at the broker's SASL listener on localhost:9093, and spring.kafka.properties.sasl.jaas.config supplies the JAAS login module string for SASL/PLAIN: the PlainLoginModule with username alice and password alice-secret, matching one of the user entries defined in the broker's kafka_server_jaas.conf. The trailing semicolon in the JAAS string is required; the double quotes can be written literally in a properties file and need no escaping.
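Spring Boot passes any key under spring.kafka.properties.* through to the Kafka client with the prefix removed, which is why an arbitrary client setting like sasl.jaas.config can be set this way. The mechanism can be sketched roughly as follows (a simplified illustration, not Spring's actual implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KafkaPropertyPassthrough {

    static final String PREFIX = "spring.kafka.properties.";

    // Strip the Spring prefix; the remainder is handed to the Kafka client verbatim
    static Map<String, String> toClientConfig(Map<String, String> appProps) {
        Map<String, String> client = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : appProps.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                client.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return client;
    }

    public static void main(String[] args) {
        Map<String, String> app = new LinkedHashMap<>();
        app.put("spring.kafka.properties.sasl.mechanism", "PLAIN");
        System.out.println(toClientConfig(app).get("sasl.mechanism")); // PLAIN
    }
}
```

This is also why no extra Java code is needed for settings that have no dedicated Spring property: any Kafka client key can be tunneled through the spring.kafka.properties.* namespace.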

2.4 Producer Service

Create a simple Kafka producer service. This Spring service class KafkaProducer receives, via constructor injection, a KafkaTemplate configured for string key-value messages. Its sendMessage method takes a topic name and a message string, sends the message asynchronously with kafkaTemplate.send(), and then prints a confirmation to the console showing the message content and target topic. This gives a simple, reusable way to produce Kafka messages from a Spring application.

@Service
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
        System.out.println("Sent message: " + message + " to topic: " + topic);
    }
}

2.5 Consumer Service

Create a Kafka consumer to listen on the topic. This Spring component class KafkaConsumer subscribes to the topic test-topic in the consumer group group_id via the @KafkaListener annotation. The listen method is invoked automatically whenever a new message arrives on the topic, receiving the payload as a string and printing it to the console, providing a simple way to consume Kafka messages asynchronously.

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic", groupId = "group_id")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

2.6 Main Class

In the main application or a command line runner, send a test message.

@SpringBootApplication
public class KafkaSaslPlainApplication implements CommandLineRunner {

    private final KafkaProducer kafkaProducer;

    public KafkaSaslPlainApplication(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    public static void main(String[] args) {
        SpringApplication.run(KafkaSaslPlainApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        kafkaProducer.sendMessage("test-topic", "Hello Kafka with SASL/PLAIN!");
    }
}

2.7 Code Run and Output

When you run the application, this output appears on the console.

// Producer
Sent message: Hello Kafka with SASL/PLAIN! to topic: test-topic

// Consumer 
Received message: Hello Kafka with SASL/PLAIN!

The output shown reflects the interaction between the Kafka producer and consumer in the Spring Boot application configured with SASL/PLAIN authentication. When the application starts, the KafkaSaslPlainApplication class executes its run method (because it implements CommandLineRunner), which calls the sendMessage method of the KafkaProducer service. This method sends the message "Hello Kafka with SASL/PLAIN!" to the Kafka topic named test-topic and immediately prints a confirmation: Sent message: Hello Kafka with SASL/PLAIN! to topic: test-topic. This indicates the producer successfully dispatched the message to the Kafka broker.

Meanwhile, the KafkaConsumer component is actively listening to the same topic test-topic within the consumer group group_id. Once the message arrives on the broker, Kafka delivers it to the consumer, triggering the annotated method listen(String message). This method receives the message payload and prints it out to the console: Received message: Hello Kafka with SASL/PLAIN!. This confirms the message was successfully consumed by the client application.

Behind the scenes, this entire process is secured by SASL/PLAIN authentication, where both producer and consumer authenticate with the Kafka broker using configured usernames and passwords, as specified in the JAAS configuration. Although the output itself only shows message send and receive logs, the security configuration ensures that only authenticated clients can connect, publish, and consume messages, preventing unauthorized access.

Thus, the output provides visible evidence of the Kafka messaging flow working end-to-end: the producer sends a message, the Kafka broker routes it to the topic, and the consumer receives and processes it, all while enforcing security via SASL/PLAIN authentication.

3. Conclusion

Securing Kafka with SASL/PLAIN authentication provides a straightforward way to enforce username-password based client authentication. While SASL/PLAIN itself does not encrypt credentials, combining it with TLS ensures secure transmission. This article demonstrated how to set up Kafka with SASL/PLAIN on Docker and how to configure a Spring Boot Kafka client to authenticate securely. This approach helps protect Kafka clusters from unauthorized access and is suitable for many enterprise environments. Implementing SASL/PLAIN in your Kafka setup improves security with minimal complexity, making it a good first step before moving to more advanced authentication mechanisms like SCRAM or OAuth.

Yatin Batra

An experienced full-stack engineer well versed in Core Java, Spring/Spring Boot, MVC, Security, AOP, frontend technologies (Angular and React), and cloud technologies (such as AWS, GCP, Jenkins, Docker, and Kubernetes).