package com.cloudera.flume.master.logical;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.antlr.runtime.RecognitionException;
import org.antlr.runtime.tree.CommonTree;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumePatterns;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.master.ConfigurationManager;
import com.cloudera.flume.master.StatusManager;
import com.cloudera.flume.master.StatusManager.NodeStatus;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
final ConfigurationManager cfgMan;
final StatusManager statMan;
this.cfgMan = cfg;
this.statMan = stat;
}
};
String host;
int port;
this.host = host;
this.port = port;
}
@Override
return "rpcSink(\"" + host + "\"," + port + ")";
}
@Override
return "rpcSource(" + port + ")";
}
@Override
return host + ":" + port;
}
}
Map<String, PhysicalNodeInfo> nameMap = new HashMap<String, PhysicalNodeInfo>();
Multimap<String, Integer> portMaps = HashMultimap.<String, Integer> create();
synchronized void updateNode(String ln)
throws RecognitionException {
FlumeConfigData fcd = cfgMan.getConfig(ln);
if (fcd == null) {
return;
}
NodeStatus stat = statMan.getStatus(ln);
if (stat == null) {
nameMap.put(ln, null);
return;
}
PhysicalNodeInfo pn = nameMap.get(ln);
if (pn != null) {
return;
}
String host = stat.host;
String src = fcd.getSourceConfig();
CommonTree lsrc = FlumePatterns.findSource(src, "logicalSource");
if (lsrc == null) {
return;
}
int port = FlumeConfiguration.get().getCollectorPort();
Collection<Integer> ports = portMaps.get(host);
while (ports.contains(port)) {
port++;
}
pn = new RpcPhysicalNode(host, port);
portMaps.put(host, port);
nameMap.put(ln, pn);
}
synchronized void update()
throws RecognitionException {
Map<String, FlumeConfigData> cfgs = cfgMan.getAllConfigs();
cfgs = new TreeMap<String, FlumeConfigData>(cfgs);
for (String ln : cfgs.keySet()) {
updateNode(ln);
}
}
return nameMap.get(logicalNode);
}
PhysicalNodeInfo pn) {
nameMap.put(logicalNode, pn);
}
}