package com.cloudera.flume.reporter.history;
import java.io.IOException;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.handlers.rolling.Tagger;
import com.cloudera.util.Clock;
import com.cloudera.util.Pair;
abstract public class HistoryReporter<S
extends EventSink>
extends EventSink.Base {
LinkedList<Pair<Long, S>> history;
String name;
S sink;
Tagger tagger;
long maxAge;
this.name = name;
this.maxAge = maxAge;
this.tagger = t;
history = new LinkedList<Pair<Long, S>>();
this.tagger = t;
}
/**
 * Factory hook implemented by subclasses to build a sink of type {@code S}.
 *
 * <p>This class calls it whenever it needs a sink: when none is current
 * (from {@code open()} and {@code append()}) and when {@code append()} rolls
 * the current sink into the history. The returned sink is opened by the
 * caller, so implementations only need to construct it.
 *
 * @param format the tagger held by this reporter (presumably used to
 *        tag/name the new sink -- confirm against concrete subclasses)
 * @return a newly built sink; this class calls {@code open()} on it
 * @throws IOException if the sink cannot be created
 */
public abstract S newSink(Tagger format) throws IOException;
/**
 * Appends an event to the current sink, first rolling to a fresh sink when
 * the current one has aged out.
 *
 * <p>If no sink is current, one is built with {@link #newSink(Tagger)} and
 * opened. The age is computed as {@code Clock.unixTime()} minus the time of
 * {@code tagger.getDate()}; when it exceeds {@code maxAge} the current sink
 * is closed, stored in {@code history} keyed by that date's time, and
 * replaced by a newly opened sink. The event is then handed to the current
 * sink and to the superclass.
 *
 * <p>Failures while creating, opening or closing a sink are propagated to
 * the caller instead of being printed and ignored. Previously a failed
 * creation left {@code sink} null (leading to a NullPointerException a few
 * lines later) and a failed roll left the already-closed sink in place to
 * receive the event.
 *
 * <p>NOTE(review): {@code close()} is synchronized but this method is not;
 * confirm whether concurrent append/close is expected.
 *
 * @param e the event to append
 * @throws IOException if a sink cannot be created, opened, closed or
 *         appended to
 */
@Override
public void append(Event e) throws IOException {
  if (sink == null) {
    // Install the sink only after open() succeeds so that a failure never
    // leaves an unopened sink behind; the next call simply retries.
    S created = newSink(tagger);
    created.open();
    sink = created;
  }

  // Read the date after any sink creation, as the original did.
  Date d = tagger.getDate();
  long delta = Clock.unixTime() - d.getTime();
  if (delta > maxAge) {
    S expired = sink;
    expired.close();
    history.add(new Pair<Long, S>(d.getTime(), expired));

    // The expired sink is closed and archived; drop it before building the
    // replacement so a failure below cannot route events to a closed sink.
    sink = null;
    S replacement = newSink(tagger);
    replacement.open();
    sink = replacement;
  }

  sink.append(e);
  super.append(e);
}
/**
 * Closes the current sink, if there is one, and forgets it.
 *
 * <p>Calling this when no sink is current (before {@code open()}, after a
 * previous {@code close()}, or after a failed open) is a no-op rather than a
 * NullPointerException. The reference is cleared even when the underlying
 * close fails, so a later {@code open()} or {@code append()} builds a fresh
 * sink instead of reusing one in an unknown state.
 *
 * @throws IOException if closing the current sink fails
 */
@Override
public synchronized void close() throws IOException {
  if (sink == null) {
    return;
  }
  try {
    sink.close();
  } finally {
    sink = null;
  }
}
/**
 * Ensures a sink is current: if none exists, builds one with
 * {@link #newSink(Tagger)} and opens it. Does nothing when a sink is
 * already present.
 *
 * <p>Creation and open failures are propagated to the caller instead of
 * being printed and ignored, and the sink is installed only after it has
 * been opened successfully, so a failure leaves this reporter without a
 * sink rather than with an unopened one.
 *
 * @throws IOException if the new sink cannot be created or opened
 */
@Override
public void open() throws IOException {
  if (sink != null) {
    return;
  }
  S created = newSink(tagger);
  created.open();
  sink = created;
}
@Override
return name;
}
return history;
}
}