下面列出了怎么用java.nio.channels.AsynchronousServerSocketChannel的API类实例代码及写法,或者点击链接到github查看源代码。
/***************************************
 * Runs this step in blocking mode: looks up (or creates) the server socket
 * channel for the continuation, blocks until the next connection has been
 * accepted and hands that connection to the request handler's blocking
 * execution in the scope of the continuation.
 *
 * @param  rData         The (void) input value, not used
 * @param  rContinuation The continuation of the current execution
 *
 * @return Always NULL
 *
 * @throws CoroutineException Wraps any exception that occurs while obtaining
 *                            the channel, accepting or running the handler
 */
@Override
protected Void execute(Void rData, Continuation<?> rContinuation)
{
    try
    {
        AsynchronousServerSocketChannel rServerChannel =
            getServerSocketChannel(rContinuation);

        // accept().get() blocks the calling thread until a client connects
        rRequestHandler.runBlocking(
            rContinuation.scope(),
            rServerChannel.accept().get());

        return null;
    }
    catch (Exception eFailure)
    {
        throw new CoroutineException(eFailure);
    }
}
/***************************************
 * Provides the server socket channel this step works on. The coroutine that
 * is currently executing in the given continuation is queried for its
 * {@link #SERVER_SOCKET_CHANNEL} relation. If no channel is stored there, or
 * the stored channel is no longer open, a new {@link
 * AsynchronousServerSocketChannel} is opened in the step's channel group,
 * stored in the coroutine and annotated as {@code MetaTypes.MANAGED}. A
 * channel that has no local address yet is bound to the socket address of
 * this step before it is returned.
 *
 * @param  rContinuation The continuation to query for an existing channel
 *
 * @return The open and bound channel
 *
 * @throws IOException If opening or binding the channel fails
 */
protected AsynchronousServerSocketChannel getServerSocketChannel(
    Continuation<?> rContinuation) throws IOException
{
    Coroutine<?, ?> rOwner = rContinuation.getCurrentCoroutine();

    AsynchronousServerSocketChannel rServerChannel =
        rOwner.get(SERVER_SOCKET_CHANNEL);

    boolean bReusable = rServerChannel != null && rServerChannel.isOpen();

    if (!bReusable)
    {
        rServerChannel =
            AsynchronousServerSocketChannel.open(
                getChannelGroup(rContinuation));

        rOwner.set(SERVER_SOCKET_CHANNEL, rServerChannel)
              .annotate(MetaTypes.MANAGED);
    }

    // a null local address means the channel has not been bound yet
    if (rServerChannel.getLocalAddress() == null)
    {
        rServerChannel.bind(getSocketAddress(rContinuation));
    }

    return rServerChannel;
}
/***************************************
 * Starts a non-blocking accept on the server socket channel of the
 * suspension's continuation. The result of the accept is delivered to an
 * {@code AcceptCallback} that is created with the request handler and the
 * suspension. Any exception raised while obtaining the channel or while
 * starting the accept is reported through {@code rSuspension.fail()}.
 *
 * @param rSuspension The coroutine suspension that is handed to the accept
 *                    callback and that receives start-up failures
 */
private void acceptAsync(Suspension<Void> rSuspension)
{
    try
    {
        AsynchronousServerSocketChannel rServerChannel =
            getServerSocketChannel(rSuspension.continuation());

        AcceptCallback aCallback =
            new AcceptCallback(rRequestHandler, rSuspension);

        // no attachment is needed, the callback carries all state
        rServerChannel.accept(null, aCallback);
    }
    catch (Exception eFailure)
    {
        rSuspension.fail(eFailure);
    }
}
/**
 * Launches {@link XMLLanguageServer} instances over an asynchronous server-socket channel and makes them
 * accessible through the JSON RPC protocol defined by the LSP.
 * <p>
 * The method binds to all interfaces ({@code 0.0.0.0}) on the port returned by {@code getPort(args)} and
 * then loops forever: each accepted connection gets its own language server and launcher. The loop only
 * ends when an exception is thrown, in which case the listening channel is closed before the exception
 * propagates.
 *
 * @param args standard launch arguments. may contain <code>--port</code> argument to change the port
 *             (the pre-existing documentation names 5008 as the default)
 * @throws Exception if the channel cannot be opened or bound, or if accepting a connection fails
 */
public void launch(String[] args) throws Exception {
    int port = getPort(args);
    InetSocketAddress bindAddress = new InetSocketAddress("0.0.0.0", port);
    // A single cached pool is shared by all sessions. Creating one pool per accepted
    // connection (as before) left an unbounded number of never-shut-down executors behind.
    final ExecutorService executorService = Executors.newCachedThreadPool();
    // try-with-resources guarantees the listening socket is released when the loop aborts.
    try (AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open()) {
        serverSocket.bind(bindAddress);
        while (true) {
            // Blocks until the next client connects.
            final AsynchronousSocketChannel socketChannel = serverSocket.accept().get();
            final InputStream in = Channels.newInputStream(socketChannel);
            final OutputStream out = Channels.newOutputStream(socketChannel);
            XMLLanguageServer languageServer = new XMLLanguageServer();
            final Launcher<LanguageClient> launcher = Launcher.createIoLauncher(languageServer,
                    LanguageClient.class, in, out, executorService, (MessageConsumer it) -> it);
            languageServer.setClient(launcher.getRemoteProxy());
            launcher.startListening();
        }
    }
}
/**
 * Demo entry point: binds a server channel to an automatically assigned address, connects a
 * client to it, sends the integer 42 from the client and prints the integer read on the server
 * side.
 *
 * @param args unused
 * @throws IOException if opening or binding the server channel, or querying its address, fails
 */
public static void main(final String[] args) throws IOException {
    final AsynchronousServerSocketChannel listener =
            AsynchronousServerSocketChannel.open().bind(null);

    // server side: start accepting before the client connects
    final CompletionStage<AsynchronousSocketChannel> accepted = accept(listener);

    // client side: connect to whatever address the listener was bound to
    final SocketAddress boundAddress = listener.getLocalAddress();
    final CompletionStage<AsynchronousSocketChannel> connected = connect(boundAddress);

    // once connected, send the integer 42 to the server
    final CompletionStage<Void> written =
            connected.thenAccept(clientChannel -> writeInt(clientChannel, 42));

    // once accepted, read one int from the socket and print it
    final CompletionStage<Void> printed =
            accepted.thenCompose(NioBridge::readInt).thenAccept(System.out::println);

    // block until both the write and the read have finished
    written.toCompletableFuture().join();
    printed.toCompletableFuture().join();
}
/**
 * Demo entry point: binds a server channel to an automatically assigned address, connects a
 * client to it, lets the client run {@code write100Randoms} and prints the list the server side
 * collects with {@code readUntilStopped}.
 *
 * @param args unused
 * @throws IOException if opening or binding the server channel, or querying its address, fails
 */
public static void main(final String[] args) throws IOException {
    final AsynchronousServerSocketChannel listener =
            AsynchronousServerSocketChannel.open().bind(null);

    // server side: start accepting before the client connects
    final CompletionStage<AsynchronousSocketChannel> accepted = accept(listener);

    // client side: connect to whatever address the listener was bound to
    final SocketAddress boundAddress = listener.getLocalAddress();
    final CompletionStage<AsynchronousSocketChannel> connected = connect(boundAddress);

    // once connected, write 100 random integers, then write -1
    final CompletionStage<Void> written =
            connected.thenCompose(clientChannel -> write100Randoms(clientChannel));

    final CompletionStage<List<Integer>> collected =
            accepted.thenCompose(Iteration::readUntilStopped);

    // block until the write is done, then block for the read results and print them
    written.toCompletableFuture().join();
    final List<Integer> received = collected.toCompletableFuture().join();
    System.out.println(received);
}
/**
 * Creates a server bound to an automatically assigned address. The server accepts one
 * connection and then answers every integer read from that connection by writing back the
 * integer plus one. When the request/response iteration ends, a "connection closed" line with
 * the exception message is printed.
 *
 * @return the {@link SocketAddress} the server was bound to
 * @throws IOException if opening or binding the server channel, or querying its address, fails
 */
static SocketAddress setupServer() throws IOException {
    final AsynchronousServerSocketChannel listener =
            AsynchronousServerSocketChannel.open().bind(null);
    final SocketAddress boundAddress = listener.getLocalAddress();

    NioBridge.accept(listener).thenAccept(clientChannel -> {
        AsyncIterator
                // each iteration step reads the next request from the client
                .generate(() -> NioBridge.readInt(clientChannel))
                // reply with request + 1
                .thenCompose(request -> NioBridge.writeInt(clientChannel, request + 1))
                .consume()
                .whenComplete((ignore, ex) -> {
                    System.out.println("connection closed, " + ex.getMessage());
                });
    });

    return boundAddress;
}
/**
 * Listens on {@code localhost} at the configured port, waits for exactly one LSP client to
 * connect and then runs the language server over that connection's streams. The listening
 * channel, the client channel and both streams are closed when the method exits.
 *
 * @param languageServer the language server to run
 * @param lsBuilder      the launcher builder passed on to {@code run}
 * @throws InterruptedException if waiting for the client connection is interrupted
 * @throws ExecutionException   if accepting the client connection fails
 * @throws IOException          if opening, binding or closing a channel or stream fails
 */
private void setupAndRunWithSocket(XLanguageServerImpl languageServer, Builder<LanguageClient> lsBuilder)
        throws InterruptedException, ExecutionException, IOException {
    InetSocketAddress address = new InetSocketAddress("localhost", options.getPort());
    // The channel is registered as a resource BEFORE bind() is called: with open().bind(address)
    // in the resource header, a failing bind (e.g. port in use) left the opened channel unclosed.
    try (AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open()) {
        serverSocket.bind(address);
        // Attention: the VSCode LSP extension is waiting for this line 'Listening for LSP clients'.
        N4jscConsole.println(LSP_SYNC_MESSAGE + " on port " + options.getPort() + "...");
        // Blocks until the client connects.
        try (AsynchronousSocketChannel socketChannel = serverSocket.accept().get();
                InputStream in = Channels.newInputStream(socketChannel);
                OutputStream out = Channels.newOutputStream(socketChannel)) {
            N4jscConsole.println("Connected to LSP client");
            run(languageServer, lsBuilder, in, out);
        }
    }
}
/**
 * Starts listening for AIO connections. Remembers the port and the listener, creates a
 * fixed-size channel group, opens a server channel in it with {@code SO_REUSEADDR} enabled,
 * binds it to the port and starts the first accept with this object as completion handler.
 * An {@link IOException} during start-up is logged as a warning and not rethrown.
 *
 * @param thread   number of threads of the channel group's fixed thread pool
 * @param port     port to bind the server channel to
 * @param listener listener stored in this object
 */
@Override
public void listen(int thread, int port, AioServerListener listener) {
    this.listener = listener;
    this.port = port;

    try {
        channelGroup = AsynchronousChannelGroup.withFixedThreadPool(thread, Executors.defaultThreadFactory());
        serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
        serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);

        InetSocketAddress bindAddress = new InetSocketAddress(port);
        serverSocketChannel.bind(bindAddress);

        // this object acts as the accept completion handler; no attachment is used
        serverSocketChannel.accept(null, this);

        if (logger.isInfoEnable()) {
            logger.info("启动AIO监听[{}]服务。", port);
        }
    } catch (IOException e) {
        logger.warn(e, "启动AIO监听[{}]服务时发生异常!", port);
    }
}
/**
 * Starts a socket server at the address derived from {@code args} and serves language clients
 * forever: for every accepted connection a {@code LanguageServerImpl} is obtained from the
 * injector, connected to a launcher built on the connection's streams and started.
 * <p>
 * Any {@link Throwable} ends the loop; it is printed to standard error and the listening
 * channel is closed.
 *
 * @param args launch arguments, evaluated by {@code getSocketAddress}, {@code getTrace} and
 *             {@code shouldValidate}
 */
public void launch(String[] args) {
    Injector injector = Guice.createInjector(getServerModule());
    // The channel becomes a managed resource before bind() runs, so a failing bind
    // (for instance "address already in use") no longer leaks the opened channel.
    try (AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open()) {
        serverSocket.bind(getSocketAddress(args));
        LOG.info("Started server socket at " + getSocketAddress(args));
        while (true) {
            // Blocks until the next client connects.
            AsynchronousSocketChannel socketChannel = serverSocket.accept().get();
            InputStream in = Channels.newInputStream(socketChannel);
            OutputStream out = Channels.newOutputStream(socketChannel);
            PrintWriter trace = getTrace(args);
            boolean validate = shouldValidate(args);
            LanguageServerImpl languageServer = injector.getInstance(LanguageServerImpl.class);
            LOG
                    .info("Starting Xtext Language Server for client " + socketChannel.getRemoteAddress());
            Launcher<LanguageClient> launcher = Launcher.createLauncher(languageServer, LanguageClient.class, in,
                    out, validate, trace);
            languageServer.connect(launcher.getRemoteProxy());
            launcher.startListening();
            LOG.info("Xtext Language Server has been started.");
        }
    } catch (Throwable t) {
        if (t instanceof InterruptedException) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
        t.printStackTrace();
    }
}
/**
 * Starts this service: after the superclass start-up it creates and starts the accept
 * completion handler, creates a channel group backed by a fixed thread pool, opens and binds
 * the server channel to this service's host and port, issues the first accept and finally
 * starts the listeners and fires the net-listener start event.
 *
 * @throws RpcException wrapping any {@link IOException} raised while setting up the channel
 */
@Override
public void startService() {
    super.startService();
    try {
        // start the acceptor so that connections can be accepted
        acceptHandler = new RpcAcceptCompletionHandler();
        acceptHandler.startService();

        channelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(channelGroupThreads));

        AsynchronousServerSocketChannel opened = AsynchronousServerSocketChannel.open(channelGroup);
        InetSocketAddress listenAddress = new InetSocketAddress(getHost(), getPort());
        // the field is only assigned once the bind has succeeded
        serverChannel = opened.bind(listenAddress);
        serverChannel.accept(this, acceptHandler);

        startListeners();
        fireStartNetListeners();
    } catch (IOException ioFailure) {
        throw new RpcException(ioFailure);
    }
}
/**
 * (Re)starts listening on the given address. A previously started acceptor is stopped first.
 * After the new acceptor channel has been opened, {@code onAcceptorCreated} is asked for the
 * backlog; a negative result aborts the start. Otherwise the channel is bound and accepting
 * begins. If the start is aborted or anything is thrown (reported via {@code doException}),
 * the acceptor is stopped again.
 *
 * @param addr       the address to listen on
 * @param attachment passed through to {@code onAcceptorCreated}
 * @param group      the channel group the acceptor channel is opened in
 */
public synchronized void startServer(SocketAddress addr, Object attachment, AsynchronousChannelGroup group)
{
    stopServer();

    boolean started = false;
    try
    {
        _acceptor = AsynchronousServerSocketChannel.open(group);
        int backlog = onAcceptorCreated(_acceptor, attachment);
        if (backlog >= 0)
        {
            _acceptor.bind(addr, backlog);
            beginAccept();
            started = true;
        }
    }
    catch (Throwable failure)
    {
        doException(null, failure);
    }

    // reached with started == false when the hook vetoed the start or something failed
    if (!started)
    {
        stopServer();
    }
}
/**
 * Initializes the endpoint: makes sure an executor exists, derives the asynchronous channel
 * group from it when it is an {@link ExecutorService}, opens the server socket channel in
 * that group, applies the socket properties, binds to the configured address/port with the
 * configured accept count, forces a single acceptor thread and initializes SSL.
 *
 * @throws Exception if any of the initialization steps fails
 */
@Override
public void bind() throws Exception {
    // Make sure there is an executor to back the channel group
    if (getExecutor() == null) {
        createExecutor();
    }
    if (getExecutor() instanceof ExecutorService) {
        threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
    }

    // AsynchronousChannelGroup currently needs exclusive access to its executor service
    if (!internalExecutor) {
        log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
    }

    serverSock = AsynchronousServerSocketChannel.open(threadGroup);
    socketProperties.setProperties(serverSock);

    // Without an explicit address only the port is used for the bind address
    InetSocketAddress addr;
    if (getAddress() != null) {
        addr = new InetSocketAddress(getAddress(), getPort());
    } else {
        addr = new InetSocketAddress(getPort());
    }
    serverSock.bind(addr, getAcceptCount());

    // NIO2 does not allow any form of IO concurrency, so exactly one acceptor thread is used
    if (acceptorThreadCount != 1) {
        acceptorThreadCount = 1;
    }

    // Initialize SSL if needed
    initialiseSsl();
}
Server(int recvBufSize) throws IOException {
ssc = AsynchronousServerSocketChannel.open(GROUP);
if (recvBufSize > 0) {
ssc.setOption(SO_RCVBUF, recvBufSize);
}
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(),
0));
address = (InetSocketAddress)ssc.getLocalAddress();
}
/**
 * Starts the server: clears the stopped flag of the group context, creates a channel group
 * on the group executor and stores it in the context, then opens a server channel with
 * {@code SO_REUSEADDR} enabled, binds it to the given address and issues the first accept.
 *
 * @param address the address to listen on
 * @throws IOException if creating the channel group or opening, configuring or binding the
 *                     channel fails
 */
@SuppressWarnings("squid:S2095")
public void start(InetSocketAddress address) throws IOException {
    groupContext.setStopped(false);

    AsynchronousChannelGroup channelGroup =
            AsynchronousChannelGroup.withThreadPool(groupContext.getGroupExecutor());
    groupContext.setChannelGroup(channelGroup);

    serverAddress = address;

    AsynchronousServerSocketChannel opened = AsynchronousServerSocketChannel.open(channelGroup);
    opened.setOption(StandardSocketOptions.SO_REUSEADDR, true);
    // the field is only assigned once the bind has succeeded
    serverChannel = opened.bind(address);

    serverChannel.accept(this, new AcceptHandler());
}
/**
 * Demo entry point: four clients connect concurrently to a local server and each writes 100
 * random integers; the server accepts the four connections, merges their messages into one
 * iterator and prints every value. The method blocks until both the client writes and the
 * server-side printing have completed.
 *
 * @param args unused
 * @throws IOException if opening or binding the server channel, or querying its address, fails
 */
public static void main(final String[] args) throws IOException {
    final AsynchronousServerSocketChannel server =
            AsynchronousServerSocketChannel.open().bind(null);
    final SocketAddress addr = server.getLocalAddress();

    // on the client side, concurrently connect to addr 4 times, and write 100 random integers on
    // each connection
    final CompletionStage<Void> writeStage = Combinators.allOf(IntStream
            .range(0, 4)
            .mapToObj(i -> connect(addr)
                    .thenComposeAsync(channel -> Iteration.write100Randoms(channel)))
            .collect(Collectors.toList()))
            .thenApply(ig -> null);

    // on the server side, we'd like to accept 4 connections and route their messages into a single
    // place we can consume
    final AsyncIterator<AsynchronousSocketChannel> clientConnections = AsyncIterator
            // listen for next connection
            .generate(() -> accept(server))
            // only will take 4 connections
            .take(4);
    final AsyncIterator<Integer> results = routeClientMessages(clientConnections);

    // do something with the results! - print each result as it comes from each client
    final CompletionStage<Void> printStage = results.forEach(i -> System.out.println(i));

    // wait for both the clients and the server/printing to complete.
    // Previously the combined stage was created but never joined, so main could return while
    // the I/O was still in flight and any failure in either stage went unnoticed. join()
    // blocks until completion and rethrows failures as CompletionException.
    writeStage.thenAcceptBoth(printStage, (ig1, ig2) -> {
        System.out.println("completed successfully");
    }).toCompletableFuture().join();
}
public AsyncTimeServerHandler(int port) {
this.port = port;
try {
this.asyncServerSocketChannel = AsynchronousServerSocketChannel.open();
this.asyncServerSocketChannel.bind(new InetSocketAddress(this.port));
System.out.println("The time server is start in port:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
Server(int recvBufSize) throws IOException {
ssc = AsynchronousServerSocketChannel.open(GROUP);
if (recvBufSize > 0) {
ssc.setOption(SO_RCVBUF, recvBufSize);
}
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(),
0));
address = (InetSocketAddress)ssc.getLocalAddress();
}
/**
 * Called when an accept operation on the server channel failed. The failure is logged and,
 * as long as the server channel is still open, a new accept is issued so that the server
 * keeps listening.
 * <p>
 * The accept is NOT re-issued on a closed channel: an accept on a closed channel is reported
 * back to this handler as a failure again, which previously produced an endless
 * fail/re-accept cycle once the server had been shut down.
 *
 * @param exc       the cause of the failed accept
 * @param tioServer the server (attachment of the accept operation)
 * @author tanyaowu
 */
@Override
public void failed(Throwable exc, TioServer tioServer) {
    // Log first so the cause is recorded even if re-arming the accept throws.
    log.error("[" + tioServer.getServerNode() + "]监听出现异常", exc);

    AsynchronousServerSocketChannel serverSocketChannel = tioServer.getServerSocketChannel();
    if (serverSocketChannel != null && serverSocketChannel.isOpen()) {
        serverSocketChannel.accept(tioServer, this);
    }
}
/**
 * Tries to start the application server on {@code address}.
 * <p>
 * If the address is already in use (another server instance is assumed to be running), the
 * freshly opened channel is closed again and {@code false} is returned. Otherwise a completion
 * handler is installed that passes each accepted client to {@code handleClient} and then
 * accepts the next one.
 *
 * @return {@code true} if the server was started, {@code false} if the address was in use
 * @throws Exception if opening or configuring the channel fails, or binding fails for a
 *                   reason other than the address being in use
 */
private boolean startServer() throws Exception
{
    final AsynchronousServerSocketChannel server_channel = AsynchronousServerSocketChannel.open();
    server_channel.setOption(StandardSocketOptions.SO_REUSEADDR, Boolean.TRUE);
    try
    {
        server_channel.bind(address);
    }
    catch (BindException ex)
    {
        // Address in use, there is already a server.
        // Release the channel that was just opened; it used to be leaked on this path.
        try
        {
            server_channel.close();
        }
        catch (Exception close_error)
        {
            // Best effort: the contract of this path is to report 'false', not to fail
            logger.log(Level.WARNING, "Cannot close unused server channel", close_error);
        }
        return false;
    }

    client_handler = new CompletionHandler<>()
    {
        @Override
        public void completed(final AsynchronousSocketChannel client_channel, Void attachment)
        {
            // Start thread to handle this client..
            handleClient(client_channel);

            // Accept another client
            server_channel.accept(null, client_handler);
        }

        @Override
        public void failed(final Throwable ex, Void attachment)
        {
            logger.log(Level.WARNING, "Application server connection error", ex);
        }
    };

    // Accept initial client
    logger.log(Level.INFO, "Listening for arguments on TCP " + address.getPort());
    server_channel.accept(null, client_handler);

    return true;
}
public AIOAcceptor(String name, String ip, int port,
FrontendConnectionFactory factory, AsynchronousChannelGroup group)
throws IOException {
this.name = name;
this.port = port;
this.factory = factory;
serverChannel = AsynchronousServerSocketChannel.open(group);
/** 设置TCP属性 */
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2);
// backlog=100
serverChannel.bind(new InetSocketAddress(ip, port), 100);
}
/**
 * Called when an accept operation on the server channel failed. The failure is logged and,
 * as long as the server channel is still open, a new accept is issued so that the server
 * keeps listening.
 * <p>
 * The accept is NOT re-issued on a closed channel: an accept on a closed channel is reported
 * back to this handler as a failure again, which previously produced an endless
 * fail/re-accept cycle once the server had been shut down.
 *
 * @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object)
 *
 * @param exc       the cause of the failed accept
 * @param aioServer the server (attachment of the accept operation)
 * @author tanyaowu (overridden 2016-11-16 13:28:05)
 */
@Override
public void failed(Throwable exc, AioServer<SessionContext, P, R> aioServer)
{
    // Log first so the cause is recorded even if re-arming the accept throws.
    log.error("[" + aioServer.getServerNode() + "]监听出现异常", exc);

    AsynchronousServerSocketChannel serverSocketChannel = aioServer.getServerSocketChannel();
    if (serverSocketChannel != null && serverSocketChannel.isOpen())
    {
        serverSocketChannel.accept(aioServer, this);
    }
}
Server(int recvBufSize) throws IOException {
ssc = AsynchronousServerSocketChannel.open(GROUP);
if (recvBufSize > 0) {
ssc.setOption(SO_RCVBUF, recvBufSize);
}
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(),
0));
address = (InetSocketAddress)ssc.getLocalAddress();
}
/**
 * Listens on the given socket address, waits for a single incoming connection and creates an
 * I/O launcher on top of that connection's streams.
 * <p>
 * The listening channel is only needed for that one accept and is closed before the method
 * returns (the accepted connection is a separate channel and stays open). If waiting for the
 * connection is interrupted or the accept fails, the stack trace is printed and {@code null}
 * is returned; after an interruption the thread's interrupt status is restored.
 *
 * @param localService    the local service object the launcher dispatches to
 * @param remoteInterface the interface of the remote proxy
 * @param socketAddress   the address to listen on
 * @param executorService the executor used by the launcher
 * @param wrapper         wrapper applied to the launcher's message consumers
 * @param <T>             type of the remote proxy
 * @return the launcher, or {@code null} if no connection could be accepted
 * @throws IOException if opening, binding or closing the listening channel fails
 */
static <T> Launcher<T> createSocketLauncher(Object localService, Class<T> remoteInterface, SocketAddress socketAddress, ExecutorService executorService, Function<MessageConsumer, MessageConsumer> wrapper) throws IOException {
    // The listener used to be a plain local that was never closed, so its port stayed bound
    // (with nobody accepting on it) for the lifetime of the process.
    try (AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open()) {
        serverSocket.bind(socketAddress);
        // Blocks until a peer connects.
        AsynchronousSocketChannel socketChannel = serverSocket.accept().get();
        return Launcher.createIoLauncher(localService, remoteInterface, Channels.newInputStream(socketChannel), Channels.newOutputStream(socketChannel), executorService, wrapper);
    } catch (InterruptedException e) {
        // Restore the flag so callers up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    return null;
}
/**
 * Creates the channel group with {@code poolSize} threads and opens the server channel on
 * {@code listenPort} with {@code SO_REUSEADDR} enabled. An {@link IOException} is logged and
 * not rethrown.
 */
public void setUpHandlers() {
    try {
        AsynchronousChannelGroup asyncChannelGroup = AsynchronousChannelGroup.withFixedThreadPool(poolSize, Executors.defaultThreadFactory());
        // SO_REUSEADDR has to be set BEFORE bind(): changing it on an already bound socket
        // has no effect, so the previous bind-then-setOption order never enabled it.
        serverSocketChannel = AsynchronousServerSocketChannel.open(asyncChannelGroup)
                .setOption(StandardSocketOptions.SO_REUSEADDR, true)
                .bind(new InetSocketAddress(listenPort));
        log.info("** " + poolSize + " handler thread has been setup! **");
        log.info("** Socket Server has been startup, listen port is " + listenPort + "! **");
    } catch (IOException e) {
        log.error("setUpHandlers error: ", e);
    }
}
/**
 * Creates the channel group with {@code poolSize} threads and opens the server channel on
 * {@code listenPort} with {@code SO_REUSEADDR} enabled. The start-up messages are logged only
 * when the set-up succeeded; an {@link IOException} is printed and not rethrown.
 */
public void setUpHandlers() {
    try {
        AsynchronousChannelGroup asyncChannelGroup = AsynchronousChannelGroup.withFixedThreadPool(poolSize, Executors.defaultThreadFactory());
        // SO_REUSEADDR has to be set BEFORE bind(): changing it on an already bound socket
        // has no effect, so the previous bind-then-setOption order never enabled it.
        serverSocketChannel = AsynchronousServerSocketChannel.open(asyncChannelGroup)
                .setOption(StandardSocketOptions.SO_REUSEADDR, true)
                .bind(new InetSocketAddress(listenPort));
        // Logged inside the try block: these lines used to be printed even after a failed
        // start-up, announcing a server that was not listening.
        log.info("** " + poolSize + " handler thread has been setup! **");
        log.info("** Socket Server has been startup, listen port is " + listenPort + "! **");
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Creates the channel group with {@code poolSize} threads and opens the server channel on
 * {@code listenPort} with {@code SO_REUSEADDR} enabled. The start-up messages are logged only
 * when the set-up succeeded; an {@link IOException} is printed and not rethrown.
 */
public void setUpHandlers() {
    try {
        AsynchronousChannelGroup asyncChannelGroup = AsynchronousChannelGroup
                .withFixedThreadPool(poolSize, Executors.defaultThreadFactory());
        // SO_REUSEADDR has to be set BEFORE bind(): changing it on an already bound socket
        // has no effect, so the previous bind-then-setOption order never enabled it.
        // (The formerly commented-out TCP_NODELAY line was dropped: it is an option of the
        // accepted connections, not of the listening channel.)
        serverSocketChannel = AsynchronousServerSocketChannel
                .open(asyncChannelGroup)
                .setOption(StandardSocketOptions.SO_REUSEADDR, true)
                .bind(new InetSocketAddress(listenPort));
        // Logged inside the try block: these lines used to be printed even after a failed
        // start-up, announcing a server that was not listening.
        log.info("** " + poolSize + " handler thread has been setup! **");
        log.info("** Socket Server has been startup, listen port is " + listenPort + "! **");
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Opens the server channel on an automatically assigned port, stores that port, logs it and
 * starts accepting connections with a new {@code ServerConnectionHandler}.
 *
 * @throws IOException if opening or binding the channel, or querying its address, fails
 */
public void start() throws IOException {
    serverSocketChannel = AsynchronousServerSocketChannel.open();

    // port 0 asks the system for any free port; the real one is read back afterwards
    serverSocketChannel.bind(new InetSocketAddress(0));
    InetSocketAddress boundAddress = (InetSocketAddress) serverSocketChannel.getLocalAddress();
    port = boundAddress.getPort();

    LOG.info("Bound listen socket to port {}, waiting for clients...", port);

    serverSocketChannel.accept(null, new ServerConnectionHandler());
}
/**
 * Run the "master" side of the artifact resolution:
 * start a server on a random loopback port and write that port to the lock file,
 * then download and install the artifact.
 * The server is always closed at the end, even on exception; this includes the case where
 * binding the server itself fails.
 * @param lockFile The lock file to maintain
 * @throws ArtifactException when an artifact exception occurs
 * @throws IOException when an IO exception occurs
 */
public void resolve(File lockFile) throws ArtifactException, IOException
{
    // open a non-blocking server which will stay alive
    // until the artifact download is complete.
    // The channel is registered as a resource before bind() is called: with
    // open().bind(...) in the resource header a failing bind left the channel unclosed.
    try (AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open())
    {
        // bind to localhost and let it find a random open port
        listener.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

        InetSocketAddress address = (InetSocketAddress)listener.getLocalAddress();
        int localPort = address.getPort();
        config.getLog().debug("Using port " + localPort + " for the local server");

        // write the port to the lock file
        FileUtils.writeStringToFile(
            lockFile,
            String.valueOf(localPort),
            Charset.defaultCharset());
        config.getLog().debug("Wrote port " + localPort + " to the lock file");

        listener.accept(null, createServerHandler(listener));
        config.getLog().debug("Started the server on port " + localPort);

        File tempFile = downloadArtifact();
        config.getArtifactInstaller().installArtifact(artifactReference, tempFile);
    }
}
/**
 * Stops listening for new connections. Only the acceptor channel is cleared and closed here;
 * this method does not touch any other channel. Does nothing when no acceptor is active.
 */
public synchronized void stopServer()
{
    AsynchronousServerSocketChannel current = _acceptor;
    if (current == null)
    {
        return;
    }

    // clear the field first, then close the channel it referred to
    _acceptor = null;
    closeChannel(current);
}