package com.cloudera.flume.collector;
import static org.junit.Assert.*;
import java.io.IOException;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.SinkFactoryImpl;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.handlers.debug.MemorySinkSource;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.util.Clock;
final public static Logger LOG = Logger
.getLogger(TestDiskFailoverThenRoll.class);
final MemorySinkSource mem = new MemorySinkSource();
@Before
SinkFactoryImpl sf = new SinkFactoryImpl();
sf.setSink("null", new SinkBuilder() {
@Override
public EventSink
build(Context context, String... argv) {
LOG.info("excapedCustomDfs replaced with MemorySinkSource");
mem.reset();
return mem;
}
});
FlumeBuilder.setSinkFactory(sf);
}
@Test
InterruptedException {
String agentCollector = "{diskFailover(1000) => roll (100000) { null } }";
Event e = new EventImpl("foo".getBytes());
EventSink agent = FlumeBuilder.buildSink(new Context(), agentCollector);
agent.open();
agent.append(e);
for (int i = 0; i < 30; i++) {
Clock.sleep(100);
ReportEvent r = mem.getReport();
LOG.info(r);
if (r.getLongMetric("number of events") > 0) {
return;
}
}
fail("Test timed out, event didn't make it");
}
}