Project: cdh-mapreduce-ext
/**
 * Licensed to the Apache Software Foundation (ASF) under one 
 * or more contributor license agreements.  See the NOTICE file 
 * distributed with this work for additional information 
 * regarding copyright ownership.  The ASF licenses this file 
 * to you under the Apache License, Version 2.0 (the 
 * "License"); you may not use this file except in compliance 
 * with the License.  You may obtain a copy of the License at 
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and 
 * limitations under the License. 
 */
 
package org.apache.hadoop.mapreduce.lib.partition; 
 
import java.util.List; 
 
import org.apache.hadoop.conf.Configurable; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.WritableComparator; 
import org.apache.hadoop.io.WritableUtils; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription; 
 
 
/**
 * This comparator implementation provides a subset of the features provided 
 * by the Unix/GNU Sort. In particular, the supported features are: 
 * -n, (Sort numerically) 
 * -r, (Reverse the result of comparison) 
 * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number 
 *  of the field to use, and c is the number of the first character from the 
 *  beginning of the field. Fields and character posns are numbered starting 
 *  with 1; a character position of zero in pos2 indicates the field's last 
 *  character. If '.c' is omitted from pos1, it defaults to 1 (the beginning 
 *  of the field); if omitted from pos2, it defaults to 0 (the end of the 
 *  field). opts are ordering options (any of 'nr' as described above).  
 * We assume that the fields in the key are separated by  
 * {@link JobContext#MAP_OUTPUT_KEY_FIELD_SEPERATOR}. 
 */
 
public class KeyFieldBasedComparator<K, V> extends WritableComparator  
    implements Configurable { 
  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper(); 
  public static String COMPARATOR_OPTIONS = "mapreduce.partition.keycomparator.options"
  private static final byte NEGATIVE = (byte)'-'; 
  private static final byte ZERO = (byte)'0'; 
  private static final byte DECIMAL = (byte)'.'; 
  private Configuration conf; 
 
  public void setConf(Configuration conf) { 
    this.conf = conf; 
    String option = conf.get(COMPARATOR_OPTIONS); 
    String keyFieldSeparator = conf.get("mapreduce.map.output.key.field.separator","\t"); 
    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator); 
    keyFieldHelper.parseOption(option); 
  } 
 
  public Configuration getConf() { 
    return conf; 
  } 
   
  public KeyFieldBasedComparator() { 
    super(Text.class); 
  } 
     
  public int compare(byte[] b1, int s1, int l1, 
      byte[] b2, int s2, int l2) { 
    int n1 = WritableUtils.decodeVIntSize(b1[s1]); 
    int n2 = WritableUtils.decodeVIntSize(b2[s2]); 
    List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs(); 
 
    if (allKeySpecs.size() == 0) { 
      return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2); 
    } 
     
    int []lengthIndicesFirst =  
      keyFieldHelper.getWordLengths(b1, s1 + n1, s1 + l1); 
    int []lengthIndicesSecond =  
      keyFieldHelper.getWordLengths(b2, s2 + n2, s2 + l2); 
     
    for (KeyDescription keySpec : allKeySpecs) { 
      int startCharFirst = keyFieldHelper.getStartOffset(b1, s1 + n1, s1 + l1, 
        lengthIndicesFirst, keySpec); 
      int endCharFirst = keyFieldHelper.getEndOffset(b1, s1 + n1, s1 + l1,  
        lengthIndicesFirst, keySpec); 
      int startCharSecond = keyFieldHelper.getStartOffset(b2, s2 + n2, s2 + l2, 
        lengthIndicesSecond, keySpec); 
      int endCharSecond = keyFieldHelper.getEndOffset(b2, s2 + n2, s2 + l2,  
        lengthIndicesSecond, keySpec); 
      int result; 
      if ((result = compareByteSequence(b1, startCharFirst, endCharFirst, b2,  
          startCharSecond, endCharSecond, keySpec)) != 0) { 
        return result; 
      } 
    } 
    return 0
  } 
   
  private int compareByteSequence(byte[] first, int start1, int end1,  
      byte[] second, int start2, int end2, KeyDescription key) { 
    if (start1 == -1) { 
      if (key.reverse) { 
        return 1
      } 
      return -1
    } 
    if (start2 == -1) { 
      if (key.reverse) { 
        return -1;  
      } 
      return 1
    } 
    int compareResult = 0
    if (!key.numeric) { 
      compareResult = compareBytes(first, start1, end1-start1 + 1, second, 
        start2, end2 - start2 + 1); 
    } 
    if (key.numeric) { 
      compareResult = numericalCompare (first, start1, end1, second, start2, 
        end2); 
    } 
    if (key.reverse) { 
      return -compareResult; 
    } 
    return compareResult; 
  } 
   
  private int numericalCompare (byte[] a, int start1, int end1,  
      byte[] b, int start2, int end2) { 
    int i = start1; 
    int j = start2; 
    int mul = 1
    byte first_a = a[i]; 
    byte first_b = b[j]; 
    if (first_a == NEGATIVE) { 
      if (first_b != NEGATIVE) { 
        //check for cases like -0.0 and 0.0 (they should be declared equal) 
        return oneNegativeCompare(a, start1 + 1, end1, b, start2, end2); 
      } 
      i++; 
    } 
    if (first_b == NEGATIVE) { 
      if (first_a != NEGATIVE) { 
        //check for cases like 0.0 and -0.0 (they should be declared equal) 
        return -oneNegativeCompare(b, start2+1, end2, a, start1, end1); 
      } 
      j++; 
    } 
    if (first_b == NEGATIVE && first_a == NEGATIVE) { 
      mul = -1
    } 
 
    //skip over ZEROs 
    while (i <= end1) { 
      if (a[i] != ZERO) { 
        break
      } 
      i++; 
    } 
    while (j <= end2) { 
      if (b[j] != ZERO) { 
        break
      } 
      j++; 
    } 
     
    //skip over equal characters and stopping at the first nondigit char 
    //The nondigit character could be '.' 
    while (i <= end1 && j <= end2) { 
      if (!isdigit(a[i]) || a[i] != b[j]) { 
        break
      } 
      i++; j++; 
    } 
    if (i <= end1) { 
      first_a = a[i]; 
    } 
    if (j <= end2) { 
      first_b = b[j]; 
    } 
    //store the result of the difference. This could be final result if the 
    //number of digits in the mantissa is the same in both the numbers  
    int firstResult = first_a - first_b; 
     
    //check whether we hit a decimal in the earlier scan 
    if ((first_a == DECIMAL && (!isdigit(first_b) || j > end2)) || 
            (first_b == DECIMAL && (!isdigit(first_a) || i > end1))) { 
      return ((mul < 0) ? -decimalCompare(a, i, end1, b, j, end2) :  
        decimalCompare(a, i, end1, b, j, end2)); 
    } 
    //check the number of digits in the mantissa of the numbers 
    int numRemainDigits_a = 0
    int numRemainDigits_b = 0
    while (i <= end1) { 
      //if we encounter a non-digit treat the corresponding number as being  
      //smaller       
      if (isdigit(a[i++])) { 
        numRemainDigits_a++; 
      } else break
    } 
    while (j <= end2) { 
      //if we encounter a non-digit treat the corresponding number as being  
      //smaller 
      if (isdigit(b[j++])) { 
        numRemainDigits_b++; 
      } else break
    } 
    int ret = numRemainDigits_a - numRemainDigits_b; 
    if (ret == 0) {  
      return ((mul < 0) ? -firstResult : firstResult); 
    } else { 
      return ((mul < 0) ? -ret : ret); 
    } 
  } 
  private boolean isdigit(byte b) { 
    if ('0' <= b && b <= '9') { 
      return true
    } 
    return false
  } 
  private int decimalCompare(byte[] a, int i, int end1,  
                             byte[] b, int j, int end2) { 
    if (i > end1) { 
      //if a[] has nothing remaining 
      return -decimalCompare1(b, ++j, end2); 
    } 
    if (j > end2) { 
      //if b[] has nothing remaining 
      return decimalCompare1(a, ++i, end1); 
    } 
    if (a[i] == DECIMAL && b[j] == DECIMAL) { 
      while (i <= end1 && j <= end2) { 
        if (a[i] != b[j]) { 
          if (isdigit(a[i]) && isdigit(b[j])) { 
            return a[i] - b[j]; 
          } 
          if (isdigit(a[i])) { 
            return 1
          } 
          if (isdigit(b[j])) { 
            return -1
          } 
          return 0
        } 
        i++; j++; 
      } 
      if (i > end1 && j > end2) { 
        return 0
      } 
         
      if (i > end1) { 
        //check whether there is a non-ZERO digit after potentially 
        //a number of ZEROs (e.g., a=.4444, b=.444400004) 
        return -decimalCompare1(b, j, end2); 
      } 
      if (j > end2) { 
        //check whether there is a non-ZERO digit after potentially 
        //a number of ZEROs (e.g., b=.4444, a=.444400004) 
        return decimalCompare1(a, i, end1); 
      } 
    } 
    else if (a[i] == DECIMAL) { 
      return decimalCompare1(a, ++i, end1); 
    } 
    else if (b[j] == DECIMAL) { 
      return -decimalCompare1(b, ++j, end2); 
    } 
    return 0
  } 
   
  private int decimalCompare1(byte[] a, int i, int end) { 
    while (i <= end) { 
      if (a[i] == ZERO) { 
        i++; 
        continue
      } 
      if (isdigit(a[i])) { 
        return 1
      } else { 
        return 0
      } 
    } 
    return 0
  } 
   
  private int oneNegativeCompare(byte[] a, int start1, int end1,  
      byte[] b, int start2, int end2) { 
    //here a[] is negative and b[] is positive 
    //We have to ascertain whether the number contains any digits. 
    //If it does, then it is a smaller number for sure. If not, 
    //then we need to scan b[] to find out whether b[] has a digit 
    //If b[] does contain a digit, then b[] is certainly 
    //greater. If not, that is, both a[] and b[] don't contain 
    //digits then they should be considered equal. 
    if (!isZero(a, start1, end1)) { 
      return -1
    } 
    //reached here - this means that a[] is a ZERO 
    if (!isZero(b, start2, end2)) { 
      return -1
    } 
    //reached here - both numbers are basically ZEROs and hence 
    //they should compare equal 
    return 0
  } 
   
  private boolean isZero(byte a[], int start, int end) { 
    //check for zeros in the significand part as well as the decimal part 
    //note that we treat the non-digit characters as ZERO 
    int i = start; 
    //we check the significand for being a ZERO 
    while (i <= end) { 
      if (a[i] != ZERO) { 
        if (a[i] != DECIMAL && isdigit(a[i])) { 
          return false
        } 
        break
      } 
      i++; 
    } 
 
    if (i != (end+1) && a[i++] == DECIMAL) { 
      //we check the decimal part for being a ZERO 
      while (i <= end) { 
        if (a[i] != ZERO) { 
          if (isdigit(a[i])) { 
            return false
          } 
          break
        } 
        i++; 
      } 
    } 
    return true
  } 
  /**
   * Set the {@link KeyFieldBasedComparator} options used to compare keys. 
   *  
   * @param keySpec the key specification of the form -k pos1[,pos2], where, 
   *  pos is of the form f[.c][opts], where f is the number 
   *  of the key field to use, and c is the number of the first character from 
   *  the beginning of the field. Fields and character posns are numbered  
   *  starting with 1; a character position of zero in pos2 indicates the 
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1 
   *  (the beginning of the field); if omitted from pos2, it defaults to 0  
   *  (the end of the field). opts are ordering options. The supported options 
   *  are: 
   *    -n, (Sort numerically) 
   *    -r, (Reverse the result of comparison)                  
   */
 
  public static void setKeyFieldComparatorOptions(Job job, String keySpec) { 
    job.getConfiguration().set(COMPARATOR_OPTIONS, keySpec); 
  } 
   
  /**
   * Get the {@link KeyFieldBasedComparator} options 
   */
 
  public static String getKeyFieldComparatorOption(JobContext job) { 
    return job.getConfiguration().get(COMPARATOR_OPTIONS); 
  } 
 
 
}