下面列出了怎么用io.netty.channel.group.ChannelGroupFuture的API类实例代码及写法,或者点击链接到github查看源代码。
private void closeAllChannels(boolean graceful) {
ChannelGroupFuture future = allChannels.close();
// if this is a graceful shutdown, log any channel closing failures. if this isn't a
// graceful shutdown, ignore them.
if (graceful) {
try {
future.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!future.isSuccess()) {
for (ChannelFuture cf : future) {
if (!cf.isSuccess()) {
logger.info("Unable to close channel: " + cf.channel(), cf.cause());
}
}
}
}
}
/**
* Closes all channels opened by this proxy server.
*
* @param graceful when false, attempts to shutdown all channels immediately and ignores any channel-closing exceptions
*/
protected void closeAllChannels(boolean graceful) {
LOG.info("Closing all channels " + (graceful ? "(graceful)" : "(non-graceful)"));
ChannelGroupFuture future = allChannels.close();
// if this is a graceful shutdown, log any channel closing failures. if this isn't a graceful shutdown, ignore them.
if (graceful) {
try {
future.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while waiting for channels to shut down gracefully.");
}
if (!future.isSuccess()) {
for (ChannelFuture cf : future) {
if (!cf.isSuccess()) {
LOG.info("Unable to close channel. Cause of failure for {} is {}", cf.channel(), cf.cause());
}
}
}
}
}
/**
* Closes all channels and releases all resources.
*/
@Override
public void close() {
LOG.info("Stopping listening at {} and closing", serverListeningChannel.localAddress());
final ChannelFuture closeListeningChannelFuture = serverListeningChannel.close();
final ChannelGroupFuture channelGroupCloseFuture = channelGroup.close();
final Future serverListeningGroupCloseFuture = serverListeningGroup.shutdownGracefully();
final Future serverWorkingGroupCloseFuture = serverWorkingGroup.shutdownGracefully();
final Future clientGroupCloseFuture = clientGroup.shutdownGracefully();
closeListeningChannelFuture.awaitUninterruptibly();
channelGroupCloseFuture.awaitUninterruptibly();
serverListeningGroupCloseFuture.awaitUninterruptibly();
serverWorkingGroupCloseFuture.awaitUninterruptibly();
clientGroupCloseFuture.awaitUninterruptibly();
}
public void close() {
if (workerGroup != null) {
log.info("stopping netty server");
bossGroup.shutdownGracefully();
bossGroup = null;
ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
allChannels.add(serverChannel);
for (Channel ch : conns.values()) {
allChannels.add(ch);
}
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
workerGroup.shutdownGracefully();
workerGroup = null;
log.info("netty server stopped");
}
}
public void close() {
if (workerGroup != null) {
log.info("stopping netty client");
timer.stop();
timer = null;
ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
for (Object ch : conns.values()) {
if (ch != null && ch != dummyChannel)
allChannels.add((Channel) ch);
}
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
workerGroup.shutdownGracefully();
workerGroup = null;
log.info("netty client stopped");
}
}
public void close() {
if (workerGroup != null) {
log.info("stopping netty server");
bossGroup.shutdownGracefully();
bossGroup = null;
ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
allChannels.add(serverChannel);
for (Channel ch : conns.values()) {
allChannels.add(ch);
}
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
workerGroup.shutdownGracefully();
workerGroup = null;
log.info("netty server stopped");
}
}
public void close() {
if (workerGroup != null) {
log.info("stopping selfcheck server");
bossGroup.shutdownGracefully();
bossGroup = null;
ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
allChannels.add(serverChannel);
for (Channel ch : conns.values()) {
allChannels.add(ch);
}
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
workerGroup.shutdownGracefully();
workerGroup = null;
log.info("selfcheck server stopped");
}
}
public void close() {
if (workerGroup != null) {
log.info("cat stopping netty client");
timer.cancel();
timer = null;
ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
for (Object ch : conns.values()) {
if (ch != null && ch != dummyChannel)
allChannels.add((Channel) ch);
}
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
workerGroup.shutdownGracefully();
workerGroup = null;
log.info("cat netty client stopped");
}
}
@Override
protected void doStop() {
try {
ChannelGroupFuture f = allChannels.close();
f.addListener(new ChannelGroupFutureListener() {
@Override
public void operationComplete(ChannelGroupFuture future) throws Exception {
if (future.isSuccess()) {
notifyStopped();
} else {
notifyFailed(future.cause());
}
}
});
} catch (Throwable t) {
notifyFailed(t);
Throwables.propagate(t);
}
}
private void closeAllChannels(boolean graceful) {
ChannelGroupFuture future = allChannels.close();
// if this is a graceful shutdown, log any channel closing failures. if this isn't a
// graceful shutdown, ignore them.
if (graceful) {
try {
future.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!future.isSuccess()) {
for (ChannelFuture cf : future) {
if (!cf.isSuccess()) {
logger.info("Unable to close channel: " + cf.channel(), cf.cause());
}
}
}
}
}
/**
* Closes all channels and releases all resources.
*/
@Override
public void close() {
LOG.info("Stopping listening at {} and closing", serverListeningChannel.localAddress());
final ChannelFuture closeListeningChannelFuture = serverListeningChannel.close();
final ChannelGroupFuture channelGroupCloseFuture = channelGroup.close();
final Future serverListeningGroupCloseFuture = serverListeningGroup.shutdownGracefully();
final Future serverWorkingGroupCloseFuture = serverWorkingGroup.shutdownGracefully();
final Future clientGroupCloseFuture = clientGroup.shutdownGracefully();
closeListeningChannelFuture.awaitUninterruptibly();
channelGroupCloseFuture.awaitUninterruptibly();
serverListeningGroupCloseFuture.awaitUninterruptibly();
serverWorkingGroupCloseFuture.awaitUninterruptibly();
clientGroupCloseFuture.awaitUninterruptibly();
}
/**
* Stop proxy server
* */
public void stop() {
ChannelGroupFuture future = _allChannels.close().awaitUninterruptibly();
if (!future.isSuccess()) {
final Iterator<ChannelFuture> iter = future.iterator();
while (iter.hasNext()) {
final ChannelFuture cf = iter.next();
if (!cf.isSuccess()) {
LOG.warn(String.format("Failed to close channel %s because %s", cf.channel(), cf.cause()));
}
}
}
_acceptorGroup.shutdownGracefully();
_upstreamWorkerGroup.shutdownGracefully();
_downstreamWorkerGroup.shutdownGracefully();
}
@Override
public synchronized void pause() {
if (paused) {
return;
}
if (channelClazz == null) {
return;
}
// We *pause* the acceptor so no new connections are made
if (serverChannelGroup != null) {
ChannelGroupFuture future = serverChannelGroup.close().awaitUninterruptibly();
if (!future.isSuccess()) {
ActiveMQServerLogger.LOGGER.nettyChannelGroupBindError();
for (Channel channel : future.group()) {
if (channel.isActive()) {
ActiveMQServerLogger.LOGGER.nettyChannelStillBound(channel, channel.remoteAddress());
}
}
}
}
paused = true;
}
public static CompletableFuture<Void> completable(ChannelGroupFuture future) {
CompletableFuture<Void> cf = new CompletableFuture<>();
future.addListener(
(ChannelGroupFutureListener)
future1 -> {
if (future1.isSuccess()) {
cf.complete(null);
} else {
cf.completeExceptionally(future1.cause());
}
});
return cf;
}
/**
* 在这里接收客户端的消息 在客户端和代理服务器建立连接时,也获得了代理服务器和目标服务器的通道outbound,
* 通过outbound写入消息到目标服务器
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead0(final ChannelHandlerContext ctx, byte[] msg) throws Exception {
log.info("客户端消息");
allChannels.writeAndFlush(msg).addListener(new ChannelGroupFutureListener() {
@Override
public void operationComplete(ChannelGroupFuture future) throws Exception {
//防止出现发送不成功造成的永久不读取消息的错误.
ctx.channel().read();
}
});
}
@Override
public void reload() {
ChannelGroupFuture future = serverChannelGroup.disconnect();
try {
future.awaitUninterruptibly();
} catch (Exception ignored) {
}
serverChannelGroup.clear();
startServerChannels();
}
public RFuture<Void> shutdownAsync() {
RPromise<Void> result = new RedissonPromise<Void>();
if (channels.isEmpty()) {
shutdown(result);
return result;
}
ChannelGroupFuture channelsFuture = channels.newCloseFuture();
channelsFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
shutdown(result);
}
});
for (Channel channel : channels) {
RedisConnection connection = RedisConnection.getFrom(channel);
if (connection != null) {
connection.closeAsync();
}
}
return result;
}
/**
* 关闭所有channel
*
* @return
*/
public ChannelGroupFuture closeAllChannels() {
if (channels != null && channels.size() > 0) {
return channels.close();
}
return null;
}
/**
* Closes all channels and releases all resources.
*/
@Override
public void close() {
LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress);
final ChannelGroupFuture clientChannelGroupFuture = this.clientChannelGroup.close();
final ChannelGroupFuture serverChannelGroupFuture = this.serverChannelGroup.close();
final ChannelFuture acceptorFuture = this.acceptor.close();
final ArrayList<Future> eventLoopGroupFutures = new ArrayList<>(3);
eventLoopGroupFutures.add(this.clientWorkerGroup.shutdownGracefully());
eventLoopGroupFutures.add(this.serverBossGroup.shutdownGracefully());
eventLoopGroupFutures.add(this.serverWorkerGroup.shutdownGracefully());
clientChannelGroupFuture.awaitUninterruptibly();
serverChannelGroupFuture.awaitUninterruptibly();
try {
acceptorFuture.sync();
} catch (final Exception ex) {
LOG.log(Level.SEVERE, "Error closing the acceptor channel for " + this.localAddress, ex);
}
for (final Future eventLoopGroupFuture : eventLoopGroupFutures) {
eventLoopGroupFuture.awaitUninterruptibly();
}
LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress);
}
public ChannelGroupFuture writeGroup(OutMessage message) {
message.setH(handlerName);
if (chanelGroup != null) {
return chanelGroup.write(message);
}
return null;
}
@Override
public synchronized void asyncStop(Runnable callback) {
if (channelClazz == null) {
callback.run();
return;
}
if (protocolHandler != null) {
protocolHandler.close();
}
if (batchFlusherFuture != null) {
batchFlusherFuture.cancel(false);
flusher.cancel();
flusher = null;
batchFlusherFuture = null;
}
// serverChannelGroup has been unbound in pause()
if (serverChannelGroup != null) {
serverChannelGroup.close().awaitUninterruptibly();
}
if (channelGroup != null) {
ChannelGroupFuture future = channelGroup.close().awaitUninterruptibly();
if (!future.isSuccess()) {
ActiveMQServerLogger.LOGGER.nettyChannelGroupError();
for (Channel channel : future.group()) {
if (channel.isActive()) {
ActiveMQServerLogger.LOGGER.nettyChannelStillOpen(channel, channel.remoteAddress());
}
}
}
}
channelClazz = null;
for (Connection connection : connections.values()) {
listener.connectionDestroyed(connection.getID());
}
connections.clear();
if (notificationService != null) {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("factory"), new SimpleString(NettyAcceptorFactory.class.getName()));
props.putSimpleStringProperty(new SimpleString("host"), new SimpleString(host));
props.putIntProperty(new SimpleString("port"), port);
Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STOPPED, props);
try {
notificationService.sendNotification(notification);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToSendNotification(e);
}
}
paused = false;
// Shutdown the EventLoopGroup if no new task was added for 100ms or if
// 3000ms elapsed.
eventLoopGroup.shutdownGracefully(quietPeriod, shutdownTimeout, TimeUnit.MILLISECONDS).addListener(f -> callback.run());
eventLoopGroup = null;
}