下面列出了 java.nio.channels.NotYetConnectedException#io.netty.channel.ChannelFuture 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Closes this encoder by running {@code finishEncode} on the handler's executor.
 *
 * <p>When the caller is already on that executor, {@code finishEncode} is invoked directly
 * with {@code promise} and its result is returned. Otherwise the call is scheduled on the
 * executor and a freshly created promise is returned; once the scheduled
 * {@code finishEncode} completes, {@code promise} is notified through a
 * {@link ChannelPromiseNotifier}.
 *
 * @param promise the promise to notify once encoding has been finished
 * @return the future produced by {@code finishEncode}, or the intermediate promise handed
 *         to the scheduled task
 */
@Override
public ChannelFuture close(final ChannelPromise promise) {
    final ChannelHandlerContext context = ctx();
    final EventExecutor eventExecutor = context.executor();
    if (eventExecutor.inEventLoop()) {
        return finishEncode(context, promise);
    }

    // Not on the executor's thread: defer the work and hand back an intermediate promise.
    final ChannelPromise intermediate = context.newPromise();
    eventExecutor.execute(new Runnable() {
        @Override
        public void run() {
            finishEncode(ctx(), intermediate).addListener(new ChannelPromiseNotifier(promise));
        }
    });
    return intermediate;
}
/**
 * Test fixture: builds a {@link RIBImpl} with a single IPv4-unicast table, instantiates its
 * service instance, then starts a server on {@code RIB_ID:PORT} and keeps the resulting
 * channel in {@code serverChannel}.
 *
 * @throws Exception if the parent set-up fails or anything below throws
 */
@Override
@Before
public void setUp() throws Exception {
    super.setUp();

    final TablesKey ipv4UnicastKey =
            new TablesKey(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
    final Map<TablesKey, PathSelectionMode> selectionModes = ImmutableMap.of(
            ipv4UnicastKey, BasePathSelectionModeFactory.createBestPathSelectionStrategy());

    this.ribImpl = new RIBImpl(
            this.tableRegistry,
            new RibId("test-rib"),
            AS_NUMBER,
            new BgpId(RIB_ID),
            this.ribExtension,
            this.serverDispatcher,
            this.codecsRegistry,
            getDomBroker(),
            getDataBroker(),
            this.policies,
            TABLES_TYPE,
            selectionModes);
    this.ribImpl.instantiateServiceInstance();

    // Start the server and wait for the bind via waitFutureSuccess before using the channel.
    final InetSocketAddress listenAddress = new InetSocketAddress(RIB_ID, PORT.toJava());
    final ChannelFuture bindFuture = this.serverDispatcher.createServer(listenAddress);
    waitFutureSuccess(bindFuture);
    this.serverChannel = bindFuture.channel();
}
/**
 * Connects an HTTP client to {@code 127.0.0.1:3560} and blocks until that connection is
 * closed.
 *
 * <p>The connect attempt is awaited with {@code sync()} so that a failed connect is rethrown
 * to the caller instead of being silently swallowed (previously only the channel's close
 * future was awaited, so a refused connection looked like a normal, immediate shutdown).
 * The event loop group is always shut down on exit.
 *
 * @throws Exception if the connection attempt fails or the calling thread is interrupted
 *                   while waiting
 */
public void start() throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new HttpClientInitializer());

        // Start the connection attempt and wait for its outcome; sync() rethrows the
        // failure cause if the connect did not succeed.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 3560)).sync();

        // Block until the client connection is closed.
        future.channel().closeFuture().sync();
    } finally {
        // Graceful exit: release the event loop threads.
        group.shutdownGracefully();
    }
}
/**
 * Binds the server bootstrap to {@code hostAddress} on a port taken from the configured
 * {@code port} range and returns the address that was actually bound.
 *
 * <p>The callback given to {@code port.iterate} returns {@code true} when a bind succeeded
 * and {@code false} (remembering the exception) when it failed; presumably {@code iterate}
 * keeps trying further ports until the callback returns {@code true} — verify against its
 * implementation.
 *
 * @param hostAddress the local address to bind to
 * @return the bound socket address wrapped in a {@link TransportAddress}
 * @throws BindHttpException if {@code port.iterate} reports failure; carries the last
 *                           recorded exception as its cause
 */
private TransportAddress bindAddress(final InetAddress hostAddress) {
    final AtomicReference<Exception> failure = new AtomicReference<>();
    final AtomicReference<InetSocketAddress> bound = new AtomicReference<>();

    final boolean bindSucceeded = port.iterate(candidatePort -> {
        try {
            // Bind and register the channel under the serverChannels lock.
            synchronized (serverChannels) {
                final InetSocketAddress requested = new InetSocketAddress(hostAddress, candidatePort);
                final ChannelFuture bindFuture = serverBootstrap.bind(requested).sync();
                serverChannels.add(bindFuture.channel());
                bound.set((InetSocketAddress) bindFuture.channel().localAddress());
            }
            return true;
        } catch (Exception e) {
            failure.set(e);
            return false;
        }
    });

    if (!bindSucceeded) {
        throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", failure.get());
    }

    final InetSocketAddress boundAddress = bound.get();
    if (logger.isDebugEnabled()) {
        logger.debug("Bound http to address {{}}", NetworkAddress.format(boundAddress));
    }
    return new TransportAddress(boundAddress);
}
/**
 * Binds the bootstrap to {@code address}, waits for the bind to finish and records the
 * resulting channel under its local address in {@code boundAddresses}.
 *
 * <p>The channel is added to {@code channelGroup} before the bind outcome is known. The
 * {@code boundAddresses} entry is removed again when the channel's close future completes.
 *
 * @param address the address to bind to; cast to {@link InetSocketAddress}
 * @throws IOException any exception raised while waiting for the bind or registering the
 *                     channel, converted via {@code Helper.toIOException}
 */
@Override
public void bind(SocketAddress address) throws IOException {
    final ChannelFuture bindFuture = bootstrap.bind((InetSocketAddress) address);
    final Channel boundChannel = bindFuture.channel();
    channelGroup.add(boundChannel);
    try {
        bindFuture.sync();
        final SocketAddress actualAddress = boundChannel.localAddress();
        boundAddresses.put(actualAddress, boundChannel);
        // Drop the bookkeeping entry once the channel is closed.
        boundChannel.closeFuture().addListener(closed -> {
            boundAddresses.remove(actualAddress);
        });
    } catch (Exception e) {
        throw Helper.toIOException(e);
    }
}
/**
 * Writes {@code res} to the channel and closes the connection when required.
 *
 * <p>For any status other than 200 the status line text is appended to the response body
 * (UTF-8) and the content length is set from the resulting body size. The connection is
 * closed after the write when the request is not keep-alive or the status is not 200.
 *
 * @param ctx the context whose channel receives the response
 * @param req the request being answered; consulted only for keep-alive
 * @param res the response to send
 */
private static void sendHttpResponse(
        ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
    // Non-OK responses get a minimal error page: the status text as the body.
    if (res.status().code() != 200) {
        ByteBuf statusText = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
        res.content().writeBytes(statusText);
        statusText.release();
        HttpUtil.setContentLength(res, res.content().readableBytes());
    }

    ChannelFuture writeFuture = ctx.channel().writeAndFlush(res);

    // Decide after the write whether the connection has to be closed.
    final boolean closeAfterWrite = !HttpUtil.isKeepAlive(req) || res.status().code() != 200;
    if (closeAfterWrite) {
        writeFuture.addListener(ChannelFutureListener.CLOSE);
    }
}
/**
 * Builds a client {@link Bootstrap} from {@code cfg} and starts a TCP connect to
 * {@code cfg.host:cfg.port}.
 *
 * <p>Despite the method name, nothing here waits for the connect to complete: the future
 * returned by {@code connect} is handed straight back to the caller.
 *
 * @param cfg     client configuration supplying socket options, codec settings and the target
 * @param ioGroup event loop group to run on; selects the epoll or NIO socket channel class
 * @param handler extra handler passed through to {@code TcpHandlerInit}
 * @return the connect future, or {@code null} when {@code ioGroup} is {@code null}
 */
protected ChannelFuture doTcpConntecSync(DFTcpClientCfg cfg, EventLoopGroup ioGroup, ChannelHandler handler){
    if (ioGroup == null) {
        return null;
    }

    final Bootstrap clientBootstrap = new Bootstrap();
    clientBootstrap.group(ioGroup)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .option(ChannelOption.SO_KEEPALIVE, cfg.isKeepAlive())
        .option(ChannelOption.SO_RCVBUF, cfg.getSoRecvBufLen())
        .option(ChannelOption.SO_SNDBUF, cfg.getSoSendBufLen())
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)cfg.getConnTimeout())
        .option(ChannelOption.TCP_NODELAY, cfg.isTcpNoDelay())
        .handler(new TcpHandlerInit(
            false, cfg.getTcpProtocol(), cfg.getTcpMsgMaxLength(), 0, 0, cfg.getWsUri(), null,
            cfg.getDecoder(), cfg.getEncoder(), cfg.getUserHandler(), cfg.getSslCfg(),
            cfg.getReqData(), handler));

    // The channel implementation has to match the kind of event loop group in use.
    if (ioGroup instanceof EpollEventLoopGroup) {
        clientBootstrap.channel(EpollSocketChannel.class);
    } else {
        clientBootstrap.channel(NioSocketChannel.class);
    }

    return clientBootstrap.connect(cfg.host, cfg.port);
}
/**
 * Forwards an inbound message to {@code outboundChannel} and requests the next read only
 * after the forwarded write has succeeded.
 *
 * <p>When the write fails, the channel associated with the write future is closed. When
 * {@code outboundChannel} is not active the method returns without touching {@code msg}.
 *
 * @param ctx context of the channel the message arrived on
 * @param msg the message to forward
 */
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (!outboundChannel.isActive()) {
        // NOTE(review): msg is neither forwarded nor released on this path — confirm whether
        // a reference-counted message can leak here.
        return;
    }

    final ChannelFutureListener continueOrClose = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().close();
                return;
            }
            // The data was flushed out; start reading the next chunk.
            ctx.channel().read();
        }
    };
    outboundChannel.writeAndFlush(msg).addListener(continueOrClose);
}
/**
 * Writes headers for nine streams with the concurrent-stream limit set to five, then
 * delivers a GOAWAY and checks the resulting buffered/active stream counts and the number
 * of header futures that carry a failure cause.
 */
@Test
public void receivingGoAwayFailsBufferedStreams() {
    encoder.writeSettingsAck(ctx, newPromise());
    setMaxConcurrentStreams(5);

    // Stream ids start at 3 and advance by 2 for each of the nine header writes.
    List<ChannelFuture> headerFutures = new ArrayList<ChannelFuture>();
    int nextStreamId = 3;
    for (int written = 0; written < 9; written++, nextStreamId += 2) {
        headerFutures.add(encoderWriteHeaders(nextStreamId, newPromise()));
    }
    assertEquals(4, encoder.numBufferedStreams());

    connection.goAwayReceived(11, 8, EMPTY_BUFFER);
    assertEquals(5, connection.numActiveStreams());

    // Count the header writes whose future reports a cause.
    int failedWrites = 0;
    for (ChannelFuture headerFuture : headerFutures) {
        if (headerFuture.cause() == null) {
            continue;
        }
        failedWrites++;
    }
    assertEquals(9, failedWrites);
    assertEquals(0, encoder.numBufferedStreams());
}
/**
 * Writes request headers for {@code streamId}.
 *
 * <p>If the stream is present and writable, the keep-alive handler (when set) is notified
 * and the headers are written. If it is not, and the local endpoint reports that the stream
 * may already have been created, a failed future wrapping a {@link ClosedStreamException}
 * is returned. In the remaining case the headers are written, which starts a new stream.
 *
 * @param id        not used by this method
 * @param streamId  the HTTP/2 stream id to write to
 * @param headers   the request headers, converted via {@code convertHeaders}
 * @param endStream whether the headers frame ends the stream
 * @return the future of the header write, or a failed future for a closed stream
 */
@Override
public ChannelFuture doWriteHeaders(int id, int streamId, RequestHeaders headers, boolean endStream) {
    final Http2Connection connection = encoder().connection();

    if (!isStreamPresentAndWritable(streamId)) {
        final Endpoint<Http2LocalFlowController> localEndpoint = connection.local();
        if (localEndpoint.mayHaveCreatedStream(streamId)) {
            final ClosedStreamException closedStreamException =
                    new ClosedStreamException("Cannot create a new stream. streamId: " + streamId +
                                              ", lastStreamCreated: " + localEndpoint.lastStreamCreated());
            return newFailedFuture(UnprocessedRequestException.of(closedStreamException));
        }
        // Otherwise fall through: the client starts a new stream.
    } else if (keepAliveHandler != null) {
        keepAliveHandler.onReadOrWrite();
    }

    return encoder().writeHeaders(ctx(), streamId, convertHeaders(headers), 0, endStream,
                                  ctx().newPromise());
}
/**
 * Starts a server on port 8888 whose child pipeline holds
 * {@code EchoServerV4FirstHandler} followed by {@code EchoServerV4SecondHandler}, then
 * blocks until the server channel is closed. Both event loop groups are shut down on exit.
 *
 * @param args ignored
 * @throws Exception if binding fails or the thread is interrupted while waiting
 */
public static void main(String[] args) throws Exception {
    final EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
    final EventLoopGroup ioGroup = new NioEventLoopGroup();
    try {
        final ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new EchoServerV4FirstHandler());
                pipeline.addLast(new EchoServerV4SecondHandler());
            }
        };

        final ServerBootstrap server = new ServerBootstrap();
        server.group(acceptorGroup, ioGroup)
              .channel(NioServerSocketChannel.class)
              .childHandler(pipelineSetup);

        // Wait for the bind, then park until the listening channel goes away.
        final ChannelFuture bound = server.bind(8888).sync();
        bound.channel().closeFuture().sync();
    } finally {
        ioGroup.shutdownGracefully();
        acceptorGroup.shutdownGracefully();
    }
}
/**
 * Creates and configures the bootstrap, starts a bind for every port in {@code lispPorts},
 * and then waits for each of the collected bind futures.
 *
 * <p>If the thread is interrupted while waiting, the interrupt flag is restored and the
 * method returns.
 */
private void run() {
    try {
        final Bootstrap serverBootstrap = createServerBootstrap();
        configBootstrapOptions(serverBootstrap);

        // Kick off all binds first; they are awaited together below.
        lispPorts.forEach(lispPort -> {
            InetSocketAddress listenAddress = new InetSocketAddress(lispPort);
            channelFutures.add(serverBootstrap.bind(listenAddress));
            log.info("Listening for LISP router connections on {}", listenAddress);
        });

        for (ChannelFuture pendingBind : channelFutures) {
            pendingBind.sync();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Registers a producer, back-dates its last-update timestamp past the
 * {@code CHANNEL_EXPIRED_TIMEOUT} value read reflectively from {@link ProducerManager},
 * and checks that {@code scanNotActiveChannel()} removes the channel from the group table.
 */
@Test
public void scanNotActiveChannel() throws Exception {
    producerManager.registerProducer(group, clientInfo);
    assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();

    // Read the expiry threshold via reflection.
    Field timeoutField = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
    timeoutField.setAccessible(true);
    long expiredTimeout = timeoutField.getLong(producerManager);

    // Make the client look older than the threshold by an extra 10 ms.
    clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - expiredTimeout - 10);
    when(channel.close()).thenReturn(mock(ChannelFuture.class));

    producerManager.scanNotActiveChannel();

    assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
}
/**
 * Transfers the outcome of a connect future to {@code promise}.
 *
 * <p>A failed future fails the promise with the future's cause. A successful future
 * completes the promise with the channel; if the promise cannot be completed, the channel
 * is closed again.
 *
 * @param future  the connect future to inspect
 * @param promise the promise to complete
 */
private static void notifyConnect(ChannelFuture future, Promise<Channel> promise)
{
    if (!future.isSuccess()) {
        promise.tryFailure(future.cause());
        return;
    }

    Channel connected = future.channel();
    boolean accepted = promise.trySuccess(connected);
    if (!accepted) {
        // Promise was completed in the meantime (likely cancelled), just release the channel again
        connected.close();
    }
}
/**
 * Closes {@code channel} and logs whether the close succeeded, together with the remote
 * address string obtained before the close was started.
 *
 * @param channel the channel to close
 */
private void closeChannel(Channel channel) {
    final String addrRemote = RemotingHelper.parseChannelRemoteAddr(channel);
    final ChannelFutureListener logResult = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            logger.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote, future.isSuccess());
        }
    };
    channel.close().addListener(logResult);
}
/**
 * After the encoder has been closed, a header write must return a future that already
 * carries a failure cause.
 */
@Test
public void headersAfterCloseShouldImmediatelyFail() {
    encoder.writeSettingsAck(ctx, newPromise());
    encoder.close();

    ChannelFuture headersFuture = encoderWriteHeaders(3, newPromise());

    assertNotNull(headersFuture.cause());
}
/**
 * Writes and flushes {@code msg} on the given context and logs an error if the write fails.
 *
 * <p>{@code writeAndFlush} is asynchronous, so the outcome is checked in a listener that
 * runs when the write future completes. Inspecting {@code isSuccess()} immediately after
 * the call (as was done before) reports a still-pending write as a failure with a
 * {@code null} cause and misses failures that happen later.
 *
 * @param ctx the channel handler context to write to
 * @param msg the response message
 */
default void sendResponse(ChannelHandlerContext ctx, Object msg) {
    ChannelFuture f = ctx.writeAndFlush(msg);
    LOG.trace(Constants.LOG_RETURNING_RESPONSE, msg);
    f.addListener((ChannelFuture completed) -> {
        if (!completed.isSuccess()) {
            LOG.error(Constants.ERR_WRITING_RESPONSE, completed.cause());
        }
    });
}
/**
 * Enqueues a gRPC frame for sending.
 *
 * <p>A {@code null} frame is sent as {@code EMPTY_BUFFER}. Frames with readable bytes are
 * reported via {@code onSendingBytes} before being enqueued; when their write succeeds
 * while the transport state still has an HTTP/2 stream, the bytes are reported as sent and
 * the message count is recorded on the transport tracer. Empty frames are enqueued without
 * any of that bookkeeping.
 *
 * @param frame       the buffer to send, or {@code null} for an empty frame
 * @param endOfStream whether this frame ends the stream
 * @param flush       passed to the write queue with the command
 * @param numMessages number of messages in the frame; must be non-negative
 */
@Override
public void writeFrame(
    WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
  Preconditions.checkArgument(numMessages >= 0);

  final ByteBuf payload;
  if (frame == null) {
    payload = EMPTY_BUFFER;
  } else {
    payload = ((NettyWritableBuffer) frame).bytebuf();
  }
  final int payloadSize = payload.readableBytes();

  if (payloadSize <= 0) {
    // The frame is empty and will not impact outbound flow control. Just send it.
    writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), payload, endOfStream), flush);
    return;
  }

  // Add the bytes to outbound flow control.
  onSendingBytes(payloadSize);
  final ChannelFutureListener onWritten = new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      // If the future succeeds when http2stream is null, the stream has been cancelled
      // before it began and Netty is purging pending writes from the flow-controller.
      if (!future.isSuccess() || transportState().http2Stream() == null) {
        return;
      }
      // Remove the bytes from outbound flow control, optionally notifying
      // the client that they can send more bytes.
      transportState().onSentBytes(payloadSize);
      NettyClientStream.this.getTransportTracer().reportMessageSent(numMessages);
    }
  };
  writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), payload, endOfStream), flush)
      .addListener(onWritten);
}
/**
 * Sends {@code request} as a one-way RPC, bounded by the one-way semaphore.
 *
 * <p>The request is marked one-way, then a permit is requested from
 * {@code semaphoreOneway} for up to {@code timeoutMillis}. With a permit, the request is
 * written and flushed; the permit is released when the write completes, or immediately if
 * starting the write throws.
 *
 * @param channel       the channel to write to
 * @param request       the command to send
 * @param timeoutMillis how long to wait for a permit, in milliseconds
 * @throws InterruptedException            declared for the permit acquisition
 * @throws RemotingTooMuchRequestException if no permit was obtained and
 *                                         {@code timeoutMillis <= 0}
 * @throws RemotingTimeoutException        if no permit was obtained within a positive timeout
 * @throws RemotingSendRequestException    if initiating the write throws
 */
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    request.markOnewayRPC();

    if (!this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        }
        final String detail = String.format(
            "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
            timeoutMillis,
            this.semaphoreOneway.getQueueLength(),
            this.semaphoreOneway.availablePermits());
        PLOG.warn(detail);
        throw new RemotingTimeoutException(detail);
    }

    // Permit held from here on; the wrapper is used on both release paths below.
    final SemaphoreReleaseOnlyOnce permit = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
    try {
        final ChannelFutureListener releaseAndReport = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                permit.release();
                if (!f.isSuccess()) {
                    PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                }
            }
        };
        channel.writeAndFlush(request).addListener(releaseAndReport);
    } catch (Exception e) {
        permit.release();
        PLOG.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
        throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
    }
}
/**
 * Connects to {@code host:port}, then forwards every line read from standard input to the
 * server, terminated with CRLF.
 *
 * <p>Reading stops at end of input or after the user types {@code bye} (any case); in the
 * latter case the method first waits for the channel to be closed. Before returning it
 * waits for the last write, and the event loop group is always shut down.
 *
 * @throws Exception if connecting, reading stdin or waiting on a future fails
 */
public void run() throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap clientBootstrap = new Bootstrap();
        clientBootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new SecureChatClientInitializer(sslMode));

        // Start the connection attempt.
        Channel ch = clientBootstrap.connect(host, port).sync().channel();

        // Read commands from the stdin.
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        ChannelFuture lastWriteFuture = null;
        String line;
        while ((line = in.readLine()) != null) {
            // Sends the received line to the server.
            lastWriteFuture = ch.writeAndFlush(line + "\r\n");

            // If user typed the 'bye' command, wait until the server closes
            // the connection.
            if ("bye".equals(line.toLowerCase())) {
                ch.closeFuture().sync();
                break;
            }
        }

        // Wait until all messages are flushed before closing the channel.
        if (lastWriteFuture != null) {
            lastWriteFuture.sync();
        }
    } finally {
        // The connection is closed automatically on shutdown.
        group.shutdownGracefully();
    }
}
/**
 * Delegates the connect to the superclass and additionally watches the connect promise
 * itself, calling {@code fail} with the cause when the connect does not succeed.
 *
 * <p>Channel handlers are not relied upon to propagate exceptions;
 * {@link NettyClientHandler} is an example of a class that does not propagate them, so the
 * error handling is attached directly to the connect future.
 */
@Override
public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress,
    SocketAddress localAddress, ChannelPromise promise) throws Exception {
  super.connect(ctx, remoteAddress, localAddress, promise);

  final ChannelFutureListener failOnError = new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      if (future.isSuccess()) {
        return;
      }
      fail(ctx, future.cause());
    }
  };
  promise.addListener(failOnError);
}
/**
 * Registers a client channel using a {@link WriteBufferingAndExceptionHandler}, binds a
 * local server, issues a write followed by a connect, and then fires a channel read through
 * the pipeline. The write is expected to fail with an {@code INTERNAL} status whose
 * description mentions {@code "channelRead() missed"}.
 */
@Test
public void uncaughtReadFails() throws Exception {
  WriteBufferingAndExceptionHandler bufferingHandler =
      new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
  LocalAddress localAddress = new LocalAddress("local");

  // Client channel: registered only, not yet connected.
  ChannelFuture registerFuture = new Bootstrap()
      .channel(LocalChannel.class)
      .handler(bufferingHandler)
      .group(group)
      .register();
  chan = registerFuture.channel();
  registerFuture.sync();

  // Server side bound to the local address.
  ChannelFuture bindFuture = new ServerBootstrap()
      .channel(LocalServerChannel.class)
      .childHandler(new ChannelHandlerAdapter() {})
      .group(group)
      .bind(localAddress);
  server = bindFuture.channel();
  bindFuture.sync();

  ChannelFuture writeFuture = chan.writeAndFlush(new Object());
  chan.connect(localAddress);
  chan.pipeline().fireChannelRead(Unpooled.copiedBuffer(new byte[] {'a'}));

  try {
    writeFuture.sync();
    fail();
  } catch (Exception e) {
    Status failure = Status.fromThrowable(e);
    assertThat(failure.getCode()).isEqualTo(Code.INTERNAL);
    assertThat(failure.getDescription()).contains("channelRead() missed");
  }
}
/**
 * Completion callback for a channel write.
 *
 * <p>The failure is taken from the future's cause, falling back to the enclosing
 * {@code exception} when the future has none. With a failure present,
 * {@code handleChannelWriteFailure} is invoked; otherwise the chunks are cleaned up with a
 * {@code null} argument. A debug line is logged in either case.
 *
 * @param future the completed write future
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
  Throwable failure = future.cause();
  if (failure == null) {
    failure = exception;
  }

  if (failure == null) {
    cleanupChunks(null);
  } else {
    handleChannelWriteFailure(failure, false);
  }
  logger.debug("Chunk cleanup complete on channel {}", ctx.channel());
}
/**
 * Connects an HTTP+XML client to the given port and blocks until the connection is closed.
 *
 * <p>The pipeline is assembled in this order: HTTP response decoder, HTTP object aggregator
 * (65536), XML response decoder for {@code Order}, HTTP request encoder, XML request
 * encoder, and the client handler. The target is {@code new InetSocketAddress(port)}, i.e.
 * no host is given. The event loop group is always shut down on exit.
 *
 * @param port the port to connect to
 * @throws Exception if the connect fails or the thread is interrupted while waiting
 */
public void connect(int port) throws Exception {
    // Client-side NIO event loop group.
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        final ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast("http-decoder", new HttpResponseDecoder());
                ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
                // XML decoder
                ch.pipeline().addLast("xml-decoder", new HttpXmlResponseDecoder(Order.class, true));
                ch.pipeline().addLast("http-encoder", new HttpRequestEncoder());
                ch.pipeline().addLast("xml-encoder", new HttpXmlRequestEncoder());
                ch.pipeline().addLast("xmlClientHandler", new HttpXmlClientHandler());
            }
        };

        Bootstrap client = new Bootstrap();
        client.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(pipelineSetup);

        // Start the connect and wait for it to complete.
        ChannelFuture connected = client.connect(new InetSocketAddress(port)).sync();

        // Block until the client connection is closed.
        connected.channel().closeFuture().sync();
    } finally {
        // Graceful exit: release the NIO event loop threads.
        group.shutdownGracefully();
    }
}
/**
 * Writes {@code response} to {@code channel}.
 *
 * <p>When the connection stays open (neither {@code isClose()} nor {@code forceClose}), a
 * Content-Length header with the body's readable byte count is added first. Otherwise the
 * channel is closed once the write future completes.
 *
 * @param forceClose close the connection after the write regardless of {@code isClose()}
 */
private void writeResponse(boolean forceClose){
    final boolean closeAfterWrite = isClose() || forceClose;

    if (!closeAfterWrite) {
        response.headers().add(HttpHeaders.CONTENT_LENGTH, String.valueOf(response.content().readableBytes()));
    }

    // NOTE(review): this is write(), not writeAndFlush(); presumably a flush is issued
    // elsewhere — confirm, since the close listener only fires once the write completes.
    ChannelFuture writeFuture = channel.write(response);
    if (closeAfterWrite) {
        writeFuture.addListener(ChannelFutureListener.CLOSE);
    }
}
/**
 * Enqueues a create-stream command, then reads a GOAWAY frame for {@code streamId - 1} and
 * verifies that the stream listener was closed with {@code REFUSED} and that the enqueue
 * future is done.
 */
@Test
public void receivedGoAwayShouldRefuseLaterStreamId() throws Exception {
  ChannelFuture createFuture = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));

  // GOAWAY frame built for the id just below this test's stream id.
  channelRead(goAwayFrame(streamId - 1));

  verify(streamListener).closed(any(Status.class), eq(REFUSED), any(Metadata.class));
  assertTrue(createFuture.isDone());
}
/**
 * With the max-concurrent-streams value received as zero, enqueues a create-stream command
 * and then reads a GOAWAY (last stream id 0, error code 8, debug text
 * {@code "this is a test"}). The enqueue future must be done and failed with a
 * {@code CANCELLED} status carrying the expected description.
 */
@Test
public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception {
  receiveMaxConcurrentStreams(0);

  ChannelFuture createFuture = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));

  // Read a GOAWAY that indicates our stream was never processed by the server.
  channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));

  assertTrue(createFuture.isDone());
  assertFalse(createFuture.isSuccess());

  Status failure = Status.fromThrowable(createFuture.cause());
  assertEquals(Status.CANCELLED.getCode(), failure.getCode());
  assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
      failure.getDescription());
}
/**
 * Joins {@code multicastAddress} on the network interface that owns this channel's local
 * address, delegating to the four-argument {@code joinGroup} with a {@code null} third
 * argument.
 *
 * @param multicastAddress the multicast group to join
 * @param promise          the promise to complete
 * @return the delegate's future, or {@code promise} (already failed) when a
 *         {@link SocketException} is caught
 */
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
    try {
        final NetworkInterface localInterface =
                NetworkInterface.getByInetAddress(localAddress().getAddress());
        return joinGroup(multicastAddress, localInterface, null, promise);
    } catch (SocketException e) {
        promise.setFailure(e);
        return promise;
    }
}
/**
 * Starts a connect to {@code address.recipient()} through the bootstrap and exposes the
 * outcome as a {@link CompletableFuture}.
 *
 * @param address the message address whose recipient is connected to
 * @return a future completed with the connected channel, or completed exceptionally with
 *         the connect failure cause
 */
@Override
public CompletableFuture<Channel> getChannel(M address) {
    final CompletableFuture<Channel> result = new CompletableFuture<>();

    // Bridge the Netty connect future onto the CompletableFuture.
    getBootstrap().connect(address.recipient()).addListener((ChannelFuture connectFuture) -> {
        if (!connectFuture.isSuccess()) {
            result.completeExceptionally(connectFuture.cause());
            return;
        }
        result.complete(connectFuture.channel());
    });

    return result;
}
/**
 * Returns the {@link ContextManager} for the channel to a remote executor, reusing a cached
 * connect future when its channel is still open or active and connecting otherwise.
 *
 * <p>A newly created connect future is stored in {@code executorIdToChannelFutureMap} and
 * removed from it again when its channel closes or when the connect fails.
 *
 * @param remoteExecutorId id of the remote executor
 * @return a future completed with the {@link ContextManager} taken from the channel
 *         pipeline, or completed exceptionally when the connect fails or the map update
 *         throws a {@link RuntimeException}
 */
private CompletableFuture<ContextManager> connectTo(final String remoteExecutorId) {
  final CompletableFuture<ContextManager> result = new CompletableFuture<>();

  final ChannelFuture channelFuture;
  try {
    channelFuture = executorIdToChannelFutureMap.compute(remoteExecutorId, (executorId, cached) -> {
      final boolean reusable = cached != null
          && (cached.channel().isOpen() || cached.channel().isActive());
      if (reusable) {
        return cached;
      }
      // No usable cached connection: connect and evict the entry once the channel closes.
      final ChannelFuture fresh = byteTransport.connectTo(executorId);
      fresh.channel().closeFuture().addListener(f -> executorIdToChannelFutureMap.remove(executorId, fresh));
      return fresh;
    });
  } catch (final RuntimeException e) {
    result.completeExceptionally(e);
    return result;
  }

  channelFuture.addListener(future -> {
    if (!future.isSuccess()) {
      executorIdToChannelFutureMap.remove(remoteExecutorId, channelFuture);
      result.completeExceptionally(future.cause());
      return;
    }
    result.complete(channelFuture.channel().pipeline().get(ContextManager.class));
  });
  return result;
}