Ch 8 and Ch 9: MapReduce Types, Formats, and Features
MapReduce Form Review
General form of Map/Reduce functions:
map: (K1, V1) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)
General form with Combiner function:
map: (K1, V1) -> list(K2, V2)
combiner: (K2, list(V2)) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)
Partition function:
partition: (K2, V2) -> integer
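The general forms above can be sketched in plain Python, without Hadoop, as a small word count. This is only an illustration of the type signatures: the function names (map_fn, combine_fn, partition_fn, reduce_fn, run_job) are hypothetical, and the byte-sum partitioner is a stand-in for a real hash function.

```python
from collections import defaultdict

def map_fn(offset, line):
    # (K1, V1) -> list(K2, V2): emit (word, 1) for each word in the line
    return [(word, 1) for word in line.split()]

def combine_fn(word, counts):
    # (K2, list(V2)) -> list(K2, V2): pre-aggregate one map task's output
    return [(word, sum(counts))]

def partition_fn(word, count, num_partitions):
    # (K2, V2) -> integer: choose the reduce task for a key
    # (byte sum is a deterministic stand-in for a hash function)
    return sum(word.encode("utf-8")) % num_partitions

def reduce_fn(word, counts):
    # (K2, list(V2)) -> list(K3, V3): final total per word
    return [(word, sum(counts))]

def group_by_key(pairs):
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def run_job(splits, num_partitions=2):
    partitions = [[] for _ in range(num_partitions)]
    for split in splits:  # one map task per split
        mapped = []
        for offset, line in split:
            mapped.extend(map_fn(offset, line))
        # the combiner only sees the output of this one map task
        for word, counts in group_by_key(mapped).items():
            for key, value in combine_fn(word, counts):
                index = partition_fn(key, value, num_partitions)
                partitions[index].append((key, value))
    result = {}
    for part in partitions:  # one reduce task per partition
        for word, counts in sorted(group_by_key(part).items()):
            for key, value in reduce_fn(word, counts):
                result[key] = value
    return result

splits = [[(0, "a b a")], [(0, "b c")]]
counts = run_job(splits)
```

Because the combiner has the same input and output key-value types as the map output, the job gives the same totals whether or not the combiner runs.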
Input Formats - Basics
Input split - a chunk of the input that is processed by a single map
Each map processes a single split, which is divided into records (key-value pairs) that the
map processes one at a time
Represented by the Java class InputSplit
Has a length in bytes and a set of storage locations (hostname strings)
Contains a reference to the data, not the actual data
InputFormat - responsible for creating input splits and dividing them into records,
so you will not deal directly with the InputSplit class
Controlling split size
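As a rough sketch of how split size is controlled, the rule commonly described for FileInputFormat is max(minimumSize, min(maximumSize, blockSize)). The Python below only illustrates that arithmetic; the function names are hypothetical, and it ignores details such as how a short final chunk is handled.

```python
def compute_split_size(block_size, min_size=1, max_size=2**63 - 1):
    # split size = max(minimumSize, min(maximumSize, blockSize))
    return max(min_size, min(max_size, block_size))

def split_lengths(file_size, split_size):
    # cut a file of file_size bytes into consecutive chunks of split_size
    lengths = []
    remaining = file_size
    while remaining > 0:
        length = min(split_size, remaining)
        lengths.append(length)
        remaining -= length
    return lengths

MB = 1024 * 1024
default_size = compute_split_size(block_size=128 * MB)
```

With the defaults the split size equals the block size. Raising the minimum above the block size gives larger splits, and lowering the maximum below the block size gives smaller ones.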
Input Formats - Basics
Avoid small files - storing a large number of small files increases the number of
seeks needed to run the job
A sequence file can be used to merge many small files into fewer, larger files
Preventing splitting - you might want to prevent splitting if you want a single
mapper to process each input file as an entire file
1. Increase the minimum split size to be larger than the largest file in the system
2. Subclass the concrete subclass of FileInputFormat that you use and override its isSplitable() method to return false
Reading an entire file as a record:
Input Formats - File Input
FileInputFormat - the base class for all implementations of InputFormat that use
a file as the source for data
Provides a place to define what files are included as input to a job and an implementation for
generating splits for the input files
Input is often specified as a collection of paths
Splits only large files (larger than an HDFS block)
CombineFileInputFormat - Java class designed to work well with small files in
Hadoop
Each split will contain many of the small files so that each mapper has more to process
Takes node and rack locality into account when deciding what blocks to place into the same split
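The idea of packing many small files into each split can be sketched as a simple greedy grouping. This is not how CombineFileInputFormat is implemented: the function name and file sizes are hypothetical, and node and rack locality are ignored entirely.

```python
def pack_small_files(file_sizes, max_split_size):
    # group consecutive files until adding one more would exceed the limit
    splits = []
    current, current_size = [], 0
    for name, size in file_sizes:
        if current and current_size + size > max_split_size:
            splits.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        splits.append(current)
    return splits

files = [("a.log", 10), ("b.log", 20), ("c.log", 40), ("d.log", 50), ("e.log", 5)]
packed = pack_small_files(files, max_split_size=64)
```

Five files end up in three splits, so three mappers run instead of five.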
Input Formats - Text Input
TextInputFormat - default InputFormat where each record is a line of input
Key - byte offset within the file of the beginning of the line; Value - the contents of the line, not
including any line terminators, packaged as a Text object
mapreduce.input.linerecordreader.line.maxlength - can be used to set a maximum expected line
length
Safeguards against corrupted files (corruption often appears as a very long line)
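A minimal Python sketch of the records described above: the key is the byte offset of the start of each line and the value is the line without its terminator. The function name is hypothetical, and the sketch assumes that lines longer than the maximum length are simply skipped.

```python
def text_records(data, max_line_length=None):
    # data is the raw bytes of a file; returns (byte offset, line) pairs
    records = []
    offset = 0
    for raw in data.splitlines(keepends=True):
        line = raw.rstrip(b"\r\n")  # value excludes line terminators
        if max_line_length is None or len(line) <= max_line_length:
            records.append((offset, line.decode("utf-8")))
        offset += len(raw)  # the offset still advances past skipped lines
    return records

records = text_records(b"alpha\nbeta\ngamma")
```

Note that the keys are byte offsets, not line numbers, so they are unique within a file but not consecutive.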
KeyValueTextInputFormat - used to interpret files written by TextOutputFormat (the default
output format, which writes key-value pairs separated by a delimiter)
mapreduce.input.keyvaluelinerecordreader.key.value.separator - used to specify the
delimiter/separator which is tab by default
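The separator handling can be sketched in a few lines of Python. The function name is hypothetical; the sketch assumes the line is split at the first occurrence of the separator and that a line with no separator becomes a key with an empty value.

```python
def key_value_record(line, separator="\t"):
    # split at the first separator only; the rest of the line is the value
    key, _, value = line.partition(separator)
    return (key, value)

record = key_value_record("line1\tOn the top of the tree")
```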
Input Formats - Binary Input, Multiple Inputs, and Database I/O
Binary Input:
SequenceFileInputFormat - reads sequence files, which store sequences of binary key-value pairs
SequenceFileAsTextInputFormat - converts sequence file’s keys and values to Text objects
SequenceFileAsBinaryInputFormat - retrieves the sequence file’s keys and values as binary
objects
FixedLengthInputFormat - reading fixed-width binary records from a file where the records are not
separated by delimiters
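Reading fixed-width records without delimiters amounts to slicing the input at multiples of the record length. The sketch below is plain Python with a hypothetical function name; pairing each record with its byte offset and rejecting a trailing partial record are assumptions made for the illustration.

```python
def fixed_length_records(data, record_length):
    if record_length <= 0:
        raise ValueError("record_length must be positive")
    if len(data) % record_length != 0:
        raise ValueError("input ends with a partial record")
    # each record is record_length bytes, keyed here by its byte offset
    return [(offset, data[offset:offset + record_length])
            for offset in range(0, len(data), record_length)]

fixed = fixed_length_records(b"AAAABBBBCCCC", record_length=4)
```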
Multiple Inputs:
By default, all input is interpreted by a single InputFormat and a single Mapper
MultipleInputs - lets each input path have its own InputFormat and Mapper
Output Formats
Text Output: TextOutputFormat - default output format; writes records as lines
of text (keys and values are turned into strings)
KeyValueTextInputFormat - can read this output back; it breaks lines into key-value pairs based on a configurable separator
Binary Output:
SequenceFileOutputFormat - writes sequence files as output
SequenceFileAsBinaryOutputFormat - writes keys and values in binary format into a sequence
file container
MapFileOutputFormat - writes map files as output
Multiple Outputs:
Counters
Useful for gathering statistics about a job, quality-control, and problem diagnosis
Built-in Counter Types:
Task Counters - gather info about tasks as they are executed and results are aggregated over all
job tasks
Maintained by each task attempt and sent to the application master on a regular basis
to be globally aggregated
Counter values may go down if a task fails
Job Counters - measure job-level statistics and are maintained by the application master so they
do not need to be sent across the network
User-Defined Counters: User code can define a set of counters to be incremented in
the mapper or reducer
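The way counters are kept per task attempt and then aggregated can be sketched in plain Python. The counter names (VALID, MISSING, MALFORMED) and function names are hypothetical; the sketch assumes that only successful attempts contribute to the job totals, which is why a running total can drop when an attempt fails.

```python
from collections import Counter

def run_attempt(records):
    # each task attempt keeps its own counters while processing records
    counters = Counter()
    for record in records:
        if record is None:
            counters["MISSING"] += 1
        elif record < 0:
            counters["MALFORMED"] += 1
        else:
            counters["VALID"] += 1
    return counters

def aggregate(attempts):
    # only counters from successful attempts are added to the job totals
    total = Counter()
    for succeeded, counters in attempts:
        if succeeded:
            total.update(counters)
    return total

attempts = [
    (True, run_attempt([1, None, -5])),
    (False, run_attempt([2, 3])),  # failed attempt, its counts are dropped
    (True, run_attempt([2, 3])),   # retry of the failed task
]
job_counters = aggregate(attempts)
```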
Sorting
Partial Sort - does not produce a globally-sorted output file
Total Sort - produces a globally-sorted output file
Produce a set of sorted files that can be concatenated to form a globally-sorted file
To do this: use a partitioner that respects the total order of the output, and the partition
sizes must be fairly even
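A total sort can be sketched with a range partitioner: every key in partition i is smaller than every key in partition i+1, so sorting each partition independently and concatenating the results gives a globally sorted list. The function names and split points below are hypothetical; in practice the split points would be chosen (for example by sampling) so that partitions are fairly even.

```python
import bisect

def range_partition(key, split_points):
    # keys below split_points[0] go to partition 0, and so on
    return bisect.bisect_right(split_points, key)

def total_sort(keys, split_points):
    partitions = [[] for _ in range(len(split_points) + 1)]
    for key in keys:
        partitions[range_partition(key, split_points)].append(key)
    # each reducer sorts only its own partition
    return [sorted(part) for part in partitions]

temps = [22, -5, 14, 31, 0, 9, 17, -12, 25]
parts = total_sort(temps, split_points=[0, 15])
merged = [key for part in parts for key in part]
```

A hash partitioner would still give sorted files, but concatenating them would not be globally sorted, because keys of any size can land in any partition.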
Secondary Sort - sorts the values for each key
Values are usually not sorted by MapReduce
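One common way to picture a secondary sort is to sort on a composite of the key and the value, then group on the key alone. The Python below is a sketch of that idea with a hypothetical function name and sample data; it is not Hadoop code.

```python
from itertools import groupby

def secondary_sort(pairs):
    # sort by the composite (key, value), then group by the key only
    ordered = sorted(pairs, key=lambda kv: (kv[0], kv[1]))
    return {key: [value for _, value in group]
            for key, group in groupby(ordered, key=lambda kv: kv[0])}

by_year = secondary_sort([(1901, 30), (1900, 12), (1901, -4), (1900, 35)])
```

Each key now sees its values in ascending order, so the reducer can take the first or last value without buffering the whole list.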
Joins
MapReduce can perform joins between large datasets.
Ex:
Joins - Map-Side vs Reduce-Side
Map-Side Join:
The inputs must be divided into the same number of partitions and sorted by the
same key (the join key)
All the records for a particular key must reside in the same partition
CompositeInputFormat can be used to run a map-side join
Reduce-Side Join:
Input datasets do not have to be structured in a particular way
Results in records with the same key being brought together in the reducer function
Uses MultipleInputs and a secondary sort
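The reduce-side join can be sketched in plain Python: records from each dataset are tagged with their source, grouped by the join key, and ordered so that the tag-0 record arrives before the tag-1 records. The datasets, tags, and function name are hypothetical, and the sketch performs an inner join.

```python
from collections import defaultdict

def reduce_side_join(stations, readings):
    # "map" phase: tag each record with its source dataset
    tagged = [(sid, (0, name)) for sid, name in stations]
    tagged += [(sid, (1, temp)) for sid, temp in readings]
    # "shuffle": bring records with the same join key together
    grouped = defaultdict(list)
    for key, value in tagged:
        grouped[key].append(value)
    # "reduce": tag 0 sorts first, so names are seen before readings
    joined = []
    for sid in sorted(grouped):
        values = sorted(grouped[sid])
        names = [value for tag, value in values if tag == 0]
        for tag, value in values:
            if tag == 1:
                for name in names:
                    joined.append((sid, name, value))
    return joined

stations = [("s1", "Oslo"), ("s2", "Lima")]
readings = [("s2", 21), ("s1", -3), ("s1", 4), ("s3", 10)]
joined = reduce_side_join(stations, readings)
```

The reading for s3 is dropped because no station record shares its key.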
Side Data Distribution
Side Data - extra read-only data needed by a job to process the main dataset
The main challenge is to make side data available to all the map or reduce tasks (which are
spread across the cluster) in a way that is convenient and efficient
Using the Job Configuration
The setter methods on the Configuration class can be used to set arbitrary key-value pairs in the job configuration
Useful for passing metadata to tasks
Distributed Cache
Instead of serializing side data in the job config, it is preferred to distribute the datasets using
Hadoop’s distributed cache
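The usual pattern with cached side data is to load the file once when a task starts and then look values up for every record. The Python below only mimics that pattern with a local temporary file; the class name, file name, and file layout are hypothetical and no Hadoop API is involved.

```python
import os
import tempfile

class LookupMapper:
    def __init__(self, side_file):
        self.side_file = side_file
        self.names = {}

    def setup(self):
        # read the side data once per task, not once per record
        with open(self.side_file, encoding="utf-8") as handle:
            for line in handle:
                station_id, _, name = line.rstrip("\n").partition("\t")
                self.names[station_id] = name

    def map(self, station_id, temperature):
        # replace the station id with its name from the side data
        return (self.names.get(station_id, "UNKNOWN"), temperature)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "stations.txt")
    with open(path, "w", encoding="utf-8") as handle:
        handle.write("s1\tOslo\ns2\tLima\n")
    mapper = LookupMapper(path)
    mapper.setup()
    output = [mapper.map("s2", 21), mapper.map("s9", 5)]
```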
MapReduce Library Classes
Mappers/Reducers for commonly-used functions:
Video – Example MapReduce WordCount
Video: https://youtu.be/aelDuboaTqA