类io.grpc.stub.ClientCallStreamObserver源码实例Demo

下面列出了怎么用io.grpc.stub.ClientCallStreamObserver的API类实例代码及写法,或者点击链接到github查看源代码。

/**
 * Checks response delivery against explicit {@code request(1)} calls on the reflection stream:
 * one response arrives right after the first request, and the second response only shows up
 * after an additional {@code request(1)}.
 */
@Test
public void flowControl() throws Exception {
  FlowControlClientResponseObserver clientResponseObserver =
      new FlowControlClientResponseObserver();
  // The stub returns a StreamObserver; the cast exposes request() for manual flow control.
  ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
      (ClientCallStreamObserver<ServerReflectionRequest>)
          stub.serverReflectionInfo(clientResponseObserver);

  // ClientCalls.startCall() calls request(1) initially, so we should get an immediate response.
  requestObserver.onNext(flowControlRequest);
  assertEquals(1, clientResponseObserver.getResponses().size());
  assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));

  // Verify we don't receive an additional response until we request it.
  requestObserver.onNext(flowControlRequest);
  assertEquals(1, clientResponseObserver.getResponses().size());

  // Asking for one more message releases the second response.
  requestObserver.request(1);
  assertEquals(2, clientResponseObserver.getResponses().size());
  assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));

  // Half-closing the request side is expected to complete the response observer.
  requestObserver.onCompleted();
  assertTrue(clientResponseObserver.onCompleteCalled());
}
 
/**
 * Checks that completion is not reported to the response observer while a response is still
 * undelivered, and that it is reported once the outstanding response has been requested.
 */
@Test
public void flowControlOnCompleteWithPendingRequest() throws Exception {
  FlowControlClientResponseObserver clientResponseObserver =
      new FlowControlClientResponseObserver();
  // The stub returns a StreamObserver; the cast exposes request() for manual flow control.
  ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
      (ClientCallStreamObserver<ServerReflectionRequest>)
          stub.serverReflectionInfo(clientResponseObserver);

  // ClientCalls.startCall() calls request(1) initially, so make additional request.
  requestObserver.onNext(flowControlRequest);
  requestObserver.onNext(flowControlRequest);
  requestObserver.onCompleted();
  // Only the first response has been delivered, so completion must still be held back.
  assertEquals(1, clientResponseObserver.getResponses().size());
  assertFalse(clientResponseObserver.onCompleteCalled());

  // Requesting the remaining response lets both it and the completion through.
  requestObserver.request(1);
  assertTrue(clientResponseObserver.onCompleteCalled());
  assertEquals(2, clientResponseObserver.getResponses().size());
  assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));
}
 
源代码3 项目: titus-control-plane   文件: GrpcUtil.java
/**
 * Builds a {@link ClientResponseObserver} whose callbacks simply delegate to the given actions.
 *
 * @param beforeStart action called with the request stream handed to {@code beforeStart}
 * @param onNext      action called with every value handed to {@code onNext}
 * @param onError     action called with the throwable handed to {@code onError}
 * @param onCompleted action called from {@code onCompleted}
 * @return an observer that forwards each callback to the matching action
 */
public static <REQ, RESP> ClientResponseObserver<REQ, RESP> createClientResponseObserver(final Action1<ClientCallStreamObserver<REQ>> beforeStart,
                                                                                         final Action1<? super RESP> onNext,
                                                                                         final Action1<Throwable> onError,
                                                                                         final Action0 onCompleted) {
    final ClientResponseObserver<REQ, RESP> delegatingObserver = new ClientResponseObserver<REQ, RESP>() {
        @Override
        public void beforeStart(ClientCallStreamObserver<REQ> callStream) {
            beforeStart.call(callStream);
        }

        @Override
        public void onNext(RESP response) {
            onNext.call(response);
        }

        @Override
        public void onError(Throwable failure) {
            onError.call(failure);
        }

        @Override
        public void onCompleted() {
            onCompleted.call();
        }
    };
    return delegatingObserver;
}
 
源代码4 项目: benchmarks   文件: StreamingMessageTransceiver.java
/**
 * Pushes up to {@code numberOfMessages} echo requests onto the request stream, stopping as soon
 * as the stream reports that it is not ready.
 *
 * @param numberOfMessages upper bound on the number of messages to send
 * @param length           not read by this method
 * @param timestamp        timestamp set on every message
 * @param checksum         checksum set on every message
 * @return how many messages were actually handed to the request stream
 */
public int send(final int numberOfMessages, final int length, final long timestamp, final long checksum)
{
    final ClientCallStreamObserver<EchoMessage> stream = this.requestObserver;
    final EchoMessage.Builder builder = this.messageBuilder;
    final ByteString body = this.payload;

    int sent = 0;
    // Re-check readiness before every message so a full transport buffer ends the batch early.
    while (sent < numberOfMessages && stream.isReady())
    {
        final EchoMessage message = builder
            .setTimestamp(timestamp)
            .setPayload(body)
            .setChecksum(checksum)
            .build();
        stream.onNext(message);
        sent++;
    }

    return sent;
}
 
源代码5 项目: pinpoint   文件: PingStreamContext.java
/**
 * Registers an on-ready handler on the ping request stream. Each time the handler runs it logs
 * the event, resets the reconnector, and schedules a task that sends a new ping on the stream.
 *
 * @param requestStream the request stream for the ping call
 */
@Override
public void beforeStart(final ClientCallStreamObserver<PPing> requestStream) {
    final Runnable onReady = new Runnable() {
        @Override
        public void run() {
            logger.info("{} onReady", streamId);
            reconnector.reset();

            final Runnable pingTask = new Runnable() {
                @Override
                public void run() {
                    requestStream.onNext(newPing());
                }
            };

            // Keep the handle returned by schedule() on the enclosing observer.
            PingClientResponseObserver.this.pingScheduler = schedule(pingTask);
        }
    };
    requestStream.setOnReadyHandler(onReady);
}
 
源代码6 项目: grpc-java   文件: ProtoReflectionServiceTest.java
/**
 * Checks that neither the response nor the completion reaches the response observer until the
 * client explicitly requests a message.
 */
@Test
public void flowControlOnCompleteWithPendingRequest() throws Exception {
  FlowControlClientResponseObserver clientResponseObserver =
      new FlowControlClientResponseObserver();
  // The stub returns a StreamObserver; the cast exposes request() for manual flow control.
  ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
      (ClientCallStreamObserver<ServerReflectionRequest>)
          stub.serverReflectionInfo(clientResponseObserver);

  requestObserver.onNext(flowControlRequest);
  requestObserver.onCompleted();
  // Nothing has been requested yet, so nothing may have been delivered.
  assertEquals(0, clientResponseObserver.getResponses().size());
  assertFalse(clientResponseObserver.onCompleteCalled());

  // A single request releases the pending response and the completion.
  requestObserver.request(1);
  assertTrue(clientResponseObserver.onCompleteCalled());
  assertEquals(1, clientResponseObserver.getResponses().size());
  assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
}
 
源代码7 项目: milkman   文件: CompositeStreamObserver.java
/**
 * Forwards the {@code beforeStart} callback to every wrapped observer that is itself a
 * {@link ClientResponseObserver}; other observers are skipped.
 *
 * @param requestStream the request stream passed on to the wrapped observers
 */
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
	for (StreamObserver<ResT> candidate : observers) {
		if (!(candidate instanceof ClientResponseObserver)) {
			continue;
		}
		((ClientResponseObserver)candidate).beforeStart(requestStream);
	}
}
 
源代码8 项目: etcd-java   文件: GrpcClient.java
/**
 * Installs an on-ready handler that, whenever the request stream is ready, clears the error
 * counter and reports the stream as established; the response stream is notified only when
 * {@code userReqStream.established} returns {@code true}.
 *
 * @param rs the request stream of the call being started
 */
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> rs) {
    rs.setOnReadyHandler(() -> {
        // called from grpc response thread
        if (!rs.isReady()) {
            return;
        }
        errCounter = 0;
        if (userReqStream.established(rs)) {
            respStream.onEstablished();
        }
    });
}
 
/**
 * Cancels this instance and the downstream gRPC call. Does nothing when {@code isCanceled()}
 * already returns {@code true}.
 */
@Override
public void cancel() {
    if (isCanceled()) {
        return;
    }
    super.cancel();
    ClientCallStreamObserver<?> call = (ClientCallStreamObserver<?>) downstream;
    call.cancel("Cancelled", Status.CANCELLED.asException());
}
 
源代码10 项目: reactive-grpc   文件: ReferenceGRpcBenchmark.java
/**
 * Benchmarks a streaming gRPC call driven directly through a {@link PerfObserver}.
 *
 * <p>The first on-ready callback sends every request in {@code ARRAY_REQUEST} and then completes
 * the request stream; any later on-ready callback is ignored through the {@code done} flag. The
 * method blocks on the observer's latch before returning.
 *
 * @param blackhole blackhole handed to the observer
 * @return the observer used for the call
 * @throws InterruptedException if interrupted while waiting on the latch
 */
@Benchmark
public Object gRpcBothWaysStreamingCall(Blackhole blackhole) throws InterruptedException {
    PerfObserver observer = new PerfObserver(blackhole) {
        private boolean done;
        @Override
        public void beforeStart(ClientCallStreamObserver<Messages.SimpleRequest> sender) {
            sender.setOnReadyHandler(() -> {
                // The on-ready handler may fire more than once; send the batch only the first time.
                if (done) {
                    return;
                }

                for (Messages.SimpleRequest request : ARRAY_REQUEST) {
                    sender.onNext(request);
                }

                sender.onCompleted();
                done = true;
            });
            // Pass the request stream received here to the base class. The previous argument,
            // `observer`, did not refer to this stream (it named either an unset inherited field
            // or the not-yet-assigned local), so the base class never saw the real stream.
            super.beforeStart(sender);
        }
    };

    gRpcClient.streamingFromClient(observer);

    observer.latch.await();

    return observer;
}
 
源代码11 项目: titus-control-plane   文件: GrpcUtil.java
/**
 * Registers a cancellation hook on the emitter that cancels every given client call with
 * {@code CANCELLING_MESSAGE} and no cause.
 *
 * @param emitter     emitter whose cancellation triggers the call cancellation
 * @param clientCalls request streams of the calls to cancel
 */
public static void attachCancellingCallback(Emitter emitter, ClientCallStreamObserver... clientCalls) {
    emitter.setCancellation(() -> {
        for (ClientCallStreamObserver pendingCall : clientCalls) {
            pendingCall.cancel(CANCELLING_MESSAGE, null);
        }
    });
}
 
源代码12 项目: titus-control-plane   文件: FluxMethodBridge.java
/**
 * Starts the gRPC call reflectively and bridges its callbacks to the given sink: values go to
 * {@code sink.next}, errors to {@code sink.error}, completion to {@code sink.complete}.
 * Cancelling the sink cancels the gRPC request stream. Any exception thrown while invoking the
 * gRPC method is reported through {@code sink.error}.
 *
 * @param sink sink receiving the call's results
 * @param args arguments of the bridged method; the element at {@code grpcArgPos} (when
 *             non-negative) is used as the gRPC request
 */
private FluxInvocation(FluxSink<Object> sink, Object[] args) {
    StreamObserver<Object> responseBridge = new ClientResponseObserver<Object, Object>() {
        @Override
        public void beforeStart(ClientCallStreamObserver requestStream) {
            sink.onCancel(() -> requestStream.cancel("React subscription cancelled", null));
        }

        @Override
        public void onNext(Object item) {
            sink.next(item);
        }

        @Override
        public void onError(Throwable failure) {
            sink.error(failure);
        }

        @Override
        public void onCompleted() {
            sink.complete();
        }
    };

    // Without a request argument position, the call is made with an empty request.
    Object request = grpcArgPos < 0 ? Empty.getDefaultInstance() : args[grpcArgPos];
    Object[] invocationArgs = new Object[]{request, responseBridge};

    GRPC_STUB stubWithDeadline = handleCallMetadata(args)
            .withDeadline(Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS));

    try {
        grpcMethod.invoke(stubWithDeadline, invocationArgs);
    } catch (Exception e) {
        sink.error(e);
    }
}
 
源代码13 项目: bazel-buildfarm   文件: StubWriteOutputStream.java
/**
 * Opens the write call if none is currently open. The response observer completes
 * {@code writeFuture} with the committed size or the error, and clears {@code writeObserver}
 * when the call completes.
 *
 * @throws IOException declared by the signature; nothing in this body throws it directly
 */
private synchronized void initiateWrite() throws IOException {
  if (writeObserver != null) {
    // A write call is already open.
    return;
  }
  checkNotNull(deadlineAfterUnits);
  writeObserver =
      bsStub
          .get()
          .withDeadlineAfter(deadlineAfter, deadlineAfterUnits)
          .write(
              new ClientResponseObserver<WriteRequest, WriteResponse>() {
                @Override
                public void beforeStart(ClientCallStreamObserver<WriteRequest> requestStream) {
                  // Only forward on-ready events that find the stream actually ready.
                  requestStream.setOnReadyHandler(
                      () -> {
                        if (requestStream.isReady()) {
                          onReadyHandler.run();
                        }
                      });
                }

                @Override
                public void onNext(WriteResponse response) {
                  writeFuture.set(response.getCommittedSize());
                }

                @Override
                public void onError(Throwable t) {
                  writeFuture.setException(t);
                }

                @Override
                public void onCompleted() {
                  // Same monitor as the synchronized methods of the enclosing stream.
                  synchronized (StubWriteOutputStream.this) {
                    writeObserver = null;
                  }
                }
              });
}
 
源代码14 项目: bazel-buildfarm   文件: StubWriteOutputStream.java
/**
 * Reports whether a write call is open and its request stream is ready.
 *
 * @return {@code false} when no write observer exists, otherwise the observer's readiness
 */
@Override
public synchronized boolean isReady() {
  return writeObserver != null
      && ((ClientCallStreamObserver<WriteRequest>) writeObserver).isReady();
}
 
源代码15 项目: benchmarks   文件: StreamingMessageTransceiver.java
/**
 * Opens the echo stream against the server channel and prepares the message builder and the
 * payload used for outgoing messages.
 *
 * <p>The payload is {@code configuration.messageLength() - MIN_MESSAGE_LENGTH} random bytes, or
 * the empty byte string when that difference is zero.
 *
 * @param configuration source of the configured message length
 */
public void init(final Configuration configuration)
{
    serverChannel = getServerChannel();

    final StreamObserver<EchoMessage> echoResponses = new StreamObserver<EchoMessage>()
    {
        public void onNext(final EchoMessage echoed)
        {
            onMessageReceived(echoed.getTimestamp(), echoed.getChecksum());
        }

        public void onError(final Throwable error)
        {
            error.printStackTrace();
            LangUtil.rethrowUnchecked(error);
        }

        public void onCompleted()
        {
        }
    };

    final EchoBenchmarksStub stub = EchoBenchmarksGrpc.newStub(serverChannel);
    // The cast gives access to the flow-control methods of the returned request observer.
    requestObserver = (ClientCallStreamObserver<EchoMessage>)stub.echoStream(echoResponses);

    messageBuilder = EchoMessage.newBuilder();

    final int payloadLength = configuration.messageLength() - MIN_MESSAGE_LENGTH;
    if (payloadLength != 0)
    {
        final byte[] randomBytes = new byte[payloadLength];
        ThreadLocalRandom.current().nextBytes(randomBytes);
        payload = copyFrom(randomBytes);
    }
    else
    {
        payload = ByteString.EMPTY;
    }
}
 
源代码16 项目: pinpoint   文件: GrpcCommandService.java
/**
 * Stores the command request stream and installs an on-ready handler that logs the event and
 * resets the reconnector.
 *
 * @param requestStream the request stream of the command call
 */
@Override
public void beforeStart(final ClientCallStreamObserver<PCmdMessage> requestStream) {
    this.requestStream = requestStream;

    final Runnable onReady = new Runnable() {
        @Override
        public void run() {
            logger.info("Connect to CommandServiceStream completed.");
            reconnector.reset();
        }
    };
    requestStream.setOnReadyHandler(onReady);

}
 
源代码17 项目: pinpoint   文件: ResponseStreamObserver.java
/**
 * Logs the start of the call and installs an on-ready handler that logs each on-ready event
 * with a running event number and resets the reconnector.
 *
 * @param requestStream the request stream of the call being started
 */
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
    logger.info("beforeStart:{}", name);

    final Runnable onReady = new Runnable() {
        // Counts how many times this handler has run; starts at zero.
        private final AtomicInteger counter = new AtomicInteger();
        @Override
        public void run() {
            logger.info("onReadyHandler:{} eventNumber:{}", name, counter.getAndIncrement());
            reconnector.reset();
        }
    };
    requestStream.setOnReadyHandler(onReady);
}
 
源代码18 项目: onos   文件: StreamClientImpl.java
/**
 * Starts the StreamChannel RPC when no request observer exists yet: marks the stream as not
 * open and stores the request observer returned by the stub.
 */
private void initIfRequired() {
    if (requestObserver != null) {
        // An RPC has already been started.
        return;
    }
    log.debug("Starting new StreamChannel RPC for {}...", deviceId);
    open.set(false);
    client.execRpcNoTimeout(
            rpcStub -> requestObserver =
                    (ClientCallStreamObserver<StreamMessageRequest>)
                            rpcStub.streamChannel(responseObserver)
    );
}
 
源代码19 项目: onos   文件: GnmiSubscriptionManager.java
/**
 * Sends the existing subscription on the Subscribe RPC, starting the RPC first when no request
 * observer exists yet, and then marks the subscription as active.
 */
private void sendSubscribeRequest() {
    final boolean rpcMissing = requestObserver == null;
    if (rpcMissing) {
        log.debug("Starting new Subscribe RPC for {}...", deviceId);
        client.execRpcNoTimeout(
                rpcStub -> requestObserver =
                        (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
                                rpcStub.subscribe(responseObserver)
        );
    }
    // NOTE(review): assumes execRpcNoTimeout has assigned requestObserver by now - confirm.
    requestObserver.onNext(existingSubscription);
    active.set(true);
}
 
源代码20 项目: grpc-java   文件: ProtoReflectionServiceTest.java
/**
 * Checks that each response on the reflection stream is delivered only after the client calls
 * {@code request(1)}, and that completing the request side completes the response observer.
 */
@Test
public void flowControl() throws Exception {
  FlowControlClientResponseObserver clientResponseObserver =
      new FlowControlClientResponseObserver();
  // The stub returns a StreamObserver; the cast exposes request() for manual flow control.
  ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
      (ClientCallStreamObserver<ServerReflectionRequest>)
          stub.serverReflectionInfo(clientResponseObserver);

  // Verify we don't receive a response until we request it.
  requestObserver.onNext(flowControlRequest);
  assertEquals(0, clientResponseObserver.getResponses().size());

  requestObserver.request(1);
  assertEquals(1, clientResponseObserver.getResponses().size());
  assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));

  // Verify we don't receive an additional response until we request it.
  requestObserver.onNext(flowControlRequest);
  assertEquals(1, clientResponseObserver.getResponses().size());

  requestObserver.request(1);
  assertEquals(2, clientResponseObserver.getResponses().size());
  assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));

  // Half-closing the request side is expected to complete the response observer.
  requestObserver.onCompleted();
  assertTrue(clientResponseObserver.onCompleteCalled());
}
 
源代码21 项目: buck   文件: GrpcRemoteExecutionServiceClient.java
/**
 * Cancels the call once its request observer becomes available. If obtaining the observer
 * failed there is nothing to cancel, so the failure is ignored.
 */
public void cancel() {
  FutureCallback<ClientCallStreamObserver<?>> cancelWhenAvailable =
      new FutureCallback<ClientCallStreamObserver<?>>() {
        @Override
        public void onSuccess(@Nullable ClientCallStreamObserver<?> observer) {
          Objects.requireNonNull(observer).cancel("Cancelled by client.", null);
        }

        @Override
        public void onFailure(Throwable ignored) {}
      };
  clientObserver.addCallback(cancelWhenAvailable, MoreExecutors.directExecutor());
}
 
源代码22 项目: grpc-nebula-java   文件: AbstractInteropTest.java
/**
 * Tests client per-message compression for streaming calls. The Java API does not support
 * inspecting a message's compression level, so this is primarily intended to run against a gRPC
 * C++ server.
 *
 * @param probe when {@code true}, first sends an uncompressed message flagged as
 *     expect-compressed on a separate stream and requires the call to fail with
 *     {@code INVALID_ARGUMENT} before running the main scenario
 */
public void clientCompressedStreaming(boolean probe) throws Exception {
  final StreamingInputCallRequest expectCompressedRequest =
      StreamingInputCallRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(true))
          .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182])))
          .build();
  final StreamingInputCallRequest expectUncompressedRequest =
      StreamingInputCallRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(false))
          .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904])))
          .build();
  // 73086 is the sum of the two payload sizes above (27182 + 45904).
  final StreamingInputCallResponse goldenResponse =
      StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(73086).build();

  StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
  StreamObserver<StreamingInputCallRequest> requestObserver =
      asyncStub.streamingInputCall(responseObserver);

  if (probe) {
    // Send a non-compressed message with expectCompress=true. Servers supporting this test case
    // should return INVALID_ARGUMENT.
    requestObserver.onNext(expectCompressedRequest);
    responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
    Throwable e = responseObserver.getError();
    assertNotNull("expected INVALID_ARGUMENT", e);
    assertEquals(Status.INVALID_ARGUMENT.getCode(), Status.fromThrowable(e).getCode());
  }

  // Start a new stream
  responseObserver = StreamRecorder.create();
  // The raw cast exposes setMessageCompression() on the observer returned by the stub.
  @SuppressWarnings("unchecked")
  ClientCallStreamObserver<StreamingInputCallRequest> clientCallStreamObserver =
      (ClientCallStreamObserver)
          asyncStub.withCompression("gzip").streamingInputCall(responseObserver);
  // Toggle per-message compression so each request matches its expect-compressed flag.
  clientCallStreamObserver.setMessageCompression(true);
  clientCallStreamObserver.onNext(expectCompressedRequest);
  clientCallStreamObserver.setMessageCompression(false);
  clientCallStreamObserver.onNext(expectUncompressedRequest);
  clientCallStreamObserver.onCompleted();
  responseObserver.awaitCompletion();
  assertSuccess(responseObserver);
  assertEquals(goldenResponse, responseObserver.firstValue().get());
}
 
/**
 * Disables automatic inbound flow control on the request stream before the call starts.
 *
 * @param requestStream the request stream of the reflection call
 */
@Override
public void beforeStart(final ClientCallStreamObserver<ServerReflectionRequest> requestStream) {
  requestStream.disableAutoInboundFlowControl();
}
 
/**
 * Stores the request stream, disables automatic inbound flow control on it, and registers a
 * sink cancellation hook that cancels the call.
 *
 * @param requestStream the request stream of the call being started
 */
@Override
public void beforeStart(ClientCallStreamObserver<RequestT> requestStream) {
  this.rsObserver = requestStream;
  requestStream.disableAutoInboundFlowControl();
  // Cancelling the sink cancels the gRPC call with no cause attached.
  this.sink.onCancel(() -> requestStream.cancel("Flux requested cancel.", null));
}
 
源代码25 项目: milkman   文件: BaseGrpcProcessor.java
/**
 * Stores the request stream in the {@code requestStream} field.
 *
 * @param requestStream the request stream of the call being started
 */
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
	this.requestStream = requestStream;
}
 
/**
 * Passes the request stream to the superclass {@code onSubscribe}.
 *
 * @param requestStream the request stream of the call being started
 */
@Override
public void beforeStart(ClientCallStreamObserver<T> requestStream) {
    super.onSubscribe(requestStream);
}
 
/**
 * Cancels the underlying gRPC call when a subscription is present; otherwise does nothing.
 */
@Override
protected void doOnCancel() {
    if (subscription == null) {
        return;
    }
    ClientCallStreamObserver<?> call = (ClientCallStreamObserver<?>) subscription;
    call.cancel("Client canceled request", null);
}
 
源代码28 项目: reactive-grpc   文件: PerfObserver.java
/**
 * Stores the request stream in the {@code observer} field.
 *
 * @param observer the request stream of the call being started
 */
@Override
public void beforeStart(ClientCallStreamObserver<Messages.SimpleRequest> observer) {
    this.observer = observer;
}
 
源代码29 项目: titus-control-plane   文件: FilterOutFirstMarker.java
/**
 * Hands the emitter and the request stream to {@code GrpcUtil.attachCancellingCallback}.
 *
 * @param requestStream the request stream of the call being started
 */
@Override
public void beforeStart(ClientCallStreamObserver<Empty> requestStream) {
    GrpcUtil.attachCancellingCallback(emitter, requestStream);
}
 
源代码30 项目: titus-control-plane   文件: MonoMethodBridge.java
/**
 * Starts the gRPC call reflectively and bridges its callbacks to the given sink. A received
 * value completes the sink, without a value when {@code emptyToVoidReply} is set; an error
 * goes to {@code sink.error}; completion calls {@code sink.success()}. Cancelling the sink
 * cancels the gRPC request stream. Any exception thrown while invoking the gRPC method is
 * reported through {@code sink.error}.
 *
 * @param sink sink receiving the call's result
 * @param args arguments of the bridged method; the element at {@code grpcArgPos} (when
 *             non-negative) is used as the gRPC request
 */
private MonoInvocation(MonoSink<Object> sink, Object[] args) {
    StreamObserver<Object> responseBridge = new ClientResponseObserver<Object, Object>() {
        @Override
        public void beforeStart(ClientCallStreamObserver requestStream) {
            sink.onCancel(() -> requestStream.cancel("React subscription cancelled", null));
        }

        @Override
        public void onNext(Object reply) {
            if (emptyToVoidReply) {
                sink.success();
                return;
            }
            sink.success(reply);
        }

        @Override
        public void onError(Throwable failure) {
            sink.error(failure);
        }

        @Override
        public void onCompleted() {
            sink.success();
        }
    };

    // Without a request argument position, the call is made with an empty request.
    Object request = grpcArgPos < 0 ? Empty.getDefaultInstance() : args[grpcArgPos];
    Object[] invocationArgs = new Object[]{request, responseBridge};

    GRPC_STUB stubWithDeadline = handleCallMetadata(args)
            .withDeadline(Deadline.after(timeout.toMillis(), TimeUnit.MILLISECONDS));

    try {
        grpcMethod.invoke(stubWithDeadline, invocationArgs);
    } catch (Exception e) {
        sink.error(e);
    }
}
 
 类所在包
 同包方法