下面列出了 io.grpc.health.v1.HealthCheckResponse 类的 API 示例代码及用法,也可以点击链接到 GitHub 查看源代码。
/**
 * Processes one health-check response received on the current call.
 *
 * <p>Records that the call has responded, clears the backoff policy, transitions to READY when
 * the reported status is SERVING and to TRANSIENT_FAILURE (UNAVAILABLE) for any other status,
 * and finally requests one more message from the call.
 *
 * @param response the health-check response to act on
 */
void handleResponse(HealthCheckResponse response) {
  callHasResponded = true;
  backoffPolicy = null;
  final ServingStatus reported = response.getStatus();
  final ConnectivityStateInfo nextState;
  // running == true means the Subchannel's state (rawState) is READY
  if (!Objects.equal(reported, ServingStatus.SERVING)) {
    subchannelLogger.log(
        ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check responded {0}", reported);
    final String description =
        "Health-check service responded " + reported + " for '" + callServiceName + "'";
    nextState =
        ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription(description));
  } else {
    subchannelLogger.log(ChannelLogLevel.INFO, "READY: health-check responded SERVING");
    nextState = ConnectivityStateInfo.forNonError(READY);
  }
  gotoState(nextState);
  // Ask for the next health update on the stream.
  call.request(1);
}
/**
 * Verifies that a unary check returns the status configured for each service and that unary
 * checks leave no watchers registered.
 */
@Test
public void checkValidStatus() throws Exception {
  manager.setStatus(SERVICE1, ServingStatus.NOT_SERVING);
  manager.setStatus(SERVICE2, ServingStatus.SERVING);

  // SERVICE1 was configured as NOT_SERVING.
  HealthCheckRequest service1Request =
      HealthCheckRequest.newBuilder().setService(SERVICE1).build();
  HealthCheckResponse service1Reply =
      blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(service1Request);
  HealthCheckResponse expectedNotServing =
      HealthCheckResponse.newBuilder().setStatus(ServingStatus.NOT_SERVING).build();
  assertThat(service1Reply).isEqualTo(expectedNotServing);

  // SERVICE2 was configured as SERVING.
  HealthCheckRequest service2Request =
      HealthCheckRequest.newBuilder().setService(SERVICE2).build();
  HealthCheckResponse service2Reply =
      blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(service2Request);
  HealthCheckResponse expectedServing =
      HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build();
  assertThat(service2Reply).isEqualTo(expectedServing);

  assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
  assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
}
/**
 * Registers every service that the server should expose on the given builder.
 *
 * <p>The health service and the reflection service are added only when enabled in the
 * properties. Each entry of the service list is logged, added to the builder, and reported as
 * SERVING to the health status manager.
 *
 * @param builder The server builder to configure.
 */
protected void configureServices(final T builder) {
  // Optional infrastructure services: health check first, then reflection.
  final boolean healthEnabled = this.properties.isHealthServiceEnabled();
  if (healthEnabled) {
    builder.addService(this.healthStatusManager.getHealthService());
  }
  final boolean reflectionEnabled = this.properties.isReflectionServiceEnabled();
  if (reflectionEnabled) {
    builder.addService(ProtoReflectionService.newInstance());
  }

  // Application services.
  for (final GrpcServiceDefinition candidate : this.serviceList) {
    final String name = candidate.getDefinition().getServiceDescriptor().getName();
    final String message = "Registered gRPC service: " + name + ", bean: " + candidate.getBeanName()
        + ", class: " + candidate.getBeanClazz().getName();
    log.info(message);
    builder.addService(candidate.getDefinition());
    this.healthStatusManager.setStatus(name, HealthCheckResponse.ServingStatus.SERVING);
  }
}
/**
 * Answers a unary health check with SERVING or NOT_SERVING depending on the aggregated health
 * check result.
 *
 * <p>The stream is terminated exactly once: with {@code onCompleted()} after a successful
 * response, or through {@code GRPC_HELPER.onError} when evaluating the health check fails.
 * Previously {@code onCompleted()} ran in a {@code finally} block and was therefore also invoked
 * after the error path, terminating the stream a second time.
 *
 * @param request the health-check request (its content is not inspected)
 * @param responseObserver the observer that receives the single response or the error
 */
@Override
public void check(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
  final HealthCheckResponse response;
  try {
    final HealthCheckResponse.ServingStatus status = healthCheck.check().get().isHealthy()
        ? HealthCheckResponse.ServingStatus.SERVING
        : HealthCheckResponse.ServingStatus.NOT_SERVING;
    response = HealthCheckResponse.newBuilder().setStatus(status).build();
  } catch (Exception ex) {
    if (ex instanceof InterruptedException) {
      // Preserve the interrupt status for the code that owns this thread.
      Thread.currentThread().interrupt();
    }
    // NOTE(review): assumes GRPC_HELPER.onError terminates the stream via
    // responseObserver.onError - confirm against the helper.
    GRPC_HELPER.onError(responseObserver, ex);
    return;
  }
  responseObserver.onNext(response);
  responseObserver.onCompleted();
}
/**
 * Verifies that the health service answers SERVING when the aggregated health check reports a
 * healthy status.
 */
@Test
public void healthServing() throws Exception {
  // Generate a unique in-process server name.
  String serverName = InProcessServerBuilder.generateName();

  HealthCheckStatus hcs = mock(HealthCheckStatus.class);
  when(hcs.isHealthy()).thenReturn(true);
  // Use a real, already-completed future instead of a raw-typed mock of CompletableFuture:
  // this avoids the unchecked assignment and the need to stub get().
  CompletableFuture<HealthCheckStatus> hcsf = CompletableFuture.completedFuture(hcs);
  HealthCheckAggregator hca = mock(HealthCheckAggregator.class);
  when(hca.check()).thenReturn(hcsf);

  HealthServiceImpl healthyService = new HealthServiceImpl(hca);
  addService(serverName, healthyService);
  HealthGrpc.HealthBlockingStub blockingStub = HealthGrpc.newBlockingStub(
      // Create a client channel and register for automatic graceful shutdown.
      grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));

  HealthCheckResponse reply = blockingStub.check(HealthCheckRequest.newBuilder().build());
  assertEquals(HealthCheckResponse.ServingStatus.SERVING, reply.getStatus());
}
/**
 * Verifies that the health service answers NOT_SERVING when the aggregated health check reports
 * an unhealthy status.
 */
@Test
public void healthNotServing() throws Exception {
  // Generate a unique in-process server name.
  String serverName = InProcessServerBuilder.generateName();

  HealthCheckStatus hcs = mock(HealthCheckStatus.class);
  when(hcs.isHealthy()).thenReturn(false);
  // Use a real, already-completed future instead of a raw-typed mock of CompletableFuture:
  // this avoids the unchecked assignment and the need to stub get().
  CompletableFuture<HealthCheckStatus> hcsf = CompletableFuture.completedFuture(hcs);
  HealthCheckAggregator hca = mock(HealthCheckAggregator.class);
  when(hca.check()).thenReturn(hcsf);

  // Named for what it models: the backing health check is unhealthy in this test.
  HealthServiceImpl unhealthyService = new HealthServiceImpl(hca);
  addService(serverName, unhealthyService);
  HealthGrpc.HealthBlockingStub blockingStub = HealthGrpc.newBlockingStub(
      // Create a client channel and register for automatic graceful shutdown.
      grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));

  HealthCheckResponse reply = blockingStub.check(HealthCheckRequest.newBuilder().build());
  assertEquals(HealthCheckResponse.ServingStatus.NOT_SERVING, reply.getStatus());
}
/**
 * Adds all services that the server should serve to the given builder.
 *
 * <p>Health and reflection services are registered only if the properties enable them. Every
 * definition in the service list is logged, registered, and marked SERVING in the health status
 * manager.
 *
 * @param builder The server builder to configure.
 */
protected void configureServices(final T builder) {
  // Built-in services, each guarded by its own property flag.
  final boolean withHealth = this.properties.isHealthServiceEnabled();
  if (withHealth) {
    builder.addService(this.healthStatusManager.getHealthService());
  }
  final boolean withReflection = this.properties.isReflectionServiceEnabled();
  if (withReflection) {
    builder.addService(ProtoReflectionService.newInstance());
  }

  // User-provided services.
  for (final GrpcServiceDefinition entry : this.serviceList) {
    final String registeredName = entry.getDefinition().getServiceDescriptor().getName();
    final String logLine = "Registered gRPC service: " + registeredName + ", bean: "
        + entry.getBeanName() + ", class: " + entry.getBeanClazz().getName();
    log.info(logLine);
    builder.addService(entry.getDefinition());
    this.healthStatusManager.setStatus(registeredName, HealthCheckResponse.ServingStatus.SERVING);
  }
}
/**
 * Answers a unary health check for the requested service.
 *
 * <p>For a non-empty service name the global status (service {@code ""}) is read first. A
 * request for {@code "ready"} refreshes the stored "ready" status from
 * {@code ModelDBHibernateUtil.checkReady()} only while the global status is SERVING; a request
 * for {@code "live"} always refreshes the stored "live" status from
 * {@code ModelDBHibernateUtil.checkLive()}. The stored status of the requested service is then
 * returned, or NOT_FOUND is signalled when none is recorded.
 *
 * @param request the health-check request naming the service
 * @param responseObserver the observer that receives the response or the error
 */
@Override
public void check(
    HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
  final String requested = request.getService();
  if (!requested.isEmpty()) {
    final ServingStatus globalStatus = getStatus("");
    switch (requested) {
      case "ready":
        if (globalStatus == ServingStatus.SERVING) {
          setStatus("ready", ModelDBHibernateUtil.checkReady());
        }
        break;
      case "live":
        setStatus("live", ModelDBHibernateUtil.checkLive());
        break;
      default:
        break;
    }
  }

  final ServingStatus current = getStatus(requested);
  if (current == null) {
    responseObserver.onError(
        new StatusException(Status.NOT_FOUND.withDescription("unknown service " + requested)));
    return;
  }
  responseObserver.onNext(HealthCheckResponse.newBuilder().setStatus(current).build());
  responseObserver.onCompleted();
}
/**
 * Handles a single health-check response delivered on the current call.
 *
 * <p>Flags the call as responded, resets the backoff policy, switches to READY for a SERVING
 * status or to TRANSIENT_FAILURE (UNAVAILABLE) for anything else, and then requests one more
 * message from the call.
 *
 * @param response the health-check response to act on
 */
void handleResponse(HealthCheckResponse response) {
  callHasResponded = true;
  backoffPolicy = null;
  final ServingStatus reported = response.getStatus();
  final ConnectivityStateInfo nextState;
  // running == true means the Subchannel's state (rawState) is READY
  if (!Objects.equal(reported, ServingStatus.SERVING)) {
    subchannelLogger.log(
        ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check responded {0}", reported);
    final String description =
        "Health-check service responded " + reported + " for '" + callServiceName + "'";
    nextState =
        ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription(description));
  } else {
    subchannelLogger.log(ChannelLogLevel.INFO, "READY: health-check responded SERVING");
    nextState = ConnectivityStateInfo.forNonError(READY);
  }
  gotoState(nextState);
  // Ask for the next health update on the stream.
  call.request(1);
}
/**
 * Checks that unary health checks report the status set for each service, and that they do not
 * leave any watcher registered.
 */
@Test
public void checkValidStatus() throws Exception {
  manager.setStatus(SERVICE1, ServingStatus.NOT_SERVING);
  manager.setStatus(SERVICE2, ServingStatus.SERVING);

  // SERVICE1 was configured as NOT_SERVING.
  HealthCheckRequest firstRequest =
      HealthCheckRequest.newBuilder().setService(SERVICE1).build();
  HealthCheckResponse firstReply =
      blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(firstRequest);
  HealthCheckResponse notServing =
      HealthCheckResponse.newBuilder().setStatus(ServingStatus.NOT_SERVING).build();
  assertThat(firstReply).isEqualTo(notServing);

  // SERVICE2 was configured as SERVING.
  HealthCheckRequest secondRequest =
      HealthCheckRequest.newBuilder().setService(SERVICE2).build();
  HealthCheckResponse secondReply =
      blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(secondRequest);
  HealthCheckResponse serving =
      HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build();
  assertThat(secondReply).isEqualTo(serving);

  assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
  assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
}
/**
 * Builds and starts the server on port 50051 with the health service and the greeter service,
 * marks every service returned by the server as SERVING in the health status manager, and
 * installs a JVM shutdown hook that stops the server.
 *
 * @throws IOException propagated from building and starting the server
 */
private void start() throws IOException {
  final int port = 50051;
  server = ServerBuilder.forPort(port)
      .addService(manager.getHealthService())
      .addService(new GreeterImpl())
      .build()
      .start();

  // Report each registered service as healthy.
  for (ServerServiceDefinition definition : server.getServices()) {
    manager.setStatus(
        definition.getServiceDescriptor().getName(), HealthCheckResponse.ServingStatus.SERVING);
  }
  logger.info("Server started, listening on " + port);

  final Runnable shutdownTask = new Runnable() {
    @Override
    public void run() {
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down");
      HealthCheckServer.this.stop();
      System.err.println("*** server shut down");
    }
  };
  Runtime.getRuntime().addShutdownHook(new Thread(shutdownTask));
}
/**
 * Sends a unary health check for the Greeter service and logs the reported serving status.
 *
 * <p>A failed RPC is logged and otherwise ignored.
 */
public void check() {
  HealthCheckRequest request =
      HealthCheckRequest.newBuilder().setService(GreeterGrpc.SERVICE_NAME).build();
  HealthCheckResponse response;
  try {
    response = blockingStub.check(request);
  } catch (Exception e) {
    logger.error("RPC failed:", e);
    return;
  }
  // The response carries a health status, not a greeting; label it accordingly.
  logger.info("Health check status: " + response.getStatus());
}
/**
 * Answers a unary health check from the status map.
 *
 * <p>Signals NOT_FOUND when no status is recorded for the requested service; otherwise sends
 * the recorded status and completes the stream.
 *
 * @param request the health-check request naming the service
 * @param responseObserver the observer that receives the response or the error
 */
@Override
public void check(HealthCheckRequest request,
    StreamObserver<HealthCheckResponse> responseObserver) {
  final String requested = request.getService();
  final ServingStatus recorded = statusMap.get(requested);
  if (recorded == null) {
    final Status notFound = Status.NOT_FOUND.withDescription("unknown service " + requested);
    responseObserver.onError(new StatusException(notFound));
    return;
  }
  responseObserver.onNext(HealthCheckResponse.newBuilder().setStatus(recorded).build());
  responseObserver.onCompleted();
}
/**
 * Returns how many watchers are currently registered for the given service.
 *
 * @param service the service name to look up
 * @return the number of registered watchers, or 0 when the service has no watcher map
 */
@VisibleForTesting
int numWatchersForTest(String service) {
  synchronized (watchLock) {
    IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> registered =
        watchers.get(service);
    return registered == null ? 0 : registered.size();
  }
}
/**
 * Sends the watch response derived from {@code status} to every watcher registered for the
 * given service. Does nothing when the service has no watcher map.
 *
 * @param service the service whose watchers are notified
 * @param status the recorded status, possibly {@code null}
 */
@GuardedBy("watchLock")
private void notifyWatchers(String service, @Nullable ServingStatus status) {
  final HealthCheckResponse update = getResponseForWatch(status);
  final IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> registered =
      watchers.get(service);
  if (registered == null) {
    return;
  }
  for (StreamObserver<HealthCheckResponse> watcher : registered.keySet()) {
    watcher.onNext(update);
  }
}
/**
 * Schedules handling of a received health-check response on the synchronization context.
 *
 * <p>The response is handled only if this stream is still the active RPC when the task runs.
 *
 * @param response the received health-check response
 */
@Override
public void onMessage(final HealthCheckResponse response) {
  final Runnable handleIfActive = new Runnable() {
    @Override
    public void run() {
      if (activeRpc != HcStream.this) {
        // A different stream is now the active RPC; ignore this response.
        return;
      }
      handleResponse(response);
    }
  };
  syncContext.execute(handleIfActive);
}
/**
 * Records an incoming watch call and flags it as cancelled when the current context is
 * cancelled.
 *
 * @param request the watch request
 * @param responseObserver the observer associated with the recorded call
 */
@Override
public void watch(HealthCheckRequest request,
    StreamObserver<HealthCheckResponse> responseObserver) {
  final ServerSideCall call = new ServerSideCall(request, responseObserver);
  final CancellationListener markCancelled = new CancellationListener() {
    @Override
    public void cancelled(Context ctx) {
      call.cancelled = true;
    }
  };
  Context.current().addListener(markCancelled, MoreExecutors.directExecutor());
  calls.add(call);
}
/**
 * Verifies that cancelling the context under which a watch RPC was started removes only that
 * watcher, while the other watchers for the same and for another service stay registered.
 */
@Test
public void watchRemovedWhenClientCloses() throws Exception {
// Start the first watch while a cancellable context is attached.
CancellableContext withCancellation = Context.current().withCancellation();
Context prevCtx = withCancellation.attach();
RespObserver respObs1 = new RespObserver();
try {
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1);
} finally {
// Restore the previous context so the following watches are started outside the
// cancellable context.
withCancellation.detach(prevCtx);
}
RespObserver respObs1b = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b);
RespObserver respObs2 = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2);
// Each watcher is expected to have received SERVICE_UNKNOWN as its first response.
assertThat(respObs1.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(respObs1b.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(respObs2.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
// Two watchers on SERVICE1, one on SERVICE2, and no further responses pending.
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
assertThat(respObs1.responses).isEmpty();
assertThat(respObs1b.responses).isEmpty();
assertThat(respObs2.responses).isEmpty();
// This will cancel the RPC with respObs1
withCancellation.close();
// The cancelled observer receives a Throwable and its watcher is removed; the others are
// untouched.
assertThat(respObs1.responses.poll()).isInstanceOf(Throwable.class);
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
assertThat(respObs1.responses).isEmpty();
assertThat(respObs1b.responses).isEmpty();
assertThat(respObs2.responses).isEmpty();
}
/**
 * Updates the tracked health from a health-check response: up when the reported status is
 * SERVING, down for any other status.
 *
 * @param healthCheckResponse the received health-check response
 */
@Override
public void onNext(HealthCheckResponse healthCheckResponse) {
  final boolean serving =
      HealthCheckResponse.ServingStatus.SERVING == healthCheckResponse.getStatus();
  if (!serving) {
    health = health.down();
    return;
  }
  health = health.up();
}
/**
 * Verifies that the health service reports the Greeter service as SERVING.
 *
 * @throws ExecutionException if the health-check future completes exceptionally
 * @throws InterruptedException if interrupted while waiting for the health-check result
 */
@Test
public void testHealthCheck() throws ExecutionException, InterruptedException {
  final HealthCheckRequest healthCheckRequest = HealthCheckRequest.newBuilder()
      .setService(GreeterGrpc.getServiceDescriptor().getName())
      .build();
  final HealthGrpc.HealthFutureStub healthFutureStub = HealthGrpc.newFutureStub(channel);
  final HealthCheckResponse.ServingStatus servingStatus =
      healthFutureStub.check(healthCheckRequest).get().getStatus();
  assertNotNull(servingStatus);
  // assertEquals takes (expected, actual); the swapped order produced misleading failure output.
  assertEquals(HealthCheckResponse.ServingStatus.SERVING, servingStatus);
}
/**
 * Assembles and starts the gRPC server.
 *
 * <p>Collects the global interceptor beans, registers the health service, registers every
 * {@code GRpcService}-annotated {@code BindableService} bean (with its interceptors bound) and
 * marks it SERVING, optionally registers the reflection service, applies the configurator,
 * starts the server, publishes a {@code GRpcServerInitializedEvent}, and finally calls
 * {@code startDaemonAwaitThread()}.
 *
 * @param args the arguments passed to this runner (not used here)
 * @throws Exception propagated from any of the steps above
 */
@Override
public void run(String... args) throws Exception {
log.info("Starting gRPC Server ...");
// Resolve all interceptor beans annotated as global interceptors.
Collection<ServerInterceptor> globalInterceptors = getBeanNamesByTypeWithAnnotation(GRpcGlobalInterceptor.class, ServerInterceptor.class)
.map(name -> applicationContext.getBeanFactory().getBean(name, ServerInterceptor.class))
.collect(Collectors.toList());
// Adding health service
serverBuilder.addService(healthStatusManager.getHealthService());
// find and register all GRpcService-enabled beans
getBeanNamesByTypeWithAnnotation(GRpcService.class, BindableService.class)
.forEach(name -> {
BindableService srv = applicationContext.getBeanFactory().getBean(name, BindableService.class);
ServerServiceDefinition serviceDefinition = srv.bindService();
GRpcService gRpcServiceAnn = applicationContext.findAnnotationOnBean(name, GRpcService.class);
// Replace the definition with the one returned by bindInterceptors before registering it.
serviceDefinition = bindInterceptors(serviceDefinition, gRpcServiceAnn, globalInterceptors);
serverBuilder.addService(serviceDefinition);
// Report the newly registered service as SERVING to the health status manager.
String serviceName = serviceDefinition.getServiceDescriptor().getName();
healthStatusManager.setStatus(serviceName, HealthCheckResponse.ServingStatus.SERVING);
log.info("'{}' service has been registered.", srv.getClass().getName());
});
// The reflection service is registered only when enabled in the server properties.
if (gRpcServerProperties.isEnableReflection()) {
serverBuilder.addService(ProtoReflectionService.newInstance());
log.info("'{}' service has been registered.", ProtoReflectionService.class.getName());
}
// Give the configurator a chance to adjust the builder before the server is built.
configurator.accept(serverBuilder);
server = serverBuilder.build().start();
applicationContext.publishEvent(new GRpcServerInitializedEvent(applicationContext,server));
log.info("gRPC Server started, listening on port {}.", server.getPort());
startDaemonAwaitThread();
}
/**
 * Serves a unary health check by looking up the requested service in the status map.
 *
 * <p>An unknown service results in a NOT_FOUND error; a known service receives its recorded
 * status followed by stream completion.
 *
 * @param request the health-check request naming the service
 * @param responseObserver the observer that receives the response or the error
 */
@Override
public void check(HealthCheckRequest request,
    StreamObserver<HealthCheckResponse> responseObserver) {
  final String serviceName = request.getService();
  final ServingStatus known = statusMap.get(serviceName);
  if (known == null) {
    final Status notFound = Status.NOT_FOUND.withDescription("unknown service " + serviceName);
    responseObserver.onError(new StatusException(notFound));
    return;
  }
  responseObserver.onNext(HealthCheckResponse.newBuilder().setStatus(known).build());
  responseObserver.onCompleted();
}
/**
 * Reports the number of watchers currently registered for a service.
 *
 * @param service the service name to look up
 * @return the watcher count, or 0 when no watcher map exists for the service
 */
@VisibleForTesting
int numWatchersForTest(String service) {
  synchronized (watchLock) {
    IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> forService =
        watchers.get(service);
    return forService == null ? 0 : forService.size();
  }
}
/**
 * Delivers the watch response built from {@code status} to each watcher of the given service.
 * Returns without sending anything when the service has no watcher map.
 *
 * @param service the service whose watchers are notified
 * @param status the recorded status, possibly {@code null}
 */
@GuardedBy("watchLock")
private void notifyWatchers(String service, @Nullable ServingStatus status) {
  final HealthCheckResponse notification = getResponseForWatch(status);
  final IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> forService =
      watchers.get(service);
  if (forService == null) {
    return;
  }
  for (StreamObserver<HealthCheckResponse> observer : forService.keySet()) {
    observer.onNext(notification);
  }
}
/**
 * Hands a received health-check response to the synchronization context for processing.
 *
 * <p>When the task runs, the response is processed only if this stream is still the active RPC.
 *
 * @param response the received health-check response
 */
@Override
public void onMessage(final HealthCheckResponse response) {
  final Runnable task = new Runnable() {
    @Override
    public void run() {
      if (activeRpc != HcStream.this) {
        // This stream is no longer the active RPC; drop the response.
        return;
      }
      handleResponse(response);
    }
  };
  syncContext.execute(task);
}
/**
 * Stores an incoming watch call and marks it cancelled once the current context is cancelled.
 *
 * @param request the watch request
 * @param responseObserver the observer associated with the stored call
 */
@Override
public void watch(HealthCheckRequest request,
    StreamObserver<HealthCheckResponse> responseObserver) {
  final ServerSideCall call = new ServerSideCall(request, responseObserver);
  final CancellationListener onCancel = new CancellationListener() {
    @Override
    public void cancelled(Context ctx) {
      call.cancelled = true;
    }
  };
  Context.current().addListener(onCancel, MoreExecutors.directExecutor());
  calls.add(call);
}
/**
 * Verifies that a check for {@code HealthStatusManager.SERVICE_NAME_ALL_SERVICES} answers
 * SERVING without any status having been set by the test.
 */
@Test
public void defaultIsServing() throws Exception {
  HealthCheckRequest allServicesRequest = HealthCheckRequest.newBuilder()
      .setService(HealthStatusManager.SERVICE_NAME_ALL_SERVICES)
      .build();
  HealthCheckResponse reply =
      blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(allServicesRequest);
  HealthCheckResponse expected =
      HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build();
  assertThat(reply).isEqualTo(expected);
}
/**
 * Verifies that closing the cancellable context used for one watch RPC unregisters exactly that
 * watcher and leaves the remaining watchers in place.
 */
@Test
public void watchRemovedWhenClientCloses() throws Exception {
// The first watch is issued while a cancellable context is attached.
CancellableContext withCancellation = Context.current().withCancellation();
Context prevCtx = withCancellation.attach();
RespObserver respObs1 = new RespObserver();
try {
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1);
} finally {
// Detach again so that the remaining watches are issued under the previous context.
withCancellation.detach(prevCtx);
}
RespObserver respObs1b = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b);
RespObserver respObs2 = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2);
// The first response on every watch is expected to be SERVICE_UNKNOWN.
assertThat(respObs1.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(respObs1b.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(respObs2.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
// Watcher counts before cancellation: two for SERVICE1, one for SERVICE2; nothing else queued.
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
assertThat(respObs1.responses).isEmpty();
assertThat(respObs1b.responses).isEmpty();
assertThat(respObs2.responses).isEmpty();
// This will cancel the RPC with respObs1
withCancellation.close();
// Only the cancelled observer sees a Throwable; SERVICE1 drops to one watcher.
assertThat(respObs1.responses.poll()).isInstanceOf(Throwable.class);
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
assertThat(respObs1.responses).isEmpty();
assertThat(respObs1b.responses).isEmpty();
assertThat(respObs2.responses).isEmpty();
}
/**
 * Builds the response sent to watchers for a recorded status.
 *
 * @param recordedStatus the recorded status, or {@code null} when none is recorded
 * @return a response carrying the recorded status, or SERVICE_UNKNOWN when it is {@code null}
 */
private static HealthCheckResponse getResponseForWatch(@Nullable ServingStatus recordedStatus) {
  ServingStatus effective = recordedStatus;
  if (effective == null) {
    effective = ServingStatus.SERVICE_UNKNOWN;
  }
  return HealthCheckResponse.newBuilder().setStatus(effective).build();
}
/**
 * Builds a health-check response carrying the given status.
 *
 * @param status the serving status to place in the response
 * @return the built response
 */
private HealthCheckResponse makeResponse(ServingStatus status) {
  HealthCheckResponse.Builder builder = HealthCheckResponse.newBuilder();
  builder.setStatus(status);
  return builder.build();
}