package com.cloudera.flume.agent.durability;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.ReportTestingContext;
import com.cloudera.flume.core.CompositeSink;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.core.EventUtil;
import com.cloudera.flume.handlers.debug.NoNlASCIISynthSource;
import com.cloudera.flume.handlers.endtoend.AckListener;
import com.cloudera.flume.handlers.rolling.ProcessTagger;
import com.cloudera.flume.handlers.rolling.TimeTrigger;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.aggregator.CounterSink;
import com.cloudera.util.FileUtil;
/**
 * Logger used by this test class, including the worker threads it spawns.
 * Declared final so the shared reference cannot be reassigned.
 */
public static final Logger LOG = Logger
.getLogger(TestNaiveFileWALManagerConcurrently.class);
// JUnit @Before fixture: sets the log4j level of the NaiveFileWALManager
// logger to DEBUG before each test runs.
// NOTE(review): the method declaration line is not visible in this view.
@Before
Logger.getLogger(NaiveFileWALManager.class).setLevel(Level.DEBUG);
}
// Helper scenario: several writer threads push events through ONE shared
// NaiveFileWALDeco, which wraps a single counter sink named "total" and is
// backed by a single NaiveFileWALManager on a temporary directory.
// NOTE(review): the declaration line is not visible in this view; `count`
// and `threads` are used below and are presumably the method's parameters.
throws IOException, InterruptedException, FlumeSpecException {
File dir = FileUtil.mktempdir();
// Both latches are sized to the number of worker threads.
final CountDownLatch start = new CountDownLatch(threads);
final CountDownLatch done = new CountDownLatch(threads);
final NaiveFileWALManager wal = new NaiveFileWALManager(dir);
wal.open();
Context ctx = new ReportTestingContext();
// Counter sink built from a spec string; it is looked up again by the
// name "total" through ReportManager at the end of this method.
EventSink cntsnk = new CompositeSink(ctx, "counter(\"total\")");
// The single decorator instance shared by every worker thread.
final EventSink snk = new NaiveFileWALDeco<EventSink>(ctx, cntsnk, wal,
new TimeTrigger(new ProcessTagger(), 1000000), new AckListener.Empty(),
1000000);
// The shared sink is opened once, here, before any worker is started.
snk.open();
for (int i = 0; i < threads; i++) {
new Thread() {
// NOTE(review): the overridden method's declaration line (presumably
// run()) is not visible in this view.
@Override
// Each worker counts down `start` and then awaits it below, so no worker
// proceeds past the await until all `threads` workers have checked in.
start.countDown();
try {
// presumably generates `count` events of size 100 — confirm against
// NoNlASCIISynthSource
EventSource src = new NoNlASCIISynthSource(count, 100);
start.await();
src.open();
// presumably drains every event from src into the shared sink — confirm
EventUtil.dumpAll(src, snk);
src.close();
} catch (Exception e) {
// Failures are only logged here; they are not rethrown to the test thread.
LOG.error("failure", e);
} finally {
// Always signal completion so the waiting test thread is released.
done.countDown();
}
}
}.start();
}
// Wait at most 30 seconds for all workers to finish.
boolean ok = done.await(30, TimeUnit.SECONDS);
assertTrue("Test timed out!", ok);
// NOTE(review): fixed one-second pause before closing the shared sink;
// presumably gives pending work time to settle — confirm.
Thread.sleep(1000);
snk.close();
CounterSink cnt = (CounterSink) ReportManager.get().getReportable("total");
long ci = cnt.getCount();
LOG.info("count : " + ci);
// Expected total is count * threads plus 2.
// NOTE(review): the extra 2 is presumably contributed by the single WAL
// decorator (e.g. begin/end markers) — confirm.
assertEquals((count * threads) + 2, ci);
// Temp-dir cleanup; not reached if an assertion above fails.
FileUtil.rmr(dir);
}
// Helper scenario: ONE NaiveFileWALManager is shared by several threads, but
// each thread builds its own NaiveFileWALDeco around its own counter sink
// named "total.<idx>". In this variant each worker opens its source and sink
// BEFORE waiting on the start latch.
// NOTE(review): the declaration line is not visible in this view; `count`
// and `threads` are used below and are presumably the method's parameters.
throws IOException, InterruptedException {
File dir = FileUtil.mktempdir();
// Both latches are sized to the number of worker threads.
final CountDownLatch start = new CountDownLatch(threads);
final CountDownLatch done = new CountDownLatch(threads);
final NaiveFileWALManager wal = new NaiveFileWALManager(dir);
wal.open();
for (int i = 0; i < threads; i++) {
// Final copy of the loop index so the anonymous class can capture it.
final int idx = i;
new Thread() {
// NOTE(review): the overridden method's declaration line (presumably
// run()) is not visible in this view.
@Override
// Each worker counts down `start` and later awaits it, so no worker
// passes the await until all `threads` workers have checked in.
start.countDown();
try {
// presumably generates `count` events of size 100 — confirm against
// NoNlASCIISynthSource
EventSource src = new NoNlASCIISynthSource(count, 100);
Context ctx = new ReportTestingContext();
// Per-thread counter, later looked up by the name "total.<idx>".
EventSink snk = new CompositeSink(ctx, "counter(\"total." + idx
+ "\")");
// Per-thread decorator; only the WAL manager `wal` is shared.
snk = new NaiveFileWALDeco<EventSink>(ctx, snk, wal,
new TimeTrigger(new ProcessTagger(), 1000000),
new AckListener.Empty(), 1000000);
// Source and sink are opened before the barrier in this variant.
src.open();
snk.open();
start.await();
// presumably drains every event from src into this thread's sink — confirm
EventUtil.dumpAll(src, snk);
src.close();
snk.close();
} catch (Exception e) {
// Failures are only logged here; they are not rethrown to the test thread.
LOG.error("failure", e);
} finally {
// Always signal completion so the waiting test thread is released.
done.countDown();
}
}
}.start();
}
// Wait at most 30 seconds for all workers to finish.
boolean ok = done.await(30, TimeUnit.SECONDS);
assertTrue("Test timed out!", ok);
// NOTE(review): fixed one-second pause before reading the counters;
// presumably gives pending work time to settle — confirm.
Thread.sleep(1000);
// Add up the per-thread counters registered with ReportManager.
long sum = 0;
for (int i = 0; i < threads; i++) {
CounterSink cnt = (CounterSink) ReportManager.get().getReportable(
"total." + i);
long ci = cnt.getCount();
LOG.info("count " + i + " : " + ci);
sum += ci;
}
LOG.info("sum == " + sum);
// Expected sum is (count + 2) for each thread.
// NOTE(review): the extra 2 per thread is presumably contributed by each
// thread's own WAL decorator — confirm.
assertEquals((count + 2) * threads, sum);
// Temp-dir cleanup; not reached if an assertion above fails.
FileUtil.rmr(dir);
}
// Helper scenario: ONE NaiveFileWALManager is shared by several threads, and
// each thread builds its own NaiveFileWALDeco around its own counter sink
// named "total.<idx>". In this variant each worker waits on the start latch
// FIRST and only then opens its source and sink, so the opens happen after
// all workers have been released.
// NOTE(review): the declaration line (and therefore this method's name) is
// not visible in this view; `count` and `threads` are used below and are
// presumably the method's parameters.
throws IOException, InterruptedException {
File dir = FileUtil.mktempdir();
// Both latches are sized to the number of worker threads.
final CountDownLatch start = new CountDownLatch(threads);
final CountDownLatch done = new CountDownLatch(threads);
final NaiveFileWALManager wal = new NaiveFileWALManager(dir);
wal.open();
for (int i = 0; i < threads; i++) {
// Final copy of the loop index so the anonymous class can capture it.
final int idx = i;
new Thread() {
// NOTE(review): the overridden method's declaration line (presumably
// run()) is not visible in this view.
@Override
// Each worker counts down `start` and later awaits it, so no worker
// passes the await until all `threads` workers have checked in.
start.countDown();
try {
// presumably generates `count` events of size 100 — confirm against
// NoNlASCIISynthSource
EventSource src = new NoNlASCIISynthSource(count, 100);
Context ctx = new ReportTestingContext();
// Per-thread counter, later looked up by the name "total.<idx>".
EventSink snk = new CompositeSink(ctx, "counter(\"total." + idx
+ "\")");
// Per-thread decorator; only the WAL manager `wal` is shared.
snk = new NaiveFileWALDeco<EventSink>(ctx, snk, wal,
new TimeTrigger(new ProcessTagger(), 1000000),
new AckListener.Empty(), 1000000);
// Barrier first: the opens below run only after every worker checked in.
start.await();
src.open();
snk.open();
// presumably drains every event from src into this thread's sink — confirm
EventUtil.dumpAll(src, snk);
src.close();
snk.close();
} catch (Exception e) {
// Failures are only logged here; they are not rethrown to the test thread.
LOG.error("failure", e);
} finally {
// Always signal completion so the waiting test thread is released.
done.countDown();
}
}
}.start();
}
// Wait at most 30 seconds for all workers to finish.
boolean ok = done.await(30, TimeUnit.SECONDS);
assertTrue("Test timed out!", ok);
// NOTE(review): fixed one-second pause before reading the counters;
// presumably gives pending work time to settle — confirm.
Thread.sleep(1000);
// Add up the per-thread counters registered with ReportManager.
long sum = 0;
for (int i = 0; i < threads; i++) {
CounterSink cnt = (CounterSink) ReportManager.get().getReportable(
"total." + i);
long ci = cnt.getCount();
LOG.info("count " + i + " : " + ci);
sum += ci;
}
LOG.info("sum == " + sum);
// Expected sum is (count + 2) for each thread.
// NOTE(review): the extra 2 per thread is presumably contributed by each
// thread's own WAL decorator — confirm.
assertEquals((count + 2) * threads, sum);
// Temp-dir cleanup; not reached if an assertion above fails.
FileUtil.rmr(dir);
}
// Test case: calls doSharedWALDeco with arguments (10000, 10).
// NOTE(review): the declaration line is not visible in this view; the
// arguments are presumably (count, threads) — confirm.
@Test
FlumeSpecException {
doSharedWALDeco(10000, 10);
}
// Test case: calls doSharedWALDeco with arguments (1000, 100).
// NOTE(review): the declaration line is not visible in this view; the
// arguments are presumably (count, threads) — confirm.
@Test
FlumeSpecException {
doSharedWALDeco(1000, 100);
}
// Test case: calls doSharedWALDeco with arguments (100, 1000).
// NOTE(review): the declaration line is not visible in this view; the
// arguments are presumably (count, threads) — confirm.
@Test
FlumeSpecException {
doSharedWALDeco(100, 1000);
}
// Disabled test (@Ignore): its body calls doSharedWALManager(10000, 10).
// NOTE(review): the declaration line is not visible in this view, and no
// reason for disabling the test is recorded here.
@Ignore
@Test
doSharedWALManager(10000, 10);
}
// Disabled test (@Ignore): its body calls doSharedWALManager(1000, 100).
// NOTE(review): the declaration line is not visible in this view, and no
// reason for disabling the test is recorded here.
@Ignore
@Test
doSharedWALManager(1000, 100);
}
// Disabled test (@Ignore): its body calls doSharedWALManager(100, 1000).
// NOTE(review): the declaration line is not visible in this view, and no
// reason for disabling the test is recorded here.
@Ignore
@Test
doSharedWALManager(100, 1000);
}
// Disabled test (@Ignore): its body calls doSharedWALManager(10000, 10).
// NOTE(review): this is the same call, with the same arguments, as an
// earlier ignored test in this file; possibly a different helper (such as
// the variant that opens its source/sink after the start latch) was
// intended here — confirm.
@Ignore
@Test
InterruptedException {
doSharedWALManager(10000, 10);
}
// Disabled test (@Ignore): its body calls doSharedWALManager(1000, 100).
// NOTE(review): this is the same call, with the same arguments, as an
// earlier ignored test in this file; possibly a different helper (such as
// the variant that opens its source/sink after the start latch) was
// intended here — confirm.
@Ignore
@Test
InterruptedException {
doSharedWALManager(1000, 100);
}
// Disabled test (@Ignore): its body calls doSharedWALManager(100, 1000).
// NOTE(review): this is the same call, with the same arguments, as an
// earlier ignored test in this file; possibly a different helper (such as
// the variant that opens its source/sink after the start latch) was
// intended here — confirm.
@Ignore
@Test
InterruptedException {
doSharedWALManager(100, 1000);
}
}