下面列出了 org.springframework.http.client.AsyncClientHttpRequestExecution 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Executes the request through the given execution and accepts the result only when the
 * response status is {@code 200 OK}.
 *
 * <p>This method calls {@code response.get()} to inspect the status code, so it waits for the
 * asynchronous exchange to complete before returning the (already completed) future.
 *
 * @param execution   the execution used to send the request
 * @param httpRequest the request to send
 * @param body        the request body
 * @return the future returned by {@code execution.executeAsync}, once its response was seen to be OK
 * @throws IOException if no future is returned, the status is not OK, the wait is interrupted,
 *                     or any other exception occurs while executing or waiting
 */
public ListenableFuture<ClientHttpResponse> doExecuteAsync(AsyncClientHttpRequestExecution execution,
                                                           HttpRequest httpRequest,
                                                           byte[] body) throws IOException {
    try {
        ListenableFuture<ClientHttpResponse> response = execution.executeAsync(httpRequest, body);
        logger.info("http request response:" + response);
        if (response == null || !HttpStatus.OK.equals(response.get().getStatusCode())) {
            throw new IOException("response error");
        }
        return response;
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        logger.error(e.getMessage(), e);
        throw new IOException(e.getMessage(), e);
    } catch (IOException e) {
        // Already the declared exception type: log and rethrow instead of wrapping it again.
        logger.error(e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        throw new IOException(e.getMessage(), e);
    }
}
/**
 * Forwards the request to the next element of the chain and records its outcome.
 *
 * <p>On success the response is stored in {@code response}; on failure the throwable is stored
 * in {@code exception}. In both cases {@code latch} is counted down once.
 *
 * @return the future produced by the execution, unchanged
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
        AsyncClientHttpRequestExecution execution) throws IOException {
    ListenableFuture<ClientHttpResponse> pending = execution.executeAsync(request, body);
    pending.addCallback(
            result -> {
                response = result;
                this.latch.countDown();
            },
            failure -> {
                exception = failure;
                this.latch.countDown();
            });
    return pending;
}
/**
 * Treats the host part of the request URI as a service name and lets the load balancer run
 * the request against the service instance it selects.
 *
 * @return whatever the load balancer returns for the wrapped asynchronous execution
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request,
        final byte[] body, final AsyncClientHttpRequestExecution execution)
        throws IOException {
    final String serviceId = request.getURI().getHost();

    // The actual call, deferred until the load balancer hands over a concrete instance.
    final LoadBalancerRequest<ListenableFuture<ClientHttpResponse>> balancedCall =
            new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
                @Override
                public ListenableFuture<ClientHttpResponse> apply(
                        final ServiceInstance instance) throws Exception {
                    return execution.executeAsync(
                            new ServiceRequestWrapper(request, instance,
                                    AsyncLoadBalancerInterceptor.this.loadBalancer),
                            body);
                }
            };

    return this.loadBalancer.execute(serviceId, balancedCall);
}
/**
 * Opens a client span for the request, executes it with the span in scope, and registers a
 * callback built from the wrapped request, the span and the handler.
 *
 * <p>When the span has a parent id, the returned future is wrapped together with the trace
 * context that was current before the span was put in scope; otherwise the raw future is
 * returned.
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest req,
        byte[] body, AsyncClientHttpRequestExecution execution) throws IOException {
    HttpRequestWrapper request = new HttpRequestWrapper(req);
    Span span = handler.handleSend(request);

    // avoid context sync overhead when we are the root span
    TraceContext invocationContext = null;
    if (span.context().parentIdAsLong() != 0) {
        invocationContext = currentTraceContext.get();
    }

    try (Scope ws = currentTraceContext.maybeScope(span.context())) {
        ListenableFuture<ClientHttpResponse> result = execution.executeAsync(req, body);
        result.addCallback(new TraceListenableFutureCallback(request, span, handler));
        if (invocationContext == null) {
            return result;
        }
        return new TraceContextListenableFuture<>(result, currentTraceContext, invocationContext);
    } catch (Throwable e) {
        // Report the failure on the span before letting it propagate.
        handler.handleReceive(new ClientHttpResponseWrapper(request, null, e), span);
        throw e;
    }
}
/**
 * Routes the request through the circuit breaker when rules exist for the given
 * method/service/path combination, and executes it directly otherwise.
 *
 * <p>The guarded call is made reflectively: {@code doExecuteAsync(AsyncClientHttpRequestExecution,
 * HttpRequest, byte[])} is looked up on this class and handed to
 * {@code circuitBreakerCore.process} together with its arguments.
 *
 * @param httpRequest the outgoing request; its URI host is used as the service name
 * @param body        the request body
 * @param execution   the remaining interceptor chain
 * @return the future produced by the guarded or direct execution
 * @throws IOException      if the guarded call fails with an {@code IOException} or the direct
 *                          execution throws one
 * @throws RuntimeException wrapping any other exception raised by the circuit breaker
 *                          (including {@code CircuitBreakerOpenException}), with the original
 *                          exception kept as the cause
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest httpRequest, byte[] body,
                                                      AsyncClientHttpRequestExecution execution)
        throws IOException {
    logger.debug("AsyncRestTemplateCircuitInterceptor start");
    URI asUri = httpRequest.getURI();
    String httpMethod = httpRequest.getMethod().toString();
    String serviceName = asUri.getHost();
    String url = asUri.getPath();
    logger.info("http with serviceName:{}, menthod:{}, url:{}", serviceName, httpMethod, url);

    if (!circuitBreakerCore.checkRulesExist(httpMethod, serviceName, url)) {
        // No circuit breaker rule applies: just continue down the chain.
        return execution.executeAsync(httpRequest, body);
    }

    try {
        Method wrappedMethod = AsyncRestTemplateCircuitInterceptor.class.getMethod(
                "doExecuteAsync",
                AsyncClientHttpRequestExecution.class,
                HttpRequest.class, byte[].class);
        // The argument list must line up with the three parameter types looked up above;
        // previously the execution was missing, leaving two arguments for three parameters.
        Object[] args = {execution, httpRequest, body};
        @SuppressWarnings("unchecked")
        ListenableFuture<ClientHttpResponse> response =
                (ListenableFuture<ClientHttpResponse>) circuitBreakerCore.process(httpMethod,
                        serviceName, url, wrappedMethod, this, args);
        // TODO: a tripped circuit breaker returns null here
        return response;
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        if (e instanceof IOException) {
            // Already the declared type: rethrow as-is so the stack trace and cause survive.
            throw (IOException) e;
        }
        // CircuitBreakerOpenException and everything else surface as RuntimeException,
        // keeping the original exception as the cause.
        throw new RuntimeException(e.getMessage(), e);
    }
}
/**
 * Adds every entry returned by {@code FormulaConfigUtils.getSystemTags()} plus the configured
 * service name as request headers, then continues the interceptor chain.
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
                                                      AsyncClientHttpRequestExecution execution)
        throws IOException {
    // System tags first ...
    FormulaConfigUtils.getSystemTags()
            .forEach((tagName, tagValue) -> request.getHeaders().add(tagName, tagValue));

    // ... then the service name this interceptor was given.
    request.getHeaders().add(SystemTag.SERVICE_NAME, serviceName);
    logger.debug("AsyncRestTemplate: insert {} into httpHeader: {}", SystemTag.SERVICE_NAME, serviceName);

    return execution.executeAsync(request, body);
}
/**
 * Times the asynchronous exchange and records the elapsed nanoseconds on a timer obtained from
 * {@code getTimeBuilder}.
 *
 * <p>The URL template is read from {@code urlTemplateHolder} and removed from it before the
 * request is executed. A recording is made when {@code executeAsync} throws an
 * {@code IOException}, when the future fails, and when it succeeds.
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
        AsyncClientHttpRequestExecution execution) throws IOException {
    final String urlTemplate = urlTemplateHolder.get();
    urlTemplateHolder.remove();

    final Clock clock = meterRegistry.config().clock();
    final long startedAt = clock.monotonicTime();

    final ListenableFuture<ClientHttpResponse> pending;
    try {
        pending = execution.executeAsync(request, body);
    } catch (IOException e) {
        // The request could not even be dispatched: record it with the exception, no response.
        getTimeBuilder(urlTemplate, request, null, e).register(meterRegistry)
                .record(clock.monotonicTime() - startedAt, TimeUnit.NANOSECONDS);
        throw e;
    }

    ListenableFutureCallback<ClientHttpResponse> recorder =
            new ListenableFutureCallback<ClientHttpResponse>() {
                @Override
                public void onSuccess(final ClientHttpResponse response) {
                    getTimeBuilder(urlTemplate, request, response, null).register(meterRegistry)
                            .record(clock.monotonicTime() - startedAt, TimeUnit.NANOSECONDS);
                }

                @Override
                public void onFailure(final Throwable ex) {
                    getTimeBuilder(urlTemplate, request, null, ex).register(meterRegistry)
                            .record(clock.monotonicTime() - startedAt, TimeUnit.NANOSECONDS);
                }
            };
    pending.addCallback(recorder);
    return pending;
}
/**
 * Starts a client span named after the HTTP method, tags it from the request, executes the
 * request, and registers a {@code SofaTraceListenableFutureCallback} on the returned future.
 *
 * <p>The {@code finally} block then adjusts the trace context depending on whether an
 * {@code IOException} was caught:
 * <ul>
 *   <li>on {@code IOException}: the current span gets an error tag with the exception message
 *       and {@code clientReceive("500")} is called;</li>
 *   <li>otherwise: the current span is popped from the context and, if the new span has a
 *       parent, that parent is pushed back.</li>
 * </ul>
 *
 * <p>Only {@code IOException} is caught, so any other exception thrown inside the {@code try}
 * leaves {@code exception} as {@code null} and takes the "otherwise" branch.
 *
 * @param request   the outgoing request
 * @param body      the request body
 * @param execution the remaining interceptor chain
 * @return the future returned by {@code execution.executeAsync}
 * @throws IOException rethrown from {@code execution.executeAsync}
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
AsyncClientHttpRequestExecution execution)
throws IOException {
SofaTracerSpan sofaTracerSpan = restTemplateTracer.clientSend(request.getMethod().name());
appendRestTemplateRequestSpanTags(request, sofaTracerSpan);
// Remembers a caught IOException so the finally block can pick the right cleanup path.
Exception exception = null;
try {
ListenableFuture<ClientHttpResponse> result = execution.executeAsync(request, body);
result.addCallback(new SofaTraceListenableFutureCallback(restTemplateTracer,
sofaTracerSpan));
return result;
} catch (IOException e) {
exception = e;
throw e;
} finally {
// On error, finish the client span right away
// (NOTE(review): "tl" in the original comment presumably meant thread-local - confirm).
if (exception != null) {
SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext()
.getCurrentSpan();
currentSpan.setTag(Tags.ERROR.getKey(), exception.getMessage());
restTemplateTracer.clientReceive(String.valueOf(500));
} else {
// Remove the current span from the context.
SofaTraceContextHolder.getSofaTraceContext().pop();
if (sofaTracerSpan != null && sofaTracerSpan.getParentSofaTracerSpan() != null) {
// Put the parent span back as the current one.
SofaTraceContextHolder.getSofaTraceContext().push(
sofaTracerSpan.getParentSofaTracerSpan());
}
}
}
}
/**
 * Wraps the request so its headers can be modified, then either surrounds the call with a
 * subspan or only propagates the tracing headers, depending on
 * {@code surroundCallsWithSubspan}.
 */
@Override
@SuppressWarnings("deprecation")
public ListenableFuture<ClientHttpResponse> intercept(
    HttpRequest request, byte[] body, AsyncClientHttpRequestExecution execution
) throws IOException {
    // The wrapper exposes modifiable headers, which is what lets tracing info be added to them.
    HttpRequestWrapperWithModifiableHeaders wrapperRequest =
        new HttpRequestWrapperWithModifiableHeaders(request);

    return surroundCallsWithSubspan
        ? createAsyncSubSpanAndExecute(wrapperRequest, body, execution)
        : propagateTracingHeadersAndExecute(wrapperRequest, body, execution);
}
/**
 * Propagates the current span's tracing state onto the given request's headers via
 * {@link WingtipsSpringUtil#propagateTracingHeaders(HttpMessage, Span)}, then executes the request with
 * {@link AsyncClientHttpRequestExecution#executeAsync(HttpRequest, byte[])}.
 *
 * @return The result of calling {@link AsyncClientHttpRequestExecution#executeAsync(HttpRequest, byte[])}.
 */
protected ListenableFuture<ClientHttpResponse> propagateTracingHeadersAndExecute(
    HttpRequestWrapperWithModifiableHeaders wrapperRequest, byte[] body, AsyncClientHttpRequestExecution execution
) throws IOException {
    // Whatever span is current on this thread is the one whose state goes onto the headers.
    Span currentSpan = Tracer.getInstance().getCurrentSpan();
    propagateTracingHeaders(wrapperRequest, currentSpan);

    // Hand off to the rest of the request/interceptor chain.
    return execution.executeAsync(wrapperRequest, body);
}
/**
* Creates a subspan (or new trace if no current span exists) to surround the HTTP request, then returns the
* result of calling {@link #propagateTracingHeadersAndExecute(HttpRequestWrapperWithModifiableHeaders, byte[],
* AsyncClientHttpRequestExecution)} to actually execute the request. A {@link SpanAroundAsyncCallFinisher} will
* be registered as a callback to finish the subspan when the request finishes. Request tagging (and initial span
* naming) is done here, and response tagging (and final span naming) is done in the {@link
* SpanAroundAsyncCallFinisher}.
*
* <p>The tracing state that was attached to the calling thread when this method started is captured up front and
* restored in a {@code finally} block, whether the call succeeds or throws.
*
* @param wrapperRequest The request (wrapped so that its headers are modifiable) to tag and execute.
* @param body The request body, passed through unchanged.
* @param execution The remaining request/interceptor chain.
* @return The result of calling {@link #propagateTracingHeadersAndExecute(HttpRequestWrapperWithModifiableHeaders,
* byte[], AsyncClientHttpRequestExecution)} after surrounding the request with a subspan (or new trace if no
* current span exists).
* @throws IOException Propagated from {@link #propagateTracingHeadersAndExecute(
* HttpRequestWrapperWithModifiableHeaders, byte[], AsyncClientHttpRequestExecution)}.
*/
protected ListenableFuture<ClientHttpResponse> createAsyncSubSpanAndExecute(
HttpRequestWrapperWithModifiableHeaders wrapperRequest, byte[] body, AsyncClientHttpRequestExecution execution
) throws IOException {
// Handle subspan stuff. Start by getting the current thread's tracing state (so we can restore it before
// this method returns).
TracingState originalThreadInfo = TracingState.getCurrentThreadTracingState();
// Stays null until the finisher is built, so the catch block can tell whether there is anything to finish.
SpanAroundAsyncCallFinisher subspanFinisher = null;
try {
// This will start a new trace if necessary, or a subspan if a trace is already in progress.
Span subspan = Tracer.getInstance().startSpanInCurrentContext(
getSubspanSpanName(wrapperRequest, tagAndNamingStrategy, tagAndNamingAdapter),
Span.SpanPurpose.CLIENT
);
// Add request tags to the subspan.
tagAndNamingStrategy.handleRequestTagging(subspan, wrapperRequest, tagAndNamingAdapter);
// Create the callback that will complete the subspan when the request finishes. The tracing state is
// re-read here, after the subspan was started, and handed to the finisher.
subspanFinisher = new SpanAroundAsyncCallFinisher(
TracingState.getCurrentThreadTracingState(), wrapperRequest, tagAndNamingStrategy, tagAndNamingAdapter
);
// Execute the request/interceptor chain, and add the callback to finish the subspan (if one exists).
ListenableFuture<ClientHttpResponse> result = propagateTracingHeadersAndExecute(
wrapperRequest, body, execution
);
result.addCallback(subspanFinisher);
return result;
}
catch(Throwable t) {
// Something went wrong, probably in the execution.executeAsync(...) call. Complete the subspan now
// (if one exists).
if (subspanFinisher != null) {
subspanFinisher.finishCallSpan(null, t);
}
throw t;
}
finally {
// Reset back to the original tracing state that was on this thread when this method began.
//noinspection deprecation
unlinkTracingFromCurrentThread(originalThreadInfo);
}
}
/**
 * Resets tracing and rebuilds every fixture used by the tests: the argument-capturing
 * tag/naming strategy and its flags, the span recorder, the mocked request, the mocked
 * execution, and the mocked normal-completion response.
 */
@Before
public void beforeMethod() throws IOException {
    resetTracing();

    // Flags and captured-argument holders shared with the strategy below.
    initialSpanNameFromStrategy = new AtomicReference<>("span-name-from-strategy-" + UUID.randomUUID());
    strategyInitialSpanNameMethodCalled = new AtomicBoolean(false);
    strategyRequestTaggingMethodCalled = new AtomicBoolean(false);
    strategyResponseTaggingAndFinalSpanNameMethodCalled = new AtomicBoolean(false);
    strategyInitialSpanNameArgs = new AtomicReference<>(null);
    strategyRequestTaggingArgs = new AtomicReference<>(null);
    strategyResponseTaggingArgs = new AtomicReference<>(null);

    tagAndNamingStrategy = new ArgCapturingHttpTagAndSpanNamingStrategy(
        initialSpanNameFromStrategy,
        strategyInitialSpanNameMethodCalled,
        strategyRequestTaggingMethodCalled,
        strategyResponseTaggingAndFinalSpanNameMethodCalled,
        strategyInitialSpanNameArgs,
        strategyRequestTaggingArgs,
        strategyResponseTaggingArgs
    );
    tagAndNamingAdapterMock = mock(HttpTagAndSpanNamingAdapter.class);

    // Record span lifecycle events from the tracer.
    spanRecorder = new SpanRecorder();
    Tracer.getInstance().addSpanLifecycleListener(spanRecorder);

    // Mocked request: PATCH against a random localhost path, with mocked headers.
    method = HttpMethod.PATCH;
    uri = URI.create("http://localhost:4242/" + UUID.randomUUID());
    headersMock = mock(HttpHeaders.class);
    requestMock = mock(HttpRequest.class);
    doReturn(headersMock).when(requestMock).getHeaders();
    doReturn(method).when(requestMock).getMethod();
    doReturn(uri).when(requestMock).getURI();

    body = UUID.randomUUID().toString().getBytes();

    // Mocked execution: snapshot the tracing state at call time and return a fresh settable future.
    executionMock = mock(AsyncClientHttpRequestExecution.class);
    doAnswer(ignoredInvocation -> {
        tracingStateAtTimeOfExecution = TracingState.getCurrentThreadTracingState();
        executionResponseFuture = new SettableListenableFuture<>();
        return executionResponseFuture;
    }).when(executionMock).executeAsync(any(HttpRequest.class), any(byte[].class));

    // Mocked response reporting a raw status code of 200.
    normalCompletionResponse = mock(ClientHttpResponse.class);
    normalResponseCode = 200; //Normal
    doReturn(normalResponseCode).when(normalCompletionResponse).getRawStatusCode();
}
/**
 * Adds an HTTP Basic {@code Authorization} header built from this interceptor's username and
 * password (UTF-8, Base64-encoded {@code username:password}), then continues the chain.
 */
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body, AsyncClientHttpRequestExecution execution) throws IOException {
    String credentials = this.username + ":" + this.password;
    byte[] credentialBytes = credentials.getBytes(StandardCharsets.UTF_8);
    String headerValue = "Basic " + Base64Utils.encodeToString(credentialBytes);
    request.getHeaders().add("Authorization", headerValue);
    return execution.executeAsync(request, body);
}