package com.cloudera.flume.handlers.avro;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.handlers.text.FormatFactory.OutputFormatBuilder;
import com.cloudera.flume.handlers.text.output.AbstractOutputFormat;
import com.cloudera.flume.handlers.text.output.OutputFormat;
import com.google.common.base.Preconditions;
// Shared reflection helper used to derive the Avro schema below.
final static ReflectData reflectData = ReflectData.get();
// Avro schema obtained by reflection from EventImpl; used both for the datum
// writer and when the data file is created in format().
final static Schema schema = reflectData.getSchema(EventImpl.class);
// Name constant returned from the builder produced by builder().
private static final String NAME = "avrodata";
// Per-instance datum writer that serializes EventImpl records using the schema.
DatumWriter<EventImpl> writer = new ReflectDatumWriter<EventImpl>(schema);
// Avro data file writer; stays null until the first format() call creates it.
DataFileWriter<EventImpl> sink = null;
// The stream handed to the first format() call; later calls are compared
// against it by reference and rejected if they differ.
OutputStream cachedOut = null;
}
/**
 * Appends a single event to the Avro data file written on {@code o}.
 *
 * <p>On the first call the {@link DataFileWriter} is created over {@code o} and
 * that stream is remembered. Every later call must pass the very same stream
 * instance (compared by reference). The writer is flushed after each event.
 *
 * @param o the stream to write to; must be the same instance on every call
 * @param e the event to append; used directly if it is an {@link EventImpl},
 *          otherwise passed to the {@code EventImpl(Event)} constructor first
 * @throws IOException if {@code o} differs from the stream seen on the first
 *                     call, or if the underlying writer reports an I/O error
 */
@Override
public void format(OutputStream o, Event e) throws IOException {
  if (sink == null) {
    // Lazy setup: bind this instance to the first stream it is given.
    cachedOut = o;
    sink = new DataFileWriter<EventImpl>(writer);
    sink.create(schema, o);
  } else if (cachedOut != o) {
    // Already bound to another stream; one instance serves exactly one output.
    throw new IOException(
        "OutputFormat instance can only write to the same OutputStream");
  }

  final EventImpl record = (e instanceof EventImpl) ? (EventImpl) e : new EventImpl(e);
  sink.append(record);
  sink.flush();
}
/**
 * Returns a new {@link OutputFormatBuilder} for this output format.
 *
 * <p>The returned builder's {@code build} method accepts no arguments, creates
 * a fresh {@code AvroDataFileOutputFormat}, and registers itself on that
 * instance via {@code setBuilder}.
 *
 * @return an anonymous builder that produces {@code AvroDataFileOutputFormat}
 *         instances
 */
public static OutputFormatBuilder
builder() {
return new OutputFormatBuilder() {
/**
 * Builds a new output format instance.
 *
 * @param args must be empty; any argument fails the precondition check
 *             with the message "usage: avrodata"
 * @return the newly created format, with this builder set on it
 */
@Override
public OutputFormat
build(String... args) {
// This format is not configurable, so reject any arguments up front.
Preconditions.checkArgument(args.length == 0, "usage: avrodata");
OutputFormat format = new AvroDataFileOutputFormat();
format.setBuilder(this);
return format;
}
// NOTE(review): the method signature that should follow this @Override is
// missing from this view; the body only returns NAME. Presumably this is the
// builder's name accessor -- confirm against OutputFormatBuilder and restore
// the header.
@Override
return NAME;
}
};
}
}