下面列出了怎么用io.netty.channel.epoll.EpollSocketChannel的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Builds a client {@link Bootstrap} from {@code cfg} and starts a connect to
 * {@code cfg.host}:{@code cfg.port}.
 *
 * <p>The channel class follows the type of {@code ioGroup}: an
 * {@code EpollEventLoopGroup} gets {@code EpollSocketChannel}, anything else
 * gets {@code NioSocketChannel}. The returned future is handed back as-is;
 * this method does not wait on it.
 *
 * @param cfg     client configuration supplying socket options, codec settings and target address
 * @param ioGroup event loop group the bootstrap is bound to
 * @param handler handler forwarded as the last argument of {@code TcpHandlerInit}
 * @return the future returned by {@code Bootstrap.connect}, or {@code null} when {@code ioGroup} is {@code null}
 */
protected ChannelFuture doTcpConntecSync(DFTcpClientCfg cfg, EventLoopGroup ioGroup, ChannelHandler handler){
    if (ioGroup == null) {
        return null;
    }
    final Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(ioGroup);
    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, cfg.isKeepAlive());
    bootstrap.option(ChannelOption.SO_RCVBUF, cfg.getSoRecvBufLen());
    bootstrap.option(ChannelOption.SO_SNDBUF, cfg.getSoSendBufLen());
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)cfg.getConnTimeout());
    bootstrap.option(ChannelOption.TCP_NODELAY, cfg.isTcpNoDelay());
    bootstrap.handler(new TcpHandlerInit(
            false, cfg.getTcpProtocol(), cfg.getTcpMsgMaxLength(), 0, 0,
            cfg.getWsUri(), null, cfg.getDecoder(), cfg.getEncoder(),
            cfg.getUserHandler(), cfg.getSslCfg(), cfg.getReqData(), handler));

    // The socket channel implementation must match the event loop group implementation.
    final boolean epollGroup = ioGroup instanceof EpollEventLoopGroup;
    bootstrap.channel(epollGroup ? EpollSocketChannel.class : NioSocketChannel.class);

    return bootstrap.connect(cfg.host, cfg.port);
}
/**
 * Constructs a netty I/O event executor, using the epoll transport when
 * {@code Epoll.isAvailable()} reports true and the NIO transport otherwise.
 *
 * @param name  thread group name; {@code "-%d-Thread"} is appended to form the thread name pattern.
 * @param count thread count handed to the event loop group factory.
 * @return a new {@code NettyExecutor} holding the event loop group plus the
 *         matching server and client socket channel classes.
 */
public static NettyExecutor create(String name, int count) {
    final String threadNamePattern = name + "-%d-Thread";

    if (!Epoll.isAvailable()) {
        LOG.info("Epoll not available. Using nio socket transport.");
        return new NettyExecutor(
                nioEventLoopGroup(count, threadNamePattern),
                NioServerSocketChannel.class,
                NioSocketChannel.class);
    }

    LOG.info("Epoll is available. Using the native socket transport.");
    return new NettyExecutor(
            epollEventLoopGroup(count, threadNamePattern),
            EpollServerSocketChannel.class,
            EpollSocketChannel.class);
}
/**
 * Attempts to determine the {@link ChannelFactory} that corresponds to the given
 * event loop group.
 *
 * <p>{@code DelegatingEventLoopGroup} wrappers are unwrapped first. NIO and epoll
 * groups are answered directly; any other group is looked up by class name in
 * {@code KNOWN_EL_GROUPS} and its socket channel class is loaded reflectively.
 *
 * @param eventLoopGroup the event loop group to determine the {@link ChannelFactory} for
 * @return A {@link ChannelFactory} instance for the given event loop group.
 * @throws IllegalArgumentException if the (unwrapped) group type has no known socket channel
 */
@SuppressWarnings("unchecked")
public static ChannelFactory<? extends Channel> resolveSocketChannelFactory(EventLoopGroup eventLoopGroup) {
    // Peel off delegating wrappers until the concrete group is reached.
    EventLoopGroup resolved = eventLoopGroup;
    while (resolved instanceof DelegatingEventLoopGroup) {
        resolved = ((DelegatingEventLoopGroup) resolved).getDelegate();
    }

    if (resolved instanceof NioEventLoopGroup) {
        return NioSocketChannel::new;
    }
    if (resolved instanceof EpollEventLoopGroup) {
        return EpollSocketChannel::new;
    }

    final String socketFqcn = KNOWN_EL_GROUPS.get(resolved.getClass().getName());
    if (socketFqcn != null) {
        return invokeSafely(() -> new ReflectiveChannelFactory(Class.forName(socketFqcn)));
    }
    throw new IllegalArgumentException("Unknown event loop group : " + resolved.getClass());
}
/**
 * Creates a new epoll-backed channel instance for the requested channel interface.
 *
 * @param channelClass one of {@code SocketChannel}, {@code ServerSocketChannel},
 *                     {@code DatagramChannel}, {@code DomainSocketChannel} or
 *                     {@code ServerDomainSocketChannel}
 * @return a freshly constructed epoll channel of the matching kind
 * @throws IllegalArgumentException if {@code channelClass} is none of the supported types
 */
@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> CHANNEL getChannel(Class<CHANNEL> channelClass) {
    final Channel created;
    if (channelClass.equals(SocketChannel.class)) {
        created = new EpollSocketChannel();
    } else if (channelClass.equals(ServerSocketChannel.class)) {
        created = new EpollServerSocketChannel();
    } else if (channelClass.equals(DatagramChannel.class)) {
        created = new EpollDatagramChannel();
    } else if (channelClass.equals(DomainSocketChannel.class)) {
        created = new EpollDomainSocketChannel();
    } else if (channelClass.equals(ServerDomainSocketChannel.class)) {
        created = new EpollServerDomainSocketChannel();
    } else {
        throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
    }
    return (CHANNEL) created;
}
/**
 * Initializes the client/server event loop groups and their channel classes.
 *
 * <p>The epoll transport is attempted first. If constructing it throws anything,
 * the failure is logged at debug level and the NIO transport is used instead.
 */
private void initEventLoopGroup() {
    boolean epollInitialized = false;
    try {
        clientGroup = new EpollEventLoopGroup(0, namedThreads("netty-messaging-event-epoll-client-%d", log));
        serverGroup = new EpollEventLoopGroup(0, namedThreads("netty-messaging-event-epoll-server-%d", log));
        serverChannelClass = EpollServerSocketChannel.class;
        clientChannelClass = EpollSocketChannel.class;
        epollInitialized = true;
    } catch (Throwable e) {
        log.debug("Failed to initialize native (epoll) transport. "
                + "Reason: {}. Proceeding with nio.", e.getMessage());
    }

    if (!epollInitialized) {
        // Fallback path: overwrite whatever the failed epoll attempt may have assigned.
        clientGroup = new NioEventLoopGroup(0, namedThreads("netty-messaging-event-nio-client-%d", log));
        serverGroup = new NioEventLoopGroup(0, namedThreads("netty-messaging-event-nio-server-%d", log));
        serverChannelClass = NioServerSocketChannel.class;
        clientChannelClass = NioSocketChannel.class;
    }
}
/**
 * Builds a client {@code ChannelConfiguration} whose channel class matches the
 * given worker group.
 *
 * <p>When {@code workerGroup} is itself an {@code EventLoop}, its parent group
 * decides the transport; the configuration still carries {@code workerGroup}.
 *
 * @param workerGroup event loop group (or single event loop) the client will use
 * @return configuration pairing {@code workerGroup} with the matching socket channel class
 * @throws RuntimeException if the group is neither an epoll nor a NIO event loop group
 */
public static ChannelConfiguration clientConfig(EventLoopGroup workerGroup) {
    final EventLoopGroup owner = (workerGroup instanceof EventLoop)
            ? ((EventLoop) workerGroup).parent()
            : workerGroup;

    final Class<? extends Channel> channelClass;
    if (owner instanceof EpollEventLoopGroup) {
        channelClass = EpollSocketChannel.class;
    } else if (owner instanceof NioEventLoopGroup) {
        channelClass = NioSocketChannel.class;
    } else {
        throw new RuntimeException("Unsupported EventLoopGroup " + workerGroup.getClass());
    }
    return new ChannelConfiguration(workerGroup, channelClass);
}
/**
 * Configures a worker EventLoopGroup and a Channel for use by a client, picking
 * the SocketChannel class that matches the provided workerGroup.
 *
 * <p>If {@code workerGroup} is a single {@code EventLoop}, the transport is
 * decided by its parent group.
 *
 * @param workerGroup the EventLoopGroup stored in the returned ClientChannelConfiguration
 * @return ClientChannelConfiguration pairing {@code workerGroup} with its channel class
 * @throws RuntimeException if the group is neither an epoll nor a NIO event loop group
 */
public static ClientChannelConfiguration clientConfig(EventLoopGroup workerGroup) {
    EventLoopGroup transportGroup = workerGroup;
    if (workerGroup instanceof EventLoop) {
        transportGroup = ((EventLoop) workerGroup).parent();
    }

    final boolean epoll = transportGroup instanceof EpollEventLoopGroup;
    final boolean nio = !epoll && transportGroup instanceof NioEventLoopGroup;
    if (!epoll && !nio) {
        throw new RuntimeException("Unsupported EventLoopGroup " + workerGroup.getClass());
    }

    final Class<? extends Channel> channelClass = epoll ? EpollSocketChannel.class : NioSocketChannel.class;
    return new ClientChannelConfiguration(workerGroup, channelClass);
}
/**
 * Initializes the event loop group and the matching client channel class.
 *
 * <p>The native epoll transport is tried first. If constructing it raises an
 * {@link Error}, the failure is logged at debug level and the NIO transport
 * is used instead.
 */
private void initEventLoopGroup() {
    // try to use EpollEventLoopGroup if possible,
    // if OS does not support native Epoll, fallback to use netty NIO
    try {
        eventLoopGroup = new EpollEventLoopGroup();
        channelClass = EpollSocketChannel.class;
        return;
    } catch (Error e) {
        log.debug("Failed to initialize native (epoll) transport. "
                + "Reason: {}. Proceeding with NIO event group.", e);
    }
    eventLoopGroup = new NioEventLoopGroup();
    // The epoll branch selects the client-side EpollSocketChannel, so the NIO
    // fallback must select the client-side NioSocketChannel as well. The
    // previous NioServerSocketChannel is a listening channel and does not
    // mirror the epoll choice.
    channelClass = NioSocketChannel.class;
}
/**
 * Creates a client {@link Bootstrap} for the given remote address.
 *
 * <p>The channel class is chosen from {@code Epoll.isAvailable()}, not from the
 * type of {@code eventLoopGroup}. Receive/send buffer sizes are only applied
 * when the corresponding config value is not {@code -1}.
 *
 * @param eventLoopGroup The event loop group.
 * @param http2ClientConfig http2 client configs.
 * @param inetSocketAddress Remote Socket Address (IP address + port number).
 * @return {@link Bootstrap}
 */
private static Bootstrap createBootStrap(EventLoopGroup eventLoopGroup, Http2ClientConfig http2ClientConfig,
    InetSocketAddress inetSocketAddress) {
    // To honor http2 window size, WriteBufferWaterMark.high() should be greater or equal to http2 window size.
    // Also see: https://github.com/netty/netty/issues/10193
    final int lowWaterMark = http2ClientConfig.http2InitialWindowSize;
    final int highWaterMark = 2 * http2ClientConfig.http2InitialWindowSize;

    final Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(eventLoopGroup);
    if (Epoll.isAvailable()) {
        bootstrap.channel(EpollSocketChannel.class);
    } else {
        bootstrap.channel(NioSocketChannel.class);
    }
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWaterMark, highWaterMark));
    bootstrap.remoteAddress(inetSocketAddress);

    // -1 means "not configured": leave the socket buffer option unset.
    if (http2ClientConfig.nettyReceiveBufferSize != -1) {
        bootstrap.option(ChannelOption.SO_RCVBUF, http2ClientConfig.nettyReceiveBufferSize);
    }
    if (http2ClientConfig.nettySendBufferSize != -1) {
        bootstrap.option(ChannelOption.SO_SNDBUF, http2ClientConfig.nettySendBufferSize);
    }
    return bootstrap;
}
/**
 * Creates a client {@link Bootstrap} wired to the given handler and I/O group.
 *
 * <p>The channel class depends on {@code Epoll.isAvailable()}; reuse-address and
 * receive-buffer options are read from {@code config}.
 *
 * @param channelHandler handler installed on the bootstrap
 * @param ioEventGroup   event loop group the bootstrap is bound to
 * @return the configured bootstrap
 */
protected Bootstrap newBootstrap(ChannelHandler channelHandler, EventLoopGroup ioEventGroup) {
    final Bootstrap result = new Bootstrap();
    if (Epoll.isAvailable()) {
        result.channel(EpollSocketChannel.class);
    } else {
        result.channel(NioSocketChannel.class);
    }
    result.group(ioEventGroup);
    result.handler(channelHandler);
    result.option(ChannelOption.SO_REUSEADDR, config.isReuseAddress());
    result.option(ChannelOption.SO_RCVBUF, config.getSocketBufferSize());
    result.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    return result;
}
/**
 * Creates a {@code NettyChannelBuilder} for the given address and transport.
 *
 * <p>Without TLS the builder uses plaintext. With TLS and {@code testca} set, the
 * {@code ca.pem} test certificate is installed as the trust manager. With TLS and
 * {@code testca} unset, no SSL context is configured here.
 *
 * @param transport         which netty transport to use
 * @param address           target address
 * @param tls               whether TLS is used
 * @param testca            whether to trust the test CA certificate (only read when {@code tls} is true)
 * @param flowControlWindow flow control window passed to the builder
 * @return the configured builder
 * @throws IOException declared for the certificate loading / SSL context setup
 * @throws IllegalArgumentException if {@code transport} is not one of the handled values
 */
private static NettyChannelBuilder newNettyClientChannel(Transport transport,
    SocketAddress address, boolean tls, boolean testca, int flowControlWindow)
    throws IOException {
    final NettyChannelBuilder channelBuilder =
        NettyChannelBuilder.forAddress(address).flowControlWindow(flowControlWindow);

    if (tls) {
        if (testca) {
            final File caCert = TestUtils.loadCert("ca.pem");
            channelBuilder.sslContext(GrpcSslContexts.forClient().trustManager(caCert).build());
        }
    } else {
        channelBuilder.usePlaintext();
    }

    final DefaultThreadFactory threadFactory = new DefaultThreadFactory("client-elg-", true /*daemon */);
    switch (transport) {
        case NETTY_NIO:
            return channelBuilder
                .eventLoopGroup(new NioEventLoopGroup(0, threadFactory))
                .channelType(NioSocketChannel.class);
        case NETTY_EPOLL:
            // These classes only work on Linux.
            return channelBuilder
                .eventLoopGroup(new EpollEventLoopGroup(0, threadFactory))
                .channelType(EpollSocketChannel.class);
        case NETTY_UNIX_DOMAIN_SOCKET:
            // These classes only work on Linux.
            return channelBuilder
                .eventLoopGroup(new EpollEventLoopGroup(0, threadFactory))
                .channelType(EpollDomainSocketChannel.class);
        default:
            // Should never get here.
            throw new IllegalArgumentException("Unsupported transport: " + transport);
    }
}
/**
 * Starts the client: creates the worker group, configures the bootstrap and
 * prepares the invocation handler.
 *
 * <p>The worker group is sized to the number of available processors and uses
 * the epoll transport when {@code Epoll.isAvailable()}, NIO otherwise.
 *
 * @throws java.lang.Throwable on error
 */
@Override
public void start() throws Throwable {
    final boolean epoll = Epoll.isAvailable();
    final int workerThreads = Runtime.getRuntime().availableProcessors();
    if (epoll) {
        this.workerGroup = new EpollEventLoopGroup(workerThreads);
    } else {
        this.workerGroup = new NioEventLoopGroup(workerThreads);
    }

    final ClientHandler clientHandler = new ClientHandler();
    clientHandler.setYuRPCClient(this);

    // Per-channel pipeline: string codec, the shared client handler, then
    // idle detection (60 second all-idle timeout) followed by the heartbeat handler.
    final ChannelInitializer<SocketChannel> pipelineInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(
                    new StringEncoder(),
                    new StringDecoder(),
                    clientHandler,
                    new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS),
                    new HeartbeatHandler()
            );
        }
    };

    this.bootstrap.group(this.workerGroup)
            .channel(epoll ? EpollSocketChannel.class : NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClientConfig.connectionTimeoutMS)
            .option(ChannelOption.SO_SNDBUF, 65535)
            .option(ChannelOption.SO_RCVBUF, 65535)
            .handler(pipelineInitializer);

    clientInvocationHandler = new ClientInvocationHandler();
    clientInvocationHandler.setYuRPCClient(this);
}
/**
 * Returns the client socket channel class for the active transport.
 *
 * @return {@code EpollSocketChannel.class} when {@code SUPPORTS_EPOLL} is set,
 *         otherwise {@code NioSocketChannel.class}
 */
public static Class<? extends SocketChannel> getClientSocketChannel(){
    return SUPPORTS_EPOLL ? EpollSocketChannel.class : NioSocketChannel.class;
}
/**
 * Creates a client {@link Bootstrap} bound to the given handler and I/O group,
 * using the pooled buffer allocator.
 *
 * <p>The channel class depends on {@code Epoll.isAvailable()}.
 *
 * @param channelHandler handler installed on the bootstrap
 * @param ioEventGroup   event loop group the bootstrap is bound to
 * @return the configured bootstrap
 */
private Bootstrap newBootstrap(ChannelHandler channelHandler, EventLoopGroup ioEventGroup) {
    final Bootstrap result = new Bootstrap();
    if (Epoll.isAvailable()) {
        result.channel(EpollSocketChannel.class);
    } else {
        result.channel(NioSocketChannel.class);
    }
    result.group(ioEventGroup);
    result.handler(channelHandler);
    result.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    return result;
}
/**
 * Selects the client channel class.
 *
 * <p>Native transports are considered only when {@code useNativeIo} is set:
 * epoll is checked first, then kqueue. Within a native transport, {@code uds}
 * selects the domain-socket variant. In every other case the NIO socket
 * channel is returned.
 *
 * @return the channel class to bootstrap clients with
 */
public Class<? extends Channel> getClientChannel() {
    if (useNativeIo) {
        if (Epoll.isAvailable()) {
            return uds ? EpollDomainSocketChannel.class : EpollSocketChannel.class;
        }
        if (KQueue.isAvailable()) {
            return uds ? KQueueDomainSocketChannel.class : KQueueSocketChannel.class;
        }
    }
    return NioSocketChannel.class;
}
/**
 * Builds the client {@link Bootstrap} for the given handler and I/O group.
 *
 * <p>Epoll is used when {@code Epoll.isAvailable()}, NIO otherwise. The
 * reuse-address flag and receive buffer size come from {@code config}.
 *
 * @param channelHandler handler installed on the bootstrap
 * @param ioEventGroup   event loop group the bootstrap is bound to
 * @return the configured bootstrap
 */
protected Bootstrap newBootstrap(ChannelHandler channelHandler, EventLoopGroup ioEventGroup) {
    final boolean nativeTransport = Epoll.isAvailable();
    final Bootstrap clientBootstrap = new Bootstrap();
    clientBootstrap.channel(nativeTransport ? EpollSocketChannel.class : NioSocketChannel.class);
    clientBootstrap.group(ioEventGroup).handler(channelHandler);
    clientBootstrap
        .option(ChannelOption.SO_REUSEADDR, config.isReuseAddress())
        .option(ChannelOption.SO_RCVBUF, config.getSocketBufferSize())
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    return clientBootstrap;
}
/**
 * Returns the correct {@link Class} to use with the given {@link EventLoopGroup}.
 *
 * <p>For epoll and kqueue groups the domain-socket channel is returned when
 * {@code addressClass} is a {@code DomainSocketAddress} (or subtype); for any
 * other group the NIO socket channel is returned and {@code addressClass} is
 * not inspected.
 *
 * @param group        the {@link EventLoopGroup} for which the class is needed
 * @param addressClass The class of the address that to connect to.
 * @return the class that should be used for bootstrapping
 */
public static Class<? extends Channel> socketChannel(EventLoopGroup group,
                                                     Class<? extends SocketAddress> addressClass) {
    if (useEpoll(group)) {
        if (DomainSocketAddress.class.isAssignableFrom(addressClass)) {
            return EpollDomainSocketChannel.class;
        }
        return EpollSocketChannel.class;
    }
    if (useKQueue(group)) {
        if (DomainSocketAddress.class.isAssignableFrom(addressClass)) {
            return KQueueDomainSocketChannel.class;
        }
        return KQueueSocketChannel.class;
    }
    return NioSocketChannel.class;
}
/**
 * Returns the correct Channel that wraps the given filedescriptor or {@code null} if not supported.
 *
 * <p>Only epoll and kqueue groups are supported; epoll is checked first.
 *
 * @param group   the {@link EventLoopGroup} for which the channel is needed
 * @param address the filedescriptor to wrap.
 * @return a new channel wrapping {@code address.getValue()}, or {@code null}
 *         when the group is neither epoll nor kqueue
 */
@Nullable
public static Channel socketChannel(EventLoopGroup group, FileDescriptorSocketAddress address) {
    Channel wrapped = null;
    if (useEpoll(group)) {
        wrapped = new EpollSocketChannel(address.getValue());
    } else if (useKQueue(group)) {
        wrapped = new KQueueSocketChannel(address.getValue());
    }
    return wrapped;
}
/**
 * Runs {@code call}, expecting a {@code ChannelException} unless the client
 * channel is an {@code EpollSocketChannel} or no native transport (kqueue or
 * epoll) is available, in which case the call is simply run.
 *
 * @param call the action under test
 */
private void expectToFailIfNotOnLinux(Runnable call) {
    // TODO(scott) Windows doesn't propagate the exception. Some times an unhandled exception in pipeline.
    final boolean runWithoutExpectation =
        cChannel instanceof EpollSocketChannel || !(KQueue.isAvailable() || Epoll.isAvailable());
    if (runWithoutExpectation) {
        call.run();
        return;
    }

    try {
        call.run();
        fail("Should fail");
    } catch (ChannelException expected) {
        // Expected
    }
}
/**
 * Creates and configures the client {@link Bootstrap} for a service.
 *
 * <p>The channel class follows {@code communicationOptions.getIoEventType()}
 * (epoll in edge-triggered mode, or NIO). Socket options come from
 * {@code communicationOptions}; the I/O and worker thread pools are obtained
 * from {@code BrpcThreadPoolManager}.
 *
 * @param serviceName          service name used when obtaining thread pools
 * @param communicationOptions connection, socket and threading options
 * @return the configured bootstrap
 */
public Bootstrap createBooStrap(String serviceName, final CommunicationOptions communicationOptions) {
    // init netty bootstrap
    final Bootstrap clientBootstrap = new Bootstrap();
    final boolean epollRequested = communicationOptions.getIoEventType() == BrpcConstants.IO_EVENT_NETTY_EPOLL;
    if (epollRequested) {
        clientBootstrap.channel(EpollSocketChannel.class);
        clientBootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
    } else {
        clientBootstrap.channel(NioSocketChannel.class);
    }

    clientBootstrap
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, communicationOptions.getConnectTimeoutMillis())
            .option(ChannelOption.SO_KEEPALIVE, communicationOptions.isKeepAlive())
            .option(ChannelOption.SO_REUSEADDR, communicationOptions.isReuseAddr())
            .option(ChannelOption.TCP_NODELAY, communicationOptions.isTcpNoDelay())
            .option(ChannelOption.SO_RCVBUF, communicationOptions.getReceiveBufferSize())
            .option(ChannelOption.SO_SNDBUF, communicationOptions.getSendBufferSize());

    final BrpcThreadPoolManager pools = BrpcThreadPoolManager.getInstance();
    final boolean sharePools = communicationOptions.isGlobalThreadPoolSharing();
    final ThreadPool workers = pools.getOrCreateClientWorkThreadPool(
            serviceName, sharePools, communicationOptions.getWorkThreadNum());
    final ExecutorService exceptionPool = pools.getExceptionThreadPool();
    final RpcClientHandler rpcClientHandler = new RpcClientHandler(workers, exceptionPool);

    final ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Idle detection is only installed for single-connection channels.
            final boolean singleConnection =
                    communicationOptions.getChannelType() == ChannelType.SINGLE_CONNECTION;
            if (singleConnection) {
                ch.pipeline().addLast(new IdleStateHandler(
                        0, 0, communicationOptions.getKeepAliveTime()));
                ch.pipeline().addLast(new IdleChannelHandler());
            }
            ch.pipeline().addLast(rpcClientHandler);
        }
    };

    final EventLoopGroup ioGroup = pools.getOrCreateClientIoThreadPool(
            serviceName, sharePools, communicationOptions.getIoThreadNum(), communicationOptions.getIoEventType());
    clientBootstrap.group(ioGroup);
    clientBootstrap.handler(pipelineSetup);
    return clientBootstrap;
}
/**
 * Creates the factory used to instantiate client channels.
 *
 * @return a factory producing {@code EpollSocketChannel} when {@code enableEpoll}
 *         is set, otherwise one producing {@code NioSocketChannel}
 */
protected ChannelFactory<? extends Channel> newClientChannelFactory() {
    if (enableEpoll) {
        return EpollSocketChannel::new;
    }
    return NioSocketChannel::new;
}
/**
 * Picks the channel class for client connections.
 *
 * <p>With {@code useNativeIo} unset the NIO socket channel is always returned.
 * Otherwise epoll is preferred over kqueue, and {@code uds} switches to the
 * domain-socket flavour of the chosen native transport. NIO remains the
 * fallback when neither native transport is available.
 *
 * @return the channel class to bootstrap clients with
 */
public Class<? extends Channel> getClientChannel() {
    if (!useNativeIo) {
        return NioSocketChannel.class;
    }
    if (Epoll.isAvailable()) {
        if (uds) {
            return EpollDomainSocketChannel.class;
        }
        return EpollSocketChannel.class;
    }
    if (KQueue.isAvailable()) {
        if (uds) {
            return KQueueDomainSocketChannel.class;
        }
        return KQueueSocketChannel.class;
    }
    return NioSocketChannel.class;
}
/**
 * Chooses the socket channel class clients should be bootstrapped with.
 *
 * @return the epoll socket channel class if {@code SUPPORTS_EPOLL} is true,
 *         the NIO socket channel class otherwise
 */
public static Class<? extends SocketChannel> getClientSocketChannel(){
    if (!SUPPORTS_EPOLL) {
        return NioSocketChannel.class;
    }
    return EpollSocketChannel.class;
}
/**
 * Creates the client on top of an epoll event loop group.
 *
 * <p>The group is sized by {@code getWorkThreadNum()}, named via
 * {@code ThreadNames.T_TCP_CLIENT}, and its I/O ratio is set from
 * {@code getIoRate()} before delegating to {@code createClient}.
 *
 * @param listener listener forwarded to {@code createClient}
 */
private void createEpollClient(Listener listener) {
    final int workerThreads = getWorkThreadNum();
    final DefaultThreadFactory threadFactory = new DefaultThreadFactory(ThreadNames.T_TCP_CLIENT);
    final EpollEventLoopGroup epollGroup = new EpollEventLoopGroup(workerThreads, threadFactory);
    epollGroup.setIoRatio(getIoRate());
    createClient(listener, epollGroup, EpollSocketChannel::new);
}
/**
 * Maps a generic channel interface to its epoll implementation class.
 *
 * @param channelClass one of {@code SocketChannel}, {@code ServerSocketChannel}
 *                     or {@code DatagramChannel}
 * @return the epoll channel class implementing the requested interface
 * @throws IllegalArgumentException if {@code channelClass} is none of the supported types
 */
@Override
@SuppressWarnings("unchecked")
public <CHANNEL extends Channel> Class<? extends CHANNEL> getChannelClass(Class<CHANNEL> channelClass) {
    final Class<? extends Channel> epollClass;
    if (channelClass.equals(SocketChannel.class)) {
        epollClass = EpollSocketChannel.class;
    } else if (channelClass.equals(ServerSocketChannel.class)) {
        epollClass = EpollServerSocketChannel.class;
    } else if (channelClass.equals(DatagramChannel.class)) {
        epollClass = EpollDatagramChannel.class;
    } else {
        throw new IllegalArgumentException("Unsupported channel type: " + channelClass.getSimpleName());
    }
    return (Class<? extends CHANNEL>) epollClass;
}
/**
 * Create {@link Bootstrap}.
 *
 * <p>The bootstrap is bound to {@code group}, uses the epoll socket channel when
 * {@code Epoll.isAvailable()} (NIO otherwise) and enables {@code TCP_NODELAY}.
 *
 * @return the configured bootstrap
 */
private Bootstrap getNettyBootstrap() {
    final Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group);
    if (Epoll.isAvailable()) {
        bootstrap.channel(EpollSocketChannel.class);
    } else {
        bootstrap.channel(NioSocketChannel.class);
    }
    bootstrap.option(ChannelOption.TCP_NODELAY, true);
    return bootstrap;
}
/**
 * Return a SocketChannel class suitable for the given EventLoopGroup implementation.
 *
 * @param eventLoopGroup the event loop group the channel will be registered with
 * @return {@code EpollSocketChannel.class} for an {@code EpollEventLoopGroup},
 *         {@code NioSocketChannel.class} for anything else
 */
public static Class<? extends SocketChannel> getClientSocketChannelClass(EventLoopGroup eventLoopGroup) {
    final boolean epollGroup = eventLoopGroup instanceof EpollEventLoopGroup;
    return epollGroup ? EpollSocketChannel.class : NioSocketChannel.class;
}
/**
 * Creates a new channel for the configured {@code channelType} and {@code socketType}.
 *
 * @return a server channel for {@code ACCEPTOR}, a client channel for {@code CONNECTOR}
 * @throws IllegalStateException if either the channel type or the socket type is not handled
 */
@SuppressWarnings("unchecked")
@Override
public T newChannel() {
    switch (channelType) {
        case ACCEPTOR:
            return newAcceptorChannel();
        case CONNECTOR:
            return newConnectorChannel();
        default:
            throw new IllegalStateException("Invalid channel type: " + channelType);
    }
}

/** Creates the server-side (listening) channel matching {@code socketType}. */
@SuppressWarnings("unchecked")
private T newAcceptorChannel() {
    switch (socketType) {
        case JAVA_NIO:
            return (T) new NioServerSocketChannel();
        case NATIVE_EPOLL:
            return (T) new EpollServerSocketChannel();
        case NATIVE_KQUEUE:
            return (T) new KQueueServerSocketChannel();
        case NATIVE_EPOLL_DOMAIN:
            return (T) new EpollServerDomainSocketChannel();
        case NATIVE_KQUEUE_DOMAIN:
            return (T) new KQueueServerDomainSocketChannel();
        default:
            throw new IllegalStateException("Invalid socket type: " + socketType);
    }
}

/** Creates the client-side (connecting) channel matching {@code socketType}. */
@SuppressWarnings("unchecked")
private T newConnectorChannel() {
    switch (socketType) {
        case JAVA_NIO:
            return (T) new NioSocketChannel();
        case NATIVE_EPOLL:
            return (T) new EpollSocketChannel();
        case NATIVE_KQUEUE:
            return (T) new KQueueSocketChannel();
        case NATIVE_EPOLL_DOMAIN:
            return (T) new EpollDomainSocketChannel();
        case NATIVE_KQUEUE_DOMAIN:
            return (T) new KQueueDomainSocketChannel();
        default:
            throw new IllegalStateException("Invalid socket type: " + socketType);
    }
}
/**
 * Creates and validates the client {@link Bootstrap} for outgoing connections.
 *
 * <p>The bootstrap runs on {@code childGroup}, uses epoll or NIO according to
 * {@code useEpoll}, the pooled allocator, and the configured write-buffer water marks.
 *
 * @param topChannel channel passed to {@code channelInitializer}
 * @return the validated bootstrap
 */
private @NotNull Bootstrap bootstrap(@NotNull TChannel topChannel) {
    final Bootstrap clientBootstrap = new Bootstrap();
    clientBootstrap.group(this.childGroup);
    if (useEpoll) {
        clientBootstrap.channel(EpollSocketChannel.class);
    } else {
        clientBootstrap.channel(NioSocketChannel.class);
    }
    clientBootstrap.handler(this.channelInitializer(false, topChannel));
    clientBootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    clientBootstrap.option(
        ChannelOption.WRITE_BUFFER_WATER_MARK,
        new WriteBufferWaterMark(WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_HIGH_WATER_MARK));
    return clientBootstrap.validate();
}
private RedisClient(RedisClientConfig config) {
RedisClientConfig copy = new RedisClientConfig(config);
if (copy.getTimer() == null) {
copy.setTimer(new HashedWheelTimer());
hasOwnTimer = true;
}
if (copy.getGroup() == null) {
copy.setGroup(new NioEventLoopGroup());
hasOwnGroup = true;
}
if (copy.getExecutor() == null) {
copy.setExecutor(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));
hasOwnExecutor = true;
}
if (copy.getResolverGroup() == null) {
if (config.getSocketChannelClass() == EpollSocketChannel.class) {
copy.setResolverGroup(new DnsAddressResolverGroup(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()));
} else {
copy.setResolverGroup(new DnsAddressResolverGroup(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()));
}
hasOwnResolver = true;
}
this.config = copy;
this.executor = copy.getExecutor();
this.timer = copy.getTimer();
uri = copy.getAddress();
resolvedAddr = copy.getAddr();
if (resolvedAddr != null) {
resolvedAddrFuture.set(RedissonPromise.newSucceededFuture(resolvedAddr));
}
channels = new DefaultChannelGroup(copy.getGroup().next());
bootstrap = createBootstrap(copy, Type.PLAIN);
pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
this.commandTimeout = copy.getCommandTimeout();
}