package com.cloudera.flume.master.logical;
import static com.cloudera.flume.conf.PatternMatch.kind;
import static com.cloudera.flume.conf.PatternMatch.recursive;
import static com.cloudera.flume.conf.PatternMatch.var;
import java.io.IOException;
import java.util.Map;
import org.antlr.runtime.RecognitionException;
import org.antlr.runtime.tree.CommonTree;
import org.apache.commons.lang.StringEscapeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumePatterns;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.FlumeSpecGen;
import com.cloudera.flume.conf.PatternMatch;
import com.cloudera.flume.master.ConfigurationManager;
import com.cloudera.flume.master.StatusManager;
import com.cloudera.flume.master.TranslatingConfigurationManager;
import com.cloudera.flume.master.Translator;
import com.cloudera.flume.master.logical.LogicalNameManager.PhysicalNodeInfo;
TranslatingConfigurationManager implements Translator {
// NOTE(review): the start of the class declaration (modifiers, class name,
// "extends") is not present on the line above in this copy of the file.

// slf4j logger keyed on LogicalConfigurationManager; shared by all instances.
static final Logger LOG = LoggerFactory.getLogger(LogicalConfigurationManager.class);
// Fixed identifier string for this translator.
public static final String NAME = "LogicalTranslator";
// Source of PhysicalNodeInfo lookups for logical node names; assigned once in
// the constructor.
final LogicalNameManager nameMan;
ConfigurationManager self, StatusManager statman) {
// NOTE(review): the beginning of this constructor header (including the
// declaration of the "parent" argument used below) is missing in this copy.

// Both configuration managers are handed to the superclass constructor.
super(parent, self);
// The name manager is built from the parent manager and the status manager.
this.nameMan = new LogicalNameManager(parent, statman);
}
FlumeSpecException {
// NOTE(review): the method declaration that this "throws" tail belongs to is
// truncated in this copy; the body reads a String-like argument named "sink".
//
// Parses the sink spec, finds a logicalSink node in it, and replaces that
// node's children with the parsed physical sink obtained from the name
// manager. Returns the (modified) parsed tree, the unmodified parsed tree when
// no logicalSink is found, or null when no physical sink string is available.
CommonTree lsnkTree = FlumeBuilder.parseSink(sink);
LOG.debug(lsnkTree.toStringTree());
// Bind "lsnk" to a SINK node that has a logicalSink child. Wrapped in
// recursive(...) - presumably so the match may occur at any depth of the
// tree; confirm against PatternMatch.
PatternMatch p = recursive(var("lsnk", kind("SINK").child(
kind("logicalSink"))));
Map<String, CommonTree> matches = p.match(lsnkTree);
if (matches == null) {
// Nothing matched the logicalSink pattern: return the tree as parsed.
return lsnkTree;
}
CommonTree lsnk = matches.get("lsnk");
// Java-escaped text of the matched sink, kept for use inside the quoted
// fail(...) spec built below.
final String orig = StringEscapeUtils.escapeJava(FlumeSpecGen
.genEventSink(lsnk));
// Child index 1 of the matched node is turned into a string and used as the
// logical node name for the lookups that follow.
String tgtLn = FlumeBuilder.buildArg((CommonTree) lsnk.getChild(1));
PhysicalNodeInfo pni = nameMan.getPhysicalNodeInfo(tgtLn);
String tgtPhysNode = getPhysicalNode(tgtLn);
if (tgtPhysNode == null || pni == null) {
// Either lookup failed: substitute a stand-in PhysicalNodeInfo whose
// overridden methods return "fail" specs.
// NOTE(review): the signatures of the two overridden methods are missing in
// this copy; only their return statements remain.
pni = new PhysicalNodeInfo() {
@Override
return "fail( \"" + orig + "\" )";
}
@Override
return "fail";
}
};
}
String snk = pni.getPhysicalSink();
if (snk == null) {
// No physical sink string to substitute; callers receive null.
return null;
}
// Parse the physical sink and swap it in for the children of the matched
// logical sink node, then return the tree that was parsed at the top.
CommonTree psnkTree = FlumeBuilder.parseSink(snk);
PatternMatch.replaceChildren(lsnk, psnkTree);
return lsnkTree;
}
throws RecognitionException {
  // Parses the source spec and, when it is a logicalSource, replaces the
  // children of the parsed tree with the parsed physical source for node "ln".
  // Returns the parsed tree (rewritten or not), or null when no node info or
  // no physical source string can be obtained.
  final CommonTree sourceTree = FlumeBuilder.parseSource(source);
  LOG.debug(sourceTree.toStringTree());

  final PatternMatch logicalSourcePattern = FlumePatterns.source("logicalSource");
  if (logicalSourcePattern.match(sourceTree) == null) {
    // Not a logicalSource: record null as the physical node for ln and hand
    // the tree back exactly as parsed.
    nameMan.setPhysicalNode(ln, null);
    return sourceTree;
  }

  // Look the node up; on a miss, ask the name manager to update it and retry
  // once before giving up.
  PhysicalNodeInfo nodeInfo = nameMan.getPhysicalNodeInfo(ln);
  if (nodeInfo == null) {
    nameMan.updateNode(ln);
    nodeInfo = nameMan.getPhysicalNodeInfo(ln);
  }
  if (nodeInfo == null) {
    return null;
  }

  // The recorded source is read first; it is overridden by a fail(...) spec
  // when ln has no physical node.
  final String recordedSource = nodeInfo.getPhysicalSource();
  final String physicalSource = (getPhysicalNode(ln) == null)
      ? "fail( \"no physical translation for " + ln + "\" )"
      : recordedSource;
  if (physicalSource == null) {
    LOG.warn("Physical Source for " + ln + " not translated");
    return null;
  }

  PatternMatch.replaceChildren(sourceTree, FlumeBuilder.parseSource(physicalSource));
  return sourceTree;
}
throws FlumeSpecException {
  // Runs the source spec through substLogicalSource and renders the resulting
  // tree back to text. Returns the input spec unchanged when substLogicalSource
  // yields null, and null when a RecognitionException is raised (the exception
  // is logged, not propagated).
  try {
    CommonTree pCt = substLogicalSource(logicalnode, source);
    if (pCt == null) {
      // No tree was produced; hand the spec back untouched.
      return source;
    }
    return FlumeSpecGen.genEventSource(pCt);
  } catch (RecognitionException e) {
    // This path handles sources; the message previously said "sink".
    LOG.error("Problem with physical source", e);
  }
  return null;
}
throws FlumeSpecException {
  // Repeatedly feeds the sink spec through substLogicalSink and regenerates
  // its text, stopping when the text no longer changes or when
  // substLogicalSink yields null. A RecognitionException is logged and turned
  // into a null result.
  try {
    String previous = null;
    String current = sink;
    for (;;) {
      if (current.equals(previous)) {
        // The last pass produced the same text: nothing left to rewrite.
        return current;
      }
      final CommonTree rewritten = substLogicalSink(current);
      if (rewritten == null) {
        return current;
      }
      previous = current;
      current = FlumeSpecGen.genEventSink(rewritten);
    }
  } catch (RecognitionException e) {
    LOG.error("Problem with physical sink", e);
    return null;
  }
}
@Override
try {
nameMan.update();
} catch (RecognitionException e) {
LOG.error("Internal Error: " + e.getLocalizedMessage(), e);
throw new IOException("Internal Error: " + e.getMessage());
}
super.refreshAll();
}
@Override
try {
nameMan.update();
} catch (RecognitionException e) {
LOG.error("Internal Error: " + e.getLocalizedMessage(), e);
throw new IOException("Internal Error: " + e.getMessage());
}
super.updateAll();
}
// Returns this class's NAME constant.
// NOTE(review): the method signature between the annotation and the return
// statement is missing in this copy of the file.
@Override
return NAME;
}
}