package com.cloudera.flume.handlers.text;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.SourceFactory.SourceBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSource;
import com.cloudera.util.Clock;
import com.google.common.base.Preconditions;
public class TailSource extends EventSource.Base {
private static final Logger LOG = LoggerFactory.getLogger(TailSource.class);
// Attribute key under which each emitted event carries the name of the file
// it was read from.
public static final String A_TAILSRCFILE = "tailSrcFile";
// Counter appended to the name of every tail thread that gets created.
private static int thdCount = 0;
// Set by close(); polled by both the tail loop and next() to know when to stop.
private volatile boolean done = false;
// Value handed to Clock.sleep by the tail loop when no cursor made progress.
private final long sleepTime;
// Cursors that the tail loop iterates over.
final List<Cursor> cursors = new ArrayList<Cursor>();
// Cursors waiting to be added to / removed from 'cursors' by the tail loop.
// Each list is used as its own monitor when it is read or modified.
private final List<Cursor> newCursors = new ArrayList<Cursor>();
private final List<Cursor> rmCursors = new ArrayList<Cursor>();
// Hand-off point between the cursors (producers, via put) and next()
// (consumer, via poll).
final SynchronousQueue<Event> sync = new SynchronousQueue<Event>();
// The running tail thread; null while the source is not open.
private TailThread thd = null;
/**
 * Creates a tail source for a single file that starts reading at the given
 * offset. Equivalent to the four-argument constructor with
 * {@code startFromEnd} set to {@code false}.
 *
 * @param f file to tail
 * @param offset position in the file at which reading starts
 * @param waitTime pause used by the tail loop when nothing changed
 */
public TailSource(File f, long offset, long waitTime) {
  this(f, offset, waitTime, false);
}
/**
 * Creates a tail source for a single file and registers the initial cursor
 * for it.
 *
 * @param f file to tail; must not be null
 * @param offset position at which reading starts; must be non-negative
 *        unless {@code startFromEnd} is true
 * @param waitTime pause used by the tail loop when nothing changed; must be
 *        positive
 * @param startFromEnd when true, {@code offset} is ignored and reading
 *        starts at the file's current length
 */
public TailSource(File f, long offset, long waitTime, boolean startFromEnd) {
  Preconditions.checkArgument(startFromEnd || offset >= 0,
      "offset needs to be >=0 or startFromEnd needs to be true");
  Preconditions.checkNotNull(f);
  Preconditions.checkArgument(waitTime > 0);
  this.sleepTime = waitTime;

  // Snapshot the current length first; it doubles as the start position
  // when tailing from the end.
  final long currentLength = f.length();
  final long startAt;
  if (startFromEnd) {
    startAt = currentLength;
  } else {
    startAt = offset;
  }
  final long lastModified = f.lastModified();
  addCursor(new Cursor(sync, f, startAt, currentLength, lastModified));
}
this.sleepTime = waitTime;
}
// Queue that completed events are put on.
final BlockingQueue<Event> sync;
// The file this cursor is tailing.
final File file;
// Holds bytes read from the channel that have not yet been turned into
// events; capacity is Short.MAX_VALUE bytes.
final ByteBuffer buf = ByteBuffer.allocateDirect(Short.MAX_VALUE);
// Currently open handle on 'file' and the channel obtained from it.
// 'in == null' is what tailBody treats as "no file open".
RandomAccessFile raf = null;
FileChannel in = null;
// Modification time, channel position and channel size recorded the last
// time this cursor looked at the file.
long lastFileMod;
long lastChannelPos;
long lastChannelSize;
// Incremented on each IOException in tailBody; zeroed by resetRAF.
int readFailures;
/**
 * Creates a cursor with position, length and modification time all set to
 * zero.
 *
 * @param sync queue that events are put on
 * @param f file to tail
 */
Cursor(BlockingQueue<Event> sync, File f) {
  this(sync, f, 0L, 0L, 0L);
}
/**
 * Creates a cursor with explicit initial bookkeeping values.
 *
 * @param sync queue that events are put on
 * @param f file to tail
 * @param lastReadOffset initial value of the recorded channel position
 * @param lastFileLen initial value of the recorded channel size
 * @param lastMod initial value of the recorded modification time
 */
Cursor(BlockingQueue<Event> sync, File f, long lastReadOffset,
    long lastFileLen, long lastMod) {
  // Immutable identity of the cursor.
  this.file = f;
  this.sync = sync;

  // Mutable bookkeeping describing the last observed state of the file.
  this.lastFileMod = lastMod;
  this.lastChannelSize = lastFileLen;
  this.lastChannelPos = lastReadOffset;
  this.readFailures = 0;
}
// Opens the file read-only, seeks to the recorded position and keeps the
// resulting channel in 'in'. Any failure to open or seek falls back to
// resetRAF().
// NOTE(review): the declaration line of this method is not visible in this
// part of the file; the tail loop invokes it as initCursorPos().
try {
LOG.debug("initCursorPos " + file);
raf = new RandomAccessFile(file, "r");
// Resume from the offset stored in lastChannelPos.
raf.seek(lastChannelPos);
in = raf.getChannel();
} catch (FileNotFoundException e) {
// The file could not be opened: reset the cursor.
resetRAF();
} catch (IOException e) {
// Seek (or another I/O step) failed: same recovery as above.
resetRAF();
}
}
/**
 * Closes the open file handle, emits whatever bytes are still buffered as
 * one final event, and leaves the cursor with no open channel and an empty
 * buffer.
 *
 * <p>The handle reference is dropped once closed, and the channel/buffer
 * reset runs in a {@code finally} block so that the cursor is left in the
 * same clean state even when the hand-off of the final event is interrupted.
 *
 * @throws InterruptedException if interrupted while putting the final event
 *         on the queue
 */
void flush()
    throws InterruptedException {
  if (raf != null) {
    try {
      raf.close();
    } catch (IOException e) {
      LOG.error("problem closing file " + e.getMessage(), e);
    }
    // Do not keep a reference to a handle that is already closed.
    raf = null;
  }

  try {
    // Switch the buffer to read mode to see what is left over.
    buf.flip();
    int remaining = buf.remaining();
    if (remaining > 0) {
      byte[] body = new byte[remaining];
      buf.get(body, 0, remaining);
      Event e = new EventImpl(body);
      e.set(A_TAILSRCFILE, file.getName().getBytes());
      try {
        sync.put(e);
      } catch (InterruptedException e1) {
        LOG.error("interruptedException! " + e1.getMessage(), e1);
        throw e1;
      }
    }
  } finally {
    // Always end with no channel and an empty, writable buffer, even when
    // the put above was interrupted.
    in = null;
    buf.clear();
  }
}
/**
 * Flushes the cursor and then rewinds its bookkeeping: recorded position,
 * recorded modification time and failure count all go back to zero.
 *
 * @throws InterruptedException if the flush is interrupted; in that case the
 *         bookkeeping is left untouched
 */
void resetRAF()
    throws InterruptedException {
  LOG.debug("reseting cursor");
  flush();

  // Only reached when the flush completed.
  readFailures = 0;
  lastFileMod = 0L;
  lastChannelPos = 0L;
}
// Looks at the file on disk and, if there is something to read, opens it.
// Returns true when a file was (re)opened, false when there is nothing to do.
// Throws IOException when the path is a directory or is not readable.
// NOTE(review): the declaration line of this method is not visible in this
// part of the file; tailBody invokes it as checkForUpdates().
LOG.debug("tail " + file + " : recheck");
if (file.isDirectory()) {
IOException ioe = new IOException("Tail expects a file '" + file
+ "', but it is a dir!");
LOG.error(ioe.getMessage());
throw ioe;
}
if (!file.exists()) {
// Not an error: the file may show up later.
LOG.debug("Tail '" + file + "': nothing to do, waiting for a file");
return false;
}
if (!file.canRead()) {
throw new IOException("Permission denied on " + file);
}
try {
if (in != null) {
// A channel is already open: unchanged mod time and a position equal to
// the file length means there is nothing new.
if (lastFileMod == file.lastModified()
&& lastChannelPos == file.length()) {
LOG.debug("Tail '" + file + "': recheck still the same");
return false;
}
}
// NOTE(review): if 'in' was non-null and the file had changed, the previous
// handle is replaced here without being closed. The only caller visible in
// this file (tailBody) calls this with in == null - confirm no other callers.
raf = new RandomAccessFile(file, "r");
lastFileMod = file.lastModified();
in = raf.getChannel();
// Record where the freshly opened channel starts and how large it is.
lastChannelPos = in.position();
lastChannelSize = in.size();
LOG.debug("Tail '" + file + "': opened last mod=" + lastFileMod
+ " lastChannelPos=" + lastChannelPos + " lastChannelSize="
+ lastChannelSize);
return true;
} catch (FileNotFoundException fnfe) {
// The file vanished between the exists() check and the open.
LOG.debug("Tail '" + file
+ "': a file existed then disappeared, odd but continue");
return false;
}
}
// Scans the readable part of 'buf' for '\n'-terminated lines and puts one
// event per complete line on the queue; the newline itself is not included
// in the event body. Any trailing partial line is kept in the buffer.
// Returns true if at least one event was emitted.
// Expects 'buf' to be in read mode on entry (the caller flips it) and
// leaves it compacted, i.e. ready for further writes.
// NOTE(review): the method name is missing from the declaration below;
// tailBody calls this as extractLines(buf, fmod).
boolean (ByteBuffer buf, long fmod) throws IOException,
InterruptedException {
boolean madeProgress = false;
// 'start' and the buffer mark both track the first byte of the current line.
int start = buf.position();
buf.mark();
while (buf.hasRemaining()) {
byte b = buf.get();
if (b == '\n') {
int end = buf.position();
// sz counts the line bytes plus the terminating newline.
int sz = end - start;
byte[] body = new byte[sz - 1];
// Rewind to the start of the line and copy it out without the newline.
buf.reset();
buf.get(body, 0, sz - 1);
// Skip over the newline and mark the beginning of the next line.
buf.get();
buf.mark();
start = buf.position();
// Update the cursor bookkeeping before handing the event off.
lastChannelPos = in.position();
lastFileMod = fmod;
Event e = new EventImpl(body);
e.set(A_TAILSRCFILE, file.getName().getBytes());
sync.put(e);
madeProgress = true;
}
}
// Go back to the start of the unfinished line and move it to the front of
// the buffer so the next read appends to it.
buf.reset();
buf.compact();
return madeProgress;
}
/**
 * Performs one polling step for this cursor: opens the file if none is
 * open, detects "no change", rotation and truncation, and otherwise reads
 * newly appended bytes and emits complete lines as events.
 *
 * <p>On an {@link IOException} the open file handle is closed before the
 * channel reference is dropped, so that the next step, which opens the file
 * again, does not leave the old handle open.
 *
 * @return {@code false} when there was nothing to do (no change, no file, or
 *         more than three consecutive read failures); {@code true}
 *         otherwise. The tail loop sleeps only when every cursor returned
 *         {@code false}.
 * @throws InterruptedException if interrupted while handing an event off or
 *         while resetting the cursor
 */
boolean tailBody()
    throws InterruptedException {
  try {
    if (in == null) {
      LOG.debug("tail " + file + " : cur file is null");
      return checkForUpdates();
    }

    long flen = file.length();
    long chlen = in.size();
    long fmod = file.lastModified();
    lastChannelSize = chlen;

    // Everything on disk has already been consumed through this channel.
    if (chlen == flen && lastChannelPos == flen) {
      if (lastFileMod == fmod) {
        LOG.debug("tail " + file + " : no change");
        return false;
      }
      // Same length but a different mod time: start over on the file.
      LOG.debug("tail " + file
          + " : same file len, but new last mod time" + " -> reset");
      resetRAF();
      return true;
    }

    LOG.debug("tail " + file + " : file changed");
    LOG.debug("tail " + file + " : old size, mod time " + lastChannelPos
        + "," + lastFileMod);
    LOG.debug("tail " + file + " : new size, " + "mod time " + flen + ","
        + fmod);

    // The file is now shorter than what was already read.
    if (lastChannelPos > flen) {
      LOG.debug("tail " + file + " : file truncated!?");
      resetRAF();
      return true;
    }

    int rd;
    while ((rd = in.read(buf)) > 0) {
      lastChannelPos += rd;
      // Keep extracting until a pass emits no line; each pass flips the
      // buffer for reading and extractLines compacts it again.
      boolean progress;
      do {
        buf.flip();
        progress = extractLines(buf, fmod);
      } while (progress);
    }

    // End of channel reached although the on-disk length differs from the
    // channel size: the file behind the path is no longer the one we hold.
    if (rd == -1 && flen != lastChannelSize) {
      LOG.debug("tail " + file
          + " : no progress but raflen != filelen, resetting");
      resetRAF();
      return true;
    }
    LOG.debug("tail " + file + ": read " + lastChannelPos + " bytes");
  } catch (IOException e) {
    LOG.debug(e.getMessage(), e);
    // Release the handle before forgetting the channel; the next step opens
    // a new one and would otherwise leave this one open.
    if (raf != null) {
      try {
        raf.close();
      } catch (IOException closeFailure) {
        LOG.debug("problem closing file " + closeFailure.getMessage(),
            closeFailure);
      }
      raf = null;
    }
    in = null;
    readFailures++;
    if (readFailures > 3) {
      LOG.warn("Encountered " + readFailures + " failures on "
          + file.getAbsolutePath() + " - sleeping");
      return false;
    }
  }
  return true;
}
};
// Names the thread "TailThread-<n>" using the static counter, which is
// bumped for the next thread.
// NOTE(review): the increment of the static counter is not synchronized.
super("TailThread-" + thdCount++);
}
@Override
// Main loop of the tail thread: position every initial cursor, then keep
// applying pending cursor additions/removals and polling each cursor until
// the 'done' flag is set.
try {
for (Cursor c : cursors) {
c.initCursorPos();
}
while (!done) {
// Pick up cursors that were added while the thread was running.
synchronized (newCursors) {
cursors.addAll(newCursors);
newCursors.clear();
}
// Drop cursors scheduled for removal, flushing what they still buffer.
// NOTE(review): flush() puts on the queue and may block while the
// rmCursors monitor is held.
synchronized (rmCursors) {
cursors.removeAll(rmCursors);
for (Cursor c : rmCursors) {
c.flush();
}
rmCursors.clear();
}
boolean madeProgress = false;
for (Cursor c : cursors) {
LOG.debug("Progress loop: " + c.file);
if (c.tailBody()) {
madeProgress = true;
}
}
// Back off only when no cursor had anything to do.
if (!madeProgress) {
Clock.sleep(sleepTime);
}
}
LOG.debug("Tail got done flag");
} catch (InterruptedException e) {
// NOTE(review): the interrupt is logged but the thread's interrupt status
// is not restored; the thread exits right after.
LOG.error("tail unexpected interrupted: " + e.getMessage(), e);
} finally {
LOG.info("TailThread has exited");
}
}
}
// Registers a cursor with this source. Rejects null.
// NOTE(review): the declaration line of this method is not visible in this
// part of the file; other code here calls it as addCursor(cursor).
Preconditions.checkArgument(cursor != null);
if (thd == null) {
// No tail thread yet: add straight to the active list.
cursors.add(cursor);
LOG.debug("Unstarted Tail has added cursor: " + cursor.file.getName());
} else {
// A tail thread exists: queue the cursor; the tail loop moves it into the
// active list.
synchronized (newCursors) {
newCursors.add(cursor);
}
LOG.debug("Tail added new cursor to new cursor list: "
+ cursor.file.getName());
}
}
// Unregisters a cursor from this source. Rejects null.
// NOTE(review): the declaration line of this method is not visible in this
// part of the file.
Preconditions.checkArgument(cursor != null);
if (thd == null) {
// No tail thread yet: remove straight from the active list.
cursors.remove(cursor);
} else {
// A tail thread exists: queue the removal; the tail loop removes and
// flushes the cursor.
synchronized (rmCursors) {
rmCursors.add(cursor);
}
}
}
/**
 * Stops the tail thread and waits until it has actually exited.
 *
 * <p>Setting the {@code done} flag alone is not enough: once the flag is
 * set, {@link #next()} stops polling the hand-off queue, so a tail thread
 * that is blocked putting an event on it would never return. The thread is
 * therefore given a short grace period and then interrupted, repeatedly,
 * until it is no longer alive.
 *
 * <p>Calling this on a source that is not open only sets the flag.
 *
 * @throws IOException declared by the overridden method; not thrown here
 */
@Override
public void close()
    throws IOException {
  synchronized (this) {
    done = true;
    if (thd == null) {
      return;
    }
    try {
      while (thd.isAlive()) {
        // Let the thread notice the flag by itself first, then nudge it
        // out of any blocking call.
        thd.join(100L);
        thd.interrupt();
      }
    } catch (InterruptedException e) {
      // The closing thread was interrupted: make sure the tail thread is
      // told to stop, and keep the caller's interrupt status.
      thd.interrupt();
      Thread.currentThread().interrupt();
    }
    thd = null;
  }
}
/**
 * Returns the next event produced by the tail thread, waiting in 100 ms
 * slices until one is available or the source is closed.
 *
 * @return the next event, or {@code null} once the source has been closed
 * @throws IOException if the calling thread is interrupted while waiting;
 *         the interrupt status is restored and the
 *         {@link InterruptedException} is attached as the cause
 */
@Override
public Event
next()
    throws IOException {
  try {
    while (!done) {
      Event e = sync.poll(100, TimeUnit.MILLISECONDS);
      if (e == null) {
        // Timed out: re-check the done flag and wait again.
        continue;
      }
      updateEventProcessingStats(e);
      return e;
    }
    return null;
  } catch (InterruptedException e1) {
    LOG.warn("next unexpectedly interrupted :" + e1.getMessage(), e1);
    Thread.currentThread().interrupt();
    // Keep the original exception as the cause so the stack trace survives.
    throw new IOException(e1.getMessage(), e1);
  }
}
/**
 * Creates and starts the tail thread.
 *
 * @throws IllegalStateException if a tail thread already exists
 * @throws IOException declared by the overridden method; not thrown here
 */
@Override
synchronized public void open()
    throws IOException {
  final boolean alreadyOpen = (thd != null);
  if (alreadyOpen) {
    throw new IllegalStateException("Attempted to open tail source twice!");
  }
  // Publish the thread in the field before it starts running.
  final TailThread worker = new TailThread();
  thd = worker;
  worker.start();
}
/**
 * Returns the builder for the single-file tail source.
 *
 * <p>The builder accepts one or two arguments: the file name and,
 * optionally, a boolean string saying whether to start from the end of the
 * file. The poll period is taken from the Flume configuration.
 *
 * @return a builder that creates a {@link TailSource} starting at offset 0,
 *         or at the end of the file when the second argument parses as true
 */
public static SourceBuilder
builder() {
  return new SourceBuilder() {
    @Override
    public EventSource
    build(String... argv) {
      final int argc = argv.length;
      if (argc < 1 || argc > 2) {
        throw new IllegalArgumentException(
            "usage: tail(filename, [startFromEnd]) ");
      }
      // The optional second argument is parsed as a boolean.
      final boolean startFromEnd =
          (argc == 2) && Boolean.parseBoolean(argv[1]);
      final File target = new File(argv[0]);
      return new TailSource(target, 0,
          FlumeConfiguration.get().getTailPollPeriod(), startFromEnd);
    }
  };
}
// Builder for a tail source that follows several files at once: every
// argument is a file name and at least one is required.
// NOTE(review): the declaration line of the enclosing factory method is not
// visible in this part of the file.
return new SourceBuilder() {
@Override
public EventSource
build(String... argv) {
Preconditions.checkArgument(argv.length >= 1,
"usage: multitail(file1[, file2[, ...]]) ");
// Never changed below, so the first file is always read from offset 0.
boolean startFromEnd = false;
long pollPeriod = FlumeConfiguration.get().getTailPollPeriod();
TailSource src = null;
for (int i = 0; i < argv.length; i++) {
if (src == null) {
// The first file creates the source itself (and its first cursor).
src = new TailSource(new File(argv[i]), 0, pollPeriod, startFromEnd);
} else {
// Every further file becomes an additional cursor feeding the same queue.
src.addCursor(new Cursor(src.sync, new File(argv[i])));
}
}
return src;
}
};
}
}