package com.cloudera.flume.watchdog;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.util.InputStreamPipe;
/** Logger for watchdog lifecycle messages. */
static final Logger LOG = LoggerFactory.getLogger(Watchdog.class);
/** Command line of the watched process, passed to Runtime.exec(String[]); a copy of the constructor's array. */
String[] args;
/** Multiplexes reads from the child's stdout/stderr and, in interactive mode, this process's stdin; reopened by each startup(). */
Selector selector;
/** Runtime used to launch the child and register the shutdown hook. */
Runtime rt;
/** The running child process, or null when none has been started or it has been torn down. */
Process proc;
/**
 * Wrap the child's stdout, the child's stderr and (interactive mode only) System.in; each exposes a
 * SelectableChannel via getChannel(). NOTE(review): InputStreamPipe internals are defined elsewhere.
 */
InputStreamPipe outPipe, errPipe, watchdogPipe;
/** Writer for the child's stdin; only set in interactive mode. */
PrintStream procIn;
/** Channels taken from the pipes above and registered with the selector; stdin is only set in interactive mode. */
SelectableChannel stdout, stderr, stdin;
/** When true, data read from this process's stdin is forwarded to the child. */
boolean interactive;
/**
 * Creates a watchdog for the given command line.
 *
 * @param args the command and its arguments as passed to Runtime.exec; the array is copied
 * @param interactive if true, this process's standard input is forwarded to the child
 */
public Watchdog(String[] args, boolean interactive) {
  this.interactive = interactive;
  this.args = args.clone();
}
this(args, false);
}
void startup()
throws IOException {
selector = Selector.open();
rt = Runtime.getRuntime();
proc = rt.exec(args);
outPipe = new InputStreamPipe(proc.getInputStream());
errPipe = new InputStreamPipe(proc.getErrorStream());
if (interactive) {
watchdogPipe = new InputStreamPipe(System.in);
procIn = new PrintStream(proc.getOutputStream());
}
stdout = outPipe.getChannel();
stderr = errPipe.getChannel();
if (interactive) {
stdin = watchdogPipe.getChannel();
}
stdout.register(selector, SelectionKey.OP_READ);
stderr.register(selector, SelectionKey.OP_READ);
if (interactive) {
stdin.register(selector, SelectionKey.OP_READ);
}
outPipe.start();
errPipe.start();
if (interactive) {
watchdogPipe.start();
}
rt.addShutdownHook(new Thread() {
synchronized (Watchdog.this) {
LOG.info("Watchdog shutdown hook");
if (proc != null) {
proc.destroy();
}
}
}
});
}
if (proc == null)
return;
if (interactive) {
watchdogPipe.shutdown();
watchdogPipe = null;
}
outPipe.shutdown();
outPipe = null;
errPipe.shutdown();
errPipe = null;
if (interactive) {
procIn.close();
procIn = null;
}
proc.getOutputStream().close();
proc.getInputStream().close();
proc.getErrorStream().close();
proc.destroy();
proc = null;
if (interactive) {
stdin.close();
stdin = null;
}
stdout.close();
stdout = null;
stderr.close();
stderr = null;
selector.close();
selector = null;
}
/**
 * Launches the watched command once, relays its output until it finishes,
 * and returns its exit status.
 *
 * <p>Bytes read from the child's stdout and stderr are copied unchanged to
 * this process's {@code System.out} and {@code System.err}. In interactive
 * mode, bytes read from this process's stdin are forwarded to the child. The
 * child, its pipes and the selector are torn down before this method
 * completes, whether it returns normally or throws.
 *
 * @return the child's exit value
 * @throws IOException if the child cannot be started or relaying fails
 * @throws InterruptedException if interrupted while waiting for the child
 */
public int launchAgent()
throws IOException, InterruptedException {
  startup();
  int retval;
  try {
    relayUntilOutputEof();
    retval = proc.waitFor();
  } finally {
    // run() relaunches after a failure. Skipping teardown on an exception
    // would leak a child process, its pipes and the selector on every attempt.
    shutdown();
  }
  LOG.info("Subprocess exited with value " + retval);
  return retval;
}

/**
 * Copies data from whichever registered channels are readable until the
 * child's stdout or stderr reaches end of stream.
 */
private void relayUntilOutputEof() throws IOException {
  ByteBuffer buffer = ByteBuffer.allocate(32);
  while (true) {
    selector.select(2000);
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
      SelectionKey key = it.next();
      it.remove();
      SelectableChannel source = key.channel();
      buffer.clear();
      int count = ((ReadableByteChannel) source).read(buffer);
      if (count < 0) {
        source.close();
        if (source != stdin) {
          // The child's stdout or stderr is finished.
          return;
        }
        // Our own stdin ended: pass EOF on to the child and keep relaying its
        // output. Stopping here would leave the child's output pipes undrained
        // while waitFor() blocks, which can stall a child that is still writing.
        procIn.close();
        continue;
      }
      buffer.flip();
      // Copy raw bytes. Casting each byte to char would mangle multi-byte
      // characters such as UTF-8 text.
      byte[] data = buffer.array();
      int off = buffer.arrayOffset() + buffer.position();
      int len = buffer.remaining();
      if (source == stdout) {
        System.out.write(data, off, len);
      } else if (source == stderr) {
        System.err.write(data, off, len);
      } else if (source == stdin) {
        procIn.write(data, off, len);
        procIn.flush();
      }
    }
  }
}
/**
 * Launches the watched command via launchAgent() and relaunches it whenever
 * it exits with a non-zero status or fails with an IOException. Returns once
 * the command exits with status 0.
 *
 * <p>Launches are rate limited over a sliding one-minute window. Once
 * {@code maxTriesPerMin} launches have happened within the last minute, the
 * next launch waits until the oldest of them is a minute old.
 *
 * <p>If the thread is interrupted, either while waiting to relaunch or while
 * the child runs, the interrupt status is restored and this method returns
 * without relaunching.
 *
 * @param maxTriesPerMin maximum number of launches per minute; values below 1
 *     are treated as 1
 */
public void run(
int maxTriesPerMin) {
  final long windowMs = 60L * 1000L;
  final int limit = Math.max(1, maxTriesPerMin);
  // Launch times in monotonic milliseconds, oldest first. nanoTime() is used
  // so that wall-clock adjustments cannot stretch or skip the wait.
  ArrayList<Long> launches = new ArrayList<Long>();
  while (true) {
    long now = System.nanoTime() / 1000000L;
    for (Iterator<Long> it = launches.iterator(); it.hasNext();) {
      if (it.next().longValue() + windowMs <= now) {
        it.remove();
      }
    }
    // ">=" (not ">") so at most `limit` launches fall within one minute.
    if (launches.size() >= limit) {
      // Every remaining entry is younger than the window, so delta > 0.
      long delta = launches.get(0).longValue() + windowMs - now;
      LOG.warn("too many attempts failed per minute -- waiting for "
          + (delta / 1000) + "s");
      try {
        Thread.sleep(delta);
      } catch (InterruptedException e) {
        LOG.warn("Watchdog interrupted while waiting to restart; stopping", e);
        Thread.currentThread().interrupt();
        return;
      }
      // Record when the launch actually happens, not the pre-wait time.
      now = System.nanoTime() / 1000000L;
    }
    Date d = new Date();
    LOG.info("Restarting process @ " + d);
    launches.add(now);
    try {
      int ret = launchAgent();
      if (ret == 0) {
        LOG.info("Subprocess exited cleanly, closing watchdog");
        break;
      }
    } catch (IOException e) {
      LOG.error("Failed to launch or monitor watched process", e);
    } catch (InterruptedException e) {
      // Relaunching with the interrupt flag set would spin, since sleep() and
      // waitFor() throw immediately, so stop here.
      LOG.warn("Watchdog interrupted while subprocess was running; stopping", e);
      Thread.currentThread().interrupt();
      return;
    }
  }
}
/**
 * Command-line entry point: supervises the command given as arguments.
 *
 * <p>Exits with status -1 if no command is given. Otherwise it loads the Flume
 * configuration and passes its maximum-restarts-per-minute setting to
 * run(int).
 *
 * @param argv the command to watch and its arguments
 * @deprecated NOTE(review): no replacement entry point is visible here; document one.
 */
@Deprecated
public static void main(String[] argv) {
  if (argv.length < 1) {
    System.out.println("need to specify watched command as arguments");
    System.exit(-1);
  }
  int maxTriesPerMin =
      FlumeConfiguration.hardExitLoadConfig().getMaxRestartsPerMin();
  new Watchdog(argv).run(maxTriesPerMin);
}
}