package com.cloudera.flume.handlers.hdfs;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.handlers.text.FormatFactory;
import com.cloudera.flume.handlers.text.output.OutputFormat;
import com.cloudera.flume.handlers.text.output.RawOutputFormat;
import com.google.common.base.Preconditions;
// Logger shared by all instances of this sink.
static final Logger LOG = LoggerFactory.getLogger(EscapedCustomDfsSink.class);
// Directory/path portion exactly as handed to the constructor.
final String path;
// Output format handed to every CustomDfsSink this class creates.
OutputFormat format;
// Single writer used when shouldSub is false; set by open() and reset to null by close().
CustomDfsSink writer = null;
// Most recent event passed to openWriter(); open() forwards it when creating the
// single writer, so it is still null if open() runs before any openWriter() call.
Event localEvent;
// Writers used when shouldSub is true, keyed by the escaped (per-event) absolute path.
final Map<String, CustomDfsSink> sfWriters = new HashMap<String, CustomDfsSink>();
// True when Event.containsTag() reports a tag in the path or the filename; selects
// per-event path resolution in append() instead of the single writer.
boolean shouldSub = false;
// File name portion as handed to the constructor (may be empty).
private String filename = "";
// path joined with filename (Path.SEPARATOR inserted when needed); equals path
// when no filename was given.
protected String absolutePath = "";
// Passed to CustomDfsSink when non-blank; NOTE(review): presumably a Hive table
// name, as the identifier suggests -- confirm against CustomDfsSink.
protected String hiveTableName = "";
// Forwarded to CustomDfsSink only on the branch taken when elasticSearchUrl is non-blank.
protected boolean runMarkerQueries = false;
// Forwarded to CustomDfsSink when elasticSearchUrl is non-blank; a blank URL
// makes openWriter() skip that branch.
protected String elasticSearchUrl = "", elasticIndex = "", elasticType = "";
this.path = path;
this.filename = filename;
shouldSub = Event.containsTag(path) || Event.containsTag(filename);
this.format = o;
absolutePath = path;
if (filename != null && filename.length() > 0) {
if (!absolutePath.endsWith(Path.SEPARATOR)) {
absolutePath += Path.SEPARATOR;
}
absolutePath += this.filename;
}
}
this.path = path;
this.filename = filename;
shouldSub = Event.containsTag(path) || Event.containsTag(filename);
this.format = o;
absolutePath = path;
this.hiveTableName = hiveTableName;
if (filename != null && filename.length() > 0) {
if (!absolutePath.endsWith(Path.SEPARATOR)) {
absolutePath += Path.SEPARATOR;
}
absolutePath += this.filename;
}
}
/**
 * Creates a sink with an explicit output format and the three elastic
 * settings, leaving {@code runMarkerQueries} at its declared default of
 * {@code false}.
 *
 * <p>All field initialisation (including building {@code absolutePath} from
 * {@code path} and {@code filename}) is performed by the eight-argument
 * constructor; delegating keeps that logic in a single place instead of a
 * copy-pasted duplicate.
 *
 * @param path base path; tested for tags via {@code Event.containsTag}
 * @param filename optional file name appended to {@code path}; may be empty
 * @param hiveTableName value forwarded to the underlying writers
 * @param o output format used by the underlying writers
 * @param elasticSearchUrl value forwarded to the underlying writers; a
 *        non-blank value selects the elastic writer variant in openWriter
 * @param elasticIndex value forwarded to the underlying writers
 * @param elasticType value forwarded to the underlying writers
 */
public EscapedCustomDfsSink(String path, String filename, String hiveTableName, OutputFormat o, String elasticSearchUrl, String elasticIndex, String elasticType) {
  this(path, filename, hiveTableName, o, elasticSearchUrl, elasticIndex, elasticType, false);
}
public EscapedCustomDfsSink(String path, String filename, String hiveTableName, OutputFormat o, String elasticSearchUrl, String elasticIndex, String elasticType,
boolean runMarkerQueries) {
this.path = path;
this.filename = filename;
this.runMarkerQueries = runMarkerQueries;
shouldSub = Event.containsTag(path) || Event.containsTag(filename);
this.format = o;
absolutePath = path;
this.hiveTableName = hiveTableName;
this.elasticSearchUrl = elasticSearchUrl;
this.elasticIndex = elasticIndex;
this.elasticType = elasticType;
if (filename != null && filename.length() > 0) {
if (!absolutePath.endsWith(Path.SEPARATOR)) {
absolutePath += Path.SEPARATOR;
}
absolutePath += this.filename;
}
}
try {
return FormatFactory.get().getOutputFormat(
FlumeConfiguration.get().getDefaultOutputFormat());
} catch (FlumeSpecException e) {
LOG.warn("format from conf file not found, using default", e);
return new RawOutputFormat();
}
}
this(path, filename, getDefaultOutputFormat());
}
this(path, filename, hiveTableName, getDefaultOutputFormat());
}
/**
 * Creates a sink that uses the output format returned by
 * {@code getDefaultOutputFormat()} together with the given elastic settings.
 * Simply delegates to the constructor that takes an explicit
 * {@code OutputFormat}.
 *
 * @param path base path
 * @param filename optional file name appended to {@code path}
 * @param hiveTableName value forwarded to the underlying writers
 * @param elasticSearchUrl value forwarded to the underlying writers
 * @param elasticIndex value forwarded to the underlying writers
 * @param elasticType value forwarded to the underlying writers
 */
public EscapedCustomDfsSink(String path, String filename, String hiveTableName, String elasticSearchUrl, String elasticIndex, String elasticType) {
this(path, filename, hiveTableName, getDefaultOutputFormat(), elasticSearchUrl, elasticIndex, elasticType);
}
/**
 * Creates a sink that uses the output format returned by
 * {@code getDefaultOutputFormat()} together with the given elastic settings
 * and marker-query flag. Simply delegates to the constructor that takes an
 * explicit {@code OutputFormat}.
 *
 * @param path base path
 * @param filename optional file name appended to {@code path}
 * @param hiveTableName value forwarded to the underlying writers
 * @param elasticSearchUrl value forwarded to the underlying writers
 * @param elasticIndex value forwarded to the underlying writers
 * @param elasticType value forwarded to the underlying writers
 * @param runMarkerQueries flag forwarded to the underlying writers
 *        (NOTE(review): exact effect is defined by CustomDfsSink -- confirm there)
 */
public EscapedCustomDfsSink(String path, String filename, String hiveTableName, String elasticSearchUrl, String elasticIndex, String elasticType,
boolean runMarkerQueries) {
this(path, filename, hiveTableName, getDefaultOutputFormat(), elasticSearchUrl, elasticIndex, elasticType, runMarkerQueries);
}
/**
 * Builds and opens a {@code CustomDfsSink} for the given concrete path.
 *
 * <p>The writer variant is chosen from the configured options: the elastic
 * variant when {@code elasticSearchUrl} is non-blank, otherwise the Hive
 * variant when {@code hiveTableName} is non-blank, otherwise the plain one.
 * The event is also remembered in {@code localEvent}.
 *
 * @param p concrete (already escaped) path the writer targets
 * @param e event handed to the writer's constructor
 * @return the newly created writer, already opened
 * @throws IOException if opening the writer fails
 */
protected CustomDfsSink
openWriter(String p, Event e)
    throws IOException {
  final CustomDfsSink created;
  final boolean useElastic = StringUtils.isNotBlank(elasticSearchUrl);

  if (useElastic) {
    LOG.info("passing elastic sink args: ELASTICSEARCH: COLLECTOR SINK:");
    LOG.info("inside EscapedCustomDfsSink: URL: " + elasticSearchUrl + " Index: " + elasticIndex + " Type: " + elasticType);
    created = new CustomDfsSink(p, format, e, hiveTableName, elasticSearchUrl, elasticIndex, elasticType, runMarkerQueries);
  } else {
    created = StringUtils.isNotBlank(hiveTableName)
        ? new CustomDfsSink(p, format, e, hiveTableName)
        : new CustomDfsSink(p, format, e);
  }

  // Remember the event so a later open() can pass it along.
  localEvent = e;
  created.open();
  return created;
}
/**
 * Appends an event to the appropriate writer and then to the superclass.
 *
 * <p>Without tag substitution the single {@code writer} is used. With
 * substitution, {@code absolutePath} is escaped against the event and the
 * writer cached under that resolved path is used, being created and cached
 * on first use.
 *
 * @param e event to write
 * @throws IOException if opening a writer or appending fails
 */
@Override
public void append(Event e)
    throws IOException {
  final CustomDfsSink target;
  if (!shouldSub) {
    target = writer;
  } else {
    final String resolvedPath = e.escapeString(absolutePath);
    CustomDfsSink cached = sfWriters.get(resolvedPath);
    if (cached == null) {
      // First event for this resolved path: open a writer and cache it.
      cached = openWriter(resolvedPath, e);
      sfWriters.put(resolvedPath, cached);
    }
    target = cached;
  }
  target.append(e);
  super.append(e);
}
/**
 * Closes the writer(s) owned by this sink.
 *
 * <p>With tag substitution every cached writer is closed and the cache is
 * then emptied, so a later {@code append()} opens a fresh writer instead of
 * reusing one that has already been closed. A failure on one writer does not
 * prevent the remaining writers from being closed; the first failure is
 * rethrown once all of them have been attempted.
 *
 * <p>Without substitution the single writer is closed and cleared; calling
 * this again only logs a warning.
 *
 * @throws IOException if closing a writer fails (the first failure when
 *         several writers fail)
 */
@Override
public void close()
    throws IOException {
  if (shouldSub) {
    IOException firstFailure = null;
    for (Entry<String, CustomDfsSink> e : sfWriters.entrySet()) {
      LOG.info("Closing " + e.getKey());
      try {
        e.getValue().close();
      } catch (IOException ex) {
        // Keep going so the remaining writers are not leaked.
        if (firstFailure == null) {
          firstFailure = ex;
        } else {
          LOG.warn("Additional failure while closing " + e.getKey(), ex);
        }
      }
    }
    // Drop the closed writers so they are never handed out again.
    sfWriters.clear();
    if (firstFailure != null) {
      throw firstFailure;
    }
  } else {
    LOG.info("Closing " + absolutePath);
    if (writer == null) {
      LOG.warn("EscapedCustomDfsSink's Writer for '" + absolutePath
          + "' was already closed!");
      return;
    }
    writer.close();
    writer = null;
  }
}
/**
 * Opens the single writer when no tag substitution is needed.
 *
 * <p>With substitution nothing is opened here; writers are created on demand
 * by {@code append()}. The writer is created with whatever event is
 * currently held in {@code localEvent}.
 *
 * @throws IOException if the writer cannot be opened
 */
@Override
public void open()
    throws IOException {
  if (shouldSub) {
    // Per-event writers are opened lazily in append().
    return;
  }
  writer = openWriter(absolutePath, localEvent);
}
/**
 * Returns the builder that constructs this sink from configuration arguments.
 *
 * <p>Accepted arguments: {@code path [, file [, outputformat]]}. The file
 * name defaults to the empty string and the format name to the configured
 * default output format. An argument count outside 1..3 or a format name
 * that cannot be resolved is rejected via
 * {@code Preconditions.checkArgument}.
 *
 * @return a builder producing {@code EscapedCustomDfsSink} instances
 */
public static SinkBuilder
builder() {
  return new SinkBuilder() {
    @Override
    public EventSink
    build(Context context, String... args) {
      final int argc = args.length;
      Preconditions.checkArgument(argc >= 1 && argc <= 3,
          "usage: escapedCustomDfs(\"[(hdfs|file|s3n|...)://namenode[:port]]/path\""
              + "[, file [,outputformat ]])");

      final String filename = (argc >= 2) ? args[1] : "";

      // The configured default is always looked up, then overridden if given.
      final String configuredFormat = FlumeConfiguration.get().getDefaultOutputFormat();
      final String format = (argc >= 3) ? args[2] : configuredFormat;

      // Stays null when the format name cannot be resolved.
      OutputFormat o = null;
      try {
        o = FormatFactory.get().getOutputFormat(format);
      } catch (FlumeSpecException e) {
        LOG.warn("Illegal format type " + format + ".", e);
      }
      Preconditions.checkArgument(o != null, "Illegal format type " + format
          + ".");

      return new EscapedCustomDfsSink(args[0], filename, o);
    }
  };
}
}