下面列出了 io.netty.channel.embedded.EmbeddedChannel#runPendingTasks() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Writes {@code request} inbound on the embedded channel and returns the decoded reply.
 *
 * <p>If no outbound message is available right away, pending tasks are run and the outbound
 * queue is polled again, up to 50 times with a 10 ms pause before each poll.
 *
 * @param channel the embedded channel that processes the request
 * @param request the request written inbound
 * @return the reply decoded from the first outbound message
 * @throws IllegalStateException if no outbound message appeared within the polling window
 * @throws Exception if the polling sleep is interrupted or {@code CommandDecoder.parseCommand} throws
 */
static Reply sendRequest(EmbeddedChannel channel, Request request) throws Exception {
    channel.writeInbound(request);
    Object encodedReply = channel.readOutbound();
    for (int i = 0; encodedReply == null && i < 50; i++) {
        channel.runPendingTasks();
        Thread.sleep(10);
        encodedReply = channel.readOutbound();
    }
    if (encodedReply == null) {
        throw new IllegalStateException("No reply to request: " + request);
    }
    ByteBuf replyBuffer = (ByteBuf) encodedReply;
    WireCommand decoded;
    try {
        decoded = CommandDecoder.parseCommand(replyBuffer);
    } finally {
        // Release on every path: a parse failure must not leak the reference-counted buffer.
        replyBuffer.release();
    }
    assertNotNull(decoded);
    return (Reply) decoded;
}
/**
 * Schedules inbound ping frames (constructed with {@code ack == false}) at a fixed rate,
 * drives the embedded event loop for one second, and expects the exception catcher to have
 * recorded exactly one {@link IOException}.
 */
@Test
public void nonAckPingsResultInOneChannelException() {
    PipelineExceptionCatcher exceptionCatcher = new PipelineExceptionCatcher();
    EmbeddedChannel http2Channel = createHttp2Channel(fastChecker, exceptionCatcher);

    Runnable deliverNonAckPing = () -> http2Channel.writeInbound(new DefaultHttp2PingFrame(0, false));
    http2Channel.eventLoop()
                .scheduleAtFixedRate(deliverNonAckPing, 0, FAST_CHECKER_DURATION_MILLIS, TimeUnit.MILLISECONDS);

    // Keep running pending tasks until the one-second deadline passes.
    Instant deadline = Instant.now().plus(1, SECONDS);
    while (Instant.now().isBefore(deadline)) {
        http2Channel.runPendingTasks();
    }

    assertThat(exceptionCatcher.caughtExceptions).hasSize(1);
    assertThat(exceptionCatcher.caughtExceptions.get(0)).isInstanceOf(IOException.class);
}
/**
 * Sends a request whose method, path, version and User-Agent header equal those of the single
 * configured rule, then asserts that the channel is neither active nor open.
 */
@Test
public void testDeniedRule() throws UnknownHostException {
    // Rule describing the request that should be denied.
    HashMultimap<String, String> ruleHeaders = HashMultimap.create();
    ruleHeaders.put("User-Agent", "Bad-actor: 1.0");
    Http1DeterministicRuleEngineConfig.Rule denyRule =
        new Http1DeterministicRuleEngineConfig.Rule(
            HttpMethod.GET, "/path/to/failure", HttpVersion.HTTP_1_0, ruleHeaders);

    List<Http1DeterministicRuleEngineConfig.Rule> denyList = new ArrayList<>();
    denyList.add(denyRule);
    Http1FilterConfig filterConfig = new Http1FilterConfig(ImmutableList.copyOf(denyList));
    EmbeddedChannel filteredChannel = new EmbeddedChannel(new Http1Filter(filterConfig));

    // Request carrying the same method, path, version and header as the rule.
    DefaultHttpRequest offendingRequest =
        new DefaultHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.GET, "/path/to/failure");
    offendingRequest.headers().set("User-Agent", "Bad-actor: 1.0");

    filteredChannel.writeInbound(offendingRequest);
    filteredChannel.runPendingTasks();

    assertFalse(filteredChannel.isActive());
    assertFalse(filteredChannel.isOpen());
}
/**
 * Stubs an EXACTLY_ONCE retained message for "topic", subscribes to "#" with AT_MOST_ONCE,
 * and asserts that the first outbound PUBLISH carries AT_MOST_ONCE.
 */
@Test
public void test_wildcard_subscription_qos_downgraded_to_actual_subscription() throws Exception {
    final RetainedMessage retained = new RetainedMessage(
            "test".getBytes(UTF_8), QoS.EXACTLY_ONCE, 1L, MqttConfigurationDefaults.TTL_DISABLED);
    when(retainedMessagePersistence.get("topic")).thenReturn(Futures.immediateFuture(retained));

    final Set<String> matchingTopics = ImmutableSet.<String>builder().add("topic").build();
    when(retainedMessagePersistence.getWithWildcards("#")).thenReturn(Futures.immediateFuture(matchingTopics));

    final Topic wildcardTopic = new Topic("#", QoS.AT_MOST_ONCE);
    final List<SubscriptionResult> subscriptions = newArrayList(subResult(wildcardTopic, false));
    final SendRetainedMessagesListener listener = createListener(subscriptions, ignoredTopics);

    final EmbeddedChannel channel = new EmbeddedChannel(new PublishUserEventReceivedHandler());
    listener.operationComplete(channel.newSucceededFuture());
    channel.runPendingTasks();

    // element() throws if nothing was written outbound.
    final PUBLISH sent = (PUBLISH) channel.outboundMessages().element();
    assertEquals(QoS.AT_MOST_ONCE, sent.getQoS());
}
/**
 * Uses a subscription result constructed with the share name "shareName" and asserts that
 * nothing is written outbound on the channel.
 */
@Test
public void test_subscription_shared() throws Exception {
    final RetainedMessage retained = new RetainedMessage(
            "test".getBytes(UTF_8), QoS.EXACTLY_ONCE, 1L, MqttConfigurationDefaults.TTL_DISABLED);
    when(retainedMessagePersistence.get("topic")).thenReturn(Futures.immediateFuture(retained));

    final Set<String> matchingTopics = ImmutableSet.of("topic");
    when(retainedMessagePersistence.getWithWildcards("#")).thenReturn(Futures.immediateFuture(matchingTopics));

    final Topic wildcardTopic = new Topic("#", QoS.EXACTLY_ONCE);
    final SubscriptionResult sharedResult = new SubscriptionResult(wildcardTopic, false, "shareName");
    final List<SubscriptionResult> subscriptions = newArrayList(sharedResult);
    final SendRetainedMessagesListener listener = createListener(subscriptions, ignoredTopics);

    final EmbeddedChannel channel = new EmbeddedChannel(new PublishUserEventReceivedHandler());
    listener.operationComplete(channel.newSucceededFuture());
    channel.runPendingTasks();

    assertEquals(0, channel.outboundMessages().size());
}
/**
 * Processes one HTTP request through a traced pipeline on a separate thread and asserts that
 * exactly one span is visible to the test thread afterwards.
 *
 * @throws Exception if the test thread is interrupted while waiting for the worker
 */
@Test
public void testThreadBoundaries() throws Exception {
    Thread thread =
        new Thread(
            () -> {
                EmbeddedChannel channel =
                    new EmbeddedChannel(
                        new Http1ServerCodec(),
                        new HttpServerTracingHandler(httpTracingState),
                        new ApplicationHandler());
                DefaultFullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, "/foo");
                channel.writeInbound(request);
                channel.runPendingTasks();
            });
    thread.start();
    // join() cannot miss the worker's completion (an unconditional wait()/notify() pair can
    // lose the notification if the worker finishes first), it still returns when the worker
    // dies with an exception, and it makes the worker's writes visible to this thread.
    thread.join();
    Assert.assertEquals(1, spans.size());
}
/**
 * Builds the per-test fixture: an embedded parent channel with the multiplex codec in its
 * pipeline, a settings frame delivered to the codec, and two streams with fixed ids.
 */
@Before
public void setUp() {
childChannelInitializer = new TestChannelInitializer();
parentChannel = new EmbeddedChannel();
writer = new Writer();
// Connect the parent before the codec is added to the pipeline.
parentChannel.connect(new InetSocketAddress(0));
codec = new TestableHttp2MultiplexCodecBuilder(true, childChannelInitializer).build();
parentChannel.pipeline().addLast(codec);
// Run whatever tasks are queued at this point.
// NOTE(review): presumably tasks queued by connect/addLast; confirm.
parentChannel.runPendingTasks();
// Hand the codec a settings frame carrying the configured initial window size.
Http2Settings settings = new Http2Settings().initialWindowSize(initialRemoteStreamWindow);
codec.onHttp2Frame(new DefaultHttp2SettingsFrame(settings));
// Stream ids are assigned directly on the freshly created stream objects.
inboundStream = codec.newStream();
inboundStream.id = 3;
outboundStream = codec.newStream();
outboundStream.id = 2;
}
/**
 * Creates an HTTP/1 channel, runs its pending tasks, and asserts that no outbound message
 * is available.
 */
@Test
public void register_withHttp1Protocol_doesNotPing() {
    EmbeddedChannel http1Channel = createHttp1Channel(fastChecker);
    http1Channel.runPendingTasks();

    DefaultHttp2PingFrame outboundPing = http1Channel.readOutbound();
    assertThat(outboundPing).isNull();
}
/**
 * Calls {@code publishAvailable} for a client whose channel has already been closed and
 * verifies that no poll for new messages is issued for that client and channel.
 */
@Test(timeout = 5000)
public void test_publish_avaliable_channel_inactive() {
    final EmbeddedChannel closedChannel = new EmbeddedChannel();
    closedChannel.attr(ChannelAttributes.IN_FLIGHT_MESSAGES_SENT).set(true);
    closedChannel.attr(ChannelAttributes.IN_FLIGHT_MESSAGES).set(new AtomicInteger(0));
    closedChannel.close();

    final ClientSession session = new ClientSession(true, 1000L);
    when(clientSessionLocalPersistence.getSession("client")).thenReturn(session);
    when(channelPersistence.get("client")).thenReturn(closedChannel);

    clientQueuePersistence.publishAvailable("client");
    closedChannel.runPendingTasks();

    verify(publishPollService, never()).pollNewMessages("client", closedChannel);
}
/**
 * Creates an HTTP/2 channel, sleeps for {@code FAST_CHECKER_DURATION_MILLIS}, runs pending
 * tasks, and expects the exception catcher to hold exactly one {@link IOException}.
 */
@Test
public void ignoredPingsResultInOneChannelException() throws InterruptedException {
    PipelineExceptionCatcher exceptionCatcher = new PipelineExceptionCatcher();
    EmbeddedChannel http2Channel = createHttp2Channel(fastChecker, exceptionCatcher);

    // Let the checker interval elapse before running the embedded event loop.
    Thread.sleep(FAST_CHECKER_DURATION_MILLIS);
    http2Channel.runPendingTasks();

    assertThat(exceptionCatcher.caughtExceptions).hasSize(1);
    assertThat(exceptionCatcher.caughtExceptions.get(0)).isInstanceOf(IOException.class);
}
/**
 * Creates a channel record with a one-second idle timeout, waits twice that long without
 * acquiring any stream, and asserts the parent channel is still open.
 */
@Test
public void idleTimerDoesNotApplyBeforeFirstChannelIsCreated() throws InterruptedException {
    int idleTimeoutMillis = 1000;
    EmbeddedChannel parent = newHttp2Channel();
    Duration idleTimeout = Duration.ofMillis(idleTimeoutMillis);
    // The record is only constructed here; it is not used again in this test.
    MultiplexedChannelRecord idleRecord = new MultiplexedChannelRecord(parent, 2, idleTimeout);

    Thread.sleep(idleTimeoutMillis * 2);
    parent.runPendingTasks();

    assertThat(parent.isOpen()).isTrue();
}
/**
 * Writes a payload through the request muxer, waits for the write future's callback, and
 * asserts that the write succeeded and that the payload is the head of the first channel's
 * outbound queue.
 *
 * @throws Exception if any call in the test throws
 */
@Test
public void writeTest() throws Exception {
    // Integer.valueOf replaces the boxing constructor, which is deprecated for removal.
    Integer payload = Integer.valueOf(1);
    Request request = requestMuxer.write(payload);
    CountDownLatch done = new CountDownLatch(1);
    Futures.addCallback(
        request.getWriteFuture(),
        new FutureCallback<UUID>() {
            @Override
            public void onSuccess(UUID id) {
                setSuccess(true);
                done.countDown();
            }

            @Override
            public void onFailure(Throwable throwable) {
                setFailure(true);
                done.countDown();
            }
        });
    EmbeddedChannel channel = channels.get(0);
    channel.runPendingTasks();
    Uninterruptibles.awaitUninterruptibly(done); // block
    channel.runPendingTasks();
    assertTrue(request.getWriteFuture().isDone());
    assertFalse(failure);
    assertTrue(success);
    // peek() leaves the message in the outbound queue.
    Integer written = (Integer) channel.outboundMessages().peek();
    assertEquals(payload, written);
    requestMuxer.close();
}
/**
 * Subscribes to "topic" (EXACTLY_ONCE) and "topic2" (AT_MOST_ONCE), each with a stubbed
 * retained message. Verifies that the "topic" PUBLISH is passed to the queue persistence
 * and that the "topic2" PUBLISH is the first message in the channel's outbound queue.
 */
@Test
public void test_wildcard_subscription_retained_messages_available_no_wildcard() throws Exception {
    final RetainedMessage exactlyOnceRetained = new RetainedMessage(
            "test".getBytes(UTF_8), QoS.EXACTLY_ONCE, 1L, MqttConfigurationDefaults.TTL_DISABLED);
    final RetainedMessage atMostOnceRetained = new RetainedMessage(
            "test".getBytes(UTF_8), QoS.AT_MOST_ONCE, 1L, MqttConfigurationDefaults.TTL_DISABLED);
    when(retainedMessagePersistence.get("topic")).thenReturn(Futures.immediateFuture(exactlyOnceRetained));
    when(retainedMessagePersistence.get("topic2")).thenReturn(Futures.immediateFuture(atMostOnceRetained));

    final ImmutableSet<String> retainedTopics = ImmutableSet.of("topic", "topic2");
    when(retainedMessagePersistence.getWithWildcards("#")).thenReturn(Futures.immediateFuture(retainedTopics));

    final List<SubscriptionResult> subscriptions = newArrayList(
            subResult(new Topic("topic", QoS.EXACTLY_ONCE), false),
            subResult(new Topic("topic2", QoS.AT_MOST_ONCE), false));
    final SendRetainedMessagesListener listener = createListener(subscriptions, ignoredTopics);

    final EmbeddedChannel channel = new EmbeddedChannel(new PublishUserEventReceivedHandler());
    channel.attr(ChannelAttributes.CLIENT_ID).set("client");
    listener.operationComplete(channel.newSucceededFuture());
    // Pending tasks are run twice before the results are inspected.
    channel.runPendingTasks();
    channel.runPendingTasks();

    // First PUBLISH: captured from the call to the queue persistence.
    final ArgumentCaptor<List<PUBLISH>> queuedCaptor =
            ArgumentCaptor.forClass((Class<List<PUBLISH>>) (Class) ArrayList.class);
    verify(queuePersistence).add(eq("client"), eq(false), queuedCaptor.capture(), eq(true));
    final List<PUBLISH> firstBatch = queuedCaptor.getAllValues().get(0);
    final PUBLISH queuedPublish = firstBatch.get(0);
    assertEquals("topic", queuedPublish.getTopic());
    assertEquals(QoS.EXACTLY_ONCE, queuedPublish.getQoS());
    assertArrayEquals("test".getBytes(UTF_8), queuedPublish.getPayload());
    assertEquals(true, queuedPublish.isRetain());

    // Second PUBLISH: polled from the channel's outbound queue.
    final PUBLISH writtenPublish = (PUBLISH) channel.outboundMessages().poll();
    assertEquals("topic2", writtenPublish.getTopic());
    assertEquals(QoS.AT_MOST_ONCE, writtenPublish.getQoS());
    assertArrayEquals("test".getBytes(UTF_8), writtenPublish.getPayload());
    assertEquals(true, writtenPublish.isRetain());
}
/**
 * Encodes {@code request} by writing it outbound through an embedded channel whose pipeline
 * contains a single {@link HttpRequestEncoder}.
 *
 * @param request the HTTP request to encode
 * @return the first outbound message read from the channel after pending tasks have run
 */
public static ByteBuf encodeRequest(HttpRequest request) {
    EmbeddedChannel encoderChannel = new EmbeddedChannel();
    encoderChannel.pipeline().addLast("http request encoder", new HttpRequestEncoder());

    encoderChannel.writeOutbound(request);
    encoderChannel.runPendingTasks();

    ByteBuf encoded = encoderChannel.readOutbound();
    return encoded;
}
/**
 * Acquires a child stream from a channel record, closes all child channels with an
 * {@link IOException}, and asserts that the exception reached a handler on the child
 * pipeline and that the child channel ended up closed.
 */
@Test
public void closeChildChannels_shouldDeliverException() throws ExecutionException, InterruptedException {
    EmbeddedChannel parent = newHttp2Channel();
    loopGroup.register(parent).awaitUninterruptibly();
    Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
    channelPromise.setSuccess(parent);

    MultiplexedChannelRecord channelRecord = new MultiplexedChannelRecord(parent, 2, Duration.ofSeconds(10));

    // Acquire one child stream and wait for it to be handed out.
    Promise<Channel> acquiredStream = parent.eventLoop().newPromise();
    channelRecord.acquireStream(acquiredStream);
    parent.runPendingTasks();
    Channel child = acquiredStream.get();

    VerifyExceptionHandler exceptionProbe = new VerifyExceptionHandler();
    child.pipeline().addFirst(exceptionProbe);

    IOException failure = new IOException("foobar");
    channelRecord.closeChildChannels(failure);

    assertThat(child.pipeline().get(UnusedChannelExceptionHandler.class)).isNotNull();
    assertThat(exceptionProbe.exceptionCaught).hasStackTraceContaining("foobar")
                                              .hasRootCauseInstanceOf(IOException.class);

    // should be closed by UnusedChannelExceptionHandler
    assertThat(child.isOpen()).isFalse();
}
/**
 * Toggles two user-defined writability flags on the channel's outbound buffer and checks,
 * via a log of {@code channelWritabilityChanged} callbacks, that an event is recorded only
 * when the first flag turns false and when the last false flag turns true again.
 */
@Test
public void testUserDefinedWritability2() {
    // Each channelWritabilityChanged callback appends the current isWritable() value.
    final StringBuilder writabilityLog = new StringBuilder();
    EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            writabilityLog.append(ctx.channel().isWritable());
            writabilityLog.append(' ');
        }
    });

    channel.config().setWriteBufferLowWaterMark(128);
    channel.config().setWriteBufferHighWaterMark(256);

    ChannelOutboundBuffer outboundBuffer = channel.unsafe().outboundBuffer();

    // Flag 1 -> false: one event is logged, with isWritable() == false.
    outboundBuffer.setUserDefinedWritability(1, false);
    channel.runPendingTasks();
    assertThat(writabilityLog.toString(), is("false "));

    // Flag 2 -> false as well: no further event is logged.
    outboundBuffer.setUserDefinedWritability(2, false);
    channel.runPendingTasks();
    assertThat(writabilityLog.toString(), is("false "));

    // Flag 1 -> true while flag 2 is still false: no further event is logged.
    outboundBuffer.setUserDefinedWritability(1, true);
    channel.runPendingTasks();
    assertThat(writabilityLog.toString(), is("false "));

    // Flag 2 -> true, so every flag is true again: a second event is logged, with isWritable() == true.
    outboundBuffer.setUserDefinedWritability(2, true);
    channel.runPendingTasks();
    assertThat(writabilityLog.toString(), is("false true "));

    safeClose(channel);
}
/**
 * Acquires and releases a stream twenty times, a tenth of the idle timeout apart, asserting
 * the parent channel stays open throughout. Then stays idle for twice the timeout and
 * asserts the parent channel has been closed.
 */
@Test
public void acquireRequestResetsCloseTimer() throws InterruptedException {
    int idleTimeoutMillis = 1000;
    EmbeddedChannel parent = newHttp2Channel();
    MultiplexedChannelRecord channelRecord =
        new MultiplexedChannelRecord(parent, 2, Duration.ofMillis(idleTimeoutMillis));

    for (int attempt = 0; attempt < 20; ++attempt) {
        Thread.sleep(idleTimeoutMillis / 10);
        parent.runPendingTasks();

        Promise<Channel> acquired = parent.eventLoop().newPromise();
        assertThat(channelRecord.acquireStream(acquired)).isTrue();
        parent.runPendingTasks();

        assertThat(acquired.isSuccess()).isTrue();
        assertThat(parent.isOpen()).isTrue();

        channelRecord.closeAndReleaseChild(acquired.getNow());
        parent.runPendingTasks();
    }

    assertThat(parent.isOpen()).isTrue();

    // No acquisitions from here on; wait twice the idle timeout.
    Thread.sleep(idleTimeoutMillis * 2);
    parent.runPendingTasks();

    assertThat(parent.isOpen()).isFalse();
}
/**
 * Encodes 1024 random envelopes, feeds the encoded bytes to the decoder in random slices
 * while the mocked buffer provider answers buffer requests and listener registrations
 * randomly, and verifies the decoded envelopes and the reference counts of the slices.
 */
@Test
public void testEncodeDecodeRandomEnvelopes() throws Exception {
final InboundEnvelopeDecoder decoder = new InboundEnvelopeDecoder(this.bufferProviderBroker);
final EmbeddedChannel ch = new EmbeddedChannel(
new OutboundEnvelopeEncoder(), decoder);
when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
.thenReturn(this.bufferProvider);
// NOTE(review): requestBuffer(anyInt()) is stubbed again further down with
// randomBufferRequestAnswer, so this first stubbing looks superseded. Confirm whether it is needed.
when(this.bufferProvider.requestBuffer(anyInt())).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
// fulfill the buffer request with the requested size
return allocBuffer((Integer) invocation.getArguments()[0]);
}
});
// Both random answers are built from the same seeded Random instance.
Random randomAnswerSource = new Random(RANDOM_SEED);
RandomBufferRequestAnswer randomBufferRequestAnswer = new RandomBufferRequestAnswer(randomAnswerSource);
RandomBufferAvailabilityRegistrationAnswer randomBufferAvailabilityRegistrationAnswer =
new RandomBufferAvailabilityRegistrationAnswer(randomAnswerSource, randomBufferRequestAnswer);
when(this.bufferProvider.requestBuffer(anyInt())).thenAnswer(randomBufferRequestAnswer);
when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
.thenAnswer(randomBufferAvailabilityRegistrationAnswer);
// --------------------------------------------------------------------
Envelope[] envelopes = nextRandomEnvelopes(1024);
ByteBuf buf = encode(ch, envelopes);
ByteBuf[] slices = randomSlices(buf);
for (ByteBuf slice : slices) {
// Reference count of the slice before it is handed to the channel.
int refCount = slice.refCnt();
ch.writeInbound(slice);
// registered BufferAvailabilityListener => call bufferAvailable(buffer)
while (randomBufferAvailabilityRegistrationAnswer.isRegistered()) {
randomBufferAvailabilityRegistrationAnswer.unregister();
// While a listener is registered: auto-read is off and the slice has one extra reference.
Assert.assertFalse(ch.config().isAutoRead());
Assert.assertEquals(refCount + 1, slice.refCnt());
// return a buffer of max size => decoder needs to limit buffer size
decoder.bufferAvailable(allocBuffer(MAX_BUFFER_SIZE));
ch.runPendingTasks();
}
// Once the slice is fully processed: one reference fewer than before, auto-read on.
Assert.assertEquals(refCount - 1, slice.refCnt());
Assert.assertTrue(ch.config().isAutoRead());
}
// Only the envelopes remaining after removeSkippedEnvelopes are expected in the output.
Envelope[] expected = randomBufferAvailabilityRegistrationAnswer.removeSkippedEnvelopes(envelopes);
decodeAndVerify(ch, expected);
// The encoded buffer must be left with exactly one reference, which is released here.
Assert.assertEquals(1, buf.refCnt());
buf.release();
}
/**
 * Feeds three encoded envelopes to the decoder in three slices while the mocked buffer
 * provider returns {@code null}, {@code null}, a buffer, {@code null} on successive requests.
 * Checks auto-read state and slice reference counts before and after each
 * {@code bufferAvailable} callback.
 */
@Test
public void testBufferStaging() throws Exception {
final InboundEnvelopeDecoder decoder = new InboundEnvelopeDecoder(this.bufferProviderBroker);
final EmbeddedChannel ch = new EmbeddedChannel(
new OutboundEnvelopeEncoder(),
decoder);
when(this.bufferProviderBroker.getBufferProvider(anyJobId(), anyChannelId()))
.thenReturn(this.bufferProvider);
// --------------------------------------------------------------------
Envelope[] envelopes = nextEnvelopes(3, true);
ByteBuf buf = encode(ch, envelopes);
// Every listener registration reports SUCCEEDED_REGISTERED.
when(this.bufferProvider.registerBufferAvailabilityListener(Matchers.<BufferAvailabilityListener>anyObject()))
.thenReturn(BufferAvailabilityRegistration.SUCCEEDED_REGISTERED);
// Successive buffer requests yield: null, null, a buffer sized for the third envelope, null.
Buffer buffer = allocBuffer(envelopes[2].getBuffer().size());
when(this.bufferProvider.requestBuffer(anyInt()))
.thenReturn(null, null, buffer, null);
// --------------------------------------------------------------------
// slices: [0] => full envelope, [1] => half envelope, [2] => remaining half + full envelope
ByteBuf[] slices = slice(buf,
OutboundEnvelopeEncoder.HEADER_SIZE + envelopes[0].getBuffer().size(),
OutboundEnvelopeEncoder.HEADER_SIZE + envelopes[1].getBuffer().size() / 2);
// 1. no buffer available, incoming slice contains all data
int refCount = slices[0].refCnt();
decodeAndVerify(ch, slices[0]);
// The slice has one extra reference and auto-read is switched off.
Assert.assertEquals(refCount + 1, slices[0].refCnt());
Assert.assertFalse(ch.config().isAutoRead());
// Notify the decoder of an available buffer. The buffer passed here is twice the size of
// the first envelope's buffer, so the decoder has to adjust it to the requested size.
decoder.bufferAvailable(allocBuffer(envelopes[0].getBuffer().size() * 2));
ch.runPendingTasks();
Assert.assertEquals(refCount - 1, slices[0].refCnt());
Assert.assertTrue(ch.config().isAutoRead());
decodeAndVerify(ch, envelopes[0]);
// 2. no buffer available, incoming slice does NOT contain all data
refCount = slices[1].refCnt();
decodeAndVerify(ch, slices[1]);
Assert.assertEquals(refCount + 1, slices[1].refCnt());
Assert.assertFalse(ch.config().isAutoRead());
decoder.bufferAvailable(allocBuffer());
ch.runPendingTasks();
Assert.assertEquals(refCount - 1, slices[1].refCnt());
Assert.assertTrue(ch.config().isAutoRead());
// Called with no expected envelopes at this point.
decodeAndVerify(ch);
// 3. buffer available
refCount = slices[2].refCnt();
decodeAndVerify(ch, slices[2], envelopes[1], envelopes[2]);
Assert.assertEquals(refCount - 1, slices[2].refCnt());
Assert.assertTrue(ch.config().isAutoRead());
// The encoded buffer must be left with exactly one reference, which is released here.
Assert.assertEquals(1, buf.refCnt());
buf.release();
}
/**
 * Sends a cleartext HTTP/1.1 request into a channel guarded by {@code GentleSslHandler} and
 * asserts that the response has status UPGRADE_REQUIRED, that its Connection header lists
 * both "close" and "upgrade", and that its Upgrade header lists both "TLS/1.2" and "HTTP/1.1".
 *
 * @throws Exception if certificate generation, SSL context construction or decoding fails
 */
@Test
public void testHttpRequest() throws Exception {
    assertTrue(OpenSsl.isAvailable());

    X509Certificate selfSignedCert = SelfSignedX509CertGenerator.generate("*.test.com");
    java.security.cert.X509Certificate[] chain = new java.security.cert.X509Certificate[1];
    chain[0] = selfSignedCert.getCert();
    SslContext sslContext =
        SslContextBuilder.forServer(selfSignedCert.getKey(), chain)
            .sslProvider(SslProvider.OPENSSL)
            .build();

    HttpsUpgradeHandler cleartextHandler = new HttpsUpgradeHandler();
    GentleSslHandler upgradeHandler = new GentleSslHandler(sslContext, cleartextHandler);

    ByteBuf rawRequest = encodeRequest(new DefaultHttpRequest(HTTP_1_1, GET, "/"));

    EmbeddedChannel channel = new EmbeddedChannel();
    channel.pipeline().addLast("upgradeHandler", upgradeHandler);
    channel.writeInbound(rawRequest);
    channel.runPendingTasks(); // blocks

    HttpResponse response = Recipes.decodeResponse(Recipes.extractBuffers(channel));
    assertTrue(response != null);
    // JUnit's contract is assertEquals(expected, actual); this order keeps failure messages accurate.
    assertEquals(UPGRADE_REQUIRED, response.status());

    String connectionHeader = response.headers().get(HttpHeaderNames.CONNECTION);
    assertThat(
        Arrays.asList(connectionHeader.split(",")),
        IsCollectionContaining.hasItem(HttpHeaderValues.CLOSE.toString()));
    assertThat(
        Arrays.asList(connectionHeader.split(",")),
        IsCollectionContaining.hasItem(HttpHeaderValues.UPGRADE.toString()));

    String upgradeHeader = response.headers().get(HttpHeaderNames.UPGRADE);
    assertThat(
        Arrays.asList(upgradeHeader.split(",")),
        IsCollectionContaining.hasItem("TLS/1.2"));
    assertThat(
        Arrays.asList(upgradeHeader.split(",")),
        IsCollectionContaining.hasItem("HTTP/1.1"));
}