下面列出了org.springframework.web.context.request.async.CallableProcessingInterceptor#org.springframework.web.context.request.async.DeferredResult 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public DeferredResult<Object> convertToDeferredResult(Object response) {
final DeferredResult<Object> deferredResult = new DeferredResult<>();
Futures.addCallback((ListenableFuture<?>) response, new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object result) {
deferredResult.setResult(result);
}
@Override
public void onFailure(Throwable t) {
deferredResult.setErrorResult(t);
}
});
return deferredResult;
}
private <T> DeferredResult<T> createDeferredResult(Function<MetaServer, T> function, MetaServer metaServer) {
DeferredResult<T> response = new DeferredResult<>();
if (metaServer instanceof RemoteMetaServer) {
executors.execute(new Runnable() {
@Override
public void run() {
try {
response.setResult(function.apply(metaServer));
} catch (RestClientException restException) {
XpipeException outerException = new XpipeException(restException.getMessage(), restException);
outerException.setOnlyLogMessage(true);
response.setErrorResult(outerException);
} catch (Exception e) {
response.setErrorResult(e);
}
}
});
} else {
response.setResult(function.apply(metaServer));
}
return response;
}
@RequestMapping(params = "deferredResultWithDelayedError")
public DeferredResult<Person> getDeferredResultWithDelayedError() {
final DeferredResult<Person> deferredResult = new DeferredResult<>();
new Thread() {
public void run() {
try {
Thread.sleep(100);
deferredResult.setErrorResult(new RuntimeException("Delayed Error"));
}
catch (InterruptedException e) {
/* no-op */
}
}
}.start();
return deferredResult;
}
@Override
public DeferredResult<?> adaptToDeferredResult(Object returnValue) {
Assert.isInstanceOf(CompletionStage.class, returnValue, "CompletionStage expected");
final DeferredResult<Object> result = new DeferredResult<Object>();
@SuppressWarnings("unchecked")
CompletionStage<?> future = (CompletionStage<?>) returnValue;
future.handle(new BiFunction<Object, Throwable, Object>() {
@Override
public Object apply(Object value, Throwable ex) {
if (ex != null) {
result.setErrorResult(ex);
}
else {
result.setResult(value);
}
return null;
}
});
return result;
}
@Around("comet()")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
Date requestTime = new Date();
Comet comet = ReflectUtil.getAnnotation(joinPoint, Comet.class);
// 获取记录原型对象
HttpServletRequest request = WebContext.getRequest();
WebCometData cometData = WebCometData.createFormRequest(request, comet.prototype(), cometProperties.isEnableReadRequestBody());
cometData.setApiCode(comet.apiCode());
cometData.setDescription(StringUtils.isEmpty(comet.name()) ? comet.description() : comet.name());
cometData.setRequestType(comet.requestType());
return applyAround(cometData, threadLocal, joinPoint, request, requestTime, comet.name(), comet.tag(), (returnData) -> {
if (returnData.getClass() == DeferredResult.class) {
return "[DeferredResult]";
}
if (returnData.getClass() == WebAsyncTask.class) {
return "[WebAsyncTask]";
}
return returnData;
});
}
@Test
public void testPollNotificationWithSomeNamespaceAsFile() throws Exception {
String namespace = String.format("someNamespace.xml");
when(namespaceUtil.filterNamespaceName(namespace)).thenReturn(namespace);
String someWatchKey = "someKey";
Set<String> watchKeys = Sets.newHashSet(someWatchKey);
when(watchKeysUtil
.assembleAllWatchKeys(someAppId, someCluster, namespace, someDataCenter))
.thenReturn(
watchKeys);
DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller
.pollNotification(someAppId, someCluster, namespace, someDataCenter,
someNotificationId, someClientIp);
assertEquals(watchKeys.size(), deferredResults.size());
for (String watchKey : watchKeys) {
assertTrue(deferredResults.get(watchKey).contains(deferredResult));
}
}
@RequestMapping(value = "/telemetry",method = RequestMethod.POST)
public DeferredResult<ResponseEntity> postTelemetry(@RequestBody String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
responseWriter.setResult(new ResponseEntity("ok", HttpStatus.ACCEPTED));
if (quotaExceeded(request, responseWriter)) {
return responseWriter;
}
Map<Long, List<KvEntry>> telemetryMaps = JsonConverter.convertToTelemetry(new JsonParser().parse(json)).getData();
for (Map.Entry<Long,List<KvEntry>> entry : telemetryMaps.entrySet()) {
System.out.println("key= " + entry.getKey());
for (KvEntry kvEntry: entry.getValue()) {
System.out.println("属性名="+kvEntry.getKey()+ " 属性值="+kvEntry.getValueAsString());
}
}
return responseWriter;
}
@GetMapping("/async-deferredresult-timeout")
public DeferredResult<ResponseEntity<?>> handleReqWithTimeouts(Model model) {
LOG.info("Received async request with a configured timeout");
DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<>(500l);
deferredResult.onTimeout(() -> deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
.body("Request timeout occurred.")));
CompletableFuture.supplyAsync(() -> {
LOG.info("Processing in separate thread");
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "error";
})
.whenCompleteAsync((result, exc) -> deferredResult.setResult(ResponseEntity.ok(result)));
LOG.info("servlet thread freed");
return deferredResult;
}
@Test
public void testPollNotificationWithDefaultNamespaceAsFile() throws Exception {
String namespace = String.format("%s.%s", defaultNamespace, "properties");
when(namespaceUtil.filterNamespaceName(namespace)).thenReturn(defaultNamespace);
String someWatchKey = "someKey";
String anotherWatchKey = "anotherKey";
Set<String> watchKeys = Sets.newHashSet(someWatchKey, anotherWatchKey);
when(watchKeysUtil
.assembleAllWatchKeys(someAppId, someCluster, defaultNamespace,
someDataCenter)).thenReturn(
watchKeys);
DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller
.pollNotification(someAppId, someCluster, namespace, someDataCenter,
someNotificationId, someClientIp);
assertEquals(watchKeys.size(), deferredResults.size());
for (String watchKey : watchKeys) {
assertTrue(deferredResults.get(watchKey).contains(deferredResult));
}
}
/**
* Sample usage: curl "http://localhost:9080/aggregate-non-blocking-callback?minMs=1000&maxMs=2000"
*
* @param dbLookupMs
* @param dbHits
* @param minMs
* @param maxMs
* @return
* @throws IOException
*/
@RequestMapping("/aggregate-non-blocking-callback")
public DeferredResult<String> nonBlockingAggregator(
@RequestParam(value = "dbLookupMs", required = false, defaultValue = "0") int dbLookupMs,
@RequestParam(value = "dbHits", required = false, defaultValue = "3") int dbHits,
@RequestParam(value = "minMs", required = false, defaultValue = "0") int minMs,
@RequestParam(value = "maxMs", required = false, defaultValue = "0") int maxMs) throws IOException {
LOG.logStartNonBlocking();
DeferredResult<String> deferredResult = new DeferredResult<String>();
dbThreadPoolExecutor.execute(new DbLookupRunnable(LOG, dbLookupMs, dbHits, SP_NON_BLOCKING_URL, minMs, maxMs, TIMEOUT_MS, deferredResult));
LOG.logLeaveThreadNonBlocking();
// Return to let go of the precious thread we are holding on to...
return deferredResult;
}
/**
* Sample usage: curl "http://localhost:9080/router-non-blocking-spring?minMs=1000&maxMs=2000"
*
* The spring version of asynch http client has two major drawbacks
* 1. It doesn't work with the code below, no call is made to the SP (probably my fault :-)
* 2. The call is not executed non-blocking but instead in a separate thread, i.e. it doesn't scale very good...
*
* Due to the scalability issue it is not used but left as documentation on how it can be used given it is change under the hood to being non-blocking
*
* @param minMs y3
* @param maxMs y3
* @return y3
* @throws java.io.IOException
*/
@RequestMapping("/router-non-blocking-spring")
public DeferredResult<String> nonBlockingRouter_Spring(
@RequestParam(value = "minMs", required = false, defaultValue = "0") int minMs,
@RequestParam(value = "maxMs", required = false, defaultValue = "0") int maxMs) throws IOException {
LOG.logStartNonBlocking();
DeferredResult<String> deferredResult = new DeferredResult<String>();
String url = SP_NON_BLOCKING_URL + "?minMs={minMs}&maxMs={maxMs}";
ListenableFuture<ResponseEntity<String>> futureEntity = asyncRestTemplate.getForEntity(url, String.class, minMs, maxMs);
// Register a callback for the completion of the asynchronous rest call
futureEntity.addCallback(new RouterCallback_Spring_AsyncRestTemplate(LOG, deferredResult));
LOG.logLeaveThreadNonBlocking();
// Return to let go of the precious thread we are holding on to...
return deferredResult;
}
@RequestMapping(value = "/device/token/{deviceToken}/attributes/shadow", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> getDeviceAttributesShadow(@PathVariable("deviceToken") String deviceToken) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
DeviceId _deviceId = ctx.getDevice().getId();
try {
List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
Arrays.asList(DataConstants.ALL_SCOPES)
.forEach(attributeType -> futures.add(attributesService.findAll(_deviceId, attributeType)));
ListenableFuture<List<List<AttributeKvEntry>>> successfulAsList = Futures.successfulAsList(futures);
List<AttributeKvEntry> result = new ArrayList<>();
successfulAsList.get().forEach(r -> result.addAll(r));
List<ThingsKVData> collect = result.stream().map(attribute -> new ThingsKVData(attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
responseWriter.setResult(new ResponseEntity<>(collect, HttpStatus.OK));
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
} else {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
}
return responseWriter;
}
public DeferredResult<LoginDetails> send(String content) {
System.out.println("send request");
final DeferredResult<LoginDetails> response = new DeferredResult<>();
ListenableFuture<LoginDetails> future = asyncRabbitTemplate.convertSendAndReceive(requestQueue.getName(),
content);
future.addCallback(new LoginHandlerResponse(response));
System.out.println(asyncRabbitTemplate.isAutoStartup());
System.out.println(asyncRabbitTemplate.isRunning());
return response;
}
@RequestMapping("/deferred")
public DeferredResult<String> deferredMethod() {
log.info("deferred");
span = this.tracer.currentSpan();
span.tag("tag", "value");
DeferredResult<String> result = new DeferredResult<>();
result.setResult("deferred");
return result;
}
@RequestMapping(value = "/deferred", method = RequestMethod.GET)
public DeferredResult<ResponseEntity<?>> timeDeferred() {
log.info("Deferred time request");
DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
new Thread(() -> {
result.setResult(ResponseEntity.ok(now()));
}, "MyThread-" + counter.incrementAndGet()).start();
return result;
}
/**
* API of APP
*/
@Bean public Docket appApi() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName(APP)
.genericModelSubstitutes(DeferredResult.class)
.useDefaultResponseMessages(false)
.forCodeGeneration(false)
.pathMapping("/")
.select()
.paths(or(regex("/api/.*/app/.*")))
.build()
.apiInfo(appApiInfo());
}
@RequestMapping("/deferred-result/response-body")
public @ResponseBody
DeferredResult<String> deferredResult() {
DeferredResult<String> result = new DeferredResult<String>();
this.responseBodyQueue.add(result);
return result;
}
@RequestMapping(value = "/{deviceToken}/telemetry", method = RequestMethod.POST)
public DeferredResult<ResponseEntity> postTelemetry(@PathVariable("deviceToken") String deviceToken,
@RequestBody String json) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
try {
String kafkaOutboundTopic = KafkaTopics.DEVICE_TELEMETRY_TOPIC;
Device device = ctx.getDevice();
if (device != null && device.getId() != null) {
// BasicToDeviceActorSessionMsg basicToDeviceActorSessionMsg = new
// BasicToDeviceActorSessionMsg(
// device, msg);
JsonObject root = new JsonObject();
JsonElement jsonElement = new JsonParser().parse(json);
root.add("d", jsonElement);
root.addProperty("messageId", DEFAULT_REQUEST_ID);
log.info("msg: {}", root.toString());
this.msgProducer.send(kafkaOutboundTopic, device.getId().toString(), root.toString());
responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
} else {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
// process(ctx, JsonConverter.convertToTelemetry(new
// JsonParser().parse(json)));
} catch (IllegalStateException | JsonSyntaxException ex) {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
} else {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
}
return responseWriter;
}
@Test
public void testPollNotificationWithDefaultNamespaceWithNotificationIdOutDated()
throws Exception {
long notificationId = someNotificationId + 1;
ReleaseMessage someReleaseMessage = mock(ReleaseMessage.class);
String someWatchKey = "someKey";
Set<String> watchKeys = Sets.newHashSet(someWatchKey);
when(watchKeysUtil
.assembleAllWatchKeys(someAppId, someCluster, defaultNamespace,
someDataCenter))
.thenReturn(
watchKeys);
when(someReleaseMessage.getId()).thenReturn(notificationId);
when(releaseMessageService.findLatestReleaseMessageForMessages(watchKeys))
.thenReturn(someReleaseMessage);
DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller
.pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter,
someNotificationId, someClientIp);
ResponseEntity<ApolloConfigNotification> result =
(ResponseEntity<ApolloConfigNotification>) deferredResult.getResult();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(defaultNamespace, result.getBody().getNamespaceName());
assertEquals(notificationId, result.getBody().getNotificationId());
}
void onMessage(String name) {
for (DeferredResult<Person> deferredResult : this.deferredResults) {
deferredResult.setResult(new Person(name));
this.deferredResults.remove(deferredResult);
}
for (ListenableFutureTask<Person> futureTask : this.futureTasks) {
futureTask.run();
this.futureTasks.remove(futureTask);
}
}
@GetMapping("/async-deferredresult")
public DeferredResult<ResponseEntity<?>> handleReqDefResult(Model model) {
LOG.info("Received async-deferredresult request");
DeferredResult<ResponseEntity<?>> output = new DeferredResult<>();
ForkJoinPool.commonPool().submit(() -> {
LOG.info("Processing in separate thread");
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
}
output.setResult(ResponseEntity.ok("ok"));
});
LOG.info("servlet thread freed");
return output;
}
@RequestMapping(value = "/device/token/{deviceToken}/telemetry/shadow", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> getDeviceTelemetryShadow(@PathVariable("deviceToken") String deviceToken) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
DeviceId _deviceId = ctx.getDevice().getId();
try {
List<TsKvEntry> tsList = timeseriesService.findAllLatest(_deviceId).get();
// if (tsList != null) {
// for (TsKvEntry tsKvEntry : tsList) {
// String key = tsKvEntry.getKey();
// Object value = tsKvEntry.getValue();
// log.info("key:{}, value:{}", key, value);
// }
// }
List<ThingsKVData> collect = tsList.stream().map(attribute -> new ThingsKVData(attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
responseWriter.setResult(new ResponseEntity<>(collect, HttpStatus.OK));
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
} else {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
}
return responseWriter;
}
@GetMapping(path = ASYNC_ERROR_PATH)
@SuppressWarnings("unused")
public DeferredResult<String> getAsyncError() {
logger.info("Async error endpoint hit");
sleepThread(SLEEP_TIME_MILLIS);
DeferredResult<String> deferredResult = new DeferredResult<>();
deferredResult.setErrorResult(new RuntimeException("Intentional exception by asyncError endpoint"));
return deferredResult;
}
private String setFileConfData(String env, String key, String value){
// fileName
String confFileName = parseConfDataFileName(env, key);
// valid repeat update
Properties existProp = PropUtil.loadFileProp(confFileName);
if (existProp != null
&& value!=null
&& value.equals(existProp.getProperty("value"))
) {
return new File(confFileName).getPath();
}
// write
Properties prop = new Properties();
if (value == null) {
prop.setProperty("value-deleted", "true");
} else {
prop.setProperty("value", value);
}
PropUtil.writeFileProp(prop, confFileName);
logger.info(">>>>>>>>>>> xxl-conf, setFileConfData: confFileName={}, value={}", confFileName, value);
// brocast monitor client
List<DeferredResult> deferredResultList = confDeferredResultMap.get(confFileName);
if (deferredResultList != null) {
confDeferredResultMap.remove(confFileName);
for (DeferredResult deferredResult: deferredResultList) {
deferredResult.setResult(new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor key update."));
}
}
return new File(confFileName).getPath();
}
@Override
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
String content = message.getMessage();
Tracer.logEvent("Apollo.LongPoll.Messages", content);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return;
}
List<String> keys = STRING_SPLITTER.splitToList(content);
//message should be appId+cluster+namespace
if (keys.size() != 3) {
logger.error("message format invalid - {}", content);
return;
}
ResponseEntity<ApolloConfigNotification> notification =
new ResponseEntity<>(
new ApolloConfigNotification(keys.get(2), message.getId()), HttpStatus.OK);
if (!deferredResults.containsKey(content)) {
return;
}
//create a new list to avoid ConcurrentModificationException
List<DeferredResult<ResponseEntity<ApolloConfigNotification>>> results =
Lists.newArrayList(deferredResults.get(content));
logger.debug("Notify {} clients for key {}", results.size(), content);
for (DeferredResult<ResponseEntity<ApolloConfigNotification>> result : results) {
result.setResult(notification);
}
logger.debug("Notification completed");
}
/**
* API of open
*/
@Bean public Docket openApi() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName(OPEN)
.genericModelSubstitutes(DeferredResult.class)
.useDefaultResponseMessages(false)
.forCodeGeneration(false)
.pathMapping("/")
.select()
.paths(or(regex("/api/.*/open/.*")))
.build()
.apiInfo(openApiInfo());
}
public DeferredResultWriter(
Observable<T> observable
, DeferredResult<ResponseEntity<T>> deferredResult
, MultiValueMap<String, String> headers
, HttpStatus status) {
this.deferredResult = deferredResult;
this.headers = headers;
this.status = status;
this.deferredResult.onTimeout(this);
this.deferredResult.onCompletion(this);
observable.subscribe(this);
}
@GetMapping(path = NESTED_ASYNC_CALL_PATH)
@SuppressWarnings("unused")
public DeferredResult<EndpointSpanInfoDto> getNestedAsyncCall() {
DeferredResult<EndpointSpanInfoDto> asyncResponse = new DeferredResult<>();
executor.execute(runnableWithTracing(() -> {
try {
logger.info("Nested async call endpoint hit. Sleeping...");
sleepThread(SLEEP_TIME_MILLIS);
URI nestedCallUri = URI.create(
"http://localhost:" + serverPort + SPAN_INFO_CALL_PATH + "?someQuery=foobar"
);
logger.info("...Calling: " + nestedCallUri.toString());
ListenableFuture<ResponseEntity<EndpointSpanInfoDto>> asyncRestTemplateResultFuture =
wingtipsEnabledAsyncRestTemplate.exchange(
nestedCallUri, HttpMethod.GET, getHttpEntityWithUserIdHeader(), EndpointSpanInfoDto.class
);
asyncRestTemplateResultFuture.addCallback(
successCallbackWithTracing(result -> {
logger.info("AsyncRestTemplate call complete");
asyncResponse.setResult(result.getBody());
}),
failureCallbackWithTracing(asyncResponse::setErrorResult)
);
}
catch(Throwable t) {
asyncResponse.setErrorResult(t);
}
}));
return asyncResponse;
}
@Test
public void supportsReturnType() throws Exception {
assertTrue(this.handler.supportsReturnType(
on(TestController.class).resolveReturnType(DeferredResult.class, String.class)));
assertTrue(this.handler.supportsReturnType(
on(TestController.class).resolveReturnType(ListenableFuture.class, String.class)));
assertTrue(this.handler.supportsReturnType(
on(TestController.class).resolveReturnType(CompletableFuture.class, String.class)));
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer,
NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
final DeferredResult<Object> deferredResult = new DeferredResult<>();
@SuppressWarnings("unchecked")
ListenableFuture<Object> futureValue = (ListenableFuture<Object>) returnValue;
Futures.addCallback(futureValue,
new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object result) {
deferredResult.setResult(result);
}
@Override
public void onFailure(Throwable ex) {
deferredResult.setErrorResult(ex);
}
},
MoreExecutors.directExecutor());
startDeferredResultProcessing(mavContainer, webRequest, deferredResult);
}