{"id":32802,"date":"2021-02-01T14:58:44","date_gmt":"2021-02-01T14:58:44","guid":{"rendered":"https:\/\/ittutorial.org\/?p=32802"},"modified":"2021-02-01T14:58:44","modified_gmt":"2021-02-01T14:58:44","slug":"kafka-stream-api-json-parse","status":"publish","type":"post","link":"https:\/\/ittutorial.org\/kafka-stream-api-json-parse\/","title":{"rendered":"Kafka Stream API Json\u00a0Parse"},"content":{"rendered":"<section class=\"section section--body\">\n<div class=\"section-divider\">\n<hr class=\"section-divider\" \/>\n<\/div>\n<div class=\"section-content\">\n<div class=\"section-inner sectionLayout--insetColumn\">\n<h3 class=\"graf graf--h3\">Kafka Stream API Json\u00a0Parse<\/h3>\n<p class=\"graf graf--p\">Hello, in this article, I will talk about how to process data incoming to Kafka queue with <strong class=\"markup--strong markup--p-strong\">Kafka stream api<\/strong>.<\/p>\n<figure class=\"graf graf--figure\"><img decoding=\"async\" class=\"graf-image\" src=\"https:\/\/cdn-images-1.medium.com\/max\/1000\/0*P-Ymr2JMKVfe-xJP.png\" data-image-id=\"0*P-Ymr2JMKVfe-xJP.png\" data-width=\"1856\" data-height=\"1228\" data-is-featured=\"true\" \/><\/figure>\n<p class=\"graf graf--p\">We can send data from various sources to the Kafka queue,The data waiting in the queue can be in formats such as json, avro, etc. Or only a single string or integer values \u200b\u200bcan come.<\/p>\n<p class=\"graf graf--p\">If the incoming data contains one piece of String or Integer values, it is easier for us to process the data.<\/p>\n<p class=\"graf graf--p\">I will run this example on my Windows machine using Java. 
However, after building the code and packaging it as a jar, you can also run it on the Hortonworks sandbox by editing the necessary ports.<\/p>\n<p class=\"graf graf--p\">To follow this example, you must have Apache Kafka installed on your Windows (or Linux) machine.<\/p>\n<p class=\"graf graf--p\">I use IntelliJ IDEA as my code development environment.<br \/>\nLet\u2019s create a new Maven project.<\/p>\n<figure class=\"graf graf--figure\"><img decoding=\"async\" class=\"graf-image\" src=\"https:\/\/cdn-images-1.medium.com\/max\/1000\/1*tlgZVwYepMpVFr5nc1AT-w.png\" data-image-id=\"1*tlgZVwYepMpVFr5nc1AT-w.png\" data-width=\"1079\" data-height=\"891\" \/><\/figure>\n<p class=\"graf graf--p\">After the project is created, a screen like the one below will greet us.<\/p>\n<figure class=\"graf graf--figure\"><img decoding=\"async\" class=\"graf-image\" src=\"https:\/\/cdn-images-1.medium.com\/max\/1000\/1*0HgTrQoM2PMkqYn-Fr5f8Q.png\" data-image-id=\"1*0HgTrQoM2PMkqYn-Fr5f8Q.png\" data-width=\"1842\" data-height=\"739\" \/><\/figure>\n<p class=\"graf graf--p\">Let\u2019s go to the Maven repository and add the necessary dependencies to our project.<\/p>\n<figure class=\"graf graf--figure\"><img decoding=\"async\" class=\"graf-image\" src=\"https:\/\/cdn-images-1.medium.com\/max\/1000\/1*RegS4IKmZdChi-1WOWPp9Q.png\" data-image-id=\"1*RegS4IKmZdChi-1WOWPp9Q.png\" data-width=\"1835\" data-height=\"875\" \/><\/figure>\n<figure class=\"graf graf--figure\"><img decoding=\"async\" class=\"graf-image\" src=\"https:\/\/cdn-images-1.medium.com\/max\/1000\/1*FIU_eWiAoGqb2SdYaBHpdw.png\" data-image-id=\"1*FIU_eWiAoGqb2SdYaBHpdw.png\" data-width=\"1779\" data-height=\"820\" \/><\/figure>\n<p class=\"graf graf--p\">Copy the required dependency definitions and paste them into the <strong class=\"markup--strong markup--p-strong\">pom.xml<\/strong> file.<\/p>\n<figure class=\"graf graf--figure\"><img decoding=\"async\" class=\"graf-image\" src=\"https:\/\/cdn-images-1.medium.com\/max\/1000\/1*54AfFVHejWySNdqThZgUaw.png\" 
data-image-id=\"1*54AfFVHejWySNdqThZgUaw.png\" data-width=\"1322\" data-height=\"693\" \/><\/figure>\n<p class=\"graf graf--p\">After adding Dependencies, refresh the pom.xml file<\/p>\n<p class=\"graf graf--p\">Now create a new Java class. Here we will do our work for Producer.<\/p>\n<p class=\"graf graf--p\">After creating the Main function, the first thing we need to do is to define Properties to communicate with Kafka.<\/p>\n<pre class=\"graf graf--pre\">Properties properties = new Properties();\r\n\r\n<em class=\"markup--em markup--pre-em\">\r\n<\/em>properties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">BOOTSTRAP_SERVERS_CONFIG<\/em>, \"127.0.0.1:9092\");\r\nproperties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">KEY_SERIALIZER_CLASS_CONFIG<\/em>, StringSerializer.class.getName());\r\nproperties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">VALUE_SERIALIZER_CLASS_CONFIG<\/em>, StringSerializer.class.getName());<\/pre>\n<pre class=\"graf graf--pre\"><em class=\"markup--em markup--pre-em\">\r\n<\/em>properties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">ACKS_CONFIG<\/em>, \"all\"); <em class=\"markup--em markup--pre-em\">\/\/ strongest producing guarantee\r\n<\/em>properties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">RETRIES_CONFIG<\/em>, \"3\");\r\nproperties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">LINGER_MS_CONFIG<\/em>, \"1\");\r\n<em class=\"markup--em markup--pre-em\">\r\n<\/em>properties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">ENABLE_IDEMPOTENCE_CONFIG<\/em>, \"true\"); <em class=\"markup--em markup--pre-em\">\/\/ ensure we don't push duplicates<\/em><\/pre>\n<p class=\"graf graf--p\">Let\u2019s write a small function to generate a random data, the type of this function should be <strong class=\"markup--strong markup--p-strong\">ProducerRecord&lt;&gt;<\/strong><\/p>\n<pre class=\"graf 
graf--pre\">public static ProducerRecord&lt;String, String&gt; newRandomTransaction(String name) {\r\n\r\n    ObjectNode transaction = JsonNodeFactory.<em class=\"markup--em markup--pre-em\">instance<\/em>.objectNode();\r\n\r\n\r\n    transaction.put(\"name\", \"Jack\");\r\n    transaction.put(\"amount\", 100);\r\n    return new ProducerRecord&lt;&gt;(\"test2\", name, transaction.toString());\r\n}<\/pre>\n<p class=\"graf graf--p\">The data we want to send will be in JSON format, so we must create a JsonNodeFactory object.<\/p>\n<p class=\"graf graf--p\">Then we put what we want to send into it with the <strong class=\"markup--strong markup--p-strong\">put ()<\/strong> method.<\/p>\n<p class=\"graf graf--p\">Now all we have to do is create an object from the function we have created and give it to Producer.<\/p>\n<pre class=\"graf graf--pre\">Producer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(properties);\r\n\r\n\r\nproducer.send(<em class=\"markup--em markup--pre-em\">newRandomTransaction<\/em>(\"Denis\"));\r\nproducer.close();<\/pre>\n<p class=\"graf graf--p\">Very good, now a JSON with <strong class=\"markup--strong markup--p-strong\">{\u201cname\u201d: \u201cJack\u201d, \u201camount\u201d: 100}<\/strong> will go to Kafka Queue<\/p>\n<p class=\"graf graf--p\">Let\u2019s read the data written to the Queue as a stream and move on to the processing step. 
Create a new class<\/p>\n<p class=\"graf graf--p\">Let\u2019s define the properties required to read from the Kafka Queue.<\/p>\n<pre class=\"graf graf--pre\">Properties props = new Properties();\r\nprops.put(StreamsConfig.<em class=\"markup--em markup--pre-em\">APPLICATION_ID_CONFIG<\/em>, \"test\");\r\nprops.put(StreamsConfig.<em class=\"markup--em markup--pre-em\">BOOTSTRAP_SERVERS_CONFIG<\/em>, \"localhost:9092\");\r\nprops.put(StreamsConfig.<em class=\"markup--em markup--pre-em\">DEFAULT_KEY_SERDE_CLASS_CONFIG<\/em>, Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>().getClass());\r\nprops.put(StreamsConfig.<em class=\"markup--em markup--pre-em\">DEFAULT_VALUE_SERDE_CLASS_CONFIG<\/em>, Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>().getClass());\r\n<em class=\"markup--em markup--pre-em\">\r\n<\/em>props.put(ConsumerConfig.<em class=\"markup--em markup--pre-em\">AUTO_OFFSET_RESET_CONFIG<\/em>, \"earliest\");\r\nprops.put(StreamsConfig.<em class=\"markup--em markup--pre-em\">CACHE_MAX_BYTES_BUFFERING_CONFIG<\/em>, \"0\");<\/pre>\n<p class=\"graf graf--p\">Since the incoming data will be in JSON format, we will need <strong class=\"markup--strong markup--p-strong\">Serializer <\/strong>and <strong class=\"markup--strong markup--p-strong\">Deserializer <\/strong>to parse it.<\/p>\n<pre class=\"graf graf--pre\">final Serializer&lt;JsonNode&gt; jsonNodeSerializer = new JsonSerializer();\r\nfinal Deserializer&lt;JsonNode&gt; jsonNodeDeserializer = new JsonDeserializer();\r\nfinal Serde&lt;JsonNode&gt; jsonNodeSerde = Serdes.<em class=\"markup--em markup--pre-em\">serdeFrom<\/em>(jsonNodeSerializer,jsonNodeDeserializer);<\/pre>\n<pre class=\"graf graf--pre\">StreamsBuilder builder = new StreamsBuilder();\r\nKStream&lt;String,JsonNode &gt; textLines = builder.stream(\"test2\", Consumed.<em class=\"markup--em markup--pre-em\">with<\/em>(Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>(),jsonNodeSerde));<\/pre>\n<p class=\"graf 
graf--p\">Going to read \u201ctest2\u201d topic. You can organize this place according to your own topic.<\/p>\n<p class=\"graf graf--p\">After processing the data read in this section, it determines the JSON format of the new output.<\/p>\n<p class=\"graf graf--p\">In this example, I group the people in the incoming data and sum up their account balances.<\/p>\n<pre class=\"graf graf--pre\">ObjectNode initialbalance = JsonNodeFactory.<em class=\"markup--em markup--pre-em\">instance<\/em>.objectNode();\r\ninitialbalance.put(\"count\",0);\r\ninitialbalance.put(\"balance\",0);<\/pre>\n<p class=\"graf graf--p\">I am aggregating each unique person\u2019s own balance with their previous balance.<\/p>\n<pre class=\"graf graf--pre\">private static JsonNode newBalance(JsonNode transaction, JsonNode balance){\r\n    ObjectNode newBalance = JsonNodeFactory.<em class=\"markup--em markup--pre-em\">instance<\/em>.objectNode();\r\n    newBalance.put(\"count\",balance.get(\"count\").asInt()+1);\r\n    newBalance.put(\"balance\",balance.get(\"balance\").asInt()+transaction.get(\"amount\").asInt());\r\n\r\n    return newBalance;\r\n\r\n}<\/pre>\n<p class=\"graf graf--p\">We come to the most important part of the code piece.<\/p>\n<ul class=\"postList\">\n<li class=\"graf graf--li\">Data in the incoming JSON format is first subjected to a groupby function.<\/li>\n<li class=\"graf graf--li\">Next, the JSON object we created above is pointed out for its new output.<\/li>\n<li class=\"graf graf--li\">Then, the data coming with the JSON data is sent as a parameter to the function we have just created and the sumed balance is returned.<\/li>\n<li class=\"graf graf--li\">As a result of these operations, the result is written to a new topic with toStream.<\/li>\n<li class=\"graf graf--li\">Finally, the stream is started with KafkaStream.<\/li>\n<\/ul>\n<\/div>\n<\/div>\n<\/section>\n<section class=\"section section--body\">\n<div class=\"section-divider\">\n<hr class=\"section-divider\" 
\/>\n<\/div>\n<div class=\"section-content\">\n<div class=\"section-inner sectionLayout--insetColumn\">\n<pre class=\"graf graf--pre\">KTable&lt;String, JsonNode&gt; bankBanalnce = textLines\r\n        .groupByKey(Serialized.<em class=\"markup--em markup--pre-em\">with<\/em>(Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>(), jsonNodeSerde))\r\n        .aggregate(\r\n                () -&gt; initialbalance,\r\n                (key,transaction,balance) -&gt; <em class=\"markup--em markup--pre-em\">newBalance<\/em>(transaction, balance),\r\n\r\n                Materialized.&lt;String, JsonNode, KeyValueStore&lt;Bytes, byte[]&gt;&gt;<em class=\"markup--em markup--pre-em\">as<\/em>(\"bank-balance-agg\")\r\n                        .withKeySerde(Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>())\r\n                        .withValueSerde(jsonNodeSerde)\r\n        );\r\nbankBanalnce.toStream().to(\"out\",Produced.<em class=\"markup--em markup--pre-em\">with<\/em>(Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>(),jsonNodeSerde));\r\n\r\n\r\n\r\n\r\nKafkaStreams streams = new KafkaStreams(builder.build(), props);\r\nstreams.cleanUp();\r\nstreams.start();<\/pre>\n<p class=\"graf graf--p\">The full version of the Producer Class,<\/p>\n<pre class=\"graf graf--pre\">import com.fasterxml.jackson.databind.node.JsonNodeFactory;\r\nimport com.fasterxml.jackson.databind.node.ObjectNode;\r\nimport org.apache.kafka.clients.producer.KafkaProducer;\r\nimport org.apache.kafka.clients.producer.Producer;\r\nimport org.apache.kafka.clients.producer.ProducerConfig;\r\nimport org.apache.kafka.clients.producer.ProducerRecord;\r\nimport org.apache.kafka.common.serialization.StringSerializer;\r\n\r\nimport java.time.Instant;\r\nimport java.util.Properties;\r\nimport java.util.concurrent.ThreadLocalRandom;\r\n\r\npublic class ProducerClass{\r\n\r\n    public static void main(String[] args) {\r\n        Properties properties = new Properties();\r\n\r\n        <em 
class=\"markup--em markup--pre-em\">\/\/ kafka bootstrap server\r\n        <\/em>properties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">BOOTSTRAP_SERVERS_CONFIG<\/em>, \"127.0.0.1:9092\");\r\n        properties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">KEY_SERIALIZER_CLASS_CONFIG<\/em>, StringSerializer.class.getName());\r\n        properties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">VALUE_SERIALIZER_CLASS_CONFIG<\/em>, StringSerializer.class.getName());\r\n        <em class=\"markup--em markup--pre-em\">\/\/ producer acks\r\n        <\/em>properties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">ACKS_CONFIG<\/em>, \"all\"); <em class=\"markup--em markup--pre-em\">\/\/ strongest producing guarantee\r\n        <\/em>properties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">RETRIES_CONFIG<\/em>, \"3\");\r\n        properties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">LINGER_MS_CONFIG<\/em>, \"1\");\r\n        <em class=\"markup--em markup--pre-em\">\/\/ leverage idempotent producer from Kafka 0.11 !\r\n        <\/em>properties.setProperty(ProducerConfig.<em class=\"markup--em markup--pre-em\">ENABLE_IDEMPOTENCE_CONFIG<\/em>, \"true\"); <em class=\"markup--em markup--pre-em\">\/\/ ensure we don't push duplicates\r\n\r\n        <\/em>Producer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(properties);\r\n\r\n\r\n        producer.send(<em class=\"markup--em markup--pre-em\">newRandomTransaction<\/em>(\"Jack\"));\r\n        producer.close();\r\n    }\r\n    public static ProducerRecord&lt;String, String&gt; newRandomTransaction(String name) {\r\n\r\n        ObjectNode transaction = JsonNodeFactory.<em class=\"markup--em markup--pre-em\">instance<\/em>.objectNode();\r\n\r\n\r\n        transaction.put(\"name\", \"Jack\");\r\n        transaction.put(\"amount\", 100);\r\n        return new ProducerRecord&lt;&gt;(\"test5\", name, 
transaction.toString());\r\n    }\r\n}<\/pre>\n<p class=\"graf graf--p\">The full version of Stream Class,<\/p>\n<pre class=\"graf graf--pre\">import com.fasterxml.jackson.databind.JsonNode;\r\nimport com.fasterxml.jackson.databind.node.JsonNodeFactory;\r\nimport com.fasterxml.jackson.databind.node.ObjectNode;\r\n\r\nimport org.apache.kafka.clients.consumer.ConsumerConfig;\r\nimport org.apache.kafka.common.serialization.Deserializer;\r\nimport org.apache.kafka.common.serialization.Serde;\r\nimport org.apache.kafka.common.serialization.Serdes;\r\nimport org.apache.kafka.common.serialization.Serializer;\r\nimport org.apache.kafka.common.utils.Bytes;\r\nimport org.apache.kafka.connect.json.JsonDeserializer;\r\nimport org.apache.kafka.connect.json.JsonSerializer;\r\nimport org.apache.kafka.streams.KafkaStreams;\r\nimport org.apache.kafka.streams.StreamsBuilder;\r\nimport org.apache.kafka.streams.StreamsConfig;\r\nimport org.apache.kafka.streams.kstream.*;\r\nimport org.apache.kafka.streams.state.KeyValueStore;\r\nimport java.util.Properties;\r\n\r\n\r\npublic class KafkaStream {\r\n\r\n    public static void main(final String[] args) throws Exception {\r\n        Properties props = new Properties();\r\n        props.put(StreamsConfig.<em class=\"markup--em markup--pre-em\">APPLICATION_ID_CONFIG<\/em>, \"test2\");\r\n        props.put(StreamsConfig.<em class=\"markup--em markup--pre-em\">BOOTSTRAP_SERVERS_CONFIG<\/em>, \"localhost:9092\");\r\n        props.put(StreamsConfig.<em class=\"markup--em markup--pre-em\">DEFAULT_KEY_SERDE_CLASS_CONFIG<\/em>, Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>().getClass());\r\n        props.put(StreamsConfig.<em class=\"markup--em markup--pre-em\">DEFAULT_VALUE_SERDE_CLASS_CONFIG<\/em>, Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>().getClass());\r\n        <em class=\"markup--em markup--pre-em\">\/\/props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,StreamsConfig.EXACTLY_ONCE);\r\n        
<\/em>props.put(ConsumerConfig.<em class=\"markup--em markup--pre-em\">AUTO_OFFSET_RESET_CONFIG<\/em>, \"earliest\");\r\n        props.put(StreamsConfig.<em class=\"markup--em markup--pre-em\">CACHE_MAX_BYTES_BUFFERING_CONFIG<\/em>, \"0\");\r\n\r\n\r\n        final Serializer&lt;JsonNode&gt; jsonNodeSerializer = new JsonSerializer();\r\n        final Deserializer&lt;JsonNode&gt; jsonNodeDeserializer = new JsonDeserializer();\r\n        final Serde&lt;JsonNode&gt; jsonNodeSerde = Serdes.<em class=\"markup--em markup--pre-em\">serdeFrom<\/em>(jsonNodeSerializer,jsonNodeDeserializer);\r\n\r\n\r\n        StreamsBuilder builder = new StreamsBuilder();\r\n        KStream&lt;String,JsonNode &gt; textLines = builder.stream(\"test5\", Consumed.<em class=\"markup--em markup--pre-em\">with<\/em>(Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>(),jsonNodeSerde));\r\n\r\n        ObjectNode initialbalance = JsonNodeFactory.<em class=\"markup--em markup--pre-em\">instance<\/em>.objectNode();\r\n        initialbalance.put(\"count\",0);\r\n        initialbalance.put(\"balance\",0);\r\n\r\n\r\n        KTable&lt;String, JsonNode&gt; bankBanalnce = textLines\r\n                .groupByKey(Serialized.<em class=\"markup--em markup--pre-em\">with<\/em>(Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>(), jsonNodeSerde))\r\n                .aggregate(\r\n                        () -&gt; initialbalance,\r\n                        (key,transaction,balance) -&gt; <em class=\"markup--em markup--pre-em\">newBalance<\/em>(transaction, balance),\r\n\r\n                        Materialized.&lt;String, JsonNode, KeyValueStore&lt;Bytes, byte[]&gt;&gt;<em class=\"markup--em markup--pre-em\">as<\/em>(\"bank-balance-agg\")\r\n                                .withKeySerde(Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>())\r\n                                .withValueSerde(jsonNodeSerde)\r\n                );\r\n        bankBanalnce.toStream().to(\"out4\",Produced.<em 
class=\"markup--em markup--pre-em\">with<\/em>(Serdes.<em class=\"markup--em markup--pre-em\">String<\/em>(),jsonNodeSerde));\r\n\r\n        bankBanalnce.toStream().print(Printed.<em class=\"markup--em markup--pre-em\">toSysOut<\/em>());\r\n\r\n\r\n        KafkaStreams streams = new KafkaStreams(builder.build(), props);\r\n        streams.cleanUp();\r\n        streams.start();\r\n    }\r\n    private static JsonNode newBalance(JsonNode transaction, JsonNode balance){\r\n        ObjectNode newBalance = JsonNodeFactory.<em class=\"markup--em markup--pre-em\">instance<\/em>.objectNode();\r\n        newBalance.put(\"count\",balance.get(\"count\").asInt()+1);\r\n        newBalance.put(\"balance\",balance.get(\"balance\").asInt()+transaction.get(\"amount\").asInt());\r\n\r\n        return newBalance;\r\n\r\n    }\r\n\r\n}<\/pre>\n<p class=\"graf graf--p\">Let\u2019s test it now<\/p>\n<p class=\"graf graf--p\">1-) First run the kafka consumer from cmd.<\/p>\n<p class=\"graf graf--p\">kafka-console-consumer.bat\u200a\u2014\u200abootstrap-server localhost:9092\u200a\u2014\u200atopic out2\u200a\u2014\u200afrom-beginning\u200a\u2014\u200aformatter kafka.tools.DefaultMessageFormatter\u200a\u2014\u200aproperty print.key=true\u200a\u2014\u200aproperty print.value=true\u200a\u2014\u200aproperty key.deserializer=org.apache.kafka.common.serialization.StringDeserializer\u200a\u2014\u200aproperty value.deserializer=org.apache.kafka.common.serialization.StringDeserializer<\/p>\n<p class=\"graf graf--p\">2-) Run the stream class, then run the Producer class 3 times and watch the consumer.<\/p>\n<figure class=\"graf graf--figure\"><img decoding=\"async\" class=\"graf-image\" src=\"https:\/\/cdn-images-1.medium.com\/max\/1000\/1*HkIY0irGeX2u6LZUgDASYA.png\" data-image-id=\"1*HkIY0irGeX2u6LZUgDASYA.png\" data-width=\"1501\" data-height=\"751\" \/><\/figure>\n<p class=\"graf graf--p\">Everything seems successful.<\/p>\n<p class=\"graf graf--p\">I hope it was useful.<\/p>\n<p class=\"graf 
graf--p\">Thank you.<\/p>\n<\/div>\n<\/div>\n<\/section>\n","protected":false},"excerpt":{"rendered":"<p>Kafka Stream API Json\u00a0Parse Hello, in this article, I will talk about how to process data incoming to Kafka queue with Kafka stream api. We can send data from various sources to the Kafka queue,The data waiting in the queue can be in formats such as json, avro, etc. Or only a single string or &hellip;<\/p>\n","protected":false},"author":67,"featured_media":32803,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"om_disable_all_campaigns":false,"_monsterinsights_skip_tracking":false,"_monsterinsights_sitenote_active":false,"_monsterinsights_sitenote_note":"","_monsterinsights_sitenote_category":0,"_uf_show_specific_survey":0,"_uf_disable_surveys":false,"_jetpack_memberships_contains_paid_content":false,"footnotes":""},"categories":[6674],"tags":[16412,6992,16420,16418,16413,16419,16416,16417,16414,16415,16410,16411],"class_list":["post-32802","post","type-post","status-publish","format-standard","has-post-thumbnail","","category-big-data","tag-apace-kafka-streaming","tag-apache-kafka","tag-apache-kafka-stream-json-parse","tag-kafka-ktable-groupby-json-data","tag-kafka-ktable-json","tag-kafka-parse-json-data-into-ktable","tag-kafka-read-json-data","tag-kafka-read-json-topic","tag-kafka-stream-api-json-parse","tag-kafka-stream-json","tag-kafka-streaming","tag-kafka-streaming-api"],"aioseo_notices":[],"jetpack_featured_media_url":"https:\/\/ittutorial.org\/wp-content\/uploads\/2021\/02\/0_P-Ymr2JMKVfe-xJP.png","jetpack_sharing_enabled":true,"amp_enabled":true,"_links":{"self":[{"href":"https:\/\/ittutorial.org\/wp-json\/wp\/v2\/posts\/32802","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/ittutorial.org\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/ittutorial.org\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/ittutorial.org\/wp-json\/wp\/v2\/users\/67
"}],"replies":[{"embeddable":true,"href":"https:\/\/ittutorial.org\/wp-json\/wp\/v2\/comments?post=32802"}],"version-history":[{"count":1,"href":"https:\/\/ittutorial.org\/wp-json\/wp\/v2\/posts\/32802\/revisions"}],"predecessor-version":[{"id":32804,"href":"https:\/\/ittutorial.org\/wp-json\/wp\/v2\/posts\/32802\/revisions\/32804"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/ittutorial.org\/wp-json\/wp\/v2\/media\/32803"}],"wp:attachment":[{"href":"https:\/\/ittutorial.org\/wp-json\/wp\/v2\/media?parent=32802"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/ittutorial.org\/wp-json\/wp\/v2\/categories?post=32802"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/ittutorial.org\/wp-json\/wp\/v2\/tags?post=32802"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}