package com.cloudera.flume.handlers.debug;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.junit.Ignore;
import org.junit.Test;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.ReportTestingContext;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Attributes;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.core.EventUtil;
import com.cloudera.flume.handlers.debug.BloomCheckDecorator.BloomCheckState;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.aggregator.CounterSink;
import com.cloudera.util.bloom.BloomSet;
/**
 * Adds the ints 0..9 to one BloomSet and only the even ones to a second
 * BloomSet built with the same constructor arguments. Asserts that each set
 * equals itself, that the full set contains() the even-only set, and that the
 * even-only set does not contain() the full set.
 */
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test
  final BloomSet everything = new BloomSet(10000, 5);
  final BloomSet evensOnly = new BloomSet(10000, 5);
  for (int n = 0; n < 10; n++) {
    everything.addInt(n);
    if ((n & 1) == 0) {
      evensOnly.addInt(n);
    }
  }

  assertEquals(everything, everything);
  assertEquals(evensOnly, evensOnly);
  assertTrue(everything.contains(evensOnly));
  assertFalse(evensOnly.contains(everything));
}
/**
 * Builds two BloomSets whose second constructor argument differs (5 vs 6),
 * adds 0..9 to both, and then calls contains() in each direction. The
 * annotation declares that an IllegalArgumentException is expected.
 */
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test(expected = IllegalArgumentException.class)
  final BloomSet withFive = new BloomSet(10000, 5);
  final BloomSet withSix = new BloomSet(10000, 6);
  for (int n = 0; n < 10; n++) {
    withFive.addInt(n);
    withSix.addInt(n);
  }

  assertFalse(withFive.contains(withSix));
  assertFalse(withSix.contains(withFive));
}
/**
 * Ignored by default ("Takes too long to run"). Adds 0..99,999,999 to one
 * BloomSet and the same range minus a single value to another, then asserts
 * that the complete set contains() the incomplete one but not vice versa.
 */
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test
@Ignore("Takes too long to run")
  final BloomSet complete = new BloomSet(1000000000, 2);
  final BloomSet missingOne = new BloomSet(1000000000, 2);
  final int skipped = 54323423;
  for (int n = 0; n < 100000000; n++) {
    complete.addInt(n);
    if (n == skipped) {
      // The one value deliberately left out of the second set.
      continue;
    }
    missingOne.addInt(n);
  }

  assertTrue(complete.contains(missingOne));
  assertFalse(missingOne.contains(complete));
}
/**
 * Ignored by default ("Takes too long to run"). The first BloomSet receives
 * 0..99,999,999 except 234000; the second receives 0..10,000,000 inclusive.
 * Asserts that neither set contains() the other.
 */
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test
@Ignore("Takes too long to run")
  final BloomSet wideRange = new BloomSet(1000000000, 2);
  final BloomSet lowRange = new BloomSet(1000000000, 2);
  for (int n = 0; n < 100000000; n++) {
    if (n != 234000) {
      wideRange.addInt(n);
    }
    if (n <= 10000000) {
      lowRange.addInt(n);
    }
  }

  assertFalse(wideRange.contains(lowRange));
  assertFalse(lowRange.contains(wideRange));
}
/**
 * Calls the BloomGeneratorDeco builder with two, one and zero string
 * arguments against a single shared Context. The test passes if none of the
 * build calls throws.
 */
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test
  final SinkDecoBuilder builder = BloomGeneratorDeco.builder();
  final Context sharedContext = new Context();

  builder.build(sharedContext, "2234", "123");
  builder.build(sharedContext, "1234");
  builder.build(sharedContext);
}
/**
 * Calls the BloomGeneratorDeco builder with three string arguments; the
 * annotation declares that an IllegalArgumentException is expected.
 */
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test(expected = IllegalArgumentException.class)
  BloomGeneratorDeco.builder().build(new Context(), "2234", "123", "r3414");
}
/**
 * Calls the BloomGeneratorDeco builder with the single argument "r3414"; the
 * annotation declares that an IllegalArgumentException is expected.
 */
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test(expected = IllegalArgumentException.class)
  BloomGeneratorDeco.builder().build(new Context(), "r3414");
}
/**
 * Calls the BloomGeneratorDeco builder with two, one and zero string
 * arguments against a single shared Context. The test passes if none of the
 * build calls throws.
 */
// NOTE(review): these statements repeat an earlier test in this file word for
// word. If this one was meant to cover BloomCheckDecorator's builder instead,
// confirm and switch the builder; left unchanged here.
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test
  final SinkDecoBuilder builder = BloomGeneratorDeco.builder();
  final Context sharedContext = new Context();

  builder.build(sharedContext, "2234", "123");
  builder.build(sharedContext, "1234");
  builder.build(sharedContext);
}
/**
 * Calls the BloomGeneratorDeco builder with three string arguments; the
 * annotation declares that an IllegalArgumentException is expected.
 */
// NOTE(review): these statements repeat an earlier test in this file word for
// word. If this one was meant to cover BloomCheckDecorator's builder instead,
// confirm and switch the builder; left unchanged here.
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test(expected = IllegalArgumentException.class)
  BloomGeneratorDeco.builder().build(new Context(), "2234", "123", "r3414");
}
/**
 * Calls the BloomGeneratorDeco builder with the single argument "r3414"; the
 * annotation declares that an IllegalArgumentException is expected.
 */
// NOTE(review): these statements repeat an earlier test in this file word for
// word. If this one was meant to cover BloomCheckDecorator's builder instead,
// confirm and switch the builder; left unchanged here.
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test(expected = IllegalArgumentException.class)
  BloomGeneratorDeco.builder().build(new Context(), "r3414");
}
/**
 * Builds a sink from a spec in which bloomCheck is given a third, quoted
 * argument (a nested text("test") sink spec). The test passes if
 * FlumeBuilder.buildSink does not throw.
 */
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test
  FlumeBuilder.buildSink(new Context(),
      "{bloomCheck(1,2, \"text(\\\"test\\\")\") => null } ");
}
/**
 * End-to-end check of a bloomGen -> bloomCheck -> counter("test") sink fed
 * from an asciisynth(10000) source. After draining the source and closing
 * both ends it verifies that:
 * - the "test" counter reports 10000,
 * - the BloomCheckDecorator report state is SUCCESS,
 * - the report records 1 success and 0 failures.
 */
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@SuppressWarnings("unchecked")
@Test
  String spec = "{ bloomGen(10000,2) => { bloomCheck(10000,2) => counter(\"test\")} } ";
  EventSink snk = FlumeBuilder.buildSink(new ReportTestingContext(), spec);
  EventSource src = FlumeBuilder.buildSource("asciisynth(10000)");
  snk.open();
  src.open();
  EventUtil.dumpAll(src, snk);
  src.close();
  snk.close();

  CounterSink ctr = (CounterSink) ReportManager.get().getReportable("test");
  // Expected value first, matching JUnit's (expected, actual) order so that a
  // failure message reports the right numbers.
  assertEquals(10000, ctr.getCount());

  // The sink built from the spec is cast to a decorator, and the sink it
  // wraps is cast to the BloomCheckDecorator (unchecked cast, hence the
  // suppression above).
  BloomCheckDecorator bcd = (BloomCheckDecorator) (((EventSinkDecorator<EventSink>) snk)
      .getSink());
  ReportEvent r = bcd.getReport();
  assertEquals(BloomCheckState.SUCCESS.toString(), new String(r
      .get(BloomCheckDecorator.A_STATE)));
  assertEquals(1, Attributes.readInt(r, BloomCheckDecorator.A_SUCCESS)
      .intValue());
  assertEquals(0, Attributes.readInt(r, BloomCheckDecorator.A_FAILS)
      .intValue());
}
/**
 * Builds a bloomGen -> bloomCheck -> counter("total") sink where bloomCheck
 * is also given a counter("test") spec as its third argument. Appends two
 * empty events and asserts that "total" is 2 while "test" is still 0; after
 * the sink is closed, asserts that "test" has become 1.
 */
// NOTE(review): the spec string below has three closing braces for two
// opening ones. It is reproduced unchanged; confirm the spec parser is meant
// to accept it.
// NOTE(review): no method declaration line is visible above this body; the
// closing brace at the end has no visible opener.
@Test
  final String pipelineSpec = "{bloomGen(100,2) => {bloomCheck(100,2,\"counter(\\\"test\\\") \") => counter(\"total\") } } }";
  final EventSink sink = FlumeBuilder.buildSink(new ReportTestingContext(), pipelineSpec);
  sink.open();
  for (int sent = 0; sent < 2; sent++) {
    sink.append(new EventImpl(new byte[0]));
  }

  final CounterSink reportCounter = (CounterSink) ReportManager.get().getReportable("test");
  assertEquals(0, reportCounter.getCount());

  final CounterSink totalCounter = (CounterSink) ReportManager.get().getReportable("total");
  assertEquals(2, totalCounter.getCount());

  sink.close();
  assertEquals(1, reportCounter.getCount());
}
}