下面列出了怎么用io.netty.util.concurrent.ImmediateEventExecutor的API类实例代码及写法,或者点击链接到github查看源代码。
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
executor1 = new PluginTaskExecutor(new AtomicLong());
executor1.postConstruct();
embeddedChannel = new EmbeddedChannel();
embeddedChannel.attr(ChannelAttributes.CLIENT_ID).set("test_client");
embeddedChannel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);
when(channelHandlerContext.channel()).thenReturn(embeddedChannel);
when(channelHandlerContext.executor()).thenReturn(ImmediateEventExecutor.INSTANCE);
pluginTaskExecutorService = new PluginTaskExecutorServiceImpl(() -> executor1, mock(ShutdownHooks.class));
pluginInitializerHandler = new PluginInitializerHandler(initializers, pluginTaskExecutorService,
new ServerInformationImpl(new SystemInformationImpl(), listenerConfigurationService),
hiveMQExtensions, clientSessionPersistence, mqttConnacker);
}
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
executor1 = new PluginTaskExecutor(new AtomicLong());
executor1.postConstruct();
final EmbeddedChannel embeddedChannel = new EmbeddedChannel();
embeddedChannel.attr(ChannelAttributes.CLIENT_ID).set("test_client");
embeddedChannel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);
when(channelHandlerContext.channel()).thenReturn(embeddedChannel);
when(channelHandlerContext.executor()).thenReturn(ImmediateEventExecutor.INSTANCE);
pluginTaskExecutorService = new PluginTaskExecutorServiceImpl(() -> executor1, mock(ShutdownHooks.class));
clientLifecycleEventHandler =
new ClientLifecycleEventHandler(lifecycleEventListeners, pluginTaskExecutorService, hiveMQExtensions);
}
public void testConnectNotExists(Bootstrap cb) throws Throwable {
final Promise<Throwable> promise = ImmediateEventExecutor.INSTANCE.newPromise();
cb.handler(new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
promise.trySuccess(cause);
}
});
ChannelFuture future = cb.connect(NetUtil.LOCALHOST, SocketTestPermutation.BAD_PORT);
try {
Channel datagramChannel = future.syncUninterruptibly().channel();
Assert.assertTrue(datagramChannel.isActive());
datagramChannel.writeAndFlush(
Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII)).syncUninterruptibly();
if (!(datagramChannel instanceof OioDatagramChannel)) {
Assert.assertTrue(promise.syncUninterruptibly().getNow() instanceof PortUnreachableException);
}
} finally {
future.channel().close();
}
}
@Test
public void canSendGoAwayUsingVoidPromise() throws Exception {
handler = newHandler();
ByteBuf data = dummyData();
long errorCode = Http2Error.INTERNAL_ERROR.code();
handler = newHandler();
final Throwable cause = new RuntimeException("fake exception");
doAnswer(new Answer<ChannelFuture>() {
@Override
public ChannelFuture answer(InvocationOnMock invocation) throws Throwable {
ChannelPromise promise = invocation.getArgument(4);
assertFalse(promise.isVoid());
// This is what DefaultHttp2FrameWriter does... I hate mocking :-(.
SimpleChannelPromiseAggregator aggregatedPromise =
new SimpleChannelPromiseAggregator(promise, channel, ImmediateEventExecutor.INSTANCE);
aggregatedPromise.newPromise();
aggregatedPromise.doneAllocatingPromises();
return aggregatedPromise.setFailure(cause);
}
}).when(frameWriter).writeGoAway(
any(ChannelHandlerContext.class), anyInt(), anyLong(), any(ByteBuf.class), any(ChannelPromise.class));
handler.goAway(ctx, STREAM_ID, errorCode, data, newVoidPromise(channel));
verify(pipeline).fireExceptionCaught(cause);
}
private SslHandler getSslHandler() throws Exception {
// get SslHandler if it was added to the pipeline
ArgumentCaptor<ChannelHandler> captor = ArgumentCaptor.forClass(ChannelHandler.class);
verify(pipeline).addFirst(captor.capture());
SslHandler sslHandler = (SslHandler) captor.getValue();
// mock and store the context so we can get the handshake future
ChannelHandlerContext context = mock(ChannelHandlerContext.class);
when(context.executor()).thenReturn(ImmediateEventExecutor.INSTANCE);
when(context.channel()).thenReturn(mock(Channel.class, Answers.RETURNS_MOCKS.get()));
// add the handler but prevent the handshake from running automatically
when(channel.isActive()).thenReturn(false);
sslHandler.handlerAdded(context);
return sslHandler;
}
@Override
public void sendAsync(List<Append> appends, CompletedCallback callback) {
Channel ch;
try {
checkClientConnectionClosed();
ch = nettyHandler.getChannel();
} catch (ConnectionFailedException e) {
callback.complete(new ConnectionFailedException("Connection to " + connectionName + " is not established."));
return;
}
PromiseCombiner combiner = new PromiseCombiner(ImmediateEventExecutor.INSTANCE);
for (Append append : appends) {
combiner.add(ch.write(append));
}
ch.flush();
ChannelPromise promise = ch.newPromise();
promise.addListener(future -> {
nettyHandler.setRecentMessage();
Throwable cause = future.cause();
callback.complete(cause == null ? null : new ConnectionFailedException(cause));
});
combiner.finish(promise);
}
private void abort0(Throwable cause) {
final DownstreamSubscription<T> currentSubscription = subscription;
if (currentSubscription != null) {
currentSubscription.abort(cause);
return;
}
final DownstreamSubscription<T> newSubscription = new DownstreamSubscription<>(
this, AbortingSubscriber.get(cause), processor, ImmediateEventExecutor.INSTANCE,
false, false);
if (subscriptionUpdater.compareAndSet(this, null, newSubscription)) {
newSubscription.whenComplete().completeExceptionally(cause);
} else {
subscription.abort(cause);
}
}
private void abort0(Throwable cause) {
final AbortableSubscriber subscriber = this.subscriber;
if (subscriber != null) {
subscriber.abort(cause);
return;
}
final AbortableSubscriber abortable = new AbortableSubscriber(this, AbortingSubscriber.get(cause),
ImmediateEventExecutor.INSTANCE,
false);
if (!subscriberUpdater.compareAndSet(this, null, abortable)) {
this.subscriber.abort(cause);
return;
}
abortable.abort(cause);
abortable.onSubscribe(NoopSubscription.INSTANCE);
}
@Test
void testLeak() {
final ByteBuf buf = Unpooled.buffer();
buf.writeCharSequence("foo", StandardCharsets.UTF_8);
final HttpResponse orig =
AggregatedHttpResponse.of(HttpStatus.OK,
MediaType.PLAIN_TEXT_UTF_8,
PooledHttpData.wrap(buf).withEndOfStream()).toHttpResponse();
final HttpEncodedResponse encoded = new HttpEncodedResponse(
orig, HttpEncodingType.DEFLATE, mediaType -> true, 1);
// Drain the stream.
encoded.subscribe(NoopSubscriber.get(), ImmediateEventExecutor.INSTANCE);
// 'buf' should be released.
assertThat(buf.refCnt()).isZero();
}
@ParameterizedTest
@ArgumentsSource(AbortCauseArgumentProvider.class)
void testEarlyAbortWithSubscriber(@Nullable Throwable cause) {
final DeferredStreamMessage<Object> m = new DeferredStreamMessage<>();
@SuppressWarnings("unchecked")
final Subscriber<Object> subscriber = mock(Subscriber.class);
m.subscribe(subscriber, ImmediateEventExecutor.INSTANCE);
if (cause == null) {
m.abort();
} else {
m.abort(cause);
}
assertAborted(m, cause);
final DefaultStreamMessage<Object> d = new DefaultStreamMessage<>();
m.delegate(d);
assertAborted(d, cause);
}
@ParameterizedTest
@ArgumentsSource(AbortCauseArgumentProvider.class)
void testLateAbortWithSubscriber(@Nullable Throwable cause) {
final DeferredStreamMessage<Object> m = new DeferredStreamMessage<>();
final DefaultStreamMessage<Object> d = new DefaultStreamMessage<>();
@SuppressWarnings("unchecked")
final Subscriber<Object> subscriber = mock(Subscriber.class);
m.subscribe(subscriber, ImmediateEventExecutor.INSTANCE);
m.delegate(d);
verify(subscriber).onSubscribe(any());
if (cause == null) {
m.abort();
} else {
m.abort(cause);
}
if (cause == null) {
verify(subscriber, times(1)).onError(isA(AbortedStreamException.class));
} else {
verify(subscriber, times(1)).onError(isA(cause.getClass()));
}
assertAborted(m, cause);
assertAborted(d, cause);
}
@Test
void testStreaming() {
final DeferredStreamMessage<String> m = new DeferredStreamMessage<>();
final DefaultStreamMessage<String> d = new DefaultStreamMessage<>();
m.delegate(d);
final RecordingSubscriber subscriber = new RecordingSubscriber();
final List<String> recording = subscriber.recording;
m.subscribe(subscriber, ImmediateEventExecutor.INSTANCE);
assertThat(recording).containsExactly("onSubscribe");
d.write("A");
assertThat(recording).containsExactly("onSubscribe", "A");
d.close();
assertThat(recording).containsExactly("onSubscribe", "A", "onComplete");
assertThat(m.isOpen()).isFalse();
assertThat(m.isEmpty()).isFalse();
assertThat(m.whenComplete()).isCompletedWithValue(null);
assertThat(d.isOpen()).isFalse();
assertThat(d.isEmpty()).isFalse();
assertThat(d.whenComplete()).isCompletedWithValue(null);
}
@Test
void closePublisherNormally() throws Exception {
final DefaultStreamMessage<String> publisher = new DefaultStreamMessage<>();
final StreamMessageDuplicator<String> duplicator =
publisher.toDuplicator(ImmediateEventExecutor.INSTANCE);
final CompletableFuture<String> future1 = subscribe(duplicator.duplicate());
final CompletableFuture<String> future2 = subscribe(duplicator.duplicate());
writeData(publisher);
publisher.close();
assertThat(future1.get()).isEqualTo("Armeria is awesome.");
assertThat(future2.get()).isEqualTo("Armeria is awesome.");
duplicator.abort();
}
@Test
void closePublisherExceptionally() throws Exception {
final DefaultStreamMessage<String> publisher = new DefaultStreamMessage<>();
final StreamMessageDuplicator<String> duplicator =
publisher.toDuplicator(ImmediateEventExecutor.INSTANCE);
final CompletableFuture<String> future1 = subscribe(duplicator.duplicate());
final CompletableFuture<String> future2 = subscribe(duplicator.duplicate());
writeData(publisher);
publisher.close(clearTrace(new AnticipatedException()));
assertThatThrownBy(future1::join).hasCauseInstanceOf(AnticipatedException.class);
assertThatThrownBy(future2::join).hasCauseInstanceOf(AnticipatedException.class);
duplicator.abort();
}
@Test
void subscribeAfterPublisherClosed() throws Exception {
final DefaultStreamMessage<String> publisher = new DefaultStreamMessage<>();
final StreamMessageDuplicator<String> duplicator =
publisher.toDuplicator(ImmediateEventExecutor.INSTANCE);
final CompletableFuture<String> future1 = subscribe(duplicator.duplicate());
writeData(publisher);
publisher.close();
assertThat(future1.get()).isEqualTo("Armeria is awesome.");
// Still subscribable.
final CompletableFuture<String> future2 = subscribe(duplicator.duplicate());
assertThat(future2.get()).isEqualTo("Armeria is awesome.");
duplicator.abort();
}
@Test
void childStreamIsNotClosedWhenDemandIsNotEnough() throws Exception {
final DefaultStreamMessage<String> publisher = new DefaultStreamMessage<>();
final StreamMessageDuplicator<String> duplicator =
publisher.toDuplicator(ImmediateEventExecutor.INSTANCE);
final CompletableFuture<String> future1 = new CompletableFuture<>();
final StringSubscriber subscriber = new StringSubscriber(future1, 2);
final StreamMessage<String> sm = duplicator.duplicate();
sm.whenComplete().whenComplete(subscriber);
sm.subscribe(subscriber);
final CompletableFuture<String> future2 = subscribe(duplicator.duplicate(), 3);
writeData(publisher);
publisher.close();
assertThat(future2.get()).isEqualTo("Armeria is awesome.");
assertThat(future1.isDone()).isEqualTo(false);
subscriber.requestAnother();
assertThat(future1.get()).isEqualTo("Armeria is awesome.");
duplicator.abort();
}
@Test
void abortPublisherWithSubscribers() {
for (Throwable abortCause : ABORT_CAUSES) {
final DefaultStreamMessage<String> publisher = new DefaultStreamMessage<>();
final StreamMessageDuplicator<String> duplicator =
publisher.toDuplicator(ImmediateEventExecutor.INSTANCE);
final CompletableFuture<String> future = subscribe(duplicator.duplicate());
if (abortCause == null) {
publisher.abort();
} else {
publisher.abort(abortCause);
}
if (abortCause == null) {
assertThatThrownBy(future::join).hasCauseInstanceOf(AbortedStreamException.class);
} else {
assertThatThrownBy(future::join).hasCauseInstanceOf(abortCause.getClass());
}
duplicator.abort();
}
}
@Test
void abortPublisherWithoutSubscriber() {
for (Throwable abortCause : ABORT_CAUSES) {
final DefaultStreamMessage<String> publisher = new DefaultStreamMessage<>();
final StreamMessageDuplicator<String> duplicator =
publisher.toDuplicator(ImmediateEventExecutor.INSTANCE);
if (abortCause == null) {
publisher.abort();
} else {
publisher.abort(abortCause);
}
// Completed exceptionally once a subscriber subscribes.
final CompletableFuture<String> future = subscribe(duplicator.duplicate());
if (abortCause == null) {
assertThatThrownBy(future::join).hasCauseInstanceOf(AbortedStreamException.class);
} else {
assertThatThrownBy(future::join).hasCauseInstanceOf(abortCause.getClass());
}
duplicator.abort();
}
}
@Test
void closingDuplicatorDoesNotAbortDuplicatedStream() {
final DefaultStreamMessage<ByteBuf> publisher = new DefaultStreamMessage<>();
final StreamMessageDuplicator<ByteBuf> duplicator =
publisher.toDuplicator(ImmediateEventExecutor.INSTANCE);
final ByteBufSubscriber subscriber = new ByteBufSubscriber();
duplicator.duplicate().subscribe(subscriber, ImmediateEventExecutor.INSTANCE);
duplicator.close();
// duplicate() is not allowed anymore.
assertThatThrownBy(duplicator::duplicate).isInstanceOf(IllegalStateException.class);
assertThat(subscriber.completionFuture().isDone()).isFalse();
publisher.close();
assertThat(subscriber.completionFuture().isDone()).isTrue();
}
@Test
void raiseExceptionInOnNext() {
final DefaultStreamMessage<ByteBuf> publisher = new DefaultStreamMessage<>();
final StreamMessageDuplicator<ByteBuf> duplicator =
publisher.toDuplicator(ImmediateEventExecutor.INSTANCE);
final ByteBuf buf = newUnpooledBuffer();
publisher.write(buf);
assertThat(buf.refCnt()).isOne();
// Release the buf after writing to the publisher which must not happen!
buf.release();
final ByteBufSubscriber subscriber = new ByteBufSubscriber();
duplicator.duplicate().subscribe(subscriber, ImmediateEventExecutor.INSTANCE);
assertThatThrownBy(() -> subscriber.completionFuture().get()).hasCauseInstanceOf(
IllegalReferenceCountException.class);
}
public RFuture<V> pollAsync() {
TransferQueueServiceImpl s = new TransferQueueServiceImpl();
RFuture<Boolean> future = remoteService.tryExecuteAsync(TransferQueueService.class, s, ImmediateEventExecutor.INSTANCE, -1, null);
RPromise<V> result = new RedissonPromise<>();
result.setUncancellable();
future.onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
result.trySuccess((V) s.getResult());
});
return result;
}
public RFuture<V> pollAsync(long timeout, TimeUnit unit) {
RPromise<V> result = new RedissonPromise<>();
result.setUncancellable();
TransferQueueServiceImpl s = new TransferQueueServiceImpl();
RFuture<Boolean> future = remoteService.tryExecuteAsync(TransferQueueService.class, s, ImmediateEventExecutor.INSTANCE, timeout, unit);
future.onComplete((r, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
result.trySuccess((V) s.getResult());
});
return result;
}
/**
* Sends a message to this pipe. Returns a {@link Future} that is completed
* when the message is received.
* <p>
* If the pipe is closed then this will return a failed future.</p>
*
* @param message the message to send to the pipe
* @return a {@link Future} that is satisfied when the message is received,
* or a failed future if the pipe is closed.
* @throws NullPointerException if the message is {@code null}.
* @throws IllegalStateException if the message could not be added to the queue for some reason.
* @see #receive()
*/
public Future<Void> send(T message) {
Objects.requireNonNull(message, "msg");
Promise<T> receivePromise;
synchronized (this) {
if (closed) {
return CLOSED_FUTURE;
}
receivePromise = receiveQueue.poll();
if (receivePromise == null) {
Promise<Void> sendPromise = ImmediateEventExecutor.INSTANCE.newPromise();
sendQueue.add(new Node(message, sendPromise));
return sendPromise;
}
}
receivePromise.setSuccess(message);
return SENT_FUTURE;
}
/**
* Receives a message from this pipe.
* <p>
* If the pipe is closed then this will return a failed future.</p>
*/
public Future<T> receive() {
Node node;
synchronized (this) {
node = sendQueue.poll();
if (node == null) {
if (closed) {
return ImmediateEventExecutor.INSTANCE.newFailedFuture(PIPE_CLOSED);
}
Promise<T> promise = ImmediateEventExecutor.INSTANCE.newPromise();
receiveQueue.add(promise);
return promise;
}
}
node.promise.setSuccess(null);
return ImmediateEventExecutor.INSTANCE.newSucceededFuture(node.message);
}
/**
* Simple constructor with the host and port to use to connect to.
* <p>This constructor manages the lifecycle of the {@link TcpClient} and
* underlying resources such as {@link ConnectionProvider},
* {@link LoopResources}, and {@link ChannelGroup}.
* <p>For full control over the initialization and lifecycle of the
* TcpClient, use {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}.
* @param host the host to connect to
* @param port the port to connect to
* @param codec for encoding and decoding the input/output byte streams
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
Assert.notNull(host, "host is required");
Assert.notNull(codec, "ReactorNettyCodec is required");
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.codec = codec;
this.tcpClient = TcpClient.create(this.poolResources)
.host(host).port(port)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel()));
}
/**
* A variant of {@link #ReactorNettyTcpClient(String, int, ReactorNettyCodec)}
* that still manages the lifecycle of the {@link TcpClient} and underlying
* resources, but allows for direct configuration of other properties of the
* client through a {@code Function<TcpClient, TcpClient>}.
* @param clientConfigurer the configurer function
* @param codec for encoding and decoding the input/output byte streams
* @since 5.1.3
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(Function<TcpClient, TcpClient> clientConfigurer, ReactorNettyCodec<P> codec) {
Assert.notNull(codec, "ReactorNettyCodec is required");
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.codec = codec;
this.tcpClient = clientConfigurer.apply(TcpClient
.create(this.poolResources)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel())));
}
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
handler = new SubscribeHandler(clientSessionSubscriptionPersistence, retainedMessagePersistence, sharedSubscriptionService, eventLog, retainedMessagesSender, mqttConfigurationService);
channel = new EmbeddedChannel(handler);
channel.attr(ChannelAttributes.CLIENT_ID).set("client");
when(clientSessionSubscriptionPersistence.addSubscription(anyString(), any(Topic.class))).thenReturn(Futures.immediateFuture(null));
when(clientSessionSubscriptionPersistence.addSubscriptions(anyString(), any(ImmutableSet.class))).thenReturn(Futures.<Void>immediateFuture(null));
when(ctx.channel()).thenReturn(channel);
when(ctx.writeAndFlush(anyObject())).thenReturn(channelFuture);
when(ctx.executor()).thenReturn(ImmediateEventExecutor.INSTANCE);
}
public void testLocalAddressAfterConnect(ServerBootstrap sb, Bootstrap cb) throws Throwable {
Channel serverChannel = null;
Channel clientChannel = null;
try {
final Promise<InetSocketAddress> localAddressPromise = ImmediateEventExecutor.INSTANCE.newPromise();
serverChannel = sb.childHandler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
localAddressPromise.setSuccess((InetSocketAddress) ctx.channel().localAddress());
}
}).bind().syncUninterruptibly().channel();
clientChannel = cb.handler(new ChannelInboundHandlerAdapter()).register().syncUninterruptibly().channel();
assertNull(clientChannel.localAddress());
assertNull(clientChannel.remoteAddress());
clientChannel.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
assertLocalAddress((InetSocketAddress) clientChannel.localAddress());
assertNotNull(clientChannel.remoteAddress());
assertLocalAddress(localAddressPromise.get());
} finally {
if (clientChannel != null) {
clientChannel.close().syncUninterruptibly();
}
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
}
}
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
frameWriter = new DefaultHttp2FrameWriter();
outbound = Unpooled.buffer();
expectedOutbound = Unpooled.EMPTY_BUFFER;
promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
http2HeadersEncoder = new DefaultHttp2HeadersEncoder();
Answer<Object> answer = new Answer<Object>() {
@Override
public Object answer(InvocationOnMock var1) throws Throwable {
Object msg = var1.getArgument(0);
if (msg instanceof ByteBuf) {
outbound.writeBytes((ByteBuf) msg);
}
ReferenceCountUtil.release(msg);
return future;
}
};
when(ctx.write(any())).then(answer);
when(ctx.write(any(), any(ChannelPromise.class))).then(answer);
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.channel()).thenReturn(channel);
when(ctx.executor()).thenReturn(ImmediateEventExecutor.INSTANCE);
}
@Override
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e != ImmediateEventExecutor.INSTANCE && e.inEventLoop()) {
throw new BlockingOperationException();
}
}