下面列出了io.netty.channel.ChannelFuture#awaitUninterruptibly ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Wait (sequentially) for a time duration for each anticipated response
*
* @param timeout Value of time to wait for each response
* @param unit Units associated with {@code timeout}
* @see HttpResponseHandler#put(int, io.netty.channel.ChannelFuture, io.netty.channel.ChannelPromise)
*/
public void awaitResponses(long timeout, TimeUnit unit) {
Iterator<Entry<Integer, Entry<ChannelFuture, ChannelPromise>>> itr = streamidPromiseMap.entrySet().iterator();
while (itr.hasNext()) {
Entry<Integer, Entry<ChannelFuture, ChannelPromise>> entry = itr.next();
ChannelFuture writeFuture = entry.getValue().getKey();
if (!writeFuture.awaitUninterruptibly(timeout, unit)) {
throw new IllegalStateException("Timed out waiting to write for stream id " + entry.getKey());
}
if (!writeFuture.isSuccess()) {
throw new RuntimeException(writeFuture.cause());
}
ChannelPromise promise = entry.getValue().getValue();
if (!promise.awaitUninterruptibly(timeout, unit)) {
throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey());
}
if (!promise.isSuccess()) {
throw new RuntimeException(promise.cause());
}
System.out.println("---Stream id: " + entry.getKey() + " received---");
itr.remove();
}
}
/**
* Closes all channels and releases all resources.
*/
@Override
public void close() {
LOG.info("Stopping listening at {} and closing", serverListeningChannel.localAddress());
final ChannelFuture closeListeningChannelFuture = serverListeningChannel.close();
final ChannelGroupFuture channelGroupCloseFuture = channelGroup.close();
final Future serverListeningGroupCloseFuture = serverListeningGroup.shutdownGracefully();
final Future serverWorkingGroupCloseFuture = serverWorkingGroup.shutdownGracefully();
final Future clientGroupCloseFuture = clientGroup.shutdownGracefully();
closeListeningChannelFuture.awaitUninterruptibly();
channelGroupCloseFuture.awaitUninterruptibly();
serverListeningGroupCloseFuture.awaitUninterruptibly();
serverWorkingGroupCloseFuture.awaitUninterruptibly();
clientGroupCloseFuture.awaitUninterruptibly();
}
/**
* Wait (sequentially) for a time duration for each anticipated response
*
* @param timeout Value of time to wait for each response
* @param unit Units associated with {@code timeout}
* @see Http2ResponseHandler#put(int, io.netty.channel.ChannelFuture, io.netty.channel.ChannelPromise)
*/
public void awaitResponses(long timeout, TimeUnit unit) {
Iterator<Entry<Integer, Entry<ChannelFuture, ChannelPromise>>> itr = streamidPromiseMap.entrySet().iterator();
while (itr.hasNext()) {
Entry<Integer, Entry<ChannelFuture, ChannelPromise>> entry = itr.next();
ChannelFuture writeFuture = entry.getValue().getKey();
if (!writeFuture.awaitUninterruptibly(timeout, unit)) {
throw new IllegalStateException("Timed out waiting to write for stream id " + entry.getKey());
}
if (!writeFuture.isSuccess()) {
throw new RuntimeException(writeFuture.cause());
}
ChannelPromise promise = entry.getValue().getValue();
if (!promise.awaitUninterruptibly(timeout, unit)) {
throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey());
}
if (!promise.isSuccess()) {
throw new RuntimeException(promise.cause());
}
log.debug("Stream id: " + entry.getKey() + " received");
itr.remove();
}
}
public XioServer build() {
log.debug("Building");
serverBootstrap.group(channelConfig.bossGroup(), channelConfig.workerGroup());
serverBootstrap.channel(channelConfig.channel());
final XioServerInstrumentation instrumentation = new XioServerInstrumentation();
serverBootstrap.childHandler(pipelineAssembler.build(instrumentation));
ChannelFuture future = serverBootstrap.bind();
future.awaitUninterruptibly();
if (future.isSuccess()) {
instrumentation.addressBound = (InetSocketAddress) future.channel().localAddress();
} else {
log.error("Couldn't bind channel", future.cause());
throw new RuntimeException(future.cause());
}
return new XioServer(future.channel(), instrumentation, config, state);
}
private void waitWriteDone(ChannelFuture future, int timeout, RequestProtocol request, boolean needRemoveTrace)
throws SailfishException {
boolean done = future.awaitUninterruptibly(timeout);
if (!done) {
// useless at most of time when do writeAndFlush(...) invoke
future.cancel(true);
if (needRemoveTrace) {
getTracer().remove(request.packetId());
}
throw new SailfishException(ExceptionCode.WRITE_TIMEOUT,
String.format("write to remote[%s] timeout, protocol[%s]", channel.remoteAddress(), request));
}
if (!future.isSuccess()) {
if (needRemoveTrace) {
getTracer().remove(request.packetId());
}
throw new SailfishException(ExceptionCode.CHANNEL_WRITE_FAIL,
String.format("write to remote[%s] fail, protocol[%s]", channel.remoteAddress(), request),
future.cause());
}
}
/**
* Start proxy server
* */
public void start()
throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(_acceptorGroup, _upstreamWorkerGroup);
serverBootstrap.channelFactory(new ChannelFactory<ServerChannel>() {
@Override
public ServerChannel newChannel() {
return new NioServerSocketChannel();
}
});
serverBootstrap.childHandler(new ProxyInitializer(this));
//bind
ChannelFuture future = serverBootstrap.bind(_host, _port);
//wait for the future
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.channel().closeFuture().awaitUninterruptibly();
throw new ChannelException(String.format("Failed to bind to: %s:%d", _host, _port), future.cause());
} else {
_allChannels.add(future.channel());
}
}
/**
* @see org.apache.commons.pool.BasePoolableObjectFactory#makeObject()
*/
public Object makeObject() throws Exception {
PbrpcClientChannel ch = new PbrpcClientChannel();
ChannelFuture future = pbrpcClient.connect();
future.awaitUninterruptibly();
if (!future.isSuccess()) {
LOG.warn("Making new connection on " + pbrpcClient.getInfo() + " not success",
future.cause());
}
LOG.info("Making new connection on " + pbrpcClient.getInfo() + " and adding to pool done");
ch.setChannelFuture(future);
return ch;
}
@Test
public void testSendGeneratedPinModeCommandWhenHardwareGoesOnline() throws Exception {
ChannelFuture channelFuture = clientPair.hardwareClient.stop();
channelFuture.awaitUninterruptibly();
assertTrue(channelFuture.isDone());
clientPair.appClient.send("hardware 1 vw 1 1");
verify(clientPair.appClient.responseMock, timeout(500)).channelRead(any(), eq(new ResponseMessage(1, DEVICE_NOT_IN_NETWORK)));
TestHardClient hardClient = new TestHardClient("localhost", properties.getHttpPort());
hardClient.start();
hardClient.login(clientPair.token);
verify(hardClient.responseMock, timeout(1000)).channelRead(any(), eq(ok(1)));
String expectedBody = "pm 1 out 2 out 3 out 5 out 6 in 7 in 30 in 8 in";
verify(hardClient.responseMock, timeout(500)).channelRead(any(), eq(hardware(1, expectedBody)));
verify(hardClient.responseMock, times(2)).channelRead(any(), any());
hardClient.stop().awaitUninterruptibly();
}
@Test
public void testCloseOnInvalid() {
ChannelFuture closeFuture = ch.closeFuture();
String header = "GET / HTTP/1.1\r\n";
try {
ch.writeInbound(copiedBuffer(header, CharsetUtil.US_ASCII));
} catch (HAProxyProtocolException ppex) {
// swallow this exception since we're just testing to be sure the channel was closed
}
boolean isComplete = closeFuture.awaitUninterruptibly(5000);
if (!isComplete || !closeFuture.isDone() || !closeFuture.isSuccess()) {
fail("Expected channel close");
}
}
public void send(FullHttpRequest request) throws HttpClientException {
request.headers().add(HttpHeaderNames.HOST, hostHeader);
if (!channel.isActive()) {
throw new HttpClientException("channel is not active");
}
try {
latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
throw new HttpClientException("sending poll is full");
}
incrementPendingRequests();
ChannelFuture future = this.channel.writeAndFlush(request);
future.awaitUninterruptibly();
}
public String awaitResponses(long timeout, TimeUnit unit) {
Iterator<Entry<Integer, MapValues>> itr = streamidMap.entrySet()
.iterator();
String response = null;
while (itr.hasNext()) {
Entry<Integer, MapValues> entry = itr.next();
ChannelFuture writeFuture = entry.getValue()
.getWriteFuture();
if (!writeFuture.awaitUninterruptibly(timeout, unit)) {
throw new IllegalStateException("Timed out waiting to write for stream id " + entry.getKey());
}
if (!writeFuture.isSuccess()) {
throw new RuntimeException(writeFuture.cause());
}
ChannelPromise promise = entry.getValue()
.getPromise();
if (!promise.awaitUninterruptibly(timeout, unit)) {
throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey());
}
if (!promise.isSuccess()) {
throw new RuntimeException(promise.cause());
}
logger.info("---Stream id: " + entry.getKey() + " received---");
response = entry.getValue().getResponse();
itr.remove();
}
return response;
}
public Channel getChannel(Request request) {
connectLock.lock();
try {
NettyChannelWrapper nettyChannelWrapper = (NettyChannelWrapper) this.channelTable.get(request.getAddress());
if (nettyChannelWrapper != null && nettyChannelWrapper.isActive()) {
return nettyChannelWrapper.getChannel();
}
// 发起异步连接操作
this.doConnect(request);
ChannelFuture channelFuture = this.bootstrap.connect(HttpUtils.parseSocketAddress(request.getAddress()));
nettyChannelWrapper = new NettyChannelWrapper(channelFuture);
if (channelFuture.awaitUninterruptibly(Constants.RPC_TIMEOUT)) {
if (nettyChannelWrapper.isActive()) {
if (logger.isInfoEnabled()) {
logger.info("[JobX] NettyRPC getChannel: connect remote host[{}] success, {}", request.getAddress(), channelFuture.toString());
}
this.channelTable.put(request.getAddress(), nettyChannelWrapper);
return nettyChannelWrapper.getChannel();
} else {
if (logger.isWarnEnabled()) {
logger.warn("[JobX] NettyRPC getChannel: connect remote host[" + request.getAddress() + "] failed, " + channelFuture.toString(), channelFuture.cause());
}
}
} else {
if (logger.isWarnEnabled()) {
logger.warn("[JobX] NettyRPC getChannel: connect remote host[{}] timeout {}ms, {}", request.getAddress(), Constants.RPC_TIMEOUT, channelFuture);
}
}
}finally {
connectLock.unlock();
}
return null;
}
private void addConnection(String host, int port, boolean sync) {
ChannelFuture future = gatewayClient.connect(host, port);
future.channel().attr(attrKey).set(getHostAndPort(host, port));
future.addListener(f -> {
if (!f.isSuccess()) {
logger.error("create gateway connection failure, host={}, port={}", host, port, f.cause());
}
});
if (sync) future.awaitUninterruptibly();
}
@Override
public ChannelFuture send(Packet packet, final ChannelFutureListener listener) {
if (channel.isActive()) {
ChannelFuture future = channel.writeAndFlush(packet.toFrame(channel)).addListener(this);
if (listener != null) {
future.addListener(listener);
}
if (channel.isWritable()) {
return future;
}
//阻塞调用线程还是抛异常?
//return channel.newPromise().setFailure(new RuntimeException("send data too busy"));
if (!future.channel().eventLoop().inEventLoop()) {
future.awaitUninterruptibly(100);
}
return future;
} else {
/*if (listener != null) {
channel.newPromise()
.addListener(listener)
.setFailure(new RuntimeException("connection is disconnected"));
}*/
return this.close();
}
}
private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException {
ClientStompFrame response = null;
ByteBuffer buffer;
if (wicked) {
buffer = frame.toByteBufferWithExtra("\n");
} else {
buffer = frame.toByteBuffer();
}
ByteBuf buf = Unpooled.copiedBuffer(buffer);
try {
buf.retain();
ChannelFuture future = transport.send(buf);
if (future != null) {
future.awaitUninterruptibly();
}
} finally {
buf.release();
}
//now response
if (frame.needsReply()) {
response = receiveFrame();
//filter out server ping
while (response != null) {
if (response.getCommand().equals(Stomp.Commands.STOMP)) {
response = receiveFrame();
} else {
break;
}
}
}
return response;
}
private void closeChannel(final Channel channel, boolean inEventLoop) {
checkFlushBatchBuffer();
// closing the channel results in closing any sslHandler first; SslHandler#close() was deprecated by netty
ChannelFuture closeFuture = channel.close();
if (!inEventLoop && !closeFuture.awaitUninterruptibly(DEFAULT_WAIT_MILLIS)) {
ActiveMQClientLogger.LOGGER.timeoutClosingNettyChannel();
}
}
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
// 同步等待连接
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
public void execute(Request request, Response response) throws RpcException {
request.setCommunicationClient(this);
Channel channel = selectChannel();
request.setChannel(channel);
ChannelInfo channelInfo = ChannelInfo.getClientChannelInfo(channel);
RpcFuture rpcFuture = RpcFuture.createRpcFuture(request);
if (request.getCallback() != null) {
rpcFuture.setInterceptors(interceptors);
}
channelInfo.setCorrelationId(rpcFuture.getCorrelationId());
rpcFuture.setChannelInfo(channelInfo);
rpcFuture.setChannelType(communicationOptions.getChannelType());
request.setRpcFuture(rpcFuture);
request.setCorrelationId(rpcFuture.getCorrelationId());
try {
request.setSendBuf(communicationOptions.getProtocol().encodeRequest(request));
} catch (Throwable t) {
throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, t.getMessage(), t);
}
// register timeout timer
Timer timeoutTimer = TimerInstance.getInstance();
RpcTimeoutTimer timeoutTask = new RpcTimeoutTimer(
channelInfo, request.getCorrelationId(), communicationOptions.getProtocol());
Timeout timeout = timeoutTimer.newTimeout(timeoutTask, request.getReadTimeoutMillis(), TimeUnit.MILLISECONDS);
request.getRpcFuture().setTimeout(timeout);
try {
// netty will release the send buffer after sent.
// we retain here, so it can be used when rpc retry.
request.retain();
ChannelFuture sendFuture = request.getChannel().writeAndFlush(request.getSendBuf());
sendFuture.awaitUninterruptibly(request.getWriteTimeoutMillis());
if (!sendFuture.isSuccess()) {
if (!(sendFuture.cause() instanceof ClosedChannelException)) {
log.warn("send request failed, channelActive={}, ex=",
request.getChannel().isActive(), sendFuture.cause());
}
String errMsg = String.format("send request failed, channelActive=%b",
request.getChannel().isActive());
throw new RpcException(RpcException.NETWORK_EXCEPTION, errMsg);
}
} catch (Exception ex) {
channelInfo.handleRequestFail(communicationOptions.getChannelType(), request.getCorrelationId());
timeout.cancel();
log.debug("send request failed:", ex);
if (ex instanceof RpcException) {
throw (RpcException) ex;
} else {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "send request failed", ex);
}
}
// return channel
channelInfo.handleRequestSuccess(communicationOptions.getChannelType());
// receive
if (rpcFuture.isAsync()) {
response.setRpcFuture(rpcFuture);
} else {
response.setResult(rpcFuture.get(request.getReadTimeoutMillis(), TimeUnit.MILLISECONDS));
response.setCorrelationId(rpcFuture.getCorrelationId());
}
}
/**
* shutdown server process (listeners and handlers
*/
public void shutdown() {
ChannelFuture cf = serverChannel.channel().close();
cf.awaitUninterruptibly();
}
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress()).sync();
try{
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
io.netty.channel.Channel newChannel = future.channel();
try {
// 关闭旧的连接
io.netty.channel.Channel oldChannel = Netty4Client.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
Netty4Channel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (Netty4Client.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
Netty4Client.this.channel = null;
Netty4Channel.removeChannelIfDisconnected(newChannel);
}
} else {
Netty4Client.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
}finally{
if (! isConnected()) {
future.cancel(true);
}
}
}