package com.cloudera.flume.handlers.hive;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.Test;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.Event.Priority;
import com.cloudera.flume.handlers.avro.AvroJsonOutputFormat;
import com.cloudera.util.Clock;
import com.cloudera.util.FileUtil;
@Test
File f = FileUtil.mktempdir();
EventSink snk = new HiveNotifyingDfsSink("file://" + f + "/%Y-%m-%d/",
"file-%{host}", "hivetable");
snk.open();
long day_millis = 1000 * 60 * 60 * 24;
Event e1 = new EventImpl(new byte[0], Clock.unixTime(), Priority.INFO, 0,
"localhost");
Event e2 = new EventImpl(new byte[0], e1.getTimestamp() + day_millis,
Priority.INFO, 0, "localhost");
Event e3 = new EventImpl(new byte[0], e1.getTimestamp() + 2 * day_millis,
Priority.INFO, 0, "localhost");
snk.append(e1);
snk.append(e2);
snk.append(e3);
snk.close();
FileUtil.rmr(f);
}
@Test
File f = FileUtil.mktempdir();
final List<HiveDirCreatedNotification> saves = new ArrayList<HiveDirCreatedNotification>();
HiveDirCreatedHandler hfrh = new HiveDirCreatedHandler() {
@Override
saves.add(notif);
}
};
String path = "file://" + f + "/%Y-%m-%d/";
EventSink snk = new HiveNotifyingDfsSink(path, "file-%{host}", "hivetable",
new AvroJsonOutputFormat(), hfrh);
snk.open();
long day_millis = 1000 * 60 * 60 * 24;
Event e1 = new EventImpl(new byte[0], Clock.unixTime(), Priority.INFO, 0,
"localhost");
Event e2 = new EventImpl(new byte[0], e1.getTimestamp() + day_millis,
Priority.INFO, 0, "localhost");
Event e3 = new EventImpl(new byte[0], e1.getTimestamp() + 2 * day_millis,
Priority.INFO, 0, "localhost");
snk.append(e1);
snk.append(e2);
snk.append(e3);
snk.close();
FileUtil.rmr(f);
assertEquals(3, saves.get(0).meta.size());
assertEquals("hivetable", saves.get(0).table);
Set<String> paths = new HashSet<String>();
paths.add(e1.escapeString(path));
paths.add(e2.escapeString(path));
paths.add(e3.escapeString(path));
assertTrue(paths.remove(saves.get(0).dir));
assertTrue(paths.remove(saves.get(1).dir));
assertTrue(paths.remove(saves.get(2).dir));
}
@Test
File f = FileUtil.mktempdir();
final List<HiveDirCreatedNotification> saves = new ArrayList<HiveDirCreatedNotification>();
HiveDirCreatedHandler hfrh = new HiveNotifyingDfsSink.DedupDefaultHandler(
new HiveDirCreatedHandler() {
@Override
saves.add(notif);
}
});
long day_millis = 1000 * 60 * 60 * 24;
Event e1 = new EventImpl(new byte[0], Clock.unixTime(), Priority.INFO, 0,
"localhost");
Event e2 = new EventImpl(new byte[0], e1.getTimestamp() + day_millis,
Priority.INFO, 0, "localhost");
Event e3 = new EventImpl(new byte[0], e1.getTimestamp() + 2 * day_millis,
Priority.INFO, 0, "localhost");
String path = "file://" + f + "/%Y-%m-%d/";
EventSink snk = new HiveNotifyingDfsSink(path, "file-%{host}", "hivetable",
new AvroJsonOutputFormat(), hfrh);
snk.open();
snk.append(e1);
snk.append(e2);
snk.append(e3);
snk.close();
snk = new HiveNotifyingDfsSink(path, "file-%{host}", "hivetable",
new AvroJsonOutputFormat(), hfrh);
snk.open();
snk.append(e1);
snk.append(e2);
snk.append(e3);
snk.close();
FileUtil.rmr(f);
assertEquals(3, saves.get(0).meta.size());
assertEquals("hivetable", saves.get(0).table);
Set<String> paths = new HashSet<String>();
paths.add(e1.escapeString(path));
paths.add(e2.escapeString(path));
paths.add(e3.escapeString(path));
assertTrue(paths.remove(saves.get(0).dir));
assertTrue(paths.remove(saves.get(1).dir));
assertTrue(paths.remove(saves.get(2).dir));
}
}