package com.cloudera.flume.master;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.cloudera.flume.agent.DirectMasterRPC;
import com.cloudera.flume.agent.FlumeNode;
import com.cloudera.flume.agent.LivenessManager;
import com.cloudera.flume.agent.MasterRPC;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.master.StatusManager.NodeState;
import com.cloudera.util.NetUtils;
import com.google.common.collect.Multimap;
// Shared master reference. The @After teardown shuts it down and clears it
// whenever a test has assigned it.
// NOTE(review): the enclosing class declaration is not visible in this excerpt.
FlumeMaster master = null;
// Configuration used by the tests; reassigned in setCfg() before each test.
FlumeConfiguration cfg;
/**
 * Prepares the configuration used by each test: obtains a testable
 * configuration, selects the "memory" master store and points the webapps
 * path at "build/webapps", then publishes it through the {@code cfg} field.
 *
 * @throws IOException declared by the signature; none of the visible calls
 *         is known from this excerpt to raise it
 */
@Before
public void setCfg() throws IOException {
  FlumeConfiguration testCfg = FlumeConfiguration.createTestableConfiguration();
  testCfg.set(FlumeConfiguration.MASTER_STORE, "memory");
  testCfg.set(FlumeConfiguration.WEBAPPS_PATH, "build/webapps");
  cfg = testCfg;
}
// Per-test teardown: if the shared master field was assigned, shut that master
// down and reset the field to null.
// NOTE(review): the method declaration line that should follow @After is
// missing from this excerpt - confirm against the full file.
@After
if (master != null) {
master.shutdown();
master = null;
}
}
// Test: logical nodes mapped on the master BEFORE the node is created show up
// on the node after a logical-node check.
// NOTE(review): the method declaration line that should follow @Test is
// missing from this excerpt - confirm against the full file.
@Test
// This local declaration hides the `master` field, so the field stays null and
// the @After teardown does not shut this instance down.
FlumeMaster master = new FlumeMaster(cfg);
// Map two logical nodes to the local host on the master.
master.getSpecMan().addLogicalNode(NetUtils.localhost(), "bar");
master.getSpecMan().addLogicalNode(NetUtils.localhost(), "baz");
// The node talks to the master in-process through DirectMasterRPC.
MasterRPC rpc = new DirectMasterRPC(master);
FlumeNode node = new FlumeNode(rpc, false, false);
// A freshly constructed node has no logical nodes yet.
assertEquals(0, node.getLogicalNodeManager().getNodes().size());
LivenessManager liveMan = node.getLivenessManager();
liveMan.checkLogicalNodes();
// Only "bar" and "baz" were added explicitly; presumably the third logical
// node is one named after the physical node itself - TODO confirm.
assertEquals(3, node.getLogicalNodeManager().getNodes().size());
}
// Test: logical nodes mapped on the master AFTER the node is created show up
// on the node after a logical-node check.
// NOTE(review): the method declaration line that should follow @Test is
// missing from this excerpt - confirm against the full file.
@Test
// Local variable hides the `master` field; the @After teardown will see null.
FlumeMaster master = new FlumeMaster(cfg);
MasterRPC rpc = new DirectMasterRPC(master);
FlumeNode node = new FlumeNode(rpc, false, false);
// A freshly constructed node has no logical nodes yet.
assertEquals(0, node.getLogicalNodeManager().getNodes().size());
// Map three logical nodes to the local host: one named after the node's
// physical node name, plus "bar" and "baz".
master.getSpecMan().addLogicalNode(NetUtils.localhost(),
node.getPhysicalNodeName());
master.getSpecMan().addLogicalNode(NetUtils.localhost(), "bar");
master.getSpecMan().addLogicalNode(NetUtils.localhost(), "baz");
LivenessManager liveMan = node.getLivenessManager();
liveMan.checkLogicalNodes();
// All three mapped logical nodes are expected on the node.
assertEquals(3, node.getLogicalNodeManager().getNodes().size());
}
// Test: a logical node that is unmapped is reported DECOMMISSIONED after a
// status checkup, and becomes ACTIVE again once it is re-added and heartbeats.
// NOTE(review): only the tail of the method declaration ("InterruptedException {")
// survives in this excerpt; the method name and the rest of the throws clause
// are missing - confirm against the full file.
@Test
InterruptedException {
// Local variable hides the `master` field; the @After teardown will see null.
FlumeMaster master = new FlumeMaster(cfg);
MasterRPC rpc = new DirectMasterRPC(master);
FlumeNode node = new FlumeNode(rpc, false, false);
assertEquals(0, node.getLogicalNodeManager().getNodes().size());
// Map "foo" under the node's physical node name and record an ACTIVE heartbeat.
master.getSpecMan().addLogicalNode(node.getPhysicalNodeName(), "foo");
master.getStatMan().updateHeartbeatStatus(NetUtils.localhost(),
node.getPhysicalNodeName(), "foo", NodeState.ACTIVE, 10);
// NOTE(review): the mapping was added under node.getPhysicalNodeName() but is
// removed under NetUtils.localhost(); presumably both are the same string - confirm.
master.getSpecMan().unmapLogicalNode(NetUtils.localhost(), "foo");
master.getStatMan().checkup();
// After the checkup the unmapped node is expected to be DECOMMISSIONED.
assertEquals(NodeState.DECOMMISSIONED, master.getStatMan()
.getNodeStatuses().get("foo").state);
// Re-add the node, heartbeat as ACTIVE again and re-run the checkup.
master.getSpecMan().addLogicalNode(node.getPhysicalNodeName(), "foo");
master.getStatMan().updateHeartbeatStatus(NetUtils.localhost(),
node.getPhysicalNodeName(), "foo", NodeState.ACTIVE, 10);
master.getStatMan().checkup();
// The re-mapped node must be reported ACTIVE, not left DECOMMISSIONED.
assertEquals(NodeState.ACTIVE, master.getStatMan().getNodeStatuses().get(
"foo").state);
}
// Test: create the node first, then map three logical nodes on the master and
// expect all three on the node after a logical-node check.
// NOTE(review): the method declaration line that should follow @Test is
// missing from this excerpt. The statements are the same as those of an
// earlier test in this file; the missing method name may have carried the
// distinction - confirm against the full file.
@Test
// Local variable hides the `master` field; the @After teardown will see null.
FlumeMaster master = new FlumeMaster(cfg);
MasterRPC rpc = new DirectMasterRPC(master);
FlumeNode node = new FlumeNode(rpc, false, false);
// A freshly constructed node has no logical nodes yet.
assertEquals(0, node.getLogicalNodeManager().getNodes().size());
master.getSpecMan().addLogicalNode(NetUtils.localhost(),
node.getPhysicalNodeName());
master.getSpecMan().addLogicalNode(NetUtils.localhost(), "bar");
master.getSpecMan().addLogicalNode(NetUtils.localhost(), "baz");
LivenessManager liveMan = node.getLivenessManager();
liveMan.checkLogicalNodes();
assertEquals(3, node.getLogicalNodeManager().getNodes().size());
}
// Test: unmapping logical nodes one at a time removes their physical-node
// mapping on the master; no configuration is ever set, so getConfig stays null.
// NOTE(review): the method declaration line that should follow @Test is
// missing from this excerpt - confirm against the full file.
@Test
// Master is built from explicitly supplied managers instead of the
// single-argument constructor. The local variable hides the `master` field.
FlumeMaster master = new FlumeMaster(new CommandManager(),
new ConfigManager(), new StatusManager(), new MasterAckManager(), cfg);
MasterRPC rpc = new DirectMasterRPC(master);
FlumeNode node = new FlumeNode(rpc, false, false);
assertEquals(0, node.getLogicalNodeManager().getNodes().size());
String local = NetUtils.localhost();
// Map three logical nodes to the local host.
master.getSpecMan().addLogicalNode(local, node.getPhysicalNodeName());
master.getSpecMan().addLogicalNode(local, "bar");
master.getSpecMan().addLogicalNode(local, "baz");
LivenessManager liveMan = node.getLivenessManager();
liveMan.checkLogicalNodes();
// Initial state: all three are mapped to `local`, none has a config.
// The lookup by `local` assumes node.getPhysicalNodeName() equals
// NetUtils.localhost() - TODO confirm.
assertEquals(local, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(local, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNull(master.getSpecMan().getConfig(local));
// Step 1: unmap "bar"; only its mapping disappears.
master.getSpecMan().unmapLogicalNode(local, "bar");
liveMan.checkLogicalNodes();
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(local, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNull(master.getSpecMan().getConfig(local));
// Step 2: unmap "baz" as well.
master.getSpecMan().unmapLogicalNode(local, "baz");
liveMan.checkLogicalNodes();
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(null, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNull(master.getSpecMan().getConfig(local));
// Step 3: try to unmap the logical node named after the host itself.
master.getSpecMan().unmapLogicalNode(local, local);
liveMan.checkLogicalNodes();
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNull(master.getSpecMan().getConfig(local));
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(null, master.getSpecMan().getPhysicalNode("baz"));
// The host-named logical node is still expected to be mapped after the unmap
// call; presumably that mapping cannot be removed - TODO confirm.
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
}
// Test: adding the same three logical-node mappings twice does not create
// duplicate entries on the master or on the node.
// NOTE(review): the method declaration line that should follow @Test is
// missing from this excerpt - confirm against the full file.
@Test
// Local variable hides the `master` field; the @After teardown will see null.
FlumeMaster master = new FlumeMaster(new CommandManager(),
new ConfigManager(), new StatusManager(), new MasterAckManager(), cfg);
MasterRPC rpc = new DirectMasterRPC(master);
FlumeNode node = new FlumeNode(rpc, false, false);
assertEquals(0, node.getLogicalNodeManager().getNodes().size());
String local = NetUtils.localhost();
// First round of mappings.
master.getSpecMan().addLogicalNode(local, node.getPhysicalNodeName());
master.getSpecMan().addLogicalNode(local, "bar");
master.getSpecMan().addLogicalNode(local, "baz");
// Second, identical round of mappings.
master.getSpecMan().addLogicalNode(local, node.getPhysicalNodeName());
master.getSpecMan().addLogicalNode(local, "bar");
master.getSpecMan().addLogicalNode(local, "baz");
// The master's logical-node map must still hold exactly three entries.
Multimap<String, String> mapping = master.getSpecMan().getLogicalNodeMap();
assertEquals(3, mapping.size());
LivenessManager liveMan = node.getLivenessManager();
liveMan.checkLogicalNodes();
// The node side must also end up with exactly three logical nodes.
assertEquals(3, node.getLogicalNodeManager().getNodes().size());
}
// Test: removing logical nodes one at a time (removeLogicalNode) drops both
// their configuration and their physical-node mapping on the master.
// NOTE(review): the method declaration line that should follow @Test is
// missing from this excerpt - confirm against the full file.
@Test
// Local variable hides the `master` field; the @After teardown will see null.
FlumeMaster master = new FlumeMaster(new CommandManager(),
new ConfigManager(), new StatusManager(), new MasterAckManager(), cfg);
MasterRPC rpc = new DirectMasterRPC(master);
FlumeNode node = new FlumeNode(rpc, false, false);
assertEquals(0, node.getLogicalNodeManager().getNodes().size());
String local = NetUtils.localhost();
// Map three logical nodes to the local host.
master.getSpecMan().addLogicalNode(local, node.getPhysicalNodeName());
master.getSpecMan().addLogicalNode(local, "bar");
master.getSpecMan().addLogicalNode(local, "baz");
// Give every logical node a configuration so getConfig returns non-null.
// The trailing "null", "null" arguments are string literals; presumably they
// name a null source and a null sink - TODO confirm against setConfig.
master.getSpecMan().setConfig(local, "my-test-flow", "null", "null");
master.getSpecMan().setConfig("bar", "my-test-flow", "null", "null");
master.getSpecMan().setConfig("baz", "my-test-flow", "null", "null");
LivenessManager liveMan = node.getLivenessManager();
// This test drives the node through heartbeatChecks() rather than
// checkLogicalNodes().
liveMan.heartbeatChecks();
// Initial state: all three mapped to `local`, all three configured.
assertEquals(local, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(local, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNotNull(master.getSpecMan().getConfig("bar"));
assertNotNull(master.getSpecMan().getConfig("baz"));
assertNotNull(master.getSpecMan().getConfig(local));
// Step 1: remove "bar"; its mapping and config are both gone.
master.getSpecMan().removeLogicalNode("bar");
liveMan.heartbeatChecks();
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(local, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNull(master.getSpecMan().getConfig("bar"));
assertNotNull(master.getSpecMan().getConfig("baz"));
assertNotNull(master.getSpecMan().getConfig(local));
// Step 2: remove "baz" as well.
master.getSpecMan().removeLogicalNode("baz");
liveMan.heartbeatChecks();
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(null, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNotNull(master.getSpecMan().getConfig(local));
// Step 3: remove the logical node named after the host itself.
master.getSpecMan().removeLogicalNode(local);
liveMan.heartbeatChecks();
// Its configuration is gone...
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNull(master.getSpecMan().getConfig(local));
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(null, master.getSpecMan().getPhysicalNode("baz"));
// ...but its physical-node mapping is still expected to be present;
// presumably that mapping cannot be removed - TODO confirm.
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
}
// Test: with the master store set to "zookeeper" and the master serving,
// logical nodes mapped on the master show up on the node after a check.
// NOTE(review): the method declaration line that should follow @Test is
// missing from this excerpt - confirm against the full file.
@Test
// Override the "memory" store chosen in setCfg().
cfg.set(FlumeConfiguration.MASTER_STORE, "zookeeper");
// Assigns the `master` FIELD (no local declaration), so the @After teardown
// shuts this served master down.
master = new FlumeMaster(new CommandManager(), new ConfigManager(),
new StatusManager(), new MasterAckManager(), cfg);
master.serve();
MasterRPC rpc = new DirectMasterRPC(master);
FlumeNode node = new FlumeNode(rpc, false, false);
// A freshly constructed node has no logical nodes yet.
assertEquals(0, node.getLogicalNodeManager().getNodes().size());
// Map three logical nodes to the local host.
master.getSpecMan().addLogicalNode(NetUtils.localhost(),
node.getPhysicalNodeName());
master.getSpecMan().addLogicalNode(NetUtils.localhost(), "bar");
master.getSpecMan().addLogicalNode(NetUtils.localhost(), "baz");
LivenessManager liveMan = node.getLivenessManager();
liveMan.checkLogicalNodes();
// All three mapped logical nodes are expected on the node.
assertEquals(3, node.getLogicalNodeManager().getNodes().size());
}
// Test: with the master store set to "zookeeper" and the master serving,
// unmapping logical nodes one at a time removes their physical-node mapping;
// no configuration is ever set, so getConfig stays null throughout.
// NOTE(review): the method declaration line that should follow @Test is
// missing from this excerpt - confirm against the full file.
@Test
// Override the "memory" store chosen in setCfg().
cfg.set(FlumeConfiguration.MASTER_STORE, "zookeeper");
// Assigns the `master` FIELD, so the @After teardown shuts this master down.
master = new FlumeMaster(new CommandManager(), new ConfigManager(),
new StatusManager(), new MasterAckManager(), cfg);
master.serve();
MasterRPC rpc = new DirectMasterRPC(master);
FlumeNode node = new FlumeNode(rpc, false, false);
assertEquals(0, node.getLogicalNodeManager().getNodes().size());
String local = NetUtils.localhost();
// Map three logical nodes to the local host.
master.getSpecMan().addLogicalNode(local, node.getPhysicalNodeName());
master.getSpecMan().addLogicalNode(local, "bar");
master.getSpecMan().addLogicalNode(local, "baz");
LivenessManager liveMan = node.getLivenessManager();
liveMan.checkLogicalNodes();
// Initial state: all three are mapped to `local`, none has a config.
assertEquals(local, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(local, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNull(master.getSpecMan().getConfig(local));
// Step 1: unmap "bar"; only its mapping disappears.
master.getSpecMan().unmapLogicalNode(local, "bar");
liveMan.checkLogicalNodes();
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(local, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNull(master.getSpecMan().getConfig(local));
// Step 2: unmap "baz" as well.
master.getSpecMan().unmapLogicalNode(local, "baz");
liveMan.checkLogicalNodes();
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(null, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNull(master.getSpecMan().getConfig(local));
// Step 3: try to unmap the logical node named after the host itself.
master.getSpecMan().unmapLogicalNode(local, local);
liveMan.checkLogicalNodes();
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNull(master.getSpecMan().getConfig(local));
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(null, master.getSpecMan().getPhysicalNode("baz"));
// The host-named logical node is still expected to be mapped after the unmap
// call; presumably that mapping cannot be removed - TODO confirm.
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
}
// Test: with the master store set to "zookeeper" and the master serving,
// removing logical nodes one at a time (removeLogicalNode) drops both their
// configuration and their physical-node mapping on the master.
// NOTE(review): only the tail of the method declaration ("TTransportException {")
// survives in this excerpt; the method name and the rest of the throws clause
// are missing - confirm against the full file.
@Test
TTransportException {
// Override the "memory" store chosen in setCfg().
cfg.set(FlumeConfiguration.MASTER_STORE, "zookeeper");
// Assigns the `master` FIELD, so the @After teardown shuts this master down.
master = new FlumeMaster(new CommandManager(), new ConfigManager(),
new StatusManager(), new MasterAckManager(), cfg);
master.serve();
MasterRPC rpc = new DirectMasterRPC(master);
FlumeNode node = new FlumeNode(rpc, false, false);
assertEquals(0, node.getLogicalNodeManager().getNodes().size());
String local = NetUtils.localhost();
// Map three logical nodes to the local host.
master.getSpecMan().addLogicalNode(local, node.getPhysicalNodeName());
master.getSpecMan().addLogicalNode(local, "bar");
master.getSpecMan().addLogicalNode(local, "baz");
// Give every logical node a configuration so getConfig returns non-null.
// The trailing "null", "null" arguments are string literals; presumably they
// name a null source and a null sink - TODO confirm against setConfig.
master.getSpecMan().setConfig(local, "my-test-flow", "null", "null");
master.getSpecMan().setConfig("bar", "my-test-flow", "null", "null");
master.getSpecMan().setConfig("baz", "my-test-flow", "null", "null");
LivenessManager liveMan = node.getLivenessManager();
// This test drives the node through heartbeatChecks() rather than
// checkLogicalNodes().
liveMan.heartbeatChecks();
// Initial state: all three mapped to `local`, all three configured.
assertEquals(local, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(local, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNotNull(master.getSpecMan().getConfig("bar"));
assertNotNull(master.getSpecMan().getConfig("baz"));
assertNotNull(master.getSpecMan().getConfig(local));
// Step 1: remove "bar"; its mapping and config are both gone.
master.getSpecMan().removeLogicalNode("bar");
liveMan.heartbeatChecks();
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(local, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNull(master.getSpecMan().getConfig("bar"));
assertNotNull(master.getSpecMan().getConfig("baz"));
assertNotNull(master.getSpecMan().getConfig(local));
// Step 2: remove "baz" as well.
master.getSpecMan().removeLogicalNode("baz");
liveMan.heartbeatChecks();
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(null, master.getSpecMan().getPhysicalNode("baz"));
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNotNull(master.getSpecMan().getConfig(local));
// Step 3: remove the logical node named after the host itself.
master.getSpecMan().removeLogicalNode(local);
liveMan.heartbeatChecks();
// Its configuration is gone...
assertNull(master.getSpecMan().getConfig("bar"));
assertNull(master.getSpecMan().getConfig("baz"));
assertNull(master.getSpecMan().getConfig(local));
assertEquals(null, master.getSpecMan().getPhysicalNode("bar"));
assertEquals(null, master.getSpecMan().getPhysicalNode("baz"));
// ...but its physical-node mapping is still expected to be present;
// presumably that mapping cannot be removed - TODO confirm.
assertEquals(local, master.getSpecMan().getPhysicalNode(local));
}
}