WordCountPartitioner.java
Objective:
Write a MapReduce program to count the occurrences of each word in a file.
Use a custom partitioner to partition key/value pairs by the first letter of each word.
Partitioner class (WordCountPartitioner.java)
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String word = key.toString();
        if (word.isEmpty()) {
            return 0; // guard: send empty tokens to the default partition
        }
        char alphabet = word.toUpperCase().charAt(0);
        int partitionNumber = 0;
        switch (alphabet) {
            case 'A': partitionNumber = 1; break;
            case 'B': partitionNumber = 2; break;
            case 'C': partitionNumber = 3; break;
            case 'D': partitionNumber = 4; break;
            case 'E': partitionNumber = 5; break;
            case 'F': partitionNumber = 6; break;
            case 'G': partitionNumber = 7; break;
            case 'H': partitionNumber = 8; break;
            case 'I': partitionNumber = 9; break;
            case 'J': partitionNumber = 10; break;
            case 'K': partitionNumber = 11; break;
            case 'L': partitionNumber = 12; break;
            case 'M': partitionNumber = 13; break;
            case 'N': partitionNumber = 14; break;
            case 'O': partitionNumber = 15; break;
            case 'P': partitionNumber = 16; break;
            case 'Q': partitionNumber = 17; break;
            case 'R': partitionNumber = 18; break;
            case 'S': partitionNumber = 19; break;
            case 'T': partitionNumber = 20; break;
            case 'U': partitionNumber = 21; break;
            case 'V': partitionNumber = 22; break;
            case 'W': partitionNumber = 23; break;
            case 'X': partitionNumber = 24; break;
            case 'Y': partitionNumber = 25; break;
            case 'Z': partitionNumber = 26; break;
            default: partitionNumber = 0; break; // non-alphabetic first character
        }
        // Hadoop expects the partition number to be in the range [0, numPartitions-1]
        return partitionNumber % numPartitions;
    }
}
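As a quick sanity check, the partitioner maps words to partitions as follows (a hypothetical local test harness, not part of the MapReduce job; it assumes WordCountPartitioner is on the classpath):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class PartitionerCheck { // hypothetical helper class, for illustration only
    public static void main(String[] args) {
        WordCountPartitioner p = new WordCountPartitioner();
        IntWritable one = new IntWritable(1);
        System.out.println(p.getPartition(new Text("Hadoop"), one, 27)); // 8  ('H' is the 8th letter)
        System.out.println(p.getPartition(new Text("zebra"), one, 27));  // 26 ('Z' -> 26)
        System.out.println(p.getPartition(new Text("123"), one, 27));    // 0  (non-letter -> default)
    }
}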
Driver Class (WordCountDriver.java)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCountDriver <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Word Count with Custom Partitioner");
        job.setJarByClass(WordCountDriver.class);
        // Set Mapper and Reducer classes (sketched after this driver)
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Set output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set Partitioner class
        job.setPartitionerClass(WordCountPartitioner.class);
        // Set number of reduce tasks (one per partition)
        job.setNumReduceTasks(27); // A–Z + default
        // Set input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
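The driver references WordCountMapper and WordCountReducer, which are not listed in this handout. A minimal sketch of what they are assumed to look like (standard word-count logic for the new mapreduce API, one class per file):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit (word, 1) for every whitespace-separated token in the line
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);
        }
    }
}

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get(); // add up the 1s emitted for this word
        }
        context.write(key, new IntWritable(sum));
    }
}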
Input Data
A text file containing lines such as:
• Welcome to Hadoop Session
• Introduction to Hadoop
• Introducing Hive
• Hive Session
• Pig Session
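With this input and the partitioner above (27 reducers, as set in the driver), each word is routed by its first letter; for example:
Hadoop  -> count 2, partition 8  ('H')
Hive    -> count 2, partition 8  ('H')
Session -> count 3, partition 19 ('S')
to      -> count 2, partition 20 ('T')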
MapReduce Searching
The input file (student.csv, student records) is stored in HDFS:
1001,John,45
1002,Jack,39
1003,Alex,44
1004,Smith,38
1005,Bob,33
Driver (WordSearcher.java) sets the job configuration and specifies the keyword to search
for (e.g., "Jack").
Mapper (WordSearchMapper.java) reads the input line by line and checks whether the keyword is present.
• If it is, the mapper emits the matching line as the key and "fileName,lineNumber,charOffset" as the value.
Reducer (WordSearchReducer.java) simply writes out the key/value pairs received from the mapper.
The final output, containing only the lines that matched the keyword, is stored in the HDFS output path.
1. Mapper (WordSearchMapper.java)
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WordSearchMapper extends Mapper<LongWritable, Text, Text, Text> {
    static String keyword;
    static int pos = 0; // line counter, local to each map task (positions are per-split)

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();
        keyword = configuration.get("keyword");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        InputSplit i = context.getInputSplit(); // get the input split for this map task
        FileSplit f = (FileSplit) i;
        String fileName = f.getPath().getName();
        pos++; // advance the line counter for every input line
        if (value.toString().contains(keyword)) {
            int wordPos = value.find(keyword); // byte offset of the keyword within the line
            context.write(value, new Text(fileName + "," + pos + "," + wordPos));
        }
    }
}
2. Reducer (WordSearchReducer.java)
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordSearchReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            context.write(key, val);
        }
    }
}
3. Driver (WordSearcher.java)
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordSearcher {

    public static void main(String[] args) throws IOException, InterruptedException,
            ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("keyword", "Jack"); // keyword to search
        Job job = Job.getInstance(conf, "Word Search");
        job.setJarByClass(WordSearcher.class);
        job.setMapperClass(WordSearchMapper.class);
        job.setReducerClass(WordSearchReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path("/mapreduce/student.csv"));
        FileOutputFormat.setOutputPath(job, new Path("/mapreduce/output/search"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
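With the sample student.csv above and the keyword "Jack", only the second line matches, so the output contains a single record (assuming the file is read as a single split, so the mapper's line counter equals the file's line number):
1002,Jack,39    student.csv,2,5
Here 2 is the line number and 5 is the character offset of "Jack" within the line.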
MapReduce Sorting
Objective & Input Data
• Objective:
Write a MapReduce program that sorts student records by student name. The sorting itself comes from the shuffle phase: the mapper emits the name as the key, and MapReduce sorts keys before they reach the reducer.
• Input Data (student.csv):
1001,John,45
1002,Jack,39
1003,Alex,44
1004,Smith,38
1005,Bob,33
Expected Sorted Output (by name):
1003,Alex,44
1005,Bob,33
1002,Jack,39
1001,John,45
1004,Smith,38
1. Mapper (SortMapper inside SortStudNames.java)
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SortStudNames {

    public static class SortMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] token = value.toString().split(",");
            // token[0] = ID, token[1] = Name, token[2] = Marks
            context.write(new Text(token[1]), new Text(token[0] + "," + token[2]));
        }
    }
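With the sample data, the map phase emits the following (name, "ID,Marks") pairs; the shuffle delivers them to the reducer already sorted by key:
Alex  -> 1003,44
Bob   -> 1005,33
Jack  -> 1002,39
John  -> 1001,45
Smith -> 1004,38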
2. Reducer (SortReducer inside SortStudNames.java)
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
    public static class SortReducer extends Reducer<Text, Text, NullWritable, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text details : values) {
                // details = "ID,Marks"; rebuild the full record as ID,Name,Marks
                String[] token = details.toString().split(",");
                context.write(NullWritable.get(),
                        new Text(token[0] + "," + key.toString() + "," + token[1]));
            }
        }
    }
3. Driver (main method inside SortStudNames.java)
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public static void main(String[] args) throws IOException, InterruptedException,
            ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Sort Students by Name");
        job.setJarByClass(SortStudNames.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // The reducer emits NullWritable keys, so the job's output key class must match
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1); // a single reducer yields one globally sorted output file
        FileInputFormat.setInputPaths(job, new Path("/mapreduce/student.csv"));
        FileOutputFormat.setOutputPath(job, new Path("/mapreduce/output/sorted"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MapReduce Word Count
This classic word count uses the older org.apache.hadoop.mapred API.
Mapper Code: Copy this program into the WCMapper Java class file.
// Importing libraries
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class WCMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    // Map function
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter rep) throws IOException {
        String line = value.toString();
        // Splitting the line on spaces
        for (String word : line.split(" ")) {
            if (word.length() > 0) {
                output.collect(new Text(word), new IntWritable(1));
            }
        }
    }
}
Reducer Code: Copy this program into the WCReducer Java class file.
// Importing libraries
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class WCReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    // Reduce function
    public void reduce(Text key, Iterator<IntWritable> value,
                       OutputCollector<Text, IntWritable> output, Reporter rep) throws IOException {
        int count = 0;
        // Summing the counts for each word
        while (value.hasNext()) {
            IntWritable i = value.next();
            count += i.get();
        }
        output.collect(key, new IntWritable(count));
    }
}
Driver Code: Copy this program into the WCDriver Java class file.
// Importing libraries
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WCDriver extends Configured implements Tool {

    public int run(String args[]) throws IOException {
        if (args.length < 2) {
            System.out.println("Please give valid inputs");
            return -1;
        }
        JobConf conf = new JobConf(WCDriver.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setMapperClass(WCMapper.class);
        conf.setReducerClass(WCReducer.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
        return 0;
    }

    // Main Method
    public static void main(String args[]) throws Exception {
        int exitCode = ToolRunner.run(new WCDriver(), args);
        System.exit(exitCode); // exit with the job's status code
    }
}
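A typical build-and-run sequence (assuming the three classes are compiled and packaged into a jar named wordcount.jar; adjust file and HDFS paths to your setup):

$ hadoop fs -mkdir -p /wordcount/input
$ hadoop fs -put input.txt /wordcount/input
$ hadoop jar wordcount.jar WCDriver /wordcount/input /wordcount/output
$ hadoop fs -cat /wordcount/output/part-00000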
References:
https://www.geeksforgeeks.org/data-engineering/how-to-execute-wordcount-program-in-mapreduce-using-cloudera-distribution-hadoop-cdh/
https://youtu.be/lB8G0a_LjqA?si=-JNvUYhnc1UT_ZsZ
https://youtu.be/knAS0w-jiUk?si=4yoY1cEq0-LKQvFb
https://youtu.be/Dp2-dAftD1Q?si=pWac3qtl5PzyM2Sf