下面列出了怎么用org.springframework.scheduling.annotation.AsyncResult的API类实例代码及写法,或者点击链接到github查看源代码。
@Async // needs to run on a spring background thread
@Override
public Future<Object> execute(Object[] parameters) throws Exception {
Assert.assertNotNull("Parameters cannot be null", parameters);
Assert.assertEquals("Parameters should contain two elements", 2, parameters.length);
Object configObj = parameters[0];
Assert.assertNotNull("The first parameter cannot be null", configObj);
Assert.assertTrue("First parameter should be of type ITestConfig, found type " + configObj.getClass().getName(), configObj instanceof ITestConfig);
Object compNameObj = parameters[1];
Assert.assertNotNull("The second parameter cannot be null", compNameObj);
Assert.assertTrue("Second parameter should be of type String, found type " + compNameObj.getClass().getName(), compNameObj instanceof String);
String compName = (String) compNameObj;
ITestConfig config = (ITestConfig) configObj;
Object result = this.executeTest(config, compName);
return new AsyncResult<>(result);
}
@Override
@Async
public Future<Void> handleVNF(
NetworkServiceDescriptor networkServiceDescriptor,
NetworkServiceRecord networkServiceRecord,
DeployNSRBody body,
Map<String, Set<String>> vduVimInstances,
VirtualNetworkFunctionDescriptor vnfd,
String monitoringIp)
throws NotFoundException, BadFormatException, ExecutionException, InterruptedException {
log.debug(
"Processing VNFD ("
+ vnfd.getName()
+ ") for NSD ("
+ networkServiceDescriptor.getName()
+ ")");
VnfmSender vnfmSender = generator.getVnfmSender(vnfd);
NFVMessage message =
generator.getNextMessage(vnfd, vduVimInstances, networkServiceRecord, body, monitoringIp);
VnfmManagerEndpoint endpoint = generator.getEndpoint(vnfd);
log.debug("----------Executing ACTION: " + message.getAction());
executeAction(vnfmSender.sendCommand(message, endpoint));
log.info("Sent " + message.getAction() + " to VNF: " + vnfd.getName());
return new AsyncResult<>(null);
}
@Async
@Override
public Future<Void> deleteNetwork(VirtualLinkRecord vlr)
throws PluginException, NotFoundException, VimException {
BaseVimInstance vimInstance = this.query(vlr.getVim_id());
if (vimInstance == null)
throw new NotFoundException(
String.format("VimInstance with it %s not found", vlr.getVim_id()));
vimBroker
.getVim(vimInstance.getType())
.delete(
vimInstance,
vimInstance
.getNetworks()
.parallelStream()
.filter(n -> n.getExtId().equals(vlr.getExtId()))
.findFirst()
.orElseThrow(
() ->
new NotFoundException(
String.format("Network with it %s not found", vlr.getExtId()))));
return new AsyncResult<>(null);
}
/**
* redis:更新角色状态
*
* @param roleDO 角色信息
* @return
*/
public Future<String> roleStatusToRedis(RoleDO roleDO, Integer disabledFlag) {
Future<String> future = new AsyncResult<>("redis:变更角色状态成功");
RoleDORedisCache roleDORedisCache = RoleDORedisCache.builder().sysRoleName(roleDO.getSysRoleName())
.sysRoleType(roleDO.getSysRoleType()).disabledFlag(disabledFlag)
.id(roleDO.getId()).remark(roleDO.getRemark()).createBy(roleDO.getCreateBy())
.createTime(roleDO.getCreateTime()).delFlag(roleDO.getDelFlag())
.updateTime(roleDO.getUpdateTime()).updateBy(roleDO.getUpdateBy())
.tenantId(roleDO.getTenantId()).uuid(roleDO.getUuid()).build();
String redisKey = RedisKeyEnum.REDIS_ROLE_STR.getKey() + roleDORedisCache.getId();
String redisAdminKey = RedisKeyEnum.REDIS_ADMIN_ROLE_STR.getKey() + roleDORedisCache.getTenantId();
String roleStr = JSONObject.toJSONString(roleDORedisCache, SerializerFeature.WriteNullStringAsEmpty, SerializerFeature.WriteDateUseDateFormat);
redisUtil.set(redisKey, roleStr);
//角色的类型,0:管理员(老板),1:管理员(员工) 2:普通员工 3:其他
if (roleDO.getSysRoleType().equals(0)) {
redisUtil.set(redisAdminKey, roleStr);
}
return future;
}
@Override
@Async
public Future<Void> indexValidateAllTags(String indexName)
{
final String documentType = configurationHelper.getProperty(ConfigurationValue.ELASTICSEARCH_BDEF_DOCUMENT_TYPE, String.class);
// Get a list of all tags
final List<TagEntity> tagEntityList = Collections.unmodifiableList(tagDao.getTags());
// Remove any index documents that are not in the database
removeAnyIndexDocumentsThatAreNotInTagsList(indexName, documentType, tagEntityList);
// Validate all Tags
tagHelper.executeFunctionForTagEntities(indexName, documentType, tagEntityList, indexFunctionsDao::validateDocumentIndex);
// Return an AsyncResult so callers will know the future is "done". They can call "isDone" to know when this method has completed and they
// can call "get" to see if any exceptions were thrown.
return new AsyncResult<>(null);
}
/**
* 查询用户在线状态
*
* @param fromUserId 用户ID
* @param userIdList 查询列表
* @return
* @since 1.0
*/
@Async
public ListenableFuture<List<IMBaseDefine.UserStat>> userStatusReq(Long fromUserId,
List<Long> userIdList) {
logger.debug("查询用户在线状态, user_cnt={}", userIdList.size());
List<IMBaseDefine.UserStat> userStatList = new ArrayList<>();
for (Long userId : userIdList) {
UserClientInfoManager.UserClientInfo userClientInfo =
userClientInfoManager.getUserInfo(userId);
IMBaseDefine.UserStat.Builder userStatBuiler = IMBaseDefine.UserStat.newBuilder();
userStatBuiler.setUserId(userId);
if (userClientInfo != null) {
userStatBuiler.setStatus(userClientInfo.getStatus());
} else {
userStatBuiler.setStatus(IMBaseDefine.UserStatType.USER_STATUS_OFFLINE);
}
userStatList.add(userStatBuiler.build());
}
AsyncResult<List<IMBaseDefine.UserStat>> result = new AsyncResult<>(userStatList);
return result;
}
@Async
public ListenableFuture<?> webrtcInitateCallRes(long fromId, long toId, long netId) {
// FIXME
// 从当前的通话中查看是否已存在
IMAVCall toAvCall = userClientInfoManager.getCalled(toId);
if (toAvCall != null) {
// 如果存在,返回给呼叫发起方
// TODO 其他端,已经接受了 IMAVCallCancelReq
return AsyncResult.forExecutionException(new Exception());
}
// 如果不存在,则处理呼叫
userClientInfoManager.addCalled(toId, netId);
return AsyncResult.forValue("");
}
@Test
public void nsrManagementDeleteTest()
throws VimException, InterruptedException, ExecutionException, NotFoundException,
WrongStatusException, PluginException, BadFormatException {
NetworkServiceRecord nsd_exp = createNetworkServiceRecord();
when(resourceManagement.release(any(VirtualDeploymentUnit.class), any(VNFCInstance.class)))
.thenReturn(new AsyncResult<>(null));
when(nsrRepository.findFirstByIdAndProjectId(nsd_exp.getId(), projectId)).thenReturn(nsd_exp);
when(vnfrRepository.findByParentNsId(anyString())).thenReturn(new ArrayList<>());
Configuration system = new Configuration();
system.setConfigurationParameters(new HashSet<>());
ConfigurationParameter configurationParameter = new ConfigurationParameter();
configurationParameter.setConfKey("delete-on-all-status");
configurationParameter.setValue("true");
nsrManagement.delete(nsd_exp.getId(), projectId);
}
@Async("taskExecutor")
public Future<String> doTaskTwo() throws Exception {
log.info("开始做任务二");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(1000));
long end = System.currentTimeMillis();
log.info("完成任务二,耗时:" + (end - start) + "毫秒");
return new AsyncResult<>("任务二完成");
}
@Async
public Future<String> asyncMethodWithReturnType() {
System.out.println("Execute method asynchronously " + Thread.currentThread().getName());
try {
Thread.sleep(5000);
return new AsyncResult<>("hello world !!!!");
} catch (final InterruptedException e) {
}
return null;
}
@Async
public Future<String> doTaskOne() throws Exception {
System.out.println("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务一,耗时:" + (end - start) + "毫秒");
return new AsyncResult<>("任务一完成");
}
@Override
@Async("threadPoolTaskExecutor")
public Future<String> task3() throws Exception {
logger.info("task3开始执行");
Thread.sleep(3000);
logger.info("task3执行结束");
return new AsyncResult<String>("task3 success");
}
@Override
@Async("threadPoolTaskExecutor")
public Future<String> task4() throws Exception {
logger.info("task4开始执行");
Thread.sleep(3000);
logger.info("task4执行结束");
return new AsyncResult<String>("task4 success");
}
@Override
@Async
public Future<Void> removeVnfcDependency(
VirtualNetworkFunctionRecord virtualNetworkFunctionRecord, VNFCInstance vnfcInstance)
throws NotFoundException, BadFormatException, ExecutionException, InterruptedException {
VnfmManagerEndpoint endpoint = generator.getVnfm(virtualNetworkFunctionRecord.getEndpoint());
if (endpoint == null) {
throw new NotFoundException(
"VnfManager of type "
+ virtualNetworkFunctionRecord.getType()
+ " (endpoint = "
+ virtualNetworkFunctionRecord.getEndpoint()
+ ") is not registered");
}
OrVnfmScalingMessage message = new OrVnfmScalingMessage();
message.setAction(Action.SCALE_IN);
message.setVirtualNetworkFunctionRecord(virtualNetworkFunctionRecord);
message.setVnfcInstance(vnfcInstance);
VnfmSender vnfmSender;
try {
vnfmSender = generator.getVnfmSender(endpoint.getEndpointType());
} catch (BeansException e) {
throw new NotFoundException(e);
}
vnfStateHandler.executeAction(vnfmSender.sendCommand(message, endpoint));
return new AsyncResult<>(null);
}
@Override
@Async
public Future<Void> release(VirtualNetworkFunctionRecord virtualNetworkFunctionRecord)
throws NotFoundException, BadFormatException, ExecutionException, InterruptedException {
VnfmManagerEndpoint endpoint = generator.getVnfm(virtualNetworkFunctionRecord.getEndpoint());
if (endpoint == null) {
throw new NotFoundException(
"VnfManager of type "
+ virtualNetworkFunctionRecord.getType()
+ " (endpoint = "
+ virtualNetworkFunctionRecord.getEndpoint()
+ ") is not registered");
}
OrVnfmGenericMessage orVnfmGenericMessage =
new OrVnfmGenericMessage(virtualNetworkFunctionRecord, Action.RELEASE_RESOURCES);
VnfmSender vnfmSender;
try {
vnfmSender = generator.getVnfmSender(endpoint.getEndpointType());
} catch (BeansException e) {
throw new NotFoundException(e);
}
vnfStateHandler.executeAction(vnfmSender.sendCommand(orVnfmGenericMessage, endpoint));
return new AsyncResult<>(null);
}
@Override
@Async
public Future<Void> operate(
BaseVimInstance vimInstance,
VirtualDeploymentUnit vdu,
VNFCInstance vnfcInstance,
String operation)
throws VimException {
switch (operation) {
case "rebuild":
String imageId = this.chooseImage(vdu.getVm_image(), vimInstance);
try {
client.rebuildServer(vimInstance, vnfcInstance.getVc_id(), imageId);
} catch (VimDriverException vde) {
throw new VimException(
"Not rebuild VM with ExtId "
+ vnfcInstance.getVc_id()
+ " successfully from VimInstance "
+ vimInstance.getName()
+ ". Caused by: "
+ vde.getMessage(),
vde);
}
break;
default:
log.error("Operation not supported");
break;
}
return new AsyncResult<>(null);
}
@Async
public <T, B> Future<T> postResource(URI uri, B body, ParameterizedTypeReference<T> returnType) {
RequestEntity<B> request = RequestEntity.post(uri).contentType(MediaType.APPLICATION_JSON).accept(MediaType
.APPLICATION_JSON).body(body);
LOG.debug("Requesting: " + request.toString());
T responseBody = restProxyTemplate.getRestTemplate().exchange(request, returnType).getBody();
LOG.debug("Received: " + responseBody);
return new AsyncResult<>(responseBody);
}
private void setUpThreadPrefixVerification(String threadPrefix) {
when(mockSubscriberTemplate.pullAsync("testSubscription", Integer.MAX_VALUE, true))
.then(arg -> {
assertThat(Thread.currentThread().getName()).startsWith(threadPrefix);
return AsyncResult.forValue(Arrays.asList(mockMessage, mockMessage, mockMessage));
});
}
private Future<CommandResult> executeCommand(String command, File executionDirectory, boolean silent)
throws IOException {
StringWriter writer = new StringWriter();
DefaultExecuteResultHandler resultHandler = new DefaultExecuteResultHandler();
try (WriterOutputStream outputStream = new WriterOutputStream(writer)) {
String outerCommand = "/bin/bash -lc";
CommandLine outer = CommandLine.parse(outerCommand);
outer.addArgument(command, false);
DefaultExecutor executor = new DefaultExecutor();
executor.setWorkingDirectory(executionDirectory);
executor.setStreamHandler(new PumpStreamHandler(silent ? outputStream : System.out, null));
executor.execute(outer, ENVIRONMENT, resultHandler);
resultHandler.waitFor();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return new AsyncResult<CommandResult>(
new CommandResult(resultHandler.getExitValue(), writer.toString(), resultHandler.getException()));
}
@Override
@Async
public ListenableFuture<RolloutGroupsValidation> validateTargetsInGroups(final List<RolloutGroupCreate> groups,
final String targetFilter, final Long createdAt) {
final String baseFilter = RolloutHelper.getTargetFilterQuery(targetFilter, createdAt);
final long totalTargets = targetManagement.countByRsql(baseFilter);
if (totalTargets == 0) {
throw new ConstraintDeclarationException("Rollout target filter does not match any targets");
}
return new AsyncResult<>(validateTargetsInGroups(
groups.stream().map(RolloutGroupCreate::build).collect(Collectors.toList()), baseFilter, totalTargets));
}
@Async("asyncTaskExecutor")
@Override
public Future<String> asyncTask(String s) {
long startTime = System.currentTimeMillis();
try {
//模拟耗时
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":Future<String> asyncTask(String s),耗时:" + (endTime - startTime));
return AsyncResult.forValue(s);
}
/**
* 有返回值的异步调用
* 1. 添加 @Async
* 2. 返回值包装为 Future
*/
@Async
public Future<String> asyncSimplesReturn(Serializable id) {
// 注意查看本 日志打印
log.info("asyncSimplesReturn");
return new AsyncResult<>("hhhh:" + id);
}
@Async("taskExecutor")
public void executeCrawler(ICrawler crawler){
Future<CrawlerResult> result = crawler.getResult();
// if(result != null && !result.isDone() && !result.isCancelled() && result.){
// log.warn("The crawler is yet indexing files... No more jobs can be submitted");
// } else {
CrawlerResult crawlerResult = crawler.start();
crawler.setResult(new AsyncResult<>(crawlerResult));
// }
// return result;
}
@Async
public Future<String> doTaskTwo() throws Exception {
System.out.println("开始做任务二");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务二,耗时:" + (end - start) + "毫秒");
return new AsyncResult<>("任务二完成");
}
@Override
@Async
public Future<Void> release(
VirtualDeploymentUnit virtualDeploymentUnit, VNFCInstance vnfcInstance)
throws ExecutionException, InterruptedException, PluginException, VimException {
BaseVimInstance vimInstance = vimInstanceRepository.findFirstById(vnfcInstance.getVim_id());
org.openbaton.nfvo.vim_interfaces.resource_management.ResourceManagement vim =
vimBroker.getVim(vimInstance.getType());
log.debug("Removing vnfcInstance: " + vnfcInstance);
vim.release(vnfcInstance, vimInstance).get();
virtualDeploymentUnit.getVnfc().remove(vnfcInstance.getVnfComponent());
return new AsyncResult<>(null);
}
@Async
public Future<String> futureTest() throws InterruptedException {
System.out.println("任务执行开始,需要:" + 1000 + "ms");
for (int i = 0; i < 10; i++) {
Thread.sleep(100);
System.out.println("do:" + i);
}
System.out.println("完成任务");
return new AsyncResult<>(Thread.currentThread().getName());
}
@Async
public Future<String> syncCustomerAccount() throws InterruptedException {
LOGGER.info("Sync Account Processing Started - Thread id: " + Thread.currentThread().getId());
// Sleeps 2s
Thread.sleep(2000);
String processInfo = String.format("Sync Account Processing Completed - Thread Name= %d, Thread Name= %s",
Thread.currentThread().getId(), Thread.currentThread().getName());
LOGGER.info(processInfo);
return new AsyncResult<String>(processInfo);
}
@Async
public Future<String> doAsyncTaskWithReturnType() {
try
{
return new AsyncResult<String>("Running Async Task thread : " + Thread.currentThread().getName());
}
catch (Exception e) {
//
}
return null;
}
@Async
public Future<String> doTaskTwo() throws Exception {
System.out.println("开始做任务二");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务二,耗时:" + (end - start) + "毫秒");
return new AsyncResult<>("任务二完成");
}
@Async
public Future<String> doTaskThree() throws Exception {
System.out.println("开始做任务三");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
System.out.println("完成任务三,耗时:" + (end - start) + "毫秒");
return new AsyncResult<>("任务三完成");
}