下面列出了java.nio.channels.Pipe#sink() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void test1() throws IOException {
//1.获取管道
Pipe pipe = Pipe.open();
//2.将缓冲区数据写入管道
ByteBuffer buf = ByteBuffer.allocate(1024);
Pipe.SinkChannel sinkChannel = pipe.sink();
buf.put("通过单向管道发送数据".getBytes());
buf.flip();
sinkChannel.write(buf);
//3.读取缓冲区中的数据
Pipe.SourceChannel sourceChannel = pipe.source();
buf.flip();
int len = sourceChannel.read(buf);
System.out.println(new String(buf.array(), 0, len));
sourceChannel.close();
sinkChannel.close();
}
@Test
public void test1() throws IOException {
// 1. 获取管道
Pipe pipe = Pipe.open();
// 2. 将缓冲区中的数据写入管道
ByteBuffer buf = ByteBuffer.allocate(1024);
Pipe.SinkChannel sinkChannel = pipe.sink();
buf.put("通过单向管道发送数据".getBytes());
buf.flip();
sinkChannel.write(buf);
// 3. 读取缓冲区中的数据
Pipe.SourceChannel sourceChannel = pipe.source();
buf.flip();
int len = sourceChannel.read(buf);
System.out.println(new String(buf.array(), 0, len));
sourceChannel.close();
sinkChannel.close();
}
/**
* Creates a mock input stream which send some data every SMALL_TIMEOUT ms.
*/
@Before
public void setup() throws Exception {
mock = TestUtils.mockInputStream(text);
final Pipe pipe = Pipe.open();
input = new SingleInputExpect(
pipe.source(),
pipe.sink(),
mock.getStream(),
Charset.defaultCharset(),
null,
null,
DEFAULT_BUFFER_SIZE,
false);
executor = Executors.newSingleThreadExecutor();
input.start(executor);
mock.waitUntilReady();
}
private InputStream initializeUploadPipe() throws IOException {
switch (options.getPipeType()) {
case NIO_CHANNEL_PIPE:
Pipe pipe = Pipe.open();
pipeSink = pipe.sink();
InputStream pipeSource = Channels.newInputStream(pipe.source());
return options.getPipeBufferSize() > 0
? new BufferedInputStream(pipeSource, options.getPipeBufferSize())
: pipeSource;
case IO_STREAM_PIPE:
PipedInputStream internalPipeSource = new PipedInputStream(options.getPipeBufferSize());
PipedOutputStream internalPipeSink = new PipedOutputStream(internalPipeSource);
pipeSink = Channels.newChannel(internalPipeSink);
return internalPipeSource;
}
throw new IllegalStateException("Unknown PipeType: " + options.getPipeType());
}
private Pipe.SourceChannel createNonBlockingChannel(byte[] content) throws IOException {
Pipe pipe = Pipe.open();
WritableByteChannel sinkChannel = pipe.sink();
sinkChannel.write(ByteBuffer.wrap(content));
Pipe.SourceChannel sourceChannel = pipe.source();
sourceChannel.configureBlocking(false);
return sourceChannel;
}
public NBCircularIOStream() throws IOException {
final Pipe pipe = Pipe.open();
sink = new BufferedWritableSelectableChannel(new PipeSinkWritableSelectableChannel(pipe.sink()));
final Pipe.SourceChannel source = pipe.source();
sink.configureBlocking(false);
source.configureBlocking(true);
in = Channels.newInputStream(source);
}
@Test
public void testReadableByteChannel() throws Exception {
int[] testSizes = { 0, 10, 1023, 1024, 1025, 2047, 2048, 2049 };
for (int size : testSizes) {
SelectorProvider sp = SelectorProvider.provider();
Pipe p = sp.openPipe();
Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source();
sink.configureBlocking(false);
ByteBuffer outgoingdata = ByteBuffer.allocateDirect(size + 10);
byte[] someBytes = new byte[size + 10];
generator.nextBytes(someBytes);
outgoingdata.put(someBytes);
outgoingdata.flip();
int totalWritten = 0;
while (totalWritten < size + 10) {
int written = sink.write(outgoingdata);
if (written < 0)
throw new Exception("Write failed");
totalWritten += written;
}
File f = File.createTempFile("blah"+size, null);
f.deleteOnExit();
RandomAccessFile raf = new RandomAccessFile(f, "rw");
FileChannel fc = raf.getChannel();
long oldPosition = fc.position();
long bytesWritten = fc.transferFrom(source, 0, size);
fc.force(true);
if (bytesWritten != size)
throw new RuntimeException("Transfer failed");
if (fc.position() != oldPosition)
throw new RuntimeException("Position changed");
if (fc.size() != size)
throw new RuntimeException("Unexpected sink size "+ fc.size());
fc.close();
sink.close();
source.close();
f.delete();
}
}
@Test
public void testSocketIOWithTimeout() throws Exception {
// first open pipe:
Pipe pipe = Pipe.open();
Pipe.SourceChannel source = pipe.source();
Pipe.SinkChannel sink = pipe.sink();
try {
final InputStream in = new SocketInputStream(source, TIMEOUT);
OutputStream out = new SocketOutputStream(sink, TIMEOUT);
byte[] writeBytes = TEST_STRING.getBytes();
byte[] readBytes = new byte[writeBytes.length];
byte byteWithHighBit = (byte)0x80;
out.write(writeBytes);
out.write(byteWithHighBit);
doIO(null, out, TIMEOUT);
in.read(readBytes);
assertTrue(Arrays.equals(writeBytes, readBytes));
assertEquals(byteWithHighBit & 0xff, in.read());
doIO(in, null, TIMEOUT);
// Change timeout on the read side.
((SocketInputStream)in).setTimeout(TIMEOUT * 2);
doIO(in, null, TIMEOUT * 2);
/*
* Verify that it handles interrupted threads properly.
* Use a large timeout and expect the thread to return quickly
* upon interruption.
*/
((SocketInputStream)in).setTimeout(0);
TestingThread thread = new TestingThread(ctx) {
@Override
public void doWork() throws Exception {
try {
in.read();
fail("Did not fail with interrupt");
} catch (InterruptedIOException ste) {
LOG.info("Got expection while reading as expected : " +
ste.getMessage());
}
}
};
ctx.addThread(thread);
ctx.startThreads();
// If the thread is interrupted before it calls read()
// then it throws ClosedByInterruptException due to
// some Java quirk. Waiting for it to call read()
// gets it into select(), so we get the expected
// InterruptedIOException.
Thread.sleep(1000);
thread.interrupt();
ctx.stop();
//make sure the channels are still open
assertTrue(source.isOpen());
assertTrue(sink.isOpen());
// Nevertheless, the output stream is closed, because
// a partial write may have succeeded (see comment in
// SocketOutputStream#write(byte[]), int, int)
// This portion of the test cannot pass on Windows due to differences in
// behavior of partial writes. Windows appears to buffer large amounts of
// written data and send it all atomically, thus making it impossible to
// simulate a partial write scenario. Attempts were made to switch the
// test from using a pipe to a network socket and also to use larger and
// larger buffers in doIO. Nothing helped the situation though.
if (!Shell.WINDOWS) {
try {
out.write(1);
fail("Did not throw");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"stream is closed", ioe);
}
}
out.close();
assertFalse(sink.isOpen());
// close sink and expect -1 from source.read()
assertEquals(-1, in.read());
// make sure close() closes the underlying channel.
in.close();
assertFalse(source.isOpen());
} finally {
if (source != null) {
source.close();
}
if (sink != null) {
sink.close();
}
}
}
@Test
public void testSocketIOWithTimeout() throws Exception {
// first open pipe:
Pipe pipe = Pipe.open();
Pipe.SourceChannel source = pipe.source();
Pipe.SinkChannel sink = pipe.sink();
try {
final InputStream in = new SocketInputStream(source, TIMEOUT);
OutputStream out = new SocketOutputStream(sink, TIMEOUT);
byte[] writeBytes = TEST_STRING.getBytes();
byte[] readBytes = new byte[writeBytes.length];
byte byteWithHighBit = (byte)0x80;
out.write(writeBytes);
out.write(byteWithHighBit);
doIO(null, out, TIMEOUT);
in.read(readBytes);
assertTrue(Arrays.equals(writeBytes, readBytes));
assertEquals(byteWithHighBit & 0xff, in.read());
doIO(in, null, TIMEOUT);
// Change timeout on the read side.
((SocketInputStream)in).setTimeout(TIMEOUT * 2);
doIO(in, null, TIMEOUT * 2);
/*
* Verify that it handles interrupted threads properly.
* Use a large timeout and expect the thread to return quickly
* upon interruption.
*/
((SocketInputStream)in).setTimeout(0);
TestingThread thread = new TestingThread(ctx) {
@Override
public void doWork() throws Exception {
try {
in.read();
fail("Did not fail with interrupt");
} catch (InterruptedIOException ste) {
LOG.info("Got expection while reading as expected : " +
ste.getMessage());
}
}
};
ctx.addThread(thread);
ctx.startThreads();
// If the thread is interrupted before it calls read()
// then it throws ClosedByInterruptException due to
// some Java quirk. Waiting for it to call read()
// gets it into select(), so we get the expected
// InterruptedIOException.
Thread.sleep(1000);
thread.interrupt();
ctx.stop();
//make sure the channels are still open
assertTrue(source.isOpen());
assertTrue(sink.isOpen());
// Nevertheless, the output stream is closed, because
// a partial write may have succeeded (see comment in
// SocketOutputStream#write(byte[]), int, int)
// This portion of the test cannot pass on Windows due to differences in
// behavior of partial writes. Windows appears to buffer large amounts of
// written data and send it all atomically, thus making it impossible to
// simulate a partial write scenario. Attempts were made to switch the
// test from using a pipe to a network socket and also to use larger and
// larger buffers in doIO. Nothing helped the situation though.
if (!Shell.WINDOWS) {
try {
out.write(1);
fail("Did not throw");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"stream is closed", ioe);
}
}
out.close();
assertFalse(sink.isOpen());
// close sink and expect -1 from source.read()
assertEquals(-1, in.read());
// make sure close() closes the underlying channel.
in.close();
assertFalse(source.isOpen());
} finally {
if (source != null) {
source.close();
}
if (sink != null) {
sink.close();
}
}
}
/**
* @tests java.nio.channels.Pipe#sink()
*/
public void test_sink() throws IOException {
Pipe pipe = Pipe.open();
SinkChannel sink = pipe.sink();
assertTrue(sink.isBlocking());
}
/**
* @tests java.nio.channels.Pipe.SourceChannel#read(ByteBuffer[], int, int)
*/
public void test_read_$LByteBufferII() throws IOException {
ByteBuffer[] bufArray = { buffer, positionedBuffer };
boolean[] sinkBlockingMode = { true, true, false, false };
boolean[] sourceBlockingMode = { true, false, true, false };
for (int i = 0; i < sinkBlockingMode.length; ++i) {
Pipe pipe = Pipe.open();
sink = pipe.sink();
source = pipe.source();
sink.configureBlocking(sinkBlockingMode[i]);
source.configureBlocking(sourceBlockingMode[i]);
buffer.position(0);
positionedBuffer.position(BUFFER_SIZE);
try {
sink.write(bufArray);
// invoke close to ensure all data will be sent out
sink.close();
// read until EOF is meet or readBufArray is full.
ByteBuffer[] readBufArray = { ByteBuffer.allocate(BUFFER_SIZE),
ByteBuffer.allocate(BUFFER_SIZE) };
long totalCount = 0;
do {
long count = source.read(readBufArray, 0, 2);
if (count < 0) {
break;
}
if (0 == count && BUFFER_SIZE == readBufArray[1].position()) {
// source.read returns 0 because readBufArray is full
break;
}
totalCount += count;
} while (totalCount != 10);
// assert read result
for (ByteBuffer readBuf : readBufArray) {
// RI may fail because of its bug implementation
assertEquals(BUFFER_SIZE, readBuf.position());
assertEquals("bytes",
new String(readBuf.array(), ISO8859_1));
}
} finally {
sink.close();
source.close();
}
}
}
public void testSocketIOWithTimeout() throws IOException {
// first open pipe:
Pipe pipe = Pipe.open();
Pipe.SourceChannel source = pipe.source();
Pipe.SinkChannel sink = pipe.sink();
try {
InputStream in = new SocketInputStream(source, TIMEOUT);
OutputStream out = new SocketOutputStream(sink, TIMEOUT);
byte[] writeBytes = TEST_STRING.getBytes();
byte[] readBytes = new byte[writeBytes.length];
out.write(writeBytes);
doIO(null, out);
in.read(readBytes);
assertTrue(Arrays.equals(writeBytes, readBytes));
doIO(in, null);
/*
* Verify that it handles interrupted threads properly.
* Use a large timeout and expect the thread to return quickly.
*/
in = new SocketInputStream(source, 0);
Thread thread = new Thread(new ReadRunnable(in));
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
thread.interrupt();
try {
thread.join();
} catch (InterruptedException e) {
throw new IOException("Unexpected InterruptedException : " + e);
}
//make sure the channels are still open
assertTrue(source.isOpen());
assertTrue(sink.isOpen());
out.close();
assertFalse(sink.isOpen());
// close sink and expect -1 from source.read()
assertEquals(-1, in.read());
// make sure close() closes the underlying channel.
in.close();
assertFalse(source.isOpen());
} finally {
if (source != null) {
source.close();
}
if (sink != null) {
sink.close();
}
}
}
public void testSocketIOWithTimeout() throws IOException {
// first open pipe:
Pipe pipe = Pipe.open();
Pipe.SourceChannel source = pipe.source();
Pipe.SinkChannel sink = pipe.sink();
try {
InputStream in = new SocketInputStream(source, TIMEOUT);
OutputStream out = new SocketOutputStream(sink, TIMEOUT);
byte[] writeBytes = TEST_STRING.getBytes();
byte[] readBytes = new byte[writeBytes.length];
out.write(writeBytes);
doIO(null, out);
in.read(readBytes);
assertTrue(Arrays.equals(writeBytes, readBytes));
doIO(in, null);
/*
* Verify that it handles interrupted threads properly.
* Use a large timeout and expect the thread to return quickly.
*/
in = new SocketInputStream(source, 0);
Thread thread = new Thread(new ReadRunnable(in));
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
thread.interrupt();
try {
thread.join();
} catch (InterruptedException e) {
throw new IOException("Unexpected InterruptedException : " + e);
}
//make sure the channels are still open
assertTrue(source.isOpen());
assertTrue(sink.isOpen());
out.close();
assertFalse(sink.isOpen());
// close sink and expect -1 from source.read()
assertEquals(-1, in.read());
// make sure close() closes the underlying channel.
in.close();
assertFalse(source.isOpen());
} finally {
if (source != null) {
source.close();
}
if (sink != null) {
sink.close();
}
}
}