下面列出了 io.netty.util.concurrent.Future 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Registers a listener on the TLS handshake future of this channel's {@link SslHandler}.
 *
 * <p>When the handshake future completes, the listener writes a greeting and the negotiated
 * cipher suite to the peer and adds the channel to {@code channels}. The listener does not
 * inspect the outcome of the handshake future.
 *
 * @param ctx context of the channel that became active
 */
@Override
public void channelActive(final ChannelHandlerContext ctx) {
    final SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
    sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
        @Override
        public void operationComplete(Future<Channel> handshake) throws Exception {
            // Greeting first, so the host name is resolved before anything is written.
            final String hostName = InetAddress.getLocalHost().getHostName();
            ctx.writeAndFlush("Welcome to " + hostName + " secure chat service!\n");

            // The handler is looked up again here, at completion time, as the pipeline may have changed.
            final String cipherSuite =
                    ctx.pipeline().get(SslHandler.class).engine().getSession().getCipherSuite();
            ctx.writeAndFlush("Your session is protected by " + cipherSuite + " cipher suite.\n");

            // Make the channel visible to the other participants.
            channels.add(ctx.channel());
        }
    });
}
/**
 * Closes the current connection (if any) and then shuts down the bootstrap's event loop group.
 *
 * <p>The channel behind {@code channelPromise} is closed when a promise is present; otherwise an
 * already-succeeded future stands in for the close. In both cases the event loop group is shut
 * down gracefully once that future completes. The {@code channelPromise} field is cleared so that
 * a later {@code connect} starts from scratch.
 */
public void disconnect() {
    synchronized (this.clientBoot) {
        final Future<Void> channelCloseFuture;
        // Inspect the promise BEFORE clearing the field; clearing it first would make this
        // check always fail and the channel would never be closed.
        if (this.channelPromise != null) {
            channelCloseFuture = this.channelPromise.channel().close();
        } else {
            channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
        }
        this.channelPromise = null;

        channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                // Release the I/O threads only after the channel close has finished.
                PlatformSSLClient.this.clientBoot.config().group().shutdownGracefully();
            }
        });
    }
}
/**
 * Hooks a completion listener onto the {@link SslHandler} handshake future of the newly active
 * channel.
 *
 * <p>On completion the listener sends a welcome line, reports the session's cipher suite, and
 * registers the channel in {@code channels}. It runs regardless of whether the handshake future
 * succeeded.
 *
 * @param ctx context of the channel that became active
 * @throws Exception declared by the overridden handler method
 */
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
    final GenericFutureListener<Future<Channel>> greetAndRegister =
            new GenericFutureListener<Future<Channel>>() {
                @Override
                public void operationComplete(Future<Channel> handshakeResult) throws Exception {
                    final String localHostName = InetAddress.getLocalHost().getHostName();
                    ctx.writeAndFlush("Welcome to " + localHostName + " secure chat service!\n");

                    final String suite = ctx.pipeline().get(SslHandler.class).engine()
                            .getSession().getCipherSuite();
                    ctx.writeAndFlush("Your session is protected by " + suite + " cipher suite.\n");

                    // From now on this channel receives the messages sent by the others.
                    channels.add(ctx.channel());
                }
            };
    ctx.pipeline().get(SslHandler.class).handshakeFuture().addListener(greetAndRegister);
}
/**
 * Closes the channel and shuts down the event loop group, at most once.
 *
 * <p>Only the first caller that flips {@code closed} from false to true performs the shutdown;
 * later calls return immediately. The group shutdown is attempted even if closing the channel
 * throws.
 *
 * @throws IOException declared by the overridden method
 */
@Override
public void close() throws IOException {
    if (!closed.compareAndSet(false, true)) {
        // Somebody else already closed (or is closing) this instance.
        return;
    }

    connected.set(false);
    try {
        if (channel != null) {
            channel.close().syncUninterruptibly();
        }
    } finally {
        if (group != null) {
            final Future<?> shutdown = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
            // Wait up to twice the shutdown timeout before giving up on the group.
            final boolean finishedInTime = shutdown.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT);
            if (!finishedInTime) {
                LOG.trace("Channel group shutdown failed to complete in allotted time");
            }
        }
    }
}
/**
 * Removes the named codec and monitor handlers from the pipeline and flags tunneling as active.
 *
 * <p>Handlers that are not present are skipped.
 *
 * @return a succeeded future from {@code channel} when all removals went through, or a failed
 *         future carrying whatever was thrown
 */
protected Future execute() {
    try {
        final ChannelPipeline pipeline = ctx.pipeline();
        // Same removal order as the handlers are listed here.
        final String[] handlerNames = {
            "encoder", "responseWrittenMonitor", "decoder", "requestReadMonitor"
        };
        for (final String handlerName : handlerNames) {
            if (pipeline.get(handlerName) != null) {
                pipeline.remove(handlerName);
            }
        }
        tunneling = true;
        return channel.newSucceededFuture();
    } catch (Throwable failure) {
        return channel.newFailedFuture(failure);
    }
}
/**
 * Exercises the sequence receive, send, close, receive on {@code pipe}.
 */
@Test
public void testRSCR() {
    // A receive issued before any send stays pending.
    Future<Object> firstReceive = pipe.receive();
    assertFalse(firstReceive.isDone());

    // The send completes at once and hands the message to the waiting receive.
    Future<Void> send = pipe.send(MESSAGE);
    assertTrue(send.isSuccess());
    assertSame(MESSAGE, firstReceive.getNow());

    pipe.close();

    // A receive after close fails; the earlier futures keep their successful outcome.
    Future<Object> receiveAfterClose = pipe.receive();
    assertNotNull(receiveAfterClose.cause());
    assertTrue(send.isSuccess());
    assertTrue(firstReceive.isSuccess());
}
/**
 * Starts one DNS query and routes its result to {@code handleResolveDone0}.
 *
 * <p>Must be called on the event loop (asserted). If the client is already closed, a terminal
 * error is reported instead of querying.
 */
private void doQuery0() {
    assertInEventloop();

    if (closed) {
        // best effort check to cleanup state after close.
        handleTerminalError0(new ClosedServiceDiscovererException(DefaultDnsClient.this +
                " has been closed!"));
        return;
    }

    final DnsResolutionObserver observer = newResolutionObserver();
    LOGGER.trace("DnsClient {}, querying DNS for {}", DefaultDnsClient.this, AbstractDnsPublisher.this);
    final Future<DnsAnswer<T>> pendingAnswer = doDnsQuery();
    // Remember how to cancel this particular query.
    cancellableForQuery = () -> pendingAnswer.cancel(true);

    if (!pendingAnswer.isDone()) {
        pendingAnswer.addListener((FutureListener<DnsAnswer<T>>) completed ->
                handleResolveDone0(completed, observer));
        return;
    }
    // Already completed: handle the result inline instead of registering a listener.
    handleResolveDone0(pendingAnswer, observer);
}
/**
 * Connects to the given peer unless a connection promise already exists.
 *
 * <p>The connect attempt blocks until it finishes. On success a new promise is created on the
 * connected channel and stored in {@code channelPromise}; nothing in this method completes that
 * promise. Failures are logged, not thrown.
 *
 * @param address host to connect to
 * @param port port to connect to
 * @return the stored channel promise, or {@code null} when no promise existed and the connect
 *         attempt failed
 */
public Future<Void> connect(String address, int port) {
    this.peerHost = address;
    this.peerPort = port;
    synchronized (clientBoot) {
        if (this.channelPromise == null) {
            try {
                final ChannelFuture connectFuture = this.clientBoot.connect(address, port).sync();
                this.channelPromise = connectFuture.channel().newPromise();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of silently swallowing it.
                Thread.currentThread().interrupt();
                logger.error("connect to akaxin platform error.", e);
            } catch (Exception e) {
                logger.error("connect to akaxin platform error.", e);
            }
        }
        return this.channelPromise;
    }
}
/**
 * Installs a close listener on the new connection, then forwards the event to the parent handler.
 *
 * <p>When the connection's channel closes, every listener currently held in
 * {@code queryIdToResultsListenersMap} is failed with a connection error.
 *
 * @param connection the connection that was just established
 */
@Override
public void connectionSucceeded(final UserToBitConnection connection) {
    final GenericFutureListener<Future<? super Void>> failPendingListeners =
            new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> closeResult) throws Exception {
                    for (final UserResultsListener pending : queryIdToResultsListenersMap.values()) {
                        // A fresh exception is built (and logged by the builder) per listener.
                        pending.submissionFailed(UserException.connectionError()
                                .message("Connection %s closed unexpectedly. Drillbit down?",
                                        connection.getName())
                                .build(logger));
                        final boolean stillBuffering = pending instanceof BufferingResultsListener;
                        if (stillBuffering) {
                            // the appropriate listener will be failed by SubmissionListener#failed
                            logger.warn("Buffering listener failed before results were transferred to the actual listener.");
                        }
                    }
                }
            };
    connection.getChannel().closeFuture().addListener(failPendingListeners);
    parentHandler.connectionSucceeded(connection);
}
/**
 * Exercises the sequence send, send, receive, receive on {@code pipe}.
 */
@Test
public void testSSRR() {
    // Two sends without a receiver both stay pending.
    Future<Void> firstSend = pipe.send(MESSAGE1);
    assertFalse(firstSend.isDone());
    Future<Void> secondSend = pipe.send(MESSAGE2);
    assertFalse(secondSend.isDone());
    assertFalse(firstSend.isDone());

    // The first receive takes the first message and completes only the first send.
    Future<Object> firstReceive = pipe.receive();
    assertSame(MESSAGE1, firstReceive.getNow());
    assertFalse(secondSend.isDone());
    assertTrue(firstSend.isSuccess());

    // The second receive takes the second message and completes the remaining send.
    Future<Object> secondReceive = pipe.receive();
    assertSame(MESSAGE2, secondReceive.getNow());
    assertTrue(firstReceive.isSuccess());
    assertTrue(secondSend.isSuccess());
    assertTrue(firstSend.isSuccess());
}
/**
 * Writes the message to the channel, flushes it, and updates the session's send bookkeeping.
 *
 * <p>The last-sent timestamp and session state are updated right after the write is issued,
 * without waiting for it to complete. The outcome of the write is logged by a listener.
 *
 * @param msg to be sent
 * @return the future of the write-and-flush operation
 */
@Override
public Future<Void> sendMessage(final Message msg) {
    final ChannelFuture writeFuture = this.channel.writeAndFlush(msg);
    this.lastMessageSentAt = TICKER.read();
    this.sessionState.updateLastSentMsg();

    final boolean keepalive = msg instanceof KeepaliveMessage;
    if (!keepalive) {
        // Keepalives are not logged here.
        LOG.debug("PCEP Message enqueued: {}", msg);
    }
    if (msg instanceof PcerrMessage) {
        this.sessionState.setLastSentError(msg);
    }

    final ChannelFutureListener logOutcome = result -> {
        if (!result.isSuccess()) {
            LOG.debug("Message not sent: {}", msg, result.cause());
        } else {
            LOG.trace("Message sent to socket: {}", msg);
        }
    };
    writeFuture.addListener(logOutcome);
    return writeFuture;
}
/**
 * Attaches a listener to the {@link SslHandler} handshake future of the channel that just
 * became active.
 *
 * <p>Once the handshake future completes, the listener sends a greeting plus the cipher suite in
 * use and adds the channel to the global {@code channels} list so it can receive messages from
 * other channels. The listener does not check whether the handshake succeeded.
 *
 * @param ctx context of the channel that became active
 */
@Override
public void channelActive(final ChannelHandlerContext ctx) {
    final SslHandler tls = ctx.pipeline().get(SslHandler.class);
    tls.handshakeFuture().addListener(
            new GenericFutureListener<Future<Channel>>() {
                @Override
                public void operationComplete(Future<Channel> handshakeDone) throws Exception {
                    final String host = InetAddress.getLocalHost().getHostName();
                    ctx.writeAndFlush("Welcome to " + host + " secure chat service!\n");

                    // Look the handler up again at completion time to read the session's suite.
                    final SslHandler current = ctx.pipeline().get(SslHandler.class);
                    final String cipher = current.engine().getSession().getCipherSuite();
                    ctx.writeAndFlush("Your session is protected by " + cipher + " cipher suite.\n");

                    // Register the channel so it receives messages from the other channels.
                    channels.add(ctx.channel());
                }
            });
}
/**
 * Sends one push package to the platform over a fresh client and waits up to five seconds for
 * the response.
 *
 * <p>Despite the name, this method blocks on the response. Any failure is logged and not
 * propagated. The client is always disconnected, including when sending or waiting fails.
 *
 * @param action action name placed in the command
 * @param byteData payload bytes wrapped into the transport package
 */
public static void asyncWrite(String action, byte[] byteData) {
    try {
        CoreProto.TransportPackageData.Builder packageDataBuilder = CoreProto.TransportPackageData.newBuilder();
        Map<Integer, String> header = new HashMap<Integer, String>();
        header.put(CoreProto.HeaderKey.SITE_SERVER_VERSION_VALUE, CommandConst.SITE_VERSION);
        packageDataBuilder.putAllHeader(header);
        packageDataBuilder.setData(ByteString.copyFrom(byteData));

        PlatformSSLClient nettyClient = new PlatformSSLClient();
        try {
            nettyClient.connect(AKAXIN_PUSH_ADDRESS, AKAXIN_PUSH_PORT);
            Future<IRedisCommandResponse> future = nettyClient.sendRedisCommand(new RedisCommand()
                    .add(CommandConst.PROTOCOL_VERSION).add(action).add(packageDataBuilder.build().toByteArray()));
            IRedisCommandResponse response = future.get(5, TimeUnit.SECONDS);
            logger.debug("write push to platform finish response={}", response);
        } finally {
            // Release the client even on timeout or send failure; otherwise every failed push
            // would leave the client's resources behind.
            nettyClient.disconnect();
        }
    } catch (Exception e) {
        logger.error("async send package to platform error", e);
    }
}
/**
 * Connects to the given peer unless a connection promise already exists.
 *
 * <p>The connect attempt blocks until it finishes. On success a new promise is created on the
 * connected channel and stored in {@code channelPromise}; nothing in this method completes that
 * promise. Failures are logged, not thrown.
 *
 * @param address host to connect to
 * @param port port to connect to
 * @return the stored channel promise, or {@code null} when no promise existed and the connect
 *         attempt failed
 */
public Future<Void> connect(String address, int port) {
    this.peerHost = address;
    this.peerPort = port;
    synchronized (clientBoot) {
        if (this.channelPromise == null) {
            try {
                final ChannelFuture connectFuture = this.clientBoot.connect(address, port).sync();
                this.channelPromise = connectFuture.channel().newPromise();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of silently swallowing it.
                Thread.currentThread().interrupt();
                logger.error("connect to akaxin platform error.", e);
            } catch (Exception e) {
                logger.error("connect to akaxin platform error.", e);
            }
        }
        return this.channelPromise;
    }
}
/**
 * Writes the response to the channel as a command and closes the channel once the write has
 * completed, whether or not it succeeded.
 *
 * @param channel channel to write to and then close
 * @param response source of the error code/info, optional params, version and action
 */
public static void writeAndClose(final Channel channel, CommandResponse response) {
    final CoreProto.TransportPackageData.Builder transportPackage = CoreProto.TransportPackageData.newBuilder();

    // Error section: code plus the string form of the error info.
    final CoreProto.ErrorInfo error = CoreProto.ErrorInfo.newBuilder()
            .setCode(response.getErrCode())
            .setInfo(String.valueOf(response.getErrInfo()))
            .build();
    transportPackage.setErr(error);

    // Header section: carries the site server version.
    final Map<Integer, String> headers = new HashMap<Integer, String>();
    headers.put(CoreProto.HeaderKey.SITE_SERVER_VERSION_VALUE, CommandConst.SITE_VERSION);
    transportPackage.putAllHeader(headers);

    // Data section is optional.
    if (response.getParams() != null) {
        transportPackage.setData(ByteString.copyFrom(response.getParams()));
    }

    final GenericFutureListener<Future<? super Void>> closeAfterWrite =
            new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> written) throws Exception {
                    channel.close();
                }
            };
    channel.writeAndFlush(new RedisCommand().add(response.getVersion()).add(response.getAction())
            .add(transportPackage.build().toByteArray()))
            .addListener(closeAfterWrite);
}
/**
 * Runs {@code doExecute(request)} on {@code eventExecutor} and reports the outcome through the
 * given promise.
 *
 * @param promise promise completed with the result, or failed with the caught exception
 * @param request arguments passed through to {@code doExecute}
 * @return the same promise that was passed in
 * @throws IllegalArgumentException if {@code promise} is {@code null}
 */
@Override
public Future<T> asyncExecute(Promise<T> promise, Object... request) {
    if (promise == null) {
        throw new IllegalArgumentException("promise should not be null");
    }

    // Hand the actual work to the executor; the caller only gets the promise back.
    eventExecutor.execute(() -> {
        try {
            final T result = doExecute(request);
            promise.setSuccess(result);
        } catch (Exception failure) {
            promise.setFailure(failure);
        }
    });

    return promise;
}
/**
 * Finishes a read on the peer channel, deferring to a scheduled task when one is still pending.
 *
 * @param peer the channel whose inbound buffer should be drained
 */
private void finishPeerRead0(VirtualChannel peer) {
    final Future<?> pendingFinishRead = peer.finishReadFuture;
    if (pendingFinishRead != null) {
        if (!pendingFinishRead.isDone()) {
            // A previous finish-read task has not completed yet; go through the task path.
            runFinishPeerReadTask(peer);
            return;
        }
        // Lazy unset to make sure we don't prematurely unset it while scheduling a new task.
        VirtualChannel.FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, pendingFinishRead, null);
    }

    // We should only set readInProgress to false if there is any data that was read as otherwise
    // we may miss to forward data later on.
    final boolean hasDataToForward = peer.readInProgress && !peer.inboundBuffer.isEmpty();
    if (hasDataToForward) {
        peer.readInProgress = false;
        peer.readInbound();
    }
}
/**
 * Writes and flushes the object to the channel, logging an error if the write fails.
 *
 * @param obj the message to send
 */
@Override
public void writeAndFlush(final Object obj) {
    Future writeResult = channel.writeAndFlush(obj);
    writeResult.addListener(new FutureListener() {
        @Override
        public void operationComplete(Future completed) throws Exception {
            if (completed.isSuccess()) {
                // Nothing to report for successful writes.
                return;
            }
            Throwable cause = completed.cause();
            LOGGER.error("Failed to send to "
                    + NetUtils.channelToString(localAddress(), remoteAddress())
                    + " for msg : " + obj
                    + ", Cause by:", cause);
        }
    });
}
/**
 * Sends one package to the platform over a fresh client and waits up to five seconds for the
 * response.
 *
 * <p>Any failure is logged and not propagated. The client is always disconnected, including
 * when sending or waiting fails.
 *
 * @param action action name placed in the command
 * @param byteData payload bytes wrapped into the transport package
 * @return the response bytes when a successful response arrived; {@code null} otherwise
 */
public static byte[] syncWrite(String action, byte[] byteData) {
    try {
        CoreProto.TransportPackageData.Builder packageDataBuilder = CoreProto.TransportPackageData.newBuilder();
        Map<Integer, String> header = new HashMap<Integer, String>();
        header.put(CoreProto.HeaderKey.SITE_SERVER_VERSION_VALUE, CommandConst.SITE_VERSION);
        packageDataBuilder.putAllHeader(header);
        packageDataBuilder.setData(ByteString.copyFrom(byteData));

        PlatformSSLClient nettyClient = new PlatformSSLClient();
        IRedisCommandResponse response;
        try {
            nettyClient.connect(AKAXIN_PLATFROM_HOST, AKAXIN_PLATFROM_PORT);
            Future<IRedisCommandResponse> future = nettyClient.sendRedisCommand(new RedisCommand()
                    .add(CommandConst.PROTOCOL_VERSION).add(action).add(packageDataBuilder.build().toByteArray()));
            response = future.get(5, TimeUnit.SECONDS);
        } finally {
            // Release the client even on timeout or send failure; otherwise every failed call
            // would leave the client's resources behind.
            nettyClient.disconnect();
        }

        if (response != null && response.isSuccess()) {
            return getResponseBytes(response.getRedisCommand());
        }
        logger.debug("sync write data to platform with response={}", response);
    } catch (Exception e) {
        logger.error("sync send package error ", e);
    }
    return null;
}
/**
 * Resolves {@code foo.com} once against a test DNS server that answers with a record built
 * using the value 1, then waits for the resolver group's cache to become empty and checks that
 * at least 0.9 seconds have passed since the resolution started.
 */
@Test
void removedWhenNoCacheHit() throws Exception {
    try (TestDnsServer dnsServer = new TestDnsServer(ImmutableMap.of(
            new DefaultDnsQuestion("foo.com.", A),
            new DefaultDnsResponse(0).addRecord(ANSWER, newAddressRecord("foo.com.", "1.1.1.1", 1))))
    ) {
        final EventLoop loop = eventLoopExtension.get();
        final DnsResolverGroupBuilder groupBuilder = builder(dnsServer);
        try (RefreshingAddressResolverGroup resolverGroup = groupBuilder.build(loop)) {
            final AddressResolver<InetSocketAddress> addressResolver = resolverGroup.getResolver(loop);
            final long startNanos = System.nanoTime();

            final InetSocketAddress unresolved = InetSocketAddress.createUnresolved("foo.com", 36462);
            final Future<InetSocketAddress> resolution = addressResolver.resolve(unresolved);
            await().untilAsserted(() -> assertThat(resolution.isSuccess()).isTrue());
            assertThat(resolution.getNow().getAddress().getHostAddress()).isEqualTo("1.1.1.1");

            // No further lookups happen, so the entry is expected to drop out of the cache.
            final ConcurrentMap<String, CompletableFuture<CacheEntry>> cache = resolverGroup.cache();
            await().until(cache::isEmpty);

            assertThat(System.nanoTime() - startNanos).isGreaterThanOrEqualTo(
                    (long) (TimeUnit.SECONDS.toNanos(1) * 0.9));
        }
    }
}
/**
 * Binds a UDP datagram channel on port 0 and then starts the client loop on a single-thread pool.
 *
 * <p>The bound channel is stored in {@code channel}. If binding fails, the I/O event loop group
 * is shut down; the client loop is still submitted afterwards.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final EventLoopGroup ioGroup = new NioEventLoopGroup(1);
    //start listen
    Bootstrap boot = new Bootstrap();
    boot.group(ioGroup)
            .option(ChannelOption.SO_BROADCAST, false)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.SO_SNDBUF, 1024 * 10)
            .option(ChannelOption.SO_RCVBUF, 1024 * 10)
            .channel(NioDatagramChannel.class)
            .handler(new UdpHandlerTestClient());
    try {
        ChannelFuture future = boot.bind(0).sync();
        channel = future.channel();
        future.addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> f) throws Exception {
                // isSuccess() already implies the future is done.
                if (f.isSuccess()) { //listen
                    log.I("Init udp succ");
                } else {
                    //shutdown io group
                    ioGroup.shutdownGracefully();
                }
            }
        });
    } catch (Throwable e) {
        e.printStackTrace();
        // sync() rethrows a bind failure, so the listener above never gets the chance to
        // release the group in that case; do it here.
        ioGroup.shutdownGracefully();
    }
    //start loop
    ExecutorService thPool = Executors.newFixedThreadPool(1);
    thPool.submit(new UdpTestClientLoop());
}
/**
 * Writes the request, first filling in a {@code Host} header when it is missing.
 *
 * <p>The header value is {@code host:port} taken from the bootstrap's remote address, and is
 * only set when that address is an {@link InetSocketAddress}.
 *
 * @param request the request to send
 * @return the future returned by the generic {@code write(Object)} overload
 */
public Future<Void> write(HttpRequest request) {
    final boolean hostMissing = !request.headers().contains(HttpHeaderNames.HOST);
    if (hostMissing) {
        final SocketAddress remote = bootstrap.config().remoteAddress();
        if (remote instanceof InetSocketAddress) {
            final InetSocketAddress inetRemote = (InetSocketAddress) remote;
            final String hostAndPort = inetRemote.getHostString() + ":" + inetRemote.getPort();
            request.headers().set(HttpHeaderNames.HOST, hostAndPort);
        }
    }
    // The cast selects the Object overload rather than calling this method again.
    return write((Object) request);
}
/**
 * Closes all channels and releases all resources.
 *
 * <p>All closes and event loop group shutdowns are initiated first and awaited afterwards. A
 * failure while waiting for the acceptor to close is logged and does not stop the remaining
 * waits.
 */
@Override
public void close() {
    LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress);

    // Kick off every close/shutdown before blocking on any of them.
    final ChannelGroupFuture clientChannelsClosed = this.clientChannelGroup.close();
    final ChannelGroupFuture serverChannelsClosed = this.serverChannelGroup.close();
    final ChannelFuture acceptorClosed = this.acceptor.close();

    final ArrayList<Future> groupShutdowns = new ArrayList<>(3);
    groupShutdowns.add(this.clientWorkerGroup.shutdownGracefully());
    groupShutdowns.add(this.serverBossGroup.shutdownGracefully());
    groupShutdowns.add(this.serverWorkerGroup.shutdownGracefully());

    clientChannelsClosed.awaitUninterruptibly();
    serverChannelsClosed.awaitUninterruptibly();

    try {
        acceptorClosed.sync();
    } catch (final Exception ex) {
        LOG.log(Level.SEVERE, "Error closing the acceptor channel for " + this.localAddress, ex);
    }

    for (int i = 0; i < groupShutdowns.size(); i++) {
        groupShutdowns.get(i).awaitUninterruptibly();
    }

    LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress);
}
/**
 * Adds failure logging to the write promise, then delegates the write to the superclass.
 *
 * @param ctx handler context
 * @param msg message being written
 * @param promise promise of the write; a listener is attached to it before delegating
 * @throws Exception declared by the overridden handler method
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    final GenericFutureListener<Future<? super Void>> logOnFailure =
            new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> writeResult) throws Exception {
                    if (writeResult.isSuccess()) {
                        return;
                    }
                    logger.error("write data to client fail ", writeResult.cause());
                }
            };
    promise.addListener(logOnFailure);
    super.write(ctx, msg, promise);
}
/**
 * Configures the test server: HTTP and HTTPS on port 0 with a self-signed certificate, plus
 * three Tomcat-backed services mounted under {@code /jsp/}, {@code /jar/} and
 * {@code /jar_altroot/}, each decorated with logging.
 *
 * @param sb the server builder to configure
 */
@Override
protected void configure(ServerBuilder sb) throws Exception {
// NOTE(review): port 0 presumably asks for an ephemeral port - confirm against ServerBuilder docs.
sb.http(0);
sb.https(0);
sb.tlsSelfSigned();
// Web-app-root based service; the configurator records the Tomcat services in tomcatServices.
sb.serviceUnder(
"/jsp/",
TomcatService.builder(webAppRoot())
.serviceName(SERVICE_NAME)
.configurator(s -> Collections.addAll(tomcatServices, s.findServices()))
.build()
.decorate(LoggingService.newDecorator()));
// Service rooted at whatever AppRootFinder locates for the Future class.
sb.serviceUnder(
"/jar/",
TomcatService.builder(AppRootFinder.find(Future.class))
.serviceName("TomcatServiceTest-JAR")
.build()
.decorate(LoggingService.newDecorator()));
// Same root as above, with "/io/netty/util/concurrent" passed as the second builder argument.
sb.serviceUnder(
"/jar_altroot/",
TomcatService.builder(AppRootFinder.find(Future.class), "/io/netty/util/concurrent")
.serviceName("TomcatServiceTest-JAR-AltRoot")
.build()
.decorate(LoggingService.newDecorator()));
}
/**
 * Creates a {@link GenericFutureListener} that forwards the outcome of the future it is attached
 * to into the provided {@link Promise}: the result on success, the cause otherwise.
 *
 * @param channelPromise Promise to notify.
 * @param <T> result type shared by the observed future and the promise
 * @return GenericFutureListener
 */
public static <T> GenericFutureListener<Future<T>> promiseNotifyingListener(Promise<T> channelPromise) {
    return completed -> {
        if (!completed.isSuccess()) {
            channelPromise.setFailure(completed.cause());
            return;
        }
        channelPromise.setSuccess(completed.getNow());
    };
}
/**
 * Invokes the wrapped listener with {@code ctx} pushed for the duration of the call.
 *
 * @param future the completed future handed on to the wrapped listener
 * @throws Exception whatever the wrapped listener throws
 */
@Override
@SuppressWarnings("unchecked")
public void operationComplete(Future future) throws Exception {
    // The pushed context is closed again when the block exits, even on an exception.
    try (SafeCloseable pushed = ctx.push()) {
        listener.operationComplete(future);
    }
}
/**
 * Creates a client whose channel pipeline is set up with the factory's decoders, a session
 * negotiator named {@code "negotiator"}, and the factory's encoders, in that order.
 *
 * @param address remote address to connect to
 * @param retryTimer passed through to the delegate overload
 * @param connectTimeout passed through to the delegate overload
 * @param listenerFactory factory supplied to the session negotiator
 * @return the future returned by the delegate overload
 */
Future<PCEPSessionImpl> createClient(final InetSocketAddress address, final int retryTimer,
        final int connectTimeout, final PCEPSessionListenerFactory listenerFactory) {
    return createClient(address, retryTimer, connectTimeout, (clientChannel, sessionPromise) -> {
        // Inbound decoding first.
        clientChannel.pipeline().addLast(this.factory.getDecoders());
        // Then the negotiator, which is given the listener factory, channel and promise.
        clientChannel.pipeline().addLast("negotiator",
                this.negotiatorFactory.getSessionNegotiator(() -> listenerFactory, clientChannel, sessionPromise));
        // Outbound encoding last.
        clientChannel.pipeline().addLast(this.factory.getEncoders());
    });
}
/**
 * Disconnects. An empty buffer is written first so that pending writes are flushed before the
 * channel is closed.
 *
 * @return {@code Future<Void>} for when we're done disconnecting. If we weren't
 *         connected, this returns null.
 */
Future<Void> disconnect() {
    if (channel == null) {
        // Never connected: nothing to close.
        return null;
    }

    final Promise<Void> disconnected = channel.newPromise();
    // Close only after the empty write has completed.
    writeToChannel(Unpooled.EMPTY_BUFFER).addListener(
            flushed -> closeChannel(disconnected));
    return disconnected;
}
/**
 * Acquires a channel from the pool and equips its pipeline for a download.
 *
 * <p>The returned promise fails when the acquisition fails, when the acquired channel's pipeline
 * is not empty, or when setting up the handlers throws.
 *
 * @return a future completed with the prepared channel
 */
@SuppressWarnings("FutureReturnValueIgnored")
private Future<Channel> acquireDownloadChannel() {
    final Promise<Channel> channelReady = eventLoop.next().newPromise();
    channelPool
        .acquire()
        .addListener(
            (Future<Channel> acquisition) -> {
                if (!acquisition.isSuccess()) {
                    channelReady.setFailure(acquisition.cause());
                    return;
                }

                try {
                    final Channel acquired = acquisition.getNow();
                    final ChannelPipeline pipeline = acquired.pipeline();
                    if (!isChannelPipelineEmpty(pipeline)) {
                        channelReady.setFailure(
                            new IllegalStateException("Channel pipeline is not empty."));
                        return;
                    }

                    // Timeout handler goes to the front; codec and download handler follow.
                    pipeline.addFirst("read-timeout-handler", new ReadTimeoutHandler(timeoutMillis));
                    pipeline.addLast(new HttpClientCodec());
                    // Credentials are read under their lock.
                    synchronized (credentialsLock) {
                        pipeline.addLast(new HttpDownloadHandler(creds));
                    }
                    channelReady.setSuccess(acquired);
                } catch (Throwable setupFailure) {
                    channelReady.setFailure(setupFailure);
                }
            });
    return channelReady;
}