package com.cloudera.flume.agent.durability;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.agent.FlumeNode;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.LogicalNodeContext;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Driver;
import com.cloudera.flume.core.DriverListener;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.flume.core.EventSource;
import com.cloudera.flume.core.connector.DirectDriver;
import com.cloudera.flume.handlers.debug.LazyOpenDecorator;
import com.cloudera.flume.handlers.endtoend.AckChecksumInjector;
import com.cloudera.flume.handlers.endtoend.AckListener;
import com.cloudera.flume.handlers.rolling.RollSink;
import com.cloudera.flume.handlers.rolling.RollTrigger;
import com.cloudera.flume.handlers.rolling.TimeTrigger;
import com.cloudera.flume.reporter.ReportEvent;
import com.google.common.base.Preconditions;
EventSinkDecorator<S> {
// Logger shared by this decorator and its nested AckChecksumRegisterer.
static final Logger LOG = LoggerFactory.getLogger(NaiveFileWALDeco.class);
// Supplies the acking input sink and the drain event source in open();
// its drains are stopped in close().
final WALManager walman;
// Roll trigger handed to walman.getAckingSink() when the input sink is created.
final RollTrigger trigger;
// Ack listener handed to walman.getAckingSink(); the constructor always sets
// it to an AckListener.Empty.
final AckListener queuer;
// Sink end of the drain driver; wraps the downstream sink in a
// LazyOpenDecorator and an AckChecksumRegisterer.
final EventSinkDecorator<S> drainSink;
// Passed through to walman.getAckingSink() -- presumably a check interval in
// milliseconds; TODO confirm against WALManager.
final long checkMs;
// Sink that append() writes to and rotate() rotates; re-created on every open().
RollSink input;
// Source end of the drain driver; obtained from walman in open().
EventSource drainSource;
// Driver that moves events from drainSource to drainSink.
Driver conn;
// Context handed to walman.getAckingSink().
Context ctx;
// Ack listener given to every AckChecksumRegisterer built for the drain path.
final AckListener al;
// Counted down by the driver listener (including on error); close() awaits it.
CountDownLatch completed = null;
// Counted down by the driver listener; open() awaits it before returning.
CountDownLatch started = null;
// Error most recently reported by the drain driver through fireError();
// rethrown by append() and close().
volatile IOException lastExn = null;
RollTrigger t, AckListener al, long checkMs) {
super(s);
this.ctx = ctx;
this.walman = walman;
this.trigger = t;
// The WAL input side always gets an AckListener.Empty (presumably a no-op
// listener -- TODO confirm); the caller's listener `al` is attached only to
// the drain path built below.
this.queuer = new AckListener.Empty();
this.al = al;
// Drain path, outermost first: EventSinkDecorator -> LazyOpenDecorator ->
// AckChecksumRegisterer -> s. The decorator is constructed raw, hence the
// unchecked cast back to EventSinkDecorator<S>.
this.drainSink = (EventSinkDecorator<S>) new EventSinkDecorator(
new LazyOpenDecorator(new AckChecksumRegisterer<S>(s, al)));
this.checkMs = checkMs;
}
EventSinkDecorator<S> {
// Listener whose end(tag) is invoked by append() when an event carrying a
// CHECKSUM_STOP ack type passes through.
AckListener q;
super(s);
this.q = q;
}
/**
 * Forwards the event to the wrapped sink, then inspects its ack attributes.
 * If the event carries an ack type equal to
 * {@link AckChecksumInjector#CHECKSUM_STOP}, the ack tag is turned into a
 * string and reported to the listener through {@code q.end(tag)}. Events
 * without an ack type, or with any other ack type, are only forwarded.
 *
 * @param e event to forward and inspect
 * @throws IOException if forwarding to the wrapped sink or notifying the
 *         listener fails
 */
@Override
public void append(Event e) throws IOException {
  super.append(e);

  final byte[] ackType = e.get(AckChecksumInjector.ATTR_ACK_TYPE);
  if (ackType == null) {
    // Plain event: no ack bookkeeping needed.
    return;
  }

  final byte[] ackTag = e.get(AckChecksumInjector.ATTR_ACK_TAG);
  if (!Arrays.equals(ackType, AckChecksumInjector.CHECKSUM_STOP)) {
    // Some other ack marker; only the stop marker is registered here.
    return;
  }

  final String group = new String(ackTag);
  LOG.info("Registering interest in checksum group called '" + group + "'");
  q.end(group);
}
}
/**
 * Writes an event into the write-ahead log input sink.
 * <p>
 * Fails fast if the decorator has no sink or is not open, and rethrows any
 * error the drain driver has reported since the last close instead of
 * accepting the event.
 *
 * @param e event to append to the WAL
 * @throws IOException the pending drain-side error, or a failure from the
 *         input sink itself
 */
@Override
public synchronized void append(Event e) throws IOException {
  Preconditions.checkNotNull(sink, "NaiveFileWALDeco was invalid!");
  Preconditions.checkState(isOpen.get(),
      "NaiveFileWALDeco not open for append");

  final IOException pending = lastExn;
  if (pending != null) {
    throw pending;
  }

  input.append(e);
}
/**
 * Closes the WAL input, stops the WAL manager's drains and waits for the
 * drain driver to finish before closing the drain source and the wrapped
 * sink.
 * <p>
 * While waiting, the driver is polled with {@code conn.join(500)}. Between
 * polls the wrapped sink's {@code R_NUM_EVENTS} metric is compared with the
 * last observed value: any increase counts as progress and resets the
 * no-progress counter. After 10 consecutive polls without progress the
 * driver is cancelled.
 * <p>
 * If the drain driver reported an error through the listener, that error is
 * cleared and rethrown once shutdown has completed.
 *
 * @throws IOException if closing any underlying resource fails, or if the
 *         drain driver recorded an error
 */
@Override
public synchronized void close() throws IOException {
  Preconditions.checkNotNull(sink,
      "Attmpted to close a null NaiveFileWALDeco");
  LOG.debug("Closing NaiveFileWALDeco");
  input.close();
  walman.stopDrains();

  try {
    LOG.debug("Waiting for subthread to complete .. ");
    // Number of consecutive polls without progress tolerated before the
    // drain driver is cancelled.
    final int maxNoProgressTime = 10;
    ReportEvent rpt = sink.getReport();
    Long levts = rpt.getLongMetric(EventSink.Base.R_NUM_EVENTS);
    long evts = (levts == null) ? 0 : levts;
    int count = 0;
    while (true) {
      if (conn.join(500)) {
        LOG.debug(".. subthread to completed");
        break;
      }

      // Driver still running: see whether the sink's event count moved.
      ReportEvent rpt2 = sink.getReport();
      Long levts2 = rpt2.getLongMetric(EventSink.Base.R_NUM_EVENTS);
      // Must read the fresh sample (levts2). Reading the initial sample here
      // would make evts2 never exceed evts, so progress would never be seen,
      // and would throw a NullPointerException on unboxing when the initial
      // metric was absent but the fresh one is present.
      long evts2 = (levts2 == null) ? 0 : levts2;
      if (evts2 > evts) {
        count = 0;
        evts = evts2;
        LOG.info("Closing disk failover log, subsink still making progress");
        continue;
      }

      count++;
      LOG.info("Attempt " + count
          + " with no progress being made on disk failover subsink");
      if (count >= maxNoProgressTime) {
        LOG.warn("DFO drain thread was not making progress, forcing close");
        conn.cancel();
        break;
      }
    }
  } catch (InterruptedException e) {
    LOG.error("WAL drain thread interrupted", e);
  }

  drainSource.close();
  super.close();

  // Wait for the driver listener to signal that the drain side is finished.
  try {
    completed.await();
  } catch (InterruptedException e) {
    LOG.error("WAL drain thread flush interrupted", e);
  }

  // Surface a drain-side failure exactly once.
  if (lastExn != null) {
    IOException tmp = lastExn;
    lastExn = null;
    LOG.warn("Throwing exception from subthread");
    throw tmp;
  }
  LOG.debug("Closed DiskFailoverDeco");
}
/**
 * Opens the WAL input sink and the drain path, then starts the driver that
 * moves events from the WAL's event source to the drain sink.
 * <p>
 * A fresh acking input sink and event source are obtained from the WAL
 * manager on every call. The method blocks until the driver listener counts
 * down the {@code started} latch, and only then marks the decorator open.
 *
 * @throws IOException if any sink or source fails to open, or if the wait
 *         for the driver to start is interrupted
 */
@Override
synchronized public void open()
throws IOException {
Preconditions.checkNotNull(sink,
"Attempted to open a null NaiveFileWALDeco subsink");
LOG.debug("Opening NaiveFileWALDeco");
// Both ends of the WAL come from the manager: `input` receives appended
// events, `drainSource` feeds the drain driver.
input = walman.getAckingSink(ctx, trigger, queuer, checkMs);
drainSource = walman.getEventSource();
drainSource.open();
drainSink.open();
input.open();
// New latches per open(): `started` gates the return of this method,
// `completed` is awaited by close().
started = new CountDownLatch(1);
completed = new CountDownLatch(1);
conn = new DirectDriver("naive file wal transmit", drainSource, drainSink);
// NOTE(review): the first two callbacks below appear without their method
// signatures in this view; they count down `started` and `completed`
// respectively -- confirm which DriverListener events they belong to.
conn.registerListener(new DriverListener.Base() {
@Override
started.countDown();
}
@Override
completed.countDown();
}
@Override
public void fireError(Driver c, Exception ex) {
LOG.error("unexpected error with NaiveFileWALDeco", ex);
// Remember the failure (wrapped if it is not already an IOException) so
// that append() and close() can rethrow it.
lastExn = (ex instanceof IOException) ? (IOException) ex
: new IOException(ex);
try {
conn.getSource().close();
conn.getSink().close();
} catch (IOException e) {
LOG.error("Error closing", e);
}
// Release close(), which waits on this latch.
completed.countDown();
LOG.info("Error'ed Connector closed " + conn);
}
});
conn.start();
// Block until the listener has counted down `started`.
try {
started.await();
} catch (InterruptedException e) {
LOG.error("Unexpected error waiting", e);
throw new IOException(e);
}
LOG.debug("Opened NaiveFileWALDeco");
// The already-open check runs only here, after all resources above have
// been opened and the driver started.
Preconditions.checkNotNull(sink);
Preconditions.checkState(!isOpen.get());
isOpen.set(true);
}
this.sink = sink;
// Re-point the drain path at the new sink, wrapped in a LazyOpenDecorator
// and an AckChecksumRegisterer bound to the ack listener `al`, mirroring the
// wrapping done in the constructor.
this.drainSink.setSink((S) new LazyOpenDecorator<EventSink>(
new AckChecksumRegisterer<S>(sink, al)));
}
/**
 * Asks the current WAL input sink to rotate.
 *
 * @return whatever the input sink's {@code rotate()} reports
 */
public synchronized boolean rotate() {
  final boolean rotated = input.rotate();
  return rotated;
}
// Emptiness is decided entirely by the WAL manager.
return walman.isEmpty();
}
return new SinkDecoBuilder() {
/**
 * Builds a {@code NaiveFileWALDeco} from up to three optional positional
 * arguments: {@code maxMillis}, {@code walnode} and {@code checkMs}.
 * <ul>
 * <li>{@code maxMillis} falls back to the configured agent log max age;</li>
 * <li>{@code walnode} falls back to the logical node name found in the
 * context (a warning is logged if neither is available);</li>
 * <li>{@code checkMs} falls back to 250.</li>
 * </ul>
 * The decorator is created with a {@code null} sink.
 *
 * @param context context used to look up the logical node name and passed
 *        on to the decorator
 * @param argv optional arguments, at most three
 * @return the configured decorator
 */
@Override
public EventSinkDecorator<EventSink> build(Context context, String... argv) {
  final int argc = argv.length;
  Preconditions.checkArgument(argc <= 3,
      "usage: ackedWriteAhead[(maxMillis,walnode,checkMs)]");

  // The configured default is always read, even when an override is given.
  final long configuredMaxAge = FlumeConfiguration.get().getAgentLogMaxAge();
  final long rollMillis =
      (argc >= 1) ? Long.parseLong(argv[0]) : configuredMaxAge;

  final FlumeNode flumeNode = FlumeNode.getInstance();
  final String contextNodeName =
      context.getValue(LogicalNodeContext.C_LOGICAL);
  final String walNodeName = (argc >= 2) ? argv[1] : contextNodeName;
  if (walNodeName == null) {
    LOG.warn("Context does not have a logical node name "
        + "-- this will likely be a problem if you have multiple WALs");
  }

  final long pollMs = (argc >= 3) ? Long.parseLong(argv[2]) : 250L;

  final WALManager manager = flumeNode.getAddWALManager(walNodeName);
  return new NaiveFileWALDeco<EventSink>(context, null, manager,
      new TimeTrigger(rollMillis),
      flumeNode.getAckChecker().getAgentAckQueuer(), pollMs);
}
};
}
/** Returns the fixed name {@code "NaiveFileWAL"}. */
@Override
return "NaiveFileWAL";
}
/**
 * Builds the report for this decorator: the superclass report merged with
 * the WAL manager's report, plus the wrapped sink's report merged
 * hierarchically under this decorator's name.
 */
@Override
ReportEvent rpt = super.getReport();
// WAL manager metrics are merged directly into this decorator's report.
ReportEvent walRpt = walman.getReport();
rpt.merge(walRpt);
// The wrapped sink's metrics are merged under getName().
ReportEvent sinkReport = sink.getReport();
rpt.hierarchicalMerge(getName(), sinkReport);
return rpt;
}
}