package com.cloudera.flume.master;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.util.FileUtil;
static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCounterTest.class);
static ZooKeeperService svc;
static File tmp;
@BeforeClass
static public void setupZK()
throws IOException, InterruptedException {
tmp = FileUtil.mktempdir();
FlumeConfiguration.get().set(FlumeConfiguration.MASTER_ZK_LOGDIR,
tmp.getAbsolutePath());
svc = ZooKeeperService.getAndInit();
}
@AfterClass
static public void tearDownZK()
throws IOException {
svc.shutdown();
FileUtil.rmr(tmp);
}
@Test
KeeperException {
ZooKeeperCounter seq = new ZooKeeperCounter(svc, "/seq-generator-test");
for (long i = 0; i < 1024; ++i) {
assertEquals(i, seq.incrementAndGet());
}
seq.shutdown();
}
@Test
KeeperException {
ZooKeeperCounter seq = new ZooKeeperCounter(svc, "/seq-generator-incr-test");
for (long i = 0; i < 1024; ++i) {
assertEquals(i * 4, seq.incrementAndGetBy(4L));
}
seq.shutdown();
}
@Test
public void testReset()
throws IOException, InterruptedException, KeeperException {
ZooKeeperCounter seq = new ZooKeeperCounter(svc, "/seq-generator-reset-test");
for (long i=0; i < 1024; ++i) {
seq.resetTo(i);
assertEquals(i, seq.incrementAndGet());
}
seq.shutdown();
}
final static int LOOP_COUNT = 200;
private ZooKeeperCounter seq;
public ArrayList<Long> counts = new ArrayList<Long>();
this.seq = seq;
}
for (int i = 0; i < LOOP_COUNT; ++i) {
try {
counts.add(seq.incrementAndGet());
} catch (Exception e) {
LOG.error("CounterThread saw exception",e);
}
}
}
}
@Test
InterruptedException {
ZooKeeperCounter seq = new ZooKeeperCounter(svc, "/seq-generator-conc-test");
CounterThread t1 = new CounterThread(seq);
CounterThread t2 = new CounterThread(seq);
t1.start();
t2.start();
t1.join();
t2.join();
long last = -1;
for (Long l : t1.counts) {
assertTrue("Expected increasing counts", last < l);
last = l;
}
last = -1;
for (Long l : t2.counts) {
assertTrue("Expected increasing counts", last < l);
last = l;
}
ArrayList<Long> counts = t1.counts;
counts.addAll(t2.counts);
Collections.sort(counts);
for (long i = 0; i < LOOP_COUNT * 2; ++i) {
assertEquals("Missing counter value", i, (long) counts.get((int) i));
}
seq.shutdown();
}
}