下面列出了io.reactivex.rxjava3.core.BackpressureStrategy#com.linecorp.armeria.common.RequestContext 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void onError(Throwable t) {
if (closed || !StatusUtil.isRetryable(t)) {
return;
}
Duration backoff = streamReconnectBackoff;
streamReconnectBackoff = streamReconnectBackoff.multipliedBy(2);
if (streamReconnectBackoff.compareTo(MAX_CHANNEL_RECONNECT_BACKOFF) > 0) {
streamReconnectBackoff = MAX_CHANNEL_RECONNECT_BACKOFF;
}
// Possible to come straight to here without onNext, so access the current RequestContext
// regardless.
RequestContext.current()
.eventLoop()
.schedule(this::open, backoff.toMillis(), TimeUnit.MILLISECONDS);
}
/**
* Binds the specified request-scoped {@link MDC} properties to the specified {@link RequestContext}.
*
* @param ctx the {@link RequestContext}
* @param map the {@link Map} that contains the request-scoped {@link MDC} properties
*/
public static void putAll(RequestContext ctx, Map<String, String> map) {
requireNonNull(ctx, "ctx");
requireNonNull(map, "map");
if (map.isEmpty()) {
return;
}
synchronized (ctx) {
final Object2ObjectMap<String, String> oldMap = getMap(ctx);
final Object2ObjectMap<String, String> newMap;
if (oldMap.isEmpty()) {
newMap = new Object2ObjectOpenHashMap<>(map);
} else {
newMap = new Object2ObjectOpenHashMap<>(oldMap.size() + map.size());
newMap.putAll(oldMap);
newMap.putAll(map);
}
ctx.setAttr(MAP, Object2ObjectMaps.unmodifiable(newMap));
}
}
private Function<? super HttpService, ContentPreviewingService> decodingContentPreviewDecorator() {
final BiPredicate<? super RequestContext, ? super HttpHeaders> previewerPredicate =
(requestContext, headers) -> "gzip".equals(headers.get(HttpHeaderNames.CONTENT_ENCODING));
final BiFunction<HttpHeaders, ByteBuf, String> producer = (headers, data) -> {
final byte[] bytes = new byte[data.readableBytes()];
data.getBytes(0, bytes);
final byte[] decoded;
try (GZIPInputStream unzipper = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
decoded = ByteStreams.toByteArray(unzipper);
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
return new String(decoded, StandardCharsets.UTF_8);
};
final ContentPreviewerFactory factory =
ContentPreviewerFactory.builder()
.maxLength(100)
.binary(producer, previewerPredicate)
.build();
return ContentPreviewingService.newDecorator(factory);
}
@Test
void pushedContextOnAsyncMethodCallback() throws Exception {
final AtomicReference<ClientRequestContext> ctxHolder = new AtomicReference<>();
final AsyncIface client = Clients.newClient(server.httpUri(BINARY) + "/hello", AsyncIface.class);
final ClientRequestContext ctx;
final CountDownLatch latch = new CountDownLatch(1);
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
client.hello("foo", new AsyncMethodCallback<String>() {
@Override
public void onComplete(String response) {
assertThat(response).isEqualTo("Hello, foo!");
ctxHolder.set(RequestContext.currentOrNull());
latch.countDown();
}
@Override
public void onError(Exception exception) {}
});
ctx = captor.get();
}
latch.await();
assertThat(ctx).isSameAs(ctxHolder.get());
}
@Test
void currentOrNull() {
assertThat(ServiceRequestContext.currentOrNull()).isNull();
final ServiceRequestContext sctx = serviceRequestContext();
try (SafeCloseable unused = sctx.push()) {
assertThat(ServiceRequestContext.currentOrNull()).isSameAs(sctx);
final ClientRequestContext cctx = clientRequestContext();
try (SafeCloseable unused1 = cctx.push()) {
assertThat(ServiceRequestContext.currentOrNull()).isSameAs(sctx);
assertThat(ClientRequestContext.current()).isSameAs(cctx);
assertThat((ClientRequestContext) RequestContext.current()).isSameAs(cctx);
}
assertCurrentCtx(sctx);
}
assertCurrentCtx(null);
try (SafeCloseable unused = clientRequestContext().push()) {
assertThatThrownBy(ServiceRequestContext::currentOrNull)
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("not a server-side context");
}
}
@Override
public HttpResponse convertResponse(ServiceRequestContext ctx, ResponseHeaders headers,
@Nullable Object resObj,
HttpHeaders trailingHeaders) throws Exception {
try {
final HttpRequest request = RequestContext.current().request();
if (resObj == null || HttpMethod.DELETE == request.method() ||
(resObj instanceof Iterable && Iterables.size((Iterable<?>) resObj) == 0)) {
return HttpResponse.of(HttpStatus.NO_CONTENT);
}
final ResponseHeaders resHeaders;
if (headers.contentType() == null) {
final ResponseHeadersBuilder builder = headers.toBuilder();
builder.contentType(MediaType.JSON_UTF_8);
resHeaders = builder.build();
} else {
resHeaders = headers;
}
final HttpData httpData = HttpData.wrap(Jackson.writeValueAsBytes(resObj));
return HttpResponse.of(resHeaders, httpData, trailingHeaders);
} catch (JsonProcessingException e) {
logger.debug("Failed to convert a response:", e);
return HttpApiUtil.newResponse(ctx, HttpStatus.INTERNAL_SERVER_ERROR, e);
}
}
@Get
@Path("/param/precedence/{username}")
public String paramPrecedence(RequestContext ctx,
@Param("username") String username,
@Param("password") String password) {
validateContext(ctx);
return username + '/' + password;
}
/**
* Awaits and retrieves the latest revision of the commit that changed the file that matches the specified
* {@link Query} since the specified {@code lastKnownRevision}. This will wait until the specified
* {@code timeoutMillis} passes. If there's no change during the time, the returned future will be
* exceptionally completed with the {@link CancellationException}.
*/
public <T> CompletableFuture<Entry<T>> watchFile(Repository repo, Revision lastKnownRevision,
Query<T> query, long timeoutMillis) {
final ServiceRequestContext ctx = RequestContext.current();
updateRequestTimeout(ctx, timeoutMillis);
final CompletableFuture<Entry<T>> result = repo.watch(lastKnownRevision, query);
if (result.isDone()) {
return result;
}
scheduleTimeout(ctx, result, timeoutMillis);
return result;
}
/**
* Returns a newly created {@link HttpResponse} with the specified {@link HttpStatus} and the formatted
* message.
*/
public static HttpResponse newResponse(RequestContext ctx, HttpStatus status,
String format, Object... args) {
requireNonNull(ctx, "ctx");
requireNonNull(status, "status");
requireNonNull(format, "format");
requireNonNull(args, "args");
return newResponse(ctx, status, String.format(Locale.ENGLISH, format, args));
}
@Nullable
@Override
public String apply(RequestContext ctx, @Nullable Object input) {
if (input == null) {
return null;
}
String rawData = input.toString();
for (Pattern pattern : patterns) {
final Matcher m = pattern.matcher(rawData);
rawData = m.replaceAll("");
}
return rawData;
}
/**
* Returns a newly created {@link HttpResponse} with the specified {@link HttpStatus}, {@code cause} and
* {@code message}.
*/
public static HttpResponse newResponse(RequestContext ctx, HttpStatus status,
Throwable cause, String message) {
requireNonNull(ctx, "ctx");
requireNonNull(status, "status");
requireNonNull(cause, "cause");
requireNonNull(message, "message");
return newResponse0(ctx, status, cause, message);
}
@Get
@Path("/param/default1")
public String paramDefault1(RequestContext ctx,
@Param("username") @Default("hello") String username,
@Param("password") @Default("world") Optional<String> password,
@Param("extra") Optional<String> extra,
@Param("number") Optional<Integer> number) {
// "extra" might be null because there is no default value specified.
validateContext(ctx);
return username + '/' + password.get() + '/' + extra.orElse("(null)") +
(number.isPresent() ? "/" + number.get() : "");
}
/**
* Stores the {@link ByteBuf} backing the specified {@link Message} to be released later using
* {@link #releaseBuffer(Object, RequestContext)}.
*/
public static void storeBuffer(ByteBuf buf, Object message, RequestContext ctx) {
IdentityHashMap<Object, ByteBuf> buffers = ctx.attr(BUFFERS);
if (buffers == null) {
buffers = new IdentityHashMap<>();
ctx.setAttr(BUFFERS, buffers);
}
buffers.put(message, buf);
}
static ImmutableMap<String, String> get() {
ImmutableMap<String, String> loggingContext =
MoreObjects.firstNonNull(
RequestContext.mapCurrent(ctx -> ctx.attr(LOGGING_CONTEXT), ImmutableMap::of),
ImmutableMap.of());
checkNotNull(loggingContext);
return loggingContext;
}
/**
* Creates a new instance that logs {@link Request}s and {@link Response}s at the specified
* {@link LogLevel}s with the specified sanitizers.
*/
AbstractLoggingClient(
Client<I, O> delegate,
@Nullable Logger logger,
Function<? super RequestOnlyLog, LogLevel> requestLogLevelMapper,
Function<? super RequestLog, LogLevel> responseLogLevelMapper,
BiFunction<? super RequestContext, ? super HttpHeaders, ?> requestHeadersSanitizer,
BiFunction<? super RequestContext, Object, ?> requestContentSanitizer,
BiFunction<? super RequestContext, ? super HttpHeaders, ?> requestTrailersSanitizer,
BiFunction<? super RequestContext, ? super HttpHeaders, ?> responseHeadersSanitizer,
BiFunction<? super RequestContext, Object, ?> responseContentSanitizer,
BiFunction<? super RequestContext, ? super HttpHeaders, ?> responseTrailersSanitizer,
BiFunction<? super RequestContext, ? super Throwable, ?> responseCauseSanitizer,
Sampler<? super ClientRequestContext> sampler) {
super(requireNonNull(delegate, "delegate"));
this.logger = logger != null ? logger : LoggerFactory.getLogger(getClass());
this.requestLogLevelMapper = requireNonNull(requestLogLevelMapper, "requestLogLevelMapper");
this.responseLogLevelMapper = requireNonNull(responseLogLevelMapper, "responseLogLevelMapper");
this.requestHeadersSanitizer = requireNonNull(requestHeadersSanitizer, "requestHeadersSanitizer");
this.requestContentSanitizer = requireNonNull(requestContentSanitizer, "requestContentSanitizer");
this.requestTrailersSanitizer = requireNonNull(requestTrailersSanitizer, "requestTrailersSanitizer");
this.responseHeadersSanitizer = requireNonNull(responseHeadersSanitizer, "responseHeadersSanitizer");
this.responseContentSanitizer = requireNonNull(responseContentSanitizer, "responseContentSanitizer");
this.responseTrailersSanitizer = requireNonNull(responseTrailersSanitizer, "responseTrailersSanitizer");
this.responseCauseSanitizer = requireNonNull(responseCauseSanitizer, "responseCauseSanitizer");
this.sampler = requireNonNull(sampler, "sampler");
}
@Override
@Nullable
public String get(String key) {
final RequestContext ctx = RequestContext.currentOrNull();
if (ctx != null) {
final String value = RequestScopedMdc.get(ctx, key);
if (value != null) {
return value;
}
}
return delegate.get(key);
}
@Produces
static ListenableFuture<Recipe> recipe(
List<String> ingredients,
SearchResponse searchResponse,
Supplier<Random> randomSupplier,
YummlyApi yummly) {
int totalCount = searchResponse.totalMatchCount();
ListenableFuture<SearchResponse> future = Futures.immediateFuture(null);
// Get a random recipe to return. Search request fails randomly so try a few times.
Executor executor = RequestContext.current().contextAwareEventLoop();
Random random = randomSupplier.get();
for (int i = 0; i < 5; i++) {
int resultIndex = random.nextInt(totalCount);
future =
Futures.transformAsync(
future,
result -> {
if (result != null && !result.matches().isEmpty()) {
return Futures.immediateFuture(result);
}
return yummly.search(
EggworldConstants.EGG_QUERY,
ingredients,
resultIndex,
1,
true,
ImmutableList.of());
},
executor);
}
return Futures.transform(future, r -> r.matches().get(0), MoreExecutors.directExecutor());
}
/**
* Returns the client-side context of the {@link Request} that is being handled in the current thread.
*
* @return the {@link ClientRequestContext} available in the current thread, or {@code null} if unavailable.
* @throws IllegalStateException if the current context is not a {@link ClientRequestContext}.
*/
@Nullable
static ClientRequestContext currentOrNull() {
final RequestContext ctx = RequestContext.currentOrNull();
if (ctx == null) {
return null;
}
checkState(ctx instanceof ClientRequestContext,
"The current context is not a client-side context: %s", ctx);
return (ClientRequestContext) ctx;
}
/**
* Returns an {@link IllegalStateException} which is raised when popping a context from
* the unexpected thread or forgetting to close the previous context.
*/
public static IllegalStateException newIllegalContextPoppingException(
RequestContext currentCtx, RequestContext contextInStorage) {
requireNonNull(currentCtx, "currentCtx");
requireNonNull(contextInStorage, "contextInStorage");
final IllegalStateException ex = new IllegalStateException(
"The currentCtx " + currentCtx + " is not the same as the context in the storage: " +
contextInStorage + ". This means the callback was called from " +
"unexpected thread or forgetting to close previous context.");
if (REPORTED_THREADS.add(Thread.currentThread())) {
logger.warn("An error occurred while popping a context", ex);
}
return ex;
}
/**
* Creates a new instance that logs {@link HttpRequest}s and {@link HttpResponse}s at the specified
* {@link LogLevel}s with the specified sanitizers.
*/
LoggingService(
HttpService delegate,
@Nullable Logger logger,
Function<? super RequestOnlyLog, LogLevel> requestLogLevelMapper,
Function<? super RequestLog, LogLevel> responseLogLevelMapper,
BiFunction<? super RequestContext, ? super RequestHeaders, ?> requestHeadersSanitizer,
BiFunction<? super RequestContext, Object, ?> requestContentSanitizer,
BiFunction<? super RequestContext, ? super HttpHeaders, ?> requestTrailersSanitizer,
BiFunction<? super RequestContext, ? super ResponseHeaders, ?> responseHeadersSanitizer,
BiFunction<? super RequestContext, Object, ?> responseContentSanitizer,
BiFunction<? super RequestContext, ? super HttpHeaders, ?> responseTrailersSanitizer,
BiFunction<? super RequestContext, ? super Throwable, ?> responseCauseSanitizer,
Sampler<? super ServiceRequestContext> sampler) {
super(requireNonNull(delegate, "delegate"));
this.logger = firstNonNull(logger, defaultLogger);
this.requestLogLevelMapper = requireNonNull(requestLogLevelMapper, "requestLogLevelMapper");
this.responseLogLevelMapper = requireNonNull(responseLogLevelMapper, "responseLogLevelMapper");
this.requestHeadersSanitizer = requireNonNull(requestHeadersSanitizer, "requestHeadersSanitizer");
this.requestContentSanitizer = requireNonNull(requestContentSanitizer, "requestContentSanitizer");
this.requestTrailersSanitizer = requireNonNull(requestTrailersSanitizer, "requestTrailersSanitizer");
this.responseHeadersSanitizer = requireNonNull(responseHeadersSanitizer, "responseHeadersSanitizer");
this.responseContentSanitizer = requireNonNull(responseContentSanitizer, "responseContentSanitizer");
this.responseTrailersSanitizer = requireNonNull(responseTrailersSanitizer, "responseTrailersSanitizer");
this.responseCauseSanitizer = requireNonNull(responseCauseSanitizer, "responseCauseSanitizer");
this.sampler = requireNonNull(sampler, "sampler");
}
/**
* Sets the specified {@link BiPredicate} to produce the preview using the specified
* {@link BiFunction} when the predicate returns {@code true}.
*/
public ContentPreviewerFactoryBuilder binary(
BiFunction<? super HttpHeaders, ? super ByteBuf, String> producer,
BiPredicate<? super RequestContext, ? super HttpHeaders> predicate) {
requireNonNull(predicate, "predicate");
requireNonNull(producer, "producer");
previewSpecsBuilder.add(new PreviewSpec(predicate, PreviewMode.BINARY, producer));
return this;
}
private static Single<String> single(String input) {
RequestContext.current();
return Single.create(emitter -> {
RequestContext.current();
emitter.onSuccess(input);
});
}
private static Completable completable(@SuppressWarnings("unused") String input) {
RequestContext.current();
return Completable.create(emitter -> {
RequestContext.current();
emitter.onComplete();
});
}
public static Logger newMockLogger(RequestContext ctx, AtomicReference<Throwable> capturedCause) {
return mock(Logger.class, withSettings().invocationListeners(report -> {
final DescribedInvocation describedInvocation = report.getInvocation();
if (!(describedInvocation instanceof InvocationOnMock)) {
return;
}
final InvocationOnMock invocation = (InvocationOnMock) describedInvocation;
final Object[] arguments = invocation.getArguments();
if (arguments.length == 0) {
return;
}
if (arguments[0] == null) {
// Invoked at verification phase
return;
}
switch (invocation.getMethod().getName()) {
case "trace":
case "debug":
case "info":
case "warn":
case "error":
try {
assertThat((RequestContext) RequestContext.current()).isSameAs(ctx);
} catch (Throwable cause) {
capturedCause.set(cause);
}
}
}));
}
private static Maybe<String> maybe(String input) {
RequestContext.current();
return Maybe.create(emitter -> {
RequestContext.current();
emitter.onSuccess(input);
});
}
public static Author currentAuthor() {
return currentAuthor(RequestContext.current());
}
public static User currentUser() {
return currentUser(RequestContext.current());
}
@Get
@Path("/param/default_null")
public String paramDefaultNull(RequestContext ctx, @Param @Default String value) {
validateContext(ctx);
return value;
}
@Override
public RequestContext context() {
return ctx;
}
/**
* Throws a newly created {@link HttpResponseException} with the specified {@link HttpStatus},
* {@code cause} and the formatted message.
*/
public static <T> T throwResponse(RequestContext ctx, HttpStatus status, Throwable cause,
String format, Object... args) {
throw HttpResponseException.of(newResponse(ctx, status, cause, format, args));
}