下面列出了怎么用com.google.common.util.concurrent.AsyncFunction的API类实例代码及写法,或者点击链接到github查看源代码。
private ListenableFuture<Set<String>> removeFavoriteTags(UUID placeId, UUID recordingId, Set<String> tags) {
VideoMetadata metadata = findByPlaceAndId(placeId, recordingId);
if(metadata != null && metadata.getTags().contains(VideoConstants.TAG_FAVORITE)) {
BatchStatement stmt = new BatchStatement(BatchStatement.Type.LOGGED);
addStatementsForRemoveFromFavoriteTables(stmt, metadata);
return Futures.transformAsync(
VideoV2Util.executeAsyncAndUpdateTimer(session, stmt, RemoveTagsTimer),
(AsyncFunction<ResultSet, Set<String>>) input -> {
Set<String> expectedTags = new HashSet<>(metadata.getTags());
expectedTags.removeAll(tags);
return Futures.immediateFuture(expectedTags);
},
MoreExecutors.directExecutor()
);
}else{
logger.warn("Can not removeFavoriteTags. Either recording id [{}] is invalid or video does not contain Favorite tag [{}]", recordingId, metadata.getTags());
return Futures.immediateFuture(ImmutableSet.<String>of());
}
}
/**
* This handles the callback for processing the asynchronous results returned from a graph traversal
*
* @return
*/
private static AsyncFunction<GraphResultSet, GraphResultSet> processAsyncResults() {
return new AsyncFunction<GraphResultSet, GraphResultSet>() {
public ListenableFuture<GraphResultSet> apply(GraphResultSet rs) {
// How far we can go without triggering the blocking fetch:
int remainingInPage = rs.getAvailableWithoutFetching();
while (--remainingInPage >= 0) {
GraphNode node = rs.iterator().next();
processNode(node);
}
if (rs.isFullyFetched()) {
System.out.println("Finished Processing Asynchronously Paged Results");
//Check to see if we have retrieved everything and if we have exit
return Futures.immediateFuture(rs);
} else {
// If we have not then fetch the next set of results
ListenableFuture<GraphResultSet> future = rs.fetchMoreResults();
return Futures.transform(future, processAsyncResults());
}
}
};
}
@Override
public ListenableFuture<List<DeviceType>> findDevicesByQuery(DeviceTypeSearchQuery query) {
ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery());
ListenableFuture<List<DeviceType>> devices = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<DeviceType>>) relations1 -> {
EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection();
List<ListenableFuture<DeviceType>> futures = new ArrayList<>();
for (EntityRelation relation : relations1) {
EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom();
if (entityId.getEntityType() == ThingType.DEVICE) {
futures.add(findDeviceByIdAsync(new DeviceTypeId(entityId.getId())));
}
}
return Futures.successfulAsList(futures);
});
devices = Futures.transform(devices, new Function<List<DeviceType>, List<DeviceType>>() {
@Nullable
@Override
public List<DeviceType> apply(@Nullable List<DeviceType> deviceList) {
return deviceList.stream().collect(Collectors.toList());
}
});
return devices;
}
@Override
public ListenableFuture<Boolean> deleteEntityRelations(EntityId entity) {
log.trace("Executing deleteEntityRelations [{}]", entity);
validate(entity);
List<ListenableFuture<List<EntityRelation>>> inboundRelationsList = new ArrayList<>();
for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup));
}
ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList);
ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations, new AsyncFunction<List<List<EntityRelation>>, List<Boolean>>() {
@Override
public ListenableFuture<List<Boolean>> apply(List<List<EntityRelation>> relations) throws Exception {
List<ListenableFuture<Boolean>> results = new ArrayList<>();
for (List<EntityRelation> relationList : relations) {
relationList.stream().forEach(relation -> results.add(relationDao.deleteRelation(relation)));
}
return Futures.allAsList(results);
}
});
ListenableFuture<Boolean> inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction());
ListenableFuture<Boolean> outboundFuture = relationDao.deleteOutboundRelations(entity);
return Futures.transform(Futures.allAsList(Arrays.asList(inboundFuture, outboundFuture)), getListToBooleanFunction());
}
@Override
public ListenableFuture<List<EntityRelationInfo>> findInfoByFrom(EntityId from, RelationTypeGroup typeGroup) {
log.trace("Executing findInfoByFrom [{}][{}]", from, typeGroup);
validate(from);
validateTypeGroup(typeGroup);
ListenableFuture<List<EntityRelation>> relations = relationDao.findAllByFrom(from, typeGroup);
ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
(AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
relations1.stream().forEach(relation ->
futures.add(fetchRelationInfoAsync(relation,
relation2 -> relation2.getTo(),
(EntityRelationInfo relationInfo, String entityName) -> relationInfo.setToName(entityName)))
);
return Futures.successfulAsList(futures);
});
return relationsInfo;
}
@Override
public ListenableFuture<List<EntityRelationInfo>> findInfoByTo(EntityId to, RelationTypeGroup typeGroup) {
log.trace("Executing findInfoByTo [{}][{}]", to, typeGroup);
validate(to);
validateTypeGroup(typeGroup);
ListenableFuture<List<EntityRelation>> relations = relationDao.findAllByTo(to, typeGroup);
ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
(AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
relations1.stream().forEach(relation ->
futures.add(fetchRelationInfoAsync(relation,
relation2 -> relation2.getFrom(),
(EntityRelationInfo relationInfo, String entityName) -> relationInfo.setFromName(entityName)))
);
return Futures.successfulAsList(futures);
});
return relationsInfo;
}
@Override
public ListenableFuture<List<EntityRelationInfo>> findInfoByQuery(EntityRelationsQuery query) {
log.trace("Executing findInfoByQuery [{}]", query);
ListenableFuture<List<EntityRelation>> relations = findByQuery(query);
EntitySearchDirection direction = query.getParameters().getDirection();
ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
(AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
relations1.stream().forEach(relation ->
futures.add(fetchRelationInfoAsync(relation,
relation2 -> direction == EntitySearchDirection.FROM ? relation2.getTo() : relation2.getFrom(),
(EntityRelationInfo relationInfo, String entityName) -> {
if (direction == EntitySearchDirection.FROM) {
relationInfo.setToName(entityName);
} else {
relationInfo.setFromName(entityName);
}
}))
);
return Futures.successfulAsList(futures);
});
return relationsInfo;
}
@Override
public ListenableFuture<List<Device>> findDevicesByQuery(DeviceSearchQuery query) {
ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery());
ListenableFuture<List<Device>> devices = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<Device>>) relations1 -> {
EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection();
List<ListenableFuture<Device>> futures = new ArrayList<>();
for (EntityRelation relation : relations1) {
EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom();
if (entityId.getEntityType() == ThingType.DEVICE) {
futures.add(findDeviceByIdAsync(new DeviceId(entityId.getId())));
}
}
return Futures.successfulAsList(futures);
});
devices = Futures.transform(devices, new Function<List<Device>, List<Device>>() {
@Nullable
@Override
public List<Device> apply(@Nullable List<Device> deviceList) {
return deviceList.stream().filter(device -> query.getDeviceTypes().contains(device.getType())).collect(Collectors.toList());
}
});
return devices;
}
@Override
public ListenableFuture<AlarmInfo> findAlarmInfoByIdAsync(AlarmId alarmId) {
log.trace("Executing findAlarmInfoByIdAsync [{}]", alarmId);
validateId(alarmId, "Incorrect alarmId " + alarmId);
return Futures.transform(alarmDao.findAlarmByIdAsync(alarmId.getId()),
(AsyncFunction<Alarm, AlarmInfo>) alarm1 -> {
AlarmInfo alarmInfo = new AlarmInfo(alarm1);
return Futures.transform(
entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function<String, AlarmInfo>)
originatorName -> {
alarmInfo.setOriginatorName(originatorName);
return alarmInfo;
}
);
});
}
@Override
public ListenableFuture<List<AlarmInfo>> findAlarms(AlarmQuery query) {
log.trace("Try to find alarms by entity [{}], searchStatus [{}], status [{}] and pageLink [{}]", query.getAffectedEntityId(), query.getSearchStatus(), query.getStatus(), query.getPageLink());
EntityId affectedEntity = query.getAffectedEntityId();
String searchStatusName;
if (query.getSearchStatus() == null && query.getStatus() == null) {
searchStatusName = AlarmSearchStatus.ANY.name();
} else if (query.getSearchStatus() != null) {
searchStatusName = query.getSearchStatus().name();
} else {
searchStatusName = query.getStatus().name();
}
String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName;
ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, ThingType.ALARM, query.getPageLink());
return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> {
List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size());
for (EntityRelation relation : input) {
alarmFutures.add(Futures.transform(
findAlarmByIdAsync(relation.getTo().getId()),
(Function<Alarm, AlarmInfo>) AlarmInfo::new));
}
return Futures.successfulAsList(alarmFutures);
});
}
private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
return partitions -> {
try {
PreparedStatement proto = getFetchStmt(aggregation);
List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
for (Long partition : partitions) {
log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityId.getEntityType(), entityId.getId());
BoundStatement stmt = proto.bind();
stmt.setString(0, entityId.getEntityType().name());
stmt.setUUID(1, entityId.getId());
stmt.setString(2, key);
stmt.setLong(3, partition);
stmt.setLong(4, startTs);
stmt.setLong(5, endTs);
log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityId.getEntityType(), entityId.getId());
futures.add(executeAsyncRead(stmt));
}
return Futures.allAsList(futures);
} catch (Throwable e) {
log.error("Failed to fetch data", e);
throw e;
}
};
}
@Override
public ListenableFuture<List<AlarmInfo>> findAlarms(AlarmQuery query) {
log.trace("Try to find alarms by entity [{}], status [{}] and pageLink [{}]", query.getAffectedEntityId(),
query.getStatus(), query.getPageLink());
EntityId affectedEntity = query.getAffectedEntityId();
String searchStatusName;
if (query.getSearchStatus() == null && query.getStatus() == null) {
searchStatusName = AlarmSearchStatus.ANY.name();
} else if (query.getSearchStatus() != null) {
searchStatusName = query.getSearchStatus().name();
} else {
searchStatusName = query.getStatus().name();
}
String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName;
ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType,
RelationTypeGroup.ALARM, ThingType.ALARM, query.getPageLink());
return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> {
List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size());
for (EntityRelation relation : input) {
alarmFutures.add(Futures.transform(findAlarmByIdAsync(relation.getTo().getId()),
(Function<Alarm, AlarmInfo>) AlarmInfo::new));
}
return Futures.successfulAsList(alarmFutures);
});
}
public static ListenableFuture<?> rollupAsync(Collection<ListenableFuture<ResultSet>> futures,
Executor asyncExecutor, DoRollup function) {
return transformAsync(Futures.allAsList(futures), asyncExecutor,
new AsyncFunction<List<ResultSet>, /*@Nullable*/ Object>() {
@Override
@SuppressWarnings("unchecked")
public ListenableFuture</*@Nullable*/ Object> apply(List<ResultSet> list)
throws Exception {
List<Row> rows = new ArrayList<>();
for (ResultSet results : list) {
rows.addAll(results.all());
}
if (rows.isEmpty()) {
return Futures.immediateFuture(null);
}
return (ListenableFuture</*@Nullable*/ Object>) function.execute(rows);
}
});
}
private static <V, R> ListenableFuture<R> transformAsync(ListenableFuture<V> future,
Executor asyncExecutor, AsyncFunction<V, R> function) {
boolean inRollupThread = Session.isInRollupThread();
return Futures.transformAsync(future,
new AsyncFunction<V, R>() {
@Override
public ListenableFuture<R> apply(V input) throws Exception {
boolean priorInRollupThread = Session.isInRollupThread();
Session.setInRollupThread(inRollupThread);
try {
return function.apply(input);
} finally {
Session.setInRollupThread(priorInRollupThread);
}
}
},
// calls to Session.readAsync() inside of the function could block due to the
// per-thread concurrent limit, so this needs to be executed in its own thread, not
// in the cassandra driver thread that completes the last future which will block
// the cassandra driver thread pool
asyncExecutor);
}
@Before
public void setUp() throws Exception {
//noinspection unchecked
graphBuilder = mock(GraphBuilder.class);
when(graphBuilder.getFallback())
.thenReturn(Optional.<AsyncFunction<Throwable, String>>absent());
Map<Input<?>, Object> emptyMap = Collections.emptyMap();
traverseState = new TraverseState(emptyMap, MoreExecutors.sameThreadExecutor(), true);
List<? extends NodeInfo> currentNodeParameters = ImmutableList.of();
currentNodeInfo = new FakeNodeInfo("the node", currentNodeParameters);
List<ListenableFuture<?>> currentNodeValues = ImmutableList.of();
currentCall = new TraverseState.FutureCallInformation(currentNodeInfo, currentNodeValues);
currentCallInfo = new CallInfo(currentNodeInfo, NO_PARAMS);
fallback = new NodeExecutionFallback<String>(graphBuilder, currentCall, traverseState);
}
@Test
public void shouldApplyFallbackToAnyException() throws Exception {
AsyncFunction<Throwable, String> function = new AsyncFunction<Throwable, String>() {
@Override
public ListenableFuture<String> apply(Throwable input) throws Exception {
return immediateFuture("all is well, nothing to see here");
}
};
when(graphBuilder.getFallback()).thenReturn(Optional.of(function));
Throwable expected = new GraphExecutionException(null, currentCallInfo, NO_CALLS);
ListenableFuture<String> future = fallback.create(expected);
assertThat(future.get(), equalTo("all is well, nothing to see here"));
}
public ListenableFuture<Void> executeAllAsync() {
Callable<ListenableFuture<Void>> c = new Callable<ListenableFuture<Void>>() {
@Override
public ListenableFuture<Void> call() throws Exception {
return executeAll();
}
};
ListenableFuture<ListenableFuture<Void>> f = executor.submit(c);
AsyncFunction<ListenableFuture<Void>, Void> function = new AsyncFunction<ListenableFuture<Void>, Void>() {
@Override
public ListenableFuture<Void> apply(ListenableFuture<Void> input) throws Exception {
return input;
}
};
return Futures.transformAsync(f, function, executor);
}
@Override
public ListenableFuture<AlexaMessage> handle(AlexaMessage message, UUID placeId) {
if(message.getPayload() instanceof HealthCheckRequest) {
ShsMetrics.incHealthCheck();
return Futures.transform(
healthCheckHandler.handle(),
(Function<ResponsePayload, AlexaMessage>) input -> {
Preconditions.checkNotNull(input, "input cannot be null");
Header h = Header.v2(message.getHeader().getMessageId(), input.getName(), input.getNamespace());
return new AlexaMessage(h, input);
},
MoreExecutors.directExecutor()
);
}
if(message.getPayload() instanceof RequestPayload) {
ShsMetrics.incShsRequest();
long startTime = System.nanoTime();
Txfm txfm = Txfm.transformerFor(message);
PlatformMessage platformMessage = txfm.txfmRequest(message, placeId, populationCacheMgr.getPopulationByPlaceId(placeId), (int) config.getRequestTimeoutMs());
logger.debug("[{}] transformed to platform message [{}]", message, platformMessage);
return Futures.transformAsync(
busClient.request(platformMessage),
(AsyncFunction<PlatformMessage, AlexaMessage>) input -> {
metrics.timeServiceSuccess(platformMessage.getMessageType(), startTime);
return Futures.immediateFuture(txfm.transformResponse(message, input));
},
executor
);
} else {
logger.warn("received non-directive request from Alexa {}", message);
ShsMetrics.incNonDirective();
return Futures.immediateFailedFuture(new AlexaException(AlexaErrors.unsupportedDirective(message.getHeader().getName())));
}
}
@Override
public ListenableFuture<AlexaMessage> handle(AlexaMessage message, UUID placeId) {
long startTime = System.nanoTime();
Txfm txfm = Txfm.transformerFor(message);
PlatformMessage platformMessage = txfm.txfmRequest(message, placeId, populationCacheMgr.getPopulationByPlaceId(placeId), (int) config.getRequestTimeoutMs());
logger.debug("[{}] transformed to platform message [{}]", message, platformMessage);
return Futures.transformAsync(
busClient.request(platformMessage),
(AsyncFunction<PlatformMessage, AlexaMessage>) input -> {
metrics.timeServiceSuccess(platformMessage.getMessageType(), startTime);
return Futures.immediateFuture(txfm.transformResponse(input, message.getHeader().getCorrelationToken()));
},
executor
);
}
ListenableFuture<Membership> addMember(Membership m, IdentityService identityService)
throws IOException {
Membership existing = members.get(m.getPreferredMemberKey());
if (existing != null) {
return Futures.immediateFuture(existing);
}
String groupId = groupResourceName.get();
ListenableFuture<Operation> created =
identityService.createMembership(groupId, m);
return Futures.transformAsync(
created,
new AsyncFunction<Operation, Membership>() {
@Override
@Nullable
public ListenableFuture<Membership> apply(@Nullable Operation input) {
try {
Membership memberCreated = m.setName(extractResourceName(input));
addMember(memberCreated);
logger.log(Level.FINE, "Successfully created membership {0}", memberCreated);
return Futures.immediateFuture(memberCreated);
} catch (IOException e) {
logger.log(
Level.WARNING,
String.format("Failed to create membership %s under group %s", m, groupId),
e);
return Futures.immediateFailedFuture(e);
}
}
},
getExecutor());
}
/**
* See {@link OneCamera#startPreview}.
*
* @param surface The preview surface to use.
*/
public ListenableFuture<Void> startPreview(final Surface surface)
{
// When we have the preview surface, start the capture session.
List<Surface> surfaceList = new ArrayList<>();
// Workaround of the face detection failure on Nexus 5 and L. (b/21039466)
// Need to create a capture session with the single preview stream first
// to lock it as the first stream. Then resend the another session with preview
// and JPEG stream.
if (ApiHelper.isLorLMr1() && ApiHelper.IS_NEXUS_5)
{
surfaceList.add(surface);
mCaptureSessionCreator.createCaptureSession(surfaceList);
surfaceList.addAll(mOutputSurfaces);
} else
{
surfaceList.addAll(mOutputSurfaces);
surfaceList.add(surface);
}
final ListenableFuture<CameraCaptureSessionProxy> sessionFuture =
mCaptureSessionCreator.createCaptureSession(surfaceList);
return Futures.transform(sessionFuture,
new AsyncFunction<CameraCaptureSessionProxy, Void>()
{
@Override
public ListenableFuture<Void> apply(
CameraCaptureSessionProxy captureSession) throws Exception
{
mSessionListener.onCameraCaptureSessionCreated(captureSession, surface);
return Futures.immediateFuture(null);
}
});
}
/**
* Create a new joined future from two existing futures and a joining function
* that combines the resulting outputs of the previous functions into a single
* result. The resulting future will fail if any of the dependent futures also
* fail.
*/
public static <T1, T2, TResult> ListenableFuture<TResult> joinAll(
final ListenableFuture<T1> f1,
final ListenableFuture<T2> f2,
final AsyncFunction2<T1, T2, TResult> fn)
{
ListenableFuture<?>[] futures = new ListenableFuture<?>[2];
futures[0] = f1;
futures[1] = f2;
// Futures.allAsList is used instead of Futures.successfulAsList because
// allAsList will propagate the failures instead of null values to the
// parameters of the supplied function.
ListenableFuture<List<Object>> result = Futures.<Object>allAsList(futures);
return Futures.transform(result, new AsyncFunction<List<Object>, TResult>()
{
@Override
public ListenableFuture<TResult> apply(@Nullable List<Object> list) throws Exception
{
T1 value1 = (T1) list.get(0);
T2 value2 = (T2) list.get(1);
return fn.apply(value1, value2);
}
});
}
/**
* Create a new joined future from three existing futures and a joining function
* that combines the resulting outputs of the previous functions into a single
* result. The resulting future will fail if any of the dependent futures also
* fail.
*/
public static <T1, T2, T3, TResult> ListenableFuture<TResult> joinAll(
final ListenableFuture<T1> f1,
final ListenableFuture<T2> f2,
final ListenableFuture<T3> f3,
final AsyncFunction3<T1, T2, T3, TResult> fn)
{
ListenableFuture<?>[] futures = new ListenableFuture<?>[3];
futures[0] = f1;
futures[1] = f2;
futures[2] = f3;
// Futures.allAsList is used instead of Futures.successfulAsList because
// allAsList will propagate the failures instead of null values to the
// parameters of the supplied function.
ListenableFuture<List<Object>> result = Futures.<Object>allAsList(futures);
return Futures.transform(result, new AsyncFunction<List<Object>, TResult>()
{
@Override
public ListenableFuture<TResult> apply(@Nullable List<Object> list) throws Exception
{
T1 value1 = (T1) list.get(0);
T2 value2 = (T2) list.get(1);
T3 value3 = (T3) list.get(2);
return fn.apply(value1, value2, value3);
}
});
}
@Override
public ListenableFuture<List<Asset>> findAssetsByQuery(AssetSearchQuery query) {
ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery());
ListenableFuture<List<Asset>> assets = Futures.transform(relations,
(AsyncFunction<List<EntityRelation>, List<Asset>>) relations1 -> {
EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection();
List<ListenableFuture<Asset>> futures = new ArrayList<>();
for (EntityRelation relation : relations1) {
EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom();
if (entityId.getEntityType() == ThingType.ASSET) {
futures.add(findAssetByIdAsync(new AssetId(entityId.getId())));
}
}
return Futures.successfulAsList(futures);
});
assets = Futures.transform(assets, new Function<List<Asset>, List<Asset>>() {
@Nullable
@Override
public List<Asset> apply(@Nullable List<Asset> assetList) {
return assetList.stream().filter(asset -> query.getAssetTypes().contains(asset.getType()))
.collect(Collectors.toList());
}
});
return assets;
}
private <T> ListenableFuture<T> getFirstItem(ListenableFuture<List<T>> future) {
return Futures.transform(future, new AsyncFunction<List<T>, T>() {
@Override
public ListenableFuture<T> apply(List<T> items) throws Exception {
return Futures.immediateFuture((items != null && items.size() > 0) ? items.get(0) : null);
}
});
}
private <E extends DirectoryObject, F extends ODataEntityFetcher<E, ? extends DirectoryObjectOperations>, O extends ODataOperations>
ListenableFuture<List<E>> getAllObjects(final ODataCollectionFetcher<E, F, O> fetcher) {
return Futures.transform(fetcher.read(), new AsyncFunction<List<E>, List<E>>() {
@Override
public ListenableFuture<List<E>> apply(List<E> entities) throws Exception {
return Futures.successfulAsList(Lists.transform(entities, new Function<E, ListenableFuture<? extends E>>() {
@Override
public ListenableFuture<? extends E> apply(E e) {
return fetcher.getById(e.getobjectId()).read();
}
}));
}
});
}
/**
* Called for each method. Used to do pre-flight checks, like renewing jwt's.
* @param callable
* @param <T>
* @return
*/
private <T> ListenableFuture<T> baseSubmit(final Callable<T> callable) {
if (shouldRenewJWT()) {
return Futures.transform(this.mixer.use(JWTService.class).authorize(new Object()), new AsyncFunction<Object, T>() {
@Override
public ListenableFuture<T> apply(Object o) throws Exception {
MixerHttpClient.this.renewJwt = false;
return MixerHttpClient.this.executor().submit(callable);
}
});
}
return MixerHttpClient.this.executor().submit(callable);
}
/**
* Retrieve a JWT from the api. When passed in a MixerUser use this as the result.
* @return
*/
public <T> CheckedFuture<T, MixerException> authorize(final T value) {
return new JWT.JWTFutureChecker<T>().check(Futures.transform(
this.post("authorize", null, new Object()),
new AsyncFunction<Object, T>() {
@Override
public ListenableFuture<T> apply(Object o) throws Exception {
return Futures.immediateCheckedFuture(value);
}
}
));
}
public static <I, O> ListenableFuture<O> transformAsync(ListenableFuture<I> input,
AsyncFunction<? super I, ? extends O> function,
Executor executor) {
ListenableFuture<O> result = Futures.transformAsync(input, function, executor);
if (input instanceof TimeoutListenableFuture) {
TimeoutListenableFuture<O> newResult = new TimeoutListenableFuture<>(result);
for (ThrowableConsumer<TimeoutException, Exception> timeoutListener : ((TimeoutListenableFuture<I>) input)
.getTimeoutListeners()) {
newResult.addTimeoutListener(timeoutListener);
}
return newResult;
} else {
return result;
}
}
public ListenableFuture<ResultSet> readAsyncFailIfNoRows(Statement statement,
String errorMessage) throws Exception {
return Futures.transformAsync(readAsync(statement),
new AsyncFunction<ResultSet, ResultSet>() {
@Override
public ListenableFuture<ResultSet> apply(ResultSet results) {
if (results.isExhausted()) {
return Futures.immediateFailedFuture(new Exception(errorMessage));
} else {
return Futures.immediateFuture(results);
}
}
},
MoreExecutors.directExecutor());
}