类io.grpc.stub.ServerCallStreamObserver源码实例Demo

下面列出了怎么用io.grpc.stub.ServerCallStreamObserver的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: kafka-pubsub-emulator   文件: SubscriberService.java
StreamingPullStreamObserver(StreamObserver<StreamingPullResponse> responseObserver) {
  // Upcast to a ServerCallStreamObserver to set manual flow control
  this.responseObserver = (ServerCallStreamObserver<StreamingPullResponse>) responseObserver;
  this.responseObserver.disableAutoInboundFlowControl();

  this.responseObserver.setOnReadyHandler(
      () -> {
        if (isNull(subscriptionManager)) {
          this.responseObserver.request(1);
        }
      });
  this.responseObserver.setOnCancelHandler(
      () -> {
        logger.atInfo().log("Client cancelled StreamingPull %s", streamId);
        stopIfRunning();
      });
}
 
源代码2 项目: reactive-grpc   文件: ServerCalls.java
/**
 * Implements a unary → stream call as {@link Single} → {@link Flowable}, where the server responds with a
 * stream of messages.
 */
public static <TRequest, TResponse> void oneToMany(
        final TRequest request,
        final StreamObserver<TResponse> responseObserver,
        final Function<Single<TRequest>, Flowable<TResponse>> delegate) {
    try {
        final Single<TRequest> rxRequest = Single.just(request);

        final Flowable<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest));
        final RxSubscriberAndServerProducer<TResponse> serverProducer =
                rxResponse.subscribeWith(new RxSubscriberAndServerProducer<TResponse>());
        serverProducer.subscribe((ServerCallStreamObserver<TResponse>) responseObserver);
    } catch (Throwable throwable) {
        responseObserver.onError(prepareError(throwable));
    }
}
 
源代码3 项目: reactive-grpc   文件: ServerCalls.java
/**
 * Implements a bidirectional stream → stream call as {@link Flowable} → {@link Flowable}, where both the client
 * and the server independently stream to each other.
 */
public static <TRequest, TResponse> StreamObserver<TRequest> manyToMany(
        final StreamObserver<TResponse> responseObserver,
        final Function<Flowable<TRequest>, Flowable<TResponse>> delegate,
        final CallOptions options) {

    final int prefetch = RxCallOptions.getPrefetch(options);
    final int lowTide = RxCallOptions.getLowTide(options);

    final RxServerStreamObserverAndPublisher<TRequest> streamObserverPublisher =
            new RxServerStreamObserverAndPublisher<TRequest>((ServerCallStreamObserver<TResponse>) responseObserver, null, prefetch, lowTide);

    try {
        final Flowable<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(Flowable.fromPublisher(streamObserverPublisher)));
        final RxSubscriberAndServerProducer<TResponse> subscriber = new RxSubscriberAndServerProducer<TResponse>();
        subscriber.subscribe((ServerCallStreamObserver<TResponse>) responseObserver);
        // Don't try to respond if the server has already canceled the request
        rxResponse.subscribe(subscriber);
    } catch (Throwable throwable) {
        responseObserver.onError(prepareError(throwable));
    }

    return streamObserverPublisher;
}
 
@Override
public void streamingFromServer(Messages.SimpleRequest request, StreamObserver<Messages.SimpleResponse> responseObserver) {
    final ServerCallStreamObserver<Messages.SimpleResponse> callStreamObserver = (ServerCallStreamObserver<Messages.SimpleResponse>) responseObserver;

    for (Messages.SimpleResponse response : responses) {
        if (callStreamObserver.isCancelled()) {
            return;
        }

        callStreamObserver.onNext(response);
    }

    if (!callStreamObserver.isCancelled()) {
        callStreamObserver.onCompleted();
    }
}
 
源代码5 项目: reactive-grpc   文件: ServerCalls.java
/**
 * Implements a bidirectional stream → stream call as {@link Flux} → {@link Flux}, where both the client
 * and the server independently stream to each other.
 */
public static <TRequest, TResponse> StreamObserver<TRequest> manyToMany(
        StreamObserver<TResponse> responseObserver,
        Function<Flux<TRequest>, Flux<TResponse>> delegate,
        CallOptions options) {

    final int prefetch = ReactorCallOptions.getPrefetch(options);
    final int lowTide = ReactorCallOptions.getLowTide(options);

    ReactorServerStreamObserverAndPublisher<TRequest> streamObserverPublisher =
            new ReactorServerStreamObserverAndPublisher<>((ServerCallStreamObserver<TResponse>) responseObserver, null, prefetch, lowTide);

    try {
        Flux<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(Flux.from(streamObserverPublisher)));
        ReactorSubscriberAndServerProducer<TResponse> subscriber = new ReactorSubscriberAndServerProducer<>();
        subscriber.subscribe((ServerCallStreamObserver<TResponse>) responseObserver);
        // Don't try to respond if the server has already canceled the request
        rxResponse.subscribe(subscriber);
    } catch (Throwable throwable) {
        responseObserver.onError(prepareError(throwable));
    }

    return streamObserverPublisher;
}
 
@Override
public void observeEvents(Empty request, StreamObserver<SupervisorEvent> responseObserver) {
    Subscription subscription = supervisorOperations.events()
            .map(SupervisorGrpcModelConverters::toGrpcEvent)
            .subscribe(
                    responseObserver::onNext,
                    e -> responseObserver.onError(
                            new StatusRuntimeException(Status.INTERNAL
                                    .withDescription("Supervisor event stream terminated with an error")
                                    .withCause(e))
                    ),
                    responseObserver::onCompleted
            );
    ServerCallStreamObserver<SupervisorEvent> serverObserver = (ServerCallStreamObserver<SupervisorEvent>) responseObserver;
    serverObserver.setOnCancelHandler(subscription::unsubscribe);
}
 
源代码7 项目: bazel-buildfarm   文件: MemoryCAS.java
@Override
public void get(
    Digest digest,
    long offset,
    long limit,
    ServerCallStreamObserver<ByteString> blobObserver,
    RequestMetadata requestMetadata) {
  Blob blob = get(digest);
  if (blob == null) {
    if (delegate != null) {
      // FIXME change this to a read-through get
      delegate.get(digest, offset, limit, blobObserver, requestMetadata);
    } else {
      blobObserver.onError(io.grpc.Status.NOT_FOUND.asException());
    }
  } else {
    blobObserver.onNext(blob.getData());
    blobObserver.onCompleted();
  }
}
 
源代码8 项目: bazel-buildfarm   文件: ExecutionService.java
@Override
public void waitExecution(
    WaitExecutionRequest request, StreamObserver<Operation> responseObserver) {
  String operationName = request.getName();
  Instance instance;
  try {
    instance = instances.getFromOperationName(operationName);
  } catch (InstanceNotFoundException e) {
    responseObserver.onError(BuildFarmInstances.toStatusException(e));
    return;
  }

  ServerCallStreamObserver<Operation> serverCallStreamObserver =
      (ServerCallStreamObserver<Operation>) responseObserver;
  withCancellation(
      serverCallStreamObserver,
      instance.watchOperation(
          operationName,
          createWatcher(serverCallStreamObserver, TracingMetadataUtils.fromCurrentContext())));
}
 
源代码9 项目: bazel-buildfarm   文件: ByteStreamService.java
void readLimitedBlob(
    Instance instance,
    Digest digest,
    long offset,
    long limit,
    StreamObserver<ReadResponse> responseObserver) {
  ServerCallStreamObserver<ReadResponse> target =
      onErrorLogReadObserver(
          format("%s(%s)", DigestUtil.toString(digest), instance.getName()),
          offset,
          (ServerCallStreamObserver<ReadResponse>) responseObserver);
  try {
    instance.getBlob(
        digest,
        offset,
        limit,
        newChunkObserver(target),
        TracingMetadataUtils.fromCurrentContext());
  } catch (Exception e) {
    target.onError(e);
  }
}
 
源代码10 项目: bazel-buildfarm   文件: ShardInstanceTest.java
private void provideBlob(Digest digest, ByteString content) {
  blobDigests.add(digest);
  // FIXME use better answer definitions, without indexes
  doAnswer(
          new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) {
              StreamObserver<ByteString> blobObserver =
                  (StreamObserver) invocation.getArguments()[3];
              blobObserver.onNext(content);
              blobObserver.onCompleted();
              return null;
            }
          })
      .when(mockWorkerInstance)
      .getBlob(
          eq(digest),
          eq(0l),
          eq(digest.getSizeBytes()),
          any(ServerCallStreamObserver.class),
          any(RequestMetadata.class));
}
 
源代码11 项目: mirror   文件: BlockingStreamObserver.java
@Override
public void onNext(T value) {
  synchronized (lock) {
    // in theory we could implement ServerCallStreamObserver and expose isCancelled to our client,
    // but for current purposes we only need the StreamObserver API, so treat a cancelled observer
    // as something we just want to un-block from and return, and we'll trust the rest of our session
    // to shutdown accordingly.
    if (delegate instanceof ServerCallStreamObserver && ((ServerCallStreamObserver<T>) delegate).isCancelled()) {
      return;
    }
    while (!delegate.isReady()) {
      try {
        lock.wait();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
  delegate.onNext(value);
}
 
源代码12 项目: grpc-nebula-java   文件: TestServiceImpl.java
/**
 * Immediately responds with a payload of the type and size specified in the request.
 */
@Override
public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
  ServerCallStreamObserver<SimpleResponse> obs =
      (ServerCallStreamObserver<SimpleResponse>) responseObserver;
  SimpleResponse.Builder responseBuilder = SimpleResponse.newBuilder();
  try {
    if (req.hasResponseCompressed() && req.getResponseCompressed().getValue()) {
      obs.setCompression("gzip");
    } else {
      obs.setCompression("identity");
    }
  } catch (IllegalArgumentException e) {
    obs.onError(Status.UNIMPLEMENTED
        .withDescription("compression not supported.")
        .withCause(e)
        .asRuntimeException());
    return;
  }

  if (req.getResponseSize() != 0) {
    // For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
    // TODO(wonderfly): whether or not this is a good approach needs further discussion.
    int offset = random.nextInt(compressableBuffer.size());
    ByteString payload = generatePayload(compressableBuffer, offset, req.getResponseSize());
    responseBuilder.setPayload(
        Payload.newBuilder()
            .setBody(payload));
  }

  if (req.hasResponseStatus()) {
    obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode())
        .withDescription(req.getResponseStatus().getMessage())
        .asRuntimeException());
    return;
  }

  responseObserver.onNext(responseBuilder.build());
  responseObserver.onCompleted();
}
 
源代码13 项目: grpc-nebula-java   文件: ProtoReflectionService.java
@Override
public StreamObserver<ServerReflectionRequest> serverReflectionInfo(
    final StreamObserver<ServerReflectionResponse> responseObserver) {
  final ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver =
      (ServerCallStreamObserver<ServerReflectionResponse>) responseObserver;
  ProtoReflectionStreamObserver requestObserver =
      new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverCallStreamObserver);
  serverCallStreamObserver.setOnReadyHandler(requestObserver);
  serverCallStreamObserver.disableAutoInboundFlowControl();
  serverCallStreamObserver.request(1);
  return requestObserver;
}
 
源代码14 项目: grpc-nebula-java   文件: AsyncServer.java
@Override
public StreamObserver<Messages.SimpleRequest> streamingCall(
    final StreamObserver<Messages.SimpleResponse> observer) {
  final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
      (ServerCallStreamObserver<Messages.SimpleResponse>) observer;
  // TODO(spencerfang): flow control to stop reading when !responseObserver.isReady
  return new StreamObserver<Messages.SimpleRequest>() {
    @Override
    public void onNext(Messages.SimpleRequest value) {
      if (shutdown.get()) {
        responseObserver.onCompleted();
        return;
      }
      responseObserver.onNext(Utils.makeResponse(value));
    }

    @Override
    public void onError(Throwable t) {
      // other side closed with non OK
      responseObserver.onError(t);
    }

    @Override
    public void onCompleted() {
      // other side closed with OK
      responseObserver.onCompleted();
    }
  };
}
 
源代码15 项目: grpc-nebula-java   文件: AsyncServer.java
@Override
public void streamingFromServer(
    final Messages.SimpleRequest request,
    final StreamObserver<Messages.SimpleResponse> observer) {
  // send forever, until the client cancels or we shut down
  final Messages.SimpleResponse response = Utils.makeResponse(request);
  final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
      (ServerCallStreamObserver<Messages.SimpleResponse>) observer;
  // If the client cancels, copyWithFlowControl takes care of calling
  // responseObserver.onCompleted() for us
  StreamObservers.copyWithFlowControl(
      new Iterator<Messages.SimpleResponse>() {
        @Override
        public boolean hasNext() {
          return !shutdown.get() && !responseObserver.isCancelled();
        }

        @Override
        public Messages.SimpleResponse next() {
          return response;
        }

        @Override
        public void remove() {
          throw new UnsupportedOperationException();
        }
      },
      responseObserver);
}
 
源代码16 项目: java-control-plane   文件: DiscoveryServer.java
private StreamObserver<DiscoveryRequest> createRequestHandler(
    StreamObserver<DiscoveryResponse> responseObserver,
    boolean ads,
    String defaultTypeUrl) {

  long streamId = streamCount.getAndIncrement();
  Executor executor = executorGroup.next();

  LOGGER.debug("[{}] open stream from {}", streamId, defaultTypeUrl);

  callbacks.forEach(cb -> cb.onStreamOpen(streamId, defaultTypeUrl));

  final DiscoveryRequestStreamObserver requestStreamObserver;
  if (ads) {
    requestStreamObserver = new AdsDiscoveryRequestStreamObserver(
        responseObserver,
        streamId,
        executor,
        this
    );
  } else {
    requestStreamObserver = new XdsDiscoveryRequestStreamObserver(
        defaultTypeUrl,
        responseObserver,
        streamId,
        executor,
        this
    );
  }

  if (responseObserver instanceof ServerCallStreamObserver) {
    ((ServerCallStreamObserver) responseObserver).setOnCancelHandler(requestStreamObserver::onCancelled);
  }

  return requestStreamObserver;
}
 
RxServerStreamObserverAndPublisher(
        ServerCallStreamObserver<?> serverCallStreamObserver,
        Consumer<CallStreamObserver<?>> onSubscribe,
        int prefetch,
        int lowTide) {
    super(serverCallStreamObserver, new SimpleQueueAdapter<T>(new SpscArrayQueue<T>(prefetch)), onSubscribe, prefetch, lowTide);
}
 
@Override
public void subscribe(CallStreamObserver<T> downstream) {
    super.subscribe(downstream);
    ((ServerCallStreamObserver<?>) downstream).setOnCancelHandler(new Runnable() {
        @Override
        public void run() {
            AbstractSubscriberAndServerProducer.super.cancel();
        }
    });
}
 
public AbstractServerStreamObserverAndPublisher(
    ServerCallStreamObserver<?> serverCallStreamObserver,
    Queue<T> queue,
    Consumer<CallStreamObserver<?>> onSubscribe) {
    super(queue, onSubscribe);
    super.onSubscribe(serverCallStreamObserver);
}
 
public AbstractServerStreamObserverAndPublisher(
        ServerCallStreamObserver<?> serverCallStreamObserver,
        Queue<T> queue,
        Consumer<CallStreamObserver<?>> onSubscribe,
        int prefetch,
        int lowTide) {
    super(queue, prefetch, lowTide, onSubscribe);
    super.onSubscribe(serverCallStreamObserver);
}
 
@Override
public StreamObserver<Messages.SimpleRequest> streamingBothWays(StreamObserver<Messages.SimpleResponse> responseObserver) {
    final ServerCallStreamObserver<Messages.SimpleResponse> callStreamObserver = (ServerCallStreamObserver<Messages.SimpleResponse>) responseObserver;

    callStreamObserver.setOnReadyHandler(() -> {
        for (Messages.SimpleResponse response : responses) {
            if (callStreamObserver.isCancelled()) {
                return;
            }

            callStreamObserver.onNext(response);
        }

        if (!callStreamObserver.isCancelled()) {
            callStreamObserver.onCompleted();
        }
    });

    return new StreamObserver<Messages.SimpleRequest>() {
        @Override
        public void onNext(Messages.SimpleRequest value) {

        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onCompleted() {

        }
    };
}
 
源代码22 项目: reactive-grpc   文件: ServerCalls.java
/**
 * Implements a unary → stream call as {@link Mono} → {@link Flux}, where the server responds with a
 * stream of messages.
 */
public static <TRequest, TResponse> void oneToMany(
        TRequest request, StreamObserver<TResponse> responseObserver,
        Function<Mono<TRequest>, Flux<TResponse>> delegate) {
    try {
        Mono<TRequest> rxRequest = Mono.just(request);

        Flux<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest));
        ReactorSubscriberAndServerProducer<TResponse> server = rxResponse.subscribeWith(new ReactorSubscriberAndServerProducer<>());
        server.subscribe((ServerCallStreamObserver<TResponse>) responseObserver);
    } catch (Throwable throwable) {
        responseObserver.onError(prepareError(throwable));
    }
}
 
ReactorServerStreamObserverAndPublisher(
        ServerCallStreamObserver<?> serverCallStreamObserver,
        Consumer<CallStreamObserver<?>> onSubscribe,
        int prefetch,
        int lowTide) {
    super(serverCallStreamObserver, Queues.<T>get(DEFAULT_CHUNK_SIZE).get(), onSubscribe, prefetch, lowTide);
}
 
@Override
public void observeJobs(ObserveJobsQuery query, StreamObserver<JobChangeNotification> responseObserver) {
    JobQueryCriteria<TaskStatus.TaskState, JobDescriptor.JobSpecCase> criteria = toJobQueryCriteria(query);
    V3JobQueryCriteriaEvaluator jobsPredicate = new V3JobQueryCriteriaEvaluator(criteria, titusRuntime);
    V3TaskQueryCriteriaEvaluator tasksPredicate = new V3TaskQueryCriteriaEvaluator(criteria, titusRuntime);

    Observable<JobChangeNotification> eventStream = jobOperations.observeJobs(jobsPredicate, tasksPredicate)
            // avoid clogging the computation scheduler
            .observeOn(observeJobsScheduler)
            .subscribeOn(observeJobsScheduler, false)
            .map(event -> GrpcJobManagementModelConverters.toGrpcJobChangeNotification(event, logStorageInfo))
            .compose(ObservableExt.head(() -> {
                List<JobChangeNotification> snapshot = createJobsSnapshot(jobsPredicate, tasksPredicate);
                snapshot.add(SNAPSHOT_END_MARKER);
                return snapshot;
            }))
            .map(this::addTaskContextToJobChangeNotification)
            .doOnError(e -> logger.error("Unexpected error in jobs event stream", e));

    Subscription subscription = eventStream.subscribe(
            responseObserver::onNext,
            e -> responseObserver.onError(
                    new StatusRuntimeException(Status.INTERNAL
                            .withDescription("All jobs monitoring stream terminated with an error")
                            .withCause(e))
            ),
            responseObserver::onCompleted
    );

    ServerCallStreamObserver<JobChangeNotification> serverObserver = (ServerCallStreamObserver<JobChangeNotification>) responseObserver;
    serverObserver.setOnCancelHandler(subscription::unsubscribe);
}
 
@Override
public void observeJob(JobId request, StreamObserver<JobChangeNotification> responseObserver) {
    String jobId = request.getId();
    Observable<JobChangeNotification> eventStream = jobOperations.observeJob(jobId)
            // avoid clogging the computation scheduler
            .observeOn(observeJobsScheduler)
            .subscribeOn(observeJobsScheduler, false)
            .map(event -> GrpcJobManagementModelConverters.toGrpcJobChangeNotification(event, logStorageInfo))
            .compose(ObservableExt.head(() -> {
                List<JobChangeNotification> snapshot = createJobSnapshot(jobId);
                snapshot.add(SNAPSHOT_END_MARKER);
                return snapshot;
            }))
            .map(this::addTaskContextToJobChangeNotification)
            .doOnError(e -> {
                if (!JobManagerException.isExpected(e)) {
                    logger.error("Unexpected error in job {} event stream", jobId, e);
                } else {
                    logger.debug("Error in job {} event stream", jobId, e);
                }
            });

    Subscription subscription = eventStream.subscribe(
            responseObserver::onNext,
            e -> responseObserver.onError(
                    new StatusRuntimeException(Status.INTERNAL
                            .withDescription(jobId + " job monitoring stream terminated with an error")
                            .withCause(e))
            ),
            responseObserver::onCompleted
    );

    ServerCallStreamObserver<JobChangeNotification> serverObserver = (ServerCallStreamObserver<JobChangeNotification>) responseObserver;
    serverObserver.setOnCancelHandler(subscription::unsubscribe);
}
 
源代码26 项目: titus-control-plane   文件: GrpcEvictionService.java
@Override
public void observeEvents(ObserverEventRequest request, StreamObserver<EvictionServiceEvent> responseObserver) {
    Disposable subscription = evictionOperations.events(request.getIncludeSnapshot()).subscribe(
            next -> toGrpcEvent(next).ifPresent(responseObserver::onNext),
            e -> responseObserver.onError(
                    new StatusRuntimeException(Status.INTERNAL
                            .withDescription("Eviction event stream terminated with an error")
                            .withCause(e))
            ),
            responseObserver::onCompleted
    );
    ServerCallStreamObserver<EvictionServiceEvent> serverObserver = (ServerCallStreamObserver<EvictionServiceEvent>) responseObserver;
    serverObserver.setOnCancelHandler(subscription::dispose);
}
 
源代码27 项目: dremio-oss   文件: InformationSchemaServiceImpl.java
private OnReadyHandler(
  String requestType,
  Executor executor,
  ServerCallStreamObserver<V> streamObserver,
  Iterator<V> iterator
) {
  this.executor = new OnReadyEventExecutor(requestType, executor);
  this.responseObserver = streamObserver;

  this.iterator = iterator;
}
 
/** Retrieve a value from the CAS by streaming content when ready */
@ThreadSafe
void get(
    Digest digest,
    long offset,
    long count,
    ServerCallStreamObserver<ByteString> blobObserver,
    RequestMetadata requestMetadata);
 
源代码29 项目: bazel-buildfarm   文件: GrpcCAS.java
@Override
public void get(
    Digest digest,
    long offset,
    long count,
    ServerCallStreamObserver<ByteString> blobObserver,
    RequestMetadata requestMetadata) {
  ReadRequest request =
      ReadRequest.newBuilder()
          .setResourceName(getBlobName(digest))
          .setReadOffset(offset)
          .setReadLimit(count)
          .build();
  ByteStreamGrpc.newStub(channel)
      .withInterceptors(attachMetadataInterceptor(requestMetadata))
      .read(
          request,
          new DelegateServerCallStreamObserver<ReadResponse, ByteString>(blobObserver) {
            @Override
            public void onNext(ReadResponse response) {
              blobObserver.onNext(response.getData());
            }

            @Override
            public void onError(Throwable t) {
              blobObserver.onError(t);
            }

            @Override
            public void onCompleted() {
              blobObserver.onCompleted();
            }
          });
}
 
源代码30 项目: bazel-buildfarm   文件: AbstractServerInstance.java
@Override
public void getBlob(
    Digest blobDigest,
    long offset,
    long count,
    ServerCallStreamObserver<ByteString> blobObserver,
    RequestMetadata requestMetadata) {
  contentAddressableStorage.get(blobDigest, offset, count, blobObserver, requestMetadata);
}
 
 类所在包
 同包方法