类org.springframework.http.client.AsyncClientHttpRequestExecution源码实例Demo

下面列出了如何使用 org.springframework.http.client.AsyncClientHttpRequestExecution 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。

/**
 * Executes the request through the given execution and checks the response status.
 *
 * <p>Note that this method blocks on {@code response.get()} in order to inspect the
 * status code, so the returned future has already completed when it is handed back.
 *
 * @param execution   the execution used to send the request
 * @param httpRequest the request to send
 * @param body        the request body
 * @return the future returned by {@code execution.executeAsync(...)}, only if it is
 *         non-null and completed with status {@code 200 OK}
 * @throws IOException if the future is {@code null}, the status is not {@code 200 OK},
 *                     the calling thread is interrupted while waiting, or any other
 *                     failure occurs (the original failure is kept as the cause)
 */
public ListenableFuture<ClientHttpResponse> doExecuteAsync(AsyncClientHttpRequestExecution execution,
                                                           HttpRequest httpRequest,
                                                           byte[] body) throws IOException {
    try {
        ListenableFuture<ClientHttpResponse> response = execution.executeAsync(httpRequest, body);
        logger.info("http request response:" + response);
        if (response == null || !HttpStatus.OK.equals(response.get().getStatusCode())) {
            throw new IOException("response error");
        }
        return response;
    } catch (IOException e) {
        // Already the declared exception type: log and rethrow as-is instead of
        // wrapping it in a second IOException.
        logger.error(e.getMessage(), e);
        throw e;
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        logger.error(e.getMessage(), e);
        throw new IOException(e.getMessage(), e);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        throw new IOException(e.getMessage(), e);
    }

}
 
/**
 * Passes the request down the chain and registers callbacks that record the outcome.
 *
 * <p>On success the response is stored in the {@code response} field; on failure the
 * error is stored in the {@code exception} field. In both cases the latch is counted
 * down once.
 *
 * @return the future produced by the execution, unchanged
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
		AsyncClientHttpRequestExecution execution) throws IOException {

	ListenableFuture<ClientHttpResponse> pending = execution.executeAsync(request, body);
	pending.addCallback(
			result -> {
				this.response = result;
				this.latch.countDown();
			},
			failure -> {
				this.exception = failure;
				this.latch.countDown();
			});
	return pending;
}
 
/**
 * Routes the request through the load balancer, using the host part of the request
 * URI as the service name.
 *
 * <p>The request is wrapped in a {@code ServiceRequestWrapper} for the chosen
 * instance before being passed on to the rest of the chain.
 *
 * @return whatever the load balancer returns for the wrapped execution
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request,
		final byte[] body, final AsyncClientHttpRequestExecution execution)
		throws IOException {
	final String serviceId = request.getURI().getHost();

	LoadBalancerRequest<ListenableFuture<ClientHttpResponse>> balancedCall =
			new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
				@Override
				public ListenableFuture<ClientHttpResponse> apply(
						final ServiceInstance instance) throws Exception {
					return execution.executeAsync(
							new ServiceRequestWrapper(request, instance,
									AsyncLoadBalancerInterceptor.this.loadBalancer),
							body);
				}
			};

	return this.loadBalancer.execute(serviceId, balancedCall);
}
 
/**
 * Traces an asynchronous client request.
 *
 * <p>A span is obtained from {@code handler.handleSend}, the request is executed with
 * that span's context in scope, and a {@code TraceListenableFutureCallback} is attached
 * to the resulting future. If execution throws, the failure is reported through
 * {@code handler.handleReceive} before being rethrown.
 *
 * @return the execution's future, wrapped in a {@code TraceContextListenableFuture}
 *         when the span has a parent, otherwise returned as-is
 */
@Override public ListenableFuture<ClientHttpResponse> intercept(HttpRequest req,
  byte[] body, AsyncClientHttpRequestExecution execution) throws IOException {
  HttpRequestWrapper request = new HttpRequestWrapper(req);
  Span span = handler.handleSend(request);

  // Only capture the invocation context when the span has a parent; a root span
  // skips this to avoid context sync overhead.
  TraceContext invocationContext = null;
  if (span.context().parentIdAsLong() != 0) {
    invocationContext = currentTraceContext.get();
  }

  try (Scope scope = currentTraceContext.maybeScope(span.context())) {
    ListenableFuture<ClientHttpResponse> future = execution.executeAsync(req, body);
    future.addCallback(new TraceListenableFutureCallback(request, span, handler));
    if (invocationContext == null) {
      return future;
    }
    return new TraceContextListenableFuture<>(future, currentTraceContext, invocationContext);
  } catch (Throwable e) {
    handler.handleReceive(new ClientHttpResponseWrapper(request, null, e), span);
    throw e;
  }
}
 
/**
 * Applies circuit-breaker rules to an asynchronous client request.
 *
 * <p>If {@code circuitBreakerCore} has no rules for the method/service/path triple,
 * the request is simply passed down the chain. Otherwise the public
 * {@code doExecuteAsync(AsyncClientHttpRequestExecution, HttpRequest, byte[])} method
 * of this interceptor is looked up reflectively and handed to
 * {@code circuitBreakerCore.process(...)} together with its arguments.
 *
 * @param httpRequest the outgoing request; host is used as service name, path as url
 * @param body        the request body
 * @param execution   the remaining interceptor chain
 * @return the future produced by the chain or by the circuit-breaker core
 * @throws IOException      if the protected call failed with an {@code IOException}
 * @throws RuntimeException for a {@code CircuitBreakerOpenException} or any other
 *                          failure; the original exception is attached as the cause
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest httpRequest, byte[] body,
                                                      AsyncClientHttpRequestExecution execution)
        throws IOException {
    logger.debug("AsyncRestTemplateCircuitInterceptor start");
    URI asUri = httpRequest.getURI();
    String httpMethod = httpRequest.getMethod().toString();
    String serviceName = asUri.getHost();
    String url = asUri.getPath();
    logger.info("http with serviceName:{}, menthod:{}, url:{}", serviceName, httpMethod, url);

    if (!circuitBreakerCore.checkRulesExist(httpMethod, serviceName, url)) {
        // No rule configured for this call: bypass the circuit breaker.
        return execution.executeAsync(httpRequest, body);
    }

    try {
        Method wrappedMethod = AsyncRestTemplateCircuitInterceptor.class.getMethod(
                "doExecuteAsync",
                AsyncClientHttpRequestExecution.class,
                HttpRequest.class, byte[].class);
        // The argument array must line up with the three-parameter signature looked up
        // above; previously the execution argument was missing.
        // NOTE(review): assumes process(...) invokes wrappedMethod on `this` with args.
        Object[] args = {execution, httpRequest, body};
        @SuppressWarnings("unchecked")
        ListenableFuture<ClientHttpResponse> response =
                (ListenableFuture<ClientHttpResponse>) circuitBreakerCore.process(httpMethod,
                        serviceName, url, wrappedMethod, this, args);
        // TODO (from original author): circuit breaking returns null
        return response;
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        // Same exception types as before, but the original exception is now kept as
        // the cause. An open circuit is still reported as a RuntimeException.
        if (e instanceof IOException && !(e instanceof CircuitBreakerOpenException)) {
            throw new IOException(e.getMessage(), e);
        }
        throw new RuntimeException(e.getMessage(), e);
    }
}
 
/**
 * Adds the system tags and the configured service name to the request headers, then
 * passes the request down the chain.
 *
 * @return the future produced by the rest of the chain
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
                                                      AsyncClientHttpRequestExecution execution)
        throws IOException {
    // System tags supplied by FormulaConfigUtils, one header per tag.
    FormulaConfigUtils.getSystemTags()
            .forEach((tagName, tagValue) -> request.getHeaders().add(tagName, tagValue));

    // Service name configured on this interceptor.
    request.getHeaders().add(SystemTag.SERVICE_NAME, serviceName);
    logger.debug("AsyncRestTemplate: insert {} into httpHeader: {}", SystemTag.SERVICE_NAME, serviceName);

    return execution.executeAsync(request, body);
}
 
/**
 * Times an asynchronous client request and records the elapsed time in the meter
 * registry.
 *
 * <p>The URL template is read from {@code urlTemplateHolder} and then removed from it.
 * A timing is recorded in three situations: when {@code executeAsync} itself throws an
 * {@code IOException}, when the future fails, and when the future succeeds.
 *
 * @return the future produced by the rest of the chain
 * @throws IOException if {@code executeAsync} throws; a timing is recorded first
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
    AsyncClientHttpRequestExecution execution) throws IOException {
    final String urlTemplate = urlTemplateHolder.get();
    urlTemplateHolder.remove();

    final Clock clock = meterRegistry.config().clock();
    final long startTime = clock.monotonicTime();

    final ListenableFuture<ClientHttpResponse> future;
    try {
        future = execution.executeAsync(request, body);
    } catch (IOException e) {
        getTimeBuilder(urlTemplate, request, null, e)
            .register(meterRegistry)
            .record(clock.monotonicTime() - startTime, TimeUnit.NANOSECONDS);
        throw e;
    }

    future.addCallback(
        response -> getTimeBuilder(urlTemplate, request, response, null)
            .register(meterRegistry)
            .record(clock.monotonicTime() - startTime, TimeUnit.NANOSECONDS),
        ex -> getTimeBuilder(urlTemplate, request, null, ex)
            .register(meterRegistry)
            .record(clock.monotonicTime() - startTime, TimeUnit.NANOSECONDS));
    return future;
}
 
/**
 * Traces an asynchronous client request with the SOFA tracer.
 *
 * <p>A span is obtained from {@code restTemplateTracer.clientSend} using the HTTP
 * method name, request tags are appended to it, and a
 * {@code SofaTraceListenableFutureCallback} is registered on the resulting future.
 * The {@code finally} block then adjusts the thread's trace context depending on
 * whether an {@code IOException} was caught.
 *
 * @return the future produced by the rest of the chain
 * @throws IOException if {@code executeAsync} throws one; it is rethrown unchanged
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
                                                      AsyncClientHttpRequestExecution execution)
                                                                                                throws IOException {
    SofaTracerSpan sofaTracerSpan = restTemplateTracer.clientSend(request.getMethod().name());
    appendRestTemplateRequestSpanTags(request, sofaTracerSpan);
    // Set only by the IOException catch below; any other throwable leaves it null,
    // so the finally block takes its "else" branch in that case.
    Exception exception = null;
    try {
        ListenableFuture<ClientHttpResponse> result = execution.executeAsync(request, body);
        result.addCallback(new SofaTraceListenableFutureCallback(restTemplateTracer,
            sofaTracerSpan));
        return result;
    } catch (IOException e) {
        exception = e;
        throw e;
    } finally {
        // On IOException: tag the current span with the error message and report the
        // receive immediately with status "500".
        if (exception != null) {
            SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext()
                .getCurrentSpan();
            currentSpan.setTag(Tags.ERROR.getKey(), exception.getMessage());
            restTemplateTracer.clientReceive(String.valueOf(500));
        } else {
            // Otherwise: pop the current entry from the trace context ...
            SofaTraceContextHolder.getSofaTraceContext().pop();
            if (sofaTracerSpan != null && sofaTracerSpan.getParentSofaTracerSpan() != null) {
                // ... and push the parent span back, if there is one.
                SofaTraceContextHolder.getSofaTraceContext().push(
                    sofaTracerSpan.getParentSofaTracerSpan());
            }
        }
    }
}
 
/**
 * Wraps the request so its headers can be modified, then either surrounds the call
 * with a subspan or just propagates the tracing headers, depending on
 * {@code surroundCallsWithSubspan}.
 *
 * @return the future produced by the selected execution path
 */
@Override
@SuppressWarnings("deprecation")
public ListenableFuture<ClientHttpResponse> intercept(
    HttpRequest request, byte[] body, AsyncClientHttpRequestExecution execution
) throws IOException {
    // The wrapper gives us modifiable headers, which is what lets tracing info be
    //      propagated on the outgoing request.
    HttpRequestWrapperWithModifiableHeaders wrapperRequest = new HttpRequestWrapperWithModifiableHeaders(request);

    return surroundCallsWithSubspan
        ? createAsyncSubSpanAndExecute(wrapperRequest, body, execution)
        : propagateTracingHeadersAndExecute(wrapperRequest, body, execution);
}
 
/**
 * Propagates the current span's tracing state onto the given request's headers via
 * {@link WingtipsSpringUtil#propagateTracingHeaders(HttpMessage, Span)}, then executes the request with
 * {@link AsyncClientHttpRequestExecution#executeAsync(HttpRequest, byte[])}.
 *
 * @return The future returned by {@link AsyncClientHttpRequestExecution#executeAsync(HttpRequest, byte[])}.
 */
protected ListenableFuture<ClientHttpResponse> propagateTracingHeadersAndExecute(
    HttpRequestWrapperWithModifiableHeaders wrapperRequest, byte[] body, AsyncClientHttpRequestExecution execution
) throws IOException {
    // Tracing headers first, so they are in place before the request goes out.
    propagateTracingHeaders(wrapperRequest, Tracer.getInstance().getCurrentSpan());

    // Hand off to the rest of the request/interceptor chain.
    ListenableFuture<ClientHttpResponse> future = execution.executeAsync(wrapperRequest, body);
    return future;
}
 
/**
 * Surrounds the HTTP request with a subspan (or a new trace if no current span exists) and then delegates to
 * {@link #propagateTracingHeadersAndExecute(HttpRequestWrapperWithModifiableHeaders, byte[],
 * AsyncClientHttpRequestExecution)} to actually execute it. A {@link SpanAroundAsyncCallFinisher} is registered
 * as a callback on the returned future so that the subspan is finished when the request finishes. Request
 * tagging (and initial span naming) happens in this method; response tagging (and final span naming) happens in
 * the {@link SpanAroundAsyncCallFinisher}.
 *
 * @return The future returned by {@link #propagateTracingHeadersAndExecute(HttpRequestWrapperWithModifiableHeaders,
 * byte[], AsyncClientHttpRequestExecution)}, with the subspan-finishing callback attached.
 */
protected ListenableFuture<ClientHttpResponse> createAsyncSubSpanAndExecute(
    HttpRequestWrapperWithModifiableHeaders wrapperRequest, byte[] body, AsyncClientHttpRequestExecution execution
) throws IOException {
    // Remember the tracing state this thread had on entry; it is restored in the finally block.
    TracingState stateOnEntry = TracingState.getCurrentThreadTracingState();

    SpanAroundAsyncCallFinisher finisher = null;

    try {
        // Starts a subspan if a trace is already in progress, otherwise a new trace.
        Span callSpan = Tracer.getInstance().startSpanInCurrentContext(
            getSubspanSpanName(wrapperRequest, tagAndNamingStrategy, tagAndNamingAdapter),
            Span.SpanPurpose.CLIENT
        );

        // Request tags go on the subspan before the call is made.
        tagAndNamingStrategy.handleRequestTagging(callSpan, wrapperRequest, tagAndNamingAdapter);

        // The finisher captures the tracing state that now includes the subspan.
        finisher = new SpanAroundAsyncCallFinisher(
            TracingState.getCurrentThreadTracingState(), wrapperRequest, tagAndNamingStrategy, tagAndNamingAdapter
        );

        // Run the request/interceptor chain and hook the finisher onto the resulting future.
        ListenableFuture<ClientHttpResponse> future = propagateTracingHeadersAndExecute(
            wrapperRequest, body, execution
        );
        future.addCallback(finisher);

        return future;
    }
    catch(Throwable failure) {
        // The call blew up synchronously, so no callback will fire - finish the subspan here
        //      (if the finisher got created).
        if (finisher != null) {
            finisher.finishCallSpan(null, failure);
        }

        throw failure;
    }
    finally {
        // Put the thread back into the tracing state it had when this method was entered.
        //noinspection deprecation
        unlinkTracingFromCurrentThread(stateOnEntry);
    }
}
 
/**
 * Per-test setup: resets tracing, creates the arg-capturing tag/naming strategy, registers a span recorder,
 * and prepares the request, execution and response mocks.
 */
@Before
public void beforeMethod() throws IOException {
    resetTracing();

    // Holders that the arg-capturing strategy writes into.
    initialSpanNameFromStrategy = new AtomicReference<>("span-name-from-strategy-" + UUID.randomUUID());
    strategyInitialSpanNameMethodCalled = new AtomicBoolean(false);
    strategyRequestTaggingMethodCalled = new AtomicBoolean(false);
    strategyResponseTaggingAndFinalSpanNameMethodCalled = new AtomicBoolean(false);
    strategyInitialSpanNameArgs = new AtomicReference<>();
    strategyRequestTaggingArgs = new AtomicReference<>();
    strategyResponseTaggingArgs = new AtomicReference<>();
    tagAndNamingStrategy = new ArgCapturingHttpTagAndSpanNamingStrategy(
        initialSpanNameFromStrategy,
        strategyInitialSpanNameMethodCalled,
        strategyRequestTaggingMethodCalled,
        strategyResponseTaggingAndFinalSpanNameMethodCalled,
        strategyInitialSpanNameArgs,
        strategyRequestTaggingArgs,
        strategyResponseTaggingArgs
    );
    tagAndNamingAdapterMock = mock(HttpTagAndSpanNamingAdapter.class);

    // Span recorder registered as a lifecycle listener on the tracer.
    spanRecorder = new SpanRecorder();
    Tracer.getInstance().addSpanLifecycleListener(spanRecorder);

    // Request mock with a random URI on localhost.
    method = HttpMethod.PATCH;
    uri = URI.create("http://localhost:4242/" + UUID.randomUUID());
    headersMock = mock(HttpHeaders.class);
    requestMock = mock(HttpRequest.class);
    doReturn(headersMock).when(requestMock).getHeaders();
    doReturn(method).when(requestMock).getMethod();
    doReturn(uri).when(requestMock).getURI();

    // Execution mock: captures the tracing state at call time and returns a fresh settable future.
    body = UUID.randomUUID().toString().getBytes();
    executionMock = mock(AsyncClientHttpRequestExecution.class);
    doAnswer(invocation -> {
        tracingStateAtTimeOfExecution = TracingState.getCurrentThreadTracingState();
        executionResponseFuture = new SettableListenableFuture<>();
        return executionResponseFuture;
    }).when(executionMock).executeAsync(any(HttpRequest.class), any(byte[].class));

    // Response mock reporting a raw status code of 200.
    normalCompletionResponse = mock(ClientHttpResponse.class);
    normalResponseCode = 200;
    doReturn(normalResponseCode).when(normalCompletionResponse).getRawStatusCode();
}
 
源代码13 项目: haven-platform   文件: BasicAuthAsyncInterceptor.java
/**
 * Adds an HTTP Basic {@code Authorization} header built from this interceptor's
 * username and password, then passes the request down the chain.
 *
 * @return the future produced by the rest of the chain
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body, AsyncClientHttpRequestExecution execution) throws IOException {
    // "username:password", UTF-8 encoded and then Base64 encoded.
    String credentials = this.username + ":" + this.password;
    String encoded = Base64Utils.encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    request.getHeaders().add("Authorization", "Basic " + encoded);
    return execution.executeAsync(request, body);
}
 
 类所在包
 类方法
 同包方法