下面列出了 java.nio.channels.Pipe#open() 的实例代码，您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Stages a single file while the mocked GCS utility fails the first {@code create} call and
 * succeeds on the second, then verifies that exactly two {@code create} calls and one
 * {@code getObjects} call were made.
 */
@Test
public void testPackageUploadEventuallySucceeds() throws Exception {
    // The sink end of this pipe is handed out as the upload channel on the successful attempt.
    Pipe uploadPipe = Pipe.open();
    Pipe.SinkChannel uploadChannel = uploadPipe.sink();

    File localFile = makeFileWithContents("file.txt", "This is a test!");
    String localPath = localFile.getAbsolutePath();

    // Report the object as missing in the staging location.
    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
        .thenReturn(
            ImmutableList.of(
                StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));

    // First create() attempt throws, the second one returns the pipe's sink.
    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
        .thenThrow(new IOException("Fake Exception: 410 Gone"))
        .thenReturn(uploadChannel);

    try (PackageUtil directPackageUtil =
        PackageUtil.withExecutorService(MoreExecutors.newDirectExecutorService())) {
        directPackageUtil.stageClasspathElements(
            ImmutableList.of(makeStagedFile(localPath)),
            STAGING_PATH,
            fastNanoClockAndSleeper,
            createOptions);
    } finally {
        // Checked in finally so the interaction counts are verified even if staging throws.
        verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
        verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
        verifyNoMoreInteractions(mockGcsUtil);
    }
}
/**
 * Verifies that a key registered for a channel stays in the selector's key set right after the
 * channel is closed and is removed by the next selection operation.
 *
 * <p>The selector and both pipe ends are managed by try-with-resources so that they are released
 * even when an assertion fails.
 */
@Test
void selectorRemovesKeysOnChannelCloseWhenSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    // "sink" is only declared so that the write end of the pipe is closed on exit.
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        // Closing the channel alone does not remove the key from the key set...
        source.close();
        assertTrue(selector.keys().contains(key));

        // ...the next selection operation does.
        selector.selectNow();
        assertFalse(selector.keys().contains(key));
    }
}
/**
 * Opens a fresh pipe and prepares the two byte buffers used by the tests.
 *
 * <p>{@code buffer} wraps the bytes of "bytes" from its start; {@code positionedBuffer} wraps the
 * bytes of "12345bytes" with its position moved to {@code BUFFER_SIZE}.
 */
protected void setUp() throws Exception {
    super.setUp();

    pipe = Pipe.open();
    source = pipe.source();
    sink = pipe.sink();

    byte[] plainBytes = "bytes".getBytes(ISO8859_1);
    buffer = ByteBuffer.wrap(plainBytes);

    byte[] prefixedBytes = "12345bytes".getBytes(ISO8859_1);
    positionedBuffer = ByteBuffer.wrap(prefixedBytes);
    // Skip the leading part so only the bytes after BUFFER_SIZE remain to be read.
    positionedBuffer.position(BUFFER_SIZE);
}
/**
 * Allocates the poll array, opens {@code wakeupPipe}, records the file descriptor values of
 * both pipe ends and hands the source descriptor to the poll array via
 * {@code addWakeupSocket}.
 *
 * @param sp the provider passed through to the superclass constructor
 * @throws IOException if opening the pipe or configuring its sink socket fails
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);

    wakeupPipe = Pipe.open();
    SelChImpl wakeupSource = (SelChImpl) wakeupPipe.source();
    wakeupSourceFd = wakeupSource.getFDVal();

    // Turn off the Nagle algorithm on the sink so that a wakeup is delivered more immediately.
    SinkChannelImpl wakeupSink = (SinkChannelImpl) wakeupPipe.sink();
    wakeupSink.sc.socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl) wakeupSink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
/**
 * Repeatedly opens and closes a pipe on a worker thread, interrupts that thread, and fails if
 * the thread's interrupt status was cleared by an open/close cycle.
 *
 * @throws Exception the exception recorded by the worker thread, if any
 */
public void test() throws Exception {
    Thread tester = new Thread("PipeTester") {
        private Pipe currentPipe = null;

        @Override
        public void run() {
            while (true) {
                // Sample the interrupt status before the pipe is opened and closed.
                boolean wasInterrupted = this.isInterrupted();
                try {
                    currentPipe = Pipe.open();
                    closeCurrentPipe();
                    if (!wasInterrupted) {
                        continue;
                    }
                    // The status was set before the cycle, so it must still be set now.
                    if (!this.isInterrupted()) {
                        exc = new RuntimeException("interrupt status reset");
                    }
                    break;
                } catch (IOException e) {
                    // Record the failure and keep looping.
                    exc = e;
                }
            }
        }

        /** Closes both ends of the most recently opened pipe, sink first. */
        private void closeCurrentPipe() throws IOException {
            if (currentPipe == null) {
                return;
            }
            currentPipe.sink().close();
            currentPipe.source().close();
        }
    };

    tester.start();
    Thread.sleep(200);
    tester.interrupt();
    tester.join();

    if (exc != null) {
        throw exc;
    }
}
/**
 * Allocates the poll array, opens {@code wakeupPipe}, records the file descriptor values of
 * both pipe ends and hands the source descriptor to the poll array via
 * {@code addWakeupSocket}.
 *
 * @param sp the provider passed through to the superclass constructor
 * @throws IOException if opening the pipe or configuring its sink socket fails
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);

    wakeupPipe = Pipe.open();
    SelChImpl wakeupSource = (SelChImpl) wakeupPipe.source();
    wakeupSourceFd = wakeupSource.getFDVal();

    // Turn off the Nagle algorithm on the sink so that a wakeup is delivered more immediately.
    SinkChannelImpl wakeupSink = (SinkChannelImpl) wakeupPipe.sink();
    wakeupSink.sc.socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl) wakeupSink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
/**
 * Stages a single text file through the mocked GCS utility and checks the resulting package's
 * name and location as well as the bytes that were written to the upload channel.
 */
@Test
public void testPackageUploadWithFileSucceeds() throws Exception {
    // Whatever is written to the sink can be read back from the source at the end of the test.
    Pipe uploadPipe = Pipe.open();
    Pipe.SinkChannel uploadChannel = uploadPipe.sink();

    String contents = "This is a test!";
    File localFile = makeFileWithContents("file.txt", contents);
    String localPath = localFile.getAbsolutePath();

    // Report the object as missing in the staging location.
    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
        .thenReturn(
            ImmutableList.of(
                StorageObjectOrIOException.create(new FileNotFoundException("some/path"))));
    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(uploadChannel);

    List<DataflowPackage> targets =
        defaultPackageUtil.stageClasspathElements(
            ImmutableList.of(makeStagedFile(localPath)), STAGING_PATH, createOptions);
    DataflowPackage target = Iterables.getOnlyElement(targets);

    verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
    verifyNoMoreInteractions(mockGcsUtil);

    assertThat(target.getName(), endsWith(".txt"));
    assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));

    // The first line read back from the pipe must be the original file contents.
    String uploadedLine =
        new LineReader(Channels.newReader(uploadPipe.source(), StandardCharsets.UTF_8.name()))
            .readLine();
    assertThat(uploadedLine, equalTo(contents));
}
/**
 * Allocates the poll array, opens {@code wakeupPipe}, records the file descriptor values of
 * both pipe ends and hands the source descriptor to the poll array via
 * {@code addWakeupSocket}.
 *
 * @param sp the provider passed through to the superclass constructor
 * @throws IOException if opening the pipe or configuring its sink socket fails
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);

    wakeupPipe = Pipe.open();
    SelChImpl wakeupSource = (SelChImpl) wakeupPipe.source();
    wakeupSourceFd = wakeupSource.getFDVal();

    // Turn off the Nagle algorithm on the sink so that a wakeup is delivered more immediately.
    SinkChannelImpl wakeupSink = (SinkChannelImpl) wakeupPipe.sink();
    wakeupSink.sc.socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl) wakeupSink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
/**
 * Repeatedly opens and closes a pipe on a worker thread, interrupts that thread, and fails if
 * the thread's interrupt status was cleared by an open/close cycle.
 *
 * @throws Exception the exception recorded by the worker thread, if any
 */
public void test() throws Exception {
    Thread tester = new Thread("PipeTester") {
        private Pipe currentPipe = null;

        @Override
        public void run() {
            while (true) {
                // Sample the interrupt status before the pipe is opened and closed.
                boolean wasInterrupted = this.isInterrupted();
                try {
                    currentPipe = Pipe.open();
                    closeCurrentPipe();
                    if (!wasInterrupted) {
                        continue;
                    }
                    // The status was set before the cycle, so it must still be set now.
                    if (!this.isInterrupted()) {
                        exc = new RuntimeException("interrupt status reset");
                    }
                    break;
                } catch (IOException e) {
                    // Record the failure and keep looping.
                    exc = e;
                }
            }
        }

        /** Closes both ends of the most recently opened pipe, sink first. */
        private void closeCurrentPipe() throws IOException {
            if (currentPipe == null) {
                return;
            }
            currentPipe.sink().close();
            currentPipe.source().close();
        }
    };

    tester.start();
    Thread.sleep(200);
    tester.interrupt();
    tester.join();

    if (exc != null) {
        throw exc;
    }
}
/**
 * Repeatedly opens and closes a pipe on a worker thread, interrupts that thread, and fails if
 * the thread's interrupt status was cleared by an open/close cycle.
 *
 * @throws Exception the exception recorded by the worker thread, if any
 */
public void test() throws Exception {
    Thread tester = new Thread("PipeTester") {
        private Pipe currentPipe = null;

        @Override
        public void run() {
            while (true) {
                // Sample the interrupt status before the pipe is opened and closed.
                boolean wasInterrupted = this.isInterrupted();
                try {
                    currentPipe = Pipe.open();
                    closeCurrentPipe();
                    if (!wasInterrupted) {
                        continue;
                    }
                    // The status was set before the cycle, so it must still be set now.
                    if (!this.isInterrupted()) {
                        exc = new RuntimeException("interrupt status reset");
                    }
                    break;
                } catch (IOException e) {
                    // Record the failure and keep looping.
                    exc = e;
                }
            }
        }

        /** Closes both ends of the most recently opened pipe, sink first. */
        private void closeCurrentPipe() throws IOException {
            if (currentPipe == null) {
                return;
            }
            currentPipe.sink().close();
            currentPipe.source().close();
        }
    };

    tester.start();
    Thread.sleep(200);
    tester.interrupt();
    tester.join();

    if (exc != null) {
        throw exc;
    }
}
/**
 * Opens two pipes and returns them as a two-element array.
 *
 * <p>If the second pipe cannot be opened, the first one is closed again before the failure is
 * reported, so no channels are leaked.
 *
 * @return an array holding two newly opened pipes
 * @throws RuntimeException wrapping the {@link IOException} if either pipe cannot be opened
 */
private static Pipe[] pipePair()
{
    Pipe first = null;
    try {
        first = Pipe.open();
        Pipe second = Pipe.open();
        return new Pipe[] { first, second };
    } catch (IOException e) {
        if (first != null) {
            // Only the second open() can have failed here; release the first pipe's channels.
            try {
                first.sink().close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            try {
                first.source().close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
        }
        throw new RuntimeException(e);
    }
}
/**
 * Transfers a slice of bytes from a pipe's source channel into a write-only file channel at a
 * non-zero position and checks the resulting file prefix.
 *
 * @tests java.nio.channels.FileChannel#transferFrom(ReadableByteChannel,long,long)
 */
public void test_transferFromLReadableByteChannelJJ_Pipe() throws Exception {
    final int position = 2;
    final int count = 4;

    // Seed the target file with its initial data.
    writeDataToFile(fileOfWriteOnlyFileChannel);

    // Fill a new pipe with the test content.
    pipe = Pipe.open();
    ByteBuffer payload = ByteBuffer.wrap(CONTENT_AS_BYTES);
    pipe.sink().write(payload);

    // Move `count` bytes from the pipe into the file, starting at `position`.
    long transferred = writeOnlyFileChannel.transferFrom(pipe.source(), position, count);
    assertEquals(count, transferred);
    writeOnlyFileChannel.close();

    // Read back the first position + count bytes of the file.
    fis = new FileInputStream(fileOfWriteOnlyFileChannel);
    byte[] actual = new byte[position + count];
    fis.read(actual);

    // Expected: the first `position` content bytes, then the first `count` content bytes again.
    // NOTE(review): this presumes writeDataToFile stores CONTENT_AS_BYTES - confirm.
    byte[] expected = new byte[position + count];
    System.arraycopy(CONTENT_AS_BYTES, 0, expected, 0, position);
    System.arraycopy(CONTENT_AS_BYTES, 0, expected, position, count);
    assertTrue(Arrays.equals(expected, actual));
}
/**
 * Allocates the poll array, opens {@code wakeupPipe}, records the file descriptor values of
 * both pipe ends and hands the source descriptor to the poll array via
 * {@code addWakeupSocket}.
 *
 * @param sp the provider passed through to the superclass constructor
 * @throws IOException if opening the pipe or configuring its sink socket fails
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);

    wakeupPipe = Pipe.open();
    SelChImpl wakeupSource = (SelChImpl) wakeupPipe.source();
    wakeupSourceFd = wakeupSource.getFDVal();

    // Turn off the Nagle algorithm on the sink so that a wakeup is delivered more immediately.
    SinkChannelImpl wakeupSink = (SinkChannelImpl) wakeupPipe.sink();
    wakeupSink.sc.socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl) wakeupSink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
/**
 * Opens a pipe, writes {@code content} to its sink and returns the source end switched to
 * non-blocking mode. The sink is left open.
 *
 * @param content the bytes to make available on the returned channel
 * @return the pipe's source channel in non-blocking mode
 * @throws IOException if the pipe cannot be opened, written to or reconfigured
 */
private Pipe.SourceChannel createNonBlockingChannel(byte[] content) throws IOException {
    Pipe pipe = Pipe.open();
    pipe.sink().write(ByteBuffer.wrap(content));

    Pipe.SourceChannel readEnd = pipe.source();
    readEnd.configureBlocking(false);
    return readEnd;
}
/**
 * Verifies that a key cancelled while another thread is inside {@code select()} stays in the
 * selector's key set (and stays reachable through {@code keyFor}) until that selection
 * operation finishes, and is gone afterwards.
 *
 * <p>The selector and both pipe ends are managed by try-with-resources so that they are released
 * even when an assertion fails; closing the selector also releases a worker still blocked in
 * {@code select()}.
 */
@Test
void selectorRemovesKeysOnCancelWhileSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    // "sink" is only declared so that the write end of the pipe is closed on exit.
    try (Pipe.SinkChannel sink = pipe.sink();
         Pipe.SourceChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        CountDownLatch latch = new CountDownLatch(1);
        Future<?> job = executor.submit(() -> {
            latch.countDown();
            try {
                selector.select();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        latch.await();
        // The latch only proves the worker has started; give it time to block in select().
        Thread.sleep(100);

        // Cancelling during the selection does not remove the key yet.
        key.cancel();
        assertTrue(selector.keys().contains(key));
        assertSame(key, source.keyFor(selector));

        // Once the blocked select() returns, the cancelled key has been removed.
        selector.wakeup();
        job.get();
        assertFalse(selector.keys().contains(key));
        assertNull(source.keyFor(selector));
    }
}
/**
 * Allocates the poll array, opens {@code wakeupPipe}, records the file descriptor values of
 * both pipe ends and hands the source descriptor to the poll array via
 * {@code addWakeupSocket}.
 *
 * @param sp the provider passed through to the superclass constructor
 * @throws IOException if opening the pipe or configuring its sink socket fails
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);

    wakeupPipe = Pipe.open();
    SelChImpl wakeupSource = (SelChImpl) wakeupPipe.source();
    wakeupSourceFd = wakeupSource.getFDVal();

    // Turn off the Nagle algorithm on the sink so that a wakeup is delivered more immediately.
    SinkChannelImpl wakeupSink = (SinkChannelImpl) wakeupPipe.sink();
    wakeupSink.sc.socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl) wakeupSink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
/**
 * Exercises {@code SocketInputStream} and {@code SocketOutputStream} over the two ends of a
 * pipe: a write/read round trip, the {@code doIO} checks in each direction, interruption of a
 * reader thread, and propagation of {@code close()} to the underlying channels.
 *
 * @throws IOException if any pipe or stream operation fails, or if this thread is interrupted
 *     while waiting for the reader thread (the interruption is attached as the cause)
 */
public void testSocketIOWithTimeout() throws IOException {
    // first open pipe:
    Pipe pipe = Pipe.open();
    Pipe.SourceChannel source = pipe.source();
    Pipe.SinkChannel sink = pipe.sink();

    try {
        InputStream in = new SocketInputStream(source, TIMEOUT);
        OutputStream out = new SocketOutputStream(sink, TIMEOUT);

        byte[] writeBytes = TEST_STRING.getBytes();
        byte[] readBytes = new byte[writeBytes.length];

        out.write(writeBytes);
        doIO(null, out);

        in.read(readBytes);
        assertTrue(Arrays.equals(writeBytes, readBytes));
        doIO(in, null);

        /*
         * Verify that it handles interrupted threads properly.
         * The reader is expected to return quickly once interrupted.
         * NOTE(review): the original comment says "use a large timeout", but the timeout
         * argument passed here is 0 - presumably that means "no timeout"; confirm against
         * SocketInputStream.
         */
        in = new SocketInputStream(source, 0);
        Thread thread = new Thread(new ReadRunnable(in));
        thread.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignored) {
            // A shortened sleep only means the reader thread is interrupted earlier.
        }

        thread.interrupt();

        try {
            thread.join();
        } catch (InterruptedException e) {
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new IOException("Unexpected InterruptedException : " + e, e);
        }

        // make sure the channels are still open
        assertTrue(source.isOpen());
        assertTrue(sink.isOpen());

        out.close();
        assertFalse(sink.isOpen());

        // the sink is closed now, so expect -1 from the source stream's read()
        assertEquals(-1, in.read());

        // make sure close() closes the underlying channel.
        in.close();
        assertFalse(source.isOpen());
    } finally {
        if (source != null) {
            source.close();
        }
        if (sink != null) {
            sink.close();
        }
    }
}
/**
 * Writes two buffers to the sink and reads them back from the source with the gathering
 * read(ByteBuffer[]) call, once for every combination of blocking and non-blocking mode
 * on the sink and the source.
 *
 * @tests java.nio.channels.Pipe.SourceChannel#read(ByteBuffer[])
 */
public void test_read_$LByteBuffer() throws IOException {
ByteBuffer[] bufArray = { buffer, positionedBuffer };
// Index i selects one (sink mode, source mode) pair; together the arrays cover all four.
boolean[] sinkBlockingMode = { true, true, false, false };
boolean[] sourceBlockingMode = { true, false, true, false };
for (int i = 0; i < sinkBlockingMode.length; ++i) {
// open a new pipe every time; it is closed in the finally block
pipe = Pipe.open();
sink = pipe.sink();
source = pipe.source();
sink.configureBlocking(sinkBlockingMode[i]);
source.configureBlocking(sourceBlockingMode[i]);
// Rewind both buffers so every iteration writes the same bytes.
buffer.position(0);
positionedBuffer.position(BUFFER_SIZE);
try {
long writeCount = sink.write(bufArray);
assertEquals(10, writeCount);
// invoke close to ensure all data will be sent out
sink.close();
// read until EOF is met or readBufArray is full.
ByteBuffer[] readBufArray = { ByteBuffer.allocate(BUFFER_SIZE),
ByteBuffer.allocate(BUFFER_SIZE) };
long totalCount = 0;
do {
long count = source.read(readBufArray);
if (count < 0) {
// negative count: end of stream
break;
}
if (0 == count && BUFFER_SIZE == readBufArray[1].position()) {
// source.read returns 0 because readBufArray is full
break;
}
totalCount += count;
} while (totalCount <= 10);
// assert read result
for (ByteBuffer readBuf : readBufArray) {
// The RI (reference implementation) may fail here because of a bug in its implementation
assertEquals(BUFFER_SIZE, readBuf.position());
assertEquals("bytes",
new String(readBuf.array(), ISO8859_1));
}
} finally {
// close the pipe every time
sink.close();
source.close();
}
}
}
/**
 * Writes two buffers to the sink and reads them back from the source with the gathering
 * read(ByteBuffer[], int, int) call over the whole array, once for every combination of
 * blocking and non-blocking mode on the sink and the source.
 *
 * @tests java.nio.channels.Pipe.SourceChannel#read(ByteBuffer[], int, int)
 */
public void test_read_$LByteBufferII() throws IOException {
ByteBuffer[] bufArray = { buffer, positionedBuffer };
// Index i selects one (sink mode, source mode) pair; together the arrays cover all four.
boolean[] sinkBlockingMode = { true, true, false, false };
boolean[] sourceBlockingMode = { true, false, true, false };
for (int i = 0; i < sinkBlockingMode.length; ++i) {
// A new pipe is opened for every mode combination and closed in the finally block.
Pipe pipe = Pipe.open();
sink = pipe.sink();
source = pipe.source();
sink.configureBlocking(sinkBlockingMode[i]);
source.configureBlocking(sourceBlockingMode[i]);
// Rewind both buffers so every iteration writes the same bytes.
buffer.position(0);
positionedBuffer.position(BUFFER_SIZE);
try {
sink.write(bufArray);
// invoke close to ensure all data will be sent out
sink.close();
// read until EOF is met or readBufArray is full.
ByteBuffer[] readBufArray = { ByteBuffer.allocate(BUFFER_SIZE),
ByteBuffer.allocate(BUFFER_SIZE) };
long totalCount = 0;
do {
long count = source.read(readBufArray, 0, 2);
if (count < 0) {
// negative count: end of stream
break;
}
if (0 == count && BUFFER_SIZE == readBufArray[1].position()) {
// source.read returns 0 because readBufArray is full
break;
}
totalCount += count;
} while (totalCount != 10);
// assert read result
for (ByteBuffer readBuf : readBufArray) {
// The RI (reference implementation) may fail here because of a bug in its implementation
assertEquals(BUFFER_SIZE, readBuf.position());
assertEquals("bytes",
new String(readBuf.array(), ISO8859_1));
}
} finally {
// close both ends of the pipe every time
sink.close();
source.close();
}
}
}
/**
 * Opens a pipe and closes its source channel before its sink channel.
 *
 * @param args ignored
 * @throws Exception if the pipe cannot be opened or either end fails to close
 */
public static void main(String[] args) throws Exception {
    Pipe pipe = Pipe.open();
    // The read end is closed first, then the write end.
    pipe.source().close();
    pipe.sink().close();
}