Project: behemoth
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more 
 * contributor license agreements.  See the NOTICE file distributed with 
 * this work for additional information regarding copyright ownership. 
 * The ASF licenses this file to You under the Apache License, Version 2.0 
 * (the "License"); you may not use this file except in compliance with 
 * the License.  You may obtain a copy of the License at 
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and 
 * limitations under the License. 
 */
 
package com.digitalpebble.behemoth.gate; 
 
import gate.AnnotationSet; 
import gate.Corpus; 
import gate.CorpusController; 
import gate.Document; 
import gate.Factory; 
import gate.FeatureMap; 
import gate.Gate; 
import gate.creole.ResourceInstantiationException; 
import gate.util.InvalidOffsetException; 
import gate.util.OffsetComparator; 
import gate.util.persistence.PersistenceManager; 
 
import java.io.File; 
import java.io.IOException; 
import java.net.URL; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.Iterator; 
import java.util.List; 
import java.util.Map.Entry; 
import java.util.Set; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.MapWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.mapred.Reporter; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
 
import com.digitalpebble.behemoth.Annotation; 
import com.digitalpebble.behemoth.BehemothDocument; 
import com.digitalpebble.behemoth.DocumentProcessor; 
 
/**
 * Encapsulates a GATE application 
 **/
 
public class GATEProcessor implements DocumentProcessor { 
 
    private static final Logger LOG = LoggerFactory 
            .getLogger(GATEProcessor.class); 
 
    private static boolean inited = false
 
    private CorpusController GATEapplication = null
 
    private Corpus corpus = null
 
    private Configuration config; 
 
    private GATEAnnotationFilters filters = null
 
    private URL applicationDescriptorPath = null
 
    public GATEProcessor(URL appliPath) { 
        applicationDescriptorPath = appliPath; 
    } 
 
    // Process an input document with GATE 
    public synchronized BehemothDocument[] process(BehemothDocument inputDoc) { 
        return process(inputDoc, null); 
    } 
 
    // Process an input document with GATE and a Reporter and return a GATE XML 
 
    public synchronized String processNative(BehemothDocument inputDoc, 
            Reporter reporter) { 
        if (reporter != null
            reporter.setStatus("GATE : " + inputDoc.getUrl().toString()); 
        // process the text passed as value with the application 
        // a) create a GATE document based on the text value 
        gate.Document gatedocument = null
        try { 
 
            gatedocument = generateGATEDoc(inputDoc); 
            // add it to the current corpus 
            corpus.add(gatedocument); 
            // get the application and assign the corpus to it 
            this.GATEapplication.setCorpus(corpus); 
            // process it with GATE 
            this.GATEapplication.execute(); 
 
            // transfer the annotations from the GATE document 
            // to the Behemoth one using the filters 
            if (reporter != null
                reporter.incrCounter("GATE""Document"1); 
 
            return gatedocument.toXml(); 
 
        } catch (Exception e) { 
            LOG.error(inputDoc.getUrl().toString(), e); 
            if (reporter != null
                reporter.incrCounter("GATE""Exceptions"1); 
        } finally { 
            // remove the document from the corpus again 
            corpus.clear(); 
            // and from memory 
            if (gatedocument != null
                Factory.deleteResource(gatedocument); 
        } 
        return null
    } 
 
    // Process an input document with GATE and a Reporter 
    public synchronized BehemothDocument[] process(BehemothDocument inputDoc, 
            Reporter reporter) { 
        if (reporter != null
            reporter.setStatus("GATE : " + inputDoc.getUrl().toString()); 
 
        boolean clearBehemothAnnotations = config.getBoolean( 
                "gate.deleteBehemothAnnotations"false); 
 
        // process the text passed as value with the application 
        // a) create a GATE document based on the text value 
        gate.Document gatedocument = null
        try { 
 
            gatedocument = generateGATEDoc(inputDoc); 
            // add it to the current corpus 
            corpus.add(gatedocument); 
            // get the application and assign the corpus to it 
            this.GATEapplication.setCorpus(corpus); 
            // process it with GATE 
            this.GATEapplication.execute(); 
 
            AnnotationSet annots = null
            if ("".equals(filters.getAnnotationSetName())) 
                annots = gatedocument.getAnnotations(); 
            else 
                annots = gatedocument.getAnnotations(filters 
                        .getAnnotationSetName()); 
 
            // enrich the input doc with the annotations from 
            // the GATE application 
            // transfer the annotations from the GATE document 
            // to the Behemoth one using the filters 
            List<com.digitalpebble.behemoth.Annotation> beheannotations = convertGATEAnnotationsToBehemoth( 
                    annots, inputDoc); 
 
            // sort the annotations before adding them? 
            Collections.sort(beheannotations); 
 
            // clear the existing behemoth annotations 
            if (clearBehemothAnnotations) { 
                inputDoc.getAnnotations().clear(); 
            } 
 
            inputDoc.getAnnotations().addAll(beheannotations); 
 
            // add counters about num of annotations added 
            if (reporter != null
                for (com.digitalpebble.behemoth.Annotation annot : beheannotations) { 
                    reporter.incrCounter("GATE", annot.getType(), 1); 
                } 
 
            // Add the document features from GATE to Behemoth 
            Set<String> docFeatFilter = this.filters.getDocFeaturesFilter(); 
            MapWritable beheMD = inputDoc.getMetadata(true); 
            if (docFeatFilter.size() > 0) { 
                for (String docFeatName : docFeatFilter) { 
                    Object featValue = gatedocument.getFeatures().get( 
                            docFeatName); 
                    if (featValue != null) { 
                        beheMD.put(new Text(docFeatName), 
                                new Text(featValue.toString())); 
                    } 
                } 
            } 
 
            if (reporter != null
                reporter.incrCounter("GATE""Document"1); 
 
        } catch (Exception e) { 
            LOG.error(inputDoc.getUrl().toString(), e); 
            if (reporter != null
                reporter.incrCounter("GATE""Exceptions"1); 
        } finally { 
            // remove the document from the corpus again 
            corpus.clear(); 
            // and from memory 
            if (gatedocument != null
                Factory.deleteResource(gatedocument); 
        } 
        // currently returns only the input document 
        return new BehemothDocument[] { inputDoc }; 
    } 
 
    public void setConf(Configuration conf) { 
        config = conf; 
 
        if (applicationDescriptorPath == null
            throw new RuntimeException("GATE application path is null"); 
 
        // create one instance of the GATE application 
        // need to avoid concurrent access to the application 
        try { 
 
            if (inited == false) { 
                File gateHome = new File(applicationDescriptorPath.getFile()) 
                        .getParentFile(); 
                LOG.info("Setting GATE_HOME as " + gateHome); 
                File pluginsHome = new File(gateHome, "plugins"); 
                // the config files are in the job archive - not in the GATE 
                // application 
                // zip 
                // File siteConfigFile = new File(conf 
                // .getResource("site-gate.xml").getFile()); 
                // File userConfig = new File(conf.getResource("user-gate.xml") 
                // .getFile()); 
                Gate.runInSandbox(true); 
                Gate.setGateHome(gateHome); 
                Gate.setPluginsHome(pluginsHome); 
                // Gate.setSiteConfigFile(siteConfigFile); 
                // Gate.setUserConfigFile(userConfig); 
                // the builtInCreoleDir files 
                // are stored in the same place as the config ones 
                // Gate.setBuiltinCreoleDir(conf.getResource("creole.xml")); 
                Gate.init(); 
                inited = true
            } 
 
            corpus = Factory.newCorpus("DummyCorpus"); 
 
            this.GATEapplication = (CorpusController) PersistenceManager 
                    .loadObjectFromUrl(applicationDescriptorPath); 
 
            // load the annotation and feature filters from the configuration 
            this.filters = GATEAnnotationFilters.getFilters(config); 
 
        } catch (Exception e) { 
            LOG.error("Encountered error while initialising GATE", e); 
            throw new RuntimeException(e); 
        } 
    } 
 
    public void close() { 
        if (GATEapplication != null
            Factory.deleteResource(GATEapplication); 
    } 
 
    public Configuration getConf() { 
        return config; 
    } 
 
    private gate.Document generateGATEDocFromLocalDump(BehemothDocument inputDoc) 
            throws ResourceInstantiationException, IOException { 
 
        // can't get that to work 
        // File tempDirectory = new 
        // File(this.config.get("hadoop.tmp.dir","/tmp"),this.config.get("user.name", 
        // "./tmp")); 
        // LOG.info("tempDirectory "+tempDirectory); 
        // 
        // tempDirectory.mkdirs(); 
        // 
        // File tempInputFile = File.createTempFile("gateInput-", 
        // inputDoc.getUrl(),tempDirectory); 
        // 
        // FileOutputStream fos = new FileOutputStream(tempInputFile); 
        // OutputStream bout = new BufferedOutputStream(fos); 
        // bout.write(inputDoc.getContent()); 
        // bout.flush(); 
        // bout.close(); 
        // 
        // URL url; 
        // try { 
        // url = tempInputFile.toURI().toURL(); 
        // } catch (MalformedURLException e) { 
        // // delete the input doc 
        // tempInputFile.delete(); 
        // throw e; 
        // } 
 
        FeatureMap params = Factory.newFeatureMap(); 
        params.put(Document.DOCUMENT_STRING_CONTENT_PARAMETER_NAME, new String( 
                inputDoc.getContent())); 
        String ct = inputDoc.getContentType(); 
        if (ct != null
            params.put(Document.DOCUMENT_MIME_TYPE_PARAMETER_NAME, ct); 
 
        gate.Document gatedocument; 
        try { 
            gatedocument = (Document) Factory.createResource( 
                    "gate.corpora.DocumentImpl", params); 
        } finally { 
            // delete the input doc 
            // tempInputFile.delete(); 
        } 
 
        return gatedocument; 
    } 
 
    /**
     * Generation of a GATE document from a Behemoth one 
     *  
     * @param key 
     *            URL of the input doc 
     * @param inputDoc 
     * @return 
     * @throws ResourceInstantiationException 
     * @throws InvalidOffsetException 
     * @throws IOException 
     */
 
    public gate.Document generateGATEDoc(BehemothDocument inputDoc) 
            throws ResourceInstantiationException, InvalidOffsetException, 
            IOException { 
 
        gate.Document gatedocument = null
 
        // if no text is available (e.g. Tika has not extracted it) 
        // let GATE do the parsing itself from the binary content 
        if (inputDoc.getText() == null) { 
            try { 
                gatedocument = generateGATEDocFromLocalDump(inputDoc); 
 
                // transfer the text from GATE to Behemoth 
                String textContent = gatedocument.getContent().toString(); 
                inputDoc.setText(textContent); 
 
                return gatedocument; 
            } catch (Exception e) { 
                LOG.error("Can't generate GATE doc from byte dump", e); 
            } 
        } 
 
        // if the input document does not have any text -> create a doc with an 
        // empty text 
 
        String text = inputDoc.getText(); 
        if (inputDoc.getText() == null
            text = ""
        else 
            text = inputDoc.getText(); 
 
        gatedocument = Factory.newDocument(text); 
 
        // then the metadata as document features 
        FeatureMap docFeatures = gatedocument.getFeatures(); 
        String docUrl = inputDoc.getUrl(); 
        if (docUrl != null
            docFeatures.put("gate.SourceURL", docUrl); 
        if (inputDoc.getMetadata() != null) { 
            Iterator<Entry<Writable, Writable>> iter = inputDoc.getMetadata() 
                    .entrySet().iterator(); 
            while (iter.hasNext()) { 
                Entry<Writable, Writable> entry = iter.next(); 
                String skey = entry.getKey().toString().trim(); 
                String svalue = null
                if (entry.getValue() != null
                    svalue = entry.getValue().toString().trim(); 
                docFeatures.put(skey, svalue); 
            } 
        } 
 
        // finally the annotations as original markups 
        // TODO change the name of the annotation set via config 
        AnnotationSet outputAS = gatedocument 
                .getAnnotations("Original markups"); 
        for (Annotation annot : inputDoc.getAnnotations()) { 
            // add to outputAS as a GATE annotation 
            FeatureMap features = Factory.newFeatureMap(); 
            features.putAll(annot.getFeatures()); 
            outputAS.add(annot.getStart(), annot.getEnd(), annot.getType(), 
                    features); 
        } 
        return gatedocument; 
    } 
 
    /**
     * Returns a list of annotations to be added to the Behemoth document from 
     * the GATE one 
     ***/
 
    private List<com.digitalpebble.behemoth.Annotation> convertGATEAnnotationsToBehemoth
            AnnotationSet GATEAnnotionSet, 
            com.digitalpebble.behemoth.BehemothDocument behemoth) { 
 
        List<com.digitalpebble.behemoth.Annotation> beheannotations = new ArrayList<com.digitalpebble.behemoth.Annotation>(); 
 
        AnnotationSet resultAS = GATEAnnotionSet.get(filters.getTypes()); 
 
        // sort the GATE annotations 
        List<gate.Annotation> annotationList = new ArrayList<gate.Annotation>( 
                resultAS); 
        Collections.sort(annotationList, new OffsetComparator()); 
        Iterator<gate.Annotation> inputASIter = annotationList.iterator(); 
 
        while (inputASIter.hasNext()) { 
            gate.Annotation source = inputASIter.next(); 
 
            com.digitalpebble.behemoth.Annotation target = new com.digitalpebble.behemoth.Annotation(); 
            target.setType(source.getType()); 
            target.setStart(source.getStartNode().getOffset().longValue()); 
            target.setEnd(source.getEndNode().getOffset().longValue()); 
 
            // now do the features 
            // is the type listed? 
            Set<String> expectedfeatnames = filters.getFeatureFilters().get( 
                    source.getType()); 
            if (expectedfeatnames != null) { 
                Iterator featurenames = source.getFeatures().keySet() 
                        .iterator(); 
                while (featurenames.hasNext()) { 
                    // cast the feature name to a string which will be right in 
                    // 99% of cases 
                    String featurename = featurenames.next().toString(); 
                    // if this feature name is not wanted just ignore it 
                    if (expectedfeatnames.contains(featurename) == false
                        continue
                    // we know that we want to keep this feature 
                    // let's see what the best way of representing the value 
                    // would be 
                    // TODO later => find a better way of mapping when not a 
                    // string 
                    Object originalvalue = source.getFeatures() 
                            .get(featurename); 
                    if (originalvalue == null
                        originalvalue = "null"
                    target.getFeatures().put(featurename, 
                            originalvalue.toString()); 
                } 
            } 
            beheannotations.add(target); 
        } 
        return beheannotations; 
    } 
 
}