package com.cloudera.flume.agent.diskfailover;
import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.ReportTestingContext;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.core.EventUtil;
import com.cloudera.flume.handlers.debug.MemorySinkSource;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.aggregator.CounterSink;
import com.cloudera.util.BenchmarkHarness;
/**
 * Builds a sink from a spec in which {@code benchreport("pre")} wraps
 * {@code diskFailover}, pushes the canned events from a
 * {@link MemorySinkSource} through it, and checks that the reportable
 * registered as "beforecount" reports a count of 5.
 *
 * NOTE(review): the method declaration that belongs between {@code @Test} and
 * the first statement is not present in this copy of the file; restore it
 * before compiling.
 */
@Test
BenchmarkHarness.setupLocalWriteDir();
try {
  String spec = "{ benchinject => { benchreport(\"pre\") => { diskFailover => counter(\"beforecount\") } } }";
  EventSink snk = FlumeBuilder.buildSink(new ReportTestingContext(), spec);
  // presumably 5 canned events with bodies prefixed "test " -- TODO confirm
  EventSource src = MemorySinkSource.cannedData("test ", 5);

  snk.open();
  src.open();
  EventUtil.dumpAll(src, snk);
  src.close();
  snk.close();

  // The counter is looked up by the name given in the spec's counter(...) sink.
  CounterSink cnt = (CounterSink) ReportManager.get().getReportable("beforecount");
  Assert.assertEquals(5, cnt.getCount());
} finally {
  // Run cleanup even when the pipeline throws or the assertion fails, so a
  // failing run does not leave the harness's local write dir behind.
  BenchmarkHarness.cleanupLocalWriteDir();
}
}
/**
 * Builds a sink from a spec in which {@code diskFailover} wraps
 * {@code benchreport("post")}, pushes the canned events from a
 * {@link MemorySinkSource} through it, and checks that the reportable
 * registered as "beforecount" reports a count of 5.
 *
 * NOTE(review): the method declaration that belongs between {@code @Test} and
 * the first statement is not present in this copy of the file; restore it
 * before compiling.
 */
@Test
BenchmarkHarness.setupLocalWriteDir();
try {
  String spec = "{ benchinject => { diskFailover => { benchreport(\"post\") => counter(\"beforecount\") } } }";
  EventSink snk = FlumeBuilder.buildSink(new ReportTestingContext(), spec);
  // presumably 5 canned events with bodies prefixed "test " -- TODO confirm
  EventSource src = MemorySinkSource.cannedData("test ", 5);

  snk.open();
  src.open();
  EventUtil.dumpAll(src, snk);
  src.close();
  snk.close();

  // The counter is looked up by the name given in the spec's counter(...) sink.
  CounterSink cnt = (CounterSink) ReportManager.get().getReportable("beforecount");
  Assert.assertEquals(5, cnt.getCount());
} finally {
  // Run cleanup even when the pipeline throws or the assertion fails, so a
  // failing run does not leave the harness's local write dir behind.
  BenchmarkHarness.cleanupLocalWriteDir();
}
}
/**
 * Builds a sink from a spec in which {@code benchreport("pre")} wraps
 * {@code diskFailover}, pushes the canned events from a
 * {@link MemorySinkSource} through it, and checks that the reportable
 * registered as "beforecount" reports a count of 5.
 *
 * NOTE(review): only the tail of the method declaration ({@code IOException {})
 * survives in this copy of the file; the name and the start of the throws
 * clause must be restored before compiling.
 *
 * NOTE(review): the spec string here is identical to the one used by the first
 * test in this file, while the test that follows exercises
 * {@code ackedWriteAhead}. This may be a copy-paste leftover -- confirm which
 * decorator this test was meant to cover.
 */
@Test
IOException {
  BenchmarkHarness.setupLocalWriteDir();
  try {
    String spec = "{ benchinject => { benchreport(\"pre\") => { diskFailover => counter(\"beforecount\") } } }";
    EventSink snk = FlumeBuilder.buildSink(new ReportTestingContext(), spec);
    // presumably 5 canned events with bodies prefixed "test " -- TODO confirm
    EventSource src = MemorySinkSource.cannedData("test ", 5);

    snk.open();
    src.open();
    EventUtil.dumpAll(src, snk);
    src.close();
    snk.close();

    // The counter is looked up by the name given in the spec's counter(...) sink.
    CounterSink cnt = (CounterSink) ReportManager.get().getReportable("beforecount");
    Assert.assertEquals(5, cnt.getCount());
  } finally {
    // Run cleanup even when the pipeline throws or the assertion fails, so a
    // failing run does not leave the harness's local write dir behind.
    BenchmarkHarness.cleanupLocalWriteDir();
  }
}
/**
 * Builds a sink from a spec in which {@code ackedWriteAhead} wraps
 * {@code benchreport("post")}, pushes the canned events from a
 * {@link MemorySinkSource} through it, and checks that the reportable
 * registered as "beforecount" reports a count of {@code 5 + 2}.
 *
 * NOTE(review): the method declaration that belongs between {@code @Test} and
 * the first statement is not present in this copy of the file; restore it
 * before compiling.
 */
@Test
BenchmarkHarness.setupLocalWriteDir();
try {
  String spec = "{ benchinject => { ackedWriteAhead => { benchreport(\"post\") => counter(\"beforecount\") } } }";
  EventSink snk = FlumeBuilder.buildSink(new ReportTestingContext(), spec);
  // presumably 5 canned events with bodies prefixed "test " -- TODO confirm
  EventSource src = MemorySinkSource.cannedData("test ", 5);

  snk.open();
  src.open();
  EventUtil.dumpAll(src, snk);
  src.close();
  snk.close();

  // The counter is looked up by the name given in the spec's counter(...) sink.
  CounterSink cnt = (CounterSink) ReportManager.get().getReportable("beforecount");
  // Two more events than were injected are expected; presumably the extras
  // are added by ackedWriteAhead itself -- TODO confirm.
  Assert.assertEquals(5 + 2, cnt.getCount());
} finally {
  // Run cleanup even when the pipeline throws or the assertion fails, so a
  // failing run does not leave the harness's local write dir behind.
  BenchmarkHarness.cleanupLocalWriteDir();
}
}
}