package fr.liglab.adele.cilia.framework.monitor.statevariable;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.felix.ipojo.ConfigurationException;
import org.apache.felix.ipojo.metadata.Element;
import org.osgi.framework.BundleContext;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import fr.liglab.adele.cilia.Data;
import fr.liglab.adele.cilia.framework.monitor.AbstractMonitor;
import fr.liglab.adele.cilia.model.impl.ConstModel;
import fr.liglab.adele.cilia.runtime.ConstRuntime;
import fr.liglab.adele.cilia.runtime.WorkQueue;
import fr.liglab.adele.cilia.util.FrameworkUtils;
import fr.liglab.adele.cilia.util.Watch;
import fr.liglab.adele.cilia.util.concurrent.ConcurrentReaderHashMap;
@SuppressWarnings({ "rawtypes", "unchecked" })
private static Logger logger = LoggerFactory.getLogger(ConstRuntime.LOGGER_KNOWLEDGE);
private BundleContext m_bundleContext;
private static final String PROPERTY_MSG_HISTORY = "cilia.message.history";
private static final String PROPERTY_BINDING_TIME = "cilia.message.time.bind";
private WorkQueue m_systemQueue;
private long[] m_counters = new long[12];
private LinkedList m_gatherMsgIn = new LinkedList();
private LinkedList m_snapshootMsg = new LinkedList();
private LinkedList m_historyList = new LinkedList();
private Object _lock = new Object();
private Watch processTime;
private String chainId, componentId, uuid;
private Map m_statevar = new ConcurrentReaderHashMap();
private Set listStateVarEnabled = new HashSet();
private Set previousStateVarEnabled = new HashSet();
private String topic;
public void configure(Element metadata, Dictionary configuration)
throws ConfigurationException {
chainId = (String) configuration.get(ConstModel.PROPERTY_CHAIN_ID);
componentId = (String) configuration.get(ConstModel.PROPERTY_COMPONENT_ID);
uuid = (String) configuration.get(ConstModel.PROPERTY_UUID);
topic = ConstRuntime.TOPIC_HEADER + chainId;
configureStateVar(configuration);
}
Map configs = (Map) configuration.get(ConstRuntime.MONITORING_CONFIGURATION);
if (configs != null) {
previousStateVarEnabled.clear();
previousStateVarEnabled.addAll(listStateVarEnabled);
Iterator it = configs.entrySet().iterator();
while (it.hasNext()) {
Map.Entry pairs = (Map.Entry) it.next();
String key = (String) pairs.getKey();
if (key.equalsIgnoreCase("enable")) {
listStateVarEnabled.clear();
Set enabled = (Set) pairs.getValue();
Iterator iter = enabled.iterator();
while (iter.hasNext()) {
String stateVarId = (String) iter.next();
listStateVarEnabled.add(stateVarId);
stateVarConfiguration(stateVarId);
}
} else {
String ldapfilter = (String) pairs.getValue();
stateVarConfiguration(key, ldapfilter);
}
}
}
}
retreiveEventAdmin();
fireStatusChange();
}
}
m_bundleContext = getFactory().getBundleContext();
}
listStateVarEnabled.clear();
}
ServiceReference[] refs = null;
ServiceReference refEventAdmin;
try {
refs = m_bundleContext.getServiceReferences(EventAdmin.class.getName(), null);
} catch (InvalidSyntaxException e) {
logger.error("Event Admin service lookup unrecoverable error");
throw new RuntimeException("Event Adminservice lookup unrecoverable error");
}
if (refs != null)
refEventAdmin = refs[0];
else
refEventAdmin = null;
return refEventAdmin;
}
StateVarItem item = (StateVarItem) m_statevar.get(stateVarId);
if (item == null) {
item = new StateVarItem();
}
m_statevar.put(stateVarId, item);
}
Set union = new TreeSet(previousStateVarEnabled);
union.addAll(listStateVarEnabled);
Iterator it = union.iterator();
String variableId;
while (it.hasNext()) {
variableId = (String) it.next();
if ((previousStateVarEnabled.contains(variableId))
&& (!listStateVarEnabled.contains(variableId)))
firerVariableStatus(variableId, false);
else {
if ((!previousStateVarEnabled.contains(variableId))
&& (listStateVarEnabled.contains(variableId)))
firerVariableStatus(variableId, true);
}
}
}
Condition cond = null;
if ((ldapFilter != null) && (ldapFilter.length() > 0)) {
try {
cond = new Condition(getInstanceManager().getContext(), ldapFilter);
} catch (Exception ex) {
logger.error("Invalid LDAP syntax '" + ldapFilter + "' ,state variable '"
+ stateVarId + "'");
cond = null;
}
}
StateVarItem item = (StateVarItem) m_statevar.get(stateVarId);
if (item == null) {
item = new StateVarItem();
}
item.condition = cond;
m_statevar.put(stateVarId, item);
}
private boolean isEnabled(String stateVarId) {
return listStateVarEnabled.contains(stateVarId);
}
private void publish(String stateVarId, Object data,
long ticksCount) {
long last_ticksCount;
Condition cond;
boolean fire;
StateVarItem item = (StateVarItem) m_statevar.get(stateVarId);
if (item != null) {
fire = true;
} else {
cond = item.condition;
if (cond != null) {
last_ticksCount = item.lastpublish.longValue();
fire = cond.match(
ticksCount,
Watch.fromTicksToMs(ticksCount)
- Watch.fromTicksToMs(last_ticksCount));
} else {
fire = true;
item.lastpublish = new Long(ticksCount);
}
}
if (fire)
firer(stateVarId, data, ticksCount);
}
private void firer(String stateVarId, Object value,
long ticksCount) {
EventAdmin m_eventAdmin;
ServiceReference refEventAdmin = retreiveEventAdmin();
if (refEventAdmin == null) {
logger.error("Unable to retrieve Event Admin");
} else {
Map data = new HashMap(5);
data.put(ConstRuntime.EVENT_TYPE, ConstRuntime.TYPE_DATA);
data.put(ConstRuntime.UUID, uuid);
data.put(ConstRuntime.VARIABLE_ID, stateVarId);
data.put(ConstRuntime.VALUE, value);
data.put(ConstRuntime.TIMESTAMP, new Long(ticksCount));
StateVarItem item = (StateVarItem) m_statevar.get(stateVarId);
if (item != null)
item.lastpublish = new Long(ticksCount);
m_eventAdmin = (EventAdmin) m_bundleContext.getService(refEventAdmin);
m_eventAdmin.postEvent(new Event(topic, data));
m_bundleContext.ungetService(refEventAdmin);
if (logger.isDebugEnabled()) {
logger.debug("Node [{}] publish state variable [{}]",
FrameworkUtils.makeQualifiedId(chainId, componentId, uuid) + ":"
+ stateVarId, value.toString());
}
}
}
EventAdmin m_eventAdmin;
ServiceReference refEventAdmin = retreiveEventAdmin();
if (refEventAdmin == null) {
logger.error("Unable to retrieve Event Admin");
} else {
Map data = new HashMap(4);
data.put(ConstRuntime.EVENT_TYPE, ConstRuntime.TYPE_STATUS_VARIABLE);
data.put(ConstRuntime.UUID, uuid);
data.put(ConstRuntime.VARIABLE_ID, stateVarId);
data.put(ConstRuntime.VALUE, new Boolean(value));
m_eventAdmin = (EventAdmin) m_bundleContext.getService(refEventAdmin);
m_eventAdmin.postEvent(new Event(topic, data));
m_bundleContext.ungetService(refEventAdmin);
}
}
if (data != null) {
synchronized (_lock) {
List list = (List) data.getProperty(PROPERTY_MSG_HISTORY);
if (list != null)
m_gatherMsgIn.addAll(list);
}
}
}
synchronized (_lock) {
if (!m_gatherMsgIn.isEmpty())
m_snapshootMsg.addAll(m_gatherMsgIn);
m_gatherMsgIn.clear();
}
}
Iterator it;
Watch watch;
synchronized (_lock) {
if (!m_snapshootMsg.isEmpty()) {
m_historyList.addAll(m_snapshootMsg);
m_snapshootMsg.clear();
}
watch = new Watch(componentId);
m_historyList.addLast(watch);
}
if (!m_historyList.isEmpty()) {
it = listData.iterator();
while (it.hasNext()) {
Data data = (Data) it.next();
data.setProperty(PROPERTY_MSG_HISTORY, new LinkedList(m_historyList));
data.setProperty(PROPERTY_BINDING_TIME, watch);
}
m_historyList.clear();
}
}
if (listStateVarEnabled.isEmpty())
return;
gatherIncommingHistory(data);
if (isEnabled("scheduler.count")) {
m_counters[0]++;
m_systemQueue.execute(new AsynchronousExec("scheduler.count", new Long(
m_counters[0])));
}
if (isEnabled("scheduler.data")) {
m_systemQueue.execute(new AsynchronousExec("scheduler.data", new Data(data)));
}
if (isEnabled("transmission.delay")) {
if (data != null) {
synchronized (_lock) {
Watch watch = (Watch) data.getProperty(PROPERTY_BINDING_TIME);
data.removeProperty(PROPERTY_BINDING_TIME);
if (watch != null) {
long elapsedTime = Watch.fromTicksToMs(watch.getElapsedTicks());
m_systemQueue.execute(new AsynchronousExec("transmission.delay",
new Long(elapsedTime)));
}
}
}
}
if (isEnabled("message.history")) {
List list = (List) data.getProperty(PROPERTY_MSG_HISTORY);
if ((list != null) && (!list.isEmpty())) {
m_systemQueue.execute(new AsynchronousExec("message.history", new Data(
list)));
}
}
}
if (listStateVarEnabled.isEmpty())
return;
snapShotHistory();
processTime = new Watch();
if (isEnabled("process.entry.count")) {
m_counters[1]++;
m_systemQueue.execute(new AsynchronousExec("process.entry.count", new Long(
m_counters[1])));
}
if (isEnabled("process.entry.data")) {
m_systemQueue.execute(new AsynchronousExec("process.entry.data", new Data(
data)));
}
if (isEnabled("process.msg.treated")) {
if (data != null)
m_counters[8] = data.size();
m_systemQueue.execute(new AsynchronousExec("process.msg.treated", new Long(
m_counters[8])));
}
}
if (listStateVarEnabled.isEmpty())
return;
if (isEnabled("message.history") || isEnabled("transmission.delay"))
injectTags(data);
if (isEnabled("processing.delay")) {
m_systemQueue.execute(new AsynchronousExec("processing.delay", new Long(Watch
.fromTicksToMs(processTime.getElapsedTicks()))));
}
if (isEnabled("process.exit.count")) {
m_counters[2]++;
m_systemQueue.execute(new AsynchronousExec("process.exit.count", new Long(
m_counters[2])));
}
}
if (listStateVarEnabled.isEmpty())
return;
if (isEnabled("dispatch.count")) {
m_counters[3]++;
m_systemQueue.execute(new AsynchronousExec("dispatch.count", new Long(
m_counters[3])));
}
if (isEnabled("dispatch.data")) {
m_systemQueue.execute(new AsynchronousExec("dispatch.data", new Data(data)));
}
if (isEnabled("dispatch.msg.treated")) {
if (data != null)
m_counters[9] = data.size();
m_systemQueue.execute(new AsynchronousExec("dispatch.msg.treated", new Long(
m_counters[9])));
}
}
if (listStateVarEnabled.isEmpty())
return;
m_counters[4]++;
if (isEnabled("process.err.count")) {
m_counters[4]++;
m_systemQueue.execute(new AsynchronousExec("process.err.count", new Long(
m_counters[4])));
}
if (isEnabled("process.err.data")) {
m_systemQueue
.execute(new AsynchronousExec("process.err.data", new Data(data)));
}
}
if (listStateVarEnabled.isEmpty())
return;
if (isEnabled("fire.event")) {
m_systemQueue.execute(new AsynchronousExec("fire.event", info));
}
if (isEnabled("fire.event.count")) {
m_counters[5]++;
m_systemQueue.execute(new AsynchronousExec("fire.event.count", new Long(
m_counters[5])));
}
}
if (listStateVarEnabled.isEmpty())
return;
if (isEnabled("service.arrival")) {
m_systemQueue.execute(new AsynchronousExec("service.arrival", info));
}
if (isEnabled("service.arrival.count")) {
m_counters[6]++;
m_systemQueue.execute(new AsynchronousExec("service.arrival.count", new Long(
m_counters[6])));
}
}
if (listStateVarEnabled.isEmpty())
return;
if (isEnabled("service.departure")) {
m_systemQueue.execute(new AsynchronousExec("service.departure", info));
}
if (isEnabled("service.departure.count")) {
m_counters[7]++;
m_systemQueue.execute(new AsynchronousExec("service.departure.count",
new Long(m_counters[7])));
}
}
if (listStateVarEnabled.isEmpty())
return;
if (isEnabled("field.get")) {
m_systemQueue.equals(new AsynchronousExec("field.get", Collections
.singletonMap(field, o)));
}
if (isEnabled("field.get.count")) {
m_counters[10]++;
m_systemQueue.execute(new AsynchronousExec("field.get.count", new Long(
m_counters[10])));
}
}
if (listStateVarEnabled.isEmpty())
return;
if (isEnabled("field.set")) {
m_systemQueue.equals(new AsynchronousExec("field.set", Collections
.singletonMap(field, o)));
}
if (isEnabled("field.set.count")) {
m_counters[11]++;
m_systemQueue.execute(new AsynchronousExec("field.set.count", new Long(
m_counters[11])));
}
}
private final String stateVar;
private final Object data;
private final long tickCount = Watch.getCurrentTicks();
this.stateVar = stateVar;
this.data = data;
}
publish(stateVar, data, tickCount);
}
}
configureStateVar(configuration);
fireStatusChange();
}
Condition condition;
Long lastpublish;
lastpublish = new Long(0);
condition = null;
}
}
}