下面列出了 io.netty.handler.codec.http2.Http2Error 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Closes the channel and then tears down every stream that is still active.
 *
 * <p>Each active stream that maps to a server stream is told the status carried by
 * {@code msg} and is reset with {@code CANCEL}; every visited stream is closed.
 *
 * @param ctx the handler context used for the reset frames
 * @param msg the command carrying the status to report
 * @param promise the promise handed to {@code close}
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  close(ctx, promise);

  final Http2StreamVisitor terminator = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyServerStream.TransportState transportState = serverStream(stream);
      if (transportState != null) {
        transportState.transportReportStatus(msg.getStatus());
        resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
      }
      stream.close();
      // Always true, so no stream is skipped.
      return true;
    }
  };
  connection().forEachActiveStream(terminator);
}
/**
 * Reacts to an inbound PING frame.
 *
 * <p>The keep-alive manager, when present, is notified that data arrived. If the keep-alive
 * enforcer does not accept the ping, a GOAWAY with {@code ENHANCE_YOUR_CALM} and the debug text
 * {@code too_many_pings} is sent and the connection is forcefully closed with a
 * {@code RESOURCE_EXHAUSTED} status.
 */
@Override
public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
  if (keepAliveManager != null) {
    keepAliveManager.onDataReceived();
  }
  if (keepAliveEnforcer.pingAcceptable()) {
    return;
  }

  ByteBuf tooManyPings = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
  int lastStreamId = connection().remote().lastStreamCreated();
  goAway(ctx, lastStreamId, Http2Error.ENHANCE_YOUR_CALM.code(), tooManyPings, ctx.newPromise());

  Status exhausted = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
  try {
    forcefulClose(ctx, new ForcefulCloseCommand(exhausted), ctx.newPromise());
  } catch (Exception ex) {
    // Failures while closing are routed to the handler's error path as outbound errors.
    onError(ctx, /* outbound= */ true, ex);
  }
}
/**
 * Starts the shutdown sequence: writes the first GOAWAY (last stream id
 * {@code Integer.MAX_VALUE}, {@code NO_ERROR}) and a non-ack ping carrying
 * {@code GRACEFUL_SHUTDOWN_PING}, and schedules {@code secondGoAwayAndClose} to run after
 * {@code GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS}.
 *
 * @param ctx the handler context used for writing and scheduling
 */
void start(final ChannelHandlerContext ctx) {
  goAway(
      ctx,
      Integer.MAX_VALUE,
      Http2Error.NO_ERROR.code(),
      ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
      ctx.newPromise());

  // The scheduled future is kept in pingFuture.
  final Runnable pingTimeoutTask = new Runnable() {
    @Override
    public void run() {
      secondGoAwayAndClose(ctx);
    }
  };
  pingFuture = ctx.executor().schedule(
      pingTimeoutTask, GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);

  encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
}
/**
 * Handles an inbound HTTP/2 RST_STREAM frame for a client stream.
 *
 * <p>Nothing happens when no client stream is registered for {@code streamId}. Otherwise the
 * stream is given the status derived from {@code errorCode}; the progress is {@code REFUSED}
 * when the code is {@code REFUSED_STREAM} and {@code PROCESSED} otherwise. The keep-alive
 * manager, when present, is then notified.
 *
 * @param streamId the id of the stream being reset
 * @param errorCode the HTTP/2 error code carried by the frame
 */
private void onRstStreamRead(int streamId, long errorCode) {
  NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
  if (stream == null) {
    return;
  }

  Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
      .augmentDescription("Received Rst Stream");
  RpcProgress progress;
  if (errorCode == Http2Error.REFUSED_STREAM.code()) {
    progress = RpcProgress.REFUSED;
  } else {
    progress = RpcProgress.PROCESSED;
  }
  stream.transportReportStatus(status, progress, false /*stop delivery*/, new Metadata());

  if (keepAliveManager != null) {
    keepAliveManager.onDataReceived();
  }
}
/**
 * Tears down every active stream and then completes {@code promise} successfully.
 *
 * <p>Each active stream that maps to a client stream is told the status carried by
 * {@code msg} and is reset with {@code CANCEL}; every visited stream is closed.
 *
 * @param ctx the handler context used for the reset frames
 * @param msg the command carrying the status to report
 * @param promise completed with success once all streams were visited
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  // close() already called by NettyClientTransport, so only the streams need cleaning up here.
  final Http2StreamVisitor terminator = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyClientStream.TransportState transportState = clientStream(stream);
      if (transportState != null) {
        transportState.transportReportStatus(msg.getStatus(), true, new Metadata());
        resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
      }
      stream.close();
      return true;
    }
  };
  connection().forEachActiveStream(terminator);
  promise.setSuccess();
}
/**
 * Invoked when a response wrapper has completed, with or without a failure.
 *
 * <p>The wrapper is always notified via {@code onSubscriptionCancelled}. When {@code cause} is
 * non-null, the response is removed and a RST_STREAM with {@code CANCEL} is written and flushed,
 * unless a GOAWAY excluded the stream or the pipeline has no context left.
 *
 * @param resWrapper the completed response wrapper
 * @param id the request id
 * @param cause the failure, or {@code null} on normal completion
 */
private void onWrapperCompleted(HttpResponseWrapper resWrapper, int id, @Nullable Throwable cause) {
    // Cancel timeout future and abort the request if it exists.
    resWrapper.onSubscriptionCancelled(cause);
    if (cause == null) {
        return;
    }

    // Only a RST_STREAM is sent, the connection stays open, so the response is removed manually.
    removeResponse(id);

    final int streamId = idToStreamId(id);
    final int lastStreamId = conn.local().lastStreamKnownByPeer();
    // A negative value means no GOAWAY was received yet; otherwise the stream must be covered by it.
    final boolean resettable = lastStreamId < 0 || streamId <= lastStreamId;
    if (!resettable) {
        return;
    }

    final ChannelHandlerContext ctx = channel().pipeline().lastContext();
    if (ctx == null) {
        // The pipeline has been cleaned up due to disconnection.
        return;
    }
    encoder.writeRstStream(ctx, streamId, Http2Error.CANCEL.code(), ctx.newPromise());
    ctx.flush();
}
/**
 * Handles an inbound RST_STREAM frame by failing the response associated with the stream.
 *
 * <p>If no response is registered for the stream, the frame is either logged at debug level
 * (the stream may have existed and is already closed) or rejected with a connection error.
 *
 * @param ctx the handler context of the channel the frame arrived on
 * @param streamId the id of the stream being reset
 * @param errorCode the HTTP/2 error code carried by the frame
 * @throws Http2Exception a {@code PROTOCOL_ERROR} connection error when the frame refers to a
 *     stream that cannot have existed
 */
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
    keepAliveChannelRead();
    final HttpResponseWrapper res = removeResponse(streamIdToId(streamId));
    if (res == null) {
        if (!conn.streamMayHaveExisted(streamId)) {
            throw connectionError(PROTOCOL_ERROR,
                                  "received a RST_STREAM frame for an unknown stream: %d", streamId);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("{} Received a late RST_STREAM frame for a closed stream: {}",
                         ctx.channel(), streamId);
        }
        return;
    }

    // Http2Error.valueOf() returns null for codes it does not know. Fall back to the numeric
    // code so the exception message never degrades to "...: null".
    final Http2Error error = Http2Error.valueOf(errorCode);
    final String errorText = error != null ? error.toString() : "UNKNOWN(" + errorCode + ')';
    res.close(new ClosedStreamException("received a RST_STREAM frame: " + errorText));
}
/**
 * Creates the task that runs when the request times out.
 *
 * <p>The task is schedulable, and does work, only while {@code state} is not {@code DONE}. When
 * run, it delegates to the request context's timeout handler if one is set; otherwise it fails
 * the request with a {@code RequestTimeoutException} and {@code serviceUnavailableResponse}.
 *
 * @return a new timeout task
 */
private TimeoutTask newTimeoutTask() {
    final TimeoutTask task = new TimeoutTask() {
        @Override
        public boolean canSchedule() {
            return state != State.DONE;
        }

        @Override
        public void run() {
            if (state == State.DONE) {
                return;
            }
            final Runnable customHandler = reqCtx.requestTimeoutHandler();
            if (customHandler == null) {
                failAndRespond(RequestTimeoutException.get(), serviceUnavailableResponse,
                               Http2Error.INTERNAL_ERROR, true);
            } else {
                customHandler.run();
            }
        }
    };
    return task;
}
/**
 * Writes response HEADERS for the given stream.
 *
 * <p>Before the headers are written, a single GOAWAY ({@code NO_ERROR}) is sent if none was sent
 * yet and the keep-alive handler reports that the maximum connection age is exceeded.
 *
 * @return the future of the HEADERS write, or a future failed with a
 *     {@code ClosedStreamException} when the stream is absent or not writable
 */
@Override
public ChannelFuture doWriteHeaders(int id, int streamId, ResponseHeaders headers, boolean endStream,
                                    boolean isTrailersEmpty) {
    if (!isStreamPresentAndWritable(streamId)) {
        // Either the stream has been closed already, or (bug) the server tried to send a
        // response HEADERS frame before receiving a request HEADERS frame.
        return newFailedFuture(ClosedStreamException.get());
    }

    final boolean goAwayDue = !isGoAwaySent && keepAliveHandler != null &&
                              keepAliveHandler.isMaxConnectionAgeExceeded();
    if (goAwayDue) {
        final int lastStreamId = encoder().connection().remote().lastStreamCreated();
        encoder().writeGoAway(ctx(), lastStreamId, Http2Error.NO_ERROR.code(),
                              MAX_CONNECTION_AGE_DEBUG.retain(), ctx().newPromise());
        isGoAwaySent = true;
    }

    final Http2Headers http2Headers = convertHeaders(headers, isTrailersEmpty);
    onKeepAliveReadOrWrite();
    return encoder().writeHeaders(ctx(), streamId, http2Headers, 0, endStream, ctx().newPromise());
}
/**
 * Checks which exceptions {@code Exceptions.isExpected} classifies as expected. The positive
 * cases are expected only when {@code Flags.verboseSocketExceptions()} is off.
 */
@Test
public void testIsExpected() {
    final boolean expectedUnlessVerbose = !Flags.verboseSocketExceptions();

    // Plain Exception instances are never expected, whatever their message.
    assertThat(Exceptions.isExpected(new Exception())).isFalse();
    assertThat(Exceptions.isExpected(new Exception("broken pipe"))).isFalse();
    assertThat(Exceptions.isExpected(new Exception("connection reset by peer"))).isFalse();
    assertThat(Exceptions.isExpected(new Exception("stream closed"))).isFalse();
    assertThat(Exceptions.isExpected(new Exception("SSLEngine closed already"))).isFalse();

    // Closed channel / session.
    assertThat(Exceptions.isExpected(new ClosedChannelException())).isEqualTo(expectedUnlessVerbose);
    assertThat(Exceptions.isExpected(ClosedSessionException.get())).isEqualTo(expectedUnlessVerbose);

    // IOException and ChannelException: the outcome differs by message.
    assertThat(Exceptions.isExpected(new IOException("connection reset by peer")))
            .isEqualTo(expectedUnlessVerbose);
    assertThat(Exceptions.isExpected(new IOException("invalid argument"))).isFalse();
    assertThat(Exceptions.isExpected(new ChannelException("broken pipe")))
            .isEqualTo(expectedUnlessVerbose);
    assertThat(Exceptions.isExpected(new ChannelException("invalid argument"))).isFalse();

    // Http2Exception: with the "stream closed" message versus without a message.
    assertThat(Exceptions.isExpected(new Http2Exception(Http2Error.INTERNAL_ERROR, "stream closed")))
            .isEqualTo(expectedUnlessVerbose);
    assertThat(Exceptions.isExpected(new Http2Exception(Http2Error.INTERNAL_ERROR))).isFalse();

    // SSLException: the outcome differs by message.
    assertThat(Exceptions.isExpected(new SSLException("SSLEngine closed already")))
            .isEqualTo(expectedUnlessVerbose);
    assertThat(Exceptions.isExpected(new SSLException("Handshake failed"))).isFalse();
}
/**
 * Closes the channel through the superclass and then tears down every active stream.
 *
 * <p>Each active stream that maps to a server stream is told the status carried by
 * {@code msg} and is reset with {@code CANCEL}, wrapped in a PerfMark task linked to the
 * command; every visited stream is closed.
 *
 * @param ctx the handler context used for the reset frames
 * @param msg the command carrying the status and the PerfMark link
 * @param promise the promise handed to {@code super.close}
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  super.close(ctx, promise);

  final Http2StreamVisitor terminator = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyServerStream.TransportState transportState = serverStream(stream);
      if (transportState != null) {
        PerfMark.startTask("NettyServerHandler.forcefulClose", transportState.tag());
        PerfMark.linkIn(msg.getLink());
        try {
          transportState.transportReportStatus(msg.getStatus());
          resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
        } finally {
          PerfMark.stopTask("NettyServerHandler.forcefulClose", transportState.tag());
        }
      }
      stream.close();
      return true;
    }
  };
  connection().forEachActiveStream(terminator);
}
/**
 * Processes an inbound PING frame.
 *
 * <p>Notifies the keep-alive manager (if any) that data was received. When the keep-alive
 * enforcer rejects the ping, sends a GOAWAY with {@code ENHANCE_YOUR_CALM} and debug data
 * {@code too_many_pings}, then forcefully closes with a {@code RESOURCE_EXHAUSTED} status.
 */
@Override
public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
  if (keepAliveManager != null) {
    keepAliveManager.onDataReceived();
  }

  final boolean accepted = keepAliveEnforcer.pingAcceptable();
  if (accepted) {
    return;
  }

  ByteBuf goAwayDebugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
  goAway(
      ctx,
      connection().remote().lastStreamCreated(),
      Http2Error.ENHANCE_YOUR_CALM.code(),
      goAwayDebugData,
      ctx.newPromise());

  Status closeStatus = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
  try {
    forcefulClose(ctx, new ForcefulCloseCommand(closeStatus), ctx.newPromise());
  } catch (Exception ex) {
    // Report a failed close through the regular error path, flagged as outbound.
    onError(ctx, /* outbound= */ true, ex);
  }
}
/**
 * Handles an inbound HTTP/2 RST_STREAM frame for a client stream.
 *
 * <p>Does nothing when no client stream is registered for {@code streamId}. Otherwise records a
 * PerfMark event, reports the status derived from {@code errorCode} ({@code REFUSED} progress
 * for {@code REFUSED_STREAM}, {@code PROCESSED} otherwise) and notifies the keep-alive manager
 * when one is present.
 *
 * @param streamId the id of the stream being reset
 * @param errorCode the HTTP/2 error code carried by the frame
 */
private void onRstStreamRead(int streamId, long errorCode) {
  NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
  if (stream == null) {
    return;
  }

  PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
  Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
      .augmentDescription("Received Rst Stream");
  RpcProgress progress;
  if (errorCode == Http2Error.REFUSED_STREAM.code()) {
    progress = RpcProgress.REFUSED;
  } else {
    progress = RpcProgress.PROCESSED;
  }
  stream.transportReportStatus(status, progress, false /*stop delivery*/, new Metadata());

  if (keepAliveManager != null) {
    keepAliveManager.onDataReceived();
  }
}
/**
 * Tears down every active stream and then completes {@code promise} successfully.
 *
 * <p>Each visit runs inside a PerfMark task linked to the command. The task is tagged with the
 * client stream's tag, or with a freshly created tag when the stream has no client stream. A
 * stream with a client stream is told the command's status and reset with {@code CANCEL};
 * every visited stream is closed.
 *
 * @param ctx the handler context used for the reset frames
 * @param msg the command carrying the status and the PerfMark link
 * @param promise completed with success once all streams were visited
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  // close() already called by NettyClientTransport, so only the streams need cleaning up here.
  final Http2StreamVisitor terminator = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyClientStream.TransportState transportState = clientStream(stream);
      final Tag tag;
      if (transportState == null) {
        tag = PerfMark.createTag();
      } else {
        tag = transportState.tag();
      }

      PerfMark.startTask("NettyClientHandler.forcefulClose", tag);
      PerfMark.linkIn(msg.getLink());
      try {
        if (transportState != null) {
          transportState.transportReportStatus(msg.getStatus(), true, new Metadata());
          resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
        }
        stream.close();
      } finally {
        PerfMark.stopTask("NettyClientHandler.forcefulClose", tag);
      }
      return true;
    }
  };
  connection().forEachActiveStream(terminator);
  promise.setSuccess();
}
/**
 * Accounts for one more outstanding control frame and enforces the configured maximum.
 *
 * <p>While {@code limitReached} is not set, the counter is incremented and
 * {@code outstandingControlFramesListener} is attached to the un-voided promise. If the counter
 * still equals {@code maxOutstandingControlFrames} after a flush attempt, {@code limitReached}
 * is set, an {@code ENHANCE_YOUR_CALM} connection error is reported to the lifecycle manager and
 * the channel is closed.
 *
 * @param ctx the context used for flushing and closing the channel
 * @param promise the promise of the control frame about to be written
 * @return the un-voided promise with the listener attached, or {@code promise} unchanged when
 *     {@code limitReached} was already set on entry
 */
private ChannelPromise handleOutstandingControlFrames(ChannelHandlerContext ctx, ChannelPromise promise) {
if (!limitReached) {
if (outstandingControlFrames == maxOutstandingControlFrames) {
// Let's try to flush once as we may be able to flush some of the control frames.
ctx.flush();
}
// Same condition checked again on purpose: presumably the flush above can complete pending
// promises and thereby lower outstandingControlFrames through the listener. TODO confirm.
if (outstandingControlFrames == maxOutstandingControlFrames) {
limitReached = true;
Http2Exception exception = Http2Exception.connectionError(Http2Error.ENHANCE_YOUR_CALM,
"Maximum number %d of outstanding control frames reached", maxOutstandingControlFrames);
logger.info("Maximum number {} of outstanding control frames reached. Closing channel {}",
maxOutstandingControlFrames, ctx.channel(), exception);
// First notify the Http2LifecycleManager and then close the connection.
lifecycleManager.onError(ctx, true, exception);
ctx.close();
}
// NOTE(review): the increment and listener registration below also run on the call that just
// hit the limit, not only when the limit has not been reached.
outstandingControlFrames++;
// We did not reach the limit yet, add the listener to decrement the number of outstanding control frames
// once the promise was completed
return promise.unvoid().addListener(outstandingControlFramesListener);
}
// The limit was reached on an earlier call: hand the promise back untouched.
return promise;
}
/**
 * Closing the handler must write a first GOAWAY and the graceful-shutdown ping; after the ping
 * ack is read, a second GOAWAY is expected and the channel must be closed.
 */
@Test
public void closeShouldGracefullyCloseChannel() throws Exception {
  final long noError = Http2Error.NO_ERROR.code();
  manualSetUp();

  handler().close(ctx(), newPromise());

  // First GOAWAY uses Integer.MAX_VALUE as the last stream id.
  verifyWrite().writeGoAway(
      eq(ctx()), eq(Integer.MAX_VALUE), eq(noError), isA(ByteBuf.class),
      any(ChannelPromise.class));
  verifyWrite().writePing(
      eq(ctx()),
      eq(false),
      eq(NettyServerHandler.GRACEFUL_SHUTDOWN_PING),
      isA(ChannelPromise.class));

  // Deliver the ack for the shutdown ping, then expect the GOAWAY with last stream id 0.
  channelRead(pingFrame(/*ack=*/ true , NettyServerHandler.GRACEFUL_SHUTDOWN_PING));
  verifyWrite().writeGoAway(
      eq(ctx()), eq(0), eq(noError), isA(ByteBuf.class), any(ChannelPromise.class));

  // Verify that the channel was closed.
  assertFalse(channel().isOpen());
}
/**
 * Alternates ten data writes with ten inbound pings and checks that no
 * {@code ENHANCE_YOUR_CALM} GOAWAY for {@code STREAM_ID} is written.
 */
@Test
public void keepAliveEnforcer_sendingDataResetsCounters() throws Exception {
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = TimeUnit.HOURS.toNanos(1);
  manualSetUp();
  createStream();

  // Send the response headers and wait for the write to finish.
  Http2Headers responseHeaders = Utils.convertServerHeaders(new Metadata());
  ChannelFuture headersWritten = enqueue(
      SendResponseHeadersCommand.createHeaders(stream.transportState(), responseHeaders));
  headersWritten.get();

  // Each round: write one data frame, release outbound buffers, then receive a ping.
  for (int round = 0; round < 10; round++) {
    ChannelFuture dataWritten = enqueue(
        new SendGrpcFrameCommand(stream.transportState(), content().retainedSlice(), false));
    dataWritten.get();
    channel().releaseOutbound();
    channelRead(pingFrame(false /* isAck */, 1L));
  }

  final long calmCode = Http2Error.ENHANCE_YOUR_CALM.code();
  verifyWrite(never()).writeGoAway(
      eq(ctx()), eq(STREAM_ID), eq(calmCode), any(ByteBuf.class), any(ChannelPromise.class));
}
/**
 * Cancels a server stream: reports the command's reason to the stream and then writes a
 * RST_STREAM with {@code CANCEL} for it.
 *
 * @param ctx the handler context used for the write
 * @param cmd the command naming the stream and the reason
 * @param promise the promise passed to the RST_STREAM write
 */
private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
    ChannelPromise promise) {
  // Notify the listener if we haven't already.
  cmd.stream().transportReportStatus(cmd.reason());

  // Terminate the stream.
  final long cancelCode = Http2Error.CANCEL.code();
  encoder().writeRstStream(ctx, cmd.stream().id(), cancelCode, promise);
}
/**
 * Cancelling a created stream must write a RST_STREAM with {@code CANCEL} for stream 3 and touch
 * the keep-alive manager exactly once each for transport active and transport idle.
 */
@Test
public void cancelShouldSucceed() throws Exception {
  createStream();
  cancelStream(Status.CANCELLED);

  final long cancelCode = Http2Error.CANCEL.code();
  verifyWrite().writeRstStream(
      eq(ctx()), eq(3), eq(cancelCode), any(ChannelPromise.class));

  verify(mockKeepAliveManager, times(1)).onTransportActive(); // onStreamActive
  verify(mockKeepAliveManager, times(1)).onTransportIdle(); // onStreamClosed
  verifyNoMoreInteractions(mockKeepAliveManager);
}
/**
 * Builds the status that corresponds to a received GOAWAY frame.
 *
 * @param errorCode the HTTP/2 error code of the GOAWAY
 * @param debugData optional debug payload; when non-null and non-empty it is decoded as UTF-8
 *     and appended to the description
 * @return the status for {@code errorCode}, described with "Received Goaway" plus the debug text
 */
private Status statusFromGoAway(long errorCode, byte[] debugData) {
  Status goAwayStatus = GrpcUtil.Http2Error.statusForCode((int) errorCode)
      .augmentDescription("Received Goaway");

  final boolean hasDebugMessage = debugData != null && debugData.length > 0;
  if (!hasDebugMessage) {
    return goAwayStatus;
  }
  // If a debug message was provided, use it.
  return goAwayStatus.augmentDescription(new String(debugData, UTF_8));
}
/**
 * An inbound RST_STREAM with {@code CANCEL} must close the stream listener with a
 * {@code CANCELLED} status whose description mentions {@code RST_STREAM}, without delivering
 * any message.
 */
@Test
public void clientCancelShouldForwardToStreamListener() throws Exception {
  manualSetUp();
  createStream();

  final int cancelCode = (int) Http2Error.CANCEL.code();
  channelRead(rstStreamFrame(STREAM_ID, cancelCode));

  ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
  verify(streamListener).closed(statusCaptor.capture());
  Status closedWith = statusCaptor.getValue();
  assertEquals(Code.CANCELLED, closedWith.getCode());
  Truth.assertThat(closedWith.getDescription()).contains("RST_STREAM");

  verify(streamListener, atLeastOnce()).onReady();
  assertNull("no messages expected", streamListenerMessageQueue.poll());
}
/**
 * Closing the handler must write a GOAWAY with last stream id 0, {@code NO_ERROR} and an empty
 * buffer, and leave the channel closed.
 */
@Test
public void closeShouldCloseChannel() throws Exception {
  final long noError = Http2Error.NO_ERROR.code();
  manualSetUp();

  handler().close(ctx(), newPromise());

  verifyWrite().writeGoAway(
      eq(ctx()), eq(0), eq(noError), eq(Unpooled.EMPTY_BUFFER), any(ChannelPromise.class));

  // Verify that the channel was closed.
  assertFalse(channel().isOpen());
}
/**
 * Enqueuing a {@code CancelServerStreamCommand} must write a RST_STREAM with {@code CANCEL} for
 * the stream's id.
 */
@Test
public void cancelShouldSendRstStream() throws Exception {
  manualSetUp();
  createStream();

  CancelServerStreamCommand cancelCommand =
      new CancelServerStreamCommand(stream.transportState(), Status.DEADLINE_EXCEEDED);
  enqueue(cancelCommand);

  final long cancelCode = Http2Error.CANCEL.code();
  verifyWrite().writeRstStream(
      eq(ctx()), eq(stream.transportState().id()), eq(cancelCode), any(ChannelPromise.class));
}
/**
 * A RST_STREAM with {@code REFUSED_STREAM} must close the stream listener with {@code REFUSED}
 * progress, and the create-stream future must be done.
 */
@Test
public void receivedResetWithRefuseCode() throws Exception {
  final int refusedStreamCode = (int) Http2Error.REFUSED_STREAM.code();

  ChannelFuture createFuture = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
  channelRead(rstStreamFrame(streamId, refusedStreamCode));

  verify(streamListener).closed(any(Status.class), eq(REFUSED), any(Metadata.class));
  assertTrue(createFuture.isDone());
}
/**
 * With keep-alive without calls disallowed and a one-hour permitted keep-alive time, reading
 * {@code MAX_PING_STRIKES + 1} pings must produce an {@code ENHANCE_YOUR_CALM} GOAWAY with last
 * stream id 0 and leave the channel inactive.
 */
@Test
public void keepAliveEnforcer_enforcesPings() throws Exception {
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = TimeUnit.HOURS.toNanos(1);
  manualSetUp();

  for (int ping = 0; ping < KeepAliveEnforcer.MAX_PING_STRIKES + 1; ping++) {
    channelRead(pingFrame(false /* isAck */, 1L));
  }

  final long calmCode = Http2Error.ENHANCE_YOUR_CALM.code();
  verifyWrite().writeGoAway(
      eq(ctx()), eq(0), eq(calmCode), any(ByteBuf.class), any(ChannelPromise.class));
  assertFalse(channel().isActive());
}
/**
 * Once the max connection age elapses, the first GOAWAY and the shutdown ping must be written
 * while the channel stays open. After the fake clock advances another ten seconds without a
 * ping ack, the second GOAWAY must be written and the channel closed.
 */
@Test
public void maxConnectionAge_goAwaySent_pingTimeout() throws Exception {
  final long noError = Http2Error.NO_ERROR.code();
  final long shutdownPing = 0x97ACEF001L;

  maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
  manualSetUp();
  assertTrue(channel().isOpen());

  fakeClock().forwardNanos(maxConnectionAgeInNanos);

  // first GO_AWAY sent
  verifyWrite().writeGoAway(
      eq(ctx()), eq(Integer.MAX_VALUE), eq(noError), any(ByteBuf.class),
      any(ChannelPromise.class));
  // ping sent
  verifyWrite().writePing(
      eq(ctx()), eq(false), eq(shutdownPing), any(ChannelPromise.class));
  // second GO_AWAY not sent yet
  verifyWrite(never()).writeGoAway(
      eq(ctx()), eq(0), eq(noError), any(ByteBuf.class), any(ChannelPromise.class));
  assertTrue(channel().isOpen());

  fakeClock().forwardTime(10, TimeUnit.SECONDS);

  // second GO_AWAY sent
  verifyWrite().writeGoAway(
      eq(ctx()), eq(0), eq(noError), any(ByteBuf.class), any(ChannelPromise.class));
  // channel closed
  assertTrue(!channel().isOpen());
}
/**
 * With keep-alive without calls disallowed, a permitted keep-alive time of 0 and no stream
 * created, reading {@code MAX_PING_STRIKES + 1} pings must produce an {@code ENHANCE_YOUR_CALM}
 * GOAWAY with last stream id 0 and leave the channel inactive.
 */
@Test
public void keepAliveEnforcer_initialIdle() throws Exception {
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = 0;
  manualSetUp();

  for (int ping = 0; ping < KeepAliveEnforcer.MAX_PING_STRIKES + 1; ping++) {
    channelRead(pingFrame(false /* isAck */, 1L));
  }

  final long calmCode = Http2Error.ENHANCE_YOUR_CALM.code();
  verifyWrite().writeGoAway(
      eq(ctx()), eq(0), eq(calmCode), any(ByteBuf.class), any(ChannelPromise.class));
  assertFalse(channel().isActive());
}
/**
 * With a stream created, reading ten pings must not produce an {@code ENHANCE_YOUR_CALM} GOAWAY
 * for {@code STREAM_ID}.
 */
@Test
public void keepAliveEnforcer_noticesActive() throws Exception {
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = 0;
  manualSetUp();
  createStream();

  for (int ping = 0; ping < 10; ping++) {
    channelRead(pingFrame(false /* isAck */, 1L));
  }

  final long calmCode = Http2Error.ENHANCE_YOUR_CALM.code();
  verifyWrite(never()).writeGoAway(
      eq(ctx()), eq(STREAM_ID), eq(calmCode), any(ByteBuf.class), any(ChannelPromise.class));
}
/**
 * After the only stream is reset by the peer, reading {@code MAX_PING_STRIKES + 1} pings must
 * produce an {@code ENHANCE_YOUR_CALM} GOAWAY with last stream id {@code STREAM_ID} and leave
 * the channel inactive.
 */
@Test
public void keepAliveEnforcer_noticesInactive() throws Exception {
  permitKeepAliveWithoutCalls = false;
  permitKeepAliveTimeInNanos = 0;
  manualSetUp();
  createStream();

  // The peer resets the stream before the pings arrive.
  final int cancelCode = (int) Http2Error.CANCEL.code();
  channelRead(rstStreamFrame(STREAM_ID, cancelCode));

  for (int ping = 0; ping < KeepAliveEnforcer.MAX_PING_STRIKES + 1; ping++) {
    channelRead(pingFrame(false /* isAck */, 1L));
  }

  final long calmCode = Http2Error.ENHANCE_YOUR_CALM.code();
  verifyWrite().writeGoAway(
      eq(ctx()), eq(STREAM_ID), eq(calmCode), any(ByteBuf.class), any(ChannelPromise.class));
  assertFalse(channel().isActive());
}
/**
 * Once the max connection idle time elapses, the first GOAWAY and the shutdown ping must be
 * written. An unrelated ping ack must not cause the second GOAWAY. The ack for the shutdown
 * ping must be followed by the second GOAWAY and a closed channel.
 */
@Test
public void maxConnectionIdle_goAwaySent_pingAck() throws Exception {
  final long noError = Http2Error.NO_ERROR.code();
  final long shutdownPing = 0x97ACEF001L;

  maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
  manualSetUp();
  assertTrue(channel().isOpen());

  fakeClock().forwardNanos(maxConnectionIdleInNanos);

  // first GO_AWAY sent
  verifyWrite().writeGoAway(
      eq(ctx()), eq(Integer.MAX_VALUE), eq(noError), any(ByteBuf.class),
      any(ChannelPromise.class));
  // ping sent
  verifyWrite().writePing(
      eq(ctx()), eq(false), eq(shutdownPing), any(ChannelPromise.class));
  // second GO_AWAY not sent yet
  verifyWrite(never()).writeGoAway(
      eq(ctx()), eq(0), eq(noError), any(ByteBuf.class), any(ChannelPromise.class));

  channelRead(pingFrame(true /* isAck */, 0xDEADL)); // irrelevant ping Ack
  verifyWrite(never()).writeGoAway(
      eq(ctx()), eq(0), eq(noError), any(ByteBuf.class), any(ChannelPromise.class));
  assertTrue(channel().isOpen());

  channelRead(pingFrame(true /* isAck */, shutdownPing));

  // second GO_AWAY sent
  verifyWrite().writeGoAway(
      eq(ctx()), eq(0), eq(noError), any(ByteBuf.class), any(ChannelPromise.class));
  // channel closed
  assertTrue(!channel().isOpen());
}