Project: avro
/**
 * Licensed to the Apache Software Foundation (ASF) under one 
 * or more contributor license agreements.  See the NOTICE file 
 * distributed with this work for additional information 
 * regarding copyright ownership.  The ASF licenses this file 
 * to you under the Apache License, Version 2.0 (the 
 * "License"); you may not use this file except in compliance 
 * with the License.  You may obtain a copy of the License at 
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and 
 * limitations under the License. 
 */
package org.apache.avro.mapreduce; 
 
import java.io.IOException; 
import java.lang.reflect.Constructor; 
import java.util.Map; 
import java.util.StringTokenizer; 
import java.util.List; 
import java.util.Set; 
import java.util.HashMap; 
import java.util.ArrayList; 
import java.util.HashSet; 
import java.util.Collections; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.util.ReflectionUtils; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.OutputFormat; 
import org.apache.hadoop.mapreduce.TaskAttemptID; 
import org.apache.hadoop.mapreduce.TaskInputOutputContext; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.avro.Schema; 
 
/**
 * The AvroMultipleOutputs class simplifies writing Avro output data  
 * to multiple outputs 
 *  
 * <p>  
 * Case one: writing to additional outputs other than the job default output. 
 * 
 * Each additional output, or named output, may be configured with its own 
 * <code>Schema</code> and <code>OutputFormat</code>. 
 * </p> 
 * <p> 
 * Case two: to write data to different files provided by user 
 * </p> 
 *  
 * <p> 
 * AvroMultipleOutputs supports counters, by default they are disabled. The  
 * counters group is the {@link AvroMultipleOutputs} class name. The names of the  
 * counters are the same as the output name. These count the number of records  
 * written to each output name.  
 * </p> 
 *  
 * Usage pattern for job submission: 
 * <pre> 
 * 
 * Job job = new Job(); 
 * 
 * FileInputFormat.setInputPath(job, inDir); 
 * FileOutputFormat.setOutputPath(job, outDir); 
 * 
 * job.setMapperClass(MyAvroMapper.class); 
 * job.setReducerClass(MyAvroReducer.class); 
 * ... 
 *   
 * Schema schema; 
 * ... 
 * // Defines additional single output 'avro1' for the job 
 * AvroMultipleOutputs.addNamedOutput(job, "avro1", AvroKeyValueOutputFormat.class, 
 * keyschema, valueSchema);  // valueSchema can be set to null if there only Key to be written  
                                   to file in the RecordWriter 
 * 
 * // Defines additional output 'avro2' with different schema for the job 
 * AvroMultipleOutputs.addNamedOutput(job, "avro2", 
 *   AvroKeyOutputFormat.class, 
 *   schema,null);  
 * ... 
 * 
 * job.waitForCompletion(true); 
 * ... 
 * </pre> 
 * <p> 
 * Usage in Reducer: 
 * <pre> 
 *  
 * public class MyAvroReducer extends 
 *   Reducer<K, V, T, NullWritable> { 
 * private MultipleOutputs amos; 
 * 
 * 
 * public void setup(Context context) { 
 * ... 
 * amos = new AvroMultipleOutputs(context); 
 * } 
 * 
 * public void reduce(K, Iterator<V> values,Context context) 
 * throws IOException { 
 * ... 
 * amos.write("avro1",datum,NullWritable.get()); 
 * amos.write("avro2",datum,NullWritable.get()); 
 * amos.getCollector("avro3",datum); // here the value is taken as NullWritable 
 * ... 
 * } 
 * 
 * public void cleanup(Context context) throws IOException { 
 * amos.close(); 
 * ... 
 * } 
 * 
 * } 
 * </pre> 
 */
 
 
 
public class AvroMultipleOutputs
 
  private static final String MULTIPLE_OUTPUTS = "avro.mapreduce.multipleoutputs"
 
  private static final String MO_PREFIX =  
    "avro.mapreduce.multipleoutputs.namedOutput."
 
  private static final String FORMAT = ".format"
  private static final String COUNTERS_ENABLED =  
    "avro.mapreduce.multipleoutputs.counters"
 
  /**
   * Counters group used by the counters of MultipleOutputs. 
   */
 
  private static final String COUNTERS_GROUP = AvroMultipleOutputs.class.getName(); 
   
  /**
   * Cache for the taskContexts 
   */
 
  private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>(); 
 
  /**
   * Cache for the Key Schemas 
   */
 
  private static Map<String, Schema> keySchemas = new HashMap<String, Schema>(); 
 
  /**
   * Cache for the Value Schemas 
   */
 
  private static Map<String, Schema> valSchemas = new HashMap<String, Schema>(); 
 
  /**
   * Checks if a named output name is valid token. 
   * 
   * @param namedOutput named output Name 
   * @throws IllegalArgumentException if the output name is not valid. 
   */
 
  private static void checkTokenName(String namedOutput) { 
    if (namedOutput == null || namedOutput.length() == 0) { 
      throw new IllegalArgumentException( 
        "Name cannot be NULL or emtpy"); 
    } 
    for (char ch : namedOutput.toCharArray()) { 
      if ((ch >= 'A') && (ch <= 'Z')) { 
        continue
      } 
      if ((ch >= 'a') && (ch <= 'z')) { 
        continue
      } 
      if ((ch >= '0') && (ch <= '9')) { 
        continue
      } 
      throw new IllegalArgumentException( 
        "Name cannot be have a '" + ch + "' char"); 
    } 
  } 
 
  /**
   * Checks if output name is valid. 
   * 
   * name cannot be the name used for the default output 
   * @param outputPath base output Name 
   * @throws IllegalArgumentException if the output name is not valid. 
   */
 
  private static void checkBaseOutputPath(String outputPath) { 
    if (outputPath.equals("part")) { 
      throw new IllegalArgumentException("output name cannot be 'part'"); 
    } 
  } 
   
  /**
   * Checks if a named output name is valid. 
   * 
   * @param namedOutput named output Name 
   * @throws IllegalArgumentException if the output name is not valid. 
   */
 
  private static void checkNamedOutputName(JobContext job, 
      String namedOutput, boolean alreadyDefined) { 
    checkTokenName(namedOutput); 
    checkBaseOutputPath(namedOutput); 
    List<String> definedChannels = getNamedOutputsList(job); 
    if (alreadyDefined && definedChannels.contains(namedOutput)) { 
      throw new IllegalArgumentException("Named output '" + namedOutput + 
        "' already alreadyDefined"); 
    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) { 
      throw new IllegalArgumentException("Named output '" + namedOutput + 
        "' not defined"); 
    } 
  } 
 
  // Returns list of channel names. 
  private static List<String> getNamedOutputsList(JobContext job) { 
    List<String> names = new ArrayList<String>(); 
    StringTokenizer st = new StringTokenizer( 
      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " "); 
    while (st.hasMoreTokens()) { 
      names.add(st.nextToken()); 
    } 
    return names; 
  } 
 
  // Returns the named output OutputFormat. 
  @SuppressWarnings("unchecked"
  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass
    JobContext job, String namedOutput) { 
    return (Class<? extends OutputFormat<?, ?>>) 
      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null
      OutputFormat.class); 
  } 
 
 /**
   * Adds a named output for the job. 
   * <p/> 
   * 
   * @param job               job to add the named output 
   * @param namedOutput       named output name, it has to be a word, letters 
   *                          and numbers only, cannot be the word 'part' as 
   *                          that is reserved for the default output. 
   * @param outputFormatClass OutputFormat class. 
   * @param keySchema          Schema for the Key 
   */
 
  @SuppressWarnings("unchecked"
  public static void addNamedOutput(Job job, String namedOutput, 
      Class<? extends OutputFormat> outputFormatClass, 
      Schema keySchema) { 
      addNamedOutput(job,namedOutput,outputFormatClass,keySchema,null); 
  } 
 
  /**
   * Adds a named output for the job. 
   * <p/> 
   * 
   * @param job               job to add the named output 
   * @param namedOutput       named output name, it has to be a word, letters 
   *                          and numbers only, cannot be the word 'part' as 
   *                          that is reserved for the default output. 
   * @param outputFormatClass OutputFormat class. 
   * @param keySchema          Schema for the Key 
   * @param valueSchema        Schema for the Value (used in case of AvroKeyValueOutputFormat or null) 
   */
 
  @SuppressWarnings("unchecked"
  public static void addNamedOutput(Job job, String namedOutput, 
      Class<? extends OutputFormat> outputFormatClass, 
      Schema keySchema, Schema valueSchema) { 
    checkNamedOutputName(job, namedOutput, true); 
    Configuration conf = job.getConfiguration(); 
    conf.set(MULTIPLE_OUTPUTS, 
      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput); 
    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, 
      OutputFormat.class); 
    keySchemas.put(namedOutput+"_KEYSCHEMA",keySchema); 
    valSchemas.put(namedOutput+"_VALSCHEMA",valueSchema); 
   
  } 
 
  /**
   * Enables or disables counters for the named outputs. 
   *  
   * The counters group is the {@link MultipleOutputs} class name. 
   * The names of the counters are the same as the named outputs. These 
   * counters count the number records written to each output name. 
   * By default these counters are disabled. 
   * 
   * @param job    job  to enable counters 
   * @param enabled indicates if the counters will be enabled or not. 
   */
 
  public static void setCountersEnabled(Job job, boolean enabled) { 
    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled); 
  } 
 
  /**
   * Returns if the counters for the named outputs are enabled or not. 
   * By default these counters are disabled. 
   * 
   * @param job    the job  
   * @return TRUE if the counters are enabled, FALSE if they are disabled. 
   */
 
  public static boolean getCountersEnabled(JobContext job) { 
    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false); 
  } 
 
  /**
   * Wraps RecordWriter to increment counters.  
   */
 
  @SuppressWarnings("unchecked"
  private static class RecordWriterWithCounter extends RecordWriter { 
    private RecordWriter writer; 
    private String counterName; 
    private TaskInputOutputContext context; 
 
    public RecordWriterWithCounter(RecordWriter writer, String counterName, 
                                   TaskInputOutputContext context) { 
      this.writer = writer; 
      this.counterName = counterName; 
      this.context = context; 
    } 
 
    @SuppressWarnings({"unchecked"}) 
    public void write(Object key, Object value)  
        throws IOException, InterruptedException { 
      context.getCounter(COUNTERS_GROUP, counterName).increment(1); 
      writer.write(key, value); 
    } 
 
    public void close(TaskAttemptContext context)  
        throws IOException, InterruptedException { 
      writer.close(context); 
    } 
  } 
 
  // instance code, to be used from Mapper/Reducer code 
 
  private TaskInputOutputContext<?, ?, ?, ?> context; 
  private Set<String> namedOutputs; 
  private Map<String, RecordWriter<?, ?>> recordWriters; 
  private boolean countersEnabled; 
   
  /**
   * Creates and initializes multiple outputs support, 
   * it should be instantiated in the Mapper/Reducer setup method. 
   * 
   * @param context the TaskInputOutputContext object 
   */
 
  public AvroMultipleOutputs
      TaskInputOutputContext<?, ?, ?, ?> context) { 
    this.context = context; 
    namedOutputs = Collections.unmodifiableSet( 
      new HashSet<String>(AvroMultipleOutputs.getNamedOutputsList(context))); 
    recordWriters = new HashMap<String, RecordWriter<?, ?>>(); 
    countersEnabled = getCountersEnabled(context); 
  } 
 
  /**
   * Write key and value to the namedOutput. 
   * 
   * Output path is a unique file generated for the namedOutput. 
   * For example, {namedOutput}-(m|r)-{part-number} 
   *  
   * @param namedOutput the named output name 
   * @param key         the key , value is NullWritable 
   */
 
  @SuppressWarnings("unchecked"
  public void write(String namedOutput, Object key) 
      throws IOException, InterruptedException { 
    write(namedOutput, key, NullWritable.get(), namedOutput); 
  } 
 
 
 
  /**
   * Write key and value to the namedOutput. 
   * 
   * Output path is a unique file generated for the namedOutput. 
   * For example, {namedOutput}-(m|r)-{part-number} 
   *  
   * @param namedOutput the named output name 
   * @param key         the key 
   * @param value       the value 
   */
 
  @SuppressWarnings("unchecked"
  public void write(String namedOutput, Object key, Object value) 
      throws IOException, InterruptedException { 
    write(namedOutput, key, value, namedOutput); 
  } 
 
  /**
   * Write key and value to baseOutputPath using the namedOutput. 
   *  
   * @param namedOutput    the named output name 
   * @param key            the key 
   * @param value          the value 
   * @param baseOutputPath base-output path to write the record to. 
   * Note: Framework will generate unique filename for the baseOutputPath 
   */
 
  @SuppressWarnings("unchecked"
  public void write(String namedOutput, Object key, Object value, 
      String baseOutputPath) throws IOException, InterruptedException { 
    checkNamedOutputName(context, namedOutput, false); 
    checkBaseOutputPath(baseOutputPath); 
    if (!namedOutputs.contains(namedOutput)) { 
      throw new IllegalArgumentException("Undefined named output '" + 
        namedOutput + "'"); 
    } 
    TaskAttemptContext taskContext = getContext(namedOutput); 
    getRecordWriter(taskContext, baseOutputPath).write(key, value); 
  } 
 
  /**
   * Write key value to an output file name. 
   *  
   * Gets the record writer from job's output format.   
   * Job's output format should be a FileOutputFormat. 
   *  
   * @param key       the key 
   * @param value     the value 
   * @param baseOutputPath base-output path to write the record to. 
   * Note: Framework will generate unique filename for the baseOutputPath 
   */
 
  @SuppressWarnings("unchecked"
  public void write(Object key, Object value, String baseOutputPath)  
      throws IOException, InterruptedException { 
    checkBaseOutputPath(baseOutputPath); 
    TaskAttemptContext taskContext = createTaskAttemptContext( 
      context.getConfiguration(), context.getTaskAttemptID()); 
    getRecordWriter(taskContext, baseOutputPath).write(key, value); 
  } 
 
  // by being synchronized MultipleOutputTask can be use with a 
  // MultithreadedMapper. 
  @SuppressWarnings("unchecked"
  private synchronized RecordWriter getRecordWriter
      TaskAttemptContext taskContext, String baseFileName)  
      throws IOException, InterruptedException { 
     
    // look for record-writer in the cache 
    RecordWriter writer = recordWriters.get(baseFileName); 
     
    // If not in cache, create a new one 
    if (writer == null) { 
      // get the record writer from context output format 
      //FileOutputFormat.setOutputName(taskContext, baseFileName); 
      try { 
        writer = ((OutputFormat) ReflectionUtils.newInstance( 
          taskContext.getOutputFormatClass(), taskContext.getConfiguration())) 
          .getRecordWriter(taskContext); 
      } catch (ClassNotFoundException e) { 
        throw new IOException(e); 
      } 
  
      // if counters are enabled, wrap the writer with context  
      // to increment counters  
      if (countersEnabled) { 
        writer = new RecordWriterWithCounter(writer, baseFileName, context); 
      } 
       
      // add the record-writer to the cache 
      recordWriters.put(baseFileName, writer); 
    } 
    return writer; 
  } 
 
   // Create a taskAttemptContext for the named output with  
   // output format and output key/value types put in the context 
  private TaskAttemptContext getContext(String nameOutput) throws IOException { 
 
    TaskAttemptContext taskContext = taskContexts.get(nameOutput); 
 
    if (taskContext != null) { 
      return taskContext; 
    } 
 
    // The following trick leverages the instantiation of a record writer via 
    // the job thus supporting arbitrary output formats. 
    context.getConfiguration().set("avro.mo.config.namedOutput",nameOutput); 
    Job job = new Job(context.getConfiguration()); 
    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput)); 
    Schema keySchema = keySchemas.get(nameOutput+"_KEYSCHEMA"); 
    Schema valSchema = valSchemas.get(nameOutput+"_VALSCHEMA"); 
 
    boolean isMaponly=job.getNumReduceTasks() == 0
 
    if(keySchema!=null
    { 
      if(isMaponly) 
        AvroJob.setMapOutputKeySchema(job,keySchema); 
      else 
        AvroJob.setOutputKeySchema(job,keySchema); 
    } 
    if(valSchema!=null
    { 
      if(isMaponly) 
        AvroJob.setMapOutputValueSchema(job,valSchema); 
      else 
        AvroJob.setOutputValueSchema(job,valSchema); 
    } 
    taskContext = createTaskAttemptContext( 
      job.getConfiguration(), context.getTaskAttemptID()); 
     
    taskContexts.put(nameOutput, taskContext); 
     
    return taskContext; 
  } 
   
  private TaskAttemptContext createTaskAttemptContext(Configuration conf,  
      TaskAttemptID taskId) { 
    // Use reflection since the context types changed incompatibly between 1.0 
    // and 2.0. 
    try { 
      Class<?> c = getTaskAttemptContextClass(); 
      Constructor<?> cons = c.getConstructor(Configuration.class
          TaskAttemptID.class); 
      return (TaskAttemptContext) cons.newInstance(conf, taskId); 
    } catch (Exception e) { 
      throw new IllegalStateException(e); 
    } 
  } 
   
  private Class<?> getTaskAttemptContextClass() { 
    try { 
      return Class.forName( 
          "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); 
    } catch (Exception e) { 
      try { 
        return Class.forName( 
            "org.apache.hadoop.mapreduce.TaskAttemptContext"); 
      } catch (Exception ex) { 
        throw new IllegalStateException(ex); 
      } 
    } 
  } 
   
  /**
   * Closes all the opened outputs. 
   *  
   * This should be called from cleanup method of map/reduce task. 
   * If overridden subclasses must invoke <code>super.close()</code> at the 
   * end of their <code>close()</code> 
   *  
   */
 
  @SuppressWarnings("unchecked"
  public void close() throws IOException, InterruptedException { 
    for (RecordWriter writer : recordWriters.values()) { 
      writer.close(context); 
    } 
  }