package com.cloudera.flume.agent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.CompositeSink;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.master.availability.FailoverChainManager;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.util.NetUtils;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;
/** Logger shared by all instances of this sink. */
static final Logger LOG = LoggerFactory.getLogger(AgentFailChainSink.class);
/**
 * The wrapped sink. It is assigned once during construction, and open(),
 * close(), append() and getReports() all forward to it.
 */
final EventSink snk;
/**
 * Reliability level requested for an agent failover chain. The constructor
 * switches on this value to decide which chain specification to generate.
 */
public enum RELIABILITY {
  /** Selects the chain produced by genE2EChain. */
  E2E,
  /** Selects the chain produced by genDfoChain. */
  DFO,
  /** Selects the chain produced by genBestEffortChain. */
  BE
}
throws FlumeSpecException {
// Convenience path: delegate to the context-taking constructor, supplying a
// newly created Context in place of a caller-provided one.
this(new Context(), rel, hosts);
}
throws FlumeSpecException {
  // Turn every host argument into a tsink(...) spec; entries without an
  // explicit port fall back to the configured collector port.
  final int collectorPort = FlumeConfiguration.get().getCollectorPort();
  final String[] sinks = thriftifyArgs(collectorPort, Arrays.asList(hosts))
      .toArray(new String[0]);

  // Pick the chain generator matching the requested reliability level.
  final String chains;
  switch (rel) {
  case E2E:
    chains = AgentFailChainSink.genE2EChain(sinks);
    break;
  case DFO:
    chains = AgentFailChainSink.genDfoChain(sinks);
    break;
  case BE:
    chains = AgentFailChainSink.genBestEffortChain(sinks);
    break;
  default:
    throw new FlumeSpecException("Unknown relability " + rel);
  }

  // The generated spec is handed to a CompositeSink, which becomes the sink
  // every other method of this class forwards to.
  LOG.info("Setting failover chain to " + chains);
  snk = new CompositeSink(context, chains);
}
/**
 * Opens the wrapped sink.
 *
 * @throws IOException if the wrapped sink's open() throws it
 */
@Override
public void open() throws IOException {
  snk.open();
}
/**
 * Closes the wrapped sink.
 *
 * @throws IOException if the wrapped sink's close() throws it
 */
@Override
public void close() throws IOException {
  snk.close();
}
/**
 * Appends an event to the wrapped sink and then invokes the superclass
 * append. The superclass call is only reached when the wrapped sink's append
 * returns without throwing.
 *
 * @param e the event to deliver
 * @throws IOException if either append call throws it
 */
@Override
public void append(Event e) throws IOException {
  snk.append(e);
  super.append(e);
}
/**
 * Collects reports from the superclass first, then from the wrapped sink.
 * The wrapped sink is given a prefix extended with this sink's name and a
 * trailing dot.
 *
 * @param namePrefix prefix passed to the superclass and extended for the
 *        wrapped sink
 * @param reports map handed to both getReports calls to be filled in
 */
@Override
public void getReports(String namePrefix, Map<String, ReportEvent> reports) {
  super.getReports(namePrefix, reports);

  final String nestedPrefix = namePrefix + getName() + ".";
  snk.getReports(nestedPrefix, reports);
}
// Template passed to FailoverChainManager.genAvailableSinkSpec together with
// the list of sinks; %s is the placeholder for a sink.
final String sinkTemplate = "{ lazyOpen => { stubbornAppend => %s } } ";
final String chainSpec = FailoverChainManager.genAvailableSinkSpec(
    sinkTemplate, Arrays.asList(chain));
LOG.info("Setting best effort failover chain to " + chainSpec);
return chainSpec;
}
// The sinks go through a bare " %s " template; the spec that comes back is
// then wrapped once, as a whole, in
// ackedWriteAhead => stubbornAppend => insistentOpen.
final String failovers = FailoverChainManager.genAvailableSinkSpec(
    " %s ", Arrays.asList(chain));
final String e2eSpec =
    "{ ackedWriteAhead => { stubbornAppend => { insistentOpen => "
        + failovers + " } } }";
LOG.info("Setting e2e failover chain to " + e2eSpec);
return e2eSpec;
}
// The best-effort chain is bound to the name "primary" with a let-expression.
// It is generated first, so its own log line precedes the one below.
final String primaries = genBestEffortChain(chain);

// Body of the let: a "< primary ? ... >" form whose alternative wraps
// primary in diskFailover => insistentOpen.
final String body = "< primary ? {diskFailover => { insistentOpen => primary} } >";
LOG.info("Setting dfo failover chain to " + body);

return "let primary := " + primaries + " in " + body;
}
/**
 * Converts a list of collector addresses into {@code tsink("host",port)} sink
 * specification strings, one per address and in the same order.
 *
 * <p>Each entry is parsed with NetUtils.parseHostPortPair, which is given
 * {@code defaultPort} as its second argument. When {@code list} is null or
 * empty, the result holds a single spec built from the collector host and
 * collector port in FlumeConfiguration; note that this fallback reads the
 * port from the configuration and does not use {@code defaultPort}.
 *
 * @param defaultPort port passed to the host/port parser for each entry
 * @param list collector addresses; may be null or empty
 * @return a new mutable list of tsink specification strings, never empty
 */
public static List<String> thriftifyArgs(int defaultPort, List<String> list) {
  final String sinkFormat = "tsink(\"%s\",%d)";
  final ArrayList<String> sinks = new ArrayList<String>();

  // No collectors given: fall back to the single configured collector.
  final boolean noCollectors = (list == null) || list.isEmpty();
  if (noCollectors) {
    sinks.add(String.format(sinkFormat,
        FlumeConfiguration.get().getCollectorHost(),
        FlumeConfiguration.get().getCollectorPort()));
    return sinks;
  }

  for (String address : list) {
    Pair<String, Integer> hostAndPort =
        NetUtils.parseHostPortPair(address, defaultPort);
    String host = hostAndPort.getLeft();
    int port = hostAndPort.getRight();
    sinks.add(String.format(sinkFormat, host, port));
  }
  return sinks;
}
return new SinkBuilder() {
  /**
   * Builds an AgentFailChainSink with RELIABILITY.E2E over the given
   * collector addresses.
   *
   * @param context context supplied by the caller; forwarded to the sink
   * @param argv one or more "machine[:port]" collector addresses
   * @return the configured failover-chain sink
   * @throws IllegalArgumentException if no address is given, or if the sink
   *         constructor throws FlumeSpecException (kept as the cause)
   */
  @Override
  public EventSink build(Context context, String... argv) {
    Preconditions
        .checkArgument(argv.length >= 1,
            "usage: agentE2EChain(\"machine1[:port]\" [, \"machine2[:port]\" [,...]])");
    try {
      // Forward the caller's context. The two-argument constructor would
      // silently replace it with a brand-new Context.
      return new AgentFailChainSink(context, RELIABILITY.E2E, argv);
    } catch (FlumeSpecException e) {
      throw new IllegalArgumentException(e);
    }
  }
};
}
return new SinkBuilder() {
  /**
   * Builds an AgentFailChainSink with RELIABILITY.DFO over the given
   * collector addresses.
   *
   * @param context context supplied by the caller; forwarded to the sink
   * @param argv one or more "machine[:port]" collector addresses
   * @return the configured failover-chain sink
   * @throws IllegalArgumentException if no address is given, or if the sink
   *         constructor throws FlumeSpecException (kept as the cause)
   */
  @Override
  public EventSink build(Context context, String... argv) {
    Preconditions
        .checkArgument(argv.length >= 1,
            "usage: agentDFOChain(\"machine1[:port]\" [, \"machine2[:port]\" [,...]])");
    try {
      // Forward the caller's context. The two-argument constructor would
      // silently replace it with a brand-new Context.
      return new AgentFailChainSink(context, RELIABILITY.DFO, argv);
    } catch (FlumeSpecException e) {
      throw new IllegalArgumentException(e);
    }
  }
};
}
return new SinkBuilder() {
  /**
   * Builds an AgentFailChainSink with RELIABILITY.BE over the given
   * collector addresses.
   *
   * @param context context supplied by the caller; forwarded to the sink
   * @param argv one or more "machine[:port]" collector addresses
   * @return the configured failover-chain sink
   * @throws IllegalArgumentException if no address is given, or if the sink
   *         constructor throws FlumeSpecException (kept as the cause)
   */
  @Override
  public EventSink build(Context context, String... argv) {
    Preconditions
        .checkArgument(argv.length >= 1,
            "usage: agentBEChain(\"machine1[:port]\" [, \"machine2[:port]\" [,...]])");
    try {
      // Forward the caller's context. The two-argument constructor would
      // silently replace it with a brand-new Context.
      return new AgentFailChainSink(context, RELIABILITY.BE, argv);
    } catch (FlumeSpecException e) {
      throw new IllegalArgumentException(e);
    }
  }
};
}
@Override
// Returns a fixed name for this sink. Presumably this is the body of
// getName(), whose result getReports() inserts into the prefix it passes to
// the wrapped sink -- confirm against the method signature.
return "FailchainAgent";
}
}