package com.cloudera.flume.handlers.thrift;
import java.io.IOException;
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.server.TSaneThreadPoolServer;
import org.apache.thrift.transport.TSaneServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.SourceFactory.SourceBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSource;
import com.google.common.base.Preconditions;
// Initial queue capacity used when only a port is supplied (passed along via
// this(port, DEFAULT_QUEUE_SIZE) further down).
static int DEFAULT_QUEUE_SIZE = 1000;
// Logger shared by all instances of this source.
static final Logger LOG = LoggerFactory.getLogger(PrioritizedThriftEventSource.class);
// Port number handed to the TSaneServerSocket created in open().
int port;
// Assigned at construction time; not referenced anywhere else in the visible code.
ThriftFlumeEventServer svr;
// Thread-pool server created by open() and stopped by close(); stays null until open() runs.
TSaneThreadPoolServer server;
public static class EventQueue extends PriorityBlockingQueue<Event> {
private static final long serialVersionUID = 7280524922090162382L;
super(size, new Comparator<Event>() {
@Override
public int compare(Event o1, Event o2) {
int priorityDiff =
o1.getPriority().ordinal() - o2.getPriority().ordinal();
if (priorityDiff != 0)
return priorityDiff;
long tdelta = (o1.getTimestamp() - o2.getTimestamp());
if (tdelta != 0)
return (int) tdelta;
long ndelta = (o1.getNanos() - o2.getNanos());
return (int) ndelta;
}
});
}
}
// Buffer between the Thrift handler and next(): the sink built in open() adds
// events here and next() takes them out.
final BlockingQueue<Event> q;
// NOTE(review): no constructor signature is visible above the next statements.
// They read `port` and `qsize`, so this is presumably the body of a
// (port, qsize) constructor - confirm against the complete file.
this.port = port;
this.svr = new ThriftFlumeEventServer();
// EventQueue orders events by priority ordinal, then timestamp, then nanos.
this.q = new EventQueue(qsize);
}
// NOTE(review): likewise presumably the body of a port-only constructor that
// delegates with DEFAULT_QUEUE_SIZE - confirm against the complete file.
this(port, DEFAULT_QUEUE_SIZE);
}
/**
 * Creates the Thrift thread-pool server on {@code port} and starts it.
 *
 * <p>The server is given a sink that adds every appended event to the
 * internal queue, from which {@link #next()} takes them.
 *
 * @throws IOException if the server transport cannot be created; the
 *         underlying {@link TTransportException} is attached as the cause
 */
@Override
public void open() throws IOException {
  try {
    ThriftFlumeEventServer.Processor processor =
        new ThriftFlumeEventServer.Processor(new ThriftFlumeEventServerImpl(
            new EventSink.Base() {
              @Override
              public void append(Event e) throws IOException {
                // Queue first so next() can hand the event out, then let the
                // base sink run its own append logic.
                q.add(e);
                super.append(e);
              }
            }));
    Factory protFactory = new TBinaryProtocol.Factory(true, true);
    TSaneServerSocket serverTransport = new TSaneServerSocket(port);
    server =
        new TSaneThreadPoolServer(processor, serverTransport, protFactory);
    LOG.info(String.format(
        "Starting blocking thread pool server on port %d...", port));
    server.start();
  } catch (TTransportException e) {
    // Report through the logger rather than stderr, and keep the original
    // exception as the cause so callers see the full stack trace.
    LOG.error(String.format(
        "Failed to create event server on port %d", port), e);
    throw new IOException("Failed to create event server " + e, e);
  }
}
/**
 * Stops the Thrift server if one has been created by {@link #open()}.
 * When no server exists, only an informational message is logged.
 *
 * @throws IOException declared by the overridden method; not thrown by the
 *         statements in this body
 */
@Override
public void close() throws IOException {
  if (server != null) {
    server.stop();
    LOG.info(String.format("Closed server on port %d...", port));
    return;
  }

  // Nothing to stop: open() never created a server.
  LOG.info(String.format("Server on port %d was already closed!", port));
}
/**
 * Blocks until an event is available in the queue, records it in the event
 * processing stats and returns it.
 *
 * @return the next event taken from the queue
 * @throws IOException if the calling thread is interrupted while waiting;
 *         the interrupt flag is restored and the
 *         {@link InterruptedException} is attached as the cause
 */
@Override
public Event next() throws IOException {
  try {
    Event event = q.take();
    updateEventProcessingStats(event);
    return event;
  } catch (InterruptedException ie) {
    // Catching InterruptedException clears the interrupt flag; set it again
    // so code further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    LOG.warn("Interrupted while waiting for queue element", ie);
    throw new IOException(
        "Waiting for queue element was interupted! " + ie, ie);
  }
}
/**
 * Console entry point: opens a source on the configured collector port and
 * prints every received event to standard output until reading fails.
 *
 * @param argv command-line arguments (unused)
 */
public static void main(String[] argv) {
  FlumeConfiguration conf = FlumeConfiguration.get();
  PrioritizedThriftEventSource src =
      new PrioritizedThriftEventSource(conf.getCollectorPort());
  try {
    src.open();
    for (Event e = src.next(); e != null; e = src.next()) {
      System.out.println(e);
    }
  } catch (IOException e1) {
    // Console utility: report the failure directly on stderr.
    e1.printStackTrace();
  } finally {
    // Always stop the server so it does not keep running after the read loop
    // has ended with an error.
    try {
      src.close();
    } catch (IOException closeFailure) {
      closeFailure.printStackTrace();
    }
  }
}
/**
 * Returns a builder that creates a {@link PrioritizedThriftEventSource} from
 * a single string argument holding the port number.
 *
 * @return a source builder expecting exactly one argument (the port)
 */
public static SourceBuilder builder() {
  SourceBuilder portOnlyBuilder = new SourceBuilder() {
    @Override
    public EventSource build(String... argv) {
      // Exactly one argument is accepted: the port.
      Preconditions.checkArgument(argv.length == 1, "usage: tsource(port)");
      return new PrioritizedThriftEventSource(Integer.parseInt(argv[0]));
    }
  };
  return portOnlyBuilder;
}
}