下面列出了 io.netty.channel.kqueue.KQueueEventLoopGroup 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Picks the native Netty transport and records the matching event loop group
 * and domain socket channel class on this instance.
 *
 * <p>Epoll is preferred; KQueue is only probed when epoll is unavailable.
 *
 * @throws RuntimeException if neither epoll nor KQueue is available
 */
private void setChannelInfo() {
    final boolean epollUsable = Epoll.isAvailable();
    // Fail fast when no native transport can be used at all.
    if (!epollUsable && !KQueue.isAvailable()) {
        throw new RuntimeException("Unsupported OS '" + System.getProperty("os.name") + "', only Unix and Mac are supported");
    }
    if (epollUsable) {
        this.group = new EpollEventLoopGroup();
        this.channelType = EpollDomainSocketChannel.class;
        LOGGER.info("Using epoll for Netty transport.");
        return;
    }
    this.group = new KQueueEventLoopGroup();
    this.channelType = KQueueDomainSocketChannel.class;
    LOGGER.info("Using KQueue for Netty transport.");
}
/**
 * Applies the given I/O ratios to the boss and worker event loop groups.
 *
 * <p>Only NIO, KQueue and epoll groups are handled; any other group type is
 * left untouched.
 *
 * @param bossIoRatio   ratio forwarded to the boss group
 * @param workerIoRatio ratio forwarded to the worker group
 */
@Override
public void setIoRatio(int bossIoRatio, int workerIoRatio) {
    // The three group classes are unrelated, so at most one branch matches.
    final EventLoopGroup bossGroup = boss();
    if (bossGroup instanceof NioEventLoopGroup) {
        ((NioEventLoopGroup) bossGroup).setIoRatio(bossIoRatio);
    } else if (bossGroup instanceof KQueueEventLoopGroup) {
        ((KQueueEventLoopGroup) bossGroup).setIoRatio(bossIoRatio);
    } else if (bossGroup instanceof EpollEventLoopGroup) {
        ((EpollEventLoopGroup) bossGroup).setIoRatio(bossIoRatio);
    }

    final EventLoopGroup workerGroup = worker();
    if (workerGroup instanceof NioEventLoopGroup) {
        ((NioEventLoopGroup) workerGroup).setIoRatio(workerIoRatio);
    } else if (workerGroup instanceof KQueueEventLoopGroup) {
        ((KQueueEventLoopGroup) workerGroup).setIoRatio(workerIoRatio);
    } else if (workerGroup instanceof EpollEventLoopGroup) {
        ((EpollEventLoopGroup) workerGroup).setIoRatio(workerIoRatio);
    }
}
/**
 * Applies the given I/O ratios to the boss and worker event loop groups.
 *
 * <p>Only KQueue and epoll groups are handled; any other group type is left
 * untouched.
 *
 * @param bossIoRatio   ratio forwarded to the boss group
 * @param workerIoRatio ratio forwarded to the worker group
 */
@Override
public void setIoRatio(int bossIoRatio, int workerIoRatio) {
    // The two native group classes are unrelated, so at most one branch matches.
    final EventLoopGroup bossGroup = boss();
    if (bossGroup instanceof KQueueEventLoopGroup) {
        ((KQueueEventLoopGroup) bossGroup).setIoRatio(bossIoRatio);
    } else if (bossGroup instanceof EpollEventLoopGroup) {
        ((EpollEventLoopGroup) bossGroup).setIoRatio(bossIoRatio);
    }

    final EventLoopGroup workerGroup = worker();
    if (workerGroup instanceof KQueueEventLoopGroup) {
        ((KQueueEventLoopGroup) workerGroup).setIoRatio(workerIoRatio);
    } else if (workerGroup instanceof EpollEventLoopGroup) {
        ((EpollEventLoopGroup) workerGroup).setIoRatio(workerIoRatio);
    }
}
/**
 * Asserts whether the {@code group} field of the given transport holds a
 * {@link KQueueEventLoopGroup}.
 *
 * <p>The field is looked up reflectively, starting at the transport's runtime
 * class and moving up through its superclasses; the search stops before
 * {@code Object}.
 *
 * @param message   assertion failure message
 * @param expected  {@code true} if a KQueue group is expected, {@code false} otherwise
 * @param transport the transport instance to inspect
 * @throws Exception if the reflective field access fails
 */
private void assertKQueue(String message, boolean expected, Transport transport) throws Exception {
    Class<?> current = transport.getClass();
    Field groupField = null;
    while (groupField == null && current != null) {
        try {
            groupField = current.getDeclaredField("group");
        } catch (NoSuchFieldException notDeclaredHere) {
            // Not declared on this class: continue with the parent, but never inspect Object.
            final Class<?> parent = current.getSuperclass();
            current = Object.class.equals(parent) ? null : parent;
        }
    }
    assertNotNull("Transport implementation unknown", groupField);
    groupField.setAccessible(true);

    final boolean usesKQueue = groupField.get(transport) instanceof KQueueEventLoopGroup;
    if (expected) {
        assertTrue(message, usesKQueue);
    } else {
        assertFalse(message, usesKQueue);
    }
}
/**
 * Builds a {@link NettyChannelBuilder} for the target, optionally routed through a
 * Unix domain socket proxy.
 *
 * <p>Without a proxy the builder targets {@code targetUrl} directly with the
 * {@code round_robin} load balancing policy. With a proxy of the form
 * {@code unix:<path>} the builder connects to that socket, overrides the authority
 * with {@code targetUrl}, and uses KQueue if available, otherwise epoll.
 *
 * @param targetUrl the target to connect to (also used as authority when proxied)
 * @param proxy     {@code null}/empty for no proxy, or a {@code unix:} prefixed socket path
 * @return the configured channel builder
 * @throws IOException if the proxy is not a {@code unix:} proxy, or no native
 *                     domain socket transport is available
 */
private static NettyChannelBuilder newNettyChannelBuilder(String targetUrl, String proxy)
    throws IOException {
    final String unixPrefix = "unix:";
    if (Strings.isNullOrEmpty(proxy)) {
        return NettyChannelBuilder.forTarget(targetUrl).defaultLoadBalancingPolicy("round_robin");
    }
    if (!proxy.startsWith(unixPrefix)) {
        throw new IOException("Remote proxy unsupported: " + proxy);
    }

    // The prefix is known to be present, so it can be cut off by length.
    final String socketPath = proxy.substring(unixPrefix.length());
    final NettyChannelBuilder proxied =
        NettyChannelBuilder.forAddress(new DomainSocketAddress(socketPath))
            .overrideAuthority(targetUrl);

    if (KQueue.isAvailable()) {
        return proxied
            .channelType(KQueueDomainSocketChannel.class)
            .eventLoopGroup(new KQueueEventLoopGroup());
    } else if (Epoll.isAvailable()) {
        return proxied
            .channelType(EpollDomainSocketChannel.class)
            .eventLoopGroup(new EpollEventLoopGroup());
    } else {
        throw new IOException("Unix domain sockets are unsupported on this platform");
    }
}
/**
 * Supplies the parameter sets for the parameterized test.
 *
 * <p>An {@code InetTestServer} is always included. A {@code UnixDomainServer} is
 * added for each native transport (epoll, KQueue) that is available.
 *
 * @return one single-element {@code Object[]} per server under test
 */
@Parameters
public static Collection<Object[]> createInputValues() {
    // Typed collection instead of a raw Collection built from a nested array literal.
    Collection<Object[]> parameters = new ArrayList<>();
    parameters.add(new Object[] {new InetTestServer()});
    if (Epoll.isAvailable()) {
        parameters.add(
            new Object[] {
                new UnixDomainServer(EpollServerDomainSocketChannel.class, EpollEventLoopGroup::new)
            });
    }
    if (KQueue.isAvailable()) {
        parameters.add(
            new Object[] {
                new UnixDomainServer(KQueueServerDomainSocketChannel.class, KQueueEventLoopGroup::new)
            });
    }
    return parameters;
}
/**
 * Creates a client that talks over a Unix domain socket.
 *
 * <p>The base URI of the config must use the {@code unix} scheme; its path is
 * stored as the socket path. Epoll is preferred, KQueue is the fallback.
 *
 * @param config client configuration providing the base URI
 * @throws IllegalStateException if neither epoll nor KQueue is available
 */
public NettyDomainSocketClient(ClientConfig config) {
    super(config);
    final URI baseUri = config.baseUri();
    Require.argument("URI scheme", baseUri.getScheme()).equalTo("unix");

    final boolean epollUsable = Epoll.isAvailable();
    // KQueue is only probed when epoll is unavailable.
    if (!epollUsable && !KQueue.isAvailable()) {
        throw new IllegalStateException("No native library for unix domain sockets is available");
    }
    if (epollUsable) {
        this.eventLoopGroup = new EpollEventLoopGroup();
        this.channelClazz = EpollDomainSocketChannel.class;
    } else {
        this.eventLoopGroup = new KQueueEventLoopGroup();
        this.channelClazz = KQueueDomainSocketChannel.class;
    }
    this.path = baseUri.getPath();
}
/**
 * Creates an event loop group with the given number of threads.
 *
 * <p>When native I/O is enabled, epoll is used if available, otherwise KQueue if
 * available. In every other case a NIO group is created and configured with the
 * I/O ratio from {@code ConfigManager}.
 *
 * @param threads number of event loop threads
 * @return the newly created event loop group
 */
public static EventLoopGroup newEventLoopGroup(int threads) {
    if (useNativeIo) {
        if (Epoll.isAvailable()) {
            return new EpollEventLoopGroup(threads);
        }
        if (KQueue.isAvailable()) {
            return new KQueueEventLoopGroup(threads);
        }
    }
    // The configured I/O ratio is applied only on the NIO fallback.
    final NioEventLoopGroup nioGroup = new NioEventLoopGroup(threads);
    nioGroup.setIoRatio(ConfigManager.getInstance().getIoRatio());
    return nioGroup;
}
/**
 * Tells whether the native kqueue transport should be used for the given group.
 *
 * @param group the {@link EventLoopGroup} in use
 * @return {@code true} if {@code group} is a {@link KQueueEventLoopGroup}, or an
 *         {@link EventLoop} whose parent is one; {@code false} otherwise
 */
public static boolean useKQueue(EventLoopGroup group) {
    // Direct use of the kqueue group.
    if (group instanceof KQueueEventLoopGroup) {
        return true;
    }
    // Otherwise only a single EventLoop owned by a kqueue group qualifies.
    if (!(group instanceof EventLoop)) {
        return false;
    }
    return ((EventLoop) group).parent() instanceof KQueueEventLoopGroup;
}
/**
 * Creates an event loop group with the given number of threads.
 *
 * <p>Epoll, then KQueue, is chosen when native I/O is enabled and the transport
 * is available; otherwise a NIO group is returned with the I/O ratio taken from
 * {@code ConfigManager}.
 *
 * @param threads number of event loop threads
 * @return the newly created event loop group
 */
public static EventLoopGroup newEventLoopGroup(int threads) {
    if (!useNativeIo) {
        return newNioGroupWithConfiguredRatio(threads);
    }
    if (Epoll.isAvailable()) {
        return new EpollEventLoopGroup(threads);
    }
    if (KQueue.isAvailable()) {
        return new KQueueEventLoopGroup(threads);
    }
    return newNioGroupWithConfiguredRatio(threads);
}

/** Builds the NIO fallback group and applies the configured I/O ratio to it. */
private static NioEventLoopGroup newNioGroupWithConfiguredRatio(int threads) {
    final NioEventLoopGroup nioGroup = new NioEventLoopGroup(threads);
    nioGroup.setIoRatio(ConfigManager.getInstance().getIoRatio());
    return nioGroup;
}
/**
 * Creates an {@code HttpBlobStore} that connects through a Unix domain socket.
 *
 * <p>KQueue is used when available, otherwise epoll.
 *
 * @param domainSocketAddress  address of the domain socket to connect to
 * @param uri                  URI passed through to the blob store
 * @param timeoutMillis        timeout passed through to the blob store
 * @param remoteMaxConnections connection limit passed through to the blob store
 * @param creds                optional credentials, may be {@code null}
 * @return the new blob store
 * @throws ConfigurationException if neither KQueue nor epoll is available
 */
public static HttpBlobStore create(
    DomainSocketAddress domainSocketAddress,
    URI uri,
    int timeoutMillis,
    int remoteMaxConnections,
    @Nullable final Credentials creds)
    throws ConfigurationException, URISyntaxException, SSLException {
    final boolean kqueueUsable = KQueue.isAvailable();
    // Epoll is only probed when KQueue is unavailable.
    if (!kqueueUsable && !Epoll.isAvailable()) {
        throw new ConfigurationException("Unix domain sockets are unsupported on this platform");
    }
    if (kqueueUsable) {
        return new HttpBlobStore(
            KQueueEventLoopGroup::new,
            KQueueDomainSocketChannel.class,
            uri,
            timeoutMillis,
            remoteMaxConnections,
            creds,
            domainSocketAddress);
    }
    return new HttpBlobStore(
        EpollEventLoopGroup::new,
        EpollDomainSocketChannel.class,
        uri,
        timeoutMillis,
        remoteMaxConnections,
        creds,
        domainSocketAddress);
}
/**
 * Creates an event loop group on the best available transport and logs the choice.
 *
 * <p>Preference order: epoll, KQueue, NIO. The shared {@code threadFactory} is
 * used for every variant.
 *
 * @return the newly created event loop group
 */
private static EventLoopGroup newEventLoopGroup() {
    EventLoopGroup selected;
    if (Epoll.isAvailable()) {
        LOGGER.info("Using Netty epoll native transport.");
        selected = new EpollEventLoopGroup(threadFactory);
    } else if (KQueue.isAvailable()) {
        LOGGER.info("Using Netty kqueue native transport.");
        selected = new KQueueEventLoopGroup(threadFactory);
    } else {
        LOGGER.info("Using Netty NIO transport.");
        selected = new NioEventLoopGroup(threadFactory);
    }
    return selected;
}
/**
 * Reports whether the given group is backed by KQueue.
 *
 * <p>A {@code ColocatedEventLoopGroup} is unwrapped via {@code get()} before the check.
 *
 * @param group the group to test
 * @return {@code true} if the (unwrapped) group is a {@link KQueueEventLoopGroup}
 */
@Override
public boolean supportGroup(EventLoopGroup group) {
    final EventLoopGroup effective = group instanceof ColocatedEventLoopGroup
        ? ((ColocatedEventLoopGroup) group).get()
        : group;
    return effective instanceof KQueueEventLoopGroup;
}
/**
 * Creates the event loop group matching the configured socket type.
 *
 * @param nThreads number of event loop threads
 * @param tFactory thread factory used by the group
 * @return an epoll, KQueue or NIO event loop group
 * @throws IllegalStateException if the socket type is none of the supported ones
 */
@Override
protected EventLoopGroup initEventLoopGroup(int nThreads, ThreadFactory tFactory) {
    final SocketChannelProvider.SocketType type = socketType();
    EventLoopGroup created;
    switch (type) {
        case JAVA_NIO:
            created = new NioEventLoopGroup(nThreads, tFactory);
            break;
        case NATIVE_KQUEUE:
            created = new KQueueEventLoopGroup(nThreads, tFactory);
            break;
        case NATIVE_EPOLL:
            created = new EpollEventLoopGroup(nThreads, tFactory);
            break;
        default:
            throw new IllegalStateException("Invalid socket type: " + type);
    }
    return created;
}
/**
 * Creates the event loop group matching the configured domain socket type.
 *
 * @param nThreads number of event loop threads
 * @param tFactory thread factory used by the group
 * @return an epoll or KQueue event loop group
 * @throws IllegalStateException if the socket type is not a native domain socket type
 */
@Override
protected EventLoopGroup initEventLoopGroup(int nThreads, ThreadFactory tFactory) {
    final SocketChannelProvider.SocketType type = socketType();
    EventLoopGroup created;
    switch (type) {
        case NATIVE_KQUEUE_DOMAIN:
            created = new KQueueEventLoopGroup(nThreads, tFactory);
            break;
        case NATIVE_EPOLL_DOMAIN:
            created = new EpollEventLoopGroup(nThreads, tFactory);
            break;
        default:
            throw new IllegalStateException("Invalid socket type: " + type);
    }
    return created;
}
/**
 * Applies the given I/O ratio to the worker event loop group.
 *
 * <p>Only NIO, KQueue and epoll groups are handled; any other group type is
 * left untouched.
 *
 * @param workerIoRatio ratio forwarded to the worker group
 */
@Override
public void setIoRatio(int workerIoRatio) {
    final EventLoopGroup workerGroup = worker();
    // The three group classes are unrelated, so at most one check matches.
    if (workerGroup instanceof NioEventLoopGroup) {
        ((NioEventLoopGroup) workerGroup).setIoRatio(workerIoRatio);
        return;
    }
    if (workerGroup instanceof KQueueEventLoopGroup) {
        ((KQueueEventLoopGroup) workerGroup).setIoRatio(workerIoRatio);
        return;
    }
    if (workerGroup instanceof EpollEventLoopGroup) {
        ((EpollEventLoopGroup) workerGroup).setIoRatio(workerIoRatio);
    }
}
/**
 * Builds the event loop group that corresponds to the current socket type.
 *
 * @param nThreads number of event loop threads
 * @param tFactory thread factory used by the group
 * @return an epoll, KQueue or NIO event loop group
 * @throws IllegalStateException if the socket type is none of the supported ones
 */
@Override
protected EventLoopGroup initEventLoopGroup(int nThreads, ThreadFactory tFactory) {
    final SocketChannelProvider.SocketType kind = socketType();
    EventLoopGroup result;
    switch (kind) {
        case NATIVE_EPOLL:
            result = new EpollEventLoopGroup(nThreads, tFactory);
            break;
        case NATIVE_KQUEUE:
            result = new KQueueEventLoopGroup(nThreads, tFactory);
            break;
        case JAVA_NIO:
            result = new NioEventLoopGroup(nThreads, tFactory);
            break;
        default:
            throw new IllegalStateException("Invalid socket type: " + kind);
    }
    return result;
}
/**
 * Applies the given I/O ratio to the worker event loop group.
 *
 * <p>Only KQueue and epoll groups are handled; any other group type is left
 * untouched.
 *
 * @param workerIoRatio ratio forwarded to the worker group
 */
@Override
public void setIoRatio(int workerIoRatio) {
    final EventLoopGroup workerGroup = worker();
    // The two native group classes are unrelated, so at most one check matches.
    if (workerGroup instanceof KQueueEventLoopGroup) {
        ((KQueueEventLoopGroup) workerGroup).setIoRatio(workerIoRatio);
        return;
    }
    if (workerGroup instanceof EpollEventLoopGroup) {
        ((EpollEventLoopGroup) workerGroup).setIoRatio(workerIoRatio);
    }
}
/**
 * Builds the event loop group that corresponds to the current domain socket type.
 *
 * @param nThreads number of event loop threads
 * @param tFactory thread factory used by the group
 * @return an epoll or KQueue event loop group
 * @throws IllegalStateException if the socket type is not a native domain socket type
 */
@Override
protected EventLoopGroup initEventLoopGroup(int nThreads, ThreadFactory tFactory) {
    final SocketChannelProvider.SocketType kind = socketType();
    EventLoopGroup result;
    switch (kind) {
        case NATIVE_EPOLL_DOMAIN:
            result = new EpollEventLoopGroup(nThreads, tFactory);
            break;
        case NATIVE_KQUEUE_DOMAIN:
            result = new KQueueEventLoopGroup(nThreads, tFactory);
            break;
        default:
            throw new IllegalStateException("Invalid socket type: " + kind);
    }
    return result;
}
/**
 * Creates an {@code HttpCacheClient} that connects through a Unix domain socket.
 *
 * <p>KQueue is used when available, otherwise epoll.
 *
 * @param domainSocketAddress  address of the domain socket to connect to
 * @param uri                  URI passed through to the client
 * @param timeoutSeconds       timeout passed through to the client
 * @param remoteMaxConnections connection limit passed through to the client
 * @param verifyDownloads      download verification flag passed through to the client
 * @param extraHttpHeaders     additional HTTP headers passed through to the client
 * @param digestUtil           digest utility passed through to the client
 * @param creds                optional credentials, may be {@code null}
 * @return the new cache client
 * @throws Exception if neither KQueue nor epoll is available, or client construction fails
 */
public static HttpCacheClient create(
    DomainSocketAddress domainSocketAddress,
    URI uri,
    int timeoutSeconds,
    int remoteMaxConnections,
    boolean verifyDownloads,
    ImmutableList<Entry<String, String>> extraHttpHeaders,
    DigestUtil digestUtil,
    @Nullable final Credentials creds)
    throws Exception {
    final boolean kqueueUsable = KQueue.isAvailable();
    // Epoll is only probed when KQueue is unavailable.
    if (!kqueueUsable && !Epoll.isAvailable()) {
        throw new Exception("Unix domain sockets are unsupported on this platform");
    }
    if (kqueueUsable) {
        return new HttpCacheClient(
            KQueueEventLoopGroup::new,
            KQueueDomainSocketChannel.class,
            uri,
            timeoutSeconds,
            remoteMaxConnections,
            verifyDownloads,
            extraHttpHeaders,
            digestUtil,
            creds,
            domainSocketAddress);
    }
    return new HttpCacheClient(
        EpollEventLoopGroup::new,
        EpollDomainSocketChannel.class,
        uri,
        timeoutSeconds,
        remoteMaxConnections,
        verifyDownloads,
        extraHttpHeaders,
        digestUtil,
        creds,
        domainSocketAddress);
}
/**
 * Creates an event loop group with the given number of threads.
 *
 * <p>Epoll is used when the {@code EPOLL} flag is set, otherwise KQueue if it is
 * available, otherwise NIO.
 *
 * @param threads number of event loop threads
 * @return the newly created event loop group
 */
public static EventLoopGroup eventLoopGroup(int threads) {
    if (EPOLL) {
        return new EpollEventLoopGroup(threads);
    }
    if (KQueue.isAvailable()) {
        return new KQueueEventLoopGroup(threads);
    }
    return new NioEventLoopGroup(threads);
}
/**
 * Creates an event loop group with the given number of threads and thread factory.
 *
 * <p>Epoll is used when the {@code EPOLL} flag is set, otherwise KQueue if it is
 * available, otherwise NIO.
 *
 * @param threads       number of event loop threads
 * @param threadFactory factory for the event loop threads
 * @return the newly created event loop group
 */
public static EventLoopGroup eventLoopGroup(int threads, ThreadFactory threadFactory) {
    if (EPOLL) {
        return new EpollEventLoopGroup(threads, threadFactory);
    }
    if (KQueue.isAvailable()) {
        return new KQueueEventLoopGroup(threads, threadFactory);
    }
    return new NioEventLoopGroup(threads, threadFactory);
}
/**
 * Creates a KQueue-backed event loop group.
 *
 * @param threads number of event loop threads
 * @param factory factory for the event loop threads
 * @return a new {@link KQueueEventLoopGroup}
 */
@Override
public EventLoopGroup newEventLoopGroup(int threads, ThreadFactory factory) {
    final EventLoopGroup kqueueGroup = new KQueueEventLoopGroup(threads, factory);
    return kqueueGroup;
}
protected MasterSlaveConnectionManager(Config cfg, UUID id) {
this.id = id.toString();
Version.logVersion();
if (cfg.getTransportMode() == TransportMode.EPOLL) {
if (cfg.getEventLoopGroup() == null) {
this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
} else {
this.group = cfg.getEventLoopGroup();
}
this.socketChannelClass = EpollSocketChannel.class;
if (PlatformDependent.isAndroid()) {
this.resolverGroup = DefaultAddressResolverGroup.INSTANCE;
} else {
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
}
} else if (cfg.getTransportMode() == TransportMode.KQUEUE) {
if (cfg.getEventLoopGroup() == null) {
this.group = new KQueueEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
} else {
this.group = cfg.getEventLoopGroup();
}
this.socketChannelClass = KQueueSocketChannel.class;
if (PlatformDependent.isAndroid()) {
this.resolverGroup = DefaultAddressResolverGroup.INSTANCE;
} else {
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(KQueueDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
}
} else {
if (cfg.getEventLoopGroup() == null) {
this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
} else {
this.group = cfg.getEventLoopGroup();
}
this.socketChannelClass = NioSocketChannel.class;
if (PlatformDependent.isAndroid()) {
this.resolverGroup = DefaultAddressResolverGroup.INSTANCE;
} else {
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
}
}
if (cfg.getExecutor() == null) {
int threads = Runtime.getRuntime().availableProcessors() * 2;
if (cfg.getThreads() != 0) {
threads = cfg.getThreads();
}
executor = Executors.newFixedThreadPool(threads, new DefaultThreadFactory("redisson"));
} else {
executor = cfg.getExecutor();
}
this.cfg = cfg;
this.codec = cfg.getCodec();
this.commandExecutor = new CommandSyncService(this);
}
/**
 * Creates a KQueue-backed event loop group.
 *
 * @param nThreads        number of event loop threads
 * @param ioThreadfactory factory for the event loop threads
 * @return a new {@link KQueueEventLoopGroup}
 */
public static EventLoopGroup createGroup(int nThreads, ThreadFactory ioThreadfactory) {
    final EventLoopGroup kqueueGroup = new KQueueEventLoopGroup(nThreads, ioThreadfactory);
    return kqueueGroup;
}
/**
 * Starts a small HTTP server bound to a Unix domain socket for the test.
 *
 * <p>The test is skipped (via assumptions) when neither epoll nor KQueue is
 * available. Every request is answered with the current {@code responseText} as
 * UTF-8 plain text. The bind future and the {@code unix} URI of the socket are
 * stored in {@code future} and {@code socket}.
 *
 * @throws IOException        if the temporary socket path cannot be prepared
 * @throws URISyntaxException if the socket URI cannot be built
 */
@Before
public void setupUnixDomainSocketServer() throws IOException, URISyntaxException {
    Class<? extends ServerDomainSocketChannel> serverChannelClass = null;
    if (Epoll.isAvailable()) {
        group = new EpollEventLoopGroup(2);
        serverChannelClass = EpollServerDomainSocketChannel.class;
    } else if (KQueue.isAvailable()) {
        group = new KQueueEventLoopGroup(2);
        serverChannelClass = KQueueServerDomainSocketChannel.class;
    }
    assumeThat(group).isNotNull();
    assumeThat(serverChannelClass).isNotNull();

    ChannelInitializer<DomainSocketChannel> initializer = new ChannelInitializer<DomainSocketChannel>() {
        @Override
        protected void initChannel(DomainSocketChannel ch) {
            // A fresh responder per channel, as in a normal Netty pipeline setup.
            SimpleChannelInboundHandler<FullHttpRequest> responder =
                new SimpleChannelInboundHandler<FullHttpRequest>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
                        byte[] payload = responseText.get().getBytes(UTF_8);
                        ByteBuf body = Unpooled.wrappedBuffer(payload);
                        FullHttpResponse response =
                            new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, body);
                        response.headers().set(CONTENT_TYPE, MediaType.PLAIN_TEXT_UTF_8.toString());
                        response.headers().set(CONTENT_LENGTH, payload.length);
                        ctx.writeAndFlush(response);
                    }
                };
            ch.pipeline()
                .addLast("http-codec", new HttpServerCodec())
                .addLast("http-keep-alive", new HttpServerKeepAliveHandler())
                .addLast("http-aggregator", new HttpObjectAggregator(Integer.MAX_VALUE))
                .addLast(responder);
        }
    };

    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(group);
    bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    bootstrap.channel(serverChannelClass);
    bootstrap.childHandler(initializer);

    // Reserve a unique path, then remove the file so the server can bind the socket there.
    Path socketFile = Files.createTempFile("domain-socket-test", "socket");
    Files.deleteIfExists(socketFile);
    SocketAddress bindAddress = new DomainSocketAddress(socketFile.toFile());
    future = bootstrap.bind(bindAddress);
    socket = new URI("unix", null, null, 0, socketFile.toString(), null, null);
}
/**
 * Creates a new {@link EventLoopGroup} on the best available transport.
 *
 * <p>Preference order: epoll, KQueue, NIO. The thread count is validated first.
 *
 * @param ioThreads number of threads
 * @param threadFactory the {@link ThreadFactory} to use.
 * @return the created {@link EventLoopGroup}
 */
public static EventLoopGroup createEventLoopGroup(int ioThreads, ThreadFactory threadFactory) {
    validateIoThreads(ioThreads);
    if (Epoll.isAvailable()) {
        return new EpollEventLoopGroup(ioThreads, threadFactory);
    }
    if (KQueue.isAvailable()) {
        return new KQueueEventLoopGroup(ioThreads, threadFactory);
    }
    return new NioEventLoopGroup(ioThreads, threadFactory);
}
/**
 * Checks whether Unix domain sockets (UDS) can be used with {@code group}.
 *
 * @param group the group to test.
 * @return {@code true} if {@code group} is an epoll or KQueue event loop group.
 */
static boolean isUnixDomainSocketSupported(EventExecutorGroup group) {
    if (group instanceof EpollEventLoopGroup) {
        return true;
    }
    return group instanceof KQueueEventLoopGroup;
}
/**
 * Checks whether {@code FileDescriptorSocketAddress} can be used with {@code group}.
 *
 * @param group the group to test.
 * @return {@code true} if {@code group} is an epoll or KQueue event loop group.
 */
static boolean isFileDescriptorSocketAddressSupported(EventExecutorGroup group) {
    if (group instanceof EpollEventLoopGroup) {
        return true;
    }
    return group instanceof KQueueEventLoopGroup;
}