package com.cloudera.flume.core;
import java.io.IOException;
import java.util.Map;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.flume.reporter.Reportable;
public interface EventSink extends Reportable {
public void append(Event e)
throws IOException;
public void open()
throws IOException;
public void close()
throws IOException;
public void getReports(String namePrefix, Map<String, ReportEvent> reports);
public static class Base implements EventSink {
protected static final String R_TYPE = "type";
protected static final String R_NUM_BYTES = "number of bytes";
protected static final String R_NUM_EVENTS = "number of events";
private long numEvents = 0;
private long numBytes = 0;
@Override
synchronized public void append(Event e)
throws IOException {
updateAppendStats(e);
}
if (e == null)
return;
numBytes += e.getBody().length;
numEvents++;
}
@Override
public void close()
throws IOException {
}
@Override
public void open()
throws IOException {
}
@Override
return this.getClass().getSimpleName();
}
@Override
ReportEvent rpt = new ReportEvent(getName());
rpt.setStringMetric(R_TYPE, getName());
rpt.setLongMetric(R_NUM_BYTES, numBytes);
rpt.setLongMetric(R_NUM_EVENTS, numEvents);
return rpt;
}
@Override
public void getReports(String namePrefix, Map<String, ReportEvent> reports) {
reports.put(namePrefix + getName(), getReport());
}
}
public static class StubSink extends Base {
String name;
this.name = name;
}
@Override
return name;
}
@Override
public void append(Event e)
throws IOException {
throw new IOException("Attemping to append to a Stub Sink!");
}
@Override
public void close()
throws IOException {
}
@Override
public void open()
throws IOException {
throw new IOException("Attempting to open a stub sink '" + getName()
+ "'!");
}
public static SinkBuilder
builder(
final String name) {
return new SinkBuilder() {
@Override
public EventSink
build(Context context, String... argv) {
return new StubSink(name);
}
};
}
}
}