下面列出了 io.grpc.stub.ClientCallStreamObserver#onNext() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Checks that responses on the reflection stream are delivered only as they are requested,
 * in the variant where one response is already requested when the call starts.
 */
@Test
public void flowControl() throws Exception {
  FlowControlClientResponseObserver responseSink = new FlowControlClientResponseObserver();
  ClientCallStreamObserver<ServerReflectionRequest> call =
      (ClientCallStreamObserver<ServerReflectionRequest>) stub.serverReflectionInfo(responseSink);

  // First message: the request(1) issued by ClientCalls.startCall() lets its response through
  // immediately.
  call.onNext(flowControlRequest);
  assertEquals(1, responseSink.getResponses().size());
  assertEquals(flowControlGoldenResponse, responseSink.getResponses().get(0));

  // Second message: its response must be held back until another one is requested.
  call.onNext(flowControlRequest);
  assertEquals(1, responseSink.getResponses().size());

  call.request(1);
  assertEquals(2, responseSink.getResponses().size());
  assertEquals(flowControlGoldenResponse, responseSink.getResponses().get(1));

  // Half-closing the request side must complete the response side.
  call.onCompleted();
  assertTrue(responseSink.onCompleteCalled());
}
/**
 * Checks that completion is not reported to the client while a response is still waiting to be
 * requested, in the variant where one response is already requested when the call starts.
 */
@Test
public void flowControlOnCompleteWithPendingRequest() throws Exception {
  FlowControlClientResponseObserver responseSink = new FlowControlClientResponseObserver();
  ClientCallStreamObserver<ServerReflectionRequest> call =
      (ClientCallStreamObserver<ServerReflectionRequest>) stub.serverReflectionInfo(responseSink);

  // ClientCalls.startCall() already issued request(1), so send a second message to leave one
  // response pending, then half-close.
  call.onNext(flowControlRequest);
  call.onNext(flowControlRequest);
  call.onCompleted();

  // Only the first response has arrived and the stream is not yet reported complete.
  assertEquals(1, responseSink.getResponses().size());
  assertFalse(responseSink.onCompleteCalled());

  // Requesting the outstanding response releases both it and the completion signal.
  call.request(1);
  assertTrue(responseSink.onCompleteCalled());
  assertEquals(2, responseSink.getResponses().size());
  assertEquals(flowControlGoldenResponse, responseSink.getResponses().get(1));
}
/**
 * Sends up to {@code numberOfMessages} echo messages on the request stream, stopping as soon as
 * the stream reports that it is not ready.
 *
 * @param numberOfMessages maximum number of messages to send
 * @param length not read by this method; the payload comes from the {@code payload} field
 * @param timestamp timestamp stamped on every message
 * @param checksum checksum stamped on every message
 * @return the number of messages actually handed to the request observer
 */
public int send(final int numberOfMessages, final int length, final long timestamp, final long checksum)
{
    // Work on local copies of the fields for the duration of the send loop.
    final ClientCallStreamObserver<EchoMessage> observer = this.requestObserver;
    final EchoMessage.Builder builder = this.messageBuilder;
    final ByteString body = this.payload;

    int sent = 0;
    while (sent < numberOfMessages && observer.isReady())
    {
        observer.onNext(builder
            .setTimestamp(timestamp)
            .setPayload(body)
            .setChecksum(checksum)
            .build());
        sent++;
    }

    return sent;
}
/**
 * Checks that neither the response nor the completion signal reaches the client until the
 * response has been explicitly requested.
 */
@Test
public void flowControlOnCompleteWithPendingRequest() throws Exception {
  FlowControlClientResponseObserver responseSink = new FlowControlClientResponseObserver();
  ClientCallStreamObserver<ServerReflectionRequest> call =
      (ClientCallStreamObserver<ServerReflectionRequest>) stub.serverReflectionInfo(responseSink);

  // Send one message and half-close without having requested any response.
  call.onNext(flowControlRequest);
  call.onCompleted();

  // Nothing has been delivered yet, and completion is still withheld.
  assertEquals(0, responseSink.getResponses().size());
  assertFalse(responseSink.onCompleteCalled());

  // Requesting the response releases both it and the completion signal.
  call.request(1);
  assertTrue(responseSink.onCompleteCalled());
  assertEquals(1, responseSink.getResponses().size());
  assertEquals(flowControlGoldenResponse, responseSink.getResponses().get(0));
}
/**
 * Checks that each response on the reflection stream is delivered only after it has been
 * explicitly requested, and that half-closing completes the call.
 */
@Test
public void flowControl() throws Exception {
  FlowControlClientResponseObserver responseSink = new FlowControlClientResponseObserver();
  ClientCallStreamObserver<ServerReflectionRequest> call =
      (ClientCallStreamObserver<ServerReflectionRequest>) stub.serverReflectionInfo(responseSink);

  // First message: no response may arrive before it is requested.
  call.onNext(flowControlRequest);
  assertEquals(0, responseSink.getResponses().size());

  call.request(1);
  assertEquals(1, responseSink.getResponses().size());
  assertEquals(flowControlGoldenResponse, responseSink.getResponses().get(0));

  // Second message: again, the response is held back until requested.
  call.onNext(flowControlRequest);
  assertEquals(1, responseSink.getResponses().size());

  call.request(1);
  assertEquals(2, responseSink.getResponses().size());
  assertEquals(flowControlGoldenResponse, responseSink.getResponses().get(1));

  // Half-closing the request side must complete the response side.
  call.onCompleted();
  assertTrue(responseSink.onCompleteCalled());
}
/**
 * Tests client per-message compression for streaming calls. The Java API does not support
 * inspecting a message's compression level, so this is primarily intended to run against a gRPC
 * C++ server.
 *
 * @param probe when {@code true}, first sends a message flagged as expect-compressed on a stream
 *     without compression and asserts that the call fails with {@code INVALID_ARGUMENT}
 * @throws Exception if waiting for a response or reading the recorded value fails
 */
public void clientCompressedStreaming(boolean probe) throws Exception {
  final StreamingInputCallRequest expectCompressedRequest =
      StreamingInputCallRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(true))
          .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182])))
          .build();
  final StreamingInputCallRequest expectUncompressedRequest =
      StreamingInputCallRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(false))
          .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904])))
          .build();
  // 73086 == 27182 + 45904, the combined size of the two payloads above.
  final StreamingInputCallResponse goldenResponse =
      StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(73086).build();

  StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
  StreamObserver<StreamingInputCallRequest> requestObserver =
      asyncStub.streamingInputCall(responseObserver);

  if (probe) {
    // Send a non-compressed message with expectCompress=true. Servers supporting this test case
    // should return INVALID_ARGUMENT.
    requestObserver.onNext(expectCompressedRequest);
    responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
    Throwable e = responseObserver.getError();
    assertNotNull("expected INVALID_ARGUMENT", e);
    assertEquals(Status.INVALID_ARGUMENT.getCode(), Status.fromThrowable(e).getCode());
  }

  // Start a new stream
  responseObserver = StreamRecorder.create();
  // Downcast with the full type argument: StreamObserver<T> to ClientCallStreamObserver<T> is a
  // checked cast, so neither a raw type nor an "unchecked" suppression is needed.
  ClientCallStreamObserver<StreamingInputCallRequest> clientCallStreamObserver =
      (ClientCallStreamObserver<StreamingInputCallRequest>)
          asyncStub.withCompression("gzip").streamingInputCall(responseObserver);
  // Toggle compression per message so each request matches its own expectCompressed flag.
  clientCallStreamObserver.setMessageCompression(true);
  clientCallStreamObserver.onNext(expectCompressedRequest);
  clientCallStreamObserver.setMessageCompression(false);
  clientCallStreamObserver.onNext(expectUncompressedRequest);
  clientCallStreamObserver.onCompleted();
  responseObserver.awaitCompletion();
  assertSuccess(responseObserver);
  assertEquals(goldenResponse, responseObserver.firstValue().get());
}
/**
 * Sends requests while the request stream is ready, both directly and from the on-ready handler,
 * then checks how many requests were sent and how many responses came back once the call
 * completes.
 */
@Test
public void serverBackPressure() {
  final AtomicInteger sent = new AtomicInteger();
  final AtomicInteger received = new AtomicInteger();
  final AtomicBoolean completed = new AtomicBoolean();

  final ClientResponseObserver<SimpleRequest, SimpleResponse> responseObserver =
      new ClientResponseObserver<SimpleRequest, SimpleResponse>() {
        @Override
        public void beforeStart(ClientCallStreamObserver<SimpleRequest> stream) {
          // Each time the stream becomes ready again, push one more request until the total
          // has been reached.
          stream.setOnReadyHandler(() -> {
            if (sent.get() >= TOTAL_NUM_MESSAGES) {
              return;
            }
            sent.incrementAndGet();
            stream.onNext(REQUEST);
          });
        }

        @Override
        public void onNext(SimpleResponse value) {
          received.incrementAndGet();
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onCompleted() {
          completed.set(true);
        }
      };

  final ClientCallStreamObserver<SimpleRequest> call =
      (ClientCallStreamObserver<SimpleRequest>) client.serverBackPressure(responseObserver);

  // Send eagerly, stopping at the first sign that the stream is not ready.
  for (int i = 0; i < TOTAL_NUM_MESSAGES && call.isReady(); i++) {
    sent.incrementAndGet();
    call.onNext(REQUEST);
  }

  await().untilAsserted(() -> assertThat(completed).isTrue());

  // Flow control kicks in on the second request, and one extra message often slips out after
  // the last requested one because the final flow control window still has room; hence the
  // allowance of two above the expected cap.
  assertThat(sent).hasValueBetween(CAPPED_NUM_MESSAGES, CAPPED_NUM_MESSAGES + 2);
  assertThat(received).hasValue(TOTAL_NUM_MESSAGES);
}
/**
 * Tests client per-message compression for streaming calls. The Java API does not support
 * inspecting a message's compression level, so this is primarily intended to run against a gRPC
 * C++ server.
 *
 * @param probe when {@code true}, the test begins by sending an expect-compressed message over a
 *     stream with no compression configured and requires an {@code INVALID_ARGUMENT} failure
 * @throws Exception if awaiting completion or fetching the recorded response fails
 */
public void clientCompressedStreaming(boolean probe) throws Exception {
  final StreamingInputCallRequest expectCompressedRequest =
      StreamingInputCallRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(true))
          .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182])))
          .build();
  final StreamingInputCallRequest expectUncompressedRequest =
      StreamingInputCallRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(false))
          .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904])))
          .build();
  // The expected aggregate is the sum of both payload sizes: 27182 + 45904 = 73086.
  final StreamingInputCallResponse goldenResponse =
      StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(73086).build();

  StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
  StreamObserver<StreamingInputCallRequest> requestObserver =
      asyncStub.streamingInputCall(responseObserver);

  if (probe) {
    // Send a non-compressed message with expectCompress=true. Servers supporting this test case
    // should return INVALID_ARGUMENT.
    requestObserver.onNext(expectCompressedRequest);
    responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
    Throwable e = responseObserver.getError();
    assertNotNull("expected INVALID_ARGUMENT", e);
    assertEquals(Status.INVALID_ARGUMENT.getCode(), Status.fromThrowable(e).getCode());
  }

  // Start a new stream
  responseObserver = StreamRecorder.create();
  // Use the parameterized type in the cast; this downcast is fully checked, so the raw type and
  // the accompanying "unchecked" suppression are unnecessary.
  ClientCallStreamObserver<StreamingInputCallRequest> clientCallStreamObserver =
      (ClientCallStreamObserver<StreamingInputCallRequest>)
          asyncStub.withCompression("gzip").streamingInputCall(responseObserver);
  // Compression is switched per message to match what each request tells the server to expect.
  clientCallStreamObserver.setMessageCompression(true);
  clientCallStreamObserver.onNext(expectCompressedRequest);
  clientCallStreamObserver.setMessageCompression(false);
  clientCallStreamObserver.onNext(expectUncompressedRequest);
  clientCallStreamObserver.onCompleted();
  responseObserver.awaitCompletion();
  assertSuccess(responseObserver);
  assertEquals(goldenResponse, responseObserver.firstValue().get());
}