package com.cloudera.flume.conf;
import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;
protected static final Logger LOG = LoggerFactory
.getLogger(FlumeConfiguration.class);
private static FlumeConfiguration singleton;
String flumeHome = System.getProperty("flume.home");
if (null == flumeHome) {
flumeHome = System.getenv("FLUME_HOME");
}
if (null == flumeHome) {
LOG.warn("-Dflume.home and $FLUME_HOME both unset");
}
return flumeHome;
}
String flumeConfDir = System.getProperty("flume.conf.dir");
if (null == flumeConfDir) {
flumeConfDir = System.getenv("FLUME_CONF_DIR");
}
if (null == flumeConfDir) {
String flumeHome = getFlumeHome();
if (null != flumeHome) {
flumeConfDir = new File(flumeHome, "conf").toString();
} else {
flumeConfDir = "./conf";
}
}
return flumeConfDir;
}
public synchronized static FlumeConfiguration
get() {
if (singleton == null)
singleton = new FlumeConfiguration();
return singleton;
}
this(true);
}
singleton = new FlumeConfiguration(false);
return get();
}
super();
if (loadDefaults) {
Path home = null;
String flumeHome = getFlumeHome();
if (flumeHome == null) {
home = new Path(".");
} else {
home = new Path(flumeHome);
}
Path conf = new Path(getFlumeConfDir());
LOG.info("Loading configurations from " + conf);
super.addResource(new Path(conf, "flume-conf.xml"));
super.addResource(new Path(conf, "flume-site.xml"));
}
}
static public final int DEFAULT_HEARTBEAT_PORT = 35872;
static public final int DEFAULT_GOSSIP_PORT = 57890;
static public final int DEFAULT_ADMIN_PORT = 35873;
static public final int DEFAULT_HTTP_PORT = 35871;
static public final int DEFAULT_REPORT_SERVER_PORT = 45678;
public static final int DEFAULT_ZK_CLIENT_PORT = 3181;
public static final int DEFAULT_ZK_SERVER_QUORUM_PORT = 3182;
public static final int DEFAULT_ZK_SERVER_ELECTION_PORT = 3183;
static public final int DEFAULT_SCRIBE_SOURCE_PORT = 1463;
public final static String WATCHDOG_MAX_RESTARTS = "watchdog.restarts.max";
public final static String AGENT_LOG_DIR_NEW = "flume.agent.logdir";
public final static String AGENT_LOG_MAX_AGE = "flume.agent.logdir.maxage";
public static final String AGENT_LOG_ACKED_RETRANSMIT_AGE = "flume.agent.logdir.retransmit";
public final static String NODE_STATUS_PORT = "flume.node.status.port";
public static final String AGENT_FAILOVER_INITIAL_BACKOFF = "flume.agent.failover.backoff.initial";
public static final String AGENT_FAILOVER_MAX_BACKOFF = "flume.agent.failover.backoff.max";
public static final String AGENT_FAILOVER_MAX_CUMULATIVE_BACKOFF = "flume.agent.failover.backoff.cumulativemax";
public static final String AGENT_HEARTBEAT_BACKOFF = "flume.agent.heartbeat.backoff";
public static final String AGENT_MEMTHRESHOLD = "flume.agent.mem.threshold";
public static final String AGENT_MULTIMASTER_MAXRETRIES = "flume.agent.multimaster.maxretries";
public static final String AGENT_MULTIMASTER_RETRYBACKOFF = "flume.agent.multimaster.retrybackoff";
public static final String DEFAULT_FLOW_NAME = "flume.flow.default.name";
public static final String POLLER_QUEUESIZE = "flume.poller.queuesize";
public static final String THRIFT_QUEUESIZE = "flume.thrift.queuesize";
public static final String THRIFT_CLOSE_MAX_SLEEP = "flume.thrift.close.maxsleep";
public static final String THRIFT_SOCKET_TIMEOUT_MS = "flume.thrift.socket.timeout.ms";
public static final String INSISTENTOPEN_INIT_BACKOFF = "flume.inisistentOpen.init.backoff";
public static final String HISTORY_DEFAULTPERIOD = "flume.countHistory.period";
public static final String HISTORY_MAXLENGTH = "flume.history.maxlength";
public static final String TAIL_POLLPERIOD = "flume.tail.pollperiod";
public static final String EVENT_RPC_TYPE = "flume.event.rpc";
public final static String COLLECTOR_EVENT_HOST = "flume.collector.event.host";
public final static String COLLECTOR_EVENT_PORT = "flume.collector.event.port";
public static final String COLLECTOR_DFS_DIR = "flume.collector.dfs.dir";
public static final String COLLECTOR_ROLL_MILLIS = "flume.collector.roll.millis";
public static final String COLLECTOR_OUTPUT_FORMAT = "flume.collector.output.format";
public static final String COLLECTOR_DFS_COMPRESS_GZIP = "flume.collector.dfs.compress.gzip";
public static final String COLLECTOR_DFS_COMPRESS_CODEC = "flume.collector.dfs.compress.codec";
public static final String MASTER_HTTP_PORT = "flume.master.http.port";
public static final String MASTER_HEARTBEAT_PORT = "flume.master.heartbeat.port";
public static final String MASTER_HEARTBEAT_SERVERS = "flume.master.heartbeat.servers";
public static final String MASTER_HEARBEAT_RPC = "flume.master.heartbeat.rpc";
public static final String CONFIG_HEARTBEAT_PERIOD = "flume.config.heartbeat.period";
public static final String MASTER_HEARTBEAT_MAX_MISSED = "flume.config.heartbeat.missed.max";
public static final String NODE_HEARTBEAT_BACKOFF_LIMIT = "flume.node.heartbeat.backoff.ceiling";
public static final String NODE_HTTP_AUTOFINDPORT = "flume.node.http.autofindport";
public static final String NODE_CLOSE_TIMEOUT = "flume.node.close.timeout";
public static final String CONFIG_ADMIN_PORT = "flume.config.admin.port";
public static final String REPORT_SERVER_PORT = "flume.report.server.port";
public static final String REPORT_SERVER_RPC_TYPE = "flume.report.server.rpc.type";
public static final String MASTER_SAVEFILE = "flume.master.savefile";
public static final String MASTER_SAVEFILE_AUTOLOAD = "flume.master.savefile.autoload";
public static final String REPORTER_POLLER_PERIOD = "flume.reporter.poller.period";
public static final String SECURITY_KERBEROS_PRINCIPAL = "flume.security.kerberos.principal";
public static final String SECURITY_KERBEROS_KEYTAB = "flume.security.kerberos.keytab";
public static final String FLURKER_ENCODING = "flume.irc.encoding";
public static final String TWITTER_STREAM_URL = "flume.twitter.url";
public static final String TWITTER_USER = "flume.twitter.username";
public static final String TWITTER_PW = "flume.twitter.password";
public static final String SCRIBE_SOURCE_PORT = "flume.scribe.source.port";
public static final String WEBAPPS_PATH = "flume.webapps.path";
public static final String MASTER_SERVERS = "flume.master.servers";
public static final String MASTER_STORE = "flume.master.store";
public static final String MASTER_SERVER_ID = "flume.master.serverid";
public static final String MASTER_GOSSIP_SERVERS = "flume.master.gossip.servers";
public static final String MASTER_GOSSIP_PERIOD_MS = "flume.master.gossip.period";
public static final String MASTER_GOSSIP_PORT = "flume.master.gossip.port";
public static final String MASTER_ZK_LOGDIR = "flume.master.zk.logdir";
public static final String MASTER_ZK_CLIENT_PORT = "flume.master.zk.client.port";
public static final String MASTER_ZK_SERVER_QUORUM_PORT = "flume.master.zk.server.quorum.port";
public static final String MASTER_ZK_SERVER_ELECTION_PORT = "flume.master.zk.server.election.port";
public static final String MASTER_ZK_SERVERS = "flume.master.zk.servers";
public static final String MASTER_ZK_USE_EXTERNAL = "flume.master.zk.use.external";
public static final String ZK_TICK_TIME = "flume.zk.ticktime";
public static final String ZK_INIT_LIMIT = "flume.zk.initlimit";
public static final String ZK_SYNC_LIMIT = "flume.zk.synclimit";
public static final String EVENT_MAX_SIZE = "flume.event.max.size.bytes";
public static final String GANGLIA_SERVERS = "flume.ganglia.servers";
public static final String HIVE_HOST = "flume.hive.host";
public static final String HIVE_PORT = "flume.hive.port";
public static final String HIVE_USER = "flume.hive.user";
public static final String HIVE_PW = "flume.hive.userpw";
public static String HIVE_MARKER_FOLDER = "flume.hive.markerfolder";
public static String HIVE_DEFAULT_MARKER_FOLDER = "hdfs://admin1.research.hadoop.sjc1.mozilla.com/user/hive/warehouse/flume-temp-marker-folder";
public static String ELASTICSEARCH_MARKER_FOLDER = "flume.elasticsearch.markerfolder";
public static String ELASTICSEARCH_DEFAULT_MARKER_FOLDER = "hdfs://admin1.research.hadoop.sjc1.mozilla.com/user/hive/warehouse/elasticsearch-temp-marker-folder";
public static final String PLUGIN_CLASSES = "flume.plugin.classes";
public static final String OUTPUT_FORMAT_PLUGIN_CLASSES = "flume.plugin.outputformat.classes";
public static final String RPC_TYPE_THRIFT = "THRIFT";
public static final String RPC_TYPE_AVRO = "AVRO";
return getMasterServers().split(",").length > 1;
}
return get(MASTER_STORE, "zookeeper");
}
String servers = get(MASTER_ZK_SERVERS, null);
if (servers != null) {
return servers;
}
String[] hosts = getMasterServers().split(",");
int clientport = getMasterZKClientPort();
int quorumport = getMasterZKServerQuorumPort();
int electionport = getMasterZKServerElectionPort();
List<String> l = Arrays.asList(hosts);
Iterator<String> iter = l.iterator();
StringBuilder builder = new StringBuilder();
while (iter.hasNext()) {
builder.append(iter.next() + ":" + clientport + ":" + quorumport + ":"
+ electionport);
if (iter.hasNext()) {
builder.append(',');
}
}
return builder.toString();
}
String servers = getMasterZKServers();
String[] hosts = servers.split(",");
StringBuilder builder = new StringBuilder();
for (int i = 0; i < hosts.length; ++i) {
hosts[i] = hosts[i].trim();
String[] parts = hosts[i].split(":");
builder.append(parts[0] + ":" + parts[1]);
if (i < hosts.length - 1) {
builder.append(",");
}
}
return builder.toString();
}
String clientport = get(MASTER_ZK_CLIENT_PORT, null);
if (clientport != null) {
return Integer.parseInt(clientport);
}
String servers = get(MASTER_ZK_SERVERS, null);
if (servers == null) {
return DEFAULT_ZK_CLIENT_PORT;
}
List<String> serverList = Arrays.asList(servers.split(","));
int serverid = getMasterServerId();
Preconditions.checkState(serverid < serverList.size(),
"Serverid is out of range: " + serverid);
String[] server = serverList.get(serverid).split(":");
Preconditions.checkState(server.length == 4, "Server spec "
+ serverList.get(serverid) + " is ill-formed");
return Integer.parseInt(server[1].trim());
}
String quorumport = get(MASTER_ZK_SERVER_QUORUM_PORT, null);
if (quorumport != null) {
return Integer.parseInt(quorumport);
}
String servers = get(MASTER_ZK_SERVERS, null);
if (servers == null) {
return DEFAULT_ZK_SERVER_QUORUM_PORT;
}
List<String> serverList = Arrays.asList(servers.split(","));
int serverid = getMasterServerId();
Preconditions.checkState(serverid < serverList.size(),
"Serverid is out of range: " + serverid);
String[] server = serverList.get(serverid).split(":");
Preconditions.checkState(server.length == 4, "Server spec "
+ serverList.get(serverid) + " is ill-formed");
return Integer.parseInt(server[2].trim());
}
String electionport = get(MASTER_ZK_SERVER_ELECTION_PORT, null);
if (electionport != null) {
return Integer.parseInt(electionport);
}
String servers = get(MASTER_ZK_SERVERS, null);
if (servers == null) {
return DEFAULT_ZK_SERVER_ELECTION_PORT;
}
List<String> serverList = Arrays.asList(servers.split(","));
int serverid = getMasterServerId();
Preconditions.checkState(serverid < serverList.size(),
"Serverid is out of range: " + serverid);
String[] server = serverList.get(serverid).split(":");
Preconditions.checkState(server.length == 4, "Server spec "
+ serverList.get(serverid) + " is ill-formed");
return Integer.parseInt(server[3].trim());
}
return getBoolean(MASTER_ZK_USE_EXTERNAL, false);
}
return get(MASTER_ZK_LOGDIR, "/tmp/flume/master/zk");
}
return getInt(ZK_TICK_TIME, 2000);
}
return getInt(ZK_INIT_LIMIT, 10);
}
return getInt(ZK_SYNC_LIMIT, 5);
}
return getInt(MASTER_SERVER_ID, 0);
}
return get(AGENT_LOG_DIR_NEW, "/tmp/flume/agent");
}
return getLong(AGENT_LOG_MAX_AGE, 10000);
}
return getInt(WATCHDOG_MAX_RESTARTS, 5);
}
return getInt(NODE_STATUS_PORT, 35862);
}
return getLong(AGENT_LOG_ACKED_RETRANSMIT_AGE, 60000);
}
return getFloat(AGENT_MEMTHRESHOLD, 0.95F);
}
return getInt(AGENT_MULTIMASTER_MAXRETRIES, 12);
}
return getInt(AGENT_MULTIMASTER_RETRYBACKOFF, 5000);
}
return getLong(AGENT_FAILOVER_INITIAL_BACKOFF, 1000);
}
return getLong(AGENT_FAILOVER_MAX_BACKOFF, 60000);
}
return getLong(AGENT_FAILOVER_MAX_CUMULATIVE_BACKOFF, Integer.MAX_VALUE);
}
return getInt(POLLER_QUEUESIZE, 100);
}
return getInt(THRIFT_QUEUESIZE, 1000);
}
return getInt(THRIFT_SOCKET_TIMEOUT_MS, 10000);
}
return getLong(INSISTENTOPEN_INIT_BACKOFF, 1000);
}
return getLong(HISTORY_DEFAULTPERIOD, 1000);
}
return getLong(HISTORY_MAXLENGTH, 300);
}
return getLong(TAIL_POLLPERIOD, 1000);
}
return get(COLLECTOR_EVENT_HOST, "localhost");
}
return getInt(COLLECTOR_EVENT_PORT, 35853);
}
String[] validRPCProtocols = { RPC_TYPE_AVRO, RPC_TYPE_THRIFT };
String entered = get(EVENT_RPC_TYPE, RPC_TYPE_THRIFT).toUpperCase();
for (String prot : validRPCProtocols) {
if (entered.equals(prot)) {
return prot;
}
}
LOG.warn("event.rpc.type incorrectly defined, should be either"
+ " \"THRIFT\" or \"AVRO\". Defaulting to \"THRIFT\"");
return RPC_TYPE_THRIFT;
}
return get(COLLECTOR_DFS_DIR, "file://tmp/flume/collected");
}
@Deprecated
return getBoolean(COLLECTOR_DFS_COMPRESS_GZIP, false);
}
return get(COLLECTOR_DFS_COMPRESS_CODEC, "None");
}
return getLong(COLLECTOR_ROLL_MILLIS, 30000);
}
String svrs = get(MASTER_SERVERS, "localhost");
String[] hosts = svrs.split(",");
StringBuilder builder = new StringBuilder();
for (int i = 0; i < hosts.length; ++i) {
hosts[i] = hosts[i].trim();
String[] parts = hosts[i].split(":");
builder.append(parts[0]);
if (parts.length > 1) {
LOG.warn("Master Server's should not have list ports but host '"
+ hosts[i] + " 'specified ports! ");
}
if (i < hosts.length - 1) {
builder.append(",");
}
}
return builder.toString();
}
String servers = get(MASTER_HEARTBEAT_SERVERS, null);
if (servers != null) {
return servers;
}
String[] hosts = getMasterServers().split(",");
int heartbeatport = getMasterHeartbeatPort();
List<String> l = Arrays.asList(hosts);
Iterator<String> iter = l.iterator();
StringBuilder builder = new StringBuilder();
while (iter.hasNext()) {
builder.append(iter.next() + ":" + heartbeatport);
if (iter.hasNext()) {
builder.append(',');
}
}
return builder.toString();
}
String[] hosts = getMasterHeartbeatServers().split(",");
List<Pair<String, Integer>> ret = new ArrayList<Pair<String, Integer>>(
hosts.length);
for (String s : hosts) {
String[] parts = s.split(":");
ret.add(new Pair<String, Integer>(parts[0], Integer.parseInt(parts[1])));
}
return ret;
}
return getInt(MASTER_HTTP_PORT, DEFAULT_HTTP_PORT);
}
return getInt(REPORT_SERVER_PORT, DEFAULT_REPORT_SERVER_PORT);
}
String[] validRPCProtocols = { RPC_TYPE_AVRO, RPC_TYPE_THRIFT };
String entered = get(REPORT_SERVER_RPC_TYPE, RPC_TYPE_THRIFT).toUpperCase();
for (String prot : validRPCProtocols) {
if (entered.equals(prot)) {
return prot;
}
}
LOG.warn("flume.report.server.rpc.type incorrectly defined, "
+ "should be either \"THRIFT\" or \"AVRO\". "
+ "Defaulting to \"THRIFT\"");
return RPC_TYPE_THRIFT;
}
String port = get(MASTER_HEARTBEAT_PORT, null);
if (port != null) {
return Integer.parseInt(port);
}
String servers = get(MASTER_HEARTBEAT_SERVERS, null);
if (servers == null) {
return DEFAULT_HEARTBEAT_PORT;
}
List<String> serverList = Arrays.asList(servers.split(","));
int serverid = getMasterServerId();
Preconditions.checkState(serverid < serverList.size(),
"Serverid is out of range: " + serverid);
String[] server = serverList.get(serverid).split(":");
Preconditions.checkState(server.length == 2, "Server spec "
+ serverList.get(serverid) + " is ill-formed");
return Integer.parseInt(server[1].trim());
}
String[] validRPCProtocols = { RPC_TYPE_AVRO, RPC_TYPE_THRIFT };
String entered = get(MASTER_HEARBEAT_RPC, RPC_TYPE_THRIFT).toUpperCase();
for (String prot : validRPCProtocols) {
if (entered.equals(prot)) {
return prot;
}
}
return RPC_TYPE_THRIFT;
}
return getInt(MASTER_GOSSIP_PERIOD_MS, 1000);
}
String port = get(MASTER_GOSSIP_PORT, null);
if (port != null) {
return Integer.parseInt(port);
}
String servers = get(MASTER_GOSSIP_SERVERS, null);
if (servers == null) {
return DEFAULT_GOSSIP_PORT;
}
List<String> serverList = Arrays.asList(servers.split(","));
int serverid = getMasterServerId();
Preconditions.checkState(serverid < serverList.size(),
"Serverid is out of range: " + serverid);
String[] server = serverList.get(serverid).split(":");
Preconditions.checkState(server.length == 2, "Server spec "
+ serverList.get(serverid) + " is ill-formed");
return Integer.parseInt(server[1].trim());
}
String ret = get(MASTER_GOSSIP_SERVERS, null);
if (ret != null) {
return ret;
}
String[] hosts = getMasterServers().split(",");
List<String> l = Arrays.asList(hosts);
Iterator<String> iter = l.iterator();
StringBuilder builder = new StringBuilder();
while (iter.hasNext()) {
builder.append(iter.next() + ":" + getMasterGossipPort());
if (iter.hasNext()) {
builder.append(',');
}
}
return builder.toString();
}
String hostString = getMasterGossipServers();
if (hostString.equals("")) {
return new ArrayList<Pair<String, Integer>>();
}
String[] hosts = hostString.split(",");
List<Pair<String, Integer>> ret = new ArrayList<Pair<String, Integer>>(
hosts.length);
for (String s : hosts) {
String[] parts = s.split(":");
ret.add(new Pair<String, Integer>(parts[0], Integer.parseInt(parts[1])));
}
return ret;
}
return getLong(CONFIG_HEARTBEAT_PERIOD, 5000);
}
return getInt(MASTER_HEARTBEAT_MAX_MISSED, 10);
}
return getLong(NODE_HEARTBEAT_BACKOFF_LIMIT, 60000);
}
return getBoolean(NODE_HTTP_AUTOFINDPORT, true);
}
return getInt(CONFIG_ADMIN_PORT, DEFAULT_ADMIN_PORT);
}
return getInt(REPORT_SERVER_PORT, DEFAULT_REPORT_SERVER_PORT);
}
return get(MASTER_SAVEFILE, "conf/current.flume");
}
return getBoolean(MASTER_SAVEFILE_AUTOLOAD, false);
}
return get(SECURITY_KERBEROS_PRINCIPAL, "");
}
return get(SECURITY_KERBEROS_KEYTAB, "/etc/flume/conf/krb5.keytab");
}
return getLong(REPORTER_POLLER_PERIOD, 2000);
}
return get(FLURKER_ENCODING, "UTF-8");
}
return get(WEBAPPS_PATH, "webapps");
}
try {
FlumeConfiguration conf = FlumeConfiguration.get();
conf.getMaxRestartsPerMin();
return conf;
} catch (RuntimeException parseEx) {
System.exit(-1);
}
return null;
}
public String () {
return get(TWITTER_USER, "");
}
public String () {
return get(TWITTER_PW, "");
}
public String () {
return get(TWITTER_STREAM_URL,
"http://stream.twitter.com/1/statuses/sample.json");
}
return get(COLLECTOR_OUTPUT_FORMAT, "avrojson");
}
return get(GANGLIA_SERVERS, "239.2.11.71:8649");
}
return getLong(EVENT_MAX_SIZE, 32 * 1024);
}
return get(PLUGIN_CLASSES, "");
}
return get(HIVE_HOST, "localhost");
}
return getInt(HIVE_PORT, 9083);
}
return get(HIVE_USER, "hive");
}
return get(HIVE_PW, "");
}
return getLong(AGENT_HEARTBEAT_BACKOFF, 1000);
}
return HIVE_DEFAULT_MARKER_FOLDER;
}
HIVE_DEFAULT_MARKER_FOLDER = hiveDefaultMarkerFolder;
}
return HIVE_MARKER_FOLDER;
}
HIVE_MARKER_FOLDER = hiveMarkerFolder;
}
return ELASTICSEARCH_DEFAULT_MARKER_FOLDER;
}
ELASTICSEARCH_DEFAULT_MARKER_FOLDER = elasticsearchDefaultMarkerFolder;
}
return get(ELASTICSEARCH_MARKER_FOLDER, ELASTICSEARCH_DEFAULT_MARKER_FOLDER);
}
ELASTICSEARCH_MARKER_FOLDER = elasticsearchMarkerFolder;
}
return getLong(THRIFT_CLOSE_MAX_SLEEP, 10 * 1000);
}
return getInt(SCRIBE_SOURCE_PORT, DEFAULT_SCRIBE_SOURCE_PORT);
}
return get(DEFAULT_FLOW_NAME, "default-flow");
}
Iterator<Entry<String, String>> iter = iterator();
ArrayList<String> keys = new ArrayList<String>();
while (iter.hasNext()) {
Entry<String, String> e = iter.next();
keys.add(e.getKey());
}
Collections.sort(keys);
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
pw.print("<table>");
for (String k : keys) {
String value = get(k);
pw.println("<tr><th>" + k + "</th>");
pw.print("<td>");
pw.print("<div class=\"" + k + "\">");
pw.print(value);
pw.print("</div>");
pw.println("</td>");
pw.println("</tr>");
}
pw.print("</table>");
pw.flush();
return sw.getBuffer().toString();
}
return getLong(NODE_CLOSE_TIMEOUT, 30000);
}
}