下面列出了 io.netty.handler.codec.http2.Http2Connection #io.netty.util.concurrent.DefaultPromise 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Publish a message to the given payload, using the given qos and optional retain
*
* @param topic The topic to publish to
* @param payload The payload to send
* @param qos The qos to use while publishing
* @param retain true if you want to retain the message on the server, false otherwise
* @return A future which will be completed when the message is delivered to the server
*/
@Override
public Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos, boolean retain) {
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0);
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId());
MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload);
MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), future, payload.retain(), message, qos);
ChannelFuture channelFuture = this.sendAndFlushPacket(message);
if (channelFuture != null) {
pendingPublish.setSent(true);
if (channelFuture.cause() != null) {
future.setFailure(channelFuture.cause());
return future;
}
}
if (pendingPublish.isSent() && pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) {
pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0
} else if (pendingPublish.isSent()) {
this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish);
pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
}
return future;
}
@Test(timeout = 5000)
public void newOutboundStream() {
final Http2FrameStream stream = frameCodec.newStream();
assertNotNull(stream);
assertFalse(isStreamIdValid(stream.id()));
final Promise<Void> listenerExecuted = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), false).stream(stream))
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
assertTrue(future.isSuccess());
assertTrue(isStreamIdValid(stream.id()));
listenerExecuted.setSuccess(null);
}
}
);
ByteBuf data = Unpooled.buffer().writeZero(100);
ChannelFuture f = channel.writeAndFlush(new DefaultHttp2DataFrame(data).stream(stream));
assertTrue(f.isSuccess());
listenerExecuted.syncUninterruptibly();
assertTrue(listenerExecuted.isSuccess());
}
@Test
public void testTerminationFutureSuccessReflectively() throws Exception {
Field terminationFutureField =
ThreadPerChannelEventLoopGroup.class.getDeclaredField("terminationFuture");
terminationFutureField.setAccessible(true);
final Exception[] exceptionHolder = new Exception[1];
for (int i = 0; i < 2; i++) {
ThreadPerChannelEventLoopGroup loopGroup = new ThreadPerChannelEventLoopGroup(64);
Promise<?> promise = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE) {
@Override
public Promise<Void> setSuccess(Void result) {
try {
return super.setSuccess(result);
} catch (IllegalStateException e) {
exceptionHolder[0] = e;
throw e;
}
}
};
terminationFutureField.set(loopGroup, promise);
runTest(loopGroup);
}
// The global event executor will not terminate, but this will give the test a chance to fail.
GlobalEventExecutor.INSTANCE.awaitTermination(100, TimeUnit.MILLISECONDS);
assertNull(exceptionHolder[0]);
}
@Test
public void itReturnsTheStartTlsResponseIfTheTlsHandshakeSucceeds() throws Exception {
CompletableFuture<SmtpClientResponse> f = session.startTls();
responseFuture.complete(Lists.newArrayList(OK_RESPONSE));
// respond to the ehlo sent after starttls
secondResponseFuture.complete(Lists.newArrayList(new DefaultSmtpResponse(250,
"smtp.example.com Hello client.example.com",
"AUTH PLAIN LOGIN",
"PIPELINING")));
// the handshake succeeds
SslHandler sslHandler = getSslHandler();
((DefaultPromise<Channel>) sslHandler.handshakeFuture()).setSuccess(channel);
assertThat(f.isDone()).isTrue();
assertThat(f.get().getResponses().get(0).code()).isEqualTo(OK_RESPONSE.code());
// check EHLO is parsed again
assertThat(session.getEhloResponse().isSupported(Extension.PIPELINING)).isTrue();
assertThat(session.getEhloResponse().isSupported(Extension.STARTTLS)).isFalse();
}
@Test
public void acquire_shouldAcquireAgainIfExistingNotReusable() throws Exception {
Channel channel = new EmbeddedChannel();
try {
ChannelPool connectionPool = Mockito.mock(ChannelPool.class);
loopGroup.register(channel).awaitUninterruptibly();
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
channelPromise.setSuccess(channel);
Mockito.when(connectionPool.acquire()).thenReturn(channelPromise);
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup,
Collections.emptySet(), null);
h2Pool.acquire().awaitUninterruptibly();
h2Pool.acquire().awaitUninterruptibly();
Mockito.verify(connectionPool, Mockito.times(2)).acquire();
} finally {
channel.close();
}
}
/**
* Connect to MySQL.
*/
public synchronized void connect() {
responseCallback = new DefaultPromise<>(eventLoopGroup.next());
channel = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new PacketCodec(new MySQLPacketCodecEngine()));
socketChannel.pipeline().addLast(new MySQLCommandPacketDecoder());
socketChannel.pipeline().addLast(new MySQLNegotiateHandler(username, password, responseCallback));
socketChannel.pipeline().addLast(new MySQLCommandResponseHandler());
}
})
.option(ChannelOption.AUTO_READ, true)
.connect(host, port).channel();
serverInfo = waitExpectedResponse(ServerInfo.class);
}
@Test
public void testTerminationFutureSuccessReflectively() throws Exception {
Field terminationFutureField =
ThreadPerChannelEventLoopGroup.class.getDeclaredField("terminationFuture");
terminationFutureField.setAccessible(true);
final Exception[] exceptionHolder = new Exception[1];
for (int i = 0; i < 2; i++) {
ThreadPerChannelEventLoopGroup loopGroup = new ThreadPerChannelEventLoopGroup(64);
Promise<?> promise = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE) {
@Override
public Promise<Void> setSuccess(Void result) {
try {
return super.setSuccess(result);
} catch (IllegalStateException e) {
exceptionHolder[0] = e;
throw e;
}
}
};
terminationFutureField.set(loopGroup, promise);
runTest(loopGroup);
}
// The global event executor will not terminate, but this will give the test a chance to fail.
GlobalEventExecutor.INSTANCE.awaitTermination(100, TimeUnit.MILLISECONDS);
assertNull(exceptionHolder[0]);
}
private void writeAndFlush(Object message, DefaultPromise<Void> promise) {
Channel channel = channelResult.channel();
channel
.writeAndFlush(message)
.addListener(
(ChannelFutureListener)
channelFuture -> {
if (channelFuture.isSuccess()) {
log.debug("write finished for " + message);
promise.setSuccess(null);
} else {
log.error("Write error: ", channelFuture.cause());
promise.setFailure(channelFuture.cause());
}
});
}
public Future<Void> send(Object message) {
DefaultPromise<Void> promise = new DefaultPromise<>(eventLoopGroup().next());
log.debug("Acquiring Node: " + this);
if (channelResult == null) {
channelResult = bootstrap.clone().connect();
}
if (channelResult.isSuccess()) {
writeAndFlush(message, promise);
} else {
channelResult.addListener(
(ChannelFutureListener)
channelFuture -> {
if (channelFuture.isSuccess()) {
log.debug("connection achieved " + message);
writeAndFlush(message, promise);
} else {
log.error("connection error: ", channelFuture.cause());
promise.setFailure(channelFuture.cause());
}
});
}
return promise;
}
private void acquireWithRetry(AsyncRetryLoop retry, DefaultPromise<Channel> result) {
Future<Channel> poolResult = simpleChannelPool.acquire();
poolResult.addListener(
new FutureListener<Channel>() {
public void operationComplete(Future<Channel> f) {
if (f.isSuccess()) {
result.setSuccess(f.getNow());
} else {
// deal with connection failure here.
if (retry.canRetry()) {
retry.attempt(() -> acquireWithRetry(retry, result));
} else {
result.setFailure(f.cause());
}
}
}
});
}
/**
* Establish PCEPS TLS connection with peer.
*/
@Test
public void testEstablishTLS() {
final DefaultPCEPSessionNegotiator negotiator =
new DefaultPCEPSessionNegotiator(new DefaultPromise<>(GlobalEventExecutor.INSTANCE),
this.channel, this.listener, (short) 1, 20, new OpenBuilder().setKeepalive(Uint8.ONE).build(),
SslContextFactoryTest.createTlsConfig());
negotiator.channelActive(null);
assertEquals(1, this.msgsSend.size());
assertTrue(this.msgsSend.get(0) instanceof Starttls);
assertEquals(DefaultPCEPSessionNegotiator.State.START_TLS_WAIT, negotiator.getState());
negotiator.handleMessage(this.startTlsMsg);
assertEquals(DefaultPCEPSessionNegotiator.State.OPEN_WAIT, negotiator.getState());
assertEquals(2, this.msgsSend.size());
assertTrue(this.msgsSend.get(1) instanceof Open);
negotiator.handleMessage(this.openMsg);
assertEquals(DefaultPCEPSessionNegotiator.State.KEEP_WAIT, negotiator.getState());
}
/**
* IOException is expected if acquire stream from closed channel.
*/
@Test
public void acquireClaimedConnectionOnClosedChannelShouldThrowIOException() {
loopGroup.register(channel).awaitUninterruptibly();
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
MultiplexedChannelRecord record = new MultiplexedChannelRecord(channel, 1, 10000L, streamChannelInitializer);
record.closeChildChannels();
record.acquireClaimedStream(channelPromise);
try {
channelPromise.get();
} catch (InterruptedException | ExecutionException e) {
assertTrue(e.getCause() instanceof IOException);
}
}
public Future<IRedisCommandResponse> sendRedisCommand(final RedisCommand redisCommand) {
final Future<IRedisCommandResponse> responseFuture;
// logger.info("send push message {} {} {}", channelPromise,
// channelPromise.isSuccess(),
// channelPromise.channel().isActive());
if (channelPromise != null) {
final ChannelPromise readyPromise = this.channelPromise;
final DefaultPromise<IRedisCommandResponse> responsePromise = new DefaultPromise<IRedisCommandResponse>(
readyPromise.channel().eventLoop());
// 提交一个事件
readyPromise.channel().eventLoop().submit(new Runnable() {
@Override
public void run() {
// 将这个结果赋值给responsePromise
NettyClient2.this.responsePromise = responsePromise;
}
});
readyPromise.channel().writeAndFlush(redisCommand).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 如果失败了,直接将promise返回
responsePromise.tryFailure(future.cause());
logger.error("send push message error: {},cause={}", redisCommand, future.cause());
} else {
// logger.info("write data to platform success");
}
}
});
responseFuture = responsePromise;
} else {
logger.error("send push error because client is not connected: {}", redisCommand.toString());
responseFuture = new FailedFuture<IRedisCommandResponse>(GlobalEventExecutor.INSTANCE, CONNECT_EXCEPTION);
}
return responseFuture;
}
public Future<IRedisCommandResponse> sendRedisCommand(final RedisCommand redisCommand) {
final Future<IRedisCommandResponse> responseFuture;
if (channelPromise != null) {
final ChannelPromise readyPromise = this.channelPromise;
final DefaultPromise<IRedisCommandResponse> responsePromise = new DefaultPromise<IRedisCommandResponse>(
readyPromise.channel().eventLoop());
// 提交一个事件
readyPromise.channel().eventLoop().submit(new Runnable() {
@Override
public void run() {
// 将这个结果赋值给responsePromise
PlatformSSLClient.this.responsePromise = responsePromise;
}
});
readyPromise.channel().writeAndFlush(redisCommand).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 如果失败了,直接将promise返回
responsePromise.tryFailure(future.cause());
logger.error("send push message error: {},cause={}", redisCommand, future.cause());
} else {
// logger.info("write data to platform success");
}
}
});
responseFuture = responsePromise;
} else {
logger.error("send push error because client is not connected: {}", redisCommand.toString());
responseFuture = new FailedFuture<IRedisCommandResponse>(GlobalEventExecutor.INSTANCE, CONNECT_EXCEPTION);
}
return responseFuture;
}
/**
* Remove the subscription for the given topic and handler
* If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
*
* @param topic The topic to unsubscribe for
* @param handler The handler to unsubscribe
* @return A future which will be completed when the server acknowledges our unsubscribe request
*/
@Override
public Future<Void> off(String topic, MqttHandler handler) {
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
for (MqttSubscription subscription : this.handlerToSubscribtion.get(handler)) {
this.subscriptions.remove(topic, subscription);
}
this.handlerToSubscribtion.removeAll(handler);
this.checkSubscribtions(topic, future);
return future;
}
/**
* Remove all subscriptions for the given topic.
* If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
*
* @param topic The topic to unsubscribe for
* @return A future which will be completed when the server acknowledges our unsubscribe request
*/
@Override
public Future<Void> off(String topic) {
Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
ImmutableSet<MqttSubscription> subscriptions = ImmutableSet.copyOf(this.subscriptions.get(topic));
for (MqttSubscription subscription : subscriptions) {
for (MqttSubscription handSub : this.handlerToSubscribtion.get(subscription.getHandler())) {
this.subscriptions.remove(topic, handSub);
}
this.handlerToSubscribtion.remove(subscription.getHandler(), subscription);
}
this.checkSubscribtions(topic, future);
return future;
}
public Future<IRedisCommandResponse> sendRedisCommand(final RedisCommand redisCommand) {
final Future<IRedisCommandResponse> responseFuture;
// logger.info("send push message {} {} {}", channelPromise,
// channelPromise.isSuccess(),
// channelPromise.channel().isActive());
if (channelPromise != null) {
final ChannelPromise readyPromise = this.channelPromise;
final DefaultPromise<IRedisCommandResponse> responsePromise = new DefaultPromise<IRedisCommandResponse>(
readyPromise.channel().eventLoop());
// 提交一个事件
readyPromise.channel().eventLoop().submit(new Runnable() {
@Override
public void run() {
// 将这个结果赋值给responsePromise
NettyClient2.this.responsePromise = responsePromise;
}
});
readyPromise.channel().writeAndFlush(redisCommand).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 如果失败了,直接将promise返回
responsePromise.tryFailure(future.cause());
logger.error("send push message error: {},cause={}", redisCommand, future.cause());
} else {
// logger.info("write data to platform success");
}
}
});
responseFuture = responsePromise;
} else {
logger.error("send push error because client is not connected: {}", redisCommand.toString());
responseFuture = new FailedFuture<IRedisCommandResponse>(GlobalEventExecutor.INSTANCE, CONNECT_EXCEPTION);
}
return responseFuture;
}
public Future<IRedisCommandResponse> sendRedisCommand(final RedisCommand redisCommand) {
final Future<IRedisCommandResponse> responseFuture;
if (channelPromise != null) {
final ChannelPromise readyPromise = this.channelPromise;
final DefaultPromise<IRedisCommandResponse> responsePromise = new DefaultPromise<IRedisCommandResponse>(
readyPromise.channel().eventLoop());
// 提交一个事件
readyPromise.channel().eventLoop().submit(new Runnable() {
@Override
public void run() {
// 将这个结果赋值给responsePromise
PlatformSSLClient.this.responsePromise = responsePromise;
}
});
readyPromise.channel().writeAndFlush(redisCommand).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 如果失败了,直接将promise返回
responsePromise.tryFailure(future.cause());
logger.error("send push message error: {},cause={}", redisCommand, future.cause());
} else {
// logger.info("write data to platform success");
}
}
});
responseFuture = responsePromise;
} else {
logger.error("send push error because client is not connected: {}", redisCommand.toString());
responseFuture = new FailedFuture<IRedisCommandResponse>(GlobalEventExecutor.INSTANCE, CONNECT_EXCEPTION);
}
return responseFuture;
}
public Future<IRedisCommandResponse> sendRedisCommand(final RedisCommand redisCommand) {
final Future<IRedisCommandResponse> responseFuture;
// logger.info("send push message {} {} {}", channelPromise,
// channelPromise.isSuccess(),
// channelPromise.channel().isActive());
if (channelPromise != null) {
final ChannelPromise readyPromise = this.channelPromise;
final DefaultPromise<IRedisCommandResponse> responsePromise = new DefaultPromise<IRedisCommandResponse>(
readyPromise.channel().eventLoop());
// 提交一个事件
readyPromise.channel().eventLoop().submit(new Runnable() {
@Override
public void run() {
// 将这个结果赋值给responsePromise
NettyClient2.this.responsePromise = responsePromise;
}
});
readyPromise.channel().writeAndFlush(redisCommand).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 如果失败了,直接将promise返回
responsePromise.tryFailure(future.cause());
logger.error("send push message error: {},cause={}", redisCommand, future.cause());
} else {
// logger.info("write data to platform success");
}
}
});
responseFuture = responsePromise;
} else {
logger.error("send push error because client is not connected: {}", redisCommand.toString());
responseFuture = new FailedFuture<IRedisCommandResponse>(GlobalEventExecutor.INSTANCE, CONNECT_EXCEPTION);
}
return responseFuture;
}
public Future<IRedisCommandResponse> sendRedisCommand(final RedisCommand redisCommand) {
final Future<IRedisCommandResponse> responseFuture;
if (channelPromise != null) {
final ChannelPromise readyPromise = this.channelPromise;
final DefaultPromise<IRedisCommandResponse> responsePromise = new DefaultPromise<IRedisCommandResponse>(
readyPromise.channel().eventLoop());
// 提交一个事件
readyPromise.channel().eventLoop().submit(new Runnable() {
@Override
public void run() {
// 将这个结果赋值给responsePromise
PlatformSSLClient.this.responsePromise = responsePromise;
}
});
readyPromise.channel().writeAndFlush(redisCommand).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 如果失败了,直接将promise返回
responsePromise.tryFailure(future.cause());
logger.error("send push message error: {},cause={}", redisCommand, future.cause());
} else {
// logger.info("write data to platform success");
}
}
});
responseFuture = responsePromise;
} else {
logger.error("send push error because client is not connected: {}", redisCommand.toString());
responseFuture = new FailedFuture<IRedisCommandResponse>(GlobalEventExecutor.INSTANCE, CONNECT_EXCEPTION);
}
return responseFuture;
}
@Test
public void itFailsTheFutureIfTheTlsHandshakeFails() throws Exception {
CompletableFuture<SmtpClientResponse> f = session.startTls();
responseFuture.complete(Lists.newArrayList(OK_RESPONSE));
SslHandler sslHandler = getSslHandler();
// fail the handshake
Exception testException = new Exception();
((DefaultPromise<Channel>) sslHandler.handshakeFuture()).setFailure(testException);
assertThat(f.isCompletedExceptionally()).isTrue();
assertThatThrownBy(f::get).hasCause(testException);
verify(channel).close();
}
@Before
public void reset() {
Mockito.reset(eventLoopGroup, eventLoop, downstreamChannelPool, scheduledFuture, attribute);
channels.clear();
Mockito.when(eventLoopGroup.next()).thenReturn(eventLoop);
Mockito.when(eventLoop.newPromise())
.thenAnswer((Answer<Promise<Object>>) i -> new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
}
@Override
public Subscriber<Long> createSubscriber(WhiteboxSubscriberProbe<Long> probe) {
final ClosedLoopChannel channel = new ClosedLoopChannel();
channel.config().setAutoRead(false);
ChannelFuture registered = eventLoop.register(channel);
final HandlerSubscriber<Long> subscriber = new HandlerSubscriber<>(registered.channel().eventLoop(), 2, 4);
final ProbeHandler<Long> probeHandler = new ProbeHandler<>(probe, Long.class);
final Promise<Void> handlersInPlace = new DefaultPromise<>(eventLoop.next());
registered.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channel.pipeline().addLast("probe", probeHandler);
channel.pipeline().addLast("subscriber", subscriber);
handlersInPlace.setSuccess(null);
// Channel needs to be active before the subscriber starts responding to demand
channel.pipeline().fireChannelActive();
}
});
if (workAroundIssue277) {
try {
// Wait for the pipeline to be setup, so we're ready to receive elements even if they aren't requested,
// because https://github.com/reactive-streams/reactive-streams-jvm/issues/277
handlersInPlace.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return probeHandler.wrap(subscriber);
}
@Test
public void availableStream0_reusableShouldBeFalse() {
loopGroup.register(channel).awaitUninterruptibly();
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
channelPromise.setSuccess(channel);
MultiplexedChannelRecord record = new MultiplexedChannelRecord(channel, 0, Duration.ofSeconds(10));
assertThat(record.acquireStream(null)).isFalse();
}
@Test
public void closeChildChannels_shouldDeliverException() throws ExecutionException, InterruptedException {
EmbeddedChannel channel = newHttp2Channel();
loopGroup.register(channel).awaitUninterruptibly();
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
channelPromise.setSuccess(channel);
MultiplexedChannelRecord record = new MultiplexedChannelRecord(channel, 2, Duration.ofSeconds(10));
Promise<Channel> streamPromise = channel.eventLoop().newPromise();
record.acquireStream(streamPromise);
channel.runPendingTasks();
Channel childChannel = streamPromise.get();
VerifyExceptionHandler verifyExceptionHandler = new VerifyExceptionHandler();
childChannel.pipeline().addFirst(verifyExceptionHandler);
IOException ioException = new IOException("foobar");
record.closeChildChannels(ioException);
assertThat(childChannel.pipeline().get(UnusedChannelExceptionHandler.class)).isNotNull();
assertThat(verifyExceptionHandler.exceptionCaught).hasStackTraceContaining("foobar")
.hasRootCauseInstanceOf(IOException.class);
// should be closed by UnusedChannelExceptionHandler
assertThat(childChannel.isOpen()).isFalse();
}
@Test(timeout = 5_000)
public void interruptDuringClosePreservesFlag() throws InterruptedException {
SocketChannel channel = new NioSocketChannel();
try {
loopGroup.register(channel).awaitUninterruptibly();
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
channelPromise.setSuccess(channel);
ChannelPool connectionPool = mock(ChannelPool.class);
Promise<Void> releasePromise = Mockito.spy(new DefaultPromise<>(loopGroup.next()));
when(connectionPool.release(eq(channel))).thenReturn(releasePromise);
MultiplexedChannelRecord record = new MultiplexedChannelRecord(channel, 8, null);
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup,
Collections.singleton(record), null);
CompletableFuture<Boolean> interrupteFlagPreserved = new CompletableFuture<>();
Thread t = new Thread(() -> {
try {
h2Pool.close();
} catch (Exception e) {
if (e.getCause() instanceof InterruptedException && Thread.currentThread().isInterrupted()) {
interrupteFlagPreserved.complete(true);
}
}
});
t.start();
t.interrupt();
t.join();
assertThat(interrupteFlagPreserved.join()).isTrue();
} finally {
channel.close().awaitUninterruptibly();
}
}
/**
* Execute command.
*
* @param queryString query string
* @return true if execute successfully, otherwise false
*/
public synchronized boolean execute(final String queryString) {
responseCallback = new DefaultPromise<>(eventLoopGroup.next());
MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString);
channel.writeAndFlush(comQueryPacket);
return null != waitExpectedResponse(MySQLOKPacket.class);
}
/**
* Execute update.
*
* @param queryString query string
* @return affected rows
*/
public synchronized int executeUpdate(final String queryString) {
responseCallback = new DefaultPromise<>(eventLoopGroup.next());
MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString);
channel.writeAndFlush(comQueryPacket);
return (int) waitExpectedResponse(MySQLOKPacket.class).getAffectedRows();
}
/**
* Execute query.
*
* @param queryString query string
* @return result set
*/
public synchronized InternalResultSet executeQuery(final String queryString) {
responseCallback = new DefaultPromise<>(eventLoopGroup.next());
MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString);
channel.writeAndFlush(comQueryPacket);
return waitExpectedResponse(InternalResultSet.class);
}
private void registerSlave() {
responseCallback = new DefaultPromise<>(eventLoopGroup.next());
InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
MySQLComRegisterSlaveCommandPacket registerSlaveCommandPacket = new MySQLComRegisterSlaveCommandPacket(serverId, localAddress.getHostName(), username, password, localAddress.getPort());
channel.writeAndFlush(registerSlaveCommandPacket);
waitExpectedResponse(MySQLOKPacket.class);
}