package com.cloudera.flume.agent;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.thrift.ThriftFlumeClientServer;
import com.cloudera.flume.conf.thrift.ThriftFlumeConfigData;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.conf.thrift.FlumeNodeState;
import com.cloudera.flume.conf.thrift.ThriftFlumeClientServer.Iface;
import com.cloudera.flume.master.MasterClientServerThrift;
import com.cloudera.flume.reporter.server.thrift.ThriftFlumeReport;
import com.cloudera.flume.util.ThriftServer;
/** Logger for this test, named after {@code TestThriftMultiMasterRPC}. */
static final Logger LOG = LoggerFactory.getLogger(TestThriftMultiMasterRPC.class);
/**
 * One-shot flag read by {@code checkAck}: while it is {@code true} the call
 * succeeds and clears it; once it is {@code false} the call throws.
 */
boolean first = true;
/**
 * Starts the dummy server on port 56789 by delegating to {@code serve(int)}.
 *
 * @throws TTransportException propagated from {@code serve(int)}
 */
public void serve() throws TTransportException {
  final int defaultPort = 56789;
  serve(defaultPort);
}
/**
 * Logs a startup message, then calls {@code start} with a
 * {@code ThriftFlumeClientServer.Processor} wrapping this object, the given
 * port, and a name made of {@code "MyThriftServer"} followed by the port.
 *
 * @param port port number handed to {@code start}
 * @throws TTransportException propagated from {@code start}
 */
public void serve(int port) throws TTransportException {
  LOG.info("Starting dummy server");
  final String serverName = "MyThriftServer" + port;
  this.start(new ThriftFlumeClientServer.Processor(this), port, serverName);
}
/**
 * No-op handler: the acknowledgement is accepted and discarded.
 *
 * @param ackid unused
 */
@Override
public void acknowledge(String ackid) throws TException {
  // Nothing to do; this dummy server does not record acks.
}
/**
 * Succeeds while the {@code first} flag is still set and fails afterwards.
 *
 * @param ackid unused
 * @return {@code true} on the call that finds {@code first} set
 * @throws TException on every call made after {@code first} has been cleared
 */
@Override
public boolean checkAck(String ackid) throws TException {
  if (!first) {
    throw new TException("Throwing an exception");
  }
  // Clear the one-shot flag so the next call takes the throwing path.
  first = false;
  return true;
}
/**
 * Returns a newly constructed {@code FlumeConfigData} converted with
 * {@code MasterClientServerThrift.configToThrift}.
 *
 * @param sourceId unused
 * @return the converted configuration
 */
@Override
public ThriftFlumeConfigData getConfig(String sourceId) throws TException {
  FlumeConfigData blankConfig = new FlumeConfigData();
  ThriftFlumeConfigData converted = MasterClientServerThrift.configToThrift(blankConfig);
  return converted;
}
@Override
return null;
}
/**
 * Accepts every heartbeat; none of the arguments are inspected.
 *
 * @param logicalNode unused
 * @param physicalNode unused
 * @param clienthost unused
 * @param s unused
 * @param timestamp unused
 * @return always {@code true}
 */
@Override
public boolean heartbeat(
    String logicalNode,
    String physicalNode,
    String clienthost,
    FlumeNodeState s,
    long timestamp) throws TException {
  return true;
}
/**
 * No-op handler: the supplied reports are discarded.
 *
 * @param reports unused
 */
@Override
public void putReports(Map<String, ThriftFlumeReport> reports) throws TException {
  // Nothing to do; this dummy server does not store reports.
}
/**
 * Stub handler that provides no choke map.
 *
 * @param physNode unused
 * @return always {@code null}
 */
@Override
public Map<String, Integer> getChokeMap(String physNode) throws TException {
  return null;
}
}
/**
 * Checks which master port {@code MultiMasterRPC} ends up on before and after
 * one of the dummy servers is stopped.
 *
 * <p>The heartbeat server list names ports 9999, 56789 and 56790; this test
 * only starts dummy servers on 56789 and 56790. After the first
 * {@code checkAck} the current port must be 56789. The server on 56789 is then
 * stopped, and after a second {@code checkAck} the current port must be 56790.
 *
 * <p>Cleanup runs in {@code finally} blocks so that a failed assertion or RPC
 * does not leave the client open or the server ports bound for later tests.
 *
 * @throws TTransportException if a dummy server cannot be started
 * @throws IOException declared for the RPC client calls
 * @throws InterruptedException declared by the original test signature
 */
@Test
public void testConnect()
    throws TTransportException, IOException, InterruptedException {
  FlumeConfiguration conf = FlumeConfiguration.get();
  conf.set(FlumeConfiguration.MASTER_HEARTBEAT_SERVERS,
      "localhost:9999,localhost:56789,localhost:56790");
  conf.set(FlumeConfiguration.MASTER_HEARBEAT_RPC, "THRIFT");
  MultiMasterRPC masterRPC = new MultiMasterRPC(conf, false);

  MyThriftServer server1 = new MyThriftServer();
  MyThriftServer server2 = new MyThriftServer();
  // Track which servers are up so cleanup never stops one that was not
  // started or that the test body has already stopped.
  boolean server1Running = false;
  boolean server2Running = false;
  try {
    server1.serve(56789);
    server1Running = true;
    server2.serve(56790);
    server2Running = true;

    masterRPC.checkAck("UNKNOWNACK");
    assertEquals("Port should have been 56789, got " + masterRPC.getCurPort(),
        56789, masterRPC.getCurPort());

    // Mark as stopped first: even if stop() throws, cleanup must not retry it.
    server1Running = false;
    server1.stop();

    masterRPC.checkAck("UNKNOWNACK");
    assertEquals("Port should have been 56790, got " + masterRPC.getCurPort(),
        56790, masterRPC.getCurPort());
  } finally {
    // Same teardown order as before (client first, then servers), but each
    // step is guarded so one failure does not skip the remaining steps.
    try {
      masterRPC.close();
    } finally {
      try {
        if (server1Running) {
          server1.stop();
        }
      } finally {
        if (server2Running) {
          server2.stop();
        }
      }
    }
  }
}
}