
The Hadoop Java API for MapReduce
Hadoop underwent a major API change in its 0.20 release, and the resulting API is the primary interface in the 1.0 version we use in this book. Though the prior API was certainly functional, the community felt it was unwieldy and unnecessarily complex in some regards.
The new API, sometimes referred to as context objects for reasons we'll see later, is the future of Java MapReduce development, and as such we will use it wherever possible in this book. Note one caveat: there are parts of the pre-0.20 MapReduce libraries that have not been ported to the new API, so we will use the old interfaces when we need to examine any of these.
The 0.20 MapReduce Java API
The 0.20 and later versions of the MapReduce API have most of the key classes and interfaces in the org.apache.hadoop.mapreduce package or its subpackages. In most cases, the implementation of a MapReduce job will provide job-specific subclasses of the Mapper and Reducer base classes found in this package.
This is a cut-down view of the base Mapper class provided by Hadoop. For our own mapper implementations, we will subclass this base class and override the specified method as follows:
class Mapper<K1, V1, K2, V2> {
    void map(K1 key, V1 value, Mapper.Context context)
        throws IOException, InterruptedException {..}
}
Although the use of Java generics can make this look a little opaque at first, there is actually not that much going on. The class is defined in terms of the key/value input and output types, and then the map method takes an input key/value pair in its parameters. The other parameter is an instance of the Context class that provides various mechanisms to communicate with the Hadoop framework, one of which is to output the results of a map or reduce method.
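To make this concrete, the following is a minimal sketch of what a job-specific mapper could look like. The names used here (LineLengthMapper and so on) are purely illustrative rather than taken from Hadoop or this book's examples; the mapper emits the length of each input line in bytes as the key, along with a count of 1 as the value.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative mapper: for each input line, emit the line's length in bytes
// as the key and a count of 1 as the value.
public class LineLengthMapper
    extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final IntWritable length = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        // The framework calls this once per record; only a single line is seen here.
        length.set(value.getLength());
        context.write(length, ONE);
    }
}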
Tip
Notice that the map method only refers to a single instance of K1 and V1 key/value pairs. This is a critical aspect of the MapReduce paradigm: you write classes that process single records, and the framework is responsible for all the work required to turn an enormous data set into a stream of key/value pairs. You will never have to write map or reduce classes that try to deal with the full data set. Hadoop also provides mechanisms through its InputFormat and OutputFormat classes that provide implementations of common file formats and likewise remove the need to write file parsers for all but custom file types.
There are three additional methods that may sometimes need to be overridden.
protected void setup(Mapper.Context context) throws IOException, InterruptedException
This method is called once before any key/value pairs are presented to the map method. The default implementation does nothing.
protected void cleanup(Mapper.Context context) throws IOException, InterruptedException
This method is called once after all key/value pairs have been presented to the map method. The default implementation does nothing.
protected void run(Mapper.Context context) throws IOException, InterruptedException
This method controls the overall flow of task processing within a JVM. The default implementation calls the setup method once before repeatedly calling the map method for each key/value pair in the split, and then finally calls the cleanup method.
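As an example of where setup and cleanup are useful, a mapper might read a job property once per task and emit a summary record at the end. The following is only a sketch with hypothetical names (FilteringMapper and the filter.pattern property), not code from Hadoop or this book:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilteringMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String pattern;
    private long matches = 0;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Called once per task before any map() calls; read a job property here.
        pattern = context.getConfiguration().get("filter.pattern", "ERROR");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        if (value.toString().contains(pattern)) {
            matches++;
            context.write(new Text(pattern), value);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Called once per task after the last map() call; emit a summary record.
        context.write(new Text("matches"), new Text(Long.toString(matches)));
    }
}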
Tip
Downloading the example code
You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.
The Reducer base class works very similarly to the Mapper class, and usually requires subclasses to override only a single reduce method. Here is the cut-down class definition:
public class Reducer<K2, V2, K3, V3> {
    void reduce(K2 key, Iterable<V2> values, Reducer.Context context)
        throws IOException, InterruptedException {..}
}
Again, notice that the class is defined in terms of the broader data flow (the reduce method accepts K2/V2 as input and provides K3/V3 as output), while the actual reduce method takes only a single key and its associated list of values. The Context object is again the mechanism used to output the result of the method.
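As a sketch of a concrete reducer, again using illustrative names rather than this book's examples, a reducer that sums the integer values associated with each key could look like this:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative reducer: sum all the values associated with each key.
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}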
This class also has setup, run, and cleanup methods with default implementations similar to those of the Mapper class, which can optionally be overridden:
protected void setup(Reducer.Context context) throws IOException, InterruptedException
This method is called once before any keys and their lists of values are presented to the reduce method. The default implementation does nothing.
protected void cleanup(Reducer.Context context) throws IOException, InterruptedException
This method is called once after all keys and their lists of values have been presented to the reduce method. The default implementation does nothing.
protected void run(Reducer.Context context) throws IOException, InterruptedException
This method controls the overall flow of task processing within a JVM. The default implementation calls the setup method once before repeatedly calling the reduce method for each key and its associated list of values provided to the Reducer class, and then finally calls the cleanup method.
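To see how these methods fit together, the default run method behaves roughly like the following method written inside a Reducer subclass; this is a simplified sketch, and the real Hadoop implementation differs in detail:
@Override
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
        // Each iteration hands one key and its iterable of values to reduce().
        reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
}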
Although our mapper and reducer implementations are all we need to perform the MapReduce job, there is one more piece of code required: the driver that communicates with the Hadoop framework and specifies the configuration elements needed to run a MapReduce job. This involves aspects such as telling Hadoop which Mapper and Reducer classes to use, where to find the input data and in what format, and where to place the output data and how to format it. There is a variety of other configuration options that can be set, which we will see throughout this book.
There is no default parent Driver class to subclass; the driver logic usually exists in the main method of the class written to encapsulate a MapReduce job. Take a look at the following code snippet as an example driver. Don't worry about how each line works, though you should be able to work out generally what each is doing:
public class ExampleDriver {
...
    public static void main(String[] args) throws Exception {
        // Create a Configuration object that is used to set other options
        Configuration conf = new Configuration();
        // Create the object representing the job
        Job job = new Job(conf, "ExampleJob");
        // Set the name of the main class in the job jarfile
        job.setJarByClass(ExampleDriver.class);
        // Set the mapper class
        job.setMapperClass(ExampleMapper.class);
        // Set the reducer class
        job.setReducerClass(ExampleReducer.class);
        // Set the types for the final output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set input and output file paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Execute the job and wait for it to complete
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Given our previous talk of jobs, it is not surprising that much of the setup involves operations on a Job object. This includes setting the job name and specifying which classes are to be used for the mapper and reducer implementations.
Certain input/output configurations are set and, finally, the arguments passed to the main method are used to specify the input and output locations for the job. This is a very common model that you will see often.
There are a number of default values for configuration options, and we are implicitly using some of them in the preceding class. Most notably, we don't say anything about the file format of the input files or how the output files are to be written. These are defined through the InputFormat and OutputFormat classes mentioned earlier; we will explore them in detail later. The default input and output formats are text files, which suit our WordCount example. There are multiple ways of expressing the format within text files, as well as specially optimized binary formats.
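If we did want to state those defaults explicitly, the driver could set the format classes directly. The following lines, which could be added to the main method of ExampleDriver, assume the text-based classes from the org.apache.hadoop.mapreduce.lib packages:
// Equivalent to the defaults we relied on implicitly: read the input as lines
// of text and write the output as lines of tab-separated key/value text.
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);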
A common model for less complex MapReduce jobs is to have the Mapper and Reducer classes as inner classes within the driver. This allows everything to be kept in a single file, which simplifies code distribution.
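A sketch of that pattern, with illustrative names and the method bodies elided, might look like the following:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SingleFileJob {

    // Keeping the Mapper and Reducer as static nested classes keeps the
    // whole job in one source file.
    public static class JobMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {
        // map() implementation goes here
    }

    public static class JobReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
        // reduce() implementation goes here
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "SingleFileJob");
        job.setJarByClass(SingleFileJob.class);
        job.setMapperClass(JobMapper.class);
        job.setReducerClass(JobReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}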