package com.cloudera.flume.agent.durability;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.handlers.debug.ConsoleEventSink;
import com.cloudera.flume.handlers.rolling.ProcessTagger;
import com.cloudera.flume.handlers.rolling.Tagger;
import com.cloudera.util.BenchmarkHarness;
import com.cloudera.util.FileUtil;
final static String WAL_OK = "src/data/hadoop_logs_5.hdfs";
public static Logger LOG = Logger.getLogger(TestNaiveFileWALManager.class);
@Before
LOG.setLevel(Level.DEBUG);
}
@Test
File dir = FileUtil.mktempdir();
NaiveFileWALManager wal = new NaiveFileWALManager(dir);
wal.open();
File logdir = new File(dir, NaiveFileWALManager.IMPORTDIR);
File src = new File(WAL_OK);
File dest = new File(logdir, "ok.0000000.20091104-101213997-0800.seq");
FileUtil.dumbfilecopy(src, dest);
dest.deleteOnExit();
assertEquals(0, wal.getWritingTags().size());
assertEquals(0, wal.getLoggedTags().size());
assertEquals(0, wal.getSendingTags().size());
assertEquals(0, wal.getSentTags().size());
wal.importData();
assertEquals(0, wal.getWritingTags().size());
assertEquals(1, wal.getLoggedTags().size());
assertEquals(0, wal.getSendingTags().size());
assertEquals(0, wal.getSentTags().size());
wal.stopDrains();
}
@Test
File dir = FileUtil.mktempdir();
NaiveFileWALManager wal = new NaiveFileWALManager(dir);
wal.open();
Tagger t = new ProcessTagger();
EventSink sink = wal.newWritingSink(t);
sink.open();
sink.append(new EventImpl("foo".getBytes()));
assertEquals(1, wal.getWritingTags().size());
sink.close();
assertEquals(0, wal.getWritingTags().size());
assertEquals(1, wal.getLoggedTags().size());
EventSource curSource = wal.getUnackedSource();
assertEquals(0, wal.getWritingTags().size());
assertEquals(0, wal.getLoggedTags().size());
assertEquals(1, wal.getSendingTags().size());
curSource.open();
assertEquals(0, wal.getLoggedTags().size());
assertEquals(1, wal.getSendingTags().size());
Event e = null;
ConsoleEventSink console = new ConsoleEventSink();
while ((e = curSource.next()) != null) {
console.append(e);
}
curSource.close();
assertEquals(0, wal.getSendingTags().size());
assertEquals(1, wal.getSentTags().size());
String tag = wal.getSentTags().iterator().next();
wal.toAcked(tag);
assertEquals(0, wal.getSentTags().size());
}
@Test
public void testRecovers()
throws IOException, FlumeSpecException {
BenchmarkHarness.setupLocalWriteDir();
File tmp = BenchmarkHarness.tmpdir;
NaiveFileWALManager wal = new NaiveFileWALManager(tmp);
wal.open();
File acked = new File(WAL_OK);
FileUtil.dumbfilecopy(acked, new File(wal.writingDir,
"writing.00000000.20100204-015814F430-0800.seq"));
FileUtil.dumbfilecopy(acked, new File(wal.loggedDir,
"logged.00000000.20100204-015814F430-0800.seq"));
FileUtil.dumbfilecopy(acked, new File(wal.sendingDir,
"sending.00000000.20100204-015814F430-0800.seq"));
FileUtil.dumbfilecopy(acked, new File(wal.sentDir,
"send.00000000.20100204-015814F430-0800.seq"));
FileUtil.dumbfilecopy(acked, new File(wal.importDir,
"import.00000000.20100204-015814F430-0800.seq"));
FileUtil.dumbfilecopy(acked, new File(wal.errorDir,
"error.00000000.20100204-015814F430-0800.seq"));
wal.recover();
assertEquals(0, new File(tmp, "writing").list().length);
assertEquals(0, new File(tmp, "sending").list().length);
assertEquals(0, new File(tmp, "sent").list().length);
assertEquals(0, new File(tmp, "done").list().length);
assertEquals(1, new File(tmp, "error").list().length);
assertEquals(4, new File(tmp, "logged").list().length);
BenchmarkHarness.cleanupLocalWriteDir();
}
File src = new File(WAL_OK);
File tmpdir = FileUtil.mktempdir();
FileUtil.dumbfilecopy(src, new File(tmpdir, conflict));
NaiveFileWALManager wal = new NaiveFileWALManager(tmpdir);
try {
wal.open();
} catch (IOException ioe) {
throw ioe;
} finally {
FileUtil.rmr(tmpdir);
}
}
@Test(expected = IOException.class)
doTestBadOpen(NaiveFileWALManager.IMPORTDIR);
}
@Test(expected = IOException.class)
doTestBadOpen(NaiveFileWALManager.WRITINGDIR);
}
@Test(expected = IOException.class)
doTestBadOpen(NaiveFileWALManager.LOGGEDDIR);
}
@Test(expected = IOException.class)
doTestBadOpen(NaiveFileWALManager.SENDINGDIR);
}
@Test(expected = IOException.class)
doTestBadOpen(NaiveFileWALManager.SENTDIR);
}
@Test(expected = IOException.class)
doTestBadOpen(NaiveFileWALManager.ERRORDIR);
}
@Test(expected = IOException.class)
doTestBadOpen(NaiveFileWALManager.DONEDIR);
}
File tmpdir = FileUtil.mktempdir();
File dir = new File(new File(tmpdir, conflict), "foo");
dir.mkdirs();
NaiveFileWALManager wal = new NaiveFileWALManager(tmpdir);
wal.open();
try {
wal.recover();
} catch (IOException ioe) {
throw ioe;
} finally {
FileUtil.rmr(tmpdir);
}
}
@Test(expected = IOException.class)
doTestBadRecover(NaiveFileWALManager.SENTDIR);
}
@Test(expected = IOException.class)
doTestBadRecover(NaiveFileWALManager.SENDINGDIR);
}
@Test(expected = IOException.class)
doTestBadRecover(NaiveFileWALManager.WRITINGDIR);
}
}