import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Hadoop MapReduce job: average closing price per stock symbol
public class StockAnalysis {
public static class StockMapper extends Mapper<LongWritable, Text, Text,
FloatWritable> {
private Text stockSymbol = new Text();
private FloatWritable closePrice = new FloatWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
String line = value.toString().trim();
// Skip empty lines
if (line.isEmpty()) {
return;
}
// Split by commas or any other delimiter commonly used in datasets
String[] fields = line.split(",|\\s+|\\t");
// Basic validation to handle different kinds of datasets
if (fields.length < 2) {
return; // Ignore lines that don't have enough data
}
try {
// Attempt to extract the stock symbol and closing price
String stock = fields[1];
// Assuming the second field is stock symbol
float close = Float.parseFloat(fields[fields.length - 1]);
// Assuming last field is close price
stockSymbol.set(stock);
closePrice.set(close);
context.write(stockSymbol, closePrice);
}
catch (NumberFormatException e) {// Ignore lines where parsing fails
}
}
}
// Reducer class
public static class StockReducer extends Reducer<Text, FloatWritable, Text,
FloatWritable> {
@Override
protected void reduce(Text key, Iterable<FloatWritable> values, Context
context) throws IOException, InterruptedException {
float sum = 0.0f;
int count = 0;
for (FloatWritable value : values) {
sum += value.get();
count++;
}
// Avoid division by zero
if (count > 0) {
float average = sum / count;
context.write(key, new FloatWritable(average));
}
}
}
// Main method
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: StockAnalysis <input path> <output
path>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Stock Analysis");
job.setJarByClass(StockAnalysis.class);
job.setMapperClass(StockMapper.class);
job.setReducerClass(StockReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FloatWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}