下面列出了 io.netty.channel.ChannelFuture#get() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Sends response headers followed by ten data frames, reading one non-ack ping after each
 * data frame, and verifies that no GOAWAY carrying ENHANCE_YOUR_CALM is ever written.
 */
@Test
public void keepAliveEnforcer_sendingDataResetsCounters() throws Exception {
  // Fixture configuration has to be in place before manualSetUp() runs.
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = TimeUnit.HOURS.toNanos(1);
  manualSetUp();

  createStream();
  Http2Headers responseHeaders = Utils.convertServerHeaders(new Metadata());
  ChannelFuture pendingWrite =
      enqueue(SendResponseHeadersCommand.createHeaders(stream.transportState(), responseHeaders));
  pendingWrite.get();

  int round = 0;
  while (round < 10) {
    // Each round: write one data frame, drop the outbound buffers, then receive a ping.
    pendingWrite =
        enqueue(
            new SendGrpcFrameCommand(stream.transportState(), content().retainedSlice(), false));
    pendingWrite.get();
    channel().releaseOutbound();
    channelRead(pingFrame(false /* isAck */, 1L));
    round++;
  }

  verifyWrite(never())
      .writeGoAway(
          eq(ctx()),
          eq(STREAM_ID),
          eq(Http2Error.ENHANCE_YOUR_CALM.code()),
          any(ByteBuf.class),
          any(ChannelPromise.class));
}
/**
 * Finishes a pending connect attempt once its channel future has completed.
 * <p>
 * The call is ignored when {@code result} is not the connect future currently
 * tracked by this instance. Otherwise the tracked future is cleared and the
 * outcome of {@code future} is propagated: on success the channel is stored,
 * {@code fireConnected} is invoked and {@code result} is completed with
 * {@code null}; on any failure {@code fireDisconnected} is invoked and
 * {@code result} is failed with the caught exception.
 * </p>
 *
 * @param result
 *            the future handed out for the connect attempt
 * @param future
 *            the channel future whose outcome is inspected
 */
protected synchronized void handleOperationComplete ( final SettableFuture<Void> result, final ChannelFuture future )
{
    if ( this.connectFuture != result )
    {
        // this should never happen
        return;
    }

    this.connectFuture = null;

    try
    {
        future.get ();
        this.channel = future.channel ();

        fireConnected ( this.channel );
        result.set ( null );
    }
    catch ( final InterruptedException e )
    {
        // keep the interruption visible to the code further up the stack
        Thread.currentThread ().interrupt ();
        fireDisconnected ( e );
        result.setException ( e );
    }
    catch ( final ExecutionException | java.util.concurrent.CancellationException e )
    {
        // get() reports a cancelled future with an unchecked CancellationException;
        // without handling it here "result" would never be completed
        fireDisconnected ( e );
        result.setException ( e );
    }
}
/**
 * Closes every session returned by the session manager and waits up to one
 * second per session for its channel to finish closing.
 *
 * <p>A failure to close one session is logged (including the cause) and does
 * not stop the remaining sessions from being closed.
 */
private void closeAllConnection() {
    if (source == 1) {
        sendMsg(1, "断开所有连接....");
    }
    LOGGER.info("断开所有连接....");

    Session[] sessions = SessionManager.getInstance().sessionArray();
    for (Session session : sessions) {
        session.close();
        ChannelFuture future = session.getChannel().close();
        try {
            future.get(1000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // Best effort: keep going, but record why the close failed instead of dropping the cause.
            // NOTE(review): an InterruptedException is deliberately not re-asserted here so that the
            // remaining sessions are still awaited - confirm this is the wanted shutdown behaviour.
            LOGGER.error(session + ",停服关闭连接失败", e);
        }
        // MessageRouter.closeSession(session);
    }
}
/**
 * Registers the two topology RPC implementations, starts the manager and then
 * creates the PCEP server, blocking until the server channel future completes.
 *
 * @throws ExecutionException if the server channel future completed with a failure
 * @throws InterruptedException if the thread is interrupted while waiting for the server
 */
public void instantiateServiceInstance() throws ExecutionException, InterruptedException {
    final RpcProviderService registry = this.dependenciesProvider.getRpcProviderRegistry();

    // Both registrations are scoped to the configured topology.
    this.element = requireNonNull(
        registry.registerRpcImplementation(
            NetworkTopologyPcepService.class,
            new TopologyRPCs(this.manager),
            Collections.singleton(this.configDependencies.getTopology())));
    this.network = requireNonNull(
        registry.registerRpcImplementation(
            NetworkTopologyPcepProgrammingService.class,
            new TopologyProgramming(this.scheduler, this.manager),
            Collections.singleton(this.configDependencies.getTopology())));

    this.manager.instantiateServiceInstance();

    final ChannelFuture serverFuture = this.dependenciesProvider
        .getPCEPDispatcher()
        .createServer(this.manager.getPCEPDispatcherDependencies());
    // Wait for the server to come up before publishing its channel.
    serverFuture.get();
    this.channel = serverFuture.channel();
}
/**
 * Stops the server: waits up to one second for the server channel to be closed,
 * force-closes it if it is still open, then shuts down both event loop groups and
 * waits for their termination.
 *
 * <p>The wait uses {@code await}, which reports a timeout through its return value.
 * A timed {@code get} would throw {@code TimeoutException} instead, skipping both the
 * forced close and the shutdown of the event loop groups.
 *
 * @throws InterruptedException if the thread is interrupted while waiting; the event
 *     loop group shutdown is still initiated in that case
 */
public void stopServer() throws InterruptedException {
    try {
        // NOTE(review): closeFuture() only observes the close, it does not start one -
        // presumably the channel is closed elsewhere before this is called; confirm.
        ChannelFuture channelCloseFuture = channelFuture.channel().closeFuture();
        boolean closedInTime = channelCloseFuture.await(1000, TimeUnit.MILLISECONDS);
        if (!closedInTime) {
            channelCloseFuture.channel().unsafe().closeForcibly();
        }
    } finally {
        // Always release the event loop threads, even when the wait above was interrupted.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
    // Wait until all threads are terminated.
    bossGroup.terminationFuture().sync();
    workerGroup.terminationFuture().sync();
}
/**
 * Writes response headers and then ten data frames, feeding a non-ack ping into the channel
 * after every data frame, and checks that a GOAWAY with ENHANCE_YOUR_CALM is never written.
 */
@Test
public void keepAliveEnforcer_sendingDataResetsCounters() throws Exception {
  // These fixture settings are applied before manualSetUp() is invoked.
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = TimeUnit.HOURS.toNanos(1);
  manualSetUp();
  createStream();

  Http2Headers convertedHeaders = Utils.convertServerHeaders(new Metadata());
  ChannelFuture writeResult =
      enqueue(SendResponseHeadersCommand.createHeaders(stream.transportState(), convertedHeaders));
  writeResult.get();

  for (int sent = 0; sent != 10; ++sent) {
    // Data frame out, outbound buffers released, ping in.
    writeResult =
        enqueue(
            new SendGrpcFrameCommand(stream.transportState(), content().retainedSlice(), false));
    writeResult.get();
    channel().releaseOutbound();
    channelRead(pingFrame(false /* isAck */, 1L));
  }

  verifyWrite(never())
      .writeGoAway(
          eq(ctx()),
          eq(STREAM_ID),
          eq(Http2Error.ENHANCE_YOUR_CALM.code()),
          any(ByteBuf.class),
          any(ChannelPromise.class));
}
/**
 * Opens client connections to {@code HOST}:{@code PORT} in a loop, one every 50 ms, until
 * the thread is interrupted or a connection attempt fails.
 *
 * <p>Each attempt is awaited with {@code await()} rather than {@code get()}: {@code get()}
 * throws on a failed connect, which made the failure branch below unreachable.
 *
 * @throws Exception if the thread is interrupted while waiting or sleeping
 */
private void start() throws Exception {
    Bootstrap bootstrap = new Bootstrap();
    // NOTE(review): the group is never shut down, so connections opened so far stay alive after
    // the loop ends - presumably intentional for this client; confirm.
    NioEventLoopGroup group = new NioEventLoopGroup();
    bootstrap.group(group);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            // ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
            ch.pipeline().addLast(new Spliter());
            ch.pipeline().addLast(new PushMessageDecoder());
            ch.pipeline().addLast(new PushAckEncoder());
            ch.pipeline().addLast(PushMessageHandler.INSTANCE);
        }
    });

    while (!Thread.interrupted()) {
        ChannelFuture channelFuture = bootstrap.connect(HOST, PORT);
        // Wait for completion without throwing on failure so the outcome can be inspected.
        channelFuture.await();
        if (!channelFuture.isSuccess()) {
            logger.error("something has wrong, bye!", channelFuture.cause());
            break;
        }
        Thread.sleep(50);
    }
}
/**
 * Checks that a stub with connection scope closes only the client's connection: the client
 * channel ends up closed while the node's own channel stays open for new traffic.
 */
@Test
public void testCloseConnection() throws Exception {
  NodeSpec spec = NodeSpec.builder().build();
  BoundNode bound = localServer.register(spec);
  stubCloseOnStartup(Scope.CONNECTION);

  try (MockClient mockClient = new MockClient(eventLoop)) {
    mockClient.connect(bound.getAddress());

    // The first write triggers the close; the write itself is expected to succeed.
    ChannelFuture startupWrite = mockClient.write(new Startup());
    startupWrite.get(5, TimeUnit.SECONDS);

    // A write issued afterwards is expected to fail because the channel is gone.
    ChannelFuture optionsWrite = mockClient.write(Options.INSTANCE);
    try {
      optionsWrite.get();
    } catch (ExecutionException failure) {
      assertThat(failure.getCause()).isInstanceOf(ClosedChannelException.class);
    } finally {
      assertThat(mockClient.channel.isOpen()).isFalse();
      // The node must keep accepting connections.
      assertThat(bound.channel.get().isOpen()).isTrue();
    }
  }
}
/**
 * Closes the server channel, disposes all modules and shuts down both event
 * loop groups, waiting for each step to finish.
 * <p>
 * Cleanup is performed even when an earlier step fails: modules are disposed
 * although closing the channel failed, and the shutdown of both event loop
 * groups is always initiated. The first failure is still reported to the
 * caller.
 * </p>
 *
 * @throws Exception
 *             if closing the channel, disposing a module or waiting for an
 *             event loop group fails
 */
public void closeAndWait () throws Exception
{
    Future<?> bossGroupFuture;
    Future<?> workerGroupFuture;

    try
    {
        try
        {
            this.channel.close ().get ();
        }
        finally
        {
            for ( final ServerModule module : this.modules )
            {
                module.dispose ();
            }
        }
    }
    finally
    {
        // initiate both shutdowns up front so that no group is left running
        // when an earlier step has failed
        bossGroupFuture = this.bossGroup.shutdownGracefully ();
        workerGroupFuture = this.workerGroup.shutdownGracefully ();
    }

    bossGroupFuture.get ();
    workerGroupFuture.get ();
}
/**
 * Handles completion of the given channel future.
 *
 * <p>Waits on the future with a total budget of 120 seconds that survives interrupts. On success the
 * remote and local addresses are handed to {@code parent} and, when SSL is not enabled, the handshake
 * is sent right away. A future that completed without success, an exhausted wait budget or any other
 * exception is reported through {@code connectionListener.connectionFailed}. If an interrupt was
 * observed while waiting, the thread's interrupt flag is set again before returning.
 *
 * @param future the channel future whose outcome is processed
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Remembers whether any InterruptedException was seen, so the flag can be restored at the end.
boolean isInterrupted = false;
// We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
// So there is no point propagating the interruption as failure immediately.
long remainingWaitTimeMills = 120000;
long startTime = System.currentTimeMillis();
// logger.debug("Connection operation finished. Success: {}", future.isSuccess());
// Every path through the loop body breaks, except an interrupt that still has wait budget left.
while (true) {
try {
future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
if (future.isSuccess()) {
SocketAddress remote = future.channel().remoteAddress();
SocketAddress local = future.channel().localAddress();
parent.setAddresses(remote, local);
// if SSL is enabled send the handshake after the ssl handshake is completed, otherwise send it
// now
if(!parent.isSslEnabled()) {
// send a handshake on the current thread. This is the only time we will send from within the event thread.
// We can do this because the connection will not be backed up.
parent.send(handshakeSendHandler, handshakeValue, true);
}
} else {
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION,
new RpcException("General connection failure."));
}
// logger.debug("Handshake queued for send.");
break;
} catch (final InterruptedException interruptEx) {
// Charge the time spent so far against the budget and restart the clock for the next wait.
remainingWaitTimeMills -= (System.currentTimeMillis() - startTime);
startTime = System.currentTimeMillis();
isInterrupted = true;
if (remainingWaitTimeMills < 1) {
// Budget used up: report the interrupt itself as the connection failure.
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, interruptEx);
break;
}
// Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMills.
} catch (final Exception ex) {
// Any other failure from get() or from the success handling above ends the wait.
logger.error("Failed to establish connection", ex);
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, ex);
break;
}
}
if (isInterrupted) {
// Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
}
}
/**
 * Configures {@code serverBootstrap} as an HTTP server (optionally behind SSL) and binds it to
 * the configured listen address, blocking until the bind has completed.
 *
 * <p>The SSL context and the callback executor are prepared before the event loop groups are
 * created. If configuring or binding the server fails afterwards, both event loop groups are
 * shut down again before the failure is rethrown, so a failed start does not leave their
 * threads running.
 *
 * @throws ExecutionException if binding the listen address fails
 * @throws InterruptedException if the thread is interrupted while waiting for the bind
 * @throws IOException declared by the original signature; presumably raised while the SSL
 *     context is built
 */
private void startHttpServer() throws ExecutionException, InterruptedException, IOException {
    // Configure to use SSL, construct SslContext. A null context means SSL is switched off.
    final SslContext sslCtx =
            config.getSslSwitch() == RPCConfig.SSLSwitch.SSL_OFF.getSwh()
                    ? null
                    : initSslContextForServer(
                            config.getCaCert(),
                            config.getSslCert(),
                            config.getSslKey(),
                            config.getSslSwitch());

    final ThreadPoolTaskExecutor threadPoolTaskExecutor =
            ThreadPoolTaskExecutorFactory.build(
                    config.getThreadNum(), config.getThreadQueueCapacity(), "http-callback");

    final EventLoopGroup bossGroup = new NioEventLoopGroup();
    final EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        serverBootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(NioChannelOption.SO_REUSEADDR, true)
                .childHandler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) {
                                ChannelPipeline pipeline = ch.pipeline();
                                if (sslCtx != null) {
                                    // The SSL handler has to come first in the pipeline.
                                    SslHandler sslHandler = sslCtx.newHandler(ch.alloc());
                                    sslHandler.setHandshakeTimeout(
                                            HANDLE_SHAKE_TIMEOUT, TimeUnit.MILLISECONDS);
                                    pipeline.addLast(sslHandler);
                                }
                                pipeline.addLast(
                                        new IdleStateHandler(
                                                IDLE_TIMEOUT,
                                                IDLE_TIMEOUT,
                                                IDLE_TIMEOUT,
                                                TimeUnit.MILLISECONDS),
                                        new HttpServerCodec(),
                                        new HttpObjectAggregator(Integer.MAX_VALUE),
                                        new HttpServerHandler(
                                                getUriHandlerDispatcher(),
                                                threadPoolTaskExecutor));
                            }
                        });

        ChannelFuture future =
                serverBootstrap.bind(config.getListenIP(), config.getListenPort());
        future.get();
    } catch (ExecutionException | InterruptedException | RuntimeException e) {
        // The groups are only reachable from here; release their threads before giving up.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        throw e;
    }

    logger.info(
            " start rpc http server, listen ip: {}, port: {}",
            config.getListenIP(),
            config.getListenPort());
}
/**
 * Validates that a stub that dictates to close a node's connections does so: clients connected
 * to the targeted node end up closed, clients connected to other nodes stay open.
 *
 * <p>All clients opened by the test are closed at the end, whatever the outcome.
 */
@Test
public void testCloseNode() throws Exception {
  ClusterSpec cluster = ClusterSpec.builder().withNodes(2, 2).build();
  BoundCluster boundCluster = localServer.register(cluster);
  BoundDataCenter dc0 = boundCluster.getDataCenters().iterator().next();
  Iterator<BoundNode> nodes = dc0.getNodes().iterator();
  BoundNode boundNode = nodes.next();
  stubCloseOnStartup(Scope.NODE);
  Map<BoundNode, MockClient> nodeToClients = new HashMap<>();
  MockClient client = null;
  try {
    try {
      // Create a connection to each node.
      for (BoundNode node : boundCluster.getNodes()) {
        MockClient client0 = new MockClient(eventLoop);
        client0.connect(node.getAddress());
        nodeToClients.put(node, client0);
      }
      client = new MockClient(eventLoop);
      client.connect(boundNode.getAddress());
      // Sending a write should cause the connection to close.
      ChannelFuture f = client.write(new Startup());
      // Future should be successful since write was successful.
      f.get(5, TimeUnit.SECONDS);
      // Next write should fail because the channel was closed.
      f = client.write(Options.INSTANCE);
      try {
        f.get();
      } catch (ExecutionException e) {
        assertThat(e.getCause()).isInstanceOf(ClosedChannelException.class);
      }
    } finally {
      if (client != null) {
        // client that sent request should close.
        assertThat(client.channel.isOpen()).isFalse();
      }
      // All clients should remain open except the ones to the node that received the request.
      nodeToClients
          .entrySet()
          .stream()
          .filter(e -> e.getKey() != boundNode)
          .forEach(e -> assertThat(e.getValue().channel.isOpen()).isTrue());
      nodeToClients
          .entrySet()
          .stream()
          .filter(e -> e.getKey() == boundNode)
          .forEach(e -> assertThat(e.getValue().channel.isOpen()).isFalse());
    }
  } finally {
    // Release every client this test opened so no connection outlives the test.
    if (client != null) {
      client.close();
    }
    for (MockClient opened : nodeToClients.values()) {
      opened.close();
    }
  }
}
/**
 * Validates that a stub that dictates to close a node's DC's connections does so: clients
 * connected to the same data center end up closed, clients of other data centers stay open.
 *
 * <p>All clients opened by the test are closed at the end, whatever the outcome.
 */
@Test
public void testCloseDataCenter() throws Exception {
  ClusterSpec cluster = ClusterSpec.builder().withNodes(2, 2).build();
  BoundCluster boundCluster = localServer.register(cluster);
  BoundDataCenter dc0 = boundCluster.getDataCenters().iterator().next();
  Iterator<BoundNode> nodes = dc0.getNodes().iterator();
  BoundNode boundNode = nodes.next();
  stubCloseOnStartup(Scope.DATA_CENTER);
  Map<BoundNode, MockClient> nodeToClients = new HashMap<>();
  MockClient client = null;
  try {
    try {
      // Create a connection to each node.
      for (BoundNode node : boundCluster.getNodes()) {
        MockClient client0 = new MockClient(eventLoop);
        client0.connect(node.getAddress());
        nodeToClients.put(node, client0);
      }
      client = new MockClient(eventLoop);
      client.connect(boundNode.getAddress());
      // Sending a write should cause the connection to close.
      ChannelFuture f = client.write(new Startup());
      // Future should be successful since write was successful.
      f.get(5, TimeUnit.SECONDS);
      // Next write should fail because the channel was closed.
      f = client.write(Options.INSTANCE);
      try {
        f.get();
      } catch (ExecutionException e) {
        assertThat(e.getCause()).isInstanceOf(ClosedChannelException.class);
      }
    } finally {
      if (client != null) {
        // client that sent request should close.
        assertThat(client.channel.isOpen()).isFalse();
      }
      // Clients connecting to a different DC should remain open.
      nodeToClients
          .entrySet()
          .stream()
          .filter(e -> e.getKey().getDataCenter() != boundNode.getDataCenter())
          .forEach(e -> assertThat(e.getValue().channel.isOpen()).isTrue());
      // Clients connecting to same DC should close.
      nodeToClients
          .entrySet()
          .stream()
          .filter(e -> e.getKey().getDataCenter() == boundNode.getDataCenter())
          .forEach(e -> assertThat(e.getValue().channel.isOpen()).isFalse());
    }
  } finally {
    // Release every client this test opened so no connection outlives the test.
    if (client != null) {
      client.close();
    }
    for (MockClient opened : nodeToClients.values()) {
      opened.close();
    }
  }
}
/**
 * Validates that a stub that dictates to close a node's ClusterSpec's connections does so:
 * every client connected to the cluster ends up closed.
 *
 * <p>All clients opened by the test are closed at the end, whatever the outcome.
 */
@Test
public void testCloseCluster() throws Exception {
  ClusterSpec cluster = ClusterSpec.builder().withNodes(2, 2).build();
  BoundCluster boundCluster = localServer.register(cluster);
  BoundDataCenter dc0 = boundCluster.getDataCenters().iterator().next();
  Iterator<BoundNode> nodes = dc0.getNodes().iterator();
  BoundNode boundNode = nodes.next();
  stubCloseOnStartup(Scope.CLUSTER);
  Map<BoundNode, MockClient> nodeToClients = new HashMap<>();
  MockClient client = null;
  try {
    try {
      // Create a connection to each node.
      for (BoundNode node : boundCluster.getNodes()) {
        MockClient client0 = new MockClient(eventLoop);
        client0.connect(node.getAddress());
        nodeToClients.put(node, client0);
      }
      client = new MockClient(eventLoop);
      client.connect(boundNode.getAddress());
      // Sending a write should cause the connection to close.
      ChannelFuture f = client.write(new Startup());
      // Future should be successful since write was successful.
      f.get(5, TimeUnit.SECONDS);
      // Next write should fail because the channel was closed.
      f = client.write(Options.INSTANCE);
      try {
        f.get();
      } catch (ExecutionException e) {
        assertThat(e.getCause()).isInstanceOf(ClosedChannelException.class);
      }
    } finally {
      if (client != null) {
        // client that sent request should close.
        assertThat(client.channel.isOpen()).isFalse();
      }
      // All clients should close
      nodeToClients.entrySet().forEach(e -> assertThat(e.getValue().channel.isOpen()).isFalse());
    }
  } finally {
    // Release every client this test opened so no connection outlives the test.
    if (client != null) {
      client.close();
    }
    for (MockClient opened : nodeToClients.values()) {
      opened.close();
    }
  }
}
/**
 * Runs the server shutdown sequence step by step, logging how long each step took, then
 * notifies the requesting session (if any) and exits the JVM unless {@code source} is 4.
 */
@Override
public void run() {
    // Mark the shutdown logic as executed and the server as closed.
    GameContext.setServerCloseLogicExecuted(true);
    GameContext.setClosed(true);

    long begin;
    long finish;

    // Fire the server-stop event.
    begin = TimeUtil.getNowOfMills();
    fireCloseEvent();
    finish = TimeUtil.getNowOfMills();
    LOGGER.error("停服事件耗时:" + (finish - begin));

    // Drop all connections.
    begin = TimeUtil.getNowOfMills();
    closeAllConnection();
    finish = TimeUtil.getNowOfMills();
    LOGGER.error("断开所有连接------------------------:" + (finish - begin));

    // Stop the login thread.
    begin = TimeUtil.getNowOfMills();
    closeLoginThread();
    finish = TimeUtil.getNowOfMills();
    LOGGER.error("关闭登录线程-------------------:" + (finish - begin));

    // Stop the logic thread.
    begin = TimeUtil.getNowOfMills();
    closeLogicThread();
    finish = TimeUtil.getNowOfMills();
    LOGGER.error("关闭业务线程--------------:" + (finish - begin));

    // Stop the event dispatch thread.
    begin = TimeUtil.getNowOfMills();
    closeEventDispatchThread();
    finish = TimeUtil.getNowOfMills();
    LOGGER.error("关闭事件派发起线程-------------------:" + (finish - begin));

    // Stop the stage driver thread.
    begin = TimeUtil.getNowOfMills();
    closeStageDriverThread();
    finish = TimeUtil.getNowOfMills();
    LOGGER.error("关闭场景驱动线程-------------:" + (finish - begin));

    // Persist all data.
    begin = TimeUtil.getNowOfMills();
    saveAllData();
    finish = TimeUtil.getNowOfMills();
    LOGGER.error("saveAllData--------------------:" + (finish - begin));

    // Shut the network layer down.
    begin = TimeUtil.getNowOfMills();
    closeNetWork();
    finish = TimeUtil.getNowOfMills();
    LOGGER.error("关闭网络---------------:" + (finish - begin));

    if (source == 1) {
        sendMsg(1, "关服逻辑处理完毕...");
    }
    LOGGER.info("关服逻辑处理完毕...");
    LOGGER.info("服务器已关闭...");

    if (session != null) {
        // The dedicated close-server response message is currently disabled, so null is written.
        // NOTE(review): Netty rejects null messages, so this write presumably always ends up in
        // the catch below - confirm and restore a real message if a reply is wanted.
        try {
            session.getChannel().writeAndFlush(null).sync().get();
        } catch (Exception e) {
            LOGGER.error("发送服务器关闭消息出错.", e);
        }
    }

    LOGGER.info("退出程序...");
    // Source 4 is the only one that keeps the JVM running.
    if (source != 4) {
        System.exit(0);
    }
}