package com.cloudera.flume.handlers.debug;
import java.io.IOException;
import com.cloudera.flume.agent.FlumeNode;
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.google.common.base.Preconditions;
public class ChokeDecorator<S
extends EventSink>
extends EventSinkDecorator<S> {
// Id passed to chokeMan.spendTokens(...) on every append(); declared final.
final String chokeId;
// Choke manager used by append() to spend tokens; assigned in open().
private ChokeManager chokeMan;
super(s);
chokeId = tId;
}
/**
 * Spends tokens for this event's body size, then forwards the event to the
 * decorated sink.
 *
 * <p>{@code chokeMan.spendTokens} is called with this decorator's choke id
 * and the length of the event body before the event is handed to
 * {@code super.append}.
 *
 * @param e the event to forward
 * @throws IOException if spending tokens or the downstream append fails. An
 *         {@code IOException} raised inside is rethrown unchanged; any other
 *         exception is wrapped with its message and kept as the cause.
 */
@Override
public void append(Event e)
    throws IOException {
  try {
    chokeMan.spendTokens(chokeId, e.getBody().length);
    super.append(e);
  } catch (IOException ioe) {
    // Already the declared type: rethrow untouched so callers still see the
    // original subclass instead of a generic wrapper around it.
    throw ioe;
  } catch (Exception e1) {
    if (e1 instanceof InterruptedException) {
      // NOTE(review): only relevant if the calls above can be interrupted;
      // restore the flag so the interruption is not lost by wrapping.
      Thread.currentThread().interrupt();
    }
    throw new IOException(e1.getMessage(), e1);
  }
}
/**
 * Fetches the choke manager via {@code FlumeNode.getInstance()} and stores
 * it in {@code chokeMan}, then opens the decorated sink through
 * {@code super.open()}.
 *
 * @throws IOException if {@code super.open()} fails
 */
@Override
public void open() throws IOException {
  final ChokeManager manager = FlumeNode.getInstance().getChokeManager();
  chokeMan = manager;
  super.open();
}
return chokeId;
}
/**
 * Creates the builder for this decorator.
 *
 * <p>The returned builder's {@code build} requires exactly one argument, the
 * choke id, and rejects any other argument count with the usage message.
 *
 * @return a {@code SinkDecoBuilder} that produces {@code ChokeDecorator}
 *         instances
 */
public static SinkDecoBuilder builder() {
  return new SinkDecoBuilder() {
    @Override
    public EventSinkDecorator<EventSink> build(Context context, String... argv) {
      final boolean exactlyOneArg = (argv.length == 1);
      Preconditions.checkArgument(exactlyOneArg, "usage: choke(\"chokeId\")");
      // The sink is passed as null here; presumably it is attached later by
      // whoever uses the builder -- confirm against SinkDecoBuilder callers.
      return new ChokeDecorator<EventSink>(null, argv[0]);
    }
  };
}
}