package com.cloudera.flume.master;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.util.Clock;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
/** Configurations held in memory, keyed by the {@code host} string passed to {@code setConfig}. */
final Map<String, FlumeConfigData> cfgs = new HashMap<String, FlumeConfigData>();
/**
 * Looks up the configuration stored for a node.
 *
 * @param host key the configuration was stored under
 * @return the stored {@link FlumeConfigData}, or {@code null} when nothing is
 *         stored for {@code host}
 */
@Override
public FlumeConfigData getConfig(String host) {
  // Map.get already yields null for an absent key, so a separate
  // containsKey probe would only repeat the same hash lookup.
  return cfgs.get(host);
}
/**
 * Stores a configuration under {@code host}, replacing whatever was stored
 * under that key before.
 *
 * @param host   key to store the configuration under; must not be null
 * @param flowid flow identifier handed to the new {@link FlumeConfigData}; must not be null
 * @param source source specification; must not be null
 * @param sink   sink specification; must not be null
 * @throws IllegalArgumentException if any argument is null (raised by
 *         {@code Preconditions.checkArgument})
 * @throws IOException declared by the signature; no statement in this body raises it
 */
@Override
public void setConfig(String host, String flowid, String source, String sink)
    throws IOException {
  // The host is validated first so the later messages can safely name it.
  Preconditions.checkArgument(host != null,
      "Attempted to set config but missing host name!");

  final String prefix = "Attempted to set config " + host;
  Preconditions.checkArgument(flowid != null, prefix + " but missing flowid!");
  Preconditions.checkArgument(source != null, prefix + " but missing source!");
  Preconditions.checkArgument(sink != null, prefix + " but missing sink");

  // A single Clock.unixTime() reading fills all three time arguments.
  final long now = Clock.unixTime();
  final FlumeConfigData data =
      new FlumeConfigData(now, source, sink, now, now, flowid);
  cfgs.put(host, data);
}
/**
 * Exposes every stored configuration.
 *
 * @return an unmodifiable view over the backing map (a wrapper, not a copy)
 */
@Override
public Map<String, FlumeConfigData> getConfigs() {
  final Map<String, FlumeConfigData> readOnlyView =
      Collections.unmodifiableMap(cfgs);
  return readOnlyView;
}
/**
 * Multimap populated with (physNode, logicNode) pairs: each key is a physical
 * node name and its values are the logical node names recorded against it.
 */
final ListMultimap<String, String> nodeMap = ArrayListMultimap
.<String, String> create();
/** Per-physical-node choke limits: physNode -> (chokeID -> limit). */
private final HashMap<String, HashMap<String, Integer>> chokeMap = new HashMap<String, HashMap<String, Integer>>();
/**
 * Records {@code limit} for {@code chokeID} on {@code physNode}, overwriting
 * any limit already stored for that pair.
 *
 * @param physNode physical node the choke belongs to
 * @param chokeID  identifier of the choke
 * @param limit    value to store for the choke
 */
public void addChokeLimit(String physNode, String chokeID, int limit) {
  final HashMap<String, Integer> limits;
  if (chokeMap.containsKey(physNode)) {
    limits = chokeMap.get(physNode);
  } else {
    // First limit for this node: create and register its per-node table.
    limits = new HashMap<String, Integer>();
    chokeMap.put(physNode, limits);
  }
  limits.put(chokeID, limit);
}
// NOTE(review): the method header for this body is not visible in this view.
// Skip the insert when this exact (physNode, logicNode) pair is already in
// nodeMap, so the pair is never stored twice.
if (nodeMap.containsEntry(physNode, logicNode)) {
return;
}
nodeMap.put(physNode, logicNode);
}
// NOTE(review): the method header for this body is not visible in this view.
List<String> values;
values = nodeMap.get(physNode);
// NOTE(review): Guava documents ListMultimap.get as returning an empty list,
// not null, for an absent key, so this guard is presumably never taken - confirm.
if (values == null) {
return Collections.emptyList();
}
// Hand out a read-only wrapper rather than the multimap's own list.
return Collections.unmodifiableList(values);
}
@Override
// NOTE(review): the method header for this body is not visible in this view.
// Returns nodeMap wrapped so that callers cannot modify it through the result.
return Multimaps.unmodifiableListMultimap(nodeMap);
}
@Override
// NOTE(review): the method header (including the declaration of `configs`) is
// not visible in this view.
throws IOException {
// Store each entry through setConfig, passing the entry's flow id, source and
// sink; setConfig is called with only these fields of the incoming value.
for (Entry<String, FlumeConfigData> e : configs.entrySet()) {
FlumeConfigData f = e.getValue();
setConfig(e.getKey(), f.getFlowID(), f.getSourceConfig(), f
.getSinkConfig());
}
}
@Override
// NOTE(review): the method header for this body is not visible in this view.
// Drops the configuration stored under logicNode; nodeMap is not touched here.
cfgs.remove(logicNode);
}
@Override
// NOTE(review): the method header for this body is not visible in this view.
// Removes the single (physNode, logicNode) pair from nodeMap; cfgs is not
// touched here.
nodeMap.remove(physNode, logicNode);
}
/**
 * Lifecycle hook; this implementation performs no work.
 *
 * @throws IOException          declared by the signature; never raised by this body
 * @throws InterruptedException declared by the signature; never raised by this body
 */
@Override
public void init() throws IOException, InterruptedException {
  // nothing to do
}
/**
 * Lifecycle hook; this implementation performs no work.
 *
 * @throws IOException declared by the signature; never raised by this body
 */
@Override
public void shutdown() throws IOException {
  // nothing to do
}
@Override
// NOTE(review): the method header for this body is not visible in this view.
// Iterate over a copy of nodeMap rather than nodeMap itself.
// NOTE(review): presumably because unmapLogicalNode removes from nodeMap
// during the loop - confirm against its definition.
ListMultimap<String, String> clone = ArrayListMultimap.create(nodeMap);
for (Entry<String, String> e : clone.entries()) {
// Entries whose key equals their value are left mapped.
if (e.getKey().equals(e.getValue())) {
continue;
}
unmapLogicalNode(e.getKey(), e.getValue());
}
}
/**
 * Returns the choke limits recorded for a physical node.
 *
 * <p>If nothing is recorded for {@code physNode} yet, an empty table is
 * created, stored, and returned. The returned map is the internal mutable
 * instance, not a copy.
 *
 * @param physNode physical node to look up
 * @return the chokeID-to-limit table for {@code physNode}; never null
 */
@Override
public Map<String, Integer> getChokeMap(String physNode) {
  HashMap<String, Integer> limits = chokeMap.get(physNode);
  if (limits == null) {
    limits = new HashMap<String, Integer>();
    chokeMap.put(physNode, limits);
  }
  return limits;
}
}