package com.cloudera.flume.core;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.util.MultipleIOException;
/**
 * An event sink that holds a list of child sinks of type S and applies
 * open, close and append to each child in list order.
 *
 * @param <S> the type of child sink held by this fan-out
 */
public class FanOutSink<S
extends EventSink>
extends EventSink.Base {
// Child sinks: a CopyOnWriteArrayList wrapped by Collections.synchronizedList.
final List<S> sinks = Collections
.synchronizedList(new CopyOnWriteArrayList<S>());
}
sinks.addAll(l);
}
sinks.addAll(Arrays.asList(ss));
}
sinks.add(r);
}
/**
 * Appends every element of the given collection to the child sink list.
 *
 * @param c the sinks to add
 */
public void addAll(Collection<? extends S> c) {
  this.sinks.addAll(c);
}
sinks.remove(r);
}
/**
 * Gives access to the child sinks for iteration.
 *
 * @return the list held in the sinks field itself (not a copy)
 */
protected Iterable<S> iter() {
  return this.sinks;
}
/**
 * Closes every child sink, attempting all of them even when some fail
 * with an IOException.
 *
 * @throws IOException the exception built by
 *         MultipleIOException.createIOException from every IOException a
 *         child's close() raised, when at least one was raised
 */
@Override
public void close() throws IOException {
  final List<IOException> failures = new ArrayList<IOException>();
  for (final S child : sinks) {
    try {
      child.close();
    } catch (IOException closeFailure) {
      // Record the failure and keep going so the remaining children are
      // still closed.
      failures.add(closeFailure);
    }
  }
  if (failures.isEmpty()) {
    return;
  }
  throw MultipleIOException.createIOException(failures);
}
/**
 * Opens every child sink, attempting all of them even when some fail
 * with an IOException.
 *
 * @throws IOException the exception built by
 *         MultipleIOException.createIOException from every IOException a
 *         child's open() raised, when at least one was raised
 */
@Override
public void open() throws IOException {
  final List<IOException> failures = new ArrayList<IOException>();
  for (final S child : sinks) {
    try {
      child.open();
    } catch (IOException openFailure) {
      // Record the failure and continue with the remaining children.
      failures.add(openFailure);
    }
  }
  if (failures.isEmpty()) {
    return;
  }
  throw MultipleIOException.createIOException(failures);
}
/**
 * Hands the event to every child sink. After each child append that
 * completes without an IOException, the superclass append is invoked with
 * the same event, so it runs once per successful child rather than once
 * per call.
 *
 * @param e the event to deliver to each child
 * @throws IOException the exception built by
 *         MultipleIOException.createIOException from every IOException
 *         raised inside the loop, when at least one was raised
 */
@Override
public synchronized void append(Event e) throws IOException {
  final List<IOException> failures = new ArrayList<IOException>();
  for (final S child : sinks) {
    try {
      child.append(e);
      // Only reached when the child accepted the event; an IOException
      // thrown here by the superclass is collected like a child failure.
      super.append(e);
    } catch (IOException appendFailure) {
      // Record the failure and still deliver to the remaining children.
      failures.add(appendFailure);
    }
  }
  if (failures.isEmpty()) {
    return;
  }
  throw MultipleIOException.createIOException(failures);
}
@Override
return "Fanout";
}
/**
 * Collects reports from the superclass and then from each child sink.
 * Each child is called with the prefix
 * namePrefix + getName() + "." + index + ".", where index is the child's
 * zero-based position in iteration order.
 *
 * @param namePrefix prefix passed to the superclass and used to build each
 *        child's prefix
 * @param reports map handed unchanged to the superclass and to every child
 */
@Override
public void getReports(String namePrefix, Map<String, ReportEvent> reports) {
  super.getReports(namePrefix, reports);
  int position = 0;
  for (EventSink child : sinks) {
    final String childPrefix = namePrefix + getName() + "." + position + ".";
    child.getReports(childPrefix, reports);
    position++;
  }
}
}