package com.cloudera.flume.master;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.master.StatusManager.NodeState;
import com.cloudera.flume.master.availability.ConsistentHashFailoverChainManager;
import com.cloudera.flume.master.availability.FailoverChainManager;
import com.cloudera.flume.master.failover.FailoverConfigurationManager;
import com.cloudera.flume.master.logical.LogicalConfigurationManager;
import com.cloudera.util.Clock;
import com.cloudera.util.FileUtil;
import com.cloudera.util.NetUtils;
public static final Logger LOG = LoggerFactory
.getLogger(TestMasterAutoUpdatesDFO.class);
protected FlumeMaster flumeMaster = null;
private File tmpdir = null;
protected ConfigManager cfgMan;
@Before
FlumeSpecException {
tmpdir = FileUtil.mktempdir();
FlumeConfiguration.createTestableConfiguration();
FlumeConfiguration.get().set(FlumeConfiguration.MASTER_STORE, "memory");
buildMaster();
ConfigurationManager loaded = cfgMan;
loaded.setConfig("node1", "flow", "autoCollectorSource", "null");
loaded.setConfig("node2", "flow", "autoCollectorSource", "null");
loaded.setConfig("node3", "flow", "autoCollectorSource", "null");
loaded.setConfig("node4", "flow", "autoCollectorSource", "null");
loaded.setConfig("agent", "flow", "null", "autoDFOChain");
ConfigurationManager cfgman1 = flumeMaster.getSpecMan();
Map<String, FlumeConfigData> cfgs1 = cfgman1.getTranslatedConfigs();
assertEquals(0, cfgs1.size());
flumeMaster.serve();
}
cfgMan = new ConfigManager(FlumeMaster.createConfigStore(FlumeConfiguration
.get()));
FailoverChainManager fcMan = new ConsistentHashFailoverChainManager(3);
ConfigurationManager self2 = new ConfigManager();
ConfigurationManager failover = new FailoverConfigurationManager(cfgMan,
self2, fcMan);
StatusManager statman = new StatusManager();
ConfigurationManager self = new ConfigManager();
ConfigurationManager logical = new LogicalConfigurationManager(failover,
self, statman);
flumeMaster = new FlumeMaster(new CommandManager(), logical, statman,
new MasterAckManager(), FlumeConfiguration.get());
}
@After
if (flumeMaster != null) {
flumeMaster.shutdown();
flumeMaster = null;
}
if (tmpdir != null) {
FileUtil.rmr(tmpdir);
tmpdir = null;
}
}
@Test
FlumeSpecException {
ConfigurationManager cfgman2 = flumeMaster.getSpecMan();
Map<String, FlumeConfigData> cfgs2 = cfgman2.getTranslatedConfigs();
assertEquals(5, cfgs2.size());
}
@Test
Map<String, FlumeConfigData> xcfgs = flumeMaster.getSpecMan()
.getTranslatedConfigs();
FlumeConfigData agentFcd = xcfgs.get("agent");
String ans1 = "< < { lazyOpen => fail( \"logicalSink( \\\"node4\\\" )\" ) } ?"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node2\\\" )\" ) } ?"
+ " { lazyOpen => fail( \"logicalSink( \\\"node1\\\" )\" ) } > > ?"
+ " { diskFailover => { insistentAppend => { stubbornAppend =>"
+ " { insistentOpen =>"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node4\\\" )\" ) } ?"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node2\\\" )\" ) } ?"
+ " { lazyOpen => fail( \"logicalSink( \\\"node1\\\" )\" ) } > > } } } } >";
assertEquals(agentFcd.sinkConfig, ans1);
}
@Test
FlumeSpecException {
flumeMaster.getSpecMan().setConfig("node2", "flow", "null", "null");
Map<String, FlumeConfigData> xcfgs2 = flumeMaster.getSpecMan()
.getTranslatedConfigs();
FlumeConfigData agentFcd2 = xcfgs2.get("agent");
String ans2 = "< < { lazyOpen => fail( \"logicalSink( \\\"node4\\\" )\" ) } ?"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node1\\\" )\" ) } ?"
+ " { lazyOpen => fail( \"logicalSink( \\\"node3\\\" )\" ) } > > ?"
+ " { diskFailover => { insistentAppend => { stubbornAppend =>"
+ " { insistentOpen =>"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node4\\\" )\" ) } ?"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node1\\\" )\" ) } ?"
+ " { lazyOpen => fail( \"logicalSink( \\\"node3\\\" )\" ) } > > } } } } >";
assertEquals(agentFcd2.sinkConfig, ans2);
}
@Test
flumeMaster.getSpecMan().removeLogicalNode("node2");
Map<String, FlumeConfigData> xcfgs2 = flumeMaster.getSpecMan()
.getTranslatedConfigs();
FlumeConfigData agentFcd2 = xcfgs2.get("agent");
String ans2 = "< < { lazyOpen => fail( \"logicalSink( \\\"node4\\\" )\" ) } ?"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node1\\\" )\" ) } ?"
+ " { lazyOpen => fail( \"logicalSink( \\\"node3\\\" )\" ) } > > ?"
+ " { diskFailover => { insistentAppend => { stubbornAppend =>"
+ " { insistentOpen =>"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node4\\\" )\" ) } ?"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node1\\\" )\" ) } ?"
+ " { lazyOpen => fail( \"logicalSink( \\\"node3\\\" )\" ) } > > } } } } >";
assertEquals(agentFcd2.sinkConfig, ans2);
}
@Test
FlumeSpecException {
flumeMaster.getSpecMan().setConfig("nodeNew", "flow",
"autoCollectorSource", "null");
Map<String, FlumeConfigData> xcfgs2 = flumeMaster.getSpecMan()
.getTranslatedConfigs();
FlumeConfigData agentFcd2 = xcfgs2.get("agent");
String ans2 = "< < { lazyOpen => fail( \"logicalSink( \\\"nodeNew\\\" )\" ) } ?"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node4\\\" )\" ) } ?"
+ " { lazyOpen => fail( \"logicalSink( \\\"node2\\\" )\" ) } > > ?"
+ " { diskFailover => { insistentAppend => { stubbornAppend =>"
+ " { insistentOpen =>"
+ " < { lazyOpen => fail( \"logicalSink( \\\"nodeNew\\\" )\" ) } ?"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node4\\\" )\" ) } ?"
+ " { lazyOpen => fail( \"logicalSink( \\\"node2\\\" )\" ) } > > } } } } >";
assertEquals(agentFcd2.sinkConfig, ans2);
}
@Test
String host = NetUtils.localhost();
long ver = Clock.unixTime();
flumeMaster.getStatMan().updateHeartbeatStatus(host, "physnode", "node1",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "physnode", "node2",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "physnode", "node3",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "physnode", "node4",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "physnode", "agent",
NodeState.IDLE, ver);
flumeMaster.getSpecMan().addLogicalNode(host, "node1");
flumeMaster.getSpecMan().addLogicalNode(host, "node2");
flumeMaster.getSpecMan().addLogicalNode(host, "node3");
flumeMaster.getSpecMan().addLogicalNode(host, "node4");
flumeMaster.getSpecMan().addLogicalNode(host, "agent");
Map<String, FlumeConfigData> xcfgs2 = flumeMaster.getSpecMan()
.getTranslatedConfigs();
FlumeConfigData agentFcd2 = xcfgs2.get("agent");
String ans2 = "< < { lazyOpen => rpcSink( \"" + host + "\", 35856 ) } ?"
+ " < { lazyOpen => rpcSink( \"" + host + "\", 35854 ) } ?"
+ " { lazyOpen => rpcSink( \"" + host + "\", 35853 ) } > > ?"
+ " { diskFailover => { insistentAppend => { stubbornAppend =>"
+ " { insistentOpen => < { lazyOpen => rpcSink( \"" + host
+ "\", 35856 ) } ?" + " < { lazyOpen => rpcSink( \"" + host
+ "\", 35854 ) } ?" + " { lazyOpen => rpcSink( \"" + host
+ "\", 35853 ) } > > } } } } >";
assertEquals(ans2, agentFcd2.sinkConfig);
}
@Test
String host = NetUtils.localhost();
long ver = Clock.unixTime();
flumeMaster.getStatMan().updateHeartbeatStatus(host, "physnode", "node1",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "physnode", "node2",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "physnode", "node3",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "physnode", "node4",
NodeState.IDLE, ver);
flumeMaster.getStatMan().updateHeartbeatStatus(host, "physnode", "agent",
NodeState.IDLE, ver);
flumeMaster.getSpecMan().addLogicalNode("host", "node1");
flumeMaster.getSpecMan().addLogicalNode("host", "node2");
flumeMaster.getSpecMan().addLogicalNode("host", "node3");
flumeMaster.getSpecMan().addLogicalNode("host", "node4");
flumeMaster.getSpecMan().addLogicalNode("host", "agent");
flumeMaster.getSpecMan().unmapAllLogicalNodes();
Map<String, FlumeConfigData> xcfgs2 = flumeMaster.getSpecMan()
.getTranslatedConfigs();
FlumeConfigData agentFcd2 = xcfgs2.get("agent");
String ans2 = "< < { lazyOpen => fail( \"logicalSink( \\\"node4\\\" )\" ) } ?"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node2\\\" )\" ) } ?"
+ " { lazyOpen => fail( \"logicalSink( \\\"node1\\\" )\" ) } > > ?"
+ " { diskFailover => { insistentAppend => { stubbornAppend =>"
+ " { insistentOpen => < { lazyOpen => fail( \"logicalSink( \\\"node4\\\" )\" ) } ?"
+ " < { lazyOpen => fail( \"logicalSink( \\\"node2\\\" )\" ) } ?"
+ " { lazyOpen => fail( \"logicalSink( \\\"node1\\\" )\" ) } > > } } } } >";
assertEquals(ans2, agentFcd2.sinkConfig);
}
}