package com.cloudera.flume.shell;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import org.apache.thrift.transport.TTransportException;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.agent.DirectMasterRPC;
import com.cloudera.flume.agent.FlumeNode;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.master.ConfigurationManager;
import com.cloudera.flume.master.SetupMasterTestEnv;
import com.cloudera.flume.master.StatusManager.NodeState;
import com.cloudera.flume.master.StatusManager.NodeStatus;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.aggregator.AccumulatorSink;
import com.cloudera.flume.util.FlumeShell;
import com.cloudera.util.Clock;
public static final Logger LOG = LoggerFactory.getLogger(TestFlumeShell.class);
@Test
TTransportException, IOException {
assertEquals(0, flumeMaster.getSpecMan().getAllConfigs().size());
FlumeShell sh = new FlumeShell();
sh
.executeLine("connect localhost:"
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
sh.executeLine("exec config localhost null null");
Clock.sleep(250);
assertTrue(flumeMaster.getSpecMan().getAllConfigs().size() > 0);
}
@Test
TTransportException, IOException {
FlumeShell sh = new FlumeShell();
sh
.executeLine("connect localhost:"
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
sh.executeLine("exec setChokeLimit physNode choke 786");
Clock.sleep(250);
assertEquals(786, flumeMaster.getSpecMan().getChokeMap("physNode").get(
"choke").intValue());
}
@Test
TTransportException, IOException {
assertEquals(0, flumeMaster.getSpecMan().getAllConfigs().size());
FlumeShell sh = new FlumeShell();
sh
.executeLine("connect localhost:"
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
sh.executeLine("exec multiconfig 'localhost:text(\"/var/log/messages\") | "
+ "{delay (250) => console(\"avrojson\") };'");
Clock.sleep(250);
assertTrue(flumeMaster.getSpecMan().getAllConfigs().size() > 0);
}
@Test
FlumeShell sh = new FlumeShell();
long retval;
retval = sh.executeLine("connect localhost:"
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
assertEquals(0, retval);
retval = sh.executeLine("exec config foo 'null' 'console'");
assertEquals(0, retval);
File saveFile = File.createTempFile("test-flume", "");
saveFile.delete();
saveFile.deleteOnExit();
retval = sh.executeLine("exec save '" + saveFile.getAbsolutePath() + "'");
assertEquals(0, retval);
BufferedReader in = new BufferedReader(new FileReader(saveFile));
assertEquals("foo : null | console;", in.readLine());
assertNull(in.readLine());
in.close();
}
@Test
FlumeShell sh = new FlumeShell();
long retval;
retval = sh.executeLine("connect localhost:"
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
assertEquals(0, retval);
File saveFile = File.createTempFile("test-flume", "");
saveFile.deleteOnExit();
BufferedWriter out = new BufferedWriter(new FileWriter(saveFile));
out.write("foo : null | console;\n");
out.close();
retval = sh.executeLine("exec load '" + saveFile.getAbsolutePath() + "'");
assertEquals(0, retval);
ConfigurationManager manager = flumeMaster.getSpecMan();
FlumeConfigData data = manager.getConfig("foo");
assertEquals(data.getSinkConfig(), "console");
assertEquals(data.getSourceConfig(), "null");
}
@Test
FlumeShell sh = new FlumeShell();
long retval;
retval = sh.executeLine("connect localhost:"
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
assertEquals(0, retval);
retval = sh.executeLine("getmappings");
assertEquals(0, retval);
assertEquals(0, flumeMaster.getSpecMan().getLogicalNodeMap().size());
Clock.sleep(1000);
retval = sh
.executeLine("exec config foo 'tail(\"/var/log/messages\")' 'console(\"avrojson\")'");
assertEquals(0, retval);
retval = sh
.executeLine("exec config bar 'tail(\"/var/log/messages2\")' 'console(\"avrojson\")'");
assertEquals(0, retval);
retval = sh.executeLine("exec spawn localhost foo");
assertEquals(0, retval);
assertEquals(1, flumeMaster.getSpecMan().getLogicalNodeMap().size());
retval = sh.executeLine("getmappings");
assertEquals(0, retval);
retval = sh.executeLine("exec spawn localhost bar");
assertEquals(0, retval);
assertEquals(2, flumeMaster.getSpecMan().getLogicalNodeMap().size());
retval = sh.executeLine("getmappings");
assertEquals(0, retval);
retval = sh.executeLine("getmappings idonotexist");
assertEquals(0, retval);
retval = sh.executeLine("getmappings localhost");
assertEquals(0, retval);
}
@Test
TTransportException, IOException {
assertEquals(0, flumeMaster.getSpecMan().getAllConfigs().size());
FlumeShell sh = new FlumeShell();
sh
.executeLine("connect localhost:"
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
long start = Clock.unixTime();
sh.executeLine("exec noop 20000");
long delta = Clock.unixTime() - start;
assertTrue(delta < 20000);
assertTrue(delta > FlumeShell.CMD_WAIT_TIME_MS);
}
@Test
TTransportException, IOException {
assertEquals(0, flumeMaster.getSpecMan().getAllConfigs().size());
FlumeShell sh = new FlumeShell();
sh
.executeLine("connect localhost:"
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
long start = Clock.unixTime();
sh.executeLine("submit noop 3000");
long delta = Clock.unixTime() - start;
assertTrue(delta < 3000);
sh.executeLine("wait");
delta = Clock.unixTime() - start;
assertTrue(delta > 3000);
}
@Test
public void testNodesDone()
throws InterruptedException, TTransportException,
IOException {
assertEquals(0, flumeMaster.getSpecMan().getAllConfigs().size());
String nodename = "bar";
flumeMaster.getSpecMan().addLogicalNode(nodename, "foo");
FlumeNode n = new FlumeNode(FlumeConfiguration.get(), nodename,
new DirectMasterRPC(flumeMaster), false, false);
n.start();
n.getLivenessManager().heartbeatChecks();
assertEquals(2, flumeMaster.getStatMan().getNodeStatuses().size());
FlumeShell sh = new FlumeShell();
sh.executeLine("connect localhost: "
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
sh
.executeLine("exec config foo 'synth(100)' '{delay(100) => accumulator(\"count\") }' ");
FlumeConfigData fcd = flumeMaster.getSpecMan().getConfig("foo");
assertEquals("{delay(100) => accumulator(\"count\") }", fcd.sinkConfig);
assertEquals("synth(100)", fcd.sourceConfig);
assertTrue(0 != fcd.timestamp);
sh.executeLine("waitForNodesDone 0 foo");
n.getLivenessManager().heartbeatChecks();
NodeState status = flumeMaster.getStatMan().getNodeStatuses().get("foo").state;
NodeState idle = NodeState.IDLE;
assertEquals(status, idle);
n.stop();
}
@Test
@Ignore("Can't get a handle to the countersink, must fix")
TTransportException, IOException {
assertEquals(0, flumeMaster.getSpecMan().getAllConfigs().size());
String nodename = "foo";
FlumeConfiguration conf = FlumeConfiguration.createTestableConfiguration();
FlumeNode n = new FlumeNode(conf, nodename,
new DirectMasterRPC(flumeMaster), false, false);
n.start();
String node2 = "bar";
FlumeNode n2 = new FlumeNode(conf, node2, new DirectMasterRPC(flumeMaster),
false, false);
n2.start();
String node3 = "baz";
FlumeNode n3 = new FlumeNode(conf, node3, new DirectMasterRPC(flumeMaster),
false, false);
n3.start();
n.getLivenessManager().heartbeatChecks();
n2.getLivenessManager().heartbeatChecks();
n3.getLivenessManager().heartbeatChecks();
assertEquals(3, flumeMaster.getStatMan().getNodeStatuses().size());
FlumeShell sh = new FlumeShell();
sh.executeLine("connect localhost: "
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
sh
.executeLine("exec config foo 'synth(100)' '{delay(100) => accumulator(\"count\") }' ");
sh
.executeLine("exec config bar 'synth(50)' '{delay(100) => accumulator(\"count2\") }' ");
sh
.executeLine("exec config baz 'synth(75)' '{delay(100) => accumulator(\"count3\") }' ");
FlumeConfigData fcd = flumeMaster.getSpecMan().getConfig("foo");
assertEquals("{delay(100) => accumulator(\"count\") }", fcd.sinkConfig);
assertEquals("synth(100)", fcd.sourceConfig);
assertTrue(0 != fcd.timestamp);
sh.executeLine("waitForNodesDone 0 foo bar baz");
n.getLivenessManager().heartbeatChecks();
NodeState status = flumeMaster.getStatMan().getNodeStatuses().get(nodename).state;
NodeState idle = NodeState.IDLE;
assertEquals(status, idle);
AccumulatorSink cnt = (AccumulatorSink) ReportManager.get().getReportable(
"count");
assertEquals(100, cnt.getCount());
AccumulatorSink cnt2 = (AccumulatorSink) ReportManager.get().getReportable(
"count2");
assertEquals(50, cnt2.getCount());
AccumulatorSink cnt3 = (AccumulatorSink) ReportManager.get().getReportable(
"count3");
assertEquals(75, cnt3.getCount());
n.stop();
n2.stop();
n3.stop();
}
@Test
@Ignore("Can't get a handle to the countersink, must fix")
TTransportException, IOException {
assertEquals(0, flumeMaster.getSpecMan().getAllConfigs().size());
String nodename = "foo";
FlumeNode n = new FlumeNode(FlumeConfiguration.get(), nodename,
new DirectMasterRPC(flumeMaster), false, false);
n.start();
n.getLivenessManager().heartbeatChecks();
assertEquals(1, flumeMaster.getStatMan().getNodeStatuses().size());
FlumeShell sh = new FlumeShell();
sh.executeLine("connect localhost: "
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
sh
.executeLine("exec config foo 'synth(100)' '{delay(100) => accumulator(\"count\") }' ");
FlumeConfigData fcd = flumeMaster.getSpecMan().getConfig("foo");
assertEquals("{delay(100) => accumulator(\"count\") }", fcd.sinkConfig);
assertEquals("synth(100)", fcd.sourceConfig);
assertTrue(0 != fcd.timestamp);
sh.executeLine("waitForNodesActive 0 foo");
n.getLivenessManager().heartbeatChecks();
NodeStatus status = flumeMaster.getStatMan().getNodeStatuses()
.get(nodename);
NodeState active = NodeState.ACTIVE;
assertEquals(status.state, active);
sh.executeLine("waitForNodesDone 0 foo");
n.getLivenessManager().heartbeatChecks();
status = flumeMaster.getStatMan().getNodeStatuses().get(nodename);
NodeState idle = NodeState.IDLE;
assertEquals(status.state, idle);
AccumulatorSink cnt = (AccumulatorSink) ReportManager.get().getReportable(
"count");
assertEquals(100, cnt.getCount());
n.stop();
}
@Test
PrintStream olderr = System.err;
ByteArrayOutputStream tmpErr = new ByteArrayOutputStream();
System.setErr(new PrintStream(tmpErr));
FlumeShell sh = new FlumeShell();
sh.executeLine("connect localhost: "
+ FlumeConfiguration.DEFAULT_ADMIN_PORT);
long task = sh.executeLine("exec config foo 'blah' "
+ "'{delay(100) => accumulator(\"count\") }' ");
System.setErr(olderr);
ByteArrayInputStream bais = new ByteArrayInputStream(tmpErr.toByteArray());
BufferedReader bis = new BufferedReader(new InputStreamReader(bais));
int count = 0;
while (bis.ready()) {
String line = bis.readLine();
count++;
LOG.error(line);
assertTrue(line.equals("Command failed")
|| line.equals("Attempted to write an invalid sink/source:"
+ " Invalid source: blah"));
}
assertEquals(2, count);
}
}