package com.cloudera.flume.handlers.debug;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.connector.DirectDriver;
import com.cloudera.flume.reporter.ReportEvent;
public static final Logger LOG = LoggerFactory.getLogger(TestChokeDecos.class);
Random rand = new Random(System.currentTimeMillis());
final ChokeManager testChokeMan = new ChokeManager();
final HashMap<String, Integer> chokeMap = new HashMap<String, Integer>();
final long testTime = 5000;
final int numDrivers = 50;
int minTlimit = 500;
int maxTlimit = 20000;
int minMsgSize = 50;
int maxMsgSize = 30000;
double highErrorLimit = 5;
double lowErrorLimit = .5;
@Before
testChokeMan.setPayLoadHeaderSize(0);
}
class TestChoke<S
extends EventSink>
extends ChokeDecorator<S> {
super(s, tId);
}
@Override
public void append(Event e)
throws IOException {
testChokeMan.spendTokens(chokeId, e.getBody().length);
updateAppendStats(e);
}
}
@Test
int numChokes = numDrivers;
LOG.info("Setting up Individual Test");
for (int i = 0; i < numChokes; i++) {
chokeMap.put(Integer.toString(i), minTlimit
+ rand.nextInt(maxTlimit - minTlimit));
}
testChokeMan.updateChokeLimitMap(chokeMap);
TestChoke[] tchokeArray = new TestChoke[numChokes];
for (int i = 0; i < numChokes; i++) {
tchokeArray[i] = new TestChoke<EventSink>(null, Integer.toString(i));
}
DirectDriver[] directDriverArray = new DirectDriver[numDrivers];
for (int i = 0; i < numDrivers; i++) {
directDriverArray[i] = new DirectDriver("TestDriver" + i,
new SynthSourceRndSize(0, minMsgSize, maxMsgSize), tchokeArray[i]);
}
LOG.info("Running the Individual Test Now!");
for (int i = 0; i < numDrivers; i++) {
if (!testChokeMan.isChokeId(Integer.toString(i))) {
LOG.error("ChokeID " + Integer.toString(i) + "not present");
fail("ChokeID " + Integer.toString(i) + "not present");
}
}
testChokeMan.start();
for (DirectDriver d : directDriverArray) {
d.start();
}
Thread.sleep(testTime);
for (DirectDriver d : directDriverArray) {
d.stop();
}
testChokeMan.halt();
double errorRatio = 1.0;
for (TestChoke<EventSink> t : tchokeArray) {
errorRatio = ((double) (chokeMap.get(t.getChokeId()) * testTime))
/ (double) (t.getReport().getLongMetric("number of bytes"));
LOG.info("ChokeID: " + t.getChokeId() + ", error-ratio: " + errorRatio);
ReportEvent r = t.getReport();
LOG.info(" events :" + r.getLongMetric("number of events"));
assertFalse((errorRatio > this.highErrorLimit || errorRatio < this.lowErrorLimit));
}
LOG.info("Individual Test successful !!!");
}
@Test
int numChokes = 5;
LOG.info("Setting up Collective Test");
for (int i = 0; i < numChokes; i++) {
chokeMap.put(Integer.toString(i), minTlimit
+ rand.nextInt(maxTlimit - minTlimit));
}
testChokeMan.updateChokeLimitMap(chokeMap);
TestChoke[] tchokeArray = new TestChoke[numChokes];
for (int i = 0; i < numChokes; i++) {
tchokeArray[i] = new TestChoke<EventSink>(null, Integer.toString(i));
}
Set<TestChoke<EventSink>> chokesUsed = new HashSet<TestChoke<EventSink>>();
DirectDriver[] directDriverArray = new DirectDriver[numDrivers];
int randChokeIndex = 0;
for (int i = 0; i < numDrivers; i++) {
randChokeIndex = rand.nextInt(numChokes);
directDriverArray[i] = new DirectDriver(new SynthSourceRndSize(0,
minMsgSize, maxMsgSize), tchokeArray[randChokeIndex]);
chokesUsed.add(tchokeArray[randChokeIndex]);
}
LOG.info("Running the Collective Test Now!");
for (TestChoke<EventSink> t : chokesUsed) {
if (!testChokeMan.isChokeId(t.getChokeId())) {
LOG.error("ChokeID " + t.getChokeId() + "not present");
fail();
}
}
testChokeMan.start();
for (DirectDriver f : directDriverArray) {
f.start();
}
Thread.sleep(testTime);
for (DirectDriver f : directDriverArray) {
f.stop();
}
testChokeMan.halt();
double errorRatio = 1.0;
for (TestChoke<EventSink> t : chokesUsed) {
errorRatio = ((double) (chokeMap.get(t.getChokeId()) * testTime))
/ (double) (t.getReport().getLongMetric("number of bytes"));
LOG.info("ChokeID: " + t.getChokeId() + ", error-ratio: " + errorRatio);
assertFalse((errorRatio > this.highErrorLimit || errorRatio < this.lowErrorLimit));
}
LOG.info("Collective test successful !!!");
}
}