下面列出了如何使用 io.netty.util.concurrent.GlobalEventExecutor 的 API 类示例代码及写法,或者点击链接到 GitHub 查看源代码。
/**
 * Closes the current channel, if there is one, and then shuts down the bootstrap's event loop
 * group.
 *
 * <p>The close future is obtained <em>before</em> the {@code channelPromise} field is cleared.
 * Clearing the field first makes the null check always fail, so the channel would never be
 * closed explicitly. When there is no channel, an already-succeeded future is used so that the
 * same shutdown listener is still attached.
 */
public void disconnect() {
    synchronized (this.clientBoot) {
        final Future<Void> channelCloseFuture;
        if (this.channelPromise != null) {
            channelCloseFuture = this.channelPromise.channel().close();
        } else {
            channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
        }
        // Forget the connection only after its close has been requested.
        this.channelPromise = null;
        channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                // Release the event loop group once the close future has completed.
                NettyClient2.this.clientBoot.config().group().shutdownGracefully();
            }
        });
    }
}
/**
 * Closes the current channel, if there is one, and then shuts down the bootstrap's event loop
 * group.
 *
 * <p>The close future is obtained <em>before</em> the {@code channelPromise} field is cleared.
 * Clearing the field first makes the null check always fail, so the channel would never be
 * closed explicitly. When there is no channel, an already-succeeded future is used so that the
 * same shutdown listener is still attached.
 */
public void disconnect() {
    synchronized (this.clientBoot) {
        final Future<Void> channelCloseFuture;
        if (this.channelPromise != null) {
            channelCloseFuture = this.channelPromise.channel().close();
        } else {
            channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
        }
        // Forget the connection only after its close has been requested.
        this.channelPromise = null;
        channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                // Release the event loop group once the close future has completed.
                PlatformSSLClient.this.clientBoot.config().group().shutdownGracefully();
            }
        });
    }
}
/**
 * Checks that a stream obtained from {@code frameCodec.newStream()} has no valid id at first,
 * has a valid id once the headers-frame write completes, and that a data frame written on the
 * same stream afterwards succeeds.
 */
@Test(timeout = 5000)
public void newOutboundStream() {
    final Http2FrameStream outbound = frameCodec.newStream();
    assertNotNull(outbound);
    // No id has been assigned to the freshly created stream yet.
    assertFalse(isStreamIdValid(outbound.id()));

    final Promise<Void> headersListenerRan = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
    final ChannelFuture headersWrite = channel.writeAndFlush(
            new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), false).stream(outbound));
    headersWrite.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            assertTrue(future.isSuccess());
            // By the time the headers write completes the stream must carry a valid id.
            assertTrue(isStreamIdValid(outbound.id()));
            headersListenerRan.setSuccess(null);
        }
    });

    ByteBuf payload = Unpooled.buffer().writeZero(100);
    ChannelFuture dataWrite = channel.writeAndFlush(new DefaultHttp2DataFrame(payload).stream(outbound));
    assertTrue(dataWrite.isSuccess());

    // Make sure the listener's assertions actually ran before the test finishes.
    headersListenerRan.syncUninterruptibly();
    assertTrue(headersListenerRan.isSuccess());
}
/**
 * Runs {@code runTest} against two {@link ThreadPerChannelEventLoopGroup} instances whose
 * private {@code terminationFuture} field has been replaced, via reflection, with a promise
 * that records any {@link IllegalStateException} thrown by {@code setSuccess}. The test passes
 * when no such exception was recorded.
 */
@Test
public void testTerminationFutureSuccessReflectively() throws Exception {
    final Field futureField = ThreadPerChannelEventLoopGroup.class.getDeclaredField("terminationFuture");
    futureField.setAccessible(true);

    // Single-slot holder so the anonymous promise below can report a failure.
    final Exception[] recorded = new Exception[1];
    for (int round = 0; round < 2; round++) {
        ThreadPerChannelEventLoopGroup group = new ThreadPerChannelEventLoopGroup(64);
        Promise<?> recordingPromise = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE) {
            @Override
            public Promise<Void> setSuccess(Void result) {
                try {
                    return super.setSuccess(result);
                } catch (IllegalStateException failure) {
                    recorded[0] = failure;
                    throw failure;
                }
            }
        };
        futureField.set(group, recordingPromise);
        runTest(group);
    }

    // The global event executor will not terminate, but this will give the test a chance to fail.
    GlobalEventExecutor.INSTANCE.awaitTermination(100, TimeUnit.MILLISECONDS);
    assertNull(recorded[0]);
}
/**
 * Prepares the fixture: builds a {@link BGPMock} from the messages parsed out of
 * {@code /bgp_hex.txt}, stubs the dispatcher's {@code createReconnectingClient} to return an
 * already-succeeded future, and starts both RIB activators with the current serializer.
 */
@Override
@Before
public void setUp() throws Exception {
    super.setUp();

    // Parse the hex dump shipped as a test resource.
    final List<byte[]> parsedMessages = HexDumpBGPFileParser.parseMessages(
            ParserToSalTest.class.getResourceAsStream("/bgp_hex.txt"));
    this.mock = new BGPMock(
            new EventBus("test"),
            ServiceLoaderBGPExtensionProviderContext.getSingletonInstance().getMessageRegistry(),
            Lists.newArrayList(fixMessages(parsedMessages)));

    // Any reconnecting-client request made through the dispatcher completes immediately.
    Mockito.doReturn(GlobalEventExecutor.INSTANCE.newSucceededFuture(null))
            .when(this.dispatcher)
            .createReconnectingClient(
                    any(InetSocketAddress.class), any(InetSocketAddress.class), anyInt(), any(KeyMapping.class));

    this.ext1 = new SimpleRIBExtensionProviderContext();
    this.ext2 = new SimpleRIBExtensionProviderContext();
    this.baseact = new RIBActivator();
    this.lsact = new org.opendaylight.protocol.bgp.linkstate.impl.RIBActivator();

    final CurrentAdapterSerializer currentSerializer = mappingService.currentSerializer();
    this.baseact.startRIBExtensionProvider(this.ext1, currentSerializer);
    this.lsact.startRIBExtensionProvider(this.ext2, currentSerializer);
    this.codecsRegistry = new ConstantCodecsRegistry(currentSerializer);
}
/**
 * Chooses the executor on which the close should run.
 *
 * @return {@link GlobalEventExecutor#INSTANCE} when the channel is open and SO_LINGER is
 *         positive (after removing the channel from its epoll event loop), otherwise
 *         {@code null}, including when the check itself throws
 */
@Override
protected Executor prepareToClose() {
    try {
        // isOpen() has to be evaluated first: once the fd is invalid, getSoLinger() throws a
        // RuntimeException.
        final boolean lingerOnClose = isOpen() && config().getSoLinger() > 0;
        if (!lingerOnClose) {
            return null;
        }
        // Cancel this channel's key so the event loop does not spin trying to read or write
        // until the actual close happens, which may be delayed by SO_LINGER handling.
        // See https://github.com/netty/netty/issues/4449
        ((EpollEventLoop) eventLoop()).remove(EpollSocketChannel.this);
        return GlobalEventExecutor.INSTANCE;
    } catch (Throwable ignore) {
        // The underlying channel may have been closed in the meantime, in which case
        // getSoLinger() can fail; treat that exactly like "no linger".
        // See https://github.com/netty/netty/issues/4449
        return null;
    }
}
/**
 * Closes the current channel, if there is one, and then shuts down the bootstrap's event loop
 * group.
 *
 * <p>The close future is obtained <em>before</em> the {@code channelPromise} field is cleared.
 * Clearing the field first makes the null check always fail, so the channel would never be
 * closed explicitly. When there is no channel, an already-succeeded future is used so that the
 * same shutdown listener is still attached.
 */
public void disconnect() {
    synchronized (this.clientBoot) {
        final Future<Void> channelCloseFuture;
        if (this.channelPromise != null) {
            channelCloseFuture = this.channelPromise.channel().close();
        } else {
            channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
        }
        // Forget the connection only after its close has been requested.
        this.channelPromise = null;
        channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                // Release the event loop group once the close future has completed.
                NettyClient2.this.clientBoot.config().group().shutdownGracefully();
            }
        });
    }
}
/**
 * Closes the current channel, if there is one, and then shuts down the bootstrap's event loop
 * group.
 *
 * <p>The close future is obtained <em>before</em> the {@code channelPromise} field is cleared.
 * Clearing the field first makes the null check always fail, so the channel would never be
 * closed explicitly. When there is no channel, an already-succeeded future is used so that the
 * same shutdown listener is still attached.
 */
public void disconnect() {
    synchronized (this.clientBoot) {
        final Future<Void> channelCloseFuture;
        if (this.channelPromise != null) {
            channelCloseFuture = this.channelPromise.channel().close();
        } else {
            channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
        }
        // Forget the connection only after its close has been requested.
        this.channelPromise = null;
        channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                // Release the event loop group once the close future has completed.
                PlatformSSLClient.this.clientBoot.config().group().shutdownGracefully();
            }
        });
    }
}
/**
 * Closes the current channel, if there is one, and then shuts down the bootstrap's event loop
 * group.
 *
 * <p>The close future is obtained <em>before</em> the {@code channelPromise} field is cleared.
 * Clearing the field first makes the null check always fail, so the channel would never be
 * closed explicitly. When there is no channel, an already-succeeded future is used so that the
 * same shutdown listener is still attached.
 */
public void disconnect() {
    synchronized (this.clientBoot) {
        final Future<Void> channelCloseFuture;
        if (this.channelPromise != null) {
            channelCloseFuture = this.channelPromise.channel().close();
        } else {
            channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
        }
        // Forget the connection only after its close has been requested.
        this.channelPromise = null;
        channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                // Release the event loop group once the close future has completed.
                NettyClient2.this.clientBoot.config().group().shutdownGracefully();
            }
        });
    }
}
/**
 * Closes the current channel, if there is one, and then shuts down the bootstrap's event loop
 * group.
 *
 * <p>The close future is obtained <em>before</em> the {@code channelPromise} field is cleared.
 * Clearing the field first makes the null check always fail, so the channel would never be
 * closed explicitly. When there is no channel, an already-succeeded future is used so that the
 * same shutdown listener is still attached.
 */
public void disconnect() {
    synchronized (this.clientBoot) {
        final Future<Void> channelCloseFuture;
        if (this.channelPromise != null) {
            channelCloseFuture = this.channelPromise.channel().close();
        } else {
            channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
        }
        // Forget the connection only after its close has been requested.
        this.channelPromise = null;
        channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                // Release the event loop group once the close future has completed.
                PlatformSSLClient.this.clientBoot.config().group().shutdownGracefully();
            }
        });
    }
}
/**
 * Creates an acceptor that records its factory and handler, creates the
 * {@code "sshd-acceptor-channels"} channel group, and configures the server bootstrap so that
 * every accepted channel gets a {@code NettyIoSession} adapter added to its pipeline.
 *
 * @param factory the service factory whose {@code eventLoopGroup} drives the bootstrap
 * @param handler the I/O handler passed to each new {@code NettyIoSession}
 */
public NettyIoAcceptor(NettyIoServiceFactory factory, final IoHandler handler) {
    this.factory = factory;
    this.handler = handler;
    channelGroup = new DefaultChannelGroup("sshd-acceptor-channels", GlobalEventExecutor.INSTANCE);

    // Installs the per-connection session adapter on each child channel.
    final ChannelInitializer<SocketChannel> sessionInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new NettyIoSession(NettyIoAcceptor.this, handler).adapter);
        }
    };

    bootstrap.group(factory.eventLoopGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(sessionInitializer);
}
public NettyServer(String ip, int port) {
this.ip = ip;
this.port = port;
this.channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
this.bootstrap = new ServerBootstrap();
this.connectionIds = new AtomicInteger(0);
this.connectionVersionRuleMap = new HashMap<>();
for (int i = 0; i < 30; i++) {
String key = "v" + i + ".version.port";
if (ServerConfiguration.exists(key)) {
int portNumber = ServerConfiguration.getInteger(key);
if (portNumber > 0) {
this.connectionVersionRuleMap.put(portNumber, new ConnectionVersionRule(portNumber, i));
}
}
}
}
/**
 * Stops the server: shuts down the boss group, closes the server channel together with every
 * channel in {@code conns} (blocking until that close completes), then shuts down the worker
 * group. Does nothing when {@code workerGroup} is already {@code null}.
 */
public void close() {
    if (workerGroup == null) {
        return;
    }
    log.info("stopping netty server");
    bossGroup.shutdownGracefully();
    bossGroup = null;

    // Gather every channel into one group so they can be closed with a single call.
    ChannelGroup openChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    openChannels.add(serverChannel);
    for (Channel connection : conns.values()) {
        openChannels.add(connection);
    }
    ChannelGroupFuture closeAll = openChannels.close();
    closeAll.awaitUninterruptibly();

    workerGroup.shutdownGracefully();
    workerGroup = null;
    log.info("netty server stopped");
}
/**
 * Stops the client: stops the timer, closes every real channel held in {@code conns}
 * (blocking until that close completes), then shuts down the worker group. Does nothing when
 * {@code workerGroup} is already {@code null}.
 */
public void close() {
    if (workerGroup == null) {
        return;
    }
    log.info("stopping netty client");
    timer.stop();
    timer = null;

    ChannelGroup openChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    for (Object entry : conns.values()) {
        // Skip empty slots and the placeholder channel.
        if (entry == null || entry == dummyChannel) {
            continue;
        }
        openChannels.add((Channel) entry);
    }
    ChannelGroupFuture closeAll = openChannels.close();
    closeAll.awaitUninterruptibly();

    workerGroup.shutdownGracefully();
    workerGroup = null;
    log.info("netty client stopped");
}
/**
 * Stops the selfcheck server: shuts down the boss group, closes the server channel together
 * with every channel in {@code conns} (blocking until that close completes), then shuts down
 * the worker group. Does nothing when {@code workerGroup} is already {@code null}.
 */
public void close() {
    if (workerGroup == null) {
        return;
    }
    log.info("stopping selfcheck server");
    bossGroup.shutdownGracefully();
    bossGroup = null;

    // Gather every channel into one group so they can be closed with a single call.
    ChannelGroup openChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    openChannels.add(serverChannel);
    for (Channel connection : conns.values()) {
        openChannels.add(connection);
    }
    ChannelGroupFuture closeAll = openChannels.close();
    closeAll.awaitUninterruptibly();

    workerGroup.shutdownGracefully();
    workerGroup = null;
    log.info("selfcheck server stopped");
}
/**
 * Stops the client: cancels the timer, closes every real channel held in {@code conns}
 * (blocking until that close completes), then shuts down the worker group. Does nothing when
 * {@code workerGroup} is already {@code null}.
 */
public void close() {
    if (workerGroup == null) {
        return;
    }
    log.info("cat stopping netty client");
    timer.cancel();
    timer = null;

    ChannelGroup openChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    for (Object entry : conns.values()) {
        // Skip empty slots and the placeholder channel.
        if (entry == null || entry == dummyChannel) {
            continue;
        }
        openChannels.add((Channel) entry);
    }
    ChannelGroupFuture closeAll = openChannels.close();
    closeAll.awaitUninterruptibly();

    workerGroup.shutdownGracefully();
    workerGroup = null;
    log.info("cat netty client stopped");
}
/**
 * Closes the first client channel whose remote address equals {@code connection}.
 *
 * @param connection remote address identifying the channel to close
 * @param type       how the channel should be closed; passed to {@code closeChannelGroup}
 * @return a stage yielding the report for the closed connection, or a stage already failed
 *         with {@link IllegalArgumentException} when no channel matches
 */
@Override
public CompletionStage<NodeConnectionReport> closeConnectionAsync(
    SocketAddress connection, CloseType type) {
    Optional<Channel> match =
        this.clientChannelGroup
            .stream()
            .filter(candidate -> candidate.remoteAddress().equals(connection))
            .findFirst();

    if (!match.isPresent()) {
        CompletableFuture<NodeConnectionReport> notFound = new CompletableFuture<>();
        notFound.completeExceptionally(new IllegalArgumentException("Not found"));
        return notFound;
    }

    // Wrap the single channel in a group so the shared close helper can be reused.
    ChannelGroup singleChannel = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    singleChannel.add(match.get());

    ClusterConnectionReport clusterReport = new ClusterConnectionReport(getCluster().getId());
    NodeConnectionReport nodeReport =
        clusterReport.addNode(this, Collections.singletonList(connection), getAddress());
    return closeChannelGroup(singleChannel, type).thenApply(ignored -> nodeReport);
}
/**
 * Performs the pool shutdown; must be invoked from the pool's executor.
 *
 * <p>Cancels idle-state detection, resets the channel counters, fails every pending acquisition
 * with {@link ClosedChannelException}, and finally hands the superclass {@code close} to the
 * global event executor.
 */
private void close0() {
    checkState(this.executor.inEventLoop());

    this.idleStateDetectionScheduledFuture.cancel(false);
    this.acquiredChannelCount.set(0);
    this.availableChannelCount.set(0);

    // Drain the queue, cancelling each task's timeout and failing its promise.
    AcquireTask pending;
    while ((pending = this.pendingAcquisitionQueue.poll()) != null) {
        final ScheduledFuture<?> timeout = pending.timeoutFuture;
        if (timeout != null) {
            timeout.cancel(false);
        }
        pending.promise.setFailure(new ClosedChannelException());
    }

    // close0 runs on the EventExecutor, so the superclass close is dispatched to another thread
    // to make sure the EventExecutor itself never blocks.
    GlobalEventExecutor.INSTANCE.execute(RntbdClientChannelPool.super::close);
}
private ProxyConnector(ConnectorConfig config, ProxyConnectorFactory factory) {
this.config = requireNonNull(config);
this.responseEnhancer = requireNonNull(factory.responseEnhancer);
this.serverConfig = requireNonNull(factory.serverConfig);
this.metrics = requireNonNull(factory.metrics);
this.httpErrorStatusListener = requireNonNull(factory.errorStatusListener);
this.channelStatsHandler = new ChannelStatisticsHandler(metrics);
this.requestStatsCollector = new RequestStatsCollector(metrics.scope("requests"));
this.excessConnectionRejector = new ExcessConnectionRejector(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE), serverConfig.maxConnectionsCount());
this.unwiseCharEncoder = new ConfigurableUnwiseCharsEncoder(factory.unwiseCharacters);
if (isHttps()) {
this.sslContext = Optional.of(newSSLContext((HttpsConnectorConfig) config, metrics));
} else {
this.sslContext = Optional.empty();
}
this.requestTracker = factory.requestTracking ? CurrentRequestTracker.INSTANCE : RequestTracker.NO_OP;
this.httpMessageFormatter = factory.httpMessageFormatter;
this.originsHeader = factory.originsHeader;
}
/**
 * Closes the pool once; later calls are no-ops.
 *
 * <p>Fails every pending acquire task with {@link ClosedChannelException}, resets the counters,
 * and hands the delegate pool's {@code close} to the global event executor.
 */
private void close0() {
    if (closed) {
        return;
    }
    closed = true;

    // Drain the queue, cancelling each task's timeout and failing its promise.
    AcquireTask pending;
    while ((pending = pendingAcquireQueue.poll()) != null) {
        ScheduledFuture<?> timeout = pending.timeoutFuture;
        if (timeout != null) {
            timeout.cancel(false);
        }
        pending.promise.setFailure(new ClosedChannelException());
    }

    acquiredChannelCount = 0;
    pendingAcquireCount = 0;

    // close0 is called from the EventExecutor, so the delegate's close is dispatched to another
    // thread to make sure the EventExecutor itself never blocks.
    GlobalEventExecutor.INSTANCE.execute(() -> delegateChannelPool.close());
}
/**
 * Calls {@code super.close()} and then, when the transport type setting is absent or equals
 * {@code "netty4"}, waits up to five seconds each for {@link GlobalEventExecutor} and
 * {@code ThreadDeathWatcher} to become inactive. An interrupt during either wait is
 * re-asserted on the current thread.
 */
public void close() {
    super.close();

    final boolean typeConfigured = NetworkModule.TRANSPORT_TYPE_SETTING.exists(this.settings);
    if (typeConfigured
            && !((String) NetworkModule.TRANSPORT_TYPE_SETTING.get(this.settings)).equals("netty4")) {
        // A transport other than netty4 is configured, so there is nothing to wait for.
        return;
    }

    try {
        GlobalEventExecutor.INSTANCE.awaitInactivity(5L, TimeUnit.SECONDS);
    } catch (InterruptedException interrupted) {
        Thread.currentThread().interrupt();
    }

    try {
        ThreadDeathWatcher.awaitInactivity(5L, TimeUnit.SECONDS);
    } catch (InterruptedException interrupted) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Runs {@code doTestTcpConfiguration} with a server and client customised through
 * {@code tcpConfiguration(...)}, waits for the latch to reach zero, then closes the channel
 * group and disposes the loop resources.
 */
@Test
@SuppressWarnings("deprecation")
public void testTcpConfiguration_1() throws Exception {
    CountDownLatch pending = new CountDownLatch(10);
    LoopResources resources = LoopResources.create("testTcpConfiguration");
    ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    HttpServer server =
            HttpServer.create().tcpConfiguration(tcp -> configureTcpServer(tcp, resources, channels, pending));
    HttpClient client =
            HttpClient.create().tcpConfiguration(tcp -> configureTcpClient(tcp, resources, channels, pending));
    doTestTcpConfiguration(server, client);

    assertThat(pending.await(30, TimeUnit.SECONDS)).isTrue();

    // Release the channels first, then the event loops.
    FutureMono.from(channels.close())
              .then(resources.disposeLater())
              .block(Duration.ofSeconds(30));
}
/**
 * Runs {@code doTestTcpConfiguration} with a server and client built via {@code from(...)} on
 * pre-configured TCP instances, waits for the latch to reach zero, then closes the channel
 * group and disposes the loop resources.
 */
@Test
@SuppressWarnings("deprecation")
public void testTcpConfiguration_2() throws Exception {
    CountDownLatch pending = new CountDownLatch(10);
    LoopResources resources = LoopResources.create("testTcpConfiguration");
    ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    HttpServer server = HttpServer.from(configureTcpServer(TcpServer.create(), resources, channels, pending));
    HttpClient client = HttpClient.from(configureTcpClient(TcpClient.create(), resources, channels, pending));
    doTestTcpConfiguration(server, client);

    assertThat(pending.await(30, TimeUnit.SECONDS)).isTrue();

    // Release the channels first, then the event loops.
    FutureMono.from(channels.close())
              .then(resources.disposeLater())
              .block(Duration.ofSeconds(30));
}
/**
 * Creates an acceptor that records its factory and handler, creates the
 * {@code "sshd-acceptor-channels"} channel group, and configures the server bootstrap so that
 * every accepted channel gets a {@code NettyIoSession} adapter added to its pipeline.
 *
 * @param factory the service factory whose {@code eventLoopGroup} drives the bootstrap
 * @param handler the I/O handler passed to each new {@code NettyIoSession}
 */
public NettyIoAcceptor(NettyIoServiceFactory factory, IoHandler handler) {
    this.factory = factory;
    this.handler = handler;
    channelGroup = new DefaultChannelGroup("sshd-acceptor-channels", GlobalEventExecutor.INSTANCE);

    // Installs the per-connection session adapter on each child channel.
    final ChannelInitializer<SocketChannel> sessionInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new NettyIoSession(NettyIoAcceptor.this, handler).adapter);
        }
    };

    bootstrap.group(factory.eventLoopGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(sessionInitializer);
}
/**
 * Creates an acceptor that records its factory and handler, creates the
 * {@code "sshd-acceptor-channels"} channel group, and configures the server bootstrap so that
 * every accepted channel gets a {@code NettyIoSession} adapter added to its pipeline.
 *
 * @param factory the service factory whose {@code eventLoopGroup} drives the bootstrap
 * @param handler the I/O handler passed to each new {@code NettyIoSession}
 */
public NettyIoAcceptor(NettyIoServiceFactory factory, IoHandler handler) {
    this.factory = factory;
    this.handler = handler;
    channelGroup = new DefaultChannelGroup("sshd-acceptor-channels", GlobalEventExecutor.INSTANCE);

    // Installs the per-connection session adapter on each child channel.
    final ChannelInitializer<SocketChannel> sessionInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new NettyIoSession(NettyIoAcceptor.this, handler).adapter);
        }
    };

    bootstrap.group(factory.eventLoopGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(sessionInitializer);
}
/**
 * Establishes a PCEPS TLS connection with the peer and checks the negotiator's state and the
 * messages it sends at each step: Starttls first, then Open once the peer's Starttls arrives,
 * and finally the move to KEEP_WAIT after the peer's Open.
 */
@Test
public void testEstablishTLS() {
    final DefaultPCEPSessionNegotiator tlsNegotiator = new DefaultPCEPSessionNegotiator(
            new DefaultPromise<>(GlobalEventExecutor.INSTANCE),
            this.channel,
            this.listener,
            (short) 1,
            20,
            new OpenBuilder().setKeepalive(Uint8.ONE).build(),
            SslContextFactoryTest.createTlsConfig());

    // Activation sends Starttls and waits for the peer's answer.
    tlsNegotiator.channelActive(null);
    assertEquals(1, this.msgsSend.size());
    assertTrue(this.msgsSend.get(0) instanceof Starttls);
    assertEquals(DefaultPCEPSessionNegotiator.State.START_TLS_WAIT, tlsNegotiator.getState());

    // After the peer's Starttls, an Open message goes out.
    tlsNegotiator.handleMessage(this.startTlsMsg);
    assertEquals(DefaultPCEPSessionNegotiator.State.OPEN_WAIT, tlsNegotiator.getState());
    assertEquals(2, this.msgsSend.size());
    assertTrue(this.msgsSend.get(1) instanceof Open);

    // The peer's Open moves the negotiator on to waiting for a keepalive.
    tlsNegotiator.handleMessage(this.openMsg);
    assertEquals(DefaultPCEPSessionNegotiator.State.KEEP_WAIT, tlsNegotiator.getState());
}
/**
 * Writes a Redis command to the connected channel and returns a future for its response.
 *
 * <p>The {@code channelPromise} field is read exactly once into a local. Checking the field
 * and then reading it again would allow a concurrent {@code disconnect()} to null it in
 * between and cause a {@link NullPointerException}.
 *
 * @param redisCommand the command to send
 * @return a promise bound to the channel's event loop, failed with the write's cause if the
 *         write does not succeed; or an already-failed future carrying
 *         {@code CONNECT_EXCEPTION} when the client is not connected
 */
public Future<IRedisCommandResponse> sendRedisCommand(final RedisCommand redisCommand) {
    final ChannelPromise readyPromise = this.channelPromise;
    if (readyPromise == null) {
        logger.error("send push error because client is not connected: {}", redisCommand.toString());
        return new FailedFuture<IRedisCommandResponse>(GlobalEventExecutor.INSTANCE, CONNECT_EXCEPTION);
    }

    final DefaultPromise<IRedisCommandResponse> responsePromise = new DefaultPromise<IRedisCommandResponse>(
            readyPromise.channel().eventLoop());

    // Publish the promise from the event loop thread.
    // NOTE(review): presumably the inbound handler completes this field's promise when the
    // response arrives; confirm against the handler.
    readyPromise.channel().eventLoop().submit(new Runnable() {
        @Override
        public void run() {
            PlatformSSLClient.this.responsePromise = responsePromise;
        }
    });

    readyPromise.channel().writeAndFlush(redisCommand).addListener(new GenericFutureListener<ChannelFuture>() {
        @Override
        public void operationComplete(final ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                // The write failed: fail the response promise straight away.
                responsePromise.tryFailure(future.cause());
                logger.error("send push message error: {},cause={}", redisCommand, future.cause());
            }
        }
    });
    return responsePromise;
}
/**
 * Returns the shared default container, creating it on first use.
 *
 * @return {@code null} when the default container is disabled, otherwise the shared instance
 */
static ServerWebSocketContainer getDefaultContainer() {
    if (defaultContainerDisabled) {
        return null;
    }
    if (defaultContainer != null) {
        return defaultContainer;
    }
    synchronized (UndertowContainerProvider.class) {
        if (defaultContainer == null) {
            //this is not great, as we have no way to control the lifecycle
            //but there is not much we can do
            final Supplier<EventLoopGroup> eventLoopSupplier = new Supplier<EventLoopGroup>() {
                @Override
                public EventLoopGroup get() {
                    return getDefaultEventLoopGroup();
                }
            };
            final Supplier<Executor> executorSupplier = new Supplier<Executor>() {
                @Override
                public Executor get() {
                    return GlobalEventExecutor.INSTANCE;
                }
            };
            defaultContainer = new ServerWebSocketContainer(
                    defaultIntrospector,
                    UndertowContainerProvider.class.getClassLoader(),
                    eventLoopSupplier,
                    Collections.EMPTY_LIST,
                    !invokeInIoThread,
                    executorSupplier);
        }
        return defaultContainer;
    }
}
/**
 * Creates the channel group that tracks all channels and the three event loop groups: a
 * two-thread server group, a child group sized by {@code getNettyThreads()}, and a backend
 * group sized by {@code getNettyClientThreads()}.
 */
public final void init() {
    this.allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    this.serverGroup = Connector.newEventLoopGroup(2);
    this.childGroup = Connector.newEventLoopGroup(this.configManager.getNettyThreads());
    this.backendGroup = Connector.newEventLoopGroup(this.configManager.getNettyClientThreads());
}
public BGPProtocolSessionPromise(final @NonNull InetSocketAddress remoteAddress, final int retryTimer,
final @NonNull Bootstrap bootstrap, final @NonNull BGPPeerRegistry peerRegistry) {
super(GlobalEventExecutor.INSTANCE);
this.address = requireNonNull(remoteAddress);
this.retryTimer = retryTimer;
this.bootstrap = requireNonNull(bootstrap);
this.listenerRegistration = requireNonNull(peerRegistry).registerPeerSessionListener(
new PeerRegistrySessionListenerImpl(StrictBGPPeerRegistry.getIpAddress(this.address)));
}