package org.couchbase.mock.http;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.couchbase.mock.Bucket;
import org.couchbase.mock.CouchbaseMock.HarakiriMonitor;
// Stream that serialized configuration snapshots are written to; closed by startStreaming().
private final OutputStream output;
// Bucket whose JSON representation is sent to the client.
private final Bucket bucket;
// Observable this handler registers itself with in startStreaming(); may be null (no registration then).
private final HarakiriMonitor monitor;
// Counted down by update() when a write fails; startStreaming() blocks on it before closing the output.
private final CountDownLatch completed;
// Held around writes to the output in both update() and startStreaming() so they do not interleave.
private final Lock updateHandlerLock;
// Store the collaborators handed in by the caller.
this.output = output;
this.bucket = bucket;
this.monitor = monitor;
// A single count: the first failed write in update() releases startStreaming().
this.completed = new CountDownLatch(1);
updateHandlerLock = new ReentrantLock();
}
// Build one stream chunk: the bucket's JSON followed by the stream delimiter.
StringWriter sw = new StringWriter();
sw.append(StateGrabber.getBucketJSON(bucket));
sw.append(StateGrabber.getStreamDelimiter());
// NOTE(review): getBytes() without a charset uses the platform default encoding;
// consider an explicit charset (e.g. UTF-8) if clients expect one - confirm.
return sw.toString().getBytes();
}
/**
 * Observer callback: pushes a fresh configuration snapshot to the output.
 *
 * <p>The snapshot is serialized, written and flushed while holding
 * {@code updateHandlerLock}. If the write fails, the {@code completed}
 * latch is counted down, which releases the thread blocked in
 * {@link #startStreaming()}.
 *
 * @param o   the observable that triggered the notification (not used)
 * @param arg the notification argument (not used)
 */
@Override
public void update(Observable o, Object arg) {
    updateHandlerLock.lock();
    try {
        final byte[] snapshot = getConfigBytes();
        output.write(snapshot);
        output.flush();
    } catch (IOException writeFailure) {
        // The stream is no longer writable: signal that streaming is over.
        completed.countDown();
    } finally {
        updateHandlerLock.unlock();
    }
}
/**
 * Sends the current bucket configuration to the output and then blocks
 * until {@link #update(Observable, Object)} reports a failed write.
 *
 * <p>Lock ordering is the same as before: the bucket's config read lock is
 * taken first, the snapshot is serialized, {@code updateHandlerLock} is
 * acquired and the handler is registered with the monitor (if any) before
 * the config read lock is released. The initial snapshot is then written
 * while {@code updateHandlerLock} is still held, so a concurrent
 * {@code update()} cannot write ahead of it.
 *
 * <p>Every lock is now released in a {@code finally} block, and the
 * handler is unregistered and the output closed on every exit path
 * (normal completion, I/O failure, interruption).
 *
 * @throws IOException          if writing the initial configuration or
 *                              closing the output fails
 * @throws InterruptedException if the calling thread is interrupted while
 *                              waiting for streaming to finish
 */
public void startStreaming()
        throws IOException, InterruptedException {
    try {
        boolean handlerLocked = false;
        try {
            byte[] configBytes;
            bucket.configReadLock();
            try {
                configBytes = getConfigBytes();
                updateHandlerLock.lock();
                handlerLocked = true;
                if (monitor != null) {
                    monitor.addObserver(this);
                }
            } finally {
                bucket.configReadUnlock();
            }
            output.write(configBytes);
            output.flush();
        } finally {
            // Only unlock what was actually acquired.
            if (handlerLocked) {
                updateHandlerLock.unlock();
            }
        }
        completed.await();
    } finally {
        // Stop receiving notifications for a stream that is about to be closed.
        if (monitor != null) {
            monitor.deleteObserver(this);
        }
        output.close();
    }
}
}