Project: aim3-tu-berlin
/**
 * Copyright (C) 2011 AIM III course DIMA TU Berlin 
 * 
 * This programm is free software; you can redistribute it and/or modify 
 * it under the terms of the Apache License, Version 2.0 
 * (the "License"); you may not use this file except in compliance with 
 * the License.  You may obtain a copy of the License at 
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and 
 * limitations under the License. 
 */
 
package de.tuberlin.dima.aim.exercises.hadoop; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.mapreduce.*; 
import org.apache.hadoop.util.Tool; 
 
import java.io.IOException; 
import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
 
/**
 * a very simple base class for hadoop jobs modeled after {@link org.apache.mahout.common.AbstractJob} 
 */
 
public abstract class HadoopJob extends Configured implements Tool { 
 
  protected Map<String,String> parseArgs(String[] args) { 
    if (args == null || args.length % 2 != 0) { 
      throw new IllegalStateException("Cannot convert args!"); 
    } 
 
    Map<String,String> parsedArgs = new HashMap<String,String>(); 
    for (int n = 0; n < args.length; n+=2) { 
      parsedArgs.put(args[n], args[n+1]); 
    } 
    return Collections.unmodifiableMap(parsedArgs); 
  } 
 
  protected Job prepareJob(Path inputPath, Path outputPath, Class<? extends InputFormat> inputFormat, 
      Class<? extends Mapper> mapper, Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue, 
      Class<? extends Reducer> reducer, Class<? extends Writable> reducerKey, Class<? extends Writable> reducerValue, 
      Class<? extends OutputFormat> outputFormat) throws IOException { 
 
    Job job = new Job(new Configuration(getConf())); 
    Configuration jobConf = job.getConfiguration(); 
 
    if (reducer.equals(Reducer.class)) { 
      if (mapper.equals(Mapper.class)) { 
        throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer"); 
      } 
      job.setJarByClass(mapper); 
    } else { 
      job.setJarByClass(reducer); 
    } 
 
    job.setInputFormatClass(inputFormat); 
    jobConf.set("mapred.input.dir", inputPath.toString()); 
 
    job.setMapperClass(mapper); 
    job.setMapOutputKeyClass(mapperKey); 
    job.setMapOutputValueClass(mapperValue); 
 
    jobConf.setBoolean("mapred.compress.map.output"true); 
 
    job.setReducerClass(reducer); 
    job.setOutputKeyClass(reducerKey); 
    job.setOutputValueClass(reducerValue); 
 
    job.setJobName(getCustomJobName(job, mapper, reducer)); 
 
    job.setOutputFormatClass(outputFormat); 
    jobConf.set("mapred.output.dir", outputPath.toString()); 
 
    return job; 
  } 
 
  private String getCustomJobName(JobContext job, Class<? extends Mapper> mapper, Class<? extends Reducer> reducer) { 
    StringBuilder name = new StringBuilder(100); 
    String customJobName = job.getJobName(); 
    if (customJobName == null || customJobName.trim().length() == 0) { 
      name.append(getClass().getSimpleName()); 
    } else { 
      name.append(customJobName); 
    } 
    name.append('-').append(mapper.getSimpleName()); 
    name.append('-').append(reducer.getSimpleName()); 
    return name.toString(); 
  } 
 
}