下面列出了 io.grpc.stub.ClientCalls#blockingServerStreamingCall() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Opens a server-streaming subscription for a topic and exposes it as a blocking iterator of
 * messages whose consensus timestamps fall between two {@link Instant}s.
 *
 * @param topicId the topic to subscribe to; also attached to every returned message
 * @param startTime the lower bound for timestamps (inclusive), may be in the past or future.
 * @param endTime the upper bound for timestamps (exclusive), may also be in the past or future.
 * @return an iterator that wraps each streamed response in a {@link ConsensusMessage}
 */
public Iterator<ConsensusMessage> getMessages(ConsensusTopicId topicId, Instant startTime, Instant endTime) {
    final Iterator<ConsensusTopicResponse> responses = ClientCalls.blockingServerStreamingCall(
            channel,
            ConsensusServiceGrpc.getSubscribeTopicMethod(),
            CallOptions.DEFAULT,
            // The query is built in place; setters run in the same order as before.
            ConsensusTopicQuery.newBuilder()
                    .setTopicID(topicId.toProto())
                    .setConsensusStartTime(TimestampHelper.timestampFrom(startTime))
                    .setConsensusEndTime(TimestampHelper.timestampFrom(endTime))
                    .build());

    return Iterators.transform(responses, response -> {
        // Reject a null response before wrapping it.
        final ConsensusTopicResponse checked = Objects.requireNonNull(response);
        return new ConsensusMessage(topicId, checked);
    });
}
/**
 * Opens a server-streaming subscription for a topic and exposes it as a blocking iterator of
 * messages, starting now and continuing until the given {@link Instant}. The query is sent
 * without a consensus start time; only the end time is set.
 *
 * @param topicId the topic to subscribe to; also attached to every returned message
 * @param endTime the upper bound for timestamps (exclusive), may be in the past or future.
 * @return an iterator that wraps each streamed response in a {@link ConsensusMessage}
 */
public Iterator<ConsensusMessage> getMessagesUntil(ConsensusTopicId topicId, Instant endTime) {
    final Iterator<ConsensusTopicResponse> responses = ClientCalls.blockingServerStreamingCall(
            channel,
            ConsensusServiceGrpc.getSubscribeTopicMethod(),
            CallOptions.DEFAULT,
            // Only the topic and the end bound are placed on the query.
            ConsensusTopicQuery.newBuilder()
                    .setTopicID(topicId.toProto())
                    .setConsensusEndTime(TimestampHelper.timestampFrom(endTime))
                    .build());

    return Iterators.transform(responses, response -> {
        // Reject a null response before wrapping it.
        final ConsensusTopicResponse checked = Objects.requireNonNull(response);
        return new ConsensusMessage(topicId, checked);
    });
}
/**
 * Handles a method call made on the proxy.
 *
 * <p>{@code toString()}, {@code hashCode()} and {@code equals(Object)} are answered locally.
 * Every other method is resolved to a call definition (keyed by the {@link GRpcMethod} value,
 * or by the Java method name when that value is empty or the annotation is absent) and
 * forwarded to the {@link ClientCalls} helper matching the gRPC method type and the Java
 * return type.
 *
 * @param proxy the proxy instance the method was invoked on
 * @param method the interface method being invoked
 * @param args the call arguments; {@code null} when the method takes no arguments
 * @return the result of the local answer or of the gRPC call; {@code null} for {@code void}
 *         unary methods and for method types not handled by the switch
 */
@SuppressWarnings("unchecked")
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
    String methodName = method.getName();
    String className = method.getDeclaringClass().getName();
    // The reflection API passes null (not an empty array) for no-argument methods, so
    // dereferencing args directly would throw on every toString()/hashCode() call.
    int argCount = args == null ? 0 : args.length;
    if ("toString".equals(methodName) && argCount == 0) {
        return className + "@" + invoker.hashCode();
    } else if ("hashCode".equals(methodName) && argCount == 0) {
        return invoker.hashCode();
    } else if ("equals".equals(methodName) && argCount == 1) {
        // Proxies are equal only by identity.
        Object another = Utils.safeElement(args, 0);
        return proxy == another;
    }
    // A missing annotation is treated like an empty value: fall back to the Java method name.
    GRpcMethod annotation = method.getAnnotation(GRpcMethod.class);
    String annotationMethodName = annotation == null ? "" : annotation.value();
    MethodCallProperty methodCallProperty = callDefinitions.get(StringUtils.isEmpty(annotationMethodName) ? methodName : annotationMethodName);
    ClientCall<Object, Object> clientCall = buildCall(methodCallProperty);
    switch (methodCallProperty.getMethodType()) {
        case UNARY:
            if (method.getReturnType() == ListenableFuture.class) {
                // Future-returning signature: equivalent to ClientCalls.futureUnaryCall().
                return ClientCalls.futureUnaryCall(clientCall, Utils.safeElement(args, 0));
            } else if (method.getReturnType() == void.class) {
                if (Utils.checkMethodHasParamClass(method, StreamObserver.class)) {
                    // void + StreamObserver parameter: equivalent to ClientCalls.asyncUnaryCall().
                    ClientCalls.asyncUnaryCall(clientCall, Utils.safeElement(args, 0), (StreamObserver<Object>) Utils.safeElement(args, 1));
                    return null;
                } else {
                    // void without an observer: block for completion and discard the response.
                    ClientCalls.blockingUnaryCall(clientCall, Utils.safeElement(args, 0));
                    return null;
                }
            }
            return ClientCalls.blockingUnaryCall(clientCall, Utils.safeElement(args, 0));
        case BIDI_STREAMING:
            // Bidirectional streaming: equivalent to ClientCalls.asyncBidiStreamingCall().
            return ClientCalls.asyncBidiStreamingCall(clientCall, (StreamObserver<Object>) Utils.safeElement(args, 0));
        case CLIENT_STREAMING:
            // Client streaming: equivalent to ClientCalls.asyncClientStreamingCall().
            return ClientCalls.asyncClientStreamingCall(clientCall, (StreamObserver<Object>) Utils.safeElement(args, 0));
        case SERVER_STREAMING:
            // Server streaming: equivalent to ClientCalls.blockingServerStreamingCall().
            return ClientCalls.blockingServerStreamingCall(clientCall, Utils.safeElement(args, 0));
    }
    return null;
}