package com.cloudera.flume.handlers.syslog;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.handlers.text.EventExtractException;
import com.cloudera.flume.handlers.text.Extractor;
import com.google.common.base.Preconditions;
/** SLF4J logger bound to {@code SyslogWireExtractor}. */
static final Logger LOG =
LoggerFactory.getLogger(SyslogWireExtractor.class);
/**
 * Shared extractor instance. Code elsewhere in this file calls
 * {@code format.extract(in)} and {@code format.toBytes(e)} on it.
 */
static SyslogWireExtractor format = new SyslogWireExtractor();
int slPrio = 0;
byte[] fac = e.get(SYSLOG_FACILITY);
if (fac == null || fac.length != 1) {
slPrio = 1 * 8;
} else {
slPrio = fac[0] * 8;
}
byte[] sev = e.get(SYSLOG_SEVERITY);
if (sev == null || sev.length != 1) {
slPrio += PRIO2SEVERITY[e.getPriority().ordinal()];
} else {
slPrio += sev[0];
}
return slPrio;
}
try {
int slPrio = calcSyslogPrio(e);
ByteArrayOutputStream bais = new ByteArrayOutputStream();
bais.write('<');
bais.write(("" + slPrio).getBytes());
bais.write('>');
bais.write(e.getBody());
bais.write('\n');
return bais.toByteArray();
} catch (IOException e1) {
LOG.warn("Ran out of bytes during extraction", e1);
}
return null;
}
/**
 * Static convenience entry point that extracts one event from the given
 * stream by delegating to {@code format.extract(in)} on the shared
 * {@code format} instance.
 *
 * NOTE(review): the method name is missing between the return type and the
 * parameter list in this declaration as it appears in the file; restore it
 * from version control before compiling.
 *
 * @param in stream passed unchanged to {@code format.extract}
 * @return whatever {@code format.extract(in)} returns
 * @throws EventExtractException propagated from {@code format.extract}
 */
public static Event (DataInputStream in)
throws EventExtractException {
return format.extract(in);
}
/** States of the byte-at-a-time parser used when extracting a syslog wire entry. */
enum Mode {
  /** Nothing consumed yet; the next byte is expected to be {@code '<'}. */
  START,
  /** Inside {@code <...>}, accumulating priority digits until {@code '>'}. */
  PRIO,
  /** Past {@code '>'}, accumulating body bytes until a newline. */
  DATA,
  /** Malformed entry; remaining bytes are discarded up to the next newline. */
  ERR
}
/**
 * Creates an event from a parsed priority and the accumulated body bytes.
 *
 * <p>The priority text is parsed as a decimal integer. Its quotient by 8 is
 * stored under {@code SYSLOG_FACILITY} and its remainder by 8 under
 * {@code SYSLOG_SEVERITY}, each as a one-byte array.
 *
 * @param prio decimal digits of the priority value
 * @param baos buffer holding the event body
 * @return a new event whose body is a copy of {@code baos}
 * @throws NumberFormatException if {@code prio} is not a parsable integer
 */
static Event buildEvent(StringBuilder prio, ByteArrayOutputStream baos) {
  // Parse first so a malformed priority fails before any event is created.
  final int priority = Integer.parseInt(prio.toString());

  final Event event = new EventImpl(baos.toByteArray());
  event.set(SYSLOG_FACILITY, new byte[] { (byte) (priority / 8) });
  event.set(SYSLOG_SEVERITY, new byte[] { (byte) (priority % 8) });
  return event;
}
/**
 * Reads one syslog wire entry of the form {@code <PRI>body\n} from the stream
 * and converts it into an event.
 *
 * <p>Bytes are consumed one at a time through a small state machine
 * ({@code START -> PRIO -> DATA}, with {@code ERR} for malformed input). In
 * {@code ERR} the rest of the line is discarded up to the next newline before
 * the failure is reported, so the stream is left positioned at the next entry.
 *
 * <p>If the stream ends while in {@code DATA}, the bytes read so far are
 * returned as an event; if it ends in {@code START} or {@code PRIO},
 * {@code null} is returned.
 *
 * NOTE(review): the method name is missing from this declaration as it
 * appears in the file. The static wrapper calls {@code format.extract(in)},
 * so it is presumably {@code extract} -- confirm and restore.
 *
 * @param in stream to read from; must not be null
 * @return the extracted event, or {@code null} if the stream ended before a
 *         complete {@code <PRI>} header was read
 * @throws EventExtractException if the entry is malformed (including an empty
 *         or unparsable priority) or the stream fails with an I/O error
 */
public Event (DataInputStream in) throws EventExtractException {
  Preconditions.checkNotNull(in);
  Mode m = Mode.START;
  StringBuilder prio = new StringBuilder();
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  try {
    while (true) {
      byte b = in.readByte();
      switch (m) {
      case START:
        m = (b == '<') ? Mode.PRIO : Mode.ERR;
        break;
      case PRIO:
        if (b == '>') {
          m = Mode.DATA;
        } else if (b >= '0' && b <= '9') {
          // Only ASCII digits can occur in a single byte.
          prio.append((char) b);
        } else {
          m = Mode.ERR;
        }
        break;
      case DATA:
        if (b == '\n') {
          return buildEventOrFail(prio, baos);
        }
        baos.write(b);
        break;
      case ERR:
        // Skip the remainder of the bad line before reporting it.
        if (b == '\n') {
          throw new EventExtractException(
              "Failed to extract syslog wire entry");
        }
        break;
      }
    }
  } catch (EOFException e) {
    switch (m) {
    case ERR:
      throw new EventExtractException("Failed to extract syslog wire entry");
    case DATA:
      // Stream ended without a trailing newline; emit what was read.
      return buildEventOrFail(prio, baos);
    default:
      return null;
    }
  } catch (IOException e) {
    throw new EventExtractException("Failed to extract syslog wire entry: "
        + e.getMessage());
  }
}

/**
 * Calls {@code buildEvent} and converts a priority that cannot be parsed
 * (empty, as in {@code <>}, or too large for an int) into the checked
 * {@code EventExtractException} that callers already handle, instead of
 * letting an unchecked {@code NumberFormatException} escape.
 */
private static Event buildEventOrFail(StringBuilder prio,
    ByteArrayOutputStream baos) throws EventExtractException {
  try {
    return buildEvent(prio, baos);
  } catch (NumberFormatException nfe) {
    throw new EventExtractException(
        "Failed to extract syslog wire entry: invalid priority '" + prio + "'");
  }
}
return format.toBytes(e);
}
}