package com.cloudera.flume.handlers.text;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.util.Clock;
/** Shared SLF4J logger for these tests, named after TestFileNIO. */
public static final Logger LOG = LoggerFactory.getLogger(TestFileNIO.class);
public CountDownLatch
slowWrite(File f,
final int sleep)
throws IOException {
final FileOutputStream fos = new FileOutputStream(f);
final FileChannel out = fos.getChannel();
final CountDownLatch done = new CountDownLatch(1);
Thread t = new Thread() {
try {
for (int i = 0; i < 100; i++) {
ByteBuffer buf = ByteBuffer.wrap(("test " + i + "\n").getBytes());
out.write(buf);
if (sleep > 0) {
Clock.sleep(sleep);
}
}
fos.close();
done.countDown();
} catch (Exception e) {
LOG.error("Exception when running thread", e);
}
}
};
t.start();
return done;
}
/**
 * Renames one temp file onto the path of another temp file that has just
 * been deleted, then logs whether each path exists. Nothing is asserted; the
 * outcome is only visible in the log.
 */
@Test
// Reserve a unique temp path, then delete the file so the rename target does
// not exist when renameTo is called.
File f1 = File.createTempFile("moved", "");
f1.delete();
f1.deleteOnExit();
File f2 = File.createTempFile("orig", "");
f2.deleteOnExit();
// The boolean result of renameTo is ignored here.
f2.renameTo(f1);
LOG.info("f1 = " + f1.getAbsolutePath() + " exists " + f1.exists());
LOG.info("f2 = " + f2.getAbsolutePath() + " exists " + f2.exists());
}
/**
 * Checks how ByteBuffer.remaining() changes across flip(), clear() and
 * compact() on a 1000-byte buffer.
 */
@Test
ByteBuffer buf = ByteBuffer.allocate(1000);
assertEquals(1000, buf.remaining());
// Flipping an empty buffer leaves nothing to read.
buf.flip();
assertEquals(0, buf.remaining());
// clear() makes the whole capacity available again.
buf.clear();
assertEquals(1000, buf.remaining());
byte[] bs = "this is a test".getBytes();
buf.put(bs);
buf.flip();
assertEquals(bs.length, buf.remaining());
// compact() keeps the unread bytes, so the next put appends after them and
// the following flip exposes both payloads (bs plus the 5 bytes of "test2").
buf.compact();
buf.put("test2".getBytes());
buf.flip();
assertEquals(bs.length + 5, buf.remaining());
}
/**
 * Reads a file through a FileChannel with a 3-byte buffer while slowWrite is
 * still appending to it (sleep argument 20), and expects 790 bytes to have
 * been read by the time the writer's latch fires.
 */
@Test
File f = File.createTempFile("test", ".test");
f.deleteOnExit();
CountDownLatch done = slowWrite(f, 20);
final FileInputStream fis = new FileInputStream(f);
final FileChannel in = fis.getChannel();
ByteBuffer buf = ByteBuffer.allocate(3);
int loops = 0;
int read = 0;
// Poll the writer's latch with a 5 ms timeout; one read per iteration.
// NOTE(review): no read is issued after the latch fires, so the final count
// depends on the reader keeping pace with the writer - timing-sensitive.
while (!done.await(5, TimeUnit.MILLISECONDS)) {
int rd = in.read(buf);
// A negative return from read is counted as zero bytes.
read += (rd < 0 ? 0 : rd);
buf.flip();
byte[] arr = new byte[buf.remaining()];
buf.get(arr);
String s = new String(arr);
System.out.print(s);
buf.clear();
loops++;
}
LOG.info("read " + read + " bytes in " + loops + " iterations");
in.close();
// 790 = 10 lines of 7 bytes ("test 0\n".."test 9\n") + 90 lines of 8 bytes.
assertEquals(790, read);
}
/**
 * Reads a file written by slowWrite (no pause between lines) through a
 * FileChannel with a 1000-byte buffer and expects 790 bytes in total.
 */
@Test
InterruptedException {
File f = File.createTempFile("test", ".test");
f.deleteOnExit();
CountDownLatch done = slowWrite(f, 0);
final FileInputStream fis = new FileInputStream(f);
final FileChannel in = fis.getChannel();
ByteBuffer buf = ByteBuffer.allocate(1000);
// Presumably gives the writer thread time to finish before the first read -
// TODO confirm Clock.sleep units.
Clock.sleep(500);
int loops = 0;
int read = 0;
// do/while: at least one read is performed even if the latch already fired.
do {
int rd = in.read(buf);
// A negative return from read is counted as zero bytes.
read += (rd < 0 ? 0 : rd);
buf.flip();
byte[] arr = new byte[buf.remaining()];
buf.get(arr);
String s = new String(arr);
System.out.print(s);
buf.clear();
loops++;
} while (!done.await(5, TimeUnit.MILLISECONDS));
LOG.info("read " + read + " bytes in " + loops + " iterations");
in.close();
// 790 = 10 lines of 7 bytes + 90 lines of 8 bytes written by slowWrite.
assertEquals(790, read);
}
/**
 * Reads a file written by slowWrite (no pause between lines) with a
 * 1000-byte buffer, splits the buffered bytes into '\n'-terminated lines
 * using mark()/reset(), logs each line, and expects 790 bytes in total.
 */
@Test
File f = File.createTempFile("test", ".test");
f.deleteOnExit();
CountDownLatch done = slowWrite(f, 0);
final FileInputStream fis = new FileInputStream(f);
final FileChannel in = fis.getChannel();
ByteBuffer buf = ByteBuffer.allocate(1000);
// Presumably gives the writer thread time to finish before the first read -
// TODO confirm Clock.sleep units.
Clock.sleep(500);
int loops = 0;
int read = 0;
do {
int rd = in.read(buf);
// A negative return from read is counted as zero bytes.
read += (rd < 0 ? 0 : rd);
buf.flip();
// 'start' and the buffer mark both track the beginning of the current line.
int start = buf.position();
buf.mark();
while (buf.hasRemaining()) {
byte b = buf.get();
if (b == '\n') {
// Position is now just past the newline; sz counts the line plus '\n'.
int end = buf.position();
int sz = end - start;
byte[] body = new byte[sz];
// Rewind to the line start, copy the line without its newline, then skip
// the newline and re-mark at the start of the next line.
// NOTE(review): body has sz bytes but only sz - 1 are filled, so the
// resulting String carries one trailing zero byte.
buf.reset();
buf.get(body, 0, sz - 1);
buf.get();
buf.mark();
start = buf.position();
String s = new String(body);
LOG.info("=> " + s);
}
}
// clear() discards any unterminated partial line left in the buffer.
buf.clear();
loops++;
} while (!done.await(5, TimeUnit.MILLISECONDS));
LOG.info("read " + read + " bytes in " + loops + " iterations");
in.close();
// 790 = 10 lines of 7 bytes + 90 lines of 8 bytes written by slowWrite.
assertEquals(790, read);
}
/**
 * Same line-splitting read as the 1000-byte variant, but with a 20-byte
 * buffer: partial lines are carried over between reads with
 * reset()/compact(), and the loop runs until the byte count reaches the
 * channel size. Expects 790 bytes in total.
 */
@Test
InterruptedException {
File f = File.createTempFile("test", ".test");
f.deleteOnExit();
// The returned latch is not consulted in this test; the loop below is
// driven by the file size instead.
CountDownLatch done = slowWrite(f, 0);
final FileInputStream fis = new FileInputStream(f);
final FileChannel in = fis.getChannel();
ByteBuffer buf = ByteBuffer.allocate(20);
// Presumably gives the writer thread time to finish before the first read -
// TODO confirm Clock.sleep units.
Clock.sleep(500);
int loops = 0;
int read = 0;
do {
int rd = in.read(buf);
// A negative return from read is counted as zero bytes.
read += (rd < 0 ? 0 : rd);
buf.flip();
// 'start' and the buffer mark both track the beginning of the current line.
int start = buf.position();
buf.mark();
while (buf.hasRemaining()) {
byte b = buf.get();
if (b == '\n') {
// Position is now just past the newline; sz counts the line plus '\n'.
int end = buf.position();
int sz = end - start;
byte[] body = new byte[sz];
// Rewind to the line start, copy the line without its newline, then skip
// the newline and re-mark at the start of the next line.
// NOTE(review): body has sz bytes but only sz - 1 are filled, so the
// resulting String carries one trailing zero byte.
buf.reset();
buf.get(body, 0, sz - 1);
buf.get();
buf.mark();
start = buf.position();
String s = new String(body);
LOG.info("=> " + s);
}
}
// Go back to the start of the unfinished line and move it to the front of
// the buffer so the next read appends to it.
buf.reset();
buf.compact();
loops++;
} while (read < in.size());
LOG.info("read " + read + " bytes in " + loops + " iterations");
in.close();
// 790 = 10 lines of 7 bytes + 90 lines of 8 bytes written by slowWrite.
assertEquals(790, read);
}
/**
 * Line-splitting read with a 10-byte buffer against a writer that pauses
 * after every line (sleep argument 100). The loop keeps reading until the
 * writer's latch has fired AND the byte count equals the channel size, then
 * expects 790 bytes in total.
 */
@Test
InterruptedException {
File f = File.createTempFile("test", ".test");
f.deleteOnExit();
CountDownLatch done = slowWrite(f, 100);
final FileInputStream fis = new FileInputStream(f);
final FileChannel in = fis.getChannel();
ByteBuffer buf = ByteBuffer.allocate(10);
Clock.sleep(500);
int loops = 0;
int read = 0;
do {
int rd = in.read(buf);
// A negative return from read is counted as zero bytes.
read += (rd < 0 ? 0 : rd);
buf.flip();
// 'start' and the buffer mark both track the beginning of the current line.
int start = buf.position();
buf.mark();
while (buf.hasRemaining()) {
byte b = buf.get();
if (b == '\n') {
// Position is now just past the newline; sz counts the line plus '\n'.
int end = buf.position();
int sz = end - start;
byte[] body = new byte[sz];
// Rewind to the line start, copy the line without its newline, then skip
// the newline and re-mark at the start of the next line.
// NOTE(review): body has sz bytes but only sz - 1 are filled, so the
// resulting String carries one trailing zero byte.
buf.reset();
buf.get(body, 0, sz - 1);
buf.get();
buf.mark();
start = buf.position();
String s = new String(body);
LOG.info("=> " + s);
}
}
// Go back to the start of the unfinished line and move it to the front of
// the buffer so the next read appends to it.
buf.reset();
buf.compact();
loops++;
// Stop only when the writer is finished and everything written was read.
} while (!(done.await(10, TimeUnit.MILLISECONDS) && read == in.size()));
LOG.info("read " + read + " bytes in " + loops + " iterations");
in.close();
// 790 = 10 lines of 7 bytes + 90 lines of 8 bytes written by slowWrite.
assertEquals(790, read);
}
/**
 * Runs the line-splitting reader on a separate thread, closes the channel
 * from the test thread right after starting it, and asserts that the reader
 * does NOT signal completion within one second.
 */
@Test
InterruptedException {
File f = File.createTempFile("test", ".test");
f.deleteOnExit();
final CountDownLatch writeDone = slowWrite(f, 0);
// Counted down by the reader thread only if it reaches the end of its loop.
final CountDownLatch readDone = new CountDownLatch(1);
final FileInputStream fis = new FileInputStream(f);
final FileChannel in = fis.getChannel();
Thread t = new Thread() {
ByteBuffer buf = ByteBuffer.allocate(10);
int loops = 0;
int read = 0;
try {
do {
// The reader sleeps before its first read, while the test thread closes
// the channel immediately after start(); presumably the read then fails
// on the closed channel - TODO confirm.
Clock.sleep(100);
int rd = in.read(buf);
// A negative return from read is counted as zero bytes.
read += (rd < 0 ? 0 : rd);
buf.flip();
// 'start' and the buffer mark both track the beginning of the current line.
int start = buf.position();
buf.mark();
while (buf.hasRemaining()) {
byte b = buf.get();
if (b == '\n') {
// Position is now just past the newline; sz counts the line plus '\n'.
int end = buf.position();
int sz = end - start;
byte[] body = new byte[sz];
// Rewind to the line start, copy the line without its newline, then skip
// the newline and re-mark at the start of the next line.
buf.reset();
buf.get(body, 0, sz - 1);
buf.get();
buf.mark();
start = buf.position();
String s = new String(body);
LOG.info("=> " + s);
}
}
// Carry the unfinished line over to the next read.
buf.reset();
buf.compact();
loops++;
} while (!(writeDone.await(10, TimeUnit.MILLISECONDS) && read == in
.size()));
LOG.info("read " + read + " bytes in " + loops + " iterations");
// NOTE(review): this assertion runs on the reader thread, so a failure
// here is not thrown on the test thread.
assertEquals(790, read);
readDone.countDown();
} catch (Exception e) {
// Any exception ends the reader without counting down readDone.
e.printStackTrace();
}
}
};
t.start();
// Close the channel out from under the reader thread.
in.close();
// Expect the reader never to complete: await must time out and return false.
assertFalse(readDone.await(1, TimeUnit.SECONDS));
}
}