The integrated test flow goes like this:
1. a Java program generates random key/value pairs: each key is picked from a fixed array of strings and each value is a random whole-number double between 0 and 99; the pairs are written, separated by a space, to a local file "input.txt" in a specified directory;
2. copy the input file from the local directory to HDFS;
3. run a simple Mapper/Reducer to calculate the sum for each key;
4. copy the Hadoop output to a specified local output directory;
5. read the contents of the input and output directories, calculate the sum for each key on both sides, and compare them;
6. a Groovy script runs and tests the whole flow.
1. a random key/value pair generator in Java
static public int prepareLocalData(String fileName, String[] keys, int num, int range) {
    int count = 0;
    PrintWriter out = null;
    try {
        out = new PrintWriter(new FileWriter(fileName));
        Random randomGenerator = new Random();
        for (int idx = 1; idx <= num; ++idx) {
            // pick a random value in [0, range) and a random key from the array
            int randomInt = randomGenerator.nextInt(range);
            int randomIdx = randomGenerator.nextInt(keys.length);
            out.println(keys[randomIdx] + " " + (double) randomInt);
            count++;
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (out != null) out.close();
    }
    return count;
}

static public int prepareLocalData(String pathName) {
    createDirectoryIfNotExists(pathName);
    String sep = System.getProperty("file.separator");
    String fileName = pathName.endsWith(sep) ? pathName + "input.txt" : pathName + sep + "input.txt";
    String[] keys = new String[] {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m"};
    return prepareLocalData(fileName, keys, 300, 100);
}
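The generator above calls a small createDirectoryIfNotExists helper that is not listed in this post; a minimal sketch of what it might look like (the actual implementation may differ) is:

// Sketch of the helper assumed above: create the directory
// (including any missing parents) if it does not already exist.
static public boolean createDirectoryIfNotExists(String pathName) {
    java.io.File dir = new java.io.File(pathName);
    return dir.exists() || dir.mkdirs();
}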
The generated input data looks like this:
...
j 9.0
e 4.0
i 8.0
b 9.0
g 25.0
e 88.0
f 32.0
...
2. copy the input file from local directory to HDFS
public static void copyLocalDirToHDFS(JobConf conf, String srcPathName, String destPathName) throws IOException {
    Configuration hdfsConf = (conf == null) ? new Configuration() : conf;
    // remove any leftovers from a previous run before copying
    removeDirectory(hdfsConf, destPathName);
    FileSystem fs = FileSystem.get(hdfsConf);
    Path hdfsPath = new Path(destPathName);
    fs.copyFromLocalFile(new Path(srcPathName), hdfsPath);
    DistributedCache.addCacheFile(hdfsPath.toUri(), hdfsConf);
}
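copyLocalDirToHDFS relies on a removeDirectory helper to clear the previous run's data out of HDFS before copying; it is not shown in the post, but a minimal sketch along these lines would do:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of the helper assumed above: recursively delete an HDFS path
// if it exists, so the copy starts from a clean destination.
public static void removeDirectory(Configuration conf, String pathName) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(pathName);
    if (fs.exists(path)) {
        fs.delete(path, true); // true = recursive
    }
}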
3. a simple Mapper/Reducer to calculate the sum for each key
The mapper is implemented as:
public class CalSumMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, DoubleWritable> {

    public void map(LongWritable l, Text text,
                    OutputCollector<Text, DoubleWritable> output, Reporter report)
            throws IOException {
        String line = text.toString();
        if (JavaUtil.isStringKeyNum(line)) {
            StringTokenizer st = new StringTokenizer(line);
            String k = st.nextToken();
            Double d = Double.valueOf(st.nextToken());
            output.collect(new Text(k), new DoubleWritable(d));
        }
    }
}
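The mapper filters input lines with JavaUtil.isStringKeyNum, which is not listed here; the idea is simply to accept lines of the form "key value" where the second token parses as a number. A rough sketch (the real JavaUtil may differ):

// Sketch of the validation used by the mapper: a line qualifies if it has
// exactly two whitespace-separated tokens and the second one is numeric.
public static boolean isStringKeyNum(String line) {
    if (line == null) return false;
    java.util.StringTokenizer st = new java.util.StringTokenizer(line);
    if (st.countTokens() != 2) return false;
    st.nextToken(); // skip the key
    try {
        Double.parseDouble(st.nextToken());
        return true;
    } catch (NumberFormatException e) {
        return false;
    }
}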
The reducer is implemented as:
public class CalSumReducer extends MapReduceBase
        implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    @Override
    public void reduce(Text key, Iterator<DoubleWritable> values,
                       OutputCollector<Text, DoubleWritable> output, Reporter reporter)
            throws IOException {
        double sum = 0;
        while (values.hasNext()) {
            DoubleWritable value = values.next();
            sum += value.get();
        }
        output.collect(key, new DoubleWritable(sum));
    }
}
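The Ant target in step 6 runs the driver class com.lei.bigtop.hadoop.calsum.CalSum, which wires the mapper and reducer together; its source is not reproduced in this post. With the old mapred API a minimal driver sketch could look like this (argument handling and path names here are assumptions; in the actual flow the driver also copies the local input to HDFS first and the result back afterwards, using the helpers in steps 2 and 4):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

// Rough sketch of a CalSum-style driver: args[0] is the input path,
// args[1] the output path for the job.
public class CalSum {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(CalSum.class);
        conf.setJobName("calsum");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);

        conf.setMapperClass(CalSumMapper.class);
        conf.setReducerClass(CalSumReducer.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}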
4. copy the Hadoop output file to a local output directory
public static void copyHDFSDirToLocal(JobConf conf, String srcPathName, String destPathName) throws IOException {
    deleteLocalDir(destPathName);
    Configuration hdfsConf = (conf == null) ? new Configuration() : conf;
    FileSystem fs = FileSystem.get(hdfsConf);
    Path hdfsPath = new Path(srcPathName);
    Path localPath = new Path(destPathName);
    fs.copyToLocalFile(hdfsPath, localPath);
}
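copyHDFSDirToLocal first calls deleteLocalDir to clear out the previous local output; that helper is not shown either, but a minimal recursive delete does the job (a sketch, assuming plain java.io):

import java.io.File;

// Sketch of the helper assumed above: recursively delete a local directory
// so the HDFS output can be copied into a clean destination.
public static boolean deleteLocalDir(String pathName) {
    File dir = new File(pathName);
    if (!dir.exists()) return true;
    File[] children = dir.listFiles();
    if (children != null) {
        for (File child : children) {
            if (child.isDirectory()) {
                deleteLocalDir(child.getAbsolutePath());
            } else {
                child.delete();
            }
        }
    }
    return dir.delete();
}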
5. read the contents from input and output directories, calculate the sum for each key and compare the sum
public static Map<String, List<Double>> getKeyValueListFromDir(String dirName) {
    Map<String, List<Double>> mapKeyValueList = new TreeMap<String, List<Double>>();
    BufferedReader in = null;
    try {
        File startingDirectory = new File(dirName);
        List<File> files = getFileListing(startingDirectory);
        for (File file : files) {
            if (!file.isFile()) continue;
            in = new BufferedReader(new FileReader(file.getAbsolutePath()));
            String line = null;
            while ((line = in.readLine()) != null) {
                if (line.length() == 0) continue;
                if (!isStringKeyNumSeparatedBySpace(line)) continue;
                StringTokenizer st = new StringTokenizer(line);
                String k = st.nextToken();
                Double d = Double.valueOf(st.nextToken());
                List<Double> list = (mapKeyValueList.get(k) == null)
                        ? new ArrayList<Double>() : mapKeyValueList.get(k);
                list.add(d);
                mapKeyValueList.put(k, list);
            }
            in.close();
        }
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (in != null) {
            try { in.close(); } catch (IOException e) { e.printStackTrace(); }
        }
    }
    return mapKeyValueList;
}

public static boolean compareFileContentSum(String sourceDir, String destDir) {
    Map<String, List<Double>> src = getKeyValueListFromDir(sourceDir);
    Map<String, List<Double>> dest = getKeyValueListFromDir(destDir);
    return compareKeyValueList(src, dest);
}

public static boolean compareKeyValueList(Map<String, List<Double>> source, Map<String, List<Double>> dest) {
    if (source == null || dest == null) return false;
    if (source.keySet().size() != dest.keySet().size()) return false;
    Iterator<String> it = source.keySet().iterator();
    while (it.hasNext()) {
        String k = it.next();
        List<Double> listSource = source.get(k);
        List<Double> listDest = dest.get(k);
        if (listSource == null || listDest == null) return false;
        double sourceSum = 0;
        for (Double d : listSource) sourceSum += d.doubleValue();
        double destSum = 0;
        for (Double d : listDest) destSum += d.doubleValue();
        if (sourceSum != destSum) return false;
    }
    return true;
}
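getKeyValueListFromDir depends on two helpers that are not listed: getFileListing, which walks a directory recursively, and isStringKeyNumSeparatedBySpace, which is essentially the same "key value" check sketched for the mapper above. A minimal getFileListing might look like this (a sketch, not necessarily the original implementation):

import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the directory walker assumed above: return all entries under
// the starting directory, recursing into subdirectories.
public static List<File> getFileListing(File startingDirectory) throws FileNotFoundException {
    if (!startingDirectory.isDirectory()) {
        throw new FileNotFoundException("Not a directory: " + startingDirectory);
    }
    List<File> result = new ArrayList<File>();
    File[] children = startingDirectory.listFiles();
    if (children == null) return result;
    for (File child : children) {
        result.add(child);
        if (child.isDirectory()) {
            result.addAll(getFileListing(child));
        }
    }
    return result;
}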
6. a Groovy script to run and test it;
Here is the Groovy script that runs the Hadoop job as a shell command. You need itest-common-0.3.0-incubating-SNAPSHOT.jar (for org.apache.bigtop.itest.shell.Shell) to run it.
class RunHadoopTest {

    static Shell sh = new Shell("/bin/bash -s");

    public static void main(String[] args) {
        String commandLine = "";
        for (String arg : args) {
            commandLine += " " + arg;
        }
        System.out.println("commandLine [" + commandLine + "]");
        sh.exec(commandLine);
        if (sh.getRet() == 0) {
            System.out.println("SUCCESS! return code is " + sh.getRet());
        } else {
            System.out.println("FAIL! return code is " + sh.getRet());
        }
    }
}
Well, the last thing is to add an Ant target for it.
<taskdef name="groovy" classname="org.codehaus.groovy.ant.Groovy" classpathref="groovy.classpath"/>

<target name="runTestCalSum" depends="build.jar" description="calculate sum">
    <groovy src="src/com/lei/bigtop/hadoop/test/RunHadoopTest.groovy">
        <classpath>
            <pathelement location="."/>
            <fileset dir="lib">
                <include name="**/*.jar"/>
            </fileset>
        </classpath>
        <arg line="hadoop jar ./target/release/LeiBigTop-1.1.jar com.lei.bigtop.hadoop.calsum.CalSum ./data ./output"/>
    </groovy>
</target>
Well, that is about it. Enjoy the journey! If you want the source, send me a request and I will be happy to let you know where to get it.