下面列出了怎么用io.grpc.stub.ServerCallStreamObserver的API类实例代码及写法,或者点击链接到github查看源代码。
StreamingPullStreamObserver(StreamObserver<StreamingPullResponse> responseObserver) {
// Upcast to a ServerCallStreamObserver to set manual flow control
this.responseObserver = (ServerCallStreamObserver<StreamingPullResponse>) responseObserver;
this.responseObserver.disableAutoInboundFlowControl();
this.responseObserver.setOnReadyHandler(
() -> {
if (isNull(subscriptionManager)) {
this.responseObserver.request(1);
}
});
this.responseObserver.setOnCancelHandler(
() -> {
logger.atInfo().log("Client cancelled StreamingPull %s", streamId);
stopIfRunning();
});
}
/**
* Implements a unary → stream call as {@link Single} → {@link Flowable}, where the server responds with a
* stream of messages.
*/
public static <TRequest, TResponse> void oneToMany(
final TRequest request,
final StreamObserver<TResponse> responseObserver,
final Function<Single<TRequest>, Flowable<TResponse>> delegate) {
try {
final Single<TRequest> rxRequest = Single.just(request);
final Flowable<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest));
final RxSubscriberAndServerProducer<TResponse> serverProducer =
rxResponse.subscribeWith(new RxSubscriberAndServerProducer<TResponse>());
serverProducer.subscribe((ServerCallStreamObserver<TResponse>) responseObserver);
} catch (Throwable throwable) {
responseObserver.onError(prepareError(throwable));
}
}
/**
* Implements a bidirectional stream → stream call as {@link Flowable} → {@link Flowable}, where both the client
* and the server independently stream to each other.
*/
public static <TRequest, TResponse> StreamObserver<TRequest> manyToMany(
final StreamObserver<TResponse> responseObserver,
final Function<Flowable<TRequest>, Flowable<TResponse>> delegate,
final CallOptions options) {
final int prefetch = RxCallOptions.getPrefetch(options);
final int lowTide = RxCallOptions.getLowTide(options);
final RxServerStreamObserverAndPublisher<TRequest> streamObserverPublisher =
new RxServerStreamObserverAndPublisher<TRequest>((ServerCallStreamObserver<TResponse>) responseObserver, null, prefetch, lowTide);
try {
final Flowable<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(Flowable.fromPublisher(streamObserverPublisher)));
final RxSubscriberAndServerProducer<TResponse> subscriber = new RxSubscriberAndServerProducer<TResponse>();
subscriber.subscribe((ServerCallStreamObserver<TResponse>) responseObserver);
// Don't try to respond if the server has already canceled the request
rxResponse.subscribe(subscriber);
} catch (Throwable throwable) {
responseObserver.onError(prepareError(throwable));
}
return streamObserverPublisher;
}
@Override
public void streamingFromServer(Messages.SimpleRequest request, StreamObserver<Messages.SimpleResponse> responseObserver) {
final ServerCallStreamObserver<Messages.SimpleResponse> callStreamObserver = (ServerCallStreamObserver<Messages.SimpleResponse>) responseObserver;
for (Messages.SimpleResponse response : responses) {
if (callStreamObserver.isCancelled()) {
return;
}
callStreamObserver.onNext(response);
}
if (!callStreamObserver.isCancelled()) {
callStreamObserver.onCompleted();
}
}
/**
* Implements a bidirectional stream → stream call as {@link Flux} → {@link Flux}, where both the client
* and the server independently stream to each other.
*/
public static <TRequest, TResponse> StreamObserver<TRequest> manyToMany(
StreamObserver<TResponse> responseObserver,
Function<Flux<TRequest>, Flux<TResponse>> delegate,
CallOptions options) {
final int prefetch = ReactorCallOptions.getPrefetch(options);
final int lowTide = ReactorCallOptions.getLowTide(options);
ReactorServerStreamObserverAndPublisher<TRequest> streamObserverPublisher =
new ReactorServerStreamObserverAndPublisher<>((ServerCallStreamObserver<TResponse>) responseObserver, null, prefetch, lowTide);
try {
Flux<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(Flux.from(streamObserverPublisher)));
ReactorSubscriberAndServerProducer<TResponse> subscriber = new ReactorSubscriberAndServerProducer<>();
subscriber.subscribe((ServerCallStreamObserver<TResponse>) responseObserver);
// Don't try to respond if the server has already canceled the request
rxResponse.subscribe(subscriber);
} catch (Throwable throwable) {
responseObserver.onError(prepareError(throwable));
}
return streamObserverPublisher;
}
@Override
public void observeEvents(Empty request, StreamObserver<SupervisorEvent> responseObserver) {
Subscription subscription = supervisorOperations.events()
.map(SupervisorGrpcModelConverters::toGrpcEvent)
.subscribe(
responseObserver::onNext,
e -> responseObserver.onError(
new StatusRuntimeException(Status.INTERNAL
.withDescription("Supervisor event stream terminated with an error")
.withCause(e))
),
responseObserver::onCompleted
);
ServerCallStreamObserver<SupervisorEvent> serverObserver = (ServerCallStreamObserver<SupervisorEvent>) responseObserver;
serverObserver.setOnCancelHandler(subscription::unsubscribe);
}
@Override
public void get(
Digest digest,
long offset,
long limit,
ServerCallStreamObserver<ByteString> blobObserver,
RequestMetadata requestMetadata) {
Blob blob = get(digest);
if (blob == null) {
if (delegate != null) {
// FIXME change this to a read-through get
delegate.get(digest, offset, limit, blobObserver, requestMetadata);
} else {
blobObserver.onError(io.grpc.Status.NOT_FOUND.asException());
}
} else {
blobObserver.onNext(blob.getData());
blobObserver.onCompleted();
}
}
@Override
public void waitExecution(
WaitExecutionRequest request, StreamObserver<Operation> responseObserver) {
String operationName = request.getName();
Instance instance;
try {
instance = instances.getFromOperationName(operationName);
} catch (InstanceNotFoundException e) {
responseObserver.onError(BuildFarmInstances.toStatusException(e));
return;
}
ServerCallStreamObserver<Operation> serverCallStreamObserver =
(ServerCallStreamObserver<Operation>) responseObserver;
withCancellation(
serverCallStreamObserver,
instance.watchOperation(
operationName,
createWatcher(serverCallStreamObserver, TracingMetadataUtils.fromCurrentContext())));
}
void readLimitedBlob(
Instance instance,
Digest digest,
long offset,
long limit,
StreamObserver<ReadResponse> responseObserver) {
ServerCallStreamObserver<ReadResponse> target =
onErrorLogReadObserver(
format("%s(%s)", DigestUtil.toString(digest), instance.getName()),
offset,
(ServerCallStreamObserver<ReadResponse>) responseObserver);
try {
instance.getBlob(
digest,
offset,
limit,
newChunkObserver(target),
TracingMetadataUtils.fromCurrentContext());
} catch (Exception e) {
target.onError(e);
}
}
private void provideBlob(Digest digest, ByteString content) {
blobDigests.add(digest);
// FIXME use better answer definitions, without indexes
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
StreamObserver<ByteString> blobObserver =
(StreamObserver) invocation.getArguments()[3];
blobObserver.onNext(content);
blobObserver.onCompleted();
return null;
}
})
.when(mockWorkerInstance)
.getBlob(
eq(digest),
eq(0l),
eq(digest.getSizeBytes()),
any(ServerCallStreamObserver.class),
any(RequestMetadata.class));
}
@Override
public void onNext(T value) {
synchronized (lock) {
// in theory we could implement ServerCallStreamObserver and expose isCancelled to our client,
// but for current purposes we only need the StreamObserver API, so treat a cancelled observer
// as something we just want to un-block from and return, and we'll trust the rest of our session
// to shutdown accordingly.
if (delegate instanceof ServerCallStreamObserver && ((ServerCallStreamObserver<T>) delegate).isCancelled()) {
return;
}
while (!delegate.isReady()) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
delegate.onNext(value);
}
/**
* Immediately responds with a payload of the type and size specified in the request.
*/
@Override
public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
ServerCallStreamObserver<SimpleResponse> obs =
(ServerCallStreamObserver<SimpleResponse>) responseObserver;
SimpleResponse.Builder responseBuilder = SimpleResponse.newBuilder();
try {
if (req.hasResponseCompressed() && req.getResponseCompressed().getValue()) {
obs.setCompression("gzip");
} else {
obs.setCompression("identity");
}
} catch (IllegalArgumentException e) {
obs.onError(Status.UNIMPLEMENTED
.withDescription("compression not supported.")
.withCause(e)
.asRuntimeException());
return;
}
if (req.getResponseSize() != 0) {
// For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
// TODO(wonderfly): whether or not this is a good approach needs further discussion.
int offset = random.nextInt(compressableBuffer.size());
ByteString payload = generatePayload(compressableBuffer, offset, req.getResponseSize());
responseBuilder.setPayload(
Payload.newBuilder()
.setBody(payload));
}
if (req.hasResponseStatus()) {
obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode())
.withDescription(req.getResponseStatus().getMessage())
.asRuntimeException());
return;
}
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}
@Override
public StreamObserver<ServerReflectionRequest> serverReflectionInfo(
final StreamObserver<ServerReflectionResponse> responseObserver) {
final ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver =
(ServerCallStreamObserver<ServerReflectionResponse>) responseObserver;
ProtoReflectionStreamObserver requestObserver =
new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverCallStreamObserver);
serverCallStreamObserver.setOnReadyHandler(requestObserver);
serverCallStreamObserver.disableAutoInboundFlowControl();
serverCallStreamObserver.request(1);
return requestObserver;
}
@Override
public StreamObserver<Messages.SimpleRequest> streamingCall(
final StreamObserver<Messages.SimpleResponse> observer) {
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// TODO(spencerfang): flow control to stop reading when !responseObserver.isReady
return new StreamObserver<Messages.SimpleRequest>() {
@Override
public void onNext(Messages.SimpleRequest value) {
if (shutdown.get()) {
responseObserver.onCompleted();
return;
}
responseObserver.onNext(Utils.makeResponse(value));
}
@Override
public void onError(Throwable t) {
// other side closed with non OK
responseObserver.onError(t);
}
@Override
public void onCompleted() {
// other side closed with OK
responseObserver.onCompleted();
}
};
}
@Override
public void streamingFromServer(
final Messages.SimpleRequest request,
final StreamObserver<Messages.SimpleResponse> observer) {
// send forever, until the client cancels or we shut down
final Messages.SimpleResponse response = Utils.makeResponse(request);
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// If the client cancels, copyWithFlowControl takes care of calling
// responseObserver.onCompleted() for us
StreamObservers.copyWithFlowControl(
new Iterator<Messages.SimpleResponse>() {
@Override
public boolean hasNext() {
return !shutdown.get() && !responseObserver.isCancelled();
}
@Override
public Messages.SimpleResponse next() {
return response;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
},
responseObserver);
}
private StreamObserver<DiscoveryRequest> createRequestHandler(
StreamObserver<DiscoveryResponse> responseObserver,
boolean ads,
String defaultTypeUrl) {
long streamId = streamCount.getAndIncrement();
Executor executor = executorGroup.next();
LOGGER.debug("[{}] open stream from {}", streamId, defaultTypeUrl);
callbacks.forEach(cb -> cb.onStreamOpen(streamId, defaultTypeUrl));
final DiscoveryRequestStreamObserver requestStreamObserver;
if (ads) {
requestStreamObserver = new AdsDiscoveryRequestStreamObserver(
responseObserver,
streamId,
executor,
this
);
} else {
requestStreamObserver = new XdsDiscoveryRequestStreamObserver(
defaultTypeUrl,
responseObserver,
streamId,
executor,
this
);
}
if (responseObserver instanceof ServerCallStreamObserver) {
((ServerCallStreamObserver) responseObserver).setOnCancelHandler(requestStreamObserver::onCancelled);
}
return requestStreamObserver;
}
RxServerStreamObserverAndPublisher(
ServerCallStreamObserver<?> serverCallStreamObserver,
Consumer<CallStreamObserver<?>> onSubscribe,
int prefetch,
int lowTide) {
super(serverCallStreamObserver, new SimpleQueueAdapter<T>(new SpscArrayQueue<T>(prefetch)), onSubscribe, prefetch, lowTide);
}
@Override
public void subscribe(CallStreamObserver<T> downstream) {
super.subscribe(downstream);
((ServerCallStreamObserver<?>) downstream).setOnCancelHandler(new Runnable() {
@Override
public void run() {
AbstractSubscriberAndServerProducer.super.cancel();
}
});
}
public AbstractServerStreamObserverAndPublisher(
ServerCallStreamObserver<?> serverCallStreamObserver,
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe) {
super(queue, onSubscribe);
super.onSubscribe(serverCallStreamObserver);
}
public AbstractServerStreamObserverAndPublisher(
ServerCallStreamObserver<?> serverCallStreamObserver,
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe,
int prefetch,
int lowTide) {
super(queue, prefetch, lowTide, onSubscribe);
super.onSubscribe(serverCallStreamObserver);
}
@Override
public StreamObserver<Messages.SimpleRequest> streamingBothWays(StreamObserver<Messages.SimpleResponse> responseObserver) {
final ServerCallStreamObserver<Messages.SimpleResponse> callStreamObserver = (ServerCallStreamObserver<Messages.SimpleResponse>) responseObserver;
callStreamObserver.setOnReadyHandler(() -> {
for (Messages.SimpleResponse response : responses) {
if (callStreamObserver.isCancelled()) {
return;
}
callStreamObserver.onNext(response);
}
if (!callStreamObserver.isCancelled()) {
callStreamObserver.onCompleted();
}
});
return new StreamObserver<Messages.SimpleRequest>() {
@Override
public void onNext(Messages.SimpleRequest value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
};
}
/**
* Implements a unary → stream call as {@link Mono} → {@link Flux}, where the server responds with a
* stream of messages.
*/
public static <TRequest, TResponse> void oneToMany(
TRequest request, StreamObserver<TResponse> responseObserver,
Function<Mono<TRequest>, Flux<TResponse>> delegate) {
try {
Mono<TRequest> rxRequest = Mono.just(request);
Flux<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest));
ReactorSubscriberAndServerProducer<TResponse> server = rxResponse.subscribeWith(new ReactorSubscriberAndServerProducer<>());
server.subscribe((ServerCallStreamObserver<TResponse>) responseObserver);
} catch (Throwable throwable) {
responseObserver.onError(prepareError(throwable));
}
}
ReactorServerStreamObserverAndPublisher(
ServerCallStreamObserver<?> serverCallStreamObserver,
Consumer<CallStreamObserver<?>> onSubscribe,
int prefetch,
int lowTide) {
super(serverCallStreamObserver, Queues.<T>get(DEFAULT_CHUNK_SIZE).get(), onSubscribe, prefetch, lowTide);
}
@Override
public void observeJobs(ObserveJobsQuery query, StreamObserver<JobChangeNotification> responseObserver) {
JobQueryCriteria<TaskStatus.TaskState, JobDescriptor.JobSpecCase> criteria = toJobQueryCriteria(query);
V3JobQueryCriteriaEvaluator jobsPredicate = new V3JobQueryCriteriaEvaluator(criteria, titusRuntime);
V3TaskQueryCriteriaEvaluator tasksPredicate = new V3TaskQueryCriteriaEvaluator(criteria, titusRuntime);
Observable<JobChangeNotification> eventStream = jobOperations.observeJobs(jobsPredicate, tasksPredicate)
// avoid clogging the computation scheduler
.observeOn(observeJobsScheduler)
.subscribeOn(observeJobsScheduler, false)
.map(event -> GrpcJobManagementModelConverters.toGrpcJobChangeNotification(event, logStorageInfo))
.compose(ObservableExt.head(() -> {
List<JobChangeNotification> snapshot = createJobsSnapshot(jobsPredicate, tasksPredicate);
snapshot.add(SNAPSHOT_END_MARKER);
return snapshot;
}))
.map(this::addTaskContextToJobChangeNotification)
.doOnError(e -> logger.error("Unexpected error in jobs event stream", e));
Subscription subscription = eventStream.subscribe(
responseObserver::onNext,
e -> responseObserver.onError(
new StatusRuntimeException(Status.INTERNAL
.withDescription("All jobs monitoring stream terminated with an error")
.withCause(e))
),
responseObserver::onCompleted
);
ServerCallStreamObserver<JobChangeNotification> serverObserver = (ServerCallStreamObserver<JobChangeNotification>) responseObserver;
serverObserver.setOnCancelHandler(subscription::unsubscribe);
}
@Override
public void observeJob(JobId request, StreamObserver<JobChangeNotification> responseObserver) {
String jobId = request.getId();
Observable<JobChangeNotification> eventStream = jobOperations.observeJob(jobId)
// avoid clogging the computation scheduler
.observeOn(observeJobsScheduler)
.subscribeOn(observeJobsScheduler, false)
.map(event -> GrpcJobManagementModelConverters.toGrpcJobChangeNotification(event, logStorageInfo))
.compose(ObservableExt.head(() -> {
List<JobChangeNotification> snapshot = createJobSnapshot(jobId);
snapshot.add(SNAPSHOT_END_MARKER);
return snapshot;
}))
.map(this::addTaskContextToJobChangeNotification)
.doOnError(e -> {
if (!JobManagerException.isExpected(e)) {
logger.error("Unexpected error in job {} event stream", jobId, e);
} else {
logger.debug("Error in job {} event stream", jobId, e);
}
});
Subscription subscription = eventStream.subscribe(
responseObserver::onNext,
e -> responseObserver.onError(
new StatusRuntimeException(Status.INTERNAL
.withDescription(jobId + " job monitoring stream terminated with an error")
.withCause(e))
),
responseObserver::onCompleted
);
ServerCallStreamObserver<JobChangeNotification> serverObserver = (ServerCallStreamObserver<JobChangeNotification>) responseObserver;
serverObserver.setOnCancelHandler(subscription::unsubscribe);
}
@Override
public void observeEvents(ObserverEventRequest request, StreamObserver<EvictionServiceEvent> responseObserver) {
Disposable subscription = evictionOperations.events(request.getIncludeSnapshot()).subscribe(
next -> toGrpcEvent(next).ifPresent(responseObserver::onNext),
e -> responseObserver.onError(
new StatusRuntimeException(Status.INTERNAL
.withDescription("Eviction event stream terminated with an error")
.withCause(e))
),
responseObserver::onCompleted
);
ServerCallStreamObserver<EvictionServiceEvent> serverObserver = (ServerCallStreamObserver<EvictionServiceEvent>) responseObserver;
serverObserver.setOnCancelHandler(subscription::dispose);
}
private OnReadyHandler(
String requestType,
Executor executor,
ServerCallStreamObserver<V> streamObserver,
Iterator<V> iterator
) {
this.executor = new OnReadyEventExecutor(requestType, executor);
this.responseObserver = streamObserver;
this.iterator = iterator;
}
/** Retrieve a value from the CAS by streaming content when ready */
@ThreadSafe
void get(
Digest digest,
long offset,
long count,
ServerCallStreamObserver<ByteString> blobObserver,
RequestMetadata requestMetadata);
@Override
public void get(
Digest digest,
long offset,
long count,
ServerCallStreamObserver<ByteString> blobObserver,
RequestMetadata requestMetadata) {
ReadRequest request =
ReadRequest.newBuilder()
.setResourceName(getBlobName(digest))
.setReadOffset(offset)
.setReadLimit(count)
.build();
ByteStreamGrpc.newStub(channel)
.withInterceptors(attachMetadataInterceptor(requestMetadata))
.read(
request,
new DelegateServerCallStreamObserver<ReadResponse, ByteString>(blobObserver) {
@Override
public void onNext(ReadResponse response) {
blobObserver.onNext(response.getData());
}
@Override
public void onError(Throwable t) {
blobObserver.onError(t);
}
@Override
public void onCompleted() {
blobObserver.onCompleted();
}
});
}
@Override
public void getBlob(
Digest blobDigest,
long offset,
long count,
ServerCallStreamObserver<ByteString> blobObserver,
RequestMetadata requestMetadata) {
contentAddressableStorage.get(blobDigest, offset, count, blobObserver, requestMetadata);
}