package com.cloudera.flume.reporter.ganglia;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.metrics.spi.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Attributes;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.Attributes.Type;
import com.cloudera.util.NetUtils;
import com.google.common.base.Preconditions;
private static final String DEFAULT_GROUP = "flume";
private static final int DEFAULT_TMAX = 60;
private static final int DEFAULT_DMAX = 0;
private static final int DEFAULT_PORT = 8649;
private static final int BUFFER_SIZE = 1500;
private static final Logger LOG = LoggerFactory.getLogger(GangliaSink.class);
private final String servers;
private static final Map<Type, String> typeTable =
new HashMap<Type, String>() {
private static final long serialVersionUID = 1L;
{
put(Type.STRING, "string");
put(Type.INT, "int32");
put(Type.LONG, "float");
put(Type.DOUBLE, "double");
}
};
private byte[] buffer = new byte[BUFFER_SIZE];
private int offset;
private List<? extends SocketAddress> metricsServers;
private DatagramSocket datagramSocket;
final private String attr;
final private String units;
final private Type type;
/**
 * Creates a sink that reports one event attribute to Ganglia.
 *
 * <p>Nothing is parsed or opened here; the arguments are only stored.
 *
 * @param gangliaSvrs server list string, parsed later by open()
 * @param attr        name of the event attribute to report
 * @param units       units label sent with the metric
 * @param t           type used to read the attribute from each event
 */
public GangliaSink(String gangliaSvrs, String attr, String units, Type t) {
  // What to report.
  this.attr = attr;
  this.type = t;
  this.units = units;
  // Where to report it.
  this.servers = gangliaSvrs;
}
/**
 * Reads the configured attribute from the event, emits it as a Ganglia
 * metric, and then passes the event to the superclass.
 *
 * <p>If the attribute cannot be read (the reader returns null) or the
 * configured type is not one of LONG, INT, STRING or DOUBLE, the method
 * returns without emitting anything and without calling the superclass.
 *
 * @param e event to inspect
 * @throws IOException if emitting the metric or the superclass append fails
 */
@Override
public void append(Event e) throws IOException {
  // Every supported type follows the same pattern: read, bail out on null,
  // render with toString(). Only the reader differs per type.
  final Object raw;
  switch (type) {
    case LONG:
      raw = Attributes.readLong(e, attr);
      break;
    case INT:
      raw = Attributes.readInt(e, attr);
      break;
    case STRING:
      raw = Attributes.readString(e, attr);
      break;
    case DOUBLE:
      raw = Attributes.readDouble(e, attr);
      break;
    default:
      return;
  }
  if (raw == null) {
    return;
  }
  emitMetric(attr, typeTable.get(type), raw.toString(), units);
  super.append(e);
}
/**
 * Parses the configured server list and creates the UDP socket used for
 * sending metrics.
 *
 * <p>A failure to create the socket is logged as a warning and not
 * rethrown; in that case the socket field keeps its previous value.
 *
 * @throws IOException declared by the overridden method
 */
@Override
public void open() throws IOException {
  metricsServers = Util.parse(servers, DEFAULT_PORT);
  try {
    final DatagramSocket created = new DatagramSocket();
    datagramSocket = created;
  } catch (SocketException socketFailure) {
    LOG.warn("problem with ganglia socket", socketFailure);
  }
}
/**
 * Closes the UDP socket and clears the reference to it.
 *
 * <p>If there is no socket (never opened, or already closed) a warning is
 * logged and nothing else happens.
 *
 * @throws IOException declared by the overridden method
 */
@Override
public void close() throws IOException {
  if (datagramSocket != null) {
    datagramSocket.close();
    datagramSocket = null;
  } else {
    LOG.warn("Double close");
  }
}
/**
 * Encodes one metric into {@code buffer} and sends it to every address in
 * {@code metricsServers} as two UDP datagrams: first a description of the
 * metric (name, type, units, slope, tmax, dmax and group), then its value.
 *
 * <p>The write position is reset before each datagram is encoded. Without
 * the reset at the top, every call after the first would start encoding
 * behind the bytes left over from the previous call's value datagram,
 * producing corrupt packets and eventually overrunning the buffer.
 *
 * @param name  metric name; written twice into the description datagram
 * @param type  type name to announce for the metric
 * @param value metric value, already rendered as text
 * @param units units label sent with the description
 * @throws IOException if the sink is not open or a send fails
 */
private void emitMetric(String name, String type, String value, String units)
throws IOException {
  // Fail with a clear message instead of a NullPointerException when open()
  // was never called, did not create a socket, or close() already ran.
  if (datagramSocket == null || metricsServers == null) {
    throw new IOException("ganglia sink is not open");
  }

  final int slope = 3; // NOTE(review): presumably Ganglia's "both" slope - confirm
  final int tmax = DEFAULT_TMAX;
  final int dmax = DEFAULT_DMAX;
  final String hostName = NetUtils.localhost();

  // Datagram 1: metric description.
  offset = 0; // start from a clean buffer on every call
  xdr_int(128); // NOTE(review): presumably the Ganglia 3.1 metadata message id - confirm
  xdr_string(hostName);
  xdr_string(name);
  xdr_int(0); // NOTE(review): presumably the "spoof" flag, off - confirm
  xdr_string(type);
  xdr_string(name);
  xdr_string(units);
  xdr_int(slope);
  xdr_int(tmax);
  xdr_int(dmax);
  xdr_int(1); // one extra key/value pair follows
  xdr_string("GROUP");
  xdr_string(DEFAULT_GROUP);
  sendBuffer();

  // Datagram 2: metric value, sent as text with a "%s" format.
  offset = 0;
  xdr_int(133); // NOTE(review): presumably the Ganglia 3.1 string-value message id - confirm
  xdr_string(hostName);
  xdr_string(name);
  xdr_int(0); // NOTE(review): presumably the "spoof" flag, off - confirm
  xdr_string("%s");
  xdr_string(value);
  sendBuffer();
}

/** Sends the first {@code offset} bytes of {@code buffer} to every configured server. */
private void sendBuffer() throws IOException {
  for (SocketAddress server : metricsServers) {
    datagramSocket.send(new DatagramPacket(buffer, offset, server));
  }
}
// NOTE(review): the method header for this body is not visible in this view;
// presumably this is xdr_string(String s), which emitMetric calls - confirm.
// Appends a string to buffer at the current offset: a length prefix written
// via xdr_int, then the raw bytes, then whatever pad() adds.
// NOTE(review): nothing here checks that the bytes fit in the remaining
// buffer; an oversized string makes System.arraycopy throw.
byte[] bytes = s.getBytes(); // no charset argument: the JVM default charset is used
int len = bytes.length;
xdr_int(len); // length prefix precedes the data
System.arraycopy(bytes, 0, buffer, offset, len);
offset += len;
pad(); // NOTE(review): presumably aligns offset to a 4-byte boundary - confirm
}
// NOTE(review): the method header for this body is not visible in this view;
// presumably this is pad(), which the string encoder calls - confirm.
// Rounds offset up to the next multiple of 4, writing a zero byte into each
// position it skips. Does nothing when offset is already a multiple of 4.
int newOffset = ((offset + 3) / 4) * 4;
while (offset < newOffset) {
buffer[offset++] = 0;
}
}
// NOTE(review): the method header for this body is not visible in this view;
// presumably this is xdr_int(int i), which emitMetric calls - confirm.
// Writes i into buffer as four bytes, most significant byte first, and
// advances offset by 4.
buffer[offset++] = (byte) ((i >> 24) & 0xff);
buffer[offset++] = (byte) ((i >> 16) & 0xff);
buffer[offset++] = (byte) ((i >> 8) & 0xff);
buffer[offset++] = (byte) (i & 0xff);
}
/**
 * Returns a builder that creates a GangliaSink from three or four string
 * arguments: attribute name, units, type, and optionally the server list.
 *
 * <p>The type must be one of {@code int}, {@code long}, {@code double} or
 * {@code string}. When the server list is omitted, the value from
 * FlumeConfiguration is used.
 *
 * @return a builder for GangliaSink instances
 */
public static SinkBuilder builder() {
  return new SinkBuilder() {
    /**
     * Builds the sink.
     *
     * @throws IllegalArgumentException if the argument count is not 3 or 4,
     *         or the type argument is not a recognised type name
     */
    @Override
    public EventSink build(Context context, String... argv) {
      // One usage string for both failure paths; the type-error message used
      // to carry a copy that was missing its closing parenthesis.
      final String usage =
          "usage: ganglia(\"attr\", \"units\",\"type\"[, \"gmondservers\"])";
      Preconditions.checkArgument(argv.length >= 3 && argv.length <= 4, usage);

      String ganglias = FlumeConfiguration.get().getGangliaServers();
      final String attr = argv[0];
      final String units = argv[1];
      final String type = argv[2];

      // Constant-first equals: a null type argument falls through to the
      // descriptive IllegalArgumentException instead of a NullPointerException.
      final Type t;
      if ("int".equals(type)) {
        t = Type.INT;
      } else if ("long".equals(type)) {
        t = Type.LONG;
      } else if ("double".equals(type)) {
        t = Type.DOUBLE;
      } else if ("string".equals(type)) {
        t = Type.STRING;
      } else {
        throw new IllegalArgumentException(
            "Illegal ganglia xdr type: "
                + type
                + " != int|long|double|string\n"
                + usage);
      }

      // An explicit fourth argument overrides the configured server list.
      if (argv.length >= 4) {
        ganglias = argv[3];
      }
      return new GangliaSink(ganglias, attr, units, t);
    }
  };
}
}