package com.cloudera.flume.core;
import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;
import com.cloudera.flume.conf.ReportTestingContext;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.handlers.debug.ExceptionTwiddleDecorator;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.aggregator.CounterSink;
import com.cloudera.flume.util.MockClock;
import com.cloudera.util.Clock;
/**
 * Drives a BackOffFailOverSink whose primary is a CounterSink wrapped in an
 * ExceptionTwiddleDecorator and whose backup is a plain CounterSink, using a
 * MockClock so the test controls time. After each append it checks the
 * primary count, the backup count and the sink's fail count.
 */
@Test
// NOTE(review): no method declaration is visible between @Test and the first
// statement below -- confirm the signature line was not lost from this file.
// Install a mock clock starting at 0 as the global Clock.
MockClock mock = new MockClock(0);
Clock.setClock(mock);
CounterSink primary = new CounterSink("primary");
CounterSink secondary = new CounterSink("backup");
// The decorator lets the test toggle whether appends to the primary succeed.
ExceptionTwiddleDecorator<CounterSink> twiddle =
new ExceptionTwiddleDecorator<CounterSink>(primary);
// 100 and 10000 are presumably the initial and maximum backoff (ms) -- TODO
// confirm against the BackOffFailOverSink constructor.
BackOffFailOverSink failsink =
new BackOffFailOverSink(twiddle, secondary, 100, 10000);
failsink.open();
Event e = new EventImpl("event".getBytes());
// Phase 1: two appends with the primary untouched; both are expected on the
// primary and none on the backup.
failsink.append(e);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
Assert.assertEquals(2, primary.getCount());
Assert.assertEquals(0, secondary.getCount());
// Phase 2: advance the clock by 100 and switch appendOk off (presumably this
// makes appends through the decorator throw -- confirm). The next append is
// expected to record one failure and be counted by the backup.
mock.forward(100);
twiddle.setAppendOk(false);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
Assert.assertEquals(1, failsink.getFails());
Assert.assertEquals(2, primary.getCount());
Assert.assertEquals(1, secondary.getCount());
// Phase 3: only 50 later. The fail count is expected to stay at 1 and the
// event to go to the backup -- presumably the primary is not retried yet.
mock.forward(50);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
Assert.assertEquals(1, failsink.getFails());
Assert.assertEquals(2, primary.getCount());
Assert.assertEquals(2, secondary.getCount());
// Phase 4: another 50 (100 in total since the first failure). The fail count
// is expected to rise to 2, i.e. presumably the primary was retried and
// failed again. The primary count is expected to read 0 from here on --
// presumably the counter is reset when the primary is reopened; confirm
// against CounterSink.
mock.forward(50);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
Assert.assertEquals(2, failsink.getFails());
Assert.assertEquals(0, primary.getCount());
Assert.assertEquals(3, secondary.getCount());
// Phase 5: 200 later, with appendOk still off. A third failure is expected
// and the event is expected on the backup.
mock.forward(200);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
Assert.assertEquals(3, failsink.getFails());
Assert.assertEquals(0, primary.getCount());
Assert.assertEquals(4, secondary.getCount());
// Phase 6: appendOk is switched back on but the clock is NOT advanced. The
// append is still expected on the backup, with no change in the fail count.
twiddle.setAppendOk(true);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
Assert.assertEquals(3, failsink.getFails());
Assert.assertEquals(0, primary.getCount());
Assert.assertEquals(5, secondary.getCount());
// Phase 7: 400 later the append is expected to reach the primary again
// (primary count 1), leaving the backup count and fail count unchanged.
mock.forward(400);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
Assert.assertEquals(3, failsink.getFails());
Assert.assertEquals(1, primary.getCount());
Assert.assertEquals(5, secondary.getCount());
failsink.close();
}
/**
 * Exercises a BackOffFailOverSink whose primary append is kept failing for
 * the whole test. The mock clock is advanced by 100, 100, 200, 400, 800 and
 * 1000 in turn, with two appends after each advance; the last three steps
 * check the fail count and the primary and backup counts.
 */
@Test
// NOTE(review): no method declaration is visible between @Test and the first
// statement below -- confirm the signature line was not lost from this file.
System.out.println("===========================");
// Install a mock clock starting at 0 as the global Clock.
MockClock mock = new MockClock(0);
Clock.setClock(mock);
CounterSink primary = new CounterSink("primary");
CounterSink secondary = new CounterSink("backup");
ExceptionTwiddleDecorator<CounterSink> twiddle =
new ExceptionTwiddleDecorator<CounterSink>(primary);
// 100 and 1000 are presumably the initial and maximum backoff (ms) -- TODO
// confirm against the BackOffFailOverSink constructor.
BackOffFailOverSink failsink =
new BackOffFailOverSink(twiddle, secondary, 100, 1000);
// NOTE(review): the sink is opened here but this block contains no matching
// close() call.
failsink.open();
Event e = new EventImpl("event".getBytes());
// Switch appendOk off before the first append; it is never switched back on
// in this block.
mock.forward(100);
twiddle.setAppendOk(false);
failsink.append(e);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
// Steps of 100, 200 and 400: counts are only printed until the assertions
// that follow the 400 step.
mock.forward(100);
failsink.append(e);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
mock.forward(200);
failsink.append(e);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
mock.forward(400);
failsink.append(e);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
// Eight appends so far: four failures are expected (one per clock step), all
// eight events on the backup and none counted by the primary.
Assert.assertEquals(4, failsink.getFails());
Assert.assertEquals(0, primary.getCount());
Assert.assertEquals(8, secondary.getCount());
mock.forward(800);
failsink.append(e);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
Assert.assertEquals(5, failsink.getFails());
Assert.assertEquals(0, primary.getCount());
Assert.assertEquals(10, secondary.getCount());
// The final step is 1000 rather than a further doubling to 1600 --
// presumably the backoff is capped at the 1000 passed to the constructor;
// confirm. One more failure is still expected after this advance.
mock.forward(1000);
failsink.append(e);
failsink.append(e);
System.out.println(mock);
System.out.printf("pri: %4d sec: %4d fail: %4d\n", primary.getCount(),
secondary.getCount(), failsink.getFails());
Assert.assertEquals(6, failsink.getFails());
Assert.assertEquals(0, primary.getCount());
Assert.assertEquals(12, secondary.getCount());
}
/**
 * Builds a sink through FailOverSink.builder() from two spec strings (a
 * primary "pri" counter behind intervalFlakeyAppend(2), and a backup "sec"
 * counter), appends the same event five times, and checks how the events
 * were split between the two counters.
 */
@Test
// NOTE(review): no method declaration is visible between @Test and the first
// statement below -- confirm the signature line was not lost from this file.
final SinkBuilder builder = FailOverSink.builder();
final String primarySpec = "{intervalFlakeyAppend(2) => counter(\"pri\") } ";
final String backupSpec = "counter(\"sec\")";
final EventSink sink =
    builder.build(new ReportTestingContext(), primarySpec, backupSpec);
sink.open();
final Event evt = new EventImpl("foo".getBytes());
// Send the same event five times.
for (int sent = 0; sent < 5; sent++) {
  sink.append(evt);
}
sink.close();
// The counters are looked up by name through the ReportManager.
final CounterSink primaryCounter =
    (CounterSink) ReportManager.get().getReportable("pri");
final CounterSink backupCounter =
    (CounterSink) ReportManager.get().getReportable("sec");
// Expected split: three events on the primary and two on the backup --
// presumably intervalFlakeyAppend(2) fails every second append; confirm.
Assert.assertEquals(3, primaryCounter.getCount());
Assert.assertEquals(2, backupCounter.getCount());
}
/**
 * Builds a sink through BackOffFailOverSink.builder() from two spec strings
 * (a primary "pri" counter behind intervalFlakeyAppend(2), and a backup
 * "sec" counter), appends the same event five times, and checks how the
 * events were split between the two counters.
 */
@Test
// NOTE(review): no method declaration is visible between @Test and the first
// statement below -- confirm the signature line was not lost from this file.
final SinkBuilder builder = BackOffFailOverSink.builder();
final String primarySpec = "{intervalFlakeyAppend(2) => counter(\"pri\") } ";
final String backupSpec = "counter(\"sec\")";
final EventSink sink =
    builder.build(new ReportTestingContext(), primarySpec, backupSpec);
sink.open();
final Event evt = new EventImpl("foo".getBytes());
// Send the same event five times.
for (int sent = 0; sent < 5; sent++) {
  sink.append(evt);
}
sink.close();
// The counters are looked up by name through the ReportManager.
final CounterSink primaryCounter =
    (CounterSink) ReportManager.get().getReportable("pri");
final CounterSink backupCounter =
    (CounterSink) ReportManager.get().getReportable("sec");
// Expected split: one event on the primary and four on the backup --
// presumably the backoff keeps traffic on the backup after the first primary
// failure; confirm against BackOffFailOverSink.
Assert.assertEquals(1, primaryCounter.getCount());
Assert.assertEquals(4, backupCounter.getCount());
}
}