package com.cloudera.flume.handlers.debug;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.flume.reporter.Reportable;
import com.cloudera.util.BackoffPolicy;
import com.cloudera.util.CappedExponentialBackoff;
import com.cloudera.util.CumulativeCappedExponentialBackoff;
import com.cloudera.util.MultipleIOException;
import com.google.common.base.Preconditions;
EventSinkDecorator<S> implements Reportable {
static final Logger LOG = LoggerFactory.getLogger(InsistentOpenDecorator.class);
final BackoffPolicy backoff;
final public static String A_INITIALSLEEP = "intialSleep";
final public static String A_MAXSLEEP = "maxSleep";
final public static String A_ATTEMPTS = "openAttempts";
final public static String A_REQUESTS = "openRequests";
final public static String A_SUCCESSES = "openSuccessses";
final public static String A_RETRIES = "openRetries";
final public static String A_GIVEUPS = "openGiveups";
long openRequests;
long openAttempts;
long openSuccesses;
long openRetries;
long openGiveups;
volatile boolean opening = false;
super(s);
this.backoff = backoff;
this.openSuccesses = 0;
this.openRetries = 0;
}
long cumulativeCap) {
super(s);
this.backoff = new CumulativeCappedExponentialBackoff(initial, sleepCap,
cumulativeCap);
this.openSuccesses = 0;
this.openRetries = 0;
}
super(s);
this.backoff = new CappedExponentialBackoff(initial, sleepCap);
this.openSuccesses = 0;
this.openRetries = 0;
}
@Override
synchronized public void open()
throws IOException {
List<IOException> exns = new ArrayList<IOException>();
int attemptRetries = 0;
opening = true;
openRequests++;
while (!backoff.isFailed() && opening
&& !Thread.currentThread().isInterrupted()) {
try {
openAttempts++;
super.open();
openSuccesses++;
backoff.reset();
LOG.info("Opened " + sink.getName() + " on try " + attemptRetries);
opening = false;
return;
} catch (Exception e) {
if (Thread.currentThread().isInterrupted()) {
throw new IOException("Open has been interrupted");
}
if (!opening) {
throw new IOException("Unable to open and then close requested");
}
long waitTime = backoff.sleepIncrement();
LOG.info("open attempt " + attemptRetries + " failed, backoff ("
+ waitTime + "ms): " + e.getMessage());
LOG.debug(e.getMessage(), e);
exns.add((e instanceof IOException) ? (IOException) e
: new IOException(e));
backoff.backoff();
try {
backoff.waitUntilRetryOk();
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
exns.add(new IOException(e1));
throw MultipleIOException.createIOException(exns);
}
attemptRetries++;
openRetries++;
}
}
openGiveups++;
opening = false;
IOException ioe = MultipleIOException.createIOException(exns);
if (ioe == null) {
return;
}
throw ioe;
}
@Override
synchronized public void append(Event e)
throws IOException {
super.append(e);
}
@Override
synchronized public void close()
throws IOException {
opening = false;
super.close();
}
public static SinkDecoBuilder
builder() {
return new SinkDecoBuilder() {
@Override
public EventSinkDecorator<EventSink>
build(Context context,
String... argv) {
long initMs = FlumeConfiguration.get().getInsistentOpenInitBackoff();
long cumulativeMaxMs = FlumeConfiguration.get()
.getFailoverMaxCumulativeBackoff();
long maxMs = FlumeConfiguration.get().getFailoverMaxSingleBackoff();
Preconditions.checkArgument(argv.length <= 3,
"usage: insistentOpen([max=" + maxMs + "[,init=" + initMs
+ "[,cumulativeMax=maxint]]])");
if (argv.length >= 1) {
maxMs = Long.parseLong(argv[0]);
}
if (argv.length >= 2) {
initMs = Long.parseLong(argv[1]);
}
if (argv.length == 3) {
cumulativeMaxMs = Long.parseLong(argv[2]);
return new InsistentOpenDecorator<EventSink>(null,
new CumulativeCappedExponentialBackoff(initMs, maxMs,
cumulativeMaxMs));
}
return new InsistentOpenDecorator<EventSink>(null,
new CappedExponentialBackoff(initMs, maxMs));
}
};
}
@Override
return "InsistentOpen";
}
@Override
ReportEvent rpt = super.getReport();
rpt.hierarchicalMerge(backoff.getName(), backoff.getReport());
rpt.setLongMetric(A_REQUESTS, openRequests);
rpt.setLongMetric(A_ATTEMPTS, openAttempts);
rpt.setLongMetric(A_SUCCESSES, openSuccesses);
rpt.setLongMetric(A_RETRIES, openRetries);
rpt.setLongMetric(A_GIVEUPS, openGiveups);
return rpt;
}
}