0% found this document useful (0 votes)
5 views2 pages

PGM 6

The document contains a Java implementation of a Hadoop MapReduce program for stock analysis. It includes a Mapper class that processes stock data to extract stock symbols and closing prices, and a Reducer class that calculates the average closing price for each stock symbol. The main method sets up the job configuration and specifies input and output paths.

Uploaded by

Vedanth Kumar
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd
0% found this document useful (0 votes)
5 views2 pages

PGM 6

The document contains a Java implementation of a Hadoop MapReduce program for stock analysis. It includes a Mapper class that processes stock data to extract stock symbols and closing prices, and a Reducer class that calculates the average closing price for each stock symbol. The main method sets up the job configuration and specifies input and output paths.

Uploaded by

Vedanth Kumar
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd
You are on page 1/ 2

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Mapper class
public class StockAnalysis {
public static class StockMapper extends Mapper<LongWritable, Text, Text,
FloatWritable> {
private Text stockSymbol = new Text();
private FloatWritable closePrice = new FloatWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
String line = value.toString().trim();
// Skip empty lines
if (line.isEmpty()) {
return;
}
// Split by commas or any other delimiter commonly used in datasets
String[] fields = line.split(",|\\s+|\\t");
// Basic validation to handle different kinds of datasets
if (fields.length < 2) {
return; // Ignore lines that don't have enough data
}
try {
// Attempt to extract the stock symbol and closing price
String stock = fields[1];
// Assuming the second field is stock symbol
float close = Float.parseFloat(fields[fields.length - 1]);
// Assuming last field is close price
stockSymbol.set(stock);
closePrice.set(close);
context.write(stockSymbol, closePrice);
}
catch (NumberFormatException e) {// Ignore lines where parsing fails
}
}
}

// Reducer class
public static class StockReducer extends Reducer<Text, FloatWritable, Text,
FloatWritable> {
@Override
protected void reduce(Text key, Iterable<FloatWritable> values, Context
context) throws IOException, InterruptedException {
float sum = 0.0f;
int count = 0;
for (FloatWritable value : values) {
sum += value.get();
count++;
}

// Avoid division by zero


if (count > 0) {
float average = sum / count;
context.write(key, new FloatWritable(average));
}
}
}
// Main method
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: StockAnalysis <input path> <output
path>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Stock Analysis");
job.setJarByClass(StockAnalysis.class);
job.setMapperClass(StockMapper.class);
job.setReducerClass(StockReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FloatWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

You might also like