package com.cloudera.flume.agent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.handlers.endtoend.AckListener;
import com.cloudera.flume.handlers.endtoend.CollectorAckListener;
import com.cloudera.flume.master.FlumeMaster;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.util.NetUtils;
FlumeMaster master;
String physicalNode;
this.master = master;
this.physicalNode = physicalName;
}
this(master, NetUtils.localhost());
}
@Override
public void acknowledge(String group)
throws IOException {
master.getAckMan().acknowledge(group);
}
@Override
public boolean checkAck(String ackid)
throws IOException {
return master.getAckMan().check(ackid);
}
public void open()
throws IOException {
}
@Override
}
@Override
return new CollectorAckListener(this);
}
@Override
public FlumeConfigData
getConfig(LogicalNode n)
throws IOException {
return master.getSpecMan().getConfig(n.getName());
}
@Override
return new ArrayList<String>(master.getSpecMan().getLogicalNode(physNode));
}
@Override
public boolean heartbeat(LogicalNode n)
throws IOException {
String logicalNode = n.getName();
long version = n.getConfigVersion();
List<String> lns = master.getSpecMan().getLogicalNode(physicalNode);
if (lns == null || !lns.contains(logicalNode)) {
if (physicalNode.equals(logicalNode)) {
master.getSpecMan().addLogicalNode(physicalNode, logicalNode);
}
}
boolean configChanged = master.getStatMan().updateHeartbeatStatus(
NetUtils.localhost(), n.getStatus().physicalNode, logicalNode,
n.getStatus().state, version);
FlumeConfigData cfg = master.getSpecMan().getConfig(logicalNode);
if (cfg == null || version < cfg.getTimestamp()) {
configChanged = true;
}
return configChanged;
}
@Override
public void putReports(Map<String, ReportEvent> reports)
throws IOException {
}
@Override
public HashMap<String, Integer>
getChokeMap(String physNode)
throws IOException {
return new HashMap<String, Integer>(master.getSpecMan().getChokeMap(
physNode));
}
}