下面列出了怎么用io.netty.util.concurrent.GenericFutureListener的API类实例代码及写法,或者点击链接到github查看源代码。
private void connect(Bootstrap b) {
if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) {
b.connect(host, port)
.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.cause() != null) {
RecordLog.warn(
String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times",
host, port, failConnectedTime.get()), future.cause());
failConnectedTime.incrementAndGet();
channel = null;
} else {
failConnectedTime.set(0);
channel = future.channel();
RecordLog.info(
"[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">");
}
}
});
}
}
public void disconnect() {
// logger.info("close tcp socket, Disconnecting.");
synchronized (this.clientBoot) {
this.channelPromise = null;
final Future<Void> channelCloseFuture;
if (this.channelPromise != null) {
channelCloseFuture = this.channelPromise.channel().close();
} else {
channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
}
channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
@Override
public void operationComplete(final Future<Void> future) throws Exception {
NettyClient2.this.clientBoot.config().group().shutdownGracefully();
}
});
}
// logger.info("close netty tcp socket connection");
}
public void disconnect() {
// logger.info("close tcp socket, Disconnecting.");
synchronized (this.clientBoot) {
this.channelPromise = null;
final Future<Void> channelCloseFuture;
if (this.channelPromise != null) {
channelCloseFuture = this.channelPromise.channel().close();
} else {
channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
}
channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
@Override
public void operationComplete(final Future<Void> future) throws Exception {
PlatformSSLClient.this.clientBoot.config().group().shutdownGracefully();
}
});
}
// logger.info("close netty tcp socket connection");
}
protected void send(MqttMessage message, GenericFutureListener<? extends Future<? super Void>> completeListener) {
if (!session.isConnected(true)) {
logger.error("Message is not sent - Channel is inactive or out of the node. [{}]", message);
return;
}
ChannelHandlerContext ctx = Session.NEXUS.channelHandlerContext(session.clientId());
String log = message.toString();
ChannelFuture cf = ctx.writeAndFlush(message).addListener(f -> {
if (f.isSuccess()) {
logger.debug("packet outgoing [{}]", log);
}
else {
logger.error("packet outgoing failed [{}] {}", log, f.cause());
}
});
if (completeListener != null) {
cf.addListener(completeListener);
}
}
public void start(String address, int port) throws Exception {
try {
if (bootstrap != null) {
ChannelFuture channelFuture = bootstrap.bind(address, port).sync();
channelFuture.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
closeGracefully();
}
});
} else {
throw new Exception();
}
} catch (Exception e) {
closeGracefully();
throw new Exception("start websocket server error", e);
}
}
/**
* Intercept the connect phase and store the original promise.
*/
@Override
public void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
originalPromise = promise;
ChannelPromise inboundPromise = ctx.newPromise();
inboundPromise.addListener(new GenericFutureListener<Future<Void>>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess() && !originalPromise.isDone()) {
originalPromise.setFailure(future.cause());
}
}
});
ctx.connect(remoteAddress, localAddress, inboundPromise);
}
@Override
protected void doStop() {
fiber.execute(() -> {
final AtomicInteger countDown = new AtomicInteger(1);
GenericFutureListener<? extends Future<? super Void>> listener = future -> {
if (countDown.decrementAndGet() == 0) {
fiber.dispose();
fiber = null;
notifyStopped();
}
};
if (listenChannel != null) {
countDown.incrementAndGet();
listenChannel.close().addListener(listener);
}
allChannels.close().addListener(listener);
replicatorInstances.values().forEach(ReplicatorInstance::dispose);
replicatorInstances.clear();
});
}
/**
* Disconnects. This will wait for pending writes to be flushed before
* disconnecting.
*
* @return Future<Void> for when we're done disconnecting. If we weren't
* connected, this returns null.
*/
Future<Void> disconnect() {
if (channel == null) {
return null;
} else {
final Promise<Void> promise = channel.newPromise();
writeToChannel(Unpooled.EMPTY_BUFFER).addListener(
new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(
Future<? super Void> future)
throws Exception {
closeChannel(promise);
}
});
return promise;
}
}
public void stop(final Consumer<Throwable> doneHandler) {
if (channel != null) {
channel.close();
}
channelGroup.close().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
try {
doneHandler.accept(future.cause());
} finally {
group.shutdownGracefully();
}
}
});
}
/**
* Does the work of processing the current step, checking the result and
* handling success/failure.
*
* @param LOG
*/
@SuppressWarnings("unchecked")
private void doProcessCurrentStep(final ProxyConnectionLogger LOG) {
currentStep.execute().addListener(
new GenericFutureListener<Future<?>>() {
public void operationComplete(
io.netty.util.concurrent.Future<?> future)
throws Exception {
synchronized (connectLock) {
if (future.isSuccess()) {
LOG.debug("ConnectionFlowStep succeeded");
currentStep
.onSuccess(ConnectionFlow.this);
} else {
LOG.debug("ConnectionFlowStep failed",
future.cause());
fail(future.cause());
}
}
};
});
}
@Override
public void channelActive(final ChannelHandlerContext ctx) {
// Once session is secured, send a greeting and register the channel to the global channel
// list so the channel received the messages from others.
ctx.pipeline().get(SslHandler.class).handshakeFuture().addListener(
new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
ctx.writeAndFlush(
"Welcome to " + InetAddress.getLocalHost().getHostName() + " secure chat service!\n");
ctx.writeAndFlush(
"Your session is protected by " +
ctx.pipeline().get(SslHandler.class).engine().getSession().getCipherSuite() +
" cipher suite.\n");
channels.add(ctx.channel());
}
});
}
public void start(String address, int port) throws Exception {
try {
if (bootstrap != null) {
ChannelFuture channelFuture = bootstrap.bind(address, port).sync();
channelFuture.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
closeGracefully();
}
});
} else {
throw new Exception();
}
} catch (Exception e) {
closeGracefully();
throw new Exception("start websocket server error", e);
}
}
public void start(String address, int port) throws HttpServerException {
try {
ChannelFuture channelFuture = bootstrap.bind(address, port).sync();
channelFuture.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
closeGracefylly();
}
});
} catch (Exception e) {
closeGracefylly();
throw new HttpServerException("start openzaly http-server error", e);
}
}
private void connect(Bootstrap b) {
if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) {
b.connect(host, port)
.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.cause() != null) {
RecordLog.warn(
String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times",
host, port, failConnectedTime.get()), future.cause());
failConnectedTime.incrementAndGet();
channel = null;
} else {
failConnectedTime.set(0);
channel = future.channel();
RecordLog.info(
"[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">");
}
}
});
}
}
@Test
public void finalizeChannelPipeline_should_send_event_to_metricsListener_for_successful_response_and_flush_context() throws Exception {
// given
ChannelFuture responseWriterChannelFuture = mock(ChannelFuture.class);
state.setResponseWriterFinalChunkChannelFuture(responseWriterChannelFuture);
HttpProcessingState stateSpy = spy(state);
doReturn(stateSpy).when(stateAttributeMock).get();
ChannelFuture responseWriteFutureResult = mock(ChannelFuture.class);
doReturn(true).when(responseWriteFutureResult).isSuccess();
Assertions.assertThat(stateSpy.isRequestMetricsRecordedOrScheduled()).isFalse();
// when
handler.finalizeChannelPipeline(ctxMock, null, stateSpy, null);
// then
ArgumentCaptor<GenericFutureListener> channelFutureListenerArgumentCaptor = ArgumentCaptor.forClass(GenericFutureListener.class);
verify(responseWriterChannelFuture).addListener(channelFutureListenerArgumentCaptor.capture());
GenericFutureListener futureListener = channelFutureListenerArgumentCaptor.getValue();
assertThat(futureListener, notNullValue());
futureListener.operationComplete(responseWriteFutureResult);
verify(metricsListenerMock).onEvent(eq(ServerMetricsEvent.RESPONSE_SENT), any(HttpProcessingState.class));
verify(ctxMock).flush();
Assertions.assertThat(stateSpy.isRequestMetricsRecordedOrScheduled()).isTrue();
}
public void stop(final Consumer<Throwable> doneHandler) {
if (channel != null) {
channel.close();
}
channelGroup.close().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
try {
doneHandler.accept(future.cause());
} finally {
group.shutdownGracefully();
}
}
});
}
@Override
public void channelActive(final ChannelHandlerContext ctx) {
// Once session is secured, send a greeting and register the channel to the global channel
// list so the channel received the messages from others.
ctx.pipeline().get(SslHandler.class).handshakeFuture().addListener(
new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
ctx.writeAndFlush(
"Welcome to " + InetAddress.getLocalHost().getHostName() + " secure chat service!\n");
ctx.writeAndFlush(
"Your session is protected by " +
ctx.pipeline().get(SslHandler.class).engine().getSession().getCipherSuite() +
" cipher suite.\n");
channels.add(ctx.channel());
}
});
}
@Override
protected void doStop() {
//noinspection unchecked
eventLoopGroup.shutdownGracefully().addListener(new GenericFutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
notifyStopped();
} else {
Throwable failure = new Exception("Netty event loop did not shutdown properly", future.cause());
log.error("Shutdown failed", failure);
notifyFailed(failure);
}
}
});
}
private void closeChannel(final Promise<Void> promise) {
channel.close().addListener(
new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(
Future<? super Void> future)
throws Exception {
if (future
.isSuccess()) {
promise.setSuccess(null);
} else {
promise.setFailure(future
.cause());
}
};
});
}
private void acquire0(Promise<Channel> promise) {
if (closed) {
promise.setFailure(new IllegalStateException("Channel pool is closed!"));
return;
}
if (protocolImpl != null) {
protocolImpl.acquire(promise);
return;
}
if (protocolImplPromise == null) {
initializeProtocol();
}
protocolImplPromise.addListener((GenericFutureListener<Future<ChannelPool>>) future -> {
if (future.isSuccess()) {
future.getNow().acquire(promise);
} else {
// Couldn't negotiate protocol, fail this acquire.
promise.setFailure(future.cause());
}
});
}
@Override
protected Future<?> execute() {
return clientConnection
.encrypt(proxyServer.getMitmManager()
.clientSslEngineFor(initialRequest, sslEngine.getSession()), false)
.addListener(
new GenericFutureListener<Future<? super Channel>>() {
@Override
public void operationComplete(
Future<? super Channel> future)
throws Exception {
if (future.isSuccess()) {
clientConnection.setMitming(true);
}
}
});
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
originalPromise = promise;
ChannelPromise downPromise = ctx.newPromise();
downPromise.addListener(new GenericFutureListener<Future<Void>>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess() && !originalPromise.isDone()) {
originalPromise.setFailure(future.cause());
}
}
});
ctx.connect(remoteAddress, localAddress, downPromise);
}
public Future<IRedisCommandResponse> sendRedisCommand(final RedisCommand redisCommand) {
final Future<IRedisCommandResponse> responseFuture;
// logger.info("send push message {} {} {}", channelPromise,
// channelPromise.isSuccess(),
// channelPromise.channel().isActive());
if (channelPromise != null) {
final ChannelPromise readyPromise = this.channelPromise;
final DefaultPromise<IRedisCommandResponse> responsePromise = new DefaultPromise<IRedisCommandResponse>(
readyPromise.channel().eventLoop());
// 提交一个事件
readyPromise.channel().eventLoop().submit(new Runnable() {
@Override
public void run() {
// 将这个结果赋值给responsePromise
NettyClient2.this.responsePromise = responsePromise;
}
});
readyPromise.channel().writeAndFlush(redisCommand).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 如果失败了,直接将promise返回
responsePromise.tryFailure(future.cause());
logger.error("send push message error: {},cause={}", redisCommand, future.cause());
} else {
// logger.info("write data to platform success");
}
}
});
responseFuture = responsePromise;
} else {
logger.error("send push error because client is not connected: {}", redisCommand.toString());
responseFuture = new FailedFuture<IRedisCommandResponse>(GlobalEventExecutor.INSTANCE, CONNECT_EXCEPTION);
}
return responseFuture;
}
@Override
public void connect() {
Assert.notNull(serverAttr, "serverAttr can not be null");
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ClientInitializer(GenericClient.this));
}
});
ChannelFuture future = bootstrap.connect(serverAttr.getAddress(), serverAttr.getPort());
future.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> f) throws Exception {
channel = future.channel();
if (f.isSuccess()) {
connected = true;
log.info("[{}] Has connected to {} successfully", GenericClient.class.getSimpleName(), serverAttr);
} else {
log.warn("[{}] Connect to {} failed, cause={}", GenericClient.class.getSimpleName(), serverAttr, f.cause().getMessage());
// fire the channelInactive and make sure
// the {@link HealthyChecker} will reconnect
channel.pipeline().fireChannelInactive();
}
}
});
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
super.flush(ctx);
ctx.close().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future)
throws Exception {
System.out.println("close connection: "+future.isSuccess());
}
});
}
private void agentRegister(ChannelHandlerContext ctx, String requestUri) throws URISyntaxException {
// generate a random agent id
String id = RandomStringUtils.random(20, true, true).toUpperCase();
QueryStringDecoder queryDecoder = new QueryStringDecoder(requestUri);
List<String> idList = queryDecoder.parameters().get("id");
if (idList != null && !idList.isEmpty()) {
id = idList.get(0);
}
final String finalId = id;
URI responseUri = new URI("response", null, "/", "method=agentRegister" + "&id=" + id, null);
AgentInfo info = new AgentInfo();
SocketAddress remoteAddress = ctx.channel().remoteAddress();
if (remoteAddress instanceof InetSocketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
info.setHost(inetSocketAddress.getHostString());
info.setPort(inetSocketAddress.getPort());
}
info.setChannelHandlerContext(ctx);
tunnelServer.addAgent(id, info);
ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
tunnelServer.removeAgent(finalId);
}
});
ctx.channel().writeAndFlush(new TextWebSocketFrame(responseUri.toString()));
}
/**
* Create a {@link GenericFutureListener} that will notify the provided {@link Promise} on success and failure.
*
* @param channelPromise Promise to notify.
* @return GenericFutureListener
*/
public static <T> GenericFutureListener<Future<T>> promiseNotifyingListener(Promise<T> channelPromise) {
return future -> {
if (future.isSuccess()) {
channelPromise.setSuccess(future.getNow());
} else {
channelPromise.setFailure(future.cause());
}
};
}
@SafeVarargs
@Override
public final ChannelPromise removeListeners(
final GenericFutureListener<? extends Future<? super Void>>... listeners) {
assert channel.eventLoop().inEventLoop();
for (GenericFutureListener<? extends Future<? super Void>> listener : listeners) {
listenersOnWriteBoundaries.removeFirstOccurrence(listener);
}
return this;
}
@Override
@SafeVarargs
public final Promise<T> addListeners(
GenericFutureListener<? extends Future<? super T>>... listeners) {
for (GenericFutureListener<? extends Future<? super T>> l : listeners) {
delegate.addListeners(RequestContextAwareFutureListener.of(context, l));
}
return this;
}
@Override
public void sendOneWayMessage(ByteBuf message, SendResultCallback callback) {
io.netty.channel.Channel _socket = this.socket;
if (_socket == null || !_socket.isOpen()) {
callback.messageSent(new Exception(this + " connection is closed"));
return;
}
if (LOGGER.isLoggable(Level.FINEST)) {
StringBuilder dumper = new StringBuilder();
ByteBufUtil.appendPrettyHexDump(dumper, message);
LOGGER.log(Level.FINEST, "Sending to {}: {}", new Object[]{_socket, dumper});
}
_socket.writeAndFlush(message).addListener(new GenericFutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
callback.messageSent(null);
} else {
LOGGER.log(Level.SEVERE, this + ": error " + future.cause(), future.cause());
callback.messageSent(future.cause());
close();
}
}
});
unflushedWrites.incrementAndGet();
}