下面列出了怎么用io.grpc.health.v1.HealthCheckResponse.ServingStatus的API类实例代码及写法,或者点击链接到github查看源代码。
void handleResponse(HealthCheckResponse response) {
callHasResponded = true;
backoffPolicy = null;
ServingStatus status = response.getStatus();
// running == true means the Subchannel's state (rawState) is READY
if (Objects.equal(status, ServingStatus.SERVING)) {
subchannelLogger.log(ChannelLogLevel.INFO, "READY: health-check responded SERVING");
gotoState(ConnectivityStateInfo.forNonError(READY));
} else {
subchannelLogger.log(
ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check responded {0}", status);
gotoState(
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription(
"Health-check service responded "
+ status + " for '" + callServiceName + "'")));
}
call.request(1);
}
@Test
public void checkValidStatus() throws Exception {
manager.setStatus(SERVICE1, ServingStatus.NOT_SERVING);
manager.setStatus(SERVICE2, ServingStatus.SERVING);
HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(SERVICE1).build();
HealthCheckResponse response =
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
assertThat(response).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.NOT_SERVING).build());
request = HealthCheckRequest.newBuilder().setService(SERVICE2).build();
response = blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
assertThat(response).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
}
@Override
public void check(
HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
ServingStatus globalStatus;
if (!request.getService().equals("")) {
globalStatus = getStatus("");
if (request.getService().equals("ready")) {
if (globalStatus == ServingStatus.SERVING) {
setStatus("ready", ModelDBHibernateUtil.checkReady());
}
} else if (request.getService().equals("live")) {
setStatus("live", ModelDBHibernateUtil.checkLive());
}
}
ServingStatus status = getStatus(request.getService());
if (status == null) {
responseObserver.onError(
new StatusException(
Status.NOT_FOUND.withDescription("unknown service " + request.getService())));
} else {
HealthCheckResponse response = HealthCheckResponse.newBuilder().setStatus(status).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
void handleResponse(HealthCheckResponse response) {
callHasResponded = true;
backoffPolicy = null;
ServingStatus status = response.getStatus();
// running == true means the Subchannel's state (rawState) is READY
if (Objects.equal(status, ServingStatus.SERVING)) {
subchannelLogger.log(ChannelLogLevel.INFO, "READY: health-check responded SERVING");
gotoState(ConnectivityStateInfo.forNonError(READY));
} else {
subchannelLogger.log(
ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check responded {0}", status);
gotoState(
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription(
"Health-check service responded "
+ status + " for '" + callServiceName + "'")));
}
call.request(1);
}
@Test
public void checkValidStatus() throws Exception {
manager.setStatus(SERVICE1, ServingStatus.NOT_SERVING);
manager.setStatus(SERVICE2, ServingStatus.SERVING);
HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(SERVICE1).build();
HealthCheckResponse response =
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
assertThat(response).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.NOT_SERVING).build());
request = HealthCheckRequest.newBuilder().setService(SERVICE2).build();
response = blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
assertThat(response).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
}
@Override
public void check(HealthCheckRequest request,
StreamObserver<HealthCheckResponse> responseObserver) {
ServingStatus status = statusMap.get(request.getService());
if (status == null) {
responseObserver.onError(new StatusException(
Status.NOT_FOUND.withDescription("unknown service " + request.getService())));
} else {
HealthCheckResponse response = HealthCheckResponse.newBuilder().setStatus(status).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
void setStatus(String service, ServingStatus status) {
synchronized (watchLock) {
ServingStatus prevStatus = statusMap.put(service, status);
if (prevStatus != status) {
notifyWatchers(service, status);
}
}
}
void clearStatus(String service) {
synchronized (watchLock) {
ServingStatus prevStatus = statusMap.remove(service);
if (prevStatus != null) {
notifyWatchers(service, null);
}
}
}
@GuardedBy("watchLock")
private void notifyWatchers(String service, @Nullable ServingStatus status) {
HealthCheckResponse response = getResponseForWatch(status);
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
watchers.get(service);
if (serviceWatchers != null) {
for (StreamObserver<HealthCheckResponse> responseObserver : serviceWatchers.keySet()) {
responseObserver.onNext(response);
}
}
}
@Test
public void checkStatusNotFound() throws Exception {
manager.setStatus(SERVICE1, ServingStatus.SERVING);
// SERVICE2's status is not set
HealthCheckRequest request
= HealthCheckRequest.newBuilder().setService(SERVICE2).build();
try {
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
fail("Should've failed");
} catch (StatusRuntimeException e) {
assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
}
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
}
@Test
public void notFoundForClearedStatus() throws Exception {
manager.setStatus(SERVICE1, ServingStatus.SERVING);
manager.clearStatus(SERVICE1);
HealthCheckRequest request
= HealthCheckRequest.newBuilder().setService(SERVICE1).build();
try {
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
fail("Should've failed");
} catch (StatusRuntimeException e) {
assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
}
}
@Test
public void watchRemovedWhenClientCloses() throws Exception {
CancellableContext withCancellation = Context.current().withCancellation();
Context prevCtx = withCancellation.attach();
RespObserver respObs1 = new RespObserver();
try {
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1);
} finally {
withCancellation.detach(prevCtx);
}
RespObserver respObs1b = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b);
RespObserver respObs2 = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2);
assertThat(respObs1.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(respObs1b.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(respObs2.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
assertThat(respObs1.responses).isEmpty();
assertThat(respObs1b.responses).isEmpty();
assertThat(respObs2.responses).isEmpty();
// This will cancel the RPC with respObs1
withCancellation.close();
assertThat(respObs1.responses.poll()).isInstanceOf(Throwable.class);
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
assertThat(respObs1.responses).isEmpty();
assertThat(respObs1b.responses).isEmpty();
assertThat(respObs2.responses).isEmpty();
}
public void start() throws IOException {
actionCacheRequestCounter.start();
instances.start();
server.start();
healthStatusManager.setStatus(
HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.SERVING);
}
public void stop() {
synchronized (this) {
if (stopping) {
return;
}
stopping = true;
}
healthStatusManager.setStatus(
HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.NOT_SERVING);
try {
if (server != null) {
server.shutdown();
}
instances.stop();
server.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
if (server != null) {
server.shutdownNow();
}
}
if (!shutdownAndAwaitTermination(keepaliveScheduler, 10, TimeUnit.SECONDS)) {
logger.log(Level.WARNING, "could not shut down keepalive scheduler");
}
if (!actionCacheRequestCounter.stop()) {
logger.log(Level.WARNING, "count not shut down action cache request counter");
}
}
private void start() throws Exception {
health = new HealthStatusManager();
server =
NettyServerBuilder.forPort(port)
.addService(new TestServiceImpl(serverId))
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.build()
.start();
health.setStatus("", ServingStatus.SERVING);
}
@Override
public void setServing(
EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver) {
health.setStatus("", ServingStatus.SERVING);
responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
responseObserver.onCompleted();
}
@Override
public void setNotServing(
EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver) {
health.setStatus("", ServingStatus.NOT_SERVING);
responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
responseObserver.onCompleted();
}
@Override
public void check(HealthCheckRequest request,
StreamObserver<HealthCheckResponse> responseObserver) {
ServingStatus status = statusMap.get(request.getService());
if (status == null) {
responseObserver.onError(new StatusException(
Status.NOT_FOUND.withDescription("unknown service " + request.getService())));
} else {
HealthCheckResponse response = HealthCheckResponse.newBuilder().setStatus(status).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
void setStatus(String service, ServingStatus status) {
synchronized (watchLock) {
if (terminal) {
logger.log(Level.FINE, "Ignoring status {} for {}", new Object[]{status, service});
return;
}
setStatusInternal(service, status);
}
}
@GuardedBy("watchLock")
private void setStatusInternal(String service, ServingStatus status) {
ServingStatus prevStatus = statusMap.put(service, status);
if (prevStatus != status) {
notifyWatchers(service, status);
}
}
void clearStatus(String service) {
synchronized (watchLock) {
if (terminal) {
logger.log(Level.FINE, "Ignoring status clearing for {}", new Object[]{service});
return;
}
ServingStatus prevStatus = statusMap.remove(service);
if (prevStatus != null) {
notifyWatchers(service, null);
}
}
}
void enterTerminalState() {
synchronized (watchLock) {
if (terminal) {
logger.log(Level.WARNING, "Already terminating", new RuntimeException());
return;
}
terminal = true;
for (String service : statusMap.keySet()) {
setStatusInternal(service, ServingStatus.NOT_SERVING);
}
}
}
@GuardedBy("watchLock")
private void notifyWatchers(String service, @Nullable ServingStatus status) {
HealthCheckResponse response = getResponseForWatch(status);
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
watchers.get(service);
if (serviceWatchers != null) {
for (StreamObserver<HealthCheckResponse> responseObserver : serviceWatchers.keySet()) {
responseObserver.onNext(response);
}
}
}
@Test
public void defaultIsServing() throws Exception {
HealthCheckRequest request =
HealthCheckRequest.newBuilder().setService(HealthStatusManager.SERVICE_NAME_ALL_SERVICES)
.build();
HealthCheckResponse response =
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
assertThat(response).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
}
@Test
public void checkStatusNotFound() throws Exception {
manager.setStatus(SERVICE1, ServingStatus.SERVING);
// SERVICE2's status is not set
HealthCheckRequest request
= HealthCheckRequest.newBuilder().setService(SERVICE2).build();
try {
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
fail("Should've failed");
} catch (StatusRuntimeException e) {
assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
}
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
}
@Test
public void notFoundForClearedStatus() throws Exception {
manager.setStatus(SERVICE1, ServingStatus.SERVING);
manager.clearStatus(SERVICE1);
HealthCheckRequest request
= HealthCheckRequest.newBuilder().setService(SERVICE1).build();
try {
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
fail("Should've failed");
} catch (StatusRuntimeException e) {
assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
}
}
@Test
public void watchRemovedWhenClientCloses() throws Exception {
CancellableContext withCancellation = Context.current().withCancellation();
Context prevCtx = withCancellation.attach();
RespObserver respObs1 = new RespObserver();
try {
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1);
} finally {
withCancellation.detach(prevCtx);
}
RespObserver respObs1b = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b);
RespObserver respObs2 = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2);
assertThat(respObs1.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(respObs1b.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(respObs2.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
assertThat(respObs1.responses).isEmpty();
assertThat(respObs1b.responses).isEmpty();
assertThat(respObs2.responses).isEmpty();
// This will cancel the RPC with respObs1
withCancellation.close();
assertThat(respObs1.responses.poll()).isInstanceOf(Throwable.class);
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
assertThat(respObs1.responses).isEmpty();
assertThat(respObs1b.responses).isEmpty();
assertThat(respObs2.responses).isEmpty();
}
private static HealthCheckResponse getResponseForWatch(@Nullable ServingStatus recordedStatus) {
return HealthCheckResponse.newBuilder().setStatus(
recordedStatus == null ? ServingStatus.SERVICE_UNKNOWN : recordedStatus).build();
}
@Test
public void healthCheckDisabledWhenServiceNotImplemented() {
Attributes resolutionAttrs = attrsWithHealthCheckService("BarService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
verifyNoMoreInteractions(origLb);
// We create 2 Subchannels. One of them connects to a server that doesn't implement health check
for (int i = 0; i < 2; i++) {
createSubchannel(i, Attributes.EMPTY);
}
InOrder inOrder = inOrder(origLb);
for (int i = 0; i < 2; i++) {
hcLbEventDelivery.handleSubchannelState(
subchannels[i], ConnectivityStateInfo.forNonError(READY));
assertThat(healthImpls[i].calls).hasSize(1);
inOrder.verify(origLb).handleSubchannelState(
same(subchannels[i]), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
}
ServerSideCall serverCall0 = healthImpls[0].calls.poll();
ServerSideCall serverCall1 = healthImpls[1].calls.poll();
subchannels[0].logs.clear();
// subchannels[0] gets UNIMPLEMENTED for health checking, which will disable health
// checking and it'll use the original state, which is currently READY.
// In reality UNIMPLEMENTED is generated by GRPC server library, but the client can't tell
// whether it's the server library or the service implementation that returned this status.
serverCall0.responseObserver.onError(Status.UNIMPLEMENTED.asException());
inOrder.verify(origLb).handleSubchannelState(
same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(READY)));
assertThat(subchannels[0].logs).containsExactly(
"ERROR: Health-check disabled: " + Status.UNIMPLEMENTED,
"INFO: READY (no health-check)").inOrder();
// subchannels[1] has normal health checking
serverCall1.responseObserver.onNext(makeResponse(ServingStatus.NOT_SERVING));
inOrder.verify(origLb).handleSubchannelState(
same(subchannels[1]),
unavailableStateWithMsg("Health-check service responded NOT_SERVING for 'BarService'"));
// Without health checking, states from underlying Subchannel are delivered directly to origLb
hcLbEventDelivery.handleSubchannelState(
subchannels[0], ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(origLb).handleSubchannelState(
same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(IDLE)));
// Re-connecting on a Subchannel will reset the "disabled" flag.
assertThat(healthImpls[0].calls).hasSize(0);
hcLbEventDelivery.handleSubchannelState(
subchannels[0], ConnectivityStateInfo.forNonError(READY));
assertThat(healthImpls[0].calls).hasSize(1);
serverCall0 = healthImpls[0].calls.poll();
inOrder.verify(origLb).handleSubchannelState(
same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
// Health check now works as normal
serverCall0.responseObserver.onNext(makeResponse(ServingStatus.SERVICE_UNKNOWN));
inOrder.verify(origLb).handleSubchannelState(
same(subchannels[0]),
unavailableStateWithMsg("Health-check service responded SERVICE_UNKNOWN for 'BarService'"));
verifyNoMoreInteractions(origLb);
verifyZeroInteractions(backoffPolicyProvider);
}
@Test
public void backoffRetriesWhenServerErroneouslyClosesRpcBeforeAnyResponse() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
verifyNoMoreInteractions(origLb);
FakeSubchannel subchannel = (FakeSubchannel) createSubchannel(0, Attributes.EMPTY);
assertThat(subchannel).isSameAs(subchannels[0]);
InOrder inOrder = inOrder(origLb, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(origLb).handleSubchannelState(
same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
HealthImpl healthImpl = healthImpls[0];
assertThat(healthImpl.calls).hasSize(1);
assertThat(clock.getPendingTasks()).isEmpty();
subchannel.logs.clear();
// Server closes the health checking RPC without any response
healthImpl.calls.poll().responseObserver.onCompleted();
// which results in TRANSIENT_FAILURE
inOrder.verify(origLb).handleSubchannelState(
same(subchannel),
unavailableStateWithMsg(
"Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"));
assertThat(subchannel.logs).containsExactly(
"INFO: TRANSIENT_FAILURE: health-check stream closed with " + Status.OK,
"DEBUG: Will retry health-check after 11 ns").inOrder();
// Retry with backoff is scheduled
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(clock.getPendingTasks()).hasSize(1);
verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 11);
assertThat(clock.getPendingTasks()).isEmpty();
subchannel.logs.clear();
// Server closes the health checking RPC without any response
healthImpl.calls.poll().responseObserver.onError(Status.CANCELLED.asException());
// which also results in TRANSIENT_FAILURE, with a different description
inOrder.verify(origLb).handleSubchannelState(
same(subchannel),
unavailableStateWithMsg(
"Health-check stream unexpectedly closed with "
+ Status.CANCELLED + " for 'TeeService'"));
assertThat(subchannel.logs).containsExactly(
"INFO: TRANSIENT_FAILURE: health-check stream closed with " + Status.CANCELLED,
"DEBUG: Will retry health-check after 21 ns").inOrder();
// Retry with backoff
inOrder.verify(backoffPolicy1).nextBackoffNanos();
verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 21);
// Server responds this time
healthImpl.calls.poll().responseObserver.onNext(makeResponse(ServingStatus.SERVING));
inOrder.verify(origLb).handleSubchannelState(
same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
verifyNoMoreInteractions(origLb, backoffPolicyProvider, backoffPolicy1);
}