下面列出了 io.netty.channel.epoll.EpollServerDomainSocketChannel 的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a new epoll-backed channel for the requested channel interface.
 *
 * <p>The lookup is by exact class equality against the five supported
 * interfaces; subclasses or other types are rejected.
 *
 * @param channelClass the channel interface to instantiate
 * @return a freshly constructed epoll channel cast to the requested type
 * @throws IllegalArgumentException if {@code channelClass} is not one of the supported types
 */
@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass) {
  final Channel created;
  if (channelClass.equals(SocketChannel.class)) {
    created = new EpollSocketChannel();
  } else if (channelClass.equals(ServerSocketChannel.class)) {
    created = new EpollServerSocketChannel();
  } else if (channelClass.equals(DatagramChannel.class)) {
    created = new EpollDatagramChannel();
  } else if (channelClass.equals(DomainSocketChannel.class)) {
    created = new EpollDomainSocketChannel();
  } else if (channelClass.equals(ServerDomainSocketChannel.class)) {
    created = new EpollServerDomainSocketChannel();
  } else {
    throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
  }
  return (CHANNEL) created;
}
/**
 * Supplies the parameter sets for the parameterized test.
 *
 * <p>An {@code InetTestServer} is always included. A {@code UnixDomainServer}
 * is added for epoll and/or kqueue only when {@code Epoll.isAvailable()} /
 * {@code KQueue.isAvailable()} report true.
 *
 * @return one single-element {@code Object[]} per server under test
 */
@Parameters
public static Collection<Object[]> createInputValues() {
  ArrayList<Object[]> parameters = new ArrayList<>();
  parameters.add(new Object[] {new InetTestServer()});
  if (Epoll.isAvailable()) {
    parameters.add(
        new Object[] {
          new UnixDomainServer(EpollServerDomainSocketChannel.class, EpollEventLoopGroup::new)
        });
  }
  if (KQueue.isAvailable()) {
    parameters.add(
        new Object[] {
          new UnixDomainServer(KQueueServerDomainSocketChannel.class, KQueueEventLoopGroup::new)
        });
  }
  return parameters;
}
/**
 * Builds and starts the server on the domain socket path taken from
 * {@code CsiConfig}, then blocks until the server terminates.
 *
 * <p>The configuration is validated before any client or event loop is
 * created, and both are released in {@code finally} blocks so that a failure
 * while building, starting or waiting on the server does not leak them.
 *
 * @return always {@code null}
 * @throws IllegalArgumentException if the configured volume owner is empty
 * @throws Exception if creating the client, starting the server or waiting for
 *     termination fails
 */
@Override
public Void call() throws Exception {
  OzoneConfiguration ozoneConfiguration = createOzoneConfiguration();
  CsiConfig csiConfig = ozoneConfiguration.getObject(CsiConfig.class);
  // Fail fast on bad configuration, before acquiring anything that needs cleanup.
  if (csiConfig.getVolumeOwner().isEmpty()) {
    throw new IllegalArgumentException(
        "ozone.csi.owner is not set. You should set this configuration "
            + "variable to define which user should own all the created "
            + "buckets.");
  }
  OzoneClient rpcClient = OzoneClientFactory.getRpcClient(ozoneConfiguration);
  try {
    // The same group serves as both boss and worker event loop.
    EpollEventLoopGroup group = new EpollEventLoopGroup();
    try {
      Server server =
          NettyServerBuilder
              .forAddress(new DomainSocketAddress(csiConfig.getSocketPath()))
              .channelType(EpollServerDomainSocketChannel.class)
              .workerEventLoopGroup(group)
              .bossEventLoopGroup(group)
              .addService(new IdentitiyService())
              .addService(new ControllerService(rpcClient,
                  csiConfig.getDefaultVolumeSize()))
              .addService(new NodeService(csiConfig))
              .build();
      server.start();
      server.awaitTermination();
    } finally {
      group.shutdownGracefully();
    }
  } finally {
    rpcClient.close();
  }
  return null;
}
/**
 * Picks the server channel class to bootstrap with.
 *
 * <p>Native transports are considered only when {@code useNativeIo} is set:
 * epoll is tried first, then kqueue, and {@code uds} selects the domain-socket
 * variant of whichever is available. In every other case the NIO server socket
 * channel is returned.
 *
 * @return the server channel class to use
 */
public Class<? extends ServerChannel> getServerChannel() {
  if (!useNativeIo) {
    return NioServerSocketChannel.class;
  }
  if (Epoll.isAvailable()) {
    if (uds) {
      return EpollServerDomainSocketChannel.class;
    }
    return EpollServerSocketChannel.class;
  }
  if (KQueue.isAvailable()) {
    if (uds) {
      return KQueueServerDomainSocketChannel.class;
    }
    return KQueueServerSocketChannel.class;
  }
  return NioServerSocketChannel.class;
}
/**
 * Resolves the {@link ServerChannel} implementation matching the given {@link EventLoopGroup}.
 *
 * <p>For epoll and kqueue groups the domain-socket variant is chosen when
 * {@code addressClass} is a {@link DomainSocketAddress} (or a subtype); any
 * other group maps to the NIO server socket channel.
 *
 * @param group the {@link EventLoopGroup} for which the class is needed
 * @param addressClass the class of the address that the server socket will be bound to
 * @return the class that should be used for bootstrapping
 */
public static Class<? extends ServerChannel> serverChannel(EventLoopGroup group,
    Class<? extends SocketAddress> addressClass) {
  if (useEpoll(group)) {
    if (DomainSocketAddress.class.isAssignableFrom(addressClass)) {
      return EpollServerDomainSocketChannel.class;
    }
    return EpollServerSocketChannel.class;
  }
  if (useKQueue(group)) {
    if (DomainSocketAddress.class.isAssignableFrom(addressClass)) {
      return KQueueServerDomainSocketChannel.class;
    }
    return KQueueServerSocketChannel.class;
  }
  return NioServerSocketChannel.class;
}
/**
 * Returns the server channel class to bootstrap with.
 *
 * <p>Starts from the NIO server socket channel and switches to an epoll or
 * kqueue class (epoll checked first) only when {@code useNativeIo} is set and
 * that transport is available; {@code uds} selects the domain-socket variant.
 *
 * @return the selected server channel class
 */
public Class<? extends ServerChannel> getServerChannel() {
  Class<? extends ServerChannel> selected = NioServerSocketChannel.class;
  if (useNativeIo) {
    if (Epoll.isAvailable()) {
      selected = uds ? EpollServerDomainSocketChannel.class : EpollServerSocketChannel.class;
    } else if (KQueue.isAvailable()) {
      selected = uds ? KQueueServerDomainSocketChannel.class : KQueueServerSocketChannel.class;
    }
  }
  return selected;
}
/**
 * Creates a new channel for this object's {@code channelType} and {@code socketType}.
 *
 * @return a new server-side channel for {@code ACCEPTOR}, or a new client-side
 *     channel for {@code CONNECTOR}
 * @throws IllegalStateException if either the channel type or the socket type is not handled
 */
@Override
public T newChannel() {
  switch (channelType) {
    case ACCEPTOR:
      return newAcceptorChannel();
    case CONNECTOR:
      return newConnectorChannel();
    default:
      throw new IllegalStateException("Invalid channel type: " + channelType);
  }
}

/** Creates the server (accepting) channel matching {@code socketType}. */
@SuppressWarnings("unchecked")
private T newAcceptorChannel() {
  switch (socketType) {
    case JAVA_NIO:
      return (T) new NioServerSocketChannel();
    case NATIVE_EPOLL:
      return (T) new EpollServerSocketChannel();
    case NATIVE_KQUEUE:
      return (T) new KQueueServerSocketChannel();
    case NATIVE_EPOLL_DOMAIN:
      return (T) new EpollServerDomainSocketChannel();
    case NATIVE_KQUEUE_DOMAIN:
      return (T) new KQueueServerDomainSocketChannel();
    default:
      throw new IllegalStateException("Invalid socket type: " + socketType);
  }
}

/** Creates the client (connecting) channel matching {@code socketType}. */
@SuppressWarnings("unchecked")
private T newConnectorChannel() {
  switch (socketType) {
    case JAVA_NIO:
      return (T) new NioSocketChannel();
    case NATIVE_EPOLL:
      return (T) new EpollSocketChannel();
    case NATIVE_KQUEUE:
      return (T) new KQueueSocketChannel();
    case NATIVE_EPOLL_DOMAIN:
      return (T) new EpollDomainSocketChannel();
    case NATIVE_KQUEUE_DOMAIN:
      return (T) new KQueueDomainSocketChannel();
    default:
      throw new IllegalStateException("Invalid socket type: " + socketType);
  }
}
/**
 * Starts the secret discovery server on the requested transport.
 *
 * <p>With {@code useUds} the server listens on a Unix domain socket using epoll
 * event loop groups (stored in {@code boss} and {@code elg}); otherwise an
 * in-process server is started under the given name.
 *
 * @param name UDS pathname or server name for {@link InProcessServerBuilder}
 * @param useUds creates a UDS based server if true.
 * @param useInterceptor if true, uses {@link SdsServerInterceptor} to grab &amp; save Jwt Token.
 * @throws IOException if the server fails to start
 */
void startServer(String name, boolean useUds, boolean useInterceptor) throws IOException {
  checkNotNull(name, "name");
  discoveryService = new SecretDiscoveryServiceImpl();
  ServerServiceDefinition boundService = discoveryService.bindService();
  ServerServiceDefinition serviceDefinition =
      useInterceptor
          ? ServerInterceptors.intercept(boundService, new SdsServerInterceptor())
          : boundService;

  if (!useUds) {
    server =
        InProcessServerBuilder.forName(name)
            .addService(serviceDefinition)
            .directExecutor()
            .build()
            .start();
    return;
  }

  elg = new EpollEventLoopGroup();
  boss = new EpollEventLoopGroup(1);
  server =
      NettyServerBuilder.forAddress(new DomainSocketAddress(name))
          .bossEventLoopGroup(boss)
          .workerEventLoopGroup(elg)
          .channelType(EpollServerDomainSocketChannel.class)
          .addService(serviceDefinition)
          .directExecutor()
          .build()
          .start();
}
/**
 * Binds a small HTTP server to a Unix domain socket before each test.
 *
 * <p>Uses epoll when available, otherwise kqueue; if neither is available the
 * assumptions fail. Every aggregated request is answered with
 * {@code 200 OK} and the current value of {@code responseText} as UTF-8 plain
 * text. The bind future is stored in {@code future} and the {@code unix:} URI
 * of the socket path in {@code socket}.
 *
 * @throws IOException if the temporary socket path cannot be created or removed
 * @throws URISyntaxException if the socket path cannot be expressed as a URI
 */
@Before
public void setupUnixDomainSocketServer() throws IOException, URISyntaxException {
  Class<? extends ServerDomainSocketChannel> serverChannelClass = null;
  if (Epoll.isAvailable()) {
    group = new EpollEventLoopGroup(2);
    serverChannelClass = EpollServerDomainSocketChannel.class;
  } else if (KQueue.isAvailable()) {
    group = new KQueueEventLoopGroup(2);
    serverChannelClass = KQueueServerDomainSocketChannel.class;
  }
  assumeThat(group).isNotNull();
  assumeThat(serverChannelClass).isNotNull();

  ServerBootstrap bootstrap = new ServerBootstrap();
  bootstrap.group(group);
  bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
  bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  bootstrap.channel(serverChannelClass);
  bootstrap.childHandler(new ChannelInitializer<DomainSocketChannel>() {
    @Override
    protected void initChannel(DomainSocketChannel ch) {
      ch.pipeline()
          .addLast("http-codec", new HttpServerCodec())
          .addLast("http-keep-alive", new HttpServerKeepAliveHandler())
          .addLast("http-aggregator", new HttpObjectAggregator(Integer.MAX_VALUE))
          .addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
              byte[] payload = responseText.get().getBytes(UTF_8);
              FullHttpResponse response = new DefaultFullHttpResponse(
                  HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(payload));
              response.headers().set(CONTENT_TYPE, MediaType.PLAIN_TEXT_UTF_8.toString());
              response.headers().set(CONTENT_LENGTH, payload.length);
              ctx.writeAndFlush(response);
            }
          });
    }
  });

  // Only the unique path name is wanted: the placeholder file is removed again before binding.
  Path socketPath = Files.createTempFile("domain-socket-test", "socket");
  Files.deleteIfExists(socketPath);
  future = bootstrap.bind(new DomainSocketAddress(socketPath.toFile()));
  socket = new URI("unix", null, null, 0, socketPath.toString(), null, null);
}