package com.cloudera.flume.agent;
import java.io.File;
import java.io.IOException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.ReportTestingContext;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.aggregator.CounterSink;
import com.cloudera.util.Clock;
import com.cloudera.util.FileUtil;
// Logger obtained for TestWriteAheadLogDecorator; used below to report
// cleanup failures.
static final Logger LOG = LoggerFactory.getLogger(TestWriteAheadLogDecorator.class);
// Scratch directory: assigned in the @Before fixture and removed in the
// @After fixture.
File tmpdir = null;
// Per-test fixture: creates a scratch directory, points the agent log
// directory setting at it, and constructs a FlumeNode backed by a mock master.
// NOTE(review): the method declaration that should follow @Before is not
// visible in this view -- confirm against the full file.
@Before
try {
// Any failure creating the scratch directory fails the test immediately.
tmpdir = FileUtil.mktempdir();
} catch (Exception e) {
Assert.fail("mk temp dir failed");
}
// Redirect AGENT_LOG_DIR_NEW to the scratch directory created above.
FlumeConfiguration conf = FlumeConfiguration.get();
conf.set(FlumeConfiguration.AGENT_LOG_DIR_NEW, tmpdir.getAbsolutePath());
MockMasterRPC mock = new MockMasterRPC();
// The node is constructed but never referenced again (hence the suppressed
// "unused" warning); presumably the constructor has a needed side effect --
// TODO confirm.
@SuppressWarnings("unused")
FlumeNode node = new FlumeNode(mock, false , false );
}
// Per-test cleanup: removes the scratch directory held in tmpdir.
// NOTE(review): the method declaration that should follow @After is not
// visible in this view -- confirm against the full file.
@After
try {
FileUtil.rmr(tmpdir);
} catch (IOException e) {
// Best effort: a failed removal is logged rather than rethrown, so it does
// not fail the test.
LOG.error("Failed to remove dir " + tmpdir, e);
}
}
// Checks which argument forms the ackedWriteAhead sink builder accepts:
// no argument and a numeric argument must build without throwing, while a
// string argument is expected to throw.
// NOTE(review): the method declaration that should follow the annotations is
// not visible in this view -- confirm against the full file.
@Test
@SuppressWarnings("unused")
FlumeNode node = new FlumeNode(new MockMasterRPC(), false, false);
// No-argument form.
String cfg = " { ackedWriteAhead => null}";
FlumeBuilder.buildSink(new Context(), cfg);
// Single numeric argument.
String cfg1 = "{ ackedWriteAhead(15000) => null}";
FlumeBuilder.buildSink(new Context(), cfg1);
// String argument: building this spec is expected to throw.
String cfg4 = "{ ackedWriteAhead(\"failurama\") => null}";
try {
FlumeBuilder.buildSink(new Context(), cfg4);
} catch (Exception e) {
// Any exception counts as the expected rejection; the test passes here.
return;
}
// Reaching this point means the invalid spec was accepted.
Assert.fail("unexpected fall through");
}
/**
 * Builds an {@code ackedWriteAhead(100)} sink over a console/counter fan-out
 * and immediately opens and closes it, one hundred times in a row. The test
 * passes when no iteration throws.
 *
 * @throws IOException if any call in the loop raises it
 * @throws FlumeSpecException if any call in the loop raises it
 */
@Test
public void testOpenClose()
    throws IOException, FlumeSpecException {
  final String counterName = "foo";
  final String sinkSpec = " { ackedWriteAhead(100) => [console, counter(\""
      + counterName + "\") ] } ";

  int remaining = 100;
  while (remaining > 0) {
    // A fresh sink is built from the same spec on every iteration.
    EventSink sink = FlumeBuilder.buildSink(new Context(), sinkSpec);
    sink.open();
    sink.close();
    remaining--;
  }
}
/**
 * Sends ten events through an {@code ackedWriteAhead(500)} sink that wraps an
 * {@code ackChecker} over a console/counter fan-out, then checks that the
 * counter reported under the name "foo" equals the number of events sent.
 *
 * @throws FlumeSpecException if any call in the test raises it
 * @throws InterruptedException if any call in the test raises it
 * @throws IOException if any call in the test raises it
 */
@Test
public void testBehavior()
    throws FlumeSpecException, InterruptedException,
    IOException {
  final int expected = 10;
  final String counterName = "foo";
  final String sinkSpec =
      " { ackedWriteAhead(500) => { ackChecker => [console, counter(\""
      + counterName + "\") ] } } ";

  EventSink sink = FlumeBuilder.buildSink(new ReportTestingContext(), sinkSpec);
  sink.open();

  int sent = 0;
  while (sent < expected) {
    byte[] payload = ("test message " + sent).getBytes();
    Event event = new EventImpl(payload);
    System.out.println("initial append: " + event);
    sink.append(event);
    // Pause between appends.
    Clock.sleep(100);
    sent++;
  }

  // Wait before closing; presumably this lets the write-ahead path deliver
  // the buffered events to the counter -- TODO confirm.
  Clock.sleep(5000);
  sink.close();

  // The counter is looked up by the name embedded in the sink spec.
  CounterSink counter = (CounterSink) ReportManager.get().getReportable(counterName);
  Assert.assertEquals(expected, counter.getCount());
}
}