package com.cloudera.flume.handlers;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.junit.Test;
import com.cloudera.flume.core.EventSource;
import com.cloudera.util.Clock;
static public void testOpenOpen(
final Logger LOG,
final EventSource src)
throws IOException {
src.open();
try {
src.open();
} catch (Exception e) {
LOG.info("Expected failure " + e.getMessage());
return;
}
src.close();
fail("Open Open should fail");
}
static public void testCloseClose(
final Logger LOG,
final EventSource src)
throws IOException {
src.close();
src.open();
src.close();
src.close();
}
@Test
final EventSource src) throws InterruptedException, IOException {
final CountDownLatch started = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(1);
new Thread() {
@Override
try {
src.open();
started.countDown();
src.next();
done.countDown();
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
}
}.start();
assertTrue("Open timed out", started.await(5, TimeUnit.SECONDS));
Clock.sleep(100);
src.close();
assertTrue("Next timed out", done.await(5, TimeUnit.SECONDS));
}
throws IOException {
for (int i = 0; i < 50; i++) {
LOG.info("ThirftEventSource open close attempt " + i);
src.open();
src.close();
}
}
}