package com.cloudera.flume.handlers.debug;
import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.ReportTestingContext;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.aggregator.CounterSink;
@Test
final int repeat = 9;
final int msgs = 10;
CounterSink cnt = new CounterSink("count");
EventSinkDecorator<CounterSink> s =
new MultiplierDecorator<CounterSink>(cnt, repeat);
s.open();
for (int i = 0; i < msgs; i++) {
Event e = new EventImpl(("" + i).getBytes());
s.append(e);
}
Assert.assertEquals(msgs * repeat, cnt.getCount());
}
@Test
final int repeat = 7;
final int msgs = 10;
String cfg = "{ mult(" + repeat + ") => counter(\"count\") }";
EventSink s = FlumeBuilder.buildSink(new ReportTestingContext(), cfg);
s.open();
for (int i = 0; i < msgs; i++) {
Event e = new EventImpl(("" + i).getBytes());
s.append(e);
}
CounterSink cnt = (CounterSink) ReportManager.get().getReportable("count");
Assert.assertEquals(msgs * repeat, cnt.getCount());
}
@Test
FlumeSpecException {
final int repeat = 3;
final int msgs = 4;
String cfg =
"{ benchinject => { mult(" + repeat
+ ") => [console, counter(\"count\")] }}";
EventSink s = FlumeBuilder.buildSink(new ReportTestingContext(), cfg);
s.open();
for (int i = 0; i < msgs; i++) {
Event e = new EventImpl(("" + i).getBytes());
s.append(e);
}
s.close();
CounterSink cnt = (CounterSink) ReportManager.get().getReportable("count");
Assert.assertEquals(msgs * repeat + 3, cnt.getCount());
}
}