Spring Kafka Synchronous Request-Reply Example
Apache Kafka is primarily designed for asynchronous, event-driven communication. There are cases, however, where a synchronous request-reply pattern is required, such as retrieving a response from a service for a given request. This article shows how Spring Kafka supports synchronous request-reply messaging with ReplyingKafkaTemplate.
1. What is Apache Kafka?
Apache Kafka is a distributed event streaming platform used for building real-time data pipelines and streaming applications. It enables the publishing, storing, and processing of streams of records in a fault-tolerant and scalable manner. Kafka is widely used for its high throughput, durability, and ability to handle large volumes of data with low latency.
1.1 Benefits of Apache Kafka
- Scalability: Kafka supports horizontal scaling by adding more brokers to the cluster.
- Fault Tolerance: Data replication ensures message durability even if a broker fails.
- High Throughput: Kafka processes millions of messages per second with low latency.
- Durability: Messages are stored persistently, ensuring reliability.
- Flexibility: Kafka integrates seamlessly with big data and cloud-based platforms.
1.2 What is ReplyingKafkaTemplate?
ReplyingKafkaTemplate is a specialized component in the Spring Kafka framework that simplifies implementing request-reply messaging patterns. It allows a producer to send a message and synchronously wait for a corresponding reply from a consumer, facilitating synchronous communication over Kafka’s asynchronous messaging system.
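Internally, the request-reply round trip rests on a correlation id: the template attaches one to each outbound record, the server echoes it back on the reply, and the template completes the pending future registered under that id. The following stdlib-only sketch illustrates the idea; the CorrelationSketch class is hypothetical and simulates the broker round trip in-process, it is not Spring Kafka code.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the correlation mechanism behind a replying template:
// each request registers a future under a fresh correlation id; the reply
// carries the id back, and the matching future is completed with the value.
public class CorrelationSketch {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // "Send" a request: register a future under a new correlation id.
    public CompletableFuture<String> send(String message) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        // In real code the message and correlation id would travel over the
        // request topic; here the "server" replies immediately in-process.
        onReply(correlationId, "Processed: " + message);
        return future;
    }

    // Invoked when a record arrives on the reply topic.
    public void onReply(String correlationId, String value) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null) {
            future.complete(value);
        }
    }

    public static void main(String[] args) throws Exception {
        CorrelationSketch sketch = new CorrelationSketch();
        System.out.println(sketch.send("hello").get()); // prints "Processed: hello"
    }
}
```

Replies whose correlation id matches no pending future are simply discarded, which is exactly how a client ignores replies meant for another requester.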
2. Code Example
This section demonstrates how to set up a local Kafka environment using Docker and integrate it with a Spring Boot application. We will cover the steps to configure Kafka and Zookeeper, create a Spring Boot project with Kafka dependencies, set up Kafka producer and consumer services, and expose a REST endpoint to send and receive messages through Kafka.
2.1 Kafka Setup on Docker
To set up a local development environment for Apache Kafka along with Zookeeper, you can use the following docker-compose.yml file.
This configuration defines two services: zookeeper and kafka. The Zookeeper service uses the Confluent image and exposes the default client port 2181. The Kafka service depends on Zookeeper and uses the Confluent Kafka image. It maps port 9092 on the host for Kafka communication and sets several important environment variables: KAFKA_BROKER_ID is set to 1, KAFKA_ZOOKEEPER_CONNECT links Kafka to Zookeeper, KAFKA_ADVERTISED_LISTENERS makes the broker accessible via localhost:9092, and KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR is set to 1 for a single-broker setup.
Here’s the full configuration:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.2.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.2.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Once saved, you can start the services in detached mode by running: docker-compose up -d.
2.2 Setting up the Project
To get started, use the Spring Initializr to generate a basic Spring Boot project. Ensure that you include the Kafka dependency during setup or add it manually to your pom.xml as shown below. This adds support for producing and consuming Kafka messages using Spring Kafka.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
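The configuration class in the next section reads the broker address from the spring.kafka.bootstrap-servers property, so add it to application.properties; localhost:9092 matches the Docker setup above.

```properties
spring.kafka.bootstrap-servers=localhost:9092
```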
2.3 Kafka Configuration
The configuration class below sets up Kafka producer and consumer factories, a Kafka template, and a replying Kafka template for request-reply messaging. The bootstrapServers value is injected from the application properties file. The producerFactory and consumerFactory beans define serialization and deserialization strategies. The kafkaTemplate is used to send messages, while the ReplyingKafkaTemplate and KafkaMessageListenerContainer are configured for handling synchronous replies. Two topics are declared: request-topic and reply-topic, representing the request and reply channels respectively.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

@Configuration
public class KafkaConfig {

    public static final String REQUEST_TOPIC = "request-topic";
    public static final String REPLY_TOPIC = "reply-topic";

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Pairs a producer with the reply-topic listener container so that
    // sendAndReceive() can block until the correlated reply arrives.
    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
            ProducerFactory<String, String> pf,
            KafkaMessageListenerContainer<String, String> container) {
        return new ReplyingKafkaTemplate<>(pf, container);
    }

    // Listener container that consumes replies from the reply topic.
    @Bean
    public KafkaMessageListenerContainer<String, String> replyContainer(
            ConsumerFactory<String, String> cf) {
        ContainerProperties containerProperties = new ContainerProperties(REPLY_TOPIC);
        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }
}
2.4 Producer Service (Client)
The producer service, named KafkaRequestClient, is responsible for sending a message to the Kafka request-topic and awaiting a response from the reply-topic using a ReplyingKafkaTemplate. It constructs a ProducerRecord, adds the reply topic in the headers, sends the record, and blocks until a response is received.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;

@Service
public class KafkaRequestClient {

    @Autowired
    private ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;

    public String sendAndReceive(String message) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                KafkaConfig.REQUEST_TOPIC, null, message);
        // Tell the server where to send the reply.
        record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                KafkaConfig.REPLY_TOPIC.getBytes()));
        RequestReplyFuture<String, String, String> future =
                replyingKafkaTemplate.sendAndReceive(record);
        ConsumerRecord<String, String> response = future.get();
        return response.value();
    }
}
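Note that future.get() with no arguments waits indefinitely for a reply. In recent Spring Kafka versions RequestReplyFuture extends CompletableFuture, so the wait can be bounded with get(timeout, unit) (a default reply timeout can also be configured on the template). The stdlib-only sketch below shows the pattern, with a plain CompletableFuture standing in for the reply future; the TimeoutSketch class and the "timed out" fallback are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: bound the blocking wait for a reply instead of waiting forever.
public class TimeoutSketch {

    // Wait for the reply, but give up after timeoutMs milliseconds.
    public static String awaitReply(CompletableFuture<String> reply, long timeoutMs) {
        try {
            return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (Exception e) {
            throw new IllegalStateException("reply failed", e);
        }
    }

    public static void main(String[] args) {
        // A reply that never arrives: get(...) times out instead of hanging.
        System.out.println(awaitReply(new CompletableFuture<>(), 200));

        // A reply that is already available is returned immediately.
        System.out.println(awaitReply(CompletableFuture.completedFuture("Processed: hello"), 200));
    }
}
```

In the service above, the equivalent change would be replacing future.get() with a timed get and mapping the timeout to an appropriate error response.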
2.5 Consumer Service (Server)
The consumer service, KafkaRequestConsumer, listens on the request-topic using the @KafkaListener annotation. When a message is received, it logs the request and returns a response string. Because the method is also annotated with @SendTo (with no explicit destination), Spring Kafka sends the return value to the topic named in the incoming record's REPLY_TOPIC header, which is the reply-topic set by the client.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;

@Service
public class KafkaRequestConsumer {

    @KafkaListener(topics = KafkaConfig.REQUEST_TOPIC)
    @SendTo // reply goes to the topic named in the request's REPLY_TOPIC header
    public String handleRequest(String message) {
        System.out.println("Received request: " + message);
        return "Processed: " + message;
    }
}
2.6 REST Controller
To expose the Kafka producer functionality as an HTTP endpoint, a REST controller is defined with a /kafka/request GET API. It accepts a message as a query parameter, forwards it to Kafka via the KafkaRequestClient, and returns the processed response to the client.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private KafkaRequestClient kafkaRequestClient;

    @GetMapping("/request")
    public ResponseEntity<String> sendMessage(@RequestParam String msg) throws Exception {
        String response = kafkaRequestClient.sendAndReceive(msg);
        return ResponseEntity.ok(response);
    }
}
2.7 Running and Output
Once Kafka and Zookeeper are running via Docker (as set up earlier), you can start the Spring Boot application to initiate Kafka request-reply communication. After the application is up, trigger the flow by hitting the REST endpoint shown below. The message will be published to the Kafka request-topic, processed by the consumer, and a response will be sent back via the reply-topic. The producer receives the reply and returns it as an HTTP response.
2.7.1 Hit the Endpoint
Invoking the endpoint with msg=hello sends the message through the full round trip: it is produced to request-topic, consumed and processed by the listener, and the reply is delivered back to the caller via reply-topic.
Endpoint:
GET http://localhost:8080/kafka/request?msg=hello

Console output:
Received request: hello

HTTP response:
Processed: hello
3. Conclusion
While Kafka is best suited for asynchronous event streaming, using ReplyingKafkaTemplate in Spring Kafka allows us to implement a clean request-reply communication model. This approach is useful for scenarios that require guaranteed processing and acknowledgment between services.
However, be cautious with synchronous patterns over Kafka: blocking a caller on a broker round trip adds latency and coupling, so for highly time-sensitive request-response interactions a direct RPC mechanism such as HTTP or gRPC is usually a better fit.
A final caveat: this example assumes a single running instance of the client application. If several instances share the same reply topic and consumer group, a reply may be delivered to a different instance than the one that sent the request, and that caller will time out. To scale out, either give each instance its own reply topic, or keep a shared reply topic but assign each instance a unique consumer group, so that every instance sees all replies and discards those whose correlation id it does not own.
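One simple way to implement the per-instance option is to derive a unique reply topic name at startup and use it both for the reply container and the REPLY_TOPIC header. The helper below is a hypothetical sketch; the ReplyTopicNames class and the naming scheme are assumptions, not part of Spring Kafka.

```java
import java.util.UUID;

// Hypothetical helper: derive a per-instance reply topic name so that each
// running client instance consumes only its own replies.
public class ReplyTopicNames {

    // Appends a random suffix to the base topic name, e.g. "reply-topic-<uuid>".
    public static String perInstanceReplyTopic(String base) {
        return base + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(perInstanceReplyTopic("reply-topic"));
    }
}
```

With broker-side topic auto-creation enabled (as in the single-broker Docker setup above), each instance's reply topic is created on first use; otherwise the topics must be provisioned explicitly.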