package com.cloudera.flume.agent;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeSpecException;
// Logger obtained for TestAgentCloseNoDeadlock; the simulated worker threads
// in this file use it to record any exception they catch.
public static final Logger LOG = LoggerFactory
.getLogger(TestAgentCloseNoDeadlock.class);
// Body of the shared deadlock check that the @Test methods invoke as
// doReportDeadlockTest(<sink name>). Two threads hammer the same
// LogicalNodeManager concurrently -- one calling spawn(), one calling
// getReport() -- and the check passes only if the spawning thread completes
// all of its iterations within the timeout.
// NOTE(review): the signature line that should precede this throws-clause
// tail is not present in this view; `sink` used below is presumably the
// method's String parameter -- confirm against the full file.
FlumeSpecException, InterruptedException {
// Start gate: both worker threads block on this latch until it is released
// below, so their loops begin at (roughly) the same time.
final CountDownLatch go = new CountDownLatch(1);
// Progress counter: decremented once per spawn() call; reaching zero means the
// "sim heartbeat" thread completed three spawns without getting stuck.
final CountDownLatch heartstop = new CountDownLatch(3);
final LogicalNodeManager lnm = new LogicalNodeManager("local");
// Worker 1: repeatedly spawns node "foo1" with source "asciisynth(1)" and the
// sink under test, counting down heartstop after each spawn.
// NOTE(review): the `public void run() {` line appears to be missing after
// @Override in this view.
new Thread("sim heartbeat") {
@Override
try {
// Wait for the start gate before touching the node manager.
go.await();
while (heartstop.getCount() > 0) {
lnm.spawn("foo1", "asciisynth(1)", sink);
heartstop.countDown();
}
} catch (Exception e) {
// Any failure is logged and the thread exits; heartstop is then never drained,
// so the assertion at the end of the method fails after the timeout.
// NOTE(review): printStackTrace() duplicates what LOG.error already records.
LOG.error(e.getMessage(), e);
e.printStackTrace();
}
}
}.start();
// Worker 2: requests reports from the same node manager in a tight loop for as
// long as the heartbeat worker still has spawns outstanding.
// NOTE(review): the `public void run() {` line appears to be missing after
// @Override in this view.
new Thread("sim report pusher") {
@Override
try {
// Wait for the start gate before touching the node manager.
go.await();
while (heartstop.getCount() > 0) {
lnm.getReport();
}
} catch (Exception e) {
// NOTE(review): printStackTrace() duplicates what LOG.error already records.
LOG.error(e.getMessage(), e);
e.printStackTrace();
}
}
}.start();
// Release both workers.
go.countDown();
// Wait up to 200 seconds for the three spawns to finish; await() returning
// false means the heartbeat thread did not finish in time.
assertTrue("heartbeat thread blocked", heartstop.await(200,
TimeUnit.SECONDS));
}
// Runs the shared spawn/getReport deadlock check with the sink spec
// "agentBESink".
// NOTE(review): the method signature line between @Test and the throws-clause
// tail is not present in this view.
@Test
FlumeSpecException, InterruptedException {
doReportDeadlockTest("agentBESink");
}
// Runs the shared spawn/getReport deadlock check with the sink spec
// "agentDFOSink".
// NOTE(review): the method signature line between @Test and the throws-clause
// tail is not present in this view.
@Test
FlumeSpecException, InterruptedException {
doReportDeadlockTest("agentDFOSink");
}
// Runs the shared spawn/getReport deadlock check with the sink spec
// "agentE2ESink".
// NOTE(review): the method signature line between @Test and the throws-clause
// tail is not present in this view.
@Test
FlumeSpecException, InterruptedException {
doReportDeadlockTest("agentE2ESink");
}
}