package com.cloudera.flume.handlers.endtoend;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.flume.core.FanOutSink;
import com.cloudera.flume.handlers.debug.ConsoleEventSink;
import com.cloudera.flume.handlers.debug.MemorySinkSource;
import com.cloudera.flume.reporter.aggregator.CounterSink;
/** Logger for this test class; used by testDupe to log the checker's report. */
public static final Logger LOG = LoggerFactory.getLogger(TestAckChecksumDecos.class);
/**
 * Pushes {@code msgs} events through an AckChecksumInjector whose downstream
 * is a fan-out to a CounterSink and a ConsoleEventSink, then checks how many
 * events the counter received.
 */
// NOTE(review): the method declaration line appears to be missing between the
// annotation and the first statement below -- confirm against the original file.
@Test
int msgs = 5;
CounterSink cnt = new CounterSink("count");
// Injector sits upstream of the fan-out, so everything it emits reaches cnt.
AckChecksumInjector<EventSink> aci =
new AckChecksumInjector<EventSink>(new FanOutSink<EventSink>(cnt,
new ConsoleEventSink()));
aci.open();
for (int i = 0; i < msgs; i++) {
Event e = new EventImpl(("this is a test " + i).getBytes());
aci.append(e);
}
aci.close();
// Expects two events beyond the payload messages; presumably these are the
// injector's own start/end ack events -- TODO confirm. The literal 5 mirrors msgs.
assertEquals(5 + 2, cnt.getCount());
}
/**
 * Round-trips {@code msgs} events through an AckChecksumInjector feeding an
 * AckChecksumChecker that writes into a MemorySinkSource, then asserts that
 * exactly {@code msgs} events can be read back from the memory sink.
 */
// NOTE(review): the method declaration line appears to be missing between the
// annotation and the first statement below -- confirm against the original file.
@Test
int msgs = 100;
MemorySinkSource mss = new MemorySinkSource();
AckChecksumChecker<EventSink> acc = new AckChecksumChecker<EventSink>(mss);
AckChecksumInjector<EventSink> aci =
new AckChecksumInjector<EventSink>(acc);
aci.open();
for (int i = 0; i < msgs; i++) {
Event e = new EventImpl(("this is a test " + i).getBytes());
aci.append(e);
}
aci.close();
Event eo = null;
int count = 0;
// Drain the memory sink until next() returns null, counting what arrived.
while ((eo = mss.next()) != null) {
System.out.println(eo);
count++;
}
// Only the payload events are expected downstream of the checker; presumably
// the checker consumes the ack bookkeeping events itself -- TODO confirm.
assertEquals(msgs, count);
}
/**
 * Sends 100 events through an AckChecksumInjector, a ReorderDecorator and an
 * AckChecksumChecker into a MemorySinkSource, then prints every event that
 * reached the memory sink.
 */
// NOTE(review): the method declaration line appears to be missing between the
// annotation and the first statement below -- confirm against the original file.
// NOTE(review): this test contains no assertions; it only fails if a call
// below throws.
@Test
MemorySinkSource mss = new MemorySinkSource();
AckChecksumChecker<EventSink> cc = new AckChecksumChecker<EventSink>(mss);
// NOTE(review): the meaning of the (.5, .5, 0) arguments is not visible here;
// presumably reordering probabilities and a seed -- confirm against ReorderDecorator.
ReorderDecorator<EventSink> ro =
new ReorderDecorator<EventSink>(cc, .5, .5, 0);
AckChecksumInjector<EventSink> cp = new AckChecksumInjector<EventSink>(ro);
cp.open();
for (int i = 0; i < 100; i++) {
Event e = new EventImpl(("this is a test " + i).getBytes());
cp.append(e);
}
cp.close();
Event eo = null;
// Drain and print whatever the checker forwarded to the memory sink.
while ((eo = mss.next()) != null) {
System.out.println(eo);
}
}
/**
 * Sends {@code msgs} events through an AckChecksumInjector whose downstream
 * decorator silently discards the very first append it sees, and fails the
 * test if any IOException is raised while opening, appending or closing.
 */
// NOTE(review): the method declaration line appears to be missing between the
// annotation and the first statement below -- confirm against the original file.
@Test
final int msgs = 100;
MemorySinkSource mss = new MemorySinkSource();
AckChecksumChecker<EventSink> cc = new AckChecksumChecker<EventSink>(mss);
// Decorator that swallows the first append and forwards every later one.
EventSinkDecorator<EventSink> dropFirst =
new EventSinkDecorator<EventSink>(cc) {
int count = 0;
public void append(Event e)
throws IOException {
if (count == 0) {
// First append is dropped; presumably this is the injector's start
// event -- TODO confirm.
count++;
return;
}
count++;
getSink().append(e);
}
};
AckChecksumInjector<EventSink> cp =
new AckChecksumInjector<EventSink>(dropFirst);
try {
cp.open();
for (int i = 0; i < msgs; i++) {
Event e = new EventImpl(("this is a test " + i).getBytes());
cp.append(e);
}
cp.close();
} catch (IOException ioe) {
// NOTE(review): the test fails when an IOException IS thrown, yet the
// message reads as if an exception had been expected -- confirm intent.
fail("didn't throw no start exception");
}
}
/**
 * Sends {@code msgs} events through an AckChecksumInjector whose downstream
 * decorator discards the append at zero-based position {@code msgs + 1} and
 * forwards all others to an AckChecksumChecker.
 */
// NOTE(review): the method declaration line appears to be missing between the
// annotation and the first statement below -- confirm against the original file.
// NOTE(review): this test contains no assertions; it only fails if a call
// below throws.
@Test
final int msgs = 100;
MemorySinkSource mss = new MemorySinkSource();
AckChecksumChecker<EventSink> cc = new AckChecksumChecker<EventSink>(mss);
EventSinkDecorator<EventSink> dropStop =
new EventSinkDecorator<EventSink>(cc) {
// Zero-based index of the append to discard. Presumably index 0 is the
// injector's start event and index msgs + 1 its end event -- TODO confirm.
int drop = msgs + 1;
int count = 0;
public void append(Event e)
throws IOException {
if (count == drop) {
count++;
return;
}
count++;
getSink().append(e);
}
};
AckChecksumInjector<EventSink> cp =
new AckChecksumInjector<EventSink>(dropStop);
cp.open();
for (int i = 0; i < msgs; i++) {
Event e = new EventImpl(("this is a test " + i).getBytes());
cp.append(e);
}
cp.close();
}
/**
 * Verifies that the checksum checker reports an error when one event is
 * delivered twice.
 *
 * <p>The pipeline is injector -> duplicating decorator -> checker -> memory
 * sink. The checker is given a listener whose {@code err} callback throws an
 * IOException, so a detected problem surfaces as an exception from the
 * pipeline. The test passes when that IOException is observed and fails if
 * the whole open/append/close sequence completes normally.
 *
 * @throws IOException declared by the signature; IOExceptions raised by the
 *         pipeline itself are caught inside the method
 */
@Test
public void testDupe()
    throws IOException {
  final int msgs = 100;
  MemorySinkSource mss = new MemorySinkSource();

  // Listener that converts any error notification into an IOException.
  AckChecksumChecker<EventSink> cc =
      new AckChecksumChecker<EventSink>(mss, new AckListener.Empty() {
        @Override
        public void err(String group)
            throws IOException {
          throw new IOException("Fail");
        }
      });

  // Decorator that forwards every event once, except the one at position
  // msgs / 2 (zero-based, counting every append it sees), which is sent twice.
  EventSinkDecorator<EventSink> dupeMsg =
      new EventSinkDecorator<EventSink>(cc) {
        final int dupeIndex = msgs / 2;
        int count = 0;

        @Override
        public void append(Event e)
            throws IOException {
          if (count == dupeIndex) {
            getSink().append(e);
          }
          count++;
          getSink().append(e);
        }
      };

  AckChecksumInjector<EventSink> cp =
      new AckChecksumInjector<EventSink>(dupeMsg);
  try {
    cp.open();
    // Loop bound uses msgs so it stays in step with dupeIndex above.
    for (int i = 0; i < msgs; i++) {
      Event e = new EventImpl(("this is a test " + i).getBytes());
      cp.append(e);
    }
    cp.close();
  } catch (IOException ioe) {
    // Expected path: the listener's err callback fired.
    return;
  }

  // Reaching this point means no error was reported; log the checker's
  // report to help diagnose, then fail.
  LOG.info(cc.getReport().toJson());
  fail("should have failed");
}
/**
 * Builds a sink from the spec {@code { ackInjector => null}} with a fresh
 * Context. There is no assertion; the test relies on the call not throwing.
 */
// NOTE(review): the method declaration line appears to be missing between the
// annotation and the statement below -- confirm against the original file.
@Test
FlumeBuilder.buildSink(new Context(), "{ ackInjector => null}");
}
/**
 * Expects FlumeBuilder.buildSink to throw FlumeSpecException for the spec
 * {@code { ackInjector(false) => null}}; presumably because ackInjector does
 * not accept this argument -- TODO confirm.
 */
// NOTE(review): the method declaration line appears to be missing between the
// annotation and the statement below -- confirm against the original file.
@Test(expected = FlumeSpecException.class)
FlumeBuilder.buildSink(new Context(), "{ ackInjector(false) => null}");
}
/**
 * Builds a sink from the spec {@code {ackChecker => null}} with a fresh
 * Context. There is no assertion; the test relies on the call not throwing.
 */
// NOTE(review): the method declaration line appears to be missing between the
// annotation and the statement below -- confirm against the original file.
@Test
FlumeBuilder.buildSink(new Context(), "{ackChecker => null}");
}
/**
 * Expects FlumeBuilder.buildSink to throw FlumeSpecException for the spec
 * {@code {ackChecker(false) => null}}; presumably because ackChecker does not
 * accept this argument -- TODO confirm.
 */
// NOTE(review): the method declaration line appears to be missing between the
// annotation and the statement below -- confirm against the original file.
@Test(expected = FlumeSpecException.class)
FlumeBuilder.buildSink(new Context(), "{ackChecker(false) => null}");
}
}