Friday, March 2, 2012

Test Hadoop Installation with simple MapReduce program and Groovy scripts

As I dig into the BigTop project, I am figuring out how to test a Hadoop installation with Groovy scripts, as part of the BigTop integration tests. I am putting down my notes and code snippets here to share with folks. The journey covers a few useful know-hows: how to write a Java program that deals with HDFS I/O, how to implement a Mapper/Reducer in Java, and how to drive and test a Hadoop job from Groovy.

The integration test flow goes like this:
1. a Java program generates random key/value pairs - each key is picked from a fixed array of strings, each value is a random integer between 0 and range-1 printed as a double - and saves the pairs, separated by a space, to a local file "input.txt" in a specified directory;
2. copy the input file from the local directory to HDFS;
3. run a simple Mapper/Reducer job to calculate the sum for each key;
4. copy the Hadoop output to a specified local output directory;
5. read the contents of the input and output directories, calculate the sum for each key on both sides, and compare the sums;
6. a Groovy script runs and tests the whole thing;


Step 1. a random key/number pair generator in Java:
 /**
  * Write num lines of "key value" pairs to fileName: each key is picked
  * at random from keys, each value is a random integer between 0 and
  * range-1 printed as a double. Returns the number of lines written.
  */
 static public int prepareLocalData (String fileName, String[] keys, int num, int range) {
  int count = 0;
  PrintWriter out = null;
  try {
   out = new PrintWriter(new FileWriter(fileName));
   Random randomGenerator = new Random();
   for (int idx = 1; idx <= num; ++idx) {
    int randomInt = randomGenerator.nextInt(range);   // was hard-coded to nextInt(100); use the range parameter
    int randomIdx = randomGenerator.nextInt(keys.length);
    out.println(keys[randomIdx] + " " + (double) randomInt);
    count++;
   }
  } catch (IOException e) {
   e.printStackTrace();
  } finally {
   if (out != null) out.close();   // guard against a failed FileWriter constructor
  }
  return count;
 }

 static public int prepareLocalData (String pathName) {
  createDirectoryIfNotExists(pathName);
  String sep = System.getProperty("file.separator");
  String fileName = pathName.endsWith(sep) ? pathName + "input.txt" : pathName + sep + "input.txt";
  String[] keys = new String[] {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m"};
  return prepareLocalData(fileName, keys, 300, 100);
 }
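
createDirectoryIfNotExists does what the name says; in case you are writing this from scratch, a one-liner sketch:

 static public void createDirectoryIfNotExists(String pathName) {
  File dir = new File(pathName);
  if (!dir.exists()) dir.mkdirs();   // mkdirs() also creates missing parent directories
 }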

The generated input data looks like this:
...
j 9.0
e 4.0
i 8.0
b 9.0
g 25.0
e 88.0
f 32.0
...


Step 2. copy the input file from the local directory to HDFS

 public static void copyLocalDirToHDFS (JobConf conf, String srcPathName, String destPathName) throws IOException {
  Configuration hdfsConf = (conf == null) ? new Configuration() : conf;
  removeDirectory(hdfsConf, destPathName);   // start from a clean destination
  FileSystem fs = FileSystem.get(hdfsConf);
  Path hdfsPath = new Path(destPathName);
  fs.copyFromLocalFile(new Path(srcPathName), hdfsPath);   // was new Path(destPathName), which copied the destination onto itself
  DistributedCache.addCacheFile(hdfsPath.toUri(), hdfsConf);
 }
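
removeDirectory is another small helper not shown here; it just makes sure the HDFS destination is gone before the copy. A minimal sketch:

 public static void removeDirectory(Configuration conf, String pathName) throws IOException {
  FileSystem fs = FileSystem.get(conf);
  Path path = new Path(pathName);
  if (fs.exists(path)) {
   fs.delete(path, true);   // true = recursive
  }
 }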

Step 3. a simple Mapper/Reducer to calculate the sum for each key
The mapper is implemented as:
public class CalSumMapper extends MapReduceBase
 implements Mapper<LongWritable, Text, Text, DoubleWritable> {
    public void map(LongWritable offset, Text text, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException
    {
        String line = text.toString();

        // Skip anything that does not look like a "key number" pair.
        if ( JavaUtil.isStringKeyNum(line) ) {
         StringTokenizer st = new StringTokenizer(line);
         String k = st.nextToken();
         Double d = Double.valueOf(st.nextToken());
         output.collect(new Text(k), new DoubleWritable(d));
        }
    }
}
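
JavaUtil.isStringKeyNum guards against lines that are not a key/number pair; it boils down to a regex check along these lines:

 // Returns true for lines of the form "key number", e.g. "a 9.0".
 public static boolean isStringKeyNum(String line) {
  return line != null && line.matches("\\S+\\s+-?\\d+(\\.\\d+)?");
 }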


The reducer is implemented as:
public class CalSumReducer extends MapReduceBase
        implements Reducer<Text, DoubleWritable, Text, DoubleWritable>
{
 @Override
 public void reduce(Text key, Iterator<DoubleWritable> values,
   OutputCollector<Text, DoubleWritable> output, Reporter reporter)
   throws IOException {
        double sum = 0;

        // Add up every value that came in for this key.
        while (values.hasNext())
        {
            sum += values.next().get();
        }
        output.collect(key, new DoubleWritable(sum));
 }
}
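
The job setup that wires the mapper and reducer together is not shown in this post. A minimal old-API wiring, assuming the CalSum driver class referenced in the Ant target below and a hypothetical helper name runCalSumJob, could look like this:

 public static void runCalSumJob(JobConf conf, String hdfsInputDir, String hdfsOutputDir) throws IOException {
  conf.setJobName("calsum");
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(DoubleWritable.class);
  conf.setMapperClass(CalSumMapper.class);
  // Summing is associative and commutative, so the reducer can
  // double as a combiner to cut down shuffle traffic.
  conf.setCombinerClass(CalSumReducer.class);
  conf.setReducerClass(CalSumReducer.class);
  FileInputFormat.setInputPaths(conf, new Path(hdfsInputDir));
  FileOutputFormat.setOutputPath(conf, new Path(hdfsOutputDir));
  JobClient.runJob(conf);
 }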


Step 4. copy the Hadoop output to a local output directory

 public static void copyHDFSDirToLocal (JobConf conf, String srcPathName, String destPathName) throws IOException {
  deleteLocalDir(destPathName);   // make sure the local destination is clean first
  Configuration hdfsConf = (conf == null) ? new Configuration() : conf;
  FileSystem fs = FileSystem.get(hdfsConf);
  fs.copyToLocalFile(new Path(srcPathName), new Path(destPathName));
 }
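
deleteLocalDir recursively wipes the local destination; a sketch of what it needs to do:

 public static boolean deleteLocalDir(String pathName) {
  File dir = new File(pathName);
  if (dir.isDirectory()) {
   for (File child : dir.listFiles()) {
    deleteLocalDir(child.getAbsolutePath());
   }
  }
  return dir.delete();   // delete() only works on empty directories, hence the recursion
 }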


Step 5. read the contents of the input and output directories, calculate the sum for each key, and compare the sums
 public static Map<String, List<Double>> getKeyValueListFromDir(String dirName) {
  // TreeMap keeps the keys sorted, which makes debugging output easier to scan.
  Map<String, List<Double>> mapKeyValueList = new TreeMap<String, List<Double>>();
  BufferedReader in = null;

  try {
   File startingDirectory = new File(dirName);
   List<File> files = getFileListing(startingDirectory);
   for (File file : files) {
    if (file.isFile()) {
     in = new BufferedReader(new FileReader(file));
     String line = null;
     while ((line = in.readLine()) != null) {
      if (line.length() == 0) continue;
      if (! isStringKeyNumSeparatedBySpace(line)) continue;   // skip anything that is not a key/number pair
      StringTokenizer st = new StringTokenizer(line);
      String k = st.nextToken();
      Double d = Double.valueOf(st.nextToken());
      List<Double> list = (mapKeyValueList.get(k) == null) ? new ArrayList<Double>() : mapKeyValueList.get(k);
      list.add(d);
      mapKeyValueList.put(k, list);
     }
     in.close();
    }
   }
  } catch (FileNotFoundException e) {
   e.printStackTrace();
  } catch (IOException e) {
   e.printStackTrace();
  } finally {
   if (in != null) {
    try {
     in.close();
    } catch (IOException e) {
     e.printStackTrace();
    }
   }
  }

  return mapKeyValueList;
 }
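
getFileListing collects all the regular files under a directory, recursing into subdirectories; something like:

 public static List<File> getFileListing(File startingDirectory) {
  List<File> result = new ArrayList<File>();
  File[] children = startingDirectory.listFiles();
  if (children != null) {
   for (File child : children) {
    if (child.isDirectory()) {
     result.addAll(getFileListing(child));
    } else {
     result.add(child);
    }
   }
  }
  return result;
 }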


 public static boolean compareFileContentSum(String sourceDir, String destDir) {
    Map <String, List<Double>> src = getKeyValueListFromDir(sourceDir);
    Map <String, List<Double>> dest = getKeyValueListFromDir(destDir);
    return compareKeyValueList(src, dest);
 }


 public static boolean compareKeyValueList(Map<String, List<Double>> source, Map<String, List<Double>> dest) {

  if (source == null || dest == null) return false;
  if (source.keySet().size() != dest.keySet().size())
   return false;

  for (String k : source.keySet()) {
   List<Double> listSource = source.get(k);
   List<Double> listDest = dest.get(k);
   if (listSource == null || listDest == null) return false;

   double sourceSum = 0;
   for (Double d : listSource) sourceSum += d.doubleValue();
   double destSum = 0;
   for (Double d : listDest) destSum += d.doubleValue();
   // Exact comparison is fine here: every value is an integer-valued
   // double, so the sums accumulate without rounding error.
   if (sourceSum != destSum) return false;
  }

  return true;
 }
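
Putting the pieces together: the CalSum driver class that the Groovy script below launches runs the whole flow in order. A sketch of its main, assuming the helper names used throughout this post (including the hypothetical runCalSumJob wrapper from step 3) and hypothetical HDFS path strings:

 public static void main(String[] args) throws Exception {
  String localInputDir = args[0];    // e.g. ./data, from the Ant target below
  String localOutputDir = args[1];   // e.g. ./output
  String hdfsInputDir = "calsum/input";     // hypothetical HDFS locations
  String hdfsOutputDir = "calsum/output";

  prepareLocalData(localInputDir);                          // step 1
  JobConf conf = new JobConf(CalSum.class);
  copyLocalDirToHDFS(conf, localInputDir, hdfsInputDir);    // step 2
  removeDirectory(conf, hdfsOutputDir);   // old-API jobs refuse to run if the output path exists
  runCalSumJob(conf, hdfsInputDir, hdfsOutputDir);          // step 3
  copyHDFSDirToLocal(conf, hdfsOutputDir, localOutputDir);  // step 4
  boolean ok = compareFileContentSum(localInputDir, localOutputDir);  // step 5
  System.out.println(ok ? "Sums match." : "Sums differ!");
  System.exit(ok ? 0 : 1);   // the Groovy wrapper in step 6 checks this return code
 }

The non-zero exit code on failure is what lets the Groovy Shell wrapper in step 6 decide between SUCCESS and FAIL.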




Step 6. a Groovy script to run and test it

Here is the Groovy script that runs the Hadoop job as a shell command. You need itest-common-0.3.0-incubating-SNAPSHOT.jar on the classpath for org.apache.bigtop.itest.shell.Shell.

import org.apache.bigtop.itest.shell.Shell

class RunHadoopTest {
 static Shell sh = new Shell("/bin/bash -s");

 public static void main(String[] args) {
  String commandLine = args.join(" ");
  System.out.println("commandLine [" + commandLine + "]");
  sh.exec(commandLine);
  if (sh.getRet() == 0) {
   System.out.println("SUCCESS! return code is " + sh.getRet());
  } else {
   System.out.println("FAIL! return code is " + sh.getRet());
  }
 }
}




Well, the last thing is to add an Ant target for it.

 <taskdef name="groovy"
          classname="org.codehaus.groovy.ant.Groovy"
          classpathref="groovy.classpath"/>

 <target name="runTestCalSum" depends="build.jar" description="calculate sum">
  <groovy src="src/com/lei/bigtop/hadoop/test/RunHadoopTest.groovy">
   <classpath>
    <pathelement location="."/>
    <fileset dir="lib">
     <include name="**/*.jar"/>
    </fileset>
   </classpath>
   <arg line="hadoop jar ./target/release/LeiBigTop-1.1.jar com.lei.bigtop.hadoop.calsum.CalSum ./data ./output"/>
  </groovy>
 </target>
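
With groovy.classpath pointing at your Groovy installation and the itest jar sitting in lib, the whole round trip - generate data, run the job, verify the sums - is a single "ant runTestCalSum" away.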


Well, that is about it. Enjoy the journey! If you want the source, send me a request and I will be happy to tell you where to get it.
