下面列出了io.netty.channel.embedded.EmbeddedChannel#disconnect ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testAppendThrows() throws Exception {
ReplyProcessor processor = new ReplyProcessor();
Flow flow = new Flow(10, 0);
FlowHandler flowHandler = new FlowHandler("testConnection");
@Cleanup
ClientConnection clientConnection = flowHandler.createFlow(flow, processor);
EmbeddedChannel embeddedChannel = createChannelWithContext(flowHandler);
embeddedChannel.runScheduledPendingTasks();
embeddedChannel.runPendingTasks();
Queue<Object> messages = embeddedChannel.outboundMessages();
assertEquals(1, messages.size());
clientConnection.send(new WireCommands.SetupAppend(1, new UUID(1, 2), "segment", ""));
embeddedChannel.runPendingTasks();
clientConnection.send(new Append("segment", new UUID(1, 2), 1, new Event(Unpooled.EMPTY_BUFFER), 2));
embeddedChannel.disconnect();
embeddedChannel.runPendingTasks();
assertTrue(processor.falure.get());
}
@Test
public void force_client_disconnect_connected() throws ExecutionException, InterruptedException {
final EmbeddedChannel channel = new EmbeddedChannel();
channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);
when(channelPersistence.get("client")).thenReturn(channel);
when(localPersistence.getSession(eq("client"), anyBoolean(), anyBoolean())).thenReturn(new ClientSession(true, 0));
final ListenableFuture<Boolean> future = clientSessionPersistence.forceDisconnectClient("client", true, ClientSessionPersistenceImpl.DisconnectSource.EXTENSION);
channel.disconnect();
final Boolean result = future.get();
assertTrue(result);
verify(pendingWillMessages).cancelWill("client");
verify(mqtt5ServerDisconnector).disconnect(any(Channel.class), anyString(), anyString(), eq(Mqtt5DisconnectReasonCode.ADMINISTRATIVE_ACTION), any());
}
@Test
public void force_client_disconnect_connected_reason_code_string() throws ExecutionException, InterruptedException {
final EmbeddedChannel channel = new EmbeddedChannel();
channel.attr(ChannelAttributes.MQTT_VERSION).set(ProtocolVersion.MQTTv5);
when(channelPersistence.get("client")).thenReturn(channel);
when(localPersistence.getSession(eq("client"), anyBoolean(), anyBoolean())).thenReturn(new ClientSession(true, 0));
final ListenableFuture<Boolean> future = clientSessionPersistence.forceDisconnectClient("client", true, ClientSessionPersistenceImpl.DisconnectSource.EXTENSION, Mqtt5DisconnectReasonCode.SESSION_TAKEN_OVER, "reason-string");
channel.disconnect();
final Boolean result = future.get();
assertTrue(result);
verify(pendingWillMessages).cancelWill("client");
verify(mqtt5ServerDisconnector).disconnect(any(Channel.class), anyString(), anyString(), eq(Mqtt5DisconnectReasonCode.SESSION_TAKEN_OVER), eq("reason-string"));
}
@Test
public void testFlushViaDisconnect() {
final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount, false);
// Simulate read loop;
channel.pipeline().fireChannelRead(1L);
assertEquals(0, flushCount.get());
assertNull(channel.readOutbound());
channel.disconnect();
assertEquals(1, flushCount.get());
assertEquals(1L, channel.readOutbound());
assertNull(channel.readOutbound());
assertFalse(channel.finish());
}
/**
* Sends the provided {@code httpRequest} and verifies that the response is as expected.
* @param channel the {@link EmbeddedChannel} to send the request over.
* @param httpRequest the {@link HttpRequest} that has to be sent
* @param uri, Uri to be used for the request
* @param headers {@link HttpHeaders} that is set in the request to be used for verification purposes
* @param testErrorCase true if error case has to be tested, false otherwise
* @param sslUsed true if SSL was used for this request.
*/
private void sendRequestCheckResponse(EmbeddedChannel channel, HttpRequest httpRequest, String uri,
HttpHeaders headers, boolean testErrorCase, boolean chunkedResponse, boolean sslUsed) throws Exception {
channel.writeInbound(httpRequest);
if (uri.equals(EchoMethodHandler.DISCONNECT_URI)) {
channel.disconnect();
} else {
channel.writeInbound(new DefaultLastHttpContent());
}
String lastLogEntry = publicAccessLogger.getLastPublicAccessLogEntry();
// verify remote host, http method and uri
String subString = testErrorCase ? "Error" : "Info" + ":embedded" + " " + httpRequest.method() + " " + uri;
Assert.assertTrue("Public Access log entry doesn't have expected remote host/method/uri ",
lastLogEntry.startsWith(subString));
// verify SSL-related info
subString = "SSL ([used=" + sslUsed + "]";
if (sslUsed) {
subString += ", [principal=" + PEER_CERT.getSubjectX500Principal() + "]";
subString += ", [san=" + PEER_CERT.getSubjectAlternativeNames() + "]";
}
subString += ")";
Assert.assertTrue("Public Access log entry doesn't have SSL info set correctly", lastLogEntry.contains(subString));
// verify request headers
verifyPublicAccessLogEntryForRequestHeaders(lastLogEntry, headers, httpRequest.method(), true);
// verify response
subString = "Response (";
for (String responseHeader : RESPONSE_HEADERS.split(",")) {
if (headers.contains(responseHeader)) {
subString += "[" + responseHeader + "=" + headers.get(responseHeader) + "] ";
}
}
subString += "[isChunked=" + chunkedResponse + "]), status=" + HttpResponseStatus.OK.code();
if (!testErrorCase) {
Assert.assertTrue("Public Access log entry doesn't have response set correctly",
lastLogEntry.contains(subString));
} else {
Assert.assertTrue("Public Access log entry doesn't have error set correctly ",
lastLogEntry.contains(": Channel closed while request in progress."));
}
}