package com.cloudera.flume.conf;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.antlr.runtime.ANTLRFileStream;
import org.antlr.runtime.ANTLRStringStream;
import org.antlr.runtime.CommonTokenStream;
import org.antlr.runtime.RecognitionException;
import org.antlr.runtime.tree.CommonTree;
import org.apache.commons.lang.StringEscapeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.core.BackOffFailOverSink;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.core.FanInSource;
import com.cloudera.flume.core.FanOutSink;
import com.cloudera.flume.handlers.rolling.RollSink;
import com.cloudera.flume.master.availability.FailoverChainSink;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;
/** Logger shared by the static builder methods in this file. */
static final Logger LOG = LoggerFactory.getLogger(FlumeBuilder.class);
/**
 * Factory queried via {@code getSource(...)} / {@code getSourceNames()} when
 * building sources. Deliberately non-final: it is reassigned elsewhere in this
 * file after a null check.
 */
static SourceFactory srcFactory = new SourceFactoryImpl();
/**
 * Factory queried via {@code getSink(...)} / {@code getDecorator(...)} when
 * building sinks and decorators. Deliberately non-final: it is reassigned
 * elsewhere in this file after a null check.
 */
static SinkFactory sinkFactory = new SinkFactoryImpl();
enum ASTNODE {
DEC, HEX, OCT, STRING, BOOL, FLOAT,
SINK, SOURCE,
MULTI, DECO, BACKUP, LET, ROLL, FAILCHAIN,
NODE,
};
Preconditions.checkNotNull(srcFact);
srcFactory = srcFact;
}
Preconditions.checkNotNull(snkFact);
sinkFactory = snkFact;
}
FlumeDeployLexer lexer = new FlumeDeployLexer(new ANTLRStringStream(s));
CommonTokenStream tokens = new CommonTokenStream(lexer);
return new FlumeDeployParser(tokens);
}
/**
 * Parses a specification string with the grammar's {@code deflist} rule.
 *
 * @param s specification text
 * @return the tree produced by the {@code deflist} rule
 * @throws RecognitionException propagated from the parser rule
 */
static CommonTree parse(String s) throws RecognitionException {
  FlumeDeployParser deployParser = getDeployParser(s);
  Object ast = deployParser.deflist().getTree();
  return (CommonTree) ast;
}
/**
 * Parses a string with the grammar's {@code host} rule.
 *
 * @param s text to parse
 * @return the tree produced by the {@code host} rule
 * @throws RecognitionException propagated from the parser rule
 */
static CommonTree parseHost(String s) throws RecognitionException {
  FlumeDeployParser deployParser = getDeployParser(s);
  Object ast = deployParser.host().getTree();
  return (CommonTree) ast;
}
/**
 * Parses a string with the grammar's {@code literal} rule.
 *
 * @param s text to parse
 * @return the tree produced by the {@code literal} rule
 * @throws RecognitionException propagated from the parser rule
 */
static CommonTree parseLiteral(String s) throws RecognitionException {
  FlumeDeployParser deployParser = getDeployParser(s);
  Object ast = deployParser.literal().getTree();
  return (CommonTree) ast;
}
/**
 * Parses a string with the grammar's {@code sink} rule.
 *
 * @param s sink specification text
 * @return the tree produced by the {@code sink} rule
 * @throws RecognitionException propagated from the parser rule
 */
public static CommonTree parseSink(String s) throws RecognitionException {
  return (CommonTree) getDeployParser(s).sink().getTree();
}
/**
 * Parses a string with the grammar's {@code source} rule.
 *
 * @param s source specification text
 * @return the tree produced by the {@code source} rule
 * @throws RecognitionException propagated from the parser rule
 */
public static CommonTree parseSource(String s) throws RecognitionException {
  FlumeDeployParser deployParser = getDeployParser(s);
  Object ast = deployParser.source().getTree();
  return (CommonTree) ast;
}
/**
 * Reads the named file and parses its contents with the grammar's
 * {@code deflist} rule.
 *
 * @param filename path handed to {@link ANTLRFileStream}
 * @return the tree produced by the {@code deflist} rule
 * @throws IOException propagated from opening/reading the file stream
 * @throws RecognitionException propagated from the parser rule
 */
static CommonTree parseFile(String filename) throws IOException, RecognitionException {
  ANTLRFileStream input = new ANTLRFileStream(filename);
  FlumeDeployParser deployParser =
      new FlumeDeployParser(new CommonTokenStream(new FlumeDeployLexer(input)));
  return (CommonTree) deployParser.deflist().getTree();
}
/**
 * Reads the named file and parses its contents with the grammar's
 * {@code connection} rule.
 *
 * @param filename path handed to {@link ANTLRFileStream}
 * @return the tree produced by the {@code connection} rule
 * @throws IOException propagated from opening/reading the file stream
 * @throws RecognitionException propagated from the parser rule
 */
static CommonTree parseNodeFile(String filename) throws IOException, RecognitionException {
  ANTLRFileStream input = new ANTLRFileStream(filename);
  FlumeDeployParser deployParser =
      new FlumeDeployParser(new CommonTokenStream(new FlumeDeployLexer(input)));
  return (CommonTree) deployParser.connection().getTree();
}
/**
 * Parses a single-node file and instantiates its source and sink.
 *
 * The file is parsed with {@link #parseNodeFile(String)}; the resulting tree
 * must be labelled {@code NODE}, with the source subtree as child 0 and the
 * sink subtree as child 1.
 *
 * @param context context passed through to the sink builder
 * @param f file containing the node specification
 * @return pair of (source, sink) built from the file
 * @throws IOException if the canonical path cannot be resolved or the file
 *         cannot be read
 * @throws RecognitionException propagated from the parser
 * @throws FlumeSpecException if the root of the tree is not a {@code NODE} or
 *         a subtree cannot be built
 */
public static Pair<EventSource, EventSink> buildNode(Context context, File f)
    throws IOException, RecognitionException, FlumeSpecException {
  CommonTree t = parseNodeFile(f.getCanonicalPath());
  // Compare by value: reference comparison (!=) on strings only works when
  // both sides happen to be the same interned instance.
  if (!"NODE".equals(t.getText())) {
    throw new FlumeSpecException("fail, expected node but had "
        + t.toStringTree());
  }
  CommonTree tsrc = (CommonTree) t.getChild(0);
  CommonTree tsnk = (CommonTree) t.getChild(1);
  return new Pair<EventSource, EventSink>(buildEventSource(tsrc),
      buildEventSink(context, tsnk, sinkFactory));
}
/**
 * Parses a multi-node specification and returns, per node name, the
 * regenerated source and sink specification strings (nothing is
 * instantiated).
 *
 * @param ctx not used by this method
 * @param s specification text containing node definitions
 * @return map from node name to (source spec, sink spec) as produced by
 *         {@code FlumeSpecGen}
 * @throws FlumeSpecException if the text does not parse, an entry is not a
 *         {@code NODE}, or a node does not have exactly three children
 */
@SuppressWarnings("unchecked")
public static Map<String, Pair<String, String>> parseConf(Context ctx, String s)
    throws FlumeSpecException {
  try {
    CommonTree node = parse(s);
    Map<String, Pair<String, String>> cfg = new HashMap<String, Pair<String, String>>();

    List<CommonTree> defs = (List<CommonTree>) node.getChildren();
    if (defs == null) {
      // getChildren() is null for a childless tree. NOTE(review): this
      // presumably happens when the parser collapses a single-child root
      // into the child itself (e.g. an empty spec) -- confirm against the
      // grammar. Treat the node as the only entry instead of failing with
      // a NullPointerException.
      defs = new ArrayList<CommonTree>(1);
      defs.add(node);
    }

    for (CommonTree t : defs) {
      // Token type -1 is ANTLR's EOF marker; nothing follows it.
      if (t.getType() == -1) {
        break;
      }
      // Compare by value, not by reference.
      if (!"NODE".equals(t.getText())) {
        throw new FlumeSpecException("fail, expected node but had "
            + t.toStringTree());
      }
      if (t.getChildCount() != 3) {
        throw new FlumeSpecException(
            "fail, node wasn't (name,src,snk): " + t.toStringTree());
      }
      String host = t.getChild(0).getText();
      CommonTree tsrc = (CommonTree) t.getChild(1);
      CommonTree tsnk = (CommonTree) t.getChild(2);
      Pair<String, String> p = new Pair<String, String>(
          FlumeSpecGen.genEventSource(tsrc), FlumeSpecGen.genEventSink(tsnk));
      cfg.put(host, p);
    }
    return cfg;
  } catch (RecognitionException re) {
    LOG.error("Failure to parse and instantiate sink: '" + s + "'", re);
    throw new FlumeSpecException(re.toString());
  }
}
/**
 * Parses a multi-node specification and instantiates a source and a sink for
 * every node definition.
 *
 * @param context context passed through to the sink builder
 * @param s specification text containing node definitions
 * @return map from node name to the (source, sink) pair built for it
 * @throws FlumeSpecException if the text does not parse, an entry is not a
 *         {@code NODE}, a node does not have exactly three children, or a
 *         source/sink cannot be built
 */
@SuppressWarnings("unchecked")
public static Map<String, Pair<EventSource, EventSink>> build(Context context, String s)
    throws FlumeSpecException {
  try {
    CommonTree node = parse(s);
    Map<String, Pair<EventSource, EventSink>> cfg =
        new HashMap<String, Pair<EventSource, EventSink>>();

    List<CommonTree> defs = (List<CommonTree>) node.getChildren();
    if (defs == null) {
      // getChildren() is null for a childless tree. NOTE(review): this
      // presumably happens when the parser collapses a single-child root
      // into the child itself (e.g. an empty spec) -- confirm against the
      // grammar. Treat the node as the only entry instead of failing with
      // a NullPointerException.
      defs = new ArrayList<CommonTree>(1);
      defs.add(node);
    }

    for (CommonTree t : defs) {
      // Token type -1 is ANTLR's EOF marker; nothing follows it.
      if (t.getType() == -1) {
        break;
      }
      // Compare by value, not by reference.
      if (!"NODE".equals(t.getText())) {
        throw new FlumeSpecException("fail, expected node but had "
            + t.toStringTree());
      }
      // Same shape check parseConf performs; without it a malformed node
      // surfaces later as a NullPointerException on a missing child.
      if (t.getChildCount() != 3) {
        throw new FlumeSpecException(
            "fail, node wasn't (name,src,snk): " + t.toStringTree());
      }
      String host = t.getChild(0).getText();
      CommonTree tsrc = (CommonTree) t.getChild(1);
      CommonTree tsnk = (CommonTree) t.getChild(2);
      Pair<EventSource, EventSink> p = new Pair<EventSource, EventSink>(
          buildEventSource(tsrc), buildEventSink(context, tsnk, sinkFactory));
      cfg.put(host, p);
    }
    return cfg;
  } catch (RecognitionException re) {
    LOG.error("Failure to parse and instantiate sink: '" + s + "'", re);
    throw new FlumeSpecException(re.toString());
  }
}
/**
 * Parses a source specification string and instantiates the source it
 * describes.
 *
 * Parse errors and argument errors are logged at debug level and rethrown as
 * {@link FlumeSpecException}.
 *
 * @param s source specification text
 * @return the source built from {@code s}
 * @throws FlumeSpecException if {@code s} cannot be parsed or the source
 *         cannot be built from it
 */
public static EventSource buildSource(String s) throws FlumeSpecException {
  try {
    CommonTree srcTree = parseSource(s);
    return buildEventSource(srcTree);
  } catch (RecognitionException re) {
    LOG.debug("Failure to parse and instantiate source: '" + s + "'", re);
    throw new FlumeSpecException(re.toString());
  } catch (IllegalArgumentException iae) {
    // Also covers NumberFormatException, which is a subclass and was
    // previously handled by an identical, separate clause.
    LOG.debug("Failure to parse and instantiate source: '" + s + "'", iae);
    throw new FlumeSpecException(iae.getMessage());
  } catch (RuntimeRecognitionException re) {
    LOG.debug("Failure to parse and instantiate source: '" + s + "'", re);
    throw new FlumeSpecException(re.getMessage());
  }
}
/**
 * Parses a sink specification string and instantiates the sink it describes
 * using the current {@code sinkFactory}.
 *
 * Parse errors and argument errors are logged at debug level and rethrown as
 * {@link FlumeSpecException}.
 *
 * @param context context passed through to the sink builder
 * @param s sink specification text
 * @return the sink built from {@code s}
 * @throws FlumeSpecException if {@code s} cannot be parsed or the sink cannot
 *         be built from it
 */
public static EventSink buildSink(Context context, String s) throws FlumeSpecException {
  try {
    return buildEventSink(context, parseSink(s), sinkFactory);
  } catch (RecognitionException parseFailure) {
    LOG.debug("Failure to parse and instantiate sink: '" + s + "'", parseFailure);
    throw new FlumeSpecException(parseFailure.toString());
  } catch (IllegalArgumentException badArgument) {
    // NumberFormatException is an IllegalArgumentException and is handled
    // here in exactly the same way.
    LOG.debug("Failure to parse and instantiate sink: '" + s + "'", badArgument);
    throw new FlumeSpecException(badArgument.getMessage());
  } catch (RuntimeRecognitionException runtimeFailure) {
    LOG.debug("Failure to parse and instantiate sink: '" + s + "'", runtimeFailure);
    throw new FlumeSpecException(runtimeFailure.getMessage());
  }
}
/**
 * Builds a sink from a specification string using a freshly created
 * {@link Context}.
 *
 * @param s sink specification text
 * @return the sink built from {@code s}
 * @throws FlumeSpecException if {@code s} cannot be parsed or the sink cannot
 *         be built from it
 * @deprecated use {@link #buildSink(Context, String)} and supply the context
 *             explicitly
 */
@Deprecated
public static EventSink buildSink(String s) throws FlumeSpecException {
  Context freshContext = new Context();
  return buildSink(freshContext, s);
}
/**
 * Formats one node definition line as {@code name : src | snk;}.
 *
 * @param name node name
 * @param src source specification
 * @param snk sink specification
 * @return the three parts joined with {@code " : "} and {@code " | "} and
 *         terminated with {@code ";"}
 */
public static String toLine(String name, String src, String snk) {
  StringBuilder line = new StringBuilder();
  line.append(name).append(" : ");
  line.append(src).append(" | ");
  line.append(snk).append(";");
  return line.toString();
}
/**
 * Converts a literal AST node into the string form handed to the factories.
 *
 * <ul>
 * <li>{@code DEC}, {@code BOOL}, {@code FLOAT}: the child's text, unchanged.</li>
 * <li>{@code HEX}: must start with {@code 0x}; returned as a decimal string.</li>
 * <li>{@code OCT}: must start with {@code 0} but not {@code 0x}; returned as
 * a decimal string.</li>
 * <li>{@code STRING}: must be wrapped in double quotes; the quotes are
 * stripped and Java escapes are resolved.</li>
 * </ul>
 *
 * @param t literal node whose text names an {@code ASTNODE} constant and whose
 *          first child carries the literal text
 * @return string value of the literal
 * @throws FlumeSpecException if the node is a known type but not a literal
 * @throws IllegalArgumentException if the node text is not an {@code ASTNODE}
 *         name or a prefix/quote check fails (includes
 *         {@link NumberFormatException} from number parsing)
 */
public static String buildArg(CommonTree t) throws FlumeSpecException {
  switch (ASTNODE.valueOf(t.getText())) {
  case DEC:
  case BOOL:
  case FLOAT:
    // These literals are passed through verbatim.
    return t.getChild(0).getText();
  case HEX: {
    String hexText = t.getChild(0).getText();
    Preconditions.checkArgument(hexText.startsWith("0x"));
    long hexValue = Long.parseLong(hexText.substring(2), 16);
    return Long.toString(hexValue);
  }
  case OCT: {
    String octText = t.getChild(0).getText();
    Preconditions.checkArgument(octText.startsWith("0"));
    Preconditions.checkArgument(!octText.startsWith("0x"));
    long octValue = Long.parseLong(octText, 8);
    return Long.toString(octValue);
  }
  case STRING: {
    String quoted = t.getChild(0).getText();
    Preconditions.checkArgument(quoted.startsWith("\"") && quoted.endsWith("\""));
    String unquoted = quoted.substring(1, quoted.length() - 1);
    return StringEscapeUtils.unescapeJava(unquoted);
  }
  default:
    throw new FlumeSpecException("Not a node of literal type: "
        + t.toStringTree());
  }
}
@SuppressWarnings("unchecked")
ASTNODE type = ASTNODE.valueOf(t.getText());
switch (type) {
case SOURCE: {
List<CommonTree> children = new ArrayList<CommonTree>(
(List<CommonTree>) t.getChildren());
CommonTree source = children.remove(0);
String sourceType = source.getText();
List<String> args = new ArrayList<String>();
for (CommonTree tr : children) {
args.add(buildArg(tr));
}
EventSource src = srcFactory.getSource(sourceType, args
.toArray(new String[0]));
if (src == null) {
children.add(0, source);
throw new FlumeIdException("Invalid source: "
+ FlumeSpecGen.genEventSource(t));
}
return src;
}
case MULTI:
List<CommonTree> elems = (List<CommonTree>) t.getChildren();
List<EventSource> srcs = new ArrayList<EventSource>();
try {
for (CommonTree tr : elems) {
EventSource src = buildEventSource(tr);
srcs.add(src);
}
FanInSource<EventSource> src = new FanInSource<EventSource>(srcs);
return src;
} catch (FlumeSpecException ife) {
throw ife;
}
default:
throw new FlumeSpecException("bad parse tree! Expected source but got "
+ t.toStringTree());
}
}
@SuppressWarnings("unchecked")
SinkFactory sinkFactory) throws FlumeSpecException {
ASTNODE type = ASTNODE.valueOf(t.getText());
switch (type) {
case SINK:
List<CommonTree> children = (List<CommonTree>) new ArrayList<CommonTree>(
t.getChildren());
String sinkType = children.remove(0).getText();
List<String> args = new ArrayList<String>();
for (CommonTree tr : children) {
args.add(buildArg(tr));
}
EventSink snk = sinkFactory.getSink(context, sinkType, args
.toArray(new String[0]));
if (snk == null) {
throw new FlumeIdException("Invalid sink: "
+ FlumeSpecGen.genEventSink(t));
}
return snk;
case MULTI: {
List<CommonTree> elems = (List<CommonTree>) t.getChildren();
List<EventSink> snks = new ArrayList<EventSink>();
try {
for (CommonTree tr : elems) {
EventSink s = buildEventSink(context, tr, sinkFactory);
snks.add(s);
}
FanOutSink<EventSink> sink = new FanOutSink<EventSink>(snks);
return sink;
} catch (FlumeSpecException ife) {
throw ife;
}
}
case DECO: {
List<CommonTree> decoNodes = (List<CommonTree>) t.getChildren();
Preconditions.checkArgument(decoNodes.size() == 2,
"Only supports one decorator per expression");
CommonTree deco = decoNodes.get(0);
CommonTree decoSnk = decoNodes.get(1);
EventSinkDecorator<EventSink> decoSink = buildEventSinkDecorator(context,
deco);
try {
EventSink dsnk = buildEventSink(context, decoSnk, sinkFactory);
decoSink.setSink(dsnk);
return decoSink;
} catch (FlumeSpecException ife) {
throw ife;
}
}
case BACKUP: {
List<CommonTree> backupNodes = (List<CommonTree>) t.getChildren();
Preconditions.checkArgument(backupNodes.size() == 2,
"Only supports two retry nodes per failover expression");
CommonTree main = backupNodes.get(0);
CommonTree backup = backupNodes.get(1);
try {
EventSink mainSink = buildEventSink(context, main, sinkFactory);
EventSink backupSink = buildEventSink(context, backup, sinkFactory);
return new BackOffFailOverSink(mainSink, backupSink);
} catch (FlumeSpecException ife) {
LOG.error("Failed to build Failover sink", ife);
throw ife;
}
}
case LET: {
List<CommonTree> letNodes = (List<CommonTree>) t.getChildren();
Preconditions.checkArgument(letNodes.size() == 3);
String argName = letNodes.get(0).getText();
CommonTree arg = letNodes.get(1);
CommonTree body = letNodes.get(2);
try {
EventSink argSink = buildEventSink(context, arg, sinkFactory);
EventSink argSinkRef = new EventSinkDecorator<EventSink>(argSink) {
boolean open = false;
@Override
public void open()
throws IOException {
if (open) {
return;
}
open = true;
sink.open();
}
@Override
public void append(Event e)
throws IOException {
Preconditions.checkState(open);
sink.append(e);
}
@Override
public void close()
throws IOException {
open = false;
sink.close();
}
};
LinkedSinkFactory linkedFactory = new LinkedSinkFactory(sinkFactory,
argName, argSinkRef);
EventSink bodySink = buildEventSink(context, body, linkedFactory);
return bodySink;
} catch (FlumeSpecException ife) {
throw ife;
}
}
case ROLL: {
List<CommonTree> rollArgs = (List<CommonTree>) t.getChildren();
try {
Preconditions.checkArgument(rollArgs.size() == 2, "bad parse tree! "
+ t.toStringTree() + "roll only takes two arguments");
CommonTree ctbody = rollArgs.get(0);
Long period = Long.parseLong(buildArg(rollArgs.get(1)));
String body = FlumeSpecGen.genEventSink(ctbody);
RollSink roller = new RollSink(context, body, period, 250);
return roller;
} catch (IllegalArgumentException iae) {
throw new FlumeSpecException(iae.getMessage());
}
}
case FAILCHAIN: {
List<CommonTree> rollArgs = (List<CommonTree>) t.getChildren();
Preconditions.checkArgument(rollArgs.size() >= 2);
CommonTree ctbody = rollArgs.get(0);
List<String> rargs = new ArrayList<String>(rollArgs.size() - 1);
boolean first = true;
for (CommonTree ct : rollArgs) {
if (first) {
first = false;
continue;
}
rargs.add(buildArg(ct));
}
String body = FlumeSpecGen.genEventSink(ctbody);
FlumeConfiguration conf = FlumeConfiguration.get();
FailoverChainSink failchain = new FailoverChainSink(context, body, rargs,
conf.getFailoverInitialBackoff(), conf.getFailoverMaxSingleBackoff());
return failchain;
}
default:
throw new FlumeSpecException("bad parse tree! expected sink but got "
+ t.toStringTree());
}
}
@SuppressWarnings("unchecked")
CommonTree t) throws FlumeSpecException {
List<CommonTree> children = (List<CommonTree>) new ArrayList<CommonTree>(t
.getChildren());
String sinkType = children.remove(0).getText();
List<String> args = new ArrayList<String>();
for (CommonTree tr : children) {
args.add(buildArg(tr));
}
EventSinkDecorator deco = sinkFactory.getDecorator(context, sinkType, args
.toArray(new String[0]));
if (deco == null) {
throw new FlumeIdException("Invalid sink decorator: "
+ FlumeSpecGen.genEventSinkDecorator(t));
}
return deco;
}
return sinkFactory.getSinkNames();
}
return sinkFactory.getDecoratorNames();
}
return srcFactory.getSourceNames();
}
}