package com.cloudera.util;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.WritableByteChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
static final Logger LOG = LoggerFactory.getLogger(InputStreamPipe.class);
final InputStream input;
final Pipe pipe;
final CopyThread copyThread;
this.input = in;
pipe = Pipe.open();
copyThread = new CopyThread(in, pipe.sink());
}
this(System.in);
}
copyThread.start();
}
copyThread.shutdown();
}
public SelectableChannel
getChannel()
throws IOException {
SelectableChannel channel = pipe.source();
channel.configureBlocking(false);
return (channel);
}
copyThread.shutdown();
}
volatile boolean keepRunning = true;
InputStream in;
WritableByteChannel out;
CopyThread(InputStream in, WritableByteChannel out) {
this.in = in;
this.out = out;
this.setDaemon(true);
}
keepRunning = false;
this.interrupt();
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
byte[] bytes = new byte[4096];
ByteBuffer buffer = ByteBuffer.wrap(bytes);
try {
while (keepRunning) {
int count = in.read(bytes);
if (count <= 0) {
Clock.sleep(100);
break;
}
buffer.clear().limit(count);
out.write(buffer);
}
out.close();
} catch (IOException e) {
LOG.error("Input stream pipe closed", e);
} catch (InterruptedException e) {
LOG.info("Input stream pipe interrupted");
}
}
}
}