下面列出了怎么用java.io.PipedWriter的API类实例代码及写法,或者点击链接到github查看源代码。
public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 将输出流和输入流进行连接,否则在使用时会抛出IOException
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}
public void test_writeI() throws Exception {
// Test for method void java.io.PipedWriter.write(int)
pw = new PipedWriter();
try {
pw.write(42);
fail("Test 1: IOException expected.");
} catch (IOException e) {
// Expected.
}
readerThread = new Thread(reader = new PReader(pw), "writeI");
readerThread.start();
pw.write(1);
pw.write(2);
pw.write(3);
pw.close();
reader.read(3);
assertTrue("Test 2: The charaacters read do not match the characters written: " +
(int) reader.buf[0] + " " + (int) reader.buf[1] + " " + (int) reader.buf[2],
reader.buf[0] == 1 && reader.buf[1] == 2 && reader.buf[2] == 3);
}
@Override
protected void assertConnect(String term) throws Exception {
if (endpoint != null) {
throw failure("Already a session");
}
final CountDownLatch latch = new CountDownLatch(1);
final PipedWriter out = new PipedWriter();
in = new PipedReader(out);
endpoint = new Endpoint() {
@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
try {
out.write(message);
} catch (IOException e) {
e.printStackTrace();
}
}
});
latch.countDown();
}
@Override
public void onClose(Session sess, CloseReason closeReason) {
session = null;
endpoint = null;
in = null;
}
@Override
public void onError(Session session, Throwable thr) {
}
};
ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build();
WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer();
session = webSocketContainer.connectToServer(endpoint, clientEndpointConfig, new URI("http://localhost:8080/ws"));
latch.await();
}
private static PipedReader newPipedReader(String contents) {
try (PipedWriter w = new PipedWriter()) {
PipedReader r = new PipedReader(w);
w.write(contents);
return r;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
protected void assertConnect(String term) throws Exception {
if (endpoint != null) {
throw failure("Already a session");
}
CountDownLatch latch = new CountDownLatch(1);
PipedWriter out = new PipedWriter();
in = new PipedReader(out);
endpoint = new Endpoint() {
@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
try {
out.write(message);
} catch (IOException e) {
e.printStackTrace();
}
}
});
latch.countDown();
}
@Override
public void onClose(Session sess, CloseReason closeReason) {
session = null;
endpoint = null;
in = null;
}
@Override
public void onError(Session session, Throwable thr) {
}
};
ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build();
WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer();
session = webSocketContainer.connectToServer(endpoint, clientEndpointConfig, new URI("http://localhost:8080/ws"));
latch.await();
}
@Override
protected void assertConnect(String term) throws Exception {
if (endpoint != null) {
throw failure("Already a session");
}
CountDownLatch latch = new CountDownLatch(1);
PipedWriter out = new PipedWriter();
in = new PipedReader(out);
endpoint = new Endpoint() {
@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
try {
out.write(message);
} catch (IOException e) {
e.printStackTrace();
}
}
});
latch.countDown();
}
@Override
public void onClose(Session sess, CloseReason closeReason) {
session = null;
endpoint = null;
in = null;
}
@Override
public void onError(Session session, Throwable thr) {
}
};
ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build();
WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer();
session = webSocketContainer.connectToServer(endpoint, clientEndpointConfig, new URI("http://localhost:8080/ws"));
latch.await();
}
private void testInterruptWriter(final PipedWriter writer) throws Exception {
Thread thread = interruptMeLater();
try {
// this will block when the receiving buffer fills up
while (true) {
writer.write(new char[BUFFER_SIZE]);
}
} catch (InterruptedIOException expected) {
} finally {
confirmInterrupted(thread);
}
}
public PReader(PipedWriter pw) {
try {
pr = new PipedReader(pw);
} catch (IOException e) {
System.out.println("Exception setting up reader: "
+ e.toString());
}
}
public void test_Constructor() {
pw = new PipedWriter();
assertNotNull(pw);
try {
pw.close();
} catch (IOException e) {
fail("Unexpeceted IOException.");
}
}
public void test_close() throws Exception {
PipedReader rd = new PipedReader();
pw = new PipedWriter(rd);
reader = new PReader(rd);
try {
pw.close();
} catch (IOException e) {
fail("Test 1: Unexpected IOException: " + e.getMessage());
}
}
public void test_flush() throws Exception {
// Test for method void java.io.PipedWriter.flush()
pw = new PipedWriter();
readerThread = new Thread(reader = new PReader(pw), "flush");
readerThread.start();
pw.write(testBuf);
pw.flush();
assertEquals("Test 1: Flush failed. ", testString,
reader.read(testLength));
}
Pipe(Term term) throws IOException {
pipedReader = new PipedReader();
pipedWriter = new PipedWriter(pipedReader);
term.addInputListener(new TermListener());
}
public TreeStreamResult (PipedWriter writer) {
this.writer = new TreeStreamWriter (writer);
}
/**
* Create it writing result in pipe (convertable to Reader).
*/
public TreeStreamWriter (PipedWriter writer) {
this.writer = writer;
}
public PipedWriter getPipedWriter() {
return pipeW;
}
public PipedWriter getPipedWriter() {
return pipeW;
}
public void testInterruptPipedReader() throws Exception {
PipedWriter writer = new PipedWriter();
PipedReader reader = new PipedReader(writer);
testInterruptReader(reader);
}
public void testInterruptPipedWriter() throws Exception {
final PipedWriter writer = new PipedWriter();
new PipedReader(writer);
testInterruptWriter(writer);
}
public PipedWriter getPipedWriter() {
return pipeW;
}
/**
* Instantiates the RecordReader.
* @param format RDF serialization format to parse.
* @param charBufferSize Number of input characters to hold in
* memory; if exceeded, wait until the parser
* thread consumes some text before proceeding
* with reading input.
* @param statementBufferSize Number of output statements to hold in
* memory; if exceeded, wait until the
* client consumes data before proceeding
* with parsing.
* @param timeoutSeconds Number of seconds to wait for the parser
* thread to provide the next statement (or
* state that there are none). If exceeded,
* abort.
*/
RdfFileRecordReader(RDFFormat format, int charBufferSize, int statementBufferSize, int timeoutSeconds) {
rdfParser = Rio.createParser(format);
rdfParser.setRDFHandler(this);
statementCache = new LinkedBlockingQueue<RyaStatementWritable>(statementBufferSize);
pipeOut = new PipedWriter();
pipeIn = new PipedReader(charBufferSize);
this.timeoutSeconds = timeoutSeconds;
logger.info("Initializing RecordReader with parameters:");
logger.info("\tRDF serialization format = " + format.getName());
logger.info("\tinput buffer size = " + charBufferSize + " characters");
logger.info("\tstatement cache size = " + statementBufferSize);
logger.info("\tparser timeout = " + timeoutSeconds + " seconds");
}