下面列出了io.netty.channel.EventLoopGroup#next ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* ReplicatorService creates and starts fibers; it must be stopped (or failed) in
* order to dispose them.
*/
public ReplicatorService(EventLoopGroup bossGroup,
EventLoopGroup workerGroup,
long nodeId,
int port,
ModuleInformationProvider moduleInformationProvider,
FiberSupplier fiberSupplier,
QuorumFileReaderWriter quorumFileReaderWriter) {
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.nodeId = nodeId;
this.port = port;
this.moduleInformationProvider = moduleInformationProvider;
this.fiberSupplier = fiberSupplier;
this.allChannels = new DefaultChannelGroup(workerGroup.next());
this.persister = new Persister(quorumFileReaderWriter);
}
@Test
public void testExpire() throws Throwable {
EventLoopGroup group = new DefaultEventLoopGroup(1);
try {
EventLoop loop = group.next();
final DefaultDnsCache cache = new DefaultDnsCache();
cache.cache("netty.io", null, NetUtil.LOCALHOST, 1, loop);
cache.cache("netty.io", null, NetUtil.LOCALHOST6, 10000, loop);
Throwable error = loop.schedule(new Callable<Throwable>() {
@Override
public Throwable call() throws Exception {
try {
Assert.assertNull(cache.get("netty.io", null));
return null;
} catch (Throwable cause) {
return cause;
}
}
}, 1, TimeUnit.SECONDS).get();
if (error != null) {
throw error;
}
} finally {
group.shutdownGracefully();
}
}
@Test(expected = ConnectException.class)
public void testMap() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
final Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
AbstractChannelPoolMap<EventLoop, SimpleChannelPool> poolMap =
new AbstractChannelPoolMap<EventLoop, SimpleChannelPool>() {
@Override
protected SimpleChannelPool newPool(EventLoop key) {
return new SimpleChannelPool(cb.clone(key), new TestChannelPoolHandler());
}
};
EventLoop loop = group.next();
assertFalse(poolMap.iterator().hasNext());
assertEquals(0, poolMap.size());
SimpleChannelPool pool = poolMap.get(loop);
assertEquals(1, poolMap.size());
assertTrue(poolMap.iterator().hasNext());
assertSame(pool, poolMap.get(loop));
assertTrue(poolMap.remove(loop));
assertFalse(poolMap.remove(loop));
assertFalse(poolMap.iterator().hasNext());
assertEquals(0, poolMap.size());
pool.acquire().syncUninterruptibly();
}
@Test
public void testRebuildSelector() throws Exception {
EventLoopGroup group = new NioEventLoopGroup(1);
final NioEventLoop loop = (NioEventLoop) group.next();
try {
Channel channel = new NioServerSocketChannel();
loop.register(channel).syncUninterruptibly();
Selector selector = loop.unwrappedSelector();
assertSame(selector, ((NioEventLoop) channel.eventLoop()).unwrappedSelector());
assertTrue(selector.isOpen());
// Submit to the EventLoop so we are sure its really executed in a non-async manner.
loop.submit(new Runnable() {
@Override
public void run() {
loop.rebuildSelector();
}
}).syncUninterruptibly();
Selector newSelector = ((NioEventLoop) channel.eventLoop()).unwrappedSelector();
assertTrue(newSelector.isOpen());
assertNotSame(selector, newSelector);
assertFalse(selector.isOpen());
channel.close().syncUninterruptibly();
} finally {
group.shutdownGracefully();
}
}
@Builder
private AffinityScheduler(ClassLoader contextLoader,
EventLoop acceptor,
EventLoop worker,
EventLoopGroup executors) {
this.contextLoader = contextLoader;
this.acceptor = acceptor;
this.worker = worker;
executor = executors.next();
}
<T extends ServerSocketChannel> RpcServer(EventLoopGroup eventLoopGroup, EventExecutorGroup eventExecutor, Class<T> channel, SocketAddress address) {
this.address = address;
this.allChannels = new DefaultChannelGroup(eventLoopGroup.next());
this.handler = new ServerHandler(allChannels);
this.bootstrap = new ServerBootstrap();
bootstrap.channel(channel);
bootstrap.childHandler(new ServerInitializer(eventExecutor, handler));
bootstrap.group(eventLoopGroup);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
}
public HttpOrHttp2ChannelPool(ChannelPool delegatePool,
EventLoopGroup group,
int maxConcurrency,
NettyConfiguration configuration) {
this.delegatePool = delegatePool;
this.maxConcurrency = maxConcurrency;
this.eventLoopGroup = group;
this.eventLoop = group.next();
this.configuration = configuration;
}
private static void testRecursiveResolveCache(boolean cache)
throws Exception {
final String hostname = "some.record.netty.io";
final String hostname2 = "some2.record.netty.io";
final TestDnsServer dnsServerAuthority = new TestDnsServer(new HashSet<String>(
Arrays.asList(hostname, hostname2)));
dnsServerAuthority.start();
TestDnsServer dnsServer = new RedirectingTestDnsServer(hostname,
dnsServerAuthority.localAddress().getAddress().getHostAddress());
dnsServer.start();
TestDnsCache nsCache = new TestDnsCache(cache ? new DefaultDnsCache() : NoopDnsCache.INSTANCE);
TestRecursiveCacheDnsQueryLifecycleObserverFactory lifecycleObserverFactory =
new TestRecursiveCacheDnsQueryLifecycleObserverFactory();
EventLoopGroup group = new NioEventLoopGroup(1);
DnsNameResolver resolver = new DnsNameResolver(
group.next(), new ReflectiveChannelFactory<DatagramChannel>(NioDatagramChannel.class),
NoopDnsCache.INSTANCE, nsCache, lifecycleObserverFactory, 3000, ResolvedAddressTypes.IPV4_ONLY, true,
10, true, 4096, false, HostsFileEntriesResolver.DEFAULT,
new SingletonDnsServerAddressStreamProvider(dnsServer.localAddress()),
DnsNameResolver.DEFAULT_SEARCH_DOMAINS, 0, true) {
@Override
int dnsRedirectPort(InetAddress server) {
return server.equals(dnsServerAuthority.localAddress().getAddress()) ?
dnsServerAuthority.localAddress().getPort() : DNS_PORT;
}
};
// Java7 will strip of the "." so we need to adjust the expected dnsname. Both are valid in terms of the RFC
// so its ok.
String expectedDnsName = PlatformDependent.javaVersion() == 7 ?
"dns4.some.record.netty.io" : "dns4.some.record.netty.io.";
try {
resolver.resolveAll(hostname).syncUninterruptibly();
TestDnsQueryLifecycleObserver observer = lifecycleObserverFactory.observers.poll();
assertNotNull(observer);
assertTrue(lifecycleObserverFactory.observers.isEmpty());
assertEquals(4, observer.events.size());
QueryWrittenEvent writtenEvent1 = (QueryWrittenEvent) observer.events.poll();
assertEquals(dnsServer.localAddress(), writtenEvent1.dnsServerAddress);
QueryRedirectedEvent redirectedEvent = (QueryRedirectedEvent) observer.events.poll();
assertEquals(expectedDnsName, redirectedEvent.nameServers.get(0).getHostName());
assertEquals(dnsServerAuthority.localAddress(), redirectedEvent.nameServers.get(0));
QueryWrittenEvent writtenEvent2 = (QueryWrittenEvent) observer.events.poll();
assertEquals(dnsServerAuthority.localAddress(), writtenEvent2.dnsServerAddress);
QuerySucceededEvent succeededEvent = (QuerySucceededEvent) observer.events.poll();
if (cache) {
assertNull(nsCache.cache.get("io.", null));
assertNull(nsCache.cache.get("netty.io.", null));
List<? extends DnsCacheEntry> entries = nsCache.cache.get("record.netty.io.", null);
assertEquals(1, entries.size());
assertNull(nsCache.cache.get(hostname, null));
// Test again via cache.
resolver.resolveAll(hostname).syncUninterruptibly();
observer = lifecycleObserverFactory.observers.poll();
assertNotNull(observer);
assertTrue(lifecycleObserverFactory.observers.isEmpty());
assertEquals(2, observer.events.size());
writtenEvent1 = (QueryWrittenEvent) observer.events.poll();
assertEquals(expectedDnsName, writtenEvent1.dnsServerAddress.getHostName());
assertEquals(dnsServerAuthority.localAddress(), writtenEvent1.dnsServerAddress);
succeededEvent = (QuerySucceededEvent) observer.events.poll();
resolver.resolveAll(hostname2).syncUninterruptibly();
observer = lifecycleObserverFactory.observers.poll();
assertNotNull(observer);
assertTrue(lifecycleObserverFactory.observers.isEmpty());
assertEquals(2, observer.events.size());
writtenEvent1 = (QueryWrittenEvent) observer.events.poll();
assertEquals(expectedDnsName, writtenEvent1.dnsServerAddress.getHostName());
assertEquals(dnsServerAuthority.localAddress(), writtenEvent1.dnsServerAddress);
succeededEvent = (QuerySucceededEvent) observer.events.poll();
// Check that it only queried the cache for record.netty.io.
assertNull(nsCache.cacheHits.get("io."));
assertNull(nsCache.cacheHits.get("netty.io."));
assertNotNull(nsCache.cacheHits.get("record.netty.io."));
assertNull(nsCache.cacheHits.get("some.record.netty.io."));
}
} finally {
resolver.close();
group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
dnsServer.stop();
dnsServerAuthority.stop();
}
}