下面列出了 io.netty.channel.Channel#close() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
@Override
public Completable closeAsync() {
    return new SubscribableCompletable() {
        /**
         * Attaches the subscriber to the {@code onClose} signal and, unless the state was
         * already {@code CLOSING}, requests a close of every channel held in the map.
         */
        @Override
        protected void handleSubscribe(final Subscriber subscriber) {
            // Subscribe before touching the state.
            toSource(onClose).subscribe(subscriber);
            final boolean alreadyClosing = stateUpdater.getAndSet(ChannelSet.this, CLOSING) == CLOSING;
            if (alreadyClosing) {
                return;
            }
            if (!channelMap.isEmpty()) {
                // Exceptions are deliberately not caught: only Netty or ServiceTalk code is invoked
                // here, never user-provided code.
                for (final Channel tracked : channelMap.values()) {
                    tracked.close();
                }
                return;
            }
            // Nothing to close, so completion is signalled directly.
            onCloseProcessor.onComplete();
        }
    };
}
/**
 * Handles an exception raised in the pipeline and closes the channel if it is still open.
 *
 * <p>"Connection reset by peer" I/O errors, read timeouts and decoder errors are silently
 * ignored; any other exception has its stack trace printed before the channel is closed.
 *
 * @param context the handler context whose channel is closed
 * @param cause the exception that was raised
 */
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
    Channel channel = context.channel();
    // Constant-first equals: Throwable#getMessage() may return null, and an NPE thrown from
    // here would skip the close below.
    boolean connectionReset = cause instanceof IOException
            && "Connection reset by peer".equals(cause.getMessage());
    boolean ignorable = connectionReset
            || cause instanceof ReadTimeoutException
            || cause instanceof DecoderException;
    if (!ignorable) {
        cause.printStackTrace();
    }
    if (channel.isOpen()) {
        channel.close();
    }
}
/**
 * Requests a close of the connection when {@code isInternalNetwork} rejects the channel, then
 * forwards the channel-active event to the superclass.
 *
 * @param ctx the handler context of the newly active channel
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    final Channel activeChannel = ctx.channel();
    final boolean internal = isInternalNetwork(activeChannel);
    if (!internal) {
        logger.error("[not internal network] {}, close channel", activeChannel.remoteAddress());
        activeChannel.close();
    }
    // The event is forwarded in both cases, including after a close has been requested.
    super.channelActive(ctx);
}
/**
 * Checks that a session listener reports an established session against a first server, and
 * that after this server channel is closed and the dispatcher is rebuilt on fresh event loop
 * groups, a second server's listener reports an established session as well.
 */
@Test(timeout = 20000)
public void testClientReconnect() throws Exception {
// The client is created before this test creates the server below.
final Future<PCEPSession> futureSession = this.dispatcher
.createClient(this.serverAddress, 1, new TestingSessionListenerFactory(), this.nf,
KeyMapping.getKeyMapping(), this.clientAddress);
final TestingSessionListenerFactory slf = new TestingSessionListenerFactory();
doReturn(slf).when(this.dispatcherDependencies).getListenerFactory();
final ChannelFuture futureServer = this.pcepDispatcher.createServer(this.dispatcherDependencies);
futureServer.sync();
final Channel channel = futureServer.channel();
Assert.assertNotNull(futureSession.get());
checkSessionListenerNotNull(slf, this.clientAddress.getHostString());
final TestingSessionListener sl
= checkSessionListenerNotNull(slf, this.clientAddress.getAddress().getHostAddress());
Assert.assertNotNull(sl.getSession());
Assert.assertTrue(sl.isUp());
// Tear down the first server: wait for the channel close, then replace the event loop groups
// and the dispatcher.
channel.close().get();
closeEventLoopGroups();
this.workerGroup = new NioEventLoopGroup();
this.bossGroup = new NioEventLoopGroup();
this.pcepDispatcher = new PCEPDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance()
.getMessageHandlerRegistry(),
this.nf, this.bossGroup, this.workerGroup);
// Second server uses its own listener factory so the new session is observed in isolation.
final TestingSessionListenerFactory slf2 = new TestingSessionListenerFactory();
doReturn(slf2).when(this.dispatcherDependencies).getListenerFactory();
final ChannelFuture future2 = this.pcepDispatcher.createServer(this.dispatcherDependencies);
future2.sync();
final Channel channel2 = future2.channel();
final TestingSessionListener sl2
= checkSessionListenerNotNull(slf2, this.clientAddress.getAddress().getHostAddress());
Assert.assertNotNull(sl2.getSession());
Assert.assertTrue(sl2.isUp());
// Note: unlike the first server channel, this close is not awaited.
channel2.close();
}
/**
 * Opens an outbound stream channel, writes a headers frame and an end-of-stream data frame
 * through it, and verifies that the server-side handler reads both frames on stream 3 and then
 * reads a reset frame for the same stream after the child channel is closed.
 */
@Test
public void createOutboundStream() throws Exception {
Channel childChannel = newOutboundStream(new TestChannelInitializer());
assertTrue(childChannel.isRegistered());
assertTrue(childChannel.isActive());
Http2Headers headers = new DefaultHttp2Headers();
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers));
// 100 zero bytes, sent with the end-of-stream flag set.
ByteBuf data = Unpooled.buffer(100).writeZero(100);
childChannel.writeAndFlush(new DefaultHttp2DataFrame(data, true));
Http2HeadersFrame headersFrame = serverLastInboundHandler.blockingReadInbound();
assertNotNull(headersFrame);
assertEquals(3, headersFrame.stream().id());
assertEquals(headers, headersFrame.headers());
Http2DataFrame dataFrame = serverLastInboundHandler.blockingReadInbound();
assertNotNull(dataFrame);
assertEquals(3, dataFrame.stream().id());
// Reset the reader index of the sent buffer so its content can be compared with what was received.
assertEquals(data.resetReaderIndex(), dataFrame.content());
assertTrue(dataFrame.isEndStream());
dataFrame.release();
// The test expects a reset frame for the stream to be read next, after this close.
childChannel.close();
Http2ResetFrame rstFrame = serverLastInboundHandler.blockingReadInbound();
assertNotNull(rstFrame);
assertEquals(3, rstFrame.stream().id());
serverLastInboundHandler.checkException();
}
/**
 * Releases a child channel and, when its record reports that the parent connection can be
 * closed and released, closes and releases the parent as well.
 *
 * @param childChannel the channel being released; expected to have a parent channel
 * @param promise the promise completed with the outcome of the release
 * @return the future reporting the outcome of the release
 */
@Override
public Future<Void> release(Channel childChannel, Promise<Void> promise) {
    Channel parentChannel = childChannel.parent();
    if (parentChannel == null) {
        // Not a child channel. Oddly enough, this is "expected" and is handled properly by the
        // BetterFixedChannelPool AS LONG AS an IllegalArgumentException is returned via the promise.
        closeAndReleaseParent(childChannel);
        return promise.setFailure(new IllegalArgumentException("Channel (" + childChannel + ") is not a child channel."));
    }

    MultiplexedChannelRecord record = parentChannel.attr(MULTIPLEXED_CHANNEL).get();
    if (record == null) {
        // A child channel without an attached record, which there should be if it came from this
        // pool. Close it and log an error.
        String message = "Channel (" + childChannel + ") is not associated with any channel records. "
                + "It will be closed, but cannot be released within this pool.";
        Exception failure = new IOException(message);
        log.error(failure.getMessage());
        childChannel.close();
        return promise.setFailure(failure);
    }

    record.closeAndReleaseChild(childChannel);
    if (!record.canBeClosedAndReleased()) {
        return promise.setSuccess(null);
    }

    log.debug("Parent channel closed: {}", parentChannel.remoteAddress());
    // The last stream of a connection that has reached the end of its life was just closed.
    return closeAndReleaseParent(parentChannel, null, promise);
}
/**
 * Creates a new channel from the channel factory, initializes it, and registers it with the
 * event loop group.
 *
 * <p>If initialization throws, the channel is forcibly closed and a failed promise is returned.
 * If the registration future already carries a failure, the channel is closed (forcibly when it
 * is not registered) and that same future is returned.
 *
 * @return the registration future, or a failed promise when initialization threw
 */
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
// Registration already failed: a regular close() is used only when the channel is registered.
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
/**
 * Destroys the IPCD session registered for the given token, if any, and closes its channel.
 *
 * @param token the client token used to look up the session
 */
public void destroyIpcdSession(ClientToken token) {
    final Session existing = getIpcdSession(token);
    if (existing == null) {
        return;
    }
    // Grab the channel before the session is destroyed, then close it afterwards.
    final Channel sessionChannel = existing.getChannel();
    ipcdSessions.destroySession(existing);
    sessionChannel.close();
}
/**
 * Closes a parent channel together with the child channels of its record, and releases the
 * parent to the parent connection pool unless it was already marked as released.
 *
 * @param parentChannel the channel to close; must not itself have a parent
 * @param cause the failure passed on to the child channels, or {@code null} for a plain close
 * @param resultPromise the promise completed with the outcome
 * @return the future reporting the outcome
 */
private Future<Void> closeAndReleaseParent(Channel parentChannel, Throwable cause, Promise<Void> resultPromise) {
    final boolean hasParent = parentChannel.parent() != null;
    if (hasParent) {
        // Not a parent channel: report that something is wrong.
        String message = "Channel (" + parentChannel + ") is not a parent channel. It will be closed, "
                + "but cannot be released within this pool.";
        Exception failure = new IOException(message);
        log.error(failure.getMessage());
        parentChannel.close();
        return resultPromise.setFailure(failure);
    }

    // The record may be absent if the parent channel hasn't been fully initialized.
    MultiplexedChannelRecord record = parentChannel.attr(MULTIPLEXED_CHANNEL).get();
    if (record != null) {
        if (cause != null) {
            record.closeChildChannels(cause);
        } else {
            record.closeChildChannels();
        }
        parentConnections.remove(record);
    }

    parentChannel.close();

    // Only the caller that flips the released marker from unset releases the channel to the pool.
    final boolean firstRelease = parentChannel.attr(PARENT_CHANNEL_RELEASED).getAndSet(Boolean.TRUE) == null;
    if (!firstRelease) {
        return resultPromise.setSuccess(null);
    }
    return parentConnectionPool.release(parentChannel, resultPromise);
}
/**
 * Closes every hardware channel that {@code isSameDashAndDeviceId} matches against the given
 * dashboard and device ids.
 *
 * @param dashId the dashboard id to match
 * @param deviceId the device id to match
 */
public void closeHardwareChannelByDeviceId(int dashId, int deviceId) {
    for (Channel candidate : hardwareChannels) {
        if (!isSameDashAndDeviceId(candidate, dashId, deviceId)) {
            continue;
        }
        candidate.close();
    }
}
/**
 * Feeds an incoming buffer to the receiver at the head of the queue and acts on its result.
 *
 * <p>The receiver is removed on success and kept on continue; the channel is closed on failure,
 * when the receiver reports it had already finished, or when no receiver is queued at all.
 *
 * @param channel the channel the response arrived on
 * @param byteBuf the response bytes
 */
@Override
public void handleResponse(Channel channel, ByteBuf byteBuf) {
    timeoutCounter.set(0);

    final ByteBufReceiver head = receivers.peek();
    if (head == null) {
        logger.error("[handleResponse][no receiver][close client]{}, {}, {}", channel, byteBuf.readableBytes(), ByteBufUtils.readToString(byteBuf));
        channel.close();
        return;
    }

    final ByteBufReceiver.RECEIVER_RESULT outcome = head.receive(channel, byteBuf);
    switch (outcome) {
        case CONTINUE:
            // The receiver needs more data; leave it at the head of the queue.
            break;
        case SUCCESS:
            logger.debug("[handleResponse][remove receiver]");
            receivers.poll();
            break;
        case FAIL:
            logger.info("[handleResponse][fail, close channel]{}, {}", head, channel);
            channel.close();
            break;
        case ALREADY_FINISH:
            logger.info("[handleResponse][already finish, close channel]{}, {}", head, channel);
            channel.close();
            break;
        default:
            throw new IllegalStateException("unknown result:" + outcome);
    }
}
/**
 * Registers the "loading-memory-error.mar" model over the management channel and expects the
 * recorded HTTP status to be INSUFFICIENT_STORAGE.
 */
@Test(alwaysRun = true, dependsOnMethods = {"testUnregisterURLModel"})
public void testLoadingMemoryError() throws InterruptedException {
    final Channel managementChannel = TestUtils.getManagementChannel(configManager);
    Assert.assertNotNull(managementChannel);

    // Clear the previous result and install a fresh latch before issuing the request.
    TestUtils.setResult(null);
    final CountDownLatch responseLatch = new CountDownLatch(1);
    TestUtils.setLatch(responseLatch);

    TestUtils.registerModel(managementChannel, "loading-memory-error.mar", "memory_error", true, false);
    TestUtils.getLatch().await();

    Assert.assertEquals(TestUtils.getHttpStatus(), HttpResponseStatus.INSUFFICIENT_STORAGE);
    managementChannel.close();
}
/**
 * Starts a BMP server on a random loopback port, connects the mock dispatcher's client to it,
 * and checks that the session listener status follows the server lifecycle: true once
 * connected, false after the first dispatcher is closed, true again once a second server
 * listens on the same address, and false after everything is closed.
 */
@Test(timeout = 20000)
public void testCreateClient() throws Exception {
final int port = InetSocketAddressUtil.getRandomPort();
final InetSocketAddress serverAddr = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress(port);
final BmpDispatcherImpl bmpDispatcher = new BmpDispatcherImpl(
new NioEventLoopGroup(), new NioEventLoopGroup(), this.registry, this.sessionFactory);
final ChannelFuture futureServer = bmpDispatcher
.createServer(serverAddr, this.slf, KeyMapping.getKeyMapping());
waitFutureSuccess(futureServer);
final ChannelFuture channelFuture = this.bmpMockDispatcher.createClient(InetSocketAddressUtil
.getRandomLoopbackInetSocketAddress(0), serverAddr);
final Channel channel = channelFuture.sync().channel();
assertTrue(channel.isActive());
// NOTE(review): checkEquals presumably retries the assertion until it passes - confirm in helper.
checkEquals(() -> assertTrue(this.sl.getStatus()));
// Close the client channel and the first server dispatcher; the listener should go down.
channel.close();
bmpDispatcher.close();
checkEquals(() -> assertFalse(this.sl.getStatus()));
// A second server on the same address; the status is expected to become true again.
final BmpDispatcherImpl bmpDispatcher2 = new BmpDispatcherImpl(
new NioEventLoopGroup(), new NioEventLoopGroup(), this.registry, this.sessionFactory);
final ChannelFuture futureServer2 = bmpDispatcher2
.createServer(serverAddr, this.slf, KeyMapping.getKeyMapping());
futureServer2.sync();
checkEquals(() -> assertTrue(this.sl.getStatus()));
bmpDispatcher2.close();
this.bmpMockDispatcher.close();
checkEquals(() -> assertFalse(this.sl.getStatus()));
}
/**
 * Handles a DISCONNECT for the client bound to the given channel: removes its per-client state
 * when the session is a clean session, removes the session itself, and closes the channel.
 *
 * <p>Original note: "P - B, S - B" (presumably publisher-to-broker and subscriber-to-broker;
 * confirm against the protocol handling elsewhere in the project).
 *
 * @param channel the channel the DISCONNECT arrived on
 * @param msg the received message; not read by this method
 */
public void processDisConnect(Channel channel, MqttMessage msg) {
    final String clientId = NettyUtil.getClientId(channel);
    final boolean cleanSession = sessionService.isCleanSession(clientId);
    NettyLog.debug("DISCONNECT - clientId: {}, cleanSession: {}", clientId, cleanSession);

    if (cleanSession) {
        // Clean sessions leave nothing behind in the topic, consumer and procedure processes.
        this.topicProcess.removeByCleanSession(clientId);
        this.consumerProcess.removeByCleanSession(clientId);
        this.procedureProcess.removeByCleanSession(clientId);
    }

    sessionService.remove(clientId);
    channel.close();
}
/**
 * Records the error code of a received GOAWAY, forwards the event to {@code onGoAway}, and
 * requests a close of the channel if no GOAWAY has been sent from this side yet.
 *
 * @param channel the channel the GOAWAY was received on
 * @param lastStreamId the last stream id carried by the GOAWAY
 * @param errorCode the error code carried by the GOAWAY
 * @param debugData the debug data carried by the GOAWAY
 */
public void onGoAwayReceived(Channel channel, int lastStreamId, long errorCode, ByteBuf debugData) {
    goAwayReceived = errorCode;
    onGoAway(channel, "Received", lastStreamId, errorCode, debugData);

    if (goAwaySent) {
        return;
    }
    // No GOAWAY was sent from this side yet: send one back to the peer and close the connection
    // gracefully, so that the connection is closed eventually once a GOAWAY has been received.
    // This does not close the connection immediately but sends a GOAWAY frame; the connection
    // is closed when all streams are closed. See AbstractHttp2ConnectionHandler.close().
    channel.close();
}
/**
 * Requests a close of the given channel. The future returned by {@code close()} is discarded,
 * so this method does not wait for the close to complete.
 *
 * @param channel the channel to close
 */
protected void closeChannel(Channel channel) {
channel.close();
}
/**
 * Adds a session for the given player on the given channel.
 *
 * <p>If another channel is already registered (or queued) for the same user id, that channel is
 * closed and replaced by {@code ch}. When {@code maxUser} is positive and the number of
 * registered users has reached it, the channel is placed in the wait queue instead.
 *
 * @param player the player logging in; must have a positive id
 * @param ch the channel the player is connected on
 * @return {@code ADD_SESSION_ERROR} when the player is null or has a non-positive id,
 *         {@code ADD_SESSION_WAIT} when the player is (still) queued, otherwise
 *         {@code ADD_SESSION_SUCC}
 */
private static int addSession(IBaseCharacter player, Channel ch) {
    int userId;
    if (player == null || (userId = player.getId()) <= 0) {
        return ADD_SESSION_ERROR;
    }

    Channel existing = userIdChannels.get(userId);
    BaseQueueElement<Channel> queueElement = null;
    if (existing == null && maxUser > 0) {
        // Not logged in yet: the user may still be waiting in the queue.
        queueElement = waitUserIdChannels.get(userId);
        if (queueElement != null) {
            existing = queueElement.getValue();
        }
    }

    // Compare the channels themselves rather than their hash codes: two distinct channels may
    // share a hash code, which would wrongly treat a new connection as the existing one.
    boolean same = existing != null && existing.equals(ch);
    if (existing != null && !same) {
        IBaseCharacter previousPlayer = existing.attr(IBaseConnector.PLAYER).get();
        if (previousPlayer != null) {
            previousPlayer.setId(0);
            // Keep the position in the queue.
            if (queueElement != null) {
                player.setIsInQueue(true);
                queueElement.setValue(ch);
            }
        }
        existing.close();
    }

    if (maxUser > 0) {
        if (queueElement != null) {
            return ADD_SESSION_WAIT;
        } else if (userIdChannels.size() >= maxUser) {
            player.setIsInQueue(true);
            BaseQueueElement<Channel> element = waitQueue.put(ch);
            // `same` is always false when there is no existing channel.
            if (!same) {
                waitUserIdChannels.put(userId, element);
            }
            return ADD_SESSION_WAIT;
        }
    }

    if (!same) {
        userIdChannels.put(userId, ch);
    }
    player.loginHook();
    return ADD_SESSION_SUCC;
}
/**
 * Runs the TCP server source in delimited-records text mode with record-level and batch-level
 * ack messages configured, sends ten delimited records from a client, and verifies both the
 * produced records and the set of acks the client handler received.
 */
@Test
public void runTextRecordsWithAck() throws StageException, IOException, ExecutionException, InterruptedException {
final String recordSeparatorStr = "\n";
final String[] expectedRecords = TEN_DELIMITED_RECORDS.split(recordSeparatorStr);
final int batchSize = expectedRecords.length;
final Charset charset = Charsets.ISO_8859_1;
final TCPServerSourceConfig configBean = createConfigBean(charset);
configBean.dataFormat = DataFormat.TEXT;
configBean.tcpMode = TCPMode.DELIMITED_RECORDS;
configBean.recordSeparatorStr = recordSeparatorStr;
configBean.ports = NetworkUtils.getRandomPorts(1);
configBean.recordProcessedAckMessage = "record_ack_${record:id()}" + ACK_SEPARATOR;
configBean.batchCompletedAckMessage = "batch_ack_${batchSize}" + ACK_SEPARATOR;
// The batch size equals the number of records sent, so a single batch is expected.
configBean.batchSize = batchSize;
final TCPServerSource source = new TCPServerSource(configBean);
final String outputLane = "lane";
final PushSourceRunner runner = new PushSourceRunner.Builder(TCPServerDSource.class, source)
.addOutputLane(outputLane)
.build();
final List<Record> records = new LinkedList<>();
runner.runInit();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ChannelFuture channelFuture = startTcpClient(
configBean,
workerGroup,
TEN_DELIMITED_RECORDS.getBytes(charset),
true
);
final Channel channel = channelFuture.channel();
TCPServerSourceClientHandler clientHandler = channel.pipeline().get(TCPServerSourceClientHandler.class);
LOG.trace("About to run produce");
// Collect the records of the first batch, then ask the runner to stop.
runner.runProduce(new HashMap<>(), batchSize, output -> {
records.addAll(output.getRecords().get(outputLane));
runner.setStop();
});
// Wait until the connection is closed.
LOG.trace("Waiting on produce");
runner.waitOnProduce();
LOG.trace("Finished waiting on produce");
final List<String> acks = new LinkedList<>();
// one for each record, plus one for the batch
final int totalExpectedAcks = batchSize + 1;
LOG.trace("About to fetch {} acks", totalExpectedAcks);
// Null responses are skipped, so fewer acks than expected may be collected here.
for (int i = 0; i < totalExpectedAcks; i++) {
final String response = clientHandler.getResponse();
if (response != null) {
acks.add(response);
}
}
LOG.trace("Closing channel");
channel.close();
LOG.trace("Shutting down worker group");
workerGroup.shutdownGracefully();
LOG.trace("Worker group shut down");
assertThat(records, hasSize(batchSize));
final Set<String> expectedAcks = new HashSet<>();
for (int i = 0; i < records.size(); i++) {
// validate the output record value
assertThat(records.get(i).get("/text").getValueAsString(), equalTo(expectedRecords[i]));
// validate the record-level ack
// we don't add the ack separator the expected value, as we are effectively stripping it out of
// the actual acks in the client handler (via String.split)
expectedAcks.add(String.format("record_ack_%s", records.get(i).getHeader().getSourceId()));
}
// validate the batch-level ack
expectedAcks.add(String.format("batch_ack_%d", batchSize));
// Acks are compared as sets, so their arrival order does not matter.
final Set<String> actualAcks = new HashSet<>();
actualAcks.addAll(acks);
assertThat(actualAcks, equalTo(expectedAcks));
}
/**
 * Registers the "prediction-memory-error.mar" model, sends a prediction request that is expected
 * to end with INSUFFICIENT_STORAGE, and finally unregisters the model. Each phase uses its own
 * channel, which is closed once the phase's status has been checked.
 */
@Test(
        alwaysRun = true,
        dependsOnMethods = {"testLoadingMemoryError"})
public void testPredictionMemoryError() throws InterruptedException {
    // Load the model
    Channel channel = TestUtils.getManagementChannel(configManager);
    Assert.assertNotNull(channel);
    TestUtils.setResult(null);
    TestUtils.setLatch(new CountDownLatch(1));
    TestUtils.registerModel(channel, "prediction-memory-error.mar", "pred-err", true, false);
    TestUtils.getLatch().await();
    Assert.assertEquals(TestUtils.getHttpStatus(), HttpResponseStatus.OK);
    channel.close();

    // Test for prediction
    channel = TestUtils.connect(false, configManager);
    Assert.assertNotNull(channel);
    TestUtils.setResult(null);
    TestUtils.setLatch(new CountDownLatch(1));
    DefaultFullHttpRequest req =
            new DefaultFullHttpRequest(
                    HttpVersion.HTTP_1_1, HttpMethod.POST, "/predictions/pred-err");
    req.content().writeCharSequence("data=invalid_output", CharsetUtil.UTF_8);
    channel.writeAndFlush(req);
    TestUtils.getLatch().await();
    Assert.assertEquals(TestUtils.getHttpStatus(), HttpResponseStatus.INSUFFICIENT_STORAGE);
    channel.close();

    // Unload the model
    channel = TestUtils.connect(true, configManager);
    TestUtils.setHttpStatus(null);
    TestUtils.setLatch(new CountDownLatch(1));
    Assert.assertNotNull(channel);
    TestUtils.unregisterModel(channel, "pred-err", null, false);
    TestUtils.getLatch().await();
    Assert.assertEquals(TestUtils.getHttpStatus(), HttpResponseStatus.OK);
    // Close the last connection too, as the two phases above do, so it is not leaked.
    channel.close();
}
/**
 * Creates a channel from the configured connection factory, prepares it (initializer, options,
 * attributes) and registers it with an event loop taken from the configured group.
 *
 * <p>If channel creation or preparation throws, the channel (when one was created) is forcibly
 * closed and an error {@code Mono} is returned. If the registration promise already carries a
 * failure after the register call, the channel is closed (forcibly when it is not registered)
 * and the promise is still returned.
 *
 * @param config the transport configuration providing the event loop group, connection factory,
 *        options and attributes
 * @param channelInitializer the initializer appended to the new channel's pipeline
 * @param isDomainSocket passed through to the connection factory and the option setup
 * @return the registration promise, or an error {@code Mono} when preparation failed
 */
@SuppressWarnings("FutureReturnValueIgnored")
static Mono<Channel> doInitAndRegister(
TransportConfig config,
ChannelInitializer<Channel> channelInitializer,
boolean isDomainSocket) {
EventLoopGroup elg = config.eventLoopGroup();
ChannelFactory<? extends Channel> channelFactory = config.connectionFactory(elg, isDomainSocket);
Channel channel = null;
try {
channel = channelFactory.newChannel();
// Server acceptor initializers get their auto-read task enabled before the initializer is
// added to the pipeline.
if (channelInitializer instanceof ServerTransport.AcceptorInitializer) {
((ServerTransport.AcceptorInitializer) channelInitializer).acceptor.enableAutoReadTask(channel);
}
channel.pipeline().addLast(channelInitializer);
setChannelOptions(channel, config.options, isDomainSocket);
setAttributes(channel, config.attrs);
}
catch (Throwable t) {
// channel stays null when newChannel() itself threw.
if (channel != null) {
channel.unsafe().closeForcibly();
}
return Mono.error(t);
}
MonoChannelPromise monoChannelPromise = new MonoChannelPromise(channel);
channel.unsafe().register(elg.next(), monoChannelPromise);
Throwable cause = monoChannelPromise.cause();
if (cause != null) {
if (channel.isRegistered()) {
// "FutureReturnValueIgnored" this is deliberate
channel.close();
}
else {
channel.unsafe().closeForcibly();
}
}
return monoChannelPromise;
}