Tuesday, March 13, 2012

Simple MapReduce program using Hadoop and HBase

I have a set of input data, which is key value pairs that are generated by another Java program. The data look like the following: 

j 9.0
e 4.0
i 8.0
b 9.0
g 25.0
e 88.0
f 32.0

I wrote a very simple MapReduce program that uses HBase: it imports the data into a table and calculates the sum of the values for each key.  I am using Hadoop and HBase 0.90.5.


public class HBaseCalSum {

 static private String TABLE_NAME = "cal_sum";
 static private String COLUMN_NAME = "content";
 static private String COLUMN_KEY_NAME = "sum";
    public static class HBaseCalSumMap extends Mapper<LongWritable,Text,Text, DoubleWritable>
        public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException
            String line = value.toString();

            if ( JavaUtil.isStringKeyNumSeparatedBySpace(line) )  {
             StringTokenizer st = new StringTokenizer(line);
             String k = (String) st.nextElement();
             Double d = Double.valueOf( (String)st.nextElement());
             context.write(new Text(k), new DoubleWritable(d));


    public static class CalSumReduce extends TableReducer<Text, DoubleWritable, NullWritable>
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
            int sum = 0;
            for(DoubleWritable v : values)
                sum += v.get();
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes(COLUMN_NAME), Bytes.toBytes(COLUMN_KEY_NAME), Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);

    public static void main(String args[]) throws Exception
     if (args.length<1) {
      System.err.println("Usage: HBaseCalSum <path> ");

        Configuration conf = new Configuration();
        conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);
        HbaseUtil.createHBaseTable(TABLE_NAME, COLUMN_NAME);

        String input = args[0];
        Job job = new Job(conf, "HBaseCalSum table with " + input);

        FileInputFormat.addInputPath(job, new Path(input));

        int retJob = job.waitForCompletion(true)?0:1;


$ ./bin/hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.5, r1212209, Fri Dec  9 05:40:36 UTC 2011

hbase(main):001:0> scan 'cal_sum'
ROW                                            COLUMN+CELL                                                                                                                          
 a                                             column=content:sum, timestamp=1331703862041, value=1304                                                                              
 b                                             column=content:sum, timestamp=1331703862041, value=1362                                                                              
 c                                             column=content:sum, timestamp=1331703862041, value=864                                                                               
 d                                             column=content:sum, timestamp=1331703862041, value=1602                                                                              
 e                                             column=content:sum, timestamp=1331703862041, value=1710                                                                              
 f                                             column=content:sum, timestamp=1331703862041, value=491                                                                               
 g                                             column=content:sum, timestamp=1331703862041, value=920                                                                               
 h                                             column=content:sum, timestamp=1331703862041, value=1353                                                                              
 i                                             column=content:sum, timestamp=1331703862041, value=1274                                                                              
 j                                             column=content:sum, timestamp=1331703862041, value=1215                                                                              
 k                                             column=content:sum, timestamp=1331703862041, value=869                                                                               
 l                                             column=content:sum, timestamp=1331703862041, value=1461                                                                              
 m                                             column=content:sum, timestamp=1331703862041, value=1108                                                                              
13 row(s) in 1.2870 seconds


Here is the Java code for HTable scan:

 public static void scanTable(String strConf, String tableName, String colName, String keyName) throws Exception {
  Configuration conf; 
  conf = HBaseConfiguration.create();
  conf.addResource(new Path(strConf)); // "/usr/local/hbase/conf/hbase-site.xml"
  Scan s = new Scan();
        HTable table = new HTable(conf, tableName);

  s.addColumn(Bytes.toBytes(colName), Bytes.toBytes(keyName));
  ResultScanner scanner = table.getScanner(s);
  try {
    // Scanners return Result instances.
    // Now, for the actual iteration. One way is to use a while loop like so:
    for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
      // print out the row we found and the columns we were looking for
     System.out.println("Found row: " + rr);
     for (KeyValue kv : rr.raw()) {
      System.out.println("Found key: " + Bytes.toStringBinary ( kv.getKey(), 2, kv.getRowLength() ) + " Count: " + Bytes.toStringBinary( kv.getValue() ) );

  } finally {
    // Make sure you close your scanners when you are done!
    // Thats why we have it inside a try/finally clause


Run and test this Simple MapReduce program with HBase

I installed both Hadoop and HBase in pseudo-distributed (single server) mode. Please refer to my other blog post on how to install HBase. Or better yet, use BigTop.

Also, I modified hadoop-env.sh to add the HBase and ZooKeeper jars to the HADOOP_CLASSPATH:

export HADOOP_CLASSPATH=$HBASE_HOME/hbase-0.90.5.jar:$HBASE_HOME/hbase-0.90.5-tests.jar:$HBASE_HOME/conf:${HBASE_HOME}/lib/zookeeper-3.3.2.jar

1) Start Hadoop in pseudo-distributed (single server) mode.
2) Start HBase in pseudo-distributed (single server) mode.

I run jps to verify all Hadoop and HBase processes are running:

home:lei$ jps

9968 HQuorumPeer
9999 HMaster
9632 DataNode
10354 Jps
10082 HRegionServer
9543 NameNode
9720 SecondaryNameNode

3) Run the following commands:
home:lei$hadoop fs -rmr ./data
home:lei$hadoop fs -mkdir ./data
home:lei$hadoop fs -put ./data/* ./data
home:lei$hadoop jar ./target/LeiBigTop-1.1.jar  com.lei.bigtop.hbase.calsum.HBaseCalSum ./data

I tested the code on my Mac 10.6.8, Ubuntu 11.10 and AWS EC2 (Ubuntu 11.x).

Here is the ant target to run the program:

 <!-- Uploads ./data/input.txt to HDFS, runs the HBaseCalSum job on it,
      then runs HBaseScanTable. -->
 <target name="hbaseCalCum" description="Simple MapReduce program with HBase">
  <property name="myclasspath" refid="run.class.path"/>
  <echo message="Simple MapReduce program with HBase"/>
  <echo message="Classpath = ${myclasspath}"/>
  <echo>+ Program: com.lei.bigtop.hbase.calsum.HBaseCalSum       +</echo>
  <echo>+ Input: ./data                                          +</echo>
  <!-- Remove any previous copy of the input directory from HDFS. -->
  <exec executable="hadoop" dir=".">
   <arg value="fs"/>
   <arg value="-rmr"/>
   <arg value="${user.dir}/data"/>
  </exec>
  <!-- Recreate the input directory. -->
  <exec executable="hadoop" dir=".">
   <arg value="fs"/>
   <arg value="-mkdir"/>
   <arg value="${user.dir}/data"/>
  </exec>
  <!-- Upload the local input file. -->
  <exec executable="hadoop" dir=".">
   <arg value="fs"/>
   <arg value="-put"/>
   <arg value="${user.dir}/data/input.txt"/>
   <arg value="${user.dir}/data"/>
  </exec>
  <!-- Run the MapReduce job on the uploaded data. -->
  <exec executable="hadoop" dir=".">
   <arg value="jar"/>
   <arg value="${basedir}/${dest.dir.release}/${target.jar}"/>
   <arg value="com.lei.bigtop.hbase.calsum.HBaseCalSum"/>
   <arg value="${user.dir}/data"/>
  </exec>
  <!-- Run the HBaseScanTable class from the same jar. -->
  <exec executable="hadoop" dir=".">
   <arg value="jar"/>
   <arg value="${basedir}/${dest.dir.release}/${target.jar}"/>
   <arg value="com.lei.bigtop.hbase.util.HBaseScanTable"/>
  </exec>
 </target>

Well, the journey's next stop will be "Test HBase/Hadoop Installation with a simple MapReduce program with HBase"


