package com.cloudera.flume.agent;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.specific.SpecificResponder;
import org.apache.thrift.transport.TTransportException;
import org.junit.Test;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfigData;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.avro.AvroFlumeConfigData;
import com.cloudera.flume.reporter.server.avro.AvroFlumeReport;
import com.cloudera.flume.conf.avro.AvroFlumeClientServer;
import com.cloudera.flume.conf.avro.FlumeNodeState;
import com.cloudera.flume.master.MasterClientServerAvro;
// SLF4J logger named after TestAvroMultiMasterRPC; used by serve() for its startup message.
static final Logger LOG = LoggerFactory.getLogger(TestAvroMultiMasterRPC.class);
// True until checkAck() is called for the first time; that call clears the flag and
// succeeds, and every later checkAck() call throws instead.
boolean first = true;
// The Avro server created and started by serve(); checkAck() reads its port for logging.
protected Server server;
this.server.close();
}
}
/**
 * Creates an Avro {@link HttpServer} on the given port that dispatches
 * {@link AvroFlumeClientServer} calls to this object, stores it in
 * {@code server}, and starts it.
 *
 * @param port the port passed to the {@link HttpServer} constructor
 * @throws IOException declared for callers; propagated from server creation
 */
public void serve(int port) throws IOException {
  final String startupMessage = String.format(
      "Starting blocking thread pool server for control server on port %d...",
      port);
  LOG.info(startupMessage);

  // The responder routes incoming protocol calls to the methods of this instance.
  final SpecificResponder responder =
      new SpecificResponder(AvroFlumeClientServer.class, this);
  this.server = new HttpServer(responder, port);
  this.server.start();
}
@Override
throws AvroRemoteException {
return null;
}
/**
 * Mock acknowledgement check: succeeds exactly once, then fails forever.
 *
 * <p>The first invocation on this instance returns {@code true} and clears
 * the {@code first} flag. Every later invocation throws a
 * {@link RuntimeException}; the {@code ackid} argument is never inspected.
 *
 * <p>Messages go through the class's SLF4J {@code LOG} (the same logger
 * {@code serve} uses) rather than Jetty's {@code org.mortbay.log.Log}, so
 * all output from this test lands in one logging backend.
 *
 * @param ackid ignored
 * @return {@code true} on the first call only
 * @throws AvroRemoteException declared by the protocol interface; not thrown here
 * @throws RuntimeException on the second and all subsequent calls
 */
@Override
public boolean checkAck(CharSequence ackid) throws AvroRemoteException {
  LOG.info("Check-ack called at server on {}", this.server.getPort());
  if (first) {
    first = false;
    return true;
  }
  LOG.info("throwing an exception on {}", this.server.getPort());
  throw new RuntimeException("Throwing an exception");
}
/**
 * Returns the Avro representation of a {@link FlumeConfigData} built with its
 * no-argument constructor. The {@code sourceId} argument is not consulted.
 *
 * @param sourceId ignored
 * @return the result of {@code MasterClientServerAvro.configToAvro} for a new
 *         {@link FlumeConfigData}
 * @throws AvroRemoteException declared by the protocol interface
 */
@Override
public AvroFlumeConfigData getConfig(CharSequence sourceId) throws AvroRemoteException {
  final FlumeConfigData blankConfig = new FlumeConfigData();
  return MasterClientServerAvro.configToAvro(blankConfig);
}
@Override
throws AvroRemoteException {
return null;
}
/**
 * Mock heartbeat handler: ignores every argument and always reports success.
 *
 * @return always {@code true}
 * @throws AvroRemoteException declared by the protocol interface; not thrown here
 */
@Override
public boolean heartbeat(
    CharSequence logicalNode,
    CharSequence physicalNode,
    CharSequence clienthost,
    FlumeNodeState s,
    long timestamp)
    throws AvroRemoteException {
  return true;
}
/**
 * Mock report sink: the supplied reports are discarded.
 *
 * @param reports ignored
 * @return always {@code null}
 * @throws AvroRemoteException declared by the protocol interface; not thrown here
 */
@Override
public Void putReports(Map<CharSequence, AvroFlumeReport> reports) throws AvroRemoteException {
  return null;
}
/**
 * Mock choke-map lookup: no map is produced for any node.
 *
 * @param physNode ignored
 * @return always {@code null}
 * @throws AvroRemoteException declared by the protocol interface; not thrown here
 */
@Override
public Map<CharSequence, Integer> getChokeMap(CharSequence physNode) throws AvroRemoteException {
  return null;
}
}
/**
 * Checks that {@link MultiMasterRPC} advances through its configured master
 * list when the master it is talking to fails.
 *
 * <p>Three masters are configured, but mock servers are started only on
 * ports 56789 and 56790; nothing is started on 9999 by this test. The first
 * {@code checkAck} is therefore expected to be answered on 56789. The mock's
 * {@code checkAck} throws on its second invocation, so the second call is
 * expected to end up on 56790.
 *
 * <p>Both mock servers and the RPC client are released in {@code finally}
 * blocks so that a failed assertion or RPC error does not leave the ports
 * bound. Each server is stopped only after its {@code serve} call returned.
 *
 * @throws TTransportException declared for the RPC layer
 * @throws IOException if a mock server cannot be started or the RPC fails
 * @throws InterruptedException declared for the RPC layer
 */
@Test
public void testConnect() throws TTransportException, IOException, InterruptedException {
  FlumeConfiguration conf = FlumeConfiguration.get();
  conf.set(FlumeConfiguration.MASTER_HEARTBEAT_SERVERS,
      "localhost:9999,localhost:56789,localhost:56790");
  conf.set(FlumeConfiguration.MASTER_HEARBEAT_RPC, "AVRO");

  // The client is created before any server is listening, as in the original test.
  MultiMasterRPC masterRPC = new MultiMasterRPC(conf, false);
  try {
    MockAvroServer server1 = new MockAvroServer();
    server1.serve(56789);
    try {
      MockAvroServer server2 = new MockAvroServer();
      server2.serve(56790);
      try {
        // First call: expected to be served by the mock on 56789.
        assertEquals(true, masterRPC.checkAck("UNKNOWNACK"));
        assertEquals("Port should have been 56789, got " + masterRPC.getCurPort(),
            56789, masterRPC.getCurPort());

        // Second call: the mock on 56789 now throws, so the client should move to 56790.
        masterRPC.checkAck("UNKNOWNACK");
        assertEquals("Port should have been 56790, got " + masterRPC.getCurPort(),
            56790, masterRPC.getCurPort());
      } finally {
        server2.stop();
      }
    } finally {
      // Previously never stopped, which left port 56789 bound after the test.
      server1.stop();
    }
  } finally {
    masterRPC.close();
  }
}
}