{"id":53019,"date":"2017-12-13T15:00:47","date_gmt":"2017-12-13T13:00:47","guid":{"rendered":"http:\/\/examples.javacodegeeks.com\/?p=53019"},"modified":"2021-05-18T16:16:19","modified_gmt":"2021-05-18T13:16:19","slug":"spring-integration-kafka-tutorial","status":"publish","type":"post","link":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/","title":{"rendered":"Spring Integration Kafka Tutorial"},"content":{"rendered":"<p>In this tutorial, we will show the Spring Integration with Kafka through examples.<\/p>\n<h2 class=\"wp-block-heading\" id=\"h-1-introduction\">1. Introduction<\/h2>\n<p>Apache Kafka started as an internal project at LinkedIn to solve the problem of scaling up the enterprise architecture from services talking to each other with strong typing contracts to an asynchronous message-based architecture. Both message persistence and high throughput were the goals of their new system. In addition, messages were required to be acknowledged in order and give independent consumers the ability to manage the offset of the next message that they will process. LinkedIn donated Kafka to the Apache foundation and is now the most popular open-source streaming platform giving high reliability and clustering abilities.<\/p>\n<p>Spring for Apache Kafka is a project that applies Spring concepts like dependency injection, annotations and listener containers to help develop messaging systems using Apache Kafka. Leveraging this project, the Spring Integration Kafka module provides two components:<\/p>\n<p>&nbsp;<br \/>i) Outbound Channel Adapter<br \/>As per the documentation page, &#8220;The Outbound channel adapter is used to publish messages from a Spring Integration channel to Kafka topics. The channel is defined in the application context and then wired into the application that sends messages to Kafka. Sender applications can publish to Kafka via Spring Integration messages, which are internally converted to Kafka messages by the outbound channel adapter&#8221;.<\/p>\n<p>ii) Message Driven Channel Adapter<br \/>This is used on the consuming (receiving) side of the application. The incoming messages can be processed in record or batch mode.<\/p>\n<h2 class=\"wp-block-heading\" id=\"h-2-spring-integration-kafka-application\">2.Spring Integration Kafka Application<\/h2>\n<p>The use case we will illustrate in this article is a library that sends newly arrived books to its readers. Each book belongs to a particular genre and readers subscribe to genres of their interest.<\/p>\n<div class=\"wp-block-image\">\n<figure class=\"aligncenter\"><a href=\"http:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-usecase-final2.jpg\"><img decoding=\"async\" width=\"800\" height=\"300\" src=\"http:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-usecase-final2.jpg\" alt=\"spring kafka - application use case\" class=\"wp-image-53066\" srcset=\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-usecase-final2.jpg 800w, https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-usecase-final2-300x113.jpg 300w, https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-usecase-final2-768x288.jpg 768w\" sizes=\"(max-width: 800px) 100vw, 800px\" \/><\/a><figcaption>Application use case with one library (producer) publishing messages to Kafka and two subscribed readers (consumers).<\/figcaption><\/figure>\n<\/div>\n<p>The application is implemented in two Spring Boot projects:<br \/>a) &#8216;library&#8217; which is the producer that sends Book messages to a Kafka broker<br \/>b) &#8216;reader&#8217; which is the consumer that receives books.<\/p>\n<p>In the code, we use four-channel classes from Spring Integration: <code>MessageChannel<\/code>, <code>DirectChannel<\/code>, <code>PollableChannel<\/code>, and <code>QueueChannel<\/code>. <code>MessageChannel<\/code> is an interface that is implemented by all Spring Integration channels. It declares the send method which the concrete classes define how a sender sends a message to the channel.<\/p>\n<p>The <code>DirectChannel<\/code> implements the <code>SubscribableChannel<\/code> (which extends <code>MessageChannel<\/code>) and has point-to-point semantics, that is, it will only send each <code>Message<\/code> to a single subscriber.<\/p>\n<p><code>PollableChannel<\/code> is an interface that extends the <code>MessageChannel<\/code> and is used for receiving messages. Classes implementing this interface provide functionality for polling messages from a channel.<\/p>\n<p><code>QueueChannel<\/code> implements multiple interfaces. It wraps a queue, provides point-to-point semantics, and has the functionality to filter and purge messages that satisfy certain criteria.<\/p>\n<p>One key point to note is that each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Hence we will run two instances of the reader project, each belonging to a different consumer group and subscribing to a different combination of topics. Both the consumer group and topics set are given to the application as command-line arguments.<\/p>\n<h2 class=\"wp-block-heading\" id=\"h-3-environment\">3. Environment<\/h2>\n<p>I have used the following technologies for this application:<\/p>\n<ul class=\"wp-block-list\">\n<li>Java 1.8<\/li>\n<li>Spring Boot 1.5.9<\/li>\n<li>Spring Kafka 1.3.2<\/li>\n<li>Spring Integration Kafka 2.3.0<\/li>\n<li>Maven 3.3.9<\/li>\n<li>Ubuntu 16.04 LTS<\/li>\n<\/ul>\n<h2 class=\"wp-block-heading\" id=\"h-4-source-code\">4. Source Code<\/h2>\n<p><b>library<\/b>: This is a maven-based project, so all the dependencies are specified in the pom.xml file.<\/p>\n<p><span style=\"text-decoration: underline;\"><em>pom.xml<\/em><\/span><\/p>\n<pre class=\"wp-block-preformatted brush:xml\">&lt;?xml version=\"1.0\" encoding=\"UTF-8\"?&gt;\n&lt;project xmlns=\"http:\/\/maven.apache.org\/POM\/4.0.0\" xmlns:xsi=\"http:\/\/www.w3.org\/2001\/XMLSchema-instance\"\nxsi:schemaLocation=\"http:\/\/maven.apache.org\/POM\/4.0.0 http:\/\/maven.apache.org\/xsd\/maven-4.0.0.xsd\"&gt;\n    &lt;modelVersion&gt;4.0.0&lt;\/modelVersion&gt;\n\n    &lt;groupId&gt;org.javacodegeeks.springintegration.kafka&lt;\/groupId&gt;\n    &lt;artifactId&gt;producer&lt;\/artifactId&gt;\n    &lt;version&gt;0.0.1-SNAPSHOT&lt;\/version&gt;\n    &lt;packaging&gt;jar&lt;\/packaging&gt;\n\n    &lt;name&gt;producer&lt;\/name&gt;\n    &lt;description&gt;Kafka producer with Spring Boot&lt;\/description&gt;\n\n    &lt;parent&gt;\n    &lt;groupId&gt;org.springframework.boot&lt;\/groupId&gt;\n    &lt;artifactId&gt;spring-boot-starter-parent&lt;\/artifactId&gt;\n    &lt;version&gt;1.5.9.RELEASE&lt;\/version&gt;\n    &lt;relativePath \/&gt; &lt;!-- lookup parent from repository --&gt;\n    &lt;\/parent&gt;\n\n    &lt;properties&gt;\n    &lt;project.build.sourceEncoding&gt;UTF-8&lt;\/project.build.sourceEncoding&gt;\n    &lt;project.reporting.outputEncoding&gt;UTF-8&lt;\/project.reporting.outputEncoding&gt;\n    &lt;java.version&gt;1.8&lt;\/java.version&gt;\n    &lt;\/properties&gt;\n\n    &lt;dependencies&gt;\n        &lt;dependency&gt;\n            &lt;groupId&gt;org.apache.kafka&lt;\/groupId&gt;\n            &lt;artifactId&gt;kafka-clients&lt;\/artifactId&gt;\n            &lt;version&gt;1.0.0&lt;\/version&gt;\n        &lt;\/dependency&gt;\n        &lt;dependency&gt;\n            &lt;groupId&gt;org.springframework.boot&lt;\/groupId&gt;\n            &lt;artifactId&gt;spring-boot-starter-integration&lt;\/artifactId&gt;\n        &lt;\/dependency&gt;\n        &lt;dependency&gt;\n            &lt;groupId&gt;org.springframework.integration&lt;\/groupId&gt;\n            &lt;artifactId&gt;spring-integration-kafka&lt;\/artifactId&gt;\n            &lt;version&gt;2.3.0.RELEASE&lt;\/version&gt;\n        &lt;\/dependency&gt;\n        &lt;dependency&gt;\n            &lt;groupId&gt;org.springframework.kafka&lt;\/groupId&gt;\n            &lt;artifactId&gt;spring-kafka&lt;\/artifactId&gt;\n            &lt;version&gt;1.3.2.RELEASE&lt;\/version&gt;\n        &lt;\/dependency&gt;\n        &lt;dependency&gt;\n            &lt;groupId&gt;org.projectlombok&lt;\/groupId&gt;\n            &lt;artifactId&gt;lombok&lt;\/artifactId&gt;\n            &lt;optional&gt;true&lt;\/optional&gt;\n        &lt;\/dependency&gt;\n        &lt;dependency&gt;\n            &lt;groupId&gt;org.springframework.boot&lt;\/groupId&gt;\n            &lt;artifactId&gt;spring-boot-starter-test&lt;\/artifactId&gt;\n            &lt;scope&gt;test&lt;\/scope&gt;\n        &lt;\/dependency&gt;\n    &lt;\/dependencies&gt;\n    &lt;build&gt;\n        &lt;plugins&gt;\n            &lt;plugin&gt;\n                &lt;groupId&gt;org.springframework.boot&lt;\/groupId&gt;\n                &lt;artifactId&gt;spring-boot-maven-plugin&lt;\/artifactId&gt;\n            &lt;\/plugin&gt;\n        &lt;\/plugins&gt;\n    &lt;\/build&gt;\n&lt;\/project&gt;\n\n<\/pre>\n<p>Below is the <code>Book<\/code> class that serves as the model for the application.<\/p>\n<p><span style=\"text-decoration: underline;\"><em>Book.java<\/em><\/span><\/p>\n<pre class=\"wp-block-preformatted brush:java\">package org.javacodegeeks.springintegration.kafka.model;\n\nimport lombok.Getter;\nimport lombok.NoArgsConstructor;\nimport lombok.Setter;\nimport lombok.ToString;\n\n@Getter\n@Setter\n@NoArgsConstructor\n@ToString\npublic class Book {\n\n\tpublic enum Genre {\n\t\tfantasy, horror, romance, thriller\n\t}\n\n\tprivate long bookId;\n\tprivate String title;\n\tprivate Genre genre;\n}\n<\/pre>\n<p>A Book has an <code>enum<\/code> indicating which genre it belongs to. The other two properties are <code>bookId<\/code> and <code>title<\/code>. The <code>lombok<\/code> annotations inject the setters, getters, a no-argument constructor and the <code>toString()<\/code> method to all the members.<\/p>\n<p>Below is the <code>BookPublisher<\/code> class that initiates the message flow in the application.<\/p>\n<p><span style=\"text-decoration: underline;\"><em>BookPublisher.java<\/em><\/span><\/p>\n<pre class=\"wp-block-preformatted brush:java\">package org.javacodegeeks.springintegration.kafka.incoming;\n\nimport java.util.ArrayList;\nimport java.util.List;\n\nimport org.javacodegeeks.springintegration.kafka.model.Book;\nimport org.javacodegeeks.springintegration.kafka.model.Book.Genre;\nimport org.springframework.stereotype.Component;\n\n@Component\npublic class BookPublisher {\n\tprivate long nextBookId;\n\n\tpublic BookPublisher() {\n\t\tthis.nextBookId = 1001l;\n\t}\n\n\tpublic List getBooks() {\n\t\tList books = new ArrayList();\n\n\t\tbooks.add(createFantasyBook());\n\t\tbooks.add(createFantasyBook());\n\t\tbooks.add(createFantasyBook());\n\t\tbooks.add(createFantasyBook());\n\t\tbooks.add(createFantasyBook());\n\t\tbooks.add(createHorrorBook());\n\t\tbooks.add(createHorrorBook());\n\t\tbooks.add(createHorrorBook());\n\t\tbooks.add(createHorrorBook());\n\t\tbooks.add(createHorrorBook());\n\t\tbooks.add(createRomanceBook());\n\t\tbooks.add(createRomanceBook());\n\t\tbooks.add(createRomanceBook());\n\t\tbooks.add(createRomanceBook());\n\t\tbooks.add(createRomanceBook());\n\t\tbooks.add(createThrillerBook());\n\t\tbooks.add(createThrillerBook());\n\t\tbooks.add(createThrillerBook());\n\t\tbooks.add(createThrillerBook());\n\t\tbooks.add(createThrillerBook());\n\n\t\treturn books;\n\t}\n\n\tBook createFantasyBook() {\n\t\treturn createBook(\"\", Genre.fantasy);\n\t}\n\n\tBook createHorrorBook() {\n\t\treturn createBook(\"\", Genre.horror);\n\t}\n\n\tBook createRomanceBook() {\n\t\treturn createBook(\"\", Genre.romance);\n\t}\n\n\tBook createThrillerBook() {\n\t\treturn createBook(\"\", Genre.thriller);\n\t}\n\n\tBook createBook(String title, Genre genre) {\n\t\tBook book = new Book();\n\t\tbook.setBookId(nextBookId++);\n\t\tif (title == \"\") {\n\t\t\ttitle = \"# \" + Long.toString(book.getBookId());\n\t\t}\n\t\tbook.setTitle(title);\n\t\tbook.setGenre(genre);\n\n\t\treturn book;\n\t}\n}\n<\/pre>\n<p>The main functionality of this class is to create and return a list of twenty books, five each with the fantasy, horror, romance, and thriller genres. There is a book creation method for each of the genre type, which call a utility method <code>createBook<\/code> by passing the correct <code>enum<\/code> type. Book ids start from 1001 and are set incrementally.<div style=\"display:inline-block; margin: 15px 0;\"> <div id=\"adngin-JavaCodeGeeks_incontent_video-0\" style=\"display:inline-block;\"><\/div> <\/div><\/p>\n<p>Below is the <code>ProducerChannelConfig<\/code> class that configures all the beans required for the producer application.<\/p>\n<p><span style=\"text-decoration: underline;\"><em>ProducerChannelConfig.java<\/em><\/span><\/p>\n<pre class=\"wp-block-preformatted brush:java\">package org.javacodegeeks.springintegration.kafka.config;\n\nimport java.util.HashMap;\nimport java.util.Map;\n\nimport org.apache.kafka.clients.producer.ProducerConfig;\nimport org.apache.kafka.common.serialization.StringSerializer;\nimport org.springframework.beans.factory.annotation.Value;\nimport org.springframework.context.annotation.Bean;\nimport org.springframework.context.annotation.Configuration;\nimport org.springframework.expression.common.LiteralExpression;\nimport org.springframework.integration.annotation.ServiceActivator;\nimport org.springframework.integration.channel.DirectChannel;\nimport org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;\nimport org.springframework.kafka.core.DefaultKafkaProducerFactory;\nimport org.springframework.kafka.core.KafkaTemplate;\nimport org.springframework.kafka.core.ProducerFactory;\nimport org.springframework.messaging.MessageHandler;\n\n@Configuration\npublic class ProducerChannelConfig {\n\n\t@Value(\"${spring.kafka.bootstrap-servers}\")\n\tprivate String bootstrapServers;\n\n\t@Bean\n\tpublic DirectChannel producerChannel() {\n\t\treturn new DirectChannel();\n\t}\n\n\t@Bean\n\t@ServiceActivator(inputChannel = \"producerChannel\")\n\tpublic MessageHandler kafkaMessageHandler() {\n\t\tKafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(kafkaTemplate());\n\t\thandler.setMessageKeyExpression(new LiteralExpression(\"kafka-integration\"));\n\n\t\treturn handler;\n\t}\n\n\t@Bean\n\tpublic KafkaTemplate kafkaTemplate() {\n\t\treturn new KafkaTemplate(producerFactory());\n\t}\n\n\t@Bean\n\tpublic ProducerFactory producerFactory() {\n\t\treturn new DefaultKafkaProducerFactory(producerConfigs());\n\t}\n\n\t@Bean\n\tpublic Map producerConfigs() {\n\t\tMap properties = new HashMap();\n\t\tproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);\n\t\tproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);\n\t\tproperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);\n\t\t\/\/ introduce a delay on the send to allow more messages to accumulate\n\t\tproperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);\n\n\t\treturn properties;\n\t}\n}\n<\/pre>\n<p>Below is the <code>Library<\/code> class that is the main class of the application and the publisher endpoint of the system.<\/p>\n<p><span style=\"text-decoration: underline;\"><em>Library.java<\/em><\/span><\/p>\n<pre class=\"wp-block-preformatted brush:java\">package org.javacodegeeks.springintegration.kafka;\n\nimport java.util.Collections;\nimport java.util.List;\nimport java.util.Map;\n\nimport org.javacodegeeks.springintegration.kafka.incoming.BookPublisher;\nimport org.javacodegeeks.springintegration.kafka.model.Book;\nimport org.springframework.beans.factory.annotation.Autowired;\nimport org.springframework.boot.autoconfigure.SpringBootApplication;\nimport org.springframework.boot.builder.SpringApplicationBuilder;\nimport org.springframework.context.ConfigurableApplicationContext;\nimport org.springframework.kafka.support.KafkaHeaders;\nimport org.springframework.messaging.MessageChannel;\nimport org.springframework.messaging.support.GenericMessage;\n\n@SpringBootApplication\npublic class Library {\n\n\t@Autowired\n\tprivate BookPublisher bookPublisher;\n\n\tpublic static void main(String[] args) {\n\t\tConfigurableApplicationContext context = new SpringApplicationBuilder(Library.class).web(false).run(args);\n\t\tcontext.getBean(Library.class).run(context);\n\t\tcontext.close();\n\t}\n\n\tprivate void run(ConfigurableApplicationContext context) {\n\n\t\tSystem.out.println(\"Inside ProducerApplication run method...\");\n\n\t\tMessageChannel producerChannel = context.getBean(\"producerChannel\", MessageChannel.class);\n\n\t\tList books = bookPublisher.getBooks();\n\n\t\tfor (Book book : books) {\n\t\t\tMap headers = Collections.singletonMap(KafkaHeaders.TOPIC, book.getGenre().toString());\n\t\t\tproducerChannel.send(new GenericMessage(book.toString(), headers));\n\t\t}\n\n\t\tSystem.out.println(\"Finished ProducerApplication run method...\");\n\t};\n}\n<\/pre>\n<p>From the application context, a <code>MessageChannel<\/code> bean is obtained. It then takes a list of 20 books from <code>BookPublisher<\/code> and sends those to <code>producerChannel<\/code> which is wired to the Kafka broker. The topic of each message is the book genre.<\/p>\n<p>Below is the <code>application.properties<\/code> file that specifies values to the environment variables.<\/p>\n<p><span style=\"text-decoration: underline;\"><em>application.properties<\/em><\/span><\/p>\n<pre class=\"wp-block-preformatted brush:bash\">spring.kafka.bootstrap-servers=localhost:9092\n<\/pre>\n<p>Here we specify port 9092 for the Kafka server to connect to.<\/p>\n<p>Next, we take a look at the consumer side of the application.<br \/><b>reader<\/b>: This is also a Maven-based project and all dependencies are configured in pom.xml.<\/p>\n<p><span style=\"text-decoration: underline;\"><em>pom.xml<\/em><\/span><\/p>\n<pre class=\"wp-block-preformatted brush:xml\">&lt;?xml version=\"1.0\" encoding=\"UTF-8\"?&gt;\n&lt;project xmlns=\"http:\/\/maven.apache.org\/POM\/4.0.0\" xmlns:xsi=\"http:\/\/www.w3.org\/2001\/XMLSchema-instance\"\nxsi:schemaLocation=\"http:\/\/maven.apache.org\/POM\/4.0.0 http:\/\/maven.apache.org\/xsd\/maven-4.0.0.xsd\"&gt;\n&lt;modelVersion&gt;4.0.0&lt;\/modelVersion&gt;\n\n&lt;groupId&gt;org.javacodegeeks.springintegration.kafka&lt;\/groupId&gt;\n&lt;artifactId&gt;consumer&lt;\/artifactId&gt;\n&lt;version&gt;0.0.1-SNAPSHOT&lt;\/version&gt;\n&lt;packaging&gt;jar&lt;\/packaging&gt;\n\n&lt;name&gt;consumer&lt;\/name&gt;\n&lt;description&gt;Kafka consumer with Spring Boot&lt;\/description&gt;\n\n&lt;parent&gt;\n&nbsp; &nbsp; &lt;groupId&gt;org.springframework.boot&lt;\/groupId&gt;\n&nbsp; &nbsp; &lt;artifactId&gt;spring-boot-starter-parent&lt;\/artifactId&gt;\n&nbsp; &nbsp; &lt;version&gt;1.5.9.RELEASE&lt;\/version&gt;\n&nbsp; &nbsp; &lt;relativePath \/&gt; &lt;!-- lookup parent from repository --&gt;\n&lt;\/parent&gt;\n\n&lt;properties&gt;\n&nbsp; &nbsp; &lt;project.build.sourceEncoding&gt;UTF-8&lt;\/project.build.sourceEncoding&gt;\n&nbsp; &nbsp; &lt;project.reporting.outputEncoding&gt;UTF-8&lt;\/project.reporting.outputEncoding&gt;\n&nbsp; &nbsp; &lt;java.version&gt;1.8&lt;\/java.version&gt;\n&lt;\/properties&gt;\n\n&lt;dependencies&gt;\n&nbsp; &nbsp; &lt;dependency&gt;\n&nbsp; &nbsp; &nbsp; &nbsp; &lt;groupId&gt;org.apache.kafka&lt;\/groupId&gt;\n&nbsp; &nbsp; &nbsp; &nbsp; &lt;artifactId&gt;kafka-clients&lt;\/artifactId&gt;\n&nbsp; &nbsp; &nbsp; &nbsp; &lt;version&gt;1.0.0&lt;\/version&gt;\n&nbsp; &nbsp; &lt;\/dependency&gt;\n&nbsp;&nbsp;&nbsp;&nbsp;&lt;dependency&gt;\n        &lt;groupId&gt;org.springframework.boot&lt;\/groupId&gt;\n        &lt;artifactId&gt;spring-boot-starter-integration&lt;\/artifactId&gt;\n    &lt;\/dependency&gt;\n&nbsp;&nbsp;&nbsp;&nbsp;&lt;dependency&gt;\n        &lt;groupId&gt;org.springframework.integration&lt;\/groupId&gt;\n        &lt;artifactId&gt;spring-integration-kafka&lt;\/artifactId&gt;\n        &lt;version&gt;2.3.0.RELEASE&lt;\/version&gt;\n&nbsp;&nbsp;&nbsp;&nbsp;&lt;\/dependency&gt;\n&nbsp;&nbsp;&nbsp;&nbsp;&lt;dependency&gt;\n        &lt;groupId&gt;org.springframework.kafka&lt;\/groupId&gt;\n        &lt;artifactId&gt;spring-kafka&lt;\/artifactId&gt;\n        &lt;version&gt;1.3.2.RELEASE&lt;\/version&gt;\n&nbsp;&nbsp;&nbsp;&nbsp;&lt;\/dependency&gt;\n&nbsp;&nbsp;&nbsp; &lt;dependency&gt;\n        &lt;groupId&gt;org.springframework.boot&lt;\/groupId&gt;\n        &lt;artifactId&gt;spring-boot-starter-test&lt;\/artifactId&gt;\n        &lt;scope&gt;test&lt;\/scope&gt;\n&nbsp;&nbsp;&nbsp;&nbsp;&lt;\/dependency&gt;\n&lt;\/dependencies&gt;\n\n&lt;build&gt;\n    &lt;plugins&gt;\n        &lt;plugin&gt;\n            &lt;groupId&gt;org.springframework.boot&lt;\/groupId&gt;\n            &lt;artifactId&gt;spring-boot-maven-plugin&lt;\/artifactId&gt;\n        &lt;\/plugin&gt;\n    &lt;\/plugins&gt;\n&lt;\/build&gt;\n&lt;\/project&gt;\n<\/pre>\n<p>Below is the <code>ConsumerChannelConfig<\/code> class that configures all the beans required for the consumer application.[ulp id=&#8217;7POIYxRf1FUtPpmL&#8217;]<\/p>\n<p><span style=\"text-decoration: underline;\"><em>ConsumerChannelConfig.java<\/em><\/span><\/p>\n<pre class=\"wp-block-preformatted brush:java\">package org.javacodegeeks.springintegration.kafka.config;\n\nimport java.util.HashMap;\nimport java.util.Map;\n\nimport org.apache.kafka.clients.consumer.ConsumerConfig;\nimport org.apache.kafka.common.serialization.StringDeserializer;\nimport org.springframework.beans.factory.annotation.Value;\nimport org.springframework.context.annotation.Bean;\nimport org.springframework.context.annotation.Configuration;\nimport org.springframework.integration.channel.QueueChannel;\nimport org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;\nimport org.springframework.kafka.core.ConsumerFactory;\nimport org.springframework.kafka.core.DefaultKafkaConsumerFactory;\nimport org.springframework.kafka.listener.ConcurrentMessageListenerContainer;\nimport org.springframework.kafka.listener.config.ContainerProperties;\nimport org.springframework.messaging.PollableChannel;\n\n@Configuration\npublic class ConsumerChannelConfig {\n\n\t@Value(\"${spring.kafka.bootstrap-servers}\")\n\tprivate String bootstrapServers;\n\n\t@Value(\"${spring.kafka.topic}\")\n\tprivate String springIntegrationKafkaTopic;\n\n\t@Bean\n\tpublic PollableChannel consumerChannel() {\n\t\treturn new QueueChannel();\n\t}\n\n\t@Bean\n\tpublic KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter() {\n\t\tKafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(\n\t\t\t\tkafkaListenerContainer());\n\t\tkafkaMessageDrivenChannelAdapter.setOutputChannel(consumerChannel());\n\n\t\treturn kafkaMessageDrivenChannelAdapter;\n\t}\n\n\t@SuppressWarnings(\"unchecked\")\n\t@Bean\n\tpublic ConcurrentMessageListenerContainer kafkaListenerContainer() {\n\t\tContainerProperties containerProps = new ContainerProperties(springIntegrationKafkaTopic);\n\n\t\treturn (ConcurrentMessageListenerContainer) new ConcurrentMessageListenerContainer(\n\t\t\t\tconsumerFactory(), containerProps);\n\t}\n\n\t@Bean\n\tpublic ConsumerFactory consumerFactory() {\n\t\treturn new DefaultKafkaConsumerFactory(consumerConfigs());\n\t}\n\n\t@Bean\n\tpublic Map consumerConfigs() {\n\t\tMap properties = new HashMap();\n\t\tproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);\n\t\tproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);\n\t\tproperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);\n\t\tproperties.put(ConsumerConfig.GROUP_ID_CONFIG, \"dummy\");\n\t\treturn properties;\n\t}\n}\n<\/pre>\n<p>Below is the <code>SubscribedReader<\/code> class that is the main class of the application and the consumer endpoint of the system.<\/p>\n<p><span style=\"text-decoration: underline;\"><em>SubscribedReader.java<\/em><\/span><\/p>\n<pre class=\"wp-block-preformatted brush:java\">package org.javacodegeeks.springintegration.kafka;\n\nimport java.util.ArrayList;\nimport java.util.Arrays;\nimport java.util.List;\nimport java.util.Map;\n\nimport org.springframework.beans.factory.annotation.Autowired;\nimport org.springframework.boot.autoconfigure.SpringBootApplication;\nimport org.springframework.boot.autoconfigure.kafka.KafkaProperties;\nimport org.springframework.boot.builder.SpringApplicationBuilder;\nimport org.springframework.context.ConfigurableApplicationContext;\nimport org.springframework.integration.dsl.IntegrationFlow;\nimport org.springframework.integration.dsl.IntegrationFlows;\nimport org.springframework.integration.dsl.context.IntegrationFlowContext;\nimport org.springframework.integration.dsl.kafka.Kafka;\nimport org.springframework.kafka.core.DefaultKafkaConsumerFactory;\nimport org.springframework.messaging.Message;\nimport org.springframework.messaging.PollableChannel;\n\n@SpringBootApplication\npublic class SubscribedReader {\n\n\t@Autowired\n\tPollableChannel consumerChannel;\n\n\tpublic static void main(String[] args) {\n\n\t\tConfigurableApplicationContext context = new SpringApplicationBuilder(SubscribedReader.class).run(args);\n\n\t\tList valid_topics = Arrays.asList(\"fantasy\", \"horror\", \"romance\", \"thriller\");\n\n\t\tList topics = new ArrayList();\n\t\tif (args.length &gt; 0) {\n\t\t\tfor (String arg : args) {\n\t\t\t\tif (valid_topics.contains(arg))\n\t\t\t\t\ttopics.add(arg);\n\t\t\t}\n\t\t}\n\n\t\tcontext.getBean(SubscribedReader.class).run(context, topics);\n\t\tcontext.close();\n\t}\n\n\tprivate void run(ConfigurableApplicationContext context, List topics) {\n\n\t\tSystem.out.println(\"Inside ConsumerApplication run method...\");\n\t\tPollableChannel consumerChannel = context.getBean(\"consumerChannel\", PollableChannel.class);\n\n\t\tfor (String topic : topics)\n\t\t\taddAnotherListenerForTopics(topic);\n\n\t\tMessage received = consumerChannel.receive();\n\t\twhile (received != null) {\n\t\t\treceived = consumerChannel.receive();\n\t\t\tSystem.out.println(\"Received \" + received.getPayload());\n\t\t}\n\t}\n\n\t@Autowired\n\tprivate IntegrationFlowContext flowContext;\n\n\t@Autowired\n\tprivate KafkaProperties kafkaProperties;\n\n\tpublic void addAnotherListenerForTopics(String... topics) {\n\t\tMap consumerProperties = kafkaProperties.buildConsumerProperties();\n\t\tIntegrationFlow flow = IntegrationFlows\n\t\t\t\t.from(Kafka.messageDrivenChannelAdapter(\n\t\t\t\t\t\tnew DefaultKafkaConsumerFactory(consumerProperties), topics))\n\t\t\t\t.channel(\"consumerChannel\").get();\n\t\tthis.flowContext.registration(flow).register();\n\t}\n}\n\n<\/pre>\n<p>In the main method, we first check if there are any command line arguments. If they are present and are valid topics, they are added to an <code>ArrayList<\/code> that is passed as an argument to the <code>run<\/code> method.<\/p>\n<p>In the <code>run<\/code> method, a <code>PollableChannel<\/code> bean, configured in <code>ConsumerChannelConfig<\/code> is obtained from the application context. All the subscribed topics are added as listeners via <code>MessageDrivenChannelAdapter<\/code> object by calling the method <code>addAnotherListenerForTopics<\/code> for each topic. Then we call receive method of the <code>PollableChannel<\/code> object inside a while loop to get the messages from the Kafka broker.<\/p>\n<p>Below is the <code>application.properties<\/code> file that specifies values to the environment variables.<\/p>\n<p><span style=\"text-decoration: underline;\"><em>application.properties<\/em><\/span><\/p>\n<pre class=\"wp-block-preformatted brush:bash\">spring.kafka.bootstrap-servers=localhost:9092\nspring.kafka.topic=dummy\n<\/pre>\n<p>A dummy topic is specified so that when the consumer application starts, the message channel is properly configured. Next, the actual topics to listen to are added. The dummy topic is never used by the producer to send messages.<\/p>\n<h2 class=\"wp-block-heading\" id=\"h-5-how-to-run\">5. How to Run<\/h2>\n<p>You will need five terminal windows.<br \/>Terminal 1: Start ZooKeeper. In your Kafka installation folder, run the following command:<\/p>\n<pre class=\"wp-block-preformatted brush:bash\">bin\/zookeeper-server-start.sh config\/zookeeper.properties<\/pre>\n<p>Terminal 2: Start KafkaServer. Go to your Kafka installation folder and run the following command:<\/p>\n<pre class=\"wp-block-preformatted brush:bash\">bin\/kafka-server-start.sh config\/server.properties<\/pre>\n<p>Terminal 3: Start the first consumer with group id &#8220;group-one&#8221; and subscribed to fantasy and horror genres. Changed directory to the reader and run the following command:<\/p>\n<pre class=\"wp-block-preformatted brush:bash\">mvn spring-boot:run -Dspring.kafka.consumer.group-id=\"group-one\" -Drun.arguments=\"fantasy,horror\"<\/pre>\n<p>Terminal 4: Start the second consumer with group id &#8220;group-one&#8221; and subscribed to horror, romance and thriller genres. Change directory to the reader and run the following command:<\/p>\n<pre class=\"wp-block-preformatted brush:bash\">mvn spring-boot:run -Dspring.kafka.consumer.group-id=\"group-two\" -Drun.arguments=\"horror,romance,thriller\"<\/pre>\n<p>Terminal 5: Run producer. In the library folder, run the following command:<\/p>\n<pre class=\"wp-block-preformatted brush:bash\">mvn spring-boot:run<\/pre>\n<p>You will see the received messages in terminals 3 and 4. Please note that you can run the commands in terminals 3, 4, and 5 in any order. Due to Kafka&#8217;s retention time policy, defaulted to 7 days and its file-like persistence mechanics, you will still get the same output.<\/p>\n<div class=\"wp-block-image\">\n<figure class=\"aligncenter\"><a href=\"http:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-consumer1-final.jpg\"><img decoding=\"async\" width=\"736\" height=\"274\" src=\"http:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-consumer1-final.jpg\" alt=\"\" class=\"wp-image-53039\" srcset=\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-consumer1-final.jpg 736w, https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-consumer1-final-300x112.jpg 300w\" sizes=\"(max-width: 736px) 100vw, 736px\" \/><\/a><figcaption>Output for first consumer (reader) showing messages (books) received for two subscribed topics (genres).<\/figcaption><\/figure>\n<\/div>\n<div class=\"wp-block-image\">\n<figure class=\"aligncenter\"><a href=\"http:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-consumer2-final.jpg\"><img decoding=\"async\" width=\"751\" height=\"413\" src=\"http:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-consumer2-final.jpg\" alt=\"spring kafka - output for second consumer\" class=\"wp-image-53040\" srcset=\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-consumer2-final.jpg 751w, https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/jcg2-consumer2-final-300x165.jpg 300w\" sizes=\"(max-width: 751px) 100vw, 751px\" \/><\/a><figcaption>Output for second consumer (reader) showing messages (books) received for three subscribed topics (genres).<\/figcaption><\/figure>\n<\/div>\n<h2 class=\"wp-block-heading\" id=\"h-6-summary\">6. Summary<\/h2>\n<p>In this example, we have seen the publish-subscribe mechanism provided by Apache Kafka and the methods by which Spring Integration enables applications to connect with it. We have also touched upon different message channels available with Spring Integration and described their key features.<\/p>\n<h2 class=\"wp-block-heading\" id=\"h-7-useful-links\">7. Useful Links<\/h2>\n<p>The following resources will be very useful to get additional information and insights on concepts discussed in this article:<\/p>\n<ul class=\"wp-block-list\">\n<li><a href=\"https:\/\/engineering.linkedin.com\/distributed-systems\/log-what-every-software-engineer-should-know-about-real-time-datas-unifying\">https:\/\/engineering.linkedin.com\/distributed-systems\/log-what-every-software-engineer-should-know-about-real-time-datas-unifying<\/a><\/li>\n<li><a href=\"https:\/\/kafka.apache.org\/intro\">https:\/\/kafka.apache.org\/intro<\/a><\/li>\n<li><a href=\"http:\/\/projects.spring.io\/spring-kafka\/\">http:\/\/projects.spring.io\/spring-kafka\/<\/a><\/li>\n<li><a href=\"https:\/\/docs.spring.io\/spring-kafka\/reference\/html\/_spring_integration.html\">https:\/\/docs.spring.io\/spring-kafka\/reference\/html\/_spring_integration.html<\/a><\/li>\n<\/ul>\n<h2 class=\"wp-block-heading\" id=\"h-8-more-articles\">8. More articles<\/h2>\n<ul class=\"wp-block-list\">\n<li><a href=\"https:\/\/examples.javacodegeeks.com\/spring-framework-example\/\">Spring Framework Example<\/a><\/li>\n<li><a href=\"https:\/\/www.javacodegeeks.com\/java-spring-tutorial\">Java Spring Tutorial<\/a><\/li>\n<\/ul>\n<h2 class=\"wp-block-heading\" id=\"h-9-download-the-source-code\">9. Download the Source Code<\/h2>\n<div class=\"download\"><strong>Download<\/strong><br \/>You can download the full source code of this example here: <a href=\"http:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/12\/spring-integration-kafka.zip\"><strong>Spring Integration Kafka Tutorial<\/strong><\/a><\/div>\n<p><strong>Last updated on May 18th, 2021<\/strong><\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this tutorial, we will show the Spring Integration with Kafka through examples. 1. Introduction Apache Kafka started as an internal project at LinkedIn to solve the problem of scaling up the enterprise architecture from services talking to each other with strong typing contracts to an asynchronous message-based architecture. Both message persistence and high throughput &hellip;<\/p>\n","protected":false},"author":141,"featured_media":1248,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1660],"tags":[],"class_list":["post-53019","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-integration"],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v26.5 - https:\/\/yoast.com\/wordpress\/plugins\/seo\/ -->\n<title>Spring Integration Kafka Tutorial - Java Code Geeks<\/title>\n<meta name=\"description\" content=\"In this tutorial, we will show the Spring Integration with Kafka through examples. 1. Introduction Apache Kafka started as an internal project at LinkedIn\" \/>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/\" \/>\n<meta property=\"og:locale\" content=\"en_US\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Spring Integration Kafka Tutorial - Java Code Geeks\" \/>\n<meta property=\"og:description\" content=\"In this tutorial, we will show the Spring Integration with Kafka through examples. 1. Introduction Apache Kafka started as an internal project at LinkedIn\" \/>\n<meta property=\"og:url\" content=\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/\" \/>\n<meta property=\"og:site_name\" content=\"Examples Java Code Geeks\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/javacodegeeks\" \/>\n<meta property=\"article:published_time\" content=\"2017-12-13T13:00:47+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2021-05-18T13:16:19+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2012\/12\/spring-logo.jpg\" \/>\n\t<meta property=\"og:image:width\" content=\"150\" \/>\n\t<meta property=\"og:image:height\" content=\"150\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/jpeg\" \/>\n<meta name=\"author\" content=\"Mahboob Hussain\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:creator\" content=\"@javacodegeeks\" \/>\n<meta name=\"twitter:site\" content=\"@javacodegeeks\" \/>\n<meta name=\"twitter:label1\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data1\" content=\"Mahboob Hussain\" \/>\n\t<meta name=\"twitter:label2\" content=\"Est. reading time\" \/>\n\t<meta name=\"twitter:data2\" content=\"10 minutes\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\/\/schema.org\",\"@graph\":[{\"@type\":\"Article\",\"@id\":\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#article\",\"isPartOf\":{\"@id\":\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/\"},\"author\":{\"name\":\"Mahboob Hussain\",\"@id\":\"https:\/\/examples.javacodegeeks.com\/#\/schema\/person\/7fcaa707830cfea8c55d4ad2b81cbf55\"},\"headline\":\"Spring Integration Kafka Tutorial\",\"datePublished\":\"2017-12-13T13:00:47+00:00\",\"dateModified\":\"2021-05-18T13:16:19+00:00\",\"mainEntityOfPage\":{\"@id\":\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/\"},\"wordCount\":1262,\"commentCount\":1,\"publisher\":{\"@id\":\"https:\/\/examples.javacodegeeks.com\/#organization\"},\"image\":{\"@id\":\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#primaryimage\"},\"thumbnailUrl\":\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2012\/12\/spring-logo.jpg\",\"articleSection\":[\"Integration\"],\"inLanguage\":\"en-US\",\"potentialAction\":[{\"@type\":\"CommentAction\",\"name\":\"Comment\",\"target\":[\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#respond\"]}]},{\"@type\":\"WebPage\",\"@id\":\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/\",\"url\":\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/\",\"name\":\"Spring Integration Kafka Tutorial - Java Code Geeks\",\"isPartOf\":{\"@id\":\"https:\/\/examples.javacodegeeks.com\/#website\"},\"primaryImageOfPage\":{\"@id\":\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#primaryimage\"},\"image\":{\"@id\":\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#primaryimage\"},\"thumbnailUrl\":\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2012\/12\/spring-logo.jpg\",\"datePublished\":\"2017-12-13T13:00:47+00:00\",\"dateModified\":\"2021-05-18T13:16:19+00:00\",\"description\":\"In this tutorial, we will show the Spring Integration with Kafka through examples. 1. Introduction Apache Kafka started as an internal project at LinkedIn\",\"breadcrumb\":{\"@id\":\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#breadcrumb\"},\"inLanguage\":\"en-US\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/\"]}]},{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#primaryimage\",\"url\":\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2012\/12\/spring-logo.jpg\",\"contentUrl\":\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2012\/12\/spring-logo.jpg\",\"width\":150,\"height\":150},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"Home\",\"item\":\"https:\/\/examples.javacodegeeks.com\/\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"Java Development\",\"item\":\"https:\/\/examples.javacodegeeks.com\/category\/java-development\/\"},{\"@type\":\"ListItem\",\"position\":3,\"name\":\"Enterprise Java\",\"item\":\"https:\/\/examples.javacodegeeks.com\/category\/java-development\/enterprise-java\/\"},{\"@type\":\"ListItem\",\"position\":4,\"name\":\"spring\",\"item\":\"https:\/\/examples.javacodegeeks.com\/category\/java-development\/enterprise-java\/spring\/\"},{\"@type\":\"ListItem\",\"position\":5,\"name\":\"Integration\",\"item\":\"https:\/\/examples.javacodegeeks.com\/category\/java-development\/enterprise-java\/spring\/integration\/\"},{\"@type\":\"ListItem\",\"position\":6,\"name\":\"Spring Integration Kafka Tutorial\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\/\/examples.javacodegeeks.com\/#website\",\"url\":\"https:\/\/examples.javacodegeeks.com\/\",\"name\":\"Java Code Geeks\",\"description\":\"Java Examples and Code Snippets\",\"publisher\":{\"@id\":\"https:\/\/examples.javacodegeeks.com\/#organization\"},\"alternateName\":\"JCG\",\"potentialAction\":[{\"@type\":\"SearchAction\",\"target\":{\"@type\":\"EntryPoint\",\"urlTemplate\":\"https:\/\/examples.javacodegeeks.com\/?s={search_term_string}\"},\"query-input\":{\"@type\":\"PropertyValueSpecification\",\"valueRequired\":true,\"valueName\":\"search_term_string\"}}],\"inLanguage\":\"en-US\"},{\"@type\":\"Organization\",\"@id\":\"https:\/\/examples.javacodegeeks.com\/#organization\",\"name\":\"Exelixis Media P.C.\",\"url\":\"https:\/\/examples.javacodegeeks.com\/\",\"logo\":{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\/\/examples.javacodegeeks.com\/#\/schema\/logo\/image\/\",\"url\":\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2022\/06\/exelixis-logo.png\",\"contentUrl\":\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2022\/06\/exelixis-logo.png\",\"width\":864,\"height\":246,\"caption\":\"Exelixis Media P.C.\"},\"image\":{\"@id\":\"https:\/\/examples.javacodegeeks.com\/#\/schema\/logo\/image\/\"},\"sameAs\":[\"https:\/\/www.facebook.com\/javacodegeeks\",\"https:\/\/x.com\/javacodegeeks\"]},{\"@type\":\"Person\",\"@id\":\"https:\/\/examples.javacodegeeks.com\/#\/schema\/person\/7fcaa707830cfea8c55d4ad2b81cbf55\",\"name\":\"Mahboob Hussain\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\/\/examples.javacodegeeks.com\/#\/schema\/person\/image\/\",\"url\":\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/11\/Mahboob-Hussain_avatar_1510827107-96x96.jpg\",\"contentUrl\":\"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/11\/Mahboob-Hussain_avatar_1510827107-96x96.jpg\",\"caption\":\"Mahboob Hussain\"},\"description\":\"Mahboob Hussain graduated in Engineering from NIT Nagpur, India and has an MBA from Webster University, USA. He has executed roles in various aspects of software development and technical governance. He started with FORTRAN and has programmed in a variety of languages in his career, the mainstay of which has been Java. He is an associate editor in our team and has his personal homepage at http:\/\/bit.ly\/mahboob\",\"sameAs\":[\"http:\/\/bit.ly\/mahboob\"],\"url\":\"https:\/\/examples.javacodegeeks.com\/author\/mahboob-hussain\/\"}]}<\/script>\n<!-- \/ Yoast SEO plugin. -->","yoast_head_json":{"title":"Spring Integration Kafka Tutorial - Java Code Geeks","description":"In this tutorial, we will show the Spring Integration with Kafka through examples. 1. Introduction Apache Kafka started as an internal project at LinkedIn","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/","og_locale":"en_US","og_type":"article","og_title":"Spring Integration Kafka Tutorial - Java Code Geeks","og_description":"In this tutorial, we will show the Spring Integration with Kafka through examples. 1. Introduction Apache Kafka started as an internal project at LinkedIn","og_url":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/","og_site_name":"Examples Java Code Geeks","article_publisher":"https:\/\/www.facebook.com\/javacodegeeks","article_published_time":"2017-12-13T13:00:47+00:00","article_modified_time":"2021-05-18T13:16:19+00:00","og_image":[{"width":150,"height":150,"url":"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2012\/12\/spring-logo.jpg","type":"image\/jpeg"}],"author":"Mahboob Hussain","twitter_card":"summary_large_image","twitter_creator":"@javacodegeeks","twitter_site":"@javacodegeeks","twitter_misc":{"Written by":"Mahboob Hussain","Est. reading time":"10 minutes"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#article","isPartOf":{"@id":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/"},"author":{"name":"Mahboob Hussain","@id":"https:\/\/examples.javacodegeeks.com\/#\/schema\/person\/7fcaa707830cfea8c55d4ad2b81cbf55"},"headline":"Spring Integration Kafka Tutorial","datePublished":"2017-12-13T13:00:47+00:00","dateModified":"2021-05-18T13:16:19+00:00","mainEntityOfPage":{"@id":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/"},"wordCount":1262,"commentCount":1,"publisher":{"@id":"https:\/\/examples.javacodegeeks.com\/#organization"},"image":{"@id":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#primaryimage"},"thumbnailUrl":"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2012\/12\/spring-logo.jpg","articleSection":["Integration"],"inLanguage":"en-US","potentialAction":[{"@type":"CommentAction","name":"Comment","target":["https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#respond"]}]},{"@type":"WebPage","@id":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/","url":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/","name":"Spring Integration Kafka Tutorial - Java Code Geeks","isPartOf":{"@id":"https:\/\/examples.javacodegeeks.com\/#website"},"primaryImageOfPage":{"@id":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#primaryimage"},"image":{"@id":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#primaryimage"},"thumbnailUrl":"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2012\/12\/spring-logo.jpg","datePublished":"2017-12-13T13:00:47+00:00","dateModified":"2021-05-18T13:16:19+00:00","description":"In this tutorial, we will show the Spring Integration with Kafka through examples. 1. Introduction Apache Kafka started as an internal project at LinkedIn","breadcrumb":{"@id":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#breadcrumb"},"inLanguage":"en-US","potentialAction":[{"@type":"ReadAction","target":["https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/"]}]},{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#primaryimage","url":"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2012\/12\/spring-logo.jpg","contentUrl":"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2012\/12\/spring-logo.jpg","width":150,"height":150},{"@type":"BreadcrumbList","@id":"https:\/\/examples.javacodegeeks.com\/spring-integration-kafka-tutorial\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Home","item":"https:\/\/examples.javacodegeeks.com\/"},{"@type":"ListItem","position":2,"name":"Java Development","item":"https:\/\/examples.javacodegeeks.com\/category\/java-development\/"},{"@type":"ListItem","position":3,"name":"Enterprise Java","item":"https:\/\/examples.javacodegeeks.com\/category\/java-development\/enterprise-java\/"},{"@type":"ListItem","position":4,"name":"spring","item":"https:\/\/examples.javacodegeeks.com\/category\/java-development\/enterprise-java\/spring\/"},{"@type":"ListItem","position":5,"name":"Integration","item":"https:\/\/examples.javacodegeeks.com\/category\/java-development\/enterprise-java\/spring\/integration\/"},{"@type":"ListItem","position":6,"name":"Spring Integration Kafka Tutorial"}]},{"@type":"WebSite","@id":"https:\/\/examples.javacodegeeks.com\/#website","url":"https:\/\/examples.javacodegeeks.com\/","name":"Java Code Geeks","description":"Java Examples and Code Snippets","publisher":{"@id":"https:\/\/examples.javacodegeeks.com\/#organization"},"alternateName":"JCG","potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/examples.javacodegeeks.com\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"en-US"},{"@type":"Organization","@id":"https:\/\/examples.javacodegeeks.com\/#organization","name":"Exelixis Media P.C.","url":"https:\/\/examples.javacodegeeks.com\/","logo":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/examples.javacodegeeks.com\/#\/schema\/logo\/image\/","url":"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2022\/06\/exelixis-logo.png","contentUrl":"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2022\/06\/exelixis-logo.png","width":864,"height":246,"caption":"Exelixis Media P.C."},"image":{"@id":"https:\/\/examples.javacodegeeks.com\/#\/schema\/logo\/image\/"},"sameAs":["https:\/\/www.facebook.com\/javacodegeeks","https:\/\/x.com\/javacodegeeks"]},{"@type":"Person","@id":"https:\/\/examples.javacodegeeks.com\/#\/schema\/person\/7fcaa707830cfea8c55d4ad2b81cbf55","name":"Mahboob Hussain","image":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/examples.javacodegeeks.com\/#\/schema\/person\/image\/","url":"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/11\/Mahboob-Hussain_avatar_1510827107-96x96.jpg","contentUrl":"https:\/\/examples.javacodegeeks.com\/wp-content\/uploads\/2017\/11\/Mahboob-Hussain_avatar_1510827107-96x96.jpg","caption":"Mahboob Hussain"},"description":"Mahboob Hussain graduated in Engineering from NIT Nagpur, India and has an MBA from Webster University, USA. He has executed roles in various aspects of software development and technical governance. He started with FORTRAN and has programmed in a variety of languages in his career, the mainstay of which has been Java. He is an associate editor in our team and has his personal homepage at http:\/\/bit.ly\/mahboob","sameAs":["http:\/\/bit.ly\/mahboob"],"url":"https:\/\/examples.javacodegeeks.com\/author\/mahboob-hussain\/"}]}},"_links":{"self":[{"href":"https:\/\/examples.javacodegeeks.com\/wp-json\/wp\/v2\/posts\/53019","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/examples.javacodegeeks.com\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/examples.javacodegeeks.com\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/examples.javacodegeeks.com\/wp-json\/wp\/v2\/users\/141"}],"replies":[{"embeddable":true,"href":"https:\/\/examples.javacodegeeks.com\/wp-json\/wp\/v2\/comments?post=53019"}],"version-history":[{"count":0,"href":"https:\/\/examples.javacodegeeks.com\/wp-json\/wp\/v2\/posts\/53019\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/examples.javacodegeeks.com\/wp-json\/wp\/v2\/media\/1248"}],"wp:attachment":[{"href":"https:\/\/examples.javacodegeeks.com\/wp-json\/wp\/v2\/media?parent=53019"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/examples.javacodegeeks.com\/wp-json\/wp\/v2\/categories?post=53019"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/examples.javacodegeeks.com\/wp-json\/wp\/v2\/tags?post=53019"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}