下面列出了如何使用 io.netty.channel.epoll.Epoll 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Builds a {@link NettyChannelBuilder} for {@code targetUrl}, optionally tunnelled through a
 * Unix domain socket proxy.
 *
 * @param targetUrl the target to connect to; with a proxy it becomes the overridden authority
 * @param proxy a proxy of the form {@code unix:<path>}, or null/empty for a direct connection
 * @return a builder using round-robin load balancing (no proxy) or a KQueue/Epoll domain socket
 * @throws IOException if the proxy does not start with {@code unix:}, or if neither KQueue nor
 *     Epoll is available
 */
private static NettyChannelBuilder newNettyChannelBuilder(String targetUrl, String proxy)
        throws IOException {
    final String unixPrefix = "unix:";

    if (Strings.isNullOrEmpty(proxy)) {
        return NettyChannelBuilder.forTarget(targetUrl).defaultLoadBalancingPolicy("round_robin");
    }
    if (!proxy.startsWith(unixPrefix)) {
        throw new IOException("Remote proxy unsupported: " + proxy);
    }

    // The prefix is known to be present, so it can be stripped by length.
    String socketPath = proxy.substring(unixPrefix.length());
    NettyChannelBuilder builder =
            NettyChannelBuilder.forAddress(new DomainSocketAddress(socketPath))
                    .overrideAuthority(targetUrl);

    // KQueue is probed before Epoll.
    if (KQueue.isAvailable()) {
        return builder
                .channelType(KQueueDomainSocketChannel.class)
                .eventLoopGroup(new KQueueEventLoopGroup());
    } else if (Epoll.isAvailable()) {
        return builder
                .channelType(EpollDomainSocketChannel.class)
                .eventLoopGroup(new EpollEventLoopGroup());
    } else {
        throw new IOException("Unix domain sockets are unsupported on this platform");
    }
}
/**
 * Prepares the peer registry, session listeners and the server/client dispatchers for a test.
 */
@Before
public void setUp() {
    // NIO groups are only created here when epoll is unavailable; otherwise the fields are
    // passed to the dispatchers as they are.
    final boolean epollAvailable = Epoll.isAvailable();
    if (!epollAvailable) {
        boss = new NioEventLoopGroup();
        worker = new NioEventLoopGroup();
    }

    registry = new StrictBGPPeerRegistry();
    clientListener = new SimpleSessionListener();
    serverListener = new SimpleSessionListener();

    final BGPExtensionProviderContext ctx =
            ServiceLoaderBGPExtensionProviderContext.getSingletonInstance();
    serverDispatcher = new BGPDispatcherImpl(ctx.getMessageRegistry(), boss, worker, registry);

    // Register the client peer under the textual form of its loopback address.
    clientAddress = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress();
    final String clientHost = clientAddress.getAddress().getHostAddress();
    final IpAddressNoZone clientPeerIp = new IpAddressNoZone(new Ipv4AddressNoZone(clientHost));
    registry.addPeer(clientPeerIp, clientListener, createPreferences(clientAddress));

    clientDispatcher = new BGPDispatcherImpl(ctx.getMessageRegistry(), boss, worker, registry);
}
/**
 * Selects the Netty transport provider for the gateway connection from the
 * {@code iris.gateway.provider} setting.
 *
 * <p>{@code "epoll"} yields the epoll provider when epoll is available and falls back to NIO
 * with a warning otherwise. An empty value or {@code "nio"} yields NIO. Any other value yields
 * NIO with a warning.
 *
 * @return the selected provider
 */
public static Provider create() {
    Supplier<String> nettyProvider = ConfigService.supplier("iris.gateway.provider", String.class, "");
    // Read the setting exactly once so that every decision below sees the same value.
    final String requested = nettyProvider.get();
    switch (requested) {
        case "epoll":
            if (Epoll.isAvailable()) {
                log.debug("using netty epoll provider for gateway connection");
                return epoll();
            }
            // Epoll was explicitly requested here, so the fallback always deserves a warning.
            log.warn("netty epoll provider requested but not available, using nio for gateway connection:", Epoll.unavailabilityCause());
            return nio();
        case "":
        case "nio":
            log.debug("using netty nio provider for gateway connection");
            return nio();
        default:
            log.warn("unknown netty provider, using nio by default");
            return nio();
    }
}
/**
 * Initialises the shared event loop group and the async HTTP client.
 *
 * <p>Worker threads are daemon threads named {@code ahc0}, {@code ahc1}, and so on. The event
 * loop group has two threads and uses epoll when it is available, NIO otherwise.
 */
static void start() {
    final AtomicLong threadIndex = new AtomicLong();
    ThreadFactory tf = runnable -> {
        Thread worker = new Thread(runnable);
        worker.setName("ahc" + threadIndex.getAndIncrement());
        worker.setDaemon(true);
        return worker;
    };

    useEpoll = Epoll.isAvailable();
    useOpenSsl = false;
    if (useEpoll) {
        evlg = new EpollEventLoopGroup(2, tf);
    } else {
        evlg = new NioEventLoopGroup(2, tf);
    }

    DefaultAsyncHttpClientConfig config = builder().build();
    client = new DefaultAsyncHttpClient(config);
}
/**
 * Creates a server bootstrap configured from {@code config}.
 *
 * @param channelHandler handler installed on every accepted child channel
 * @param acceptEventGroup event loop group used for the parent (accepting) channel
 * @param ioEventGroup event loop group used for the child channels
 * @return the configured bootstrap; it uses the epoll server channel when epoll is available
 *     and the NIO one otherwise
 */
protected ServerBootstrap newBootstrap(ChannelHandler channelHandler, EventLoopGroup acceptEventGroup, EventLoopGroup ioEventGroup) throws Exception {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    if (Epoll.isAvailable()) {
        serverBootstrap.channel(EpollServerSocketChannel.class);
    } else {
        serverBootstrap.channel(NioServerSocketChannel.class);
    }
    serverBootstrap.group(acceptEventGroup, ioEventGroup);
    serverBootstrap.childHandler(channelHandler);

    // Options of the listening channel.
    serverBootstrap
            .option(ChannelOption.SO_REUSEADDR, config.isReuseAddress())
            .option(ChannelOption.SO_RCVBUF, config.getSocketBufferSize())
            .option(ChannelOption.SO_BACKLOG, config.getBacklog())
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    // Options of the accepted channels.
    serverBootstrap
            .childOption(ChannelOption.SO_SNDBUF, config.getSocketBufferSize())
            .childOption(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())
            .childOption(ChannelOption.SO_KEEPALIVE, config.isKeepAlive())
            .childOption(ChannelOption.SO_LINGER, config.getSoLinger())
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    return serverBootstrap;
}
/**
 * Creates the boss and worker event loop groups and configures the server bootstrap.
 *
 * <p>Both groups get one thread per available processor. The epoll transport is used when it
 * is available, NIO otherwise.
 */
public void createSocket() {
    final int threads = Runtime.getRuntime().availableProcessors();
    final boolean epoll = Epoll.isAvailable();

    if (epoll) {
        this.bossGroup = new EpollEventLoopGroup(threads);
        this.workerGroup = new EpollEventLoopGroup(threads);
    } else {
        this.bossGroup = new NioEventLoopGroup(threads);
        this.workerGroup = new NioEventLoopGroup(threads);
    }

    this.bootstrap.group(bossGroup, workerGroup)
            .channel(epoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .childHandler(new MusChannelInitializer(this))
            .option(ChannelOption.SO_BACKLOG, BACK_LOG)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.SO_RCVBUF, BUFFER_SIZE)
            // Fixed-size receive buffers matching the socket receive buffer size.
            .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(BUFFER_SIZE))
            .childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
}
/**
 * Sets up the Netty event loop groups and the server bootstrap.
 *
 * <p>The boss and worker groups are each sized to the number of available processors. Epoll
 * classes are chosen when epoll is available; otherwise the NIO equivalents are used.
 */
public void createSocket() {
    final boolean useEpoll = Epoll.isAvailable();
    final int threadCount = Runtime.getRuntime().availableProcessors();

    if (!useEpoll) {
        this.bossGroup = new NioEventLoopGroup(threadCount);
        this.workerGroup = new NioEventLoopGroup(threadCount);
    } else {
        this.bossGroup = new EpollEventLoopGroup(threadCount);
        this.workerGroup = new EpollEventLoopGroup(threadCount);
    }

    this.bootstrap.group(bossGroup, workerGroup)
            .channel(useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .childHandler(new NettyChannelInitializer(this))
            // Listening socket.
            .option(ChannelOption.SO_BACKLOG, BACK_LOG)
            // Accepted sockets.
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.SO_RCVBUF, BUFFER_SIZE)
            .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(BUFFER_SIZE))
            .childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
}
/**
 * Builds the Redis client configuration from the Redis URL in the application settings.
 *
 * <p>The URL must use the {@code redis} or {@code rediss} scheme. A missing host defaults to
 * {@code localhost}, a missing port to {@code 6379}, and a missing or empty path (including a
 * bare {@code /}) to database {@code 0}.
 *
 * @return a single-server configuration using the epoll transport when available, NIO otherwise
 * @throws URISyntaxException if the configured URL is not a valid URI
 * @throws IllegalArgumentException if the scheme is absent or is neither {@code redis} nor
 *     {@code rediss}
 * @throws NumberFormatException if the path is present but is not an unsigned integer
 */
@JsonIgnore
public Config configureRedis() throws URISyntaxException {
    var redisConfig = new Config();
    var redis = new URI(Settings.instance().redisUrl());

    // Constant-first equals: a URI without a scheme is rejected instead of raising an NPE.
    var scheme = redis.getScheme();
    if (!"redis".equals(scheme) && !"rediss".equals(scheme)) {
        throw new IllegalArgumentException("Invalid scheme for Redis connection URI!");
    }

    // Drop the leading '/' of the path; "redis://host/" leaves an empty index, meaning 0.
    var path = redis.getPath();
    var databaseIndex = path == null || path.isBlank() ? "" : path.substring(1);
    var database = databaseIndex.isEmpty() ? 0 : Integer.parseUnsignedInt(databaseIndex);

    // URI.getPort() returns the primitive -1 (never null) when no port is given, so the
    // default has to be applied by comparing against -1.
    var port = redis.getPort() == -1 ? 6379 : redis.getPort();

    redisConfig.setTransportMode(Epoll.isAvailable() ? TransportMode.EPOLL : TransportMode.NIO);
    redisConfig.setNettyThreads(16);
    // NOTE(review): the whole user-info component is used as the password, so a URL of the
    // form user:password@host passes "user:password" — confirm this is intended.
    redisConfig.useSingleServer()
            .setAddress(scheme + "://" + requireNonNullElse(redis.getHost(), "localhost") + ":" + port)
            .setDatabase(database)
            .setPassword(redis.getUserInfo());
    return redisConfig;
}
/**
 * Configures a datagram bootstrap on the Vert.x Netty event loop group and connects it.
 *
 * @param address remote address to connect to
 * @param handler channel handler installed on the bootstrap
 * @return a future completed with the bootstrap once the connect succeeds, or completed
 *     exceptionally with the connect failure cause
 */
private CompletableFuture<Bootstrap> setupNetty(InetSocketAddress address, ChannelHandler handler) {
    var bootstrap = new Bootstrap().group(vertx.nettyEventLoopGroup());

    final boolean epoll = Epoll.isAvailable();
    if (epoll) {
        logger.info("epoll support is available, using it for UDP connections.");
    } else {
        logger.info("epoll unavailable, falling back to NIO.");
    }
    bootstrap.channel(epoll ? EpollDatagramChannel.class : NioDatagramChannel.class);

    bootstrap
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.IP_TOS, 0x10 | 0x08); // IPTOS_LOWDELAY | IPTOS_THROUGHPUT
    bootstrap.handler(handler);

    var connected = new CompletableFuture<Bootstrap>();
    bootstrap.connect(address).addListener(outcome -> {
        if (!outcome.isSuccess()) {
            connected.completeExceptionally(outcome.cause());
            return;
        }
        connected.complete(bootstrap);
    });
    return connected;
}
/**
 * Supplies the test parameters: always an {@code InetTestServer}, plus a Unix domain server
 * for each native transport (Epoll, KQueue) that is available.
 *
 * @return the parameter sets, one single-element array per server
 */
@Parameters
public static Collection createInputValues() {
    ArrayList<Object[]> parameters = new ArrayList<>();
    parameters.add(new Object[] {new InetTestServer()});

    if (Epoll.isAvailable()) {
        Object epollServer =
                new UnixDomainServer(EpollServerDomainSocketChannel.class, EpollEventLoopGroup::new);
        parameters.add(new Object[] {epollServer});
    }
    if (KQueue.isAvailable()) {
        Object kqueueServer =
                new UnixDomainServer(KQueueServerDomainSocketChannel.class, KQueueEventLoopGroup::new);
        parameters.add(new Object[] {kqueueServer});
    }
    return parameters;
}
/**
 * Constructs a Netty I/O event executor, using the native epoll transport when it is
 * available and NIO otherwise.
 *
 * @param name thread group name; threads are named with the pattern {@code <name>-%d-Thread}.
 * @param count thread count.
 * @return an executor holding the event loop group together with the server and client socket
 *     channel classes of the selected transport.
 */
public static NettyExecutor create(String name, int count) {
    final String threadNamePattern = name + "-%d-Thread";

    if (!Epoll.isAvailable()) {
        LOG.info("Epoll not available. Using nio socket transport.");
        return new NettyExecutor(
                nioEventLoopGroup(count, threadNamePattern),
                NioServerSocketChannel.class,
                NioSocketChannel.class);
    }

    LOG.info("Epoll is available. Using the native socket transport.");
    return new NettyExecutor(
            epollEventLoopGroup(count, threadNamePattern),
            EpollServerSocketChannel.class,
            EpollSocketChannel.class);
}
public UdpServerChannel(int ioThreads) {
if (ioThreads < 1) {
throw new IllegalArgumentException("IO threads cound can't be less than 1");
}
boolean epollAvailabe = Epoll.isAvailable();
if (!epollAvailabe) {
ioThreads = 1;
}
group = epollAvailabe ? new EpollEventLoopGroup(ioThreads) : new NioEventLoopGroup(ioThreads);
Class<? extends DatagramChannel> channel = epollAvailabe ? EpollDatagramChannel.class : NioDatagramChannel.class;
ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
final ReadRouteChannelHandler ioReadRoute = new ReadRouteChannelHandler();
@Override
protected void initChannel(Channel ioChannel) throws Exception {
ioChannel.pipeline().addLast(ioReadRoute);
}
};
while (ioThreads-- > 0) {
Bootstrap ioBootstrap = new Bootstrap().group(group).channel(channel).handler(initializer);
if (epollAvailabe) {
ioBootstrap.option(UnixChannelOption.SO_REUSEPORT, true);
}
ioBootstraps.add(ioBootstrap);
}
}
/**
 * Chooses the event loop group and the domain socket channel type for the current platform.
 *
 * <p>Epoll is preferred and KQueue is the fallback.
 *
 * @throws RuntimeException if neither Epoll nor KQueue is available
 */
private void setChannelInfo() {
    if (Epoll.isAvailable()) {
        this.group = new EpollEventLoopGroup();
        this.channelType = EpollDomainSocketChannel.class;
        LOGGER.info("Using epoll for Netty transport.");
        return;
    }

    if (!KQueue.isAvailable()) {
        final String osName = System.getProperty("os.name");
        throw new RuntimeException("Unsupported OS '" + osName + "', only Unix and Mac are supported");
    }

    this.group = new KQueueEventLoopGroup();
    this.channelType = KQueueDomainSocketChannel.class;
    LOGGER.info("Using KQueue for Netty transport.");
}
public UDPServerSocket(ThreadedLogger logger, int port, String interfaz) {
this.logger = logger;
try {
if (Epoll.isAvailable()) {
bootstrap = new Bootstrap()
.channel(EpollDatagramChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(this)
.group(new EpollEventLoopGroup());
this.logger.info("Epoll is available. EpollEventLoop will be used.");
} else {
bootstrap = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioDatagramChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(this);
this.logger.info("Epoll is unavailable. Reverting to NioEventLoop.");
}
channel = bootstrap.bind(interfaz, port).sync().channel();
} catch (Exception e) {
this.logger.critical("**** FAILED TO BIND TO " + interfaz + ":" + port + "!");
this.logger.critical("Perhaps a server is already running on that port?");
System.exit(1);
}
}
public BGPDispatcherImpl(final MessageRegistry messageRegistry, final EventLoopGroup bossGroup,
final EventLoopGroup workerGroup, final BGPPeerRegistry bgpPeerRegistry) {
if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup();
this.workerGroup = new EpollEventLoopGroup();
} else {
this.bossGroup = requireNonNull(bossGroup);
this.workerGroup = requireNonNull(workerGroup);
}
this.bgpPeerRegistry = requireNonNull(bgpPeerRegistry);
this.handlerFactory = new BGPHandlerFactory(messageRegistry);
}
/**
 * Creates the client I/O event loop group with the configured number of I/O threads.
 *
 * @return an epoll group when epoll is available, otherwise a NIO group; threads come from a
 *     factory named {@code Transport-Client-IO-LoopGroup}
 */
protected EventLoopGroup newIoEventGroup() {
    NamedThreadFactory threadFactory = new NamedThreadFactory("Transport-Client-IO-LoopGroup");
    int ioThread = config.getIoThread();
    return Epoll.isAvailable()
            ? new EpollEventLoopGroup(ioThread, threadFactory)
            : new NioEventLoopGroup(ioThread, threadFactory);
}
/**
 * Marks this instance as closed and, when epoll is available, shuts down both event loop
 * groups with no quiet period and a timeout of {@code TIMEOUT} seconds.
 */
@Override
public synchronized void close() {
    this.close = true;
    // NOTE(review): the groups are left running without epoll — presumably because they are
    // then supplied by the caller; confirm against the constructor.
    if (!Epoll.isAvailable()) {
        return;
    }
    this.workerGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
    this.bossGroup.shutdownGracefully(0, TIMEOUT, TimeUnit.SECONDS);
}
/**
 * Creates the event loop group that accepts connections, sized by the configured accept
 * thread count.
 *
 * @return an epoll group when epoll is available, otherwise a NIO group; threads come from a
 *     factory named {@code Transport-Accept-IO-LoopGroup}
 */
protected EventLoopGroup newAcceptEventGroup() {
    NamedThreadFactory threadFactory = new NamedThreadFactory("Transport-Accept-IO-LoopGroup");
    return Epoll.isAvailable()
            ? new EpollEventLoopGroup(serverConfig.getAcceptThread(), threadFactory)
            : new NioEventLoopGroup(serverConfig.getAcceptThread(), threadFactory);
}
/**
 * Creates the server I/O event loop group.
 *
 * @return an epoll group when epoll is available, otherwise a NIO group; threads come from a
 *     factory named {@code Transport-Server-IO-LoopGroup}
 */
protected EventLoopGroup newIoEventGroup() {
    NamedThreadFactory threadFactory = new NamedThreadFactory("Transport-Server-IO-LoopGroup");
    // NOTE(review): port 50088 is hard-wired to 128 I/O threads regardless of configuration;
    // the reason is not visible here.
    final int ioThreadCount = (port == 50088) ? 128 : serverConfig.getIoThread();
    return Epoll.isAvailable()
            ? new EpollEventLoopGroup(ioThreadCount, threadFactory)
            : new NioEventLoopGroup(ioThreadCount, threadFactory);
}
/**
 * Creates a client bootstrap that uses the pooled byte buffer allocator.
 *
 * @param channelHandler handler installed on the channel
 * @param ioEventGroup event loop group that drives the channel
 * @return the bootstrap, using the epoll socket channel when epoll is available and the NIO
 *     one otherwise
 */
private Bootstrap newBootstrap(ChannelHandler channelHandler, EventLoopGroup ioEventGroup) {
    Bootstrap bootstrap = new Bootstrap();
    if (Epoll.isAvailable()) {
        bootstrap.channel(EpollSocketChannel.class);
    } else {
        bootstrap.channel(NioSocketChannel.class);
    }
    bootstrap.group(ioEventGroup);
    bootstrap.handler(channelHandler);
    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    return bootstrap;
}
/**
 * Creates an I/O event loop group with the default thread count.
 *
 * @return an epoll group when epoll is available, otherwise a NIO group
 */
private EventLoopGroup newIoEventGroup() {
    return Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
}
/**
 * Creates an event loop group with the default thread count, using NIO unless epoll is
 * available.
 *
 * @return the newly created event loop group
 */
private EventLoopGroup newEventLoopGroup() {
    if (!Epoll.isAvailable()) {
        return new NioEventLoopGroup();
    }
    return new EpollEventLoopGroup();
}
/**
 * Creates a server bootstrap whose accepted channels use the pooled byte buffer allocator.
 *
 * @param channelHandler handler installed on every accepted child channel
 * @param acceptEventGroup event loop group for the accepting channel
 * @param ioEventGroup event loop group for the accepted channels
 * @return the bootstrap, using the epoll server channel when epoll is available and the NIO
 *     one otherwise
 */
private ServerBootstrap newBootstrap(ChannelHandler channelHandler, EventLoopGroup acceptEventGroup, EventLoopGroup ioEventGroup) {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    if (Epoll.isAvailable()) {
        serverBootstrap.channel(EpollServerSocketChannel.class);
    } else {
        serverBootstrap.channel(NioServerSocketChannel.class);
    }
    serverBootstrap.group(acceptEventGroup, ioEventGroup);
    serverBootstrap.childHandler(channelHandler);
    serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    return serverBootstrap;
}
/**
 * Creates an event loop group with the given number of threads.
 *
 * <p>With native I/O enabled, Epoll is tried first and KQueue second. In every other case a
 * NIO group is created and its I/O ratio is taken from the {@code ConfigManager}.
 *
 * @param threads number of event loop threads
 * @return the newly created event loop group
 */
public static EventLoopGroup newEventLoopGroup(int threads) {
    if (useNativeIo) {
        if (Epoll.isAvailable()) {
            return new EpollEventLoopGroup(threads);
        }
        if (KQueue.isAvailable()) {
            return new KQueueEventLoopGroup(threads);
        }
    }

    // The configured I/O ratio is applied to the NIO group only.
    NioEventLoopGroup nioGroup = new NioEventLoopGroup(threads);
    nioGroup.setIoRatio(ConfigManager.getInstance().getIoRatio());
    return nioGroup;
}
/**
 * Returns the server channel class matching the native I/O and {@code uds} settings.
 *
 * @return the Epoll or KQueue server channel class (the domain socket variant when
 *     {@code uds} is set) if native I/O is enabled and that transport is available;
 *     otherwise the NIO server socket channel class
 */
public Class<? extends ServerChannel> getServerChannel() {
    if (useNativeIo) {
        if (Epoll.isAvailable()) {
            if (uds) {
                return EpollServerDomainSocketChannel.class;
            }
            return EpollServerSocketChannel.class;
        }
        if (KQueue.isAvailable()) {
            if (uds) {
                return KQueueServerDomainSocketChannel.class;
            }
            return KQueueServerSocketChannel.class;
        }
    }
    return NioServerSocketChannel.class;
}
/**
 * Returns the client channel class matching the native I/O and {@code uds} settings.
 *
 * @return the Epoll or KQueue client channel class (the domain socket variant when
 *     {@code uds} is set) if native I/O is enabled and that transport is available;
 *     otherwise the NIO socket channel class
 */
public Class<? extends Channel> getClientChannel() {
    if (useNativeIo) {
        if (Epoll.isAvailable()) {
            if (uds) {
                return EpollDomainSocketChannel.class;
            }
            return EpollSocketChannel.class;
        }
        if (KQueue.isAvailable()) {
            if (uds) {
                return KQueueDomainSocketChannel.class;
            }
            return KQueueSocketChannel.class;
        }
    }
    return NioSocketChannel.class;
}
/**
 * Creates a client bootstrap configured from {@code config}.
 *
 * @param channelHandler handler installed on the channel
 * @param ioEventGroup event loop group that drives the channel
 * @return the bootstrap with address reuse, receive buffer size and the pooled allocator set;
 *     it uses the epoll socket channel when epoll is available and the NIO one otherwise
 */
protected Bootstrap newBootstrap(ChannelHandler channelHandler, EventLoopGroup ioEventGroup) {
    Bootstrap bootstrap = new Bootstrap();
    if (Epoll.isAvailable()) {
        bootstrap.channel(EpollSocketChannel.class);
    } else {
        bootstrap.channel(NioSocketChannel.class);
    }
    bootstrap.group(ioEventGroup);
    bootstrap.handler(channelHandler);
    bootstrap.option(ChannelOption.SO_REUSEADDR, config.isReuseAddress());
    bootstrap.option(ChannelOption.SO_RCVBUF, config.getSocketBufferSize());
    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    return bootstrap;
}
/**
 * Creates the I/O event loop group using the configured thread count and thread name.
 *
 * @return an epoll group when epoll is available, otherwise a NIO group
 */
protected EventLoopGroup newIoEventGroup() {
    int ioThread = config.getIoThread();
    NamedThreadFactory threadFactory = new NamedThreadFactory(config.getIoThreadName());
    return Epoll.isAvailable()
            ? new EpollEventLoopGroup(ioThread, threadFactory)
            : new NioEventLoopGroup(ioThread, threadFactory);
}
/**
 * Creates the accept event loop group using the configured accept thread count and thread
 * name.
 *
 * @return an epoll group when epoll is available, otherwise a NIO group
 */
protected EventLoopGroup newAcceptEventGroup() {
    NamedThreadFactory threadFactory = new NamedThreadFactory(config.getAcceptThreadName());
    return Epoll.isAvailable()
            ? new EpollEventLoopGroup(config.getAcceptThread(), threadFactory)
            : new NioEventLoopGroup(config.getAcceptThread(), threadFactory);
}
/**
 * Creates the I/O event loop group; its threads are named by the configured I/O thread name.
 *
 * @return a NIO group when epoll is unavailable, otherwise an epoll group, sized by the
 *     configured I/O thread count
 */
protected EventLoopGroup newIoEventGroup() {
    NamedThreadFactory threadFactory = new NamedThreadFactory(config.getIoThreadName());
    if (!Epoll.isAvailable()) {
        return new NioEventLoopGroup(config.getIoThread(), threadFactory);
    }
    return new EpollEventLoopGroup(config.getIoThread(), threadFactory);
}