package com.cloudera.flume.handlers.batch;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.flume.handlers.hdfs.WriteableEvent;
import com.google.common.base.Preconditions;
public class GzipDecorator<S
extends EventSink>
extends EventSinkDecorator<S> {
super(s);
}
@Override
public void append(Event e)
throws IOException {
WriteableEvent we = new WriteableEvent(e);
byte[] bs = we.toBytes();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream gzos = new GZIPOutputStream(baos);
gzos.write(bs);
gzos.close();
Event gze = new EventImpl(new byte[0]);
gze.set(GunzipDecorator.GZDOC, baos.toByteArray());
super.append(gze);
}
public static SinkDecoBuilder
builder() {
return new SinkDecoBuilder() {
@Override
public EventSinkDecorator<EventSink>
build(Context context,
String... argv) {
Preconditions.checkArgument(argv.length == 0, "usage: gzip");
return new GzipDecorator<EventSink>(null);
}
};
}
}