package com.cloudera.flume.handlers.hive;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.handlers.avro.AvroJsonOutputFormat;
import com.cloudera.flume.handlers.hdfs.CustomDfsSink;
import com.cloudera.flume.handlers.text.FormatFactory;
import com.cloudera.flume.handlers.text.output.OutputFormat;
import com.cloudera.flume.handlers.text.output.RawOutputFormat;
import com.google.common.base.Preconditions;
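
/**
 * A DFS sink that expands Flume escape sequences in its output path per event
 * (e.g. %Y-%m-%d or %{attribute}) and, when a per-path writer is closed, hands
 * a {@link HiveDirCreatedNotification} (Hive table name, escaped directory,
 * partition mapping) to a {@link HiveDirCreatedHandler} so the new directory
 * can be registered with a Hive metastore. For example:
 * hivedfs("hdfs://namenode/logs/%Y-%m-%d", "logs").
 *
 * Only the AvroJson output format currently has a matching Hive SerDe; other
 * formats cause a warning.
 */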
public class HiveNotifyingDfsSink extends EventSink.Base {
static final Logger LOG =
LoggerFactory.getLogger(HiveNotifyingDfsSink.class);
final String dirpath;
final OutputFormat format;
final String hivetable;
final HiveDirCreatedHandler handler;
EventSink writer = null;
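// One cached writer per fully-escaped output path. Entries are only added,
// never evicted, so the number of distinct escaped paths should stay small.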
final Map<String, EventSink> sfWriters = new HashMap<String, EventSink>();
private String filename = "";
protected String absolutePath = "";
public HiveNotifyingDfsSink(String path, String filename, String hivetable,
OutputFormat o, HiveDirCreatedHandler handler) {
this.dirpath = path;
this.filename = filename;
this.format = o;
absolutePath = path;
this.hivetable = hivetable;
if (filename != null && filename.length() > 0) {
if (!absolutePath.endsWith(Path.SEPARATOR)) {
absolutePath += Path.SEPARATOR;
}
absolutePath += this.filename;
}
if (!(o instanceof AvroJsonOutputFormat)) {
LOG.warn("Currently, Hive only supports the AvroJson output format SerDe.");
}
this.handler = handler;
}
public HiveNotifyingDfsSink(String path, String filename, String hivetable) {
this(path, filename, hivetable, getDefaultOutputFormat(),
new DefaultHandler());
}
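/**
 * The default handler just logs the notification; it does not itself talk to
 * a Hive metastore.
 */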
static class DefaultHandler implements HiveDirCreatedHandler {
@Override
public void handleNotification(HiveDirCreatedNotification notif) {
LOG.info("Notifying Hive Metastore with ready event " + notif);
}
};
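/**
 * Wraps another handler and suppresses duplicate notifications for
 * directories that have already been reported in this session.
 */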
static class DedupDefaultHandler implements HiveDirCreatedHandler {
HashSet<String> cache = new HashSet<String>();
HiveDirCreatedHandler simple;

DedupDefaultHandler(HiveDirCreatedHandler hfrh) {
this.simple = hfrh;
}
@Override
public void handleNotification(HiveDirCreatedNotification notif) {
if (cache.contains(notif.getNotifDir())) {
return;
}
simple.handleNotification(notif);
cache.add(notif.getNotifDir());
}
}
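/**
 * Returns the output format named in the Flume configuration, falling back to
 * RawOutputFormat if that format cannot be constructed.
 */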
static OutputFormat getDefaultOutputFormat() {
try {
return FormatFactory.get().getOutputFormat(
FlumeConfiguration.get().getDefaultOutputFormat());
} catch (FlumeSpecException e) {
LOG.warn("format from conf file not found, using default", e);
return new RawOutputFormat();
}
}
protected EventSink getWriter(Event e) throws IOException {
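// Keep a reference to the event so the SinkCloseNotifier below can expand
// dirpath with this event's attribute values.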
final Event evt = e;
String realpath = e.escapeString(absolutePath);
EventSink w = sfWriters.get(realpath);
if (w != null) {
return w;
}
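// No writer cached for this escaped path: open a new CustomDfsSink wrapped in
// a notifier that reports the new directory to the Hive handler on close.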
LOG.info("Opening " + realpath);
w = new CustomDfsSink(realpath, format, null);
SinkCloseNotifier<EventSink, HiveDirCreatedNotification> notif =
new SinkCloseNotifier<EventSink, HiveDirCreatedNotification>(w) {
@Override
public HiveDirCreatedNotification getNotificationEvent() {
String escdirpath = evt.escapeString(dirpath);
Map<String, String> partitions = evt.getEscapeMapping(dirpath);
return new HiveDirCreatedNotification(hivetable, escdirpath,
partitions);
}
@Override
public void notify(HiveDirCreatedNotification e) {
handler.handleNotification(e);
}
};
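// Cache the wrapped sink so later events that escape to the same path reuse it.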
notif.open();
sfWriters.put(realpath, notif);
return notif;
}
@Override
public void append(Event e) throws IOException {
EventSink w = getWriter(e);
w.append(e);
super.append(e);
}
@Override
public void close() throws IOException {
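// Close every per-path writer opened by getWriter().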
for (Entry<String, EventSink> e : sfWriters.entrySet()) {
LOG.info("Closing " + e.getKey());
e.getValue().close();
}
}
@Override
public void open() throws IOException {
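// Nothing to open eagerly; per-path writers are created on demand in getWriter().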
}
public static SinkBuilder builder() {
return new SinkBuilder() {
@Override
public EventSink build(Context context, String... args) {
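// args[0] = dfs path, args[1] = hive table name, optional args[2] = output format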
Preconditions.checkArgument(args.length >= 2 && args.length <= 3,
"usage: hivedfs(\"[(hdfs|file|s3n|...)://namenode[:port]]/path\""
+ ", \"hivetable\" [, outputformat ])");
String format = FlumeConfiguration.get().getDefaultOutputFormat();
String hivetable = args[1];
if (args.length >= 3) {
format = args[2];
}
OutputFormat o;
try {
o = FormatFactory.get().getOutputFormat(format);
} catch (FlumeSpecException e) {
LOG.warn("Illegal format type " + format + ".", e);
o = null;
}
Preconditions.checkArgument(o != null, "Illegal format type " + format
+ ".");
return new HiveNotifyingDfsSink(args[0], "", hivetable, o,
new DefaultHandler());
}
};
}
}