下面列出了io.netty.channel.ChannelFuture#addListener ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public void start(Consumer<TtyConnection> handler, final Consumer<Throwable> doneHandler) {
group = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TtyServerInitializer(channelGroup, handler, httpResourcePath));
final ChannelFuture f = b.bind(host, port);
f.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
channel = f.channel();
doneHandler.accept(null);
} else {
doneHandler.accept(future.cause());
}
}
});
}
/**
* Connect to the {@link ByteTransport} server of the specified executor.
*
* @param remoteExecutorId the id of the executor
* @return a {@link ChannelFuture} for connecting
*/
ChannelFuture connectTo(final String remoteExecutorId) {
final InetSocketAddress address;
try {
final ByteTransportIdentifier identifier = new ByteTransportIdentifier(remoteExecutorId);
address = nameResolver.lookup(identifier);
} catch (final Exception e) {
LOG.error(String.format("Cannot lookup ByteTransport listening address of %s", remoteExecutorId), e);
throw new RuntimeException(e);
}
final ChannelFuture connectFuture = clientBootstrap.connect(address);
connectFuture.addListener(future -> {
if (future.isSuccess()) {
// Succeed to connect
LOG.debug("Connected to {}", remoteExecutorId);
return;
}
// Failed to connect (Not logging the cause here, which is not very useful)
LOG.error("Failed to connect to {}", remoteExecutorId);
});
return connectFuture;
}
public void start() throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap.group(group).channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(hostIp, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect().sync();
future.addListener((ChannelFutureListener) listener -> {
if (listener.isSuccess()) {
System.out.println("Client connected.");
} else {
System.out.println("Server attempt failed!");
listener.cause().printStackTrace();
}
});
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {
// Generate and write a response.
String response;
boolean close = false;
if (request.isEmpty()) {
response = "Please type something.\r\n";
} else if ("bye".equals(request.toLowerCase())) {
response = "Have a good day!\r\n";
close = true;
} else {
response = "Did you say '" + request + "'?\r\n";
}
// We do not need to write a ChannelBuffer here.
// We know the encoder inserted at TelnetPipelineFactory will do the conversion.
ChannelFuture future = ctx.write(response);
// Close the connection after sending 'Have a good day!'
// if the client has sent 'bye'.
if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void sendResponse(FullHttpRequest req, ChannelHandlerContext ctx) throws Exception {
counter.inc();
byte[] content = null;
try(InputStream is = FallbackResponder.class.getClassLoader().getResourceAsStream(resource)) {
content = IOUtils.toByteArray(is);
}
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
HttpHeaders.setContentLength(response, content.length);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_XML_UTF_8.toString());
response.content().writeBytes(content);
ctx.write(response);
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
future.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
final Channel inboundChannel = ctx.channel();
inboundChannel.config().setAutoRead(false);
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(new BackendProxyCodec(inboundChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(proxyEndpoint);
outboundChannel = f.channel();
f.addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
/**
* Writes the message to this link.
*
* @param message the message
*/
@Override
public void write(final T message) {
LOG.log(Level.FINEST, "write {0} :: {1}", new Object[] {channel, message});
final ChannelFuture future = channel.writeAndFlush(Unpooled.wrappedBuffer(encoder.encode(message)));
if (listener != null) {
future.addListener(new NettyChannelFutureListener<>(message, listener));
}
}
public static void response1pxGifImage(ChannelHandlerContext ctx) {
FullHttpResponse response = theBase64Image1pxGif();
ChannelFuture future = ctx.write(response);
ctx.flush();
ctx.close();
//Close the non-keep-alive connection after the write operation is done.
future.addListener(ChannelFutureListener.CLOSE);
}
/**
* @param remoteExecutorId id of the remote executor
* @return {@link ContextManager} for the channel to the specified executor
*/
private CompletableFuture<ContextManager> connectTo(final String remoteExecutorId) {
final CompletableFuture<ContextManager> completableFuture = new CompletableFuture<>();
final ChannelFuture channelFuture;
try {
channelFuture = executorIdToChannelFutureMap.compute(remoteExecutorId, (executorId, cachedChannelFuture) -> {
if (cachedChannelFuture != null
&& (cachedChannelFuture.channel().isOpen() || cachedChannelFuture.channel().isActive())) {
return cachedChannelFuture;
} else {
final ChannelFuture future = byteTransport.connectTo(executorId);
future.channel().closeFuture().addListener(f -> executorIdToChannelFutureMap.remove(executorId, future));
return future;
}
});
} catch (final RuntimeException e) {
completableFuture.completeExceptionally(e);
return completableFuture;
}
channelFuture.addListener(future -> {
if (future.isSuccess()) {
completableFuture.complete(channelFuture.channel().pipeline().get(ContextManager.class));
} else {
executorIdToChannelFutureMap.remove(remoteExecutorId, channelFuture);
completableFuture.completeExceptionally(future.cause());
}
});
return completableFuture;
}
/**
* Closes the underlying connection with the client.
*
* @param reason reason to force close connection
*/
private void forceCloseConnection(String reason) {
LOGGER.info("Force closing connection {}. Reason: {}", getId(), reason);
ChannelFuture close = ctx.close();
close.addListener(future -> {
if (future.isSuccess()) {
LOGGER.info("Connection {} forcefully closed successfully.", getId());
} else {
LOGGER.error("Error occurred while closing connection {}", getId(), future.cause());
}
});
}
protected static void sendHttpResponse(ChannelHandlerContext context, FullHttpRequest request, FullHttpResponse response) {
if (response.getStatus().code() != 200) {
response.content().writeBytes(response.getStatus().toString().getBytes(CharsetUtil.UTF_8));
}
setContentLength(response, response.content().readableBytes());
if (isKeepAlive(request)) {
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
ChannelFuture future = context.writeAndFlush(response);
if (!isKeepAlive(request) || response.getStatus().code() != 200) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public Publisher<Long> createPublisher(final long elements) {
final BatchedProducer out;
if (scheduled) {
out = new ScheduledBatchedProducer(elements, batchSize, publishInitial, executor, 5);
} else {
out = new BatchedProducer(elements, batchSize, publishInitial);
}
final ClosedLoopChannel channel = new ClosedLoopChannel();
channel.config().setAutoRead(false);
ChannelFuture registered = eventLoop.register(channel);
final HandlerPublisher<Long> publisher = new HandlerPublisher<>(registered.channel().eventLoop(), Long.class);
registered.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channel.pipeline().addLast("out", out);
channel.pipeline().addLast("publisher", publisher);
for (long i = 0; i < publishInitial && i < elements; i++) {
channel.pipeline().fireChannelRead(i);
}
if (elements <= publishInitial) {
channel.pipeline().fireChannelInactive();
}
}
});
return publisher;
}
public <T, R> RFuture<R> async(long timeout, Codec encoder, RedisCommand<T> command, Object... params) {
RPromise<R> promise = new RedissonPromise<R>();
if (timeout == -1) {
timeout = redisClient.getCommandTimeout();
}
if (redisClient.getEventLoopGroup().isShuttingDown()) {
RedissonShutdownException cause = new RedissonShutdownException("Redisson is shutdown");
return RedissonPromise.newFailedFuture(cause);
}
Timeout scheduledFuture = redisClient.getTimer().newTimeout(t -> {
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for command: "
+ LogHelper.toString(command, params) + ", Redis client: " + redisClient);
promise.tryFailure(ex);
}, timeout, TimeUnit.MILLISECONDS);
promise.onComplete((res, e) -> {
scheduledFuture.cancel();
});
ChannelFuture writeFuture = send(new CommandData<T, R>(promise, encoder, command, params));
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
}
}
});
return promise;
}
protected void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
// Generate an error page if response getStatus code is not OK (200).
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
HttpHeaderUtil.setContentLength(res, res.content().readableBytes());
}
// Send the response and close the connection if necessary.
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!HttpHeaderUtil.isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
private static void sendError(
ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status) {
ByteBuf data = Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, data);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, data.readableBytes());
ChannelFuture future = ctx.writeAndFlush(response);
if (!HttpUtil.isKeepAlive(request)) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, ChannelPromise promise) {
if (isGracefulShutdownComplete()) {
// If there are no active streams, close immediately after the GO_AWAY write completes.
future.addListener(new ClosingChannelFutureListener(ctx, promise));
} else {
// If there are active streams we should wait until they are all closed before closing the connection.
if (gracefulShutdownTimeoutMillis < 0) {
closeListener = new ClosingChannelFutureListener(ctx, promise);
} else {
closeListener = new ClosingChannelFutureListener(ctx, promise,
gracefulShutdownTimeoutMillis, MILLISECONDS);
}
}
}
@Override
protected void handleConnected(CommandConnected connected) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel);
}
if (config.isTlsHostnameVerificationEnabled() && remoteHostName != null
&& !verifyTlsHostName(remoteHostName, ctx)) {
// close the connection if host-verification failed with the
// broker
log.warn("[{}] Failed to verify hostname of {}", ctx.channel(), remoteHostName);
ctx.close();
return;
}
state = BackendState.HandshakeCompleted;
ChannelFuture channelFuture;
if (connected.hasMaxMessageSize()) {
channelFuture = inboundChannel.writeAndFlush(
Commands.newConnected(connected.getProtocolVersion(), connected.getMaxMessageSize()));
} else {
channelFuture = inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion()));
}
channelFuture.addListener(future -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removing decoder from pipeline", inboundChannel, outboundChannel);
}
if (service.getProxyLogLevel() == 0) {
// direct tcp proxy
inboundChannel.pipeline().remove("frameDecoder");
outboundChannel.pipeline().remove("frameDecoder");
} else {
// Enable parsing feature, proxyLogLevel(1 or 2)
// Add parser handler
if (connected.hasMaxMessageSize()) {
inboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
+ Commands.MESSAGE_SIZE_FRAME_PADDING,
0, 4, 0, 4));
outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
new LengthFieldBasedFrameDecoder(
connected.getMaxMessageSize()
+ Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
inboundChannel.pipeline().addBefore("handler", "inboundParser",
new ParserProxyHandler(service, inboundChannel,
ParserProxyHandler.FRONTEND_CONN,
connected.getMaxMessageSize()));
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
new ParserProxyHandler(service, outboundChannel,
ParserProxyHandler.BACKEND_CONN,
connected.getMaxMessageSize()));
} else {
inboundChannel.pipeline().addBefore("handler", "inboundParser",
new ParserProxyHandler(service, inboundChannel,
ParserProxyHandler.FRONTEND_CONN,
Commands.DEFAULT_MAX_MESSAGE_SIZE));
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
new ParserProxyHandler(service, outboundChannel,
ParserProxyHandler.BACKEND_CONN,
Commands.DEFAULT_MAX_MESSAGE_SIZE));
}
}
// Start reading from both connections
inboundChannel.read();
outboundChannel.read();
});
}
private void writeResponse(Channel channel) {
// Convert the response content to a ChannelBuffer.
ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8);
responseContent.setLength(0);
// Decide whether to close the connection or not.
boolean close = request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE, true)
|| request.protocolVersion().equals(HttpVersion.HTTP_1_0)
&& !request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE, true);
// Build the response object.
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
if (!close) {
// There's no need to add 'Content-Length' header
// if this is the last response.
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
}
Set<Cookie> cookies;
String value = request.headers().get(HttpHeaderNames.COOKIE);
if (value == null) {
cookies = Collections.emptySet();
} else {
cookies = ServerCookieDecoder.STRICT.decode(value);
}
if (!cookies.isEmpty()) {
// Reset the cookies if necessary.
for (Cookie cookie : cookies) {
response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode(cookie));
}
}
// Write the response.
ChannelFuture future = channel.writeAndFlush(response);
// Close the connection after the write operation is done if necessary.
if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
public static void sendErrorResponse(ChannelHandlerContext ctx) {
FullHttpResponse res = createFullHttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR);
res.headers().add(HttpHeaders.Names.CONTENT_LENGTH, 0);
ChannelFuture f = ctx.writeAndFlush(res);
f.addListener(ChannelFutureListener.CLOSE);
}
private void handleHttpRequest(ChannelHandlerContext ctx, Object msg) {
FullHttpRequest request = (FullHttpRequest) msg;
FullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
boolean isKeepAlive = HttpUtil.isKeepAlive(request);
HttpUtil.setKeepAlive(response, isKeepAlive);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
response.headers().set(HttpHeaderNames.SERVER, "Dorado");
ChannelFuture channelFuture = null;
try {
set(ctx.channel());
HttpRequest _request = new HttpRequestImpl(request);
HttpResponse _response = new HttpResponseImpl(response);
Router router = webapp.getUriRoutingRegistry().findRouteController(_request);
if (router == null) {
response.setStatus(HttpResponseStatus.NOT_FOUND);
ByteBufUtil.writeUtf8(response.content(),
String.format("Resource not found, url: [%s], http_method: [%s]", _request.getRequestURI(),
_request.getMethod()));
} else {
router.invoke(_request, _response);
}
} catch (Throwable ex) {
LogUtils.error("handle http request error", ex);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
ByteBufUtil.writeUtf8(response.content(), ExceptionUtils.toString(ex));
} finally {
unset();
if (isKeepAlive) {
HttpUtil.setContentLength(response, response.content().readableBytes());
}
channelFuture = ctx.channel().writeAndFlush(response);
if (!isKeepAlive && channelFuture != null) {
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
ReferenceCountUtil.release(msg);
status.handledRequestsIncrement();
}
}