package com.cloudera.flume.handlers.debug;
import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.FanOutSink;
import com.cloudera.flume.reporter.aggregator.CounterSink;
@Test
final CounterSink c = new CounterSink("count");
final FanOutSink<EventSink> s = new FanOutSink<EventSink>();
s.add(c);
s.add(new ConsoleEventSink());
final int total = 10;
final LatchedDecorator<EventSink> l = new LatchedDecorator<EventSink>(s, 0,
total);
l.open();
Thread t = new Thread() {
try {
long count;
int i = 0;
while ((count = c.getCount()) < total) {
l.trigger();
i++;
sleep(10);
System.out.println("triggered " + i + " times, count is " + count);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
for (int i = 0; i < total; i++) {
Event e = new EventImpl(("message " + i).getBytes());
l.append(e);
}
t.join();
long count = c.getCount();
System.out.println("trigger thread joined, count is now: " + count);
Assert.assertEquals(total, count);
}
}