package com.cloudera.flume.handlers.text;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mortbay.log.Log;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.handlers.text.TailSource.Cursor;
import com.cloudera.util.Clock;
import com.cloudera.util.OSUtils;
/**
 * Fixture run before each test: switches the log4j logger for TailSource to
 * DEBUG level.
 */
@Before
Logger.getLogger(TailSource.class).setLevel(Level.DEBUG);
}
// Helper body: creates a temp file named "tail*.tmp" (removed at JVM exit),
// writes `count` newline-terminated lines "test 0" .. "test <count-1>" to it,
// and returns the file.
File f = File.createTempFile("tail", ".tmp");
f.deleteOnExit();
FileWriter fw = new FileWriter(f);
for (int i = 0; i < count; i++) {
fw.write("test " + i + "\n");
// Flushed after every line rather than once at the end.
fw.flush();
}
fw.close();
return f;
}
/**
 * Appends {@code count} newline-terminated lines to {@code f}, creating the
 * file if it does not exist. The lines are "test start", "test start+1", ...
 * up to "test (start+count-1)".
 *
 * @param f     file to append to (opened in append mode)
 * @param start number used in the first appended line
 * @param count number of lines to append; nothing is written when it is
 *              zero or negative
 * @throws IOException if the file cannot be opened or written
 */
void appendData(File f, int start, int count) throws IOException {
  FileWriter fw = new FileWriter(f, true);
  try {
    for (int i = start; i < start + count; i++) {
      fw.write("test " + i + "\n");
      // Flush per line so each line reaches the file as soon as it is written.
      fw.flush();
    }
  } finally {
    // Always release the file handle, even when a write fails part-way.
    fw.close();
  }
}
/**
 * Tails a file that already holds 5 lines when the cursor is created.
 * Expects tailBody() to return true twice, then false, with 5 events queued.
 */
@Test
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
File f = createDataFile(5);
Cursor c = new Cursor(q, f);
// NOTE(review): presumably the first call opens the file and the second
// reads its contents -- confirm against TailSource.Cursor.
assertTrue(c.tailBody());
assertTrue(c.tailBody());
// Nothing left to do on the third call.
assertFalse(c.tailBody());
assertEquals(5, q.size());
}
/**
 * Renames the tailed file after the first tailBody() call and checks that the
 * 5 pre-existing lines are still delivered, after which tailBody() keeps
 * returning false and the queue stays at 5 events.
 */
@Test
InterruptedException {
// Reserve a unique path, then delete it so it can serve as the rename target.
File f2 = File.createTempFile("move", ".tmp");
f2.delete();
f2.deleteOnExit();
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
File f = createDataFile(5);
Cursor c = new Cursor(q, f);
assertTrue(c.tailBody());
// Move the file away; nothing is created at the original path afterwards.
f.renameTo(f2);
assertTrue(c.tailBody());
assertEquals(5, q.size());
assertFalse(c.tailBody());
assertEquals(5, q.size());
assertFalse(c.tailBody());
}
/**
 * Disabled test: the tailed file is renamed away and a new file with the same
 * number of lines (5) is written at the original path before the old contents
 * have been read. The Ignore message records why it is disabled.
 */
@Test
@Ignore("When a file rotates in with the same size, we cannot tell!")
InterruptedException {
// Reserve a unique path, then delete it so it can serve as the rename target.
File f2 = File.createTempFile("move", ".tmp");
f2.delete();
f2.deleteOnExit();
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
File f = createDataFile(5);
Cursor c = new Cursor(q, f);
assertTrue(c.tailBody());
f.renameTo(f2);
// NOTE(review): the sleep presumably makes the new file's modification time
// differ from the old one -- confirm.
Clock.sleep(1000);
// f no longer exists, so this creates a fresh file holding lines 5..9.
appendData(f, 5, 5);
assertTrue(c.tailBody());
assertEquals(5, q.size());
assertTrue(c.tailBody());
assertTrue(c.tailBody());
assertEquals(5, q.size());
}
/**
 * Rotation where the replacement file has more lines (6) than the original
 * (5). Expects the 5 original events and then the 6 new ones, 11 in total,
 * after which tailBody() returns false and the count stays at 11.
 */
@Test
InterruptedException {
// Skipped when OSUtils reports a Windows OS.
Assume.assumeTrue(!OSUtils.isWindowsOS());
// Reserve a unique path, then delete it so it can serve as the rename target.
File f2 = File.createTempFile("move", ".tmp");
f2.delete();
f2.deleteOnExit();
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
File f = createDataFile(5);
Cursor c = new Cursor(q, f);
assertTrue(c.tailBody());
f.renameTo(f2);
Clock.sleep(1000);
// f no longer exists, so this creates a fresh file holding lines 5..10.
appendData(f, 5, 6);
assertTrue(c.tailBody());
// Only the original file's 5 lines have been delivered so far.
assertEquals(5, q.size());
assertTrue(c.tailBody());
assertTrue(c.tailBody());
assertFalse(c.tailBody());
assertEquals(11, q.size());
assertFalse(c.tailBody());
assertEquals(11, q.size());
}
/**
 * Rotation where the replacement file has fewer lines (4) than the original
 * (5). Expects the 5 original events and then the 4 new ones, 9 in total.
 */
@Test
InterruptedException {
// Skipped when OSUtils reports a Windows OS.
Assume.assumeTrue(!OSUtils.isWindowsOS());
// Reserve a unique path, then delete it so it can serve as the rename target.
File f2 = File.createTempFile("move", ".tmp");
f2.delete();
f2.deleteOnExit();
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
File f = createDataFile(5);
Cursor c = new Cursor(q, f);
assertTrue(c.tailBody());
f.renameTo(f2);
Clock.sleep(1000);
// f no longer exists, so this creates a fresh file holding lines 5..8.
appendData(f, 5, 4);
assertTrue(c.tailBody());
// Only the original file's 5 lines have been delivered so far.
assertEquals(5, q.size());
assertTrue(c.tailBody());
assertTrue(c.tailBody());
assertFalse(c.tailBody());
assertEquals(9, q.size());
}
/**
 * The tailed file is renamed after being opened and is read to exhaustion
 * (tailBody() returns false) before a new 5-line file is written at the
 * original path. Expects the new file to be picked up, for 10 events total.
 */
@Test
throws IOException, InterruptedException {
// Skipped when OSUtils reports a Windows OS.
Assume.assumeTrue(!OSUtils.isWindowsOS());
// Reserve a unique path, then delete it so it can serve as the rename target.
File f2 = File.createTempFile("move", ".tmp");
f2.delete();
f2.deleteOnExit();
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
File f = createDataFile(5);
Cursor c = new Cursor(q, f);
assertTrue(c.tailBody());
f.renameTo(f2);
assertTrue(c.tailBody());
assertEquals(5, q.size());
// Old contents are exhausted and nothing exists at the original path yet.
assertFalse(c.tailBody());
Clock.sleep(1000);
// Creates a fresh file at the original path holding lines 5..9.
appendData(f, 5, 5);
assertTrue(c.tailBody());
assertTrue(c.tailBody());
assertFalse(c.tailBody());
assertEquals(10, q.size());
}
/**
 * The original 5-line file is read completely before it is renamed, and a new
 * file with the same number of lines is then written at the original path.
 * Expects three more true results from tailBody(), then false, and 10 events.
 */
@Test
throws IOException, InterruptedException {
// Skipped when OSUtils reports a Windows OS.
Assume.assumeTrue(!OSUtils.isWindowsOS());
// Reserve a unique path, then delete it so it can serve as the rename target.
File f2 = File.createTempFile("move", ".tmp");
f2.delete();
f2.deleteOnExit();
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
File f = createDataFile(5);
Cursor c = new Cursor(q, f);
assertTrue(c.tailBody());
assertTrue(c.tailBody());
assertEquals(5, q.size());
// Cursor is idle on the fully-read file before the rotation happens.
assertFalse(c.tailBody());
assertFalse(c.tailBody());
f.renameTo(f2);
// NOTE(review): the sleep presumably makes the new file's modification time
// differ from the old one -- confirm.
Clock.sleep(1000);
// Creates a fresh file at the original path holding lines 5..9.
appendData(f, 5, 5);
assertTrue(c.tailBody());
assertTrue(c.tailBody());
assertTrue(c.tailBody());
assertFalse(c.tailBody());
assertEquals(10, q.size());
}
/**
 * Appends 5 more lines to a file the cursor is already tailing and expects
 * all 10 lines to be in the queue after one further tailBody() call.
 */
@Test
InterruptedException {
// Queue capacity (10) equals the number of events expected in total.
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(10);
File f = createDataFile(5);
f.deleteOnExit();
Cursor c = new Cursor(q, f);
assertTrue(c.tailBody());
assertTrue(c.tailBody());
// Same file, same path: lines 5..9 are added to the end.
appendData(f, 5, 5);
assertTrue(c.tailBody());
assertEquals(10, q.size());
}
/**
 * Starts a cursor on a path that does not exist yet, then creates the file.
 * While the file is absent tailBody() returns false and the cursor holds no
 * input; once the file appears, its 5 lines are delivered.
 */
@Test
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(10);
// Reserve a unique path, then delete it so the cursor starts with no file.
File f = File.createTempFile("appear", ".tmp");
f.delete();
f.deleteOnExit();
Cursor c = new Cursor(q, f);
assertFalse(c.tailBody());
assertFalse(c.tailBody());
assertEquals(0, c.lastChannelSize);
assertEquals(null, c.in);
// Creates the file with lines 0..4.
appendData(f, 0, 5);
// After this call the cursor has an input but its position is still 0.
assertTrue(c.tailBody());
assertEquals(0, c.lastChannelPos);
assertTrue(null != c.in);
assertTrue(c.tailBody());
assertTrue(0 != c.lastChannelSize);
assertTrue(null != c.in);
assertFalse(c.tailBody());
assertEquals(5, q.size());
}
/**
 * Tails a file whose content has no terminating newline. The assertions show
 * that an event is queued only after c.flush() is called: one event after the
 * first write and flush, two after the second write and flush.
 */
@Test
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(10);
// Reserve a unique path, then delete it so the cursor starts with no file.
File f = File.createTempFile("appear", ".tmp");
f.delete();
f.deleteOnExit();
Cursor c = new Cursor(q, f);
assertFalse(c.tailBody());
assertFalse(c.tailBody());
assertEquals(0, c.lastChannelSize);
assertEquals(null, c.in);
// Create the file with text that has no newline at the end.
FileWriter fw = new FileWriter(f);
fw.append("No new line");
fw.close();
assertTrue(c.tailBody());
assertEquals(0, c.lastChannelPos);
assertTrue(null != c.in);
assertTrue(c.tailBody());
assertTrue(0 != c.lastChannelSize);
assertTrue(null != c.in);
assertFalse(c.tailBody());
c.flush();
assertEquals(1, q.size());
// Append more unterminated text to the same file.
boolean append = true;
FileWriter fw2 = new FileWriter(f, append);
fw2.append("more no new line");
fw2.close();
assertTrue(c.tailBody());
assertTrue(0 != c.lastChannelSize);
assertTrue(null != c.in);
assertTrue(c.tailBody());
// Still one event until the cursor is flushed again.
assertEquals(1, q.size());
c.flush();
assertEquals(2, q.size());
}
/**
 * Reads a 5-line file completely, truncates it to 10 bytes, and checks how
 * the cursor reacts: its input is dropped (c.in == null, position 0) and then
 * reacquired, and further events are queued (6, then 7 after a flush).
 */
@Test
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(10);
File f = createDataFile(5);
Cursor c = new Cursor(q, f);
assertTrue(c.tailBody());
assertEquals(0, c.lastChannelPos);
assertTrue(null != c.in);
assertTrue(c.tailBody());
assertTrue(0 != c.lastChannelSize);
assertTrue(null != c.in);
assertFalse(c.tailBody());
assertEquals(5, q.size());
// Shrink the file in place to its first 10 bytes.
RandomAccessFile raf = new RandomAccessFile(f, "rw");
raf.setLength(10);
raf.close();
// First call after truncation: returns true and leaves the cursor without
// an input.
assertTrue(c.tailBody());
assertEquals(0, c.lastChannelPos);
assertEquals(null, c.in);
assertTrue(c.tailBody());
assertTrue(0 != c.lastChannelSize);
assertTrue(null != c.in);
assertTrue(c.tailBody());
assertTrue(0 != c.lastChannelSize);
assertTrue(null != c.in);
assertFalse(c.tailBody());
assertEquals(6, q.size());
c.flush();
assertTrue(c.tailBody());
assertEquals(7, q.size());
}
/**
 * Runs two cursors over two separate 5-line files that feed one shared queue.
 * Checks that each cursor returns true only when its own file has new data
 * and that the shared queue grows by 5 for every batch appended.
 */
@Test
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
File f1 = createDataFile(5);
Cursor c1 = new Cursor(q, f1);
File f2 = createDataFile(5);
Cursor c2 = new Cursor(q, f2);
assertTrue(c1.tailBody());
assertTrue(c2.tailBody());
assertTrue(c1.tailBody());
assertTrue(c2.tailBody());
assertFalse(c1.tailBody());
assertFalse(c2.tailBody());
assertEquals(10, q.size());
// Only f1 grows: c1 returns true, c2 returns false.
appendData(f1, 5, 5);
assertTrue(c1.tailBody());
assertFalse(c2.tailBody());
assertEquals(15, q.size());
// Only f2 grows: c2 returns true, c1 returns false.
appendData(f2, 5, 5);
assertFalse(c1.tailBody());
assertTrue(c2.tailBody());
assertEquals(20, q.size());
assertFalse(c1.tailBody());
assertFalse(c2.tailBody());
}
/**
 * JDK behavior check, no Cursor involved: a FileInputStream returns an equal
 * FileDescriptor before and after its file is renamed, while a stream opened
 * on a new file at the old path has a different FileDescriptor object.
 */
@Test
File f = File.createTempFile("fdes", ".tmp");
f.deleteOnExit();
// Reserve a unique path, then delete it so it can serve as the rename target.
File f2 = File.createTempFile("fdes", ".tmp");
f2.delete();
f2.deleteOnExit();
FileInputStream fis = new FileInputStream(f);
FileDescriptor fd = fis.getFD();
f.renameTo(f2);
FileDescriptor fd2 = fis.getFD();
assertEquals(fd, fd2);
// Recreate an empty file at the original path and open it separately.
new File(f.getAbsolutePath()).createNewFile();
FileInputStream fis2 = new FileInputStream(f);
FileDescriptor fd3 = fis2.getFD();
// Reference comparison: the two streams hold distinct descriptor objects.
assertTrue(fd3 != fd);
// NOTE(review): fis and fis2 are never closed in this body.
}
/**
 * JDK behavior check, no Cursor involved: a FileChannel opened for reading
 * keeps seeing data written through an already-open FileOutputStream, both
 * before and after the file is renamed. Each 5-byte write grows the channel
 * size by 5 and is returned by the next read.
 */
@Test
File f = File.createTempFile("fdes", ".tmp");
f.deleteOnExit();
// Reserve a unique path, then delete it so it can serve as the rename target.
File f2 = File.createTempFile("fdes", ".tmp");
f2.delete();
f2.deleteOnExit();
FileOutputStream fos = new FileOutputStream(f);
FileInputStream fis = new FileInputStream(f);
FileChannel fc = fis.getChannel();
ByteBuffer bb = ByteBuffer.allocate(1 * 1024 * 1024);
// The file is empty, so the first read reports end-of-stream.
int read = fc.read(bb);
assertEquals(-1, read);
fos.write("test\n".getBytes());
fos.flush();
long sz = (long) fc.size();
assertEquals(5, sz);
read = fc.read(bb);
assertEquals(5, read);
// Drain the buffer into a byte array, then flip again before the next read.
byte b[] = new byte[(int) sz];
bb.flip();
bb.get(b);
bb.flip();
// Rename the file while both streams are still open on it.
f.renameTo(f2);
fos.write("test\n".getBytes());
fos.flush();
sz = (long) fc.size();
assertEquals(10, sz);
read = fc.read(bb);
assertEquals(5, read);
b = new byte[(int) read];
bb.flip();
bb.get(b);
bb.flip();
fos.write("test\n".getBytes());
fos.flush();
sz = (long) fc.size();
assertEquals(15, sz);
read = fc.read(bb);
assertEquals(5, read);
// NOTE(review): fos and fis are never closed in this body.
}
/**
 * JDK behavior check, no Cursor involved: reads a 20-byte file through a
 * RandomAccessFile channel in five 4-byte chunks, renaming the file after the
 * second chunk. Every read returns 4 bytes and the last chunk is "ytes".
 */
@Test
File f = File.createTempFile("first", ".tmp");
f.deleteOnExit();
// Reserve a unique path, then delete it so it can serve as the rename target.
File f2 = File.createTempFile("second", ".tmp");
f2.delete();
f2.deleteOnExit();
RandomAccessFile raf = new RandomAccessFile(f, "r");
FileChannel ch = raf.getChannel();
ByteBuffer buf = ByteBuffer.allocate(4);
FileWriter fw = new FileWriter(f);
fw.write("this test is 20bytes");
fw.flush();
// Each flip() rewinds the 4-byte buffer so the next read can fill it again.
assertEquals(4, ch.read(buf));
buf.flip();
assertEquals(4, ch.read(buf));
buf.flip();
// Rename while the channel is open; the remaining reads still succeed.
f.renameTo(f2);
assertEquals(4, ch.read(buf));
buf.flip();
assertEquals(4, ch.read(buf));
buf.flip();
assertEquals(4, ch.read(buf));
assertEquals("ytes", new String(buf.array()));
buf.flip();
// NOTE(review): fw and raf are never closed in this body.
}
/**
 * JDK behavior check, no Cursor involved: two RandomAccessFile instances
 * opened on the same file report FileDescriptors that are not equal.
 */
@Test
File f = File.createTempFile("first", ".tmp");
f.deleteOnExit();
// NOTE(review): f2 is set up here but not used by any later statement.
File f2 = File.createTempFile("second", ".tmp");
f2.delete();
f2.deleteOnExit();
RandomAccessFile raf = new RandomAccessFile(f, "r");
FileDescriptor fd = raf.getFD();
RandomAccessFile raf2 = new RandomAccessFile(f, "r");
FileDescriptor fd2 = raf2.getFD();
assertFalse(fd.equals(fd2));
// NOTE(review): raf and raf2 are never closed in this body.
}
/**
 * JDK behavior check, no Cursor involved: after a reader has consumed a
 * 14-character line, the file is truncated to 4 bytes through a second
 * handle. The reader's next readLine() returns null, its file pointer stays
 * at 14, and the file length is 4.
 */
@Test
File f = File.createTempFile("first", ".tmp");
f.deleteOnExit();
FileWriter fw = new FileWriter(f);
fw.append("this is a test");
fw.close();
RandomAccessFile raf = new RandomAccessFile(f, "r");
String line = raf.readLine();
assertEquals("this is a test", line);
// Truncate through a separate read-write handle while raf stays open.
RandomAccessFile raf2 = new RandomAccessFile(f, "rw");
raf2.setLength(4);
raf2.close();
String line2 = raf.readLine();
assertEquals(null, line2);
// The reader's pointer is now beyond the end of the shortened file.
assertEquals(14, raf.getFilePointer());
assertEquals(4, f.length());
// NOTE(review): raf is never closed in this body.
}
/**
 * Repeats an append / tail / rename / tail cycle 3000 times on one cursor and
 * expects exactly 5 events per cycle. The closing log message indicates the
 * intent: repeated rotation must not leak file handles.
 */
@Test
// Skipped when OSUtils reports a Windows OS.
Assume.assumeTrue(!OSUtils.isWindowsOS());
File f = File.createTempFile("tailexhaust", ".txt");
f.deleteOnExit();
File f2 = File.createTempFile("tailexhaust", ".txt");
f2.deleteOnExit();
BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
Cursor c = new Cursor(q, f);
for (int i = 0; i < 3000; i++) {
// Remove the previous cycle's renamed file so the rename target is free.
f2.delete();
appendData(f, i * 5, 5);
assertTrue(c.tailBody());
f.renameTo(f2);
assertTrue(c.tailBody());
assertEquals(5, q.size());
assertFalse(c.tailBody());
assertEquals(5, q.size());
// Empty the queue so each cycle's count starts from zero.
q.clear();
}
Log.info("file handles didn't leak!");
}
}