The integrated test flow goes like this:
1. a Java program generates random key/value pairs - the key is selected from a fixed array of strings, the value is a random double (0 to 99 with the arguments used below); the pairs are saved, separated by a space, to a local file "input.txt" in a specified directory;
2. copy the input file from the local directory to HDFS;
3. run a simple Mapper/Reducer job to calculate the sum for each key;
4. copy the Hadoop output to a specified local output directory;
5. read the contents of the input and output directories, calculate the sum for each key on both sides and compare them;
6. a Groovy script runs and tests the whole thing.
1. random key/value pair generator in Java
static public int prepareLocalData(String fileName, String[] keys, int num, int range) {
    int count = 0;
    PrintWriter out = null;
    try {
        out = new PrintWriter(new FileWriter(fileName));
        Random randomGenerator = new Random();
        for (int idx = 1; idx <= num; ++idx) {
            int randomInt = randomGenerator.nextInt(range);        // value in [0, range)
            int randomIdx = randomGenerator.nextInt(keys.length);  // pick a random key
            out.println(keys[randomIdx] + " " + (double) randomInt);
            count++;
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (out != null) out.close();
    }
    return count;
}
static public int prepareLocalData(String pathName) {
    createDirectoryIfNotExists(pathName);
    String sep = System.getProperty("file.separator");
    String fileName = pathName.endsWith(sep) ? pathName + "input.txt" : pathName + sep + "input.txt";
    String[] keys = new String[] {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m"};
    return prepareLocalData(fileName, keys, 300, 100);
}
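The createDirectoryIfNotExists helper is not listed in the post; a minimal sketch of what it presumably does, assuming it just creates the directory tree with java.io.File:
// Hedged sketch of the createDirectoryIfNotExists helper referenced above;
// the real implementation may differ. It creates the directory (and any
// missing parents) if it is not already there.
static void createDirectoryIfNotExists(String pathName) {
    File dir = new File(pathName);
    if (!dir.exists()) {
        dir.mkdirs();
    }
}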
The input data looks like this:
...
j 9.0
e 4.0
i 8.0
b 9.0
g 25.0
e 88.0
f 32.0
...
2. copy the input file from the local directory to HDFS
public static void copyLocalDirToHDFS(JobConf conf, String srcPathName, String destPathName) throws IOException {
    Configuration hdfsConf = (conf == null) ? new Configuration() : conf;
    removeDirectory(hdfsConf, destPathName);                 // clean up any previous run
    FileSystem fs = FileSystem.get(hdfsConf);
    Path hdfsPath = new Path(destPathName);
    fs.copyFromLocalFile(new Path(srcPathName), hdfsPath);   // copy the local source into HDFS
    DistributedCache.addCacheFile(hdfsPath.toUri(), hdfsConf);
}
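The removeDirectory helper used above is not shown in the post; a minimal sketch, assuming it simply deletes the HDFS path recursively if it already exists:
// Hedged sketch of the removeDirectory helper; the real version may differ.
public static void removeDirectory(Configuration conf, String pathName) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(pathName);
    if (fs.exists(path)) {
        fs.delete(path, true);   // true = delete recursively
    }
}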
3. a simple Mapper/Reducer to calculate the sum for each key
The mapper is implemented as:
public class CalSumMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, DoubleWritable> {
    public void map(LongWritable l, Text text, OutputCollector<Text, DoubleWritable> output, Reporter report)
            throws IOException {
        String line = text.toString();
        if (JavaUtil.isStringKeyNum(line)) {   // skip lines that are not "<key> <number>"
            StringTokenizer st = new StringTokenizer(line);
            String k = st.nextToken();
            Double d = Double.valueOf(st.nextToken());
            output.collect(new Text(k), new DoubleWritable(d));
        }
    }
}
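JavaUtil.isStringKeyNum (and the similar isStringKeyNumSeparatedBySpace used in step 5) come from the author's utility class and are not listed; a hedged sketch of what such a check might look like, accepting lines of the form "key number":
// Hedged sketch of a "<key> <number>" line check; the real JavaUtil helpers
// may be stricter or more lenient than this.
public static boolean isStringKeyNum(String line) {
    if (line == null) return false;
    StringTokenizer st = new StringTokenizer(line);
    if (st.countTokens() != 2) return false;
    st.nextToken();                             // the key
    try {
        Double.parseDouble(st.nextToken());     // the number
        return true;
    } catch (NumberFormatException e) {
        return false;
    }
}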
The reducer is implemented as:
public class CalSumReducer extends MapReduceBase
        implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    @Override
    public void reduce(Text key, Iterator<DoubleWritable> values,
                       OutputCollector<Text, DoubleWritable> output, Reporter reporter)
            throws IOException {
        double sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();   // accumulate all values for this key
        }
        output.collect(key, new DoubleWritable(sum));
    }
}
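The post never lists the driver class (com.lei.bigtop.hadoop.calsum.CalSum, referenced by the Ant target in step 6). Here is a minimal sketch of what such a driver might look like with the old org.apache.hadoop.mapred API; the HDFS paths and argument handling are assumptions, not the actual implementation:
// Hedged sketch of a CalSum driver; the real class in LeiBigTop-1.1.jar may
// copy data to/from HDFS itself and handle its arguments differently.
// Uses the old mapred API: JobConf, JobClient, FileInputFormat, FileOutputFormat.
public class CalSum {
    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(CalSum.class);
        conf.setJobName("calsum");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);
        conf.setMapperClass(CalSumMapper.class);
        conf.setReducerClass(CalSumReducer.class);

        // Assumed HDFS locations; the Ant target passes local ./data and ./output,
        // so the real driver presumably maps those to HDFS paths internally.
        FileInputFormat.setInputPaths(conf, new Path("calsum/input"));
        FileOutputFormat.setOutputPath(conf, new Path("calsum/output"));

        JobClient.runJob(conf);
    }
}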
4. copy the Hadoop output file to a local output directory
public static void copyHDFSDirToLocal(JobConf conf, String srcPathName, String destPathName) throws IOException {
    deleteLocalDir(destPathName);                    // clear any previous local output
    Configuration hdfsConf = (conf == null) ? new Configuration() : conf;
    FileSystem fs = FileSystem.get(hdfsConf);
    Path hdfsPath = new Path(srcPathName);
    Path localPath = new Path(destPathName);
    fs.copyToLocalFile(hdfsPath, localPath);
}
5. read the contents of the input and output directories, calculate the sum for each key and compare the sums
public static Map<String, List<Double>> getKeyValueListFromDir(String dirName) {
    Map<String, List<Double>> mapKeyValueList = new TreeMap<String, List<Double>>();
    BufferedReader in = null;
    try {
        File startingDirectory = new File(dirName);
        List<File> files = getFileListing(startingDirectory);
        for (File file : files) {
            if (file.isFile()) {
                in = new BufferedReader(new FileReader(file.getAbsolutePath()));
                String line = null;
                while ((line = in.readLine()) != null) {
                    if (line.length() == 0) continue;                      // skip blank lines
                    if (!isStringKeyNumSeparatedBySpace(line)) continue;   // skip anything that is not "<key> <number>"
                    StringTokenizer st = new StringTokenizer(line);
                    String k = st.nextToken();
                    Double d = Double.valueOf(st.nextToken());
                    List<Double> list = (mapKeyValueList.get(k) == null) ? new ArrayList<Double>() : mapKeyValueList.get(k);
                    list.add(d);
                    mapKeyValueList.put(k, list);
                }
                in.close();
            }
        }
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (in != null) {
            try {
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    return mapKeyValueList;
}
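getFileListing is another helper the post does not show; a hedged sketch, assuming it just returns the files directly under the given directory (the real version may recurse):
// Hedged sketch of the getFileListing helper; the real implementation may
// recurse into subdirectories or sort the results.
public static List<File> getFileListing(File dir) {
    List<File> files = new ArrayList<File>();
    File[] entries = dir.listFiles();
    if (entries != null) {
        for (File entry : entries) {
            files.add(entry);
        }
    }
    return files;
}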
public static boolean compareFileContentSum(String sourceDir, String destDir) {
    Map<String, List<Double>> src = getKeyValueListFromDir(sourceDir);
    Map<String, List<Double>> dest = getKeyValueListFromDir(destDir);
    return compareKeyValueList(src, dest);
}
public static boolean compareKeyValueList(Map<String, List<Double>> source, Map<String, List<Double>> dest) {
    if (source == null || dest == null) return false;
    if (source.keySet().size() != dest.keySet().size()) return false;
    for (String k : source.keySet()) {
        List<Double> listSource = source.get(k);
        List<Double> listDest = dest.get(k);
        if (listSource == null || listDest == null) return false;
        double sourceSum = 0;
        for (Double d : listSource) sourceSum += d.doubleValue();
        double destSum = 0;
        for (Double d : listDest) destSum += d.doubleValue();
        // Compare with a small tolerance; summing doubles in a different order
        // can introduce tiny rounding differences.
        if (Math.abs(sourceSum - destSum) > 1e-6) return false;
    }
    return true;
}
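Putting steps 1 through 5 together, the verification inside a test might look roughly like this; the directory names and the way the job is launched are illustrative assumptions (in the post the job itself is launched through the Groovy/Ant setup in step 6):
// Hedged end-to-end sketch tying steps 1-5 together; paths are examples.
public static void runIntegratedCheck() throws IOException {
    String localInput  = "./data";
    String localOutput = "./output";
    String hdfsInput   = "calsum/input";
    String hdfsOutput  = "calsum/output";

    prepareLocalData(localInput);                                 // step 1: generate input.txt
    copyLocalDirToHDFS(null, localInput, hdfsInput);              // step 2: push the input to HDFS
    // step 3: the CalSum MapReduce job runs against hdfsInput -> hdfsOutput
    copyHDFSDirToLocal(null, hdfsOutput, localOutput);            // step 4: pull the results back
    boolean ok = compareFileContentSum(localInput, localOutput);  // step 5: compare the sums
    System.out.println(ok ? "sums match" : "sums differ");
}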
6. a Groovy script to run and test it
Here is the Groovy script that runs the Hadoop job as a shell command. You need itest-common-0.3.0-incubating-SNAPSHOT.jar for org.apache.bigtop.itest.shell.Shell to run it.
class RunHadoopTest {
    static Shell sh = new Shell("/bin/bash -s");

    public static void main(String[] args) {
        String commandLine = "";
        for (String arg : args) {
            commandLine += " " + arg;
        }
        System.out.println("commandLine [" + commandLine + "]");
        sh.exec(commandLine);
        if (sh.getRet() == 0) {
            System.out.println("SUCCESS! return code is " + sh.getRet());
        } else {
            System.out.println("FAIL! return code is " + sh.getRet());
        }
    }
}
Well, the last thing is to add an Ant target for it.
<taskdef name="groovy"
classname="org.codehaus.groovy.ant.Groovy"
classpathref="groovy.classpath"/>
<target name="runTestCalSum" depends="build.jar" description="calculate sum">
<groovy src="src/com/lei/bigtop/hadoop/test/RunHadoopTest.groovy">
<classpath>
<pathelement location="."/>
<fileset dir="lib">
<include name="**/*.jar" />
</fileset>
</classpath>
<arg line="hadoop jar ./target/release/LeiBigTop-1.1.jar com.lei.bigtop.hadoop.calsum.CalSum ./data ./output"/>
</groovy>
</target>
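With the jar built, running the runTestCalSum target kicks off the whole flow: it hands the hadoop jar command line to the Groovy runner, which submits the CalSum job and prints SUCCESS or FAIL based on the return code.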
Well, that is about it. Enjoy the journey! If you want the source, send me a request and I will be happy to let you know where to get it.