下面列出了io.netty.channel.ChannelFactory#io.netty.channel.ReflectiveChannelFactory 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Resolves the {@link ChannelFactory} to use for socket channels on the given event loop group.
 *
 * <p>A {@code DelegatingEventLoopGroup} is unwrapped and resolved through its delegate. NIO and
 * epoll groups map directly to their socket channel constructors. Any other group is looked up
 * by its class name in {@code KNOWN_EL_GROUPS}, and the mapped socket channel class is loaded
 * with {@link Class#forName(String)}.
 *
 * @param eventLoopGroup the event loop group to determine the {@link ChannelFactory} for
 * @return a {@link ChannelFactory} instance for the given event loop group
 * @throws IllegalArgumentException if the group's class name has no entry in {@code KNOWN_EL_GROUPS}
 */
@SuppressWarnings("unchecked")
public static ChannelFactory<? extends Channel> resolveSocketChannelFactory(EventLoopGroup eventLoopGroup) {
    if (eventLoopGroup instanceof DelegatingEventLoopGroup) {
        // Look through the wrapper and resolve against the group it delegates to.
        DelegatingEventLoopGroup wrapper = (DelegatingEventLoopGroup) eventLoopGroup;
        return resolveSocketChannelFactory(wrapper.getDelegate());
    } else if (eventLoopGroup instanceof NioEventLoopGroup) {
        return NioSocketChannel::new;
    } else if (eventLoopGroup instanceof EpollEventLoopGroup) {
        return EpollSocketChannel::new;
    }

    // Fall back to the name-based registry for groups that are not referenced directly here.
    Class<?> groupType = eventLoopGroup.getClass();
    String socketFqcn = KNOWN_EL_GROUPS.get(groupType.getName());
    if (socketFqcn != null) {
        return invokeSafely(() -> new ReflectiveChannelFactory(Class.forName(socketFqcn)));
    }
    throw new IllegalArgumentException("Unknown event loop group : " + groupType);
}
/** Checks that an SO_LINGER value passed in the channel options map shows up in the channel config. */
@Test
public void setSoLingerChannelOption() throws IOException {
    startServer();

    // Request a specific SO_LINGER value through the channel options.
    int lingerValue = 123;
    Map<ChannelOption<?>, Object> options = new HashMap<>();
    options.put(ChannelOption.SO_LINGER, lingerValue);

    NettyClientTransport transport = new NettyClientTransport(
        address,
        new ReflectiveChannelFactory<>(NioSocketChannel.class),
        options,
        group,
        newNegotiator(),
        false,
        DEFAULT_WINDOW_SIZE,
        DEFAULT_MAX_MESSAGE_SIZE,
        GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
        KEEPALIVE_TIME_NANOS_DISABLED,
        1L,
        false,
        authority,
        null /* user agent */,
        tooManyPingsRunnable,
        new TransportTracer(),
        Attributes.EMPTY,
        new SocketPicker(),
        new FakeChannelLogger(),
        false);
    transports.add(transport);
    callMeMaybe(transport.start(clientTransportListener));

    // The started channel must expose a socket config carrying the requested value.
    ChannelConfig channelConfig = transport.channel().config();
    assertTrue(channelConfig instanceof SocketChannelConfig);
    SocketChannelConfig socketConfig = (SocketChannelConfig) channelConfig;
    assertEquals(lingerValue, socketConfig.getSoLinger());
}
/**
 * With keep-alive enabled, the channel's TCP_USER_TIMEOUT option is expected to be close to the
 * configured keep-alive timeout. Runs only when epoll is available.
 */
@Test
public void keepAliveEnabled_shouldSetTcpUserTimeout() throws Exception {
    assume().that(Utils.isEpollAvailable()).isTrue();
    startServer();

    EventLoopGroup workerGroup = Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP.create();
    int keepAliveTimeMillis = 12345670;
    int keepAliveTimeoutMillis = 1234567;
    try {
        long keepAliveTimeNanos = TimeUnit.MILLISECONDS.toNanos(keepAliveTimeMillis);
        long keepAliveTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(keepAliveTimeoutMillis);
        NettyClientTransport transport = newTransport(
            newNegotiator(),
            DEFAULT_MAX_MESSAGE_SIZE,
            GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
            null /* user agent */,
            true /* keep alive */,
            keepAliveTimeNanos,
            keepAliveTimeoutNanos,
            new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE),
            workerGroup);
        callMeMaybe(transport.start(clientTransportListener));

        ChannelOption<Integer> tcpUserTimeoutOption = Utils.maybeGetTcpUserTimeoutOption();
        assertThat(tcpUserTimeoutOption).isNotNull();

        // Some Linux systems report the value with a small error (usually +-1), hence the tolerance.
        double actualTimeoutMillis =
            (double) transport.channel().config().getOption(tcpUserTimeoutOption);
        assertThat(actualTimeoutMillis).isWithin(5.0).of((double) keepAliveTimeoutMillis);
    } finally {
        workerGroup.shutdownGracefully();
    }
}
/**
 * With keep-alive disabled, the channel's TCP_USER_TIMEOUT option is expected to stay at 0.
 * Runs only when epoll is available.
 */
@Test
public void keepAliveDisabled_shouldNotSetTcpUserTimeout() throws Exception {
    assume().that(Utils.isEpollAvailable()).isTrue();
    startServer();

    EventLoopGroup workerGroup = Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP.create();
    int keepAliveTimeMillis = 12345670;
    try {
        // The same duration is passed for both the keep-alive time and the keep-alive timeout.
        long keepAliveNanos = TimeUnit.MILLISECONDS.toNanos(keepAliveTimeMillis);
        NettyClientTransport transport = newTransport(
            newNegotiator(),
            DEFAULT_MAX_MESSAGE_SIZE,
            GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
            null /* user agent */,
            false /* keep alive */,
            keepAliveNanos,
            keepAliveNanos,
            new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE),
            workerGroup);
        callMeMaybe(transport.start(clientTransportListener));

        ChannelOption<Integer> tcpUserTimeoutOption = Utils.maybeGetTcpUserTimeoutOption();
        assertThat(tcpUserTimeoutOption).isNotNull();

        // Default TCP_USER_TIMEOUT is 0 (use the system default).
        assertThat(transport.channel().config().getOption(tcpUserTimeoutOption)).isEqualTo(0);
    } finally {
        workerGroup.shutdownGracefully();
    }
}
/**
 * Starts a {@code NettyServer} on port 0 and records the resulting {@code address} and
 * {@code authority} fields for use by the tests.
 *
 * @param maxStreamsPerConnection value forwarded to the server constructor
 * @param maxHeaderListSize value forwarded to the server constructor
 * @throws IOException if thrown while starting the server
 */
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
    InetSocketAddress bindAddress = TestUtils.testServerAddress(new InetSocketAddress(0));
    server = new NettyServer(
        bindAddress,
        new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
        new HashMap<ChannelOption<?>, Object>(),
        new HashMap<ChannelOption<?>, Object>(),
        new FixedObjectPool<>(group),
        new FixedObjectPool<>(group),
        false,
        negotiator,
        Collections.<ServerStreamTracer.Factory>emptyList(),
        TransportTracer.getDefaultFactory(),
        maxStreamsPerConnection,
        false,
        DEFAULT_WINDOW_SIZE,
        DEFAULT_MAX_MESSAGE_SIZE,
        maxHeaderListSize,
        DEFAULT_SERVER_KEEPALIVE_TIME_NANOS,
        DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
        MAX_CONNECTION_IDLE_NANOS_DISABLED,
        MAX_CONNECTION_AGE_NANOS_DISABLED,
        MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE,
        true,
        0,
        channelz);
    server.start(serverListener);

    // Derive the client-facing address and authority from the socket the server is listening on.
    InetSocketAddress listenAddress = (InetSocketAddress) server.getListenSocketAddress();
    address = TestUtils.testServerAddress(listenAddress);
    authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
}
/**
 * Sets the {@link Class} which is used to create {@link Channel} instances from, by wrapping it
 * in a {@link ReflectiveChannelFactory} and passing that to the channel-factory setter.
 * Use this method, or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
 * {@link Channel} implementation has no no-args constructor.
 *
 * @param channelClass the channel class to create instances from; must not be {@code null}
 * @return whatever the channel-factory setter returns for the wrapping factory
 * @throws NullPointerException if {@code channelClass} is {@code null}
 */
public B channel(Class<? extends C> channelClass) {
    if (channelClass != null) {
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    throw new NullPointerException("channelClass");
}
/** A server that was never started reports the address it was constructed with. */
@Test
public void getPort_notStarted() {
    InetSocketAddress requestedAddress = new InetSocketAddress(0);
    // The numeric and boolean settings after the tracer factory are placeholders; this test
    // only reads the listen address back.
    NettyServer nettyServer = new NettyServer(
        requestedAddress,
        new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
        new HashMap<ChannelOption<?>, Object>(),
        new HashMap<ChannelOption<?>, Object>(),
        new FixedObjectPool<>(eventLoop),
        new FixedObjectPool<>(eventLoop),
        false,
        ProtocolNegotiators.plaintext(),
        Collections.<ServerStreamTracer.Factory>emptyList(),
        TransportTracer.getDefaultFactory(),
        1,
        false,
        1,
        1,
        1,
        1,
        1,
        1,
        1,
        1,
        true,
        0,
        channelz);

    assertThat(nettyServer.getListenSocketAddress()).isEqualTo(requestedAddress);
}
/** A transport built on {@code NioSocketChannel} is expected to report SO_KEEPALIVE as true. */
@Test
public void channelFactoryShouldSetSocketOptionKeepAlive() throws Exception {
    startServer();

    long keepAliveTimeNanos = TimeUnit.SECONDS.toNanos(10L);
    long keepAliveTimeoutNanos = TimeUnit.SECONDS.toNanos(1L);
    NettyClientTransport transport = newTransport(
        newNegotiator(),
        DEFAULT_MAX_MESSAGE_SIZE,
        GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
        "testUserAgent",
        true,
        keepAliveTimeNanos,
        keepAliveTimeoutNanos,
        new ReflectiveChannelFactory<>(NioSocketChannel.class),
        group);
    callMeMaybe(transport.start(clientTransportListener));

    Boolean soKeepAlive = transport.channel().config().getOption(ChannelOption.SO_KEEPALIVE);
    assertThat(soKeepAlive).isTrue();
}
/** A transport built on {@code LocalChannel} is expected to report no SO_KEEPALIVE value at all. */
@Test
public void channelFactoryShouldNNotSetSocketOptionKeepAlive() throws Exception {
    startServer();

    long keepAliveTimeNanos = TimeUnit.SECONDS.toNanos(10L);
    long keepAliveTimeoutNanos = TimeUnit.SECONDS.toNanos(1L);
    NettyClientTransport transport = newTransport(
        newNegotiator(),
        DEFAULT_MAX_MESSAGE_SIZE,
        GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
        "testUserAgent",
        true,
        keepAliveTimeNanos,
        keepAliveTimeoutNanos,
        new ReflectiveChannelFactory<>(LocalChannel.class),
        group);
    callMeMaybe(transport.start(clientTransportListener));

    Boolean soKeepAlive = transport.channel().config().getOption(ChannelOption.SO_KEEPALIVE);
    assertThat(soKeepAlive).isNull();
}
/**
 * Convenience constructor that wraps {@code channelType} in a
 * {@link ReflectiveChannelFactory} and delegates to the constructor taking a channel factory.
 *
 * @param channelType the {@link DatagramChannel} class handed to the {@link ReflectiveChannelFactory}
 * @param nameServerProvider passed through unchanged to the delegated constructor
 */
public DnsAddressResolverGroup(
Class<? extends DatagramChannel> channelType,
DnsServerAddressStreamProvider nameServerProvider) {
this(new ReflectiveChannelFactory<DatagramChannel>(channelType), nameServerProvider);
}
/**
 * Resolves a hostname through a redirecting DNS server backed by an authoritative server and
 * checks the sequence of lifecycle events the resolver reports. When {@code cache} is true it
 * additionally checks the contents of the name-server cache and that later resolutions produce
 * fewer events.
 *
 * @param cache whether the name-server cache is backed by a {@link DefaultDnsCache}
 *              ({@code true}) or by {@link NoopDnsCache#INSTANCE} ({@code false})
 * @throws Exception if starting the servers or resolving fails
 */
private static void testRecursiveResolveCache(boolean cache)
throws Exception {
final String hostname = "some.record.netty.io";
final String hostname2 = "some2.record.netty.io";
// Authoritative server that knows both hostnames.
final TestDnsServer dnsServerAuthority = new TestDnsServer(new HashSet<String>(
Arrays.asList(hostname, hostname2)));
dnsServerAuthority.start();
// Front server, constructed with the first hostname and the authoritative server's host address.
TestDnsServer dnsServer = new RedirectingTestDnsServer(hostname,
dnsServerAuthority.localAddress().getAddress().getHostAddress());
dnsServer.start();
// Wrapper around the name-server cache; its cache and cacheHits members are inspected below.
TestDnsCache nsCache = new TestDnsCache(cache ? new DefaultDnsCache() : NoopDnsCache.INSTANCE);
TestRecursiveCacheDnsQueryLifecycleObserverFactory lifecycleObserverFactory =
new TestRecursiveCacheDnsQueryLifecycleObserverFactory();
EventLoopGroup group = new NioEventLoopGroup(1);
// The resolve cache itself is always the no-op cache; only the name-server cache varies.
DnsNameResolver resolver = new DnsNameResolver(
group.next(), new ReflectiveChannelFactory<DatagramChannel>(NioDatagramChannel.class),
NoopDnsCache.INSTANCE, nsCache, lifecycleObserverFactory, 3000, ResolvedAddressTypes.IPV4_ONLY, true,
10, true, 4096, false, HostsFileEntriesResolver.DEFAULT,
new SingletonDnsServerAddressStreamProvider(dnsServer.localAddress()),
DnsNameResolver.DEFAULT_SEARCH_DOMAINS, 0, true) {
@Override
int dnsRedirectPort(InetAddress server) {
// Redirects aimed at the authoritative test server must use its actual port,
// everything else falls back to DNS_PORT.
return server.equals(dnsServerAuthority.localAddress().getAddress()) ?
dnsServerAuthority.localAddress().getPort() : DNS_PORT;
}
};
// Java 7 strips the trailing "." so the expected DNS name has to be adjusted. Both forms are
// valid in terms of the RFC, so this is fine.
String expectedDnsName = PlatformDependent.javaVersion() == 7 ?
"dns4.some.record.netty.io" : "dns4.some.record.netty.io.";
try {
// First resolution: exactly one observer with four events is expected, in the order
// written -> redirected -> written -> succeeded.
resolver.resolveAll(hostname).syncUninterruptibly();
TestDnsQueryLifecycleObserver observer = lifecycleObserverFactory.observers.poll();
assertNotNull(observer);
assertTrue(lifecycleObserverFactory.observers.isEmpty());
assertEquals(4, observer.events.size());
QueryWrittenEvent writtenEvent1 = (QueryWrittenEvent) observer.events.poll();
assertEquals(dnsServer.localAddress(), writtenEvent1.dnsServerAddress);
QueryRedirectedEvent redirectedEvent = (QueryRedirectedEvent) observer.events.poll();
assertEquals(expectedDnsName, redirectedEvent.nameServers.get(0).getHostName());
assertEquals(dnsServerAuthority.localAddress(), redirectedEvent.nameServers.get(0));
QueryWrittenEvent writtenEvent2 = (QueryWrittenEvent) observer.events.poll();
assertEquals(dnsServerAuthority.localAddress(), writtenEvent2.dnsServerAddress);
// The value is not read afterwards; the cast acts as a check on the last event's type.
QuerySucceededEvent succeededEvent = (QuerySucceededEvent) observer.events.poll();
if (cache) {
// Only "record.netty.io." is expected to have a name-server cache entry.
assertNull(nsCache.cache.get("io.", null));
assertNull(nsCache.cache.get("netty.io.", null));
List<? extends DnsCacheEntry> entries = nsCache.cache.get("record.netty.io.", null);
assertEquals(1, entries.size());
assertNull(nsCache.cache.get(hostname, null));
// Test again via cache: only two events (written -> succeeded) are expected, with the
// query written straight to the authoritative server.
resolver.resolveAll(hostname).syncUninterruptibly();
observer = lifecycleObserverFactory.observers.poll();
assertNotNull(observer);
assertTrue(lifecycleObserverFactory.observers.isEmpty());
assertEquals(2, observer.events.size());
writtenEvent1 = (QueryWrittenEvent) observer.events.poll();
assertEquals(expectedDnsName, writtenEvent1.dnsServerAddress.getHostName());
assertEquals(dnsServerAuthority.localAddress(), writtenEvent1.dnsServerAddress);
succeededEvent = (QuerySucceededEvent) observer.events.poll();
// The second hostname is expected to produce the same two-event sequence.
resolver.resolveAll(hostname2).syncUninterruptibly();
observer = lifecycleObserverFactory.observers.poll();
assertNotNull(observer);
assertTrue(lifecycleObserverFactory.observers.isEmpty());
assertEquals(2, observer.events.size());
writtenEvent1 = (QueryWrittenEvent) observer.events.poll();
assertEquals(expectedDnsName, writtenEvent1.dnsServerAddress.getHostName());
assertEquals(dnsServerAuthority.localAddress(), writtenEvent1.dnsServerAddress);
succeededEvent = (QuerySucceededEvent) observer.events.poll();
// Check that it only queried the cache for record.netty.io.
assertNull(nsCache.cacheHits.get("io."));
assertNull(nsCache.cacheHits.get("netty.io."));
assertNotNull(nsCache.cacheHits.get("record.netty.io."));
assertNull(nsCache.cacheHits.get("some.record.netty.io."));
}
} finally {
// Release the resolver, the event loop group and both test servers.
resolver.close();
group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
dnsServer.stop();
dnsServerAuthority.stop();
}
}
/**
 * Starts a server on port 0, checks that a real port was assigned, then shuts it down and
 * checks that the protocol negotiator was closed.
 */
@Test
public void startStop() throws Exception {
    InetSocketAddress requestedAddress = new InetSocketAddress(0);

    // Negotiator that only records whether close() was called.
    class TestProtocolNegotiator implements ProtocolNegotiator {
        boolean closed;

        @Override
        public ChannelHandler newHandler(GrpcHttp2ConnectionHandler handler) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() {
            closed = true;
        }

        @Override
        public AsciiString scheme() {
            return Utils.HTTP;
        }
    }

    TestProtocolNegotiator closeTrackingNegotiator = new TestProtocolNegotiator();
    // The numeric and boolean settings after the tracer factory are placeholders for this test.
    NettyServer nettyServer = new NettyServer(
        requestedAddress,
        new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
        new HashMap<ChannelOption<?>, Object>(),
        new HashMap<ChannelOption<?>, Object>(),
        new FixedObjectPool<>(eventLoop),
        new FixedObjectPool<>(eventLoop),
        false,
        closeTrackingNegotiator,
        Collections.<ServerStreamTracer.Factory>emptyList(),
        TransportTracer.getDefaultFactory(),
        1,
        false,
        1,
        1,
        1,
        1,
        1,
        1,
        1,
        1,
        true,
        0,
        channelz);

    final SettableFuture<Void> shutdownSignal = SettableFuture.create();
    ServerListener listener = new ServerListener() {
        @Override
        public ServerTransportListener transportCreated(ServerTransport transport) {
            return new NoopServerTransportListener();
        }

        @Override
        public void serverShutdown() {
            shutdownSignal.set(null);
        }
    };
    nettyServer.start(listener);

    // Check that we got an actual port.
    InetSocketAddress boundAddress = (InetSocketAddress) nettyServer.getListenSocketAddress();
    assertThat(boundAddress.getPort()).isGreaterThan(0);

    // Cleanup
    nettyServer.shutdown();
    // serverShutdown() signals that resources are freed
    shutdownSignal.get(1, TimeUnit.SECONDS);
    assertThat(closeTrackingNegotiator.closed).isTrue();
}
/**
 * Checks that a started server's listen socket is registered with channelz, exposes the
 * expected local address with no remote address, and is removed again after shutdown.
 */
@Test
public void channelzListenSocket() throws Exception {
    InetSocketAddress requestedAddress = new InetSocketAddress(0);
    // The numeric and boolean settings after the tracer factory are placeholders for this test.
    NettyServer nettyServer = new NettyServer(
        requestedAddress,
        new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
        new HashMap<ChannelOption<?>, Object>(),
        new HashMap<ChannelOption<?>, Object>(),
        new FixedObjectPool<>(eventLoop),
        new FixedObjectPool<>(eventLoop),
        false,
        ProtocolNegotiators.plaintext(),
        Collections.<ServerStreamTracer.Factory>emptyList(),
        TransportTracer.getDefaultFactory(),
        1,
        false,
        1,
        1,
        1,
        1,
        1,
        1,
        1,
        1,
        true,
        0,
        channelz);

    final SettableFuture<Void> shutdownSignal = SettableFuture.create();
    ServerListener listener = new ServerListener() {
        @Override
        public ServerTransportListener transportCreated(ServerTransport transport) {
            return new NoopServerTransportListener();
        }

        @Override
        public void serverShutdown() {
            shutdownSignal.set(null);
        }
    };
    nettyServer.start(listener);

    InetSocketAddress boundAddress = (InetSocketAddress) nettyServer.getListenSocketAddress();
    assertThat(boundAddress.getPort()).isGreaterThan(0);

    // SocketStats won't be available until the event loop task that adds the SocketStats created
    // by start() has completed, so submit a no-op task and wait until it has been drained.
    eventLoop.submit(new Runnable() {
        @Override
        public void run() {}
    }).await(5, TimeUnit.SECONDS);

    InternalInstrumented<SocketStats> listenSocket = nettyServer.getListenSocketStats();
    assertSame(listenSocket, channelz.getSocket(id(listenSocket)));

    // very basic sanity check of the contents
    SocketStats stats = listenSocket.getStats().get();
    assertEquals(nettyServer.getListenSocketAddress(), stats.local);
    assertNull(stats.remote);

    // TODO(zpencer): uncomment when sock options are exposed
    // by default, there are some socket options set on the listen socket
    // assertThat(socketStats.socketOptions.additional).isNotEmpty();

    // Cleanup
    nettyServer.shutdown();
    shutdownSignal.get();

    // listen socket is removed
    assertNull(channelz.getSocket(id(listenSocket)));
}
/**
 * Shorthand for the full {@code newTransport} overload, using a 10 second keep-alive time, a
 * 1 second keep-alive timeout, an {@code NioSocketChannel} factory and the shared {@code group}.
 */
private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
    int maxHeaderListSize, String userAgent, boolean enableKeepAlive) {
    long keepAliveTimeNanos = TimeUnit.SECONDS.toNanos(10L);
    long keepAliveTimeoutNanos = TimeUnit.SECONDS.toNanos(1L);
    return newTransport(
        negotiator,
        maxMsgSize,
        maxHeaderListSize,
        userAgent,
        enableKeepAlive,
        keepAliveTimeNanos,
        keepAliveTimeoutNanos,
        new ReflectiveChannelFactory<>(NioSocketChannel.class),
        group);
}