BÀI TẬP ÁP DỤNG
ElectricityAnalysis.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ElectricityAnalysis {
public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException,
InterruptedException {
if (value.toString().startsWith("Year")) return; // bỏ dòng tiêu đề
String[] fields = value.toString().split(",");
String year = fields[0];
String month = fields[1];
String consumption = fields[2];
context.write(new Text(year), new Text(month + ":" + consumption));
public static class SumReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
int total = 0, max = Integer.MIN_VALUE, min = Integer.MAX_VALUE;
String maxMonth = "", minMonth = "";
for (Text val : values) {
String[] parts = val.toString().split(":");
String month = parts[0];
int consumption = Integer.parseInt(parts[1]);
total += consumption;
if (consumption > max) {
max = consumption;
maxMonth = month;
if (consumption < min) {
min = consumption;
minMonth = month;
String output = "Total=" + total +
", MaxMonth=" + maxMonth + "(" + max + ")" +
", MinMonth=" + minMonth + "(" + min + ")";
context.write(key, new Text(output));
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Electricity Analysis");
job.setJarByClass(ElectricityAnalysis.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(SumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
FindMaxElectricity.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FindMaxElectricity {
public static class MapperMax extends Mapper<Object, Text, Text, IntWritable> {
public void map(Object key, Text value, Context context) throws IOException,
InterruptedException {
String[] parts = value.toString().split("\\t");
if (parts.length < 2) return;
String year = parts[0];
String[] tokens = parts[1].split(",");
for (String token : tokens) {
if (token.trim().startsWith("Total=")) {
int total = Integer.parseInt(token.trim().split("=")[1]);
context.write(new Text("year"), new IntWritable(total << 16 | Integer.parseInt(year))); //
encoded year + total
}
public static class ReducerMax extends Reducer<Text, IntWritable, Text, Text> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
int maxTotal = Integer.MIN_VALUE;
int minTotal = Integer.MAX_VALUE;
int maxYear = 0, minYear = 0;
for (IntWritable val : values) {
int combined = val.get();
int total = combined >>> 16;
int year = combined & 0xFFFF;
if (total > maxTotal) {
maxTotal = total;
maxYear = year;
if (total < minTotal) {
minTotal = total;
minYear = year;
context.write(new Text("Highest Total Year"), new Text(maxYear + " (" + maxTotal + ")"));
context.write(new Text("Lowest Total Year"), new Text(minYear + " (" + minTotal + ")"));
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Find Max Electricity");
job.setJarByClass(FindMaxElectricity.class);
job.setMapperClass(MapperMax.class);
job.setReducerClass(ReducerMax.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0])); // input là output_dir của job trước
FileOutputFormat.setOutputPath(job, new Path(args[1])); // ví dụ: output_max
System.exit(job.waitForCompletion(true) ? 0 : 1);