下面列出了 io.grpc.stub.ClientResponseObserver 类的 API 用法示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Builds a {@link ClientResponseObserver} whose four callbacks simply delegate to the
 * supplied actions.
 *
 * @param beforeStart invoked with the request stream handed to {@code beforeStart}
 * @param onNext invoked with every value delivered to {@code onNext}
 * @param onError invoked with the throwable delivered to {@code onError}
 * @param onCompleted invoked when {@code onCompleted} is signalled
 * @return an observer that forwards each callback to the matching action
 */
public static <REQ, RESP> ClientResponseObserver<REQ, RESP> createClientResponseObserver(
        final Action1<ClientCallStreamObserver<REQ>> beforeStart,
        final Action1<? super RESP> onNext,
        final Action1<Throwable> onError,
        final Action0 onCompleted) {
    final ClientResponseObserver<REQ, RESP> delegatingObserver = new ClientResponseObserver<REQ, RESP>() {
        @Override
        public void beforeStart(ClientCallStreamObserver<REQ> stream) {
            beforeStart.call(stream);
        }

        @Override
        public void onNext(RESP response) {
            onNext.call(response);
        }

        @Override
        public void onError(Throwable error) {
            onError.call(error);
        }

        @Override
        public void onCompleted() {
            onCompleted.call();
        }
    };
    return delegatingObserver;
}
/**
 * Forwards the {@code beforeStart} callback to each element of {@code observers} that is
 * itself a {@link ClientResponseObserver}; all other observers are skipped.
 *
 * @param requestStream the request stream passed on to each client response observer
 */
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
    for (StreamObserver<ResT> observer : observers) {
        if (observer instanceof ClientResponseObserver) {
            // The request type parameter is erased at runtime, so this cast cannot be
            // checked; a parameterized cast with a narrowly scoped suppression replaces
            // the previous raw-type call.
            @SuppressWarnings("unchecked")
            ClientResponseObserver<ReqT, ResT> clientObserver =
                    (ClientResponseObserver<ReqT, ResT>) observer;
            clientObserver.beforeStart(requestStream);
        }
    }
}
/**
 * Stores the observer, the request-side adapter and the streaming flag, gives a
 * {@link ClientResponseObserver} the chance to see the adapter via {@code beforeStart},
 * and then freezes the adapter.
 *
 * @param observer the response observer to wrap
 * @param adapter the request-side adapter handed to {@code beforeStart}
 * @param streamingResponse stored as-is in {@code this.streamingResponse}
 */
StreamObserverToCallListenerAdapter(StreamObserver<Response> observer,
        CallToStreamObserverAdapter<Request, Response> adapter, boolean streamingResponse) {
    this.adapter = adapter;
    this.streamingResponse = streamingResponse;
    this.observer = observer;

    if (observer instanceof ClientResponseObserver) {
        @SuppressWarnings("unchecked")
        ClientResponseObserver<Request, Response> startAwareObserver =
                (ClientResponseObserver<Request, Response>) observer;
        startAwareObserver.beforeStart(adapter);
    }

    // Freezing happens only after beforeStart has had access to the adapter.
    adapter.freeze();
}
/**
 * Creates a {@link ClientResponseObserver} that forwards values, errors and completion
 * straight to the given emitter; the emitter is also passed on so the Emitter-based
 * {@code createClientResponseObserver} overload can handle the request stream.
 *
 * @param emitter the emitter receiving every response callback
 * @return an observer bridging the gRPC response stream to {@code emitter}
 */
public static <REQ, RESP> ClientResponseObserver<REQ, RESP> createSimpleClientResponseObserver(Emitter<RESP> emitter) {
    final Action1<RESP> forwardValue = emitter::onNext;
    final Action1<Throwable> forwardError = emitter::onError;
    final Action0 forwardCompletion = emitter::onCompleted;
    return createClientResponseObserver(emitter, forwardValue, forwardError, forwardCompletion);
}
/**
 * Creates a {@link ClientResponseObserver} for calls replying with {@code Empty}: response
 * values are discarded, while errors and completion are forwarded to the emitter.
 *
 * @param emitter the emitter receiving error and completion signals
 * @return an observer that ignores {@code onNext} values
 */
public static <REQ> ClientResponseObserver<REQ, Empty> createEmptyClientResponseObserver(Emitter<Empty> emitter) {
    // Values carry no information for an Empty reply, so they are dropped on purpose.
    final Action1<Empty> discardValue = ignored -> {
    };
    final Action1<Throwable> forwardError = emitter::onError;
    final Action0 forwardCompletion = emitter::onCompleted;
    return createClientResponseObserver(emitter, discardValue, forwardError, forwardCompletion);
}
/**
 * Creates a {@link ClientResponseObserver} that bridges an {@code Empty}-returning call to a
 * {@code MonoSink}: a cancel handler on the sink cancels the request stream, values and
 * completion both call {@code success}, and errors call {@code error}.
 *
 * @param monoSink the sink receiving the outcome of the call
 * @return an observer wired to {@code monoSink}
 */
public static <REQ> ClientResponseObserver<REQ, Empty> createEmptyClientMonoResponse(MonoSink<Empty> monoSink) {
    final Action1<ClientCallStreamObserver<REQ>> cancelWithSink = requestStream ->
            monoSink.onCancel(() -> requestStream.cancel(CANCELLING_MESSAGE, null));
    final Action1<Empty> emitValue = monoSink::success;
    final Action1<Throwable> emitError = monoSink::error;
    final Action0 emitEmpty = monoSink::success;
    return createClientResponseObserver(cancelWithSink, emitValue, emitError, emitEmpty);
}
/**
 * Builds a {@link ClientResponseObserver} from the given callbacks and registers a
 * cancellation on the emitter that cancels the gRPC request stream.
 *
 * @param emitter the emitter whose cancellation cancels the request stream
 * @param onNext invoked with every response value
 * @param onError invoked with the error of a failed call
 * @param onCompleted invoked when the call completes
 * @return an observer delegating to the supplied callbacks
 */
public static <REQ, RESP> ClientResponseObserver<REQ, RESP> createClientResponseObserver(Emitter<?> emitter,
        final Action1<? super RESP> onNext,
        final Action1<Throwable> onError,
        final Action0 onCompleted) {
    final Action1<ClientCallStreamObserver<REQ>> cancelWithEmitter = stream ->
            emitter.setCancellation(() -> stream.cancel(CANCELLING_MESSAGE, null));
    return createClientResponseObserver(cancelWithEmitter, onNext, onError, onCompleted);
}
/**
 * Starts the gRPC call reflectively and bridges its response stream to the given sink.
 *
 * <p>Cancelling the sink cancels the request stream; values, errors and completion are
 * forwarded to {@code sink.next}, {@code sink.error} and {@code sink.complete}. Any exception
 * thrown while invoking {@code grpcMethod} is reported through {@code sink.error}.
 *
 * @param sink the sink receiving the response stream signals
 * @param args the original invocation arguments; the gRPC request is taken from
 *     {@code args[grpcArgPos]}, or {@code Empty.getDefaultInstance()} when
 *     {@code grpcArgPos} is negative
 */
private FluxInvocation(FluxSink<Object> sink, Object[] args) {
    StreamObserver<Object> grpcStreamObserver = new ClientResponseObserver<Object, Object>() {
        // Parameterized to match ClientResponseObserver<Object, Object> instead of a raw type.
        @Override
        public void beforeStart(ClientCallStreamObserver<Object> requestStream) {
            sink.onCancel(() -> requestStream.cancel("React subscription cancelled", null));
        }

        @Override
        public void onNext(Object value) {
            sink.next(value);
        }

        @Override
        public void onError(Throwable error) {
            sink.error(error);
        }

        @Override
        public void onCompleted() {
            sink.complete();
        }
    };

    Object[] grpcArgs = new Object[]{
            grpcArgPos < 0 ? Empty.getDefaultInstance() : args[grpcArgPos],
            grpcStreamObserver
    };

    GRPC_STUB invocationStub = handleCallMetadata(args)
            .withDeadline(Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS));
    try {
        grpcMethod.invoke(invocationStub, grpcArgs);
    } catch (Exception e) {
        // Reflection and call-setup failures are surfaced to the subscriber, not thrown.
        sink.error(e);
    }
}
/**
 * Lazily opens the write call: when {@code writeObserver} is unset, a new {@code write} call
 * is started on the stub with the configured deadline and its request observer is stored in
 * {@code writeObserver}. Does nothing when a write observer already exists.
 *
 * @throws IOException declared by the signature
 */
private synchronized void initiateWrite() throws IOException {
    if (writeObserver != null) {
        // A write call is already open; nothing to start.
        return;
    }
    checkNotNull(deadlineAfterUnits);
    writeObserver =
            bsStub
                    .get()
                    .withDeadlineAfter(deadlineAfter, deadlineAfterUnits)
                    .write(
                            new ClientResponseObserver<WriteRequest, WriteResponse>() {
                                @Override
                                public void beforeStart(ClientCallStreamObserver<WriteRequest> stream) {
                                    // Only notify the handler while the stream reports ready.
                                    stream.setOnReadyHandler(
                                            () -> {
                                                if (stream.isReady()) {
                                                    onReadyHandler.run();
                                                }
                                            });
                                }

                                @Override
                                public void onNext(WriteResponse writeResponse) {
                                    writeFuture.set(writeResponse.getCommittedSize());
                                }

                                @Override
                                public void onError(Throwable failure) {
                                    writeFuture.setException(failure);
                                }

                                @Override
                                public void onCompleted() {
                                    // Clear the observer under the stream's own lock.
                                    synchronized (StubWriteOutputStream.this) {
                                        writeObserver = null;
                                    }
                                }
                            });
}
/**
 * Starts the gRPC call reflectively and bridges its single response to the given sink.
 *
 * <p>Cancelling the sink cancels the request stream. A response value completes the sink
 * with that value, or without a value when {@code emptyToVoidReply} is set; completion calls
 * {@code sink.success()} and errors call {@code sink.error}. Any exception thrown while
 * invoking {@code grpcMethod} is reported through {@code sink.error}.
 *
 * @param sink the sink receiving the outcome of the call
 * @param args the original invocation arguments; the gRPC request is taken from
 *     {@code args[grpcArgPos]}, or {@code Empty.getDefaultInstance()} when
 *     {@code grpcArgPos} is negative
 */
private MonoInvocation(MonoSink<Object> sink, Object[] args) {
    StreamObserver<Object> grpcStreamObserver = new ClientResponseObserver<Object, Object>() {
        // Parameterized to match ClientResponseObserver<Object, Object> instead of a raw type.
        @Override
        public void beforeStart(ClientCallStreamObserver<Object> requestStream) {
            sink.onCancel(() -> requestStream.cancel("React subscription cancelled", null));
        }

        @Override
        public void onNext(Object value) {
            if (emptyToVoidReply) {
                sink.success();
            } else {
                sink.success(value);
            }
        }

        @Override
        public void onError(Throwable error) {
            sink.error(error);
        }

        @Override
        public void onCompleted() {
            sink.success();
        }
    };

    Object[] grpcArgs = new Object[]{
            grpcArgPos < 0 ? Empty.getDefaultInstance() : args[grpcArgPos],
            grpcStreamObserver
    };

    GRPC_STUB invocationStub = handleCallMetadata(args)
            .withDeadline(Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS));
    try {
        grpcMethod.invoke(invocationStub, grpcArgs);
    } catch (Exception e) {
        // Reflection and call-setup failures are surfaced to the subscriber, not thrown.
        sink.error(e);
    }
}
/**
 * Sends requests only while the request stream reports ready and checks that the number of
 * requests written stays near {@code CAPPED_NUM_MESSAGES} while all
 * {@code TOTAL_NUM_MESSAGES} responses are received.
 */
@Test
public void serverBackPressure() {
    final AtomicInteger numRequests = new AtomicInteger();
    final AtomicInteger numResponses = new AtomicInteger();
    final AtomicBoolean done = new AtomicBoolean();

    final ClientResponseObserver<SimpleRequest, SimpleResponse> responseObserver =
            new ClientResponseObserver<SimpleRequest, SimpleResponse>() {
                @Override
                public void beforeStart(ClientCallStreamObserver<SimpleRequest> requestStream) {
                    // Each readiness notification sends at most one more request.
                    requestStream.setOnReadyHandler(() -> {
                        if (numRequests.get() >= TOTAL_NUM_MESSAGES) {
                            return;
                        }
                        numRequests.incrementAndGet();
                        requestStream.onNext(REQUEST);
                    });
                }

                @Override
                public void onNext(SimpleResponse value) {
                    numResponses.incrementAndGet();
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onCompleted() {
                    done.set(true);
                }
            };

    final ClientCallStreamObserver<SimpleRequest> req =
            (ClientCallStreamObserver<SimpleRequest>) client.serverBackPressure(responseObserver);

    // Write eagerly until either every message is sent or the stream stops being ready.
    for (int sent = 0; sent < TOTAL_NUM_MESSAGES && req.isReady(); sent++) {
        numRequests.incrementAndGet();
        req.onNext(REQUEST);
    }

    await().untilAsserted(() -> assertThat(done).isTrue());
    // Flow control happens on the second request, and an extra message is often sent after the last
    // requested one since there will still be some space in the last flow control window, which results
    // in two more than our expected cap.
    assertThat(numRequests).hasValueBetween(CAPPED_NUM_MESSAGES, CAPPED_NUM_MESSAGES + 2);
    assertThat(numResponses).hasValue(TOTAL_NUM_MESSAGES);
}
/**
 * Streams one {@code HelloRequest} per entry of {@code names()} to the greeter service,
 * sending only while the request stream reports ready, and logs every reply.
 *
 * <p>Once every name has been sent the request stream is completed exactly once; the
 * on-ready handler returns immediately afterwards instead of relying on {@code isReady()}
 * turning false after completion.
 */
private void greetService() {
    ClientResponseObserver<HelloRequest, HelloReply> helloReplyStreamObserver =
            new ClientResponseObserver<HelloRequest, HelloReply>() {
        private ClientCallStreamObserver<HelloRequest> requestStream;

        @Override
        public void beforeStart(ClientCallStreamObserver<HelloRequest> observer) {
            this.requestStream = observer;
            this.requestStream.setOnReadyHandler(new Runnable() {
                private final Iterator<String> iterator = names().iterator();
                // Guards against signalling completion more than once.
                private boolean completed;

                @Override
                public void run() {
                    if (completed) {
                        return;
                    }
                    while (requestStream.isReady()) {
                        if (!iterator.hasNext()) {
                            // Nothing left to send: half-close and stop looping right away.
                            completed = true;
                            requestStream.onCompleted();
                            return;
                        }
                        String name = iterator.next();
                        HelloRequest request = HelloRequest.newBuilder().setName(name).build();
                        requestStream.onNext(request);
                    }
                }
            });
        }

        @Override
        public void onNext(HelloReply reply) {
            logger.info("Receive an message from provider. message: {}", reply.getMessage());
            requestStream.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            logger.error("Failed to send data", throwable);
        }

        @Override
        public void onCompleted() {
            logger.info("All Done");
        }
    };
    greeterStub.sayHello(helloReplyStreamObserver);
}
/**
 * Returns {@code clientResponseObserver}.
 *
 * <p>NOTE(review): the return type is the raw {@code ClientResponseObserver}; the request and
 * response types are not visible from this method.
 *
 * @return the current value of {@code clientResponseObserver}
 */
@Override
public ClientResponseObserver getResponseObserver() {
return clientResponseObserver;
}
/**
 * Starts a remote execution for the given action digest and returns a handle to it.
 *
 * <p>The execution stub is wrapped so that metadata is sent and received, an
 * {@code ExecuteRequest} (instance name, action digest, cache lookup not skipped) is issued,
 * and the streamed {@code Operation} updates are recorded in an {@code ExecutionState}. The
 * returned handle exposes that state's result future, its execution-started future, and
 * cancellation.
 *
 * @param actionDigest digest of the action to execute
 * @param ruleName rule name passed to the metadata provider
 * @param metadataProvider source of the per-action metadata attached to the call
 * @return a handle backed by the {@code ExecutionState} of this call
 * @throws IOException declared by the signature
 * @throws InterruptedException declared by the signature
 */
@Override
public ExecutionHandle execute(
Digest actionDigest, String ruleName, MetadataProvider metadataProvider)
throws IOException, InterruptedException {
// NOTE(review): this future is only ever completed exceptionally in onError below and is
// never returned or read in this method; confirm that call failures also reach
// state.resultFuture, which is what the returned handle exposes.
SettableFuture<Operation> future = SettableFuture.create();
StubAndResponseMetadata<ExecutionStub> stubAndMetadata =
GrpcHeaderHandler.wrapStubToSendAndReceiveMetadata(
executionStub,
metadataProvider.getForAction(
RemoteExecutionActionEvent.actionDigestToString(actionDigest), ruleName));
ExecutionState state = new ExecutionState(stubAndMetadata.getMetadata());
stubAndMetadata
.getStub()
.execute(
ExecuteRequest.newBuilder()
.setInstanceName(instanceName)
.setActionDigest(GrpcProtocol.get(actionDigest))
.setSkipCacheLookup(false)
.build(),
new ClientResponseObserver<ExecuteRequest, Operation>() {
@Override
public void beforeStart(ClientCallStreamObserver<ExecuteRequest> requestStream) {
// Hand the request stream to the state before the call starts.
state.registerClientObserver(requestStream);
}
@Override
public void onNext(Operation value) {
// Keep the latest operation and, when present, its unpacked metadata.
state.currentOp = value;
if (state.currentOp.hasMetadata()) {
try {
state.setCurrentOpMetadata(
state.currentOp.getMetadata().unpack(ExecuteOperationMetadata.class));
} catch (InvalidProtocolBufferException e) {
// Unparseable metadata is logged and otherwise ignored.
LOG.warn("Unable to parse ExecuteOperationMetadata from Operation");
}
}
}
@Override
public void onError(Throwable t) {
String msg =
String.format(
"Failed execution request with metadata=[%s] and exception=[%s].",
stubAndMetadata.getMetadata(), t.toString());
LOG.warn(t, msg);
future.setException(new IOException(msg, t));
}
@Override
public void onCompleted() {
state.setExecutedMetadata(stubAndMetadata.getMetadata());
state.onCompleted();
}
});
// The handle is a thin view over the shared ExecutionState.
return new ExecutionHandle() {
@Override
public ListenableFuture<ExecutionResult> getResult() {
return state.resultFuture;
}
@Override
public ListenableFuture<ExecuteOperationMetadata> getExecutionStarted() {
return state.startedExecutionFuture;
}
@Override
public void cancel() {
state.cancel();
}
};
}
/**
 * Returns the {@code ClientResponseObserver} provided by the implementation.
 *
 * <p>NOTE(review): the return type is raw, so callers get no compile-time checking of the
 * request and response types.
 *
 * @return the response observer
 */
ClientResponseObserver getResponseObserver();