下面列出了 io.netty.channel.ChannelFuture#isSuccess() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Creates a new {@code PbrpcClientChannel} by connecting through {@code pbrpcClient}.
 *
 * <p>Blocks (uninterruptibly) until the connect attempt has finished. The completed
 * connect future is stored on the returned channel wrapper whether or not the attempt
 * succeeded; a failed attempt is reported through a warning log only.
 *
 * @return a new {@code PbrpcClientChannel} carrying the completed connect future
 * @throws Exception declared by the factory contract
 * @see org.apache.commons.pool.BasePoolableObjectFactory#makeObject()
 */
public Object makeObject() throws Exception {
    PbrpcClientChannel ch = new PbrpcClientChannel();
    ChannelFuture future = pbrpcClient.connect();
    future.awaitUninterruptibly();
    if (future.isSuccess()) {
        // Only claim success when the connection was actually established.
        LOG.info("Making new connection on " + pbrpcClient.getInfo() + " and adding to pool done");
    } else {
        LOG.warn("Making new connection on " + pbrpcClient.getInfo() + " not success",
                future.cause());
    }
    ch.setChannelFuture(future);
    return ch;
}
/**
 * Sends a message to the given node.
 *
 * <p>The payload length of the header is set from the message body size, then the
 * serialized message is written and flushed on the node's channel. When {@code asyn}
 * is {@code false} the call waits for the write to complete and reports a failed write.
 *
 * @param message message to send
 * @param node    peer info; its channel is used for the write
 * @param asyn    when {@code false}, block until the write has completed
 * @return a success result, {@code NET_MESSAGE_SEND_FAIL} when a synchronous write did
 *         not succeed, or {@code NET_MESSAGE_SEND_EXCEPTION} when an exception occurred
 */
@Override
public NetworkEventResult send(BaseMessage message, Node node, boolean asyn) {
    try {
        MessageHeader header = message.getHeader();
        // NOTE(review): this re-assigns the header's own magic number; kept as in the original.
        header.setMagicNumber(header.getMagicNumber());
        BaseNulsData body = message.getMsgBody();
        header.setPayloadLength(body.size());
        ChannelFuture future = node.getChannel().writeAndFlush(Unpooled.wrappedBuffer(message.serialize()));
        if (!asyn) {
            future.await();
            if (!future.isSuccess()) {
                return NetworkEventResult.getResultFail(NetworkErrorCode.NET_MESSAGE_SEND_FAIL);
            }
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe the interruption.
        Thread.currentThread().interrupt();
        LoggerUtil.logger(node.getNodeGroup().getChainId()).error(e.getMessage(), e);
        return NetworkEventResult.getResultFail(NetworkErrorCode.NET_MESSAGE_SEND_EXCEPTION);
    } catch (Exception e) {
        LoggerUtil.logger(node.getNodeGroup().getChainId()).error(e.getMessage(), e);
        return NetworkEventResult.getResultFail(NetworkErrorCode.NET_MESSAGE_SEND_EXCEPTION);
    }
    return NetworkEventResult.getResultSuccess();
}
/**
 * Accounts for a finished batch write.
 *
 * <p>Decrements the in-flight batch counter, restores client reads when the counter
 * lands exactly on the low threshold, and bumps either the published or the error counter.
 *
 * @param future the completed write future
 */
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
    final int remainingBatches = inFlightBatches.decrementAndGet();
    if (remainingBatches == inFlightBatchesLowThreshold) {
        throttler.restoreClientReads();
    }

    if (!future.isSuccess()) {
        errorCounter.inc();
        if (log.isDebugEnabled()) {
            log.debug("Failed to write to {}: {}", host, future.cause().toString());
        }
        return;
    }

    publishedCounter.inc();
}
/**
 * Connects to the request URI's host and port and, once connected, writes the HTTP request.
 *
 * <p>The returned future is failed when the connect attempt fails or when writing the
 * request fails; otherwise it is handed to a {@code RequestExecuteHandler} added to the
 * channel pipeline.
 *
 * @param headers the headers used to build the outgoing request
 * @return a future for the client response
 * @throws IOException declared by the overridden method
 */
@Override
protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) throws IOException {
    final SettableListenableFuture<ClientHttpResponse> responseFuture =
            new SettableListenableFuture<ClientHttpResponse>();

    ChannelFutureListener connectionListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                responseFuture.setException(future.cause());
                return;
            }
            Channel channel = future.channel();
            channel.pipeline().addLast(new RequestExecuteHandler(responseFuture));
            FullHttpRequest nettyRequest = createFullHttpRequest(headers);
            // A failed write would otherwise leave responseFuture incomplete forever.
            channel.writeAndFlush(nettyRequest).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture writeFuture) throws Exception {
                    if (!writeFuture.isSuccess()) {
                        responseFuture.setException(writeFuture.cause());
                    }
                }
            });
        }
    };

    this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener);
    return responseFuture;
}
/**
 * Verifies that a pending connect attempt to an unreachable address can be cancelled.
 *
 * <p>If the attempt finishes within one second it must not have succeeded, and its
 * failure cause is rethrown. Otherwise the future is cancelled and, when the transport
 * accepts the cancellation, the channel is expected to close within 500 ms.
 *
 * @param cb the bootstrap under test
 * @throws Throwable the connect failure cause, or any assertion failure
 */
public void testConnectCancellation(Bootstrap cb) throws Throwable {
    cb.handler(new TestHandler()).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 4000);
    final ChannelFuture connectFuture = cb.connect(BAD_HOST, BAD_PORT);
    try {
        final boolean finishedEarly = connectFuture.await(1000);
        if (finishedEarly) {
            if (!connectFuture.isSuccess()) {
                throw connectFuture.cause();
            }
            fail("A connection attempt to " + BAD_HOST + " must not succeed.");
        }

        if (!connectFuture.cancel(true)) {
            // Cancellation not supported by the transport.
            return;
        }
        assertThat(connectFuture.channel().closeFuture().await(500), is(true));
        assertThat(connectFuture.isCancelled(), is(true));
    } finally {
        connectFuture.channel().close();
    }
}
/**
 * Waits for a write to finish and converts a timeout or failure into a {@code SailfishException}.
 *
 * @param future          the write future to wait on
 * @param timeout         maximum time to wait, passed to {@code awaitUninterruptibly}
 * @param request         the protocol being written; used for messages and trace removal
 * @param needRemoveTrace whether the request's trace entry is removed on timeout or failure
 * @throws SailfishException with {@code WRITE_TIMEOUT} when the write did not complete in
 *                           time, or {@code CHANNEL_WRITE_FAIL} when it completed unsuccessfully
 */
private void waitWriteDone(ChannelFuture future, int timeout, RequestProtocol request, boolean needRemoveTrace)
        throws SailfishException {
    final boolean completed = future.awaitUninterruptibly(timeout);
    if (completed && future.isSuccess()) {
        return;
    }

    if (!completed) {
        // useless at most of time when do writeAndFlush(...) invoke
        future.cancel(true);
    }
    if (needRemoveTrace) {
        getTracer().remove(request.packetId());
    }

    if (!completed) {
        throw new SailfishException(ExceptionCode.WRITE_TIMEOUT,
                String.format("write to remote[%s] timeout, protocol[%s]", channel.remoteAddress(), request));
    }
    throw new SailfishException(ExceptionCode.CHANNEL_WRITE_FAIL,
            String.format("write to remote[%s] fail, protocol[%s]", channel.remoteAddress(), request),
            future.cause());
}
/**
 * Writes the data to the channel without any processing.
 *
 * <p>The last-active timestamp is refreshed on every call, even when the connection is
 * closed. Note that this method calls {@code write} only; it does not flush.
 *
 * @param data the data to write
 * @return a view of the write status, or {@code ProtocolFuture.ERRORFUTURE} when closed
 */
public ProtocolFuture writeToChannel(Object data) {
    this.lastActive = System.currentTimeMillis();
    if (isClosed()) {
        return ProtocolFuture.ERRORFUTURE;
    }

    final ChannelFuture writeFuture = channel.write(data);
    return new ProtocolFuture() {
        @Override
        public boolean isSuccess() {
            return writeFuture.isSuccess();
        }

        @Override
        public boolean isDone() {
            return writeFuture.isDone();
        }
    };
}
/**
 * Connects to the request URI's host and port and, once connected, writes the HTTP request.
 *
 * <p>The returned future is failed when the connect attempt fails or when writing the
 * request fails; otherwise it is handed to a {@code RequestExecuteHandler} added to the
 * channel pipeline.
 *
 * @param headers the headers used to build the outgoing request
 * @return a future for the client response
 * @throws IOException declared by the overridden method
 */
@Override
protected ListenableFuture<ClientHttpResponse> executeInternal(final HttpHeaders headers) throws IOException {
    final SettableListenableFuture<ClientHttpResponse> responseFuture =
            new SettableListenableFuture<ClientHttpResponse>();

    ChannelFutureListener connectionListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                responseFuture.setException(future.cause());
                return;
            }
            Channel channel = future.channel();
            channel.pipeline().addLast(new RequestExecuteHandler(responseFuture));
            FullHttpRequest nettyRequest = createFullHttpRequest(headers);
            // A failed write would otherwise leave responseFuture incomplete forever.
            channel.writeAndFlush(nettyRequest).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture writeFuture) throws Exception {
                    if (!writeFuture.isSuccess()) {
                        responseFuture.setException(writeFuture.cause());
                    }
                }
            });
        }
    };

    this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener);
    return responseFuture;
}
/**
 * Closes the channel after a successful operation unless keep-alive is enabled for it.
 *
 * <p>A missing keep-alive value is treated the same as {@code false}. Nothing is done
 * when the operation failed.
 *
 * @param future the completed future
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
    if (!future.isSuccess()) {
        return;
    }
    Channel target = future.channel();
    Boolean keepAlive = ContextUtil.getKeepAlive(target);
    if (!Boolean.TRUE.equals(keepAlive)) {
        target.close();
    }
}
/**
 * Connects to the given address and returns the socket registered for the new channel.
 *
 * <p>After a successful connect this waits on the latch of the channel's
 * {@code BusinessHandler} before looking the socket up in {@code chManager}.
 *
 * @param address the remote address to connect to
 * @return the socket associated with the connected channel
 * @throws Exception an {@code IOException} when no socket could be obtained, or any
 *                   failure raised while connecting or waiting
 */
public static SocketChannel open(SocketAddress address) throws Exception {
    final ChannelFuture connected = boot.connect(address).sync();

    final SocketChannel socket;
    if (connected.isSuccess()) {
        connected.channel().pipeline().get(BusinessHandler.class).latch.await();
        socket = chManager.get(connected.channel());
    } else {
        socket = null;
    }

    if (socket != null) {
        return socket;
    }
    throw new IOException("can't create socket!");
}
/**
 * Sends the request (and optional content) once the awaited operation has succeeded,
 * or records the failure and notifies the caller.
 *
 * <p>On success the request and, when present, the content are written with
 * {@code writeResultListener} attached, followed by a single flush. On failure the cause
 * is stored in {@code exception} and the future/callback is invoked.
 *
 * @param future the completed future
 */
@Override
public void operationComplete(ChannelFuture future) {
    if (future.isSuccess()) {
        future.channel().write(request).addListener(writeResultListener);
        if (content != null) {
            future.channel().write(content).addListener(writeResultListener);
        }
        future.channel().flush();
    } else {
        Throwable cause = future.cause();
        // The cause may be an Error or other non-Exception Throwable; a blind cast would
        // throw ClassCastException here and the callback below would never run.
        exception = (cause == null || cause instanceof Exception)
                ? (Exception) cause
                : new RuntimeException(cause);
        invokeFutureAndCallback("RequestSender::operationComplete");
    }
}
/**
 * Connects to all replicas in {@code serverList} except this server itself.
 *
 * <p>Each connection uses a pipeline of an {@code SslHandler} in client mode followed by
 * object encoder/decoder handlers. Connect attempts to {@code 127.0.0.1} on the replica's
 * port are repeated, without any delay between attempts, until one succeeds; the
 * resulting channel is then added to {@code channels}.
 *
 * @throws InterruptedException if interrupted while waiting for a connect attempt
 */
@SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON")
public void connect() throws InterruptedException {
    for (Entry<Integer, Integer> replica : this.serverList.entrySet()) {
        int replicaId = replica.getKey();
        int replicaPort = replica.getValue();
        if (replicaId == myServerId) {
            continue;
        }

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(loopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // enable SSL/TLS support
                        SSLEngine sslEngine = SSLContextFactory.getClientContext().createSSLEngine();
                        sslEngine.setUseClientMode(true);
                        ch.pipeline().addLast(
                                new SslHandler(sslEngine),
                                new ObjectEncoder(),
                                new ObjectDecoder(OzymandiasServer.maxObjectSize, ClassResolvers.cacheDisabled(null)));
                    }
                });

        /* wait till server is connected */
        ChannelFuture attempt;
        while (true) {
            attempt = bootstrap.connect("127.0.0.1", replicaPort);
            attempt.await();
            if (attempt.isDone() && attempt.isSuccess()) {
                break;
            }
        }
        this.channels.add(attempt.sync().channel());
    }
}
/**
 * Reports a failed send through {@code sendFailure}; does nothing on success.
 *
 * <p>A failure without a cause is reported with an "unknown failure" message.
 *
 * @param future the completed send future
 */
@Override
public void operationComplete(ChannelFuture future) {
    if (future.isSuccess()) {
        return;
    }

    final Throwable cause = future.cause();
    if (cause != null) {
        sendFailure(new UserRpcException(null, "Failure when sending message.", cause));
        return;
    }
    sendFailure(new UserRpcException(null, "Unknown failure when sending message.", null));
}
/**
 * Handles completion of a request write.
 *
 * <p>On success the request handler is told the request was sent; on failure the error
 * path is invoked with a {@code TTransportException} wrapping the cause. Anything thrown
 * while doing either is routed to {@code onError} as well.
 *
 * @param context        the channel handler context
 * @param future         the completed write future
 * @param requestHandler the handler of the request that was written
 */
private void messageSent(ChannelHandlerContext context, ChannelFuture future, RequestHandler requestHandler)
{
    try {
        if (future.isSuccess()) {
            requestHandler.onRequestSent();
        }
        else {
            onError(context, new TTransportException("Sending request failed", future.cause()), Optional.of(requestHandler));
        }
    }
    catch (Throwable failure) {
        onError(context, failure, Optional.of(requestHandler));
    }
}
/**
 * Starts the handshake after the superclass has processed a successful connect.
 *
 * <p>When the connection is configured as synchronous, this blocks uninterruptibly
 * until the handshake future completes.
 *
 * @param ch the completed connect future
 */
@Override
protected void finishConnectEvent(ChannelFuture ch) {
    super.finishConnectEvent(ch);
    if (!ch.isSuccess()) {
        return;
    }

    // start handshake
    ChannelFuture handshakeFuture = this.handshaker.handshake(this.getChannel());
    if (this.isSyncConnect()) {
        handshakeFuture.syncUninterruptibly();
    }
}
/**
 * Logs the outcome of a connect attempt and prepares the login when it succeeded.
 *
 * <p>The connect counts as successful only when the future succeeded and the channel
 * field is non-null and active.
 *
 * @param ch the completed connect future
 */
protected void finishConnectEvent(ChannelFuture ch) {
    if (!ch.isSuccess() || channel == null || !channel.isActive()) {
        NettyLog.info(" client connect failure!");
        return;
    }
    NettyLog.info("client connect " + channel.remoteAddress());
    prepareLogin();
}
/**
 * Sends the next numbers after a successful operation; on failure prints the cause's
 * stack trace and closes the channel.
 *
 * @param future the completed future
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
    if (!future.isSuccess()) {
        future.cause().printStackTrace();
        future.channel().close();
        return;
    }
    sendNumbers();
}
/**
 * Generates more traffic after a successful operation; on failure prints the cause's
 * stack trace and closes the channel.
 *
 * @param future the completed future
 */
@Override
public void operationComplete(ChannelFuture future) {
    if (!future.isSuccess()) {
        future.cause().printStackTrace();
        future.channel().close();
        return;
    }
    generateTraffic();
}
/**
 * Writes the bytes to the channel, flushes, and waits for the write to complete.
 *
 * <p>A failed write is logged and then rethrown. The wait uses {@code await()} rather
 * than {@code sync()}: {@code sync()} rethrows the failure before returning, which made
 * the logging branch unreachable.
 *
 * @param channel the channel to write to
 * @param arr     the bytes to send; they are copied into a new buffer
 * @throws InterruptedException if interrupted while waiting for the write to complete
 */
public void send2Client(Channel channel, byte[] arr) throws InterruptedException {
    ChannelFuture future = channel.writeAndFlush(Unpooled.copiedBuffer(arr)).await();
    if (!future.isSuccess()) {
        log.error("发送数据出错:{}", future.cause());
        // Propagate the failure to the caller exactly as the previous sync() call did.
        future.sync();
    }
}
/**
 * Marks the channel as having sent its CONNACK once the operation succeeded.
 *
 * <p>Nothing is changed when the operation failed.
 *
 * @param future the completed future
 */
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
    if (!future.isSuccess()) {
        return;
    }
    future.channel().attr(ChannelAttributes.CONNACK_SENT).set(true);
}