package com.cloudera.flume.agent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.agent.durability.WALCompletionNotifier;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.handlers.endtoend.AckListener.Empty;
import com.cloudera.util.Clock;
import com.google.common.base.Preconditions;
// NOTE(review): the LivenessManager class declaration line is not present in
// this view; these are its fields.
// Class logger.
static final Logger LOG = LoggerFactory.getLogger(LivenessManager.class);
// Initial heartbeat back-off delay, read from
// FlumeConfiguration.getHeartbeatBackoff() in the constructor.
final long BACKOFF_MILLIS;
// RPC handle to the master: used for heartbeats, logical-node lookup,
// configuration fetches and choke maps.
MasterRPC master;
// Manager of the logical nodes hosted on this physical node.
LogicalNodeManager nodesman;
// Background thread that runs the heartbeat loop; created in the
// constructor and started by start().
HeartbeatThread t;
// Ack manager built from the master RPC and a RetryAckListener; polled with
// checkAcks() on every heartbeat round.
final WALAckManager ackcheck;
// Notified via toAcked(group) / retry(group) by RetryAckListener.
final WALCompletionNotifier walman;
// NOTE(review): body of RetryAckListener (instantiated in the constructor);
// its class declaration line is not present in this view. It forwards ack
// outcomes for a group to the WAL completion notifier.
// Called when an ack group ends (presumably acknowledged): marks the group
// as acked in the WAL.
@Override
public void end(String group)
throws IOException {
walman.toAcked(group);
}
// Called when an ack group expires: asks the WAL to retry that group.
@Override
public void expired(String group)
throws IOException {
walman.retry(group);
}
};
WALCompletionNotifier walman) {
// NOTE(review): the leading part of this constructor's signature is not
// present in this view.
// nodesman and master are mandatory; walman is not null-checked here.
Preconditions.checkNotNull(nodesman);
Preconditions.checkNotNull(master);
// Must be assigned before the HeartbeatThread is created below: its
// 'backoff' field initializer reads BACKOFF_MILLIS.
BACKOFF_MILLIS = FlumeConfiguration.get().getHeartbeatBackoff();
this.walman = walman;
this.nodesman = nodesman;
this.master = master;
// The heartbeat thread is only created here; start() launches it.
this.t = new HeartbeatThread();
// Ack manager reporting outcomes through RetryAckListener; the last
// argument comes from getAgentAckedRetransmit() (presumably the
// retransmit interval -- TODO confirm units).
this.ackcheck = new WALAckManager(master, new RetryAckListener(),
FlumeConfiguration.get().getAgentAckedRetransmit());
}
// NOTE(review): body of checkLogicalNodes(); its declaration line is not
// present in this view. Reconciles the locally running logical nodes with
// the list the master reports for this physical node.
String physNode = nodesman.getPhysicalNodeName();
List<String> lns = master.getLogicalNodes(physNode);
// Always keep a logical node named after the physical node. The list is
// copied before adding, presumably because the returned list may be
// unmodifiable.
if (!lns.contains(physNode)) {
lns = new ArrayList<String>(lns);
lns.add(physNode);
}
// Spawn every listed logical node that is not running yet, using the
// literal "null" source and sink specs.
for (String ln : lns) {
if (nodesman.get(ln) == null) {
try {
nodesman.spawn(ln, "null", "null");
} catch (FlumeSpecException e) {
// The constant "null" specs are not expected to be rejected.
LOG.error("This should never happen", e);
}
}
}
// Refresh the choke limits from the master's choke map for this node.
FlumeNode.getInstance().getChokeManager().updateChokeLimitMap(
master.getChokeMap(physNode));
// Decommission every local logical node that is not in the list.
nodesman.decommissionAllBut(lns);
}
// NOTE(review): body of checkLogicalNodeConfigs(); its declaration line is
// not present in this view. Sends a heartbeat for each local logical node.
// When the master reports that a node needs configuration, the config is
// fetched and applied on a separate thread.
for (LogicalNode nd : nodesman.getNodes()) {
boolean needsCfg = master.heartbeat(nd);
if (needsCfg) {
final FlumeConfigData data = master.getConfig(nd);
if (data == null) {
// Only logged: checkConfig below is still invoked with a null config.
LOG.debug("Logical Node '" + nd.getName()
+ "' not configured on master");
}
final LogicalNode node = nd;
// Applied asynchronously, presumably so a slow checkConfig does not stall
// the heartbeat loop. NOTE(review): the run() declaration of this
// anonymous Thread is not present in this view.
new Thread("SpawningLogicalNode " + nd.getName()) {
node.checkConfig(data);
}
}.start();
}
}
}
// NOTE(review): body of what is presumably heartbeatChecks() (called from the
// heartbeat loop); its declaration line is not present in this view.
// One heartbeat round: reconcile logical nodes, refresh their configs, then
// check WAL acks. Any exception propagates to the caller.
checkLogicalNodes();
checkLogicalNodeConfigs();
ackcheck.checkAcks();
}
// NOTE(review): the HeartbeatThread class declaration and its constructor
// declaration line are not present in this view.
// Set to true (by stop()) to make the loop exit before its next round.
volatile boolean done = false;
// Current back-off delay; reset to BACKOFF_MILLIS after a successful round,
// doubled after each failure, capped at backoffLimit.
long backoff = BACKOFF_MILLIS;
// Upper bound for 'backoff'.
long backoffLimit = FlumeConfiguration.get().getNodeHeartbeatBackoffLimit();
// Delay passed to Clock.sleep after each successful heartbeat round.
long heartbeatPeriod = FlumeConfiguration.get().getConfigHeartbeatPeriod();
// Counted down when the heartbeat loop exits; stop() waits on it.
CountDownLatch stopped = new CountDownLatch(1);
// Thread name is "Heartbeat".
super("Heartbeat");
}
// Body of HeartbeatThread.run() (its declaration line is not present in this
// view). Runs heartbeat rounds until 'done' is set. While rounds fail, it
// backs off exponentially, up to backoffLimit.
try {
  while (!done) {
    try {
      heartbeatChecks();
      // A successful round resets the back-off to its initial value.
      backoff = BACKOFF_MILLIS;
      Clock.sleep(heartbeatPeriod);
    } catch (Exception e) {
      // NOTE(review): this also catches InterruptedException raised by
      // heartbeatChecks() or the heartbeat sleep above; such interrupts are
      // treated like a master failure.
      backoff = Math.min(backoff * 2, backoffLimit);
      LOG.warn("Connection to master(s) failed, " + e.getMessage()
          + ". Backing off for " + backoff + " ms ");
      LOG.debug("Current master is " + master.toString(), e);
      try {
        master.close();
      } catch (IOException e1) {
        LOG.error("Failed when attempting to close master", e1);
      }
      Clock.sleep(backoff);
    }
  }
} catch (InterruptedException e) {
  LOG.error("Heartbeat interrupted, this is not expected!", e);
  // Preserve the interrupt status rather than swallowing it.
  Thread.currentThread().interrupt();
} finally {
  // Release stop() on every exit path, including an Error escaping
  // heartbeatChecks(); otherwise stop() would wait forever.
  stopped.countDown();
}
}
};
// NOTE(review): body of start(); its declaration line is not present in this
// view. Launches the heartbeat thread created in the constructor. Because
// that Thread object is created only once, calling this a second time makes
// Thread.start() throw IllegalThreadStateException.
t.start();
}
// Body of stop() (its declaration line is not present in this view). Asks the
// heartbeat loop to exit, then waits for it to finish its current round,
// which may include a pending heartbeat or back-off sleep.
CountDownLatch stopped = t.stopped;
t.done = true;
// If the heartbeat thread was never started, nothing will ever count the
// latch down, and awaiting it would block forever. 'done' is set first, so a
// thread started later exits immediately.
if (t.getState() != Thread.State.NEW) {
  try {
    stopped.await();
  } catch (InterruptedException e) {
    LOG.error("Problem waiting for livenessManager to stop", e);
    // Restore the caller's interrupt status instead of swallowing it.
    Thread.currentThread().interrupt();
  }
}
}
// NOTE(review): body of the ack-checker accessor (presumably getAckChecker());
// its declaration line is not present in this view. Returns the ack manager
// created in the constructor.
return ackcheck;
}
}