下面列出了 java.nio.channels.Pipe API 的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Verifies that closing a registered channel leaves its key in the selector's key set until
 * the next selection operation, after which the key is no longer present.
 */
@Test
void selectorRemovesKeysOnChannelCloseWhenSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    // try-with-resources closes the selector and both pipe ends even when an assertion
    // fails, so the test does not leak descriptors.
    try (Pipe.SinkChannel unusedSink = pipe.sink();
         SelectableChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        // Closing the channel alone must not remove the key from the key set yet.
        source.close();
        assertTrue(selector.keys().contains(key));

        // The next selection pass is what drops the key.
        selector.selectNow();
        assertFalse(selector.keys().contains(key));
    }
}
/**
 * Verifies that a cancelled key stays visible through both the selector's key set and
 * {@code keyFor} until the next selection operation, after which both forget it.
 */
@Test
void selectorRemovesKeysOnCancelWhenSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    // try-with-resources closes the selector and both pipe ends even when an assertion
    // fails, so the test does not leak descriptors.
    try (Pipe.SinkChannel unusedSink = pipe.sink();
         SelectableChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        // Cancelling alone must not remove the key from the selector or the channel.
        key.cancel();
        assertTrue(selector.keys().contains(key));
        assertSame(key, source.keyFor(selector));

        // The next selection pass is what drops the key on both sides.
        selector.selectNow();
        assertFalse(selector.keys().contains(key));
        assertNull(source.keyFor(selector));
    }
}
/**
 * Repeatedly registers and cancels a key on the same channel/selector pair and checks, at the
 * start of every round, that the channel no longer reports a key for the selector.
 */
@Test
void cancelledKeyRemovedFromChannel() throws Exception {
    Pipe pipe = Pipe.open();
    // try-with-resources closes the selector and both pipe ends even when an assertion
    // fails, so the test does not leak descriptors.
    try (Pipe.SinkChannel unusedSink = pipe.sink();
         SelectableChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        for (int i = 0; i < 1000; ++i) {
            // The key cancelled in the previous round must be gone from the channel by now.
            assertNull(source.keyFor(selector));
            SelectionKey key = source.register(selector, OP_READ);
            selector.selectedKeys().clear();
            selector.selectNow();
            key.cancel();
            selector.wakeup();
            selector.selectedKeys().clear();
            selector.selectNow();
        }
    }
}
/**
 * Sends a short text message through a one-way {@link Pipe} and prints what arrives at the
 * source end.
 *
 * <p>The buffer is reset with {@code clear()} before the read, the text is encoded and decoded
 * with an explicit UTF-8 charset, and both channels are closed even if an I/O call fails.
 *
 * @throws IOException if opening, writing to, reading from or closing the pipe fails
 */
@Test
public void test1() throws IOException {
    // 1. Open the pipe.
    Pipe pipe = Pipe.open();
    // Declared sink-first so that the source is closed first, then the sink.
    try (Pipe.SinkChannel sinkChannel = pipe.sink();
         Pipe.SourceChannel sourceChannel = pipe.source()) {
        // 2. Write the buffered data into the pipe.
        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.put("通过单向管道发送数据".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        buf.flip();
        int expected = buf.remaining();
        while (buf.hasRemaining()) {
            sinkChannel.write(buf);
        }

        // 3. Read the data back out of the pipe.
        // clear() (not flip()) makes the buffer ready to be filled again.
        buf.clear();
        // A single read may deliver fewer bytes than were written, so keep reading until
        // the whole message has arrived or the stream ends.
        while (buf.position() < expected) {
            if (sourceChannel.read(buf) < 0) {
                break;
            }
        }
        System.out.println(
                new String(buf.array(), 0, buf.position(), java.nio.charset.StandardCharsets.UTF_8));
    }
}
/**
 * Stores the supplied collaborators and settings, switches the pipe source to non-blocking
 * mode and starts with an empty text buffer.
 *
 * @param source pipe source channel; put into non-blocking mode by this constructor
 * @param sink pipe sink channel
 * @param input the input stream to work on
 * @param charset charset associated with the input
 * @param echoInput target for echoing input (callers in this file may pass {@code null})
 * @param filter input filter (callers in this file may pass {@code null})
 * @param bufferSize buffer size setting
 * @param autoFlushEcho auto-flush setting for the echo target
 * @throws IOException if the source channel cannot be switched to non-blocking mode
 */
protected SingleInputExpect(
        final Pipe.SourceChannel source,
        final Pipe.SinkChannel sink,
        final InputStream input,
        final Charset charset,
        final Appendable echoInput,
        final Filter filter,
        final int bufferSize,
        final boolean autoFlushEcho) throws IOException {
    // Pipe ends.
    this.source = source;
    this.sink = sink;

    // Input and how it is interpreted.
    this.input = input;
    this.charset = charset;
    this.filter = filter;
    this.bufferSize = bufferSize;

    // Echo settings.
    this.echoInput = echoInput;
    this.autoFlushEcho = autoFlushEcho;

    buffer = new StringBuilder();
    source.configureBlocking(false);
}
/**
 * Verifies that closing a registered channel leaves its key in the selector's key set until
 * the next selection operation, after which the key is no longer present.
 */
@Test
void selectorRemovesKeysOnChannelCloseWhenSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    // try-with-resources closes the selector and both pipe ends even when an assertion
    // fails, so the test does not leak descriptors.
    try (Pipe.SinkChannel unusedSink = pipe.sink();
         SelectableChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        // Closing the channel alone must not remove the key from the key set yet.
        source.close();
        assertTrue(selector.keys().contains(key));

        // The next selection pass is what drops the key.
        selector.selectNow();
        assertFalse(selector.keys().contains(key));
    }
}
/**
 * Verifies that a cancelled key stays visible through both the selector's key set and
 * {@code keyFor} until the next selection operation, after which both forget it.
 */
@Test
void selectorRemovesKeysOnCancelWhenSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    // try-with-resources closes the selector and both pipe ends even when an assertion
    // fails, so the test does not leak descriptors.
    try (Pipe.SinkChannel unusedSink = pipe.sink();
         SelectableChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        // Cancelling alone must not remove the key from the selector or the channel.
        key.cancel();
        assertTrue(selector.keys().contains(key));
        assertSame(key, source.keyFor(selector));

        // The next selection pass is what drops the key on both sides.
        selector.selectNow();
        assertFalse(selector.keys().contains(key));
        assertNull(source.keyFor(selector));
    }
}
/**
 * Repeatedly registers and cancels a key on the same channel/selector pair and checks, at the
 * start of every round, that the channel no longer reports a key for the selector.
 */
@Test
void cancelledKeyRemovedFromChannel() throws Exception {
    Pipe pipe = Pipe.open();
    // try-with-resources closes the selector and both pipe ends even when an assertion
    // fails, so the test does not leak descriptors.
    try (Pipe.SinkChannel unusedSink = pipe.sink();
         SelectableChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        for (int i = 0; i < 1000; ++i) {
            // The key cancelled in the previous round must be gone from the channel by now.
            assertNull(source.keyFor(selector));
            SelectionKey key = source.register(selector, OP_READ);
            selector.selectedKeys().clear();
            selector.selectNow();
            key.cancel();
            selector.wakeup();
            selector.selectedKeys().clear();
            selector.selectNow();
        }
    }
}
/**
 * Sends a short text message through a one-way {@link Pipe} and prints what arrives at the
 * source end.
 *
 * <p>The buffer is reset with {@code clear()} before the read, the text is encoded and decoded
 * with an explicit UTF-8 charset, and both channels are closed even if an I/O call fails.
 *
 * @throws IOException if opening, writing to, reading from or closing the pipe fails
 */
@Test
public void test1() throws IOException {
    // 1. Open the pipe.
    Pipe pipe = Pipe.open();
    // Declared sink-first so that the source is closed first, then the sink.
    try (Pipe.SinkChannel sinkChannel = pipe.sink();
         Pipe.SourceChannel sourceChannel = pipe.source()) {
        // 2. Write the buffered data into the pipe.
        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.put("通过单向管道发送数据".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        buf.flip();
        int expected = buf.remaining();
        while (buf.hasRemaining()) {
            sinkChannel.write(buf);
        }

        // 3. Read the data back out of the pipe.
        // clear() (not flip()) makes the buffer ready to be filled again.
        buf.clear();
        // A single read may deliver fewer bytes than were written, so keep reading until
        // the whole message has arrived or the stream ends.
        while (buf.position() < expected) {
            if (sourceChannel.read(buf) < 0) {
                break;
            }
        }
        System.out.println(
                new String(buf.array(), 0, buf.position(), java.nio.charset.StandardCharsets.UTF_8));
    }
}
/**
 * Creates the upload pipe selected by {@code options.getPipeType()}, stores its writable end
 * in {@code pipeSink} and returns the readable end as a stream.
 *
 * <p>For the NIO pipe, the source stream is wrapped in a {@link BufferedInputStream} only when
 * the configured pipe buffer size is positive. For the stream pipe, the configured size is
 * passed straight to {@link PipedInputStream}.
 *
 * @return the stream from which the data written to {@code pipeSink} can be read
 * @throws IOException if the underlying pipe cannot be created
 * @throws IllegalStateException if the configured pipe type is not handled here
 */
private InputStream initializeUploadPipe() throws IOException {
    switch (options.getPipeType()) {
        case NIO_CHANNEL_PIPE: {
            Pipe nioPipe = Pipe.open();
            pipeSink = nioPipe.sink();
            InputStream rawSource = Channels.newInputStream(nioPipe.source());
            if (options.getPipeBufferSize() > 0) {
                return new BufferedInputStream(rawSource, options.getPipeBufferSize());
            }
            return rawSource;
        }
        case IO_STREAM_PIPE: {
            PipedInputStream readEnd = new PipedInputStream(options.getPipeBufferSize());
            PipedOutputStream writeEnd = new PipedOutputStream(readEnd);
            pipeSink = Channels.newChannel(writeEnd);
            return readEnd;
        }
        default:
            throw new IllegalStateException("Unknown PipeType: " + options.getPipeType());
    }
}
/**
 * Prepares the fixture: a mock input stream built from {@code text} (per the original note,
 * one that sends some data every SMALL_TIMEOUT ms), a {@code SingleInputExpect} reading from
 * it through a fresh pipe, and a single-thread executor on which the input is started.
 * Returns only after the mock reports that it is ready.
 */
@Before
public void setup() throws Exception {
    mock = TestUtils.mockInputStream(text);

    final Pipe freshPipe = Pipe.open();
    final Pipe.SourceChannel pipeSource = freshPipe.source();
    final Pipe.SinkChannel pipeSink = freshPipe.sink();
    input = new SingleInputExpect(
            pipeSource,
            pipeSink,
            mock.getStream(),
            Charset.defaultCharset(),
            null,   // echoInput
            null,   // filter
            DEFAULT_BUFFER_SIZE,
            false); // autoFlushEcho

    executor = Executors.newSingleThreadExecutor();
    input.start(executor);
    mock.waitUntilReady();
}
/**
 * Stages a directory while the storage mock reports an already existing object created with
 * {@code Long.MAX_VALUE} (presumably its size; confirm against {@code createStorageObject}),
 * and verifies that the upload is still performed: exactly one {@code getObjects} call and
 * one {@code create} call reach the mock.
 */
@Test
public void testPackageUploadIsNotSkippedWhenSizesAreDifferent() throws Exception {
    Pipe uploadPipe = Pipe.open();

    // Local directory tree to stage.
    File stagedRoot = tmpFolder.newFolder("folder");
    tmpFolder.newFolder("folder", "empty_directory");
    tmpFolder.newFolder("folder", "directory");
    makeFileWithContents("folder/file.txt", "This is a test!");
    makeFileWithContents("folder/directory/file.txt", "This is also a test!");

    // Remote side: an object already exists at the staging path.
    StorageObjectOrIOException existingObject =
            StorageObjectOrIOException.create(createStorageObject(STAGING_PATH, Long.MAX_VALUE));
    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
            .thenReturn(ImmutableList.of(existingObject));
    Pipe.SinkChannel uploadSink = uploadPipe.sink();
    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(uploadSink);

    defaultPackageUtil.stageClasspathElements(
            ImmutableList.of(makeStagedFile(stagedRoot.getAbsolutePath())),
            STAGING_PATH,
            createOptions);

    verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
    verifyNoMoreInteractions(mockGcsUtil);
}
/**
 * Stages a single file under an explicitly overridden name while the storage mock reports
 * that no object exists yet, and verifies that the file is uploaded and that the resulting
 * package carries the overridden name and a location matching the staging path plus alias.
 */
@Test
public void testPackageUploadWithExplicitPackageName() throws Exception {
    Pipe uploadPipe = Pipe.open();
    File localFile = makeFileWithContents("file.txt", "This is a test!");
    final String overriddenName = "alias.txt";

    // Remote side: the lookup reports the object as missing.
    StorageObjectOrIOException missingObject =
            StorageObjectOrIOException.create(new FileNotFoundException("some/path"));
    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
            .thenReturn(ImmutableList.of(missingObject));
    Pipe.SinkChannel uploadSink = uploadPipe.sink();
    when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(uploadSink);

    List<DataflowPackage> stagedPackages =
            defaultPackageUtil.stageClasspathElements(
                    ImmutableList.of(makeStagedFile(localFile.getAbsolutePath(), overriddenName)),
                    STAGING_PATH,
                    createOptions);
    DataflowPackage stagedPackage = Iterables.getOnlyElement(stagedPackages);

    verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
    verify(mockGcsUtil).create(any(GcsPath.class), anyString());
    verifyNoMoreInteractions(mockGcsUtil);

    assertThat(stagedPackage.getName(), equalTo(overriddenName));
    assertThat(stagedPackage.getLocation(), RegexMatcher.matches(STAGING_PATH + "alias.txt"));
}
/**
 * Verifies that a key whose channel is closed while another thread is inside
 * {@code select()} is gone from the selector's key set once that selection has returned.
 */
@Test
void selectorRemovesKeysOnChannelCloseWhileSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    // try-with-resources closes the selector and both pipe ends even when an assertion
    // fails, so no descriptors leak; closing the selector should also release a worker that
    // is still blocked in select().
    try (Pipe.SinkChannel unusedSink = pipe.sink();
         SelectableChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        CountDownLatch latch = new CountDownLatch(1);
        Future<?> job = executor.submit(() -> {
            latch.countDown();
            try {
                selector.select();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        latch.await();
        // The latch only shows that the task has started; the pause gives it time to
        // actually enter select().
        Thread.sleep(100);

        source.close();
        selector.wakeup();
        job.get();
        assertFalse(selector.keys().contains(key));
    }
}
/**
 * Verifies that a key cancelled while another thread is inside {@code select()} stays
 * visible (key set and {@code keyFor}) until that selection returns, and is gone afterwards.
 */
@Test
void selectorRemovesKeysOnCancelWhileSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    // try-with-resources closes the selector and both pipe ends even when an assertion
    // fails, so no descriptors leak; closing the selector should also release a worker that
    // is still blocked in select().
    try (Pipe.SinkChannel unusedSink = pipe.sink();
         SelectableChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        CountDownLatch latch = new CountDownLatch(1);
        Future<?> job = executor.submit(() -> {
            latch.countDown();
            try {
                selector.select();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        latch.await();
        // The latch only shows that the task has started; the pause gives it time to
        // actually enter select().
        Thread.sleep(100);

        // Cancelling alone must not remove the key while the selection is still in progress.
        key.cancel();
        assertTrue(selector.keys().contains(key));
        assertSame(key, source.keyFor(selector));

        selector.wakeup();
        job.get();
        assertFalse(selector.keys().contains(key));
        assertNull(source.keyFor(selector));
    }
}
/**
 * Checks that reading from a stream obtained via {@code Channels.newInputStream} over a
 * channel returned by {@code createNonBlockingChannel} fails with
 * {@link IllegalBlockingModeException}.
 */
public void testStreamNonBlocking() throws IOException {
    final byte[] payload = "abc".getBytes("UTF-8");
    final Pipe.SourceChannel nonBlockingSource = createNonBlockingChannel(payload);
    try {
        Channels.newInputStream(nonBlockingSource).read();
        // Reaching this line means the read did not throw.
        fail();
    } catch (IllegalBlockingModeException expected) {
        // This exception is the outcome the test is looking for.
    }
}
/**
 * Checks that opening a {@link Pipe} does not reset the calling thread's interrupt status.
 *
 * <p>A worker thread opens and closes pipes in a loop. After it has been interrupted, it
 * performs one more open/close cycle and records a failure in {@code exc} if its interrupt
 * status is no longer set. Any failure recorded in {@code exc} is rethrown here.
 *
 * @throws Exception the failure recorded by the worker thread, if any
 */
public void test() throws Exception {
    Thread tester = new Thread("PipeTester") {
        private Pipe testPipe = null;

        @Override
        public void run() {
            for (;;) {
                // Sample the status before the pipe is opened so it can be compared afterwards.
                boolean interrupted = this.isInterrupted();
                try {
                    testPipe = Pipe.open();
                    close();
                    if (interrupted) {
                        if (!this.isInterrupted()) {
                            exc = new RuntimeException("interrupt status reset");
                        }
                        break;
                    }
                } catch (IOException ioe) {
                    exc = ioe;
                    // The test fails as soon as an I/O error is recorded; stop here so a
                    // persistent failure cannot spin forever and leave join() hanging.
                    break;
                }
            }
        }

        private void close() throws IOException {
            if (testPipe != null) {
                try {
                    testPipe.sink().close();
                } finally {
                    // Close the source end even if closing the sink end failed.
                    testPipe.source().close();
                }
            }
        }
    };
    tester.start();
    Thread.sleep(200);
    tester.interrupt();
    tester.join();
    if (exc != null) {
        throw exc;
    }
}
/**
 * Transfers a slice of the read-only file channel into a pipe sink and checks that the
 * bytes read back from the pipe source match the same slice of {@code CONTENT_AS_BYTES}.
 *
 * @tests java.nio.channels.FileChannel#transferTo(long,long,WritableByteChannel)
 */
public void test_transferToJJLWritableByteChannel_Pipe() throws Exception {
    // Fill the file and open the pipe.
    writeDataToFile(fileOfReadOnlyFileChannel);
    pipe = Pipe.open();

    // Push bytes [sliceStart, sliceStart + sliceLength) of the file into the pipe.
    final int sliceStart = 2;
    final int sliceLength = 4;
    long result = readOnlyFileChannel.transferTo(sliceStart, sliceLength, pipe.sink());
    assertEquals(sliceLength, result);
    // The transfer must leave the file channel's own position untouched.
    assertEquals(0, readOnlyFileChannel.position());
    readOnlyFileChannel.close();

    // Pull the same number of bytes back out of the pipe.
    ByteBuffer received = ByteBuffer.allocate(sliceLength);
    result = pipe.source().read(received);
    assertEquals(sliceLength, result);

    // Compare against the expected slice of the file content.
    received.flip();
    for (int i = 0; i < sliceLength; i++) {
        assertEquals(CONTENT_AS_BYTES[sliceStart + i], received.get());
    }
}
/**
 * Creates the selector: allocates the poll array wrapper with {@code INIT_CAP}, opens the
 * wakeup pipe, records the descriptor values of both pipe ends and adds the wakeup source
 * to the poll wrapper (second argument 0).
 *
 * @param sp the provider passed on to the superclass constructor
 * @throws IOException if the wakeup pipe cannot be opened or configured
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();

    final SelChImpl wakeupSource = (SelChImpl) wakeupPipe.source();
    wakeupSourceFd = wakeupSource.getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    final SinkChannelImpl wakeupSink = (SinkChannelImpl) wakeupPipe.sink();
    wakeupSink.sc.socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl) wakeupSink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
/**
 * Checks that opening a {@link Pipe} does not reset the calling thread's interrupt status.
 *
 * <p>A worker thread opens and closes pipes in a loop. After it has been interrupted, it
 * performs one more open/close cycle and records a failure in {@code exc} if its interrupt
 * status is no longer set. Any failure recorded in {@code exc} is rethrown here.
 *
 * @throws Exception the failure recorded by the worker thread, if any
 */
public void test() throws Exception {
    Thread tester = new Thread("PipeTester") {
        private Pipe testPipe = null;

        @Override
        public void run() {
            for (;;) {
                // Sample the status before the pipe is opened so it can be compared afterwards.
                boolean interrupted = this.isInterrupted();
                try {
                    testPipe = Pipe.open();
                    close();
                    if (interrupted) {
                        if (!this.isInterrupted()) {
                            exc = new RuntimeException("interrupt status reset");
                        }
                        break;
                    }
                } catch (IOException ioe) {
                    exc = ioe;
                    // The test fails as soon as an I/O error is recorded; stop here so a
                    // persistent failure cannot spin forever and leave join() hanging.
                    break;
                }
            }
        }

        private void close() throws IOException {
            if (testPipe != null) {
                try {
                    testPipe.sink().close();
                } finally {
                    // Close the source end even if closing the sink end failed.
                    testPipe.source().close();
                }
            }
        }
    };
    tester.start();
    Thread.sleep(200);
    tester.interrupt();
    tester.join();
    if (exc != null) {
        throw exc;
    }
}
/**
 * Creates the selector: allocates the poll array wrapper with {@code INIT_CAP}, opens the
 * wakeup pipe, records the descriptor values of both pipe ends and adds the wakeup source
 * to the poll wrapper (second argument 0).
 *
 * @param sp the provider passed on to the superclass constructor
 * @throws IOException if the wakeup pipe cannot be opened or configured
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();

    final SelChImpl wakeupSource = (SelChImpl) wakeupPipe.source();
    wakeupSourceFd = wakeupSource.getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    final SinkChannelImpl wakeupSink = (SinkChannelImpl) wakeupPipe.sink();
    wakeupSink.sc.socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl) wakeupSink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
/**
 * Checks that opening a {@link Pipe} does not reset the calling thread's interrupt status.
 *
 * <p>A worker thread opens and closes pipes in a loop. After it has been interrupted, it
 * performs one more open/close cycle and records a failure in {@code exc} if its interrupt
 * status is no longer set. Any failure recorded in {@code exc} is rethrown here.
 *
 * @throws Exception the failure recorded by the worker thread, if any
 */
public void test() throws Exception {
    Thread tester = new Thread("PipeTester") {
        private Pipe testPipe = null;

        @Override
        public void run() {
            for (;;) {
                // Sample the status before the pipe is opened so it can be compared afterwards.
                boolean interrupted = this.isInterrupted();
                try {
                    testPipe = Pipe.open();
                    close();
                    if (interrupted) {
                        if (!this.isInterrupted()) {
                            exc = new RuntimeException("interrupt status reset");
                        }
                        break;
                    }
                } catch (IOException ioe) {
                    exc = ioe;
                    // The test fails as soon as an I/O error is recorded; stop here so a
                    // persistent failure cannot spin forever and leave join() hanging.
                    break;
                }
            }
        }

        private void close() throws IOException {
            if (testPipe != null) {
                try {
                    testPipe.sink().close();
                } finally {
                    // Close the source end even if closing the sink end failed.
                    testPipe.source().close();
                }
            }
        }
    };
    tester.start();
    Thread.sleep(200);
    tester.interrupt();
    tester.join();
    if (exc != null) {
        throw exc;
    }
}
/**
 * Verifies that a key whose channel is closed while another thread is inside
 * {@code select()} is gone from the selector's key set once that selection has returned.
 */
@Test
void selectorRemovesKeysOnChannelCloseWhileSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    // try-with-resources closes the selector and both pipe ends even when an assertion
    // fails, so no descriptors leak; closing the selector should also release a worker that
    // is still blocked in select().
    try (Pipe.SinkChannel unusedSink = pipe.sink();
         SelectableChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        CountDownLatch latch = new CountDownLatch(1);
        Future<?> job = executor.submit(() -> {
            latch.countDown();
            try {
                selector.select();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        latch.await();
        // The latch only shows that the task has started; the pause gives it time to
        // actually enter select().
        Thread.sleep(100);

        source.close();
        selector.wakeup();
        job.get();
        assertFalse(selector.keys().contains(key));
    }
}
/**
 * Verifies that a key cancelled while another thread is inside {@code select()} stays
 * visible (key set and {@code keyFor}) until that selection returns, and is gone afterwards.
 */
@Test
void selectorRemovesKeysOnCancelWhileSelecting() throws Exception {
    Pipe pipe = Pipe.open();
    // try-with-resources closes the selector and both pipe ends even when an assertion
    // fails, so no descriptors leak; closing the selector should also release a worker that
    // is still blocked in select().
    try (Pipe.SinkChannel unusedSink = pipe.sink();
         SelectableChannel source = pipe.source();
         Selector selector = Selector.open()) {
        source.configureBlocking(false);
        SelectionKey key = source.register(selector, OP_READ);
        assertTrue(selector.keys().contains(key));

        CountDownLatch latch = new CountDownLatch(1);
        Future<?> job = executor.submit(() -> {
            latch.countDown();
            try {
                selector.select();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        latch.await();
        // The latch only shows that the task has started; the pause gives it time to
        // actually enter select().
        Thread.sleep(100);

        // Cancelling alone must not remove the key while the selection is still in progress.
        key.cancel();
        assertTrue(selector.keys().contains(key));
        assertSame(key, source.keyFor(selector));

        selector.wakeup();
        job.get();
        assertFalse(selector.keys().contains(key));
        assertNull(source.keyFor(selector));
    }
}
/**
 * Creates the selector: allocates the poll array wrapper with {@code INIT_CAP}, opens the
 * wakeup pipe, records the descriptor values of both pipe ends and adds the wakeup source
 * to the poll wrapper (second argument 0).
 *
 * @param sp the provider passed on to the superclass constructor
 * @throws IOException if the wakeup pipe cannot be opened or configured
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();

    final SelChImpl wakeupSource = (SelChImpl) wakeupPipe.source();
    wakeupSourceFd = wakeupSource.getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    final SinkChannelImpl wakeupSink = (SinkChannelImpl) wakeupPipe.sink();
    wakeupSink.sc.socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl) wakeupSink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
/**
 * Opens a fresh pipe, keeps references to both of its ends, and prepares two byte buffers:
 * one wrapping "bytes" and one wrapping "12345bytes" with its position moved to
 * {@code BUFFER_SIZE}.
 */
protected void setUp() throws Exception {
    super.setUp();

    // Pipe under test and its two ends.
    pipe = Pipe.open();
    sink = pipe.sink();
    source = pipe.source();

    // Buffer that starts at position 0.
    final byte[] plainBytes = "bytes".getBytes(ISO8859_1);
    buffer = ByteBuffer.wrap(plainBytes);

    // Buffer whose position is advanced to BUFFER_SIZE.
    final byte[] prefixedBytes = "12345bytes".getBytes(ISO8859_1);
    positionedBuffer = ByteBuffer.wrap(prefixedBytes);
    positionedBuffer.position(BUFFER_SIZE);
}
/**
 * Creates the selector: allocates the poll array wrapper with {@code INIT_CAP}, opens the
 * wakeup pipe, records the descriptor values of both pipe ends and adds the wakeup source
 * to the poll wrapper (second argument 0).
 *
 * @param sp the provider passed on to the superclass constructor
 * @throws IOException if the wakeup pipe cannot be opened or configured
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();

    final SelChImpl wakeupSource = (SelChImpl) wakeupPipe.source();
    wakeupSourceFd = wakeupSource.getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    final SinkChannelImpl wakeupSink = (SinkChannelImpl) wakeupPipe.sink();
    wakeupSink.sc.socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl) wakeupSink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
/**
 * Opens a fresh pipe, keeps references to both of its ends, and prepares two byte buffers:
 * one wrapping "bytes" and one wrapping "12345bytes" with its position moved to
 * {@code BUFFER_SIZE}.
 */
protected void setUp() throws Exception {
    super.setUp();

    // Pipe under test and its two ends.
    pipe = Pipe.open();
    sink = pipe.sink();
    source = pipe.source();

    // Buffer that starts at position 0.
    final byte[] plainBytes = "bytes".getBytes(ISO8859_1);
    buffer = ByteBuffer.wrap(plainBytes);

    // Buffer whose position is advanced to BUFFER_SIZE.
    final byte[] prefixedBytes = "12345bytes".getBytes(ISO8859_1);
    positionedBuffer = ByteBuffer.wrap(prefixedBytes);
    positionedBuffer.position(BUFFER_SIZE);
}
/**
 * Checks that opening a {@link Pipe} does not reset the calling thread's interrupt status.
 *
 * <p>A worker thread opens and closes pipes in a loop. After it has been interrupted, it
 * performs one more open/close cycle and records a failure in {@code exc} if its interrupt
 * status is no longer set. Any failure recorded in {@code exc} is rethrown here.
 *
 * @throws Exception the failure recorded by the worker thread, if any
 */
public void test() throws Exception {
    Thread tester = new Thread("PipeTester") {
        private Pipe testPipe = null;

        @Override
        public void run() {
            for (;;) {
                // Sample the status before the pipe is opened so it can be compared afterwards.
                boolean interrupted = this.isInterrupted();
                try {
                    testPipe = Pipe.open();
                    close();
                    if (interrupted) {
                        if (!this.isInterrupted()) {
                            exc = new RuntimeException("interrupt status reset");
                        }
                        break;
                    }
                } catch (IOException ioe) {
                    exc = ioe;
                    // The test fails as soon as an I/O error is recorded; stop here so a
                    // persistent failure cannot spin forever and leave join() hanging.
                    break;
                }
            }
        }

        private void close() throws IOException {
            if (testPipe != null) {
                try {
                    testPipe.sink().close();
                } finally {
                    // Close the source end even if closing the sink end failed.
                    testPipe.source().close();
                }
            }
        }
    };
    tester.start();
    Thread.sleep(200);
    tester.interrupt();
    tester.join();
    if (exc != null) {
        throw exc;
    }
}
/**
 * Creates the selector: allocates the poll array wrapper with {@code INIT_CAP}, opens the
 * wakeup pipe, records the descriptor values of both pipe ends and adds the wakeup source
 * to the poll wrapper (second argument 0).
 *
 * @param sp the provider passed on to the superclass constructor
 * @throws IOException if the wakeup pipe cannot be opened or configured
 */
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();

    final SelChImpl wakeupSource = (SelChImpl) wakeupPipe.source();
    wakeupSourceFd = wakeupSource.getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    final SinkChannelImpl wakeupSink = (SinkChannelImpl) wakeupPipe.sink();
    wakeupSink.sc.socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl) wakeupSink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}