下面列出了io.netty.channel.ChannelInitializer#io.netty.channel.ChannelOption 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Prepares the Netty client: runs handler initialization, creates the event loop group and the
 * worker executor group, builds the connection-management handler and configures the bootstrap
 * from {@code config}.
 *
 * <p>Returns without doing anything when the {@code started} flag is already set; the flag is
 * set once the bootstrap has been configured.
 *
 * @param config supplies the worker thread count, connect timeout and socket buffer sizes
 */
public synchronized void start(NettyClientConfig config) {
    if (!started.get()) {
        initHandler();

        final Bootstrap clientBootstrap = new Bootstrap();
        eventLoopGroup = new NioEventLoopGroup(1, new DefaultThreadFactory(clientName + "-boss"));
        eventExecutors = new DefaultEventExecutorGroup(
                config.getClientWorkerThreads(),
                new DefaultThreadFactory(clientName + "-worker"));
        connectManager = new NettyConnectManageHandler(clientBootstrap, config.getConnectTimeoutMillis());

        clientBootstrap.group(this.eventLoopGroup);
        clientBootstrap.channel(NioSocketChannel.class);
        clientBootstrap.option(ChannelOption.TCP_NODELAY, true);
        clientBootstrap.option(ChannelOption.SO_KEEPALIVE, false);
        clientBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeoutMillis());
        clientBootstrap.option(ChannelOption.SO_SNDBUF, config.getClientSocketSndBufSize());
        clientBootstrap.option(ChannelOption.SO_RCVBUF, config.getClientSocketRcvBufSize());
        clientBootstrap.handler(newChannelInitializer(config, eventExecutors, connectManager));

        started.set(true);
    }
}
/**
 * Connect to MySQL.
 *
 * <p>Creates a fresh response promise, opens a channel to {@code host:port} whose pipeline holds
 * the packet codec, the command packet decoder, the negotiation handler and the command response
 * handler, and then waits for a {@code ServerInfo} response.
 */
public synchronized void connect() {
    responseCallback = new DefaultPromise<>(eventLoopGroup.next());

    final Bootstrap connector = new Bootstrap();
    connector.group(eventLoopGroup);
    connector.channel(NioSocketChannel.class);
    connector.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(final SocketChannel socketChannel) {
            // Handlers are appended in processing order: codec, decoder, negotiation, responses.
            socketChannel.pipeline()
                    .addLast(new PacketCodec(new MySQLPacketCodecEngine()))
                    .addLast(new MySQLCommandPacketDecoder())
                    .addLast(new MySQLNegotiateHandler(username, password, responseCallback))
                    .addLast(new MySQLCommandResponseHandler());
        }
    });
    connector.option(ChannelOption.AUTO_READ, true);

    // The channel is taken from the connect future without waiting for it to complete.
    channel = connector.connect(host, port).channel();
    serverInfo = waitExpectedResponse(ServerInfo.class);
}
/**
 * Starts an SCTP server bound to {@code PORT} and blocks until the server channel is closed.
 * Both event loop groups are shut down gracefully on exit.
 *
 * @param args unused
 * @throws Exception if binding or waiting on the channel fails
 */
public static void main(String[] args) throws Exception {
    final EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
    final EventLoopGroup ioGroup = new NioEventLoopGroup();
    try {
        final ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(acceptorGroup, ioGroup);
        bootstrap.channel(NioSctpServerChannel.class);
        bootstrap.option(ChannelOption.SO_BACKLOG, 100);
        bootstrap.handler(new LoggingHandler(LogLevel.INFO));
        bootstrap.childHandler(new ChannelInitializer<SctpChannel>() {
            @Override
            public void initChannel(SctpChannel ch) throws Exception {
                ch.pipeline().addLast(new SimpleSctpServerHandler());
            }
        });

        // Bind, then park this thread until the server channel closes.
        final ChannelFuture bound = bootstrap.bind(PORT).sync();
        bound.channel().closeFuture().sync();
    } finally {
        acceptorGroup.shutdownGracefully();
        ioGroup.shutdownGracefully();
    }
}
/**
 * Creates a {@code NettyClientTransport} for the current {@code address} and registers it in
 * {@code transports}.
 *
 * @param enableKeepAlive when {@code true} the keep-alive time is 10 seconds; otherwise the
 *     {@code KEEPALIVE_TIME_NANOS_DISABLED} value is used. The keep-alive timeout is always
 *     1 second.
 * @return the newly created transport
 */
private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
        int maxHeaderListSize, String userAgent, boolean enableKeepAlive) {
    final long keepAliveTimeNano = enableKeepAlive
            ? TimeUnit.SECONDS.toNanos(10L)
            : KEEPALIVE_TIME_NANOS_DISABLED;
    final long keepAliveTimeoutNano = TimeUnit.SECONDS.toNanos(1L);
    final HashMap<ChannelOption<?>, Object> noChannelOptions = new HashMap<ChannelOption<?>, Object>();

    final NettyClientTransport created = new NettyClientTransport(
            address, NioSocketChannel.class, noChannelOptions, group, negotiator,
            DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
            keepAliveTimeNano, keepAliveTimeoutNano,
            false, authority, userAgent, tooManyPingsRunnable,
            new TransportTracer(), eagAttributes, new SocketPicker());
    transports.add(created);
    return created;
}
/**
 * Creates and starts a {@code NettyServer} on the test address for port 0, then records the
 * address for the server's actual port and the matching authority string.
 *
 * @param maxStreamsPerConnection passed through to the server
 * @param maxHeaderListSize passed through to the server
 * @throws IOException declared for the server start-up
 */
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
    server = new NettyServer(
            TestUtils.testServerAddress(0),
            NioServerSocketChannel.class,
            new HashMap<ChannelOption<?>, Object>(),
            group,
            group,
            negotiator,
            Collections.<ServerStreamTracer.Factory>emptyList(),
            TransportTracer.getDefaultFactory(),
            maxStreamsPerConnection,
            DEFAULT_WINDOW_SIZE,
            DEFAULT_MAX_MESSAGE_SIZE,
            maxHeaderListSize,
            DEFAULT_SERVER_KEEPALIVE_TIME_NANOS,
            DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
            MAX_CONNECTION_IDLE_NANOS_DISABLED,
            MAX_CONNECTION_AGE_NANOS_DISABLED,
            MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE,
            true,
            0,
            channelz);
    server.start(serverListener);

    // Re-derive the address from the port reported by the started server.
    final int boundPort = server.getPort();
    address = TestUtils.testServerAddress(boundPort);
    authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
}
/**
 * Configures a server bootstrap on the given groups and channel class, binds it and stores the
 * bind future in {@code cf}.
 *
 * <p>The bind address uses {@code listenAddress} when it is non-null and non-empty; otherwise
 * only {@code port} is used.
 *
 * @throws Exception any failure during configuration or bind, logged and rethrown unchanged
 */
private void buildServerAndRun(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
        Class<? extends ServerChannel> channelClass) throws Exception {
    final ServerBootstrap bootstrap = new ServerBootstrap();
    try {
        bootstrap.group(bossGroup, workerGroup);
        bootstrap.channel(channelClass);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childHandler(getChannelInitializer());

        final InetSocketAddress bindAddress;
        if (listenAddress != null && !listenAddress.isEmpty()) {
            bindAddress = new InetSocketAddress(listenAddress, port);
        } else {
            bindAddress = new InetSocketAddress(port);
        }
        this.cf = bootstrap.bind(bindAddress).sync();
    } catch (Exception e) {
        log.error("Error initializing {}, port {}", getServerName(), port, e);
        throw e;
    }
    log.info("{} server listening at {} port.", getServerName(), port);
}
/**
 * Connects a client channel to {@code localhost:port} with {@code clientHandler} in its pipeline
 * and blocks until that channel is closed. The event loop group is shut down on exit.
 *
 * @throws Exception if connecting or waiting on the channel fails
 */
public void run() throws Exception {
    final EventLoopGroup ioGroup = new NioEventLoopGroup();
    try {
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(ioGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(clientHandler);
            }
        });
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);

        // Open the outbound connection and wait for the attempt to finish.
        final ChannelFuture connected = bootstrap.connect("localhost", port).sync();

        // Block until the client channel is closed.
        connected.channel().closeFuture().sync();
    } finally {
        ioGroup.shutdownGracefully();
    }
}
/**
 * Stores the child-channel configuration together with the accept rate limiter and its clock.
 * The constructor only assigns fields; nothing is validated here.
 */
ServerBootstrapAcceptor(
        EventLoopGroup childGroup,
        ChannelHandler childHandler,
        Entry<ChannelOption<?>, Object>[] childOptions,
        Entry<AttributeKey<?>, Object>[] childAttrs,
        // IRIS UPDATE
        RateLimiter acceptRateLimiter,
        NetworkClock acceptNetworkClock
        // END IRIS UPDATE
) {
    // IRIS UPDATE
    this.acceptNetworkClock = acceptNetworkClock;
    this.acceptRateLimiter = acceptRateLimiter;
    // END IRIS UPDATE

    this.childAttrs = childAttrs;
    this.childOptions = childOptions;
    this.childHandler = childHandler;
    this.childGroup = childGroup;
}
/**
 * Creates a {@code FastdfsPool} for the given remote address, backed by a bootstrap configured
 * with the connect timeout, {@code TCP_NODELAY} and {@code SO_KEEPALIVE}.
 *
 * @param addr remote address the pool connects to
 * @return the new pool
 */
@Override
protected FastdfsPool newPool(InetSocketAddress addr) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("channel pool created : {}", addr);
    }

    final Bootstrap poolBootstrap = new Bootstrap();
    poolBootstrap.channel(NioSocketChannel.class);
    poolBootstrap.group(loopGroup);
    poolBootstrap.remoteAddress(addr);
    // NOTE(review): connectTimeout is narrowed to int here — confirm it always fits.
    poolBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectTimeout);
    poolBootstrap.option(ChannelOption.TCP_NODELAY, true);
    poolBootstrap.option(ChannelOption.SO_KEEPALIVE, true);

    return new FastdfsPool(poolBootstrap, readTimeout, idleTimeout, maxConnPerHost, maxPendingRequests);
}
/**
 * Attempts to open the Synapse client connection to {@code interfaz:port} on a newly created
 * event loop group.
 *
 * @return {@code true} when waiting on the connect future did not throw; {@code false} after the
 *     failure has been logged and {@code reconnect()} has been invoked
 */
public boolean connect() {
    clientGroup = new NioEventLoopGroup();
    try {
        // Client-side bootstrap used to open the outbound connection.
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(clientGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.handler(new SynapseClientInitializer(this));

        // Block until the connection attempt finishes; get() throws when it failed.
        bootstrap.connect(this.interfaz, this.port).get();

        // The channel's close future is not awaited here, so this method returns right away.
        return true;
    } catch (Exception e) {
        Server.getInstance().getLogger().alert("Synapse Client can't connect to server: " + this.interfaz + ":" + this.port);
        Server.getInstance().getLogger().alert("Reason: " + e.getLocalizedMessage());
        Server.getInstance().getLogger().warning("We will reconnect in 3 seconds");
        this.reconnect();
        return false;
    }
}
/**
 * Binds a server on {@code port} and blocks until its server channel is closed.
 *
 * <p>{@code state} is set to {@code RUNNING} once the bind has completed and to {@code STOPPING}
 * when this method exits, at which point both event loop groups are shut down gracefully.
 *
 * @param initializer handler installed on every accepted child channel
 * @param port port to bind
 * @param numWorkers number of threads in the worker event loop group
 * @throws InterruptedException if interrupted while binding or while waiting for the close
 */
private void acceptConnections(@Nonnull MixServerInitializer initializer, int port,
        @Nonnegative int numWorkers) throws InterruptedException {
    final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    final EventLoopGroup workerGroup = new NioEventLoopGroup(numWorkers);
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup);
        b.channel(NioServerSocketChannel.class);
        b.handler(new LoggingHandler(LogLevel.INFO));
        // SO_KEEPALIVE is a per-connection option: option() would target the listening
        // channel, so it has to be a child option to reach accepted connections.
        b.childOption(ChannelOption.SO_KEEPALIVE, true);
        b.childHandler(initializer);

        // Bind and start to accept incoming connections.
        ChannelFuture f = b.bind(port).sync();
        this.state = ServerState.RUNNING;

        // Block until the server channel is closed.
        f.channel().closeFuture().sync();
    } finally {
        this.state = ServerState.STOPPING;
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}
/**
 * Binds the server to the port configured under {@code server.port} and blocks until the server
 * channel is closed. Both event loop groups are shut down gracefully on exit.
 *
 * <p>If the calling thread is interrupted while waiting, the stack trace is printed and the
 * thread's interrupt status is restored so that callers can still observe the interruption.
 */
public void start() {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.option(ChannelOption.SO_BACKLOG, 1024);
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ServerInitializer());
        Channel ch = b.bind(Config.getInt("server.port")).sync().channel();
        ch.closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Catching InterruptedException clears the flag; set it again instead of swallowing it.
        Thread.currentThread().interrupt();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
/**
 * When the inbound channel becomes active, starts connecting to {@code remoteHost:remotePort}
 * on the inbound channel's event loop. On success the first read is requested on the inbound
 * channel; on failure the inbound channel is closed.
 */
@Override
public void channelActive(ChannelHandlerContext ctx) {
    final Channel inboundChannel = ctx.channel();

    // Outbound connection: same event loop and channel type as the inbound side, auto-read off.
    final Bootstrap outboundBootstrap = new Bootstrap();
    outboundBootstrap.group(inboundChannel.eventLoop());
    outboundBootstrap.channel(ctx.channel().getClass());
    outboundBootstrap.handler(new HexDumpProxyBackendHandler(inboundChannel));
    outboundBootstrap.option(ChannelOption.AUTO_READ, false);

    final ChannelFuture connectFuture = outboundBootstrap.connect(remoteHost, remotePort);
    outboundChannel = connectFuture.channel();
    connectFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                // The connection attempt failed: drop the inbound side as well.
                inboundChannel.close();
                return;
            }
            // Connected: start reading the first data from the inbound side.
            inboundChannel.read();
        }
    });
}
/**
 * Initializes a new channel: appends the configured handler to its pipeline, applies the
 * configured channel options and copies the configured attributes onto the channel. Each map is
 * accessed while holding its own monitor.
 */
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    channel.pipeline().addLast(config.handler());

    final Map<ChannelOption<?>, Object> channelOptions = options0();
    synchronized (channelOptions) {
        setChannelOptions(channel, channelOptions, logger);
    }

    final Map<AttributeKey<?>, Object> channelAttrs = attrs0();
    synchronized (channelAttrs) {
        for (Entry<AttributeKey<?>, Object> attr : channelAttrs.entrySet()) {
            final AttributeKey<Object> key = (AttributeKey<Object>) attr.getKey();
            channel.attr(key).set(attr.getValue());
        }
    }
}
/**
 * Creates a sender for {@code destination} and configures its client bootstrap (1000 ms connect
 * timeout, keep-alive, no-delay). Channels get an optional client-mode {@code SslHandler} when
 * SSL is enabled, followed by an error handler and a 4-byte length-field prepender.
 *
 * @param source not used by this constructor
 * @param destination stored in the {@code destination} field
 */
public Sender(final String source, final String destination) {
    this.destination = destination;

    bootstrap = new Bootstrap();
    bootstrap.group(workerGroup);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.TCP_NODELAY, true);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            if (isSslEnabled()) {
                final SSLEngine clientEngine = serverContext.createSSLEngine();
                clientEngine.setUseClientMode(true);
                ch.pipeline().addLast(new SslHandler(clientEngine));
            }
            ch.pipeline()
                    // Inbound handlers.
                    .addLast("clientError", new ClientErrorHandler())
                    // Outbound handlers.
                    .addLast("frameEncoder", new LengthFieldPrepender(4));
        }
    });
}
/**
 * Binds a server on {@code port} whose child channels run {@code ChannelHandlerA} followed by
 * {@code ChannelHandlerB}, then blocks until the server channel is closed. Both event loop
 * groups are shut down gracefully on exit.
 *
 * @throws Exception if binding or waiting on the channel fails
 */
private void run() throws Exception {
    final EventLoopGroup bossGroup = new NioEventLoopGroup();
    final EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        final ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelHandlerA(), new ChannelHandlerB());
            }
        });
        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

        final ChannelFuture bound = bootstrap.bind(port).sync();
        bound.channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}
/**
 * Bootstraps a server.
 *
 * <p>Configures the listening and child socket options, selects the TLS or the plain child
 * initializer depending on {@code enableNettyTls}, and delegates to {@code bind}.
 *
 * @return a future to be completed once the server has been bound to all interfaces, or an
 *     exceptionally completed future when creating the TLS initializer throws
 */
private CompletableFuture<Void> bootstrapServer() {
    final ServerBootstrap bootstrap = new ServerBootstrap();

    // Options for the listening channel.
    bootstrap.option(ChannelOption.SO_REUSEADDR, true);
    bootstrap.option(ChannelOption.SO_BACKLOG, 128);

    // Options for accepted connections.
    bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
            new WriteBufferWaterMark(8 * 1024, 32 * 1024));
    bootstrap.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024);
    bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 1024);
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    bootstrap.group(serverGroup, clientGroup);
    bootstrap.channel(serverChannelClass);

    if (!enableNettyTls) {
        bootstrap.childHandler(new BasicServerChannelInitializer());
        return bind(bootstrap);
    }
    try {
        bootstrap.childHandler(new SslServerChannelInitializer());
    } catch (SSLException e) {
        return Futures.exceptionalFuture(e);
    }
    return bind(bootstrap);
}
/**
 * Validates and applies a channel option. The SCTP-related options handled here are
 * {@code SO_RCVBUF}, {@code SO_SNDBUF}, {@code SCTP_NODELAY} and {@code SCTP_INIT_MAXSTREAMS};
 * every other option is delegated to the superclass.
 *
 * @return {@code true} for the options handled here, otherwise the superclass result
 */
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
    validate(option, value);

    if (option == SO_RCVBUF) {
        setReceiveBufferSize((Integer) value);
        return true;
    }
    if (option == SO_SNDBUF) {
        setSendBufferSize((Integer) value);
        return true;
    }
    if (option == SCTP_NODELAY) {
        setSctpNoDelay((Boolean) value);
        return true;
    }
    if (option == SCTP_INIT_MAXSTREAMS) {
        setInitMaxStreams((SctpStandardSocketOptions.InitMaxStreams) value);
        return true;
    }
    return super.setOption(option, value);
}
/**
 * Builds and validates the server bootstrap: epoll or NIO server channel depending on
 * {@code useEpoll}, a backlog of 128, and child channels with keep-alive, the pooled allocator
 * and the configured write-buffer water marks.
 *
 * @param topChannel passed to {@code channelInitializer} for the child handler
 * @return the validated bootstrap
 */
private @NotNull ServerBootstrap serverBootstrap(@NotNull TChannel topChannel) {
    final ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(this.bossGroup, this.childGroup);
    if (useEpoll) {
        bootstrap.channel(EpollServerSocketChannel.class);
    } else {
        bootstrap.channel(NioServerSocketChannel.class);
    }
    bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    bootstrap.option(ChannelOption.SO_BACKLOG, 128);
    bootstrap.childHandler(this.channelInitializer(true, topChannel));
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    bootstrap.childOption(
            ChannelOption.WRITE_BUFFER_WATER_MARK,
            new WriteBufferWaterMark(WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_HIGH_WATER_MARK));
    return bootstrap.validate();
}
/**
 * A server that was never started reports the address it was constructed with as its listen
 * socket address. All numeric and boolean constructor arguments are placeholders.
 */
@Test
public void getPort_notStarted() {
    final InetSocketAddress configuredAddress = new InetSocketAddress(0);
    final NettyServer unstartedServer = new NettyServer(
            configuredAddress,
            new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
            new HashMap<ChannelOption<?>, Object>(),
            new HashMap<ChannelOption<?>, Object>(),
            new FixedObjectPool<>(eventLoop),
            new FixedObjectPool<>(eventLoop),
            false,
            ProtocolNegotiators.plaintext(),
            Collections.<ServerStreamTracer.Factory>emptyList(),
            TransportTracer.getDefaultFactory(),
            1, // ignore
            false, // ignore
            1, // ignore
            1, // ignore
            1, // ignore
            1, // ignore
            1, 1, // ignore
            1, 1, // ignore
            true, 0, // ignore
            channelz);

    assertThat(unstartedServer.getListenSocketAddress()).isEqualTo(configuredAddress);
}
/**
 * Marks the input side as shut down and, if the channel is still open, either stops read
 * interest and fires {@code ChannelInputShutdownEvent} (when {@code ALLOW_HALF_CLOSURE} is
 * enabled) or closes the channel.
 *
 * @param pipeline pipeline that receives the user event in the half-closure case
 */
private void closeOnRead(ChannelPipeline pipeline) {
    final SelectionKey readKey = selectionKey();
    setInputShutdown();
    if (!isOpen()) {
        return;
    }

    final boolean halfClosureAllowed =
            Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE));
    if (!halfClosureAllowed) {
        close(voidPromise());
        return;
    }

    // Keep the channel open but stop selecting for reads.
    readKey.interestOps(readKey.interestOps() & ~readInterestOp);
    pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
}
/**
 * Creates the client bootstrap: keep-alive on, {@code TCP_NODELAY} taken from the
 * {@code nfs.rpc.tcp.nodelay} system property (default {@code "true"}), the configured remote
 * address and an ISO 8583 channel initializer. After validation, a reconnect-on-close listener
 * is created and stored.
 *
 * @return the configured and validated bootstrap
 */
@Override
protected Bootstrap createBootstrap() {
    final Bootstrap bootstrap = new Bootstrap();
    final String noDelayProperty = System.getProperty("nfs.rpc.tcp.nodelay", "true");
    final boolean tcpNoDelay = Boolean.parseBoolean(noDelayProperty);

    bootstrap.group(getBossEventLoopGroup());
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.option(ChannelOption.TCP_NODELAY, tcpNoDelay);
    bootstrap.remoteAddress(getSocketAddress());
    bootstrap.handler(new Iso8583ChannelInitializer<>(
            getConfiguration(),
            getConfigurer(),
            getWorkerEventLoopGroup(),
            getIsoMessageFactory(),
            getMessageHandler()
    ));

    // Let the subclass hook adjust the bootstrap before it is validated.
    configureBootstrap(bootstrap);
    bootstrap.validate();

    reconnectOnCloseListener = new ReconnectOnCloseListener(this,
            getConfiguration().getReconnectInterval(),
            getBossEventLoopGroup()
    );
    return bootstrap;
}
/**
 * Binds a NIO datagram channel with {@code SO_BROADCAST} enabled on the given port.
 *
 * @param loopGroup event loop group that runs the channel
 * @param handler handler installed on the channel
 * @param port local port to bind
 * @return the future of the bind operation (not awaited here)
 */
public static ChannelFuture newBootstrapUDP(EventLoopGroup loopGroup, SimpleChannelInboundHandler<DatagramPacket> handler, int port){
    final Bootstrap udpBootstrap = new Bootstrap();
    udpBootstrap.group(loopGroup);
    udpBootstrap.channel(NioDatagramChannel.class);
    udpBootstrap.option(ChannelOption.SO_BROADCAST, true);
    udpBootstrap.handler(handler);
    return udpBootstrap.bind(port);
}
/**
 * Stores the server configuration.
 *
 * <p>{@code channelType}, {@code channelOptions}, {@code protocolNegotiator},
 * {@code streamTracerFactories} and {@code channelz} are null-checked; {@code channelOptions} is
 * copied into a new map. A {@code null} boss or worker group is recorded through the
 * {@code usingSharedBossGroup} / {@code usingSharedWorkerGroup} flags.
 */
NettyServer(
        SocketAddress address, Class<? extends ServerChannel> channelType,
        Map<ChannelOption<?>, ?> channelOptions,
        @Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
        ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories,
        TransportTracer.Factory transportTracerFactory,
        int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
        long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
        long maxConnectionIdleInNanos,
        long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
        boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
        InternalChannelz channelz) {
    // Null-checked arguments, in the same order as before so the first failure is unchanged.
    this.channelType = checkNotNull(channelType, "channelType");
    checkNotNull(channelOptions, "channelOptions");
    this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.channelz = Preconditions.checkNotNull(channelz);

    // Endpoint, options and tracing.
    this.address = address;
    this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
    this.transportTracerFactory = transportTracerFactory;

    // Event loop groups; null means no group was supplied by the caller.
    this.bossGroup = bossGroup;
    this.usingSharedBossGroup = bossGroup == null;
    this.workerGroup = workerGroup;
    this.usingSharedWorkerGroup = workerGroup == null;

    // Stream and message limits.
    this.maxStreamsPerConnection = maxStreamsPerConnection;
    this.flowControlWindow = flowControlWindow;
    this.maxMessageSize = maxMessageSize;
    this.maxHeaderListSize = maxHeaderListSize;

    // Keep-alive and connection lifetime settings.
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
    this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
}
/**
 * Checks the server side of a half-closed connection: with ALLOW_HALF_CLOSURE enabled on child
 * channels, a client that shuts down its output must leave the accepted channel open and active
 * with only its input side shut down.
 *
 * @param sb server bootstrap to bind; the child handler and child option are set here
 * @throws Throwable any failure raised by the bind, the socket I/O or the assertions
 */
public void testShutdownOutput(ServerBootstrap sb) throws Throwable {
TestHandler h = new TestHandler();
Socket s = new Socket();
Channel sc = null;
try {
// Bind with half-closure allowed on accepted channels, then connect a plain socket to it.
sc = sb.childHandler(h).childOption(ChannelOption.ALLOW_HALF_CLOSURE, true).bind().sync().channel();
SocketUtils.connect(s, sc.localAddress(), 10000);
// Send one byte and wait until the handler has queued it.
s.getOutputStream().write(1);
assertEquals(1, (int) h.queue.take());
// Before the client half-closes, the accepted channel is fully usable.
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertFalse(h.ch.isInputShutdown());
assertFalse(h.ch.isOutputShutdown());
// Half-close from the client side and wait for the handler's halfClosure signal.
s.shutdownOutput();
h.halfClosure.await();
// Only the input side is shut down; the channel itself stays open and active.
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertTrue(h.ch.isInputShutdown());
assertFalse(h.ch.isOutputShutdown());
// The closure count is still 1 — presumably a latch that has not been counted down yet.
assertEquals(1, h.closure.getCount());
// NOTE(review): the short sleep looks like it gives duplicate half-closure events time to
// arrive before the count is checked — confirm against TestHandler.
Thread.sleep(100);
assertEquals(1, h.halfClosureCount.intValue());
} finally {
// Release the server channel (if it was bound) and the client socket.
if (sc != null) {
sc.close();
}
s.close();
}
}
/**
 * An option added through {@code putOption} appears in {@code channelOptions()} next to the
 * {@code TCP_NODELAY=true} entry.
 */
@Test
public void additionalSdkSocketOptionsPresent() {
    SdkChannelOptions sdkOptions = new SdkChannelOptions();
    sdkOptions.putOption(ChannelOption.SO_LINGER, 0);

    Map<ChannelOption, Object> expected = new HashMap<>();
    expected.put(ChannelOption.SO_LINGER, 0);
    expected.put(ChannelOption.TCP_NODELAY, Boolean.TRUE);

    assertEquals(expected, sdkOptions.channelOptions());
}
/**
 * Creates the server bootstrap and its event loop groups (epoll when available, NIO otherwise),
 * then applies the socket options.
 *
 * <p>The pooled allocator is set on both the listening channel and accepted connections.
 * {@code TCP_NODELAY} and {@code SO_KEEPALIVE} are per-connection options, so they are applied
 * as child options: {@code option()} only configures the listening server channel.
 *
 * @param boss number of threads accepting connections
 * @param work number of worker threads
 */
void createServerBootstrap(int boss, int work) {
    bootstrap = new ServerBootstrap();
    if (epollIsAvailable()) {
        createEpollGroup(boss, work);
    } else {
        createNioGroup(boss, work);
    }
    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            // Moved from option(): TCP_NODELAY is not an option of the listening channel.
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
}
/**
 * Configures the public server bootstrap, binds it (by port only when {@code ip} is
 * {@code "*"}, otherwise to {@code ip:port}) and blocks until the bound channel closes.
 *
 * <p>Any {@code Throwable} raised on the way is printed and the JVM is terminated with exit
 * status 1.
 */
public void run() throws Exception {
    // Configure the server.
    final EventLoopGroup bossGroup = new NioEventLoopGroup(8);
    final EventLoopGroup workerGroup = new NioEventLoopGroup(8);
    try {
        // Public service processor.
        final ServerBootstrap publicServerBootstrap = new ServerBootstrap();
        publicServerBootstrap.group(bossGroup, workerGroup);
        publicServerBootstrap.channel(NioServerSocketChannel.class);
        publicServerBootstrap.childOption(ChannelOption.TCP_NODELAY, false);
        publicServerBootstrap.childOption(ChannelOption.SO_KEEPALIVE, false);
        // FIXME: no child handler is installed; the original call is still disabled:
        // .childHandler(new PublicHttpServerInitializer());

        // Bind to the public access host info.
        final Channel ch1 = ("*".equals(ip)
                ? publicServerBootstrap.bind(port)
                : publicServerBootstrap.bind(ip, port)).sync().channel();

        System.out.println(String.format("Started OK HttpServer at %s:%d", ip, port));
        ch1.config().setConnectTimeoutMillis(1800);
        ch1.closeFuture().sync();
        System.out.println("Shutdown...");
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(1);
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
/**
 * A factory built from default {@code ClientFactoryOptions} exposes a
 * {@code CONNECT_TIMEOUT_MILLIS} channel option equal to
 * {@code Flags.defaultConnectTimeoutMillis()}.
 */
@Test
void shouldPreserveChannelOptionInClientFactory() {
    final ClientFactory defaultFactory = ClientFactory.builder()
                                                      .options(ClientFactoryOptions.of())
                                                      .build();

    final Map<ChannelOption<?>, Object> nettyOptions =
            defaultFactory.options().get(ClientFactoryOption.CHANNEL_OPTIONS);
    final int actualTimeoutMillis = (int) nettyOptions.get(ChannelOption.CONNECT_TIMEOUT_MILLIS);

    assertThat(actualTimeoutMillis).isEqualTo(Flags.defaultConnectTimeoutMillis());
}
/**
 * Starts the connection to the remote server: picks the channel factory for the transport
 * protocol (TCP or UDT), installs the pipeline initializer, applies the proxy's connect timeout
 * and connects, binding to {@code localAddress} when one is set.
 *
 * @return the future of the connect attempt
 * @throws UnknownTransportProtocolException for any other transport protocol
 */
@Override
protected Future<?> execute() {
    final Bootstrap connector = new Bootstrap()
            .group(proxyServer.getProxyToServerWorkerFor(transportProtocol))
            .resolver(remoteAddressResolver);

    switch (transportProtocol) {
        case TCP:
            LOG.debug("Connecting to server with TCP");
            connector.channelFactory(NioSocketChannel::new);
            break;
        case UDT:
            LOG.debug("Connecting to server with UDT");
            connector.channelFactory(NioUdtProvider.BYTE_CONNECTOR);
            connector.option(ChannelOption.SO_REUSEADDR, true);
            break;
        default:
            throw new UnknownTransportProtocolException(transportProtocol);
    }

    connector.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            initChannelPipeline(ch.pipeline(), initialRequest);
        }
    });
    connector.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
            proxyServer.getConnectTimeout());

    return localAddress != null
            ? connector.connect(remoteAddress, localAddress)
            : connector.connect(remoteAddress);
}