下面列出了io.reactivex.Observable#map ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private Observable<D2Progress> downloadTeis(D2ProgressManager progressManager,
ProgramDataDownloadParams params,
BooleanWrapper allOkay,
Set<ProgramOrganisationUnitLastUpdated> programOrganisationUnitSet) {
List<TeiQuery.Builder> teiQueryBuilders = trackedEntityInstanceQueryBuilderFactory.getTeiQueryBuilders(params);
Observable<List<TrackedEntityInstance>> teiDownloadObservable =
Observable.fromIterable(teiQueryBuilders)
.flatMap(teiQueryBuilder -> {
return getTrackedEntityInstancesWithPaging(teiQueryBuilder, allOkay);
// TODO .subscribeOn(teiDownloadScheduler);
});
Date serverDate = systemInfoRepository.blockingGet().serverDate();
return teiDownloadObservable.map(
teiList -> {
boolean isFullUpdate = params.program() == null;
boolean overwrite = params.overwrite();
persistenceCallFactory.getCall(teiList, isFullUpdate, overwrite).call();
programOrganisationUnitSet.addAll(
TrackedEntityInstanceHelper.getProgramOrganisationUnitTuple(teiList, serverDate));
return progressManager.increaseProgress(TrackedEntityInstance.class, false);
});
}
/**
* {@inheritDoc}
*/
@NonNull
@Override
public ObservableTransformer<ParseSingleGroupOp, Card> getSingleGroupTransformer() {
return new ObservableTransformer<ParseSingleGroupOp, Card>() {
@Override
public ObservableSource<Card> apply(Observable<ParseSingleGroupOp> upstream) {
return upstream.map(new Function<ParseSingleGroupOp, Card>() {
@Override
public Card apply(ParseSingleGroupOp parseSingleGroupOp) throws Exception {
return parseSingleGroup(parseSingleGroupOp.getArg1(), parseSingleGroupOp.getArg2());
}
});
}
};
}
public static ObservableTransformer<WxPayResult, WxPayResult> checkWechatResult() {
return new ObservableTransformer<WxPayResult, WxPayResult>() {
@Override
public ObservableSource<WxPayResult> apply(Observable<WxPayResult> payResultObservable) {
return payResultObservable.map(new Function<WxPayResult, WxPayResult>() {
@Override
public WxPayResult apply(WxPayResult wxPayResult) {
if (!wxPayResult.isSucceed()) {
throw new PayFailedException(String.valueOf(wxPayResult.getErrCode()), wxPayResult.getErrInfo());
}
return wxPayResult;
}
});
}
};
}
/**
* {@inheritDoc}
*/
@NonNull
@Override
public ObservableTransformer<ParseSingleComponentOp, BaseCell> getSingleComponentTransformer() {
return new ObservableTransformer<ParseSingleComponentOp, BaseCell>() {
@Override
public ObservableSource<BaseCell> apply(Observable<ParseSingleComponentOp> upstream) {
return upstream.map(new Function<ParseSingleComponentOp, BaseCell>() {
@Override
public BaseCell apply(ParseSingleComponentOp parseSingleComponentOp) throws Exception {
return parseSingleComponent(parseSingleComponentOp.getArg1(), parseSingleComponentOp.getArg2(), parseSingleComponentOp
.getArg3());
}
});
}
};
}
public <E extends AVObject> Observable<E> saveWholeObject(final Class<E> clazz, final String endpointClass,
String objectId,
JSONObject object, boolean fetchFlag, JSONObject where) {
Observable<AVObject> result = null;
if (StringUtil.isEmpty(objectId)) {
result = wrapObservable(apiService.saveWholeObject(endpointClass, object, fetchFlag, where));
} else {
result = wrapObservable(apiService.saveWholeObject(endpointClass, objectId, object, fetchFlag, where));
}
if (null == result) {
return null;
}
return result.map(new Function<AVObject, E>() {
@Override
public E apply(AVObject avObject) throws Exception {
return Transformer.transform(avObject, clazz);
}
});
}
@Override
@NonNull
@CheckReturnValue
public ObservableSource<DataValue<T>> apply(@NonNull Observable<DataSnapshot> upstream) {
return upstream.map(new Function<DataSnapshot, DataValue<T>>() {
@Override
public DataValue<T> apply(DataSnapshot dataSnapshot) throws Exception {
DataValue<T> result;
if (dataSnapshot.exists()) {
result = DataValue.of(dataSnapshot.getValue(clazz));
} else {
result = DataValue.empty();
}
return result;
}
});
}
/**
* {@inheritDoc}
*/
@NonNull
@Override
public ObservableTransformer<ParseGroupsOp, List<Card>> getGroupTransformer() {
return new ObservableTransformer<ParseGroupsOp, List<Card>>() {
@Override
public ObservableSource<List<Card>> apply(Observable<ParseGroupsOp> upstream) {
return upstream.map(new Function<ParseGroupsOp, List<Card>>() {
@Override
public List<Card> apply(ParseGroupsOp parseGroupsOp) throws Exception {
return parseGroup(parseGroupsOp.getArg1(), parseGroupsOp.getArg2());
}
});
}
};
}
/**
* {@inheritDoc}
*/
@NonNull
@Override
public ObservableTransformer<ParseComponentsOp, List<BaseCell>> getComponentTransformer() {
return new ObservableTransformer<ParseComponentsOp, List<BaseCell>>() {
@Override
public ObservableSource<List<BaseCell>> apply(Observable<ParseComponentsOp> upstream) {
return upstream.map(new Function<ParseComponentsOp, List<BaseCell>>() {
@Override
public List<BaseCell> apply(ParseComponentsOp parseComponentsOp) throws Exception {
return parseComponent(parseComponentsOp.getArg1(), parseComponentsOp.getArg2(), parseComponentsOp.getArg3());
}
});
}
};
}
@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String cacheKey, long cacheTime, Observable<T> source, Type type) {
return source.map(new Function<T, CacheResult<T>>() {
@Override
public CacheResult<T> apply(@NonNull T t) throws Exception {
return new CacheResult<T>(false, t);
}
});
}
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
AggregateTransaction transaction, ReceiptSource aggregateTransactionReceiptSource) {
Observable<List<Transaction>> innerTransactions = Observable
.just(transaction.getInnerTransactions()).flatMapIterable(m -> m)
.flatMap(innerTransaction -> resolveTransaction(innerTransaction,
createExpectedReceiptSource(aggregateTransactionReceiptSource, innerTransaction)))
.toList().toObservable();
return innerTransactions.map(txs -> AggregateTransactionFactory
.create(transaction.getType(), transaction.getNetworkType(), txs,
transaction.getCosignatures()));
}
private Observable<RTMConnectionServerResponse> fetchRTMServerFromRemote(final String routerHost, final String appId,
final String installationId, int secure) {
LOGGER.d("fetchRTMServerFromRemote. router=" + routerHost + ", appId=" + appId
+ ", installationId=" + installationId);
Retrofit tmpRetrofit = retrofit.newBuilder().baseUrl(routerHost).build();
AppRouterService tmpService = tmpRetrofit.create(AppRouterService.class);
Observable<RTMConnectionServerResponse> result = tmpService.getRTMConnectionServer(appId, installationId, secure);
if (AppConfiguration.isAsynchronized()) {
result = result.subscribeOn(Schedulers.io());
}
AppConfiguration.SchedulerCreator creator = AppConfiguration.getDefaultScheduler();
if (null != creator) {
result = result.observeOn(creator.create());
}
return result.map(new Function<RTMConnectionServerResponse, RTMConnectionServerResponse>() {
@Override
public RTMConnectionServerResponse apply(RTMConnectionServerResponse rtmConnectionServerResponse) throws Exception {
SystemSetting setting = AppConfiguration.getDefaultSetting();
if (null != rtmConnectionServerResponse && null != setting) {
rtmConnectionServerResponse.setTtl(rtmConnectionServerResponse.getTtl() + System.currentTimeMillis() / 1000);
String cacheResult = JSON.toJSONString(rtmConnectionServerResponse);
setting.saveString(getPersistenceKeyZone(appId, false), routerHost, cacheResult);
}
return rtmConnectionServerResponse;
}
});
}
public Observable<? extends AVObject> saveObject(final String className, String objectId, JSONObject data,
boolean fetchFlag, JSONObject where) {
Observable<AVObject> object = wrapObservable(apiService.updateObject(className, objectId, data, fetchFlag, where));
if (null == object) {
return null;
}
return object.map(new Function<AVObject, AVObject>() {
public AVObject apply(AVObject avObject) {
LOGGER.d("saveObject finished. intermediaObj=" + avObject.toString() + ", convert to " + className);
return Transformer.transform(avObject, className);
}
});
}
private Observable<MosaicId> getResolvedMosaicId(
Transaction transaction,
UnresolvedMosaicId unresolvedMosaicId,
Observable<Statement> statementObservable, ReceiptSource expectedReceiptSource) {
if (unresolvedMosaicId instanceof MosaicId) {
return Observable.just((MosaicId) unresolvedMosaicId);
}
return statementObservable.map(statement -> statement
.getResolvedMosaicId(getTransactionInfo(transaction).getHeight(), unresolvedMosaicId,
expectedReceiptSource.getPrimaryId(),
expectedReceiptSource.getSecondaryId())
.orElseThrow(() -> new IllegalArgumentException(
"MosaicId could not be resolved for alias "
+ unresolvedMosaicId.getIdAsHex())));
}
@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, final String key, Observable<T> source, Type type) {
return source.map(new Function<T, CacheResult<T>>() {
@Override
public CacheResult<T> apply(@NonNull T t) throws Exception {
return new CacheResult<>(ResultFrom.Remote, key, t);
}
});
}
public Observable<SingleReturn<T>> listen() throws Exception {
Observable<Log> logObservable = createFilter();
if( returns.size() == 0 ){
return logObservable.map(log -> wrapDecodedLogs(null));
} else{
return logObservable.map(log -> {
SType[] decodedParams = SCoder.decodeParams(ByteString.of(log.getData()).hex(), returns);
return wrapDecodedLogs(decodedParams);
});
}
}
private <T extends Transaction> Observable<T> getTransactionOrRaiseError(Address address, String transactionHash,
Observable<T> transactionListener) {
// I may move this method to the Listener
IllegalStateException caller = new IllegalStateException("The Caller");
Observable<TransactionStatusError> errorListener = status(address, transactionHash);
Observable<Object> errorOrTransactionObservable = Observable.merge(transactionListener, errorListener).take(1);
return errorOrTransactionObservable.map(errorOrTransaction -> {
if (errorOrTransaction instanceof TransactionStatusError) {
throw new TransactionStatusException(caller, (TransactionStatusError) errorOrTransaction);
} else {
return (T) errorOrTransaction;
}
});
}
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
HashLockTransaction transaction, ReceiptSource expectedReceiptSource) {
Observable<Statement> statementObservable = getStatement(transaction);
Observable<Mosaic> resolvedMosaic = getResolvedMosaic(transaction, transaction.getMosaic(),
statementObservable, expectedReceiptSource
);
return resolvedMosaic.map(mosaic -> HashLockTransactionFactory
.create(transaction.getNetworkType(), mosaic, transaction.getDuration(),
transaction.getHash()));
}
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
SecretProofTransaction transaction, ReceiptSource expectedReceiptSource) {
Observable<Statement> statementObservable = getStatement(transaction);
Observable<Address> resolvedAddress = getResolvedAddress(transaction,
transaction.getRecipient(), statementObservable, expectedReceiptSource);
return resolvedAddress.map(address -> SecretProofTransactionFactory
.create(transaction.getNetworkType(), transaction.getHashType(), address,
transaction.getSecret(), transaction.getProof()));
}
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
MosaicMetadataTransaction transaction, ReceiptSource expectedReceiptSource) {
Observable<Statement> statementObservable = getStatement(transaction);
Observable<MosaicId> resolvedMosaicId = getResolvedMosaicId(transaction,
transaction.getTargetMosaicId(), statementObservable, expectedReceiptSource);
return resolvedMosaicId.map(mosaicId -> MosaicMetadataTransactionFactory
.create(transaction.getNetworkType(), transaction.getTargetAddress(), mosaicId,
transaction.getScopedMetadataKey(), transaction.getValue())
.valueSizeDelta(transaction.getValueSizeDelta()).valueSize(transaction.getValueSize()));
}
public Observable<Boolean> transactionFromAddress(final Transaction transaction, final Address address,
final Observable<List<NamespaceId>> namespaceIdsObservable) {
if (transaction.getSigner().filter(s -> s.getAddress().equals(address)).isPresent()) {
return Observable.just(true);
}
if (transaction instanceof AggregateTransaction) {
final AggregateTransaction aggregateTransaction = (AggregateTransaction) transaction;
if (aggregateTransaction.getCosignatures().stream()
.anyMatch(c -> c.getSigner().getAddress().equals(address))) {
return Observable.just(true);
}
//Recursion...
Observable<Transaction> innerTransactionObservable = Observable
.fromIterable(aggregateTransaction.getInnerTransactions());
return innerTransactionObservable
.flatMap(t -> this.transactionFromAddress(t, address, namespaceIdsObservable).filter(a -> a))
.first(false).toObservable();
}
if (transaction instanceof PublicKeyLinkTransaction) {
return Observable.just(Address
.createFromPublicKey(((PublicKeyLinkTransaction) transaction).getLinkedPublicKey().toHex(),
transaction.getNetworkType()).equals(address));
}
if (transaction instanceof MetadataTransaction) {
MetadataTransaction metadataTransaction = (MetadataTransaction) transaction;
return Observable.just(metadataTransaction.getTargetAddress().equals(address));
}
if (transaction instanceof TargetAddressTransaction) {
TargetAddressTransaction targetAddressTransaction = (TargetAddressTransaction) transaction;
if (targetAddressTransaction.getTargetAddress() instanceof Address) {
return Observable.just(targetAddressTransaction.getTargetAddress().equals(address));
}
return namespaceIdsObservable
.map(namespaceIds -> namespaceIds.contains(targetAddressTransaction.getTargetAddress()));
}
if (transaction instanceof MultisigAccountModificationTransaction) {
MultisigAccountModificationTransaction multisigAccountModificationTransaction = (MultisigAccountModificationTransaction) transaction;
if (multisigAccountModificationTransaction.getAddressAdditions().stream()
.anyMatch(a -> a.equals(address))) {
return Observable.just(true);
}
return Observable.just(
multisigAccountModificationTransaction.getAddressDeletions().stream().anyMatch(a -> a.equals(address)));
}
if (transaction instanceof AccountAddressRestrictionTransaction) {
AccountAddressRestrictionTransaction accountAddressRestrictionTransaction = (AccountAddressRestrictionTransaction) transaction;
if (accountAddressRestrictionTransaction.getRestrictionAdditions().contains(address)) {
return Observable.just(true);
}
if (accountAddressRestrictionTransaction.getRestrictionDeletions().contains(address)) {
return Observable.just(true);
}
return namespaceIdsObservable.flatMap(namespaceIds -> {
if (namespaceIds.stream().anyMatch(
namespaceId -> accountAddressRestrictionTransaction.getRestrictionAdditions()
.contains(namespaceId))) {
return Observable.just(true);
}
if (namespaceIds.stream().anyMatch(
namespaceId -> accountAddressRestrictionTransaction.getRestrictionDeletions()
.contains(namespaceId))) {
return Observable.just(true);
}
return Observable.just(false);
});
}
if (transaction instanceof RecipientTransaction) {
RecipientTransaction recipientTransaction = (RecipientTransaction) transaction;
if (recipientTransaction.getRecipient() instanceof NamespaceId) {
return namespaceIdsObservable
.map(namespaceIds -> namespaceIds.contains(recipientTransaction.getRecipient()));
}
return Observable.just(recipientTransaction.getRecipient().equals(address));
}
return Observable.just(false);
}