Wednesday, August 28, 2013

MapReduce with Hadoop 1.2.1 Part One - Install Hadoop on Mac, Run MapReduce job to count words and produce sorted output


I’ve done a bit of work with Hadoop and MapReduce over the past few years, and I’d like to use this blog post to share my experience.

MapReduce is a programming model and software framework introduced by Google to support distributed computing on large data sets across clusters of computers. It offers an attractive, simple computational model that scales with the data. As the original MapReduce paper puts it: "Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines. The run-time system takes care of the details of partitioning the input data, scheduling the program’s execution across a set of machines, handling machine failures, and managing the required inter-machine communication. This allows programmers without any experience with parallel and distributed systems to easily utilize the resources of a large distributed system."

Apache Hadoop is an open-source software framework, licensed under the Apache v2 license, that supports data-intensive distributed applications running on large clusters of commodity hardware.

Here is how MapReduce works at a high level:

A Mapper
    Accepts (key,value) pairs from the input
    Produces intermediate (key,value) pairs, which are then shuffled
A Reducer
    Accepts intermediate (key,value) pairs
    Produces final (key,value) pairs for the output
A driver
    Specifies which inputs to use, where to put the outputs
    Chooses the mapper and the reducer to use
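
To make the driver's role concrete, here is a minimal driver sketch using the Hadoop 1.x org.apache.hadoop.mapred API. The class name MinimalDriver is just a placeholder, and IdentityMapper/IdentityReducer are stand-ins that simply pass records through; a real job plugs in its own mapper and reducer, such as the word-count classes shown later in this post.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class MinimalDriver {

  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(MinimalDriver.class);
    conf.setJobName("minimal-driver");

    // Final (key, value) types written by the reducer
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    // Choose the mapper and the reducer to use
    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(IdentityReducer.class);

    // Specify which inputs to use and where to put the outputs
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // Submit the job and wait for it to finish
    JobClient.runJob(conf);
  }
}

For a reducer whose operation is associative and commutative, such as the summing word-count reducer below, the same class can also be registered as a combiner with conf.setCombinerClass(...) to cut down shuffle traffic.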






Install Hadoop 1.2.1 on Mac


Step 1: Enable SSH to localhost

Go to System Preferences > Sharing.
Make sure “Remote Login” is checked.

$ ssh-keygen -t rsa
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Finally, make sure you can ssh to localhost:
$ ssh localhost


Step 2: Install Hadoop

Download the Hadoop 1.2.1 release (hadoop-1.2.1.tar.gz), untar it, and move it to /usr/local:

$ tar xvfz hadoop-1.2.1.tar.gz 
$ mv hadoop-1.2.1 /usr/local/

Most likely, you will need root privileges for the mv:
$ sudo mv hadoop-1.2.1 /usr/local/ 


Step 3: Configure Hadoop

Change directory to where Hadoop is installed and configure it. The dfs directory created below will hold the NameNode and DataNode storage referenced in hdfs-site.xml.

$ cd /usr/local/hadoop-1.2.1
$ mkdir /usr/local/hadoop-1.2.1/dfs/

Add the following property inside the <configuration> element of conf/core-site.xml:

<property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
</property>


Add the following to conf/hdfs-site.xml

<configuration>

<property>
    <name>dfs.name.dir</name>
    <value>/usr/local/hadoop-1.2.1/dfs/name</value>
    <description>Path on the local filesystem where the NameNode stores the namespace and transaction logs persistently.</description>
</property>

<property>
    <name>dfs.data.dir</name>
    <value>/usr/local/hadoop-1.2.1/dfs/data</value>
    <description>Comma-separated list of paths on the local filesystem of a DataNode where it should store its blocks.</description>
</property>

<property>
    <name>dfs.replication</name>
    <value>1</value>
</property>

</configuration>


Add the following property inside the <configuration> element of conf/mapred-site.xml:

<property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
</property>


Add the following to conf/hadoop-env.sh (the krb5 options work around the "Unable to load realm info from SCDynamicStore" warning that Hadoop otherwise prints on OS X):

export HADOOP_OPTS="-Djava.security.krb5.realm= -Djava.security.krb5.kdc="
export JAVA_HOME=/Library/Java/Home



Step 4: Configure environment variables for Hadoop

Edit ~/.bash_profile and add the following:

## set Hadoop environment 
export HADOOP_HOME=/usr/local/hadoop-1.2.1
export HADOOP_CONF_DIR=/usr/local/hadoop-1.2.1/conf
export PATH=$PATH:$HADOOP_HOME/bin


Step 5: Format Hadoop filesystem

Run the hadoop command to format HDFS:

$ hadoop namenode -format

Step 6: Start Hadoop

Run the start-all.sh command to start Hadoop:

$ start-all.sh

To verify that all Hadoop processes are running:

$ jps
9460 Jps
9428 TaskTracker
9344 JobTracker
9198 DataNode
9113 NameNode
9282 SecondaryNameNode



To run a sample Hadoop MapReduce job (calculating Pi) that comes from hadoop-examples-1.2.1.jar:
$ hadoop jar /usr/local/hadoop-1.2.1/hadoop-examples-1.2.1.jar pi 10 100


To take a look at the Hadoop logs:
$ ls -altr /usr/local/hadoop-1.2.1/logs/


To stop Hadoop:
$ stop-all.sh 

Web UI for Hadoop NameNode: http://localhost:50070/
Web UI for Hadoop JobTracker: http://localhost:50030/



Run MapReduce job to count words and produce sorted output 


Use mvn to create a Hadoop MapReduce Java project:
$ mvn archetype:generate -DgroupId=com.lei.hadoop -DartifactId=MapReduceWithHadoop -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false 


$ cd MapReduceWithHadoop/


Add the following Hadoop dependency to pom.xml: 
<dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-core</artifactId>
     <version>1.2.1</version>
</dependency>


To build the target jar: 
$ mvn clean compile package 


To generate an Eclipse project:
$ mvn eclipse:eclipse 


To run the MapReduce job, which counts words in the text files under the local folder wordcount_input and writes its result in sorted order:
$ hadoop jar ./target/MapReduceWithHadoop-1.0-SNAPSHOT.jar com.lei.hadoop.countword.CountWordsV2 ./wordcount_input ./wordcount_output 


To see the result in the output folder wordcount_output (each line is a count followed by a word, in ascending order of count):
$ tail wordcount_output/part-00000
5 Apache
5 a
6 distributed
8 data
8 to
8 of
10 and
10 Hadoop
11 for
12 A



Mapper for WordCount:
    
  // Mapper: emit a <word, 1> pair to the OutputCollector for each token in the line
  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
    for (String pattern : patternsToSkip) {
      line = line.replaceAll(pattern, "");
    }

    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      output.collect(word, one);
      reporter.incrCounter(Counters.INPUT_WORDS, 1);
    }

    if ((++numRecords % 100) == 0) {
      reporter.setStatus("Finished processing " + numRecords + " records "
          + "from the input file: " + inputFile);
    }
  }
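
The map() method above relies on several instance fields of the mapper class (word, one, caseSensitive, patternsToSkip, numRecords, inputFile) and a Counters enum that are not shown in the excerpt. The mapper closely follows the stock WordCount v2.0 example from the Hadoop 1.x tutorial; in that example the missing pieces look roughly like the sketch below. The property names wordcount.case.sensitive and wordcount.skip.patterns come from that example and may differ in the actual repo, and the sketch also needs imports for java.io.BufferedReader, java.io.FileReader, java.util.HashSet, java.util.Set, org.apache.hadoop.filecache.DistributedCache, and org.apache.hadoop.util.StringUtils.

  // Instance fields assumed by the map() method above
  static enum Counters { INPUT_WORDS }

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  private boolean caseSensitive = true;
  private Set<String> patternsToSkip = new HashSet<String>();

  private long numRecords = 0;
  private String inputFile;

  // configure() runs once per map task: read job settings and, optionally,
  // load a DistributedCache file that lists patterns to strip from each line
  public void configure(JobConf job) {
    caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
    inputFile = job.get("map.input.file");

    if (job.getBoolean("wordcount.skip.patterns", false)) {
      Path[] patternsFiles = new Path[0];
      try {
        patternsFiles = DistributedCache.getLocalCacheFiles(job);
      } catch (IOException ioe) {
        System.err.println("Caught exception while getting cached files: "
            + StringUtils.stringifyException(ioe));
      }
      for (Path patternsFile : patternsFiles) {
        parseSkipFile(patternsFile);
      }
    }
  }

  // Read one skip pattern per line from the cached patterns file
  private void parseSkipFile(Path patternsFile) {
    try {
      BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
      String pattern;
      while ((pattern = fis.readLine()) != null) {
        patternsToSkip.add(pattern);
      }
    } catch (IOException ioe) {
      System.err.println("Caught exception while parsing the cached file: "
          + StringUtils.stringifyException(ioe));
    }
  }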


Reducer for WordCount:
    
public static class CountWordsV2Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  // Reducer: sum up the counts for each word and emit <word, total> to the OutputCollector
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}


Mapper for WordSort:
    

/**
 * Mapper for the sort job: reads each "word<TAB>count" line produced by the
 * word-count job and emits (count, word), so that records are keyed by count.
 *
 * @author stones333
 */
public static class SortWordsMap extends MapReduceBase
    implements Mapper<LongWritable, Text, IntWritable, Text> {

  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<IntWritable, Text> collector,
      Reporter reporter) throws IOException {
    String line = value.toString();
    StringTokenizer stringTokenizer = new StringTokenizer(line);

    int number = 0;
    String word = "";

    // the first token is the word, the second token is its count
    if (stringTokenizer.hasMoreTokens()) {
      word = stringTokenizer.nextToken().trim();
    }
    if (stringTokenizer.hasMoreTokens()) {
      number = Integer.parseInt(stringTokenizer.nextToken().trim());
    }

    // emit (count, word); the framework sorts the IntWritable keys during the shuffle
    collector.collect(new IntWritable(number), new Text(word));
  }
}
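
The sorted output comes out ascending because the framework sorts intermediate keys with the default IntWritable comparator before they reach the reducer. If you wanted the most frequent words first instead, one option (not part of the repo, just a sketch) is to register a decreasing comparator for the sort job's keys:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Compares IntWritable keys in reverse order, so larger counts come first
public class DescendingIntComparator extends WritableComparator {

  protected DescendingIntComparator() {
    super(IntWritable.class, true);
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    return -super.compare(a, b);
  }
}

It would be registered in the sort job's driver with conf.setOutputKeyComparatorClass(DescendingIntComparator.class).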



Reducer for WordSort:
    
/**
 * Reducer for the sort job: the framework delivers keys (counts) in sorted
 * order, so writing out each (count, word) pair as-is produces the sorted result.
 *
 * @author stones333
 */
public static class SortWordsReduce extends MapReduceBase
    implements Reducer<IntWritable, Text, IntWritable, Text> {

  public void reduce(IntWritable key, Iterator<Text> values,
      OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
    while (values.hasNext()) {
      output.collect(key, values.next());
    }
  }
}
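
The single hadoop jar command shown earlier produces sorted counts from one invocation because the main class runs two jobs back to back: the word-count job writes "word<TAB>count" records to an intermediate directory, and the sort job reads that directory and re-keys each record by its count. Here is a sketch of how that chaining might look in CountWordsV2; the mapper class name CountWordsV2Map and the intermediate path are my assumptions, so check the repo for the exact wiring.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class CountWordsV2 {

  public static void main(String[] args) throws IOException {
    Path input = new Path(args[0]);           // e.g. ./wordcount_input
    Path temp = new Path(args[1] + "_tmp");   // hypothetical intermediate directory
    Path output = new Path(args[1]);          // e.g. ./wordcount_output

    // Job 1: count the words; each output line looks like "word<TAB>count"
    JobConf countJob = new JobConf(CountWordsV2.class);
    countJob.setJobName("count-words");
    countJob.setOutputKeyClass(Text.class);
    countJob.setOutputValueClass(IntWritable.class);
    countJob.setMapperClass(CountWordsV2Map.class);       // assumed mapper class name
    countJob.setCombinerClass(CountWordsV2Reduce.class);  // the summing reducer doubles as a combiner
    countJob.setReducerClass(CountWordsV2Reduce.class);
    FileInputFormat.setInputPaths(countJob, input);
    FileOutputFormat.setOutputPath(countJob, temp);
    JobClient.runJob(countJob);

    // Job 2: re-key each "word<TAB>count" record as (count, word); the shuffle
    // sorts the IntWritable keys, so the final output is ordered by count
    JobConf sortJob = new JobConf(CountWordsV2.class);
    sortJob.setJobName("sort-words");
    sortJob.setOutputKeyClass(IntWritable.class);
    sortJob.setOutputValueClass(Text.class);
    sortJob.setMapperClass(SortWordsMap.class);
    sortJob.setReducerClass(SortWordsReduce.class);
    sortJob.setNumReduceTasks(1);   // a single reducer yields one globally sorted part-00000
    FileInputFormat.setInputPaths(sortJob, temp);
    FileOutputFormat.setOutputPath(sortJob, output);
    JobClient.runJob(sortJob);
  }
}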


You can access the code from https://github.com/leizou123/MapReduceWithHadoop

 $ git clone https://github.com/leizou123/MapReduceWithHadoop


Have fun and enjoy the journey.