Building a MapReduce Application with Hadoop
The first article in this series described the domain of business problems that Hadoop was designed to solve, and the internal architecture of Hadoop that allows it to solve these problems. Applications that run in Hadoop are called MapReduce applications, so this article demonstrates how to build a simple MapReduce application.
Setting Up a Development Environment
Before you can use Hadoop, you need Java 6 (or later) installed, which you can download for your platform from Oracle’s website. The official development and deployment platform for Hadoop is Linux, so if you are running on Windows, you will need to run Hadoop using Cygwin. Mac OS X users should have no problem running Hadoop natively.
Hadoop can be downloaded from its Releases page, but its numbering structure can be a little challenging to interpret. In short, the 1.x branch of code contains the current stable release, the 2.x.x branch contains the alpha code for version 2 of Hadoop, the 0.22.x branch of code is the 2.x.x code, but without security, and the 0.23.x branch of code excludes high availability. The 0.20.x branches of code are legacy and you should ignore them. For the examples in this article, I will be using the 0.23.x code branch, the latest of which is 0.23.5 as of this writing, but for production deployments, you would probably want to download version 1.x or 2.x.x.
Download and decompress the release archive on your local machine. If you’re planning on doing quite a bit of Hadoop development, it might be in your best interest to add the decompressed bin folder to your environment PATH. You can test your installation by executing the hadoop command from the bin folder:
bin/hadoop
Executing this command without any arguments reveals the following output:
Usage: hadoop [--config confdir] COMMAND
       where COMMAND is one of:
  fs                   run a generic filesystem user client
  version              print the version
  jar <jar>            run a jar file
  distcp <srcurl> <desturl> copy file or directories recursively
  archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive
  classpath            prints the class path needed to get the
                       Hadoop jar and the required libraries
  daemonlog            get/set the log level for each daemon
 or
  CLASSNAME            run the class named CLASSNAME

Most commands print help when invoked w/o parameters.
There are numerous commands that can be passed to Hadoop, but in this article we’ll be focusing on executing Hadoop applications in a development environment, so the only one we’ll be interested in is the following:
hadoop jar <jar-file-name>
Hello, MapReduce
The first program that you write in any programming language is typically a “Hello, World” application. In terms of Hadoop and MapReduce, the standard application that everyone writes is the Word Count application. The Word Count application counts the number of times each word occurs in a large amount of text. For example, the word “a” might appear 2,000 times, whereas the word “hypothetical” might appear three times. It is a perfect example for learning about MapReduce because the mapping step and reducing step are trivial, but they introduce you to thinking in MapReduce. The following is a summary of the components in the Word Count application and their function:
- FileInputFormat: We define a FileInputFormat to read all of the files in a specified directory (passed as the first argument to the MapReduce application) and pass those to a TextInputFormat (see Listing 1) for distribution to our mappers.
- TextInputFormat: The default InputFormat for Hadoop is the TextInputFormat, which reads one line at a time and returns the byte offset of the line as the key (LongWritable) and the line of text as the value (Text).
- Word Count Mapper: This is a class that we write which tokenizes the single line of text passed to it by the InputFormat into words and then emits the word itself with a count of “1” to note that we saw this word.
- Combiner: While we don’t need a combiner in a development environment, the combiner is an implementation of the reducer (described later in this article) that runs on the local node before the key/value pairs are passed to the reducer. Using combiners can dramatically improve performance, but you need to make sure that combining your results does not break your reducer: In order for the reducer to be used as a combiner, its operation must be associative and commutative (as addition is); otherwise the partially combined values sent to the reducer will not produce the correct result.
- Word Count Reducer: The word count reducer receives a map of every word and a list of all the counts for the number of times that the word was observed by the mappers. Without a combiner, the reducer would receive a word and a collection of “1”s, but because we are going to use the reducer as a combiner, we will have a collection of numbers that will need to be added together.
- TextOutputFormat: In this example, we use the TextOutputFormat class and tell it that the keys will be Text and the values will be IntWritable.
- FileOutputFormat: The TextOutputFormat sends its formatted output to a FileOutputFormat, which writes results to a self-created “output” directory.
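To make the flow between these components concrete, here is a minimal sketch in plain Java that runs without Hadoop. The class and method names (WordCountFlow, map, shuffle, reduce, run) are hypothetical stand-ins rather than Hadoop APIs, and the sketch ignores input splitting, the combiner, and output formatting. It only shows the shape of the computation: emit a (word, 1) pair per token, group the pairs by word, and add up each group.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical, Hadoop-free illustration of the word count data flow.
class WordCountFlow {

    // "Map" step: emit a (word, 1) pair for every token in one line of text.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(line.toLowerCase());
        while (st.hasMoreTokens()) {
            pairs.add(new AbstractMap.SimpleEntry<>(st.nextToken(), 1));
        }
        return pairs;
    }

    // "Shuffle" step: group all emitted values by key, with keys kept in sorted order.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // "Reduce" step: add up all of the values collected for one key.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Run the three steps in sequence over a list of input lines.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String line : lines) {
            emitted.addAll(map(line));
        }
        Map<String, Integer> counts = new TreeMap<>();
        shuffle(emitted).forEach((word, values) -> counts.put(word, reduce(values)));
        return counts;
    }

    public static void main(String[] args) {
        // Prints {sea=1, ship=1, the=3, whale=1}
        System.out.println(run(Arrays.asList("the whale the sea", "the ship")));
    }
}
```

In the real application, Hadoop performs the grouping step for us between the mapper and the reducer, which is why Listing 1 only contains map and reduce logic.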
You might be wondering why we’re calling Strings “Text” and numbers “IntWritable” and “LongWritable”. The reason is that in order for values to be written to the Hadoop Distributed File System (HDFS) and passed between nodes in a distributed fashion, there are specific rules that define serialization. Fortunately, Hadoop provides wrappers for common types, but if you need to develop your own, then it provides a Writable interface that you can implement to do so.
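As a rough illustration of what implementing your own type involves, the sketch below defines a local stand-in interface, SimpleWritable, that mirrors the two methods of Hadoop’s Writable interface (write(DataOutput) and readFields(DataInput)) so that the example compiles without Hadoop on the classpath. The WordStat type and the roundTrip() helper are hypothetical and exist only for this example; in a real job you would implement org.apache.hadoop.io.Writable itself, and a type used as a key would need to implement WritableComparable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in that mirrors the two methods of Hadoop's Writable interface.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical custom value type: a word together with its count.
class WordStat implements SimpleWritable {
    String word = "";
    int count;

    // No-argument constructor so that an empty instance can be created
    // first and then populated through readFields().
    WordStat() { }

    WordStat(String word, int count) {
        this.word = word;
        this.count = count;
    }

    // Serialize the fields in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    // Deserialize the fields in exactly the same order they were written.
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }

    // Serialize to a byte array and read the bytes back into a fresh instance.
    static WordStat roundTrip(WordStat original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            WordStat copy = new WordStat();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WordStat copy = roundTrip(new WordStat("whale", 3));
        System.out.println(copy.word + " " + copy.count); // prints: whale 3
    }
}
```

The important design point is symmetry: readFields() must consume the fields in the same order and with the same types that write() produced them.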
Listing 1 shows the source code for our first MapReduce application.
Listing 1 WordCount.java
package com.geekcap.hadoopexamples;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

public class WordCount extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private Text word = new Text();
        private final static IntWritable one = new IntWritable( 1 );

        public void map( LongWritable key, // Offset into the file
                         Text value,
                         OutputCollector<Text, IntWritable> output,
                         Reporter reporter) throws IOException {

            // Get the value as a String
            String text = value.toString().toLowerCase();

            // Remove apostrophes and replace all non-letter characters with spaces
            text = text.replaceAll( "'", "" );
            text = text.replaceAll( "[^a-zA-Z]", " " );

            // Iterate over all of the words in the string
            StringTokenizer st = new StringTokenizer( text );
            while( st.hasMoreTokens() ) {
                // Get the next token and set it as the text for our "word" variable
                word.set( st.nextToken() );

                // Output this word as the key and 1 as the value
                output.collect( word, one );
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce( Text key,
                            Iterator<IntWritable> values,
                            OutputCollector<Text, IntWritable> output,
                            Reporter reporter) throws IOException {

            // Iterate over all of the values (counts of occurrences of this word)
            int count = 0;
            while( values.hasNext() ) {
                // Add the value to our count
                count += values.next().get();
            }

            // Output the word with its count (wrapped in an IntWritable)
            output.collect( key, new IntWritable( count ) );
        }
    }

    public int run(String[] args) throws Exception {
        // Create a configuration
        Configuration conf = getConf();

        // Create a job from the default configuration that will use the WordCount class
        JobConf job = new JobConf( conf, WordCount.class );

        // Define our input path as the first command line argument and our output path as the second
        Path in = new Path( args[0] );
        Path out = new Path( args[1] );

        // Register the input and output paths with the job
        FileInputFormat.setInputPaths( job, in );
        FileOutputFormat.setOutputPath( job, out );

        // Configure the job: name, mapper, reducer, and combiner
        job.setJobName( "WordCount" );
        job.setMapperClass( MapClass.class );
        job.setReducerClass( Reduce.class );
        job.setCombinerClass( Reduce.class );

        // Configure the output
        job.setOutputFormat( TextOutputFormat.class );
        job.setOutputKeyClass( Text.class );
        job.setOutputValueClass( IntWritable.class );

        // Run the job
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // Start the WordCount MapReduce application
        int res = ToolRunner.run( new Configuration(), new WordCount(), args );
        System.exit( res );
    }
}
Code Analysis
Execution starts in WordCount’s main() method, which creates a WordCount instance and hands it to the ToolRunner class to run the job. The ToolRunner processes any generic Hadoop command-line options and then executes the WordCount instance’s run() method.
The run() method configures the job by defining input and output paths and then registering those paths with the job through the static FileInputFormat.setInputPaths() and FileOutputFormat.setOutputPath() methods. Setting the input and output paths is a little different from the remainder of the configuration because we call static methods on those classes and pass them a reference to the job. The other configuration is accomplished by invoking the job’s setter methods.
The job is then configured with a mapper class, a reducer class, and a combiner class. Note that we pass the class itself and not an instance of the class so that Hadoop can create as many of them as it needs to across its distributed environment.
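The idea of passing a class rather than an instance can be sketched in plain Java. This is only an analogy under stated assumptions: ClassTokenDemo, DemoMapper, and instantiate() are hypothetical names invented for this example, and the sketch uses ordinary reflection on a single machine rather than anything Hadoop-specific. It shows that a framework holding only a Class object can create as many independent instances as it needs, whenever and wherever it needs them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of why a framework asks for a Class, not an instance.
class ClassTokenDemo {

    // Stand-in for a mapper: all the framework needs is a no-argument constructor.
    public static class DemoMapper {
        public DemoMapper() { }
    }

    // Create the requested number of independent instances from the class alone.
    static <T> List<T> instantiate(Class<T> type, int copies) {
        List<T> instances = new ArrayList<>();
        try {
            for (int i = 0; i < copies; i++) {
                instances.add(type.getDeclaredConstructor().newInstance());
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + type.getName(), e);
        }
        return instances;
    }

    public static void main(String[] args) {
        List<DemoMapper> mappers = instantiate(DemoMapper.class, 3);
        System.out.println(mappers.size() + " separate mapper instances created");
    }
}
```

This is also why the mapper and reducer in Listing 1 are declared as public static nested classes with default constructors: they can be constructed without a reference to an enclosing WordCount object.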
The real work takes place in the MapClass and the Reduce class. The MapClass receives the following information:
- key: The byte offset into the file.
- value: The text of a single line of the file.
- output: The OutputCollector is the mechanism through which we output the key/value pair that we want to pass to the reducer.
- reporter: Used to report progress in processing the job back to the Hadoop server. It is not used in this example.
The MapClass extracts the value to a String by calling the value’s toString() method and then does a few conversions: It converts the String to lowercase so that we can match words like “Apple” with “apple”, it deletes single quotes, and it replaces all non-letter characters with spaces. It then tokenizes the String using white space and iterates over all of the tokens in the String. For each token that it finds, it sets the word variable’s text to the token and then emits the word as the key and a static IntWritable for the number 1 as the value. We could have created a new Text word each time, but because of the number of times this is going to run, it improves performance to maintain the word as a member variable and not re-create it each time.
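The normalization steps are easy to try in isolation. The sketch below repeats the same three String operations from the map() method in a plain Java class with no Hadoop dependency; the class name LineNormalizer and the sample sentence are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical helper that applies the same clean-up steps as MapClass.map().
class LineNormalizer {

    static List<String> tokenize(String line) {
        // Lowercase so that "Apple" and "apple" count as the same word
        String text = line.toLowerCase();

        // Delete single quotes, then turn every remaining non-letter into a space
        text = text.replaceAll("'", "");
        text = text.replaceAll("[^a-zA-Z]", " ");

        // Split on white space
        List<String> words = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(text);
        while (st.hasMoreTokens()) {
            words.add(st.nextToken());
        }
        return words;
    }

    public static void main(String[] args) {
        // Prints [dont, stop, apple, pie]
        System.out.println(tokenize("Don't STOP, Apple-pie!"));
    }
}
```

Note that deleting the quote before replacing non-letters is what turns “don't” into the single token “dont” rather than the two tokens “don” and “t”.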
The Reduce class’ reduce() method receives the same set of parameters that the map() method receives, only its key is the word and, instead of receiving a single value, it receives an Iterator to a list of values. In this example, it would receive something like the word “apple” and an Iterator to a collection with values 1, 1, 1, 1. But because we also want to be able to use this Reduce class as a combiner, we don’t just count the number of entries, but instead extract each value by calling the IntWritable’s get() method and add it to our sum. In the end, the reduce() method emits the same key it received (the word) and the sum of the number of occurrences.
You might be wondering what the big deal is. This is a simple program, right? And you are right, it is a simple program, and that is the elegance of MapReduce: You will find yourself spending more time deriving your solution than actually coding.
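The difference between summing the values and counting the entries only shows up once a combiner is involved. The following plain Java sketch assumes a hypothetical scenario in which two mappers saw the word “apple” three times and one time, respectively; the class and method names (CombinerCheck, reduceBySum, reduceByCount) are made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical comparison of two reduce strategies with and without a combiner.
class CombinerCheck {

    // Strategy used in Listing 1: add the values together.
    static int reduceBySum(List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Tempting shortcut: every value is "1", so just count the entries.
    static int reduceByCount(List<Integer> values) {
        return values.size();
    }

    public static void main(String[] args) {
        // Without a combiner the reducer sees the raw 1s from both mappers.
        List<Integer> raw = Arrays.asList(1, 1, 1, 1);

        // With a combiner each mapper's values are pre-reduced locally: 3 and 1.
        List<Integer> combined = Arrays.asList(
                reduceBySum(Arrays.asList(1, 1, 1)),
                reduceBySum(Arrays.asList(1)));

        System.out.println("sum,   no combiner:   " + reduceBySum(raw));        // 4
        System.out.println("sum,   with combiner: " + reduceBySum(combined));   // 4
        System.out.println("count, no combiner:   " + reduceByCount(raw));      // 4
        System.out.println("count, with combiner: " + reduceByCount(combined)); // 2 (wrong)
    }
}
```

Summing gives the same answer either way, whereas counting entries silently undercounts as soon as values arrive pre-combined.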
Listing 2 shows a Maven POM file for building this source code.
Listing 2 pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.geekcap</groupId>
  <artifactId>hadoop-examples</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>hadoop-examples</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.205.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
The POM file is very simple: apart from a test-scoped JUnit dependency, it only includes a reference to the hadoop-core dependency. You can build with the following command:
mvn clean install
To put this all together, we need a sizable text file in which to count words. A great source of large text files is Project Gutenberg, which includes more than 100,000 free ebooks. For my example, I chose Moby Dick. Download one of the text files and put it in a directory on your hard drive (it should be the only file in that directory). Once you have it, you can run your MapReduce project by executing the hadoop command, passing it the path to the directory that contains your book and a destination directory. For example:
hadoop jar hadoop-examples-1.0-SNAPSHOT.jar com.geekcap.hadoopexamples.WordCount ~/apps/hadoop-0.23.5/test-data output
When I execute this I see the following output:
2012-12-11 22:27:08.929 java[37044:1203] Unable to load realm info from SCDynamicStore
2012-12-11 22:27:09.023 java[37044:1203] Unable to load realm info from SCDynamicStore
12/12/11 22:27:09 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id
12/12/11 22:27:09 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
12/12/11 22:27:09 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
12/12/11 22:27:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/12/11 22:27:09 WARN snappy.LoadSnappy: Snappy native library not loaded
12/12/11 22:27:09 INFO mapred.FileInputFormat: Total input paths to process : 1
12/12/11 22:27:10 INFO mapreduce.JobSubmitter: number of splits:1
12/12/11 22:27:10 WARN conf.Configuration: mapred.jar is deprecated. Instead, use mapreduce.job.jar
12/12/11 22:27:10 WARN conf.Configuration: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
12/12/11 22:27:10 WARN conf.Configuration: mapred.job.name is deprecated. Instead, use mapreduce.job.name
12/12/11 22:27:10 WARN conf.Configuration: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
12/12/11 22:27:10 WARN conf.Configuration: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
12/12/11 22:27:10 WARN conf.Configuration: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
12/12/11 22:27:10 WARN conf.Configuration: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
12/12/11 22:27:10 WARN conf.Configuration: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
12/12/11 22:27:10 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local_0001
12/12/11 22:27:10 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
12/12/11 22:27:10 INFO mapred.LocalJobRunner: OutputCommitter set in config null
12/12/11 22:27:10 INFO mapreduce.Job: Running job: job_local_0001
12/12/11 22:27:10 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
12/12/11 22:27:10 INFO mapred.LocalJobRunner: Waiting for map tasks
12/12/11 22:27:10 INFO mapred.LocalJobRunner: Starting task: attempt_local_0001_m_000000_0
12/12/11 22:27:10 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/12/11 22:27:10 INFO mapred.MapTask: numReduceTasks: 1
12/12/11 22:27:10 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
12/12/11 22:27:10 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
12/12/11 22:27:10 INFO mapred.MapTask: soft limit at 83886080
12/12/11 22:27:10 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
12/12/11 22:27:10 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
12/12/11 22:27:11 INFO mapred.LocalJobRunner:
12/12/11 22:27:11 INFO mapred.MapTask: Starting flush of map output
12/12/11 22:27:11 INFO mapred.MapTask: Spilling map output
12/12/11 22:27:11 INFO mapred.MapTask: bufstart = 0; bufend = 2027118; bufvoid = 104857600
12/12/11 22:27:11 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 25353164(101412656); length = 861233/6553600
12/12/11 22:27:11 INFO mapreduce.Job: Job job_local_0001 running in uber mode : false
12/12/11 22:27:11 INFO mapreduce.Job: map 0% reduce 0%
12/12/11 22:27:12 INFO mapred.MapTask: Finished spill 0
12/12/11 22:27:12 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of committing
12/12/11 22:27:12 INFO mapred.LocalJobRunner: file:/Users/shaines/apps/hadoop-0.23.5/test-data/mobydick.txt:0+1212132
12/12/11 22:27:12 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
12/12/11 22:27:12 INFO mapred.LocalJobRunner: Finishing task: attempt_local_0001_m_000000_0
12/12/11 22:27:12 INFO mapred.LocalJobRunner: Map task executor complete.
12/12/11 22:27:12 INFO mapred.Task: Using ResourceCalculatorPlugin : null
12/12/11 22:27:12 INFO mapred.Merger: Merging 1 sorted segments
12/12/11 22:27:12 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 247166 bytes
12/12/11 22:27:12 INFO mapred.LocalJobRunner:
12/12/11 22:27:12 INFO mapreduce.Job: map 100% reduce 0%
12/12/11 22:27:12 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of committing
12/12/11 22:27:12 INFO mapred.LocalJobRunner:
12/12/11 22:27:12 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/12/11 22:27:12 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/Users/shaines/Documents/Workspace/hadoop-examples/target/output/_temporary/0/task_local_0001_r_000000
12/12/11 22:27:12 INFO mapred.LocalJobRunner: reduce > reduce
12/12/11 22:27:12 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
12/12/11 22:27:13 INFO mapreduce.Job: map 100% reduce 100%
12/12/11 22:27:13 INFO mapreduce.Job: Job job_local_0001 completed successfully
12/12/11 22:27:13 INFO mapreduce.Job: Counters: 24
    File System Counters
        FILE: Number of bytes read=2683488
        FILE: Number of bytes written=974132
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=21573
        Map output records=215309
        Map output bytes=2027118
        Map output materialized bytes=247174
        Input split bytes=113
        Combine input records=215309
        Combine output records=17107
        Reduce input groups=17107
        Reduce shuffle bytes=0
        Reduce input records=17107
        Reduce output records=17107
        Spilled Records=34214
        Shuffled Maps =0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=32
        Total committed heap usage (bytes)=264110080
    File Input Format Counters
        Bytes Read=1212132
    File Output Format Counters
        Bytes Written=182624
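The counters at the end of this log show the combiner at work: it received 215,309 records from the mapper (Combine input records) and passed on only 17,107 (Combine output records), one per distinct word. The small sketch below simply does that arithmetic with the numbers copied from the log; the class name CombinerSavings and its method are hypothetical.

```java
// Hypothetical helper that quantifies how much the combiner shrank the map output.
class CombinerSavings {

    // Percentage of records removed between combiner input and combiner output.
    static double reductionPercent(long inputRecords, long outputRecords) {
        return 100.0 * (inputRecords - outputRecords) / inputRecords;
    }

    public static void main(String[] args) {
        long combineInput = 215309L;  // "Combine input records" from the log
        long combineOutput = 17107L;  // "Combine output records" from the log

        System.out.printf("Records reaching the reducer: %d of %d%n", combineOutput, combineInput);
        System.out.printf("Reduction: about %.0f%%%n", reductionPercent(combineInput, combineOutput));
    }
}
```

In other words, roughly 92 percent of the intermediate records never had to be handed to the reducer, which is the kind of saving that matters once the data has to cross a network.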
Because I told it to write to an “output” directory, my output directory has a file named part-00000. Here is a small snippet of its contents:
a	4687
aback	2
abaft	2
abandon	3
abandoned	7
abandonedly	1
abandonment	2
...
your	251
youre	6
youve	1
zephyr	1
zeuglodon	1
zones	3
zoology	2
zoroaster	1
The output contains the word that it found and the number of occurrences of that word. The word “a” occurred 4687 times in Moby Dick, whereas the word “your” only occurred 251 times.
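Because the output is sorted by word rather than by count, a little post-processing helps when you want to see the most frequent words. The following is a minimal sketch in plain Java, assuming each line holds a word and a count separated by a tab (the default separator written by TextOutputFormat). The class name TopWords and the top() method are hypothetical, and the sample lines are taken from the snippet above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper that ranks "word<TAB>count" lines by count, highest first.
class TopWords {

    static List<Map.Entry<String, Integer>> top(List<String> lines, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.split("\t");
            if (parts.length != 2) {
                continue; // Skip blank or malformed lines
            }
            entries.add(new AbstractMap.SimpleEntry<>(parts[0], Integer.parseInt(parts[1].trim())));
        }

        // Sort by count in descending order and keep the first n entries
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("a\t4687", "aback\t2", "your\t251", "zones\t3");
        // Prints [a=4687, your=251]
        System.out.println(top(sample, 2));
    }
}
```

To run it against the real results, you could read the lines of part-00000 with java.nio.file.Files.readAllLines() and pass them to top().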
Summary
This article demonstrated how to create a simple MapReduce application from start to finish. It delved into the depths of MapReduce to describe how mappers and reducers are built and then how Hadoop is configured to execute the mapper, reducer, and combiner. The important thing to realize about Hadoop and MapReduce in general is that you’ll need to spend more time thinking about how to solve your problem than you will coding. The trick is to think about the type of key that you need and how to properly construct your value. It takes time and practice, but it is a powerful tool at your disposal.
Don't miss the final article in this series, Applied Big Data Analysis in the Real World with MapReduce and Hadoop, to be posted next week. That article will walk you through setting up and managing a Hadoop production environment.
If you are looking for a good book to help you think in MapReduce, O’Reilly’s MapReduce Design Patterns is a great choice. I read through several books to help me get Hadoop set up and configured, but MapReduce Design Patterns was the first book that I found that helped me really understand how to approach MapReduce problems. I highly recommend it!