下面列出了io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener#io.grpc.Deadline 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void contextDeadlineShouldBePropagatedToStream() {
Context context = Context.current()
.withDeadlineAfter(1000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
context.detach(origContext);
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
/**
* Prepares for a check that the subject is deadline within the given tolerance of an
* expected value that will be provided in the next call in the fluent chain.
*/
@CheckReturnValue
public TolerantDeadlineComparison isWithin(final long delta, final TimeUnit timeUnit) {
return new TolerantDeadlineComparison() {
@Override
public void of(Deadline expected) {
Deadline actual = actual();
checkNotNull(actual, "actual value cannot be null. expected=%s", expected);
// This is probably overkill, but easier than thinking about overflow.
BigInteger actualTimeRemaining = BigInteger.valueOf(actual.timeRemaining(NANOSECONDS));
BigInteger expectedTimeRemaining = BigInteger.valueOf(expected.timeRemaining(NANOSECONDS));
BigInteger deltaNanos = BigInteger.valueOf(timeUnit.toNanos(delta));
if (actualTimeRemaining.subtract(expectedTimeRemaining).abs().compareTo(deltaNanos) > 0) {
failWithoutActual(
simpleFact(
lenientFormat(
"%s and <%s> should have been within <%sns> of each other",
actualAsString(), expected, deltaNanos)));
}
}
};
}
@Test
public void nearerDeadlineKept_new() {
// TODO(carl-mastrangelo): the deadlines are very large because they change over time.
// This should be fixed, and is tracked in https://github.com/grpc/grpc-java/issues/2531
JsonObj name = new JsonObj("service", "service");
JsonObj methodConfig = new JsonObj("name", new JsonList(name), "timeout", "1s");
JsonObj serviceConfig = new JsonObj("methodConfig", new JsonList(methodConfig));
ManagedChannelServiceConfig parsedServiceConfig =
createManagedChannelServiceConfig(serviceConfig);
interceptor.handleUpdate(parsedServiceConfig);
Deadline existingDeadline = Deadline.after(1234567890, TimeUnit.NANOSECONDS);
interceptor.interceptCall(
methodDescriptor, CallOptions.DEFAULT.withDeadline(existingDeadline), channel);
verify(channel).newCall(eq(methodDescriptor), callOptionsCap.capture());
assertThat(callOptionsCap.getValue().getDeadline()).isNotEqualTo(existingDeadline);
}
private static void logIfContextNarrowedTimeout(
Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
@Nullable Deadline callDeadline) {
if (!log.isLoggable(Level.FINE) || effectiveDeadline == null
|| outerCallDeadline != effectiveDeadline) {
return;
}
long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
StringBuilder builder = new StringBuilder(String.format(
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
if (callDeadline == null) {
builder.append(" Explicit call timeout was not set.");
} else {
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout));
}
log.fine(builder.toString());
}
@Test
public void deadlineTimeoutPopulatedToHeaders() {
AbstractClientStream.Sink sink = mock(AbstractClientStream.Sink.class);
ClientStream stream = new BaseAbstractClientStream(
allocator, new BaseTransportState(statsTraceCtx, transportTracer), sink, statsTraceCtx,
transportTracer);
stream.setDeadline(Deadline.after(1, TimeUnit.SECONDS));
stream.start(mockListener);
ArgumentCaptor<Metadata> headersCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(sink).writeHeaders(headersCaptor.capture(), ArgumentMatchers.<byte[]>any());
Metadata headers = headersCaptor.getValue();
assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY));
assertThat(headers.get(GrpcUtil.TIMEOUT_KEY).longValue())
.isLessThan(TimeUnit.SECONDS.toNanos(1));
assertThat(headers.get(GrpcUtil.TIMEOUT_KEY).longValue())
.isGreaterThan(TimeUnit.MILLISECONDS.toNanos(600));
}
@Test
public void streamCancelAbortsDeadlineTimer() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method,
MoreExecutors.directExecutor(),
baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
provider,
deadlineCancellationExecutor,
channelCallTracer,
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
call.cancel("canceled", null);
// Run the deadline timer, which should have been cancelled by the previous call to cancel()
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
}
@Test
public void nearerDeadlineKept_existing() {
JsonObj name = new JsonObj("service", "service");
JsonObj methodConfig = new JsonObj("name", new JsonList(name), "timeout", "100000s");
JsonObj serviceConfig = new JsonObj("methodConfig", new JsonList(methodConfig));
ManagedChannelServiceConfig parsedServiceConfig =
createManagedChannelServiceConfig(serviceConfig);
interceptor.handleUpdate(parsedServiceConfig);
Deadline existingDeadline = Deadline.after(1000, TimeUnit.NANOSECONDS);
interceptor.interceptCall(
methodDescriptor, CallOptions.DEFAULT.withDeadline(existingDeadline), channel);
verify(channel).newCall(eq(methodDescriptor), callOptionsCap.capture());
assertThat(callOptionsCap.getValue().getDeadline()).isEqualTo(existingDeadline);
}
@Test
public void contextDeadlineShouldOverrideLargerCallOptionsDeadline() {
Context context = Context.current()
.withDeadlineAfter(1000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
CallOptions callOpts = baseCallOptions.withDeadlineAfter(2000, TimeUnit.MILLISECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
callOpts,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
context.detach(origContext);
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
@Test
public void contextDeadlineShouldNotOverrideSmallerCallOptionsDeadline() {
Context context = Context.current()
.withDeadlineAfter(2000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
CallOptions callOpts = baseCallOptions.withDeadlineAfter(1000, TimeUnit.MILLISECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
callOpts,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
context.detach(origContext);
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
@Test
public void callOptionsDeadlineShouldBePropagatedToStream() {
CallOptions callOpts = baseCallOptions.withDeadlineAfter(1000, TimeUnit.MILLISECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
callOpts,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
@Test
public void expiredDeadlineCancelsStream_CallOptions() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
// The deadline needs to be a number large enough to get encompass the call to start, otherwise
// the scheduled cancellation won't be created, and the call will fail early.
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
}
@Test
public void streamCancelAbortsDeadlineTimer() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
call.cancel("canceled", null);
// Run the deadline timer, which should have been cancelled by the previous call to cancel()
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
}
@Test
public void deadlineTimeoutPopulatedToHeaders() {
AbstractClientStream.Sink sink = mock(AbstractClientStream.Sink.class);
ClientStream stream = new BaseAbstractClientStream(
allocator, new BaseTransportState(statsTraceCtx, transportTracer), sink, statsTraceCtx,
transportTracer);
stream.setDeadline(Deadline.after(1, TimeUnit.SECONDS));
stream.start(mockListener);
ArgumentCaptor<Metadata> headersCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(sink).writeHeaders(headersCaptor.capture(), any(byte[].class));
Metadata headers = headersCaptor.getValue();
assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY));
assertThat(headers.get(GrpcUtil.TIMEOUT_KEY).longValue())
.isLessThan(TimeUnit.SECONDS.toNanos(1));
assertThat(headers.get(GrpcUtil.TIMEOUT_KEY).longValue())
.isGreaterThan(TimeUnit.MILLISECONDS.toNanos(600));
}
protected <ReqT,R> ListenableFuture<R> fuCall(MethodDescriptor<ReqT,R> method, ReqT request,
CallOptions callOptions, long timeoutMs) {
if (timeoutMs <= 0L) {
timeoutMs = defaultTimeoutMs;
}
if (timeoutMs > 0L) {
Deadline deadline = callOptions.getDeadline();
Deadline timeoutDeadline = Deadline.after(timeoutMs, MILLISECONDS);
if (deadline == null || timeoutDeadline.isBefore(deadline)) {
callOptions = callOptions.withDeadline(timeoutDeadline);
} else if (deadline.isExpired()) {
return Futures.immediateFailedFuture(
Status.DEADLINE_EXCEEDED.asRuntimeException());
}
}
final CallOptions finalCallOpts = callOptions;
return sendViaEventLoop && !isEventThread.satisfied()
? Futures.submitAsync(() -> fuCall(method, request, finalCallOpts), ses)
: fuCall(method, request, finalCallOpts);
}
@Test
public void contextDeadlineShouldOverrideLargerCallOptionsDeadline() {
Context context = Context.current()
.withDeadlineAfter(1000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
CallOptions callOpts = baseCallOptions.withDeadlineAfter(2000, TimeUnit.MILLISECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method,
MoreExecutors.directExecutor(),
callOpts,
provider,
deadlineCancellationExecutor,
channelCallTracer,
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
context.detach(origContext);
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
@Test
public void newInputRemovesNonExistentEntry() throws IOException, InterruptedException {
Digest nonexistentDigest =
Digest.newBuilder().setHash("file_does_not_exist").setSizeBytes(1).build();
String nonexistentKey = fileCache.getKey(nonexistentDigest, false);
Entry entry = new Entry(nonexistentKey, 1, Deadline.after(10, SECONDS));
entry.before = entry;
entry.after = entry;
storage.put(nonexistentKey, entry);
NoSuchFileException noSuchFileException = null;
try (InputStream in = fileCache.newInput(nonexistentDigest, 0)) {
fail("should not get here");
} catch (NoSuchFileException e) {
noSuchFileException = e;
}
assertThat(noSuchFileException).isNotNull();
assertThat(storage.containsKey(nonexistentKey)).isFalse();
}
@Test
public void readRemovesNonexistentEntry() throws IOException, InterruptedException {
ByteString content = ByteString.copyFromUtf8("Hello, World");
Blob blob = new Blob(content, DIGEST_UTIL);
fileCache.put(blob);
String key = fileCache.getKey(blob.getDigest(), /* isExecutable=*/ false);
// putCreatesFile verifies this
Files.delete(fileCache.getPath(key));
// update entry with expired deadline
storage.get(key).existsDeadline = Deadline.after(0, SECONDS);
try (InputStream in = fileCache.newInput(blob.getDigest(), /* offset=*/ 0)) {
fail("should not get here");
} catch (NoSuchFileException e) {
// success
}
assertThat(storage.containsKey(key)).isFalse();
}
private GrpclbRouteType doRpcAndGetPath(Deadline deadline) {
logger.info("doRpcAndGetPath deadline: " + deadline);
final SimpleRequest request = SimpleRequest.newBuilder()
.setFillGrpclbRouteType(true)
.build();
GrpclbRouteType result = GrpclbRouteType.GRPCLB_ROUTE_TYPE_UNKNOWN;
try {
SimpleResponse response = blockingStub
.withDeadline(deadline)
.unaryCall(request);
result = response.getGrpclbRouteType();
} catch (StatusRuntimeException ex) {
logger.warning("doRpcAndGetPath failed. Status: " + ex);
return GrpclbRouteType.GRPCLB_ROUTE_TYPE_UNKNOWN;
}
logger.info("doRpcAndGetPath. GrpclbRouteType result: " + result);
if (result != GrpclbRouteType.GRPCLB_ROUTE_TYPE_FALLBACK
&& result != GrpclbRouteType.GRPCLB_ROUTE_TYPE_BACKEND) {
throw new AssertionError("Received invalid LB route type. This suggests "
+ "that the server hasn't implemented this test correctly.");
}
return result;
}
@Test
public void contextDeadlineShouldNotOverrideSmallerCallOptionsDeadline() {
Context context = Context.current()
.withDeadlineAfter(2000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
CallOptions callOpts = baseCallOptions.withDeadlineAfter(1000, TimeUnit.MILLISECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method,
MoreExecutors.directExecutor(),
callOpts,
provider,
deadlineCancellationExecutor,
channelCallTracer,
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
context.detach(origContext);
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
private static void logIfContextNarrowedTimeout(
Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
@Nullable Deadline callDeadline) {
if (!log.isLoggable(Level.FINE) || effectiveDeadline == null
|| !effectiveDeadline.equals(outerCallDeadline)) {
return;
}
long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
StringBuilder builder = new StringBuilder(String.format(
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
if (callDeadline == null) {
builder.append(" Explicit call timeout was not set.");
} else {
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout));
}
log.fine(builder.toString());
}
private Context.CancellableContext createContext(
Metadata headers, StatsTraceContext statsTraceCtx) {
Long timeoutNanos = headers.get(TIMEOUT_KEY);
Context baseContext =
statsTraceCtx
.serverFilterContext(rootContext)
.withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
if (timeoutNanos == null) {
return baseContext.withCancellation();
}
Context.CancellableContext context =
baseContext.withDeadline(
Deadline.after(timeoutNanos, NANOSECONDS, ticker),
transport.getScheduledExecutorService());
return context;
}
@Nullable
private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
if (deadline0 == null) {
return deadline1;
}
if (deadline1 == null) {
return deadline0;
}
return deadline0.minimum(deadline1);
}
@Test
public void testDeadlinePropagation() throws Exception {
final AtomicInteger recursionDepthRemaining = new AtomicInteger(3);
final SettableFuture<Deadline> finalDeadline = SettableFuture.create();
class DeadlineSaver extends TestServiceGrpc.TestServiceImplBase {
@Override
public void unaryCall(final SimpleRequest request,
final StreamObserver<SimpleResponse> responseObserver) {
Context.currentContextExecutor(otherWork).execute(new Runnable() {
@Override
public void run() {
try {
if (recursionDepthRemaining.decrementAndGet() == 0) {
finalDeadline.set(Context.current().getDeadline());
responseObserver.onNext(SimpleResponse.getDefaultInstance());
} else {
responseObserver.onNext(blockingStub.unaryCall(request));
}
responseObserver.onCompleted();
} catch (Exception ex) {
responseObserver.onError(ex);
}
}
});
}
}
server = InProcessServerBuilder.forName("channel").executor(otherWork)
.addService(new DeadlineSaver())
.build().start();
Deadline initialDeadline = Deadline.after(1, TimeUnit.MINUTES);
blockingStub.withDeadline(initialDeadline).unaryCall(SimpleRequest.getDefaultInstance());
assertNotSame(initialDeadline, finalDeadline);
// Since deadline is re-calculated at each hop, some variance is acceptable and expected.
assertAbout(deadline())
.that(finalDeadline.get()).isWithin(1, TimeUnit.SECONDS).of(initialDeadline);
}
@Test
public void testConfigureDeadline() {
Deadline deadline = Deadline.after(2, NANOSECONDS);
// Create a default stub
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel);
assertNull(stub.getCallOptions().getDeadline());
// Reconfigure it
TestServiceGrpc.TestServiceBlockingStub reconfiguredStub = stub.withDeadline(deadline);
// New altered config
assertEquals(deadline, reconfiguredStub.getCallOptions().getDeadline());
// Default config unchanged
assertNull(stub.getCallOptions().getDeadline());
}
private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
if (deadline0 == null) {
return deadline1;
}
if (deadline1 == null) {
return deadline0;
}
return deadline0.minimum(deadline1);
}
@Override
public void setDeadline(final Deadline deadline) {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setDeadline(deadline);
}
});
}
@Test
public void noDeadlineShouldBePropagatedToStream() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
verify(stream, never()).setDeadline(any(Deadline.class));
}
@Test
public void nearerDeadlineKept_existing() {
JsonObj name = new JsonObj("service", "service");
JsonObj methodConfig = new JsonObj("name", new JsonList(name), "timeout", "100000s");
JsonObj serviceConfig = new JsonObj("methodConfig", new JsonList(methodConfig));
interceptor.handleUpdate(serviceConfig);
Deadline existingDeadline = Deadline.after(1000, TimeUnit.NANOSECONDS);
interceptor.interceptCall(
methodDescriptor, CallOptions.DEFAULT.withDeadline(existingDeadline), channel);
verify(channel).newCall(eq(methodDescriptor), callOptionsCap.capture());
assertThat(callOptionsCap.getValue().getDeadline()).isEqualTo(existingDeadline);
}
@Override
public void setDeadline(final Deadline deadline) {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setDeadline(deadline);
}
});
}
@Override
public final void setDeadline(final Deadline deadline) {
class DeadlineEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.setDeadline(deadline);
}
}
delayOrExecute(new DeadlineEntry());
}