package com.cloudera.flume.master;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.master.commands.CreateLogicalNodeForm;
import com.cloudera.flume.master.commands.DecommissionLogicalNodeForm;
import com.cloudera.flume.master.commands.RefreshAllCommand;
import com.cloudera.flume.master.commands.RefreshCommand;
import com.cloudera.flume.master.commands.SetChokeLimitForm;
import com.cloudera.flume.master.commands.UnconfigCommand;
import com.cloudera.flume.master.commands.UnmapLogicalNodeForm;
import com.cloudera.flume.master.commands.UpdateAllCommand;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.flume.reporter.Reportable;
import com.google.common.base.Preconditions;
static final Logger LOG = LoggerFactory.getLogger(CommandManager.class);
final LinkedBlockingQueue<CommandStatus> queue = new LinkedBlockingQueue<CommandStatus>();
final Map<String, Execable> cmds = new HashMap<String, Execable>();
final SortedMap<Long, CommandStatus> statuses = new TreeMap<Long, CommandStatus>();
final AtomicLong curCommandId = new AtomicLong();
static Execable noopExec = new Execable() {
public void exec(String[] args)
throws MasterExecException {
if (args.length == 1) {
long delay = Long.parseLong(args[0]);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
LOG.debug("Delay noop interrupted", e);
throw new MasterExecException("Delay Noop Interrupted!", e);
}
}
}
};
ExecThread execThread;
final static Object[][] cmdArrays = { { "noop", noopExec },
{ "config", ConfigCommand.buildExecable() },
{ "multiconfig", MultiConfigCommand.buildExecable() },
{ "unconfig", UnconfigCommand.buildExecable() },
{ "refresh", RefreshCommand.buildExecable() },
{ "refreshAll", RefreshAllCommand.buildExecable() },
{ "updateAll", UpdateAllCommand.buildExecable() },
{ "save", SaveConfigCommand.buildExecable() },
{ "load", LoadConfigCommand.buildExecable() },
{ "spawn", CreateLogicalNodeForm.buildExecable() },
{ "map", CreateLogicalNodeForm.buildExecable() },
{ "decommission", DecommissionLogicalNodeForm.buildExecable() },
{ "unmap", UnmapLogicalNodeForm.buildExecable() },
{ "unmapAll", UnmapLogicalNodeForm.buildUnmapAllExecable() },
{ "setChokeLimit", SetChokeLimitForm.buildExecable() }
};
this(cmdArrays);
}
for (Object[] c : cmdArray) {
cmds.put((String) c[0], (Execable) c[1]);
}
}
Preconditions.checkNotNull(cmd, "Command must not be null");
Preconditions.checkNotNull(ex, "Execable must not be null");
if (cmds.containsKey(cmd)) {
LOG.warn("Command '" + cmd
+ "' previously existed and is being overwritten");
}
cmds.put(cmd, ex);
}
synchronized public void start() {
if (execThread != null) {
LOG.error("Command Manager already started, not spawning another");
return;
}
execThread = new ExecThread();
execThread.start();
}
synchronized public void stop() {
execThread.shutdown();
execThread = null;
}
public long submit(Command cmd) {
Preconditions.checkNotNull(cmd, "No null commands allowed, use \"noop\"");
LOG.info("Submitting command: " + cmd);
long cmdId = curCommandId.getAndIncrement();
CommandStatus cmdStat = CommandStatus.createCommandStatus(cmdId, cmd);
synchronized (this) {
statuses.put(cmdId, cmdStat);
queue.add(cmdStat);
}
return cmdId;
}
CommandStatus stat = null;
synchronized (this) {
stat = statuses.get(cmdid);
}
return stat != null && stat.isSuccess();
}
CommandStatus stat = null;
synchronized (this) {
stat = statuses.get(cmdid);
}
return stat != null && stat.isFailure();
}
try {
if (cmd == null) {
return;
}
cmd.toExecing("");
exec(cmd.cmd);
cmd.toSucceeded("");
} catch (MasterExecException e) {
LOG.warn("During " + cmd + " : " + e.getMessage());
cmd.toFailed(e.getMessage());
} catch (Exception e) {
LOG.error("Unexpected exception during " + cmd + " : " + e.getMessage(),
e);
cmd.toFailed(e.getMessage());
}
}
volatile boolean done = false;
CountDownLatch stopped = new CountDownLatch(1);
super("exec-thread");
}
@Override
try {
while (!done) {
CommandStatus cmd = queue.poll(1000, TimeUnit.MILLISECONDS);
handleCommand(cmd);
}
} catch (InterruptedException e) {
LOG.error("Master exec thread interrupted!", e);
} finally {
stopped.countDown();
}
}
done = true;
try {
stopped.await();
} catch (InterruptedException e) {
LOG.error("Shutdown of command manager was interrupted");
}
}
}
void exec(Command cmd)
throws MasterExecException {
Execable ex = cmds.get(cmd.getCommand());
if (ex == null) {
throw new MasterExecException("Don't know how to handle Command: '" + cmd
+ "'", null);
}
LOG.info("Executing command: " + cmd);
try {
ex.exec(cmd.getArgs());
} catch (MasterExecException e) {
throw e;
} catch (IOException e) {
throw new MasterExecException(e.getMessage(), e);
}
}
@Override
return "command manager";
}
return statuses.get(id);
}
@Override
StringBuilder html = new StringBuilder();
html.append("<div class=\"CommandManager\">");
html
.append("<h2>Command history </h2>\n<table border=\"1\"><tr><th>id</th><th>State</th><th>command line</th><th>message</th></tr>\n");
List<CommandStatus> values = null;
synchronized (this) {
values = new ArrayList<CommandStatus>(statuses.values());
}
for (CommandStatus stat : values) {
html.append(" <tr>");
html.append(" <td>");
html.append(stat.getCmdID());
html.append("</td>\n");
html.append(" <td>");
html.append(stat.getState());
html.append("</td>\n");
html.append(" <td>");
html.append(stat.getCommand());
html.append("</td>\n");
html.append(" <td>");
html.append(stat.getMessage());
html.append("</td>\n");
html.append(" </tr>\n");
}
html.append("</table>\n\n");
html.append("</div>");
return ReportEvent.createLegacyHtmlReport("", html.toString());
}
}