下面列出了 io.grpc.stub.StreamObserver#onError() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Client-streaming handler that averages the metrics sent by the client.
 *
 * <p>Every incoming metric is printed and added to a running sum. When the client finishes
 * sending, a single {@code Average} is emitted and the call is completed. An error on the
 * request stream is forwarded unchanged to the response observer.
 *
 * @param responseObserver observer used to send the computed average back to the client
 * @return the observer that receives the client's stream of metrics
 */
@Override
public StreamObserver<StreamingExample.Metric> collect(StreamObserver<StreamingExample.Average> responseObserver) {
  return new StreamObserver<StreamingExample.Metric>() {
    private long sum = 0;
    private long count = 0;

    @Override
    public void onNext(StreamingExample.Metric value) {
      System.out.println("value: " + value);
      sum += value.getMetric();
      count++;
    }

    @Override
    public void onError(Throwable t) {
      responseObserver.onError(t);
    }

    @Override
    public void onCompleted() {
      // An empty stream has nothing to average; report 0 instead of dividing by zero.
      long average = count == 0 ? 0L : sum / count;
      responseObserver.onNext(StreamingExample.Average.newBuilder()
          .setVal(average)
          .build());
      // Close the response stream; without this the RPC never terminates for the client.
      responseObserver.onCompleted();
    }
  };
}
/**
 * Starts a client-streaming call, immediately signals an error on the request observer, and
 * checks that the response side ends with {@code CANCELLED} and no response messages.
 */
@Test
public void cancelAfterBegin() throws Exception {
  final String fullMethodName = "grpc.testing.TestService/StreamingInputCall";
  StreamRecorder<StreamingInputCallResponse> recorder = StreamRecorder.create();
  StreamObserver<StreamingInputCallRequest> requestSide = asyncStub.streamingInputCall(recorder);

  requestSide.onError(new RuntimeException());
  recorder.awaitCompletion();

  assertEquals(Arrays.<StreamingInputCallResponse>asList(), recorder.getValues());
  Status.Code observedCode = Status.fromThrowable(recorder.getError()).getCode();
  assertEquals(Status.Code.CANCELLED, observedCode);

  if (!metricsExpected()) {
    return;
  }

  MetricsRecord startRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
  checkStartTags(startRecord, fullMethodName, true);
  // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
  // recorded. The tracer stats rely on the stream being created, which is not always the case
  // in this test. Therefore we don't check the tracer stats.
  MetricsRecord endRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
  checkEndTags(endRecord, fullMethodName, Status.CANCELLED.getCode(), true);
  // Do not check server-side metrics, because the status on the server side is undetermined.
}
/**
 * Dispatches a client message to the listener registered under the message's listener id.
 * If no listener is registered for that id, the call is failed through
 * {@link StreamObserver#onError(Throwable)}. Otherwise the response observer is wrapped in a
 * {@link GrpcMessageContext} and handed to the listener; this method does not complete the
 * rpc itself in that case.
 *
 * @param message a message from a client
 * @param responseObserver an observer to control this rpc call
 */
@Override
public void request(final ControlMessage.Message message,
                    final StreamObserver<ControlMessage.Message> responseObserver) {
  LOG.debug("[REQUEST] request msg.id={}, msg.listenerId={}, msg.type={}",
      message.getId(), message.getListenerId(), message.getType());

  final MessageListener<ControlMessage.Message> listener = listenerMap.get(message.getListenerId());
  if (listener != null) {
    // Per the original note, GrpcMessageContext completes the response observer once the
    // listener replies with a response message.
    listener.onMessageWithContext(message, new GrpcMessageContext(responseObserver));
    return;
  }

  LOG.warn("A message arrived, which has no registered listener. msg.id={}, msg.listenerId={}, msg.type={}",
      message.getId(), message.getListenerId(), message.getType());
  final String reason = "There is no registered listener id=" + message.getListenerId()
      + " for message type=" + message.getType();
  responseObserver.onError(new Exception(reason));
}
/**
 * Stores the document described by {@code request} and sends the resulting response.
 *
 * <p>On any failure the error is logged, the call is failed with {@code Status.UNKNOWN}
 * carrying the error text under {@code MetaKeys.ERROR_KEY}, and, when the request carries a
 * result document, that document is decoded and logged as a best-effort diagnostic.
 *
 * @param request the store request
 * @param responseObserver observer that receives the store response or the error
 */
@Override
public void store(StoreRequest request, StreamObserver<StoreResponse> responseObserver) {
  try {
    responseObserver.onNext(indexManger.storeDocument(request));
    responseObserver.onCompleted();
  }
  catch (Exception e) {
    log.error("Failed to store: <" + request.getUniqueId() + "> in index <" + request.getIndexName() + ">: " + e.getClass().getSimpleName() + ": ", e);
    Metadata m = new Metadata();
    // Metadata.put rejects null values; exceptions such as NullPointerException often have no
    // message, so fall back to the exception's string form to make sure onError is still sent.
    String errorMessage = e.getMessage() != null ? e.getMessage() : e.toString();
    m.put(MetaKeys.ERROR_KEY, errorMessage);
    responseObserver.onError(new StatusRuntimeException(Status.UNKNOWN, m));
    if (request.hasResultDocument()) {
      try {
        if (request.getResultDocument().hasDocument()) {
          BasicBSONObject document = (BasicBSONObject) BSON.decode(request.getResultDocument().getDocument().toByteArray());
          log.error(document.toString());
        }
      }
      catch (Exception e2) {
        // Dumping the document is only a diagnostic aid; record why it failed and move on.
        log.warn("Could not decode result document of <" + request.getUniqueId() + "> for diagnostics", e2);
      }
    }
  }
}
/**
 * Looks up a job by the name in {@code request} and replies with it. An
 * {@code EmptyResultDataAccessException} is reported to the client as {@code NOT_FOUND}.
 *
 * @param request carries the name of the job to find
 * @param responseObserver observer that receives the job or the error
 */
@Override
public void findJob(JobFindJobRequest request, StreamObserver<JobFindJobResponse> responseObserver) {
  try {
    JobFindJobResponse reply = JobFindJobResponse.newBuilder()
        .setJob(whiteboard.findJob(request.getName()))
        .build();
    responseObserver.onNext(reply);
    responseObserver.onCompleted();
  } catch (EmptyResultDataAccessException notFound) {
    Status status = Status.NOT_FOUND
        .withDescription(notFound.getMessage())
        .withCause(notFound);
    responseObserver.onError(status.asRuntimeException());
  }
}
/**
 * Mock handler: takes the next object from {@code responses} and replays it. A
 * {@code GenerateProductMixIdeasResponse} is sent as the reply (and the request is recorded
 * in {@code requests}); an {@code Exception} is sent as the error; anything else is reported
 * as an {@code IllegalArgumentException}.
 */
@Override
public void generateProductMixIdeas(
    GenerateProductMixIdeasRequest request,
    StreamObserver<GenerateProductMixIdeasResponse> responseObserver) {
  final Object next = responses.remove();
  if (next instanceof GenerateProductMixIdeasResponse) {
    requests.add(request);
    responseObserver.onNext((GenerateProductMixIdeasResponse) next);
    responseObserver.onCompleted();
    return;
  }
  final Throwable failure =
      next instanceof Exception
          ? (Exception) next
          : new IllegalArgumentException("Unrecognized response type");
  responseObserver.onError(failure);
}
/**
 * Mock handler: takes the next object from {@code responses} and replays it. An
 * {@code AdGroupFeed} is sent as the reply (and the request is recorded in
 * {@code requests}); an {@code Exception} is sent as the error; anything else is reported as
 * an {@code IllegalArgumentException}.
 */
@Override
public void getAdGroupFeed(
    GetAdGroupFeedRequest request, StreamObserver<AdGroupFeed> responseObserver) {
  final Object next = responses.remove();
  if (next instanceof AdGroupFeed) {
    requests.add(request);
    responseObserver.onNext((AdGroupFeed) next);
    responseObserver.onCompleted();
    return;
  }
  final Throwable failure =
      next instanceof Exception
          ? (Exception) next
          : new IllegalArgumentException("Unrecognized response type");
  responseObserver.onError(failure);
}
/**
 * Mock handler: takes the next object from {@code responses} and replays it. A
 * {@code MutateCustomerExtensionSettingsResponse} is sent as the reply (and the request is
 * recorded in {@code requests}); an {@code Exception} is sent as the error; anything else is
 * reported as an {@code IllegalArgumentException}.
 */
@Override
public void mutateCustomerExtensionSettings(
    MutateCustomerExtensionSettingsRequest request,
    StreamObserver<MutateCustomerExtensionSettingsResponse> responseObserver) {
  final Object next = responses.remove();
  if (next instanceof MutateCustomerExtensionSettingsResponse) {
    requests.add(request);
    responseObserver.onNext((MutateCustomerExtensionSettingsResponse) next);
    responseObserver.onCompleted();
    return;
  }
  final Throwable failure =
      next instanceof Exception
          ? (Exception) next
          : new IllegalArgumentException("Unrecognized response type");
  responseObserver.onError(failure);
}
/**
 * Mock handler: takes the next object from {@code responses} and replays it. A
 * {@code MobileAppCategoryConstant} is sent as the reply (and the request is recorded in
 * {@code requests}); an {@code Exception} is sent as the error; anything else is reported as
 * an {@code IllegalArgumentException}.
 */
@Override
public void getMobileAppCategoryConstant(
    GetMobileAppCategoryConstantRequest request,
    StreamObserver<MobileAppCategoryConstant> responseObserver) {
  final Object next = responses.remove();
  if (next instanceof MobileAppCategoryConstant) {
    requests.add(request);
    responseObserver.onNext((MobileAppCategoryConstant) next);
    responseObserver.onCompleted();
    return;
  }
  final Throwable failure =
      next instanceof Exception
          ? (Exception) next
          : new IllegalArgumentException("Unrecognized response type");
  responseObserver.onError(failure);
}
/**
 * Replies with the topic named in {@code request}, as returned by
 * {@code configurationManager.getTopicByName}. When no topic is present, a warning is logged
 * and the call is failed with {@code NOT_FOUND}.
 *
 * @param request carries the name of the topic to fetch
 * @param responseObserver observer that receives the topic or the error
 */
@Override
public void getTopic(GetTopicRequest request, StreamObserver<Topic> responseObserver) {
  logger.atFine().log("Getting Topic %s", request);
  Optional<Topic> lookup = configurationManager.getTopicByName(request.getTopic());
  if (lookup.isPresent()) {
    responseObserver.onNext(lookup.get());
    responseObserver.onCompleted();
    return;
  }
  String message = request.getTopic() + " is not a valid Topic";
  logger.atWarning().log(message);
  responseObserver.onError(Status.NOT_FOUND.withDescription(message).asException());
}
/**
 * Mock handler: takes the next object from {@code responses} and replays it. A
 * {@code DistanceView} is sent as the reply (and the request is recorded in
 * {@code requests}); an {@code Exception} is sent as the error; anything else is reported as
 * an {@code IllegalArgumentException}.
 */
@Override
public void getDistanceView(
    GetDistanceViewRequest request, StreamObserver<DistanceView> responseObserver) {
  final Object next = responses.remove();
  if (next instanceof DistanceView) {
    requests.add(request);
    responseObserver.onNext((DistanceView) next);
    responseObserver.onCompleted();
    return;
  }
  final Throwable failure =
      next instanceof Exception
          ? (Exception) next
          : new IllegalArgumentException("Unrecognized response type");
  responseObserver.onError(failure);
}
/**
 * Mock handler: takes the next object from {@code responses} and replays it. A
 * {@code ListCampaignExperimentAsyncErrorsResponse} is sent as the reply (and the request is
 * recorded in {@code requests}); an {@code Exception} is sent as the error; anything else is
 * reported as an {@code IllegalArgumentException}.
 */
@Override
public void listCampaignExperimentAsyncErrors(
    ListCampaignExperimentAsyncErrorsRequest request,
    StreamObserver<ListCampaignExperimentAsyncErrorsResponse> responseObserver) {
  final Object next = responses.remove();
  if (next instanceof ListCampaignExperimentAsyncErrorsResponse) {
    requests.add(request);
    responseObserver.onNext((ListCampaignExperimentAsyncErrorsResponse) next);
    responseObserver.onCompleted();
    return;
  }
  final Throwable failure =
      next instanceof Exception
          ? (Exception) next
          : new IllegalArgumentException("Unrecognized response type");
  responseObserver.onError(failure);
}
/**
 * Mock handler: takes the next object from {@code responses} and replays it. An
 * {@code AdGroupCriterionLabel} is sent as the reply (and the request is recorded in
 * {@code requests}); an {@code Exception} is sent as the error; anything else is reported as
 * an {@code IllegalArgumentException}.
 */
@Override
public void getAdGroupCriterionLabel(
    GetAdGroupCriterionLabelRequest request,
    StreamObserver<AdGroupCriterionLabel> responseObserver) {
  final Object next = responses.remove();
  if (next instanceof AdGroupCriterionLabel) {
    requests.add(request);
    responseObserver.onNext((AdGroupCriterionLabel) next);
    responseObserver.onCompleted();
    return;
  }
  final Throwable failure =
      next instanceof Exception
          ? (Exception) next
          : new IllegalArgumentException("Unrecognized response type");
  responseObserver.onError(failure);
}
/**
 * Mock handler: takes the next object from {@code responses} and replays it. A
 * {@code MutateCampaignSharedSetsResponse} is sent as the reply (and the request is recorded
 * in {@code requests}); an {@code Exception} is sent as the error; anything else is reported
 * as an {@code IllegalArgumentException}.
 */
@Override
public void mutateCampaignSharedSets(
    MutateCampaignSharedSetsRequest request,
    StreamObserver<MutateCampaignSharedSetsResponse> responseObserver) {
  final Object next = responses.remove();
  if (next instanceof MutateCampaignSharedSetsResponse) {
    requests.add(request);
    responseObserver.onNext((MutateCampaignSharedSetsResponse) next);
    responseObserver.onCompleted();
    return;
  }
  final Throwable failure =
      next instanceof Exception
          ? (Exception) next
          : new IllegalArgumentException("Unrecognized response type");
  responseObserver.onError(failure);
}
/**
 * Mock handler: takes the next object from {@code responses} and replays it. An
 * {@code Operation} is sent as the reply (and the request is recorded in {@code requests});
 * an {@code Exception} is sent as the error; anything else is reported as an
 * {@code IllegalArgumentException}.
 */
@Override
public void createCampaignExperiment(
    CreateCampaignExperimentRequest request, StreamObserver<Operation> responseObserver) {
  final Object next = responses.remove();
  if (next instanceof Operation) {
    requests.add(request);
    responseObserver.onNext((Operation) next);
    responseObserver.onCompleted();
    return;
  }
  final Throwable failure =
      next instanceof Exception
          ? (Exception) next
          : new IllegalArgumentException("Unrecognized response type");
  responseObserver.onError(failure);
}
/**
 * Bidirectional-streaming handler that answers every request with its payload body (decoded
 * as UTF-8) followed by a per-call counter value that is incremented on each message. Errors
 * and completion on the request stream are forwarded to the response observer.
 *
 * @param responseObserver observer used to send responses to the client
 * @return the observer that receives the client's requests
 */
@Override
public StreamObserver<Messages.StreamingOutputCallRequest> fullDuplexCall(
    StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
  final AtomicInteger sequence = new AtomicInteger();
  return new StreamObserver<Messages.StreamingOutputCallRequest>() {
    @Override
    public void onNext(Messages.StreamingOutputCallRequest streamingOutputCallRequest) {
      String numberedBody =
          streamingOutputCallRequest.getPayload().getBody().toStringUtf8()
              + sequence.incrementAndGet();
      Messages.Payload replyPayload =
          Messages.Payload.newBuilder()
              .setBody(ByteString.copyFromUtf8(numberedBody))
              .build();
      responseObserver.onNext(
          Messages.StreamingOutputCallResponse.newBuilder().setPayload(replyPayload).build());
    }

    @Override
    public void onError(Throwable throwable) {
      responseObserver.onError(throwable);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}
/**
 * Runs {@code runnable} and then completes {@code responseObserver}. Any {@link Exception}
 * thrown along the way is reported to the observer as an {@code INTERNAL} status whose
 * description comes from the root cause.
 *
 * @param responseObserver observer to complete, or to notify of the failure
 * @param runnable the work to execute before completing the observer
 * @param <V> response message type of the observer
 */
public static <V> void convertExceptions(StreamObserver<V> responseObserver, Runnable runnable) {
  try {
    runnable.run();
    responseObserver.onCompleted();
  } catch (Exception e) {
    final Throwable rootCause = Throwables.getRootCause(e);
    // Exceptions without a message would otherwise yield a status with no description at all.
    final String description =
        rootCause.getMessage() != null ? rootCause.getMessage() : rootCause.toString();
    // Keep the original exception attached so its stack trace is not lost on the server side.
    responseObserver.onError(
        Status.INTERNAL.withDescription(description).withCause(e).asRuntimeException());
  }
}
/**
 * Replies with the available balance obtained from {@code coreApi}.
 *
 * <p>An {@link IllegalStateException} from the lookup is reported to the client as an
 * {@code UNKNOWN} status carrying the exception message, and the resulting
 * {@code StatusRuntimeException} is also rethrown to the caller.
 *
 * @param req the balance request (not inspected here)
 * @param responseObserver observer that receives the reply or the error
 */
@Override
public void getBalance(GetBalanceRequest req, StreamObserver<GetBalanceReply> responseObserver) {
  try {
    long result = coreApi.getAvailableBalance();
    var reply = GetBalanceReply.newBuilder().setBalance(result).build();
    responseObserver.onNext(reply);
    responseObserver.onCompleted();
  } catch (IllegalStateException cause) {
    // Attach the original exception so the rethrown error keeps its stack trace.
    var status = Status.UNKNOWN.withDescription(cause.getMessage()).withCause(cause);
    var ex = new StatusRuntimeException(status);
    responseObserver.onError(ex);
    throw ex;
  }
}
/**
 * Mock handler: takes the next object from {@code responses} and replays it. An
 * {@code Operation} is sent as the reply (and the request is recorded in {@code requests});
 * an {@code Exception} is sent as the error; anything else is reported as an
 * {@code IllegalArgumentException}.
 */
@Override
public void promoteCampaignExperiment(
    PromoteCampaignExperimentRequest request, StreamObserver<Operation> responseObserver) {
  final Object next = responses.remove();
  if (next instanceof Operation) {
    requests.add(request);
    responseObserver.onNext((Operation) next);
    responseObserver.onCompleted();
    return;
  }
  final Throwable failure =
      next instanceof Exception
          ? (Exception) next
          : new IllegalArgumentException("Unrecognized response type");
  responseObserver.onError(failure);
}
/**
 * Example for testing async client-streaming: the fake service fails the call as soon as it
 * starts, and the test verifies that the same status is reported to the test helper.
 */
@Test
public void recordRoute_serverError() throws Exception {
  client.setRandom(noRandomness);

  final Point location = Point.newBuilder().setLatitude(1).setLongitude(1).build();
  final Feature onlyFeature = Feature.newBuilder().setLocation(location).build();
  final List<Feature> features = Arrays.asList(onlyFeature);
  final StatusRuntimeException fakeError = new StatusRuntimeException(Status.INVALID_ARGUMENT);

  // Fake service: reject the call right away and ignore whatever the client sends afterwards.
  RouteGuideImplBase failingService =
      new RouteGuideImplBase() {
        @Override
        public StreamObserver<Point> recordRoute(StreamObserver<RouteSummary> responseObserver) {
          responseObserver.onError(fakeError);
          return new StreamObserver<Point>() {
            @Override
            public void onNext(Point value) {
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onCompleted() {
            }
          };
        }
      };
  serviceRegistry.addService(failingService);

  client.recordRoute(features, 4);

  ArgumentCaptor<Throwable> reportedError = ArgumentCaptor.forClass(Throwable.class);
  verify(testHelper).onRpcError(reportedError.capture());
  assertEquals(fakeError.getStatus(), Status.fromThrowable(reportedError.getValue()));
}