package com.cloudera.flume.handlers.debug;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.flume.util.MockClock;
import com.cloudera.util.BackoffPolicy;
import com.cloudera.util.CappedExponentialBackoff;
import com.cloudera.util.Clock;
/**
 * Checks that an InsistentOpenDecorator keeps retrying open() on a sink whose
 * first two open() calls fail, and that its report metrics reflect one
 * request, three attempts, one success and two retries.
 *
 * NOTE(review): the method declaration line is not visible in this view of
 * the file; only the annotation and the body are present.
 */
@Test
// Mocked sink: open() throws IOException on the first and second call, then
// does nothing (i.e. succeeds) on the third.
EventSink fail2x = mock(EventSink.Base.class);
doThrow(new IOException("mock2")).doThrow(new IOException("mock"))
.doNothing().when(fail2x).open();
// The mock's report is stubbed to return a ReportEvent named "stub".
doReturn(new ReportEvent("stub")).when(fail2x).getReport();
// NOTE(review): presumably (initial backoff, cap) in milliseconds -- confirm
// against CappedExponentialBackoff.
BackoffPolicy bop = new CappedExponentialBackoff(10, 5000);
InsistentOpenDecorator<EventSink> sink = new InsistentOpenDecorator<EventSink>(
fail2x, bop);
// Full lifecycle: open, append a single event, close.
sink.open();
sink.append(new EventImpl("test".getBytes()));
sink.close();
// The return value of this call is discarded.
// NOTE(review): confirm whether this call on the mock is still needed.
fail2x.getReport();
// The assertions below read the decorator's own report, not the mock's.
ReportEvent rpt = sink.getReport();
// One open() request was made on the decorator...
assertEquals(new Long(1), rpt
.getLongMetric(InsistentOpenDecorator.A_REQUESTS));
// ...which took three attempts on the wrapped sink (two failures, one success)...
assertEquals(new Long(3), rpt
.getLongMetric(InsistentOpenDecorator.A_ATTEMPTS));
// ...ending in exactly one success...
assertEquals(new Long(1), rpt
.getLongMetric(InsistentOpenDecorator.A_SUCCESSES));
// ...after two retries.
assertEquals(new Long(2), rpt
.getLongMetric(InsistentOpenDecorator.A_RETRIES));
// Print the textual report to stdout for manual inspection.
System.out.println(rpt.toText());
}
/**
 * Checks that an InsistentOpenDecorator wrapping a sink whose open() always
 * fails eventually propagates an IOException, and that the report then shows
 * one request, zero successes, eleven attempts and eleven retries. A
 * MockClock is installed so that time is driven by the failing sink rather
 * than by the wall clock.
 *
 * NOTE(review): the method declaration line is not visible in this view of
 * the file; only the annotation and the body are present.
 */
@Test
// Mock clock constructed with an initial value of 0.
final MockClock m = new MockClock(0);
EventSink failWhale = new EventSink.Base() {
// NOTE(review): the header of the method containing this return is not
// visible here; presumably an override that supplies the sink's report --
// confirm.
return new ReportEvent("failwhale-report");
}
@Override
public void open()
throws IOException {
// Every open attempt moves the mock clock forward by 100 and then fails.
m.forward(100);
throw new IOException("fail open");
}
};
// Install the mock clock globally for the duration of the test.
Clock.setClock(m);
// NOTE(review): the meaning of the numeric arguments (1000, 10, 1000) is not
// visible here -- confirm against the InsistentOpenDecorator constructor.
InsistentOpenDecorator<EventSink> sink = new InsistentOpenDecorator<EventSink>(
failWhale, 1000, 10, 1000);
try {
sink.open();
} catch (IOException e1) {
// Expected path: the decorator gave up and surfaced the failure.
ReportEvent rpt = sink.getReport();
assertEquals(new Long(1), rpt
.getLongMetric(InsistentOpenDecorator.A_REQUESTS));
assertEquals(new Long(0), rpt
.getLongMetric(InsistentOpenDecorator.A_SUCCESSES));
assertEquals(new Long(11), rpt
.getLongMetric(InsistentOpenDecorator.A_ATTEMPTS));
assertEquals(new Long(11), rpt
.getLongMetric(InsistentOpenDecorator.A_RETRIES));
// NOTE(review): the default clock is restored only on this path, after all
// assertions above have passed. If an assertion fails, or open() returns
// normally and fail() below is reached, the mock clock stays installed.
// Consider moving the reset into a finally block.
Clock.resetDefault();
return;
}
// Reaching this line means open() returned without throwing.
fail("Ehr? Somehow the failwhale succeeded!");
}
/**
 * Checks that interrupting a thread blocked in InsistentOpenDecorator.open()
 * on a sink that always fails causes open() to exit with an IOException.
 *
 * NOTE(review): only the tail of the method signature is visible in this
 * view of the file; the name and the rest of the throws clause are missing.
 */
@Test
InterruptedException {
// Mocked sink whose open() throws IOException on every call.
EventSink fail4eva = mock(EventSink.Base.class);
doThrow(new IOException("mock")).when(fail4eva).open();
doReturn(new ReportEvent("stub")).when(fail4eva).getReport();
// Counted down by the worker thread once open() has thrown an IOException.
final CountDownLatch done = new CountDownLatch(1);
// NOTE(review): presumably (initial backoff, cap) in milliseconds -- confirm
// against CappedExponentialBackoff.
BackoffPolicy bop = new CappedExponentialBackoff(10, 5000);
final InsistentOpenDecorator<EventSink> sink = new InsistentOpenDecorator<EventSink>(
fail4eva, bop);
// Worker thread that performs the open.
// NOTE(review): the header of the overridden method (presumably run()) is not
// visible between the annotation and the try below.
Thread t = new Thread() {
@Override
try {
sink.open();
} catch (IOException e) {
// The latch is released only when open() ends with an IOException.
done.countDown();
}
}
};
t.start();
// Give the worker time to start its open attempts before interrupting it.
// NOTE(review): presumably 1000 ms -- confirm the unit of Clock.sleep.
Clock.sleep(1000);
t.interrupt();
// The worker must signal completion within 1000 ms of the interrupt.
assertTrue("Timed out", done.await(1000, TimeUnit.MILLISECONDS));
}
}