下面列出了怎么用io.netty.channel.ChannelFutureListener的API类实例代码及写法,或者点击链接到github查看源代码。
public void bindForTCP(Integer serverPort, ServerBootstrap bootstrap, ProxyRealServer proxyRealServer) {
bootstrap.bind(serverPort).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) {
logger.info("绑定本地服务端口({})成功 客户端({})--{}", serverPort, proxyRealServer.getClientKey(), proxyRealServer.getDescription());
//绑定成功
ProxyChannel proxyChannel = new ProxyChannel();
proxyChannel.setPort(serverPort);
proxyChannel.setChannel(channelFuture.channel());
proxyChannel.setBootstrap(bootstrap);
proxyChannel.setClientKey(proxyRealServer.getClientKey());
proxyChannel.setProxyType(CommonConstant.ProxyType.TCP);
proxyChannelCache.put(serverPort, proxyChannel);
//设置状态
proxyRealServer.setStatus(CommonConstant.ProxyStatus.ONLINE);
} else {
logger.error("绑定本地服务端口{}失败", serverPort);
}
});
}
@Override
protected void sendResponse(final ChannelHandlerContext ctx, String streamId, int latency,
final FullHttpResponse response, final FullHttpRequest request) {
HttpUtil.setContentLength(response, response.content().readableBytes());
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
if (isKeepAlive(request)) {
response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.writeAndFlush(response);
} else {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
}, latency, TimeUnit.MILLISECONDS);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
boolean keepAlive = HttpUtil.isKeepAlive(req);
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT));
response.headers().set(CONTENT_TYPE, "text/plain");
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
if (!keepAlive) {
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set(CONNECTION, KEEP_ALIVE);
ctx.write(response);
}
}
}
@Override
public void handle() {
final SecretKey secretKey = CryptManager.createNewSharedKey();
String hash = (new BigInteger(CryptManager.getServerIdHash("", publicKey, secretKey))).toString(16);
MinecraftSessionService yggdrasil = new YggdrasilAuthenticationService(The5zigMod.getVars().getProxy(), UUID.randomUUID().toString()).createMinecraftSessionService();
try {
yggdrasil.joinServer(The5zigMod.getVars().getGameProfile(), The5zigMod.getDataManager().getSession(), hash);
} catch (AuthenticationException e) {
The5zigMod.getNetworkManager().disconnect(I18n.translate("connection.bad_login"));
throw new RuntimeException(e);
}
The5zigMod.getNetworkManager().sendPacket(new PacketEncryption(secretKey, publicKey, verifyToken), new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
The5zigMod.getNetworkManager().enableEncryption(secretKey);
}
});
}
private void handleGet(ChannelHandlerContext ctx, FullHttpRequest request) {
if (!isUriValid(request.uri())) {
sendError(ctx, request, HttpResponseStatus.BAD_REQUEST);
return;
}
byte[] contents = cache.get(request.uri());
if (contents == null) {
sendError(ctx, request, HttpResponseStatus.NOT_FOUND);
return;
}
FullHttpResponse response =
new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(contents));
HttpUtil.setContentLength(response, contents.length);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
ChannelFuture lastContentFuture = ctx.writeAndFlush(response);
if (!HttpUtil.isKeepAlive(request)) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
@Override
public void ping(final PingCallback callback, final Executor executor) {
if (channel == null) {
executor.execute(new Runnable() {
@Override
public void run() {
callback.onFailure(statusExplainingWhyTheChannelIsNull.asException());
}
});
return;
}
// The promise and listener always succeed in NettyClientHandler. So this listener handles the
// error case, when the channel is closed and the NettyClientHandler no longer in the pipeline.
ChannelFutureListener failureListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
Status s = statusFromFailedFuture(future);
Http2Ping.notifyFailed(callback, executor, s.asException());
}
}
};
// Write the command requesting the ping
handler.getWriteQueue().enqueue(new SendPingCommand(callback, executor), true)
.addListener(failureListener);
}
protected Future<?> execute() {
LOG.debug("Handling CONNECT request through Chained Proxy");
chainedProxy.filterRequest(initialRequest);
boolean isMitmEnabled = isMITMEnabled();
/*
* We ignore the LastHttpContent which we read from the client
* connection when we are negotiating connect (see readHttp()
* in ProxyConnection). This cannot be ignored while we are
* doing MITM + Chained Proxy because the HttpRequestEncoder
* of the ProxyToServerConnection will be in an invalid state
* when the next request is written. Writing the EmptyLastContent
* resets its state.
*/
if(isMitmEnabled){
ChannelFuture future = writeToChannel(initialRequest);
future.addListener((ChannelFutureListener) arg0 -> {
if(arg0.isSuccess()){
writeToChannel(LastHttpContent.EMPTY_LAST_CONTENT);
}
});
return future;
} else {
return writeToChannel(initialRequest);
}
}
@Override
public void onError(Throwable e) {
if (ctx.channel() == null || !ctx.channel().isActive()) {
return;
}
if (e instanceof TimeoutException) {
endpoint.setLastKeepAliveLatency(TimeUnit.MILLISECONDS.toMicros(env().keepAliveTimeout()));
}
LOGGER.warn("{}Got error while consuming KeepAliveResponse.", logIdent(ctx, endpoint), e);
keepAliveThreshold++;
if (keepAliveThreshold >= env().keepAliveErrorThreshold()) {
LOGGER.warn( "{}KeepAliveThreshold reached - " +
"closing this socket proactively.", system(logIdent(ctx, endpoint)));
ctx.close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
LOGGER.warn("Error while proactively closing the socket.", future.cause());
}
}
});
}
}
@Test(timeout = 5000)
public void newOutboundStream() {
final Http2FrameStream stream = frameCodec.newStream();
assertNotNull(stream);
assertFalse(isStreamIdValid(stream.id()));
final Promise<Void> listenerExecuted = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), false).stream(stream))
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
assertTrue(future.isSuccess());
assertTrue(isStreamIdValid(stream.id()));
listenerExecuted.setSuccess(null);
}
}
);
ByteBuf data = Unpooled.buffer().writeZero(100);
ChannelFuture f = channel.writeAndFlush(new DefaultHttp2DataFrame(data).stream(stream));
assertTrue(f.isSuccess());
listenerExecuted.syncUninterruptibly();
assertTrue(listenerExecuted.isSuccess());
}
@Override
public void handle() {
final SecretKey secretKey = CryptManager.createNewSharedKey();
String hash = (new BigInteger(CryptManager.getServerIdHash("", publicKey, secretKey))).toString(16);
MinecraftSessionService yggdrasil = new YggdrasilAuthenticationService(The5zigMod.getVars().getProxy(), UUID.randomUUID().toString()).createMinecraftSessionService();
try {
yggdrasil.joinServer(The5zigMod.getVars().getGameProfile(), The5zigMod.getDataManager().getSession(), hash);
} catch (AuthenticationException e) {
The5zigMod.getNetworkManager().disconnect(I18n.translate("connection.bad_login"));
throw new RuntimeException(e);
}
The5zigMod.getNetworkManager().sendPacket(new PacketEncryption(secretKey, publicKey, verifyToken), new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
The5zigMod.getNetworkManager().enableEncryption(secretKey);
}
});
}
private void getConnections(ChannelHandlerContext ctx, ConcurrentHashMap<String,ClientConnection> connectionFactory) {
ArrayList<ClientConnectionVO> vos = new ArrayList<>();
if(null !=connectionFactory && !connectionFactory.isEmpty()){
connectionFactory.forEach((k,v)->{
ClientConnectionVO vo=new ClientConnectionVO();
vo.setClientId(v.getClientId());
vo.setUsername(v.getUsername());
vo.setIp(v.getIp());
vo.setPort(v.getPort());
vo.setConnectedDate(v.getConnectedDate());
vo.setProtocolVersion(v.getProtocolVersion());
vo.setPassword(v.getPassword());
vos.add(vo);
});
}
// 1.设置响应
Result result= new Result<Object>().ok(vos);
FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.copiedBuffer(JSONObject.toJSONString(result), CharsetUtil.UTF_8));
resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
// 2.发送
// 注意必须在使用完之后,close channel
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelRead0(ChannelHandlerContext ctx, String request)
throws Exception {
String response;
boolean close = false;
if (request.isEmpty()) {
response = "명령을 입력해 주세요2.\r\n";
}
else if ("bye".equals(request.toLowerCase())) {
response = "좋은 하루 되세요2!\r\n";
close = true;
}
else {
response = "입력하신 명령이 '" + request + "' 입니까2?\r\n";
}
ChannelFuture future = ctx.write(response);
if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingUtil.parseChannelRemoteAddr(ctx.channel());
logger.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
HeartBeatFactory heartBeatFactory = SpiClassLoader.getClassLoader(HeartBeatFactory.class).getExtension(config.getExt(ConfigEnum.heartbeatFactory.getName(), ConfigEnum.heartbeatFactory.getValue()));
ctx.writeAndFlush(heartBeatFactory.createRequest()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
client.resetErrorCount();
}
}
});
}
}
ctx.fireUserEventTriggered(evt);
}
public void checkProducerTransactionState(
final Channel channel,
final CheckTransactionStateRequestHeader requestHeader,
final SelectMappedBufferResult selectMappedBufferResult) {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.markOnewayRPC();
try {
FileRegion fileRegion =
new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
selectMappedBufferResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
selectMappedBufferResult.release();
if (!future.isSuccess()) {
log.error("invokeProducer failed,", future.cause());
}
}
});
} catch (Throwable e) {
log.error("invokeProducer exception", e);
selectMappedBufferResult.release();
}
}
@Override
void succeedAuthentication(final @NotNull ReAuthOutput output) {
super.succeedAuthentication(output);
final Channel channel = ctx.channel();
channel.attr(ChannelAttributes.RE_AUTH_ONGOING).set(false);
applyClientSettings(output.getClientSettings(), channel);
final ChannelFuture authFuture = authSender.sendAuth(
channel,
output.getAuthenticationData(),
Mqtt5AuthReasonCode.SUCCESS,
Mqtt5UserProperties.of(output.getOutboundUserProperties().asInternalList()),
output.getReasonString());
authFuture.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
ctx.pipeline().fireUserEventTriggered(new OnAuthSuccessEvent());
} else if (future.channel().isActive()) {
onSendException(future.cause());
}
});
}
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
// Generate an error page if response getStatus code is not OK (200).
if (res.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
setContentLength(res, res.content().readableBytes());
}
// Send the response and close the connection if necessary.
ChannelFuture f;
if (useSSL) {
f = ctx.channel().writeAndFlush(res);
} else {
// TODO may not want to flush here -- only write
f = ctx.channel().writeAndFlush(res);
}
if (!isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
@Test
public void channelClosedWhenCloseListenerCompletes() {
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
assertTrue(childChannel.isOpen());
assertTrue(childChannel.isActive());
final AtomicBoolean channelOpen = new AtomicBoolean(true);
final AtomicBoolean channelActive = new AtomicBoolean(true);
// Create a promise before actually doing the close, because otherwise we would be adding a listener to a future
// that is already completed because we are using EmbeddedChannel which executes code in the JUnit thread.
ChannelPromise p = childChannel.newPromise();
p.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
channelOpen.set(future.channel().isOpen());
channelActive.set(future.channel().isActive());
}
});
childChannel.close(p).syncUninterruptibly();
assertFalse(channelOpen.get());
assertFalse(childChannel.isActive());
}
/**
* Recursively binds the given bootstrap to the given interfaces.
*
* @param bootstrap the bootstrap to bind
* @param ifaces an iterator of interfaces to which to bind
* @param port the port to which to bind
* @param future the future to completed once the bootstrap has been bound to all provided interfaces
*/
private void bind(ServerBootstrap bootstrap, Iterator<String> ifaces, int port, CompletableFuture<Void> future) {
if (ifaces.hasNext()) {
String iface = ifaces.next();
bootstrap.bind(iface, port).addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
log.info("TCP server listening for connections on {}:{}", iface, port);
serverChannel = f.channel();
bind(bootstrap, ifaces, port, future);
} else {
log.warn("Failed to bind TCP server to port {}:{} due to {}", iface, port, f.cause());
future.completeExceptionally(f.cause());
}
});
} else {
future.complete(null);
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, HttpRequest req) throws Exception {
if (HttpHeaderUtil.is100ContinueExpected(req)) {
ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
}
boolean keepAlive = HttpHeaderUtil.isKeepAlive(req);
ByteBuf content = ctx.alloc().buffer();
content.writeBytes(HelloWorldHttp2Handler.RESPONSE_BYTES.duplicate());
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, content);
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
if (!keepAlive) {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.writeAndFlush(response);
}
}
Mono<Void> sendClose(CloseWebSocketFrame frame) {
if (CLOSE_SENT.get(this) == 0) {
//commented for now as we assume the close is always scheduled (deferFuture runs)
//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
return FutureMono.deferFuture(() -> {
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
discard();
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
return channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
}
frame.release();
return channel().newSucceededFuture();
}).doOnCancel(() -> ReactorNetty.safeRelease(frame));
}
frame.release();
return Mono.empty();
}
Mono<Void> sendClose(CloseWebSocketFrame frame) {
if (CLOSE_SENT.get(this) == 0) {
//commented for now as we assume the close is always scheduled (deferFuture runs)
//onTerminate().subscribe(null, null, () -> ReactorNetty.safeRelease(frame));
return FutureMono.deferFuture(() -> {
if (CLOSE_SENT.getAndSet(this, 1) == 0) {
discard();
onCloseState.onNext(new WebSocketCloseStatus(frame.statusCode(), frame.reasonText()));
return channel().writeAndFlush(frame)
.addListener(ChannelFutureListener.CLOSE);
}
frame.release();
return channel().newSucceededFuture();
}).doOnCancel(() -> ReactorNetty.safeRelease(frame));
}
frame.release();
return Mono.empty();
}
void pumpProtonToChannel(ChannelHandlerContext ctx, ChannelFutureListener writeCompletionAction) {
boolean done = false;
while (!done) {
ByteBuffer toWrite = protonTransport.getOutputBuffer();
if (toWrite != null && toWrite.hasRemaining()) {
LOG.trace("Server: Sending {} bytes out", toWrite.limit());
ctx.write(Unpooled.wrappedBuffer(toWrite));
toWrite.position(toWrite.limit());
protonTransport.outputConsumed();
} else {
done = true;
}
}
if (writeCompletionAction != null) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(writeCompletionAction);
} else {
ctx.flush();
}
}
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status,
PushUserAuth userAuth) {
final FullHttpResponse resp = new DefaultFullHttpResponse(HTTP_1_1, status);
resp.headers().add("Content-Length", "0");
final ChannelFuture cf = ctx.channel().writeAndFlush(resp);
if (!HttpUtil.isKeepAlive(request)) {
cf.addListener(ChannelFutureListener.CLOSE);
}
logPushEvent(request, status, userAuth);
}
@Override
public void handle(ChannelHandlerContext ctx, AmqpConnectionHandler connectionHandler) {
ctx.fireChannelRead((BlockingTask) () -> {
connectionHandler.closeAllChannels();
ctx.writeAndFlush(new ConnectionCloseOk(getChannel())).addListener(ChannelFutureListener.CLOSE);
});
}
/**
* When file timestamp is the same as what the browser is sending up, send a "304 Not Modified"
*
* @param ctx
* Context
*/
private static void sendNotModified(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
setDateHeader(response);
// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
data = null;
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
requestHandler.onClose(ctx);
super.exceptionCaught(ctx, cause);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof WebSocketHandshakeException) {
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(cause.getMessage().getBytes()));
ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} else {
ctx.close();
}
}
@Test(timeout = 2000)
public void promiseDoesNotInfiniteLoop() throws InterruptedException {
EmbeddedChannel channel = new EmbeddedChannel();
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
future.channel().close();
}
});
channel.close().syncUninterruptibly();
}
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
request.markOnewayRPC();
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
once.release();
if (!f.isSuccess()) {
PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
PLOG.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
timeoutMillis, //
this.semaphoreOneway.getQueueLength(), //
this.semaphoreOneway.availablePermits()//
);
PLOG.warn(info);
throw new RemotingTimeoutException(info);
}
}
}