package com.cloudera.flume.core;
import java.io.IOException;
import java.util.Map;
import com.cloudera.flume.conf.SourceFactory.SourceBuilder;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.flume.reporter.Reportable;
import com.google.common.base.Preconditions;
/**
 * Declared to hand back an {@link Event} from this source.
 *
 * NOTE(review): presumably returns the next event read from the source. The
 * two implementations in this file do not settle the contract -- StubSource
 * always throws and Base always returns null -- so confirm the blocking and
 * end-of-stream behaviour (is null a valid "no more events" signal?) against
 * a real source.
 *
 * @return an event; Base returns null
 * @throws IOException declared failure type; exact conditions are up to the
 *         implementation
 */
Event
next()
throws IOException;
/**
 * Opens this source.
 *
 * NOTE(review): presumably must be called before next(); nothing in this
 * file enforces or demonstrates that ordering -- confirm against callers.
 *
 * @throws IOException declared failure type; StubSource throws it
 *         unconditionally, Base never does
 */
public void open()
throws IOException;
/**
 * Closes this source.
 *
 * NOTE(review): presumably releases whatever open() acquired; neither
 * implementation in this file holds a resource, so verify against a real
 * source.
 *
 * @throws IOException declared failure type; StubSource throws it
 *         unconditionally, Base never does
 */
public void close()
throws IOException;
/**
 * Adds this source's report(s) to the supplied map.
 *
 * Both implementations in this file store the result of getReport() under
 * the key {@code namePrefix + getName()}.
 *
 * @param namePrefix string prepended to the source name to form the map key
 * @param reports    map that receives the report(s); it is mutated in place
 */
public void getReports(String namePrefix, Map<String, ReportEvent> reports);
public static class StubSource implements EventSource {
@Override
public void close()
throws IOException {
throw new IOException("Attempting to close a Stub Source!");
}
@Override
public Event
next()
throws IOException {
throw new IOException("Attempting to next a Stub Source!");
}
@Override
public void open()
throws IOException {
throw new IOException("Attempting to open a Stub Source!");
}
@Override
return this.getClass().getSimpleName();
}
@Override
return new ReportEvent(getName());
}
public static SourceBuilder
builder() {
return new SourceBuilder() {
@Override
public EventSource
build(String... argv) {
return new StubSource();
}
};
}
public static SourceBuilder
builder(
final int minArgs,
final int maxArgs) {
return new SourceBuilder() {
@Override
public EventSource
build(String... argv) {
Preconditions.checkArgument(argv.length >= minArgs,
"Too few arguments: expected at least " + minArgs
+ " but only had " + argv.length);
Preconditions.checkArgument(argv.length <= maxArgs,
"Too many arguments : exepected at most " + maxArgs + " but had "
+ argv.length);
return new StubSource();
}
};
}
@Override
public void getReports(String namePrefix, Map<String, ReportEvent> reports) {
reports.put(namePrefix + getName(), getReport());
}
}
/**
 * Minimal {@link EventSource} implementation: open() and close() do nothing,
 * next() returns null, and two counters (event count and body-byte count)
 * are published through getReport().
 *
 * NOTE(review): two method declaration lines appear to be missing from this
 * class as it stands (marked with NOTE(review) below), so it does not parse.
 * Restore them from version control rather than by guesswork.
 */
public static class Base implements EventSource {
// Metric key under which getReport() stores getName().
protected static final String R_TYPE = "type";
// Metric key under which getReport() stores numBytes.
protected static final String R_NUM_BYTES = "number of bytes";
// Metric key under which getReport() stores numEvents.
protected static final String R_NUM_EVENTS = "number of events";
// Incremented once per non-null event by the counter-update code below.
private long numEvents = 0;
// Running sum of e.getBody().length over the counted events.
private long numBytes = 0;
/**
 * No-op.
 *
 * @throws IOException never thrown by this implementation
 */
@Override
public void close()
throws IOException {
}
/**
 * Always returns null.
 *
 * @return null
 * @throws IOException never thrown by this implementation
 */
@Override
public Event
next()
throws IOException {
return null;
}
// NOTE(review): the declaration line for the following body is missing.
// The body reads a variable `e`, returns without a value, and is the only
// visible code that updates numBytes/numEvents, so it presumably belonged
// to a void method taking an Event. Its name, visibility and whether it
// was synchronized (getReport() below is) cannot be determined from this
// file -- confirm against history.
if (e == null)
// Null events are not counted.
return;
numBytes += e.getBody().length;
numEvents++;
}
/**
 * No-op.
 *
 * @throws IOException never thrown by this implementation
 */
@Override
public void open()
throws IOException {
}
// NOTE(review): the declaration line for the following body is missing.
// getName() is called in getReport() and getReports() below and the body
// returns a String, so this was presumably `public String getName()` --
// confirm.
@Override
return this.getClass().getSimpleName();
}
/**
 * Builds a report named getName() carrying three metrics: R_TYPE (the
 * name again), R_NUM_BYTES (numBytes) and R_NUM_EVENTS (numEvents).
 * Holds this instance's monitor while reading the counters.
 *
 * @return a newly created ReportEvent
 */
@Override
synchronized public ReportEvent
getReport() {
ReportEvent rpt = new ReportEvent(getName());
rpt.setStringMetric(R_TYPE, getName());
rpt.setLongMetric(R_NUM_BYTES, numBytes);
rpt.setLongMetric(R_NUM_EVENTS, numEvents);
return rpt;
}
/**
 * Stores getReport() in {@code reports} under {@code namePrefix + getName()}.
 *
 * @param namePrefix string prepended to this source's name to form the key
 * @param reports    map that receives the report; mutated in place
 */
@Override
public void getReports(String namePrefix, Map<String, ReportEvent> reports) {
reports.put(namePrefix + getName(), getReport());
}
}
}