下面列出了怎么用io.grpc.stub.StreamObserver的API类实例代码及写法,或者点击链接到github查看源代码。
private InterruptingPredicate<QueueEntry> createOnMatch(
Instance instance, StreamObserver<QueueEntry> responseObserver) {
return (queueEntry) -> {
try {
responseObserver.onNext(queueEntry);
responseObserver.onCompleted();
return true;
} catch (StatusRuntimeException e) {
Status status = Status.fromThrowable(e);
if (status.getCode() != Status.Code.CANCELLED) {
responseObserver.onError(e);
}
}
instance.putOperation(instance.getOperation(queueEntry.getExecuteEntry().getOperationName()));
return false;
};
}
@Override
public void findRepositories(
FindRepositories request, StreamObserver<FindRepositories.Response> responseObserver) {
QPSCountResource.inc();
try {
try (RequestLatencyResource latencyResource =
new RequestLatencyResource(modelDBAuthInterceptor.getMethodName())) {
FindRepositories.Response response = repositoryDAO.findRepositories(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
}
} catch (Exception e) {
ModelDBUtils.observeError(
responseObserver, e, FindRepositories.Response.getDefaultInstance());
}
}
@Override
public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObserver) {
boolean ok = txConsistentService.handle(new TxEvent(
message.getServiceName(),
message.getInstanceId(),
new Date(),
message.getGlobalTxId(),
message.getLocalTxId(),
message.getParentTxId().isEmpty() ? null : message.getParentTxId(),
message.getType(),
message.getCompensationMethod(),
message.getTimeout(),
message.getRetryMethod(),
message.getForwardRetries(),
message.getPayloads().toByteArray()
));
responseObserver.onNext(ok ? ALLOW : REJECT);
responseObserver.onCompleted();
}
@Override
public void nodeUnpublishVolume(NodeUnpublishVolumeRequest request,
StreamObserver<NodeUnpublishVolumeResponse> responseObserver) {
String umountCommand =
String.format("fusermount -u %s", request.getTargetPath());
LOG.info("Executing {}", umountCommand);
try {
executeCommand(umountCommand);
responseObserver.onNext(NodeUnpublishVolumeResponse.newBuilder()
.build());
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(e);
}
}
/**
* Implements a unary → stream call as {@link Single} → {@link Flowable}, where the server responds with a
* stream of messages.
*/
public static <TRequest, TResponse> Flowable<TResponse> oneToMany(
final Single<TRequest> rxRequest,
final BiConsumer<TRequest, StreamObserver<TResponse>> delegate,
final CallOptions options) {
try {
final int prefetch = RxCallOptions.getPrefetch(options);
final int lowTide = RxCallOptions.getLowTide(options);
return rxRequest
.flatMapPublisher(new io.reactivex.functions.Function<TRequest, Publisher<? extends TResponse>>() {
@Override
public Publisher<? extends TResponse> apply(TRequest request) {
final RxClientStreamObserverAndPublisher<TResponse> consumerStreamObserver =
new RxClientStreamObserverAndPublisher<TResponse>(null, null, prefetch, lowTide);
delegate.accept(request, consumerStreamObserver);
return consumerStreamObserver;
}
});
} catch (Throwable throwable) {
return Flowable.error(throwable);
}
}
private <T> void throttle(String agentId, boolean postV09, String collectionType,
StreamObserver<T> responseObserver, Runnable runnable) {
Semaphore semaphore = throttlePerAgentId.getUnchecked(agentId);
boolean acquired;
try {
acquired = semaphore.tryAcquire(1, MINUTES);
} catch (InterruptedException e) {
// probably shutdown requested
responseObserver.onError(e);
return;
}
if (!acquired) {
logger.warn("{} - {} collection rejected due to backlog",
getAgentIdForLogging(agentId, postV09), collectionType);
responseObserver.onError(Status.RESOURCE_EXHAUSTED
.withDescription("collection rejected due to backlog")
.asRuntimeException());
return;
}
try {
runnable.run();
} finally {
semaphore.release();
}
}
@Override
public CompletableFuture<LeaseKeepAliveResponse> keepAliveOnce(long leaseId) {
CompletableFuture<LeaseKeepAliveResponse> future = new CompletableFuture<>();
StreamObserver<LeaseKeepAliveRequest> requestObserver = Observers.observe(
this.leaseStub::leaseKeepAlive,
response -> future.complete(new LeaseKeepAliveResponse(response)),
throwable -> future.completeExceptionally(toEtcdException(throwable)));
// cancel grpc stream when leaseKeepAliveResponseCompletableFuture completes.
CompletableFuture<LeaseKeepAliveResponse> answer = future
.whenCompleteAsync((val, throwable) -> requestObserver.onCompleted(), connectionManager.getExecutorService());
requestObserver.onNext(LeaseKeepAliveRequest.newBuilder().setID(leaseId).build());
return answer;
}
@Test
public void unexpectedExceptionCanNotMatch() {
GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
throw new ArithmeticException("Divide by zero");
}
};
ServerInterceptor interceptor = new TransmitUnexpectedExceptionInterceptor().forExactType(NullPointerException.class);
serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, interceptor));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel());
assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build()))
.isInstanceOf(StatusRuntimeException.class)
.matches(sre -> ((StatusRuntimeException) sre).getStatus().getCode().equals(Status.UNKNOWN.getCode()), "is Status.UNKNOWN")
.hasMessageContaining("UNKNOWN");
}
@Override
public void deleteLineage(
DeleteLineage request, StreamObserver<DeleteLineage.Response> responseObserver) {
QPSCountResource.inc();
try {
if (request.getInputCount() == 0 && request.getOutputCount() == 0) {
throw new ModelDBException("Input and output not specified", Code.INVALID_ARGUMENT);
} else {
if (request.getInputCount() == 0) {
throw new ModelDBException("Input not specified", Code.INVALID_ARGUMENT);
} else if (request.getOutputCount() == 0) {
throw new ModelDBException("Output not specified", Code.INVALID_ARGUMENT);
}
}
try (RequestLatencyResource latencyResource =
new RequestLatencyResource(ModelDBAuthInterceptor.METHOD_NAME.get())) {
DeleteLineage.Response response = lineageDAO.deleteLineage(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
}
} catch (Exception e) {
ModelDBUtils.observeError(responseObserver, e, DeleteLineage.Response.getDefaultInstance());
}
}
@Test
public void cacheHitWithInlineOutput() throws Exception {
serviceRegistry.addService(
new ActionCacheImplBase() {
@Override
public void getActionResult(
GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
responseObserver.onNext(
ActionResult.newBuilder()
.addOutputFiles(DUMMY_OUTPUT)
.setStdoutRaw(ByteString.copyFromUtf8("stdout"))
.setStderrRaw(ByteString.copyFromUtf8("stderr"))
.build());
responseObserver.onCompleted();
}
});
FakeSpawnExecutionContext policy =
new FakeSpawnExecutionContext(simpleSpawn, fakeFileCache, execRoot, outErr);
SpawnResult result = client.exec(simpleSpawn, policy);
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(result.isCacheHit()).isTrue();
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
}
@Override
public void getQueryProfile(
GetQueryProfileRequest request, StreamObserver<GetQueryProfileResponse> responseObserver) {
try {
QueryId queryId = request.getQueryId();
Preconditions.checkNotNull(queryId);
QueryProfile mergedProfile = fetchOrBuildMergedProfile(queryId);
responseObserver.onNext(
GetQueryProfileResponse.newBuilder().setProfile(mergedProfile).build());
responseObserver.onCompleted();
} catch (IllegalArgumentException e) {
responseObserver.onError(
Status.INVALID_ARGUMENT
.withDescription("get query profile failed " + e.getMessage())
.asRuntimeException());
} catch (Exception ex) {
logger.error("get query profile failed", ex);
responseObserver.onError(
Status.INTERNAL.withDescription(ex.getMessage()).asRuntimeException());
}
}
@Override
public StreamObserver<Messages.StreamingInputCallRequest> streamingInputCall(
StreamObserver<Messages.StreamingInputCallResponse> responseObserver) {
List<String> list = new CopyOnWriteArrayList<>();
return new StreamObserver<Messages.StreamingInputCallRequest>() {
@Override
public void onNext(Messages.StreamingInputCallRequest streamingInputCallRequest) {
list.add(streamingInputCallRequest.getPayload().getBody().toStringUtf8());
}
@Override
public void onError(Throwable throwable) {
responseObserver.onError(throwable);
}
@Override
public void onCompleted() {
assertThat(list).containsExactly("a", "b", "c", "d");
responseObserver.onNext(Messages.StreamingInputCallResponse.newBuilder().build());
responseObserver.onCompleted();
}
};
}
@Test
public void recordsMultipleCalls() throws Throwable {
startGrpcServer(CHEAP_METRICS);
createGrpcBlockingStub().sayHello(REQUEST);
createGrpcBlockingStub().sayHello(REQUEST);
createGrpcBlockingStub().sayHello(REQUEST);
StreamRecorder<HelloResponse> streamRecorder = StreamRecorder.create();
StreamObserver<HelloRequest> requestStream =
createGrpcStub().sayHelloBidiStream(streamRecorder);
requestStream.onNext(REQUEST);
requestStream.onNext(REQUEST);
requestStream.onCompleted();
streamRecorder.awaitCompletion();
assertThat(findRecordedMetricOrThrow("grpc_server_started_total").samples).hasSize(2);
assertThat(findRecordedMetricOrThrow("grpc_server_handled_total").samples).hasSize(2);
}
@Override
public void findAllInputsOutputs(
FindAllInputsOutputs request,
StreamObserver<FindAllInputsOutputs.Response> responseObserver) {
QPSCountResource.inc();
try {
if (request.getItemsCount() == 0) {
throw new ModelDBException("Items not specified", Code.INVALID_ARGUMENT);
}
try (RequestLatencyResource latencyResource =
new RequestLatencyResource(ModelDBAuthInterceptor.METHOD_NAME.get())) {
FindAllInputsOutputs.Response response = lineageDAO.findAllInputsOutputs(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
}
} catch (Exception e) {
ModelDBUtils.observeError(
responseObserver, e, FindAllInputsOutputs.Response.getDefaultInstance());
}
}
/** Returns top level channel aka {@link io.grpc.ManagedChannel}. */
@Override
public void getTopChannels(
GetTopChannelsRequest request, StreamObserver<GetTopChannelsResponse> responseObserver) {
InternalChannelz.RootChannelList rootChannels
= channelz.getRootChannels(request.getStartChannelId(), maxPageSize);
GetTopChannelsResponse resp;
try {
resp = ChannelzProtoUtil.toGetTopChannelResponse(rootChannels);
} catch (StatusRuntimeException e) {
responseObserver.onError(e);
return;
}
responseObserver.onNext(resp);
responseObserver.onCompleted();
}
@Override
public void getMempoolTransactionList(RequestAddress req, StreamObserver<TransactionHashList> observer)
{
AddressSpecHash spec_hash = new AddressSpecHash(req.getAddressSpecHash());
TransactionHashList.Builder list = TransactionHashList.newBuilder();
for(ChainHash h : node.getMemPool().getTransactionsForAddress(spec_hash))
{
list.addTxHashes(h.getBytes());
}
observer.onNext( list.build());
observer.onCompleted();
}
@Override
public void getGeoTargetConstant(
GetGeoTargetConstantRequest request, StreamObserver<GeoTargetConstant> responseObserver) {
Object response = responses.remove();
if (response instanceof GeoTargetConstant) {
requests.add(request);
responseObserver.onNext((GeoTargetConstant) response);
responseObserver.onCompleted();
} else if (response instanceof Exception) {
responseObserver.onError((Exception) response);
} else {
responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
}
}
@Override
public void testCallServiceThrottling(TestRequest testRequest, StreamObserver<TestResponse> responseObserver) {
String receivedReq = testRequest.getTestReqString();
TestResponse response = TestResponse.newBuilder().setTestResString("response received :" + receivedReq).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
/**
* CDS response containing UpstreamTlsContext for a cluster in a deprecated field.
*/
// TODO(sanjaypujare): remove once we move to envoy proto v3
@Test
public void cdsResponseWithDeprecatedUpstreamTlsContext() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Management server sends back CDS response with UpstreamTlsContext.
UpstreamTlsContext testUpstreamTlsContext =
buildUpstreamTlsContext("secret1", "unix:/var/uds2");
List<Any> clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)),
Any.pack(buildDeprecatedSecureCluster("cluster-foo.googleapis.com",
"eds-cluster-foo.googleapis.com", true, testUpstreamTlsContext)),
Any.pack(buildCluster("cluster-baz.googleapis.com", null, false)));
DiscoveryResponse response =
buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
responseObserver.onNext(response);
// Client sent an ACK CDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor = ArgumentCaptor.forClass(null);
verify(clusterWatcher, times(1)).onClusterChanged(clusterUpdateCaptor.capture());
ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue();
assertThat(clusterUpdate.getUpstreamTlsContext())
.isEqualTo(
EnvoyServerProtoData.UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext(
testUpstreamTlsContext));
}
@Override
public void onNext(ResT value) {
for (StreamObserver<ResT> observer : observers) {
try {
observer.onNext(value);
} catch (Throwable t) {
logger.error("Exception in composite onNext, moving on", t);
}
}
}
@Override
public void createShelf(CreateShelfRequest request, StreamObserver<Shelf> responseObserver) {
Shelf response;
try {
response = data.createShelf(request.getShelf()).getShelf();
} catch (Throwable t) {
responseObserver.onError(t);
return;
}
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@Override
public void getQueueInfo(TaskServicePb.QueueInfoRequest req, StreamObserver<TaskServicePb.QueueInfoResponse> response) {
Map<String, Long> queueInfo = taskService.getAllQueueDetails();
response.onNext(
TaskServicePb.QueueInfoResponse.newBuilder()
.putAllQueues(queueInfo)
.build()
);
response.onCompleted();
}
@Override
public void getProductGroupView(
GetProductGroupViewRequest request, StreamObserver<ProductGroupView> responseObserver) {
Object response = responses.remove();
if (response instanceof ProductGroupView) {
requests.add(request);
responseObserver.onNext((ProductGroupView) response);
responseObserver.onCompleted();
} else if (response instanceof Exception) {
responseObserver.onError((Exception) response);
} else {
responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
}
}
@Test
public void contextTransfersOneHopAsync() throws Exception {
Metadata.Key<String> ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER);
String expectedCtxValue = "context-value";
AtomicReference<String> ctxValue = new AtomicReference<>();
// Service
GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
ctxValue.set(AmbientContext.current().get(ctxKey));
responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build());
responseObserver.onCompleted();
}
};
// Plumbing
serverRule1.getServiceRegistry().addService(ServerInterceptors
.intercept(svc, new AmbientContextServerInterceptor("ctx-")));
GreeterGrpc.GreeterFutureStub stub = GreeterGrpc
.newFutureStub(serverRule1.getChannel())
.withInterceptors(new AmbientContextClientInterceptor("ctx-"));
// Test
AmbientContext.initialize(Context.current()).run(() -> {
AmbientContext.current().put(ctxKey, expectedCtxValue);
ListenableFuture<HelloResponse> futureResponse = stub.sayHello(HelloRequest.newBuilder().setName("world").build());
// Verify response callbacks still have context
MoreFutures.onSuccess(
futureResponse,
response -> assertThat(AmbientContext.current().get(ctxKey)).isEqualTo(expectedCtxValue),
Context.currentContextExecutor(Executors.newSingleThreadExecutor()));
await().atMost(Duration.ONE_SECOND).until(futureResponse::isDone);
});
assertThat(ctxValue.get()).isEqualTo(expectedCtxValue);
}
ListenableFuture<FileDescriptorSet> start(
StreamObserver<ServerReflectionRequest> requestStream) {
this.requestStream = requestStream;
requestStream.onNext(requestForSymbol(serviceName));
++outstandingRequests;
return resultFuture;
}
@Override
public void getFile(FileRequest req, StreamObserver<FileResponse> responseObserver) {
try {
String filePath =
fileUtils.joinToAbsolutePath(basePath, "head", "startup-os", req.getFilename());
responseObserver.onNext(
FileResponse.newBuilder().setContent(fileUtils.readFile(filePath)).build());
responseObserver.onCompleted();
} catch (SecurityException | IOException e) {
responseObserver.onError(
Status.NOT_FOUND
.withDescription(String.format("No such file %s", req.getFilename()))
.asException());
}
}
@Override
public void getFeedItem(GetFeedItemRequest request, StreamObserver<FeedItem> responseObserver) {
Object response = responses.remove();
if (response instanceof FeedItem) {
requests.add(request);
responseObserver.onNext((FeedItem) response);
responseObserver.onCompleted();
} else if (response instanceof Exception) {
responseObserver.onError((Exception) response);
} else {
responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
}
}
@Override
public void setDefaultJobMaxCores(GroupSetDefJobMaxCoresRequest request,
StreamObserver<GroupSetDefJobMaxCoresResponse> responseObserver) {
GroupInterface group = getGroupInterface(request.getGroup());
groupManager.setGroupDefaultJobMaxCores(group, Convert.coresToWholeCoreUnits(request.getMaxCores()));
responseObserver.onNext(GroupSetDefJobMaxCoresResponse.newBuilder().build());
responseObserver.onCompleted();
}
@Override
public void findAgentInstances(AgentQuery request, StreamObserver<AgentInstances> responseObserver) {
Disposable subscription = agentManagementService.findAgentInstances(request).subscribe(
responseObserver::onNext,
e -> safeOnError(logger, e, responseObserver),
responseObserver::onCompleted
);
attachCancellingCallback(responseObserver, subscription);
}
@Override
public void getParentalStatusView(
GetParentalStatusViewRequest request, StreamObserver<ParentalStatusView> responseObserver) {
Object response = responses.remove();
if (response instanceof ParentalStatusView) {
requests.add(request);
responseObserver.onNext((ParentalStatusView) response);
responseObserver.onCompleted();
} else if (response instanceof Exception) {
responseObserver.onError((Exception) response);
} else {
responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
}
}