下面列出了怎么用java.nio.channels.CompletionHandler的API类实例代码及写法,或者点击链接到github查看源代码。
private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
CompletionCheck check, CompletionHandler<Long, ? super A> handler,
Semaphore semaphore) {
this.read = read;
this.buffers = buffers;
this.offset = offset;
this.length = length;
this.block = block;
this.timeout = timeout;
this.unit = unit;
this.attachment = attachment;
this.check = check;
this.handler = handler;
this.semaphore = semaphore;
}
@Override
public <A> void read(
ByteBuffer dst,
A attach, CompletionHandler<Integer, ? super A> handler) {
checkReadOnly(dst);
if (!dst.hasRemaining()) {
completeWithZeroInt(attach, handler);
return;
}
group.startRead(
registeredSocket,
new ByteBufferSet(dst),
0, TimeUnit.MILLISECONDS,
c -> group.executor.submit(() -> handler.completed((int) c, attach)),
e -> group.executor.submit(() -> handler.failed(e, attach)));
}
@Override
public <A> void write(
ByteBuffer src,
long timeout,
TimeUnit unit,
A attach,
CompletionHandler<Integer, ? super A> handler) {
if (!src.hasRemaining()) {
completeWithZeroInt(attach, handler);
return;
}
group.startWrite(
registeredSocket,
new ByteBufferSet(src),
timeout,
unit,
c -> group.executor.submit(() -> handler.completed((int) c, attach)),
e -> group.executor.submit(() -> handler.failed(e, attach)));
}
@Override
public <A> void read(
ByteBuffer[] dsts, int offset, int length,
long timeout, TimeUnit unit,
A attach, CompletionHandler<Long, ? super A> handler) {
ByteBufferSet bufferSet = new ByteBufferSet(dsts, offset, length);
if (bufferSet.isReadOnly()) {
throw new IllegalArgumentException("buffer is read-only");
}
if (!bufferSet.hasRemaining()) {
completeWithZeroLong(attach, handler);
return;
}
group.startRead(
registeredSocket,
bufferSet,
timeout, unit,
c -> group.executor.submit(() -> handler.completed(c, attach)),
e -> group.executor.submit(() -> handler.failed(e, attach)));
}
@Override
public <A> void write(
ByteBuffer src,
long timeout, TimeUnit unit,
A attach, CompletionHandler<Integer, ? super A> handler) {
if (!src.hasRemaining()) {
completeWithZeroInt(attach, handler);
return;
}
group.startWrite(
registeredSocket,
new ByteBufferSet(src),
timeout, unit,
c -> group.executor.submit(() -> handler.completed((int) c, attach)),
e -> group.executor.submit(() -> handler.failed(e, attach)));
}
@Override
public <A> void write(
ByteBuffer[] srcs, int offset, int length,
long timeout, TimeUnit unit,
A attach, CompletionHandler<Long, ? super A> handler) {
ByteBufferSet bufferSet = new ByteBufferSet(srcs, offset, length);
if (!bufferSet.hasRemaining()) {
completeWithZeroLong(attach, handler);
return;
}
group.startWrite(
registeredSocket,
bufferSet,
timeout, unit,
c -> group.executor.submit(() -> handler.completed(c, attach)),
e -> group.executor.submit(() -> handler.failed(e, attach)));
}
@Override
@NonNull
public JavaFileObject createFileObject(
@NonNull final Location location,
@NonNull final File file,
@NonNull final File root,
@NullAllowed final JavaFileFilterImplementation filter,
@NullAllowed final Charset encoding) {
final CompletionHandler<Void,Void> handler = getAsyncHandler();
return handler == null || !JavaIndexerWorker.supportsConcurrent()?
FileObjects.fileFileObject(file, root, filter, encoding) :
FileObjects.asyncWriteFileObject(
file,
root,
filter,
encoding,
JavaIndexerWorker.getExecutor(),
handler);
}
@NonNull
public static PrefetchableJavaFileObject asyncWriteFileObject(
@NonNull final File file,
@NonNull final File root,
@NullAllowed JavaFileFilterImplementation filter,
@NullAllowed Charset encoding,
@NonNull final Executor pool,
@NonNull final CompletionHandler<Void,Void> done) {
final String[] pkgNamePair = getFolderAndBaseName(getRelativePath(root,file),File.separatorChar);
return new AsyncWriteFileObject(
file,
convertFolder2Package(pkgNamePair[0], File.separatorChar),
pkgNamePair[1],
filter,
encoding,
pool,
done);
}
@Override
public <A> void read(
ByteBuffer[] dsts,
int offset,
int length,
long timeout,
TimeUnit unit,
A attach,
CompletionHandler<Long, ? super A> handler) {
ByteBufferSet bufferSet = new ByteBufferSet(dsts, offset, length);
if (bufferSet.isReadOnly()) {
throw new IllegalArgumentException("buffer is read-only");
}
if (!bufferSet.hasRemaining()) {
completeWithZeroLong(attach, handler);
return;
}
group.startRead(
registeredSocket,
bufferSet,
timeout,
unit,
c -> group.executor.submit(() -> handler.completed(c, attach)),
e -> group.executor.submit(() -> handler.failed(e, attach)));
}
@Override
public <A> void read(
ByteBuffer dst,
long timeout,
TimeUnit unit,
A attach,
CompletionHandler<Integer, ? super A> handler) {
checkReadOnly(dst);
if (!dst.hasRemaining()) {
completeWithZeroInt(attach, handler);
return;
}
group.startRead(
registeredSocket,
new ByteBufferSet(dst),
timeout,
unit,
c -> group.executor.submit(() -> handler.completed((int) c, attach)),
e -> group.executor.submit(() -> handler.failed(e, attach)));
}
@Override
public <B,A extends B> void read(ByteBuffer dst, A attachment,
CompletionHandler<Integer,B> handler) {
WrapperFuture<Integer,B> future =
new WrapperFuture<Integer, B>(handler, attachment);
if (!reading.compareAndSet(false, true)) {
throw new IllegalStateException(sm.getString(
"asyncChannelWrapperSecure.concurrentRead"));
}
ReadTask readTask = new ReadTask(dst, future);
executor.execute(readTask);
}
@Override
public <B,A extends B> void read(ByteBuffer dst, A attachment,
CompletionHandler<Integer,B> handler) {
WrapperFuture<Integer,B> future =
new WrapperFuture<Integer, B>(handler, attachment);
if (!reading.compareAndSet(false, true)) {
throw new IllegalStateException(sm.getString(
"asyncChannelWrapperSecure.concurrentRead"));
}
ReadTask readTask = new ReadTask(dst, future);
executor.execute(readTask);
}
/**
* Asynchronously write a message with a notification being delivered to <code>callback</code> upon completion of write of entire message.
*
* @param message
* message extending {@link XMessage}
* @param callback
* an optional callback to receive notification of when the message is completely written
*/
public void writeAsync(XMessage message, CompletionHandler<Long, Void> callback) {
MessageLite msg = message.getMessage();
int type = MessageConstants.getTypeForMessageClass(msg.getClass());
int size = msg.getSerializedSize();
int payloadSize = size + 1;
// we check maxAllowedPacket against payloadSize as that's considered the "packet size" (not including 4 byte size header)
if (this.maxAllowedPacket > 0 && payloadSize > this.maxAllowedPacket) {
throw new CJPacketTooBigException(Messages.getString("PacketTooBigException.1", new Object[] { size, this.maxAllowedPacket }));
}
// for debugging
//System.err.println("Initiating write of message (size=" + payloadSize + ", tag=" + com.mysql.cj.mysqlx.protobuf.Mysqlx.ClientMessages.Type.valueOf(type) + ")");
ByteBuffer messageBuf = ByteBuffer.allocate(HEADER_LEN + size).order(ByteOrder.LITTLE_ENDIAN).putInt(payloadSize);
messageBuf.put((byte) type);
try {
// directly access the ByteBuffer's backing array as protobuf's CodedOutputStream.newInstance(ByteBuffer) is giving a stream that doesn't actually
// write any data
msg.writeTo(CodedOutputStream.newInstance(messageBuf.array(), HEADER_LEN, size + HEADER_LEN));
messageBuf.position(messageBuf.limit());
} catch (IOException ex) {
throw new CJCommunicationsException("Unable to write message", ex);
}
messageBuf.flip();
this.bufferWriter.queueBuffer(messageBuf, callback);
}
@Override
public <B,A extends B> void write(ByteBuffer[] srcs, int offset, int length,
long timeout, TimeUnit unit, A attachment,
CompletionHandler<Long,B> handler) {
WrapperFuture<Long,B> future =
new WrapperFuture<Long, B>(handler, attachment);
if (!writing.compareAndSet(false, true)) {
throw new IllegalStateException(sm.getString(
"asyncChannelWrapperSecure.concurrentWrite"));
}
WriteTask writeTask = new WriteTask(srcs, offset, length, future);
executor.execute(writeTask);
}
@SuppressWarnings({"unchecked", "rawtypes"})
public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,
String schema) throws IOException {
DataSourceConfig dsc = pool.getConfig();
NetworkChannel channel = openSocketChannel(DbleServer.getInstance().isAIO());
MySQLConnection c = new MySQLConnection(channel, pool.isReadNode(), pool.isAutocommitSynced(), pool.isIsolationSynced());
c.setSocketParams(false);
c.setHost(dsc.getIp());
c.setPort(dsc.getPort());
c.setUser(dsc.getUser());
c.setPassword(dsc.getPassword());
c.setSchema(schema);
c.setHandler(new MySQLConnectionAuthenticator(c, handler));
c.setPool(pool);
c.setIdleTimeout(pool.getConfig().getIdleTimeout());
if (channel instanceof AsynchronousSocketChannel) {
((AsynchronousSocketChannel) channel).connect(
new InetSocketAddress(dsc.getIp(), dsc.getPort()), c,
(CompletionHandler) DbleServer.getInstance().getConnector());
} else {
((NIOConnector) DbleServer.getInstance().getConnector()).postConnect(c);
}
return c;
}
@Override
public <B,A extends B> void write(ByteBuffer[] srcs, int offset, int length,
long timeout, TimeUnit unit, A attachment,
CompletionHandler<Long,B> handler) {
socketChannel.write(
srcs, offset, length, timeout, unit, attachment, handler);
}
@Override
public <A> void write(
ByteBuffer src,
long position,
@NullableDecl A attachment,
CompletionHandler<Integer, ? super A> handler) {
addCallback(write(src, position), handler, attachment);
}
/**
* Enqueue a read
* @param completionHandler callback on completed read
*/
public void read(CompletionHandler<Integer, ? super ByteBuffer> completionHandler) {
ByteBuffer input = ByteBuffer.allocate(256);
if (!channel.isOpen()) {
return;
}
channel.read(input, input, completionHandler);
}
@Override
public <B,A extends B> void write(ByteBuffer[] srcs, int offset, int length,
long timeout, TimeUnit unit, A attachment,
CompletionHandler<Long,B> handler) {
socketChannel.write(
srcs, offset, length, timeout, unit, attachment, handler);
}
static void checkAsyncModifier(Class param, Method method) {
if (param == CompletionHandler.class) return;
if (Modifier.isFinal(param.getModifiers())) {
throw new RuntimeException("CompletionHandler Type Parameter on {" + method + "} cannot final modifier");
}
if (!Modifier.isPublic(param.getModifiers())) {
throw new RuntimeException("CompletionHandler Type Parameter on {" + method + "} must be public modifier");
}
if (param.isInterface()) return;
boolean constructorflag = false;
for (Constructor c : param.getDeclaredConstructors()) {
if (c.getParameterCount() == 0) {
int mod = c.getModifiers();
if (Modifier.isPublic(mod) || Modifier.isProtected(mod)) {
constructorflag = true;
break;
}
}
}
if (param.getDeclaredConstructors().length == 0) constructorflag = true;
if (!constructorflag) throw new RuntimeException(param + " must have a empty parameter Constructor");
for (Method m : param.getMethods()) {
if (m.getName().equals("completed") && Modifier.isFinal(m.getModifiers())) {
throw new RuntimeException(param + "'s completed method cannot final modifier");
} else if (m.getName().equals("failed") && Modifier.isFinal(m.getModifiers())) {
throw new RuntimeException(param + "'s failed method cannot final modifier");
} else if (m.getName().equals("sncp_getParams") && Modifier.isFinal(m.getModifiers())) {
throw new RuntimeException(param + "'s sncp_getParams method cannot final modifier");
} else if (m.getName().equals("sncp_setParams") && Modifier.isFinal(m.getModifiers())) {
throw new RuntimeException(param + "'s sncp_setParams method cannot final modifier");
} else if (m.getName().equals("sncp_setFuture") && Modifier.isFinal(m.getModifiers())) {
throw new RuntimeException(param + "'s sncp_setFuture method cannot final modifier");
} else if (m.getName().equals("sncp_getFuture") && Modifier.isFinal(m.getModifiers())) {
throw new RuntimeException(param + "'s sncp_getFuture method cannot final modifier");
}
}
}
@Override
public <A> void write(ByteBuffer src, A attach, CompletionHandler<Integer, ? super A> handler) {
if (!src.hasRemaining()) {
completeWithZeroInt(attach, handler);
return;
}
group.startWrite(
registeredSocket,
new ByteBufferSet(src),
0,
TimeUnit.MILLISECONDS,
c -> group.executor.submit(() -> handler.completed((int) c, attach)),
e -> group.executor.submit(() -> handler.failed(e, attach)));
}
/**
* Enqueue a read
* @param completionHandler callback on completed read
*/
public void read(CompletionHandler<Integer, ? super ByteBuffer> completionHandler) {
ByteBuffer input = ByteBuffer.allocate(256);
if (!channel.isOpen()) {
return;
}
channel.read(input, input, completionHandler);
}
/**
* Enqueue a read
* @param completionHandler callback on completed read
*/
public void read(CompletionHandler<Integer, ? super ByteBuffer> completionHandler) {
ByteBuffer input = ByteBuffer.allocate(256);
if (!channel.isOpen()) {
return;
}
channel.read(input, input, completionHandler);
}
/**
* Enqueue a read
* @param completionHandler callback on completed read
*/
public void read(CompletionHandler<Integer, ? super ByteBuffer> completionHandler) {
ByteBuffer input = ByteBuffer.allocate(256);
if (!channel.isOpen()) {
return;
}
channel.read(input, input, completionHandler);
}
public CompletableFuture<SqlResult> asyncExecuteSql(String sql, List<Object> args) {
newCommand();
CompletableFuture<SqlResult> f = new CompletableFuture<>();
com.mysql.cj.protocol.MessageListener<XMessage> l = new SqlResultMessageListener(f, this.fieldFactory, this.noticeFactory,
this.serverSession.getDefaultTimeZone());
CompletionHandler<Long, Void> resultHandler = new ErrorToFutureCompletionHandler<>(f, () -> ((AsyncMessageReader) this.reader).pushMessageListener(l));
((AsyncMessageSender) this.writer).writeAsync(this.messageBuilder.buildSqlStatement(sql, args), resultHandler);
return f;
}
@SuppressWarnings("unchecked")
@Override
public <RES extends QueryResult> CompletableFuture<RES> sendAsync(Message message) {
newCommand();
CompletableFuture<StatementExecuteOk> f = new CompletableFuture<>();
final StatementExecuteOkMessageListener l = new StatementExecuteOkMessageListener(f, this.noticeFactory);
CompletionHandler<Long, Void> resultHandler = new ErrorToFutureCompletionHandler<>(f, () -> ((AsyncMessageReader) this.reader).pushMessageListener(l));
((AsyncMessageSender) this.writer).writeAsync((XMessage) message, resultHandler);
return (CompletableFuture<RES>) f;
}
/**
* Completion handler for channel writes.
*/
public void completed(Long bytesWritten, Void v) {
// collect completed writes to notify after initiating the next write
LinkedList<CompletionHandler<Long, Void>> completedWrites = new LinkedList<>();
synchronized (this.pendingWrites) {
while (this.pendingWrites.peek() != null && !this.pendingWrites.peek().getBuffer().hasRemaining() && completedWrites.size() < WRITES_AT_ONCE) {
completedWrites.add(this.pendingWrites.remove().getHandler());
}
// notify handler(s) before initiating write to satisfy ordering guarantees
completedWrites.stream().filter(Objects::nonNull).forEach(l -> {
// prevent exceptions in handler from blocking other notifications
try {
l.completed(0L, null);
} catch (Throwable ex) {
// presumably unexpected, notify so futures don't block
try {
l.failed(ex, null);
} catch (Throwable ex2) {
// nothing we can do here
ex2.printStackTrace(); // TODO log error normally instead of sysout
}
}
});
if (this.pendingWrites.size() > 0) {
initiateWrite();
}
}
}
/**
*
* @param buf
*/
public void write(ByteBuffer buf, CompletionHandler<Object, Object> handler) {
if (this.closeAfterWrite) {
throw new CodingError("file is already closed");
}
// find out the max. number of bytes can be written
int bytesToWrite = (int)Math.min(buf.remaining(), this.filesize - pos);
ByteBuffer another = buf.asReadOnlyBuffer();
another.limit(another.position() + bytesToWrite);
// increase write count, prevent accidental closing
this.writeCount.incrementAndGet();
// close this file if it reaches the end
if (bytesToWrite == this.filesize - pos) {
this.closeAfterWrite = true;
}
// write to the file
_log.trace("writting {} bytes", bytesToWrite);
this.ch.write(another, this.pos, null, new MyHandler(bytesToWrite, handler));
try {
this.ch.force(true);
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
this.pos += bytesToWrite;
// reposition the read pointer
buf.position(buf.position() + bytesToWrite);
}
@SuppressWarnings("unchecked")
@Override
public <RES extends QueryResult> CompletableFuture<RES> sendAsync(Message message) {
newCommand();
CompletableFuture<StatementExecuteOk> f = new CompletableFuture<>();
final StatementExecuteOkMessageListener l = new StatementExecuteOkMessageListener(f, this.noticeFactory);
CompletionHandler<Long, Void> resultHandler = new ErrorToFutureCompletionHandler<>(f, () -> this.reader.pushMessageListener(l));
this.sender.send((XMessage) message, resultHandler);
return (CompletableFuture<RES>) f;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public PostgreSQLBackendConnection make(PostgreSQLDataSource pool,
ResponseHandler handler, final String schema) throws IOException {
final DBHostConfig dsc = pool.getConfig();
NetworkChannel channel = this.openSocketChannel(MycatServer
.getInstance().isAIO());
final PostgreSQLBackendConnection c = new PostgreSQLBackendConnection(
channel, pool.isReadNode());
MycatServer.getInstance().getConfig().setSocketParams(c, false);
// 设置NIOHandler
c.setHandler(new PostgreSQLBackendConnectionHandler(c));
c.setHost(dsc.getIp());
c.setPort(dsc.getPort());
c.setUser(dsc.getUser());
c.setPassword(dsc.getPassword());
c.setSchema(schema);
c.setPool(pool);
c.setResponseHandler(handler);
c.setIdleTimeout(pool.getConfig().getIdleTimeout());
if (channel instanceof AsynchronousSocketChannel) {
((AsynchronousSocketChannel) channel).connect(
new InetSocketAddress(dsc.getIp(), dsc.getPort()), c,
(CompletionHandler) MycatServer.getInstance()
.getConnector());
} else {
((NIOConnector) MycatServer.getInstance().getConnector())
.postConnect(c);
}
return c;
}