下面列出了java.nio.channels.AsynchronousSocketChannel#open() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testConnect() throws Exception {
try (Server server = new Server()) {
try (AsynchronousSocketChannel ch =
AsynchronousSocketChannel.open(GROUP)) {
CountDownLatch latch = new CountDownLatch(1);
Handler<Void,Object> handler =
new Handler<Void,Object>("connect", latch);
ReferenceQueue queue = new ReferenceQueue<WeakReference>();
WeakReference<Object> ref =
new WeakReference<Object>(handler, queue);
ch.connect(server.address(), null, handler);
try { latch.await(); } catch (InterruptedException ignore) { }
handler = null;
waitForRefToClear(ref, queue);
server.accept().get().close();
}
}
}
public void startClient(SocketAddress addr, Object attachment, AsynchronousChannelGroup group)
{
AsynchronousSocketChannel channel = null;
try
{
channel = AsynchronousSocketChannel.open(group);
int recvBufSize = onChannelCreated(channel, attachment);
if (recvBufSize >= 0)
channel.connect(addr, new ConnectParam(channel, recvBufSize), _connectHandler);
else
channel.close();
}
catch (Throwable e)
{
doException(null, e);
closeChannel(channel);
}
}
@SuppressWarnings("unchecked")
@Override
public <T extends Closeable> T connect(String host, int port, PropertySet props, int loginTimeout) throws IOException {
try {
this.channel = AsynchronousSocketChannel.open();
//channel.setOption(java.net.StandardSocketOptions.TCP_NODELAY, true);
this.channel.setOption(java.net.StandardSocketOptions.SO_SNDBUF, 128 * 1024);
this.channel.setOption(java.net.StandardSocketOptions.SO_RCVBUF, 128 * 1024);
Future<Void> connectPromise = this.channel.connect(new InetSocketAddress(host, port));
connectPromise.get();
} catch (CJCommunicationsException e) {
throw e;
} catch (IOException | InterruptedException | ExecutionException | RuntimeException ex) {
throw new CJCommunicationsException(ex);
}
return (T) this.channel;
}
private void connect() throws BindException{
String errorMessage = "";
if (socketChannel == null || !socketChannel.isOpen() || closed) {
try {
socketChannel = AsynchronousSocketChannel.open();
socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
socketChannel.connect(new InetSocketAddress(ip, port)).get(timeout, TimeUnit.MILLISECONDS);
closed = false;
//when connect to the server, keep receiving data either server response or server call
receive();
} catch (Exception e) {
log.error("Connection error: " + e.getMessage());
errorMessage = e.getMessage();
}
}
if (socketChannel == null) {
throw new BindException(errorMessage);
}
}
@Override
public void connect(String host, int port, AioClientListener listener) {
this.host = host;
this.port = port;
this.listener = listener;
try {
socketChannel = AsynchronousSocketChannel.open();
socketChannel.connect(new InetSocketAddress(host, port), null, this);
} catch (IOException e) {
logger.warn(e, "连接到服务端[{}:{}]时发生异常!", host, port);
}
}
protected NetworkChannel openSocketChannel(boolean isAIO)
throws IOException {
if (isAIO) {
return AsynchronousSocketChannel
.open(MycatServer.getInstance().getNextAsyncChannelGroup());
} else {
SocketChannel channel = null;
channel = SocketChannel.open();
channel.configureBlocking(false);
return channel;
}
}
@Test
public void testRead() throws Exception {
try (Server server = new Server();
AsynchronousSocketChannel ch =
AsynchronousSocketChannel.open(GROUP)) {
ch.connect(server.address()).get();
try (AsynchronousSocketChannel sc = server.accept().get()) {
ByteBuffer src = ByteBuffer.wrap("hello".getBytes("UTF-8"));
sc.setOption(SO_SNDBUF, src.remaining());
sc.write(src).get();
CountDownLatch latch = new CountDownLatch(1);
Handler<Integer,Object> handler =
new Handler<Integer,Object>("read", latch);
ReferenceQueue queue = new ReferenceQueue<WeakReference>();
WeakReference<Object> ref =
new WeakReference<Object>(handler, queue);
ByteBuffer dst = ByteBuffer.allocate(64);
ch.read(dst, null, handler);
try { latch.await(); } catch (InterruptedException ignore) { }
handler = null;
waitForRefToClear(ref, queue);
}
}
}
@Override
public void startService() {
super.startService();
//检查writer
this.checkWriter();
try{
//如果TCP连接已经建立,不需要建立连接,如acceptor,如果是服务消费者,则直接跳过
if(channel==null){
this.checkChannelGroup();
channel = AsynchronousSocketChannel.open(channelGroup);
channel.connect(new InetSocketAddress(this.getHost(),this.getPort()));
logger.info("connect to "+this.getHost()+":"+this.getPort()+" success");
}
//JDK7
InetSocketAddress remoteAddress = (InetSocketAddress)channel.getRemoteAddress();
InetSocketAddress localAddress = (InetSocketAddress)channel.getLocalAddress();
String remote = RpcUtils.genAddressString("remoteAddress-> ", remoteAddress);
String local = RpcUtils.genAddressString("localAddress-> ", localAddress);
logger.info(local+" "+remote);
remotePort = remoteAddress.getPort();
remoteHost = remoteAddress.getAddress().getHostAddress();
//注册上去
this.getRpcWriter().registerWrite(this);
this.getRpcWriter().startService();
this.fireStartNetListeners();
//start read
this.channel.read(readBuf, this, readHandler);
}catch(IOException e){
logger.error("connect to host "+this.getHost()+" port "+this.getPort()+" failed", e);
throw new RpcException("connect to host error");
}
}
static CompletionStage<AsynchronousSocketChannel> connect(final SocketAddress addr) {
try {
final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
return connect(channel, addr).thenApply(ig -> channel);
} catch (final IOException e) {
return StageSupport.exceptionalStage(e);
}
}
public AsyncTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
this.timeClient = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testWrite() throws Exception {
try (Server server = new Server();
AsynchronousSocketChannel ch =
AsynchronousSocketChannel.open(GROUP)) {
ch.connect(server.address()).get();
try (AsynchronousSocketChannel sc = server.accept().get()) {
ByteBuffer src = ByteBuffer.wrap("hello".getBytes("UTF-8"));
sc.setOption(SO_SNDBUF, src.remaining());
CountDownLatch latch = new CountDownLatch(1);
Handler<Integer,Object> handler =
new Handler<Integer,Object>("write", latch);
ReferenceQueue queue = new ReferenceQueue<WeakReference>();
WeakReference<Object> ref =
new WeakReference<Object>(handler, queue);
sc.write(src, null, handler);
try { latch.await(); } catch (InterruptedException ignore) { }
handler = null;
waitForRefToClear(ref, queue);
}
}
}
private AsyncEchoClient() {
try {
client = AsynchronousSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
future = client.connect(hostAddress);
start();
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testRead() throws Exception {
try (Server server = new Server();
AsynchronousSocketChannel ch =
AsynchronousSocketChannel.open(GROUP)) {
ch.connect(server.address()).get();
try (AsynchronousSocketChannel sc = server.accept().get()) {
ByteBuffer src = ByteBuffer.wrap("hello".getBytes("UTF-8"));
sc.setOption(SO_SNDBUF, src.remaining());
sc.write(src).get();
CountDownLatch latch = new CountDownLatch(1);
Handler<Integer,Object> handler =
new Handler<Integer,Object>("read", latch);
ReferenceQueue queue = new ReferenceQueue<WeakReference>();
WeakReference<Object> ref =
new WeakReference<Object>(handler, queue);
ByteBuffer dst = ByteBuffer.allocate(64);
ch.read(dst, null, handler);
try { latch.await(); } catch (InterruptedException ignore) { }
handler = null;
waitForRefToClear(ref, queue);
}
}
}
@Test
public void testWrite() throws Exception {
try (Server server = new Server();
AsynchronousSocketChannel ch =
AsynchronousSocketChannel.open(GROUP)) {
ch.connect(server.address()).get();
try (AsynchronousSocketChannel sc = server.accept().get()) {
ByteBuffer src = ByteBuffer.wrap("hello".getBytes("UTF-8"));
sc.setOption(SO_SNDBUF, src.remaining());
CountDownLatch latch = new CountDownLatch(1);
Handler<Integer,Object> handler =
new Handler<Integer,Object>("write", latch);
ReferenceQueue queue = new ReferenceQueue<WeakReference>();
WeakReference<Object> ref =
new WeakReference<Object>(handler, queue);
sc.write(src, null, handler);
try { latch.await(); } catch (InterruptedException ignore) { }
handler = null;
waitForRefToClear(ref, queue);
}
}
}
public static void main(String[] args) throws IOException {
AsynchronousSocketChannel.open();
}
public static void main(String[] args) throws IOException {
AsynchronousSocketChannel.open();
}
public static void main(String[] args) throws IOException {
AsynchronousSocketChannel.open();
}
public static void main(String[] args) throws IOException {
AsynchronousSocketChannel.open();
}
public static void main(String[] args) throws IOException {
AsynchronousSocketChannel.open();
}
/**
* 启动客户端。
* <p>
* 在与服务端建立连接期间,该方法处于阻塞状态。直至连接建立成功,或者发生异常。
* </p>
* <p>
* 该start方法支持外部指定AsynchronousChannelGroup,实现多个客户端共享一组线程池资源,有效提升资源利用率。
* </p>
*
* @param asynchronousChannelGroup IO事件处理线程组
* @return 建立连接后的会话对象
* @throws IOException IOException
* @see AsynchronousSocketChannel#connect(SocketAddress)
*/
public AioSession<T> start(AsynchronousChannelGroup asynchronousChannelGroup) throws IOException {
AsynchronousSocketChannel socketChannel = null;
try {
socketChannel = AsynchronousSocketChannel.open(asynchronousChannelGroup);
if (bufferPool == null) {
bufferPool = config.getBufferFactory().create();
this.innerBufferPool = bufferPool;
}
//set socket options
if (config.getSocketOptions() != null) {
for (Map.Entry<SocketOption<Object>, Object> entry : config.getSocketOptions().entrySet()) {
socketChannel.setOption(entry.getKey(), entry.getValue());
}
}
//bind host
if (localAddress != null) {
socketChannel.bind(localAddress);
}
Future<Void> future = socketChannel.connect(new InetSocketAddress(config.getHost(), config.getPort()));
if (connectTimeout > 0) {
future.get(connectTimeout, TimeUnit.MILLISECONDS);
} else {
future.get();
}
AsynchronousSocketChannel connectedChannel = socketChannel;
if (config.getMonitor() != null) {
connectedChannel = config.getMonitor().shouldAccept(socketChannel);
}
if (connectedChannel == null) {
throw new RuntimeException("NetMonitor refuse channel");
}
//连接成功则构造AIOSession对象
session = new TcpAioSession<T>(connectedChannel, config, new ReadCompletionHandler<T>(), new WriteCompletionHandler<T>(), bufferPool.allocateBufferPage());
session.initSession();
return session;
} catch (Exception e) {
if (socketChannel != null) {
IOUtil.close(socketChannel);
}
shutdownNow();
throw new IOException(e);
}
}