package com.cloudera.flume.handlers.debug;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.reporter.ReportEvent;
@Test
final AtomicInteger ok = new AtomicInteger();
EventSink failAppend = new EventSink.Base() {
int n = 4;
int count = 0;
@Override
public void append(Event e)
throws IOException {
count++;
if (count % n == 0) {
System.out.println("failed at " + count);
throw new IOException("Failed, but will succeed later");
}
ok.incrementAndGet();
System.out.print(".");
}
@Override
public void close()
throws IOException {
System.out.println("close");
}
@Override
public void open()
throws IOException {
System.out.println("open");
}
};
StubbornAppendSink<EventSink> sink = new StubbornAppendSink<EventSink>(
failAppend);
sink.open();
for (int i = 0; i < 100; i++) {
Event e = new EventImpl(("attempt " + i).getBytes());
sink.append(e);
}
Assert.assertEquals(ok.get(), 100);
ReportEvent rpt = sink.getReport();
Writer out = new OutputStreamWriter(System.out);
rpt.toJson(out);
out.flush();
Assert.assertEquals(new Long(100),
rpt.getLongMetric(StubbornAppendSink.A_SUCCESSES));
Assert.assertEquals(new Long(33),
rpt.getLongMetric(StubbornAppendSink.A_FAILS));
Assert.assertEquals(new Long(33),
rpt.getLongMetric(StubbornAppendSink.A_RECOVERS));
}
@Test
EventSink failAppend = mock(EventSink.class);
Event e = new EventImpl("test".getBytes());
doNothing().doNothing().doThrow(new IOException()).doNothing().when(
failAppend).append(Mockito.<Event> anyObject());
doReturn(new ReportEvent("stub")).when(failAppend).getReport();
StubbornAppendSink<EventSink> sink = new StubbornAppendSink<EventSink>(
failAppend);
sink.open();
for (int i = 0; i < 3; i++) {
sink.append(e);
System.out.println(i);
}
ReportEvent rpt = sink.getReport();
Assert.assertEquals(new Long(1), rpt.getLongMetric(StubbornAppendSink.A_FAILS));
Assert.assertEquals(new Long(1), rpt.getLongMetric(StubbornAppendSink.A_RECOVERS));
}
@Test
final AtomicInteger ok = new AtomicInteger();
EventSink cnt = new EventSink.Base() {
@Override
public void append(Event e)
throws IOException {
ok.incrementAndGet();
}
};
IntervalFlakeyEventSink<EventSink> flake = new IntervalFlakeyEventSink<EventSink>(
cnt, 5);
StubbornAppendSink<EventSink> sink = new StubbornAppendSink<EventSink>(
flake);
sink.open();
for (int i = 0; i < 100; i++) {
Event e = new EventImpl(("attempt " + i).getBytes());
sink.append(e);
}
Assert.assertEquals(ok.get(), 100);
ReportEvent rpt = sink.getReport();
Assert.assertEquals(new Long(24),
rpt.getLongMetric(StubbornAppendSink.A_FAILS));
Assert.assertEquals(new Long(24),
rpt.getLongMetric(StubbornAppendSink.A_RECOVERS));
}
@Test
EventSink mock = mock(EventSink.class);
doNothing().doNothing().doThrow(new IOException()).doThrow(
new IOException()).when(mock).append(Mockito.<Event> anyObject());
doReturn(new ReportEvent("stub")).when(mock).getReport();
StubbornAppendSink<EventSink> sink = new StubbornAppendSink<EventSink>(mock);
Event e = new EventImpl("foo".getBytes());
sink.open();
sink.append(e);
sink.append(e);
try {
sink.append(e);
} catch (Exception exn) {
ReportEvent rpt = sink.getReport();
Assert.assertEquals(new Long(2), rpt
.getLongMetric(StubbornAppendSink.A_SUCCESSES));
Assert.assertEquals(new Long(1),
rpt.getLongMetric(StubbornAppendSink.A_FAILS));
Assert.assertEquals(new Long(0), rpt
.getLongMetric(StubbornAppendSink.A_RECOVERS));
return;
}
Assert.fail("should have thrown exception");
}
}