下面列出了io.netty.channel.ChannelFuture#isDone ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* @see {@link #connect()}
*/
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise = channel.newPromise();
if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
return promise;
}
/**
* 将数据不进行任何处理写入channel
*
* @param data 数据
* @return 写入状态
*/
public ProtocolFuture writeToChannel(Object data) {
this.lastActive = System.currentTimeMillis();
if (!isClosed()) {
ChannelFuture future = channel.write(data);
return new ProtocolFuture() {
@Override
public boolean isSuccess() {
return future.isSuccess();
}
@Override
public boolean isDone() {
return future.isDone();
}
};
} else {
return ProtocolFuture.ERRORFUTURE;
}
}
/**
* @see {@link #connect()}
*/
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise = channel.newPromise();
if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
return promise;
}
public void stopServer() throws InterruptedException {
try {
ChannelFuture channelCloseFuture = channelFuture.channel().closeFuture();
channelCloseFuture.get(1000, TimeUnit.MILLISECONDS);
if (!channelCloseFuture.isDone()) {
channelCloseFuture.channel().unsafe().closeForcibly();
}
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
// Wait until all threads are terminated.
bossGroup.terminationFuture().sync();
workerGroup.terminationFuture().sync();
} catch (ExecutionException | TimeoutException e) {
//ignore
}
}
@Test
public void testCloseOnInvalid() {
ChannelFuture closeFuture = ch.closeFuture();
String header = "GET / HTTP/1.1\r\n";
try {
ch.writeInbound(copiedBuffer(header, CharsetUtil.US_ASCII));
} catch (HAProxyProtocolException ppex) {
// swallow this exception since we're just testing to be sure the channel was closed
}
boolean isComplete = closeFuture.awaitUninterruptibly(5000);
if (!isComplete || !closeFuture.isDone() || !closeFuture.isSuccess()) {
fail("Expected channel close");
}
}
private void writeQuery(final DnsQuery query, final ChannelPromise writePromise) {
final ChannelFuture writeFuture = parent.ch.writeAndFlush(query, writePromise);
if (writeFuture.isDone()) {
onQueryWriteCompletion(writeFuture);
} else {
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
onQueryWriteCompletion(writeFuture);
}
});
}
}
/**
* Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.尝试从池中检索健康通道(如果有的话)或创建新的通道。
* @param promise the promise to provide acquire result.
* @return future for acquiring a channel.
*/
private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
try {
// 从deque中获取一个channel,这里是用双端队列存储的channel
final Channel ch = pollChannel();
if (ch == null) {
// No Channel left in the pool bootstrap a new Channel池中没有剩余通道引导新通道
Bootstrap bs = bootstrap.clone();
bs.attr(POOL_KEY, this);
// 如果channel不存在就创建一个
ChannelFuture f = connectChannel(bs);
if (f.isDone()) {
// promise发布连接成功事件
notifyConnect(f, promise);
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notifyConnect(future, promise);
}
});
}
return promise;
}
EventLoop loop = ch.eventLoop();
if (loop.inEventLoop()) {
doHealthCheck(ch, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
doHealthCheck(ch, promise);
}
});
}
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
/**
* Write messages to the outbound of this {@link Channel}.将消息写入该通道的出站。
*
* @param msgs the messages to be written
* @return bufferReadable returns {@code true} if the write operation did add something to the outbound buffer
*/
public boolean writeOutbound(Object... msgs) {
ensureOpen();
if (msgs.length == 0) {
return isNotEmpty(outboundMessages);
}
RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
try {
for (Object m: msgs) {
if (m == null) {
break;
}
futures.add(write(m));
}
flushOutbound0();
int size = futures.size();
for (int i = 0; i < size; i++) {
ChannelFuture future = (ChannelFuture) futures.get(i);
if (future.isDone()) {
recordException(future);
} else {
// The write may be delayed to run later by runPendingTasks()
future.addListener(recordExceptionListener);
}
}
checkException();
return isNotEmpty(outboundMessages);
} finally {
futures.recycle();
}
}
/**
* @see #connect()
*/
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 初始化并注册通道
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
// 创建连接
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.注册的未来几乎总是已经完成了,但是以防万一。
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.//直接获得原因,并做一个空检查,所以我们只需要一个volatile读在情况a
//失败。
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.//在EventLoop注册失败,所以失败的频道承诺直接不引起
// 当我们试图访问通道的EventLoop时。
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.注册成功,所以设置正确的执行程序。
// See https://github.com/netty/netty/issues/2586
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
@Override
public IMessage send(Object message) throws InterruptedException {
if (!(message instanceof IMessage)) {
throw new EPSCommonException("Illegal type of Message");
}
IMessage msg = (IMessage)message;
ChannelFuture future = channel.writeAndFlush(msg)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
boolean isSendSuccess = true;
StringBuilder errorMsg = new StringBuilder("Cause: ");
if (future.await(1000)) {
if (!future.isDone()) {
errorMsg.append("Send operation is not done.\n");
logger.error("Send operation is not done. Session: {}", this);
isSendSuccess = false;
}
if (!future.isSuccess()) {
errorMsg.append("Write operation was not successful.\n");
logger.error("Write operation was not successful. Session: {}", this);
isSendSuccess = false;
}
} else {
errorMsg.append("Send operation is not completed.\n");
logger.error("Send operation is not completed. Session: {}", this);
isSendSuccess = false;
}
if (future.cause() != null) {
throw new EPSCommonException("Message sent failed. Session: " + this, future.cause());
}
if (!isSendSuccess) {
throw new SendMessageFailedException(
"Message wasn't send during 1 second." + errorMsg + " Session: " + this);
}
return msg;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isDone() && future.isSuccess()) {
set(new NettyRpcChannel(future.channel()));
} else if (future.isDone() && future.cause() != null) {
setException(future.cause());
} else if (future.isDone() && future.isCancelled()) {
cancel(false);
}
}
/**
* 执行连接调用{@link ChannelFuture executeBooterConnect(InetSocketAddress target)}
* 执行多次会把channel顶掉
* @param target
* @return
*/
protected final boolean connect(InetSocketAddress target)
{
boolean isConnect=false;
try {
log.log(logLevel,getName()+"连接中("+target+")……");
ChannelFuture cf=doConnect(target);
if(cf==null)
{//如果阻塞则使用系统调度器执行
log.log(logLevel,getName()+"连接繁忙("+target+")!稍后重连:"+isReconnect());
doReconect();//这里不能占用IO线程池
}else
{
isConnect=cf.isDone() && cf.isSuccess();
if(isConnect)
{//连接成功
log.log(logLevel,getName()+"连接成功("+target+")!"+cf.channel());
// this.channel=cf.channel();//子类去设置,通过initHandler的channelRegistered去设置更及时
//给通道加上断线重连监听器
cf.channel().closeFuture().removeListener(reconectListener);
cf.channel().closeFuture().addListener(reconectListener);
}else
{//连接不成功则10秒再执行一次连接
log.log(logLevel,getName()+"连接失败("+target+")!"+cf.channel());
doReconect();//这里不能占用IO线程池
}
}
} catch (Exception e) {
log.error(e.getMessage(),e);
}
return isConnect;
}
/**
* SENT
*/
@Override
public void operationComplete(ChannelFuture future) throws Exception {
try {
if (future.isDone() && future.isSuccess()) {
StringBuilder strBld = new StringBuilder();
strBld.append("\n");
strBld.append("######################## Client Log operationComplete ################################\n");
strBld.append("---------------------------------------------------------------------\n");
strBld.append("SEND HTTP Request: " + future.channel()).append("\n");
//log.debug(strBld.toString());
// COMPLETE API
if(listener != null) {
listener.channelRequested(ctx);
}
} else {
log.error("operationComplete_failure channel=" + future.channel() + "\n" + request + " " + future.cause());
}
} catch (Exception e) {
log.error("operationComplete_error : ", e);
throw e;
}
finally {
// ByteBuf 해제
// ReferenceCountUtil.release(request);
}
}
/**
* SENT
*/
@Override
public void operationComplete(ChannelFuture future) throws Exception {
try {
if (future.isDone() && future.isSuccess()) {
StringBuilder strBld = new StringBuilder();
strBld.append("\n");
strBld.append("######################## Client Log operationComplete ################################\n");
strBld.append("---------------------------------------------------------------------\n");
strBld.append("SEND HTTP Request: " + future.channel()).append("\n");
//log.debug(strBld.toString());
// COMPLETE API
if(listener != null) {
listener.channelRequested(ctx);
}
} else {
log.error("operationComplete_failure channel=" + future.channel() + "\n" + request + " " + future.cause());
}
} catch (Exception e) {
log.error("operationComplete_error : ", e);
throw e;
}
finally {
// ByteBuf 해제
// ReferenceCountUtil.release(request);
}
}
/**
* connects to all replicas
*
* @throws InterruptedException
*/
@SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON")
public void connect() throws InterruptedException {
for (Entry<Integer, Integer> e : this.serverList.entrySet()) {
int replicaId = e.getKey();
int replicaPort = e.getValue();
if (replicaId != myServerId) {
Bootstrap b = new Bootstrap();
b.group(loopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// enable SSL/TLS support
SSLEngine engine = SSLContextFactory.getClientContext().createSSLEngine();
engine.setUseClientMode(true);
ch.pipeline().addLast(
new SslHandler(engine),
new ObjectEncoder(),
new ObjectDecoder(OzymandiasServer.maxObjectSize, ClassResolvers.cacheDisabled(null)));
}
});
/* wait till server is connected */
ChannelFuture f = null;
do {
f = b.connect("127.0.0.1", replicaPort);
f.await();
} while (!(f.isDone() && f.isSuccess()));
this.channels.add(f.sync().channel());
}
}
}
@SuppressWarnings("unchecked")
@Override
public Runnable start(Listener transportListener) {
lifecycleManager = new ClientTransportLifecycleManager(
Preconditions.checkNotNull(transportListener, "listener"));
EventLoop eventLoop = group.next();
if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
keepAliveManager = new KeepAliveManager(
new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
keepAliveWithoutCalls);
}
handler = NettyClientHandler.newHandler(
lifecycleManager,
keepAliveManager,
flowControlWindow,
maxHeaderListSize,
GrpcUtil.STOPWATCH_SUPPLIER,
tooManyPingsRunnable,
transportTracer,
eagAttributes,
authorityString);
NettyHandlerSettings.setAutoWindow(handler);
negotiationHandler = negotiator.newHandler(handler);
Bootstrap b = new Bootstrap();
b.group(eventLoop);
b.channel(channelType);
if (NioSocketChannel.class.isAssignableFrom(channelType)) {
b.option(SO_KEEPALIVE, true);
}
for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
// Every entry in the map is obtained from
// NettyChannelBuilder#withOption(ChannelOption<T> option, T value)
// so it is safe to pass the key-value pair to b.option().
b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
}
/**
* We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
* is executed in the event loop and we need this handler to be in the pipeline immediately so
* that it may begin buffering writes.
*/
b.handler(negotiationHandler);
ChannelFuture regFuture = b.register();
if (regFuture.isDone() && !regFuture.isSuccess()) {
channel = null;
// Initialization has failed badly. All new streams should be made to fail.
Throwable t = regFuture.cause();
if (t == null) {
t = new IllegalStateException("Channel is null, but future doesn't have a cause");
}
statusExplainingWhyTheChannelIsNull = Utils.statusFromThrowable(t);
// Use a Runnable since lifecycleManager calls transportListener
return new Runnable() {
@Override
public void run() {
// NOTICE: we not are calling lifecycleManager from the event loop. But there isn't really
// an event loop in this case, so nothing should be accessing the lifecycleManager. We
// could use GlobalEventExecutor (which is what regFuture would use for notifying
// listeners in this case), but avoiding on-demand thread creation in an error case seems
// a good idea and is probably clearer threading.
lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull);
}
};
}
channel = regFuture.channel();
// Start the write queue as soon as the channel is constructed
handler.startWriteQueue(channel);
// This write will have no effect, yet it will only complete once the negotiationHandler
// flushes any pending writes. We need it to be staged *before* the `connect` so that
// the channel can't have been closed yet, removing all handlers. This write will sit in the
// AbstractBufferingHandler's buffer, and will either be flushed on a successful connection,
// or failed if the connection fails.
channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// Need to notify of this failure, because NettyClientHandler may not have been added to
// the pipeline before the error occurred.
lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()));
}
}
});
// Start the connection operation to the server.
SocketAddress localAddress =
localSocketPicker.createSocketAddress(remoteAddress, eagAttributes);
if (localAddress != null) {
channel.connect(remoteAddress, localAddress);
} else {
channel.connect(remoteAddress);
}
if (keepAliveManager != null) {
keepAliveManager.onTransportStarted();
}
return null;
}
@Override
public IMessage send(Object message) throws InterruptedException {
if (client.getChannel() == null) {
throw new EPSCommonException("Channel not ready (channel == null)");
}
if (!(message instanceof IMessage)) {
throw new EPSCommonException("Illegal type of Message");
}
IMessage msg = (IMessage) message;
ChannelFuture future = client.getChannel().writeAndFlush(msg)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
boolean isSendSuccess = true;
StringBuilder errorMsg = new StringBuilder("Cause: ");
if (future.await(1000)) {
if (!future.isDone()) {
errorMsg.append("Send operation is not done.\n");
logger.error("Send operation is not done. Session: {}", this);
isSendSuccess = false;
}
if (!future.isSuccess()) {
errorMsg.append("Write operation was not successful.\n");
logger.error("Write operation was not successful. Session: {}", this);
isSendSuccess = false;
}
} else {
errorMsg.append("Send operation is not completed.\n");
logger.error("Send operation is not completed. Session: {}", this);
isSendSuccess = false;
}
if(future.cause() != null) {
throw new EPSCommonException("Message sent failed. Session: " + this, future.cause());
}
if (!isSendSuccess) {
throw new SendMessageFailedException(
"Message wasn't send during 1 second." + errorMsg + " Session: " + this);
}
return msg;
}