package com.cloudera.flume.reporter.sampler;
import java.io.IOException;
import java.util.List;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.ReservoirSampler;
import com.google.common.base.Preconditions;
EventSinkDecorator<R> {
// Sampler that receives values via onNext(); flush() forwards its current
// sample to the sink and then clears it.
final ReservoirSampler<Event> sampler;
super(snk);
this.sampler = new ReservoirSampler<Event>(samples);
}
/**
 * Flushes the currently sampled events and then delegates to
 * {@code super.close()}.
 *
 * <p>The superclass close runs in a {@code finally} block so that it is
 * still invoked when {@link #flush()} throws; otherwise a failed flush would
 * skip the close step entirely. If both calls throw, the exception from
 * {@code super.close()} is the one that propagates.
 *
 * @throws IOException if flushing or the superclass close fails
 */
@Override
public void close()
    throws IOException {
  try {
    flush();
  } finally {
    // Always give the superclass a chance to close, even after a failed flush.
    super.close();
  }
}
/**
 * Takes the sampler's current sample, appends each event in it to the sink
 * returned by {@code getSink()}, and finally clears the sampler.
 *
 * @throws IOException propagated from the calls made while forwarding events
 */
public void flush()
    throws IOException {
  Preconditions.checkNotNull(sampler);

  final List<Event> sampledEvents = sampler.sample();
  for (final Event sampled : sampledEvents) {
    // The sink is looked up per event, exactly as many times as events exist.
    getSink().append(sampled);
  }

  // Only reached when every append succeeded.
  sampler.clear();
}
@Override
sampler.onNext(v);
}
/**
 * Returns a {@code SinkDecoBuilder} that constructs a
 * {@code ReservoirSamplerDeco} from exactly one string argument.
 *
 * <p>The single argument is parsed with {@link Integer#parseInt(String)} and
 * passed as the second constructor argument; the first constructor argument
 * is {@code null}.
 * NOTE(review): presumably the decorated sink is attached later by the
 * caller -- confirm against the framework.
 *
 * @return a new builder instance
 */
public static SinkDecoBuilder
builder() {
  final SinkDecoBuilder decoBuilder = new SinkDecoBuilder() {
    @Override
    public EventSinkDecorator<EventSink>
    build(Context context,
        String... argv) {
      // Exactly one argument is accepted; anything else is a usage error.
      final boolean hasSingleArg = argv.length == 1;
      Preconditions.checkArgument(hasSingleArg,
          "usage: reservoirSampler(sample)");

      final int sampleCount = Integer.parseInt(argv[0]);
      return new ReservoirSamplerDeco<EventSink>(null, sampleCount);
    }
  };
  return decoBuilder;
}
}