package com.cloudera.flume.handlers.syslog;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.SourceFactory.SourceBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.handlers.text.EventExtractException;
import com.cloudera.util.ByteBufferInputStream;
/** Logger shared by all instances of this source. */
static final Logger LOG = LoggerFactory.getLogger(SyslogUdpSource.class);

/** Port used when no explicit port is supplied. */
public static final int SYSLOG_UDP_PORT = 514;

/** UDP port that the socket is bound to when the source is opened. */
int port = SYSLOG_UDP_PORT;

/** Size in bytes of the receive buffer allocated for incoming datagrams. */
int maxsize = 64 * 1024;

/** Running count of datagrams that could not be turned into an event. */
long rejects = 0L;

/** Socket used to receive datagrams; unassigned until the source is opened. */
DatagramSocket sock;
}
this.port = port;
}
/**
 * Closes the UDP socket this source receives on.
 *
 * <p>If the socket was never created, or has already been closed, this only
 * logs a warning and returns.
 *
 * @throws IOException declared in the signature; no statement in this body
 *         raises it
 */
@Override
public void close() throws IOException {
  LOG.info("closing SyslogUdpSource on port " + port);
  // sock is not reset to null after a close, so a null check alone cannot
  // spot a repeated close; ask the socket for its state as well.
  if (sock == null || sock.isClosed()) {
    LOG.warn("double close of SyslogUdpSocket on udp:" + port
        + " , (this is ok but odd)");
    return;
  }
  sock.close();
}
/**
 * Blocks until a datagram arrives that can be extracted into an event, and
 * returns that event.
 *
 * <p>Datagrams rejected by {@code SyslogWireExtractor.extractEvent} are
 * counted in {@code rejects}, logged, and skipped; the loop then waits for
 * the next datagram.
 *
 * @return the first successfully extracted event (the loop does not exit
 *         while the event is {@code null})
 * @throws IOException if {@code sock.receive} fails
 */
@Override
public Event next() throws IOException {
  byte[] buf = new byte[maxsize];
  DatagramPacket pkt = new DatagramPacket(buf, maxsize);
  Event e = null;
  do {
    sock.receive(pkt);
    // Number of bytes actually received; the rest of buf is not part of
    // this datagram.
    int received = pkt.getLength();
    ByteBuffer bb = ByteBuffer.wrap(buf, 0, received);
    DataInputStream in = new DataInputStream(new ByteBufferInputStream(bb));
    try {
      e = SyslogWireExtractor.extractEvent(in);
    } catch (EventExtractException ex) {
      rejects++;
      LOG.warn(rejects + " rejected packets. packet: " + pkt, ex);
      // Build the dump only when it will be emitted, and restrict it to the
      // received bytes instead of the whole maxsize buffer.
      if (LOG.isDebugEnabled()) {
        LOG.debug("raw bytes " + Arrays.toString(Arrays.copyOf(buf, received)));
      }
    }
  } while (e == null);
  updateEventProcessingStats(e);
  return e;
}
/**
 * Creates a UDP socket bound to {@code port} and keeps it in {@code sock}
 * for later receives.
 *
 * @throws IOException if the socket cannot be created or bound
 */
@Override
public void open() throws IOException {
  final DatagramSocket bound = new DatagramSocket(port);
  this.sock = bound;
}
/**
 * Returns a builder that creates a {@code SyslogUdpSource} from at most one
 * argument: the port number. With no argument, {@code SYSLOG_UDP_PORT} is
 * used.
 *
 * @return a new {@code SourceBuilder}; its {@code build} throws
 *         {@link IllegalArgumentException} when given more than one argument
 */
public static SourceBuilder builder() {
  return new SourceBuilder() {
    @Override
    public EventSource build(String... argv) {
      // Reject surplus arguments before looking at any of them.
      if (argv.length > 1) {
        throw new IllegalArgumentException("usage: syslogUdp([port no]) ");
      }
      final int listenPort =
          (argv.length == 1) ? Integer.parseInt(argv[0]) : SYSLOG_UDP_PORT;
      return new SyslogUdpSource(listenPort);
    }
  };
}
}