下面列出了怎么用io.netty.channel.ChannelFactory的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testChannelFactoryFailureNotifiesPromise() throws Exception {
final RuntimeException exception = new RuntimeException("newChannel crash");
final Bootstrap bootstrap = new Bootstrap()
.handler(dummyHandler)
.group(groupA)
.channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
throw exception;
}
});
ChannelFuture connectFuture = bootstrap.connect(LocalAddress.ANY);
// Should fail with the RuntimeException.
assertThat(connectFuture.await(10000), is(true));
assertThat(connectFuture.cause(), sameInstance((Throwable) exception));
assertThat(connectFuture.channel(), is(not(nullValue())));
}
/**
* Attempts to determine the {@link ChannelFactory} class that corresponds to the given
* event loop group.
*
* @param eventLoopGroup the event loop group to determine the {@link ChannelFactory} for
* @return A {@link ChannelFactory} instance for the given event loop group.
*/
@SuppressWarnings("unchecked")
public static ChannelFactory<? extends Channel> resolveSocketChannelFactory(EventLoopGroup eventLoopGroup) {
if (eventLoopGroup instanceof DelegatingEventLoopGroup) {
return resolveSocketChannelFactory(((DelegatingEventLoopGroup) eventLoopGroup).getDelegate());
}
if (eventLoopGroup instanceof NioEventLoopGroup) {
return NioSocketChannel::new;
}
if (eventLoopGroup instanceof EpollEventLoopGroup) {
return EpollSocketChannel::new;
}
String socketFqcn = KNOWN_EL_GROUPS.get(eventLoopGroup.getClass().getName());
if (socketFqcn == null) {
throw new IllegalArgumentException("Unknown event loop group : " + eventLoopGroup.getClass());
}
return invokeSafely(() -> new ReflectiveChannelFactory(Class.forName(socketFqcn)));
}
@Test
public void customChannelFactoryIsUsed() throws Exception {
ChannelFactory channelFactory = mock(ChannelFactory.class);
when(channelFactory.newChannel()).thenAnswer((Answer<NioSocketChannel>) invocationOnMock -> new NioSocketChannel());
EventLoopGroup customEventLoopGroup = new NioEventLoopGroup();
SdkAsyncHttpClient customClient =
NettyNioAsyncHttpClient.builder()
.eventLoopGroup(SdkEventLoopGroup.create(customEventLoopGroup, channelFactory))
.build();
makeSimpleRequest(customClient);
customClient.close();
Mockito.verify(channelFactory, atLeastOnce()).newChannel();
assertThat(customEventLoopGroup.isShuttingDown()).isFalse();
customEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
}
@Test
public void responseConnectionReused_shouldReleaseChannel() throws Exception {
ChannelFactory channelFactory = mock(ChannelFactory.class);
EventLoopGroup customEventLoopGroup = new NioEventLoopGroup(1);
NioSocketChannel channel = new NioSocketChannel();
when(channelFactory.newChannel()).thenAnswer((Answer<NioSocketChannel>) invocationOnMock -> channel);
SdkEventLoopGroup eventLoopGroup = SdkEventLoopGroup.create(customEventLoopGroup, channelFactory);
NettyNioAsyncHttpClient customClient =
(NettyNioAsyncHttpClient) NettyNioAsyncHttpClient.builder()
.eventLoopGroup(eventLoopGroup)
.maxConcurrency(1)
.build();
makeSimpleRequest(customClient);
verifyChannelRelease(channel);
assertThat(channel.isShutdown()).isFalse();
customClient.close();
eventLoopGroup.eventLoopGroup().shutdownGracefully().awaitUninterruptibly();
}
@Override
public EventLoopGroup init(Bootstrap bootstrap, final DockerClientConfig dockerClientConfig) {
EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(0, new DefaultThreadFactory(threadPrefix));
InetAddress addr = InetAddress.getLoopbackAddress();
final SocketAddress proxyAddress = new InetSocketAddress(addr, 8008);
Security.addProvider(new BouncyCastleProvider());
ChannelFactory<NioSocketChannel> factory = () -> configure(new NioSocketChannel());
bootstrap.group(nioEventLoopGroup).channelFactory(factory)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel channel) throws Exception {
// channel.pipeline().addLast(new
// HttpProxyHandler(proxyAddress));
channel.pipeline().addLast(new HttpClientCodec());
channel.pipeline().addLast(new HttpContentDecompressor());
}
});
return nioEventLoopGroup;
}
NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker,
boolean useGetForSafeMethods) {
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
this.channelFactory = channelFactory;
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.groupPool = groupPool;
this.group = groupPool.getObject();
this.autoFlowControl = autoFlowControl;
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
this.keepAliveTimeNanos = new AtomicBackoff("keepalive time nanos", keepAliveTimeNanos);
this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
this.keepAliveWithoutCalls = keepAliveWithoutCalls;
this.transportTracerFactory = transportTracerFactory;
this.localSocketPicker =
localSocketPicker != null ? localSocketPicker : new LocalSocketPicker();
this.useGetForSafeMethods = useGetForSafeMethods;
}
private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
int maxHeaderListSize, String userAgent, boolean enableKeepAlive, long keepAliveTimeNano,
long keepAliveTimeoutNano, ChannelFactory<? extends Channel> channelFactory,
EventLoopGroup group) {
if (!enableKeepAlive) {
keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
}
NettyClientTransport transport = new NettyClientTransport(
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group,
negotiator, false, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable,
new TransportTracer(), eagAttributes, new SocketPicker(), new FakeChannelLogger(), false);
transports.add(transport);
return transport;
}
public List<BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
// Make the list of Bootstrap factories.
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
@Override
public String toString() {
return NioDatagramChannel.class.getSimpleName() + ".class";
}
});
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(oioWorkerGroup).channel(OioDatagramChannel.class)
.option(ChannelOption.SO_TIMEOUT, OIO_SO_TIMEOUT);
}
}
);
// Populare the combinations.
return combo(bfs, bfs);
}
/**
* @deprecated Override {@link #newNameResolver(EventLoop, ChannelFactory, DnsServerAddressStreamProvider)}.
*/
@Deprecated
protected AddressResolver<InetSocketAddress> newResolver(
EventLoop eventLoop, ChannelFactory<? extends DatagramChannel> channelFactory,
DnsServerAddressStreamProvider nameServerProvider) throws Exception {
final NameResolver<InetAddress> resolver = new InflightNameResolver<InetAddress>(
eventLoop,
newNameResolver(eventLoop, channelFactory, nameServerProvider),
resolvesInProgress,
resolveAllsInProgress);
return newAddressResolver(eventLoop, resolver);
}
/**
* Creates a new {@link NameResolver}. Override this method to create an alternative {@link NameResolver}
* implementation or override the default configuration.
*/
protected NameResolver<InetAddress> newNameResolver(EventLoop eventLoop,
ChannelFactory<? extends DatagramChannel> channelFactory,
DnsServerAddressStreamProvider nameServerProvider)
throws Exception {
return new DnsNameResolverBuilder(eventLoop)
.channelFactory(channelFactory)
.nameServerProvider(nameServerProvider)
.build();
}
@Override
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
// Make the list of Bootstrap factories.
@SuppressWarnings("unchecked")
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
@Override
public String toString() {
return NioDatagramChannel.class.getSimpleName() + ".class";
}
});
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueDatagramChannel.class);
}
}
);
return combo(bfs, bfs);
}
@Override
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
// Make the list of Bootstrap factories.
@SuppressWarnings("unchecked")
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
@Override
public String toString() {
return NioDatagramChannel.class.getSimpleName() + ".class";
}
});
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDatagramChannel.class);
}
}
);
return combo(bfs, bfs);
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:netty-gateway.xml");
PORT = ((GatewayOptions) context.getBean("options")).getGatewayPort();
dumpSystemInfo(new PrintWriter(System.out));
EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("netty.gateway.boss", Thread.MAX_PRIORITY));
EventLoopGroup workerGroup = new NioEventLoopGroup(GATEWAY_OPTION_PARALLEL, new DefaultThreadFactory("netty.gateway.worker", Thread.MAX_PRIORITY));
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channelFactory(new ChannelFactory<NioServerSocketChannel>() {
@Override
public NioServerSocketChannel newChannel() {
return new NioServerSocketChannel(SelectorProvider.provider());
}
})
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new GatewayServerInitializer());
Channel ch = b.bind(PORT).sync().channel();
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@Test
public void connectionInactive_shouldReleaseChannel() throws Exception {
ChannelFactory channelFactory = mock(ChannelFactory.class);
EventLoopGroup customEventLoopGroup = new NioEventLoopGroup(1);
NioSocketChannel channel = new NioSocketChannel();
when(channelFactory.newChannel()).thenAnswer((Answer<NioSocketChannel>) invocationOnMock -> channel);
SdkEventLoopGroup eventLoopGroup = SdkEventLoopGroup.create(customEventLoopGroup, channelFactory);
NettyNioAsyncHttpClient customClient =
(NettyNioAsyncHttpClient) NettyNioAsyncHttpClient.builder()
.eventLoopGroup(eventLoopGroup)
.maxConcurrency(1)
.build();
String body = randomAlphabetic(10);
URI uri = URI.create("http://localhost:" + mockServer.port());
SdkHttpRequest request = createRequest(uri);
RecordingResponseHandler recorder = new RecordingResponseHandler();
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withBody(body)
.withStatus(500)
.withFault(Fault.RANDOM_DATA_THEN_CLOSE)));
customClient.execute(AsyncExecuteRequest.builder()
.request(request)
.requestContentPublisher(createProvider(""))
.responseHandler(recorder).build());
verifyChannelRelease(channel);
assertThat(channel.isShutdown()).isTrue();
customClient.close();
eventLoopGroup.eventLoopGroup().shutdownGracefully().awaitUninterruptibly();
}
@Test
public void responseConnectionClosed_shouldCloseAndReleaseChannel() throws Exception {
ChannelFactory channelFactory = mock(ChannelFactory.class);
EventLoopGroup customEventLoopGroup = new NioEventLoopGroup(1);
NioSocketChannel channel = new NioSocketChannel();
when(channelFactory.newChannel()).thenAnswer((Answer<NioSocketChannel>) invocationOnMock -> channel);
URI uri = URI.create("http://localhost:" + mockServer.port());
SdkHttpRequest request = createRequest(uri);
RecordingResponseHandler recorder = new RecordingResponseHandler();
SdkEventLoopGroup eventLoopGroup = SdkEventLoopGroup.create(customEventLoopGroup, channelFactory);
NettyNioAsyncHttpClient customClient =
(NettyNioAsyncHttpClient) NettyNioAsyncHttpClient.builder()
.eventLoopGroup(eventLoopGroup)
.maxConcurrency(1)
.build();
String body = randomAlphabetic(10);
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withBody(body)
.withStatus(500)
.withHeader("Connection", "close")
));
customClient.execute(AsyncExecuteRequest.builder()
.request(request)
.requestContentPublisher(createProvider(""))
.responseHandler(recorder).build());
recorder.completeFuture.get(5, TimeUnit.SECONDS);
verifyChannelRelease(channel);
assertThat(channel.isShutdown()).isTrue();
customClient.close();
eventLoopGroup.eventLoopGroup().shutdownGracefully().awaitUninterruptibly();
}
@Test
public void closeMethodClosesOpenedChannels() throws InterruptedException, TimeoutException, ExecutionException {
String body = randomAlphabetic(10);
URI uri = URI.create("https://localhost:" + mockServer.httpsPort());
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withHeader("Some-Header", "With Value").withBody(body)));
SdkHttpFullRequest request = createRequest(uri, "/", body, SdkHttpMethod.POST, Collections.emptyMap());
RecordingResponseHandler recorder = new RecordingResponseHandler();
CompletableFuture<Boolean> channelClosedFuture = new CompletableFuture<>();
ChannelFactory<NioSocketChannel> channelFactory = new ChannelFactory<NioSocketChannel>() {
@Override
public NioSocketChannel newChannel() {
return new NioSocketChannel() {
@Override
public ChannelFuture close() {
ChannelFuture cf = super.close();
channelClosedFuture.complete(true);
return cf;
}
};
}
};
SdkAsyncHttpClient customClient = NettyNioAsyncHttpClient.builder()
.eventLoopGroup(new SdkEventLoopGroup(new NioEventLoopGroup(1), channelFactory))
.buildWithDefaults(mapWithTrustAllCerts());
try {
customClient.execute(AsyncExecuteRequest.builder()
.request(request)
.requestContentPublisher(createProvider(body))
.responseHandler(recorder).build())
.join();
} finally {
customClient.close();
}
assertThat(channelClosedFuture.get(5, TimeUnit.SECONDS)).isTrue();
}
private void createServer(Listener listener, EventLoopGroup eventLoopGroup, ChannelFactory<? extends DatagramChannel> channelFactory) {
this.eventLoopGroup = eventLoopGroup;
try {
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)//默认是根据机器情况创建Channel,如果机器支持ipv6,则无法使用ipv4的地址加入组播
.channelFactory(channelFactory)
.option(ChannelOption.SO_BROADCAST, true)
.handler(getChannelHandler());
initOptions(b);
//直接绑定端口,不要指定host,不然收不到组播消息
b.bind(port).addListener(future -> {
if (future.isSuccess()) {
logger.info("udp server start success on:{}", port);
if (listener != null) listener.onSuccess(port);
} else {
logger.error("udp server start failure on:{}", port, future.cause());
if (listener != null) listener.onFailure(future.cause());
}
});
} catch (Exception e) {
logger.error("udp server start exception", e);
if (listener != null) listener.onFailure(e);
throw new ServiceException("udp server start exception, port=" + port, e);
}
}
@Override
protected ChannelFactory<? extends Channel> connectionFactory(EventLoopGroup elg, boolean isDomainSocket) {
if (isDomainSocket) {
throw new UnsupportedOperationException();
}
if (isPreferNative()) {
return () -> loopResources().onChannel(DatagramChannel.class, elg);
}
else {
return () -> new NioDatagramChannel(family());
}
}
@Override
protected ChannelFactory<? extends Channel> connectionFactory(EventLoopGroup elg, boolean isDomainSocket) {
if (isDomainSocket) {
throw new UnsupportedOperationException();
}
if (isPreferNative()) {
return () -> loopResources().onChannel(DatagramChannel.class, elg);
}
else {
return () -> new NioDatagramChannel(family());
}
}
public EventLoopGroup epollGroup() {
EventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup(0, new DefaultThreadFactory(threadPrefix));
ChannelFactory<EpollDomainSocketChannel> factory = () -> configure(new EpollDomainSocketChannel());
bootstrap.group(epollEventLoopGroup).channelFactory(factory).handler(new ChannelInitializer<UnixChannel>() {
@Override
protected void initChannel(final UnixChannel channel) throws Exception {
channel.pipeline().addLast(new HttpClientCodec());
channel.pipeline().addLast(new HttpContentDecompressor());
}
});
return epollEventLoopGroup;
}
NettyClientTransport(
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
ProtocolNegotiator negotiator, boolean autoFlowControl, int flowControlWindow,
int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes,
LocalSocketPicker localSocketPicker, ChannelLogger channelLogger,
boolean useGetForSafeMethods) {
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
this.negotiationScheme = this.negotiator.scheme();
this.remoteAddress = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
this.channelFactory = channelFactory;
this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
this.autoFlowControl = autoFlowControl;
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
this.keepAliveTimeNanos = keepAliveTimeNanos;
this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
this.keepAliveWithoutCalls = keepAliveWithoutCalls;
this.authorityString = authority;
this.authority = new AsciiString(authority);
this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent));
this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
this.localSocketPicker = Preconditions.checkNotNull(localSocketPicker, "localSocketPicker");
this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString());
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.useGetForSafeMethods = useGetForSafeMethods;
}
private static ChannelFactory<ServerChannel> nioServerChannelFactory() {
return new ChannelFactory<ServerChannel>() {
@Override
public ServerChannel newChannel() {
return new NioServerSocketChannel();
}
};
}
@Test
public void assertEventLoopAndChannelType_onlyFactoryProvided() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
builder.channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
return null;
}
});
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Both EventLoopGroup and ChannelType should be provided");
builder.assertEventLoopAndChannelType();
}
@Test
public void defaultServerChannelFactory_whenEpollIsAvailable() {
assume().that(Utils.isEpollAvailable()).isTrue();
ChannelFactory<? extends ServerChannel> channelFactory = Utils.DEFAULT_SERVER_CHANNEL_FACTORY;
assertThat(channelFactory.toString())
.isEqualTo("ReflectiveChannelFactory(EpollServerSocketChannel.class)");
}
public DnsAddressResolverGroup(
ChannelFactory<? extends DatagramChannel> channelFactory,
DnsServerAddressStreamProvider nameServerProvider) {
this.channelFactory = channelFactory;
this.nameServerProvider = nameServerProvider;
}
public RoundRobinDnsAddressResolverGroup(
ChannelFactory<? extends DatagramChannel> channelFactory,
DnsServerAddressStreamProvider nameServerProvider) {
super(channelFactory, nameServerProvider);
}
public static ChannelFactory<? extends ServerChannel> getServerSocketChannelFactory() {
if (epoll)
return EpollServerSocketChannel::new;
else
return NioServerSocketChannel::new;
}
@Override
public void connect() throws Exception {
try {
channelLock.writeLock().lock();
LinkedHashMap<String, ChannelHandler> handlers = getChannelHandlers();
String interfaceIp = getSettings().getInterfaceIp();
String mcastIp = getSettings().getMulticastIp();
int mcastPort = getSettings().getMulticastPort();
this.localNetworkInterface = NetworkInterface.getByInetAddress(Inet4Address.getByName(interfaceIp));
if (this.localNetworkInterface == null) {
throw new ServiceException("Failed to resolve network interface via IP: " + interfaceIp);
}
this.multicastGroup = new InetSocketAddress(InetAddress.getByName(mcastIp), mcastPort);
Bootstrap cb = new Bootstrap();
// Fixme: use ITaskExecutor ?
cb.group(nioEventLoopGroup);
cb.channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
// Force IPv4
return new NioDatagramChannel(InternetProtocolFamily.IPv4);
}
});
cb.option(ChannelOption.SO_REUSEADDR, true);
cb.option(ChannelOption.IP_MULTICAST_IF, localNetworkInterface);
cb.option(ChannelOption.IP_MULTICAST_TTL, getSettings().getTtl());
cb.localAddress(new InetSocketAddress(InetAddress.getByName(mcastIp), mcastPort));
// we can configure java -Dio.netty.allocator.numDirectArenas=... -Dio.netty.allocator.numHeapArenas=...
cb.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
handlers.forEach((key, value) -> ch.pipeline().addLast(key, value));
// add exception handler for inbound messages
// outbound exceptions will be routed here by ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE
ch.pipeline().addLast(new ExceptionInboundHandler(nettySession::onExceptionCaught));
}
});
Channel localChannel = cb.bind().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture bindFuture) throws Exception {
if (!bindFuture.isSuccess()) {
return;
}
DatagramChannel channel = (DatagramChannel) bindFuture.channel();
// TODO: heartbeat loss detection
ChannelFuture future;
String sourceIP = getSettings().getSourceIp();
if (sourceIP == null) {
future = channel.joinGroup(multicastGroup, localNetworkInterface);
} else {
future = channel.joinGroup(multicastGroup.getAddress(), localNetworkInterface, InetAddress.getByName(sourceIP));
}
future.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}).addListener(ChannelFutureListener.CLOSE_ON_FAILURE).syncUninterruptibly().channel();
localChannel.closeFuture().addListener((ChannelFutureListener) future -> changeStatus(ServiceStatus.DISPOSED, "Connection closed", null));
setChannel(localChannel);
} finally {
channelLock.writeLock().unlock();
}
}
SdkEventLoopGroup(EventLoopGroup eventLoopGroup, ChannelFactory<? extends Channel> channelFactory) {
Validate.paramNotNull(eventLoopGroup, "eventLoopGroup");
Validate.paramNotNull(channelFactory, "channelFactory");
this.eventLoopGroup = eventLoopGroup;
this.channelFactory = channelFactory;
}
/**
* @return the {@link ChannelFactory} to be used with Netty Http Client.
*/
public ChannelFactory<? extends Channel> channelFactory() {
return channelFactory;
}