package com.cloudera.flume.master;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.agent.FlumeNode;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.master.flows.FlowConfigManager;
import com.cloudera.flume.master.logical.LogicalConfigurationManager;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.Reportable;
import com.cloudera.flume.reporter.server.AvroReportServer;
import com.cloudera.flume.reporter.server.ThriftReportServer;
import com.cloudera.flume.util.FlumeVMInfo;
import com.cloudera.flume.util.SystemInfo;
import com.cloudera.util.CheckJavaVersion;
import com.cloudera.util.NetUtils;
import com.cloudera.util.StatusHttpServer;
protected static final String ZK_CFG_STORE = "zookeeper";
protected static final String MEMORY_CFG_STORE = "memory";
protected final FlumeConfiguration cfg;
static final Logger LOG = LoggerFactory.getLogger(FlumeMaster.class);
static final String REPORTKEY_HOSTNAME = "hostname";
static final String REPORTKEY_NODES_REPORTING_COUNT = "nodes_reporting_count";
MasterAdminServer configServer;
MasterClientServer controlServer;
ThriftReportServer thriftReportServer = null;
AvroReportServer avroReportServer = null;
StatusHttpServer http = null;
final boolean doHttp;
final CommandManager cmdman;
final ConfigurationManager specman;
final StatusManager statman;
final MasterAckManager ackman;
final String uniqueMasterName;
Thread reaper;
static FlumeMaster instance;
this(FlumeConfiguration.get(), true);
}
this(cfg, false);
}
public FlumeMaster(FlumeConfiguration cfg,
boolean doHttp) {
this.cfg = cfg;
instance = this;
this.uniqueMasterName = "flume-master-" + cfg.getMasterServerId();
this.doHttp = doHttp;
this.cmdman = new CommandManager();
ConfigStore cfgStore = createConfigStore(FlumeConfiguration.get());
this.statman = new StatusManager();
ConfigurationManager base = new ConfigManager(cfgStore);
ConfigurationManager flowedFailovers = new FlowConfigManager.FailoverFlowConfigManager(
base, statman);
this.specman = new LogicalConfigurationManager(flowedFailovers,
new ConfigManager(), statman);
if (FlumeConfiguration.get().getMasterIsDistributed()) {
this.ackman = new GossipedMasterAckManager(FlumeConfiguration.get());
} else {
this.ackman = new MasterAckManager();
}
}
public FlumeMaster(CommandManager cmd, ConfigurationManager cfgMan,
StatusManager stat, MasterAckManager ack, FlumeConfiguration cfg) {
instance = this;
this.doHttp = false;
this.cmdman = cmd;
this.specman = cfgMan;
this.statman = stat;
this.ackman = ack;
this.cfg = cfg;
this.uniqueMasterName = "flume-master-" + cfg.getMasterServerId();
}
if (instance == null) {
instance = new FlumeMaster();
}
return instance;
}
ConfigStore cfgStore;
if (cfg.getMasterStore().equals(ZK_CFG_STORE)) {
cfgStore = new ZooKeeperConfigStore();
} else if (cfg.getMasterStore().equals(MEMORY_CFG_STORE)) {
if (cfg.getMasterIsDistributed()) {
throw new IllegalStateException("Can't use non-zookeeper store with "
+ "distributed Master");
}
cfgStore = new MemoryBackedConfigStore();
} else {
throw new IllegalArgumentException("Unsupported config store: "
+ cfg.getMasterStore());
}
return cfgStore;
}
public long submit(Command cmd) {
return cmdman.submit(cmd);
}
public void serve()
throws IOException {
if (cfg.getMasterStore().equals(ZK_CFG_STORE)) {
try {
ZooKeeperService.getAndInit(cfg);
} catch (InterruptedException e) {
throw new IOException("Unexpected interrupt when starting ZooKeeper", e);
}
}
ReportManager.get().add(new FlumeVMInfo(this.uniqueMasterName + "."));
ReportManager.get().add(new SystemInfo(this.uniqueMasterName + "."));
if (doHttp) {
String webPath = FlumeNode.getWebPath(cfg);
this.http = new StatusHttpServer("flumeconfig", webPath, "0.0.0.0", cfg
.getMasterHttpPort(), false);
http.start();
}
controlServer = new MasterClientServer(this, FlumeConfiguration.get());
configServer = new MasterAdminServer(this, FlumeConfiguration.get());
avroReportServer = new AvroReportServer(FlumeConfiguration.get()
.getReportServerPort());
thriftReportServer = new ThriftReportServer(FlumeConfiguration.get()
.getReportServerPort());
ReportManager.get().add(this);
try {
controlServer.serve();
configServer.serve();
if (cfg.getReportServerRPC() == cfg.RPC_TYPE_AVRO) {
avroReportServer.serve();
} else {
thriftReportServer.serve();
}
} catch (TTransportException e1) {
throw new IOException("Error starting control or config server", e1);
}
cmdman.start();
ackman.start();
specman.start();
reaper = new Thread("Lost node reaper") {
@Override
try {
while (true) {
Thread.sleep(FlumeConfiguration.get().getConfigHeartbeatPeriod());
statman.checkup();
}
} catch (InterruptedException e) {
LOG.error("Reaper thread unexpectedly interrupted:" + e.getMessage());
LOG.debug("Lost node reaper unexpectedly interrupted", e);
}
}
};
reaper.start();
}
try {
if (http != null) {
try {
http.stop();
} catch (Exception e) {
LOG.error("Error stopping FlumeMaster", e);
}
http = null;
}
cmdman.stop();
ackman.stop();
if (configServer != null) {
configServer.stop();
configServer = null;
}
if (controlServer != null) {
controlServer.stop();
controlServer = null;
}
if (cfg.getReportServerRPC() == cfg.RPC_TYPE_AVRO) {
if (avroReportServer != null) {
avroReportServer.stop();
avroReportServer = null;
}
} else {
if (thriftReportServer != null) {
thriftReportServer.stop();
thriftReportServer = null;
}
}
specman.stop();
reaper.interrupt();
FlumeConfiguration cfg = FlumeConfiguration.get();
if (cfg.getMasterStore().equals(ZK_CFG_STORE)) {
ZooKeeperService.get().shutdown();
}
} catch (IOException e) {
LOG.error("Exception when shutting down master!", e);
} catch (Exception e) {
LOG.error("Exception when shutting down master!", e);
}
}
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStreamWriter w = new OutputStreamWriter(baos);
reportHtml(w);
w.flush();
return baos.toString();
} catch (IOException e) {
LOG.error("html report generation failed", e);
}
return "";
}
public void reportHtml(Writer o)
throws IOException {
statman.getReport().toHtml(o);
specman.getReport().toHtml(o);
cmdman.getReport().toHtml(o);
}
return statman.getNodeStatuses().keySet();
}
return specman;
}
return statman;
}
return ackman;
}
return cmdman;
}
@Override
return this.uniqueMasterName;
}
@Override
ReportEvent rpt = new ReportEvent(getName());
rpt.setStringMetric(REPORTKEY_HOSTNAME, NetUtils.localhost());
rpt.setLongMetric(REPORTKEY_NODES_REPORTING_COUNT, this.getKnownNodes()
.size());
return rpt;
}
public static void main(String[] argv) {
FlumeNode.logVersion(LOG);
FlumeNode.logEnvironment(LOG);
if (!CheckJavaVersion.isVersionOk()) {
LOG
.error("Exiting because of an old Java version or Java version in bad format");
System.exit(-1);
}
FlumeConfiguration.hardExitLoadConfig();
CommandLine cmd = null;
Options options = new Options();
options.addOption("c", true, "Load config from file");
options.addOption("f", false, "Use fresh (empty) flume configs");
options.addOption("i", true, "Server id (an integer from 0 up)");
try {
CommandLineParser parser = new PosixParser();
cmd = parser.parse(options, argv);
} catch (ParseException e) {
HelpFormatter fmt = new HelpFormatter();
fmt.printHelp("FlumeNode", options, true);
System.exit(0);
}
String nodeconfig = FlumeConfiguration.get().getMasterSavefile();
if (cmd != null && cmd.hasOption("c")) {
nodeconfig = cmd.getOptionValue("c");
}
if (cmd != null && cmd.hasOption("i")) {
String sid = cmd.getOptionValue("i");
LOG.info("Setting serverid from command line to be " + sid);
try {
int serverid = Integer.parseInt(cmd.getOptionValue("i"));
FlumeConfiguration.get().setInt(FlumeConfiguration.MASTER_SERVER_ID,
serverid);
} catch (NumberFormatException e) {
LOG.error("Couldn't parse server id as integer: " + sid);
System.exit(0);
}
}
FlumeMaster config = new FlumeMaster();
LOG.info("Starting flume master on: " + NetUtils.localhost());
LOG.info(" Working Directory is: " + new File(".").getAbsolutePath());
try {
boolean autoload = FlumeConfiguration.get().getMasterSavefileAutoload();
try {
if (autoload && (cmd == null || (cmd != null && !cmd.hasOption("f")))) {
config.getSpecMan().loadConfigFile(nodeconfig);
}
} catch (IOException e) {
LOG.warn("Could not autoload config from " + nodeconfig + " because "
+ e.getMessage());
}
config.serve();
} catch (IOException e) {
LOG.error("IO problem: " + e.getMessage());
LOG.debug("IOException", e);
}
}
}