package com.cloudera.flume.handlers.syslog;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import com.cloudera.flume.conf.SourceFactory.SourceBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.handlers.text.EventExtractException;
import com.google.common.base.Preconditions;
/** Port used when no explicit port is supplied (see the default in builder()). */
public final static int SYSLOG_TCP_PORT = 514;
// Listening socket; null whenever the source is not open (open() checks this,
// close() resets it).
ServerSocket sock = null;
// Port the listening socket is bound to in open().
int port = SYSLOG_TCP_PORT;
// Stream over the single client connection accepted in open(); next() reads
// events from it.
DataInputStream is;
// Number of EventExtractExceptions encountered (and skipped) by next().
long rejects = 0;
this.port = port;
}
}
/**
 * Closes this source, releasing both the accepted client connection and the
 * listening server socket.
 *
 * <p>Calling this when the source is not open (never opened, or already
 * closed) is a no-op. The {@code sock} field is reset to {@code null} before
 * anything is closed, so a later {@link #open()} passes its precondition even
 * if one of the close calls fails.
 *
 * @throws IOException if closing the client stream or the server socket fails
 */
@Override
public void close() throws IOException {
  // Detach the server socket first so the "not open" state is restored even
  // when a close call below throws.
  final ServerSocket server = sock;
  sock = null;

  // The stream reference is intentionally kept: a reader that is still using
  // it gets an IOException from the closed stream rather than a
  // NullPointerException.
  final DataInputStream clientStream = is;
  try {
    if (clientStream != null) {
      // Closing a socket's input stream also closes the associated socket,
      // which releases the accepted client connection.
      clientStream.close();
    }
  } finally {
    if (server != null) {
      server.close();
    }
  }
}
/**
 * Returns the next event read from the client stream.
 *
 * <p>Each attempt asks {@code SyslogWireExtractor} to extract one event from
 * {@code is}. When extraction raises an {@code EventExtractException}, the
 * {@code rejects} counter is incremented and extraction is attempted again;
 * the loop ends only when an event is returned or another exception
 * propagates.
 *
 * @return the extracted event, after it has been passed to
 *         {@code updateEventProcessingStats}
 * @throws IOException if reading from the underlying stream fails
 */
@Override
public Event next() throws IOException {
  for (;;) {
    try {
      final Event parsed = SyslogWireExtractor.extractEvent(is);
      updateEventProcessingStats(parsed);
      return parsed;
    } catch (EventExtractException rejected) {
      // Count the rejected input and try the next one.
      rejects += 1;
    }
  }
}
/**
 * Binds a server socket to {@code port}, accepts a single client connection
 * and wraps that connection's input stream for {@link #next()} to read from.
 *
 * <p>{@link ServerSocket#accept()} blocks until a client connects, so this
 * method does not return before then. If accepting the connection or
 * obtaining its stream fails, every socket created here is closed and
 * {@code sock} is reset to {@code null}, so the source is left in its
 * "not open" state and may be opened again.
 *
 * @throws IllegalStateException if the source is already open
 * @throws IOException if binding, accepting, or obtaining the stream fails
 */
@Override
public void open() throws IOException {
  Preconditions.checkState(sock == null);

  // Keep a local reference: close() may null out the field while accept()
  // is still blocked.
  final ServerSocket server = new ServerSocket(port);
  sock = server;

  Socket client = null;
  boolean opened = false;
  try {
    client = server.accept();
    is = new DataInputStream(client.getInputStream());
    opened = true;
  } finally {
    if (!opened) {
      // Best-effort cleanup; the original failure is the one that propagates.
      if (client != null) {
        try {
          client.close();
        } catch (IOException ignored) {
          // nothing useful can be done; the primary exception is in flight
        }
      }
      try {
        server.close();
      } catch (IOException ignored) {
        // nothing useful can be done; the primary exception is in flight
      }
      if (sock == server) {
        sock = null;
      }
    }
  }
}
/**
 * Creates the {@code SourceBuilder} used to construct this source from
 * string arguments.
 *
 * <p>The returned builder accepts zero or one argument. With no argument the
 * source uses {@code SYSLOG_TCP_PORT}; with one argument that argument is
 * parsed as the integer port. More than one argument is rejected with an
 * {@code IllegalArgumentException}.
 *
 * @return a builder producing {@code SyslogTcpSource} instances
 */
public static SourceBuilder builder() {
  return new SourceBuilder() {
    @Override
    public EventSource build(String... argv) {
      final int argCount = argv.length;
      if (argCount > 1) {
        throw new IllegalArgumentException("usage: syslogTcp1([port no]) ");
      }
      // Exactly one argument overrides the default port.
      final int chosenPort =
          (argCount == 1) ? Integer.parseInt(argv[0]) : SYSLOG_TCP_PORT;
      return new SyslogTcpSource(chosenPort);
    }
  };
}
}