package com.cloudera.flume.master;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.FlumeSpecGen;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.reporter.ReportEvent;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
static final Logger LOG = LoggerFactory.getLogger(ConfigManager.class);
ConfigStore cfgStore;
Map<String, String> logicalToPhysical = new HashMap<String, String>();
cfgStore = store;
}
cfgStore = new MemoryBackedConfigStore();
}
synchronized public FlumeConfigData
getConfig(String host) {
return cfgStore.getConfig(host);
}
@Override
return "configuration manager";
}
synchronized public void setConfig(String logicalNode, String flowid,
String source, String sink) throws IOException {
try {
FlumeBuilder.buildSink(new Context(), sink);
FlumeBuilder.buildSource(source);
} catch (Exception e) {
throw new IllegalArgumentException(
"Attempted to write an invalid sink/source: " + e.getMessage(), e);
}
cfgStore.setConfig(logicalNode, flowid, source, sink);
}
synchronized public void setBulkConfig(Map<String, FlumeConfigData> configs)
throws IOException {
cfgStore.bulkSetConfig(configs);
}
synchronized public Map<String, FlumeConfigData>
getAllConfigs() {
return new HashMap<String, FlumeConfigData>(cfgStore.getConfigs());
}
return new HashMap<String, FlumeConfigData>(cfgStore.getConfigs());
}
FlumeConfigData fcd) {
html.append("\n<tr>");
html.append("<td>" + name + "</td>");
FlumeConfigData cfg = fcd;
html.append("<td>" + new Date(cfg.timestamp) + "</td>");
html.append("<td>" + cfg.sourceConfig + "</td>");
html.append("<td>" + cfg.sinkConfig + "</td>");
html.append("</tr>\n");
}
String physical, Collection<String> logicals) {
html.append("\n<tr>");
html.append("<td>" + physical + "</td>");
Collection<String> lns = logicals;
html.append("<td>" + StringUtils.join(lns, ',') + "</td>");
html.append("</tr>\n");
}
@Override
synchronized public ReportEvent
getReport() {
StringBuilder html = new StringBuilder();
html.append("<h2>Node configuration</h2>\n<table border=\"1\"><tr>"
+ "<th>Node</th><th>Version</th><th>Source</th><th>Sink</th></tr>");
Map<String, FlumeConfigData> cfgs = cfgStore.getConfigs();
synchronized (cfgs) {
for (Entry<String, FlumeConfigData> e : cfgs.entrySet()) {
appendHtmlFlumeConfigData(html, e.getKey(), e.getValue());
}
}
html.append("</table>\n\n");
html.append("<h2>Physical/Logical Node mapping</h2>\n<table border=\"1\">"
+ "<tr><th>physical node</th><th>logical node</th></tr>");
Multimap<String, String> nodes = cfgStore.getLogicalNodeMap();
synchronized (nodes) {
for (Entry<String, Collection<String>> e : nodes.asMap().entrySet()) {
appendHtmlPhysicalLogicalMapping(html, e.getKey(), e.getValue());
}
}
html.append("</table>\n\n");
return ReportEvent.createLegacyHtmlReport("configs", html.toString());
}
synchronized public void loadConfigFile(String from)
throws IOException {
File f = new File(from);
LOG.info("Loading configuration from: " + f.getAbsolutePath());
FileInputStream r = null;
try {
r = new FileInputStream(f);
long len = f.length();
Preconditions.checkArgument(len <= Integer.MAX_VALUE);
byte[] buf = new byte[(int) len];
r.read(buf);
String fullspec = new String(buf);
List<FlumeNodeSpec> cfgs = FlumeSpecGen.generate(fullspec);
for (FlumeNodeSpec spec : cfgs) {
setConfig(spec.node, FlumeConfiguration.get().getDefaultFlowName(),
spec.src, spec.sink);
}
} catch (FlumeSpecException e) {
LOG.debug("Invalid Flume specification", e);
throw new IOException(e.getMessage());
} finally {
if (r != null) {
r.close();
}
}
}
synchronized public void saveConfigFile(String s)
throws IOException {
File targ = new File(s);
LOG.info("Saving configuration to: " + targ.getAbsolutePath());
File targ2 = new File(s + "~");
File tmp = File.createTempFile("current-", ".flume", targ.getParentFile());
tmp.deleteOnExit();
PrintWriter out = new PrintWriter(new FileWriter(tmp));
Map<String, FlumeConfigData> cfgs = cfgStore.getConfigs();
for (Entry<String, FlumeConfigData> e : cfgs.entrySet()) {
String name = e.getKey();
String snk = e.getValue().getSinkConfig();
String src = e.getValue().getSourceConfig();
out.println(FlumeBuilder.toLine(name, src, snk));
}
out.close();
if (!targ2.delete()) {
LOG.warn("Unable to delete config backup file: " + targ2);
}
if (targ.exists()) {
if (!targ.renameTo(targ2)) {
LOG.warn("Unable to make backup of config file: " + targ + " to "
+ targ2);
}
}
if (!tmp.renameTo(targ)) {
throw new IOException("Unable to rename " + tmp + " to " + targ);
}
}
@Override
return cfgStore.getLogicalNodes(physNode);
}
@Override
synchronized public Map<String, Integer>
getChokeMap(String physNode) {
return cfgStore.getChokeMap(physNode);
}
@Override
synchronized public boolean addLogicalNode(String physNode, String logicNode) {
if (!logicalToPhysical.containsKey(logicNode)) {
cfgStore.addLogicalNode(physNode, logicNode);
logicalToPhysical.put(logicNode, physNode);
return true;
} else {
LOG.warn("Logical node " + logicNode
+ " is already assigned to physical node "
+ logicalToPhysical.get(logicNode) + ". Unmap it first.");
return false;
}
}
@Override
if (physNode.equals(logicNode)) {
LOG.warn("Not allowed to unmap primary logical node from physical node");
return;
}
cfgStore.unmapLogicalNode(physNode, logicNode);
logicalToPhysical.remove(logicNode);
}
@Override
return logicalToPhysical.get(logicalNode);
}
@Override
synchronized public void refresh(String logicalNode)
throws IOException {
FlumeConfigData fcd = cfgStore.getConfig(logicalNode);
if (fcd == null) {
throw new IOException("Unable to refresh logicalNode " + logicalNode
+ ". It doesn't exist!");
}
cfgStore.setConfig(logicalNode, fcd.getFlowID(), fcd.getSourceConfig(), fcd
.getSinkConfig());
}
@Override
synchronized public void refreshAll()
throws IOException {
Map<String, FlumeConfigData> cfgs = new HashMap<String, FlumeConfigData>();
for (Entry<String, FlumeConfigData> ent : getAllConfigs().entrySet()) {
cfgs.put(ent.getKey(), ent.getValue());
}
setBulkConfig(cfgs);
}
@Override
throws IOException {
cfgStore.removeLogicalNode(logicNode);
String physical = getPhysicalNode(logicNode);
if (physical != null) {
cfgStore.unmapLogicalNode(physical, logicNode);
logicalToPhysical.remove(logicNode);
}
}
@Override
synchronized public void start()
throws IOException {
Preconditions.checkNotNull(cfgStore, "Trying to stop null cfgStore");
try {
try {
cfgStore.init();
reloadLogicalToPhysical();
} catch (InterruptedException e) {
LOG.warn("ConfigStore was interrupted on startup, this may be ok", e);
}
} catch (IOException e) {
LOG.error("ConfigStore init threw IOException", e);
throw e;
}
}
@Override
synchronized public void stop()
throws IOException {
if (cfgStore == null) {
LOG.warn("Trying to shutdown null cfgStore");
return;
}
cfgStore.shutdown();
}
@Override
cfgStore.unmapAllLogicalNodes();
logicalToPhysical.clear();
refreshAll();
}
@Override
}
@Override
return getAllConfigs().toString();
}
@Override
ListMultimap<String, String> map = ArrayListMultimap
.<String, String> create(cfgStore.getLogicalNodeMap());
return map;
}
Multimap<String, String> p2n = getLogicalNodeMap();
logicalToPhysical.clear();
for (Entry<String, String> e : p2n.entries()) {
if (logicalToPhysical.containsKey(e.getValue())) {
LOG.warn("logical node mapped to two physical nodes!");
}
logicalToPhysical.put(e.getValue(), e.getKey());
}
}
@Override
synchronized public void addChokeLimit(String physNode, String chokeID,
int limit) {
cfgStore.addChokeLimit(physNode, chokeID, limit);
}
}