下面列出了怎么用io.grpc.internal.NoopClientCall的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void wrapChannel_methodDescriptor() throws Exception {
final AtomicReference<MethodDescriptor<?, ?>> methodRef =
new AtomicReference<MethodDescriptor<?, ?>>();
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
methodRef.set(method);
return new NoopClientCall<RequestT, ResponseT>();
}
@Override
public String authority() {
throw new UnsupportedOperationException();
}
};
Channel wChannel = binlogProvider.wrapChannel(channel);
ClientCall<String, Integer> unusedClientCall = wChannel.newCall(method, CallOptions.DEFAULT);
validateWrappedMethod(methodRef.get());
}
@Test
public void unaryBlockingCallSuccess() throws Exception {
Integer req = 2;
final String resp = "bar";
final Status status = Status.OK;
final Metadata trailers = new Metadata();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(ClientCall.Listener<String> listener, Metadata headers) {
listener.onMessage(resp);
listener.onClose(status, trailers);
}
};
String actualResponse = ClientCalls.blockingUnaryCall(call, req);
assertEquals(resp, actualResponse);
}
@Test
public void unaryBlockingCallFailed() throws Exception {
Integer req = 2;
final Status status = Status.INTERNAL.withDescription("Unique status");
final Metadata trailers = new Metadata();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> listener, Metadata headers) {
listener.onClose(status, trailers);
}
};
try {
ClientCalls.blockingUnaryCall(call, req);
fail("Should fail");
} catch (StatusRuntimeException e) {
assertSame(status, e.getStatus());
assertSame(trailers, e.getTrailers());
}
}
@Test
public void unaryFutureCallFailed() throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<ClientCall.Listener<String>>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
};
Integer req = 2;
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
Metadata trailers = new Metadata();
listener.get().onClose(Status.INTERNAL, trailers);
try {
future.get();
fail("Should fail");
} catch (ExecutionException e) {
Status status = Status.fromThrowable(e);
assertEquals(Status.INTERNAL, status);
Metadata metadata = Status.trailersFromThrowable(e);
assertSame(trailers, metadata);
}
}
@Test
public void cannotSetOnReadyAfterCallStarted() throws Exception {
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>();
CallStreamObserver<Integer> callStreamObserver =
(CallStreamObserver<Integer>) ClientCalls.asyncClientStreamingCall(call,
new NoopStreamObserver<String>());
Runnable noOpRunnable = new Runnable() {
@Override
public void run() {
}
};
try {
callStreamObserver.setOnReadyHandler(noOpRunnable);
fail("Should not be able to set handler after call started");
} catch (IllegalStateException ise) {
// expected
}
}
@Test
public void blockingResponseStreamFailed() throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<ClientCall.Listener<String>>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
};
Integer req = 2;
Iterator<String> iter = ClientCalls.blockingServerStreamingCall(call, req);
Metadata trailers = new Metadata();
listener.get().onClose(Status.INTERNAL, trailers);
try {
iter.next();
fail("Should fail");
} catch (Exception e) {
Status status = Status.fromThrowable(e);
assertEquals(Status.INTERNAL, status);
Metadata metadata = Status.trailersFromThrowable(e);
assertSame(trailers, metadata);
}
}
@Test
public void wrapChannel_methodDescriptor() throws Exception {
final AtomicReference<MethodDescriptor<?, ?>> methodRef =
new AtomicReference<>();
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
methodRef.set(method);
return new NoopClientCall<>();
}
@Override
public String authority() {
throw new UnsupportedOperationException();
}
};
Channel wChannel = binlogProvider.wrapChannel(channel);
ClientCall<String, Integer> unusedClientCall = wChannel.newCall(method, CallOptions.DEFAULT);
validateWrappedMethod(methodRef.get());
}
@Test
public void unaryBlockingCallSuccess() throws Exception {
Integer req = 2;
final String resp = "bar";
final Status status = Status.OK;
final Metadata trailers = new Metadata();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(ClientCall.Listener<String> listener, Metadata headers) {
listener.onMessage(resp);
listener.onClose(status, trailers);
}
};
String actualResponse = ClientCalls.blockingUnaryCall(call, req);
assertEquals(resp, actualResponse);
}
@Test
public void unaryBlockingCallFailed() throws Exception {
Integer req = 2;
final Status status = Status.INTERNAL.withDescription("Unique status");
final Metadata trailers = new Metadata();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> listener, Metadata headers) {
listener.onClose(status, trailers);
}
};
try {
ClientCalls.blockingUnaryCall(call, req);
fail("Should fail");
} catch (StatusRuntimeException e) {
assertSame(status, e.getStatus());
assertSame(trailers, e.getTrailers());
}
}
@Test
public void blockingUnaryCall_HasBlockingStubType() {
NoopClientCall<Integer, Integer> call = new NoopClientCall<Integer, Integer>() {
@Override
public void start(io.grpc.ClientCall.Listener<Integer> listener, Metadata headers) {
listener.onMessage(1);
listener.onClose(Status.OK, new Metadata());
}
};
when(mockChannel.newCall(
ArgumentMatchers.<MethodDescriptor<Integer, Integer>>any(), any(CallOptions.class)))
.thenReturn(call);
Integer unused =
ClientCalls.blockingUnaryCall(mockChannel, UNARY_METHOD, CallOptions.DEFAULT, 1);
verify(mockChannel).newCall(methodDescriptorCaptor.capture(), callOptionsCaptor.capture());
CallOptions capturedCallOption = callOptionsCaptor.getValue();
assertThat(capturedCallOption.getOption(ClientCalls.STUB_TYPE_OPTION))
.isEquivalentAccordingToCompareTo(StubType.BLOCKING);
}
@Test
public void blockingServerStreamingCall_HasBlockingStubType() {
NoopClientCall<Integer, Integer> call = new NoopClientCall<Integer, Integer>() {
@Override
public void start(io.grpc.ClientCall.Listener<Integer> listener, Metadata headers) {
listener.onMessage(1);
listener.onClose(Status.OK, new Metadata());
}
};
when(mockChannel.newCall(
ArgumentMatchers.<MethodDescriptor<Integer, Integer>>any(), any(CallOptions.class)))
.thenReturn(call);
Iterator<Integer> unused =
ClientCalls.blockingServerStreamingCall(mockChannel, UNARY_METHOD, CallOptions.DEFAULT, 1);
verify(mockChannel).newCall(methodDescriptorCaptor.capture(), callOptionsCaptor.capture());
CallOptions capturedCallOption = callOptionsCaptor.getValue();
assertThat(capturedCallOption.getOption(ClientCalls.STUB_TYPE_OPTION))
.isEquivalentAccordingToCompareTo(StubType.BLOCKING);
}
@Test
public void unaryFutureCallFailed() throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
};
Integer req = 2;
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
Metadata trailers = new Metadata();
listener.get().onClose(Status.INTERNAL, trailers);
try {
future.get();
fail("Should fail");
} catch (ExecutionException e) {
Status status = Status.fromThrowable(e);
assertEquals(Status.INTERNAL, status);
Metadata metadata = Status.trailersFromThrowable(e);
assertSame(trailers, metadata);
}
}
@Test
public void cannotSetOnReadyAfterCallStarted() throws Exception {
NoopClientCall<Integer, String> call = new NoopClientCall<>();
CallStreamObserver<Integer> callStreamObserver =
(CallStreamObserver<Integer>) ClientCalls.asyncClientStreamingCall(call,
new NoopStreamObserver<String>());
Runnable noOpRunnable = new Runnable() {
@Override
public void run() {
}
};
try {
callStreamObserver.setOnReadyHandler(noOpRunnable);
fail("Should not be able to set handler after call started");
} catch (IllegalStateException ise) {
// expected
}
}
@Test
public void blockingResponseStreamFailed() throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
};
Integer req = 2;
Iterator<String> iter = ClientCalls.blockingServerStreamingCall(call, req);
Metadata trailers = new Metadata();
listener.get().onClose(Status.INTERNAL, trailers);
try {
iter.next();
fail("Should fail");
} catch (Exception e) {
Status status = Status.fromThrowable(e);
assertEquals(Status.INTERNAL, status);
Metadata metadata = Status.trailersFromThrowable(e);
assertSame(trailers, metadata);
}
}
/**
* Sets up mocks.
*/
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
ClientCall<SimpleRequest, SimpleResponse> call =
new NoopClientCall<SimpleRequest, SimpleResponse>();
when(channel.newCall(
Mockito.<MethodDescriptor<SimpleRequest, SimpleResponse>>any(), any(CallOptions.class)))
.thenReturn(call);
}
@Test
public void unaryFutureCallSuccess() throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<ClientCall.Listener<String>>();
final AtomicReference<Integer> message = new AtomicReference<Integer>();
final AtomicReference<Boolean> halfClosed = new AtomicReference<Boolean>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
@Override
public void sendMessage(Integer msg) {
message.set(msg);
}
@Override
public void halfClose() {
halfClosed.set(true);
}
};
Integer req = 2;
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
assertEquals(req, message.get());
assertTrue(halfClosed.get());
listener.get().onMessage("bar");
listener.get().onClose(Status.OK, new Metadata());
assertEquals("bar", future.get());
}
@Test
public void unaryFutureCallCancelled() throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<ClientCall.Listener<String>>();
final AtomicReference<String> cancelMessage = new AtomicReference<String>();
final AtomicReference<Throwable> cancelCause = new AtomicReference<Throwable>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
@Override
public void cancel(String message, Throwable cause) {
cancelMessage.set(message);
cancelCause.set(cause);
}
};
Integer req = 2;
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
future.cancel(true);
assertEquals("GrpcFuture was cancelled", cancelMessage.get());
assertNull(cancelCause.get());
listener.get().onMessage("bar");
listener.get().onClose(Status.OK, new Metadata());
try {
future.get();
fail("Should fail");
} catch (CancellationException e) {
// Exepcted
}
}
/**
* Sets up mocks.
*/
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
ClientCall<SimpleRequest, SimpleResponse> call =
new NoopClientCall<>();
when(channel.newCall(
ArgumentMatchers.<MethodDescriptor<SimpleRequest, SimpleResponse>>any(),
any(CallOptions.class)))
.thenReturn(call);
}
@Test
public void unaryFutureCallSuccess() throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<>();
final AtomicReference<Integer> message = new AtomicReference<>();
final AtomicReference<Boolean> halfClosed = new AtomicReference<>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
@Override
public void sendMessage(Integer msg) {
message.set(msg);
}
@Override
public void halfClose() {
halfClosed.set(true);
}
};
Integer req = 2;
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
assertEquals(req, message.get());
assertTrue(halfClosed.get());
listener.get().onMessage("bar");
listener.get().onClose(Status.OK, new Metadata());
assertEquals("bar", future.get());
}
@Test
public void unaryFutureCallCancelled() throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<>();
final AtomicReference<String> cancelMessage = new AtomicReference<>();
final AtomicReference<Throwable> cancelCause = new AtomicReference<>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
@Override
public void cancel(String message, Throwable cause) {
cancelMessage.set(message);
cancelCause.set(cause);
}
};
Integer req = 2;
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
future.cancel(true);
assertEquals("GrpcFuture was cancelled", cancelMessage.get());
assertNull(cancelCause.get());
listener.get().onMessage("bar");
listener.get().onClose(Status.OK, new Metadata());
try {
future.get();
fail("Should fail");
} catch (CancellationException e) {
// Exepcted
}
}
@Test
@SuppressWarnings({"unchecked"})
public void clientDeadlineLogged_deadlineSetViaCallOption() {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
ClientCall<byte[], byte[]> call =
new BinlogHelper(mockSinkWriter)
.getClientInterceptor(CALL_ID)
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>();
}
@Override
public String authority() {
return null;
}
});
call.start(mockListener, new Metadata());
ArgumentCaptor<Duration> callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockSinkWriter).logClientHeader(
any(Integer.class),
any(String.class),
any(String.class),
callOptTimeoutCaptor.capture(),
any(Metadata.class),
any(GrpcLogEntry.Logger.class),
any(Long.class),
any(SocketAddress.class));
Duration timeout = callOptTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
@SuppressWarnings({"unchecked"})
public void clientDeadlineLogged_deadlineSetViaContext() throws Exception {
// important: deadline is read from the ctx where call was created
final SettableFuture<ClientCall<byte[], byte[]>> callFuture = SettableFuture.create();
Context.current()
.withDeadline(
Deadline.after(1, TimeUnit.SECONDS), Executors.newSingleThreadScheduledExecutor())
.run(new Runnable() {
@Override
public void run() {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
callFuture.set(new BinlogHelper(mockSinkWriter)
.getClientInterceptor(CALL_ID)
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>();
}
@Override
public String authority() {
return null;
}
}));
}
});
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
callFuture.get().start(mockListener, new Metadata());
ArgumentCaptor<Duration> callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockSinkWriter).logClientHeader(
any(Integer.class),
any(String.class),
any(String.class),
callOptTimeoutCaptor.capture(),
any(Metadata.class),
any(GrpcLogEntry.Logger.class),
any(Long.class),
any(SocketAddress.class));
Duration timeout = callOptTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void clientInterceptor_trailersOnlyResponseLogsPeerAddress() throws Exception {
final AtomicReference<ClientCall.Listener> interceptedListener =
new AtomicReference<ClientCall.Listener>();
// capture these manually because ClientCall can not be mocked
final AtomicReference<Metadata> actualClientInitial = new AtomicReference<Metadata>();
final AtomicReference<Object> actualRequest = new AtomicReference<Object>();
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>() {
@Override
public void start(Listener<ResponseT> responseListener, Metadata headers) {
interceptedListener.set(responseListener);
actualClientInitial.set(headers);
}
@Override
public void sendMessage(RequestT message) {
actualRequest.set(message);
}
@Override
public Attributes getAttributes() {
return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
}
};
}
@Override
public String authority() {
return "the-authority";
}
};
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
ClientCall<byte[], byte[]> interceptedCall =
new BinlogHelper(mockSinkWriter)
.getClientInterceptor(CALL_ID)
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
channel);
Metadata clientInitial = new Metadata();
interceptedCall.start(mockListener, clientInitial);
verify(mockSinkWriter).logClientHeader(
/*seq=*/ eq(1L),
any(String.class),
any(String.class),
any(Duration.class),
any(Metadata.class),
eq(Logger.LOGGER_CLIENT),
eq(CALL_ID),
isNull(SocketAddress.class));
verifyNoMoreInteractions(mockSinkWriter);
// trailer only response
{
Status status = Status.INTERNAL.withDescription("some description");
Metadata trailers = new Metadata();
interceptedListener.get().onClose(status, trailers);
verify(mockSinkWriter).logTrailer(
/*seq=*/ eq(2L),
same(status),
same(trailers),
eq(Logger.LOGGER_CLIENT),
eq(CALL_ID),
same(peer));
verifyNoMoreInteractions(mockSinkWriter);
verify(mockListener).onClose(same(status), same(trailers));
}
}
@Test
public void disablingInboundAutoFlowControlSuppressesRequestsForMoreMessages()
throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<ClientCall.Listener<String>>();
final List<Integer> requests = new ArrayList<>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
@Override
public void request(int numMessages) {
requests.add(numMessages);
}
};
ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver<Integer, String>() {
@Override
public void beforeStart(ClientCallStreamObserver<Integer> requestStream) {
requestStream.disableAutoInboundFlowControl();
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
listener.get().onMessage("message");
assertThat(requests).containsExactly(1);
}
@Test
public void callStreamObserverPropagatesFlowControlRequestsToCall()
throws Exception {
ClientResponseObserver<Integer, String> responseObserver =
new ClientResponseObserver<Integer, String>() {
@Override
public void beforeStart(ClientCallStreamObserver<Integer> requestStream) {
requestStream.disableAutoInboundFlowControl();
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
};
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<ClientCall.Listener<String>>();
final List<Integer> requests = new ArrayList<>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
@Override
public void request(int numMessages) {
requests.add(numMessages);
}
};
CallStreamObserver<Integer> requestObserver =
(CallStreamObserver<Integer>)
ClientCalls.asyncBidiStreamingCall(call, responseObserver);
listener.get().onMessage("message");
requestObserver.request(5);
assertThat(requests).contains(5);
}
@Test
public void canCaptureInboundFlowControlForServerStreamingObserver()
throws Exception {
ClientResponseObserver<Integer, String> responseObserver =
new ClientResponseObserver<Integer, String>() {
@Override
public void beforeStart(ClientCallStreamObserver<Integer> requestStream) {
requestStream.disableAutoInboundFlowControl();
requestStream.request(5);
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
};
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<ClientCall.Listener<String>>();
final List<Integer> requests = new ArrayList<>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
@Override
public void request(int numMessages) {
requests.add(numMessages);
}
};
ClientCalls.asyncServerStreamingCall(call, 1, responseObserver);
listener.get().onMessage("message");
assertThat(requests).containsExactly(5, 1).inOrder();
}
@Test
@SuppressWarnings({"unchecked"})
public void clientDeadlineLogged_deadlineSetViaCallOption() {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
ClientCall<byte[], byte[]> call =
new BinlogHelper(mockSinkWriter)
.getClientInterceptor(CALL_ID)
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<>();
}
@Override
public String authority() {
return null;
}
});
call.start(mockListener, new Metadata());
ArgumentCaptor<Duration> callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockSinkWriter)
.logClientHeader(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.<String>isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.<String>isNull(), anyString()),
callOptTimeoutCaptor.capture(),
any(Metadata.class),
any(GrpcLogEntry.Logger.class),
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.<SocketAddress>isNull(),
ArgumentMatchers.<SocketAddress>any()));
Duration timeout = callOptTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
@SuppressWarnings({"unchecked"})
public void clientDeadlineLogged_deadlineSetViaContext() throws Exception {
// important: deadline is read from the ctx where call was created
final SettableFuture<ClientCall<byte[], byte[]>> callFuture = SettableFuture.create();
Context.current()
.withDeadline(
Deadline.after(1, TimeUnit.SECONDS), Executors.newSingleThreadScheduledExecutor())
.run(new Runnable() {
@Override
public void run() {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
callFuture.set(new BinlogHelper(mockSinkWriter)
.getClientInterceptor(CALL_ID)
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<>();
}
@Override
public String authority() {
return null;
}
}));
}
});
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
callFuture.get().start(mockListener, new Metadata());
ArgumentCaptor<Duration> callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockSinkWriter)
.logClientHeader(
anyLong(),
anyString(),
ArgumentMatchers.<String>any(),
callOptTimeoutCaptor.capture(),
any(Metadata.class),
any(GrpcLogEntry.Logger.class),
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.<SocketAddress>isNull(),
ArgumentMatchers.<SocketAddress>any()));
Duration timeout = callOptTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void clientInterceptor_trailersOnlyResponseLogsPeerAddress() throws Exception {
final AtomicReference<ClientCall.Listener> interceptedListener =
new AtomicReference<>();
// capture these manually because ClientCall can not be mocked
final AtomicReference<Metadata> actualClientInitial = new AtomicReference<>();
final AtomicReference<Object> actualRequest = new AtomicReference<>();
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>() {
@Override
public void start(Listener<ResponseT> responseListener, Metadata headers) {
interceptedListener.set(responseListener);
actualClientInitial.set(headers);
}
@Override
public void sendMessage(RequestT message) {
actualRequest.set(message);
}
@Override
public Attributes getAttributes() {
return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
}
};
}
@Override
public String authority() {
return "the-authority";
}
};
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
ClientCall<byte[], byte[]> interceptedCall =
new BinlogHelper(mockSinkWriter)
.getClientInterceptor(CALL_ID)
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
channel);
Metadata clientInitial = new Metadata();
interceptedCall.start(mockListener, clientInitial);
verify(mockSinkWriter).logClientHeader(
/*seq=*/ eq(1L),
anyString(),
anyString(),
any(Duration.class),
any(Metadata.class),
eq(Logger.LOGGER_CLIENT),
eq(CALL_ID),
ArgumentMatchers.<SocketAddress>isNull());
verifyNoMoreInteractions(mockSinkWriter);
// trailer only response
{
Status status = Status.INTERNAL.withDescription("some description");
Metadata trailers = new Metadata();
interceptedListener.get().onClose(status, trailers);
verify(mockSinkWriter).logTrailer(
/*seq=*/ eq(2L),
same(status),
same(trailers),
eq(Logger.LOGGER_CLIENT),
eq(CALL_ID),
same(peer));
verifyNoMoreInteractions(mockSinkWriter);
verify(mockListener).onClose(same(status), same(trailers));
}
}
@Test
public void disablingInboundAutoFlowControlSuppressesRequestsForMoreMessages()
throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<>();
final List<Integer> requests = new ArrayList<>();
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
@Override
public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
listener.set(responseListener);
}
@Override
public void request(int numMessages) {
requests.add(numMessages);
}
};
ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver<Integer, String>() {
@Override
public void beforeStart(ClientCallStreamObserver<Integer> requestStream) {
requestStream.disableAutoInboundFlowControl();
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
listener.get().onMessage("message");
assertThat(requests).containsExactly(1);
}