下面列出了 io.netty.handler.codec.http2.Http2StreamVisitor #io.perfmark.PerfMark 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Instantiates a transformed copy of {@code ClzAutoRecord} and checks that exactly two marks are
 * stored, each with a marker that resolves to {@code ClzAutoRecord.recordMe} in
 * {@code PerfMarkTransformerTest.java}.
 */
@Test
public void transform_auto() throws Exception {
  PerfMark.setEnabled(true);
  Storage.resetForTest();

  Class<?> transformed = transformAndLoad(ClzAutoRecord.class);
  Constructor<?> noArgCtor = transformed.getConstructor();
  noArgCtor.setAccessible(true);
  noArgCtor.newInstance();

  List<Mark> recorded = Storage.readForTest();
  assertThat(recorded).hasSize(2);

  String expectedOwner = ClzAutoRecord.class.getName();
  for (Mark recordedMark : recorded) {
    assertNotNull(recordedMark.getMarker());
    StackTraceElement site = Internal.getElement(recordedMark.getMarker());
    assertThat(site.getClassName()).isEqualTo(expectedOwner);
    assertThat(site.getMethodName()).isEqualTo("recordMe");
    assertThat(site.getFileName()).isEqualTo("PerfMarkTransformerTest.java");
    // TODO: reenable.
    // assertThat(site.getLineNumber()).isGreaterThan(0);
  }
}
/**
 * Exercises the transformer on a class that reaches {@code PerfMark.linkOut} only through a
 * method reference.
 *
 * <p>The local class is passed to {@code transformAndLoad}, instantiated reflectively, and the
 * test then expects exactly one stored mark. Nothing is asserted about that mark's marker.
 */
@Test
public void transform_methodRef() throws Exception {
PerfMark.setEnabled(true);
Storage.resetForTest();
// Class under test: the constructor hands PerfMark::linkOut to execute(), which invokes it.
final class ClzLocal {
public ClzLocal() {
@SuppressWarnings("unused")
Object o = execute(PerfMark::linkOut);
}
Link execute(Supplier<Link> supplier) {
return supplier.get();
}
}
Class<?> clz = transformAndLoad(ClzLocal.class);
// ClzLocal is declared inside an instance method, so its constructor takes the enclosing
// test instance as its only argument.
Constructor<?> ctor = clz.getConstructor(PerfMarkTransformerTest.class);
ctor.setAccessible(true);
ctor.newInstance(this);
List<Mark> marks = Storage.readForTest();
assertThat(marks).hasSize(1);
// I'm not sure what to do with methodrefs, so just leave it alone for now.
}
/**
 * Instantiates a transformed copy of {@code ClzWithClinit} and checks that four marks are stored,
 * each with a marker that resolves to the class's {@code <clinit>} with a positive line number.
 */
@Test
public void transform_clinit() throws Exception {
  PerfMark.setEnabled(true);
  Storage.resetForTest();

  Class<?> transformed = transformAndLoad(ClzWithClinit.class);
  Constructor<?> declaredCtor = transformed.getDeclaredConstructor();
  declaredCtor.setAccessible(true);
  declaredCtor.newInstance();

  List<Mark> recorded = Storage.readForTest();
  assertThat(recorded).hasSize(4);

  String expectedOwner = ClzWithClinit.class.getName();
  for (Mark recordedMark : recorded) {
    assertNotNull(recordedMark.getMarker());
    StackTraceElement site = Internal.getElement(recordedMark.getMarker());
    assertThat(site.getClassName()).isEqualTo(expectedOwner);
    assertThat(site.getMethodName()).isEqualTo("<clinit>");
    assertThat(site.getFileName()).isEqualTo("PerfMarkTransformerTest.java");
    assertThat(site.getLineNumber()).isGreaterThan(0);
  }
}
/**
 * Instantiates a transformed copy of {@code ClzFooter} and checks that four marks are stored,
 * each with a marker that resolves to the class's {@code <init>} with a positive line number.
 */
@Test
public void transform_toplevel() throws Exception {
  PerfMark.setEnabled(true);
  Storage.resetForTest();

  Class<?> transformed = transformAndLoad(ClzFooter.class);
  Constructor<?> declaredCtor = transformed.getDeclaredConstructor();
  declaredCtor.setAccessible(true);
  declaredCtor.newInstance();

  List<Mark> recorded = Storage.readForTest();
  assertThat(recorded).hasSize(4);

  String expectedOwner = ClzFooter.class.getName();
  for (Mark recordedMark : recorded) {
    assertNotNull(recordedMark.getMarker());
    StackTraceElement site = Internal.getElement(recordedMark.getMarker());
    assertThat(site.getClassName()).isEqualTo(expectedOwner);
    assertThat(site.getMethodName()).isEqualTo("<init>");
    assertThat(site.getFileName()).isEqualTo("PerfMarkTransformerTest.java");
    assertThat(site.getLineNumber()).isGreaterThan(0);
  }
}
/**
 * Builds the request path and signals that the stream is ready, recording the whole call as the
 * PerfMark task {@code OkHttpClientStream$Sink.writeHeaders}.
 *
 * <p>The path is {@code "/" + method.getFullMethodName()}. When {@code payload} is non-null,
 * {@code useGet} is set and the base64-encoded payload is appended as a query string.
 *
 * <p>All work after {@code startTask} runs inside the {@code try}, so the matching
 * {@code stopTask} is issued even if building the path throws.
 *
 * @param metadata headers passed on to {@code state.streamReady}
 * @param payload optional payload to embed in the path; may be {@code null}
 */
@Override
public void writeHeaders(Metadata metadata, byte[] payload) {
  PerfMark.startTask("OkHttpClientStream$Sink.writeHeaders");
  try {
    String defaultPath = "/" + method.getFullMethodName();
    if (payload != null) {
      useGet = true;
      defaultPath += "?" + BaseEncoding.base64().encode(payload);
    }
    synchronized (state.lock) {
      state.streamReady(metadata, defaultPath);
    }
  } finally {
    PerfMark.stopTask("OkHttpClientStream$Sink.writeHeaders");
  }
}
/**
 * Sends headers and then two messages on a fresh {@code ServerCallImpl} for {@code method}.
 *
 * <p>Verifies that only the first message reaches {@code stream.writeMessage}, that
 * {@code stream.close} is never called after the first send, and that the second send cancels
 * the stream with code INTERNAL and description {@code ServerCallImpl.TOO_MANY_RESPONSES}.
 */
private void sendMessage_serverSendsOne_closeOnSecondCall(
    MethodDescriptor<Long, Long> method) {
  ServerCallImpl<Long, Long> call =
      new ServerCallImpl<>(
          stream,
          method,
          requestHeaders,
          context,
          DecompressorRegistry.getDefaultInstance(),
          CompressorRegistry.getDefaultInstance(),
          serverCallTracer,
          PerfMark.createTag());
  call.sendHeaders(new Metadata());

  call.sendMessage(1L);
  verify(stream, times(1)).writeMessage(any(InputStream.class));
  verify(stream, never()).close(any(Status.class), any(Metadata.class));

  // trying to send a second message causes gRPC to close the underlying stream
  call.sendMessage(1L);
  verify(stream, times(1)).writeMessage(any(InputStream.class));

  ArgumentCaptor<Status> cancelCaptor = ArgumentCaptor.forClass(Status.class);
  verify(stream, times(1)).cancel(cancelCaptor.capture());
  Status cancelStatus = cancelCaptor.getValue();
  assertEquals(Status.Code.INTERNAL, cancelStatus.getCode());
  assertEquals(ServerCallImpl.TOO_MANY_RESPONSES, cancelStatus.getDescription());
}
/**
 * Handles an inbound RST_STREAM frame.
 *
 * <p>Logs the frame, converts {@code errorCode} to a gRPC status with "Rst Stream" added to its
 * description, and, while holding {@code lock}, finishes the stream registered under
 * {@code streamId} if there is one. Delivery is stopped when the status code is CANCELLED or
 * DEADLINE_EXCEEDED; progress is reported as REFUSED for {@code REFUSED_STREAM} and PROCESSED
 * otherwise.
 *
 * @param streamId id of the stream being reset
 * @param errorCode HTTP/2 error code carried by the frame
 */
@Override
public void rstStream(int streamId, ErrorCode errorCode) {
  logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
  Status resetStatus = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
  Code resetCode = resetStatus.getCode();
  boolean stopDelivery = resetCode == Code.CANCELLED || resetCode == Code.DEADLINE_EXCEEDED;
  synchronized (lock) {
    OkHttpClientStream target = streams.get(streamId);
    if (target == null) {
      return;
    }
    PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
        target.transportState().tag());
    RpcProgress progress =
        errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED;
    finishStream(streamId, resetStatus, progress, stopDelivery, null, null);
  }
}
/**
 * Checks that a {@link RuntimeException} thrown by the delegate's {@code messagesAvailable}
 * propagates unchanged out of {@code executor.runDueTasks()}, after which
 * {@code ensureServerStateNotLeaked()} is run.
 */
@Test
public void messageRead_runtimeExceptionCancelsCall() throws Exception {
  JumpToApplicationThreadServerStreamListener jumpListener =
      new JumpToApplicationThreadServerStreamListener(
          executor.getScheduledExecutorService(),
          executor.getScheduledExecutorService(),
          stream,
          Context.ROOT.withCancellation(),
          PerfMark.createTag());
  ServerStreamListener delegate = mock(ServerStreamListener.class);
  jumpListener.setListener(delegate);

  RuntimeException planted = new RuntimeException();
  doThrow(planted)
      .when(delegate)
      .messagesAvailable(any(StreamListener.MessageProducer.class));

  // Closing the InputStream is done by the delegated listener (generally ServerCallImpl)
  jumpListener.messagesAvailable(mock(StreamListener.MessageProducer.class));

  try {
    executor.runDueTasks();
    fail("Expected exception");
  } catch (RuntimeException caught) {
    assertSame(planted, caught);
    ensureServerStateNotLeaked();
  }
}
/**
 * Forwards a content chunk to whatever follows this stage, wrapped in a PerfMark task named
 * after this object's runtime class.
 *
 * <p>With a {@code nextStage} present the chunk goes to {@code nextStage.filter}; otherwise it is
 * fired as a channel read on the context obtained from {@code zuulMesg}.
 *
 * @param zuulMesg message the chunk belongs to
 * @param chunk content chunk to pass on
 */
protected final void invokeNextStage(final O zuulMesg, final HttpContent chunk) {
    final String taskName = getClass().getName();

    if (nextStage == null) {
        //Next stage is Netty channel handler
        PerfMark.startTask(taskName, "fireChannelReadChunk");
        try {
            addPerfMarkTags(zuulMesg);
            getChannelHandlerContext(zuulMesg).fireChannelRead(chunk);
        } finally {
            PerfMark.stopTask(taskName, "fireChannelReadChunk");
        }
        return;
    }

    PerfMark.startTask(taskName, "invokeNextStageChunk");
    try {
        addPerfMarkTags(zuulMesg);
        nextStage.filter(zuulMesg, chunk);
    } finally {
        PerfMark.stopTask(taskName, "invokeNextStageChunk");
    }
}
/**
 * Runs {@code r} on the event loop.
 *
 * <p>If the caller is already in the event loop, {@code r} runs inline. Otherwise a PerfMark link
 * is taken on the calling thread and {@code r} is submitted to the event loop inside a
 * {@code NettyServerStream$TransportState.runOnTransportThread} task that links back to it.
 *
 * @param r work to execute
 */
@Override
public void runOnTransportThread(final Runnable r) {
  if (eventLoop.inEventLoop()) {
    r.run();
    return;
  }

  final Link callerLink = PerfMark.linkOut();
  Runnable tracedTask = new Runnable() {
    @Override
    public void run() {
      PerfMark.startTask("NettyServerStream$TransportState.runOnTransportThread", tag);
      PerfMark.linkIn(callerLink);
      try {
        r.run();
      } finally {
        PerfMark.stopTask("NettyServerStream$TransportState.runOnTransportThread", tag);
      }
    }
  };
  eventLoop.execute(tracedTask);
}
/**
 * Checks that a {@code TestError} thrown by the delegate's {@code messagesAvailable} propagates
 * unchanged out of {@code executor.runDueTasks()}, after which
 * {@code ensureServerStateNotLeaked()} is run.
 */
@Test
public void messageRead_errorCancelsCall() throws Exception {
  JumpToApplicationThreadServerStreamListener jumpListener =
      new JumpToApplicationThreadServerStreamListener(
          executor.getScheduledExecutorService(),
          executor.getScheduledExecutorService(),
          stream,
          Context.ROOT.withCancellation(),
          PerfMark.createTag());
  ServerStreamListener delegate = mock(ServerStreamListener.class);
  jumpListener.setListener(delegate);

  TestError planted = new TestError();
  doThrow(planted)
      .when(delegate)
      .messagesAvailable(any(StreamListener.MessageProducer.class));

  // Closing the InputStream is done by the delegated listener (generally ServerCallImpl)
  jumpListener.messagesAvailable(mock(StreamListener.MessageProducer.class));

  try {
    executor.runDueTasks();
    fail("Expected exception");
  } catch (TestError caught) {
    assertSame(planted, caught);
    ensureServerStateNotLeaked();
  }
}
/**
 * Reacts to an inbound RST_STREAM by reporting a CANCELLED status to the server stream mapped to
 * {@code streamId}, if there is one. The report runs inside a
 * {@code NettyServerHandler.onRstStreamRead} PerfMark task tagged with the stream's tag.
 *
 * @param streamId id of the stream that was reset
 * @param errorCode error code from the frame; only used in the status description
 * @throws Http2Exception a stream exception wrapping any {@link Throwable} raised while handling
 */
private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
  try {
    NettyServerStream.TransportState target = serverStream(connection().stream(streamId));
    if (target == null) {
      return;
    }
    PerfMark.startTask("NettyServerHandler.onRstStreamRead", target.tag());
    try {
      Status cancelled =
          Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode);
      target.transportReportStatus(cancelled);
    } finally {
      PerfMark.stopTask("NettyServerHandler.onRstStreamRead", target.tag());
    }
  } catch (Throwable failure) {
    logger.log(Level.WARNING, "Exception in onRstStreamRead()", failure);
    // Throw an exception that will get handled by onStreamError.
    throw newStreamException(streamId, failure);
  }
}
/**
 * Logs a stream-level error, reports the derived status to the affected server stream when one
 * exists, and then delegates to the superclass implementation.
 *
 * <p>The work is recorded as a {@code NettyServerHandler.onStreamError} PerfMark task. The task
 * uses the stream's tag when a stream was found and a freshly created tag otherwise.
 */
@Override
protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
    StreamException http2Ex) {
  logger.log(Level.WARNING, "Stream Error", cause);
  NettyServerStream.TransportState failedStream =
      serverStream(connection().stream(Http2Exception.streamId(http2Ex)));

  Tag tag;
  if (failedStream == null) {
    tag = PerfMark.createTag();
  } else {
    tag = failedStream.tag();
  }

  PerfMark.startTask("NettyServerHandler.onStreamError", tag);
  try {
    if (failedStream != null) {
      failedStream.transportReportStatus(Utils.statusFromThrowable(cause));
    }
    // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
    // Delegate to the base class to send a RST_STREAM.
    super.onStreamError(ctx, outbound, cause, http2Ex);
  } finally {
    PerfMark.stopTask("NettyServerHandler.onStreamError", tag);
  }
}
/**
 * Closes the channel via the superclass and then walks every active HTTP/2 stream.
 *
 * <p>For each stream that maps to a server stream, the command's status is reported and the
 * stream is reset with CANCEL inside a {@code NettyServerHandler.forcefulClose} PerfMark task
 * linked to the command. Every visited stream is closed, and the visitor always returns
 * {@code true}.
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  super.close(ctx, promise);

  Http2StreamVisitor closeEachStream = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyServerStream.TransportState state = serverStream(stream);
      if (state != null) {
        PerfMark.startTask("NettyServerHandler.forcefulClose", state.tag());
        PerfMark.linkIn(msg.getLink());
        try {
          state.transportReportStatus(msg.getStatus());
          resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
        } finally {
          PerfMark.stopTask("NettyServerHandler.forcefulClose", state.tag());
        }
      }
      stream.close();
      return true;
    }
  };
  connection().forEachActiveStream(closeEachStream);
}
/**
 * Checks that a {@link RuntimeException} thrown by the delegate's {@code onReady} propagates
 * unchanged out of {@code executor.runDueTasks()}, after which
 * {@code ensureServerStateNotLeaked()} is run.
 */
@Test
public void onReady_runtimeExceptionCancelsCall() {
  JumpToApplicationThreadServerStreamListener jumpListener =
      new JumpToApplicationThreadServerStreamListener(
          executor.getScheduledExecutorService(),
          executor.getScheduledExecutorService(),
          stream,
          Context.ROOT.withCancellation(),
          PerfMark.createTag());
  ServerStreamListener delegate = mock(ServerStreamListener.class);
  jumpListener.setListener(delegate);

  RuntimeException planted = new RuntimeException();
  doThrow(planted).when(delegate).onReady();

  jumpListener.onReady();

  try {
    executor.runDueTasks();
    fail("Expected exception");
  } catch (RuntimeException caught) {
    assertSame(planted, caught);
    ensureServerStateNotLeaked();
  }
}
/**
 * Checks that a {@code TestError} thrown by the delegate's {@code onReady} propagates unchanged
 * out of {@code executor.runDueTasks()}, after which {@code ensureServerStateNotLeaked()} is run.
 */
@Test
public void onReady_errorCancelsCall() {
  JumpToApplicationThreadServerStreamListener jumpListener =
      new JumpToApplicationThreadServerStreamListener(
          executor.getScheduledExecutorService(),
          executor.getScheduledExecutorService(),
          stream,
          Context.ROOT.withCancellation(),
          PerfMark.createTag());
  ServerStreamListener delegate = mock(ServerStreamListener.class);
  jumpListener.setListener(delegate);

  TestError planted = new TestError();
  doThrow(planted).when(delegate).onReady();

  jumpListener.onReady();

  try {
    executor.runDueTasks();
    fail("Expected exception");
  } catch (TestError caught) {
    assertSame(planted, caught);
    ensureServerStateNotLeaked();
  }
}
/**
 * Cancels the stream named by {@code cmd}.
 *
 * <p>Inside a {@code NettyClientHandler.cancelStream} PerfMark task linked to the command, the
 * command's reason (when non-null) is reported to the stream. Then either an RST_STREAM with
 * CANCEL is written, or, when the stream reports itself as non-existent, {@code promise} is
 * completed successfully without writing anything.
 */
private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
    ChannelPromise promise) {
  NettyClientStream.TransportState stream = cmd.stream();
  PerfMark.startTask("NettyClientHandler.cancelStream", stream.tag());
  PerfMark.linkIn(cmd.getLink());
  try {
    Status cancelReason = cmd.reason();
    if (cancelReason != null) {
      stream.transportReportStatus(cancelReason, true, new Metadata());
    }

    if (cmd.stream().isNonExistent()) {
      promise.setSuccess();
    } else {
      encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
    }
  } finally {
    PerfMark.stopTask("NettyClientHandler.cancelStream", stream.tag());
  }
}
/**
 * Returns the next message polled from {@code appListener}, running queued ops until one is
 * available.
 *
 * <p>Each iteration first polls the message queue. If it is empty, one op is taken from
 * {@code opQueue} under {@code lock} and run with {@code isDeframerOnTransportThread=false}.
 * When the op queue is also empty, {@code messageProducerEnqueued} is cleared and {@code null}
 * is returned. Before returning, if the deframer still has pending deliveries, the migrating
 * listener is pointed at {@code transportListener} and {@code deframerOnTransportThread} is set.
 *
 * @return the next message stream, or {@code null} when no message and no op remain
 */
@Override
public InputStream next() {
  for (;;) {
    InputStream message = appListener.messageReadQueuePoll();
    if (message != null) {
      return message;
    }

    Op pending;
    synchronized (lock) {
      pending = opQueue.poll();
      if (pending == null) {
        if (deframer.hasPendingDeliveries()) {
          PerfMark.event("MigratingThreadDeframer.deframerOnTransportThread");
          migratingListener.setDelegate(transportListener);
          deframerOnTransportThread = true;
        }
        messageProducerEnqueued = false;
        return null;
      }
    }
    pending.run(/*isDeframerOnTransportThread=*/false);
  }
}
/**
 * Cleans up every active HTTP/2 stream for a forceful close and then completes {@code promise}.
 *
 * <p>Each visit runs inside a {@code NettyClientHandler.forcefulClose} PerfMark task linked to
 * the command. The task uses the client stream's tag when one is attached and a fresh tag
 * otherwise. Streams with a client stream get the command's status reported and are reset with
 * CANCEL. Every visited stream is closed, and the visitor always returns {@code true}.
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  // close() already called by NettyClientTransport, so just need to clean up streams
  Http2StreamVisitor closeEachStream = new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyClientStream.TransportState state = clientStream(stream);

      Tag tag;
      if (state == null) {
        tag = PerfMark.createTag();
      } else {
        tag = state.tag();
      }

      PerfMark.startTask("NettyClientHandler.forcefulClose", tag);
      PerfMark.linkIn(msg.getLink());
      try {
        if (state != null) {
          state.transportReportStatus(msg.getStatus(), true, new Metadata());
          resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
        }
        stream.close();
        return true;
      } finally {
        PerfMark.stopTask("NettyClientHandler.forcefulClose", tag);
      }
    }
  };
  connection().forEachActiveStream(closeEachStream);
  promise.setSuccess();
}
/**
 * Handles the output message of the asynchronous filter inside a PerfMark task named after the
 * filter with sub-task "onNextAsync".
 *
 * <p>Links in {@code onNextLinkOut}, records a SUCCESS filter completion, substitutes the
 * filter's default output when {@code outMesg} is {@code null}, and resumes processing through
 * {@code resumeInBindingContext}. Any {@link Exception} raised along the way decrements the
 * concurrency counter and is handed to {@code handleException}.
 *
 * @param outMesg message produced by the filter; may be {@code null}
 */
@Override
public void onNext(O outMesg) {
// Tracks whether the task was already stopped on the success path.
boolean stopped = false;
PerfMark.startTask(filter.filterName(), "onNextAsync");
try {
PerfMark.linkIn(onNextLinkOut.get());
addPerfMarkTags(inMesg);
recordFilterCompletion(SUCCESS, filter, startTime, inMesg, snapshot);
if (outMesg == null) {
outMesg = filter.getDefaultOutput(inMesg);
}
// The task is stopped before resumeInBindingContext runs; the flag keeps the finally block
// from stopping it a second time.
stopped = true;
PerfMark.stopTask(filter.filterName(), "onNextAsync");
resumeInBindingContext(outMesg, filter.filterName());
}
catch (Exception e) {
// Also reached when resumeInBindingContext throws, i.e. after the task was stopped.
decrementConcurrency();
handleException(inMesg, filter.filterName(), e);
} finally {
// Only stop here if a failure happened before the success-path stop above.
if (!stopped) {
PerfMark.stopTask(filter.filterName(), "onNextAsync");
}
}
}
/**
 * Forwards a message to whatever follows this stage, wrapped in a PerfMark task named after this
 * object's runtime class.
 *
 * <p>With a {@code nextStage} present the message goes to {@code nextStage.filter}; otherwise it
 * is fired as a channel read on the context obtained from the message.
 *
 * @param zuulMesg message to pass on
 */
protected final void invokeNextStage(final O zuulMesg) {
    final String taskName = getClass().getName();

    if (nextStage == null) {
        //Next stage is Netty channel handler
        PerfMark.startTask(taskName, "fireChannelRead");
        try {
            addPerfMarkTags(zuulMesg);
            getChannelHandlerContext(zuulMesg).fireChannelRead(zuulMesg);
        } finally {
            PerfMark.stopTask(taskName, "fireChannelRead");
        }
        return;
    }

    PerfMark.startTask(taskName, "invokeNextStage");
    try {
        addPerfMarkTags(zuulMesg);
        nextStage.filter(zuulMesg);
    } finally {
        PerfMark.stopTask(taskName, "invokeNextStage");
    }
}
/**
 * Closes a fresh {@code ServerCallImpl} for {@code method} with {@code Status.OK} without sending
 * any message, and verifies the stream is cancelled exactly once with code INTERNAL and
 * description {@code ServerCallImpl.MISSING_RESPONSE}.
 */
private void serverSendsOne_okFailsOnMissingResponse(
    MethodDescriptor<Long, Long> method) {
  ServerCallImpl<Long, Long> call =
      new ServerCallImpl<>(
          stream,
          method,
          requestHeaders,
          context,
          DecompressorRegistry.getDefaultInstance(),
          CompressorRegistry.getDefaultInstance(),
          serverCallTracer,
          PerfMark.createTag());

  call.close(Status.OK, new Metadata());

  ArgumentCaptor<Status> cancelCaptor = ArgumentCaptor.forClass(Status.class);
  verify(stream, times(1)).cancel(cancelCaptor.capture());
  Status cancelStatus = cancelCaptor.getValue();
  assertEquals(Status.Code.INTERNAL, cancelStatus.getCode());
  assertEquals(ServerCallImpl.MISSING_RESPONSE, cancelStatus.getDescription());
}
/**
 * Checks that a {@link RuntimeException} thrown by the delegate's {@code halfClosed} propagates
 * unchanged out of {@code executor.runDueTasks()}, after which
 * {@code ensureServerStateNotLeaked()} is run.
 */
@Test
public void halfClosed_runtimeExceptionCancelsCall() {
  JumpToApplicationThreadServerStreamListener jumpListener =
      new JumpToApplicationThreadServerStreamListener(
          executor.getScheduledExecutorService(),
          executor.getScheduledExecutorService(),
          stream,
          Context.ROOT.withCancellation(),
          PerfMark.createTag());
  ServerStreamListener delegate = mock(ServerStreamListener.class);
  jumpListener.setListener(delegate);

  RuntimeException planted = new RuntimeException();
  doThrow(planted).when(delegate).halfClosed();

  jumpListener.halfClosed();

  try {
    executor.runDueTasks();
    fail("Expected exception");
  } catch (RuntimeException caught) {
    assertSame(planted, caught);
    ensureServerStateNotLeaked();
  }
}
/**
 * Schedules a request for {@code numMessages} through {@code runWhereAppropriate}.
 *
 * <p>When the op runs with {@code isDeframerOnTransportThread == false} it simply calls
 * {@code request(numMessages)} again. Otherwise it asks the deframer directly. A failure there
 * is reported to {@code appListener} and closes the deframer. Afterwards, if the deframer has no
 * pending deliveries, the migrating listener is pointed at {@code appListener} and
 * {@code deframerOnTransportThread} is cleared under {@code lock}.
 *
 * @param numMessages number of messages to request
 */
private void requestFromTransportThread(final int numMessages) {
  class RequestAgainOp implements Op {
    @Override
    public void run(boolean isDeframerOnTransportThread) {
      if (!isDeframerOnTransportThread) {
        // State changed. Go back and try again
        request(numMessages);
        return;
      }

      try {
        deframer.request(numMessages);
      } catch (Throwable failure) {
        appListener.deframeFailed(failure);
        deframer.close(); // unrecoverable state
      }

      if (deframer.hasPendingDeliveries()) {
        return;
      }
      synchronized (lock) {
        PerfMark.event("MigratingThreadDeframer.deframerOnApplicationThread");
        migratingListener.setDelegate(appListener);
        deframerOnTransportThread = false;
      }
    }
  }

  runWhereAppropriate(new RequestAgainOp());
}
/**
 * Exercises the transformer on PerfMark calls made from inside a lambda.
 *
 * <p>The local class's constructor runs a lambda that starts and stops one task. After the
 * transformed class is instantiated, the test expects two stored marks whose markers resolve to
 * the synthetic method {@code lambda$new$0} of the local class, in
 * {@code PerfMarkTransformerTest.java}, with a positive line number.
 */
@Test
public void transform_lambda() throws Exception {
PerfMark.setEnabled(true);
Storage.resetForTest();
// Class under test: execute() runs the command immediately, so the lambda body executes
// during construction.
final class ClzLocal implements Executor {
public ClzLocal() {
execute(
() -> {
PerfMark.startTask("task");
PerfMark.stopTask("task");
});
}
@Override
public void execute(Runnable command) {
command.run();
}
}
Class<?> clz = transformAndLoad(ClzLocal.class);
// ClzLocal is declared inside an instance method, so its constructor takes the enclosing
// test instance as its only argument.
Constructor<?> ctor = clz.getConstructor(PerfMarkTransformerTest.class);
ctor.setAccessible(true);
ctor.newInstance(this);
List<Mark> marks = Storage.readForTest();
assertThat(marks).hasSize(2);
for (Mark mark : marks) {
assertNotNull(mark.getMarker());
StackTraceElement element = Internal.getElement(mark.getMarker());
assertThat(element.getClassName()).isEqualTo(ClzLocal.class.getName());
assertThat(element.getMethodName()).isEqualTo("lambda$new$0");
assertThat(element.getFileName()).isEqualTo("PerfMarkTransformerTest.java");
assertThat(element.getLineNumber()).isGreaterThan(0);
}
}
/**
 * Exercises the transformer on a class whose constructor calls {@code record()} on
 * {@code InterfaceWithDefaults}.
 *
 * <p>After the transformed class is instantiated, the test expects two stored marks whose
 * markers resolve to {@code InterfaceWithDefaults.record} (the interface, not the implementing
 * class), in {@code PerfMarkTransformerTest.java}, with a positive line number.
 */
@Test
public void transform_interface() throws Exception {
PerfMark.setEnabled(true);
Storage.resetForTest();
// Class under test: it declares no record() of its own, so the call resolves to the interface.
final class Bar implements InterfaceWithDefaults {
public Bar() {
record();
}
}
Class<?> clz = transformAndLoad(Bar.class);
// Bar is declared inside an instance method, so its constructor takes the enclosing test
// instance as its only argument.
Constructor<?> ctor = clz.getConstructor(PerfMarkTransformerTest.class);
ctor.setAccessible(true);
ctor.newInstance(this);
List<Mark> marks = Storage.readForTest();
assertThat(marks).hasSize(2);
for (Mark mark : marks) {
assertNotNull(mark.getMarker());
StackTraceElement element = Internal.getElement(mark.getMarker());
assertThat(element.getClassName()).isEqualTo(InterfaceWithDefaults.class.getName());
assertThat(element.getMethodName()).isEqualTo("record");
assertThat(element.getFileName()).isEqualTo("PerfMarkTransformerTest.java");
assertThat(element.getLineNumber()).isGreaterThan(0);
}
}
/**
 * Exercises the transformer on {@code linkOut}/{@code linkIn} calls made between a task's start
 * and stop.
 *
 * <p>After the transformed class is instantiated, the test expects four stored marks (one per
 * PerfMark call in the constructor) whose markers resolve to the local class's {@code <init>},
 * in {@code PerfMarkTransformerTest.java}, with a positive line number.
 */
@Test
public void transform_link() throws Exception {
PerfMark.setEnabled(true);
Storage.resetForTest();
// Class under test: start, linkOut, linkIn, stop -- four PerfMark calls in the constructor.
final class ClzLocal {
public ClzLocal() {
PerfMark.startTask("task");
Link link = PerfMark.linkOut();
PerfMark.linkIn(link);
PerfMark.stopTask("task");
}
}
Class<?> clz = transformAndLoad(ClzLocal.class);
// ClzLocal is declared inside an instance method, so its constructor takes the enclosing
// test instance as its only argument.
Constructor<?> ctor = clz.getConstructor(PerfMarkTransformerTest.class);
ctor.setAccessible(true);
ctor.newInstance(this);
List<Mark> marks = Storage.readForTest();
assertThat(marks).hasSize(4);
for (Mark mark : marks) {
assertNotNull(mark.getMarker());
StackTraceElement element = Internal.getElement(mark.getMarker());
assertThat(element.getClassName()).isEqualTo(ClzLocal.class.getName());
assertThat(element.getMethodName()).isEqualTo("<init>");
assertThat(element.getFileName()).isEqualTo("PerfMarkTransformerTest.java");
assertThat(element.getLineNumber()).isGreaterThan(0);
}
}
/**
 * Exercises the transformer on tagged and untagged start/stop calls made from an explicit
 * constructor.
 *
 * <p>After the transformed class is instantiated, the test expects four stored marks whose
 * markers resolve to the local class's {@code <init>}, in {@code PerfMarkTransformerTest.java},
 * with a positive line number.
 */
@Test
public void transform_ctor() throws Exception {
PerfMark.setEnabled(true);
Storage.resetForTest();
// Class under test: one untagged and one tagged start/stop pair in the constructor.
final class ClzLocal {
public ClzLocal() {
Tag tag = PerfMark.createTag("tag", 1);
PerfMark.startTask("task");
PerfMark.stopTask("task");
PerfMark.startTask("task", tag);
PerfMark.stopTask("task", tag);
}
}
Class<?> clz = transformAndLoad(ClzLocal.class);
// ClzLocal is declared inside an instance method, so its constructor takes the enclosing
// test instance as its only argument.
Constructor<?> ctor = clz.getConstructor(PerfMarkTransformerTest.class);
ctor.setAccessible(true);
ctor.newInstance(this);
List<Mark> marks = Storage.readForTest();
assertThat(marks).hasSize(4);
for (Mark mark : marks) {
assertNotNull(mark.getMarker());
StackTraceElement element = Internal.getElement(mark.getMarker());
assertThat(element.getClassName()).isEqualTo(ClzLocal.class.getName());
assertThat(element.getMethodName()).isEqualTo("<init>");
assertThat(element.getFileName()).isEqualTo("PerfMarkTransformerTest.java");
assertThat(element.getLineNumber()).isGreaterThan(0);
}
}
/**
 * Exercises the transformer on tagged and untagged start/stop calls made from an instance
 * initializer block rather than an explicit constructor.
 *
 * <p>After the transformed class is instantiated, the test expects four stored marks whose
 * markers resolve to the local class's {@code <init>}, in {@code PerfMarkTransformerTest.java},
 * with a positive line number.
 */
@Test
public void transform_init() throws Exception {
PerfMark.setEnabled(true);
Storage.resetForTest();
// Class under test: the PerfMark calls live in an initializer block; no constructor is declared.
final class ClzLocal {
{
Tag tag = PerfMark.createTag("tag", 1);
PerfMark.startTask("task");
PerfMark.stopTask("task");
PerfMark.startTask("task", tag);
PerfMark.stopTask("task", tag);
}
}
Class<?> clz = transformAndLoad(ClzLocal.class);
// Looked up with getDeclaredConstructor -- presumably because the implicit constructor of a
// local class is not public. The enclosing test instance is its only argument.
Constructor<?> ctor = clz.getDeclaredConstructor(PerfMarkTransformerTest.class);
ctor.setAccessible(true);
ctor.newInstance(this);
List<Mark> marks = Storage.readForTest();
assertThat(marks).hasSize(4);
for (Mark mark : marks) {
assertNotNull(mark.getMarker());
StackTraceElement element = Internal.getElement(mark.getMarker());
assertThat(element.getClassName()).isEqualTo(ClzLocal.class.getName());
assertThat(element.getMethodName()).isEqualTo("<init>");
assertThat(element.getFileName()).isEqualTo("PerfMarkTransformerTest.java");
assertThat(element.getLineNumber()).isGreaterThan(0);
}
}
/**
 * Runs {@code cmd} inside a PerfMark task and returns the value it supplies.
 *
 * <p>The task is started with {@code taskName} and {@code tag} before {@code cmd.get()} is
 * called and stopped with the same arguments afterwards, including when {@code cmd} throws.
 *
 * @param taskName name passed to {@code PerfMark.startTask} and {@code PerfMark.stopTask}
 * @param tag tag passed to {@code PerfMark.startTask} and {@code PerfMark.stopTask}
 * @param cmd supplier whose result is returned
 * @param <T> type of the supplied value
 * @return the value produced by {@code cmd}
 */
public static <T> T recordTaskResult(String taskName, Tag tag, Supplier<T> cmd) {
  PerfMark.startTask(taskName, tag);
  try {
    final T result = cmd.get();
    return result;
  } finally {
    PerfMark.stopTask(taskName, tag);
  }
}