下面列出了 io.netty.channel.ChannelFuture#cause() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Starts the proxy server by binding a server channel to {@code _host}:{@code _port}.
 *
 * <p>The call blocks (uninterruptibly) until the bind attempt has finished. A successfully
 * bound channel is registered in {@code _allChannels}; on failure the method waits for the
 * channel's close future and then reports the bind error.
 *
 * @throws ChannelException if the bind attempt fails; the bind failure is attached as cause
 * @throws InterruptedException declared by the signature
 */
public void start()
        throws InterruptedException {
    final ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(_acceptorGroup, _upstreamWorkerGroup);
    bootstrap.channelFactory(new ChannelFactory<ServerChannel>() {
        @Override
        public ServerChannel newChannel() {
            return new NioServerSocketChannel();
        }
    });
    bootstrap.childHandler(new ProxyInitializer(this));

    // Bind, then block until the attempt has completed one way or the other.
    final ChannelFuture bindFuture = bootstrap.bind(_host, _port);
    bindFuture.awaitUninterruptibly();

    if (bindFuture.isSuccess()) {
        _allChannels.add(bindFuture.channel());
        return;
    }

    // Bind failed: wait for the channel to finish closing before reporting the error.
    bindFuture.channel().closeFuture().awaitUninterruptibly();
    throw new ChannelException(String.format("Failed to bind to: %s:%d", _host, _port), bindFuture.cause());
}
/**
 * Translates the failure cause of a {@link ChannelFuture} into a {@link Status}.
 *
 * <p>When a channel is closed all handlers are removed from the pipeline, so the cause is
 * often an uninformative {@code ClosedChannelException}. In that case the shutdown status
 * recorded by the lifecycle manager is preferred, when there is one.
 *
 * <p>This method must only be called on the event loop.
 *
 * @param f the failed future whose cause is converted
 * @return the shutdown status for "channel closed" style failures when available, an
 *     {@code UNKNOWN} status when it is not, otherwise the status derived from the cause
 */
private Status statusFromFailedFuture(ChannelFuture f) {
    final Throwable failure = f.cause();

    // Http2ChannelClosedException is thrown by the StreamBufferingEncoder if the channel is
    // closed while streams are still buffered. Like ClosedChannelException it is not helpful
    // on its own, so both are replaced by the real cause of the shutdown (if available).
    final boolean channelWasClosed =
            failure instanceof ClosedChannelException || failure instanceof Http2ChannelClosedException;
    if (!channelWasClosed) {
        return Utils.statusFromThrowable(failure);
    }

    final Status shutdownStatus = lifecycleManager.getShutdownStatus();
    if (shutdownStatus != null) {
        return shutdownStatus;
    }
    return Status.UNKNOWN.withDescription("Channel closed but for unknown reason")
            .withCause(new ClosedChannelException().initCause(failure));
}
/**
 * Converts {@code ChannelFuture.cause()} into a {@link Status}.
 *
 * <p>Closing a channel removes every handler from its pipeline, which tends to surface
 * unhelpful exceptions such as {@code ClosedChannelException}. For those, the lifecycle
 * manager's shutdown status is returned instead whenever one has been recorded.
 *
 * <p>This method must only be called on the event loop.
 *
 * @param f the failed future to inspect
 * @return the status describing why the future failed
 */
private Status statusFromFailedFuture(ChannelFuture f) {
    final Throwable cause = f.cause();

    // Http2ChannelClosedException comes from the StreamBufferingEncoder when the channel is
    // closed with streams still buffered; it says nothing about why the channel went away.
    if (!(cause instanceof ClosedChannelException) && !(cause instanceof Http2ChannelClosedException)) {
        return Utils.statusFromThrowable(cause);
    }

    final Status recorded = lifecycleManager.getShutdownStatus();
    return recorded != null
            ? recorded
            : Status.UNKNOWN.withDescription("Channel closed but for unknown reason")
                    .withCause(new ClosedChannelException().initCause(cause));
}
/**
 * Reacts to the completion of the command write.
 *
 * <p>Nothing happens if the write was cancelled or the attempt promise is already done. A
 * failed write is recorded in {@code exception} and fails the attempt promise only when
 * {@code attempt == attempts}. A successful write cancels {@code timeout} and schedules the
 * response timeout.
 *
 * @param future         the completed write future
 * @param attemptPromise promise tracking the current attempt
 * @param connection     connection the command was written to
 */
private void checkWriteFuture(ChannelFuture future, RPromise<R> attemptPromise, RedisConnection connection) {
    if (future.isCancelled() || attemptPromise.isDone()) {
        return;
    }

    if (future.isSuccess()) {
        timeout.cancel();
        scheduleResponseTimeout(attemptPromise, connection);
        return;
    }

    final String description = "Unable to write command into connection! Node source: " + source
            + ", connection: " + connection
            + ", command: " + LogHelper.toString(command, params)
            + " after " + attempt + " retry attempts";
    exception = new WriteRedisConnectionException(description, future.cause());
    if (attempt == attempts) {
        attemptPromise.tryFailure(exception);
    }
}
/**
 * Writes {@code message} to the underlying Netty channel.
 *
 * <p>When {@code sent} is {@code true} the call waits for the write to complete, but only up
 * to the timeout configured on the URL ({@code Constants.TIMEOUT_KEY}, falling back to
 * {@code Constants.DEFAULT_TIMEOUT}). When {@code sent} is {@code false} the method does not
 * wait; only a failure that is already known is reported.
 *
 * @param message the object to write
 * @param sent    whether to wait for the write to complete
 * @throws RemotingException if the write fails, the wait is interrupted, or the write does
 *     not complete within the timeout
 */
public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);
    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // Timed wait only. Calling syncUninterruptibly() first would block without any
            // bound until the write completes, so the timeout would never apply.
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to callers even though it is wrapped below.
            Thread.currentThread().interrupt();
        }
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + " in timeout(" + timeout + "ms) limit");
    }
}
/**
 * Waits, one stream at a time, for every pending write and its response.
 *
 * <p>For each entry the write future is awaited first and then the response promise, each
 * for at most the given duration. Entries are removed from the map once their response has
 * arrived.
 *
 * @param timeout Value of time to wait for each write and each response
 * @param unit Units associated with {@code timeout}
 * @throws IllegalStateException if a write or a response does not complete in time
 * @throws RuntimeException wrapping the cause if a write or a response fails
 * @see HttpResponseHandler#put(int, io.netty.channel.ChannelFuture, io.netty.channel.ChannelPromise)
 */
public void awaitResponses(long timeout, TimeUnit unit) {
    for (Iterator<Entry<Integer, Entry<ChannelFuture, ChannelPromise>>> pending =
                 streamidPromiseMap.entrySet().iterator(); pending.hasNext();) {
        final Entry<Integer, Entry<ChannelFuture, ChannelPromise>> current = pending.next();
        final Integer streamId = current.getKey();

        final ChannelFuture written = current.getValue().getKey();
        if (!written.awaitUninterruptibly(timeout, unit)) {
            throw new IllegalStateException("Timed out waiting to write for stream id " + streamId);
        }
        if (!written.isSuccess()) {
            throw new RuntimeException(written.cause());
        }

        final ChannelPromise responded = current.getValue().getValue();
        if (!responded.awaitUninterruptibly(timeout, unit)) {
            throw new IllegalStateException("Timed out waiting for response on stream id " + streamId);
        }
        if (!responded.isSuccess()) {
            throw new RuntimeException(responded.cause());
        }

        System.out.println("---Stream id: " + streamId + " received---");
        pending.remove();
    }
}
/**
 * Decides whether the finished write counts as successful and forwards the outcome to
 * {@code handleWriteComplete}.
 *
 * @param future the completed write future
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
    try (SafeCloseable ignored = RequestContextUtil.pop()) {
        // A failed write is still treated as a success if
        //   1) the last chunk we attempted to send was empty,
        //   2) the connection has been closed,
        //   3) and the protocol is HTTP/1,
        // because the client very likely closed the connection after receiving the complete
        // content, which is not really a problem.
        final boolean isSuccess = future.isSuccess()
                || (endOfStream && wroteEmptyData
                    && future.cause() instanceof ClosedChannelException
                    && responseEncoder instanceof Http1ObjectEncoder);
        handleWriteComplete(future, endOfStream, isSuccess);
    }
}
/**
 * Completes {@code promise} according to the outcome of {@code future}.
 *
 * <p>A failure is passed through {@code wrapStreamClosedError} before it is set on the
 * promise. If the wrapped error is a {@code ClosedChannelException}, the channel is either
 * closed forcibly (auto-close enabled) or only marked as outbound-closed.
 *
 * @param future  the finished write future
 * @param promise the promise to complete
 */
private void writeComplete(ChannelFuture future, ChannelPromise promise) {
    final Throwable failure = future.cause();
    if (failure == null) {
        promise.setSuccess();
        return;
    }

    final Throwable wrapped = wrapStreamClosedError(failure);
    promise.setFailure(wrapped);
    if (!(wrapped instanceof ClosedChannelException)) {
        return;
    }

    if (config.isAutoClose()) {
        // Close channel if needed.
        closeForcibly();
    } else {
        outboundClosed = true;
    }
}
/**
 * Verifies that a pending connection attempt to an unreachable address can be cancelled.
 *
 * <p>If the attempt finishes within one second it must have failed, and that failure is
 * rethrown. Otherwise the future is cancelled and, when the transport supports
 * cancellation, the channel is expected to close within 500 ms.
 *
 * @param cb the bootstrap under test
 * @throws Throwable the connect failure, if the attempt completes early
 */
public void testConnectCancellation(Bootstrap cb) throws Throwable {
    cb.handler(new TestHandler()).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
    final ChannelFuture connectFuture = cb.connect(BAD_HOST, BAD_PORT);
    try {
        final boolean finishedEarly = connectFuture.await(1000);
        if (finishedEarly) {
            if (!connectFuture.isSuccess()) {
                throw connectFuture.cause();
            }
            fail("A connection attempt to " + BAD_HOST + " must not succeed.");
        }

        if (!connectFuture.cancel(true)) {
            // Cancellation not supported by the transport.
            return;
        }
        assertThat(connectFuture.channel().closeFuture().await(500), is(true));
        assertThat(connectFuture.isCancelled(), is(true));
    } finally {
        connectFuture.channel().close();
    }
}
/**
 * Completes {@code promise} once both half-shutdown operations have finished.
 *
 * <p>The output failure takes precedence. When both sides failed, the input failure is only
 * logged at debug level. The promise succeeds only if neither future has a cause.
 *
 * @param shutdownOutputFuture result of shutting down the output side
 * @param shutdownInputFuture  result of shutting down the input side
 * @param promise              the promise to complete
 */
private static void shutdownDone(ChannelFuture shutdownOutputFuture,
                                 ChannelFuture shutdownInputFuture,
                                 ChannelPromise promise) {
    final Throwable outputFailure = shutdownOutputFuture.cause();
    final Throwable inputFailure = shutdownInputFuture.cause();

    if (outputFailure == null) {
        if (inputFailure == null) {
            promise.setSuccess();
        } else {
            promise.setFailure(inputFailure);
        }
        return;
    }

    if (inputFailure != null) {
        logger.debug("Exception suppressed because a previous exception occurred.",
                inputFailure);
    }
    promise.setFailure(outputFailure);
}
/**
 * Resolves {@code promise} from the results of the output and input shutdown.
 *
 * <p>If the output shutdown failed, its cause fails the promise and a concurrent input
 * failure is merely logged at debug level. If only the input shutdown failed, its cause
 * fails the promise. With no cause on either future the promise succeeds.
 *
 * @param shutdownOutputFuture result of shutting down the output side
 * @param shutdownInputFuture  result of shutting down the input side
 * @param promise              the promise to complete
 */
private static void shutdownDone(ChannelFuture shutdownOutputFuture,
                                 ChannelFuture shutdownInputFuture,
                                 ChannelPromise promise) {
    final Throwable outputCause = shutdownOutputFuture.cause();
    final Throwable inputCause = shutdownInputFuture.cause();

    if (outputCause == null && inputCause == null) {
        promise.setSuccess();
        return;
    }
    if (outputCause == null) {
        promise.setFailure(inputCause);
        return;
    }

    // Output failure wins; a simultaneous input failure is only logged.
    if (inputCause != null) {
        logger.debug("Exception suppressed because a previous exception occurred.",
                inputCause);
    }
    promise.setFailure(outputCause);
}
/**
 * Finishes {@code promise} after both shutdown futures have been evaluated.
 *
 * <p>Precedence: a cause on the output future fails the promise (an input cause is then
 * logged at debug level only); otherwise a cause on the input future fails the promise;
 * otherwise the promise succeeds.
 *
 * @param shutdownOutputFuture result of shutting down the output side
 * @param shutdownInputFuture  result of shutting down the input side
 * @param promise              the promise to complete
 */
private static void shutdownDone(ChannelFuture shutdownOutputFuture,
                                 ChannelFuture shutdownInputFuture,
                                 ChannelPromise promise) {
    final Throwable outErr = shutdownOutputFuture.cause();
    final Throwable inErr = shutdownInputFuture.cause();

    final boolean bothFailed = outErr != null && inErr != null;
    if (bothFailed) {
        logger.debug("Exception suppressed because a previous exception occurred.",
                inErr);
    }

    final Throwable reported = outErr != null ? outErr : inErr;
    if (reported == null) {
        promise.setSuccess();
    } else {
        promise.setFailure(reported);
    }
}
/**
 * Writes {@code message} to the underlying Netty channel.
 *
 * <p>When {@code sent} is {@code true} the call waits for the write to complete for at most
 * the timeout read from the URL ({@code Constants.TIMEOUT_KEY}, falling back to
 * {@code Constants.DEFAULT_TIMEOUT}). When {@code sent} is {@code false} the method does not
 * wait; only a failure that is already known is reported.
 *
 * @param message the object to write
 * @param sent    whether to wait for the write to complete
 * @throws RemotingException if the write fails, the wait is interrupted, or the write does
 *     not complete within the timeout
 */
@Override
public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);
    boolean success = true;
    int timeout = 0;
    try {
        // Send the message.
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // Bounded wait. NOTE(review): the original comment states the default timeout is
            // 1000 ms -- confirm against Constants.DEFAULT_TIMEOUT.
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        if (e instanceof InterruptedException) {
            // await() cleared the interrupt flag; restore it so callers can still observe it.
            Thread.currentThread().interrupt();
        }
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + " in timeout(" + timeout + "ms) limit");
    }
}
/**
 * Logs a failed send and requeues the message when the failure is a
 * {@code ClosedChannelException}. Successful sends are ignored.
 *
 * @param future the completed send future
 */
@Override
public void operationComplete(ChannelFuture future) {
    if (future.isSuccess()) {
        return;
    }

    final Throwable failure = future.cause();
    LOGGER.warn("Error while sending message for " + queueName, failure);

    final boolean channelClosed = failure instanceof ClosedChannelException;
    if (channelClosed) {
        requeueMessage();
    }
}
/**
 * Writes a PUSH_PROMISE frame for {@code streamId}, reserving {@code promisedStreamId} as a
 * locally initiated push stream before the frame is handed to the frame writer.
 *
 * <p>Any {@code Throwable} raised along the way (including the connection error thrown when a
 * GO_AWAY has already been received) is reported to the lifecycle manager and used to fail
 * {@code promise}, which is then returned in place of the write future.
 *
 * @param ctx              the handler context passed on to the frame writer
 * @param streamId         the stream the promise is associated with; it must exist
 * @param promisedStreamId the stream id being reserved for the push
 * @param headers          the headers carried by the frame
 * @param padding          the padding passed on to the frame writer
 * @param promise          the promise passed on to the frame writer
 * @return the future returned by the frame writer, or {@code promise} if an error was thrown
 */
@Override
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding, ChannelPromise promise) {
try {
// Refuse to send a PUSH_PROMISE once the peer's GO_AWAY has been received.
if (connection.goAwayReceived()) {
throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
}
Http2Stream stream = requireStream(streamId);
// Reserve the promised stream.
connection.local().reservePushStream(promisedStreamId, stream);
ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
promise);
// Writing headers may fail during the encode state if they violate HPACK limits.
Throwable failureCause = future.cause();
if (failureCause == null) {
// This just sets internal stream state which is used elsewhere in the codec and doesn't
// necessarily mean the write will complete successfully.
stream.pushPromiseSent();
if (!future.isSuccess()) {
// Either the future is not done or failed in the meantime.
// NOTE(review): presumably this defers error reporting until the future completes -- confirm
// against notifyLifecycleManagerOnError.
notifyLifecycleManagerOnError(future, ctx);
}
} else {
// The write has already failed: report it right away.
lifecycleManager.onError(ctx, true, failureCause);
}
return future;
} catch (Throwable t) {
// Report the error, fail the caller's promise (unless already completed) and return it.
lifecycleManager.onError(ctx, true, t);
promise.tryFailure(t);
return promise;
}
}
/**
 * Completes {@code promise} for the very first write on this stream channel.
 *
 * <p>On success the channel's writability is re-evaluated before the promise is fulfilled.
 * On failure the promise is failed with the wrapped cause and the channel is closed forcibly.
 *
 * @param future  the finished first write
 * @param promise the promise to complete
 */
private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
    final Throwable failure = future.cause();
    if (failure != null) {
        promise.setFailure(wrapStreamClosedError(failure));
        // If the first write fails there is not much we can do, just close
        closeForcibly();
        return;
    }

    // The first write has just made the stream-id valid, so the writability of the channel
    // has to be re-evaluated.
    writabilityChanged(Http2MultiplexCodec.this.isWritable(stream));
    promise.setSuccess();
}
/**
 * Creates a new channel, initializes it and registers it with the event loop group.
 *
 * <p>If initialization throws, the channel is closed forcibly and a failed promise is
 * returned. If the registration future already carries a cause, the channel is closed
 * (forcibly when it never became registered) and that registration future is returned.
 *
 * @return the registration future, or a failed promise if {@code init} threw
 */
final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    final ChannelFuture registration = group().register(channel);
    final boolean registrationFailed = registration.cause() != null;
    if (registrationFailed) {
        if (!channel.isRegistered()) {
            channel.unsafe().closeForcibly();
        } else {
            channel.close();
        }
    }

    // Reaching this point without a failed promise means one of two things:
    // 1) Registration was attempted from the event loop and has therefore completed already,
    //    so it is safe to attempt bind() or connect() now.
    // 2) Registration was attempted from another thread and the request has been added to
    //    the event loop's task queue for later execution. It is still safe to attempt bind()
    //    or connect() now, because they will be executed *after* the scheduled registration
    //    task: register(), bind(), and connect() are all bound to the same thread.
    return registration;
}
/**
 * Opens a connection to the service instance's ip and port and blocks until the attempt has
 * completed.
 *
 * <p>A listener logs the outcome and, for {@code ServerPushProtocol}, sends the client name
 * to the server once the connection is up.
 *
 * @return the connected channel
 * @throws RpcException wrapping the connect failure if the connection cannot be established
 */
@Override
public Channel connect() {
    final String ip = serviceInstance.getIp();
    final int port = serviceInstance.getPort();
    final ChannelFuture future = bootstrap.connect(new InetSocketAddress(ip, port));
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                log.debug("future callback, connect to {}:{} success, channel={}",
                        ip, port, channelFuture.channel());
                // Send the clientName packet to the server.
                if (communicationOptions.getProtocol() instanceof ServerPushProtocol) {
                    sendClientNameToServer(future);
                }
            } else {
                log.debug("future callback, connect to {}:{} failed due to {}",
                        ip, port, channelFuture.cause().getMessage());
            }
        }
    });
    // Wait without rethrowing: syncUninterruptibly() would throw the raw connect failure
    // itself, so the error logging and RpcException wrapping below would never run.
    future.awaitUninterruptibly();
    if (future.isSuccess()) {
        return future.channel();
    }
    // throw exception when connect failed to the connection pool acquirer
    final Throwable cause = future.cause();
    log.error("connect to {}:{} failed, msg={}", ip, port, cause.getMessage());
    throw new RpcException(cause);
}
/**
 * Sends {@code request} over a channel obtained from {@code selectChannel()} and fills in
 * {@code response}.
 *
 * <p>Steps, in order: bind the request to the channel and to a newly created
 * {@code RpcFuture}; encode the request into its send buffer; schedule a timeout task using
 * the request's read timeout; write the buffer and wait up to the request's write timeout;
 * then either hand the {@code RpcFuture} to the response (async) or block on it for up to the
 * read timeout and store the result (sync).
 *
 * @param request  the request to send; it is mutated with channel, future, correlation id and
 *                 send buffer
 * @param response receives either the {@code RpcFuture} or the result plus correlation id
 * @throws RpcException with {@code SERIALIZATION_EXCEPTION} if encoding throws, or with
 *     {@code NETWORK_EXCEPTION} if the write is not successful after the wait or another
 *     exception occurs while sending; an {@code RpcException} raised while sending is rethrown
 */
public void execute(Request request, Response response) throws RpcException {
request.setCommunicationClient(this);
Channel channel = selectChannel();
request.setChannel(channel);
ChannelInfo channelInfo = ChannelInfo.getClientChannelInfo(channel);
RpcFuture rpcFuture = RpcFuture.createRpcFuture(request);
// Interceptors are attached to the future only when the request carries a callback.
if (request.getCallback() != null) {
rpcFuture.setInterceptors(interceptors);
}
// The future's correlation id is shared with the channel info and the request.
channelInfo.setCorrelationId(rpcFuture.getCorrelationId());
rpcFuture.setChannelInfo(channelInfo);
rpcFuture.setChannelType(communicationOptions.getChannelType());
request.setRpcFuture(rpcFuture);
request.setCorrelationId(rpcFuture.getCorrelationId());
try {
request.setSendBuf(communicationOptions.getProtocol().encodeRequest(request));
} catch (Throwable t) {
throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, t.getMessage(), t);
}
// register timeout timer
Timer timeoutTimer = TimerInstance.getInstance();
RpcTimeoutTimer timeoutTask = new RpcTimeoutTimer(
channelInfo, request.getCorrelationId(), communicationOptions.getProtocol());
Timeout timeout = timeoutTimer.newTimeout(timeoutTask, request.getReadTimeoutMillis(), TimeUnit.MILLISECONDS);
request.getRpcFuture().setTimeout(timeout);
try {
// netty will release the send buffer after sent.
// we retain here, so it can be used when rpc retry.
request.retain();
ChannelFuture sendFuture = request.getChannel().writeAndFlush(request.getSendBuf());
// Bounded wait; the future may still be incomplete when this returns.
sendFuture.awaitUninterruptibly(request.getWriteTimeoutMillis());
if (!sendFuture.isSuccess()) {
// A ClosedChannelException is not logged at warn level; every other outcome is.
if (!(sendFuture.cause() instanceof ClosedChannelException)) {
log.warn("send request failed, channelActive={}, ex=",
request.getChannel().isActive(), sendFuture.cause());
}
String errMsg = String.format("send request failed, channelActive=%b",
request.getChannel().isActive());
throw new RpcException(RpcException.NETWORK_EXCEPTION, errMsg);
}
} catch (Exception ex) {
// Any failure while sending: notify the channel info and cancel the scheduled timeout.
channelInfo.handleRequestFail(communicationOptions.getChannelType(), request.getCorrelationId());
timeout.cancel();
log.debug("send request failed:", ex);
if (ex instanceof RpcException) {
throw (RpcException) ex;
} else {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "send request failed", ex);
}
}
// return channel
channelInfo.handleRequestSuccess(communicationOptions.getChannelType());
// receive
if (rpcFuture.isAsync()) {
// Async: the caller receives the future.
response.setRpcFuture(rpcFuture);
} else {
// Sync: block for the result, for at most the read timeout.
response.setResult(rpcFuture.get(request.getReadTimeoutMillis(), TimeUnit.MILLISECONDS));
response.setCorrelationId(rpcFuture.getCorrelationId());
}
}
/**
 * Connects to {@code getConnectAddress()} and installs the new channel on this client.
 *
 * <p>The attempt is awaited for at most {@code getConnectTimeout()} milliseconds. On success
 * the previous channel (if any) is closed and replaced; if the client has been closed in the
 * meantime, the new channel is closed again and no channel is kept. If the client is not
 * connected when the method exits, the connect future is cancelled.
 *
 * @throws RemotingException if the connection attempt fails or does not complete within the
 *     connect timeout
 * @throws Throwable declared by the signature
 */
@Override
protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    // Do not call sync() here: it would block without the configured connect timeout and
    // rethrow the raw failure, bypassing the timed wait and the RemotingException handling
    // below.
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    try {
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
        if (ret && future.isSuccess()) {
            io.netty.channel.Channel newChannel = future.channel();
            try {
                // Close the old connection.
                io.netty.channel.Channel oldChannel = Netty4Client.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        }
                        oldChannel.close();
                    } finally {
                        Netty4Channel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                if (Netty4Client.this.isClosed()) {
                    // The client was closed while connecting: discard the new channel again.
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                        }
                        newChannel.close();
                    } finally {
                        Netty4Client.this.channel = null;
                        Netty4Channel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    Netty4Client.this.channel = newChannel;
                }
            }
        } else if (future.cause() != null) {
            // The attempt completed with a failure.
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
        } else {
            // No cause available: the attempt did not complete within the connect timeout.
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + " client-side timeout "
                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
        }
    } finally {
        if (!isConnected()) {
            future.cancel(true);
        }
    }
}