Tuesday, March 13, 2012

Simple MapReduce program using Hadoop and HBase

I have a set of input data: key-value pairs generated by another Java program. The data look like the following: 

...
j 9.0
e 4.0
i 8.0
b 9.0
g 25.0
e 88.0
f 32.0
...

I wrote a very simple MapReduce program that imports the data into an HBase table and computes the sum for each key. I am using Hadoop 0.20.205.0 and HBase 0.90.5.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HBaseCalSum {

    private static final String TABLE_NAME = "cal_sum";
    private static final String COLUMN_NAME = "content";
    private static final String COLUMN_KEY_NAME = "sum";

    public static class HBaseCalSumMap extends Mapper<LongWritable, Text, Text, DoubleWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            // Only parse lines of the form "<key> <number>"; skip anything malformed.
            if (JavaUtil.isStringKeyNumSeparatedBySpace(line)) {
                StringTokenizer st = new StringTokenizer(line);
                String k = st.nextToken();
                double d = Double.parseDouble(st.nextToken());
                context.write(new Text(k), new DoubleWritable(d));
            }
        }
    }

    public static class CalSumReduce extends TableReducer<Text, DoubleWritable, NullWritable> {

        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            // Accumulate as a double so fractional values are not silently truncated;
            // the inputs here are all whole numbers, so the total is stored as a long.
            double sum = 0;
            for (DoubleWritable v : values) {
                sum += v.get();
            }

            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes(COLUMN_NAME), Bytes.toBytes(COLUMN_KEY_NAME), Bytes.toBytes(String.valueOf((long) sum)));
            context.write(NullWritable.get(), put);
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: HBaseCalSum <path>");
            System.exit(-1);
        }

        // Tell TableOutputFormat which table to write to, and make sure it exists.
        Configuration conf = new Configuration();
        conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
        HbaseUtil.createHBaseTable(TABLE_NAME, COLUMN_NAME);

        String input = args[0];
        Job job = new Job(conf, "HBaseCalSum table with " + input);

        job.setJarByClass(HBaseCalSum.class);
        job.setMapperClass(HBaseCalSumMap.class);
        job.setReducerClass(CalSumReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(input));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
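
The program references two small helper classes that are not shown above: JavaUtil.isStringKeyNumSeparatedBySpace and HbaseUtil.createHBaseTable. The sketches below are only an approximation of what they do (my actual helpers may differ slightly); the table-creation calls are the standard HBase 0.90 client API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// HbaseUtil.java: create the table with a single column family, if it does not exist yet.
public class HbaseUtil {

    public static void createHBaseTable(String tableName, String colFamily) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists(tableName)) {
            HTableDescriptor desc = new HTableDescriptor(tableName);
            desc.addFamily(new HColumnDescriptor(colFamily));
            admin.createTable(desc);
        }
    }
}

// JavaUtil.java: check that a line looks like "<key> <number>", e.g. "j 9.0".
public class JavaUtil {

    public static boolean isStringKeyNumSeparatedBySpace(String line) {
        return line != null && line.trim().matches("\\S+\\s+-?\\d+(\\.\\d+)?");
    }
}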


 
$ ./bin/hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.5, r1212209, Fri Dec  9 05:40:36 UTC 2011

hbase(main):001:0> scan 'cal_sum'
ROW                                            COLUMN+CELL                                                                                                                          
 a                                             column=content:sum, timestamp=1331703862041, value=1304                                                                              
 b                                             column=content:sum, timestamp=1331703862041, value=1362                                                                              
 c                                             column=content:sum, timestamp=1331703862041, value=864                                                                               
 d                                             column=content:sum, timestamp=1331703862041, value=1602                                                                              
 e                                             column=content:sum, timestamp=1331703862041, value=1710                                                                              
 f                                             column=content:sum, timestamp=1331703862041, value=491                                                                               
 g                                             column=content:sum, timestamp=1331703862041, value=920                                                                               
 h                                             column=content:sum, timestamp=1331703862041, value=1353                                                                              
 i                                             column=content:sum, timestamp=1331703862041, value=1274                                                                              
 j                                             column=content:sum, timestamp=1331703862041, value=1215                                                                              
 k                                             column=content:sum, timestamp=1331703862041, value=869                                                                               
 l                                             column=content:sum, timestamp=1331703862041, value=1461                                                                              
 m                                             column=content:sum, timestamp=1331703862041, value=1108                                                                              
13 row(s) in 1.2870 seconds

hbase(main):002:0> 


Here is the Java code for an HTable scan:

 public static void scanTable(String strConf, String tableName, String colName, String keyName) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  conf.addResource(new Path(strConf)); // e.g. "/usr/local/hbase/conf/hbase-site.xml"

  HTable table = new HTable(conf, tableName);
  Scan s = new Scan();
  s.addColumn(Bytes.toBytes(colName), Bytes.toBytes(keyName));

  ResultScanner scanner = table.getScanner(s);
  try {
    // Scanners return Result instances; iterate until next() returns null.
    for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
      // Print the row we found and the columns we were looking for.
      System.out.println("Found row: " + rr);
      for (KeyValue kv : rr.raw()) {
        System.out.println("Found key: " + Bytes.toStringBinary(kv.getRow())
            + " Count: " + Bytes.toStringBinary(kv.getValue()));
      }
    }
  } finally {
    // Always close the scanner (and the table) when you are done;
    // that is why the iteration sits inside a try/finally block.
    scanner.close();
    table.close();
  }
 }
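
For example, to scan the cal_sum table produced by the job above, pass in the configuration path from the comment:

 scanTable("/usr/local/hbase/conf/hbase-site.xml", "cal_sum", "content", "sum");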



Run and test this simple MapReduce program with HBase

I installed both Hadoop and HBase in pseudo-distributed (single-server) mode. Please refer to my other blog post on how to install HBase, or better yet, use BigTop.
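
For reference, a minimal hbase-site.xml for pseudo-distributed mode looks roughly like the following; the rootdir below assumes HDFS is listening on localhost:9000, so adjust the host and port to your own install:

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:9000/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
</configuration>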

Also, I modified hadoop-env.sh to add the HBase and ZooKeeper jars to the HADOOP_CLASSPATH:

export HADOOP_CLASSPATH=$HBASE_HOME/hbase-0.90.5.jar:$HBASE_HOME/hbase-0.90.5-tests.jar:$HBASE_HOME/conf:${HBASE_HOME}/lib/zookeeper-3.3.2.jar

1) Start Hadoop in pseudo-distributed (single-server) mode.
2) Start HBase in pseudo-distributed (single-server) mode.
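
For these two steps, assuming a standard install with HADOOP_HOME and HBASE_HOME pointing at your installation directories, the start commands are:

$ $HADOOP_HOME/bin/start-all.sh
$ $HBASE_HOME/bin/start-hbase.sh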

I ran jps to verify that all the Hadoop and HBase processes are running:



 
home:lei$ jps
9968 HQuorumPeer
9999 HMaster
9632 DataNode
10354 Jps
10082 HRegionServer
9543 NameNode
9720 SecondaryNameNode


3) Run the following commands:
 
home:lei$ hadoop fs -rmr ./data
home:lei$ hadoop fs -mkdir ./data
home:lei$ hadoop fs -put ./data/* ./data
home:lei$ hadoop jar ./target/LeiBigTop-1.1.jar com.lei.bigtop.hbase.calsum.HBaseCalSum ./data


I tested the code on Mac OS X 10.6.8, Ubuntu 11.10, and AWS EC2 (Ubuntu 11.x).


Here is the ant target to run the program:
 

 <target name="hbaseCalSum" description="Simple MapReduce program with HBase">
  <property name="myclasspath" refid="run.class.path"/>
  <echo message="Simple MapReduce program with HBase"/>
  <echo message="Classpath = ${myclasspath}"/>
  <echo>+--------------------------------------------------------+</echo>
  <echo>+ Program: com.lei.bigtop.hbase.calsum.HBaseCalSum       +</echo>
  <echo>+ Input: ./data                                          +</echo>
  <echo>+--------------------------------------------------------+</echo>
  <echo>${user.dir}</echo>
  <exec executable="hadoop" dir=".">
   <arg value="fs"/>
   <arg value="-rmr"/>
   <arg value="${user.dir}/data"/>
  </exec>
  <exec executable="hadoop" dir=".">
   <arg value="fs"/>
   <arg value="-mkdir"/>
   <arg value="${user.dir}/data"/>
  </exec>
  <exec executable="hadoop" dir=".">
   <arg value="fs"/>
   <arg value="-put"/>
   <arg value="${user.dir}/data/input.txt"/>
   <arg value="${user.dir}/data"/>
  </exec>
  <exec executable="hadoop" dir=".">
   <arg value="jar"/>
   <arg value="${basedir}/${dest.dir.release}/${target.jar}"/>
   <arg value="com.lei.bigtop.hbase.calsum.HBaseCalSum"/>
   <arg value="${user.dir}/data"/>
  </exec>
  <exec executable="hadoop" dir=".">
   <arg value="jar"/>
   <arg value="${basedir}/${dest.dir.release}/${target.jar}"/>
   <arg value="com.lei.bigtop.hbase.util.HBaseScanTable"/>
  </exec>
 </target>
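
With the target defined, the whole pipeline (recreating the HDFS input directory, running the MapReduce job, and scanning the result table) runs as a single command:

home:lei$ ant hbaseCalSum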


Well, the journey's next stop will be "Test HBase/Hadoop Installation with a simple MapReduce program with HBase"
