下面列出了 io.grpc.stub.AbstractStub API 类的用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Calls {@code sayHello} through a reflectively created future stub and prints the future's completion
 * state before and after waiting for its result.
 *
 * <p>Every gRPC class is resolved by name at runtime, so this test does not reference the generated
 * sources at compile time. Despite the method name, the call goes through the future stub; it is the
 * {@code future.get()} call that waits for the response.
 *
 * @throws Exception if a class, constructor or method cannot be resolved or invoked, or if waiting on the
 *         future fails
 */
@Test
public void doCallSync() throws Exception {
    Class<?> grpc = Loader.load("df.open.grpc.hello.HelloServiceGrpc");
    // The outer proto class is still loaded here; only the unused local that held it was dropped.
    Loader.load("df.open.grpc.hello.HelloServiceProto");

    Method futureStub = grpc.getMethod("newFutureStub", Channel.class);
    AbstractStub<?> stub = (AbstractStub<?>) futureStub.invoke(grpc, channel);
    System.out.println(stub);

    // The request type is looked up without initialization and instantiated via its no-arg constructor,
    // which is made accessible first because it may not be public.
    Class<?> reqClz = Class.forName("df.open.grpc.hello.HelloServiceProto$HelloReq", false, Loader.getLoader());
    Constructor<?> constructor = reqClz.getDeclaredConstructor();
    constructor.setAccessible(true);

    Method sayHello = stub.getClass().getMethod("sayHello", reqClz);
    ListenableFuture<?> future = (ListenableFuture<?>) sayHello.invoke(stub, constructor.newInstance());
    System.out.println(future.isDone());
    System.out.println(future.get());
    System.out.println(future.isDone());
}
/**
 * Loads the generated HelloService gRPC classes through a {@code DynamicMultiClassLoader} built from
 * {@code Consts.JAVA_OUT_DIR}, creates a blocking stub reflectively, and prints both the factory method
 * and the resulting stub.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
    // The instance is not used below; the construction is kept in case it performs setup (TODO confirm).
    new GrpcWorker();

    DynamicMultiClassLoader loader = DynamicMultiClassLoader.getLoader(URLTools.toUrl(Consts.JAVA_OUT_DIR));
    Class<?> grpc = loader.loadClass("df.open.grpc.hello.HelloServiceGrpc");
    // The proto outer class is still loaded; only the unused local that held it was dropped.
    loader.loadClass("df.open.grpc.hello.HelloServiceProto");

    Method newBlockingStub = grpc.getMethod("newBlockingStub", Channel.class);
    System.out.println(newBlockingStub);

    AbstractStub<?> stub = (AbstractStub<?>) newBlockingStub.invoke(grpc, channel);
    System.out.println(stub);
}
/**
 * Produces the object that should be injected into the given member.
 *
 * <p>A {@link Channel} target receives the channel itself. An {@link AbstractStub} target receives a stub
 * created for the channel and then passed through every configured {@link StubTransformer}. Any other
 * target type is rejected.
 *
 * @param <T> The type of the instance to be injected.
 * @param name The name that was used to create the channel.
 * @param injectionTarget The target member for the injection.
 * @param injectionType The class that should be injected.
 * @param channel The channel that should be used to create the instance.
 * @return The value that matches the type of the given member.
 * @throws BeansException If the value could not be created or the type of the member is unsupported.
 */
protected <T> T valueForMember(final String name, final Member injectionTarget,
        final Class<T> injectionType,
        final Channel channel) throws BeansException {
    // Plain channel injection: hand the channel out as-is.
    if (Channel.class.equals(injectionType)) {
        return injectionType.cast(channel);
    }

    // Anything that is neither a channel nor a stub cannot be injected.
    if (!AbstractStub.class.isAssignableFrom(injectionType)) {
        throw new InvalidPropertyException(injectionTarget.getDeclaringClass(), injectionTarget.getName(),
                "Unsupported type " + injectionType.getName());
    }

    @SuppressWarnings("unchecked") // Eclipse incorrectly marks this as not required
    AbstractStub<?> configuredStub = createStub(injectionType.asSubclass(AbstractStub.class), channel);
    for (final StubTransformer transformer : getStubTransformers()) {
        configuredStub = transformer.transform(name, configuredStub);
    }
    return injectionType.cast(configuredStub);
}
/**
 * Instantiates a stub of the requested type for the given channel.
 *
 * <p>The static factory method on the stub's enclosing class is tried first. If that fails for any
 * reason, the stub's {@code (Channel)} constructor is invoked reflectively instead. If both attempts
 * fail, the constructor failure is attached as a suppressed exception to the factory failure.
 *
 * @param <T> The type of the stub to create.
 * @param stubType The type of the stub to create.
 * @param channel The channel used to create the stub.
 * @return The newly created stub.
 *
 * @throws BeanInstantiationException If the stub couldn't be created.
 */
protected <T extends AbstractStub<T>> T createStub(final Class<T> stubType, final Channel channel) {
    try {
        // Preferred route: public static factory method on the enclosing class
        final String factoryName = deriveStubFactoryMethodName(stubType);
        final Class<?> outerClass = stubType.getEnclosingClass();
        final Method factory = outerClass.getMethod(factoryName, Channel.class);
        final Object created = factory.invoke(null, channel);
        return stubType.cast(created);
    } catch (final Exception factoryFailure) {
        try {
            // Backup route: the (possibly private) constructor taking a channel
            final Constructor<T> ctor = stubType.getDeclaredConstructor(Channel.class);
            ctor.setAccessible(true);
            return ctor.newInstance(channel);
        } catch (final Exception constructorFailure) {
            factoryFailure.addSuppressed(constructorFailure);
            throw new BeanInstantiationException(stubType, "Failed to create gRPC client", factoryFailure);
        }
    }
}
/**
 * Chooses the protocol client implementation for a reference URL.
 *
 * <p>The generic flag wins over the stub flag. When neither is set, the default proxy client is used.
 *
 * @param refUrl the reference URL whose parameters select the client type
 * @return a generic, stub-based or default proxy client built for {@code refUrl}
 * @throws IllegalArgumentException if a stub client is requested but the configured stub class cannot be found
 */
@SuppressWarnings({"rawtypes", "unchecked"})
private GrpcProtocolClient<Object> buildProtoClient(GrpcURL refUrl) {
    boolean isGeneric = refUrl.getParameter(Constants.GENERIC_KEY, Boolean.FALSE);
    boolean isGrpcStub = refUrl.getParameter(Constants.GRPC_STUB_KEY, Boolean.FALSE);

    if (isGeneric) {
        return new GenericProxyClient<Object>(refUrl);
    }
    if (!isGrpcStub) {
        return new DefaultProxyClient<Object>(refUrl);
    }

    // Stub mode: the interface-class parameter carries the name of the stub class to load.
    String stubClassName = refUrl.getParameter(Constants.INTERFACECLASS_KEY);
    try {
        Class<? extends AbstractStub> stubClass =
                (Class<? extends AbstractStub>) ReflectUtils.name2class(stubClassName);
        return new GrpcStubClient<Object>(stubClass, refUrl);
    } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("grpc stub client the class must exist in classpath",
                e);
    }
}
/**
 * Builds a merger that keeps the one response carrying a value. Empty-response markers are skipped, and
 * when neither response carries a value the two errors are combined by the fallback merger.
 *
 * <p>Merging two responses that both carry a value is rejected with an {@link IllegalArgumentException}.
 *
 * @param errorMerger fallback merger for errors
 */
static <STUB extends AbstractStub<STUB>, T>
ResponseMerger<STUB, Either<T, Throwable>> singleValue(ErrorMerger<STUB, T> errorMerger) {
    return (one, other) -> {
        // An empty marker never wins: return whatever is on the other side.
        if (isEmptyResponseMarker(one)) {
            return other;
        }
        if (isEmptyResponseMarker(other)) {
            return one;
        }

        boolean oneHasValue = one.getResult().hasValue();
        boolean otherHasValue = other.getResult().hasValue();
        Preconditions.checkArgument(!(oneHasValue && otherHasValue), "expecting a single non-error response");

        if (oneHasValue) {
            return one;
        }
        if (otherHasValue) {
            return other;
        }
        // Both sides are errors.
        return errorMerger.call(one, other);
    };
}
/**
 * Invokes the given call on a stub for every {@link Cell} and merges the per-cell responses into a single
 * {@link Observable}. Each emitted item records the cell and stub it came from. Errors are not handled
 * here, so an error from any cell's observable reaches the merged observable.
 */
<STUB extends AbstractStub<STUB>, RespT> Observable<CellResponse<STUB, RespT>> call(
        Function<ManagedChannel, STUB> stubFactory,
        BiConsumer<STUB, StreamObserver<RespT>> fnCall) {
    Map<Cell, STUB> stubsByCell = stubs(connector, stubFactory);

    List<Observable<CellResponse<STUB, RespT>>> perCellResponses = stubsByCell.entrySet().stream()
            .map(cellAndStub -> {
                Cell cell = cellAndStub.getKey();
                STUB stub = cellAndStub.getValue();
                // Bridge the gRPC StreamObserver callback style into an Observable.
                Observable<RespT> responses = createRequestObservable(emitter -> {
                    StreamObserver<RespT> observer = createSimpleClientResponseObserver(emitter);
                    fnCall.accept(stub, observer);
                });
                return responses.map(response -> new CellResponse<>(cell, stub, response));
            })
            .collect(Collectors.toList());

    return Observable.merge(perCellResponses);
}
/**
 * Invokes the given call on a stub for every {@link Cell} and merges the per-cell outcomes into a single
 * {@link Observable}. Each outcome is an {@code Either<RespT, Throwable>}: a value for every response, or
 * the error that ended that cell's call. A failing cell therefore emits an error item instead of failing
 * the merged observable.
 */
<STUB extends AbstractStub<STUB>, RespT>
Observable<CellResponse<STUB, Either<RespT, Throwable>>> callExpectingErrors(
        Function<ManagedChannel, STUB> stubFactory,
        BiConsumer<STUB, StreamObserver<RespT>> fnCall) {
    Map<Cell, STUB> stubsByCell = stubs(connector, stubFactory);

    List<Observable<CellResponse<STUB, Either<RespT, Throwable>>>> perCellOutcomes =
            stubsByCell.entrySet().stream().map(cellAndStub -> {
                Cell cell = cellAndStub.getKey();
                STUB stub = cellAndStub.getValue();
                Observable<RespT> responses = callSingleCell(stub, fnCall);

                // Wrap every successful response as a value ...
                Observable<CellResponse<STUB, Either<RespT, Throwable>>> values = responses.map(response ->
                        new CellResponse<>(cell, stub, Either.<RespT, Throwable>ofValue(response)));

                // ... and replace a terminal error with a single error-carrying item.
                return values.onErrorResumeNext(failure -> Observable.just(
                        new CellResponse<>(cell, stub, Either.ofError(failure))));
            }).collect(Collectors.toList());

    return Observable.merge(perCellOutcomes);
}
/**
 * Finds the gRPC stub method that corresponds to a reactive API method.
 *
 * <p>Parameter types contained in {@code handlerTypes} are ignored. At most one parameter may remain, and
 * it is used as the request type; when none remains, {@code Empty} is used. The stub method is looked up
 * by the same name with the signature {@code (requestType, StreamObserver)}.
 *
 * @param grpcStub the stub whose class is searched for the matching method
 * @param method the reactive API method to map
 * @param handlerTypes parameter types that are not part of the gRPC request and must be skipped
 * @return the matching public method of the stub's class
 * @throws IllegalArgumentException if the method has more than one non-handler parameter, the request type
 *         is not a protobuf {@code Message}, the return type is neither {@code Flux} nor {@code Mono}, or
 *         the stub has no matching method (the {@link NoSuchMethodException} is attached as the cause)
 */
public static Method getGrpcMethod(AbstractStub<?> grpcStub, Method method, Set<Class> handlerTypes) {
    List<Class> transferredParameters = Arrays.stream((Class[]) method.getParameterTypes())
            .filter(type -> !handlerTypes.contains(type))
            .collect(Collectors.toList());
    Preconditions.checkArgument(transferredParameters.size() <= 1, "Method must have one or zero protobuf object parameters but is: %s", method);

    Class<?> requestType = CollectionsExt.firstOrDefault(transferredParameters, Empty.class);
    Preconditions.checkArgument(Message.class.isAssignableFrom(requestType), "Not protobuf message in method parameter: %s", method);

    Class<?> returnType = method.getReturnType();
    Preconditions.checkArgument(Flux.class.isAssignableFrom(returnType) || Mono.class.isAssignableFrom(returnType), "Flux or Mono return type expected but is: %s", returnType);

    try {
        return grpcStub.getClass().getMethod(method.getName(), requestType, StreamObserver.class);
    } catch (NoSuchMethodException e) {
        // Keep the reflective failure as the cause so the missing signature shows up in the stack trace.
        throw new IllegalArgumentException("React method does not match any of the available GRPC stub methods: " + method, e);
    }
}
/**
 * Resolves the instance to inject into the given member.
 *
 * <p>Supported injection types are {@link Channel} (the channel is returned directly) and subclasses of
 * {@link AbstractStub} (a stub is created for the channel and run through all stub transformers).
 *
 * @param <T> The type of the instance to be injected.
 * @param name The name that was used to create the channel.
 * @param injectionTarget The target member for the injection.
 * @param injectionType The class that should be injected.
 * @param channel The channel that should be used to create the instance.
 * @return The value that matches the type of the given member.
 * @throws BeansException If the value could not be created or the type of the member is unsupported.
 */
protected <T> T valueForMember(final String name, final Member injectionTarget,
        final Class<T> injectionType,
        final Channel channel) throws BeansException {
    if (Channel.class.equals(injectionType)) {
        return injectionType.cast(channel);
    }

    final boolean isStubType = AbstractStub.class.isAssignableFrom(injectionType);
    if (!isStubType) {
        throw new InvalidPropertyException(injectionTarget.getDeclaringClass(), injectionTarget.getName(),
                "Unsupported type " + injectionType.getName());
    }

    @SuppressWarnings("unchecked") // Eclipse incorrectly marks this as not required
    AbstractStub<?> result = createStub(injectionType.asSubclass(AbstractStub.class), channel);
    // Each transformer receives the output of the previous one.
    for (final StubTransformer customizer : getStubTransformers()) {
        result = customizer.transform(name, result);
    }
    return injectionType.cast(result);
}
/**
 * Creates a stub of the given type bound to the given channel.
 *
 * <p>Creation is attempted in two steps: first through the static factory method of the enclosing class,
 * then, if that throws, through the stub's own {@code (Channel)} constructor. When both fail, the second
 * failure is recorded as suppressed on the first, which becomes the cause of the thrown exception.
 *
 * @param <T> The type of the stub to create.
 * @param stubType The type of the stub to create.
 * @param channel The channel used to create the stub.
 * @return The newly created stub.
 *
 * @throws BeanInstantiationException If the stub couldn't be created.
 */
protected <T extends AbstractStub<T>> T createStub(final Class<T> stubType, final Channel channel) {
    try {
        // Step 1: public static factory method
        final String factoryMethodName = deriveStubFactoryMethodName(stubType);
        final Class<?> serviceClass = stubType.getEnclosingClass();
        final Method staticFactory = serviceClass.getMethod(factoryMethodName, Channel.class);
        return stubType.cast(staticFactory.invoke(null, channel));
    } catch (final Exception primary) {
        try {
            // Step 2: private constructor as backup
            final Constructor<T> channelConstructor = stubType.getDeclaredConstructor(Channel.class);
            channelConstructor.setAccessible(true);
            return channelConstructor.newInstance(channel);
        } catch (final Exception secondary) {
            primary.addSuppressed(secondary);
            throw new BeanInstantiationException(stubType, "Failed to create gRPC client", primary);
        }
    }
}
/**
 * Runs an operation against a stub backed by a freshly built channel to the given endpoint.
 *
 * <p>The channel is shut down when the returned future completes. If creating the stub or invoking the
 * operation throws, it is shut down immediately instead.
 *
 * @param endpoint the single endpoint the channel's name resolver is configured with
 * @param stubCustomizer creates the stub from the new channel
 * @param stubConsumer performs the asynchronous operation on the stub
 * @return the future produced by {@code stubConsumer}, with channel shutdown attached to its completion
 */
<T extends AbstractStub<T>, R> CompletableFuture<R> withNewChannel(URI endpoint, Function<ManagedChannel, T> stubCustomizer,
        Function<T, CompletableFuture<R>> stubConsumer) {
    final ManagedChannel channel = defaultChannelBuilder()
            .nameResolverFactory(forEndpoints(
                    Util.supplyIfNull(builder.authority(), () -> ""),
                    Collections.singleton(endpoint),
                    Util.supplyIfNull(builder.uriResolverLoader(), URIResolverLoader::defaultLoader)))
            .build();

    try {
        final CompletableFuture<R> pending = stubConsumer.apply(stubCustomizer.apply(channel));
        return pending.whenComplete((result, failure) -> channel.shutdown());
    } catch (Exception e) {
        // No future will complete in this case, so release the channel right away.
        channel.shutdown();
        throw EtcdExceptionFactory.toEtcdException(e);
    }
}
/**
 * Unwraps the client into the requested type. When the superclass cannot do so and the client is a gRPC
 * stub, the stub's channel is asked instead, provided the channel is {@code Unwrappable}.
 *
 * @return the unwrapped object, or {@code null} if the superclass returns {@code null} and the client is
 *         not a stub or its channel is not unwrappable
 */
@Override
public <T> T unwrap(Object client, Class<T> type) {
    final T fromSuper = super.unwrap(client, type);
    if (fromSuper != null) {
        return fromSuper;
    }

    if (client instanceof AbstractStub) {
        final Channel stubChannel = ((AbstractStub<?>) client).getChannel();
        if (stubChannel instanceof Unwrappable) {
            return ((Unwrappable) stubChannel).as(type);
        }
    }
    return null;
}
/**
 * Verifies the gRPC client stub beans: every listed stub type must be obtainable from the context, two
 * lookups of the same type must yield distinct instances, and both instances must use the same channel
 * object.
 */
@Test
public void clientsBeans() {
    Assert.assertNotNull(applicationContext);

    final Class<?>[] stubTypes = {
        PingServiceGrpc.PingServiceFutureStub.class,
        JobServiceGrpc.JobServiceFutureStub.class,
        HeartBeatServiceGrpc.HeartBeatServiceStub.class,
        JobKillServiceGrpc.JobKillServiceFutureStub.class,
        FileStreamServiceGrpc.FileStreamServiceStub.class,
    };

    for (final Class<?> stubType : stubTypes) {
        // Look the same bean type up twice to compare the two results.
        final AbstractStub first = (AbstractStub) applicationContext.getBean(stubType);
        final AbstractStub second = (AbstractStub) applicationContext.getBean(stubType);

        Assert.assertNotNull(first);
        Assert.assertNotNull(second);
        Assert.assertNotSame(first, second);
        Assert.assertSame(first.getChannel(), second.getChannel());
    }
}
/**
 * Maps a stub type to the name of the static factory method that creates it.
 *
 * <p>Async stubs map to {@code newStub}, blocking stubs to {@code newBlockingStub} and future stubs to
 * {@code newFutureStub}. The checks run in that order.
 *
 * @param stubType The type of the stub to get it for.
 * @return The name of the factory method.
 * @throws IllegalArgumentException If the method was called with an unsupported stub type.
 */
protected String deriveStubFactoryMethodName(final Class<? extends AbstractStub<?>> stubType) {
    if (AbstractAsyncStub.class.isAssignableFrom(stubType)) {
        return "newStub";
    }
    if (AbstractBlockingStub.class.isAssignableFrom(stubType)) {
        return "newBlockingStub";
    }
    if (AbstractFutureStub.class.isAssignableFrom(stubType)) {
        return "newFutureStub";
    }
    throw new IllegalArgumentException(
            "Unsupported stub type: " + stubType.getName() + " -> Please report this issue.");
}
/**
 * Tells whether the reference class is a gRPC stub type.
 *
 * @param referenceClass the class to inspect
 * @return {@code true} if {@code referenceClass} is {@link AbstractStub} or a subclass of it
 */
private boolean isGrpcStubClient(Class<?> referenceClass) {
    // The assignability check already yields the boolean; no if/else around it is needed.
    return AbstractStub.class.isAssignableFrom(referenceClass);
}
/**
 * Returns a copy of the client that attaches fixed test headers to every call: caller id
 * {@code embeddedGatewayClient}, call reason {@code test call} and the debug flag set to {@code true}.
 *
 * @param client the stub to decorate
 * @return the stub returned by {@code withInterceptors}, carrying the header-attaching interceptor
 */
private <STUB extends AbstractStub<STUB>> STUB attachCallHeaders(STUB client) {
    Metadata headers = new Metadata();
    headers.put(V3HeaderInterceptor.CALLER_ID_KEY, "embeddedGatewayClient");
    headers.put(V3HeaderInterceptor.CALL_REASON_KEY, "test call");
    headers.put(V3HeaderInterceptor.DEBUG_KEY, "true");

    return client.withInterceptors(
            MetadataUtils.newAttachHeadersInterceptor(headers));
}
/**
 * Returns a copy of the client that attaches fixed test headers to every call: caller id
 * {@code embeddedFederationClient}, call reason {@code test call} and the debug flag set to {@code true}.
 *
 * @param client the stub to decorate
 * @return the stub returned by {@code withInterceptors}, carrying the header-attaching interceptor
 */
private <STUB extends AbstractStub<STUB>> STUB attachCallHeaders(STUB client) {
    Metadata headers = new Metadata();
    headers.put(V3HeaderInterceptor.CALLER_ID_KEY, "embeddedFederationClient");
    headers.put(V3HeaderInterceptor.CALL_REASON_KEY, "test call");
    headers.put(V3HeaderInterceptor.DEBUG_KEY, "true");

    return client.withInterceptors(
            MetadataUtils.newAttachHeadersInterceptor(headers));
}
/**
 * Returns a copy of the client that attaches fixed test headers to every call: caller id
 * {@code testkitClient}, call reason {@code test call} and the debug flag set to {@code true}.
 *
 * @param client the stub to decorate
 * @return the stub returned by {@code withInterceptors}, carrying the header-attaching interceptor
 */
public static <STUB extends AbstractStub<STUB>> STUB attachCallHeaders(STUB client) {
    Metadata headers = new Metadata();
    headers.put(V3HeaderInterceptor.CALLER_ID_KEY, "testkitClient");
    headers.put(V3HeaderInterceptor.CALL_REASON_KEY, "test call");
    headers.put(V3HeaderInterceptor.DEBUG_KEY, "true");

    return client.withInterceptors(
            MetadataUtils.newAttachHeadersInterceptor(headers));
}
/**
 * Creates a factory that wraps gRPC stubs into reactive API clients. The clients use the default request
 * and streaming timeouts from {@code GrpcRequestConfiguration} and a stub decorator built from the
 * anonymous call-metadata resolver.
 *
 * @return a new factory instance on every call
 */
public GrpcToReactorClientFactory getGrpcToReactorClientFactory() {
    return new GrpcToReactorClientFactory() {
        @Override
        public <GRPC_STUB extends AbstractStub<GRPC_STUB>, REACT_API> REACT_API apply(GRPC_STUB stub, Class<REACT_API> apiType, ServiceDescriptor serviceDescriptor) {
            final Duration requestTimeout = Duration.ofMillis(GrpcRequestConfiguration.DEFAULT_REQUEST_TIMEOUT_MS);
            final Duration streamingTimeout = Duration.ofMillis(GrpcRequestConfiguration.DEFAULT_STREAMING_TIMEOUT_MS);

            return ReactorToGrpcClientBuilder
                    .<REACT_API, GRPC_STUB, CallMetadata>newBuilder(apiType, stub, serviceDescriptor, CallMetadata.class)
                    .withGrpcStubDecorator(
                            CommonCallMetadataUtils.newGrpcStubDecorator(AnonymousCallMetadataResolver.getInstance()))
                    .withTimeout(requestTimeout)
                    .withStreamingTimeout(streamingTimeout)
                    .build();
        }
    };
}
/**
 * Creates one stub per cell by applying the factory to each channel exposed by the connector.
 *
 * @param connector source of the per-cell channels
 * @param stubFactory creates a stub from a channel
 * @return a map from each cell to the stub created for its channel
 */
static <T extends AbstractStub> Map<Cell, T> stubs(CellConnector connector, Function<ManagedChannel, T> stubFactory) {
    return connector.getChannels().entrySet().stream().collect(
            Collectors.toMap(
                    cellAndChannel -> cellAndChannel.getKey(),
                    cellAndChannel -> stubFactory.apply(cellAndChannel.getValue())));
}
/**
 * Performs a call against one specific cell.
 *
 * @param cell the cell to call
 * @param connector used to look up the channel for {@code cell}
 * @param stubFactory creates the stub from the cell's channel
 * @param fnCall the call to perform, given the stub and a response observer
 * @return an observable of the call's responses, or an observable that fails with an invalid-argument
 *         {@code TitusServiceException} when the connector has no channel for the cell
 */
static <STUB extends AbstractStub<STUB>, RespT> Observable<RespT> callToCell(
        Cell cell,
        CellConnector connector,
        Function<ManagedChannel, STUB> stubFactory,
        BiConsumer<STUB, StreamObserver<RespT>> fnCall) {
    Optional<ManagedChannel> cellChannel = connector.getChannelForCell(cell);
    if (cellChannel.isPresent()) {
        STUB stub = stubFactory.apply(cellChannel.get());
        return GrpcUtil.createRequestObservable(emitter -> {
            StreamObserver<RespT> observer = GrpcUtil.createSimpleClientResponseObserver(emitter);
            fnCall.accept(stub, observer);
        });
    }
    return Observable.error(TitusServiceException.invalidArgument("Invalid Cell " + cell));
}
/**
 * Wraps a single stub call into an {@link Observable}: a response observer is created from the emitter
 * and handed to {@code fnCall} together with the stub.
 *
 * @param client the stub to call
 * @param fnCall the call to perform, given the stub and a response observer
 * @return the observable created by {@code createRequestObservable} for this call
 */
private <STUB extends AbstractStub<STUB>, RespT>
Observable<RespT> callSingleCell(STUB client, BiConsumer<STUB, StreamObserver<RespT>> fnCall) {
    return createRequestObservable(responseEmitter -> {
        StreamObserver<RespT> responseObserver = createSimpleClientResponseObserver(responseEmitter);
        fnCall.accept(client, responseObserver);
    });
}
/**
 * Builds an error merger that compares the gRPC {@link Status} of two error responses with the given
 * {@link Comparator} and keeps the one that ranks lower or equal. On a tie the first response is kept.
 *
 * <p>Both responses must carry a non-OK status; otherwise an {@link IllegalArgumentException} is thrown.
 */
static <STUB extends AbstractStub<STUB>, T> ErrorMerger<STUB, T> grpc(Comparator<Status> rank) {
    return (one, other) -> {
        Status statusOfOne = Status.fromThrowable(one.getResult().getError());
        Status statusOfOther = Status.fromThrowable(other.getResult().getError());

        Preconditions.checkArgument(isNotOK(statusOfOne), "status is not an error");
        Preconditions.checkArgument(isNotOK(statusOfOther), "status is not an error");

        boolean otherRanksFirst = rank.compare(statusOfOne, statusOfOther) > 0;
        return otherRanksFirst ? other : one;
    };
}
/**
 * Stores the collaborators that are later used when building reactive clients.
 *
 * @param configuration request configuration kept for later use
 * @param grpcStubDecorator function applied to a stub together with an optional context
 * @param contextType the class object of the context type
 */
public DefaultGrpcToReactorClientFactory(GrpcRequestConfiguration configuration,
        BiFunction<AbstractStub, Optional<CONTEXT>, AbstractStub> grpcStubDecorator,
        Class<CONTEXT> contextType) {
    this.contextType = contextType;
    this.grpcStubDecorator = grpcStubDecorator;
    this.configuration = configuration;
}
/**
 * Builds a reactive client of type {@code apiType} on top of the given gRPC stub, using this factory's
 * stub decorator, context type and the timeouts from its configuration.
 *
 * @param stub the gRPC stub to wrap
 * @param apiType the reactive API type to produce
 * @param serviceDescriptor descriptor of the gRPC service behind the stub
 * @return the client produced by the builder
 */
@Override
public <GRPC_STUB extends AbstractStub<GRPC_STUB>, REACT_API> REACT_API apply(GRPC_STUB stub, Class<REACT_API> apiType, ServiceDescriptor serviceDescriptor) {
    ReactorToGrpcClientBuilder<REACT_API, GRPC_STUB, CONTEXT> clientBuilder =
            ReactorToGrpcClientBuilder.newBuilder(apiType, stub, serviceDescriptor, contextType);

    // The decorator is stored with raw stub types; narrow it to this call's stub type.
    BiFunction<GRPC_STUB, Optional<CONTEXT>, GRPC_STUB> typedDecorator =
            (BiFunction<GRPC_STUB, Optional<CONTEXT>, GRPC_STUB>) grpcStubDecorator;

    return clientBuilder
            .withGrpcStubDecorator(typedDecorator)
            .withTimeout(Duration.ofMillis(configuration.getRequestTimeoutMs()))
            .withStreamingTimeout(Duration.ofMillis(configuration.getStreamingTimeoutMs()))
            .build();
}
/**
 * Creates a stub decorator that attaches call metadata to a stub.
 *
 * <p>Metadata passed explicitly to the decorator is used first. If none is passed, the resolver is asked.
 * If the resolver has none either, the stub is returned unchanged.
 *
 * @param callMetadataResolver fallback source of call metadata
 * @return the decorator function
 */
public static <STUB extends AbstractStub<STUB>> BiFunction<STUB, Optional<CallMetadata>, STUB> newGrpcStubDecorator(CallMetadataResolver callMetadataResolver) {
    return (grpcStub, callMetadataOpt) -> {
        Optional<STUB> withExplicitMetadata = callMetadataOpt
                .map(explicit -> V3HeaderInterceptor.attachCallMetadata(grpcStub, explicit));

        // The resolver is only consulted when no explicit metadata produced a decorated stub.
        return withExplicitMetadata.orElseGet(() -> callMetadataResolver.resolve()
                .map(resolved -> V3HeaderInterceptor.attachCallMetadata(grpcStub, resolved))
                .orElse(grpcStub));
    };
}
/**
 * Starts a builder for a reactive client backed by the given gRPC stub.
 *
 * @param reactApi the reactive API type; must be an interface
 * @param grpcStub the gRPC stub to delegate to
 * @param grpcServiceDescriptor descriptor of the gRPC service
 * @param contextType the class object of the context type
 * @return a new builder
 * @throws IllegalArgumentException if {@code reactApi} is not an interface
 */
public static <REACT_API, GRPC_STUB extends AbstractStub<GRPC_STUB>, CONTEXT> ReactorToGrpcClientBuilder<REACT_API, GRPC_STUB, CONTEXT> newBuilder(
        Class<REACT_API> reactApi,
        GRPC_STUB grpcStub,
        ServiceDescriptor grpcServiceDescriptor,
        Class<CONTEXT> contextType) {
    Preconditions.checkArgument(reactApi.isInterface(), "Interface type required");

    ReactorToGrpcClientBuilder<REACT_API, GRPC_STUB, CONTEXT> builder =
            new ReactorToGrpcClientBuilder<>(reactApi, grpcStub, grpcServiceDescriptor, contextType);
    return builder;
}
/**
 * Starts a builder for a reactive client and pre-configures it with the default request timeout, the
 * default streaming timeout and the empty stub decorator.
 *
 * @param reactApi the reactive API type; must be an interface
 * @param grpcStub the gRPC stub to delegate to
 * @param grpcServiceDescriptor descriptor of the gRPC service
 * @param contextType the class object of the context type
 * @return a pre-configured builder
 * @throws IllegalArgumentException if {@code reactApi} is not an interface
 */
public static <REACT_API, GRPC_STUB extends AbstractStub<GRPC_STUB>, CONTEXT> ReactorToGrpcClientBuilder<REACT_API, GRPC_STUB, CONTEXT> newBuilderWithDefaults(
        Class<REACT_API> reactApi,
        GRPC_STUB grpcStub,
        ServiceDescriptor grpcServiceDescriptor,
        Class<CONTEXT> contextType) {
    Preconditions.checkArgument(reactApi.isInterface(), "Interface type required");

    ReactorToGrpcClientBuilder<REACT_API, GRPC_STUB, CONTEXT> builder =
            new ReactorToGrpcClientBuilder<>(reactApi, grpcStub, grpcServiceDescriptor, contextType);

    // Defaults are applied in the same order as before: timeout, streaming timeout, decorator.
    return builder
            .withTimeout(Duration.ofMillis(DEFAULT_REQUEST_TIMEOUT_MS))
            .withStreamingTimeout(Duration.ofMillis(DEFAULT_STREAMING_TIMEOUT_MS))
            .withGrpcStubDecorator(EMPTY_STUB_DECORATOR);
}
/**
 * Obtains a stub from the supplier and, when a positive deadline is configured, returns a copy with that
 * deadline applied.
 *
 * @param getter supplies the stub
 * @return the supplied stub, or the result of {@code withDeadlineAfter} when {@code deadlineAfter > 0}
 */
private <T extends AbstractStub<T>> T deadlined(Supplier<T> getter) {
    final T supplied = getter.get();
    if (deadlineAfter <= 0) {
        // No deadline configured: use the stub as supplied.
        return supplied;
    }
    return supplied.withDeadlineAfter(deadlineAfter, deadlineAfterUnits);
}