下面列出了 java.nio.channels.AsynchronousSocketChannel 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a backend {@link MySQLConnection} for the given data source and initiates its connect.
 *
 * <p>The connection takes host, port, user, password and idle timeout from
 * {@code pool.getConfig()}, is bound to {@code schema}, and gets a
 * {@link MySQLConnectionAuthenticator} wrapping {@code handler} as its handler. If the opened
 * channel is an {@link AsynchronousSocketChannel}, the connect is issued on it directly with the
 * server's connector as completion handler and the connection as attachment; otherwise the
 * connection is passed to the {@link NIOConnector} through {@code postConnect}.
 *
 * @param pool    data source that owns the new connection
 * @param handler response handler handed to the authenticator
 * @param schema  schema set on the connection
 * @return the newly created connection
 * @throws IOException declared by this method; presumably raised when the channel cannot be
 *                     opened (confirm against {@code openSocketChannel})
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,
        String schema) throws IOException {
    DataSourceConfig config = pool.getConfig();
    NetworkChannel channel = openSocketChannel(DbleServer.getInstance().isAIO());
    MySQLConnection conn = new MySQLConnection(channel, pool.isReadNode(), pool.isAutocommitSynced(), pool.isIsolationSynced());

    conn.setSocketParams(false);
    conn.setHost(config.getIp());
    conn.setPort(config.getPort());
    conn.setUser(config.getUser());
    conn.setPassword(config.getPassword());
    conn.setSchema(schema);
    conn.setHandler(new MySQLConnectionAuthenticator(conn, handler));
    conn.setPool(pool);
    conn.setIdleTimeout(pool.getConfig().getIdleTimeout());

    // NIO path: the connector takes over the connection.
    if (!(channel instanceof AsynchronousSocketChannel)) {
        ((NIOConnector) DbleServer.getInstance().getConnector()).postConnect(conn);
        return conn;
    }

    // AIO path: connect on the channel itself, with the connection as attachment.
    AsynchronousSocketChannel aioChannel = (AsynchronousSocketChannel) channel;
    InetSocketAddress target = new InetSocketAddress(config.getIp(), config.getPort());
    aioChannel.connect(target, conn, (CompletionHandler) DbleServer.getInstance().getConnector());
    return conn;
}
/**
 * Issues a connect with a completion handler, waits on the latch handed to that handler, then
 * drops the only strong reference to the handler and waits for its weak reference to be cleared.
 * Finally the connection accepted on the server side is closed.
 */
@Test
public void testConnect() throws Exception {
    try (Server server = new Server();
         AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(GROUP)) {
        CountDownLatch done = new CountDownLatch(1);
        Handler<Void,Object> handler = new Handler<Void,Object>("connect", done);
        ReferenceQueue queue = new ReferenceQueue<WeakReference>();
        WeakReference<Object> ref = new WeakReference<Object>(handler, queue);

        ch.connect(server.address(), null, handler);

        try {
            done.await();
        } catch (InterruptedException ignore) {
            // an interrupt is ignored here; the test carries on with the reference check
        }

        // From here on only the weak reference (and possibly the channel) can reach the handler.
        handler = null;
        waitForRefToClear(ref, queue);

        server.accept().get().close();
    }
}
/**
 * Sets up a GLSP server instance for one client channel.
 *
 * <p>A fresh injector is created from {@code getGLSPModule()}, the channel is exposed as a pair of
 * blocking streams, and an IO launcher is created on those streams with {@code threadPool}, an
 * identity message-consumer wrapper and the injected Gson configuration. The server is connected
 * to the launcher's remote proxy and listening is started. The remote address is then logged; a
 * failure to read it is only logged as an error.
 *
 * @param socketChannel channel of the client connection
 */
private void createClientConnection(AsynchronousSocketChannel socketChannel) {
    Injector injector = Guice.createInjector(getGLSPModule());
    GsonConfigurator gsonConfigurator = injector.getInstance(GsonConfigurator.class);

    InputStream clientIn = Channels.newInputStream(socketChannel);
    OutputStream clientOut = Channels.newOutputStream(socketChannel);

    Consumer<GsonBuilder> gsonSetup = builder -> gsonConfigurator.configureGsonBuilder(builder);
    Function<MessageConsumer, MessageConsumer> noWrapping = Function.identity();

    GLSPServer server = injector.getInstance(GLSPServer.class);
    Launcher<GLSPClient> launcher = Launcher.createIoLauncher(server, GLSPClient.class, clientIn, clientOut,
            threadPool, noWrapping, gsonSetup);
    server.connect(launcher.getRemoteProxy());
    launcher.startListening();

    try {
        log.info("Started language server for client " + socketChannel.getRemoteAddress());
    } catch (IOException ex) {
        log.error("Failed to get the remoteAddress for the new client connection: " + ex.getMessage(), ex);
    }
}
/**
 * Opens an {@link AsynchronousSocketChannel}, sets 128 KiB send and receive buffers on it and
 * blocks until it is connected to {@code host:port}.
 *
 * <p>If any step fails, the channel opened by this call is closed before the error is reported,
 * so a failed attempt does not leave an open socket behind. If the waiting thread is interrupted,
 * its interrupt status is restored before the exception is thrown.
 *
 * @param host         host name or address to connect to
 * @param port         port to connect to
 * @param props        not read by this implementation
 * @param loginTimeout not read by this implementation
 * @param <T>          type the caller expects the channel to be returned as
 * @return the connected channel, cast unchecked to {@code T}
 * @throws CJCommunicationsException if opening, configuring or connecting the channel fails, or
 *                                   if the thread is interrupted while waiting for the connect
 * @throws IOException               declared by the signature; failures inside this method are
 *                                   reported as {@code CJCommunicationsException}
 */
@SuppressWarnings("unchecked")
@Override
public <T extends Closeable> T connect(String host, int port, Properties props, int loginTimeout) throws IOException {
    // Track the channel opened by THIS call so a failure never closes a channel from an earlier call.
    AsynchronousSocketChannel opened = null;
    boolean connected = false;
    try {
        opened = AsynchronousSocketChannel.open();
        this.channel = opened;
        //channel.setOption(java.net.StandardSocketOptions.TCP_NODELAY, true);
        opened.setOption(java.net.StandardSocketOptions.SO_SNDBUF, 128 * 1024);
        opened.setOption(java.net.StandardSocketOptions.SO_RCVBUF, 128 * 1024);

        Future<Void> connectPromise = opened.connect(new InetSocketAddress(host, port));
        connectPromise.get();
        connected = true;
    } catch (CJCommunicationsException e) {
        throw e;
    } catch (InterruptedException ex) {
        // Keep the interrupt visible to callers further up the stack.
        Thread.currentThread().interrupt();
        throw new CJCommunicationsException(ex);
    } catch (IOException | ExecutionException | RuntimeException ex) {
        throw new CJCommunicationsException(ex);
    } finally {
        if (!connected && opened != null) {
            try {
                opened.close();
            } catch (IOException ignored) {
                // best effort: the original failure is the one worth reporting
            }
        }
    }
    return (T) this.channel;
}
/**
 * Stores the channel and updates the client node accordingly.
 *
 * <p>For a non-null channel the client node is created from it; if that raises an
 * {@link IOException} the exception is logged at info level and an unknown client node is
 * assigned instead. A {@code null} channel always results in an unknown client node.
 *
 * @param asynchronousSocketChannel the asynchronousSocketChannel to set, may be {@code null}
 */
public void setAsynchronousSocketChannel(AsynchronousSocketChannel asynchronousSocketChannel) {
    this.asynchronousSocketChannel = asynchronousSocketChannel;

    if (asynchronousSocketChannel == null) {
        assignAnUnknownClientNode();
        return;
    }

    try {
        setClientNode(createClientNode(asynchronousSocketChannel));
    } catch (IOException e) {
        log.info(e.toString(), e);
        assignAnUnknownClientNode();
    }
}
/***************************************
 * {@inheritDoc}
 *
 * <p>Obtains the socket channel for the continuation, connects it (blocking)
 * if it has no remote address yet, performs the blocking operation on it and
 * returns the data buffer that was passed in. Any exception is rethrown
 * wrapped in a {@code CoroutineException}.</p>
 */
@Override
protected ByteBuffer execute(
    ByteBuffer      rData,
    Continuation<?> rContinuation)
{
    try
    {
        AsynchronousSocketChannel rSocketChannel =
            getSocketChannel(rContinuation);

        boolean bUnconnected = rSocketChannel.getRemoteAddress() == null;

        if (bUnconnected)
        {
            // wait for the connect to finish before using the channel
            rSocketChannel.connect(getSocketAddress(rContinuation)).get();
        }

        performBlockingOperation(rSocketChannel, rData);

        return rData;
    }
    catch (Exception e)
    {
        throw new CoroutineException(e);
    }
}
/***************************************
 * Returns the channel to be used by this step. The coroutine that is
 * currently executing in the given continuation is queried for an existing
 * {@link #SOCKET_CHANNEL} relation. If there is none, or the stored channel
 * is no longer open, a new {@link AsynchronousSocketChannel} is opened in
 * the channel group of the continuation, stored in the coroutine relation
 * and annotated as {@code MANAGED}. Keeping the channel in the coroutine
 * allows coroutines to be structured so that multiple subroutines
 * communicate on different channels.
 *
 * @param  rContinuation The continuation to query for an existing channel
 *
 * @return The socket channel
 *
 * @throws IOException If opening the channel fails
 */
protected AsynchronousSocketChannel getSocketChannel(
    Continuation<?> rContinuation) throws IOException
{
    Coroutine<?, ?>           rCoroutine = rContinuation.getCurrentCoroutine();
    AsynchronousSocketChannel rExisting  = rCoroutine.get(SOCKET_CHANNEL);

    if (rExisting != null && rExisting.isOpen())
    {
        return rExisting;
    }

    AsynchronousSocketChannel rOpened =
        AsynchronousSocketChannel.open(getChannelGroup(rContinuation));

    rCoroutine.set(SOCKET_CHANNEL, rOpened).annotate(MANAGED);

    return rOpened;
}
/**
 * Remembers the given channel and refreshes the client node derived from it.
 *
 * <p>When the channel is {@code null}, or when creating the client node from it throws an
 * {@link IOException} (which is logged at info level), an unknown client node is assigned.
 *
 * @param asynchronousSocketChannel the asynchronousSocketChannel to set, may be {@code null}
 */
public void setAsynchronousSocketChannel(AsynchronousSocketChannel asynchronousSocketChannel)
{
    this.asynchronousSocketChannel = asynchronousSocketChannel;

    if (asynchronousSocketChannel == null)
    {
        assignAnUnknownClientNode();
        return;
    }

    try
    {
        Node node = createClientNode(asynchronousSocketChannel);
        setClientNode(node);
    }
    catch (IOException e)
    {
        log.info(e.toString(), e);
        assignAnUnknownClientNode();
    }
}
/**
 * Opens an {@link AsynchronousSocketChannel}, sets 128 KiB send and receive buffers on it and
 * blocks until it is connected to {@code host:port}.
 *
 * <p>If any step fails, the channel opened by this call is closed before the error is reported,
 * so a failed attempt does not leave an open socket behind. If the waiting thread is interrupted,
 * its interrupt status is restored before the exception is thrown.
 *
 * @param host         host name or address to connect to
 * @param port         port to connect to
 * @param props        not read by this implementation
 * @param loginTimeout not read by this implementation
 * @param <T>          type the caller expects the channel to be returned as
 * @return the connected channel, cast unchecked to {@code T}
 * @throws CJCommunicationsException if opening, configuring or connecting the channel fails, or
 *                                   if the thread is interrupted while waiting for the connect
 * @throws IOException               declared by the signature; failures inside this method are
 *                                   reported as {@code CJCommunicationsException}
 */
@SuppressWarnings("unchecked")
@Override
public <T extends Closeable> T connect(String host, int port, PropertySet props, int loginTimeout) throws IOException {
    // Track the channel opened by THIS call so a failure never closes a channel from an earlier call.
    AsynchronousSocketChannel opened = null;
    boolean connected = false;
    try {
        opened = AsynchronousSocketChannel.open();
        this.channel = opened;
        //channel.setOption(java.net.StandardSocketOptions.TCP_NODELAY, true);
        opened.setOption(java.net.StandardSocketOptions.SO_SNDBUF, 128 * 1024);
        opened.setOption(java.net.StandardSocketOptions.SO_RCVBUF, 128 * 1024);

        Future<Void> connectPromise = opened.connect(new InetSocketAddress(host, port));
        connectPromise.get();
        connected = true;
    } catch (CJCommunicationsException e) {
        throw e;
    } catch (InterruptedException ex) {
        // Keep the interrupt visible to callers further up the stack.
        Thread.currentThread().interrupt();
        throw new CJCommunicationsException(ex);
    } catch (IOException | ExecutionException | RuntimeException ex) {
        throw new CJCommunicationsException(ex);
    } finally {
        if (!connected && opened != null) {
            try {
                opened.close();
            } catch (IOException ignored) {
                // best effort: the original failure is the one worth reporting
            }
        }
    }
    return (T) this.channel;
}
/**
 * Opens a client channel in the given group and starts connecting it to {@code addr}.
 *
 * <p>{@code onChannelCreated} is consulted first: a negative result closes the channel without
 * connecting; otherwise its result is passed on, together with the channel, in the
 * {@code ConnectParam} attachment of the connect. Any {@link Throwable} raised on the way is
 * reported through {@code doException} and the channel is closed.
 *
 * @param addr       address to connect to
 * @param attachment value passed through to {@code onChannelCreated}
 * @param group      channel group the new channel is opened in
 */
public void startClient(SocketAddress addr, Object attachment, AsynchronousChannelGroup group)
{
    AsynchronousSocketChannel ch = null;
    try
    {
        ch = AsynchronousSocketChannel.open(group);
        int bufSize = onChannelCreated(ch, attachment);
        if (bufSize < 0)
        {
            ch.close();
            return;
        }
        ch.connect(addr, new ConnectParam(ch, bufSize), _connectHandler);
    }
    catch (Throwable e)
    {
        doException(null, e);
        closeChannel(ch);
    }
}
/**
 * Connects using a completion handler, waits on the latch given to that handler, clears the
 * local strong reference to the handler and then waits until the weak reference to it has been
 * cleared. The server-side end of the connection is accepted and closed at the end.
 */
@Test
public void testConnect() throws Exception {
    try (Server server = new Server();
         AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(GROUP)) {
        CountDownLatch completion = new CountDownLatch(1);
        Handler<Void,Object> handler = new Handler<Void,Object>("connect", completion);
        ReferenceQueue queue = new ReferenceQueue<WeakReference>();
        WeakReference<Object> ref = new WeakReference<Object>(handler, queue);

        ch.connect(server.address(), null, handler);

        try {
            completion.await();
        } catch (InterruptedException ignore) {
            // an interrupt is ignored here; the reference check below still runs
        }

        // Drop the strong reference so that only the weak one is left in this method.
        handler = null;
        waitForRefToClear(ref, queue);

        server.accept().get().close();
    }
}
/**
 * Failure callback for a connect attempt.
 *
 * <p>Reads the remote address while the channel is still open ({@code null} if it is already
 * closed), closes the channel and reports the failure through {@code onConnectFailed}. If any of
 * that throws, the channel is closed (again) and the new exception goes to {@code doException}.
 *
 * @param ex    cause of the failed connect
 * @param param attachment carrying the channel of the attempt
 */
@Override
public void failed(Throwable ex, ConnectParam param)
{
    AsynchronousSocketChannel ch = param.channel;
    try
    {
        SocketAddress remote = null;
        if (ch.isOpen())
            remote = ch.getRemoteAddress();
        closeChannel(ch);
        onConnectFailed(remote, ex);
    }
    catch (Exception e)
    {
        closeChannel(ch);
        doException(null, e);
    }
}
/**
 * Sends the {@code mntr} command to the server listening on the loopback address at
 * {@code zkPort} and returns everything it answers until it closes the connection.
 *
 * <p>Connect, write and each read are bounded by {@code CONNECTION_TIMEOUT_MS},
 * {@code WRITE_TIMEOUT_MS} and {@code READ_TIMEOUT_MS} respectively. Only the bytes actually
 * received by a read are appended to the result; the end-of-stream read contributes nothing.
 *
 * @return the response decoded as UTF-8, or an empty {@code Optional} if connecting, writing or
 *         reading failed, timed out or was interrupted (the failure is logged as a warning and,
 *         on interruption, the thread's interrupt status is restored)
 */
private Optional<String> retrieveReport() {
    try (AsynchronousSocketChannel chan = AsynchronousSocketChannel.open()) {
        InetSocketAddress zkAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), zkPort);
        Future<Void> connected = chan.connect(zkAddress);
        connected.get(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);

        Future<Integer> written = chan.write(ByteBuffer.wrap("mntr\n".getBytes(StandardCharsets.UTF_8)));
        written.get(WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        // A negative count signals end-of-stream; stop before touching the buffer in that case.
        while (chan.read(buffer).get(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS) >= 0) {
            buffer.flip();
            // Copy only the bytes this read produced, not the whole backing array.
            baos.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
            buffer.clear();
        }
        return Optional.of(baos.toString(StandardCharsets.UTF_8));
    } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
        if (e instanceof InterruptedException) {
            // Do not swallow the interrupt; callers may rely on it to stop.
            Thread.currentThread().interrupt();
        }
        log.warning("Failure in retrieving monitoring data: (" + e.getClass().getName() + ") " + e.getMessage());
        return Optional.empty();
    }
}
/**
 * Creates a channel context for the given configuration and socket channel.
 *
 * <p>After {@code init}, nothing more happens unless {@code tioConfig.sslConfig} is set. With SSL
 * configured, an {@code SslFacadeContext} is created for this context and, when the
 * configuration reports server mode, the handshake is begun. Any exception from that is logged
 * and the context is closed with {@code CloseCode.SSL_ERROR_ON_HANDSHAKE}.
 *
 * @param tioConfig configuration this context belongs to
 * @param asynchronousSocketChannel channel backing this context
 * @author tanyaowu
 */
public ChannelContext(TioConfig tioConfig, AsynchronousSocketChannel asynchronousSocketChannel) {
    super();
    init(tioConfig, asynchronousSocketChannel);

    if (tioConfig.sslConfig == null) {
        return;
    }

    try {
        SslFacadeContext sslContext = new SslFacadeContext(this);
        if (tioConfig.isServer()) {
            sslContext.beginHandshake();
        }
    } catch (Exception e) {
        log.error("在开始SSL握手时发生了异常", e);
        Tio.close(this, "在开始SSL握手时发生了异常" + e.getMessage(), CloseCode.SSL_ERROR_ON_HANDSHAKE);
    }
}
/**
 * Wraps a socket channel together with the SSL engine used on it.
 *
 * <p>Two direct buffers, one for reading from and one for writing to the socket, are allocated
 * with the packet buffer size reported by the engine's session.
 *
 * @param socketChannel channel to wrap
 * @param sslEngine     SSL engine used with the channel
 */
public AsyncChannelWrapperSecure(AsynchronousSocketChannel socketChannel,
        SSLEngine sslEngine) {
    this.socketChannel = socketChannel;
    this.sslEngine = sslEngine;

    final int packetSize = sslEngine.getSession().getPacketBufferSize();
    this.socketReadBuffer = ByteBuffer.allocateDirect(packetSize);
    this.socketWriteBuffer = ByteBuffer.allocateDirect(packetSize);
}
/**
 * Derives a client id from the remote IP address and port of the given channel.
 *
 * <p>The remote address is cast to {@link InetSocketAddress} and its raw address bytes and port
 * are passed to {@code NetworkUtil.convertIpPortToUniqueId}. The channel is assumed to be
 * connected; no check is made for a missing remote address.
 *
 * @param socketChannel channel whose remote endpoint identifies the client
 * @return the id computed by {@code NetworkUtil.convertIpPortToUniqueId}
 * @throws IOException if the remote address cannot be obtained from the channel
 */
public static long getClientIdBySocketChannel(AsynchronousSocketChannel socketChannel) throws IOException {
    InetSocketAddress remote = (InetSocketAddress) socketChannel.getRemoteAddress();
    return NetworkUtil.convertIpPortToUniqueId(remote.getAddress().getAddress(), remote.getPort());
}
/**
 * Registers the channel in {@code map} under the session id obtained from
 * {@code getSessionId} and returns that id.
 *
 * @param socketChannel channel to register
 * @return the session id the channel was stored under
 */
@Override
public String put(AsynchronousSocketChannel socketChannel) {
    final String id = getSessionId(socketChannel);
    map.put(id, socketChannel);
    return id;
}
/**
 * Resets this channel so it can be used with another socket.
 *
 * <p>Delegates to {@code super.reset}, then drops the SSL engine, sets {@code sniComplete},
 * {@code handshakeComplete}, {@code closed} and {@code closing} to {@code false} and
 * {@code unwrapBeforeRead} to {@code true}, and clears the network input buffer.
 *
 * @param channel socket channel passed through to {@code super.reset}
 * @param socket  socket wrapper passed through to {@code super.reset}
 * @throws IOException declared by this method; presumably propagated from {@code super.reset}
 *                     (confirm against the superclass)
 */
@Override
public void reset(AsynchronousSocketChannel channel, SocketWrapperBase<Nio2Channel> socket)
throws IOException {
super.reset(channel, socket);
// No engine until one is set up again; NOTE(review): presumably created during the next handshake, confirm.
sslEngine = null;
sniComplete = false;
handshakeComplete = false;
unwrapBeforeRead = true;
closed = false;
closing = false;
netInBuffer.clear();
}
/**
 * Writes {@code "hello"} from the accepted side of a connection using a completion handler,
 * waits on the latch given to that handler, clears the local strong reference to the handler and
 * waits until the weak reference to it has been cleared. The send buffer of the writing channel
 * is set to the payload size beforehand.
 */
@Test
public void testWrite() throws Exception {
    try (Server server = new Server();
         AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(GROUP)) {
        ch.connect(server.address()).get();

        try (AsynchronousSocketChannel sc = server.accept().get()) {
            ByteBuffer payload = ByteBuffer.wrap("hello".getBytes("UTF-8"));
            sc.setOption(SO_SNDBUF, payload.remaining());

            CountDownLatch done = new CountDownLatch(1);
            Handler<Integer,Object> handler = new Handler<Integer,Object>("write", done);
            ReferenceQueue queue = new ReferenceQueue<WeakReference>();
            WeakReference<Object> ref = new WeakReference<Object>(handler, queue);

            sc.write(payload, null, handler);

            try {
                done.await();
            } catch (InterruptedException ignore) {
                // an interrupt is ignored here; the reference check below still runs
            }

            // Drop the strong reference so that only the weak one is left in this method.
            handler = null;
            waitForRefToClear(ref, queue);
        }
    }
}
/**
 * Completion callback that hands the given channel to {@code session} and starts the session.
 * NOTE(review): presumably registered for an accept operation; confirm against the caller.
 *
 * <p>{@code acceptConnections()} is called on the server in a {@code finally} block, so it runs
 * even when binding or starting the session throws.
 *
 * @param socketChannel   channel delivered to this callback
 * @param aioSocketServer attachment; the server asked to accept further connections
 */
@Override
public void completed(AsynchronousSocketChannel socketChannel, AioSocketServer aioSocketServer) {
try {
session.setSocketChannel(socketChannel);
session.start();
} finally{
// always re-arm the server, whether or not the session started
aioSocketServer.acceptConnections();
}
}
/**
 * Instantiates a new {@link Client} with an existing {@link AsynchronousSocketChannel}.
 *
 * <p>All state flags and packet queues are created empty. The channel field is only assigned
 * when a non-null channel is supplied.
 *
 * @param channel The channel to back this {@link Client} with, or {@code null} to leave the
 *                channel field unassigned by this constructor.
 */
Client(AsynchronousSocketChannel channel) {
    // state flags
    this.closing = new AtomicBoolean();
    this.inCallback = new MutableBoolean();
    this.readInProgress = new AtomicBoolean();
    this.writeInProgress = new AtomicBoolean();

    // packet queues
    this.outgoingPackets = new ArrayDeque<>();
    this.packetsToFlush = new ArrayDeque<>();
    this.queue = new ArrayDeque<>();
    this.stack = new ArrayDeque<>();

    if (channel == null) {
        return;
    }
    this.channel = channel;
}
/**
 * Adds the channel to {@code channels}, but only while that collection is empty.
 *
 * <p>The check and the insertion happen under the monitor of {@code channels}.
 *
 * @param ch channel to add
 * @return the result of {@code channels.add(ch)} if the collection was empty, otherwise
 *         {@code false}
 */
protected boolean addChannel(AsynchronousSocketChannel ch) {
    synchronized (channels) {
        // short-circuit: add() is only attempted when nothing is registered yet
        return channels.isEmpty() && channels.add(ch);
    }
}
/**
 * Creates a backend {@link MySQLConnection} for the given data source and initiates its connect.
 *
 * <p>Socket parameters are applied through the server configuration. Host, port, user, password
 * and idle timeout come from {@code pool.getConfig()}; the connection is bound to {@code schema}
 * and gets a {@link MySQLConnectionAuthenticator} wrapping {@code handler}. If the opened channel
 * is an {@link AsynchronousSocketChannel}, the connect is issued on it directly with the server's
 * connector as completion handler and the connection as attachment; otherwise the connection is
 * passed to the {@link NIOConnector} through {@code postConnect}.
 *
 * @param pool    data source that owns the new connection
 * @param handler response handler handed to the authenticator
 * @param schema  schema set on the connection
 * @return the newly created connection
 * @throws IOException if opening the socket channel fails
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,
        String schema) throws IOException {
    DBHostConfig hostConfig = pool.getConfig();
    NetworkChannel channel = openSocketChannel(MycatServer.getInstance().isAIO());
    MySQLConnection conn = new MySQLConnection(channel, pool.isReadNode());

    MycatServer.getInstance().getConfig().setSocketParams(conn, false);
    conn.setHost(hostConfig.getIp());
    conn.setPort(hostConfig.getPort());
    conn.setUser(hostConfig.getUser());
    conn.setPassword(hostConfig.getPassword());
    conn.setSchema(schema);
    conn.setHandler(new MySQLConnectionAuthenticator(conn, handler));
    conn.setPool(pool);
    conn.setIdleTimeout(pool.getConfig().getIdleTimeout());

    // NIO path: the connector takes over the connection.
    if (!(channel instanceof AsynchronousSocketChannel)) {
        ((NIOConnector) MycatServer.getInstance().getConnector()).postConnect(conn);
        return conn;
    }

    // AIO path: connect on the channel itself, with the connection as attachment.
    AsynchronousSocketChannel aioChannel = (AsynchronousSocketChannel) channel;
    InetSocketAddress target = new InetSocketAddress(hostConfig.getIp(), hostConfig.getPort());
    aioChannel.connect(target, conn, (CompletionHandler) MycatServer.getInstance().getConnector());
    return conn;
}
/**
 * Opens the channel type that matches the requested I/O mode.
 *
 * @param isAIO {@code true} to open an {@link AsynchronousSocketChannel} in the server's next
 *              asynchronous channel group, {@code false} to open a {@link SocketChannel}
 *              configured as non-blocking
 * @return the opened, not yet connected channel
 * @throws IOException if the channel cannot be opened or configured
 */
protected NetworkChannel openSocketChannel(boolean isAIO)
        throws IOException {
    if (!isAIO) {
        SocketChannel nioChannel = SocketChannel.open();
        nioChannel.configureBlocking(false);
        return nioChannel;
    }
    return AsynchronousSocketChannel.open(MycatServer.getInstance().getNextAsyncChannelGroup());
}
/***************************************
 * {@inheritDoc}
 *
 * <p>For a non-negative byte count the finish predicate is evaluated on the
 * data received so far. Another read is started while the count is not -1,
 * the predicate has not signalled completion and the buffer still has room.
 * Otherwise errors are checked and the buffer is flipped.</p>
 */
@Override
protected boolean performAsyncOperation(
    int                                                 nBytesReceived,
    AsynchronousSocketChannel                           rChannel,
    ByteBuffer                                          rData,
    ChannelCallback<Integer, AsynchronousSocketChannel> rCallback)
    throws IOException
{
    // the predicate is only consulted for a non-negative count
    boolean bFinished =
        nBytesReceived >= 0 &&
        pCheckFinished.test(nBytesReceived, rData);

    boolean bContinueReading =
        nBytesReceived != -1 && !bFinished && rData.hasRemaining();

    if (bContinueReading)
    {
        rChannel.read(rData, rData, rCallback);
    }
    else
    {
        checkErrors(rData, nBytesReceived, bFinished);
        rData.flip();
    }

    return bFinished;
}
/**
 * Shuts down the output side of the channel if there is an open channel to act on.
 *
 * <p>A {@code null} or already closed channel is ignored. An {@link IOException} from the
 * shutdown is logged as an error and not propagated.
 *
 * @param channel channel whose output is shut down, may be {@code null}
 */
private void shutdownOutput(AsynchronousSocketChannel channel) {
    if (channel == null || !channel.isOpen()) {
        return;
    }
    try {
        LOG.info("shutdown output for ({})", channel);
        channel.shutdownOutput();
    } catch (IOException e) {
        LOG.error("cannot shutdown output to ({})", channel, e);
    }
}
/**
 * Runs the superclass handling for the channel, then enables {@code TCP_NODELAY} on it.
 *
 * @param channel    channel to configure
 * @param attachment value passed through to the superclass implementation
 * @return always 0; the value returned by the superclass implementation is discarded
 * @throws IOException if the superclass handling or setting the socket option fails
 */
@Override
public int onChannelCreated(AsynchronousSocketChannel channel, Object attachment) throws IOException
{
super.onChannelCreated(channel, attachment);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
// NOTE(review): callers presumably treat a negative result as "reject the channel"; 0 accepts it. Confirm.
return 0;
}
/**
 * Opens a new {@link AsynchronousSocketChannel} and connects it to {@code addr}.
 *
 * <p>The caller only receives the channel once the connect has succeeded, so it has no way to
 * release it on failure. The channel is therefore closed here when the returned stage completes
 * exceptionally.
 *
 * @param addr address to connect to
 * @return a stage that completes with the connected channel, or exceptionally if the channel
 *         could not be opened or the connect failed
 */
static CompletionStage<AsynchronousSocketChannel> connect(final SocketAddress addr) {
    try {
        final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        return connect(channel, addr)
            .thenApply(ig -> channel)
            .whenComplete((connected, failure) -> {
                if (failure != null) {
                    try {
                        channel.close();
                    } catch (final IOException ignored) {
                        // best effort: the connect failure is what the caller needs to see
                    }
                }
            });
    } catch (final IOException e) {
        return StageSupport.exceptionalStage(e);
    }
}
/**
 * Creates a secure wrapper around the given channel using the given SSL engine.
 *
 * <p>The read and write socket buffers are direct buffers whose capacity is the packet buffer
 * size of the engine's current session.
 *
 * @param socketChannel channel to wrap
 * @param sslEngine     SSL engine used with the channel
 */
public AsyncChannelWrapperSecure(AsynchronousSocketChannel socketChannel,
        SSLEngine sslEngine) {
    this.socketChannel = socketChannel;
    this.sslEngine = sslEngine;

    final int bufferCapacity = sslEngine.getSession().getPacketBufferSize();
    this.socketReadBuffer = ByteBuffer.allocateDirect(bufferCapacity);
    this.socketWriteBuffer = ByteBuffer.allocateDirect(bufferCapacity);
}
/**
 * Issues a write of {@code "hello"} with a completion handler on the accepted channel, waits on
 * the latch handed to that handler, then drops the only local strong reference to the handler
 * and waits for its weak reference to be cleared. The writing channel's send buffer is sized to
 * the payload first.
 */
@Test
public void testWrite() throws Exception {
    try (Server server = new Server();
         AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(GROUP)) {
        ch.connect(server.address()).get();

        try (AsynchronousSocketChannel sc = server.accept().get()) {
            ByteBuffer message = ByteBuffer.wrap("hello".getBytes("UTF-8"));
            sc.setOption(SO_SNDBUF, message.remaining());

            CountDownLatch completion = new CountDownLatch(1);
            Handler<Integer,Object> handler = new Handler<Integer,Object>("write", completion);
            ReferenceQueue queue = new ReferenceQueue<WeakReference>();
            WeakReference<Object> ref = new WeakReference<Object>(handler, queue);

            sc.write(message, null, handler);

            try {
                completion.await();
            } catch (InterruptedException ignore) {
                // an interrupt is ignored here; the test carries on with the reference check
            }

            // From here on this method holds the handler only weakly.
            handler = null;
            waitForRefToClear(ref, queue);
        }
    }
}