下面列出了怎么用java.nio.channels.AsynchronousChannelGroup的API类实例代码及写法,或者点击链接到github查看源代码。
public synchronized void startServer(SocketAddress addr, Object attachment, AsynchronousChannelGroup group)
{
stopServer();
try
{
_acceptor = AsynchronousServerSocketChannel.open(group);
int backlog = onAcceptorCreated(_acceptor, attachment);
if (backlog >= 0)
{
_acceptor.bind(addr, backlog);
beginAccept();
return;
}
}
catch (Throwable e)
{
doException(null, e);
}
stopServer();
}
/**
* 连接服务端,指定绑定地址
*
* @param remote 远程地址
* @param bind 本机绑定地址
* @return channelContext
* @throws IOException io异常时抛出
*/
public ChannelContext connect(InetSocketAddress remote, InetSocketAddress bind)
throws IOException {
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup
.withThreadPool(groupContext.getGroupExecutor());
groupContext.setChannelGroup(channelGroup);
AsynchronousSocketChannel channel = IoUtils.create(channelGroup, bind);
groupContext.setAioListener(new ComposeAioListener().add(ClientAioListener.INSTANCE)
.add(groupContext.getAioListener()));
ChannelContext channelContext = new ChannelContext(groupContext, channel, false);
channelContext.setServerAddress(remote);
channel.connect(remote, channelContext, ConnectHandler.INSTANCE);
BeatProcessor beat = new BeatProcessor(channelContext);
ThreadUtils.globalTimer()
.scheduleWithDelay(beat, groupContext.getBeatInterval(), TimeUnit.MILLISECONDS, beat);
return channelContext;
}
public void startClient(SocketAddress addr, Object attachment, AsynchronousChannelGroup group)
{
AsynchronousSocketChannel channel = null;
try
{
channel = AsynchronousSocketChannel.open(group);
int recvBufSize = onChannelCreated(channel, attachment);
if (recvBufSize >= 0)
channel.connect(addr, new ConnectParam(channel, recvBufSize), _connectHandler);
else
channel.close();
}
catch (Throwable e)
{
doException(null, e);
closeChannel(channel);
}
}
@Override
public void startService() {
super.startService();
try {
//启动acceptor,开始接受连接
acceptHandler = new RpcAcceptCompletionHandler();
acceptHandler.startService();
channelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(channelGroupThreads));
serverChannel = AsynchronousServerSocketChannel.open(channelGroup).bind(new InetSocketAddress(this.getHost(), this.getPort()));
serverChannel.accept(this, acceptHandler);
this.startListeners();
this.fireStartNetListeners();
} catch (IOException e) {
throw new RpcException(e);
}
}
public void test(AsynchronousChannelGroup asynchronousChannelGroup, BufferPagePool bufferPagePool, AbstractMessageProcessor<String> processor) throws InterruptedException, ExecutionException, IOException {
AioQuickClient<String> client = new AioQuickClient<>("localhost", 8888, new StringProtocol(), processor);
client.setBufferPagePool(bufferPagePool);
client.setWriteBuffer(1024 * 1024, 10);
AioSession<String> session = client.start(asynchronousChannelGroup);
WriteBuffer outputStream = session.writeBuffer();
byte[] data = "smart-socket".getBytes();
while (true) {
int num = (int) (Math.random() * 10) + 1;
// int num = 4;
outputStream.writeInt(data.length * num);
while (num-- > 0) {
outputStream.write(data);
}
// Thread.sleep(100);
}
}
public void test(AsynchronousChannelGroup asynchronousChannelGroup, BufferPagePool bufferPagePool, AbstractMessageProcessor<String> processor) throws InterruptedException, ExecutionException, IOException {
AioQuickClient<String> client = new AioQuickClient<>("localhost", 8888, new StringProtocol(), processor);
client.setBufferPagePool(bufferPagePool);
client.setWriteBuffer(1024 * 1024, 10);
AioSession<String> session = client.start(asynchronousChannelGroup);
WriteBuffer outputStream = session.writeBuffer();
byte[] data = "smart-socket".getBytes();
while (true) {
int num = (int) (Math.random() * 10) + 1;
// int num = 4;
outputStream.writeInt(data.length * num);
while (num-- > 0) {
outputStream.write(data);
}
// Thread.sleep(100);
}
}
private void initAioProcessor(int processorCount) throws IOException {
for (int i = 0; i < processorCount; i++) {
asyncChannelGroups[i] = AsynchronousChannelGroup.withFixedThreadPool(processorCount,
new ThreadFactory() {
private int inx = 1;
@Override
public Thread newThread(Runnable r) {
Thread th = new Thread(r);
//TODO
th.setName(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + "AIO" + (inx++));
LOGGER.info("created new AIO thread " + th.getName());
return th;
}
}
);
}
}
@Override
public void listen(int thread, int port, AioServerListener listener) {
this.port = port;
this.listener = listener;
try {
channelGroup = AsynchronousChannelGroup.withFixedThreadPool(thread, Executors.defaultThreadFactory());
serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.accept(null, this);
if (logger.isInfoEnable())
logger.info("启动AIO监听[{}]服务。", port);
} catch (IOException e) {
logger.warn(e, "启动AIO监听[{}]服务时发生异常!", port);
}
}
public static AsynchronousChannelGroup register() {
synchronized (lock) {
if (usageCount == 0) {
group = createAsynchronousChannelGroup();
}
usageCount++;
return group;
}
}
private AsynchronousChannelGroup getAsynchronousChannelGroup() {
// Use AsyncChannelGroupUtil to share a common group amongst all
// WebSocket clients
AsynchronousChannelGroup result = asynchronousChannelGroup;
if (result == null) {
synchronized (asynchronousChannelGroupLock) {
if (asynchronousChannelGroup == null) {
asynchronousChannelGroup = AsyncChannelGroupUtil.register();
}
result = asynchronousChannelGroup;
}
}
return result;
}
/**
* Initialize the endpoint.
*/
@Override
public void bind() throws Exception {
// Create worker collection
if (getExecutor() == null) {
createExecutor();
}
if (getExecutor() instanceof ExecutorService) {
threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
}
// AsynchronousChannelGroup currently needs exclusive access to its executor service
if (!internalExecutor) {
log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
}
serverSock = AsynchronousServerSocketChannel.open(threadGroup);
socketProperties.setProperties(serverSock);
InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
serverSock.bind(addr, getAcceptCount());
// Initialize thread count defaults for acceptor, poller
if (acceptorThreadCount != 1) {
// NIO2 does not allow any form of IO concurrency
acceptorThreadCount = 1;
}
// Initialize SSL if needed
initialiseSsl();
}
public static void main(String[] args) throws Exception {
// create channel groups
ThreadFactory factory = new PrivilegedThreadFactory();
AsynchronousChannelGroup group1 = AsynchronousChannelGroup
.withFixedThreadPool(5, factory);
AsynchronousChannelGroup group2 = AsynchronousChannelGroup
.withCachedThreadPool(Executors.newCachedThreadPool(factory), 0);
AsynchronousChannelGroup group3 = AsynchronousChannelGroup
.withThreadPool(Executors.newFixedThreadPool(10, factory));
try {
// execute simple tasks
testSimpleTask(group1);
testSimpleTask(group2);
testSimpleTask(group3);
// install security manager and test again
System.setSecurityManager( new SecurityManager() );
testSimpleTask(group1);
testSimpleTask(group2);
testSimpleTask(group3);
// attempt to execute tasks that run with only frames from boot
// class loader on the stack.
testAttackingTask(group1);
testAttackingTask(group2);
testAttackingTask(group3);
} finally {
group1.shutdown();
group2.shutdown();
group3.shutdown();
}
}
static void testSimpleTask(AsynchronousChannelGroup group) throws Exception {
Executor executor = (Executor)group;
final CountDownLatch latch = new CountDownLatch(1);
executor.execute(new Runnable() {
public void run() {
latch.countDown();
}
});
latch.await();
}
static void testAttackingTask(AsynchronousChannelGroup group) throws Exception {
Executor executor = (Executor)group;
Attack task = new Attack();
executor.execute(task);
task.waitUntilDone();
if (!task.failedDueToSecurityException())
throw new RuntimeException("SecurityException expected");
}
public static void main(String[] args) throws Exception {
// create channel groups
ThreadFactory factory = new PrivilegedThreadFactory();
AsynchronousChannelGroup group1 = AsynchronousChannelGroup
.withFixedThreadPool(5, factory);
AsynchronousChannelGroup group2 = AsynchronousChannelGroup
.withCachedThreadPool(Executors.newCachedThreadPool(factory), 0);
AsynchronousChannelGroup group3 = AsynchronousChannelGroup
.withThreadPool(Executors.newFixedThreadPool(10, factory));
try {
// execute simple tasks
testSimpleTask(group1);
testSimpleTask(group2);
testSimpleTask(group3);
// install security manager and test again
System.setSecurityManager( new SecurityManager() );
testSimpleTask(group1);
testSimpleTask(group2);
testSimpleTask(group3);
// attempt to execute tasks that run with only frames from boot
// class loader on the stack.
testAttackingTask(group1);
testAttackingTask(group2);
testAttackingTask(group3);
} finally {
group1.shutdown();
group2.shutdown();
group3.shutdown();
}
}
static void testSimpleTask(AsynchronousChannelGroup group) throws Exception {
Executor executor = (Executor)group;
final CountDownLatch latch = new CountDownLatch(1);
executor.execute(new Runnable() {
public void run() {
latch.countDown();
}
});
latch.await();
}
static void testAttackingTask(AsynchronousChannelGroup group) throws Exception {
Executor executor = (Executor)group;
Attack task = new Attack();
executor.execute(task);
task.waitUntilDone();
if (!task.failedDueToSecurityException())
throw new RuntimeException("SecurityException expected");
}
public static void main(String[] args) throws Exception {
// create channel groups
ThreadFactory factory = new PrivilegedThreadFactory();
AsynchronousChannelGroup group1 = AsynchronousChannelGroup
.withFixedThreadPool(5, factory);
AsynchronousChannelGroup group2 = AsynchronousChannelGroup
.withCachedThreadPool(Executors.newCachedThreadPool(factory), 0);
AsynchronousChannelGroup group3 = AsynchronousChannelGroup
.withThreadPool(Executors.newFixedThreadPool(10, factory));
try {
// execute simple tasks
testSimpleTask(group1);
testSimpleTask(group2);
testSimpleTask(group3);
// install security manager and test again
System.setSecurityManager( new SecurityManager() );
testSimpleTask(group1);
testSimpleTask(group2);
testSimpleTask(group3);
// attempt to execute tasks that run with only frames from boot
// class loader on the stack.
testAttackingTask(group1);
testAttackingTask(group2);
testAttackingTask(group3);
} finally {
group1.shutdown();
group2.shutdown();
group3.shutdown();
}
}
static void testSimpleTask(AsynchronousChannelGroup group) throws Exception {
Executor executor = (Executor)group;
final CountDownLatch latch = new CountDownLatch(1);
executor.execute(new Runnable() {
public void run() {
latch.countDown();
}
});
latch.await();
}
static void testAttackingTask(AsynchronousChannelGroup group) throws Exception {
Executor executor = (Executor)group;
Attack task = new Attack();
executor.execute(task);
task.waitUntilDone();
if (!task.failedDueToSecurityException())
throw new RuntimeException("SecurityException expected");
}
/***************************************
* Returns the {@link AsynchronousChannelGroup} for asynchronous channel
* operations in the current scope. If no such group exists a new one will
* be created with the {@link ExecutorService} of the {@link
* CoroutineContext} and stored as {@link #CHANNEL_GROUP} in the current
* scope.
*
* @param rContinuation The channel group
*
* @return The channel group
*/
protected AsynchronousChannelGroup getChannelGroup(
Continuation<?> rContinuation)
{
AsynchronousChannelGroup rChannelGroup =
rContinuation.getState(CHANNEL_GROUP);
if (rChannelGroup == null)
{
Executor rContextExecutor = rContinuation.context().getExecutor();
if (rContextExecutor instanceof ExecutorService)
{
try
{
rChannelGroup =
AsynchronousChannelGroup.withThreadPool(
(ExecutorService) rContextExecutor);
}
catch (IOException e)
{
throw new CoroutineException(e);
}
rContinuation.scope().set(CHANNEL_GROUP, rChannelGroup);
}
}
return rChannelGroup;
}
/**
* 启动服务
*
* @param address 地址
* @throws IOException io异常时抛出
*/
@SuppressWarnings("squid:S2095")
public void start(InetSocketAddress address) throws IOException {
groupContext.setStopped(false);
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup
.withThreadPool(groupContext.getGroupExecutor());
groupContext.setChannelGroup(channelGroup);
serverAddress = address;
serverChannel = AsynchronousServerSocketChannel.open(channelGroup)
.setOption(StandardSocketOptions.SO_REUSEADDR, true).bind(address);
serverChannel.accept(this, new AcceptHandler());
}
public static void main(String[] args) throws Exception {
// create channel groups
ThreadFactory factory = new PrivilegedThreadFactory();
AsynchronousChannelGroup group1 = AsynchronousChannelGroup
.withFixedThreadPool(5, factory);
AsynchronousChannelGroup group2 = AsynchronousChannelGroup
.withCachedThreadPool(Executors.newCachedThreadPool(factory), 0);
AsynchronousChannelGroup group3 = AsynchronousChannelGroup
.withThreadPool(Executors.newFixedThreadPool(10, factory));
try {
// execute simple tasks
testSimpleTask(group1);
testSimpleTask(group2);
testSimpleTask(group3);
// install security manager and test again
System.setSecurityManager( new SecurityManager() );
testSimpleTask(group1);
testSimpleTask(group2);
testSimpleTask(group3);
// attempt to execute tasks that run with only frames from boot
// class loader on the stack.
testAttackingTask(group1);
testAttackingTask(group2);
testAttackingTask(group3);
} finally {
group1.shutdown();
group2.shutdown();
group3.shutdown();
}
}
static void testSimpleTask(AsynchronousChannelGroup group) throws Exception {
Executor executor = (Executor)group;
final CountDownLatch latch = new CountDownLatch(1);
executor.execute(new Runnable() {
public void run() {
latch.countDown();
}
});
latch.await();
}
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
});
StringProtocol protocol = new StringProtocol();
PushClientProcessorMessage clientProcessorMessage = new PushClientProcessorMessage();
AioQuickClient<String>[] clients = new AioQuickClient[4];
for (int i = 0; i < clients.length; i++) {
clients[i] = new AioQuickClient<>("localhost", 8080, protocol, clientProcessorMessage);
clients[i].start(channelGroup);
}
}
public static AsynchronousChannelGroup register() {
synchronized (lock) {
if (usageCount == 0) {
group = createAsynchronousChannelGroup();
}
usageCount++;
return group;
}
}
private static AsynchronousChannelGroup createAsynchronousChannelGroup() {
// Need to do this with the right thread context class loader else the
// first web app to call this will trigger a leak
ClassLoader original = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(
AsyncIOThreadFactory.class.getClassLoader());
// These are the same settings as the default
// AsynchronousChannelGroup
int initialSize = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
Long.MAX_VALUE, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new AsyncIOThreadFactory());
try {
return AsynchronousChannelGroup.withCachedThreadPool(
executorService, initialSize);
} catch (IOException e) {
// No good reason for this to happen.
throw new IllegalStateException(sm.getString("asyncChannelGroup.createFail"));
}
} finally {
Thread.currentThread().setContextClassLoader(original);
}
}
private AsynchronousChannelGroup getAsynchronousChannelGroup() {
// Use AsyncChannelGroupUtil to share a common group amongst all
// WebSocket clients
AsynchronousChannelGroup result = asynchronousChannelGroup;
if (result == null) {
synchronized (asynchronousChannelGroupLock) {
if (asynchronousChannelGroup == null) {
asynchronousChannelGroup = AsyncChannelGroupUtil.register();
}
result = asynchronousChannelGroup;
}
}
return result;
}
public static void main(String[] args) throws Exception {
// create channel groups
ThreadFactory factory = new PrivilegedThreadFactory();
AsynchronousChannelGroup group1 = AsynchronousChannelGroup
.withFixedThreadPool(5, factory);
AsynchronousChannelGroup group2 = AsynchronousChannelGroup
.withCachedThreadPool(Executors.newCachedThreadPool(factory), 0);
AsynchronousChannelGroup group3 = AsynchronousChannelGroup
.withThreadPool(Executors.newFixedThreadPool(10, factory));
try {
// execute simple tasks
testSimpleTask(group1);
testSimpleTask(group2);
testSimpleTask(group3);
// install security manager and test again
System.setSecurityManager( new SecurityManager() );
testSimpleTask(group1);
testSimpleTask(group2);
testSimpleTask(group3);
// attempt to execute tasks that run with only frames from boot
// class loader on the stack.
testAttackingTask(group1);
testAttackingTask(group2);
testAttackingTask(group3);
} finally {
group1.shutdown();
group2.shutdown();
group3.shutdown();
}
}
static void testSimpleTask(AsynchronousChannelGroup group) throws Exception {
Executor executor = (Executor)group;
final CountDownLatch latch = new CountDownLatch(1);
executor.execute(new Runnable() {
public void run() {
latch.countDown();
}
});
latch.await();
}