下面列出了io.netty.channel.kqueue.KQueueServerSocketChannel#io.netty.channel.group.DefaultChannelGroup 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public NettyIoAcceptor(NettyIoServiceFactory factory, final IoHandler handler) {
this.factory = factory;
this.handler = handler;
channelGroup = new DefaultChannelGroup("sshd-acceptor-channels", GlobalEventExecutor.INSTANCE);
bootstrap.group(factory.eventLoopGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new NettyIoSession(NettyIoAcceptor.this, handler).adapter);
}
});
}
public NettyServer(String ip, int port) {
this.ip = ip;
this.port = port;
this.channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
this.bootstrap = new ServerBootstrap();
this.connectionIds = new AtomicInteger(0);
this.connectionVersionRuleMap = new HashMap<>();
for (int i = 0; i < 30; i++) {
String key = "v" + i + ".version.port";
if (ServerConfiguration.exists(key)) {
int portNumber = ServerConfiguration.getInteger(key);
if (portNumber > 0) {
this.connectionVersionRuleMap.put(portNumber, new ConnectionVersionRule(portNumber, i));
}
}
}
}
public void close() {
if (workerGroup != null) {
log.info("stopping netty server");
bossGroup.shutdownGracefully();
bossGroup = null;
ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
allChannels.add(serverChannel);
for (Channel ch : conns.values()) {
allChannels.add(ch);
}
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
workerGroup.shutdownGracefully();
workerGroup = null;
log.info("netty server stopped");
}
}
public void close() {
if (workerGroup != null) {
log.info("stopping netty client");
timer.stop();
timer = null;
ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
for (Object ch : conns.values()) {
if (ch != null && ch != dummyChannel)
allChannels.add((Channel) ch);
}
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
workerGroup.shutdownGracefully();
workerGroup = null;
log.info("netty client stopped");
}
}
public void close() {
if (workerGroup != null) {
log.info("stopping netty server");
bossGroup.shutdownGracefully();
bossGroup = null;
ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
allChannels.add(serverChannel);
for (Channel ch : conns.values()) {
allChannels.add(ch);
}
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
workerGroup.shutdownGracefully();
workerGroup = null;
log.info("netty server stopped");
}
}
public void close() {
if (workerGroup != null) {
log.info("stopping selfcheck server");
bossGroup.shutdownGracefully();
bossGroup = null;
ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
allChannels.add(serverChannel);
for (Channel ch : conns.values()) {
allChannels.add(ch);
}
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
workerGroup.shutdownGracefully();
workerGroup = null;
log.info("selfcheck server stopped");
}
}
public void close() {
if (workerGroup != null) {
log.info("cat stopping netty client");
timer.cancel();
timer = null;
ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
for (Object ch : conns.values()) {
if (ch != null && ch != dummyChannel)
allChannels.add((Channel) ch);
}
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
workerGroup.shutdownGracefully();
workerGroup = null;
log.info("cat netty client stopped");
}
}
@Override
public CompletionStage<NodeConnectionReport> closeConnectionAsync(
SocketAddress connection, CloseType type) {
Optional<Channel> channel =
this.clientChannelGroup
.stream()
.filter(c -> c.remoteAddress().equals(connection))
.findFirst();
if (channel.isPresent()) {
ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
channelGroup.add(channel.get());
ClusterConnectionReport clusterReport = new ClusterConnectionReport(getCluster().getId());
NodeConnectionReport report =
clusterReport.addNode(this, Collections.singletonList(connection), getAddress());
return closeChannelGroup(channelGroup, type).thenApply(f -> report);
} else {
CompletableFuture<NodeConnectionReport> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(new IllegalArgumentException("Not found"));
return failedFuture;
}
}
private ProxyConnector(ConnectorConfig config, ProxyConnectorFactory factory) {
this.config = requireNonNull(config);
this.responseEnhancer = requireNonNull(factory.responseEnhancer);
this.serverConfig = requireNonNull(factory.serverConfig);
this.metrics = requireNonNull(factory.metrics);
this.httpErrorStatusListener = requireNonNull(factory.errorStatusListener);
this.channelStatsHandler = new ChannelStatisticsHandler(metrics);
this.requestStatsCollector = new RequestStatsCollector(metrics.scope("requests"));
this.excessConnectionRejector = new ExcessConnectionRejector(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE), serverConfig.maxConnectionsCount());
this.unwiseCharEncoder = new ConfigurableUnwiseCharsEncoder(factory.unwiseCharacters);
if (isHttps()) {
this.sslContext = Optional.of(newSSLContext((HttpsConnectorConfig) config, metrics));
} else {
this.sslContext = Optional.empty();
}
this.requestTracker = factory.requestTracking ? CurrentRequestTracker.INSTANCE : RequestTracker.NO_OP;
this.httpMessageFormatter = factory.httpMessageFormatter;
this.originsHeader = factory.originsHeader;
}
@Test
@SuppressWarnings("deprecation")
public void testTcpConfiguration_1() throws Exception {
CountDownLatch latch = new CountDownLatch(10);
LoopResources loop = LoopResources.create("testTcpConfiguration");
ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
doTestTcpConfiguration(
HttpServer.create().tcpConfiguration(tcp -> configureTcpServer(tcp, loop, group, latch)),
HttpClient.create().tcpConfiguration(tcp -> configureTcpClient(tcp, loop, group, latch))
);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
FutureMono.from(group.close())
.then(loop.disposeLater())
.block(Duration.ofSeconds(30));
}
@Test
@SuppressWarnings("deprecation")
public void testTcpConfiguration_2() throws Exception {
CountDownLatch latch = new CountDownLatch(10);
LoopResources loop = LoopResources.create("testTcpConfiguration");
ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
doTestTcpConfiguration(
HttpServer.from(configureTcpServer(TcpServer.create(), loop, group, latch)),
HttpClient.from(configureTcpClient(TcpClient.create(), loop, group, latch))
);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
FutureMono.from(group.close())
.then(loop.disposeLater())
.block(Duration.ofSeconds(30));
}
public NettyIoAcceptor(NettyIoServiceFactory factory, IoHandler handler) {
this.factory = factory;
this.handler = handler;
channelGroup = new DefaultChannelGroup("sshd-acceptor-channels", GlobalEventExecutor.INSTANCE);;
bootstrap.group(factory.eventLoopGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new NettyIoSession(NettyIoAcceptor.this, handler).adapter);
}
});
}
/**
* ReplicatorService creates and starts fibers; it must be stopped (or failed) in
* order to dispose them.
*/
public ReplicatorService(EventLoopGroup bossGroup,
EventLoopGroup workerGroup,
long nodeId,
int port,
ModuleInformationProvider moduleInformationProvider,
FiberSupplier fiberSupplier,
QuorumFileReaderWriter quorumFileReaderWriter) {
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.nodeId = nodeId;
this.port = port;
this.moduleInformationProvider = moduleInformationProvider;
this.fiberSupplier = fiberSupplier;
this.allChannels = new DefaultChannelGroup(workerGroup.next());
this.persister = new Persister(quorumFileReaderWriter);
}
public NettyIoAcceptor(NettyIoServiceFactory factory, IoHandler handler) {
this.factory = factory;
this.handler = handler;
channelGroup = new DefaultChannelGroup("sshd-acceptor-channels", GlobalEventExecutor.INSTANCE);;
bootstrap.group(factory.eventLoopGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new NettyIoSession(NettyIoAcceptor.this, handler).adapter);
}
});
}
@Inject
public void init() throws Exception
{
ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
clientConnectionsShutdown = new ClientConnectionsShutdown(clientChannels,
GlobalEventExecutor.INSTANCE, discoveryClient);
addrsToChannelInitializers = chooseAddrsAndChannels(clientChannels);
server = new Server(
serverStatusManager,
addrsToChannelInitializers,
clientConnectionsShutdown,
eventLoopGroupMetrics,
new DefaultEventLoopConfig());
}
@Test
public void tcpHandlersAdded() {
ChannelConfig channelConfig = new ChannelConfig();
ChannelConfig channelDependencies = new ChannelConfig();
channelDependencies.set(ZuulDependencyKeys.registry, new NoopRegistry());
channelDependencies.set(
ZuulDependencyKeys.rateLimitingChannelHandlerProvider, new NullChannelHandlerProvider());
channelDependencies.set(
ZuulDependencyKeys.sslClientCertCheckChannelHandlerProvider, new NullChannelHandlerProvider());
ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
BaseZuulChannelInitializer init =
new BaseZuulChannelInitializer("1234", channelConfig, channelDependencies, channelGroup) {
@Override
protected void initChannel(Channel ch) {}
};
EmbeddedChannel channel = new EmbeddedChannel();
init.addTcpRelatedHandlers(channel.pipeline());
assertNotNull(channel.pipeline().context(SourceAddressChannelHandler.class));
assertNotNull(channel.pipeline().context(ServerChannelMetrics.class));
assertNotNull(channel.pipeline().context(PerEventLoopMetricsChannelHandler.Connections.class));
assertNotNull(channel.pipeline().context(ElbProxyProtocolChannelHandler.NAME));
assertNotNull(channel.pipeline().context(MaxInboundConnectionsHandler.class));
}
/**
* Simple constructor with the host and port to use to connect to.
* <p>This constructor manages the lifecycle of the {@link TcpClient} and
* underlying resources such as {@link ConnectionProvider},
* {@link LoopResources}, and {@link ChannelGroup}.
* <p>For full control over the initialization and lifecycle of the
* TcpClient, use {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}.
* @param host the host to connect to
* @param port the port to connect to
* @param codec for encoding and decoding the input/output byte streams
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
Assert.notNull(host, "host is required");
Assert.notNull(codec, "ReactorNettyCodec is required");
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.codec = codec;
this.tcpClient = TcpClient.create(this.poolResources)
.host(host).port(port)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel()));
}
/**
* A variant of {@link #ReactorNettyTcpClient(String, int, ReactorNettyCodec)}
* that still manages the lifecycle of the {@link TcpClient} and underlying
* resources, but allows for direct configuration of other properties of the
* client through a {@code Function<TcpClient, TcpClient>}.
* @param clientConfigurer the configurer function
* @param codec for encoding and decoding the input/output byte streams
* @since 5.1.3
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(Function<TcpClient, TcpClient> clientConfigurer, ReactorNettyCodec<P> codec) {
Assert.notNull(codec, "ReactorNettyCodec is required");
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.codec = codec;
this.tcpClient = clientConfigurer.apply(TcpClient
.create(this.poolResources)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel())));
}
public final void init() {
allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
serverGroup = Connector.newEventLoopGroup(2);
childGroup = Connector.newEventLoopGroup(configManager.getNettyThreads());
backendGroup = Connector.newEventLoopGroup(configManager.getNettyClientThreads());
}
private static void runTest(ThreadPerChannelEventLoopGroup loopGroup) throws InterruptedException {
int taskCount = 100;
EventExecutor testExecutor = new TestEventExecutor();
ChannelGroup channelGroup = new DefaultChannelGroup(testExecutor);
while (taskCount-- > 0) {
Channel channel = new EmbeddedChannel(NOOP_HANDLER);
loopGroup.register(new DefaultChannelPromise(channel, testExecutor));
channelGroup.add(channel);
}
channelGroup.close().sync();
loopGroup.shutdownGracefully(100, 200, TimeUnit.MILLISECONDS).sync();
assertTrue(loopGroup.isTerminated());
}
/**
* Test try to reproduce issue #1335
*/
@Test
public void testBindMultiple() throws Exception {
DefaultChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
NioEventLoopGroup group = new NioEventLoopGroup();
try {
for (int i = 0; i < 100; i++) {
Bootstrap udpBootstrap = new Bootstrap();
udpBootstrap.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Discard
ReferenceCountUtil.release(msg);
}
});
DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap
.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
channelGroup.add(datagramChannel);
}
Assert.assertEquals(100, channelGroup.size());
} finally {
channelGroup.close().sync();
group.shutdownGracefully().sync();
}
}
/**
* Simple constructor with the host and port to use to connect to.
* <p>This constructor manages the lifecycle of the {@link TcpClient} and
* underlying resources such as {@link ConnectionProvider},
* {@link LoopResources}, and {@link ChannelGroup}.
* <p>For full control over the initialization and lifecycle of the
* TcpClient, use {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}.
* @param host the host to connect to
* @param port the port to connect to
* @param codec for encoding and decoding the input/output byte streams
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
Assert.notNull(host, "host is required");
Assert.notNull(codec, "ReactorNettyCodec is required");
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.codec = codec;
this.tcpClient = TcpClient.create(this.poolResources)
.host(host).port(port)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel()));
}
/**
* A variant of {@link #ReactorNettyTcpClient(String, int, ReactorNettyCodec)}
* that still manages the lifecycle of the {@link TcpClient} and underlying
* resources, but allows for direct configuration of other properties of the
* client through a {@code Function<TcpClient, TcpClient>}.
* @param clientConfigurer the configurer function
* @param codec for encoding and decoding the input/output byte streams
* @since 5.1.3
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(Function<TcpClient, TcpClient> clientConfigurer, ReactorNettyCodec<P> codec) {
Assert.notNull(codec, "ReactorNettyCodec is required");
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.codec = codec;
this.tcpClient = clientConfigurer.apply(TcpClient
.create(this.poolResources)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel())));
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CreateGroupRequestPacket createGroupRequestPacket) {
List<String> userIdList = createGroupRequestPacket.getUserIdList();
List<String> userNameList = new ArrayList<>();
// 1. 创建一个 channel 分组
ChannelGroup channelGroup = new DefaultChannelGroup(ctx.executor());
// 2. 筛选出待加入群聊的用户的 channel 和 userName
for (String userId : userIdList) {
Channel channel = SessionUtil.getChannel(userId);
if (channel != null) {
channelGroup.add(channel);
userNameList.add(SessionUtil.getSession(channel).getUserName());
}
}
// 3. 创建群聊创建结果的响应
String groupId = IDUtil.randomId();
CreateGroupResponsePacket createGroupResponsePacket = new CreateGroupResponsePacket();
createGroupResponsePacket.setSuccess(true);
createGroupResponsePacket.setGroupId(groupId);
createGroupResponsePacket.setUserNameList(userNameList);
// 4. 给每个客户端发送拉群通知
channelGroup.writeAndFlush(createGroupResponsePacket);
System.out.print("群创建成功,id 为 " + createGroupResponsePacket.getGroupId() + ", ");
System.out.println("群里面有:" + createGroupResponsePacket.getUserNameList());
// 5. 保存群组相关的信息
SessionUtil.bindChannelGroup(groupId, channelGroup);
}
public LauncherNettyServer(LaunchServer server) {
LaunchServerConfig.NettyConfig config = server.config.netty;
NettyObjectFactory.setUsingEpoll(config.performance.usingEpoll);
if (config.performance.usingEpoll) {
LogHelper.debug("Netty: Epoll enabled");
}
if (config.performance.usingEpoll && !Epoll.isAvailable()) {
LogHelper.error("Epoll is not available: (netty,perfomance.usingEpoll configured wrongly)", Epoll.unavailabilityCause());
}
bossGroup = NettyObjectFactory.newEventLoopGroup(config.performance.bossThread);
workerGroup = NettyObjectFactory.newEventLoopGroup(config.performance.workerThread);
serverBootstrap = new ServerBootstrap();
service = new WebSocketService(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE), server);
serverBootstrap.group(bossGroup, workerGroup)
.channelFactory(NettyObjectFactory.getServerSocketChannelFactory())
.handler(new LoggingHandler(config.logLevel))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
NettyConnectContext context = new NettyConnectContext();
//p.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast("http-codec", new HttpServerCodec());
pipeline.addLast("http-codec-compressor", new HttpObjectAggregator(65536));
if (server.config.netty.ipForwarding)
pipeline.addLast("forward-http", new NettyIpForwardHandler(context));
pipeline.addLast("websock-comp", new WebSocketServerCompressionHandler());
pipeline.addLast("websock-codec", new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
if (server.config.netty.fileServerEnabled)
pipeline.addLast("fileserver", new FileServerHandler(server.updatesDir, true, config.showHiddenFiles));
pipeline.addLast("launchserver", new WebSocketFrameHandler(context, server, service));
pipelineHook.hook(context, ch);
}
});
}
public HttpServer(@NonNull EventLoopGroup acceptors,
@NonNull EventLoopGroup workers,
@NonNull HttpServerHandler handler) {
this.handler = handler;
this.channelGroup = new DefaultChannelGroup("netserver-channels",
GlobalEventExecutor.INSTANCE);
this.bootstrap = Bootstraps.serverBootstrap(acceptors, workers);
}
public MusServer(String ip, int port) {
this.ip = ip;
this.port = port;
this.channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
this.bootstrap = new ServerBootstrap();
this.connectionIds = new AtomicInteger(0);
}
/**
* 将channel添加到GroupChannel中
* @param groupId
* @param channel
*/
public static void addChannelToGroup(String groupId, Channel channel) {
DefaultChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
ChannelGroup returnChannelGroup = CHANNEL_GROUPS.putIfAbsent(groupId, channelGroup);
if(returnChannelGroup == null) {
// 不存在该ChannelGroup,第一次添加。
channelGroup.add(channel);
return;
}
// ChannelGroup已经存在
returnChannelGroup.add(channel);
}
<T extends ServerSocketChannel> RpcServer(EventLoopGroup eventLoopGroup, EventExecutorGroup eventExecutor, Class<T> channel, SocketAddress address) {
this.address = address;
this.allChannels = new DefaultChannelGroup(eventLoopGroup.next());
this.handler = new ServerHandler(allChannels);
this.bootstrap = new ServerBootstrap();
bootstrap.channel(channel);
bootstrap.childHandler(new ServerInitializer(eventExecutor, handler));
bootstrap.group(eventLoopGroup);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
}
public final void init() {
allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
serverGroup = Connector.newEventLoopGroup(2);
childGroup = Connector.newEventLoopGroup(configManager.getNettyThreads());
backendGroup = Connector.newEventLoopGroup(configManager.getNettyClientThreads());
}