package com.cloudera.flume.handlers.debug;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Interval in milliseconds: used as the sleep between two bucket refills and
 * as the timeout of each wait on a choke while tokens are being spent.
 */
public static final int timeQuanta = 100;
/**
 * Initialised to Integer.MAX_VALUE and overwritten with the value stored
 * under the empty-string key when a new limit map is applied.
 */
private int physicalLimit;
/**
 * Run flag: the refill loop and the token-spending wait loop both keep going
 * only while this is true. Volatile so that a change is seen across threads.
 */
private volatile boolean active = false;
/** Number of bytes added to every request's size when tokens are spent. */
private int payLoadheadrsize = 50;
/** Maps a choke id to its ChokeInfoData. */
private final HashMap<String, ChokeInfoData> chokeInfoMap = new HashMap<String, ChokeInfoData>();
/** Read/write lock taken around accesses to chokeInfoMap. */
ReentrantReadWriteLock rwlChokeInfoMap;
super("ChokeManager");
rwlChokeInfoMap = new ReentrantReadWriteLock();
this.physicalLimit = Integer.MAX_VALUE;
}
/**
 * Sets the payload header size, i.e. the value stored in payLoadheadrsize.
 *
 * NOTE(review): the method name is missing from the declaration below in this
 * view of the file; it has to be restored before this compiles.
 *
 * @param size new header size; must not be negative
 * @throws IllegalArgumentException if size is negative
 */
public void (int size) throws IllegalArgumentException {
// Reject negative sizes before touching the field.
if (size < 0) {
throw new IllegalArgumentException(
"Payload header size cannot be negative");
}
this.payLoadheadrsize = size;
}
/**
 * Adds a choke with the given limit, or updates the limit of a choke that is
 * already present in chokeInfoMap.
 *
 * This method does no locking of its own.
 * NOTE(review): callers are presumably expected to hold the write lock of
 * rwlChokeInfoMap while calling it -- confirm against all call sites.
 *
 * @param chokeID id of the choke to create or update
 * @param limit   maximum limit to assign to that choke
 */
private void register(String chokeID,
    int limit) {
  final ChokeInfoData existing = chokeInfoMap.get(chokeID);
  if (existing != null) {
    // Known choke: only its limit changes, the entry itself is kept.
    existing.setMaxLimit(limit);
    return;
  }
  chokeInfoMap.put(chokeID, new ChokeInfoData(limit, chokeID));
}
rwlChokeInfoMap.writeLock().lock();
try {
for (String s : newMap.keySet()) {
register(s, newMap.get(s));
}
if (newMap.containsKey("")) {
this.physicalLimit = newMap.get("");
}
} finally {
rwlChokeInfoMap.writeLock().unlock();
}
}
Boolean res;
rwlChokeInfoMap.readLock().lock();
try {
res = this.chokeInfoMap.containsKey(ID);
} finally {
rwlChokeInfoMap.readLock().unlock();
}
return res;
}
@Override
active = true;
while (this.active) {
try {
Thread.sleep(timeQuanta);
} catch (InterruptedException e) {
continue;
}
rwlChokeInfoMap.readLock().lock();
try {
for (ChokeInfoData choke : this.chokeInfoMap.values()) {
synchronized (choke) {
choke.bucketFillup();
choke.notifyAll();
}
}
} finally {
rwlChokeInfoMap.readLock().unlock();
}
}
}
active = false;
}
/**
 * Charges numBytes plus the payload header size against the choke registered
 * under the given id.
 *
 * If isChokeId(id) reports false the call returns without doing anything.
 * Otherwise, while the manager is active and the choke's bucket does not
 * cover the request, the caller waits on the choke for up to timeQuanta
 * milliseconds at a time, at most three times. The tokens are then removed
 * whether or not the bucket ended up covering the request.
 *
 * The read lock of rwlChokeInfoMap is held for the whole call, including the
 * waits, and is always released on exit.
 *
 * @param id       id of the choke to charge
 * @param numBytes size of the request, excluding the payload header size
 * @throws IOException if the thread is interrupted while waiting; the
 *                     interrupt flag is set again before the exception is
 *                     thrown
 */
public void spendTokens(String id,
    int numBytes)
    throws IOException {
  rwlChokeInfoMap.readLock().lock();
  try {
    if (!this.isChokeId(id)) {
      // Unknown id: nothing to charge.
      return;
    }
    final ChokeInfoData bucket = this.chokeInfoMap.get(id);
    synchronized (bucket) {
      int waitsDone = 0;
      while (this.active
          && !bucket.bucketCompare(numBytes + this.payLoadheadrsize)) {
        try {
          // Woken early by notifyAll on the choke, or after the timeout.
          bucket.wait(ChokeManager.timeQuanta);
        } catch (InterruptedException interrupted) {
          Thread.currentThread().interrupt();
          throw new IOException(interrupted);
        }
        // Bound the number of waits so that a caller cannot block forever.
        if (waitsDone >= 2) {
          break;
        }
        waitsDone++;
      }
      bucket.removeTokens(numBytes + this.payLoadheadrsize);
    }
  } finally {
    rwlChokeInfoMap.readLock().unlock();
  }
}
}