下面列出了 io.netty.channel.embedded.EmbeddedChannel#close() 的实例代码；您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Verifies that a signed message whose payload does not carry the "challenge" property makes
 * the pipeline throw a {@link DecoderException}.
 */
@Test
public void testFailureDecodeBadVoteField() throws Exception {
    EmbeddedChannel channel = createChannel();

    // Serialize the vote, deliberately leaving the "challenge" property out of the payload.
    JsonObject votePayload = new Vote("Test", "test", "test", "0").serialize();
    String encodedPayload = GsonInst.gson.toJson(votePayload);

    // Sign the encoded payload with the token registered under "default".
    Mac hmac = Mac.getInstance("HmacSHA256");
    hmac.init(TestVotifierPlugin.getI().getTokens().get("default"));
    byte[] digest = hmac.doFinal(encodedPayload.getBytes(StandardCharsets.UTF_8));

    JSONObject message = new JSONObject();
    message.put("payload", encodedPayload);
    message.put("signature", Base64.getEncoder().encodeToString(digest));

    assertThrows(DecoderException.class, () -> channel.writeInbound(message.toString()));
    channel.close();
}
/**
 * Checks that, once the channel has been closed, both the handshake future and the SSL close
 * future of the {@link SslHandler} report a {@link ClosedChannelException} as their cause.
 */
@Test
public void testCloseFutureNotified() throws Exception {
    SslHandler sslHandler = new SslHandler(SSLContext.getDefault().createSSLEngine());
    EmbeddedChannel channel = new EmbeddedChannel(sslHandler);

    channel.close();

    // When the channel is closed the SslHandler will write an empty buffer to the channel.
    ByteBuf flushed = channel.readOutbound();
    assertFalse(flushed.isReadable());
    flushed.release();
    assertFalse(channel.finishAndReleaseAll());

    Throwable handshakeFailure = sslHandler.handshakeFuture().cause();
    assertTrue(handshakeFailure instanceof ClosedChannelException);
    Throwable closeFailure = sslHandler.sslCloseFuture().cause();
    assertTrue(closeFailure instanceof ClosedChannelException);
}
/**
 * Verifies backward compatibility: feeding the contents of the {@code /multiple.gz} test
 * resource to the GZIP decoder must yield exactly one inbound message whose content is "a".
 *
 * @throws IOException if the test resource cannot be read
 */
@Test
public void testConcatenatedStreamsReadFirstOnly() throws IOException {
    EmbeddedChannel chDecoderGZip = new EmbeddedChannel(createDecoder(ZlibWrapper.GZIP));
    try {
        byte[] bytes;
        // Close the resource stream explicitly instead of relying on IOUtils.toByteArray to do it.
        try (java.io.InputStream resource = getClass().getResourceAsStream("/multiple.gz")) {
            bytes = IOUtils.toByteArray(resource);
        }
        assertTrue(chDecoderGZip.writeInbound(Unpooled.copiedBuffer(bytes)));
        Queue<Object> messages = chDecoderGZip.inboundMessages();
        assertEquals(1, messages.size());
        ByteBuf msg = (ByteBuf) messages.poll();
        try {
            assertEquals("a", msg.toString(CharsetUtil.UTF_8));
        } finally {
            // Release the buffer even when the content assertion fails so it is not leaked.
            ReferenceCountUtil.release(msg);
        }
    } finally {
        assertFalse(chDecoderGZip.finish());
        chDecoderGZip.close();
    }
}
/**
 * Builds a signed vote message that includes the session challenge, writes it into a freshly
 * created channel and checks the outcome.
 *
 * @param vote the vote to serialize and send
 * @param key the key used to compute the HmacSHA256 signature of the encoded payload
 * @param expectSuccess if {@code true}, asserts that the vote can be read back from the channel
 *     and that {@code finish()} returns {@code false}; if {@code false}, the message is only
 *     written and the channel is closed afterwards, letting any exception propagate
 * @throws Exception if signing fails or writing to the channel throws
 */
private void sendVote(Vote vote, Key key, boolean expectSuccess) throws Exception {
    EmbeddedChannel channel = createChannel();

    JsonObject votePayload = vote.serialize();
    votePayload.addProperty("challenge", SESSION.getChallenge());
    String encodedPayload = GsonInst.gson.toJson(votePayload);

    Mac hmac = Mac.getInstance("HmacSHA256");
    hmac.init(key);
    byte[] digest = hmac.doFinal(encodedPayload.getBytes(StandardCharsets.UTF_8));

    JSONObject message = new JSONObject();
    message.put("payload", encodedPayload);
    message.put("signature", Base64.getEncoder().encodeToString(digest));

    if (!expectSuccess) {
        // Failure path: the caller decides how to treat the exception; just make sure we close.
        try {
            channel.writeInbound(message.toString());
        } finally {
            channel.close();
        }
        return;
    }

    assertTrue(channel.writeInbound(message.toString()));
    assertEquals(vote, channel.readInbound());
    assertFalse(channel.finish());
}
/**
 * Sends two consecutive health check requests and verifies the status and content of each response
 * as well as whether the channel is still open afterwards.
 * @param httpMethod the {@link HttpMethod} for the request.
 * @param isServiceUp true if the service should be marked up before each request, false otherwise
 * @param keepAlive true if keep alive has to be set in the request, false otherwise
 * @throws IOException
 */
private void testHealthCheckRequest(HttpMethod httpMethod, boolean isServiceUp, boolean keepAlive)
    throws IOException {
    EmbeddedChannel channel = createChannel();
    for (int attempt = 0; attempt < 2; attempt++) {
        if (isServiceUp) {
            restServerState.markServiceUp();
        }
        HttpRequest healthCheckRequest = RestTestUtils.createRequest(httpMethod, healthCheckUri, null);
        HttpUtil.setKeepAlive(healthCheckRequest, keepAlive);
        FullHttpResponse response = sendRequestAndGetResponse(channel, healthCheckRequest);

        HttpResponseStatus expectedStatus;
        String expectedContent;
        if (isServiceUp) {
            expectedStatus = HttpResponseStatus.OK;
            expectedContent = goodStr;
        } else {
            expectedStatus = HttpResponseStatus.SERVICE_UNAVAILABLE;
            expectedContent = badStr;
        }
        assertEquals("Unexpected response status", expectedStatus, response.status());
        assertEquals("Unexpected content", expectedContent, RestTestUtils.getContentString(response));

        // Reset the state so the next iteration starts from "service down".
        restServerState.markServiceDown();

        boolean channelShouldStayOpen = keepAlive && isServiceUp;
        if (!channelShouldStayOpen) {
            Assert.assertFalse("Channel should have been closed by now ", channel.isOpen());
            // The old channel is closed; the next iteration needs a fresh one.
            channel = createChannel();
        } else {
            Assert.assertTrue("Channel should not be closed ", channel.isOpen());
        }
    }
    channel.close();
}
/**
 * Writes a short, a medium and a long string through the test channel, first with the fourth
 * argument of {@code writeStringAndAssert} set to {@code false} and then to {@code true}, and
 * finally writes the short string once more. Only the long string is passed {@code true} as the
 * last argument.
 */
@Test
public void testDelimitedLengths() throws Exception {
    Charset charset = CharsetUtil.ISO_8859_1;
    EmbeddedChannel channel = getTestChannel(charset, 100, 0, false);

    String shortValue = "a";
    String mediumValue = "abcdefghij";
    String longValue = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrsxt"
        + "uvwxyz";
    String[] values = {shortValue, mediumValue, longValue};

    for (boolean fourthArg : new boolean[] {false, true}) {
        for (String value : values) {
            // The last flag is true only for the long value (same reference comparison is intended).
            writeStringAndAssert(channel, value, charset, fourthArg, value == longValue);
        }
    }
    writeStringAndAssert(channel, shortValue, charset, false, false);
    channel.close();
}
/**
 * Requests that the server write more data than the declared Content-Length and verifies that one of the
 * callbacks fails with an {@link IllegalStateException}.
 * @param chunkCount the number of chunks of {@link MockNettyMessageProcessor#CHUNK} to use to set Content-Length.
 * @throws Exception
 */
private void doWriteMoreThanContentLengthTest(int chunkCount) throws Exception {
    EmbeddedChannel channel = createEmbeddedChannel();
    MockNettyMessageProcessor processor = channel.pipeline().get(MockNettyMessageProcessor.class);

    HttpHeaders requestHeaders = new DefaultHttpHeaders();
    requestHeaders.set(MockNettyMessageProcessor.CHUNK_COUNT_HEADER_NAME, chunkCount);
    String uri = TestingUri.WriteMoreThanContentLength.toString();
    HttpRequest request = RestTestUtils.createRequest(HttpMethod.POST, uri, requestHeaders);
    HttpUtil.setKeepAlive(request, true);
    channel.writeInbound(request);

    try {
        verifyCallbacks(processor);
        fail("One of the callbacks should have failed because the data written was more than Content-Length");
    } catch (IllegalStateException expected) {
        // expected. Nothing to do.
    }

    // It doesn't matter what the response is - because it may either fail or succeed depending on certain race
    // conditions. What matters is that the programming error is caught appropriately by NettyResponseChannel and it
    // makes a callback with the right exception. So simply drain whatever was written outbound.
    Object outbound;
    do {
        outbound = channel.readOutbound();
    } while (outbound != null);
    channel.close();
}
/**
 * Writes the batch into a channel backed by a {@code BeatsHandler} and checks that the listener
 * received every message and that the outbound ack carries protocol version 1 and the sequence
 * number of the last message in the batch.
 */
@Test
public void testAcksLastMessageInBatch() {
    EmbeddedChannel embeddedChannel = new EmbeddedChannel(new BeatsHandler(spyListener));
    embeddedChannel.writeInbound(batch);
    assertEquals(messageCount, spyListener.getLastMessages().size());
    Ack ack = embeddedChannel.readOutbound();
    // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
    assertEquals(Protocol.VERSION_1, ack.getProtocol());
    assertEquals(startSequenceNumber + messageCount - 1, ack.getSequence());
    embeddedChannel.close();
}
/**
 * Checks that closing the channel after a batch has been written causes the listener to report
 * that its connection-close callback was invoked.
 */
@Test
public void TestItCalledOnConnectionCloseOnListenerWhenChannelIsRemoved() {
    BeatsHandler beatsHandler = new BeatsHandler(spyListener);
    EmbeddedChannel channel = new EmbeddedChannel(beatsHandler);

    channel.writeInbound(batch);
    channel.close();

    boolean closeNotified = spyListener.isOnConnectionCloseCalled();
    assertTrue(closeNotified);
}
/**
 * Sends a second request on the same channel before the last http content of the first request has been sent,
 * then verifies the response and the public access log entry.
 * @param httpMethod the {@link HttpMethod} for the request.
 * @param uri Uri to be used during the request
 * @param useSSL {@code true} to test SSL logging.
 * @throws Exception
 */
private void doRequestHandleWithMultipleRequest(HttpMethod httpMethod, String uri, boolean useSSL) throws Exception {
    EmbeddedChannel channel = createChannel(useSSL);

    // First request: contains one logged request header. Its last http content is never sent.
    // NOTE(review): Random.nextLong() may be negative, which is not a valid Content-Length - confirm this is intended.
    HttpHeaders firstHeaders = new DefaultHttpHeaders();
    firstHeaders.add(HttpHeaderNames.CONTENT_LENGTH, new Random().nextLong());
    HttpRequest firstRequest = RestTestUtils.createRequest(httpMethod, uri, firstHeaders);
    HttpUtil.setKeepAlive(firstRequest, true);
    channel.writeInbound(firstRequest);

    // Second request: contains one logged and one not logged header.
    HttpHeaders secondHeaders = new DefaultHttpHeaders();
    secondHeaders.add(NOT_LOGGED_HEADER_KEY + "1", "headerValue1");
    secondHeaders.add(HttpHeaderNames.CONTENT_LENGTH, new Random().nextLong());
    HttpRequest secondRequest = RestTestUtils.createRequest(httpMethod, uri, secondHeaders);
    HttpUtil.setKeepAlive(secondRequest, true);
    sendRequestCheckResponse(channel, secondRequest, uri, secondHeaders, false, false, useSSL);
    Assert.assertTrue("Channel should not be closed ", channel.isOpen());

    // verify that headers from first request is not found in public access log
    String lastLogEntry = publicAccessLogger.getLastPublicAccessLogEntry();
    verifyPublicAccessLogEntryForRequestHeaders(lastLogEntry, firstHeaders, secondRequest.method(), false);
    channel.close();
}
/**
 * Exercises the request handling flow for a POST request that carries the
 * {@code EchoMethodHandler.IS_CHUNKED} header set to "true", and checks that the channel stays open.
 * @param useSSL passed through to channel creation and to the response check.
 * @throws Exception
 */
private void doRequestHandleWithChunkedResponse(boolean useSSL) throws Exception {
    final String postUri = "POST";
    EmbeddedChannel channel = createChannel(useSSL);

    HttpHeaders chunkedHeaders = new DefaultHttpHeaders();
    chunkedHeaders.add(EchoMethodHandler.IS_CHUNKED, "true");
    HttpRequest postRequest = RestTestUtils.createRequest(HttpMethod.POST, postUri, chunkedHeaders);
    HttpUtil.setKeepAlive(postRequest, true);

    sendRequestCheckResponse(channel, postRequest, postUri, chunkedHeaders, false, true, useSSL);
    Assert.assertTrue("Channel should not be closed ", channel.isOpen());
    channel.close();
}
/**
 * Sends a request that is not a health check and verifies that the response status is OK and that the
 * response content equals the string form of the http method.
 * @param httpMethod the {@link HttpMethod} for the request.
 * @param uri Uri to be used during the request
 * @throws IOException
 */
private void testNonHealthCheckRequest(HttpMethod httpMethod, String uri) throws IOException {
    EmbeddedChannel channel = createChannel();
    FullHttpResponse response =
        sendRequestAndGetResponse(channel, RestTestUtils.createRequest(httpMethod, uri, null));

    HttpResponseStatus actualStatus = response.status();
    assertEquals("Unexpected response status", HttpResponseStatus.OK, actualStatus);

    String expectedContent = httpMethod.toString();
    String actualContent = RestTestUtils.getContentString(response);
    assertEquals("Unexpected content", expectedContent, actualContent);

    channel.close();
}
/**
 * Encrypts the given text with the public half of the protocol v1 key, writes it into a fresh
 * channel and asserts that the pipeline throws a {@link DecoderException}.
 *
 * @param bad the malformed vote text to send
 * @throws Exception if encryption fails
 */
private void verifyFailure(String bad) throws Exception {
    EmbeddedChannel channel = createChannel();

    byte[] plaintext = bad.getBytes(StandardCharsets.UTF_8);
    byte[] ciphertext = RSA.encrypt(plaintext, TestVotifierPlugin.getI().getProtocolV1Key().getPublic());
    ByteBuf ciphertextBuf = Unpooled.wrappedBuffer(ciphertext);

    assertThrows(DecoderException.class, () -> channel.writeInbound(ciphertextBuf));
    channel.close();
}
/**
 * Verifies that a message carrying a payload (with challenge) but no "signature" entry makes
 * the pipeline throw a {@link DecoderException}.
 */
@Test
public void testFailureDecodeBadPacket() {
    EmbeddedChannel channel = createChannel();

    JsonObject votePayload = new Vote("Test", "test", "test", "0").serialize();
    votePayload.addProperty("challenge", SESSION.getChallenge());

    // Only the payload is put into the message; the signature is intentionally left out.
    JSONObject message = new JSONObject();
    message.put("payload", GsonInst.gson.toJson(votePayload));

    assertThrows(DecoderException.class, () -> channel.writeInbound(message.toString()));
    channel.close();
}
/**
 * Checks that after the short {@code 0x733A} is written inbound, the session attached to the
 * channel reports protocol version {@code TWO}.
 */
@Test
public void v2Test() {
    VotifierSession session = new VotifierSession();
    EmbeddedChannel channel = new EmbeddedChannel(new VotifierProtocolDifferentiator(true, false));
    channel.attr(VotifierSession.KEY).set(session);

    ByteBuf header = Unpooled.buffer().writeShort(0x733A);
    channel.writeInbound(header);

    assertEquals(VotifierSession.ProtocolVersion.TWO, session.getVersion());
    channel.close();
}
/**
 * Checks that writing 256 zero bytes into a channel whose differentiator was built with
 * {@code (true, false)} makes the pipeline throw a {@link DecoderException}.
 */
@Test
public void failIfv1NotSupported() {
    VotifierSession session = new VotifierSession();
    EmbeddedChannel channel = new EmbeddedChannel(new VotifierProtocolDifferentiator(true, false));
    channel.attr(VotifierSession.KEY).set(session);

    // A 256-byte buffer filled entirely with zeros.
    ByteBuf zeros = Unpooled.buffer(256).writeZero(256);

    assertThrows(DecoderException.class, () -> channel.writeInbound(zeros));
    channel.close();
}
/**
 * Checks that when the future's channel has already been closed, completing the listener never
 * queries the retained message persistence.
 */
@Test
public void test_channel_inactive() throws Exception {
    final Topic wildcardTopic = new Topic("#", QoS.AT_LEAST_ONCE);
    final List<SubscriptionResult> subscriptions = newArrayList(subResult(wildcardTopic, false));
    final SendRetainedMessagesListener listener = createListener(subscriptions, ignoredTopics);

    // Hand the listener a successful future whose channel is already closed.
    final EmbeddedChannel closedChannel = new EmbeddedChannel();
    closedChannel.close();
    when(channelFuture.isSuccess()).thenReturn(true);
    when(channelFuture.channel()).thenReturn(closedChannel);

    listener.operationComplete(channelFuture);

    verify(retainedMessagePersistence, never()).get(anyString());
}
/**
 * Registers two connections under the same protocol and certificate hash, then closes them one
 * after the other, checking the active-connections gauge and the total-connections counter after
 * every step.
 */
@Test
public void testSuccess_twoConnections_sameClient() {
    EmbeddedChannel firstChannel = new EmbeddedChannel();
    EmbeddedChannel secondChannel = new EmbeddedChannel(DefaultChannelId.newInstance());

    // Register the first connection: one active, one total.
    metrics.registerActiveConnection(PROTOCOL, CERT_HASH, firstChannel);
    assertThat(firstChannel.isActive()).isTrue();
    assertThat(FrontendMetrics.activeConnectionsGauge)
        .hasValueForLabels(1, PROTOCOL, CERT_HASH).and().hasNoOtherValues();
    assertThat(FrontendMetrics.totalConnectionsCounter)
        .hasValueForLabels(1, PROTOCOL, CERT_HASH).and().hasNoOtherValues();

    // Register the second connection under the same labels: two active, two total.
    metrics.registerActiveConnection(PROTOCOL, CERT_HASH, secondChannel);
    assertThat(secondChannel.isActive()).isTrue();
    assertThat(FrontendMetrics.activeConnectionsGauge)
        .hasValueForLabels(2, PROTOCOL, CERT_HASH).and().hasNoOtherValues();
    assertThat(FrontendMetrics.totalConnectionsCounter)
        .hasValueForLabels(2, PROTOCOL, CERT_HASH).and().hasNoOtherValues();

    // Close the first connection: the gauge drops to one, the counter stays at two.
    @SuppressWarnings("unused")
    ChannelFuture unusedFirstClose = firstChannel.close();
    assertThat(firstChannel.isActive()).isFalse();
    assertThat(FrontendMetrics.activeConnectionsGauge)
        .hasValueForLabels(1, PROTOCOL, CERT_HASH).and().hasNoOtherValues();
    assertThat(FrontendMetrics.totalConnectionsCounter)
        .hasValueForLabels(2, PROTOCOL, CERT_HASH).and().hasNoOtherValues();

    // Close the second connection: the gauge has no values left, the counter stays at two.
    @SuppressWarnings("unused")
    ChannelFuture unusedSecondClose = secondChannel.close();
    assertThat(secondChannel.isActive()).isFalse();
    assertThat(FrontendMetrics.activeConnectionsGauge).hasNoOtherValues();
    assertThat(FrontendMetrics.totalConnectionsCounter)
        .hasValueForLabels(2, PROTOCOL, CERT_HASH).and().hasNoOtherValues();
}
/**
 * Registers two connections under the same protocol but different certificate hashes, then closes
 * them one after the other, checking the active-connections gauge and the total-connections
 * counter after every step.
 */
@Test
public void testSuccess_twoConnections_differentClients() {
    String secondCertHash = "blahblah_lol_234";
    EmbeddedChannel firstChannel = new EmbeddedChannel();
    EmbeddedChannel secondChannel = new EmbeddedChannel(DefaultChannelId.newInstance());

    // Register the first client.
    metrics.registerActiveConnection(PROTOCOL, CERT_HASH, firstChannel);
    assertThat(firstChannel.isActive()).isTrue();
    assertThat(FrontendMetrics.activeConnectionsGauge)
        .hasValueForLabels(1, PROTOCOL, CERT_HASH).and().hasNoOtherValues();
    assertThat(FrontendMetrics.totalConnectionsCounter)
        .hasValueForLabels(1, PROTOCOL, CERT_HASH).and().hasNoOtherValues();

    // Register the second client under its own certificate hash.
    metrics.registerActiveConnection(PROTOCOL, secondCertHash, secondChannel);
    assertThat(secondChannel.isActive()).isTrue();
    assertThat(FrontendMetrics.activeConnectionsGauge)
        .hasValueForLabels(1, PROTOCOL, CERT_HASH).and()
        .hasValueForLabels(1, PROTOCOL, secondCertHash).and()
        .hasNoOtherValues();
    assertThat(FrontendMetrics.totalConnectionsCounter)
        .hasValueForLabels(1, PROTOCOL, CERT_HASH).and()
        .hasValueForLabels(1, PROTOCOL, secondCertHash).and()
        .hasNoOtherValues();

    // Close the first client's connection: only the second client remains in the gauge.
    ChannelFuture unusedFirstClose = firstChannel.close();
    assertThat(firstChannel.isActive()).isFalse();
    assertThat(FrontendMetrics.activeConnectionsGauge)
        .hasValueForLabels(1, PROTOCOL, secondCertHash).and().hasNoOtherValues();
    assertThat(FrontendMetrics.totalConnectionsCounter)
        .hasValueForLabels(1, PROTOCOL, CERT_HASH).and()
        .hasValueForLabels(1, PROTOCOL, secondCertHash).and()
        .hasNoOtherValues();

    // Close the second client's connection: the gauge is empty, the counter is unchanged.
    ChannelFuture unusedSecondClose = secondChannel.close();
    assertThat(secondChannel.isActive()).isFalse();
    assertThat(FrontendMetrics.activeConnectionsGauge).hasNoOtherValues();
    assertThat(FrontendMetrics.totalConnectionsCounter)
        .hasValueForLabels(1, PROTOCOL, CERT_HASH).and()
        .hasValueForLabels(1, PROTOCOL, secondCertHash).and()
        .hasNoOtherValues();
}
/**
 * Checks that when the context's channel has already been closed, writing a CONNACK through the
 * handler still looks up the client initializer map exactly once but never calls
 * {@code writeAndFlush} on the context with the given promise.
 */
@Test(timeout = 10000)
public void test_write_connack_with_initializer_channel_inactive() throws Exception {
    final EmbeddedChannel closedChannel = new EmbeddedChannel();
    closedChannel.close();

    when(channelHandlerContext.channel()).thenReturn(closedChannel);
    when(initializers.getClientInitializerMap()).thenReturn(createClientInitializerMap());

    pluginInitializerHandler.write(
        channelHandlerContext, TestMessageUtil.createFullMqtt5Connack(), channelPromise);

    verify(initializers, timeout(5000).times(1)).getClientInitializerMap();
    verify(channelHandlerContext, times(0)).writeAndFlush(any(Object.class), eq(channelPromise));
}