{"id":101003,"date":"2019-12-31T10:00:47","date_gmt":"2019-12-31T08:00:47","guid":{"rendered":"https:\/\/www.javacodegeeks.com\/?p=101003"},"modified":"2019-12-16T12:07:36","modified_gmt":"2019-12-16T10:07:36","slug":"popular-frameworks-for-big-data-processing-in-java","status":"publish","type":"post","link":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html","title":{"rendered":"Popular frameworks for big data processing in Java"},"content":{"rendered":"<h2 class=\"wp-block-heading\">The big data challenge<\/h2>\n<p>The concept of big data is understood differently across the various domains in which companies need to deal with increasing volumes of data. In most of these scenarios, the system under consideration needs to be designed so that it can process that data without sacrificing throughput as the data grows in size. This essentially leads to the need for highly scalable systems, in which more resources can be allocated based on the volume of data that needs to be processed at a given point in time.<\/p>\n<p>Building such a system is a time-consuming and complex activity, and for that reason third-party frameworks and libraries can be used to meet the scalability requirements out of the box. 
There are already a number of good choices for Java applications, and in this article we will briefly discuss some of the most popular ones:<\/p>\n<div class=\"wp-block-image\">\n<figure class=\"aligncenter size-large\"><img decoding=\"async\" width=\"547\" height=\"301\" src=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/big_data_processing_frameworks.jpg\" alt=\"1-big data\" class=\"wp-image-101024\" srcset=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/big_data_processing_frameworks.jpg 547w, https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/big_data_processing_frameworks-300x165.jpg 300w\" sizes=\"(max-width: 547px) 100vw, 547px\" \/><\/figure>\n<\/div>\n<h2 class=\"wp-block-heading\">The frameworks in action<\/h2>\n<p>We are going to demonstrate each of the frameworks by implementing a simple pipeline that processes data from devices measuring the air quality index for a given area. For simplicity, we will assume that numeric data from the devices is received either in batches or in a streaming fashion. Throughout the examples we are going to use the THRESHOLD constant to denote the value above which we consider an area to be polluted.<\/p>\n<h2 class=\"wp-block-heading\">Apache Spark<\/h2>\n<p>In Spark we first need to convert the data into a suitable format. We are going to use a Dataset, but DataFrames or RDDs (Resilient Distributed Datasets) can also be used to represent the data. 
We can then apply a number of Spark transformations and actions to process the data in a distributed fashion.<\/p>\n<div>\n<div id=\"highlighter_319124\" class=\"syntaxhighlighter  c\">\n<table border=\"0\" cellspacing=\"0\" cellpadding=\"0\">\n<tbody>\n<tr>\n<td class=\"gutter\">\n<div class=\"line number1 index0 alt2\">01<\/div>\n<div class=\"line number2 index1 alt1\">02<\/div>\n<div class=\"line number3 index2 alt2\">03<\/div>\n<div class=\"line number4 index3 alt1\">04<\/div>\n<div class=\"line number5 index4 alt2\">05<\/div>\n<div class=\"line number6 index5 alt1\">06<\/div>\n<div class=\"line number7 index6 alt2\">07<\/div>\n<div class=\"line number8 index7 alt1\">08<\/div>\n<div class=\"line number9 index8 alt2\">09<\/div>\n<div class=\"line number10 index9 alt1\">10<\/div>\n<div class=\"line number11 index10 alt2\">11<\/div>\n<div class=\"line number12 index11 alt1\">12<\/div>\n<div class=\"line number13 index12 alt2\">13<\/div>\n<div class=\"line number14 index13 alt1\">14<\/div>\n<div class=\"line number15 index14 alt2\">15<\/div>\n<div class=\"line number16 index15 alt1\">16<\/div>\n<div class=\"line number17 index16 alt2\">17<\/div>\n<div class=\"line number18 index17 alt1\">18<\/div>\n<\/td>\n<td class=\"code\">\n<div class=\"container\">\n<div class=\"line number1 index0 alt2\"><code class=\"c keyword bold\">public<\/code> <code class=\"c color1 bold\">long<\/code> <code class=\"c plain\">countPollutedRegions(String[] numbers) {<\/code><\/div>\n<div class=\"line number2 index1 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ runs Spark in local mode with 4 worker threads<\/code><\/div>\n<div class=\"line number3 index2 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">SparkSession session = SparkSession.builder().<\/code><\/div>\n<div class=\"line number4 index3 alt1\"><code class=\"c 
spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">appName(<\/code><code class=\"c string\">\"AirQuality\"<\/code><code class=\"c plain\">).<\/code><\/div>\n<div class=\"line number5 index4 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">master(<\/code><code class=\"c string\">\"local[4]\"<\/code><code class=\"c plain\">).<\/code><\/div>\n<div class=\"line number6 index5 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">getOrCreate();<\/code><\/div>\n<div class=\"line number7 index6 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ converts the array of numbers to a Spark dataset<\/code><\/div>\n<div class=\"line number8 index7 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">Dataset&lt;String&gt; numbersSet = session.createDataset(Arrays.asList(numbers), <\/code><\/div>\n<div class=\"line number9 index8 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">Encoders.STRING());<\/code><\/div>\n<div class=\"line number10 index9 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code>&nbsp;<\/div>\n<div class=\"line number11 index10 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ runs the data pipeline on the local Spark instance<\/code><\/div>\n<div class=\"line number12 index11 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c color1 bold\">long<\/code> <code class=\"c plain\">pollutedRegions = 
numbersSet.map((MapFunction&lt;String, Integer&gt;) Integer::valueOf, <\/code><\/div>\n<div class=\"line number13 index12 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">Encoders.<\/code><code class=\"c color1 bold\">INT<\/code><code class=\"c plain\">())<\/code><\/div>\n<div class=\"line number14 index13 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">.filter((FilterFunction&lt;Integer&gt;) number -&gt; number &gt; THRESHOLD).count();<\/code><\/div>\n<div class=\"line number15 index14 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code>&nbsp;<\/div>\n<div class=\"line number16 index15 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code>&nbsp;<\/div>\n<div class=\"line number17 index16 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">return<\/code> <code class=\"c plain\">pollutedRegions;<\/code><\/div>\n<div class=\"line number18 index17 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<\/div>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<p>If we want to change the above application to read data from an external source, write to an external data source, and run it on a Spark cluster rather than a local Spark instance, we would have the following execution flow:<\/p>\n<div class=\"wp-block-image\">\n<figure class=\"aligncenter size-large\"><img decoding=\"async\" width=\"768\" height=\"422\" src=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/spark.png\" alt=\"2-big data\" class=\"wp-image-101025\" srcset=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/spark.png 768w, https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/spark-300x165.png 
300w\" sizes=\"(max-width: 768px) 100vw, 768px\" \/><\/figure>\n<\/div>\n<p>The Spark driver can run either as a separate process outside the cluster (client mode) or inside the Spark cluster itself (cluster mode).<\/p>\n<h2 class=\"wp-block-heading\">Apache Flink<\/h2>\n<p>Similarly to Spark, we need to represent the data in a Flink DataStream and then apply the necessary transformations to it:<\/p>\n<div>\n<div id=\"highlighter_674348\" class=\"syntaxhighlighter  c\">\n<table border=\"0\" cellspacing=\"0\" cellpadding=\"0\">\n<tbody>\n<tr>\n<td class=\"gutter\">\n<div class=\"line number1 index0 alt2\">01<\/div>\n<div class=\"line number2 index1 alt1\">02<\/div>\n<div class=\"line number3 index2 alt2\">03<\/div>\n<div class=\"line number4 index3 alt1\">04<\/div>\n<div class=\"line number5 index4 alt2\">05<\/div>\n<div class=\"line number6 index5 alt1\">06<\/div>\n<div class=\"line number7 index6 alt2\">07<\/div>\n<div class=\"line number8 index7 alt1\">08<\/div>\n<div class=\"line number9 index8 alt2\">09<\/div>\n<div class=\"line number10 index9 alt1\">10<\/div>\n<div class=\"line number11 index10 alt2\">11<\/div>\n<div class=\"line number12 index11 alt1\">12<\/div>\n<div class=\"line number13 index12 alt2\">13<\/div>\n<div class=\"line number14 index13 alt1\">14<\/div>\n<div class=\"line number15 index14 alt2\">15<\/div>\n<div class=\"line number16 index15 alt1\">16<\/div>\n<div class=\"line number17 index16 alt2\">17<\/div>\n<div class=\"line number18 index17 alt1\">18<\/div>\n<\/td>\n<td class=\"code\">\n<div class=\"container\">\n<div class=\"line number1 index0 alt2\"><code class=\"c keyword bold\">public<\/code> <code class=\"c color1 bold\">long<\/code> <code class=\"c plain\">countPollutedRegions(String[] numbers) throws Exception {<\/code><\/div>\n<div class=\"line number2 index1 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ creates a local Flink execution environment<\/code><\/div>\n<div class=\"line 
number3 index2 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">StreamExecutionEnvironment env = StreamExecutionEnvironment.<\/code><\/div>\n<div class=\"line number4 index3 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">createLocalEnvironment();<\/code><\/div>\n<div class=\"line number5 index4 alt2\">&nbsp;<\/div>\n<div class=\"line number6 index5 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ converts the array of numbers to a Flink data stream and creates<\/code><\/div>\n<div class=\"line number7 index6 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ the data pipeline<\/code><\/div>\n<div class=\"line number8 index7 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">DataStream&lt;Integer&gt; stream = env.fromCollection(Arrays.asList(numbers)).<\/code><\/div>\n<div class=\"line number9 index8 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">map(number -&gt; Integer.valueOf(number))<\/code><\/div>\n<div class=\"line number10 index9 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">.filter(number -&gt; number &gt; THRESHOLD).returns(Integer.<\/code><code class=\"c keyword bold\">class<\/code><code class=\"c plain\">);<\/code><\/div>\n<div class=\"line number11 index10 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c color1 bold\">long<\/code> <code class=\"c plain\">pollutedRegions = 0;<\/code><\/div>\n<div class=\"line number12 index11 alt1\"><code class=\"c 
spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">Iterator&lt;Integer&gt; numbersIterator = DataStreamUtils.collect(stream);<\/code><\/div>\n<div class=\"line number13 index12 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">while<\/code><code class=\"c plain\">(numbersIterator.hasNext()) {<\/code><\/div>\n<div class=\"line number14 index13 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">pollutedRegions++;<\/code><\/div>\n<div class=\"line number15 index14 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">numbersIterator.next();<\/code><\/div>\n<div class=\"line number16 index15 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number17 index16 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">return<\/code> <code class=\"c plain\">pollutedRegions;<\/code><\/div>\n<div class=\"line number18 index17 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<\/div>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<p>If we want to change the above application to read data from an external source, write to an external data source, and run it on a Flink cluster, we would have the following execution flow:<\/p>\n<div class=\"wp-block-image\">\n<figure class=\"aligncenter size-large\"><img decoding=\"async\" width=\"739\" height=\"362\" src=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/flink.png\" alt=\"3-big data\" class=\"wp-image-101026\" srcset=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/flink.png 739w, 
https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/flink-300x147.png 300w\" sizes=\"(max-width: 739px) 100vw, 739px\" \/><\/figure>\n<\/div>\n<p>The Flink client that submits the application to the Flink cluster is either the Flink CLI utility or the JobManager\u2019s web UI.<\/p>\n<h2 class=\"wp-block-heading\">Apache Storm<\/h2>\n<p>In Storm, the data pipeline is created as a topology of Spouts (the sources of data) and Bolts (the data processing units). Since Storm typically processes unbounded streams of data, we will emulate the processing of an array of air quality index numbers as a bounded stream:<\/p>\n<div>\n<div id=\"highlighter_552985\" class=\"syntaxhighlighter  c\">\n<table border=\"0\" cellspacing=\"0\" cellpadding=\"0\">\n<tbody>\n<tr>\n<td class=\"gutter\">\n<div class=\"line number1 index0 alt2\">01<\/div>\n<div class=\"line number2 index1 alt1\">02<\/div>\n<div class=\"line number3 index2 alt2\">03<\/div>\n<div class=\"line number4 index3 alt1\">04<\/div>\n<div class=\"line number5 index4 alt2\">05<\/div>\n<div class=\"line number6 index5 alt1\">06<\/div>\n<div class=\"line number7 index6 alt2\">07<\/div>\n<div class=\"line number8 index7 alt1\">08<\/div>\n<div class=\"line number9 index8 alt2\">09<\/div>\n<div class=\"line number10 index9 alt1\">10<\/div>\n<div class=\"line number11 index10 alt2\">11<\/div>\n<div class=\"line number12 index11 alt1\">12<\/div>\n<div class=\"line number13 index12 alt2\">13<\/div>\n<div class=\"line number14 index13 alt1\">14<\/div>\n<div class=\"line number15 index14 alt2\">15<\/div>\n<div class=\"line number16 index15 alt1\">16<\/div>\n<div class=\"line number17 index16 alt2\">17<\/div>\n<div class=\"line number18 index17 alt1\">18<\/div>\n<div class=\"line number19 index18 alt2\">19<\/div>\n<div class=\"line number20 index19 alt1\">20<\/div>\n<div class=\"line number21 index20 alt2\">21<\/div>\n<div class=\"line number22 index21 alt1\">22<\/div>\n<div class=\"line number23 index22 
alt2\">23<\/div>\n<\/td>\n<td class=\"code\">\n<div class=\"container\">\n<div class=\"line number1 index0 alt2\"><code class=\"c keyword bold\">public<\/code> <code class=\"c keyword bold\">void<\/code> <code class=\"c plain\">countPollutedRegions(String[] numbers) throws Exception {<\/code><\/div>\n<div class=\"line number2 index1 alt1\">&nbsp;<\/div>\n<div class=\"line number3 index2 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ builds the topology as a combination of spouts and bolts<\/code><\/div>\n<div class=\"line number4 index3 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">TopologyBuilder builder = <\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">TopologyBuilder();<\/code><\/div>\n<div class=\"line number5 index4 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">builder.setSpout(<\/code><code class=\"c string\">\"numbers-spout\"<\/code><code class=\"c plain\">, <\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">StormAirQualitySpout(numbers));<\/code><\/div>\n<div class=\"line number6 index5 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">builder.setBolt(<\/code><code class=\"c string\">\"number-bolt\"<\/code><code class=\"c plain\">, <\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">StormAirQualityBolt()).<\/code><\/div>\n<div class=\"line number7 index6 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">shuffleGrouping(<\/code><code class=\"c string\">\"numbers-spout\"<\/code><code class=\"c plain\">);<\/code><\/div>\n<div class=\"line number8 index7 alt1\"><code class=\"c 
spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code>&nbsp;<\/div>\n<div class=\"line number9 index8 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ prepares the Storm config and submits it, together with the topology, <\/code><\/div>\n<div class=\"line number10 index9 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ for execution on a local Storm cluster<\/code><\/div>\n<div class=\"line number11 index10 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">Config conf = <\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">Config();<\/code><\/div>\n<div class=\"line number12 index11 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">conf.setDebug(<\/code><code class=\"c keyword bold\">true<\/code><code class=\"c plain\">);<\/code><\/div>\n<div class=\"line number13 index12 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">LocalCluster localCluster = null;<\/code><\/div>\n<div class=\"line number14 index13 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">try<\/code> <code class=\"c plain\">{<\/code><\/div>\n<div class=\"line number15 index14 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">localCluster = <\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">LocalCluster();<\/code><\/div>\n<div class=\"line number16 index15 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">localCluster.submitTopology(<\/code><code class=\"c 
string\">\"airquality-topology\"<\/code><code class=\"c plain\">, <\/code><\/div>\n<div class=\"line number17 index16 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">conf, builder.createTopology());<\/code><\/div>\n<div class=\"line number18 index17 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">Thread.sleep(10000);<\/code><\/div>\n<div class=\"line number19 index18 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">} <\/code><code class=\"c keyword bold\">finally<\/code> <code class=\"c plain\">{<\/code><\/div>\n<div class=\"line number20 index19 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ shuts the local cluster down even if the submission fails<\/code><\/div>\n<div class=\"line number21 index20 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">if<\/code> <code class=\"c plain\">(localCluster != null) localCluster.shutdown();<\/code><\/div>\n<div class=\"line number22 index21 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number23 index22 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<\/div>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<p>We have one spout that serves as the data source for the array of air quality index numbers and one bolt that keeps only the ones that indicate polluted areas:<\/p>\n<div>\n<div id=\"highlighter_640800\" class=\"syntaxhighlighter  c\">\n<table border=\"0\" cellspacing=\"0\" cellpadding=\"0\">\n<tbody>\n<tr>\n<td class=\"gutter\">\n<div class=\"line number1 index0 
alt2\">01<\/div>\n<div class=\"line number2 index1 alt1\">02<\/div>\n<div class=\"line number3 index2 alt2\">03<\/div>\n<div class=\"line number4 index3 alt1\">04<\/div>\n<div class=\"line number5 index4 alt2\">05<\/div>\n<div class=\"line number6 index5 alt1\">06<\/div>\n<div class=\"line number7 index6 alt2\">07<\/div>\n<div class=\"line number8 index7 alt1\">08<\/div>\n<div class=\"line number9 index8 alt2\">09<\/div>\n<div class=\"line number10 index9 alt1\">10<\/div>\n<div class=\"line number11 index10 alt2\">11<\/div>\n<div class=\"line number12 index11 alt1\">12<\/div>\n<div class=\"line number13 index12 alt2\">13<\/div>\n<div class=\"line number14 index13 alt1\">14<\/div>\n<div class=\"line number15 index14 alt2\">15<\/div>\n<div class=\"line number16 index15 alt1\">16<\/div>\n<div class=\"line number17 index16 alt2\">17<\/div>\n<div class=\"line number18 index17 alt1\">18<\/div>\n<div class=\"line number19 index18 alt2\">19<\/div>\n<div class=\"line number20 index19 alt1\">20<\/div>\n<div class=\"line number21 index20 alt2\">21<\/div>\n<div class=\"line number22 index21 alt1\">22<\/div>\n<div class=\"line number23 index22 alt2\">23<\/div>\n<div class=\"line number24 index23 alt1\">24<\/div>\n<div class=\"line number25 index24 alt2\">25<\/div>\n<div class=\"line number26 index25 alt1\">26<\/div>\n<div class=\"line number27 index26 alt2\">27<\/div>\n<div class=\"line number28 index27 alt1\">28<\/div>\n<div class=\"line number29 index28 alt2\">29<\/div>\n<div class=\"line number30 index29 alt1\">30<\/div>\n<div class=\"line number31 index30 alt2\">31<\/div>\n<div class=\"line number32 index31 alt1\">32<\/div>\n<div class=\"line number33 index32 alt2\">33<\/div>\n<div class=\"line number34 index33 alt1\">34<\/div>\n<div class=\"line number35 index34 alt2\">35<\/div>\n<div class=\"line number36 index35 alt1\">36<\/div>\n<\/td>\n<td class=\"code\">\n<div class=\"container\">\n<div class=\"line number1 index0 alt2\"><code class=\"c keyword bold\">public<\/code> 
<code class=\"c keyword bold\">class<\/code> <code class=\"c plain\">StormAirQualitySpout extends BaseRichSpout {<\/code><\/div>\n<div class=\"line number2 index1 alt1\">&nbsp;<\/div>\n<div class=\"line number3 index2 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">private<\/code> <code class=\"c plain\">boolean emitted = <\/code><code class=\"c keyword bold\">false<\/code><code class=\"c plain\">;<\/code><\/div>\n<div class=\"line number4 index3 alt1\">&nbsp;<\/div>\n<div class=\"line number5 index4 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">private<\/code> <code class=\"c plain\">SpoutOutputCollector collector;<\/code><\/div>\n<div class=\"line number6 index5 alt1\">&nbsp;<\/div>\n<div class=\"line number7 index6 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">private<\/code> <code class=\"c plain\">String[] numbers;<\/code><\/div>\n<div class=\"line number8 index7 alt1\">&nbsp;<\/div>\n<div class=\"line number9 index8 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">public<\/code> <code class=\"c plain\">StormAirQualitySpout(String[] numbers) {<\/code><\/div>\n<div class=\"line number10 index9 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">this<\/code><code class=\"c plain\">.numbers = numbers;<\/code><\/div>\n<div class=\"line number11 index10 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number12 index11 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code>&nbsp;<\/div>\n<div class=\"line number13 index12 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">@Override<\/code><\/div>\n<div class=\"line number14 index13 alt1\"><code class=\"c 
spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">public<\/code> <code class=\"c keyword bold\">void<\/code> <code class=\"c plain\">declareOutputFields(OutputFieldsDeclarer declarer) {<\/code><\/div>\n<div class=\"line number15 index14 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">declarer.declare(<\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">Fields(<\/code><code class=\"c string\">\"number\"<\/code><code class=\"c plain\">));<\/code><\/div>\n<div class=\"line number16 index15 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number17 index16 alt2\">&nbsp;<\/div>\n<div class=\"line number18 index17 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">@Override<\/code><\/div>\n<div class=\"line number19 index18 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">public<\/code> <code class=\"c keyword bold\">void<\/code> <code class=\"c plain\">open(Map params, <\/code><\/div>\n<div class=\"line number20 index19 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">TopologyContext context, <\/code><\/div>\n<div class=\"line number21 index20 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">SpoutOutputCollector collector) {<\/code><\/div>\n<div class=\"line number22 index21 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">this<\/code><code class=\"c plain\">.collector = collector;<\/code><\/div>\n<div class=\"line number23 index22 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number24 index23 
alt1\">&nbsp;<\/div>\n<div class=\"line number25 index24 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">@Override<\/code><\/div>\n<div class=\"line number26 index25 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">public<\/code> <code class=\"c keyword bold\">void<\/code> <code class=\"c plain\">nextTuple() {<\/code><\/div>\n<div class=\"line number27 index26 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ we make sure that the numbers array is processed just once by <\/code><\/div>\n<div class=\"line number28 index27 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ the spout<\/code><\/div>\n<div class=\"line number29 index28 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">if<\/code><code class=\"c plain\">(!emitted) {<\/code><\/div>\n<div class=\"line number30 index29 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">for<\/code><code class=\"c plain\">(String number : numbers) {<\/code><\/div>\n<div class=\"line number31 index30 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">collector.emit(<\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">Values(number));<\/code><\/div>\n<div class=\"line number32 index31 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number33 index32 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">emitted = 
<\/code><code class=\"c keyword bold\">true<\/code><code class=\"c plain\">;<\/code><\/div>\n<div class=\"line number34 index33 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number35 index34 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number36 index35 alt1\"><code class=\"c plain\">}<\/code><\/div>\n<\/div>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<div>\n<div id=\"highlighter_482735\" class=\"syntaxhighlighter  c\">\n<table border=\"0\" cellspacing=\"0\" cellpadding=\"0\">\n<tbody>\n<tr>\n<td class=\"gutter\">\n<div class=\"line number1 index0 alt2\">01<\/div>\n<div class=\"line number2 index1 alt1\">02<\/div>\n<div class=\"line number3 index2 alt2\">03<\/div>\n<div class=\"line number4 index3 alt1\">04<\/div>\n<div class=\"line number5 index4 alt2\">05<\/div>\n<div class=\"line number6 index5 alt1\">06<\/div>\n<div class=\"line number7 index6 alt2\">07<\/div>\n<div class=\"line number8 index7 alt1\">08<\/div>\n<div class=\"line number9 index8 alt2\">09<\/div>\n<div class=\"line number10 index9 alt1\">10<\/div>\n<div class=\"line number11 index10 alt2\">11<\/div>\n<div class=\"line number12 index11 alt1\">12<\/div>\n<div class=\"line number13 index12 alt2\">13<\/div>\n<div class=\"line number14 index13 alt1\">14<\/div>\n<div class=\"line number15 index14 alt2\">15<\/div>\n<div class=\"line number16 index15 alt1\">16<\/div>\n<div class=\"line number17 index16 alt2\">17<\/div>\n<div class=\"line number18 index17 alt1\">18<\/div>\n<div class=\"line number19 index18 alt2\">19<\/div>\n<div class=\"line number20 index19 alt1\">20<\/div>\n<div class=\"line number21 index20 alt2\">21<\/div>\n<div class=\"line number22 index21 alt1\">22<\/div>\n<div class=\"line number23 index22 alt2\">23<\/div>\n<div class=\"line number24 index23 alt1\">24<\/div>\n<div class=\"line number25 
index24 alt2\">25<\/div>\n<div class=\"line number26 index25 alt1\">26<\/div>\n<\/td>\n<td class=\"code\">\n<div class=\"container\">\n<div class=\"line number1 index0 alt2\"><code class=\"c keyword bold\">public<\/code> <code class=\"c keyword bold\">class<\/code> <code class=\"c plain\">StormAirQualityBolt extends BaseRichBolt {<\/code><\/div>\n<div class=\"line number2 index1 alt1\">&nbsp;<\/div>\n<div class=\"line number3 index2 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">private<\/code> <code class=\"c keyword bold\">static<\/code> <code class=\"c keyword bold\">final<\/code> <code class=\"c color1 bold\">int<\/code> <code class=\"c plain\">THRESHOLD = 10;<\/code><\/div>\n<div class=\"line number4 index3 alt1\">&nbsp;<\/div>\n<div class=\"line number5 index4 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">private<\/code> <code class=\"c color1 bold\">int<\/code> <code class=\"c plain\">pollutedRegions = 0;<\/code><\/div>\n<div class=\"line number6 index5 alt1\">&nbsp;<\/div>\n<div class=\"line number7 index6 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">@Override<\/code><\/div>\n<div class=\"line number8 index7 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">public<\/code> <code class=\"c keyword bold\">void<\/code> <code class=\"c plain\">declareOutputFields(OutputFieldsDeclarer declarer) {<\/code><\/div>\n<div class=\"line number9 index8 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">declarer.declare(<\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">Fields(<\/code><code class=\"c string\">\"number\"<\/code><code class=\"c plain\">));<\/code><\/div>\n<div class=\"line number10 index9 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c 
}<\/code>">
plain\">}<\/code><\/div>\n<div class=\"line number11 index10 alt2\">&nbsp;<\/div>\n<div class=\"line number12 index11 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">@Override<\/code><\/div>\n<div class=\"line number13 index12 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">public<\/code> <code class=\"c keyword bold\">void<\/code> <code class=\"c plain\">prepare(Map params, <\/code><\/div>\n<div class=\"line number14 index13 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">TopologyContext context, <\/code><\/div>\n<div class=\"line number15 index14 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">OutputCollector collector) {<\/code><\/div>\n<div class=\"line number16 index15 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number17 index16 alt2\">&nbsp;<\/div>\n<div class=\"line number18 index17 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">@Override<\/code><\/div>\n<div class=\"line number19 index18 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">public<\/code> <code class=\"c keyword bold\">void<\/code> <code class=\"c plain\">execute(Tuple tuple) {<\/code><\/div>\n<div class=\"line number20 index19 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">String number = tuple.getStringByField(<\/code><code class=\"c string\">\"number\"<\/code><code class=\"c plain\">);<\/code><\/div>\n<div class=\"line number21 index20 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">Integer numberInt = Integer.valueOf(number);<\/code><\/div>\n<div class=\"line number22 index21 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">if<\/code> <code class=\"c plain\">(numberInt &gt; 
THRESHOLD) {<\/code><\/div>\n<div class=\"line number23 index22 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">pollutedRegions++;<\/code><\/div>\n<div class=\"line number24 index23 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number25 index24 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number26 index25 alt1\"><code class=\"c plain\">}<\/code><\/div>\n<\/div>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<p>We are using a LocalCluster instance to submit the topology to a local Storm cluster, which is convenient for development purposes. If we want to submit the Storm topology to a production cluster instead, we would have the following execution flow:<\/p>\n<div class=\"wp-block-image\">\n<figure class=\"aligncenter size-large\"><img decoding=\"async\" width=\"768\" height=\"326\" src=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/storm.png\" alt=\"4-big data\" class=\"wp-image-101027\" srcset=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/storm.png 768w, https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/storm-300x127.png 300w\" sizes=\"(max-width: 768px) 100vw, 768px\" \/><\/figure>\n<\/div>\n<h2 class=\"wp-block-heading\">Apache Ignite<\/h2>\n<p>In Ignite we first need to put the data in the distributed cache before running the data processing pipeline, which takes the form of an SQL query executed in a distributed fashion over the Ignite cluster:<\/p>\n<div>\n<div id=\"highlighter_349339\" class=\"syntaxhighlighter  c\">\n<table border=\"0\" cellspacing=\"0\" 
cellpadding=\"0\">\n<tbody>\n<tr>\n<td class=\"gutter\">\n<div class=\"line number1 index0 alt2\">01<\/div>\n<div class=\"line number2 index1 alt1\">02<\/div>\n<div class=\"line number3 index2 alt2\">03<\/div>\n<div class=\"line number4 index3 alt1\">04<\/div>\n<div class=\"line number5 index4 alt2\">05<\/div>\n<div class=\"line number6 index5 alt1\">06<\/div>\n<div class=\"line number7 index6 alt2\">07<\/div>\n<div class=\"line number8 index7 alt1\">08<\/div>\n<div class=\"line number9 index8 alt2\">09<\/div>\n<div class=\"line number10 index9 alt1\">10<\/div>\n<div class=\"line number11 index10 alt2\">11<\/div>\n<div class=\"line number12 index11 alt1\">12<\/div>\n<div class=\"line number13 index12 alt2\">13<\/div>\n<div class=\"line number14 index13 alt1\">14<\/div>\n<div class=\"line number15 index14 alt2\">15<\/div>\n<div class=\"line number16 index15 alt1\">16<\/div>\n<div class=\"line number17 index16 alt2\">17<\/div>\n<div class=\"line number18 index17 alt1\">18<\/div>\n<div class=\"line number19 index18 alt2\">19<\/div>\n<div class=\"line number20 index19 alt1\">20<\/div>\n<div class=\"line number21 index20 alt2\">21<\/div>\n<div class=\"line number22 index21 alt1\">22<\/div>\n<div class=\"line number23 index22 alt2\">23<\/div>\n<div class=\"line number24 index23 alt1\">24<\/div>\n<div class=\"line number25 index24 alt2\">25<\/div>\n<div class=\"line number26 index25 alt1\">26<\/div>\n<div class=\"line number27 index26 alt2\">27<\/div>\n<div class=\"line number28 index27 alt1\">28<\/div>\n<div class=\"line number29 index28 alt2\">29<\/div>\n<div class=\"line number30 index29 alt1\">30<\/div>\n<div class=\"line number31 index30 alt2\">31<\/div>\n<\/td>\n<td class=\"code\">\n<div class=\"container\">\n<div class=\"line number1 index0 alt2\"><code class=\"c keyword bold\">public<\/code> <code class=\"c color1 bold\">long<\/code> <code class=\"c plain\">countPollutedRegions(String[] numbers) {<\/code><\/div>\n<div class=\"line number2 index1 
&nbsp;<\/div>">
alt1\">&nbsp;<\/div>\n<div class=\"line number3 index2 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">IgniteConfiguration igniteConfig = <\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">IgniteConfiguration();<\/code><\/div>\n<div class=\"line number4 index3 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">CacheConfiguration cacheConfig = <\/code><\/div>\n<div class=\"line number5 index4 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">CacheConfiguration();<\/code><\/div>\n<div class=\"line number6 index5 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ the cache key is the number's index in the array and the value is the number<\/code><\/div>\n<div class=\"line number7 index6 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">cacheConfig.setIndexedTypes(Integer.<\/code><code class=\"c keyword bold\">class<\/code><code class=\"c plain\">, String.<\/code><code class=\"c keyword bold\">class<\/code><code class=\"c plain\">);<\/code><\/div>\n<div class=\"line number8 index7 alt1\">&nbsp;<\/div>\n<div class=\"line number9 index8 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">cacheConfig.setName(NUMBERS_CACHE);<\/code><\/div>\n<div class=\"line number10 index9 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">igniteConfig.setCacheConfiguration(cacheConfig);<\/code><\/div>\n<div class=\"line number11 index10 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code>&nbsp;<\/div>\n<div class=\"line number12 index11 
alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">try<\/code> <code class=\"c plain\">(Ignite ignite = Ignition.start(igniteConfig)) {<\/code><\/div>\n<div class=\"line number13 index12 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">IgniteCache cache = ignite.getOrCreateCache(NUMBERS_CACHE);<\/code><\/div>\n<div class=\"line number14 index13 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ adds the numbers to the Ignite cache<\/code><\/div>\n<div class=\"line number15 index14 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">try<\/code> <code class=\"c plain\">(IgniteDataStreamer streamer = <\/code><\/div>\n<div class=\"line number16 index15 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">ignite.dataStreamer(cache.getName())) {<\/code><\/div>\n<div class=\"line number17 index16 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c color1 bold\">int<\/code> <code class=\"c plain\">key = 0;<\/code><\/div>\n<div class=\"line number18 index17 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">for<\/code> <code class=\"c plain\">(String number : numbers) {<\/code><\/div>\n<div class=\"line number19 index18 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c 
plain\">streamer.addData(key++, number);<\/code><\/div>\n<div class=\"line number20 index19 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number21 index20 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number22 index21 alt1\">&nbsp;<\/div>\n<div class=\"line number23 index22 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ performs an SQL query over the cached numbers<\/code><\/div>\n<div class=\"line number24 index23 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">SqlFieldsQuery query = <\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">SqlFieldsQuery(<\/code><code class=\"c string\">\"select * from String where _val &gt; \"<\/code> <code class=\"c plain\">+ THRESHOLD);<\/code><\/div>\n<div class=\"line number25 index24 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code>&nbsp;<\/div>\n<div class=\"line number26 index25 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">FieldsQueryCursor&lt;List&gt; cursor = cache.query(query);<\/code><\/div>\n<div class=\"line number27 index26 alt2\">&nbsp;<\/div>\n<div class=\"line number28 index27 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c color1 bold\">int<\/code> <code class=\"c plain\">pollutedRegions = cursor.getAll().size();<\/code><\/div>\n<div class=\"line number29 index28 alt2\"><code class=\"c 
&nbsp;">
spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">return<\/code> <code class=\"c plain\">pollutedRegions;<\/code><\/div>\n<div class=\"line number30 index29 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number31 index30 alt2\"><code class=\"c plain\">}<\/code><\/div>\n<\/div>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<p>If we want to run the application in an Ignite cluster, it will have the following execution flow:<\/p>\n<div class=\"wp-block-image\">\n<figure class=\"aligncenter size-large\"><img decoding=\"async\" width=\"768\" height=\"526\" src=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/ignite.png\" alt=\"\" class=\"wp-image-101028\" srcset=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/ignite.png 768w, https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/ignite-300x205.png 300w\" sizes=\"(max-width: 768px) 100vw, 768px\" \/><\/figure>\n<\/div>\n<h2 class=\"wp-block-heading\">Hazelcast Jet<\/h2>\n<p>Hazelcast Jet works on top of Hazelcast IMDG and, similarly to Ignite, if we want to process data we first need to put it in the Hazelcast IMDG cluster:<\/p>\n<div>\n<div id=\"highlighter_652090\" class=\"syntaxhighlighter  c\">\n<table border=\"0\" cellspacing=\"0\" cellpadding=\"0\">\n<tbody>\n<tr>\n<td class=\"gutter\">\n<div class=\"line number1 index0 alt2\">01<\/div>\n<div class=\"line number2 index1 alt1\">02<\/div>\n<div class=\"line number3 index2 alt2\">03<\/div>\n<div class=\"line number4 index3 alt1\">04<\/div>\n<div class=\"line number5 index4 alt2\">05<\/div>\n<div class=\"line number6 index5 alt1\">06<\/div>\n<div class=\"line number7 index6 alt2\">07<\/div>\n<div class=\"line number8 index7 alt1\">08<\/div>\n<div class=\"line number9 index8 alt2\">09<\/div>\n<div class=\"line number10 index9 alt1\">10<\/div>\n<div 
class=\"line number11 index10 alt2\">11<\/div>\n<div class=\"line number12 index11 alt1\">12<\/div>\n<div class=\"line number13 index12 alt2\">13<\/div>\n<div class=\"line number14 index13 alt1\">14<\/div>\n<div class=\"line number15 index14 alt2\">15<\/div>\n<div class=\"line number16 index15 alt1\">16<\/div>\n<div class=\"line number17 index16 alt2\">17<\/div>\n<div class=\"line number18 index17 alt1\">18<\/div>\n<div class=\"line number19 index18 alt2\">19<\/div>\n<div class=\"line number20 index19 alt1\">20<\/div>\n<div class=\"line number21 index20 alt2\">21<\/div>\n<div class=\"line number22 index21 alt1\">22<\/div>\n<div class=\"line number23 index22 alt2\">23<\/div>\n<div class=\"line number24 index23 alt1\">24<\/div>\n<div class=\"line number25 index24 alt2\">25<\/div>\n<\/td>\n<td class=\"code\">\n<div class=\"container\">\n<div class=\"line number1 index0 alt2\"><code class=\"c keyword bold\">public<\/code> <code class=\"c color1 bold\">long<\/code> <code class=\"c plain\">countPollutedRegions(String[] numbers) {<\/code><\/div>\n<div class=\"line number2 index1 alt1\">&nbsp;<\/div>\n<div class=\"line number3 index2 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ prepares the Jet data processing pipeline<\/code><\/div>\n<div class=\"line number4 index3 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">Pipeline p = Pipeline.create();<\/code><\/div>\n<div class=\"line number5 index4 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">p.drawFrom(Sources.list(<\/code><code class=\"c string\">\"numbers\"<\/code><code class=\"c plain\">)).<\/code><\/div>\n<div class=\"line number6 index5 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">map(number -&gt; Integer.valueOf((String) 
number))<\/code><\/div>\n<div class=\"line number7 index6 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">.filter(number -&gt; number &gt; THRESHOLD).drainTo(Sinks.list(<\/code><code class=\"c string\">\"filteredNumbers\"<\/code><code class=\"c plain\">));<\/code><\/div>\n<div class=\"line number8 index7 alt1\">&nbsp;<\/div>\n<div class=\"line number9 index8 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">JetInstance jet = Jet.newJetInstance();<\/code><\/div>\n<div class=\"line number10 index9 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">IList numbersList = jet.getList(<\/code><code class=\"c string\">\"numbers\"<\/code><code class=\"c plain\">);<\/code><\/div>\n<div class=\"line number11 index10 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">numbersList.addAll(Arrays.asList(numbers));<\/code><\/div>\n<div class=\"line number12 index11 alt1\">&nbsp;<\/div>\n<div class=\"line number13 index12 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">try<\/code> <code class=\"c plain\">{<\/code><\/div>\n<div class=\"line number14 index13 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ submits the pipeline in the Jet cluster<\/code><\/div>\n<div class=\"line number15 index14 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">jet.newJob(p).join();<\/code><\/div>\n<div class=\"line number16 index15 alt1\">&nbsp;<\/div>\n<div class=\"line number17 index16 alt2\"><code class=\"c 
&nbsp;">
spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ gets the filtered data from Hazelcast IMDG<\/code><\/div>\n<div class=\"line number18 index17 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">List filteredRecordsList = jet.getList(<\/code><code class=\"c string\">\"filteredNumbers\"<\/code><code class=\"c plain\">);<\/code><\/div>\n<div class=\"line number19 index18 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c color1 bold\">int<\/code> <code class=\"c plain\">pollutedRegions = filteredRecordsList.size();<\/code><\/div>\n<div class=\"line number20 index19 alt1\">&nbsp;<\/div>\n<div class=\"line number21 index20 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">return<\/code> <code class=\"c plain\">pollutedRegions;<\/code><\/div>\n<div class=\"line number22 index21 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">} finally {<\/code><\/div>\n<div class=\"line number23 index22 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">Jet.shutdownAll();<\/code><\/div>\n<div class=\"line number24 index23 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number25 index24 alt2\"><code class=\"c plain\">}<\/code><\/div>\n<\/div>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<p>Note, however, that Jet also integrates with external data sources, so the data does not have to be stored in the IMDG cluster. 
You can also do the aggregation without first storing the data in a list (the full example on GitHub contains this improved version). Thanks to Jaromir and Can from the Hazelcast engineering team for the valuable input.<\/p>\n<p>If we want to run the application in a Hazelcast Jet cluster, it will have the following execution flow:<\/p>\n<div class=\"wp-block-image\">\n<figure class=\"aligncenter size-large\"><img decoding=\"async\" width=\"768\" height=\"541\" src=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/hazelcast.png\" alt=\"\" class=\"wp-image-101029\" srcset=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/hazelcast.png 768w, https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/hazelcast-300x211.png 300w\" sizes=\"(max-width: 768px) 100vw, 768px\" \/><\/figure>\n<\/div>\n<h2 class=\"wp-block-heading\">Kafka Streams<\/h2>\n<p>Kafka Streams is a client library that uses Kafka topics as sources and sinks for the data processing pipeline. 
To use the Kafka Streams library for our scenario, we put the air quality index numbers in a <strong>numbers<\/strong> Kafka topic:<\/p>\n<div>\n<div id=\"highlighter_618464\" class=\"syntaxhighlighter  c\">\n<table border=\"0\" cellspacing=\"0\" cellpadding=\"0\">\n<tbody>\n<tr>\n<td class=\"gutter\">\n<div class=\"line number1 index0 alt2\">01<\/div>\n<div class=\"line number2 index1 alt1\">02<\/div>\n<div class=\"line number3 index2 alt2\">03<\/div>\n<div class=\"line number4 index3 alt1\">04<\/div>\n<div class=\"line number5 index4 alt2\">05<\/div>\n<div class=\"line number6 index5 alt1\">06<\/div>\n<div class=\"line number7 index6 alt2\">07<\/div>\n<div class=\"line number8 index7 alt1\">08<\/div>\n<div class=\"line number9 index8 alt2\">09<\/div>\n<div class=\"line number10 index9 alt1\">10<\/div>\n<div class=\"line number11 index10 alt2\">11<\/div>\n<div class=\"line number12 index11 alt1\">12<\/div>\n<div class=\"line number13 index12 alt2\">13<\/div>\n<div class=\"line number14 index13 alt1\">14<\/div>\n<div class=\"line number15 index14 alt2\">15<\/div>\n<div class=\"line number16 index15 alt1\">16<\/div>\n<div class=\"line number17 index16 alt2\">17<\/div>\n<div class=\"line number18 index17 alt1\">18<\/div>\n<div class=\"line number19 index18 alt2\">19<\/div>\n<div class=\"line number20 index19 alt1\">20<\/div>\n<div class=\"line number21 index20 alt2\">21<\/div>\n<div class=\"line number22 index21 alt1\">22<\/div>\n<div class=\"line number23 index22 alt2\">23<\/div>\n<div class=\"line number24 index23 alt1\">24<\/div>\n<div class=\"line number25 index24 alt2\">25<\/div>\n<div class=\"line number26 index25 alt1\">26<\/div>\n<div class=\"line number27 index26 alt2\">27<\/div>\n<div class=\"line number28 index27 alt1\">28<\/div>\n<div class=\"line number29 index28 alt2\">29<\/div>\n<div class=\"line number30 index29 alt1\">30<\/div>\n<div class=\"line number31 index30 alt2\">31<\/div>\n<div class=\"line number32 index31 
32<\/div>">
alt1\">32<\/div>\n<div class=\"line number33 index32 alt2\">33<\/div>\n<div class=\"line number34 index33 alt1\">34<\/div>\n<div class=\"line number35 index34 alt2\">35<\/div>\n<div class=\"line number36 index35 alt1\">36<\/div>\n<div class=\"line number37 index36 alt2\">37<\/div>\n<div class=\"line number38 index37 alt1\">38<\/div>\n<div class=\"line number39 index38 alt2\">39<\/div>\n<div class=\"line number40 index39 alt1\">40<\/div>\n<\/td>\n<td class=\"code\">\n<div class=\"container\">\n<div class=\"line number1 index0 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">public<\/code> <code class=\"c color1 bold\">long<\/code> <code class=\"c plain\">countPollutedRegions() {<\/code><\/div>\n<div class=\"line number2 index1 alt1\">&nbsp;<\/div>\n<div class=\"line number3 index2 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">List&lt;String&gt; result = Collections.synchronizedList(<\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">LinkedList&lt;&gt;());<\/code> <code class=\"c comments\">\/\/ written to by a stream thread<\/code><\/div>\n<div class=\"line number4 index3 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ key\/value pairs contain string items<\/code><\/div>\n<div class=\"line number5 index4 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">final<\/code> <code class=\"c plain\">Serde&lt;String&gt; stringSerde = Serdes.String();<\/code><\/div>\n<div class=\"line number6 index5 alt1\">&nbsp;<\/div>\n<div class=\"line number7 index6 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c comments\">\/\/ prepares and runs the data processing pipeline<\/code><\/div>\n<div class=\"line number8 index7 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">final<\/code> <code class=\"c plain\">StreamsBuilder builder = <\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c 
plain\">StreamsBuilder();<\/code><\/div>\n<div class=\"line number9 index8 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">builder.stream(<\/code><code class=\"c string\">\"numbers\"<\/code><code class=\"c plain\">, Consumed.with(stringSerde, stringSerde))<\/code><\/div>\n<div class=\"line number10 index9 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">.map((key, value) -&gt; KeyValue.pair(key, Integer.valueOf(value)))<\/code><\/div>\n<div class=\"line number11 index10 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">.filter((key, value) -&gt; value &gt; THRESHOLD)<\/code><\/div>\n<div class=\"line number12 index11 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">.foreach((key, value) -&gt; {<\/code><\/div>\n<div class=\"line number13 index12 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">result.add(value.toString());<\/code><\/div>\n<div class=\"line number14 index13 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">});<\/code><\/div>\n<div class=\"line number15 index14 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code>&nbsp;<\/div>\n<div class=\"line number16 index15 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword 
bold\">final<\/code> <code class=\"c plain\">Topology topology = builder.build();<\/code><\/div>\n<div class=\"line number17 index16 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">final<\/code> <code class=\"c plain\">KafkaStreams streams = <\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">KafkaStreams(topology, <\/code><\/div>\n<div class=\"line number18 index17 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">createKafkaStreamsConfiguration());<\/code><\/div>\n<div class=\"line number19 index18 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">streams.start();<\/code><\/div>\n<div class=\"line number20 index19 alt1\">&nbsp;<\/div>\n<div class=\"line number21 index20 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">try<\/code> <code class=\"c plain\">{<\/code><\/div>\n<div class=\"line number22 index21 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">Thread.sleep(10000);<\/code><\/div>\n<div class=\"line number23 index22 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">} <\/code><code class=\"c keyword bold\">catch<\/code> <code class=\"c plain\">(InterruptedException e) {<\/code><\/div>\n<div class=\"line number24 index23 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">e.printStackTrace();<\/code><\/div>\n<div class=\"line number25 index24 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number26 
index25 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c color1 bold\">int<\/code> <code class=\"c plain\">pollutedRegions = result.size();<\/code><\/div>\n<div class=\"line number27 index26 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">System.out.println(<\/code><code class=\"c string\">\"Number of severely polluted regions: \"<\/code> <code class=\"c plain\">+ pollutedRegions);<\/code><\/div>\n<div class=\"line number28 index27 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">streams.close();<\/code><\/div>\n<div class=\"line number29 index28 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">return<\/code> <code class=\"c plain\">pollutedRegions;<\/code><\/div>\n<div class=\"line number30 index29 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number31 index30 alt2\">&nbsp;<\/div>\n<div class=\"line number32 index31 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">private<\/code> <code class=\"c plain\">Properties createKafkaStreamsConfiguration() {<\/code><\/div>\n<div class=\"line number33 index32 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">Properties props = <\/code><code class=\"c keyword bold\">new<\/code> <code class=\"c plain\">Properties();<\/code><\/div>\n<div class=\"line number34 index33 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">props.put(StreamsConfig.APPLICATION_ID_CONFIG, <\/code><code class=\"c string\">\"text-search-config\"<\/code><code class=\"c plain\">);<\/code><\/div>\n<div class=\"line number35 index34 alt2\"><code class=\"c 
spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, <\/code><code class=\"c string\">\"localhost:9092\"<\/code><code class=\"c plain\">);<\/code><\/div>\n<div class=\"line number36 index35 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());<\/code><\/div>\n<div class=\"line number37 index36 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());<\/code><\/div>\n<div class=\"line number38 index37 alt1\">&nbsp;<\/div>\n<div class=\"line number39 index38 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">return<\/code> <code class=\"c plain\">props;<\/code><\/div>\n<div class=\"line number40 index39 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<\/div>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<p>Our Kafka Streams application instances have the following execution flow:<\/p>\n<div class=\"wp-block-image\">\n<figure class=\"aligncenter size-large\"><img decoding=\"async\" width=\"634\" height=\"397\" src=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/kafka_streams.png\" alt=\"\" class=\"wp-image-101030\" srcset=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/kafka_streams.png 634w, https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/kafka_streams-300x188.png 300w\" sizes=\"(max-width: 634px) 100vw, 634px\" \/><\/figure>\n<\/div>\n<h2 class=\"wp-block-heading\">Pulsar Functions<\/h2>\n<p>Apache Pulsar Functions are lightweight compute processes that run in a serverless fashion alongside an Apache 
Pulsar cluster. Assuming we are streaming our air quality index readings into a Pulsar topic, we can write a function that counts the readings exceeding the given threshold and accumulates that count in a counter through the function context, as follows:<\/p>\n<div>\n<div id=\"highlighter_569943\" class=\"syntaxhighlighter  c\">\n<table border=\"0\" cellspacing=\"0\" cellpadding=\"0\">\n<tbody>\n<tr>\n<td class=\"gutter\">\n<div class=\"line number1 index0 alt2\">01<\/div>\n<div class=\"line number2 index1 alt1\">02<\/div>\n<div class=\"line number3 index2 alt2\">03<\/div>\n<div class=\"line number4 index3 alt1\">04<\/div>\n<div class=\"line number5 index4 alt2\">05<\/div>\n<div class=\"line number6 index5 alt1\">06<\/div>\n<div class=\"line number7 index6 alt2\">07<\/div>\n<div class=\"line number8 index7 alt1\">08<\/div>\n<div class=\"line number9 index8 alt2\">09<\/div>\n<div class=\"line number10 index9 alt1\">10<\/div>\n<div class=\"line number11 index10 alt2\">11<\/div>\n<div class=\"line number12 index11 alt1\">12<\/div>\n<div class=\"line number13 index12 alt2\">13<\/div>\n<div class=\"line number14 index13 alt1\">14<\/div>\n<div class=\"line number15 index14 alt2\">15<\/div>\n<div class=\"line number16 index15 alt1\">16<\/div>\n<\/td>\n<td class=\"code\">\n<div class=\"container\">\n<div class=\"line number1 index0 alt2\"><code class=\"c keyword bold\">public<\/code> <code class=\"c keyword bold\">class<\/code> <code class=\"c plain\">PulsarFunctionsAirQualityApplication <\/code><\/div>\n<div class=\"line number2 index1 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">implements Function&lt;String, Void&gt; {<\/code><\/div>\n<div class=\"line number3 index2 alt2\">&nbsp;<\/div>\n<div class=\"line number4 index3 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">private<\/code> <code class=\"c keyword bold\">static<\/code> <code class=\"c keyword bold\">final<\/code> <code class=\"c color1 bold\">int<\/code> <code class=\"c 
plain\">HIGH_THRESHOLD = 10;<\/code><\/div>\n<div class=\"line number5 index4 alt2\">&nbsp;<\/div>\n<div class=\"line number6 index5 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">@Override<\/code><\/div>\n<div class=\"line number7 index6 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">public<\/code> <code class=\"c plain\">Void process(String input, Context context) throws Exception {<\/code><\/div>\n<div class=\"line number8 index7 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code>&nbsp;<\/div>\n<div class=\"line number9 index8 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c color1 bold\">int<\/code> <code class=\"c plain\">number = Integer.valueOf(input);<\/code><\/div>\n<div class=\"line number10 index9 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code>&nbsp;<\/div>\n<div class=\"line number11 index10 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">if<\/code><code class=\"c plain\">(number &gt; HIGH_THRESHOLD) {<\/code><\/div>\n<div class=\"line number12 index11 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">context.incrCounter(<\/code><code class=\"c string\">\"pollutedRegions\"<\/code><code class=\"c plain\">, 1);<\/code><\/div>\n<div class=\"line number13 index12 alt2\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number14 index13 alt1\"><code class=\"c spaces\">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c keyword bold\">return<\/code> <code class=\"c plain\">null;<\/code><\/div>\n<div class=\"line number15 index14 alt2\"><code class=\"c 
spaces\">&nbsp;&nbsp;&nbsp;&nbsp;<\/code><code class=\"c plain\">}<\/code><\/div>\n<div class=\"line number16 index15 alt1\"><code class=\"c plain\">}<\/code><\/div>\n<\/div>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n<\/div>\n<p>The function interacts with the Pulsar cluster as follows:<\/p>\n<div class=\"wp-block-image\">\n<figure class=\"aligncenter size-large\"><img decoding=\"async\" width=\"557\" height=\"397\" src=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/pulsar_functions.png\" alt=\"\" class=\"wp-image-101031\" srcset=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/pulsar_functions.png 557w, https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2019\/12\/pulsar_functions-300x214.png 300w\" sizes=\"(max-width: 557px) 100vw, 557px\" \/><\/figure>\n<\/div>\n<p>The Pulsar function can run either inside the Pulsar cluster or as a separate application.<\/p>\n<h2 class=\"wp-block-heading\">Summary<\/h2>\n<p>In this article we briefly reviewed some of the most popular frameworks for implementing big data processing systems in Java. Each of these frameworks is fairly large and deserves an article of its own. Although quite simple, our air quality index pipeline demonstrates how these frameworks operate, and you can use it as a starting point for exploring any of them in more depth. You can review the complete code samples <a href=\"https:\/\/github.com\/martinfmi\/bigdataframeworks\">here<\/a>.<\/p>\n<div class=\"attribution\">\n<table>\n<tbody>\n<tr>\n<td>\n<p>Published on Java Code Geeks with permission by Martin Toshev, partner at our <a href=\"\/\/www.javacodegeeks.com\/join-us\/jcg\/\" target=\"_blank\" rel=\"noopener noreferrer\">JCG program<\/a>. 
See the original article here: <a href=\"https:\/\/www.javaadvent.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html\" target=\"_blank\" rel=\"noopener noreferrer\">Popular frameworks for big data processing in Java<\/a><\/p>\n<p>Opinions expressed by Java Code Geeks contributors are their own.<\/p>\n<\/td>\n<\/tr>\n<\/tbody>\n<\/table>\n<\/div>\n","protected":false},"excerpt":{"rendered":"<p>The big data challenge The concept of big data is understood differently in the variety of domains where companies face the need to deal with increasing volumes of data. In most of these scenarios the system under consideration needs to be designed in such a way so that it is capable of processing that data &hellip;<\/p>\n","protected":false},"author":11888,"featured_media":112,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[8],"tags":[372],"class_list":["post-101003","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-enterprise-java","tag-big-data"],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v27.5 - https:\/\/yoast.com\/product\/yoast-seo-wordpress\/ -->\n<title>Popular frameworks for big data processing in Java - Java Code Geeks<\/title>\n<meta name=\"description\" content=\"Interested to learn about big data? 
Check our article presenting some popular frameworks for big data processing in Java with examples\" \/>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html\" \/>\n<meta property=\"og:locale\" content=\"en_US\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Popular frameworks for big data processing in Java - Java Code Geeks\" \/>\n<meta property=\"og:description\" content=\"Interested to learn about big data? Check our article presenting some popular frameworks for big data processing in Java with examples\" \/>\n<meta property=\"og:url\" content=\"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html\" \/>\n<meta property=\"og:site_name\" content=\"Java Code Geeks\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/javacodegeeks\" \/>\n<meta property=\"article:published_time\" content=\"2019-12-31T08:00:47+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2012\/10\/enterprise-java-logo.jpg\" \/>\n\t<meta property=\"og:image:width\" content=\"150\" \/>\n\t<meta property=\"og:image:height\" content=\"150\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/jpeg\" \/>\n<meta name=\"author\" content=\"Martin Toshev\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:creator\" content=\"@javacodegeeks\" \/>\n<meta name=\"twitter:site\" content=\"@javacodegeeks\" \/>\n<meta name=\"twitter:label1\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data1\" content=\"Martin Toshev\" \/>\n\t<meta name=\"twitter:label2\" content=\"Est. 
reading time\" \/>\n\t<meta name=\"twitter:data2\" content=\"17 minutes\" \/>\n<script type=\"application\/ld+json\" class=\"yoast-schema-graph\">{\"@context\":\"https:\\\/\\\/schema.org\",\"@graph\":[{\"@type\":\"Article\",\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html#article\",\"isPartOf\":{\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html\"},\"author\":{\"name\":\"Martin Toshev\",\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/#\\\/schema\\\/person\\\/d6aa7d95e114929bd9da5d1a62db40e8\"},\"headline\":\"Popular frameworks for big data processing in Java\",\"datePublished\":\"2019-12-31T08:00:47+00:00\",\"mainEntityOfPage\":{\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html\"},\"wordCount\":1039,\"commentCount\":0,\"publisher\":{\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/#organization\"},\"image\":{\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html#primaryimage\"},\"thumbnailUrl\":\"https:\\\/\\\/www.javacodegeeks.com\\\/wp-content\\\/uploads\\\/2012\\\/10\\\/enterprise-java-logo.jpg\",\"keywords\":[\"Big Data\"],\"articleSection\":[\"Enterprise Java\"],\"inLanguage\":\"en-US\",\"potentialAction\":[{\"@type\":\"CommentAction\",\"name\":\"Comment\",\"target\":[\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html#respond\"]}]},{\"@type\":\"WebPage\",\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html\",\"url\":\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html\",\"name\":\"Popular frameworks for big data processing in Java - Java Code 
Geeks\",\"isPartOf\":{\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/#website\"},\"primaryImageOfPage\":{\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html#primaryimage\"},\"image\":{\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html#primaryimage\"},\"thumbnailUrl\":\"https:\\\/\\\/www.javacodegeeks.com\\\/wp-content\\\/uploads\\\/2012\\\/10\\\/enterprise-java-logo.jpg\",\"datePublished\":\"2019-12-31T08:00:47+00:00\",\"description\":\"Interested to learn about big data? Check our article presenting some popular frameworks for big data processing in Java with examples\",\"breadcrumb\":{\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html#breadcrumb\"},\"inLanguage\":\"en-US\",\"potentialAction\":[{\"@type\":\"ReadAction\",\"target\":[\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html\"]}]},{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html#primaryimage\",\"url\":\"https:\\\/\\\/www.javacodegeeks.com\\\/wp-content\\\/uploads\\\/2012\\\/10\\\/enterprise-java-logo.jpg\",\"contentUrl\":\"https:\\\/\\\/www.javacodegeeks.com\\\/wp-content\\\/uploads\\\/2012\\\/10\\\/enterprise-java-logo.jpg\",\"width\":150,\"height\":150,\"caption\":\"java-interview-questions-answers\"},{\"@type\":\"BreadcrumbList\",\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/2019\\\/12\\\/popular-frameworks-for-big-data-processing-in-java.html#breadcrumb\",\"itemListElement\":[{\"@type\":\"ListItem\",\"position\":1,\"name\":\"Home\",\"item\":\"https:\\\/\\\/www.javacodegeeks.com\\\/\"},{\"@type\":\"ListItem\",\"position\":2,\"name\":\"Java\",\"item\":\"https:\\\/\\\/www.javacodegeeks.com\\\/category\\\/java\"},{\"@type\":\
"ListItem\",\"position\":3,\"name\":\"Enterprise Java\",\"item\":\"https:\\\/\\\/www.javacodegeeks.com\\\/category\\\/java\\\/enterprise-java\"},{\"@type\":\"ListItem\",\"position\":4,\"name\":\"Popular frameworks for big data processing in Java\"}]},{\"@type\":\"WebSite\",\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/#website\",\"url\":\"https:\\\/\\\/www.javacodegeeks.com\\\/\",\"name\":\"Java Code Geeks\",\"description\":\"Java Developers Resource Center\",\"publisher\":{\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/#organization\"},\"alternateName\":\"JCG\",\"potentialAction\":[{\"@type\":\"SearchAction\",\"target\":{\"@type\":\"EntryPoint\",\"urlTemplate\":\"https:\\\/\\\/www.javacodegeeks.com\\\/?s={search_term_string}\"},\"query-input\":{\"@type\":\"PropertyValueSpecification\",\"valueRequired\":true,\"valueName\":\"search_term_string\"}}],\"inLanguage\":\"en-US\"},{\"@type\":\"Organization\",\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/#organization\",\"name\":\"Exelixis Media P.C.\",\"url\":\"https:\\\/\\\/www.javacodegeeks.com\\\/\",\"logo\":{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/#\\\/schema\\\/logo\\\/image\\\/\",\"url\":\"https:\\\/\\\/www.javacodegeeks.com\\\/wp-content\\\/uploads\\\/2022\\\/06\\\/exelixis-logo.png\",\"contentUrl\":\"https:\\\/\\\/www.javacodegeeks.com\\\/wp-content\\\/uploads\\\/2022\\\/06\\\/exelixis-logo.png\",\"width\":864,\"height\":246,\"caption\":\"Exelixis Media P.C.\"},\"image\":{\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/#\\\/schema\\\/logo\\\/image\\\/\"},\"sameAs\":[\"https:\\\/\\\/www.facebook.com\\\/javacodegeeks\",\"https:\\\/\\\/x.com\\\/javacodegeeks\"]},{\"@type\":\"Person\",\"@id\":\"https:\\\/\\\/www.javacodegeeks.com\\\/#\\\/schema\\\/person\\\/d6aa7d95e114929bd9da5d1a62db40e8\",\"name\":\"Martin 
Toshev\",\"image\":{\"@type\":\"ImageObject\",\"inLanguage\":\"en-US\",\"@id\":\"https:\\\/\\\/secure.gravatar.com\\\/avatar\\\/a7ab09f89ce59abb723d92c2c46fff56b8b933ab8f6f81a0f69604ed6cfd4e4e?s=96&d=mm&r=g\",\"url\":\"https:\\\/\\\/secure.gravatar.com\\\/avatar\\\/a7ab09f89ce59abb723d92c2c46fff56b8b933ab8f6f81a0f69604ed6cfd4e4e?s=96&d=mm&r=g\",\"contentUrl\":\"https:\\\/\\\/secure.gravatar.com\\\/avatar\\\/a7ab09f89ce59abb723d92c2c46fff56b8b933ab8f6f81a0f69604ed6cfd4e4e?s=96&d=mm&r=g\",\"caption\":\"Martin Toshev\"},\"sameAs\":[\"https:\\\/\\\/www.javaadvent.com\"],\"url\":\"https:\\\/\\\/www.javacodegeeks.com\\\/author\\\/martin-toshev\"}]}<\/script>\n<!-- \/ Yoast SEO plugin. -->","yoast_head_json":{"title":"Popular frameworks for big data processing in Java - Java Code Geeks","description":"Interested to learn about big data? Check our article presenting some popular frameworks for big data processing in Java with examples","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html","og_locale":"en_US","og_type":"article","og_title":"Popular frameworks for big data processing in Java - Java Code Geeks","og_description":"Interested to learn about big data? 
Check our article presenting some popular frameworks for big data processing in Java with examples","og_url":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html","og_site_name":"Java Code Geeks","article_publisher":"https:\/\/www.facebook.com\/javacodegeeks","article_published_time":"2019-12-31T08:00:47+00:00","og_image":[{"width":150,"height":150,"url":"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2012\/10\/enterprise-java-logo.jpg","type":"image\/jpeg"}],"author":"Martin Toshev","twitter_card":"summary_large_image","twitter_creator":"@javacodegeeks","twitter_site":"@javacodegeeks","twitter_misc":{"Written by":"Martin Toshev","Est. reading time":"17 minutes"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html#article","isPartOf":{"@id":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html"},"author":{"name":"Martin Toshev","@id":"https:\/\/www.javacodegeeks.com\/#\/schema\/person\/d6aa7d95e114929bd9da5d1a62db40e8"},"headline":"Popular frameworks for big data processing in Java","datePublished":"2019-12-31T08:00:47+00:00","mainEntityOfPage":{"@id":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html"},"wordCount":1039,"commentCount":0,"publisher":{"@id":"https:\/\/www.javacodegeeks.com\/#organization"},"image":{"@id":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html#primaryimage"},"thumbnailUrl":"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2012\/10\/enterprise-java-logo.jpg","keywords":["Big Data"],"articleSection":["Enterprise 
Java"],"inLanguage":"en-US","potentialAction":[{"@type":"CommentAction","name":"Comment","target":["https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html#respond"]}]},{"@type":"WebPage","@id":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html","url":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html","name":"Popular frameworks for big data processing in Java - Java Code Geeks","isPartOf":{"@id":"https:\/\/www.javacodegeeks.com\/#website"},"primaryImageOfPage":{"@id":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html#primaryimage"},"image":{"@id":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html#primaryimage"},"thumbnailUrl":"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2012\/10\/enterprise-java-logo.jpg","datePublished":"2019-12-31T08:00:47+00:00","description":"Interested to learn about big data? 
Check our article presenting some popular frameworks for big data processing in Java with examples","breadcrumb":{"@id":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html#breadcrumb"},"inLanguage":"en-US","potentialAction":[{"@type":"ReadAction","target":["https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html"]}]},{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html#primaryimage","url":"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2012\/10\/enterprise-java-logo.jpg","contentUrl":"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2012\/10\/enterprise-java-logo.jpg","width":150,"height":150,"caption":"java-interview-questions-answers"},{"@type":"BreadcrumbList","@id":"https:\/\/www.javacodegeeks.com\/2019\/12\/popular-frameworks-for-big-data-processing-in-java.html#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Home","item":"https:\/\/www.javacodegeeks.com\/"},{"@type":"ListItem","position":2,"name":"Java","item":"https:\/\/www.javacodegeeks.com\/category\/java"},{"@type":"ListItem","position":3,"name":"Enterprise Java","item":"https:\/\/www.javacodegeeks.com\/category\/java\/enterprise-java"},{"@type":"ListItem","position":4,"name":"Popular frameworks for big data processing in Java"}]},{"@type":"WebSite","@id":"https:\/\/www.javacodegeeks.com\/#website","url":"https:\/\/www.javacodegeeks.com\/","name":"Java Code Geeks","description":"Java Developers Resource 
Center","publisher":{"@id":"https:\/\/www.javacodegeeks.com\/#organization"},"alternateName":"JCG","potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/www.javacodegeeks.com\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"en-US"},{"@type":"Organization","@id":"https:\/\/www.javacodegeeks.com\/#organization","name":"Exelixis Media P.C.","url":"https:\/\/www.javacodegeeks.com\/","logo":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/www.javacodegeeks.com\/#\/schema\/logo\/image\/","url":"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2022\/06\/exelixis-logo.png","contentUrl":"https:\/\/www.javacodegeeks.com\/wp-content\/uploads\/2022\/06\/exelixis-logo.png","width":864,"height":246,"caption":"Exelixis Media P.C."},"image":{"@id":"https:\/\/www.javacodegeeks.com\/#\/schema\/logo\/image\/"},"sameAs":["https:\/\/www.facebook.com\/javacodegeeks","https:\/\/x.com\/javacodegeeks"]},{"@type":"Person","@id":"https:\/\/www.javacodegeeks.com\/#\/schema\/person\/d6aa7d95e114929bd9da5d1a62db40e8","name":"Martin Toshev","image":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/secure.gravatar.com\/avatar\/a7ab09f89ce59abb723d92c2c46fff56b8b933ab8f6f81a0f69604ed6cfd4e4e?s=96&d=mm&r=g","url":"https:\/\/secure.gravatar.com\/avatar\/a7ab09f89ce59abb723d92c2c46fff56b8b933ab8f6f81a0f69604ed6cfd4e4e?s=96&d=mm&r=g","contentUrl":"https:\/\/secure.gravatar.com\/avatar\/a7ab09f89ce59abb723d92c2c46fff56b8b933ab8f6f81a0f69604ed6cfd4e4e?s=96&d=mm&r=g","caption":"Martin 
Toshev"},"sameAs":["https:\/\/www.javaadvent.com"],"url":"https:\/\/www.javacodegeeks.com\/author\/martin-toshev"}]}},"_links":{"self":[{"href":"https:\/\/www.javacodegeeks.com\/wp-json\/wp\/v2\/posts\/101003","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.javacodegeeks.com\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.javacodegeeks.com\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.javacodegeeks.com\/wp-json\/wp\/v2\/users\/11888"}],"replies":[{"embeddable":true,"href":"https:\/\/www.javacodegeeks.com\/wp-json\/wp\/v2\/comments?post=101003"}],"version-history":[{"count":0,"href":"https:\/\/www.javacodegeeks.com\/wp-json\/wp\/v2\/posts\/101003\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.javacodegeeks.com\/wp-json\/wp\/v2\/media\/112"}],"wp:attachment":[{"href":"https:\/\/www.javacodegeeks.com\/wp-json\/wp\/v2\/media?parent=101003"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.javacodegeeks.com\/wp-json\/wp\/v2\/categories?post=101003"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.javacodegeeks.com\/wp-json\/wp\/v2\/tags?post=101003"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}