package com.cloudera.flume.agent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.handlers.endtoend.AckListener;
import com.cloudera.util.FixedPeriodBackoff;
import com.cloudera.util.Pair;
import com.cloudera.util.ResultRetryable;
import com.cloudera.util.RetryHarness;
import com.cloudera.flume.reporter.ReportEvent;
/** Logger for this class. */
static final Logger LOG = LoggerFactory.getLogger(MultiMasterRPC.class);
/**
 * Passed as the second argument of the FixedPeriodBackoff built for every
 * RPC wrapper; set from the constructor's maxRetries. Presumably the maximum
 * number of attempts -- confirm against FixedPeriodBackoff.
 */
final protected int MAX_RETRIES;
/**
 * Passed as the first argument of the FixedPeriodBackoff built for every RPC
 * wrapper; set from the constructor's retryPauseMS. Presumably the pause
 * between attempts in milliseconds -- confirm against FixedPeriodBackoff.
 */
final protected int RETRY_PAUSE_MS;
/**
 * RPC protocol name read from conf.getMasterHeartbeatRPC(); findServer()
 * only knows how to build Thrift and Avro connections.
 */
final String rpcProtocol;
/**
 * Current master connection. Replaced by findServer() and used by every RPC
 * wrapper; null until a connection has been established.
 */
protected MasterRPC masterRPC;
/**
 * (host, port) pairs from conf.getMasterHeartbeatServersList(), shuffled at
 * construction when randomization is requested.
 */
protected final List<Pair<String, Integer>> masterAddresses;
/**
 * Index into masterAddresses of the next master findServer() will try;
 * advanced round-robin on every attempt.
 */
protected int nextMaster = 0;
/** Host of the current connection; reset to null by close(). */
protected String curHost;
/** Port of the current connection; reset to 0 by close(). */
protected int curPort = 0;
int maxRetries, int retryPauseMS) {
masterAddresses = conf.getMasterHeartbeatServersList();
if (randomize) {
Collections.shuffle(masterAddresses);
}
Pair<String, Integer> masterAddr = conf.getMasterHeartbeatServersList()
.get(0);
this.MAX_RETRIES = maxRetries;
this.RETRY_PAUSE_MS = retryPauseMS;
this.rpcProtocol = conf.getMasterHeartbeatRPC();
}
this(conf, randomize, conf.getAgentMultimasterMaxRetries(), conf
.getAgentMultimasterRetryBackoff());
}
return curHost;
}
return curPort;
}
/**
 * Connects to the next reachable master.
 *
 * <p>Every configured master is tried at most once, in round-robin order
 * starting at {@code nextMaster}. Any existing connection is closed before
 * each attempt. On success the new connection becomes {@code masterRPC}, and
 * {@code curHost}/{@code curPort} describe it.
 *
 * @return the newly established connection
 * @throws IOException if no master could be reached. The message lists every
 *     master that was tried, and the last connection failure (if any) is
 *     attached as the cause.
 */
protected synchronized MasterRPC findServer() throws IOException {
  List<String> failedMasters = new ArrayList<String>();
  Exception lastFailure = null;
  for (int attempt = 0; attempt < masterAddresses.size(); ++attempt) {
    Pair<String, Integer> host = masterAddresses.get(nextMaster);
    String hostPort = host.getLeft() + ":" + host.getRight();
    try {
      // Advance first so the next search starts past this master even if
      // connecting to it fails.
      nextMaster = (nextMaster + 1) % masterAddresses.size();
      close();
      MasterRPC out;
      if (FlumeConfiguration.RPC_TYPE_THRIFT.equals(rpcProtocol)) {
        out = new ThriftMasterRPC(host.getLeft(), host.getRight());
      } else if (FlumeConfiguration.RPC_TYPE_AVRO.equals(rpcProtocol)) {
        out = new AvroMasterRPC(host.getLeft(), host.getRight());
      } else {
        LOG.error("No valid RPC protocol in configurations: " + rpcProtocol);
        // Record the skip so the final error does not claim an empty list.
        failedMasters.add(hostPort + " (unsupported protocol " + rpcProtocol
            + ")");
        continue;
      }
      curHost = host.getLeft();
      curPort = host.getRight();
      this.masterRPC = out;
      return out;
    } catch (Exception e) {
      failedMasters.add(hostPort);
      lastFailure = e;
      LOG.debug("Couldn't connect to master at " + hostPort + " because: "
          + e.getMessage(), e);
    }
  }
  throw new IOException("Could not connect to any master nodes (tried "
      + masterAddresses.size() + ": " + failedMasters + ")", lastFailure);
}
throws TTransportException, IOException {
return (masterRPC != null) ? masterRPC : findServer();
}
/**
 * Closes the current master connection, if any, and clears the connection
 * state.
 *
 * <p>A failure to close is logged and otherwise ignored, because the
 * connection is being discarded either way. Afterwards {@code masterRPC} is
 * null, {@code curHost} is null and {@code curPort} is 0. Code that reuses
 * {@code masterRPC} only when it is non-null therefore reconnects instead of
 * handing out the closed connection.
 */
public synchronized void close() {
  if (masterRPC != null) {
    try {
      masterRPC.close();
    } catch (IOException e) {
      LOG.warn("Failed to close connection with RPC master " + curHost + ":"
          + curPort, e);
    }
  }
  // Drop the reference so a closed connection is never reused.
  masterRPC = null;
  curHost = null;
  curPort = 0;
}
/**
 * Performs the single master RPC that this retryable wraps.
 *
 * <p>Invoked by the retryable's try step while it holds the enclosing
 * MultiMasterRPC's monitor. The returned value is stored as the result, and
 * any exception makes the caller look for another master.
 *
 * @return the value produced by the RPC
 * @throws IOException if the RPC fails
 */
public abstract T doRPC() throws IOException;
synchronized (MultiMasterRPC.this) {
try {
result = doRPC();
return true;
} catch (Exception e) {
try {
LOG.info("Connection to master lost due to " + e.getMessage()
+ ", looking for another...");
LOG.debug(e.getMessage(), e);
findServer();
} catch (IOException e1) {
LOG.error("Unable to find a master server", e1);
}
}
return false;
}
}
}
/**
 * Fetches the configuration of a logical node from the master.
 *
 * <p>Retries under a FixedPeriodBackoff built from RETRY_PAUSE_MS and
 * MAX_RETRIES. A failed attempt causes a switch to another master.
 *
 * @param n the logical node whose configuration is requested
 * @return the configuration returned by the master
 * @throws IOException if the retries are exhausted or the retry wait is
 *     interrupted (the thread's interrupt status is then restored)
 */
public FlumeConfigData getConfig(final LogicalNode n) throws IOException {
  RPCRetryable<FlumeConfigData> retry = new RPCRetryable<FlumeConfigData>() {
    @Override
    public FlumeConfigData doRPC() throws IOException {
      return masterRPC.getConfig(n);
    }
  };
  RetryHarness harness = new RetryHarness(retry, new FixedPeriodBackoff(
      RETRY_PAUSE_MS, MAX_RETRIES), true);
  try {
    harness.attempt();
    return retry.getResult();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Keep the interrupt visible to callers further up the stack.
      Thread.currentThread().interrupt();
    }
    throw new IOException(e);
  }
}
/**
 * Asks the master whether the given ack id has been acknowledged.
 *
 * <p>Retries under a FixedPeriodBackoff built from RETRY_PAUSE_MS and
 * MAX_RETRIES. A failed attempt causes a switch to another master.
 *
 * @param ackid the ack id to check
 * @return the master's answer
 * @throws IOException if the retries are exhausted or the retry wait is
 *     interrupted (the thread's interrupt status is then restored)
 */
public boolean checkAck(final String ackid) throws IOException {
  RPCRetryable<Boolean> retry = new RPCRetryable<Boolean>() {
    @Override
    public Boolean doRPC() throws IOException {
      return masterRPC.checkAck(ackid);
    }
  };
  RetryHarness harness = new RetryHarness(retry, new FixedPeriodBackoff(
      RETRY_PAUSE_MS, MAX_RETRIES), true);
  try {
    harness.attempt();
    return retry.getResult();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Keep the interrupt visible to callers further up the stack.
      Thread.currentThread().interrupt();
    }
    throw new IOException(e);
  }
}
throws IOException {
RPCRetryable<List<String>> retry = new RPCRetryable<List<String>>() {
public List<String>
doRPC()
throws IOException {
return masterRPC.getLogicalNodes(physicalNode);
}
};
RetryHarness harness = new RetryHarness(retry, new FixedPeriodBackoff(
RETRY_PAUSE_MS, MAX_RETRIES), true);
try {
harness.attempt();
return retry.getResult();
} catch (Exception e) {
throw new IOException(e);
}
}
/**
 * Fetches the choke map for a physical node from the master.
 *
 * <p>Retries under a FixedPeriodBackoff built from RETRY_PAUSE_MS and
 * MAX_RETRIES. A failed attempt causes a switch to another master.
 *
 * @param physicalNode the physical node name
 * @return the map returned by the master
 * @throws IOException if the retries are exhausted or the retry wait is
 *     interrupted (the thread's interrupt status is then restored)
 */
public Map<String, Integer> getChokeMap(final String physicalNode)
    throws IOException {
  RPCRetryable<Map<String, Integer>> retry =
      new RPCRetryable<Map<String, Integer>>() {
        @Override
        public Map<String, Integer> doRPC() throws IOException {
          return masterRPC.getChokeMap(physicalNode);
        }
      };
  RetryHarness harness = new RetryHarness(retry, new FixedPeriodBackoff(
      RETRY_PAUSE_MS, MAX_RETRIES), true);
  try {
    harness.attempt();
    return retry.getResult();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Keep the interrupt visible to callers further up the stack.
      Thread.currentThread().interrupt();
    }
    throw new IOException(e);
  }
}
/**
 * Sends a heartbeat for the given logical node to the master.
 *
 * <p>Retries under a FixedPeriodBackoff built from RETRY_PAUSE_MS and
 * MAX_RETRIES. A failed attempt causes a switch to another master.
 *
 * @param n the logical node sending the heartbeat
 * @return the master's reply
 * @throws IOException if the retries are exhausted or the retry wait is
 *     interrupted (the thread's interrupt status is then restored)
 */
public boolean heartbeat(final LogicalNode n) throws IOException {
  RPCRetryable<Boolean> retry = new RPCRetryable<Boolean>() {
    @Override
    public Boolean doRPC() throws IOException {
      return masterRPC.heartbeat(n);
    }
  };
  RetryHarness harness = new RetryHarness(retry, new FixedPeriodBackoff(
      RETRY_PAUSE_MS, MAX_RETRIES), true);
  try {
    harness.attempt();
    return retry.getResult();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Keep the interrupt visible to callers further up the stack.
      Thread.currentThread().interrupt();
    }
    throw new IOException(e);
  }
}
/**
 * Tells the master that the given ack group has been acknowledged.
 *
 * <p>Retries under a FixedPeriodBackoff built from RETRY_PAUSE_MS and
 * MAX_RETRIES. A failed attempt causes a switch to another master.
 *
 * @param group the ack group id
 * @throws IOException if the retries are exhausted or the retry wait is
 *     interrupted (the thread's interrupt status is then restored)
 */
public void acknowledge(final String group) throws IOException {
  RPCRetryable<Void> retry = new RPCRetryable<Void>() {
    @Override
    public Void doRPC() throws IOException {
      masterRPC.acknowledge(group);
      return null; // nothing to report for a Void RPC
    }
  };
  RetryHarness harness = new RetryHarness(retry, new FixedPeriodBackoff(
      RETRY_PAUSE_MS, MAX_RETRIES), true);
  try {
    harness.attempt();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Keep the interrupt visible to callers further up the stack.
      Thread.currentThread().interrupt();
    }
    throw new IOException(e);
  }
}
/**
 * Sends a batch of reports to the master.
 *
 * <p>Retries under a FixedPeriodBackoff built from RETRY_PAUSE_MS and
 * MAX_RETRIES. A failed attempt causes a switch to another master.
 *
 * @param reports the reports to send, keyed as the master expects
 * @throws IOException if the retries are exhausted or the retry wait is
 *     interrupted (the thread's interrupt status is then restored)
 */
public void putReports(final Map<String, ReportEvent> reports)
    throws IOException {
  RPCRetryable<Void> retry = new RPCRetryable<Void>() {
    @Override
    public Void doRPC() throws IOException {
      masterRPC.putReports(reports);
      return null; // nothing to report for a Void RPC
    }
  };
  RetryHarness harness = new RetryHarness(retry, new FixedPeriodBackoff(
      RETRY_PAUSE_MS, MAX_RETRIES), true);
  try {
    harness.attempt();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Keep the interrupt visible to callers further up the stack.
      Thread.currentThread().interrupt();
    }
    throw new IOException(e);
  }
}
return masterRPC.createAckListener();
}
}