package com.cloudera.flume.master.flow;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.master.CommandManager;
import com.cloudera.flume.master.ConfigManager;
import com.cloudera.flume.master.ConfigurationManager;
import com.cloudera.flume.master.FlumeMaster;
import com.cloudera.flume.master.MasterAckManager;
import com.cloudera.flume.master.StatusManager;
import com.cloudera.flume.master.StatusManager.NodeState;
import com.cloudera.flume.master.flows.FlowConfigManager;
import com.cloudera.flume.master.logical.LogicalConfigurationManager;
import com.cloudera.util.Clock;
import com.cloudera.util.FileUtil;
import com.cloudera.util.NetUtils;
// Log4j logger for this test, keyed on TestLogicalFailoverFlowConfigManager.
final public static Logger LOG = Logger
.getLogger(TestLogicalFailoverFlowConfigManager.class);
// Master under test: assigned a new FlumeMaster wrapping `logical`, then shut
// down and reset to null in the @After block.
protected FlumeMaster flumeMaster = null;
// Scratch directory obtained from FileUtil.mktempdir() during setup and
// removed with FileUtil.rmr() in the @After block.
private File tmpdir = null;
// Bottom-most configuration manager; setup writes the node configs to it directly.
protected ConfigManager cfgMan;
// FailoverFlowConfigManager constructed on top of cfgMan.
protected FlowConfigManager flowed;
// LogicalConfigurationManager constructed on top of flowed; this is the
// manager handed to the FlumeMaster constructor.
protected LogicalConfigurationManager logical;
// Per-test fixture: sets the root log4j logger to DEBUG.
// NOTE(review): the method signature line that should follow @Before is not
// present in this view of the file -- confirm against the full source.
@Before
Logger.getRootLogger().setLevel(Level.DEBUG);
}
// Per-test fixture: creates a temp dir, switches the master store to
// "memory", builds the master, registers two flows of four collectors plus
// one agent each, starts the master, and then reports heartbeats and adds
// logical nodes for all ten nodes.
// NOTE(review): only the tail of the throws clause is visible below; the
// start of the method signature is missing from this view -- confirm.
@Before
FlumeSpecException {
tmpdir = FileUtil.mktempdir();
// Use a fresh testable configuration with the in-memory master store.
FlumeConfiguration.createTestableConfiguration();
FlumeConfiguration.get().set(FlumeConfiguration.MASTER_STORE, "memory");
buildMaster();
// Configs are written to cfgMan itself, not through `flowed` or `logical`.
ConfigurationManager loaded = cfgMan;
// flow1: four collectors (autoCollectorSource -> null) and one agent
// (null -> autoBEChain).
loaded.setConfig("coll11", "flow1", "autoCollectorSource", "null");
loaded.setConfig("coll12", "flow1", "autoCollectorSource", "null");
loaded.setConfig("coll13", "flow1", "autoCollectorSource", "null");
loaded.setConfig("coll14", "flow1", "autoCollectorSource", "null");
loaded.setConfig("agent1", "flow1", "null", "autoBEChain");
// flow2: same shape as flow1 with its own collectors and agent.
loaded.setConfig("coll21", "flow2", "autoCollectorSource", "null");
loaded.setConfig("coll22", "flow2", "autoCollectorSource", "null");
loaded.setConfig("coll23", "flow2", "autoCollectorSource", "null");
loaded.setConfig("coll24", "flow2", "autoCollectorSource", "null");
loaded.setConfig("agent2", "flow2", "null", "autoBEChain");
// Before the master is served, its spec manager is expected to report no
// translated configs even though ten configs were just written to cfgMan.
ConfigurationManager cfgman1 = flumeMaster.getSpecMan();
Map<String, FlumeConfigData> cfgs1 = cfgman1.getTranslatedConfigs();
assertEquals(0, cfgs1.size());
flumeMaster.serve();
// Report every logical node as IDLE on the local host, physical node
// "phys", all with the same version timestamp.
String host = NetUtils.localhost();
long ver = Clock.unixTime();
flumeMaster.getStatMan().updateHeartbeatStatus(host, "phys", "coll11",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "phys", "coll12",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "phys", "coll13",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "phys", "coll14",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "phys", "agent1",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "phys", "coll21",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "phys", "coll22",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "phys", "coll23",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "phys", "coll24",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "phys", "agent2",
NodeState.IDLE, ver);
// Add each logical node through the master's spec manager.
// NOTE(review): these calls pass the string literal "host", not the local
// variable `host` used for the heartbeats above -- confirm this is intended.
flumeMaster.getSpecMan().addLogicalNode("host", "coll11");
flumeMaster.getSpecMan().addLogicalNode("host", "coll12");
flumeMaster.getSpecMan().addLogicalNode("host", "coll13");
flumeMaster.getSpecMan().addLogicalNode("host", "coll14");
flumeMaster.getSpecMan().addLogicalNode("host", "agent1");
flumeMaster.getSpecMan().addLogicalNode("host", "coll21");
flumeMaster.getSpecMan().addLogicalNode("host", "coll22");
flumeMaster.getSpecMan().addLogicalNode("host", "coll23");
flumeMaster.getSpecMan().addLogicalNode("host", "coll24");
flumeMaster.getSpecMan().addLogicalNode("host", "agent2");
}
// Builds the manager stack used by the test:
// ConfigManager -> FailoverFlowConfigManager -> LogicalConfigurationManager
// -> FlumeMaster.
// NOTE(review): the declaration line for this block is not visible in this
// view; it is presumably the buildMaster() helper called from setup -- confirm.
cfgMan = new ConfigManager(FlumeMaster.createConfigStore(FlumeConfiguration
.get()));
// A single StatusManager instance is shared by the flow manager, the logical
// manager and the master.
StatusManager statman = new StatusManager();
flowed = new FlowConfigManager.FailoverFlowConfigManager(cfgMan, statman);
// The logical manager receives `flowed` plus a separate, freshly constructed
// ConfigManager as its second argument.
logical = new LogicalConfigurationManager(flowed, new ConfigManager(),
statman);
flumeMaster = new FlumeMaster(new CommandManager(), logical, statman,
new MasterAckManager(), FlumeConfiguration.get());
}
// Per-test teardown: shuts down the master and deletes the temp directory.
// Both fields are null-checked first and reset to null afterwards.
// NOTE(review): the method signature line that should follow @After is not
// present in this view of the file -- confirm against the full source.
@After
if (flumeMaster != null) {
flumeMaster.shutdown();
flumeMaster = null;
}
if (tmpdir != null) {
FileUtil.rmr(tmpdir);
tmpdir = null;
}
}
// Checks per-flow config counts and the sink translated for each agent, then
// moves coll14 from flow1 to flow2 and checks that both agents' sinks change.
// NOTE(review): the method signature line that should follow @Test is not
// present in this view of the file -- confirm against the full source.
@Test
// Each flow was given four collectors and one agent during setup, so five
// configs per flow and ten in total are expected.
assertEquals(5, flowed.getConfigManForFlow("flow1").getTranslatedConfigs()
.size());
assertEquals(5, flowed.getConfigManForFlow("flow2").getTranslatedConfigs()
.size());
assertEquals(5, flowed.getConfigManForFlow("flow1").getAllConfigs().size());
assertEquals(5, flowed.getConfigManForFlow("flow2").getAllConfigs().size());
assertEquals(10, flowed.getTranslatedConfigs().size());
FlumeConfigData agent1 = logical.getConfig("agent1");
FlumeConfigData agent2 = logical.getConfig("agent2");
String host = NetUtils.localhost();
// Each agent's sink is expected to be a nested failover expression of three
// lazyOpen rpcSinks on the local host, ending in null.
// NOTE(review): the port numbers and their order are hard-coded
// expectations; this file does not show how they are assigned -- confirm.
LOG.info(agent1);
assertEquals("< { lazyOpen => rpcSink( \"" + host + "\", 35856 ) } ? "
+ "< { lazyOpen => rpcSink( \"" + host + "\", 35853 ) } ? "
+ "< { lazyOpen => rpcSink( \"" + host + "\", 35855 ) } ? null > > >",
agent1.sinkConfig);
LOG.info(agent2);
assertEquals("< { lazyOpen => rpcSink( \"" + host + "\", 35859 ) } ? "
+ "< { lazyOpen => rpcSink( \"" + host + "\", 35858 ) } ? "
+ "< { lazyOpen => rpcSink( \"" + host + "\", 35857 ) } ? null > > >",
agent2.sinkConfig);
// Re-register coll14 under flow2 (it was in flow1), this time through the
// logical manager rather than cfgMan.
logical.setConfig("coll14", "flow2", "autoCollectorSource", "null");
FlumeConfigData nextAgent1 = logical.getConfig("agent1");
FlumeConfigData nextAgent2 = logical.getConfig("agent2");
// After the move, port 35856 is no longer expected in agent1's chain and is
// expected in agent2's chain instead (presumably coll14's port -- confirm).
LOG.info(nextAgent1);
assertEquals("< { lazyOpen => rpcSink( \"" + host + "\", 35853 ) } ? "
+ "< { lazyOpen => rpcSink( \"" + host + "\", 35855 ) } ? "
+ "< { lazyOpen => rpcSink( \"" + host + "\", 35854 ) } ? null > > >",
nextAgent1.sinkConfig);
LOG.info(nextAgent2);
assertEquals("< { lazyOpen => rpcSink( \"" + host + "\", 35859 ) } ? "
+ "< { lazyOpen => rpcSink( \"" + host + "\", 35858 ) } ? "
+ "< { lazyOpen => rpcSink( \"" + host + "\", 35856 ) } ? null > > >",
nextAgent2.sinkConfig);
}
}