package org.couchbase.mock.memcached;
import java.nio.ByteBuffer;
import org.couchbase.mock.memcached.protocol.BinaryResponse;
import org.couchbase.mock.memcached.protocol.CommandCode;
import org.couchbase.mock.memcached.protocol.ErrorCode;
import org.couchbase.mock.memcached.protocol.BinaryCommand;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.AccessControlException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import net.sf.json.JSONObject;
import org.couchbase.mock.Bucket;
import org.couchbase.mock.Bucket.BucketType;
// Storage backend handed to the constructor.
private final DataStore datastore;
// Construction time in whole seconds since the epoch; used to derive the reported "uptime".
private final long bootTime;
// Name reported for this node: the requested bind host, or a locally resolved address/name.
private final String hostname;
// Non-blocking listening channel, registered with the selector for OP_ACCEPT.
private final ServerSocketChannel server;
// Selector that multiplexes the listening channel and every client channel.
private Selector selector;
// Local port the listening socket is actually bound to (read back after bind).
private final int port;
// Dispatch table indexed by CommandCode.cc(); the constructor pre-fills every slot with
// an UnknownCommandExecutor before installing the specific handlers.
// NOTE(review): a length of 0xff yields indices 0..254, so a code of 0xff would be out
// of range -- confirm no CommandCode maps to that value.
private CommandExecutor[] executors = new CommandExecutor[0xff];
// Owning bucket. NOTE(review): main() passes null here, so callers dereferencing it
// must expect that case -- confirm.
private final Bucket bucket;
// While false, the select loop discards ready keys instead of servicing them.
// NOTE(review): not volatile; if toggled from a thread other than the one running the
// select loop, visibility is not guaranteed -- confirm threading model.
private boolean active = true;
// Milliseconds to sleep in the middle of a response when a hiccup is configured.
private int hiccupTime = 0;
// Buffer offset at which an outgoing response is split around the hiccup pause (0 = disabled).
private int hiccupOffset = 0;
// When > 0, outgoing buffers have their limit capped to this value (0 = disabled).
private int truncateLimit = 0;
/**
 * Creates the mock server, installs the command dispatch table and binds the
 * non-blocking listening socket.
 *
 * If any step after opening the selector fails (channel open, configuration,
 * bind or registration), the selector and the listening channel are closed
 * before the exception propagates, so a failed construction does not leak
 * file descriptors or keep the port half-claimed.
 *
 * @param bucket    the bucket this server belongs to (stored as-is)
 * @param hostname  host to bind to; {@code null} or {@code "*"} binds to the
 *                  wildcard address and the reported hostname is resolved locally
 * @param port      port to bind to; the effective port is read back from the
 *                  socket after binding
 * @param datastore the storage backend used by this server (stored as-is)
 * @throws IOException if the selector or channel cannot be opened, configured,
 *                     bound or registered
 */
public MemcachedServer(Bucket bucket, String hostname,
        int port, DataStore datastore)
        throws IOException {
    this.bucket = bucket;
    this.datastore = datastore;

    // Every opcode starts out unknown; specific handlers overwrite their slots below.
    UnknownCommandExecutor unknownHandler = new UnknownCommandExecutor();
    for (int ii = 0; ii < executors.length; ++ii) {
        executors[ii] = unknownHandler;
    }

    executors[CommandCode.QUIT.cc()] = new QuitCommandExecutor();
    executors[CommandCode.QUITQ.cc()] = new QuitCommandExecutor();
    executors[CommandCode.FLUSH.cc()] = new FlushCommandExecutor();
    executors[CommandCode.FLUSHQ.cc()] = new FlushCommandExecutor();
    executors[CommandCode.NOOP.cc()] = new NoopCommandExecutor();
    executors[CommandCode.VERSION.cc()] = new VersionCommandExecutor();
    executors[CommandCode.STAT.cc()] = new StatCommandExecutor();
    executors[CommandCode.VERBOSITY.cc()] = new VerbosityCommandExecutor();
    executors[CommandCode.ADD.cc()] = new StoreCommandExecutor();
    executors[CommandCode.ADDQ.cc()] = executors[CommandCode.ADD.cc()];
    executors[CommandCode.APPEND.cc()] = new AppendCommandExecutor();
    executors[CommandCode.APPENDQ.cc()] = new AppendCommandExecutor();
    executors[CommandCode.PREPEND.cc()] = new PrependCommandExecutor();
    executors[CommandCode.PREPENDQ.cc()] = new PrependCommandExecutor();
    executors[CommandCode.SET.cc()] = executors[CommandCode.ADD.cc()];
    executors[CommandCode.SETQ.cc()] = executors[CommandCode.ADD.cc()];
    executors[CommandCode.REPLACE.cc()] = executors[CommandCode.ADD.cc()];
    executors[CommandCode.REPLACEQ.cc()] = executors[CommandCode.ADD.cc()];
    executors[CommandCode.DELETE.cc()] = new DeleteCommandExecutor();
    executors[CommandCode.DELETEQ.cc()] = executors[CommandCode.DELETE.cc()];
    executors[CommandCode.GET.cc()] = new GetCommandExecutor();
    executors[CommandCode.GETQ.cc()] = executors[CommandCode.GET.cc()];
    executors[CommandCode.GETK.cc()] = executors[CommandCode.GET.cc()];
    executors[CommandCode.GETKQ.cc()] = executors[CommandCode.GET.cc()];
    executors[CommandCode.TOUCH.cc()] = executors[CommandCode.GET.cc()];
    executors[CommandCode.GAT.cc()] = executors[CommandCode.GET.cc()];
    executors[CommandCode.GATQ.cc()] = executors[CommandCode.GET.cc()];
    executors[CommandCode.INCREMENT.cc()] = new ArithmeticCommandExecutor();
    executors[CommandCode.INCREMENTQ.cc()] = executors[CommandCode.INCREMENT.cc()];
    executors[CommandCode.DECREMENT.cc()] = executors[CommandCode.INCREMENT.cc()];
    executors[CommandCode.DECREMENTQ.cc()] = executors[CommandCode.INCREMENT.cc()];
    executors[CommandCode.SASL_LIST_MECHS.cc()] = new SaslCommandExecutor();
    executors[CommandCode.SASL_AUTH.cc()] = executors[CommandCode.SASL_LIST_MECHS.cc()];
    executors[CommandCode.SASL_STEP.cc()] = executors[CommandCode.SASL_LIST_MECHS.cc()];

    bootTime = System.currentTimeMillis() / 1000;

    selector = Selector.open();
    ServerSocketChannel channel = null;
    boolean initialized = false;
    try {
        channel = ServerSocketChannel.open();
        channel.configureBlocking(false);
        if (hostname != null && !hostname.equals("*")) {
            channel.socket().bind(new InetSocketAddress(hostname, port));
            this.hostname = hostname;
        } else {
            // Wildcard bind: report a concrete local address rather than 0.0.0.0.
            channel.socket().bind(new InetSocketAddress(port));
            InetAddress address = channel.socket().getInetAddress();
            if (address.isAnyLocalAddress()) {
                String name;
                try {
                    name = InetAddress.getLocalHost().getHostAddress();
                } catch (UnknownHostException ex) {
                    name = "localhost";
                }
                this.hostname = name;
            } else {
                this.hostname = address.getHostName();
            }
        }
        // Read the port back so that a request for port 0 reports the real one.
        this.port = channel.socket().getLocalPort();
        channel.register(selector, SelectionKey.OP_ACCEPT);
        this.server = channel;
        initialized = true;
    } finally {
        if (!initialized) {
            // Construction failed: release what was opened. Close errors are ignored
            // on purpose so the original failure is the one the caller sees.
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // best-effort cleanup
                }
            }
            try {
                selector.close();
            } catch (IOException ignored) {
                // best-effort cleanup
            }
        }
    }
}
// Hands back the DataStore reference that the constructor stored.
return datastore;
}
// Builds a description of this node as a map and returns it serialized as JSON text.
@Override
Map<String, Object> map = new HashMap<String, Object>();
// Uptime is the difference, in whole seconds, between now and the construction time.
long now = System.currentTimeMillis() / 1000;
int uptime = (int) (now - bootTime);
map.put("uptime", new Long(uptime));
// The following entries are fixed values, not derived from server state.
map.put("replication", 1);
map.put("clusterMembership", "active");
map.put("status", "healthy");
map.put("hostname", getSocketName());
map.put("clusterCompatibility", 1);
map.put("version", "9.9.9");
// "os" is "<arch>-<name>-<version>" from system properties, with spaces replaced by underscores.
StringBuilder sb = new StringBuilder(System.getProperty("os.arch"));
sb.append("-");
sb.append(System.getProperty("os.name"));
sb.append("-");
sb.append(System.getProperty("os.version"));
map.put("os", sb.toString().replaceAll(" ", "_"));
// Only the direct port is real (the bound port); the proxy port is reported as 0.
Map<String, Integer> ports = new HashMap<String, Integer>();
ports.put("direct", port);
ports.put("proxy", 0);
map.put("ports", ports);
return JSONObject.fromObject(map).toString();
}
// Assembles a string-to-string statistics map. Apart from "pid" and "time", every value
// is a hard-coded placeholder rather than a live measurement.
HashMap<String, String> stats = new HashMap<String, String>();
// "pid" is populated with the current thread's id, not an operating-system process id.
stats.put("pid", Long.toString(Thread.currentThread().getId()));
// "time" is the current wall-clock time in milliseconds since the epoch.
stats.put("time", Long.toString(new Date().getTime()));
stats.put("version", "9.9.9");
// NOTE(review): "uptime" is a constant here and is not computed from bootTime.
stats.put("uptime", "15554");
stats.put("accepting_conns", "1");
stats.put("auth_cmds", "0");
stats.put("auth_errors", "0");
stats.put("bucket_active_conns", "1");
stats.put("bucket_conns", "3");
stats.put("bytes_read", "1108621");
stats.put("bytes_written", "205374436");
stats.put("cas_badval", "0");
stats.put("cas_hits", "0");
stats.put("cas_misses", "0");
return stats;
}
// Formats the endpoint as "<hostname>:<port>" using the values fixed at construction.
return hostname + ":" + port;
}
throws IOException, ClosedChannelException
{
// Pushes buf to the channel and returns the accumulated byte count.
// wv holds the result of the latest write; nw is the running total.
int wv;
int nw = 0;
// Keep writing while the channel accepts bytes; the loop ends on the first write
// that reports 0 (or a negative value).
// NOTE(review): the loop does not check buf.hasRemaining(), so it exits with
// unwritten bytes still in buf when the channel cannot take more -- confirm the
// caller copes with a partially written buffer.
do {
wv = channel.write(buf);
nw += wv;
} while (wv > 0);
// A result of -1 is treated as a closed peer: close our side and signal the caller.
// NOTE(review): verify that the channel's write can actually return -1.
if (wv == -1) {
channel.close();
throw new ClosedChannelException();
}
return nw;
}
// Selector loop: accepts connections and services client reads/writes until the
// current thread is interrupted, then closes the listening channel and the selector.
@Override
try {
while (!Thread.currentThread().isInterrupted()) {
try {
selector.select();
// While the server is marked inactive, ready keys are thrown away so that
// neither new connections nor pending client I/O are serviced.
if (!active) {
selector.selectedKeys().clear();
continue;
}
} catch (IOException ex) {
// A failing select() is retried on the next iteration.
continue;
}
try {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// Remove the key from the selected set so it is not processed again.
iterator.remove();
// Client channels carry a MemcachedConnection attachment; the listening channel
// is registered without one, so a null attachment means "accept" below.
MemcachedConnection client = (MemcachedConnection) key.attachment();
if (client != null) {
try {
// Read interest is always kept; write interest is added only if output remains.
int ioevents = SelectionKey.OP_READ;
SocketChannel channel = (SocketChannel) key.channel();
if (key.isReadable()) {
// read() returning -1 means end-of-stream: close and abort handling of this key.
if (channel.read(client.getInputBuffer()) == -1) {
channel.close();
throw new ClosedChannelException();
} else {
client.step();
}
}
if (key.isWritable()) {
ByteBuffer buf;
// Keep asking the connection for output buffers until it reports none (null).
while ((buf = client.getOutputBuffer()) != null) {
// Truncation: cap the buffer limit at truncateLimit when that feature is enabled.
if (truncateLimit > 0 && buf.limit() > truncateLimit) {
buf.limit(truncateLimit);
}
// Hiccup: send the leading part of the buffer (up to hiccupOffset), pause for
// hiccupTime milliseconds, then send the remainder below.
// NOTE(review): slice() starts at buf's current position while position(hiccupOffset)
// is absolute, so the two parts line up only if buf.position() is 0 here -- confirm.
if (hiccupOffset > 0 && buf.limit() > hiccupOffset) {
ByteBuffer immediateBuf = buf.slice();
buf.position(hiccupOffset);
immediateBuf.limit(hiccupOffset);
writeResponse(channel, immediateBuf);
try {
Thread.sleep(hiccupTime);
}
catch (InterruptedException exintr) {
// NOTE(review): the interrupt is swallowed without restoring the thread's interrupt
// status, so the outer isInterrupted() check will not observe it.
}
}
writeResponse(channel, buf);
}
}
if (client.hasOutput()) {
ioevents |= SelectionKey.OP_WRITE;
}
// Re-register with the interest set computed above, keeping the same attachment.
channel.register(selector, ioevents, client);
} catch (ClosedChannelException exp) {
// Closed connections are dropped silently.
} catch (IOException ioexp) {
// NOTE(review): other per-connection I/O errors are also ignored without logging
// or closing the channel -- confirm this is intended.
}
} else {
if (key.isAcceptable()) {
// NOTE(review): on a non-blocking server channel accept() may yield null when no
// connection is pending; cc is dereferenced unconditionally -- verify.
SocketChannel cc = server.accept();
cc.configureBlocking(false);
cc.register(selector, SelectionKey.OP_READ, new MemcachedConnection(this));
}
}
}
} catch (IOException e) {
// Failures while accepting/registering a new connection are logged; the loop continues.
Logger.getLogger(MemcachedServer.class.getName()).log(Level.SEVERE, null, e);
}
}
} finally {
// Shutdown path: if server.close() throws, selector.close() is skipped.
try {
server.close();
selector.close();
} catch (IOException e) {
Logger.getLogger(MemcachedServer.class.getName()).log(Level.SEVERE, null, e);
}
}
}
{
// Hands back the Bucket reference supplied at construction (may be null, see main()).
return bucket;
}
/**
 * Dispatches a decoded binary command to the executor registered for its opcode.
 *
 * A client that has not authenticated may only issue the three SASL commands
 * (SASL_AUTH, SASL_LIST_MECHS, SASL_STEP); anything else is answered with
 * {@code AUTH_ERROR}. An {@link AccessControlException} raised while handling
 * the command is reported to the client as {@code NOT_MY_VBUCKET}.
 *
 * @param cmd    the command to run
 * @param client the connection the command arrived on and the reply goes to
 * @throws IOException if the executor or sending a response fails with an I/O error
 */
@Override
public void execute(BinaryCommand cmd, MemcachedConnection client)
        throws IOException {
    try {
        if (!client.isAuthenticated()) {
            // Only the SASL handshake is open to unauthenticated connections.
            CommandCode code = cmd.getComCode();
            boolean saslHandshake = code == CommandCode.SASL_AUTH
                    || code == CommandCode.SASL_LIST_MECHS
                    || code == CommandCode.SASL_STEP;
            if (!saslHandshake) {
                client.sendResponse(new BinaryResponse(cmd, ErrorCode.AUTH_ERROR));
                return;
            }
        }
        CommandExecutor handler = executors[cmd.getComCode().cc()];
        handler.execute(cmd, this, client);
    } catch (AccessControlException denied) {
        client.sendResponse(new BinaryResponse(cmd, ErrorCode.NOT_MY_VBUCKET));
    }
}
// Hands back this server instance itself.
return this;
}
// Marks the server inactive; the select loop then discards ready keys instead of servicing them.
active = false;
}
// Marks the server active again so the select loop resumes servicing ready keys.
active = true;
}
/**
 * Configures the artificial pause ("hiccup") injected while a response is written.
 *
 * @param msecs  how long to pause, in milliseconds; must not be negative
 * @param offset buffer offset at which the pause is inserted; must not be
 *               negative (the write path treats 0 as "disabled")
 * @throws IllegalArgumentException if either argument is negative
 */
public void setHiccup(int msecs, int offset) {
    final boolean bothNonNegative = msecs >= 0 && offset >= 0;
    if (!bothNonNegative) {
        throw new IllegalArgumentException("Time and offset must be >= 0");
    }
    this.hiccupTime = msecs;
    this.hiccupOffset = offset;
}
// Stores the supplied limit as-is (no range check); the write path caps outgoing
// buffers at this value only when it is greater than zero.
truncateLimit = limit;
}
/**
 * Stand-alone entry point: starts a single server on port 11211 with no bucket
 * and no explicit hostname, assigns indices 0..1023 of a fresh DataStore to it
 * via {@code setOwnership}, and then runs the server loop on the calling thread.
 * An {@link IOException} during setup is logged at SEVERE level.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    try {
        DataStore store = new DataStore(1024);
        MemcachedServer instance = new MemcachedServer(null, null, 11211, store);
        int index = 0;
        while (index < 1024) {
            store.setOwnership(index, instance);
            ++index;
        }
        instance.run();
    } catch (IOException failure) {
        Logger logger = Logger.getLogger(MemcachedServer.class.getName());
        logger.log(Level.SEVERE, "Fatal error! failed to create socket: ", failure);
    }
}
// Reports the current value of the active flag consulted by the select loop.
return active;
}
// Delegates to the owning bucket.
// NOTE(review): bucket can be null (main() constructs the server with a null bucket),
// in which case this dereference would fail -- confirm callers.
return bucket.getType();
}
}