下面列出了怎么用io.grpc.stub.ClientCallStreamObserver的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void flowControl() throws Exception {
FlowControlClientResponseObserver clientResponseObserver =
new FlowControlClientResponseObserver();
ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
(ClientCallStreamObserver<ServerReflectionRequest>)
stub.serverReflectionInfo(clientResponseObserver);
// ClientCalls.startCall() calls request(1) initially, so we should get an immediate response.
requestObserver.onNext(flowControlRequest);
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
// Verify we don't receive an additional response until we request it.
requestObserver.onNext(flowControlRequest);
assertEquals(1, clientResponseObserver.getResponses().size());
requestObserver.request(1);
assertEquals(2, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));
requestObserver.onCompleted();
assertTrue(clientResponseObserver.onCompleteCalled());
}
@Test
public void flowControlOnCompleteWithPendingRequest() throws Exception {
FlowControlClientResponseObserver clientResponseObserver =
new FlowControlClientResponseObserver();
ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
(ClientCallStreamObserver<ServerReflectionRequest>)
stub.serverReflectionInfo(clientResponseObserver);
// ClientCalls.startCall() calls request(1) initially, so make additional request.
requestObserver.onNext(flowControlRequest);
requestObserver.onNext(flowControlRequest);
requestObserver.onCompleted();
assertEquals(1, clientResponseObserver.getResponses().size());
assertFalse(clientResponseObserver.onCompleteCalled());
requestObserver.request(1);
assertTrue(clientResponseObserver.onCompleteCalled());
assertEquals(2, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));
}
public static <REQ, RESP> ClientResponseObserver<REQ, RESP> createClientResponseObserver(final Action1<ClientCallStreamObserver<REQ>> beforeStart,
final Action1<? super RESP> onNext,
final Action1<Throwable> onError,
final Action0 onCompleted) {
return new ClientResponseObserver<REQ, RESP>() {
@Override
public void beforeStart(ClientCallStreamObserver<REQ> requestStream) {
beforeStart.call(requestStream);
}
@Override
public void onNext(RESP value) {
onNext.call(value);
}
@Override
public void onError(Throwable t) {
onError.call(t);
}
@Override
public void onCompleted() {
onCompleted.call();
}
};
}
public int send(final int numberOfMessages, final int length, final long timestamp, final long checksum)
{
final ClientCallStreamObserver<EchoMessage> requestObserver = this.requestObserver;
final EchoMessage.Builder messageBuilder = this.messageBuilder;
final ByteString payload = this.payload;
int count = 0;
for (int i = 0; i < numberOfMessages && requestObserver.isReady(); i++)
{
final EchoMessage request = messageBuilder
.setTimestamp(timestamp)
.setPayload(payload)
.setChecksum(checksum)
.build();
requestObserver.onNext(request);
count++;
}
return count;
}
@Override
public void beforeStart(final ClientCallStreamObserver<PPing> requestStream) {
requestStream.setOnReadyHandler(new Runnable() {
@Override
public void run() {
logger.info("{} onReady", streamId);
reconnector.reset();
final Runnable pingRunnable = new Runnable() {
@Override
public void run() {
PPing pPing = newPing();
requestStream.onNext(pPing);
}
};
PingClientResponseObserver.this.pingScheduler = schedule(pingRunnable);
}
});
}
@Test
public void flowControlOnCompleteWithPendingRequest() throws Exception {
FlowControlClientResponseObserver clientResponseObserver =
new FlowControlClientResponseObserver();
ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
(ClientCallStreamObserver<ServerReflectionRequest>)
stub.serverReflectionInfo(clientResponseObserver);
requestObserver.onNext(flowControlRequest);
requestObserver.onCompleted();
assertEquals(0, clientResponseObserver.getResponses().size());
assertFalse(clientResponseObserver.onCompleteCalled());
requestObserver.request(1);
assertTrue(clientResponseObserver.onCompleteCalled());
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
}
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
for (StreamObserver<ResT> observer : observers) {
if (observer instanceof ClientResponseObserver) {
((ClientResponseObserver)observer).beforeStart(requestStream);
}
}
}
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> rs) {
rs.setOnReadyHandler(() -> {
// called from grpc response thread
if (rs.isReady()) {
errCounter = 0;
boolean notify = userReqStream.established(rs);
if (notify) {
respStream.onEstablished();
}
}
});
}
@Override
public void cancel() {
if (!isCanceled()) {
super.cancel();
((ClientCallStreamObserver<?>) downstream).cancel("Cancelled", Status.CANCELLED.asException());
}
}
@Benchmark
public Object gRpcBothWaysStreamingCall(Blackhole blackhole) throws InterruptedException {
PerfObserver observer = new PerfObserver(blackhole) {
private boolean done;
@Override
public void beforeStart(ClientCallStreamObserver<Messages.SimpleRequest> sender) {
sender.setOnReadyHandler(() -> {
if (done) {
return;
}
for (Messages.SimpleRequest request : ARRAY_REQUEST) {
sender.onNext(request);
}
sender.onCompleted();
done = true;
});
super.beforeStart(observer);
}
};
gRpcClient.streamingFromClient(observer);
observer.latch.await();
return observer;
}
public static void attachCancellingCallback(Emitter emitter, ClientCallStreamObserver... clientCalls) {
emitter.setCancellation(() -> {
for (ClientCallStreamObserver call : clientCalls) {
call.cancel(CANCELLING_MESSAGE, null);
}
});
}
private FluxInvocation(FluxSink<Object> sink, Object[] args) {
StreamObserver<Object> grpcStreamObserver = new ClientResponseObserver<Object, Object>() {
@Override
public void beforeStart(ClientCallStreamObserver requestStream) {
sink.onCancel(() -> requestStream.cancel("React subscription cancelled", null));
}
@Override
public void onNext(Object value) {
sink.next(value);
}
@Override
public void onError(Throwable error) {
sink.error(error);
}
@Override
public void onCompleted() {
sink.complete();
}
};
Object[] grpcArgs = new Object[]{
grpcArgPos < 0 ? Empty.getDefaultInstance() : args[grpcArgPos],
grpcStreamObserver
};
GRPC_STUB invocationStub = handleCallMetadata(args)
.withDeadline(Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS));
try {
grpcMethod.invoke(invocationStub, grpcArgs);
} catch (Exception e) {
sink.error(e);
}
}
private synchronized void initiateWrite() throws IOException {
if (writeObserver == null) {
checkNotNull(deadlineAfterUnits);
writeObserver =
bsStub
.get()
.withDeadlineAfter(deadlineAfter, deadlineAfterUnits)
.write(
new ClientResponseObserver<WriteRequest, WriteResponse>() {
@Override
public void beforeStart(ClientCallStreamObserver<WriteRequest> requestStream) {
requestStream.setOnReadyHandler(
() -> {
if (requestStream.isReady()) {
onReadyHandler.run();
}
});
}
@Override
public void onNext(WriteResponse response) {
writeFuture.set(response.getCommittedSize());
}
@Override
public void onError(Throwable t) {
writeFuture.setException(t);
}
@Override
public void onCompleted() {
synchronized (StubWriteOutputStream.this) {
writeObserver = null;
}
}
});
}
}
@Override
public synchronized boolean isReady() {
if (writeObserver == null) {
return false;
}
ClientCallStreamObserver<WriteRequest> clientCallStreamObserver =
(ClientCallStreamObserver<WriteRequest>) writeObserver;
return clientCallStreamObserver.isReady();
}
public void init(final Configuration configuration)
{
serverChannel = getServerChannel();
final StreamObserver<EchoMessage> responseObserver = new StreamObserver<EchoMessage>()
{
public void onNext(final EchoMessage response)
{
onMessageReceived(response.getTimestamp(), response.getChecksum());
}
public void onError(final Throwable t)
{
t.printStackTrace();
LangUtil.rethrowUnchecked(t);
}
public void onCompleted()
{
}
};
final EchoBenchmarksStub asyncClient = EchoBenchmarksGrpc.newStub(serverChannel);
requestObserver = (ClientCallStreamObserver<EchoMessage>)asyncClient.echoStream(responseObserver);
messageBuilder = EchoMessage.newBuilder();
final int payloadLength = configuration.messageLength() - MIN_MESSAGE_LENGTH;
if (payloadLength == 0)
{
payload = ByteString.EMPTY;
}
else
{
final byte[] bytes = new byte[payloadLength];
ThreadLocalRandom.current().nextBytes(bytes);
payload = copyFrom(bytes);
}
}
@Override
public void beforeStart(final ClientCallStreamObserver<PCmdMessage> requestStream) {
this.requestStream = requestStream;
requestStream.setOnReadyHandler(new Runnable() {
@Override
public void run() {
logger.info("Connect to CommandServiceStream completed.");
reconnector.reset();
}
});
}
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
logger.info("beforeStart:{}", name);
requestStream.setOnReadyHandler(new Runnable() {
private final AtomicInteger counter = new AtomicInteger();
@Override
public void run() {
logger.info("onReadyHandler:{} eventNumber:{}", name, counter.getAndIncrement());
reconnector.reset();
}
});
}
private void initIfRequired() {
if (requestObserver == null) {
log.debug("Starting new StreamChannel RPC for {}...", deviceId);
open.set(false);
client.execRpcNoTimeout(
s -> requestObserver =
(ClientCallStreamObserver<StreamMessageRequest>)
s.streamChannel(responseObserver)
);
}
}
private void sendSubscribeRequest() {
if (requestObserver == null) {
log.debug("Starting new Subscribe RPC for {}...", deviceId);
client.execRpcNoTimeout(
s -> requestObserver =
(ClientCallStreamObserver<Gnmi.SubscribeRequest>)
s.subscribe(responseObserver)
);
}
requestObserver.onNext(existingSubscription);
active.set(true);
}
@Test
public void flowControl() throws Exception {
FlowControlClientResponseObserver clientResponseObserver =
new FlowControlClientResponseObserver();
ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
(ClientCallStreamObserver<ServerReflectionRequest>)
stub.serverReflectionInfo(clientResponseObserver);
// Verify we don't receive a response until we request it.
requestObserver.onNext(flowControlRequest);
assertEquals(0, clientResponseObserver.getResponses().size());
requestObserver.request(1);
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
// Verify we don't receive an additional response until we request it.
requestObserver.onNext(flowControlRequest);
assertEquals(1, clientResponseObserver.getResponses().size());
requestObserver.request(1);
assertEquals(2, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));
requestObserver.onCompleted();
assertTrue(clientResponseObserver.onCompleteCalled());
}
public void cancel() {
clientObserver.addCallback(
new FutureCallback<ClientCallStreamObserver<?>>() {
@Override
public void onSuccess(@Nullable ClientCallStreamObserver<?> result) {
Objects.requireNonNull(result).cancel("Cancelled by client.", null);
}
@Override
public void onFailure(Throwable ignored) {}
},
MoreExecutors.directExecutor());
}
/**
* Tests client per-message compression for streaming calls. The Java API does not support
* inspecting a message's compression level, so this is primarily intended to run against a gRPC
* C++ server.
*/
public void clientCompressedStreaming(boolean probe) throws Exception {
final StreamingInputCallRequest expectCompressedRequest =
StreamingInputCallRequest.newBuilder()
.setExpectCompressed(BoolValue.newBuilder().setValue(true))
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182])))
.build();
final StreamingInputCallRequest expectUncompressedRequest =
StreamingInputCallRequest.newBuilder()
.setExpectCompressed(BoolValue.newBuilder().setValue(false))
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904])))
.build();
final StreamingInputCallResponse goldenResponse =
StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(73086).build();
StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
StreamObserver<StreamingInputCallRequest> requestObserver =
asyncStub.streamingInputCall(responseObserver);
if (probe) {
// Send a non-compressed message with expectCompress=true. Servers supporting this test case
// should return INVALID_ARGUMENT.
requestObserver.onNext(expectCompressedRequest);
responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
Throwable e = responseObserver.getError();
assertNotNull("expected INVALID_ARGUMENT", e);
assertEquals(Status.INVALID_ARGUMENT.getCode(), Status.fromThrowable(e).getCode());
}
// Start a new stream
responseObserver = StreamRecorder.create();
@SuppressWarnings("unchecked")
ClientCallStreamObserver<StreamingInputCallRequest> clientCallStreamObserver =
(ClientCallStreamObserver)
asyncStub.withCompression("gzip").streamingInputCall(responseObserver);
clientCallStreamObserver.setMessageCompression(true);
clientCallStreamObserver.onNext(expectCompressedRequest);
clientCallStreamObserver.setMessageCompression(false);
clientCallStreamObserver.onNext(expectUncompressedRequest);
clientCallStreamObserver.onCompleted();
responseObserver.awaitCompletion();
assertSuccess(responseObserver);
assertEquals(goldenResponse, responseObserver.firstValue().get());
}
@Override
public void beforeStart(final ClientCallStreamObserver<ServerReflectionRequest> requestStream) {
requestStream.disableAutoInboundFlowControl();
}
@Override
public void beforeStart(ClientCallStreamObserver<RequestT> requestStream) {
this.rsObserver = requestStream;
requestStream.disableAutoInboundFlowControl();
this.sink.onCancel(() -> requestStream.cancel("Flux requested cancel.", null));
}
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
this.requestStream = requestStream;
}
@Override
public void beforeStart(ClientCallStreamObserver<T> requestStream) {
super.onSubscribe(requestStream);
}
@Override
protected void doOnCancel() {
if (subscription != null) {
((ClientCallStreamObserver<?>) subscription).cancel("Client canceled request", null);
}
}
@Override
public void beforeStart(ClientCallStreamObserver<Messages.SimpleRequest> observer) {
this.observer = observer;
}
@Override
public void beforeStart(ClientCallStreamObserver<Empty> requestStream) {
GrpcUtil.attachCancellingCallback(emitter, requestStream);
}
private MonoInvocation(MonoSink<Object> sink, Object[] args) {
StreamObserver<Object> grpcStreamObserver = new ClientResponseObserver<Object, Object>() {
@Override
public void beforeStart(ClientCallStreamObserver requestStream) {
sink.onCancel(() -> requestStream.cancel("React subscription cancelled", null));
}
@Override
public void onNext(Object value) {
if (emptyToVoidReply) {
sink.success();
} else {
sink.success(value);
}
}
@Override
public void onError(Throwable error) {
sink.error(error);
}
@Override
public void onCompleted() {
sink.success();
}
};
Object[] grpcArgs = new Object[]{
grpcArgPos < 0 ? Empty.getDefaultInstance() : args[grpcArgPos],
grpcStreamObserver
};
GRPC_STUB invocationStub = handleCallMetadata(args)
.withDeadline(Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS));
try {
grpcMethod.invoke(invocationStub, grpcArgs);
} catch (Exception e) {
sink.error(e);
}
}