package com.cloudera.flume.master.availability;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.ReportTestingContext;
import com.cloudera.flume.core.CompositeSink;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.core.EventUtil;
import com.cloudera.flume.handlers.debug.MemorySinkSource;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.Reportable;
import com.cloudera.flume.reporter.aggregator.AccumulatorSink;
import com.cloudera.util.AlwaysRetryPolicy;
/** Log4j logger obtained for {@code TestFailChainSink}. */
static final Logger LOG = Logger.getLogger(TestFailChainSink.class);
/**
 * Per-test setup: adjusts log4j levels so that FailoverChainSink logs at
 * DEBUG while AccumulatorSink only logs at ERROR.
 */
@Before
Logger.getLogger(FailoverChainSink.class).setLevel(Level.DEBUG);
Logger.getLogger(AccumulatorSink.class).setLevel(Level.ERROR);
}
/**
 * Builds a FailoverChainSink directly from a spec template and five names,
 * pushes canned events through it, and checks the long metric reported under
 * each name against the expected values 16, 8, 4, 2, 1. Afterwards it appends
 * once more and requires an IOException.
 */
@Test
List<String> names = Arrays.asList("first", "second", "third", "fourth",
"fifth");
// The "%s" in the template is presumably filled with each name in turn to
// produce one accumulator per chain element -- confirm in FailoverChainSink.
FailoverChainSink snk = new FailoverChainSink(new ReportTestingContext(),
"{ lazyOpen => { intervalFlakeyAppend(2) => accumulator(\"%s\")}}",
names, new AlwaysRetryPolicy());
LOG.info(snk.getReport().toText());
snk.open();
// NOTE(review): presumably a source holding 31 copies of the canned body;
// the expected values below sum to 31.
EventSource src = MemorySinkSource.cannedData("test is a test", 31);
src.open();
EventUtil.dumpAll(src, snk);
// Expected metric value for names.get(i), in chain order.
int[] ans = { 16, 8, 4, 2, 1 };
for (int i = 0; i < ans.length; i++) {
// Each reportable is looked up by name, and the metric key is that same name.
Reportable rptable = ReportManager.get().getReportable(names.get(i));
long val = rptable.getReport().getLongMetric(names.get(i));
assertEquals(ans[i], val);
}
// Reopen the source and attempt one more append; this append must throw.
src.open();
try {
snk.append(src.next());
} catch (IOException ioe) {
// Expected path: clean up and finish the test successfully.
src.close();
snk.close();
return;
}
// Reached only when append did not throw IOException.
fail("Expected exception");
}
/**
 * Same scenario as a hand-built failover chain, but the sink spec is
 * generated by FailoverChainManager.genAvailableSinkSpec and instantiated as
 * a CompositeSink. Checks the long metric reported under each name against
 * the expected values 16, 8, 4, 2, 1.
 */
@Test
// Sets the initial failover backoff to 0 on the shared FlumeConfiguration
// instance; the setting is not restored by this block.
FlumeConfiguration.get().setInt(
FlumeConfiguration.AGENT_FAILOVER_INITIAL_BACKOFF, 0);
List<String> names = Arrays.asList("first", "second", "third", "fourth",
"fifth");
String body = "{ lazyOpen => { intervalFlakeyAppend(2) => accumulator(\"%s\")}}";
String spec = FailoverChainManager.genAvailableSinkSpec(body, names);
// Print the generated spec for inspection.
System.out.println(spec);
EventSink snk = new CompositeSink(new ReportTestingContext(), spec);
LOG.info(snk.getReport().toText());
snk.open();
// NOTE(review): presumably a source holding 31 copies of the canned body;
// the expected values below sum to 31.
EventSource src = MemorySinkSource.cannedData("test is a test", 31);
src.open();
EventUtil.dumpAll(src, snk);
// Expected metric value for names.get(i), in chain order.
int[] ans = { 16, 8, 4, 2, 1 };
for (int i = 0; i < ans.length; i++) {
// Each reportable is looked up by name, and the metric key is that same name.
Reportable rptable = ReportManager.get().getReportable(names.get(i));
long val = rptable.getReport().getLongMetric(names.get(i));
System.out.println("report " + names.get(i) + " : " + val);
System.out.flush();
assertEquals(ans[i], val);
}
}
/**
 * Building the "autoBEChain" sink with a fresh Context and opening it is
 * expected to raise an IOException.
 */
@Test(expected = IOException.class)
FlumeBuilder.buildSink(new Context(), "autoBEChain").open();
}
/**
 * Builds and opens a sink from a spec that binds autoBEChain via a let
 * expression; the test passes if no exception is thrown.
 * NOTE(review): presumably the let binding replaces the automatic chain with
 * the null sink -- confirm against the spec language.
 */
@Test
FlumeBuilder.buildSink(new Context(),
"let autoBEChain := null in autoBEChain").open();
}
/**
 * Building the "autoDFOChain" sink with a fresh Context and opening it is
 * expected to raise an IOException.
 */
@Test(expected = IOException.class)
FlumeBuilder.buildSink(new Context(), "autoDFOChain").open();
}
/**
 * Builds and opens a sink from a spec that binds autoDFOChain via a let
 * expression; the test passes if no exception is thrown.
 * NOTE(review): presumably the let binding replaces the automatic chain with
 * the null sink -- confirm against the spec language.
 */
@Test
FlumeBuilder.buildSink(new Context(),
"let autoDFOChain := null in autoDFOChain").open();
}
/**
 * Building the "autoE2EChain" sink with a fresh Context and opening it is
 * expected to raise an IOException.
 */
@Test(expected = IOException.class)
FlumeBuilder.buildSink(new Context(), "autoE2EChain").open();
}
/**
 * Builds and opens a sink from a spec that binds autoE2EChain via a let
 * expression; the test passes if no exception is thrown.
 * NOTE(review): presumably the let binding replaces the automatic chain with
 * the null sink -- confirm against the spec language.
 */
@Test
FlumeBuilder.buildSink(new Context(),
"let autoE2EChain := null in autoE2EChain").open();
}
}