package com.cloudera.flume.master;
import java.io.IOException;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.FlumeConfiguration;
/** Logger shared by every method of this service. */
static final Logger LOG = LoggerFactory.getLogger(ZooKeeperService.class);
/**
 * Set by init(): true when the configuration asks for an external
 * ZooKeeper, false when this service starts its own in-process server.
 * Consulted on shutdown so that only a server started here is stopped.
 */
boolean external;
/** In-process ZooKeeper server; stays null until one of the start helpers creates it. */
ZKInProcessServer zk = null;
/** True once init() has run to completion; cleared again on shutdown. */
boolean initialised = false;
/** Value of cfg.getMasterZKServers() captured by init(). */
String serverAddr = null;
/** Value of cfg.getMasterZKConnectString() captured by init(); handed to each new ZKClient. */
String clientAddr = null;
/** The single shared instance returned by get() and both getAndInit() overloads. */
final static ZooKeeperService zkServiceSingleton = new ZooKeeperService();
/**
 * Initialises this service from the supplied configuration. Does nothing
 * when the service has already been initialised.
 *
 * The server and client address strings are always read from the
 * configuration. Unless the configuration requests an external ZooKeeper,
 * an in-process server is started: as part of an ensemble when the master
 * is distributed, otherwise standalone on the configured client port and
 * log directory.
 *
 * @param cfg configuration to read the ZooKeeper settings from
 * @throws IOException if the ensemble configuration cannot be parsed (the
 *           ConfigException is attached as the cause), or if a start
 *           helper raises one
 * @throws InterruptedException if a start helper is interrupted
 */
protected synchronized void init(FlumeConfiguration cfg)
    throws IOException, InterruptedException {
  if (!initialised) {
    serverAddr = cfg.getMasterZKServers();
    clientAddr = cfg.getMasterZKConnectString();

    // Assume an external ensemble until the configuration says otherwise.
    external = true;
    final boolean runInProcess = !cfg.getMasterZKUseExternal();
    if (runInProcess) {
      external = false;
      if (!cfg.getMasterIsDistributed()) {
        final int clientPort = cfg.getMasterZKClientPort();
        final String logDir = cfg.getMasterZKLogDir();
        startZKStandalone(clientPort, logDir);
      } else {
        try {
          startZKDistributed(cfg);
        } catch (ConfigException parseFailure) {
          // Surface configuration problems through the declared IOException.
          throw new IOException("Couldn't parse ZooKeeper configuration",
              parseFailure);
        }
      }
    }

    // Only reached when every step above succeeded.
    initialised = true;
  }
}
return initialised;
}
InterruptedException {
LOG.info("Starting standalone ZooKeeper instance on port " + port);
zk = new ZKInProcessServer(port, dir);
zk.start();
}
InterruptedException, ConfigException {
LOG.info("Starting ZooKeeper server as part of ensemble");
zk = new ZKInProcessServer(cfg);
zk.start();
}
/**
 * Returns the shared service instance without initialising it.
 *
 * @return the singleton ZooKeeperService
 */
public static ZooKeeperService get() {
  return ZooKeeperService.zkServiceSingleton;
}
/**
 * Initialises the shared service from the configuration returned by
 * FlumeConfiguration.get() and returns it.
 *
 * @return the singleton ZooKeeperService, after init() has returned
 * @throws IOException if init() fails with one
 * @throws InterruptedException if init() is interrupted
 */
public static synchronized ZooKeeperService getAndInit()
    throws IOException, InterruptedException {
  final ZooKeeperService service = zkServiceSingleton;
  final FlumeConfiguration defaultCfg = FlumeConfiguration.get();
  service.init(defaultCfg);
  return service;
}
/**
 * Initialises the shared service from the given configuration and
 * returns it.
 *
 * @param cfg configuration passed straight through to init()
 * @return the singleton ZooKeeperService, after init() has returned
 * @throws IOException if init() fails with one
 * @throws InterruptedException if init() is interrupted
 */
public static synchronized ZooKeeperService getAndInit(FlumeConfiguration cfg)
    throws IOException, InterruptedException {
  final ZooKeeperService service = zkServiceSingleton;
  service.init(cfg);
  return service;
}
/**
 * Builds a new client bound to the client address captured by init().
 *
 * @return a freshly constructed ZKClient
 * @throws IOException if this service has not been initialised
 */
public synchronized ZKClient createClient() throws IOException {
  if (initialised) {
    return new ZKClient(clientAddr);
  }
  // No client address has been recorded before init() completes.
  throw new IOException("Not yet initialised!");
}
if (!initialised) {
return;
}
if (!external && this.zk != null) {
this.zk.stop();
}
this.zk = null;
this.initialised = false;
}
}