package com.cloudera.flume.core.connector;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import org.junit.Test;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.ReportTestingContext;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.handlers.debug.InsistentAppendDecorator;
import com.cloudera.flume.handlers.debug.InsistentOpenDecorator;
import com.cloudera.flume.handlers.debug.LazyOpenDecorator;
import com.cloudera.flume.handlers.debug.LazyOpenSource;
import com.cloudera.flume.handlers.debug.NoNlASCIISynthSource;
import com.cloudera.flume.handlers.debug.StubbornAppendSink;
import com.cloudera.flume.handlers.rolling.RollSink;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.util.BackoffPolicy;
import com.cloudera.util.CappedExponentialBackoff;
import com.cloudera.util.Clock;
@Test
InterruptedException {
EventSink fail4eva = mock(EventSink.Base.class);
doThrow(new IOException("mock exception")).when(fail4eva).open();
doReturn(new ReportEvent("stub")).when(fail4eva).getReport();
BackoffPolicy bop = new CappedExponentialBackoff(10, 5000);
final InsistentOpenDecorator<EventSink> insistent = new InsistentOpenDecorator<EventSink>(
fail4eva, bop);
final EventSink sink = new LazyOpenDecorator<EventSink>(insistent);
sink.open();
final EventSource source = new LazyOpenSource<EventSource>(
new NoNlASCIISynthSource(0, 100));
source.open();
DirectDriver driver = new DirectDriver(source, sink);
driver.start();
Clock.sleep(1000);
driver.stop();
boolean closed = driver.join(1000);
assertFalse(closed);
driver.cancel();
assertTrue(driver.join(1000));
}
@Test
InterruptedException {
EventSink fail4eva = mock(EventSink.Base.class);
final Event e = new EventImpl("foo".getBytes());
doThrow(new IOException("mock exception")).when(fail4eva).append(e);
doReturn(new ReportEvent("mock report")).when(fail4eva).getReport();
doReturn("mock name").when(fail4eva).getName();
BackoffPolicy bop = new CappedExponentialBackoff(10, 5000);
final EventSink insistent = new InsistentAppendDecorator<EventSink>(
fail4eva, bop);
final EventSink sink = new LazyOpenDecorator<EventSink>(insistent);
sink.open();
final EventSource source = new EventSource.Base() {
@Override
return e;
}
};
DirectDriver driver = new DirectDriver(source, sink);
driver.start();
Clock.sleep(1000);
driver.stop();
boolean closed = driver.join(1000);
assertFalse(closed);
driver.cancel();
assertTrue(driver.join(1000));
}
@Test
EventSink fail4eva = mock(EventSink.Base.class);
final Event e = new EventImpl("foo".getBytes());
doThrow(new IOException("mock exception")).when(fail4eva).append(e);
doReturn(new ReportEvent("mock report")).when(fail4eva).getReport();
doReturn("mock name").when(fail4eva).getName();
BackoffPolicy bop = new CappedExponentialBackoff(10, 5000);
final EventSink insistent = new InsistentAppendDecorator<EventSink>(
fail4eva, bop);
final EventSink sink = new LazyOpenDecorator<EventSink>(insistent);
final EventSink roll = new RollSink(new ReportTestingContext(), "mock",
10000, 100) {
@Override
public EventSink
newSink(Context ctx)
throws IOException {
return sink;
}
};
roll.open();
final EventSource source = new EventSource.Base() {
@Override
return e;
}
};
DirectDriver driver = new DirectDriver(source, roll);
driver.start();
Clock.sleep(1000);
driver.stop();
boolean closed = driver.join(1000);
assertFalse(closed);
driver.cancel();
assertTrue(driver.join(1000));
}
@Test
EventSink fail4eva = mock(EventSink.Base.class);
doThrow(new IOException("mock exception")).when(fail4eva).open();
doReturn(new ReportEvent("stub")).when(fail4eva).getReport();
BackoffPolicy bop = new CappedExponentialBackoff(10, 5000);
final InsistentOpenDecorator<EventSink> insistent = new InsistentOpenDecorator<EventSink>(
fail4eva, bop);
final StubbornAppendSink<EventSink> stubborn = new StubbornAppendSink<EventSink>(
insistent);
final InsistentAppendDecorator<EventSink> append = new InsistentAppendDecorator<EventSink>(
stubborn, new CappedExponentialBackoff(100, 100000));
final EventSink sink = new LazyOpenDecorator<EventSink>(append);
sink.open();
final EventSource source = new LazyOpenSource<EventSource>(
new NoNlASCIISynthSource(0, 100));
source.open();
DirectDriver driver = new DirectDriver(source, sink);
driver.start();
Clock.sleep(1000);
driver.stop();
boolean closed = driver.join(1000);
assertFalse(closed);
driver.cancel();
assertTrue(driver.join(1000));
}
}