package com.cloudera.flume.core;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.util.BackoffPolicy;
import com.cloudera.util.CappedExponentialBackoff;
import com.cloudera.util.MultipleIOException;
import com.google.common.base.Preconditions;
static final Logger LOG = LoggerFactory.getLogger(BackOffFailOverSink.class);
final String A_PRIMARY = "sentPrimary";
final String A_FAILS = "failsPrimary";
final String A_BACKUPS = "sentBackups";
final EventSink primary;
final EventSink backup;
AtomicLong primarySent = new AtomicLong();
AtomicLong fails = new AtomicLong();
AtomicLong backups = new AtomicLong();
boolean primaryOk = false;
boolean backupOpen = false;
final BackoffPolicy backoffPolicy;
BackoffPolicy backoff) {
Preconditions.checkNotNull(primary,
"BackOffFailOverSink called with null primary");
Preconditions.checkNotNull(backup,
"BackOffFailOverSink called with null backup");
this.primary = primary;
this.backup = backup;
this.backoffPolicy = backoff;
}
this(primary, backup, FlumeConfiguration.get().getFailoverInitialBackoff(),
FlumeConfiguration.get().getFailoverMaxSingleBackoff());
}
long initialBackoff, long maxBackoff) {
this(primary, backup, new CappedExponentialBackoff(initialBackoff,
maxBackoff));
}
@Override
public void append(Event e)
throws IOException {
if (!primaryOk) {
if (backoffPolicy.isRetryOk()) {
IOException ioe = tryOpenPrimary();
if (ioe != null) {
backoffPolicy.backoff();
try {
backup.append(e);
backups.incrementAndGet();
super.append(e);
return;
} catch (IOException ioe2) {
List<IOException> exceptions = new ArrayList<IOException>(2);
exceptions.add(ioe);
exceptions.add(ioe2);
IOException mio = MultipleIOException.createIOException(exceptions);
throw mio;
}
}
} else {
backup.append(e);
backups.incrementAndGet();
super.append(e);
return;
}
}
try {
primary.append(e);
primarySent.incrementAndGet();
super.append(e);
primaryOk = true;
backoffPolicy.reset();
return;
} catch (IOException ioe3) {
fails.incrementAndGet();
primaryOk = false;
backoffPolicy.backoff();
backup.append(e);
backups.incrementAndGet();
super.append(e);
}
}
@Override
public void close()
throws IOException {
List<IOException> exs = new ArrayList<IOException>(2);
try {
if (primaryOk)
primary.close();
} catch (IOException ex) {
exs.add(ex);
}
try {
if (backupOpen)
backup.close();
} catch (IOException ex) {
exs.add(ex);
}
if (exs.size() != 0) {
throw MultipleIOException.createIOException(exs);
}
}
IOException priEx = null;
try {
if (!primaryOk) {
primary.close();
primary.open();
primaryOk = true;
}
} catch (IOException ex) {
try {
primary.close();
} catch (IOException e) {
e.printStackTrace();
}
primaryOk = false;
priEx = ex;
}
return priEx;
}
@Override
public void open()
throws IOException {
IOException priEx = tryOpenPrimary();
if (Thread.currentThread().isInterrupted()) {
LOG.error("Backoff Failover sink exited because of interruption");
throw new IOException("Was interrupted, bailing out");
}
try {
backup.open();
backupOpen = true;
} catch (IOException ex) {
backupOpen = false;
if (priEx != null) {
IOException mioe = MultipleIOException.createIOException(Arrays.asList(
priEx, ex));
throw mioe;
}
}
}
@Override
return "BackoffFailover";
}
@Override
ReportEvent rpt = super.getReport();
rpt.setLongMetric(A_FAILS, fails.get());
rpt.setLongMetric(A_BACKUPS, backups.get());
rpt.setLongMetric(A_PRIMARY, primarySent.get());
return rpt;
}
@Override
public void getReports(String namePrefix, Map<String, ReportEvent> reports) {
super.getReports(namePrefix, reports);
primary.getReports(namePrefix + getName() + ".primary.", reports);
backup.getReports(namePrefix + getName() + ".backup.", reports);
}
return fails.get();
}
public static SinkBuilder
builder() {
return new SinkBuilder() {
@Override
public EventSink
build(Context context, String... argv) {
Preconditions.checkArgument(argv.length == 2);
String primary = argv[0];
String secondary = argv[1];
try {
EventSink pri = new CompositeSink(context, primary);
EventSink sec = new CompositeSink(context, secondary);
return new BackOffFailOverSink(pri, sec);
} catch (FlumeSpecException e) {
LOG.warn("Spec parsing problem", e);
throw new IllegalArgumentException("Spec parsing problem", e);
}
}
};
}
}