package com.cloudera.flume.master.flow;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.master.CommandManager;
import com.cloudera.flume.master.ConfigManager;
import com.cloudera.flume.master.ConfigurationManager;
import com.cloudera.flume.master.FlumeMaster;
import com.cloudera.flume.master.MasterAckManager;
import com.cloudera.flume.master.StatusManager;
import com.cloudera.flume.master.flows.FlowConfigManager;
import com.cloudera.util.FileUtil;
final public static Logger LOG = Logger
.getLogger(TestFailoverFlowConfigManager.class);
protected FlumeMaster flumeMaster = null;
private File tmpdir = null;
protected ConfigManager cfgMan;
protected FlowConfigManager flowed;
@Before
Logger.getRootLogger().setLevel(Level.DEBUG);
}
@Before
FlumeSpecException {
tmpdir = FileUtil.mktempdir();
FlumeConfiguration.createTestableConfiguration();
FlumeConfiguration.get().set(FlumeConfiguration.MASTER_STORE, "memory");
buildMaster();
ConfigurationManager loaded = cfgMan;
loaded.setConfig("coll11", "flow1", "autoCollectorSource", "null");
loaded.setConfig("coll12", "flow1", "autoCollectorSource", "null");
loaded.setConfig("coll13", "flow1", "autoCollectorSource", "null");
loaded.setConfig("coll14", "flow1", "autoCollectorSource", "null");
loaded.setConfig("agent1", "flow1", "null", "autoBEChain");
loaded.setConfig("coll21", "flow2", "autoCollectorSource", "null");
loaded.setConfig("coll22", "flow2", "autoCollectorSource", "null");
loaded.setConfig("coll23", "flow2", "autoCollectorSource", "null");
loaded.setConfig("coll24", "flow2", "autoCollectorSource", "null");
loaded.setConfig("agent2", "flow2", "null", "autoBEChain");
ConfigurationManager cfgman1 = flumeMaster.getSpecMan();
Map<String, FlumeConfigData> cfgs1 = cfgman1.getTranslatedConfigs();
assertEquals(0, cfgs1.size());
flumeMaster.serve();
}
cfgMan = new ConfigManager(FlumeMaster.createConfigStore(FlumeConfiguration
.get()));
StatusManager statman = new StatusManager();
flowed = new FlowConfigManager.FailoverFlowConfigManager(cfgMan, statman);
flumeMaster = new FlumeMaster(new CommandManager(), flowed, statman,
new MasterAckManager(), FlumeConfiguration.get());
}
@After
if (flumeMaster != null) {
flumeMaster.shutdown();
flumeMaster = null;
}
if (tmpdir != null) {
FileUtil.rmr(tmpdir);
tmpdir = null;
}
}
@Test
assertEquals(5, flowed.getConfigManForFlow("flow1").getTranslatedConfigs()
.size());
assertEquals(5, flowed.getConfigManForFlow("flow2").getTranslatedConfigs()
.size());
assertEquals(5, flowed.getConfigManForFlow("flow1").getAllConfigs().size());
assertEquals(5, flowed.getConfigManForFlow("flow2").getAllConfigs().size());
assertEquals(10, flowed.getTranslatedConfigs().size());
FlumeConfigData agent1 = flowed.getConfig("agent1");
FlumeConfigData agent2 = flowed.getConfig("agent2");
LOG.info(agent1);
assertEquals("< { lazyOpen => logicalSink( \"coll14\" ) } ? "
+ "< { lazyOpen => logicalSink( \"coll11\" ) } ? "
+ "< { lazyOpen => logicalSink( \"coll13\" ) } ? null > > >",
agent1.sinkConfig);
LOG.info(agent2);
assertEquals("< { lazyOpen => logicalSink( \"coll23\" ) } ? "
+ "< { lazyOpen => logicalSink( \"coll22\" ) } ? "
+ "< { lazyOpen => logicalSink( \"coll21\" ) } ? null > > >",
agent2.sinkConfig);
flowed.setConfig("coll14", "flow2", "autoCollectorSource", "null");
FlumeConfigData nextAgent1 = flowed.getConfig("agent1");
FlumeConfigData nextAgent2 = flowed.getConfig("agent2");
LOG.info(nextAgent1);
assertEquals("< { lazyOpen => logicalSink( \"coll11\" ) } ? "
+ "< { lazyOpen => logicalSink( \"coll13\" ) } ? "
+ "< { lazyOpen => logicalSink( \"coll12\" ) } ? null > > >",
nextAgent1.sinkConfig);
LOG.info(nextAgent2);
assertEquals("< { lazyOpen => logicalSink( \"coll23\" ) } ? "
+ "< { lazyOpen => logicalSink( \"coll22\" ) } ? "
+ "< { lazyOpen => logicalSink( \"coll14\" ) } ? null > > >",
nextAgent2.sinkConfig);
}
}