package com.cloudera.flume.handlers.debug;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.handlers.avro.AvroJsonOutputFormat;
import com.cloudera.util.BenchmarkHarness;
static final Logger LOG = LoggerFactory.getLogger(TestSynthSources.class);
@Test
EventSource src = new SynthSource(5, 10, 1337);
Event e = null;
EventSink snk = new ConsoleEventSink(new AvroJsonOutputFormat());
MemorySinkSource mem = new MemorySinkSource();
while ((e = src.next()) != null) {
snk.append(e);
mem.append(e);
}
mem.open();
int i = 0;
while ((e = mem.next()) != null) {
i++;
assertEquals(10, e.getBody().length);
}
assertEquals(5, i);
}
@Test
Event e1, e2;
for (EventSource src : BenchmarkHarness.varyMsgBytes.values()) {
src.open();
e1 = src.next();
src.open();
e2 = src.next();
assertTrue(Arrays.equals(e1.getBody(), e2.getBody()));
}
}
@Test
EventSource src = new AttrSynthSource(5, 10, 20, 15, 1337);
Event e = null;
EventSink snk = new ConsoleEventSink(new AvroJsonOutputFormat());
MemorySinkSource mem = new MemorySinkSource();
while ((e = src.next()) != null) {
snk.append(e);
mem.append(e);
}
mem.open();
int i = 0;
while ((e = mem.next()) != null) {
i++;
Map<String, byte[]> ents = e.getAttrs();
assertEquals(10, ents.size());
for (String a : ents.keySet()) {
assertEquals(20, a.length());
}
for (byte[] v : ents.values()) {
assertEquals(15, v.length);
}
}
assertEquals(5, i);
}
@Test
Event e1, e2;
for (EventSource src : BenchmarkHarness.varyNumAttrs.values()) {
src.open();
e1 = src.next();
src.open();
e2 = src.next();
assertTrue(Arrays.equals(e1.getBody(), e2.getBody()));
}
}
}