package com.cloudera.flume.conf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.log4j.Logger;
import com.cloudera.flume.ExampleData;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.core.EventUtil;
import com.cloudera.flume.core.connector.DirectDriver;
import com.cloudera.flume.handlers.debug.MemorySinkSource;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.aggregator.AccumulatorSink;
import com.cloudera.util.Pair;
import org.junit.Test;
/** Logger used by the tests in this class. */
static final Logger LOG =
    Logger.getLogger(TestFlumeBuilderFunctional.class.getName());

/** Source spec string handed to FlumeBuilder.buildSource by the tests. */
final String SOURCE = "asciisynth(25,100)";

/** Number of events the tests expect to read from SOURCE. */
static final int LINES = 25;
/**
 * Builds a sink from the spec string "console", opens it, appends a single
 * event whose body is the bytes of "test", and closes it. There are no
 * assertions; the test passes when none of these calls throws.
 */
@Test
EventSink snk = FlumeBuilder.buildSink(new Context(), "console");
snk.open();
snk.append(new EventImpl("test".getBytes()));
snk.close();
}
/**
 * Builds the event source described by SOURCE, reads events from it until
 * next() returns null (logging each one), and checks that exactly LINES
 * events were read.
 */
@Test
LOG.info("Working Dir path: " + new File(".").getAbsolutePath());
EventSource src = FlumeBuilder.buildSource(SOURCE);
src.open();
Event e = null;
int cnt = 0;
// A null event ends the loop.
while ((e = src.next()) != null) {
LOG.info(e);
cnt++;
}
src.close();
assertEquals(LINES, cnt);
}
/**
 * Wires the source built from SOURCE to a "console" sink with a
 * DirectDriver, starts the driver, joins it, and then checks that the
 * driver reports no error.
 */
@Test
public void testConnector()
    throws IOException, InterruptedException, FlumeSpecException {
  final EventSink consoleSink = FlumeBuilder.buildSink(new Context(), "console");
  consoleSink.open();
  final EventSource synthSource = FlumeBuilder.buildSource(SOURCE);
  synthSource.open();

  final DirectDriver driver = new DirectDriver(synthSource, consoleSink);
  driver.start();
  // Join with the largest possible timeout value.
  driver.join(Long.MAX_VALUE);

  consoleSink.close();
  synthSource.close();
  assertNull(driver.getError());
}
/**
 * Dumps the source built from SOURCE into a sink built from a bracketed
 * list spec containing "console" and an accumulator named "count", then
 * checks that the accumulator counted LINES events.
 */
@Test
public void testMultiSink() throws IOException, FlumeSpecException {
  LOG.info("== multi test start");
  final String sinkSpec = "[ console , accumulator(\"count\") ]";
  final EventSource source = FlumeBuilder.buildSource(SOURCE);
  final EventSink sink =
      FlumeBuilder.buildSink(new ReportTestingContext(), sinkSpec);
  source.open();
  sink.open();
  EventUtil.dumpAll(source, sink);
  source.close();
  sink.close();

  // Look the accumulator up by the name used in the spec.
  final ReportManager reports = ReportManager.get();
  final AccumulatorSink counter =
      (AccumulatorSink) reports.getReportable("count");
  assertEquals(LINES, counter.getCount());
  LOG.info("== multi test stop");
}
/**
 * Dumps the source built from SOURCE into an accumulator named "count"
 * that is decorated with intervalSampler(5), then checks that the
 * accumulator counted LINES / 5 events.
 */
@Test
public void testDecorated() throws IOException, FlumeSpecException {
  LOG.info("== Decorated start");
  final String sinkSpec = "{ intervalSampler(5) => accumulator(\"count\")}";
  final EventSource source = FlumeBuilder.buildSource(SOURCE);
  final EventSink sink =
      FlumeBuilder.buildSink(new ReportTestingContext(), sinkSpec);
  source.open();
  sink.open();
  EventUtil.dumpAll(source, sink);
  source.close();
  sink.close();

  // Look the accumulator up by the name used in the spec.
  final ReportManager reports = ReportManager.get();
  final AccumulatorSink counter =
      (AccumulatorSink) reports.getReportable("count");
  assertEquals(LINES / 5, counter.getCount());
  LOG.info("== Decorated stop");
}
/**
 * Dumps the source built from SOURCE into a sink built from the failover
 * spec below (a flakeyAppend-decorated console, with an accumulator named
 * "count" as the alternative), then checks that the accumulator counted
 * LINES events.
 */
@Test
public void testFailover() throws IOException, FlumeSpecException {
  LOG.info("== failover start");
  final String sinkSpec =
      "< { flakeyAppend(.9,1337) => console } ? accumulator(\"count\") >";
  final EventSource source = FlumeBuilder.buildSource(SOURCE);
  final EventSink sink =
      FlumeBuilder.buildSink(new ReportTestingContext(), sinkSpec);
  source.open();
  sink.open();
  EventUtil.dumpAll(source, sink);
  source.close();
  sink.close();

  // Look the accumulator up by the name used in the spec.
  final ReportManager reports = ReportManager.get();
  final AccumulatorSink counter =
      (AccumulatorSink) reports.getReportable("count");
  assertEquals(LINES, counter.getCount());
  LOG.info("== failover stop");
}
/**
 * Binds an accumulator named "count" with a let expression and uses that
 * binding on both sides of a failover spec. A canned source created with a
 * count of 100 is dumped into the sink, and the accumulator is expected to
 * report 100.
 */
@Test
public void testLet() throws IOException, FlumeSpecException {
  LOG.info("== let and failover start");
  final String sinkSpec =
      "let count := accumulator(\"count\") in < { flakeyAppend(.5,1337) => count} ? count >";
  final EventSource source = MemorySinkSource.cannedData("canned data ", 100);
  final EventSink sink =
      FlumeBuilder.buildSink(new ReportTestingContext(), sinkSpec);
  source.open();
  sink.open();
  EventUtil.dumpAll(source, sink);
  source.close();
  sink.close();

  // Look the accumulator up by the name used in the spec.
  final ReportManager reports = ReportManager.get();
  final AccumulatorSink counter =
      (AccumulatorSink) reports.getReportable("count");
  assertEquals(100, counter.getCount());
  LOG.info("== let and failover stop");
}
/**
 * Checks let-binding shadowing: the name foo is bound first to an
 * accumulator named "foo" and then, in a nested let, to an accumulator
 * named "bar". A canned source created with a count of 100 is dumped into
 * the resulting sink; the test expects "foo" to report 0 and "bar" to
 * report 100.
 *
 * @throws IOException if opening, appending to, or closing the source or
 *         sink fails
 * @throws FlumeSpecException if the sink spec cannot be built
 */
@Test
public void testLetShadow()
    throws IOException, FlumeSpecException {
  LOG.info("== let shadowing start");
  String let = "let foo := accumulator(\"foo\") in let foo := accumulator(\"bar\") in foo";
  EventSource src = MemorySinkSource.cannedData("canned data ", 100);
  EventSink snk = FlumeBuilder.buildSink(new ReportTestingContext(), let);
  src.open();
  snk.open();
  EventUtil.dumpAll(src, snk);
  src.close();
  snk.close();
  AccumulatorSink fooctr = (AccumulatorSink) ReportManager.get()
      .getReportable("foo");
  AccumulatorSink barctr = (AccumulatorSink) ReportManager.get()
      .getReportable("bar");
  assertEquals(0, fooctr.getCount());
  assertEquals(100, barctr.getCount());
  // Stop marker matches the start marker logged at the top of this test.
  LOG.info("== let shadowing stop");
}
/**
 * Builds a full node configuration for "localhost" that pairs SOURCE with
 * a failover sink spec, dumps each configured source into its sink, and
 * then checks that the accumulator named "count" counted LINES events.
 */
@Test
public void testNode() throws IOException, FlumeSpecException {
  LOG.info("== node start");
  final String nodeSpec = "localhost : "
      + SOURCE
      + " | < { flakeyAppend(.9,1337) => console } ? accumulator(\"count\") > ;";
  final Map<String, Pair<EventSource, EventSink>> nodes =
      FlumeBuilder.build(new ReportTestingContext(), nodeSpec);

  // Only the source/sink pairs are needed, not the node names.
  for (Pair<EventSource, EventSink> endpoints : nodes.values()) {
    final EventSource source = endpoints.getLeft();
    final EventSink sink = endpoints.getRight();
    source.open();
    sink.open();
    EventUtil.dumpAll(source, sink);
    source.close();
    sink.close();
  }

  final ReportManager reports = ReportManager.get();
  final AccumulatorSink counter =
      (AccumulatorSink) reports.getReportable("count");
  assertEquals(LINES, counter.getCount());
  LOG.info("== node stop");
}
}