下面列出了怎么用io.netty.channel.kqueue.KQueue的API类实例代码及写法,或者点击链接到github查看源代码。
private void setChannelInfo() {
if (Epoll.isAvailable()) {
this.group = new EpollEventLoopGroup();
this.channelType = EpollDomainSocketChannel.class;
LOGGER.info("Using epoll for Netty transport.");
} else {
if (!KQueue.isAvailable()) {
throw new RuntimeException("Unsupported OS '" + System.getProperty("os.name") + "', only Unix and Mac are supported");
}
this.group = new KQueueEventLoopGroup();
this.channelType = KQueueDomainSocketChannel.class;
LOGGER.info("Using KQueue for Netty transport.");
}
}
private static NettyChannelBuilder newNettyChannelBuilder(String targetUrl, String proxy)
throws IOException {
if (Strings.isNullOrEmpty(proxy)) {
return NettyChannelBuilder.forTarget(targetUrl).defaultLoadBalancingPolicy("round_robin");
}
if (!proxy.startsWith("unix:")) {
throw new IOException("Remote proxy unsupported: " + proxy);
}
DomainSocketAddress address = new DomainSocketAddress(proxy.replaceFirst("^unix:", ""));
NettyChannelBuilder builder =
NettyChannelBuilder.forAddress(address).overrideAuthority(targetUrl);
if (KQueue.isAvailable()) {
return builder
.channelType(KQueueDomainSocketChannel.class)
.eventLoopGroup(new KQueueEventLoopGroup());
}
if (Epoll.isAvailable()) {
return builder
.channelType(EpollDomainSocketChannel.class)
.eventLoopGroup(new EpollEventLoopGroup());
}
throw new IOException("Unix domain sockets are unsupported on this platform");
}
@Parameters
public static Collection createInputValues() {
ArrayList<Object[]> parameters =
new ArrayList<Object[]>(Arrays.asList(new Object[][] {{new InetTestServer()}}));
if (Epoll.isAvailable()) {
parameters.add(
new Object[] {
new UnixDomainServer(EpollServerDomainSocketChannel.class, EpollEventLoopGroup::new)
});
}
if (KQueue.isAvailable()) {
parameters.add(
new Object[] {
new UnixDomainServer(KQueueServerDomainSocketChannel.class, KQueueEventLoopGroup::new)
});
}
return parameters;
}
public NettyDomainSocketClient(ClientConfig config) {
super(config);
URI uri = config.baseUri();
Require.argument("URI scheme", uri.getScheme()).equalTo("unix");
if (Epoll.isAvailable()) {
this.eventLoopGroup = new EpollEventLoopGroup();
this.channelClazz = EpollDomainSocketChannel.class;
} else if (KQueue.isAvailable()) {
this.eventLoopGroup = new KQueueEventLoopGroup();
this.channelClazz = KQueueDomainSocketChannel.class;
} else {
throw new IllegalStateException("No native library for unix domain sockets is available");
}
this.path = uri.getPath();
}
public static EventLoopGroup newEventLoopGroup(int threads) {
if (useNativeIo && Epoll.isAvailable()) {
return new EpollEventLoopGroup(threads);
} else if (useNativeIo && KQueue.isAvailable()) {
return new KQueueEventLoopGroup(threads);
}
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(threads);
eventLoopGroup.setIoRatio(ConfigManager.getInstance().getIoRatio());
return eventLoopGroup;
}
public Class<? extends ServerChannel> getServerChannel() {
if (useNativeIo && Epoll.isAvailable()) {
return uds ? EpollServerDomainSocketChannel.class : EpollServerSocketChannel.class;
} else if (useNativeIo && KQueue.isAvailable()) {
return uds ? KQueueServerDomainSocketChannel.class : KQueueServerSocketChannel.class;
}
return NioServerSocketChannel.class;
}
public Class<? extends Channel> getClientChannel() {
if (useNativeIo && Epoll.isAvailable()) {
return uds ? EpollDomainSocketChannel.class : EpollSocketChannel.class;
} else if (useNativeIo && KQueue.isAvailable()) {
return uds ? KQueueDomainSocketChannel.class : KQueueSocketChannel.class;
}
return NioSocketChannel.class;
}
private void expectToFailIfNotOnLinux(Runnable call) {
// TODO(scott) Windows doesn't propagate the exception. Some times an unhandled exception in pipeline.
if (cChannel instanceof EpollSocketChannel || (!KQueue.isAvailable() && !Epoll.isAvailable())) {
call.run();
} else {
try {
call.run();
fail("Should fail");
} catch (ChannelException e) {
// Expected
}
}
}
public static EventLoopGroup newEventLoopGroup(int threads) {
if (useNativeIo && Epoll.isAvailable()) {
return new EpollEventLoopGroup(threads);
} else if (useNativeIo && KQueue.isAvailable()) {
return new KQueueEventLoopGroup(threads);
}
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(threads);
eventLoopGroup.setIoRatio(ConfigManager.getInstance().getIoRatio());
return eventLoopGroup;
}
public Class<? extends ServerChannel> getServerChannel() {
if (useNativeIo && Epoll.isAvailable()) {
return uds ? EpollServerDomainSocketChannel.class : EpollServerSocketChannel.class;
} else if (useNativeIo && KQueue.isAvailable()) {
return uds ? KQueueServerDomainSocketChannel.class : KQueueServerSocketChannel.class;
}
return NioServerSocketChannel.class;
}
public Class<? extends Channel> getClientChannel() {
if (useNativeIo && Epoll.isAvailable()) {
return uds ? EpollDomainSocketChannel.class : EpollSocketChannel.class;
} else if (useNativeIo && KQueue.isAvailable()) {
return uds ? KQueueDomainSocketChannel.class : KQueueSocketChannel.class;
}
return NioSocketChannel.class;
}
public static HttpBlobStore create(
DomainSocketAddress domainSocketAddress,
URI uri,
int timeoutMillis,
int remoteMaxConnections,
@Nullable final Credentials creds)
throws ConfigurationException, URISyntaxException, SSLException {
if (KQueue.isAvailable()) {
return new HttpBlobStore(
KQueueEventLoopGroup::new,
KQueueDomainSocketChannel.class,
uri,
timeoutMillis,
remoteMaxConnections,
creds,
domainSocketAddress);
} else if (Epoll.isAvailable()) {
return new HttpBlobStore(
EpollEventLoopGroup::new,
EpollDomainSocketChannel.class,
uri,
timeoutMillis,
remoteMaxConnections,
creds,
domainSocketAddress);
} else {
throw new ConfigurationException("Unix domain sockets are unsupported on this platform");
}
}
private static EventLoopGroup newEventLoopGroup() {
if (Epoll.isAvailable()) {
LOGGER.info("Using Netty epoll native transport.");
return new EpollEventLoopGroup(threadFactory);
}
if (KQueue.isAvailable()) {
LOGGER.info("Using Netty kqueue native transport.");
return new KQueueEventLoopGroup(threadFactory);
}
LOGGER.info("Using Netty NIO transport.");
return new NioEventLoopGroup(threadFactory);
}
public static final boolean isKQueueAvailable() {
try {
return Env.isMacOs() && KQueue.isAvailable();
} catch (NoClassDefFoundError noClassDefFoundError) {
ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailabilityNoClass();
return false;
} catch (Throwable e) {
ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailability(e);
return false;
}
}
public static boolean isAvailable(TransportOptions transportOptions) {
try {
return transportOptions.isUseKQueue() && KQueue.isAvailable();
} catch (NoClassDefFoundError ncdfe) {
LOG.debug("Unable to check for KQueue support due to missing class definition", ncdfe);
return false;
}
}
private void doTestKQueueSupport(boolean useKQueue) throws Exception {
assumeTrue(KQueue.isAvailable());
try (NettyEchoServer server = createEchoServer(createServerOptions())) {
server.start();
int port = server.getServerPort();
URI serverLocation = new URI("tcp://localhost:" + port);
TransportOptions options = createClientOptions();
options.setUseKQueue(useKQueue);
options.setUseEpoll(false);
Transport transport = createTransport(serverLocation, testListener, options);
try {
transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
}
assertTrue(transport.isConnected());
assertEquals(serverLocation, transport.getRemoteLocation());
assertKQueue("Transport should be using Kqueue", useKQueue, transport);
transport.close();
// Additional close should not fail or cause other problems.
transport.close();
}
assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
assertTrue(exceptions.isEmpty());
assertTrue(data.isEmpty());
}
public static HttpCacheClient create(
DomainSocketAddress domainSocketAddress,
URI uri,
int timeoutSeconds,
int remoteMaxConnections,
boolean verifyDownloads,
ImmutableList<Entry<String, String>> extraHttpHeaders,
DigestUtil digestUtil,
@Nullable final Credentials creds)
throws Exception {
if (KQueue.isAvailable()) {
return new HttpCacheClient(
KQueueEventLoopGroup::new,
KQueueDomainSocketChannel.class,
uri,
timeoutSeconds,
remoteMaxConnections,
verifyDownloads,
extraHttpHeaders,
digestUtil,
creds,
domainSocketAddress);
} else if (Epoll.isAvailable()) {
return new HttpCacheClient(
EpollEventLoopGroup::new,
EpollDomainSocketChannel.class,
uri,
timeoutSeconds,
remoteMaxConnections,
verifyDownloads,
extraHttpHeaders,
digestUtil,
creds,
domainSocketAddress);
} else {
throw new Exception("Unix domain sockets are unsupported on this platform");
}
}
public void run() throws Exception {
// Configure the server.
if (Epoll.isAvailable()) {
doRun(new EpollEventLoopGroup(), EpollServerSocketChannel.class, IoMultiplexer.EPOLL);
} else if (KQueue.isAvailable()) {
doRun(new EpollEventLoopGroup(), KQueueServerSocketChannel.class, IoMultiplexer.KQUEUE);
} else {
doRun(new NioEventLoopGroup(), NioServerSocketChannel.class, IoMultiplexer.JDK);
}
}
public Connector(int port) {
this(port, useNativeIo && (Epoll.isAvailable() || KQueue.isAvailable()));
}
public static Class<? extends SocketChannel> socketChannel() {
return EPOLL ? EpollSocketChannel.class : KQueue.isAvailable() ? KQueueSocketChannel.class : NioSocketChannel.class;
}
public static Class<? extends ServerSocketChannel> serverSocketChannel() {
return EPOLL ? EpollServerSocketChannel.class : KQueue.isAvailable() ? KQueueServerSocketChannel.class : NioServerSocketChannel.class;
}
public static EventLoopGroup eventLoopGroup(int threads) {
return EPOLL ? new EpollEventLoopGroup(threads) : KQueue.isAvailable() ? new KQueueEventLoopGroup(threads) : new NioEventLoopGroup(
threads);
}
public static EventLoopGroup eventLoopGroup(int threads, ThreadFactory threadFactory) {
return EPOLL ? new EpollEventLoopGroup(threads, threadFactory) : KQueue.isAvailable() ? new KQueueEventLoopGroup(threads,
threadFactory) : new NioEventLoopGroup(
threads,
threadFactory);
}
public Connector(int port) {
this(port, useNativeIo && (Epoll.isAvailable() || KQueue.isAvailable()));
}
/**
* The native socket transport for BSD systems such as MacOS using JNI.
*/
public static boolean isNativeKQueueAvailable() {
return KQueue.isAvailable();
}
@Before
public void setupUnixDomainSocketServer() throws IOException, URISyntaxException {
Class<? extends ServerDomainSocketChannel> channelType = null;
if (Epoll.isAvailable()) {
group = new EpollEventLoopGroup(2);
channelType = EpollServerDomainSocketChannel.class;
} else if (KQueue.isAvailable()) {
group = new KQueueEventLoopGroup(2);
channelType = KQueueServerDomainSocketChannel.class;
}
assumeThat(group).isNotNull();
assumeThat(channelType).isNotNull();
ServerBootstrap bootstrap = new ServerBootstrap()
.group(group)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.channel(channelType)
.childHandler(new ChannelInitializer<DomainSocketChannel>() {
@Override
protected void initChannel(DomainSocketChannel ch) {
ch.pipeline()
.addLast("http-codec", new HttpServerCodec())
.addLast("http-keep-alive", new HttpServerKeepAliveHandler())
.addLast("http-aggregator", new HttpObjectAggregator(Integer.MAX_VALUE))
.addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
byte[] bytes = responseText.get().getBytes(UTF_8);
ByteBuf text = Unpooled.wrappedBuffer(bytes);
FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, text);
res.headers().set(CONTENT_TYPE, MediaType.PLAIN_TEXT_UTF_8.toString());
res.headers().set(CONTENT_LENGTH, bytes.length);
ctx.writeAndFlush(res);
}
});
}
});
Path temp = Files.createTempFile("domain-socket-test", "socket");
Files.deleteIfExists(temp);
SocketAddress address = new DomainSocketAddress(temp.toFile());
future = bootstrap.bind(address);
socket = new URI("unix", null, null, 0, temp.toString(), null, null);
}
/**
* Create a new {@link EventLoopGroup}.
*
* @param ioThreads number of threads
* @param threadFactory the {@link ThreadFactory} to use.
* @return The created {@link IoExecutor}
*/
public static EventLoopGroup createEventLoopGroup(int ioThreads, ThreadFactory threadFactory) {
validateIoThreads(ioThreads);
return Epoll.isAvailable() ? new EpollEventLoopGroup(ioThreads, threadFactory) :
KQueue.isAvailable() ? new KQueueEventLoopGroup(ioThreads, threadFactory) :
new NioEventLoopGroup(ioThreads, threadFactory);
}