package com.cloudera.flume.reporter.ganglia;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.junit.Ignore;
import org.junit.Test;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.core.Attributes;
import com.cloudera.flume.core.CompositeSink;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.FanOutSink;
import com.cloudera.flume.core.Attributes.Type;
import com.cloudera.util.Clock;
import com.cloudera.util.NetUtils;
// Logger shared by the tests and by the UDP listener helper in this class.
private static final Logger LOG = Logger.getLogger(TestGangliaSink.class);
// Attribute names; the long/int/double ones are used both as event attribute
// keys and as the metric names handed to GangliaSink in the manual test.
String ATTR_LONG = "long test metric";
String ATTR_INT = "int test metric";
// NOTE(review): ATTR_STRING is not referenced anywhere in the visible part of
// this file — confirm whether it is still needed.
String ATTR_STRING = "string test metric";
String ATTR_DOUBLE = "double test metric";
// Manual, assertion-free test: fans each event out to three Ganglia sinks
// (long, int and double metrics) and appends 60 events with a
// Clock.sleep(1000) pause after each one. Per the @Ignore reason, a human has
// to confirm that the values show up in ganglia.
// NOTE(review): the first line of this method's signature is missing from
// this excerpt; only the tail of its throws clause is visible.
@Ignore("Slow test, requires human to verify that values show up in ganglia")
@Test
InterruptedException {
// Target server string passed to every sink (an address in the IPv4
// multicast range).
String svrs = "239.2.11.71";
EventSink lsnk = new GangliaSink(svrs, ATTR_LONG, "bytes", Type.LONG);
EventSink isnk = new GangliaSink(svrs, ATTR_INT, "bytes", Type.INT);
EventSink dsnk = new GangliaSink(svrs, ATTR_DOUBLE, "bytes", Type.DOUBLE);
// One append on the fan-out is delivered to all three typed sinks.
EventSink snk = new FanOutSink<EventSink>(lsnk, isnk, dsnk);
snk.open();
for (int i = 0; i < 60; i++) {
// Empty body: only the attributes set below carry the metric values.
Event e = new EventImpl("".getBytes());
Attributes.setLong(e, ATTR_LONG, i * 1000000);
Attributes.setInt(e, ATTR_INT, (int) (i * 1000000));
// NOTE(review): when i % 20 == 0 (i = 0, 20, 40) this is 1.0 / 0, which is
// Infinity in double arithmetic — confirm that sending it is intended.
Attributes.setDouble(e, ATTR_DOUBLE, (double) (1.0 / (i % 20)));
snk.append(e);
// Presumably 1000 ms, i.e. roughly one sample per second — TODO confirm.
Clock.sleep(1000);
}
snk.close();
}
// Exercises GangliaSink.builder(): the four-argument and five-argument forms
// must build sinks that survive repeated open/append/close cycles, and an
// unknown type name must be rejected with IllegalArgumentException.
// NOTE(review): this method's signature line is missing from this excerpt.
@Test
// Four-argument form: "localhost", "foo", "int".
EventSink snk = GangliaSink.builder().build(new Context(), "localhost",
"foo", "int");
// Reopening the same sink instance ten times checks that it is reusable.
for (int i = 0; i < 10; i++) {
snk.open();
snk.append(new EventImpl("".getBytes()));
snk.close();
}
// Five-argument form: additionally passes the Ganglia servers taken from
// FlumeConfiguration.
EventSink snk4 = GangliaSink.builder().build(new Context(), "localhost",
"foo", "int", FlumeConfiguration.get().getGangliaServers());
for (int i = 0; i < 10; i++) {
snk4.open();
snk4.append(new EventImpl("".getBytes()));
snk4.close();
}
// "bar" is used as the type argument here; the test passes only if build()
// throws IllegalArgumentException for it.
try {
GangliaSink.builder().build(new Context(), "localhost", "foo", "bar");
} catch (IllegalArgumentException e) {
// Expected rejection: the test succeeds.
return;
}
// Reached only when build() accepted the bad type.
fail("expected failure");
}
// Builds a sink from the textual spec ganglia("localhost", "foo", "int") via
// CompositeSink and runs ten open/append/close cycles on it. There are no
// assertions; the test passes if nothing throws.
// NOTE(review): this method's signature line is missing from this excerpt.
@Test
EventSink snk = new CompositeSink(new Context(),
"ganglia(\"localhost\", \"foo\", \"int\")");
for (int i = 0; i < 10; i++) {
snk.open();
// Empty-bodied event with no attributes set.
snk.append(new EventImpl("".getBytes()));
snk.close();
}
}
// State of the UDP listener helper used by the round-trip test.
// NOTE(review): the helper's class header is missing from this excerpt.
// Set to true once the socket has been created and its port recorded.
private boolean isConfigured = false;
// Set to true when the received datagram has a length greater than zero.
private boolean hasData = false;
// Copy of the received datagram's payload.
private byte[] byteData;
// Local port the listener's socket is bound to.
private int port;
// Counted down after the bind attempt, whether it succeeded or failed.
public CountDownLatch listening = new CountDownLatch(1);
// Counted down once receive() has returned a packet (or the bind failed).
public CountDownLatch received = new CountDownLatch(1);
// Counted down after the payload has been copied (or the bind failed).
public CountDownLatch done = new CountDownLatch(1);
// Listener routine: binds a UDP socket on an OS-chosen port, waits for a
// single datagram, stores a copy of its payload and signals progress through
// the listening / received / done latches. The socket is always closed before
// the routine returns.
// NOTE(review): this method's signature line is missing from this excerpt;
// it is presumably the run() method used by the listener thread — confirm.
DatagramSocket s;
try {
  // No explicit port: the bound port is published through setPort().
  s = new DatagramSocket();
  setPort(s.getLocalPort());
  setConfigured(true);
} catch (IOException e) {
  LOG.warn(e);
  // Bind failed: release every latch so threads awaiting them are not held
  // up, then give up. No socket exists on this path, so nothing to close.
  listening.countDown();
  received.countDown();
  done.countDown();
  return;
}
try {
  listening.countDown();
  byte[] b = new byte[8192];
  DatagramPacket info = new DatagramPacket(b, b.length);
  try {
    // Blocks until one datagram arrives.
    s.receive(info);
    received.countDown();
  } catch (IOException e) {
    LOG.warn(e);
    return;
  }
  LOG.info("Got a new packet, length " + info.getLength());
  int bytesRead = info.getLength();
  if (bytesRead > 0)
    setHasData(true);
  // Keep only the bytes actually received, not the whole 8192-byte buffer.
  byteData = new byte[info.getLength()];
  System.arraycopy(info.getData(), 0, byteData, 0, bytesRead);
  done.countDown();
} finally {
  // Release the UDP port on every exit path, including a failed receive.
  s.close();
}
}
// NOTE(review): the signature lines of the accessors below are missing from
// this excerpt; only their bodies and closing braces remain. Names mentioned
// here are taken from call sites elsewhere in the file.
// Setter body (called as setConfigured): stores the isConfigured flag.
this.isConfigured = isConfigured;
}
// Getter body: returns the isConfigured flag.
return isConfigured;
}
// Setter body (called as setHasData): stores the hasData flag.
this.hasData = hasData;
}
// Getter body (presumably getHasData): returns the hasData flag.
return hasData;
}
// Getter body (presumably getBytes): returns the stored payload array itself,
// not a copy.
return byteData;
}
// Setter body (called as setPort): stores the listener's port.
this.port = port;
}
// Getter body (presumably getPort): returns the listener's port.
return port;
}
// Presumably closes the listener helper class, whose header is not visible.
}
// Round-trip check: starts a local UDP listener, sends one INT metric to it
// through GangliaSink, and verifies that the captured packet contains this
// host's name as a contiguous byte sequence.
// NOTE(review): this method's signature line is missing from this excerpt.
@Test
String hostName = NetUtils.localhost();
GangliaSocketListener listener = new GangliaSocketListener();
Thread listenerThread = new Thread(listener);
listenerThread.start();
// Wait until the listener has attempted to bind, so its port is known.
assertTrue("Took too long to bind to a port", listener.listening.await(5,
    TimeUnit.SECONDS));
LOG.info("Listening to port " + listener.getPort());
// Point the sink at the listener's port on this host.
EventSink ganglia = new GangliaSink(hostName + ":" + listener.getPort(),
    "foo", "bars", Type.INT);
ganglia.open();
Event e = new EventImpl("baz".getBytes());
Attributes.setInt(e, "foo", 1337);
ganglia.append(e);
ganglia.close();
assertTrue("Took too long to recieve a packet", listener.received.await(5,
    TimeUnit.SECONDS));
assertTrue("Did not receive proper packet", listener.done.await(5,
    TimeUnit.SECONDS));
assertTrue("Did not recieve Ganglia data", listener.getHasData());
byte[] hostNameBytes = hostName.getBytes();
byte[] xdrBytes = listener.getBytes();
boolean hasHostname = false;
LOG.info("Checking to make sure that the Ganglia data contains host "
    + hostName);
// Naive substring search over the raw packet bytes. The bound is inclusive:
// the last valid start offset is (packet length - hostname length), so a
// hostname sitting at the very end of the packet is still found.
int lastStart = xdrBytes.length - hostNameBytes.length;
for (int i = 0; i <= lastStart; i++) {
  hasHostname = true;
  for (int j = 0; j < hostNameBytes.length; j++) {
    if (xdrBytes[i + j] != hostNameBytes[j]) {
      hasHostname = false;
      break;
    }
  }
  if (hasHostname)
    break;
}
assertTrue("Did not correctly resolve hostname in Ganglia", hasHostname);
}
}