下面列出了怎么用 io.netty.handler.codec.PrematureChannelClosureException 的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testFailsOnMissingResponse() {
HttpClientCodec codec = new HttpClientCodec(4096, 8192, 8192, true);
EmbeddedChannel ch = new EmbeddedChannel(codec);
assertTrue(ch.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"http://localhost/")));
ByteBuf buffer = ch.readOutbound();
assertNotNull(buffer);
buffer.release();
try {
ch.finish();
fail();
} catch (CodecException e) {
assertTrue(e instanceof PrematureChannelClosureException);
}
}
@Test
public void testFailsOnIncompleteChunkedResponse() {
HttpClientCodec codec = new HttpClientCodec(4096, 8192, 8192, true);
EmbeddedChannel ch = new EmbeddedChannel(codec);
ch.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/"));
ByteBuf buffer = ch.readOutbound();
assertNotNull(buffer);
buffer.release();
assertNull(ch.readInbound());
ch.writeInbound(Unpooled.copiedBuffer(INCOMPLETE_CHUNKED_RESPONSE, CharsetUtil.ISO_8859_1));
assertThat(ch.readInbound(), instanceOf(HttpResponse.class));
((HttpContent) ch.readInbound()).release(); // Chunk 'first'
((HttpContent) ch.readInbound()).release(); // Chunk 'second'
assertNull(ch.readInbound());
try {
ch.finish();
fail();
} catch (CodecException e) {
assertTrue(e instanceof PrematureChannelClosureException);
}
}
@Test
public void testFailsOnIncompleteChunkedResponse() {
HttpClientCodec codec = new HttpClientCodec(4096, 8192, 8192, true);
EmbeddedChannel ch = new EmbeddedChannel(codec);
ch.writeOutbound(releaseLater(
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost/")));
assertNotNull(releaseLater(ch.readOutbound()));
assertNull(ch.readInbound());
ch.writeInbound(releaseLater(
Unpooled.copiedBuffer(INCOMPLETE_CHUNKED_RESPONSE, CharsetUtil.ISO_8859_1)));
assertThat(releaseLater(ch.readInbound()), instanceOf(HttpResponse.class));
assertThat(releaseLater(ch.readInbound()), instanceOf(HttpContent.class)); // Chunk 'first'
assertThat(releaseLater(ch.readInbound()), instanceOf(HttpContent.class)); // Chunk 'second'
assertNull(ch.readInbound());
try {
ch.finish();
fail();
} catch (CodecException e) {
assertTrue(e instanceof PrematureChannelClosureException);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
super.channelInactive(ctx);
if (failOnMissingResponse) {
long missingResponses = requestResponseCounter.get();
if (missingResponses > 0) {
ctx.fireExceptionCaught(new PrematureChannelClosureException(
"channel gone inactive with " + missingResponses +
" missing response(s)"));
}
}
}
@Test
public void testConnectionClosedBeforeHeadersReceived() {
EmbeddedChannel channel = new EmbeddedChannel(new HttpResponseDecoder());
String responseInitialLine =
"HTTP/1.1 200 OK\r\n";
assertFalse(channel.writeInbound(Unpooled.copiedBuffer(responseInitialLine, CharsetUtil.US_ASCII)));
assertTrue(channel.finish());
HttpMessage message = channel.readInbound();
assertTrue(message.decoderResult().isFailure());
assertThat(message.decoderResult().cause(), instanceOf(PrematureChannelClosureException.class));
assertNull(channel.readInbound());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (failOnMissingResponse) {
long missingResponses = requestResponseCounter.get();
if (missingResponses > 0) {
ctx.fireExceptionCaught(new PrematureChannelClosureException(
"channel gone inactive with " + missingResponses +
" missing response(s)"));
}
}
}
@Test
public void notAllHeadersReceived() throws Exception {
encodedResponse.set("HTTP/1.1 200 OK\r\n" +
"Transfer-Encoding: chunked\r\n" +
"Connection: close\r\n"); // no final CRLF after headers
HttpRequest request = client.get("/");
ReservedBlockingHttpConnection connection = client.reserveConnection(request);
// Wait until a server closes the connection:
connection.connectionContext().onClose().whenFinally(connectionClosedLatch::countDown).subscribe();
assertThrows(PrematureChannelClosureException.class, () -> connection.request(request));
connectionClosedLatch.await();
}
@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
super.channelInactive(ctx);
if (failOnMissingResponse) {
long missingResponses = requestResponseCounter.get();
if (missingResponses > 0) {
ctx.fireExceptionCaught(new PrematureChannelClosureException(
"channel gone inactive with " + missingResponses +
" missing response(s)"));
}
}
}
@Test
public void testFailsOnMissingResponse() {
HttpClientCodec codec = new HttpClientCodec(4096, 8192, 8192, true);
EmbeddedChannel ch = new EmbeddedChannel(codec);
assertTrue(ch.writeOutbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"http://localhost/")));
assertNotNull(releaseLater(ch.readOutbound()));
try {
ch.finish();
fail();
} catch (CodecException e) {
assertTrue(e instanceof PrematureChannelClosureException);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (failOnMissingResponse) {
long missingResponses = requestResponseCounter.get();
if (missingResponses > 0) {
ctx.fireExceptionCaught(new PrematureChannelClosureException(
"channel gone inactive with " + missingResponses
+ " missing response(s)"));
}
}
}
@Override
public void onUncaughtException(Throwable t) {
if (t instanceof PrematureChannelClosureException) {
LOG.warn("PrematureChannelClosureException, will attempt restart");
} else if (
(t instanceof IllegalStateException || isConnectException(t)) &&
restartInProgress.get()
) {
onSubscribeException(t);
} else {
LOG.error("uncaught exception", t);
}
callWithStateLock(
() -> {
if (t instanceof PrematureChannelClosureException) {
state.setMesosSchedulerState(MesosSchedulerState.PAUSED_FOR_MESOS_RECONNECT);
offerCache.invalidateAll();
reconnectMesos();
} else {
LOG.error("Aborting due to error: {}", t.getMessage(), t);
notifyStopping();
abort.abort(AbortReason.MESOS_ERROR, Optional.of(t));
}
},
"errorUncaughtException",
true
);
}
@Override
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
super.decodeLast(ctx, in, out);
if (resetRequested) {
// If a reset was requested by decodeLast() we need to do it now otherwise we may produce a
// LastHttpContent while there was already one.//如果decodeLast()请求重置,我们现在就需要做,否则可能会产生a
// LastHttpContent已经有一个了。
resetNow();
}
// Handle the last unfinished message.
if (message != null) {
boolean chunked = HttpUtil.isTransferEncodingChunked(message);
if (currentState == State.READ_VARIABLE_LENGTH_CONTENT && !in.isReadable() && !chunked) {
// End of connection.
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
resetNow();
return;
}
if (currentState == State.READ_HEADER) {
// If we are still in the state of reading headers we need to create a new invalid message that
// signals that the connection was closed before we received the headers.//如果我们仍然处于读取header的状态,我们需要创建一个新的无效消息
//在我们收到报头之前,连接已经关闭的信号。
out.add(invalidMessage(Unpooled.EMPTY_BUFFER,
new PrematureChannelClosureException("Connection closed before received headers")));
resetNow();
return;
}
// Check if the closure of the connection signifies the end of the content.检查连接的关闭是否意味着内容的结束。
boolean prematureClosure;
if (isDecodingRequest() || chunked) {
// The last request did not wait for a response.最后一个请求没有等待响应。
prematureClosure = true;
} else {
// Compare the length of the received content and the 'Content-Length' header.
// If the 'Content-Length' header is absent, the length of the content is determined by the end of the
// connection, so it is perfectly fine.//比较接收内容的长度和“content - length”标题。
//如果没有“content - length”标头,则内容的长度由
//连接,所以它是完美的。
prematureClosure = contentLength() > 0;
}
if (!prematureClosure) {
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
}
resetNow();
}
}
@Override
protected final void decodeLast(final ChannelHandlerContext ctx, final ByteBuf in) throws Exception {
super.decodeLast(ctx, in);
// Handle the last unfinished message.
if (message != null) {
boolean chunked = isTransferEncodingChunked(message.headers());
if (!in.isReadable() && (
(currentState == State.READ_VARIABLE_LENGTH_CONTENT && !chunked) ||
(currentState == State.READ_CHUNK_SIZE && chunked && allowPrematureClosureBeforePayloadBody))) {
// End of connection.
ctx.fireChannelRead(EmptyHttpHeaders.INSTANCE);
closeHandler.protocolPayloadEndInbound(ctx);
resetNow();
return;
}
if (currentState == State.READ_HEADER) {
// If we are still in the state of reading headers we need to create a new invalid message that
// signals that the connection was closed before we received the headers.
ctx.fireExceptionCaught(
new PrematureChannelClosureException("Connection closed before received headers"));
resetNow();
return;
}
// Check if the closure of the connection signifies the end of the content.
boolean prematureClosure;
if (isDecodingRequest() || chunked) {
// The last request did not wait for a response.
prematureClosure = true;
} else {
// Compare the length of the received content and the 'Content-Length' header.
// If the 'Content-Length' header is absent, the length of the content is determined by the end of the
// connection, so it is perfectly fine.
prematureClosure = contentLength() > 0;
}
if (!prematureClosure) {
ctx.fireChannelRead(EmptyHttpHeaders.INSTANCE);
closeHandler.protocolPayloadEndInbound(ctx);
}
resetNow();
}
}