package com.cloudera.flume.master;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.thrift.CommandStatusThrift;
import com.cloudera.flume.conf.thrift.FlumeMasterAdminServer;
import com.cloudera.flume.conf.thrift.FlumeMasterCommandThrift;
import com.cloudera.flume.conf.thrift.FlumeNodeStatusThrift;
import com.cloudera.flume.conf.thrift.ThriftFlumeConfigData;
import com.cloudera.flume.conf.thrift.FlumeMasterAdminServer.Iface;
import com.cloudera.flume.util.ThriftServer;
import com.google.common.base.Preconditions;
RPCServer {
StatusManager.NodeStatus status) {
long time = System.currentTimeMillis();
FlumeNodeStatusThrift out = new FlumeNodeStatusThrift();
out.state = MasterClientServerThrift.stateToThrift(status.state);
out.version = status.version;
out.lastseen = status.lastseen;
out.lastSeenDeltaMillis = time - status.lastseen;
out.host = status.host;
out.physicalNode = status.physicalNode;
return out;
}
FlumeNodeStatusThrift status) {
return new StatusManager.NodeStatus(MasterClientServerThrift
.stateFromThrift(status.state), status.version, status.lastseen,
status.host, status.physicalNode);
}
FlumeMasterCommandThrift out = new FlumeMasterCommandThrift();
out.command = cmd.command;
out.arguments = new LinkedList<String>();
for (String s : cmd.args) {
out.arguments.add(s);
}
return out;
}
return new Command(cmd.command, cmd.arguments.toArray(new String[0]));
}
static final Logger LOG = LoggerFactory.getLogger(MasterAdminServerThrift.class);
final int port;
final protected MasterAdminServer delegate;
Preconditions.checkArgument(delegate != null,
"MasterAdminServer is null in MasterAdminServerThrift!");
this.delegate = delegate;
this.port = FlumeConfiguration.get().getConfigAdminPort();
}
@Override
public boolean isFailure(
long cmdid)
throws TException {
return delegate.isFailure(cmdid);
}
@Override
public boolean isSuccess(
long cmdid)
throws TException {
return delegate.isSuccess(cmdid);
}
@Override
public long submit(FlumeMasterCommandThrift command)
throws TException {
return delegate.submit(commandFromThrift(command));
}
protected static FlumeNodeStatusThrift
toThrift(
StatusManager.NodeStatus status) {
long time = System.currentTimeMillis();
return new FlumeNodeStatusThrift(MasterClientServerThrift
.stateToThrift(status.state), status.version, status.lastseen, time
- status.lastseen, status.host, status.physicalNode);
}
@Override
public Map<String, FlumeNodeStatusThrift>
getNodeStatuses()
throws TException {
Map<String, StatusManager.NodeStatus> statuses = delegate.getNodeStatuses();
Map<String, FlumeNodeStatusThrift> ret = new HashMap<String, FlumeNodeStatusThrift>();
for (Entry<String, StatusManager.NodeStatus> e : statuses.entrySet()) {
ret.put(e.getKey(), toThrift(e.getValue()));
}
return ret;
}
@Override
public Map<String, List<String>>
getMappings(String physicalNode)
throws TException {
return delegate.getMappings(physicalNode);
}
@Override
public Map<String, ThriftFlumeConfigData>
getConfigs()
throws TException {
Map<String, ThriftFlumeConfigData> out = new HashMap<String, ThriftFlumeConfigData>();
Map<String, FlumeConfigData> orig = delegate.getConfigs();
for (String key : orig.keySet()) {
out.put(key, MasterClientServerThrift.configToThrift(orig.get(key)));
}
return out;
}
@Override
public boolean hasCmdId(
long cmdid)
throws TException {
return delegate.hasCmdId(cmdid);
}
public void serve()
throws IOException {
LOG.info(String.format(
"Starting blocking thread pool server for admin server on port %d...",
port));
try {
this.start(new FlumeMasterAdminServer.Processor(this), port,
"MasterAdminServer");
} catch (TTransportException e) {
throw new IOException(e);
}
}
@Override
public CommandStatusThrift
getCmdStatus(
long cmdid)
throws TException {
CommandStatus cmd = delegate.getCommandStatus(cmdid);
Command c = cmd.getCommand();
FlumeMasterCommandThrift fmct = new FlumeMasterCommandThrift(
c.getCommand(), Arrays.asList(c.getArgs()));
CommandStatusThrift cst = new CommandStatusThrift(cmd.getCmdID(), cmd
.getState().toString(), cmd.getMessage(), fmct);
return cst;
}
}