下面列出了 io.grpc.Context.CancellableContext 类的 API 示例代码及用法，也可以点击链接到 GitHub 查看源代码。
/**
 * Test {@link Context} cancellation propagates from the first node in the call chain all the way
 * to the last.
 *
 * <p>A unary call is started while a fresh {@link CancellableContext} is attached, the context is
 * cancelled once the future returned by {@code startChainingServer(3)} completes, and the call is
 * then expected to fail with {@code CANCELLED}. The test finally waits for the
 * {@code observedCancellations} (count 2) and {@code receivedCancellations} (count 3) latches.
 *
 * @throws Exception if waiting on the chain or on the call result fails unexpectedly
 */
@Test
public void testCascadingCancellationViaOuterContextCancellation() throws Exception {
  observedCancellations = new CountDownLatch(2);
  receivedCancellations = new CountDownLatch(3);
  Future<?> chainReady = startChainingServer(3);
  CancellableContext context = Context.current().withCancellation();
  try {
    Future<SimpleResponse> future;
    // Attach only for the duration of the call so the RPC is bound to the cancellable context.
    Context prevContext = context.attach();
    try {
      future = futureStub.unaryCall(SimpleRequest.getDefaultInstance());
    } finally {
      context.detach(prevContext);
    }
    chainReady.get(5, TimeUnit.SECONDS);
    context.cancel(null);
    try {
      future.get(5, TimeUnit.SECONDS);
      fail("Expected cancellation");
    } catch (ExecutionException ex) {
      Status status = Status.fromThrowable(ex);
      assertEquals(Status.Code.CANCELLED, status.getCode());
      // Should have observed 2 cancellations responses from downstream servers
      if (!observedCancellations.await(5, TimeUnit.SECONDS)) {
        fail("Expected number of cancellations not observed by clients");
      }
      if (!receivedCancellations.await(5, TimeUnit.SECONDS)) {
        fail("Expected number of cancellations to be received by servers not observed");
      }
    }
  } finally {
    // Guarantee the context is cancelled even when the test bails out early (for example when
    // chainReady.get times out); otherwise the context and the in-flight call would leak.
    // Cancelling an already-cancelled context has no further effect per the Context Javadoc.
    context.cancel(null);
  }
}
/**
 * Checks that closing the client-side {@link CancellableContext} under which a watch was started
 * removes that one watcher while the other watches stay registered.
 *
 * <p>Three watches are opened: one for {@code SERVICE1} under the cancellable context, one more
 * for {@code SERVICE1}, and one for {@code SERVICE2}. After the context is closed, the first
 * observer is expected to receive a {@link Throwable} and
 * {@code service.numWatchersForTest(SERVICE1)} is expected to drop from 2 to 1.
 *
 * @throws Exception declared by the test signature; nothing in the body is known to throw a
 *     checked exception
 */
@Test
public void watchRemovedWhenClientCloses() throws Exception {
  CancellableContext withCancellation = Context.current().withCancellation();
  try {
    Context prevCtx = withCancellation.attach();
    RespObserver respObs1 = new RespObserver();
    try {
      assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
      stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1);
    } finally {
      withCancellation.detach(prevCtx);
    }
    // The remaining two watches are started outside the cancellable context.
    RespObserver respObs1b = new RespObserver();
    stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b);
    RespObserver respObs2 = new RespObserver();
    stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2);
    assertThat(respObs1.responses.poll()).isEqualTo(
        HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
    assertThat(respObs1b.responses.poll()).isEqualTo(
        HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
    assertThat(respObs2.responses.poll()).isEqualTo(
        HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
    assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2);
    assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
    assertThat(respObs1.responses).isEmpty();
    assertThat(respObs1b.responses).isEmpty();
    assertThat(respObs2.responses).isEmpty();
    // This will cancel the RPC with respObs1
    withCancellation.close();
    assertThat(respObs1.responses.poll()).isInstanceOf(Throwable.class);
    assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
    assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
    assertThat(respObs1.responses).isEmpty();
    assertThat(respObs1b.responses).isEmpty();
    assertThat(respObs2.responses).isEmpty();
  } finally {
    // Ensure the context is released even if an assertion above fails before the explicit
    // close(); a second close on an already-cancelled context is expected to have no effect.
    withCancellation.close();
  }
}
/**
 * Test {@link Context} cancellation propagates from the first node in the call chain all the way
 * to the last.
 *
 * <p>A unary call is started while a fresh {@link CancellableContext} is attached, the context is
 * cancelled once the future returned by {@code startChainingServer(3)} completes, and the call is
 * then expected to fail with {@code CANCELLED}. The test finally waits for the
 * {@code observedCancellations} (count 2) and {@code receivedCancellations} (count 3) latches.
 *
 * @throws Exception if waiting on the chain or on the call result fails unexpectedly
 */
@Test
public void testCascadingCancellationViaOuterContextCancellation() throws Exception {
  observedCancellations = new CountDownLatch(2);
  receivedCancellations = new CountDownLatch(3);
  Future<?> chainReady = startChainingServer(3);
  CancellableContext context = Context.current().withCancellation();
  try {
    Future<SimpleResponse> future;
    // Attach only for the duration of the call so the RPC is bound to the cancellable context.
    Context prevContext = context.attach();
    try {
      future = futureStub.unaryCall(SimpleRequest.getDefaultInstance());
    } finally {
      context.detach(prevContext);
    }
    chainReady.get(5, TimeUnit.SECONDS);
    context.cancel(null);
    try {
      future.get(5, TimeUnit.SECONDS);
      fail("Expected cancellation");
    } catch (ExecutionException ex) {
      Status status = Status.fromThrowable(ex);
      assertEquals(Status.Code.CANCELLED, status.getCode());
      // Should have observed 2 cancellations responses from downstream servers
      if (!observedCancellations.await(5, TimeUnit.SECONDS)) {
        fail("Expected number of cancellations not observed by clients");
      }
      if (!receivedCancellations.await(5, TimeUnit.SECONDS)) {
        fail("Expected number of cancellations to be received by servers not observed");
      }
    }
  } finally {
    // Guarantee the context is cancelled even when the test bails out early (for example when
    // chainReady.get times out); otherwise the context and the in-flight call would leak.
    // Cancelling an already-cancelled context has no further effect per the Context Javadoc.
    context.cancel(null);
  }
}
/**
 * Checks that closing the client-side {@link CancellableContext} under which a watch was started
 * removes that one watcher while the other watches stay registered.
 *
 * <p>Three watches are opened: one for {@code SERVICE1} under the cancellable context, one more
 * for {@code SERVICE1}, and one for {@code SERVICE2}. After the context is closed, the first
 * observer is expected to receive a {@link Throwable} and
 * {@code service.numWatchersForTest(SERVICE1)} is expected to drop from 2 to 1.
 *
 * @throws Exception declared by the test signature; nothing in the body is known to throw a
 *     checked exception
 */
@Test
public void watchRemovedWhenClientCloses() throws Exception {
  CancellableContext withCancellation = Context.current().withCancellation();
  try {
    Context prevCtx = withCancellation.attach();
    RespObserver respObs1 = new RespObserver();
    try {
      assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
      stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1);
    } finally {
      withCancellation.detach(prevCtx);
    }
    // The remaining two watches are started outside the cancellable context.
    RespObserver respObs1b = new RespObserver();
    stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b);
    RespObserver respObs2 = new RespObserver();
    stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2);
    assertThat(respObs1.responses.poll()).isEqualTo(
        HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
    assertThat(respObs1b.responses.poll()).isEqualTo(
        HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
    assertThat(respObs2.responses.poll()).isEqualTo(
        HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
    assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2);
    assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
    assertThat(respObs1.responses).isEmpty();
    assertThat(respObs1b.responses).isEmpty();
    assertThat(respObs2.responses).isEmpty();
    // This will cancel the RPC with respObs1
    withCancellation.close();
    assertThat(respObs1.responses.poll()).isInstanceOf(Throwable.class);
    assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
    assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
    assertThat(respObs1.responses).isEmpty();
    assertThat(respObs1b.responses).isEmpty();
    assertThat(respObs2.responses).isEmpty();
  } finally {
    // Ensure the context is released even if an assertion above fails before the explicit
    // close(); a second close on an already-cancelled context is expected to have no effect.
    withCancellation.close();
  }
}