package com.cloudera.flume.handlers.hdfs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.io.Writable;

import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventBaseImpl;
import com.cloudera.flume.core.EventImpl;
import com.google.common.base.Preconditions;
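
/**
 * Wraps a Flume {@link Event} so it can be serialized and deserialized with
 * Hadoop's {@link Writable} mechanism. All {@link Event} accessors delegate
 * to the wrapped event.
 *
 * <p>A typical round trip:
 * <pre>
 *   WriteableEvent we = new WriteableEvent(evt);
 *   byte[] raw = we.toBytes();
 *   Event back = WriteableEvent.create(raw).getEvent();
 * </pre>
 */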
public class WriteableEvent extends EventBaseImpl implements Writable {
  final static long MAX_BODY_SIZE = FlumeConfiguration.get()
      .getEventMaxSizeBytes();

  private Event e;

  public WriteableEvent() {
    this(new EventImpl("".getBytes()));
  }

  public WriteableEvent(Event e) {
    assert (e != null);
    this.e = e;
  }
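
  /** Deserializes a {@code WriteableEvent} from the given raw bytes. */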
  public static WriteableEvent create(byte[] raw) throws IOException {
    WriteableEvent e = new WriteableEvent();
    DataInput in = new DataInputStream(new ByteArrayInputStream(raw));
    e.readFields(in);
    return e;
  }

  public Event getEvent() {
    return e;
  }

  @Override
  public byte[] getBody() {
    return e.getBody();
  }

  @Override
  public Priority getPriority() {
    return e.getPriority();
  }

  @Override
  public long getTimestamp() {
    return e.getTimestamp();
  }

  @Override
  public long getNanos() {
    return e.getNanos();
  }

  @Override
  public String getHost() {
    return e.getHost();
  }

  public WriteableEventKey getEventKey() {
    return new WriteableEventKey(e);
  }
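
  /**
   * Reads one event in the format produced by {@link #write(DataOutput)}:
   * length-prefixed body, timestamp, priority ordinal, nanos, host string,
   * then the attribute map.
   */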
  public void readFields(DataInput in) throws IOException {
    int len = in.readInt();
    Preconditions.checkArgument((len >= 0) && (len <= MAX_BODY_SIZE),
        "byte length is %s which is not <= %s and >= 0", len, MAX_BODY_SIZE);
    byte[] body = new byte[len];
    in.readFully(body);
    long time = in.readLong();
    int prioidx = in.readInt();
    assert (Priority.values().length > prioidx);
    Priority prio = Priority.values()[prioidx];
    long nanos = in.readLong();
    String host = in.readUTF();
    Map<String, byte[]> fields = unserializeMap(in);
    e = new EventImpl(body, time, prio, nanos, host, fields);
  }
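
  /** Writes this event in the format expected by {@link #readFields(DataInput)}. */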
  public void write(DataOutput out) throws IOException {
    byte[] utf8 = getBody();
    out.writeInt(utf8.length);
    out.write(utf8);
    out.writeLong(getTimestamp());
    out.writeInt(getPriority().ordinal());
    out.writeLong(getNanos());
    out.writeUTF(getHost());
    serializeMap(out, e.getAttrs());
  }
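
  /** Serializes this event into a new byte array. */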
  public byte[] toBytes() {
    try {
      // Start with a 32 KiB buffer; it grows as needed.
      ByteArrayOutputStream baos = new ByteArrayOutputStream(1 << 15);
      DataOutput out = new DataOutputStream(baos);
      write(out);
      return baos.toByteArray();
    } catch (IOException ioe) {
      // Writing to an in-memory stream should not fail.
      assert (false);
      return null;
    }
  }

  @Override
  public byte[] get(String attr) {
    return e.get(attr);
  }
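
  /**
   * Writes a map as an entry count followed by, for each entry, the UTF-8
   * key and the length-prefixed value bytes.
   */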
  public static DataOutput serializeMap(DataOutput out, Map<String, byte[]> m)
      throws IOException {
    int sz = m.size();
    out.writeInt(sz);
    for (Entry<String, byte[]> e : m.entrySet()) {
      out.writeUTF(e.getKey());
      byte[] v = e.getValue();
      out.writeInt(v.length);
      out.write(v);
    }
    return out;
  }
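
  /** Reads a map in the format produced by {@link #serializeMap(DataOutput, Map)}. */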
  public static Map<String, byte[]> unserializeMap(DataInput in)
      throws IOException {
    int sz = in.readInt();
    Map<String, byte[]> fields = new HashMap<String, byte[]>();
    for (int i = 0; i < sz; i++) {
      String f = in.readUTF();
      int l = in.readInt();
      byte[] val = new byte[l];
      in.readFully(val);
      fields.put(f, val);
    }
    return fields;
  }
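
  /** Writes a list as an element count followed by length-prefixed byte arrays. */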
  public static DataOutput serializeList(DataOutput out, List<byte[]> l)
      throws IOException {
    int sz = l.size();
    out.writeInt(sz);
    for (byte[] v : l) {
      out.writeInt(v.length);
      out.write(v);
    }
    return out;
  }
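
  /** Reads a list in the format produced by {@link #serializeList(DataOutput, List)}. */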
  public static List<byte[]> unserializeList(DataInput in) throws IOException {
    int sz = in.readInt();
    List<byte[]> l = new ArrayList<byte[]>(sz);
    for (int i = 0; i < sz; i++) {
      int vsz = in.readInt();
      byte[] v = new byte[vsz];
      in.readFully(v);
      l.add(v);
    }
    return l;
  }

  @Override
  public Map<String, byte[]> getAttrs() {
    return Collections.unmodifiableMap(e.getAttrs());
  }

  @Override
  public void set(String attr, byte[] v) {
    e.set(attr, v);
  }
}