下面列出了io.netty.channel.ChannelFuture#channel ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private ChannelFuture startNetty(String host, int port) {
int workerThreads = Integer.parseInt(properties.getProperty("forwarder.worker.thread", "1"));
// Setup netty listener
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup(workerThreads);
//setup boostrap
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// TODO: Test the Unix Domain Socket implementation will need junixsocket at client side....
// But should increase perf
//.channel(EpollServerDomainSocketChannel.class)
.childHandler(new ForwarderChannelInitializer(kafkaService));
//start server
LOGGER.info("Startup netty server");
ChannelFuture f = b.bind(host, port).addListener(future -> LOGGER.info("Netty server started"));
serverChannel = f.channel();
return f;
}
private synchronized Channel createChannel(SocketAddress address, long connectionTimeout) throws InterruptedException, TimeoutException {
if (address == null) {
throw new IllegalArgumentException("address must not be null!");
}
if (ioEventGroup == null) {
ioEventGroup = newIoEventGroup();
}
if (bootstrap == null){
ChannelHandler channelHandlerPipeline = newChannelHandlerPipeline();
bootstrap = newBootstrap(channelHandlerPipeline, ioEventGroup);
}
ChannelFuture channelFuture;
Channel channel;
channelFuture = bootstrap.connect(address);
if (!channelFuture.await(connectionTimeout)) {
throw new TimeoutException();
}
channel = channelFuture.channel();
if (channel == null || !channel.isActive()) {
throw new IllegalStateException();
}
channels.add(channel);
return channel;
}
protected synchronized void handleOperationComplete ( final SettableFuture<Void> result, final ChannelFuture future )
{
if ( this.connectFuture != result )
{
// this should never happen
return;
}
this.connectFuture = null;
try
{
future.get ();
this.channel = future.channel ();
fireConnected ( this.channel );
result.set ( null );
}
catch ( final InterruptedException | ExecutionException e )
{
fireDisconnected ( e );
result.setException ( e );
}
}
ChannelFuture bing() {
ChannelFuture channelFuture = null;
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //非阻塞模式
.handler(new LoggingHandler(LogLevel.DEBUG))
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, keepAlive)
.option(ChannelOption.SO_BACKLOG, backlog)
.childHandler(new WSServerInitialzer(nettyHandlerService));
channelFuture = b.bind(new InetSocketAddress(tcpPort)).syncUninterruptibly();
channel = channelFuture.channel();
} catch (Exception e) {
log.error("netty start error {} {}", e.getMessage(), e);
} finally {
if (null != channelFuture && channelFuture.isSuccess()) {
log.info("tCPServerTwo start ok");
} else {
log.error("tCPServerTwo start error ");
}
}
return channelFuture;
}
public void instantiateServiceInstance() throws ExecutionException, InterruptedException {
final RpcProviderService rpcRegistry = this.dependenciesProvider.getRpcProviderRegistry();
this.element = requireNonNull(rpcRegistry
.registerRpcImplementation(NetworkTopologyPcepService.class, new TopologyRPCs(this.manager),
Collections.singleton(this.configDependencies.getTopology())));
this.network = requireNonNull(rpcRegistry
.registerRpcImplementation(NetworkTopologyPcepProgrammingService.class,
new TopologyProgramming(this.scheduler, this.manager),
Collections.singleton(this.configDependencies.getTopology())));
this.manager.instantiateServiceInstance();
final ChannelFuture channelFuture = this.dependenciesProvider.getPCEPDispatcher()
.createServer(this.manager.getPCEPDispatcherDependencies());
channelFuture.get();
this.channel = channelFuture.channel();
}
@Override
public void bind(SocketAddress address) throws IOException {
InetSocketAddress inetAddress = (InetSocketAddress) address;
ChannelFuture f = bootstrap.bind(inetAddress);
Channel channel = f.channel();
channelGroup.add(channel);
try {
f.sync();
SocketAddress bound = channel.localAddress();
boundAddresses.put(bound, channel);
channel.closeFuture().addListener(fut -> {
boundAddresses.remove(bound);
});
} catch (Exception e) {
throw Helper.toIOException(e);
}
}
@Override
public void bind(SocketAddress address) throws IOException {
InetSocketAddress inetAddress = (InetSocketAddress) address;
ChannelFuture f = bootstrap.bind(inetAddress);
Channel channel = f.channel();
channelGroup.add(channel);
try {
f.sync();
SocketAddress bound = channel.localAddress();
boundAddresses.put(bound, channel);
channel.closeFuture().addListener(fut -> {
boundAddresses.remove(bound);
});
} catch (Exception e) {
throw Helper.toIOException(e);
}
}
@Override
public synchronized CommunicationSession<Channel> open(InetSocketAddress address, long wait) {
if (sessionManager.getSession(address) != null) {
throw new CommunicationException();
}
try {
ChannelFuture future = connector.connect(address);
future.sync();
Channel channel = future.channel();
channels.put(address, channel);
return sessionManager.attachSession(address, channel);
} catch (Throwable throwable) {
String message = StringUtility.format("客户端异常");
LOGGER.error(message, throwable);
throw new CommunicationException();
}
}
public synchronized NettyClient connect() {
if (!isConnect) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap().group(group)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new NettyClientInitializer(listener));
try {
ChannelFuture future = bootstrap.connect(UrlConstant.SOCKET_HOST, UrlConstant.SOCKET_PORT).sync();
if (future != null && future.isSuccess()) {
channel = future.channel();
isConnect = true;
} else {
isConnect = false;
}
} catch (Exception e) {
e.printStackTrace();
listener.onServiceStatusConnectChanged(NettyListener.STATUS_CONNECT_ERROR);
reconnect();
}
}
return this;
}
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
// iothreads参数值,默认cpu线程数+1 小于32
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
perfClientMetrics.connectError.inc();
Channel channel = future.channel();
logger.error("Channel {} to {} could not be connected.", channel, channel.remoteAddress(), future.cause());
}
}
@Test(timeout = 20000)
public void testMain() throws Exception {
final InetSocketAddress serverAddr = InetSocketAddressUtil.getRandomLoopbackInetSocketAddress();
final BmpSessionListenerFactory bmpSessionListenerFactory = () -> BmpMockTest.this.sessionListener;
final ChannelFuture futureServer = this.bmpDispatcher.createServer(serverAddr,
bmpSessionListenerFactory, KeyMapping.getKeyMapping());
final Channel serverChannel;
final int sessionUpWait;
if (futureServer.isSuccess()) {
serverChannel = futureServer.channel();
sessionUpWait = 10;
} else {
serverChannel = null;
// wait longer for the reconnection attempt
sessionUpWait = 40;
}
BmpMock.main(new String[]{
"--remote_address",
InetSocketAddressUtil.toHostAndPort(serverAddr).toString(),
"--peers_count", "3",
"--pre_policy_routes",
"3"});
verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(sessionUpWait)))
.onSessionUp(Mockito.any(BmpSession.class));
//1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
verify(this.sessionListener, Mockito.timeout(TimeUnit.SECONDS.toMillis(10))
.times(13))
.onMessage(Mockito.any(Notification.class));
if (serverChannel != null) {
serverChannel.close().sync();
}
}
@Test
public void uncaughtReadFails() throws Exception {
WriteBufferingAndExceptionHandler handler =
new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
LocalAddress addr = new LocalAddress("local");
ChannelFuture cf = new Bootstrap()
.channel(LocalChannel.class)
.handler(handler)
.group(group)
.register();
chan = cf.channel();
cf.sync();
ChannelFuture sf = new ServerBootstrap()
.channel(LocalServerChannel.class)
.childHandler(new ChannelHandlerAdapter() {})
.group(group)
.bind(addr);
server = sf.channel();
sf.sync();
ChannelFuture wf = chan.writeAndFlush(new Object());
chan.connect(addr);
chan.pipeline().fireChannelRead(Unpooled.copiedBuffer(new byte[] {'a'}));
try {
wf.sync();
fail();
} catch (Exception e) {
Status status = Status.fromThrowable(e);
assertThat(status.getCode()).isEqualTo(Code.INTERNAL);
assertThat(status.getDescription()).contains("channelRead() missed");
}
}
Channel createServer(final InetSocketAddress serverAddress) {
this.registry.addPeer(new IpAddressNoZone(new Ipv4AddressNoZone(serverAddress.getAddress().getHostAddress())),
this.serverListener, createPreferences(serverAddress));
LoggerFactory.getLogger(AbstractBGPDispatcherTest.class).info("createServer");
final ChannelFuture future = this.serverDispatcher.createServer(serverAddress);
future.addListener(future1 -> Preconditions.checkArgument(future1.isSuccess(),
"Unable to start bgp server on %s", future1.cause()));
waitFutureSuccess(future);
return future.channel();
}
/**
* @see #connect()
*/
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 初始化并注册通道
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
// 创建连接
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.注册的未来几乎总是已经完成了,但是以防万一。
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.//直接获得原因,并做一个空检查,所以我们只需要一个volatile读在情况a
//失败。
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.//在EventLoop注册失败,所以失败的频道承诺直接不引起
// 当我们试图访问通道的EventLoop时。
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.注册成功,所以设置正确的执行程序。
// See https://github.com/netty/netty/issues/2586
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
@Test
public void uncaughtException_closeAtMostOnce() throws Exception {
final AtomicInteger closes = new AtomicInteger();
WriteBufferingAndExceptionHandler handler =
new WriteBufferingAndExceptionHandler(new ChannelDuplexHandler() {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
closes.getAndIncrement();
// Simulates a loop between this handler and the WriteBufferingAndExceptionHandler.
ctx.fireExceptionCaught(Status.ABORTED.withDescription("zap").asRuntimeException());
super.close(ctx, promise);
}
});
LocalAddress addr = new LocalAddress("local");
ChannelFuture cf = new Bootstrap()
.channel(LocalChannel.class)
.handler(handler)
.group(group)
.register();
chan = cf.channel();
cf.sync();
ChannelFuture sf = new ServerBootstrap()
.channel(LocalServerChannel.class)
.childHandler(new ChannelHandlerAdapter() {})
.group(group)
.bind(addr);
server = sf.channel();
sf.sync();
chan.connect(addr).sync();
chan.close().sync();
assertEquals(1, closes.get());
}
public void start() throws Exception {
if (ssl) {
boolean useOpenSSL = NetworkUtils.isOpenSslAvailable();
if (sslCertFile == null) {
LOGGER.log(Level.SEVERE, "start SSL with self-signed auto-generated certificate, useOpenSSL:" + useOpenSSL);
if (sslCiphers != null) {
LOGGER.log(Level.SEVERE, "required sslCiphers " + sslCiphers);
}
SelfSignedCertificate ssc = new SelfSignedCertificate();
try {
sslCtx = SslContextBuilder
.forServer(ssc.certificate(), ssc.privateKey())
.sslProvider(useOpenSSL ? SslProvider.OPENSSL : SslProvider.JDK)
.ciphers(sslCiphers)
.build();
} finally {
ssc.delete();
}
} else {
LOGGER.log(Level.SEVERE, "start SSL with certificate " + sslCertFile.getAbsolutePath() + " chain file " + sslCertChainFile.getAbsolutePath() + ", useOpenSSL:" + useOpenSSL);
if (sslCiphers != null) {
LOGGER.log(Level.SEVERE, "required sslCiphers " + sslCiphers);
}
sslCtx = SslContextBuilder.forServer(sslCertChainFile, sslCertFile, sslCertPassword)
.sslProvider(useOpenSSL ? SslProvider.OPENSSL : SslProvider.JDK)
.ciphers(sslCiphers).build();
}
}
if (callbackThreads == 0) {
callbackExecutor = Executors.newCachedThreadPool();
} else {
callbackExecutor = Executors.newFixedThreadPool(callbackThreads, new ThreadFactory() {
private final AtomicLong count = new AtomicLong();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "blazingcache-callbacks-" + count.incrementAndGet());
}
});
}
if (NetworkUtils.isEnableEpollNative()) {
bossGroup = new EpollEventLoopGroup(workerThreads);
workerGroup = new EpollEventLoopGroup(workerThreads);
LOGGER.log(Level.INFO, "Using netty-native-epoll network type");
} else {
bossGroup = new NioEventLoopGroup(workerThreads);
workerGroup = new NioEventLoopGroup(workerThreads);
}
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NetworkUtils.isEnableEpollNative() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
NettyChannel session = new NettyChannel("unnamed", ch, callbackExecutor, null);
if (acceptor != null) {
acceptor.createConnection(session);
}
// ch.pipeline().addLast(new LoggingHandler());
// Add SSL handler first to encrypt and decrypt everything.
if (ssl) {
ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()));
}
ch.pipeline().addLast("lengthprepender", new LengthFieldPrepender(4));
ch.pipeline().addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast("messageencoder", new DataMessageEncoder());
ch.pipeline().addLast("messagedecoder", new DataMessageDecoder());
ch.pipeline().addLast(new InboundMessageHandler(session));
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(host, port).sync(); // (7)
this.channel = f.channel();
}
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
// 同步等待连接
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
/**
* ConfigClient不支持异步连接行为, async参数无效
*/
@Override
public JConnection connect(UnresolvedAddress address, boolean async) {
setOptions();
final Bootstrap boot = bootstrap();
final SocketAddress socketAddress = InetSocketAddress.createUnresolved(address.getHost(), address.getPort());
// 重连watchdog
final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, socketAddress, null) {
@Override
public ChannelHandler[] handlers() {
return new ChannelHandler[] {
this,
new IdleStateChecker(timer, 0, JConstants.WRITER_IDLE_TIME_SECONDS, 0),
idleStateTrigger,
new MessageDecoder(),
encoder,
ackEncoder,
handler
};
}
};
try {
ChannelFuture future;
synchronized (bootstrapLock()) {
boot.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(watchdog.handlers());
}
});
future = boot.connect(socketAddress);
}
// 以下代码在synchronized同步块外面是安全的
future.sync();
channel = future.channel();
} catch (Throwable t) {
throw new ConnectFailedException("connects to [" + address + "] fails", t);
}
return new JConnection(address) {
@Override
public void setReconnect(boolean reconnect) {
if (reconnect) {
watchdog.start();
} else {
watchdog.stop();
}
}
};
}
@FXML
public void connect() {
if( connected.get() ) {
if( logger.isWarnEnabled() ) {
logger.warn("client already connected; skipping connect");
}
return; // already connected; should be prevented with disabled
}
String host = tfHost.getText();
int port = Integer.parseInt(tfPort.getText());
group = new NioEventLoopGroup();
Task<Channel> task = new Task<Channel>() {
@Override
protected Channel call() throws Exception {
updateMessage("Bootstrapping");
updateProgress(0.1d, 1.0d);
Bootstrap b = new Bootstrap();
b
.group(group)
.channel(NioSocketChannel.class)
.remoteAddress( new InetSocketAddress(host, port) )
.handler( new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler(receivingMessageModel));
}
});
ChannelFuture f = b.connect();
Channel chn = f.channel();
updateMessage("Connecting");
updateProgress(0.2d, 1.0d);
f.sync();
return chn;
}
@Override
protected void succeeded() {
channel = getValue();
connected.set(true);
}
@Override
protected void failed() {
Throwable exc = getException();
logger.error( "client connect error", exc );
Alert alert = new Alert(AlertType.ERROR);
alert.setTitle("Client");
alert.setHeaderText( exc.getClass().getName() );
alert.setContentText( exc.getMessage() );
alert.showAndWait();
connected.set(false);
}
};
hboxStatus.visibleProperty().bind( task.runningProperty() );
lblStatus.textProperty().bind( task.messageProperty() );
piStatus.progressProperty().bind(task.progressProperty());
new Thread(task).start();
}