package com.cloudera.flume.master;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.util.FileUtil;
/** Logger shared by every test in this class; final so it cannot be reassigned. */
public static final Logger LOG = Logger.getLogger(TestFlumeConfigMaster.class);
/**
 * Temporary directory in use by the current test. Assigned in the {@code @Before}
 * hook and reset to {@code null} by the {@code @After} hook once it has been removed.
 */
File tmpdir = null;
/** Testable configuration handed to the masters created by the tests. */
FlumeConfiguration cfg = FlumeConfiguration.createTestableConfiguration();
// Per-test fixture setup (JUnit @Before): sets the webapps path and the
// MASTER_ZK_LOGDIR entry on the shared test configuration, the latter pointing
// at a freshly created temporary directory, then raises root logging to DEBUG.
@Before
// Relative path; presumably resolved against the build's working directory -- TODO confirm.
cfg.set(FlumeConfiguration.WEBAPPS_PATH, "build/webapps");
// Keep the directory in the field so the @After hook can remove it again.
tmpdir = FileUtil.mktempdir();
cfg.set(FlumeConfiguration.MASTER_ZK_LOGDIR, tmpdir.getAbsolutePath());
// The root logger level is changed here and is not reset anywhere in the visible code.
Logger.getRootLogger().setLevel(Level.DEBUG);
}
// Per-test cleanup (JUnit @After): hands the temporary directory recorded in
// tmpdir to FileUtil.rmr (presumably a recursive delete -- confirm) and clears
// the field.
@After
// tmpdir is still null when no directory was recorded, so guard the call.
if (tmpdir != null) {
FileUtil.rmr(tmpdir);
// Clear the field so the same directory is not passed to rmr a second time.
tmpdir = null;
}
}
// Round-trip test for saving and loading configurations: stores a configuration
// under the key "test1", saves all configurations to a temporary file,
// overwrites the entry, reloads the file and checks that the original values
// are back.
@Test
FlumeMaster fcm = new FlumeMaster(cfg, false);
fcm.serve();
ConfigurationManager cm = fcm.getSpecMan();
// Initial state: both configuration strings for "test1" are "console".
cm.setConfig("test1", "test-flow", "console", "console");
File f = File.createTempFile("test", ".tmp");
f.deleteOnExit();
cm.saveConfigFile(f.getAbsolutePath());
// createTempFile already creates an empty file, so exists() holds regardless;
// the length check is what shows that saveConfigFile wrote something.
assertTrue(f.exists());
assertTrue(f.length() > 0);
FlumeConfigData data = cm.getConfig("test1");
// NOTE(review): JUnit's assertEquals takes (expected, actual); the calls in this
// test pass the actual value first. The pass/fail outcome is unaffected, but a
// failure message would label the two values the wrong way round.
assertEquals(data.sinkConfig, "console");
assertEquals(data.sourceConfig, "console");
// Overwrite the entry with different values so the reload below has a visible effect.
cm.setConfig("test1", "test-flow", "null", "null");
data = cm.getConfig("test1");
assertEquals(data.sinkConfig, "null");
assertEquals(data.sourceConfig, "null");
// Loading the saved file is expected to bring back the "console" values.
cm.loadConfigFile(f.getAbsolutePath());
data = cm.getConfig("test1");
assertEquals(data.sinkConfig, "console");
assertEquals(data.sourceConfig, "console");
// NOTE(review): not reached when an assertion above fails, so the master would
// be left running in that case -- consider try/finally.
fcm.shutdown();
}
/**
 * Serves and shuts down one {@link FlumeMaster} ten times in a row. The master
 * is assembled from explicitly constructed manager objects plus the shared test
 * configuration. There are no assertions: the test passes when none of the
 * serve/shutdown cycles throws.
 *
 * @throws TTransportException propagated from the master under test
 * @throws IOException propagated from the master under test
 */
@Test
public void testOpenClose()
    throws TTransportException, IOException {
  // Constructed in the same order in which the original argument list evaluated them.
  CommandManager commands = new CommandManager();
  ConfigManager configs = new ConfigManager();
  StatusManager statuses = new StatusManager();
  MasterAckManager acks = new MasterAckManager();
  FlumeMaster master = new FlumeMaster(commands, configs, statuses, acks, cfg);

  int round = 0;
  while (round < 10) {
    LOG.info("flume master open close " + round);
    master.serve();
    master.shutdown();
    round++;
  }
}
// Serves and shuts down a single FlumeMaster ten times in a row, this time
// built with the (configuration, boolean) constructor. There are no
// assertions: the test passes when none of the cycles throws.
@Test
// NOTE(review): the meaning of the boolean argument is not visible here -- see FlumeMaster.
FlumeMaster fm = new FlumeMaster(cfg, false);
for (int i = 0; i < 10; i++) {
LOG.info("flume master open close " + i);
// The same instance is reused for every cycle.
fm.serve();
fm.shutdown();
}
}
// Negative test: executes a config command whose second argument is the
// unterminated text `text("bla` and expects it to throw, then checks that the
// running master holds no configurations afterwards.
@Test
MasterExecException {
FlumeMaster fm = new FlumeMaster(cfg, false);
fm.serve();
// NOTE(review): this local shadows the FlumeConfiguration field named cfg that
// was used two lines above; a different local name would read more clearly.
Execable cfg = ConfigCommand.buildExecable();
try {
// The middle element opens a quote and a parenthesis that are never closed.
String[] args = { "node", "text(\"bla", "null" };
cfg.exec(args);
// fail() throws an AssertionError, which is an Error and therefore not caught
// by the catch (Exception) clause below.
fail("should have thrown exception");
} catch (Exception e) {
// Expected path: the failure is only logged.
LOG.error(e.getMessage(), e);
}
// The rejected command must not have left anything behind in either view.
assertEquals(0, fm.getSpecMan().getAllConfigs().size());
assertEquals(0, fm.getSpecMan().getTranslatedConfigs().size());
// NOTE(review): not reached when an assertion above fails -- consider try/finally.
fm.shutdown();
}
}