package com.ning.arecibo.collector.process;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.ning.arecibo.collector.guice.EventFilter;
import com.ning.arecibo.event.BatchedEvent;
import com.ning.arecibo.event.receiver.EventProcessor;
import com.ning.arecibo.eventlogger.Event;
import com.ning.arecibo.util.jmx.MonitorableManaged;
import com.ning.arecibo.util.jmx.MonitoringType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
{
private static final Logger log = LoggerFactory.getLogger(CollectorEventProcessor.class);
private final AtomicLong eventsReceived = new AtomicLong(0L);
private final AtomicLong eventsFiltered = new AtomicLong(0L);
private final List<EventHandler> eventHandlers;
private final Function<Event, Event> filter;
@Inject
public CollectorEventProcessor(
final List<EventHandler> eventHandlers, @EventFilter
final Function<Event, Event> filter)
throws IOException
{
this.eventHandlers = eventHandlers;
this.filter = filter;
log.info("Event processor filter: {}", filter.getClass().toString());
}
{
final List<Event> events = new ArrayList<Event>();
if (evt instanceof BatchedEvent) {
events.addAll(((BatchedEvent) evt).getEvents());
}
else {
events.add(evt);
}
eventsReceived.getAndAdd(events.size());
final Collection<Event> filteredEvents = Lists.newArrayList(
Iterables.filter(
Iterables.transform(events, filter),
Predicates.<Event>notNull()
)
);
eventsFiltered.addAndGet(events.size() - filteredEvents.size());
for (final Event event : filteredEvents) {
for (final EventHandler handler : eventHandlers) {
try {
handler.handle(event);
}
catch (RuntimeException ruEx) {
log.warn("Exception handling event", ruEx);
}
}
}
}
@MonitorableManaged(description = "Number of events received", monitored = true, monitoringType = {MonitoringType.COUNTER, MonitoringType.RATE})
{
return eventsReceived.get();
}
@MonitorableManaged(description = "Number of events dropped by filters", monitored = true, monitoringType = {MonitoringType.COUNTER, MonitoringType.RATE})
{
return eventsFiltered.get();
}
}