package com.cloudera.flume.handlers.rpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.SourceFactory.SourceBuilder;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.handlers.avro.AvroEventSource;
import com.cloudera.flume.handlers.thrift.ThriftEventSource;
import com.google.common.base.Preconditions;
public class RpcSource extends EventSink.Base {
public static final Logger LOG = LoggerFactory.getLogger(RpcSource.class);
public static SourceBuilder
builder() {
return new SourceBuilder() {
@Override
public EventSource
build(String... argv) {
Preconditions.checkArgument(argv.length == 1, "usage: rpcSource(port)");
int port = Integer.parseInt(argv[0]);
if (FlumeConfiguration.get().getEventRPC().equals(
FlumeConfiguration.RPC_TYPE_THRIFT)) {
return new ThriftEventSource(port);
}
if (FlumeConfiguration.get().getEventRPC().equals(
FlumeConfiguration.RPC_TYPE_AVRO)) {
return new AvroEventSource(port);
}
LOG.warn("event.rpc.type not defined. It should be either "
+ "\"THRIFT\" or \"AVRO\". Defaulting to Thrift");
return new ThriftEventSource(port);
}
};
}
}