package com.cloudera.flume.agent;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.VersionInfo;
import com.cloudera.flume.agent.diskfailover.DiskFailoverManager;
import com.cloudera.flume.agent.diskfailover.NaiveFileFailoverManager;
import com.cloudera.flume.agent.durability.NaiveFileWALManager;
import com.cloudera.flume.agent.durability.WALManager;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.LogicalNodeContext;
import com.cloudera.flume.handlers.debug.ChokeManager;
import com.cloudera.flume.handlers.endtoend.AckListener;
import com.cloudera.flume.handlers.endtoend.CollectorAckListener;
import com.cloudera.flume.handlers.text.FormatFactory;
import com.cloudera.flume.handlers.text.FormatFactory.OutputFormatBuilder;
import com.cloudera.flume.reporter.MasterReportPusher;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.Reportable;
import com.cloudera.flume.util.FlumeVMInfo;
import com.cloudera.flume.util.SystemInfo;
import com.cloudera.util.CheckJavaVersion;
import com.cloudera.util.FileUtil;
import com.cloudera.util.NetUtils;
import com.cloudera.util.Pair;
import com.cloudera.util.StatusHttpServer;
import com.google.common.base.Preconditions;
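/**
 * A FlumeNode is the per-machine daemon. It hosts the logical nodes assigned
 * to this physical node along with their write-ahead-log (WAL) and
 * disk-failover managers, and runs the supporting services: the status http
 * server, master report pusher, liveness heartbeats, and the ChokeManager
 * throttler.
 */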
public class FlumeNode implements Reportable {
static final Logger LOG = LoggerFactory.getLogger(FlumeNode.class);
final static String PHYSICAL_NODE_REPORT_PREFIX = "pn-";
static final String R_NUM_LOGICAL_NODES = "Logical nodes";
private static FlumeNode instance;
final boolean startHttp;
private StatusHttpServer http = null;
private FlumeVMInfo vmInfo;
private SystemInfo sysInfo;
private final LivenessManager liveMan;
private MasterRPC rpcMan;
private LogicalNodeManager nodesMan;
private final MasterReportPusher reportPusher;
private Map<String, WALManager> walMans = new HashMap<String, WALManager>();
private Map<String, DiskFailoverManager> failoverMans = new HashMap<String, DiskFailoverManager>();
private final CollectorAckListener collectorAck;
final String physicalNodeName;
private final ChokeManager chokeMan;
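  /**
   * Fully-injected constructor: all collaborators (RPC layer, logical node
   * manager, WAL and disk-failover managers, ack listener, liveness manager)
   * are supplied by the caller, and the status http server is never started.
   */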
public FlumeNode(String name, MasterRPC rpc, LogicalNodeManager nodesMan,
WALManager walMan, DiskFailoverManager dfMan,
CollectorAckListener colAck, LivenessManager liveman) {
this.physicalNodeName = name;
rpcMan = rpc;
instance = this;
this.startHttp = false;
this.nodesMan = nodesMan;
this.walMans.put(getPhysicalNodeName(), walMan);
this.failoverMans.put(getPhysicalNodeName(), dfMan);
this.collectorAck = colAck;
this.liveMan = liveman;
this.chokeMan = new ChokeManager();
this.vmInfo = new FlumeVMInfo(PHYSICAL_NODE_REPORT_PREFIX
+ this.physicalNodeName + ".");
this.reportPusher = new MasterReportPusher(FlumeConfiguration.get(),
ReportManager.get(), rpcMan);
this.sysInfo = new SystemInfo(PHYSICAL_NODE_REPORT_PREFIX
+ this.physicalNodeName + ".");
}
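  /**
   * Builds a node from configuration. Per-node WAL and disk-failover managers
   * are rooted under the agent logs dir, in a subdirectory named after the
   * physical node. In oneshot mode the liveness manager and master report
   * pusher are not created, so the node runs without heartbeating to the
   * master.
   */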
public FlumeNode(FlumeConfiguration conf, String nodeName, MasterRPC rpc,
boolean startHttp, boolean oneshot) {
this.physicalNodeName = nodeName;
rpcMan = rpc;
instance = this;
this.startHttp = startHttp;
this.nodesMan = new LogicalNodeManager(nodeName);
File defaultDir = new File(conf.getAgentLogsDir(), getPhysicalNodeName());
WALManager walMan = new NaiveFileWALManager(defaultDir);
this.walMans.put(getPhysicalNodeName(), walMan);
this.failoverMans.put(getPhysicalNodeName(), new NaiveFileFailoverManager(
defaultDir));
this.collectorAck = new CollectorAckListener(rpcMan);
if (!oneshot) {
this.liveMan = new LivenessManager(nodesMan, rpcMan,
new FlumeNodeWALNotifier(this.walMans));
this.reportPusher = new MasterReportPusher(conf, ReportManager.get(),
rpcMan);
} else {
this.liveMan = null;
this.reportPusher = null;
}
this.chokeMan = new ChokeManager();
this.vmInfo = new FlumeVMInfo(PHYSICAL_NODE_REPORT_PREFIX
+ this.getPhysicalNodeName() + ".");
this.sysInfo = new SystemInfo(PHYSICAL_NODE_REPORT_PREFIX
+ this.getPhysicalNodeName() + ".");
}
  public FlumeNode(MasterRPC rpc, boolean startHttp, boolean oneshot) {
this(FlumeConfiguration.get(), NetUtils.localhost(), rpc, startHttp,
oneshot);
}
  public FlumeNode(String nodename, FlumeConfiguration conf, boolean startHttp,
      boolean oneshot) {
this(conf, nodename, new MultiMasterRPC(conf, true), startHttp, oneshot);
}
  public FlumeNode(FlumeConfiguration conf) {
    this(NetUtils.localhost(), conf, false /* no http server */,
        false /* not oneshot */);
}
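  /**
   * Returns the singleton FlumeNode, lazily building one from the default
   * configuration. Note that every constructor also overwrites this instance
   * as a side effect.
   */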
  public static FlumeNode getInstance() {
    if (instance == null) {
instance = new FlumeNode(FlumeConfiguration.get());
}
return instance;
}
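  /**
   * Resolves the configured webapps path; a relative path is anchored at
   * FLUME_HOME.
   */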
  public static String getWebPath(FlumeConfiguration conf) {
String webPath = conf.getWebAppsPath();
File f = new File(webPath);
if (!f.isAbsolute()) {
String basepath = FlumeConfiguration.getFlumeHome();
if (basepath == null) {
LOG.warn("FLUME_HOME not set, potential for odd behavior!");
}
File base = new File(basepath, webPath);
webPath = base.getAbsolutePath();
}
return webPath;
}
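  /**
   * Registers this node's reportables, optionally starts the status http
   * server, and starts the report pusher, liveness manager, and choke manager
   * threads when they are present.
   */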
  public synchronized void start() {
FlumeConfiguration conf = FlumeConfiguration.get();
ReportManager.get().add(vmInfo);
ReportManager.get().add(sysInfo);
ReportManager.get().add(this);
if (startHttp) {
try {
String webPath = getWebPath(conf);
boolean findport = FlumeConfiguration.get().getNodeAutofindHttpPort();
this.http = new StatusHttpServer("flumeagent", webPath, "0.0.0.0",
conf.getNodeStatusPort(), findport);
http.start();
} catch (IOException e) {
LOG.error("Flume node failed: " + e.getMessage(), e);
} catch (Throwable t) {
LOG.error("Unexcepted exception/error thrown! " + t.getMessage(), t);
}
}
if (reportPusher != null) {
reportPusher.start();
}
if (liveMan != null) {
liveMan.start();
}
if (chokeMan != null) {
chokeMan.setDaemon(true);
chokeMan.start();
}
}
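  /**
   * Stops the services started by start(); failures while stopping the http
   * server are logged rather than propagated.
   */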
  public synchronized void stop() {
if (this.http != null) {
try {
http.stop();
} catch (Exception e) {
LOG.error("Exception stopping FlumeNode", e);
}
}
if (reportPusher != null) {
reportPusher.stop();
}
if (liveMan != null) {
liveMan.stop();
}
if (chokeMan != null) {
chokeMan.halt();
}
}
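  /**
   * Registers the output format plugins named (comma separated) by the
   * OUTPUT_FORMAT_PLUGIN_CLASSES configuration value. Classes that cannot be
   * loaded or instantiated, or that do not implement OutputFormatBuilder, are
   * skipped with a warning.
   */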
  private static void loadOutputFormatPlugins() {
    String outputFormatPluginClasses = FlumeConfiguration.get().get(
FlumeConfiguration.OUTPUT_FORMAT_PLUGIN_CLASSES, "");
String[] classes = outputFormatPluginClasses.split(",\\s*");
    for (String className : classes) {
      if (className.isEmpty()) {
        // Nothing configured: String.split on "" yields a single empty token.
        continue;
      }
try {
Class<?> cls = Class.forName(className);
if (OutputFormatBuilder.class.isAssignableFrom(cls)) {
OutputFormatBuilder builder = (OutputFormatBuilder) cls.newInstance();
FormatFactory.get().registerFormat(builder.getName(), builder);
LOG.info("Registered output format plugin " + className);
} else {
LOG.warn("Ignoring output format plugin class " + className
+ " - Does not subclass OutputFormatBuilder");
}
} catch (ClassNotFoundException e) {
LOG.warn("Unable to load output format plugin class " + className
+ " - Class not found");
} catch (FlumeSpecException e) {
LOG.warn("Unable to load output format plugin class " + className
+ " - Flume spec exception follows.", e);
} catch (InstantiationException e) {
LOG.warn("Unable to load output format plugin class " + className
+ " - Unable to instantiate class.", e);
} catch (IllegalAccessException e) {
LOG.warn("Unable to load output format plugin class " + className
+ " - Access violation.", e);
}
}
}
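  /**
   * Service-style entry point wrapping setup(); an IOException during setup
   * is logged instead of propagated.
   */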
public void init(String[] args) {
try {
setup(args);
} catch (IOException ioe) {
LOG.error("Failed to init Flume Node", ioe);
}
}
  public void shutdown() {
    // Service-style teardown hook mirroring init(String[]); the method name
    // here is assumed. Delegates to stop().
    stop();
}
  public String getReportHtml() {
    // Renders this node's report as HTML (e.g. for the status webapps); the
    // method name here is assumed.
    return getReport().toHtml();
}
  public WALAckManager getAckChecker() {
    if (liveMan == null)
return null;
return liveMan.getAckChecker();
}
  public AckListener getCollectorAckListener() {
    return collectorAck;
}
log.info("Flume " + VersionInfo.getVersion());
log.info(" rev " + VersionInfo.getRevision());
log.info("Compiled on " + VersionInfo.getDate());
}
  private static void logEnvironment(Logger log) {
    Properties props = System.getProperties();
for (Entry<Object, Object> p : props.entrySet()) {
log.info("System property " + p.getKey() + "=" + p.getValue());
}
}
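  /**
   * Sanity-checks the agent logs directory: creates it if necessary, verifies
   * it is a directory, probes that a file can be created and deleted inside
   * it, and warns when it lives under /tmp (where data may not survive a
   * reboot).
   */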
  public static void nodeConfigChecksOk() throws IOException {
    FlumeConfiguration conf = FlumeConfiguration.get();
String s = conf.getAgentLogsDir();
File f = new File(s);
if (!FileUtil.makeDirs(f)) {
throw new IOException("Path to Log dir cannot be created: '" + s
+ "'. Check permissions?");
}
if (!f.isDirectory()) {
throw new IOException("Log dir '" + s
+ "' already exists as a file. Check log dir path.");
}
File f2 = null;
try {
f2 = File.createTempFile("initcheck", ".test", f);
} catch (IOException e) {
throw new IOException("Failure to write in log directory: '" + s
+ "'. Check permissions?");
}
if (!f2.delete()) {
throw new IOException("Unable to delete " + f2 + " from log directory "
+ "(but writing succeeded) - something is strange here");
}
File tmp = new File("/tmp");
File cur = f;
while (cur != null) {
if (cur.equals(tmp)) {
LOG.warn("Log directory is writing inside of /tmp. This data may not survive reboot!");
break;
}
cur = cur.getParentFile();
}
}
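  /**
   * Parses the command line, runs environment checks, starts a FlumeNode, and
   * spawns its initial logical node, either from the -c spec or as an empty
   * null/null configuration. Returns null when only help or version output
   * was requested.
   */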
  public static FlumeNode setup(String[] argv) throws IOException {
logVersion(LOG);
logEnvironment(LOG);
if (!CheckJavaVersion.isVersionOk()) {
LOG.error("Exiting because of an old Java version or Java version in bad format");
System.exit(-1);
}
LOG.info("Starting flume agent on: " + NetUtils.localhost());
LOG.info(" Working directory is: " + new File(".").getAbsolutePath());
FlumeConfiguration.hardExitLoadConfig();
CommandLine cmd = null;
Options options = new Options();
options.addOption("c", true, "Load initial config from cmdline arg");
options.addOption("n", true, "Set node name");
options.addOption("s", false,
"Do not start local flume status server on node");
options.addOption("1", false,
"Make flume node one shot (if closes or errors, exits)");
options.addOption("m", false,
"Have flume hard exit if in likey gc thrash situation");
options.addOption("h", false, "Print help information");
options.addOption("v", false, "Print version information");
try {
CommandLineParser parser = new PosixParser();
cmd = parser.parse(options, argv);
} catch (ParseException e) {
HelpFormatter fmt = new HelpFormatter();
fmt.printHelp("FlumeNode", options, true);
return null;
}
if (cmd != null && cmd.hasOption("v")) {
return null;
}
if (cmd != null && cmd.hasOption("h")) {
HelpFormatter fmt = new HelpFormatter();
fmt.printHelp("FlumeNode", options, true);
return null;
}
nodeConfigChecksOk();
String nodename = NetUtils.localhost();
if (cmd != null && cmd.hasOption("n")) {
nodename = cmd.getOptionValue("n");
}
boolean startHttp = false;
if (cmd != null && !cmd.hasOption("s")) {
startHttp = true;
}
boolean oneshot = false;
if (cmd != null && cmd.hasOption("1")) {
oneshot = true;
}
loadOutputFormatPlugins();
FlumeConfiguration conf = FlumeConfiguration.get();
FlumeNode flume = new FlumeNode(nodename, conf, startHttp, oneshot);
flume.start();
if (cmd != null && cmd.hasOption("c")) {
String spec = cmd.getOptionValue("c");
LOG.info("Loading spec from command line: '" + spec + "'");
try {
Context ctx = new LogicalNodeContext(nodename, nodename);
Map<String, Pair<String, String>> cfgs = FlumeBuilder.parseConf(ctx,
spec);
Pair<String, String> node = cfgs.get(nodename);
flume.nodesMan.spawn(nodename, node.getLeft(), node.getRight());
} catch (Exception e) {
LOG.warn("Caught exception loading node:" + e.getMessage());
LOG.debug("Exception: ", e);
if (oneshot) {
System.exit(0);
}
}
} else {
try {
flume.nodesMan.spawn(nodename, "null", "null");
} catch (FlumeSpecException e) {
LOG.error("This should never happen", e);
} catch (IOException e) {
LOG.error("Caught exception loading node", e);
}
}
if (cmd != null && cmd.hasOption("m")) {
LOG.info("Setup hard exit on memory exhaustion");
MemoryMonitor.setupHardExitMemMonitor(FlumeConfiguration.get()
.getAgentMemoryThreshold());
}
try {
tryKerberosLogin();
} catch (IOException ioe) {
LOG.error("Failed to kerberos login.", ioe);
}
return flume;
}
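  /**
   * Attempts a Kerberos keytab login. All security calls are made via
   * reflection so the same binary also runs against pre-security versions of
   * Hadoop; on such versions this logs a warning and returns. Login failures
   * are logged, not thrown.
   */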
@SuppressWarnings("unchecked")
boolean useSec = false;
try {
Class<UserGroupInformation> c = UserGroupInformation.class;
useSec = (Boolean) c.getMethod("isSecurityEnabled").invoke(null);
} catch (Exception e) {
LOG.warn("Flume is using Hadoop core "
+ org.apache.hadoop.util.VersionInfo.getVersion()
+ " which does not support Security / Authentication: "
+ e.getMessage());
return;
}
LOG.info("Hadoop Security enabled: " + useSec);
if (!useSec) {
return;
}
String principal = FlumeConfiguration.get().getKerberosPrincipal();
String keytab = FlumeConfiguration.get().getKerberosKeytab();
LOG.info("Kerberos login as " + principal + " from " + keytab);
try {
      Class<?> c = Class.forName("org.apache.hadoop.security.SecurityUtil");
Method m = c.getMethod("login", Configuration.class, String.class,
String.class);
m.invoke(null, FlumeConfiguration.get(),
FlumeConfiguration.SECURITY_KERBEROS_KEYTAB,
FlumeConfiguration.SECURITY_KERBEROS_PRINCIPAL);
} catch (Exception e) {
LOG.error("Flume failed when attempting to authenticate with keytab "
+ FlumeConfiguration.get().getKerberosKeytab() + " and principal '"
+ FlumeConfiguration.get().getKerberosPrincipal() + "'", e);
return;
}
try {
Class<UserGroupInformation> c2 = UserGroupInformation.class;
UserGroupInformation ugi = (UserGroupInformation) c2.getMethod(
"getLoginUser").invoke(null);
String authMethod = c2.getMethod("getAuthenticationMethod").invoke(ugi)
.toString();
boolean keytabBased = (Boolean) c2.getMethod("isLoginKeytabBased")
.invoke(ugi);
LOG.info("Auth method: " + authMethod);
LOG.info(" User name: " + ugi.getUserName());
LOG.info(" Using keytab: " + keytabBased);
} catch (Exception e) {
LOG.error("Flume was unable to dump kerberos login user"
+ " and authentication method", e);
return;
}
}
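  /**
   * Command line entry point; any unexpected setup failure aborts the
   * process.
   */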
public static void main(String[] argv) {
try {
setup(argv);
} catch (Exception e) {
LOG.error(
"Aborting: Unexpected problem with environment." + e.getMessage(), e);
System.exit(-1);
}
}
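  /**
   * Returns the physical node's default write-ahead-log manager.
   */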
  public WALManager getWalManager() {
    synchronized (walMans) {
return walMans.get(getPhysicalNodeName());
}
}
  public WALManager getWalManager(String walnode) {
    synchronized (walMans) {
if (walnode == null) {
return getWalManager();
}
return walMans.get(walnode);
}
}
  public WALManager addWalManager(String walnode) {
    Preconditions.checkArgument(walnode != null);
FlumeConfiguration conf = FlumeConfiguration.get();
WALManager wm = new NaiveFileWALManager(new File(new File(
conf.getAgentLogsDir()), walnode));
synchronized (walMans) {
walMans.put(walnode, wm);
return wm;
}
}
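  /**
   * Returns the WAL manager for the given logical node, creating and
   * registering one if none exists yet.
   */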
  public WALManager getAddWALManager(String dfonode) {
    // Synchronize on walMans: this method reads and mutates WAL state, and
    // walMans is the monitor used by the other WAL accessors.
    synchronized (walMans) {
WALManager walman = getWalManager(dfonode);
if (walman == null) {
walman = addWalManager(dfonode);
}
return walman;
}
}
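  /**
   * Returns the physical node's default disk failover manager.
   */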
  public DiskFailoverManager getDFOManager() {
    synchronized (failoverMans) {
return failoverMans.get(getPhysicalNodeName());
}
}
  public DiskFailoverManager getDFOManager(String dfonode) {
    synchronized (failoverMans) {
if (dfonode == null) {
return getDFOManager();
}
return failoverMans.get(dfonode);
}
}
  public DiskFailoverManager addDFOManager(String dfonode) {
    Preconditions.checkArgument(dfonode != null);
FlumeConfiguration conf = FlumeConfiguration.get();
DiskFailoverManager wm = new NaiveFileFailoverManager(new File(new File(
conf.getAgentLogsDir()), dfonode));
synchronized (failoverMans) {
failoverMans.put(dfonode, wm);
return wm;
}
}
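  /**
   * Returns the disk failover manager for the given logical node, creating
   * and registering one if none exists yet.
   */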
  public DiskFailoverManager getAddDFOManager(String dfonode) {
    synchronized (failoverMans) {
DiskFailoverManager dfoman = getDFOManager(dfonode);
if (dfoman == null) {
dfoman = addDFOManager(dfonode);
}
return dfoman;
}
}
  public LogicalNodeManager getLogicalNodeManager() {
    return nodesMan;
}
  public LivenessManager getLivenessManager() {
    return liveMan;
}
  public ChokeManager getChokeManager() {
    return chokeMan;
}
@Override
  public String getName() {
    return PHYSICAL_NODE_REPORT_PREFIX + this.getPhysicalNodeName();
}
@Override
  public ReportEvent getReport() {
    ReportEvent node = new ReportEvent(getName());
node.setLongMetric(R_NUM_LOGICAL_NODES, this.getLogicalNodeManager()
.getNodes().size());
node.hierarchicalMerge(nodesMan.getName(), nodesMan.getReport());
if (getAckChecker() != null) {
node.hierarchicalMerge(getAckChecker().getName(), getAckChecker()
.getReport());
}
return node;
}
  public String getPhysicalNodeName() {
    return physicalNodeName;
}
}