package com.cloudera.flume.handlers.thrift;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.server.TSaneThreadPoolServer;
import org.apache.thrift.transport.TSaneServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.SourceFactory.SourceBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.util.Clock;
import com.google.common.base.Preconditions;
final static int DEFAULT_QUEUE_SIZE = FlumeConfiguration.get()
.getThriftQueueSize();
final static long MAX_CLOSE_SLEEP = FlumeConfiguration.get()
.getThriftCloseMaxSleep();
static final Logger LOG = LoggerFactory.getLogger(ThriftEventSource.class);
public static final String A_QUEUE_CAPACITY = "queueCapacity";
public static final String A_QUEUE_FREE = "queueFree";
public static final String A_ENQUEUED = "enqueued";
public static final String A_DEQUEUED = "dequeued";
public static final String A_BYTES_IN = "bytesIn";
final int port;
final ThriftFlumeEventServer svr;
TSaneThreadPoolServer server;
final BlockingQueue<Event> q;
final AtomicLong enqueued = new AtomicLong();
final AtomicLong dequeued = new AtomicLong();
final AtomicLong bytesIn = new AtomicLong();
boolean closed = true;
this.port = port;
this.svr = new ThriftFlumeEventServer();
this.q = new LinkedBlockingQueue<Event>(qsize);
}
synchronized public ReportEvent
getReport() {
ReportEvent rpt = super.getReport();
rpt.setLongMetric(A_QUEUE_CAPACITY, q.size());
rpt.setLongMetric(A_QUEUE_FREE, q.remainingCapacity());
rpt.setLongMetric(A_ENQUEUED, enqueued.get());
rpt.setLongMetric(A_DEQUEUED, dequeued.get());
rpt.setLongMetric(A_BYTES_IN, server.getBytesReceived());
return rpt;
}
Preconditions.checkNotNull(q);
this.port = port;
this.svr = new ThriftFlumeEventServer();
this.q = q;
}
this(port, DEFAULT_QUEUE_SIZE);
}
void enqueue(Event e)
throws IOException {
try {
q.put(e);
enqueued.getAndIncrement();
} catch (InterruptedException e1) {
LOG.error("blocked append was interrupted", e1);
throw new IOException(e1);
}
}
@Override
synchronized public void open()
throws IOException {
try {
ThriftFlumeEventServer.Processor processor = new ThriftFlumeEventServer.Processor(
new ThriftFlumeEventServerImpl(new EventSink.Base() {
@Override
public void append(Event e)
throws IOException {
enqueue(e);
super.append(e);
}
}));
Factory protFactory = new TBinaryProtocol.Factory(true, true);
TSaneServerSocket serverTransport = new TSaneServerSocket(port);
server = new TSaneThreadPoolServer(processor, serverTransport,
protFactory);
LOG.info(String.format(
"Starting blocking thread pool server on port %d...", port));
server.start();
this.closed = false;
} catch (TTransportException e) {
throw new IOException("Failed to create event server " + e.getMessage(),
e);
}
}
@Override
synchronized public void close()
throws IOException {
if (server == null) {
LOG.info(String.format("Server on port %d was already closed!", port));
return;
}
server.stop();
LOG.info(String.format("Closed server on port %d...", port));
long sz = q.size();
LOG.info(String.format("Queue still has %d elements ...", sz));
long maxSleep = MAX_CLOSE_SLEEP;
long start = Clock.unixTime();
while (q.peek() != null) {
if (Clock.unixTime() - start > maxSleep) {
if (sz == q.size()) {
LOG
.warn("Close timed out due to no progress. Closing despite having "
+ q.size() + " values still enqued");
return;
}
start = Clock.unixTime();
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("Unexpected interrupt of close " + e.getMessage(), e);
Thread.currentThread().interrupt();
closed = true;
throw new IOException(e);
}
}
closed = true;
}
@Override
public Event
next()
throws IOException {
try {
Event e = null;
while ((e = q.poll(100, TimeUnit.MILLISECONDS)) == null) {
synchronized (this) {
if (closed) {
return null;
}
}
}
synchronized (this) {
dequeued.getAndIncrement();
updateEventProcessingStats(e);
return e;
}
} catch (InterruptedException e) {
throw new IOException("Waiting for queue element was interupted! "
+ e.getMessage(), e);
}
}
public static SourceBuilder
builder() {
return new SourceBuilder() {
@Override
public EventSource
build(String... argv) {
Preconditions.checkArgument(argv.length == 1,
"usage: thriftSource(port)");
int port = Integer.parseInt(argv[0]);
return new ThriftEventSource(port);
}
};
}
}