下面列出了 io.grpc.stub.ClientCalls#asyncBidiStreamingCall() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Opens a bidirectional-streaming call, writes every message of {@code requests} to it and then
 * signals completion of the request stream.
 *
 * @param requests the messages to send, in iteration order
 * @param responseObserver observer registered on the call together with an internal
 *     {@code DoneObserver}
 * @param callOptions options forwarded to {@code createCall}
 * @return the completion future exposed by the internal {@code DoneObserver}
 */
private ListenableFuture<Void> callBidiStreaming(
    ImmutableList<DynamicMessage> requests,
    StreamObserver<DynamicMessage> responseObserver,
    CallOptions callOptions) {
  DoneObserver<DynamicMessage> completionTracker = new DoneObserver<>();
  // Both the caller's observer and the completion tracker are handed to the call.
  StreamObserver<DynamicMessage> requestSink = ClientCalls.asyncBidiStreamingCall(
      createCall(callOptions),
      CompositeStreamObserver.of(responseObserver, completionTracker));
  for (DynamicMessage message : requests) {
    requestSink.onNext(message);
  }
  requestSink.onCompleted();
  return completionTracker.getCompletionFuture();
}
/**
 * Opens a new bidirectional-streaming call and stores its request observer in
 * {@code initialReqStream}. Does nothing when {@code finished} is set.
 *
 * <p>The options returned by {@code getCallOptions()} are remembered in {@code sentCallOptions}
 * before {@code responseExecutor} is applied to them, so the stored value does not carry the
 * executor override.
 */
private void refreshBackingStream() {
  if (!finished) {
    final CallOptions baseOptions = getCallOptions();
    sentCallOptions = baseOptions;
    final CallOptions executorOptions = baseOptions.withExecutor(responseExecutor);
    initialReqStream = ClientCalls.asyncBidiStreamingCall(
        channel.newCall(method, executorOptions), respWrapper);
  }
}
/**
 * Starts a bidirectional-streaming call, pushes all of {@code requests} onto its request stream
 * and completes that stream afterwards.
 *
 * @param requests the messages to write, in list order
 * @param responseObserver observer combined with an internal {@code DoneObserver} through
 *     {@code ComponentObserver.of}
 * @param callOptions options forwarded to {@code createCall}
 * @return the completion future obtained from the internal {@code DoneObserver}
 */
private ListenableFuture<Void> callBidiStreaming(
    ImmutableList<DynamicMessage> requests,
    StreamObserver<DynamicMessage> responseObserver,
    CallOptions callOptions) {
  DoneObserver<DynamicMessage> finishedSignal = new DoneObserver<>();
  StreamObserver<DynamicMessage> outbound = ClientCalls.asyncBidiStreamingCall(
      createCall(callOptions),
      ComponentObserver.of(responseObserver, finishedSignal));
  for (DynamicMessage outgoing : requests) {
    outbound.onNext(outgoing);
  }
  // No more requests will follow.
  outbound.onCompleted();
  return finishedSignal.getCompletionFuture();
}
/**
 * Dispatches an invocation made on the client proxy to the matching gRPC call.
 *
 * <p>{@code toString()}, {@code hashCode()} and {@code equals(Object)} are answered locally.
 * Every other method is resolved to a {@code MethodCallProperty} (keyed by the
 * {@code GRpcMethod} annotation value, or by the Java method name when that value is empty) and
 * executed through the {@code ClientCalls} helper that matches its method type.
 *
 * @param proxy the proxy instance the method was invoked on
 * @param method the interface method being invoked
 * @param args the invocation arguments; {@code null} when the method takes no arguments
 * @return the value produced by the selected {@code ClientCalls} helper, or {@code null} for
 *     {@code void} unary methods and unhandled method types
 */
@SuppressWarnings("unchecked")
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
  String methodName = method.getName();
  String className = method.getDeclaringClass().getName();
  // java.lang.reflect.Proxy hands over a null array (not an empty one) for zero-argument
  // methods, so the length must never be read directly from args.
  int argCount = (args == null) ? 0 : args.length;
  if ("toString".equals(methodName) && argCount == 0) {
    return className + "@" + invoker.hashCode();
  } else if ("hashCode".equals(methodName) && argCount == 0) {
    return invoker.hashCode();
  } else if ("equals".equals(methodName) && argCount == 1) {
    Object another = Utils.safeElement(args, 0);
    // Proxies are only equal to themselves.
    return proxy == another;
  }
  // NOTE(review): a method without @GRpcMethod fails here with a NullPointerException, and
  // Utils.safeElement is presumed to tolerate a null args array -- confirm both.
  String annotationMethodName = method.getAnnotation(GRpcMethod.class).value();
  MethodCallProperty methodCallProperty = callDefinitions.get(
      StringUtils.isEmpty(annotationMethodName) ? methodName : annotationMethodName);
  ClientCall<Object, Object> clientCall = buildCall(methodCallProperty);
  switch (methodCallProperty.getMethodType()) {
    case UNARY:
      if (method.getReturnType() == ListenableFuture.class) {
        // Equivalent to ClientCalls.futureUnaryCall().
        return ClientCalls.futureUnaryCall(clientCall, Utils.safeElement(args, 0));
      } else if (method.getReturnType() == void.class) {
        if (Utils.checkMethodHasParamClass(method, StreamObserver.class)) {
          // Equivalent to ClientCalls.asyncUnaryCall(); the response goes to the observer.
          ClientCalls.asyncUnaryCall(clientCall, Utils.safeElement(args, 0),
              (StreamObserver<Object>) Utils.safeElement(args, 1));
          return null;
        } else {
          // void method without an observer: block for the call and discard the response.
          ClientCalls.blockingUnaryCall(clientCall, Utils.safeElement(args, 0));
          return null;
        }
      }
      return ClientCalls.blockingUnaryCall(clientCall, Utils.safeElement(args, 0));
    case BIDI_STREAMING:
      // Bidirectional streaming, equivalent to ClientCalls.asyncBidiStreamingCall().
      return ClientCalls.asyncBidiStreamingCall(clientCall,
          (StreamObserver<Object>) Utils.safeElement(args, 0));
    case CLIENT_STREAMING:
      // Client streaming, equivalent to ClientCalls.asyncClientStreamingCall().
      return ClientCalls.asyncClientStreamingCall(clientCall,
          (StreamObserver<Object>) Utils.safeElement(args, 0));
    case SERVER_STREAMING:
      // Server streaming, equivalent to ClientCalls.blockingServerStreamingCall().
      return ClientCalls.blockingServerStreamingCall(clientCall, Utils.safeElement(args, 0));
  }
  return null;
}
/**
 * Starts {@code callsPerChannel} duplex streaming ping-pong calls on every channel. Each call is
 * primed with two requests, and every response received triggers one further request until
 * {@code done.get()} is true, at which point the request stream is completed once.
 *
 * @param callsPerChannel number of calls to open on each channel
 * @param counter incremented by {@code counterDelta} for each response received while
 *     {@code record.get()} is true
 * @param record whether received responses are currently counted
 * @param done set by the caller to make the calls wind down
 * @param counterDelta amount added to {@code counter} per counted response
 * @return a latch counted down once per call when that call completes or fails
 */
protected CountDownLatch startStreamingCalls(int callsPerChannel, final AtomicLong counter,
    final AtomicBoolean record, final AtomicBoolean done, final long counterDelta) {
  final CountDownLatch latch = new CountDownLatch(callsPerChannel * channels.length);
  for (final ManagedChannel channel : channels) {
    for (int remaining = callsPerChannel; remaining > 0; remaining--) {
      final ClientCall<ByteBuf, ByteBuf> call = channel.newCall(pingPongMethod, CALL_OPTIONS);
      final AtomicReference<StreamObserver<ByteBuf>> requestSinkRef =
          new AtomicReference<StreamObserver<ByteBuf>>();
      final AtomicBoolean completionSent = new AtomicBoolean();
      final StreamObserver<ByteBuf> responseHandler = new StreamObserver<ByteBuf>() {
        @Override
        public void onNext(ByteBuf value) {
          if (!done.get()) {
            // Answer the pong with the next ping.
            requestSinkRef.get().onNext(request.slice());
            if (record.get()) {
              counter.addAndGet(counterDelta);
            }
            // request is called automatically because the observer implicitly has auto
            // inbound flow control
          } else if (!completionSent.getAndSet(true)) {
            // First response seen after shutdown was requested: complete the request
            // stream exactly once; later responses are ignored.
            requestSinkRef.get().onCompleted();
          }
        }

        @Override
        public void onError(Throwable t) {
          logger.log(Level.WARNING, "call error", t);
          latch.countDown();
        }

        @Override
        public void onCompleted() {
          latch.countDown();
        }
      };
      StreamObserver<ByteBuf> requestSink =
          ClientCalls.asyncBidiStreamingCall(call, responseHandler);
      // Publish the request observer before the first request goes out so the response
      // handler can reach it.
      requestSinkRef.set(requestSink);
      requestSink.onNext(request.slice());
      requestSink.onNext(request.slice());
    }
  }
  return latch;
}
/**
 * Starts {@code callsPerChannel} duplex streaming calls of the {@code flowControlledStreaming}
 * method on every channel. Each call sends a single request and asks for five additional
 * inbound messages up front; responses are only counted, no further requests are sent. Once
 * {@code done.get()} is true the request stream is completed on the next response.
 *
 * @param callsPerChannel number of calls to open on each channel
 * @param counter incremented by {@code counterDelta} for each response received while
 *     {@code record.get()} is true
 * @param record whether received responses are currently counted
 * @param done set by the caller to make the calls wind down
 * @param counterDelta amount added to {@code counter} per counted response
 * @return a latch counted down once per call when that call completes or fails
 */
protected CountDownLatch startFlowControlledStreamingCalls(int callsPerChannel,
    final AtomicLong counter, final AtomicBoolean record, final AtomicBoolean done,
    final long counterDelta) {
  final CountDownLatch latch = new CountDownLatch(callsPerChannel * channels.length);
  for (final ManagedChannel channel : channels) {
    for (int remaining = callsPerChannel; remaining > 0; remaining--) {
      final ClientCall<ByteBuf, ByteBuf> call =
          channel.newCall(flowControlledStreaming, CALL_OPTIONS);
      final AtomicReference<StreamObserver<ByteBuf>> requestSinkRef =
          new AtomicReference<StreamObserver<ByteBuf>>();
      final AtomicBoolean completionSent = new AtomicBoolean();
      final StreamObserver<ByteBuf> responseHandler = new StreamObserver<ByteBuf>() {
        @Override
        public void onNext(ByteBuf value) {
          StreamObserver<ByteBuf> sink = requestSinkRef.get();
          if (!done.get()) {
            if (record.get()) {
              counter.addAndGet(counterDelta);
            }
            // request is called automatically because the observer implicitly has auto
            // inbound flow control
          } else if (!completionSent.getAndSet(true)) {
            // Complete the request stream exactly once after shutdown was requested.
            sink.onCompleted();
          }
        }

        @Override
        public void onError(Throwable t) {
          logger.log(Level.WARNING, "call error", t);
          latch.countDown();
        }

        @Override
        public void onCompleted() {
          latch.countDown();
        }
      };
      StreamObserver<ByteBuf> requestSink =
          ClientCalls.asyncBidiStreamingCall(call, responseHandler);
      requestSinkRef.set(requestSink);
      // Add some outstanding requests to ensure the server is filling the connection
      call.request(5);
      requestSink.onNext(request.slice());
    }
  }
  return latch;
}
/**
 * Launches duplex streaming ping-pong calls, {@code callsPerChannel} per channel. Two requests
 * are sent when a call starts; afterwards each incoming response is answered with one more
 * request. When {@code done.get()} becomes true the next response completes the request stream
 * and any further responses are dropped.
 *
 * @param callsPerChannel number of calls to open on each channel
 * @param counter incremented by {@code counterDelta} for each response received while
 *     {@code record.get()} is true
 * @param record whether received responses are currently counted
 * @param done set by the caller to make the calls wind down
 * @param counterDelta amount added to {@code counter} per counted response
 * @return a latch counted down once per call when that call completes or fails
 */
protected CountDownLatch startStreamingCalls(int callsPerChannel, final AtomicLong counter,
    final AtomicBoolean record, final AtomicBoolean done, final long counterDelta) {
  final CountDownLatch latch = new CountDownLatch(callsPerChannel * channels.length);
  for (final ManagedChannel channel : channels) {
    for (int started = 0; started < callsPerChannel; started++) {
      final ClientCall<ByteBuf, ByteBuf> pingPongCall =
          channel.newCall(pingPongMethod, CALL_OPTIONS);
      final AtomicReference<StreamObserver<ByteBuf>> outboundRef = new AtomicReference<>();
      final AtomicBoolean closedOnce = new AtomicBoolean();
      final StreamObserver<ByteBuf> inbound = new StreamObserver<ByteBuf>() {
        @Override
        public void onNext(ByteBuf value) {
          if (!done.get()) {
            outboundRef.get().onNext(request.slice());
            if (record.get()) {
              counter.addAndGet(counterDelta);
            }
            // request is called automatically because the observer implicitly has auto
            // inbound flow control
          } else if (!closedOnce.getAndSet(true)) {
            // Shutdown requested: complete the request stream a single time.
            outboundRef.get().onCompleted();
          }
        }

        @Override
        public void onError(Throwable t) {
          logger.log(Level.WARNING, "call error", t);
          latch.countDown();
        }

        @Override
        public void onCompleted() {
          latch.countDown();
        }
      };
      StreamObserver<ByteBuf> outbound =
          ClientCalls.asyncBidiStreamingCall(pingPongCall, inbound);
      // The inbound handler looks the request observer up through the reference, so it has
      // to be set before any request is written.
      outboundRef.set(outbound);
      outbound.onNext(request.slice());
      outbound.onNext(request.slice());
    }
  }
  return latch;
}
/**
 * Launches duplex streaming calls of the {@code flowControlledStreaming} method,
 * {@code callsPerChannel} per channel. A call writes one request and requests five extra
 * inbound messages; the response handler only counts responses and never writes again. When
 * {@code done.get()} becomes true the next response completes the request stream.
 *
 * @param callsPerChannel number of calls to open on each channel
 * @param counter incremented by {@code counterDelta} for each response received while
 *     {@code record.get()} is true
 * @param record whether received responses are currently counted
 * @param done set by the caller to make the calls wind down
 * @param counterDelta amount added to {@code counter} per counted response
 * @return a latch counted down once per call when that call completes or fails
 */
protected CountDownLatch startFlowControlledStreamingCalls(int callsPerChannel,
    final AtomicLong counter, final AtomicBoolean record, final AtomicBoolean done,
    final long counterDelta) {
  final CountDownLatch latch = new CountDownLatch(callsPerChannel * channels.length);
  for (final ManagedChannel channel : channels) {
    for (int started = 0; started < callsPerChannel; started++) {
      final ClientCall<ByteBuf, ByteBuf> flowCall =
          channel.newCall(flowControlledStreaming, CALL_OPTIONS);
      final AtomicReference<StreamObserver<ByteBuf>> outboundRef = new AtomicReference<>();
      final AtomicBoolean closedOnce = new AtomicBoolean();
      final StreamObserver<ByteBuf> inbound = new StreamObserver<ByteBuf>() {
        @Override
        public void onNext(ByteBuf value) {
          StreamObserver<ByteBuf> outboundNow = outboundRef.get();
          if (!done.get()) {
            if (record.get()) {
              counter.addAndGet(counterDelta);
            }
            // request is called automatically because the observer implicitly has auto
            // inbound flow control
          } else if (!closedOnce.getAndSet(true)) {
            // Shutdown requested: complete the request stream a single time.
            outboundNow.onCompleted();
          }
        }

        @Override
        public void onError(Throwable t) {
          logger.log(Level.WARNING, "call error", t);
          latch.countDown();
        }

        @Override
        public void onCompleted() {
          latch.countDown();
        }
      };
      StreamObserver<ByteBuf> outbound = ClientCalls.asyncBidiStreamingCall(flowCall, inbound);
      outboundRef.set(outbound);
      // Add some outstanding requests to ensure the server is filling the connection
      flowCall.request(5);
      outbound.onNext(request.slice());
    }
  }
  return latch;
}