下面列出了怎么用io.reactivex.SingleSource的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Transformer step applied to a wallet-producing {@link Single}.
 *
 * <p>The password-store persistence step is currently disabled (see the
 * commented-out code below), so this transformer simply forwards the
 * upstream wallet unchanged.
 *
 * @param upstream the source emitting the wallet
 * @return a source emitting the same wallet as {@code upstream}
 */
@Override
public SingleSource<Wallet> apply(Single<Wallet> upstream) {
    // Do not call upstream.blockingGet() here: apply() runs while the chain is
    // being assembled, so blocking would execute the upstream on the caller's
    // thread and throw upstream failures instead of delivering them via onError.
    return upstream;
    // Disabled password-store persistence, kept for reference:
    // return passwordStore
    // .setPassword(wallet, password)
    // .onErrorResumeNext(err -> walletRepository.deleteWallet(wallet.getAddress())
    // .lift(observer -> new DisposableCompletableObserver() {
    // @Override
    // public void onComplete() {
    // observer.onError(err);
    // }
    //
    // @Override
    // public void onError(Throwable e) {
    // observer.onError(e);
    // }
    // }))
    // .toSingle(() -> wallet);
}
/**
 * Wires up the two search pipelines and registers both subscriptions with
 * {@code compositeDisposable}, then hides the searching progress indicator.
 */
private void setupSearch() {
    // Maps each emitted username to a lookup request; the request is subscribed
    // on the io scheduler and its result is observed on the Android main thread.
    final Function<String, SingleSource<LookupAccount>> lookupUsernames =
            new Function<String, SingleSource<LookupAccount>>() {
                @Override
                public SingleSource<LookupAccount> apply(String query) {
                    return RetrofitServiceGenerator
                            .getService()
                            .getUsernames(URLS.STEEMIT_API_URL, SteemRequestBody.lookupAccounts(query))
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread());
                }
            };

    // Pipeline 1: debounced text-change events of the search bar (initial value skipped).
    // NOTE(review): subscribeOn(io) moves the view-event subscription off the
    // main thread; confirm RxTextView tolerates that in the version in use.
    compositeDisposable.add(
            RxTextView.textChangeEvents(judgeSearchBar)
                    .skipInitialValue()
                    .debounce(300, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(getSearchInputObserver()));

    // Pipeline 2: debounced, de-duplicated usernames; switchMapSingle switches
    // to the lookup for the newest value.
    compositeDisposable.add(
            publishSubject
                    .debounce(300, TimeUnit.MILLISECONDS)
                    .distinctUntilChanged()
                    .switchMapSingle(lookupUsernames)
                    .subscribeWith(usernamesResponseObserver()));

    hideSearchingProgress();
}
/**
 * Applies the scheduler configuration selected by {@code mSchedulerType}
 * to the given upstream.
 *
 * @param upstream the source to decorate
 * @return the upstream with the selected subscribe/unsubscribe/observe
 *         schedulers applied, or the upstream itself for any other type
 */
@Override
public SingleSource<T> apply(Single<T> upstream) {
    switch (mSchedulerType) {
        case _main:
            // Observe on the Android main thread only.
            return upstream.observeOn(AndroidSchedulers.mainThread());
        case _io:
            // Observe on the io scheduler backed by mIOExecutor.
            return upstream.observeOn(RxSchedulerUtils.io(mIOExecutor));
        case _io_main:
            // Subscribe/unsubscribe on io, observe on the main thread.
            return upstream
                    .subscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .observeOn(AndroidSchedulers.mainThread());
        case _io_io:
            // Subscribe/unsubscribe and observe on io.
            return upstream
                    .subscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .observeOn(RxSchedulerUtils.io(mIOExecutor));
        default:
            // Any other scheduler type leaves the upstream untouched.
            return upstream;
    }
}
/**
 * Stores the password for the wallet emitted by {@code upstream} and then
 * re-emits that wallet.
 *
 * <p>If storing the password fails, the wallet is deleted through
 * {@code walletRepository} and the original storage error is signalled
 * downstream; if the deletion itself fails, the deletion error is signalled.
 *
 * @param upstream the source emitting the wallet
 * @return a source emitting the wallet once its password has been stored
 */
@Override
public SingleSource<Wallet> apply(Single<Wallet> upstream) {
    // flatMap keeps the chain lazy. The previous upstream.blockingGet() ran the
    // upstream while the chain was being assembled, blocking the caller's thread
    // and throwing upstream failures instead of delivering them through onError.
    return upstream.flatMap(wallet -> passwordStore
            .setPassword(wallet, password)
            .onErrorResumeNext(err -> walletRepository.deleteWallet(wallet.address, password)
                    .lift(observer -> new DisposableCompletableObserver() {
                        @Override
                        public void onComplete() {
                            // Deletion succeeded: still report the original storage failure.
                            observer.onError(err);
                        }

                        @Override
                        public void onError(Throwable e) {
                            // Deletion failed as well: report the deletion failure.
                            observer.onError(e);
                        }
                    }))
            .toSingle(() -> wallet));
}
/**
 * Resets the access count, last-accessed value and post count to zero for
 * all rows of the board table.
 *
 * <p>The update runs inside a database transaction when the returned
 * {@link Single} is subscribed. A failure of the update is logged and
 * reported as {@code false} rather than as an error.
 *
 * @return a Single emitting {@code true} if at least one row was updated,
 *         {@code false} otherwise
 */
public static Single<Boolean> resetStats() {
    return Single.fromCallable(() -> {
        final BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();

        final ContentValues values = new ContentValues();
        values.put(Board.KEY_ACCESS_COUNT, 0);
        values.put(Board.KEY_LAST_ACCESSED, 0);
        values.put(Board.KEY_POST_COUNT, 0);

        final BriteDatabase.Transaction transaction = db.newTransaction();
        try {
            // No where clause: the update applies to every row of the table.
            final int updatedRows =
                    db.update(Board.TABLE_NAME, SQLiteDatabase.CONFLICT_IGNORE, values, null, null);
            transaction.markSuccessful();
            return updatedRows > 0;
        } catch (Exception e) {
            // Best effort: log and report "nothing updated" instead of failing the stream.
            Log.e(LOG_TAG, "Error resetting board stats in the database", e);
            return false;
        } finally {
            transaction.end();
        }
    });
}
/**
 * Reads a descriptor identified by its service, characteristic and
 * descriptor UUIDs.
 *
 * <p>Services are discovered first, the descriptor is looked up in the
 * discovered services, and the lookup result is passed to the
 * descriptor-based {@code readDescriptor} overload.
 *
 * @param serviceUuid        UUID of the service
 * @param characteristicUuid UUID of the characteristic
 * @param descriptorUuid     UUID of the descriptor
 * @return a Single emitting the bytes produced by the descriptor read
 */
@Override
public Single<byte[]> readDescriptor(@NonNull final UUID serviceUuid, @NonNull final UUID characteristicUuid,
        @NonNull final UUID descriptorUuid) {
    final Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>> findDescriptor =
            new Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>>() {
                @Override
                public SingleSource<BluetoothGattDescriptor> apply(RxBleDeviceServices services) {
                    return services.getDescriptor(serviceUuid, characteristicUuid, descriptorUuid);
                }
            };
    final Function<BluetoothGattDescriptor, SingleSource<byte[]>> readFound =
            new Function<BluetoothGattDescriptor, SingleSource<byte[]>>() {
                @Override
                public SingleSource<byte[]> apply(BluetoothGattDescriptor found) {
                    return readDescriptor(found);
                }
            };

    return discoverServices()
            .flatMap(findDescriptor)
            .flatMap(readFound);
}
/**
 * Writes data to a descriptor identified by its service, characteristic
 * and descriptor UUIDs.
 *
 * <p>Services are discovered first, the descriptor is looked up in the
 * discovered services, and the write is delegated to the descriptor-based
 * {@code writeDescriptor} overload.
 *
 * @param serviceUuid        UUID of the service
 * @param characteristicUuid UUID of the characteristic
 * @param descriptorUuid     UUID of the descriptor
 * @param data               bytes to write
 * @return a Completable for the delegated write
 */
@Override
public Completable writeDescriptor(
        @NonNull final UUID serviceUuid, @NonNull final UUID characteristicUuid, @NonNull final UUID descriptorUuid,
        @NonNull final byte[] data
) {
    final Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>> findDescriptor =
            new Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>>() {
                @Override
                public SingleSource<BluetoothGattDescriptor> apply(RxBleDeviceServices services) {
                    return services.getDescriptor(serviceUuid, characteristicUuid, descriptorUuid);
                }
            };
    final Function<BluetoothGattDescriptor, CompletableSource> writeFound =
            new Function<BluetoothGattDescriptor, CompletableSource>() {
                @Override
                public CompletableSource apply(BluetoothGattDescriptor found) {
                    return writeDescriptor(found, data);
                }
            };

    return discoverServices()
            .flatMap(findDescriptor)
            .flatMapCompletable(writeFound);
}
/**
 * Reads a descriptor identified by its service, characteristic and
 * descriptor UUIDs by returning the value currently held by the
 * descriptor object.
 *
 * @param serviceUuid        UUID of the service
 * @param characteristicUuid UUID of the characteristic
 * @param descriptorUuid     UUID of the descriptor
 * @return a Single emitting the result of {@code getValue()} on the located descriptor
 */
@Override
public Single<byte[]> readDescriptor(@NonNull final UUID serviceUuid, @NonNull final UUID characteristicUuid,
        @NonNull final UUID descriptorUuid) {
    final Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>> findDescriptor =
            new Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>>() {
                @Override
                public SingleSource<BluetoothGattDescriptor> apply(RxBleDeviceServices services) {
                    return services.getDescriptor(serviceUuid, characteristicUuid, descriptorUuid);
                }
            };
    final Function<BluetoothGattDescriptor, byte[]> currentValue =
            new Function<BluetoothGattDescriptor, byte[]>() {
                @Override
                public byte[] apply(BluetoothGattDescriptor found) {
                    return found.getValue();
                }
            };

    return discoverServices()
            .flatMap(findDescriptor)
            .map(currentValue);
}
/**
 * Writes data to a descriptor identified by its service, characteristic
 * and descriptor UUIDs by setting the value on the located descriptor
 * object.
 *
 * @param serviceUuid        UUID of the service
 * @param characteristicUuid UUID of the characteristic
 * @param descriptorUuid     UUID of the descriptor
 * @param data               bytes to set as the descriptor value
 * @return a Completable that finishes once the value has been set
 */
@Override
public Completable writeDescriptor(@NonNull final UUID serviceUuid, @NonNull final UUID characteristicUuid,
        @NonNull final UUID descriptorUuid, @NonNull final byte[] data) {
    final Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>> findDescriptor =
            new Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>>() {
                @Override
                public SingleSource<BluetoothGattDescriptor> apply(RxBleDeviceServices services) {
                    return services.getDescriptor(serviceUuid, characteristicUuid, descriptorUuid);
                }
            };
    final Consumer<BluetoothGattDescriptor> storeValue =
            new Consumer<BluetoothGattDescriptor>() {
                @Override
                public void accept(BluetoothGattDescriptor found) throws Exception {
                    found.setValue(data);
                }
            };

    return discoverServices()
            .flatMap(findDescriptor)
            .doOnSuccess(storeValue)
            .toCompletable();
}
/**
 * Validates the signature of a signed request object against the client's keys.
 *
 * <p>The client's key set is loaded, the key matching the JWT header's key ID
 * is selected, and the JWT is accepted only if its header algorithm equals the
 * client's registered request-object signing algorithm and the signature is
 * valid. A missing key set or key yields an {@code InvalidRequestObjectException}.
 *
 * @param jwt    the signed JWT to validate
 * @param client the client whose keys and registered algorithm are used
 * @return a Single emitting the JWT when valid, or an error otherwise
 */
private Single<JWT> validateSignature(SignedJWT jwt, Client client) {
    final Function<JWKSet, MaybeSource<JWK>> selectKey = new Function<JWKSet, MaybeSource<JWK>>() {
        @Override
        public MaybeSource<JWK> apply(JWKSet keys) throws Exception {
            return jwkService.getKey(keys, jwt.getHeader().getKeyID());
        }
    };
    final Function<JWK, SingleSource<JWT>> verify = new Function<JWK, SingleSource<JWT>>() {
        @Override
        public SingleSource<JWT> apply(JWK key) throws Exception {
            // 6.3.2. Signed Request Object
            // To perform Signature Validation, the alg Header Parameter in the
            // JOSE Header MUST match the value of the request_object_signing_alg
            // set during Client Registration
            final boolean algorithmMatches = jwt.getHeader().getAlgorithm().getName()
                    .equals(client.getRequestObjectSigningAlg());
            // The signature is only checked when the algorithm matches.
            if (!algorithmMatches || !jwsService.isValidSignature(jwt, key)) {
                return Single.error(new InvalidRequestObjectException("Invalid signature"));
            }
            return Single.just(jwt);
        }
    };

    return jwkService.getKeys(client)
            .switchIfEmpty(Maybe.error(new InvalidRequestObjectException()))
            .flatMap(selectKey)
            .switchIfEmpty(Maybe.error(new InvalidRequestObjectException()))
            .flatMapSingle(verify);
}
/**
 * Restarts the build process.
 *
 * <p>Sends the restart request with an empty JSON body; an error from that
 * request is replaced by a placeholder value so that the build details are
 * fetched afterwards either way. The outcome is delivered on the Android
 * main thread, and the subscription is tracked in {@code mSubscriptions}.
 */
public void restartBuild() {
    final RequestBody noContent = RequestBody.create(MediaType.parse("application/json"), "");

    final Disposable request = mTravisRestClient.getApiService()
            .restartBuild(mBuildId, noContent)
            // Deliberately ignore a failed restart call and continue with the refresh.
            .onErrorReturn(throwable -> new Object())
            .flatMap(ignored -> mTravisRestClient.getApiService().getBuild(mRepoSlug, mBuildId))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe((buildDetails, throwable) -> {
                if (throwable != null) {
                    handleLoadingFailed(throwable);
                } else {
                    handleBuildDetails(buildDetails);
                }
            });

    mSubscriptions.add(request);
}
/**
 * Cancels the build process.
 *
 * <p>Sends the cancel request with an empty JSON body; an error from that
 * request is replaced by a placeholder value so that the build details are
 * fetched afterwards either way. The outcome is delivered on the Android
 * main thread, and the subscription is tracked in {@code mSubscriptions}.
 */
public void cancelBuild() {
    final RequestBody noContent = RequestBody.create(MediaType.parse("application/json"), "");

    final Disposable request = mTravisRestClient.getApiService()
            .cancelBuild(mBuildId, noContent)
            // Deliberately ignore a failed cancel call and continue with the refresh.
            .onErrorReturn(throwable -> new Object())
            .flatMap(ignored -> mTravisRestClient.getApiService().getBuild(mRepoSlug, mBuildId))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe((buildDetails, throwable) -> {
                if (throwable != null) {
                    handleLoadingFailed(throwable);
                } else {
                    handleBuildDetails(buildDetails);
                }
            });

    mSubscriptions.add(request);
}
/**
 * Unwraps each upstream item to a {@code Buffer} and converts that buffer
 * into an instance of {@code T}.
 *
 * <p>When a {@code mapper} is configured, the buffer bytes are parsed with it
 * using {@code mappedType} if set, otherwise {@code mappedTypeRef}. Without a
 * mapper the conversion is delegated to {@code getT}. Any exception raised
 * during conversion is emitted as an error.
 *
 * @param upstream the source of wrapped items
 * @return a source emitting the converted object or the conversion error
 */
@Override
public SingleSource<T> apply(@NonNull Single<B> upstream) {
    final Single<Buffer> payload = upstream.map(unwrap::apply);
    return payload.flatMap(buffer -> {
        try {
            final T decoded;
            if (mapper == null) {
                decoded = getT(buffer, mappedType, mappedTypeRef);
            } else {
                final JsonParser parser = mapper.getFactory().createParser(buffer.getBytes());
                if (nonNull(mappedType)) {
                    decoded = mapper.readValue(parser, mappedType);
                } else {
                    decoded = mapper.readValue(parser, mappedTypeRef);
                }
            }
            return Single.just(decoded);
        } catch (Exception e) {
            // Surface conversion failures through the stream.
            return Single.error(e);
        }
    });
}
/**
 * Unwrapping transformer: maps each {@code RespData<T>} through
 * {@code TransToData}, subscribes on the io scheduler and observes on the
 * Android main thread.
 * NOTE(review): presumably TransToData extracts the payload from the
 * response envelope - confirm against its implementation.
 *
 * @param <T> type of the unwrapped payload
 * @return the transformer
 */
public static <T> SingleTransformer<RespData<T>, T> handleSingleDataResult() {
    return new SingleTransformer<RespData<T>, T>() {
        @Override
        public SingleSource<T> apply(Single<RespData<T>> upstream) {
            final Single<T> unwrapped = upstream.map(new TransToData<T>());
            final Single<T> onIo = unwrapped.subscribeOn(Schedulers.io());
            return onIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
/**
 * Non-unwrapping transformer: maps each {@code RespData<T>} through
 * {@code TransToResult} (the envelope type is kept), subscribes on the io
 * scheduler and observes on the Android main thread.
 *
 * @param <T> type of the payload carried by the response envelope
 * @return the transformer
 */
public static <T> SingleTransformer<RespData<T>, RespData<T>> handleSingleToResult() {
    return new SingleTransformer<RespData<T>, RespData<T>>() {
        @Override
        public SingleSource<RespData<T>> apply(Single<RespData<T>> upstream) {
            final Single<RespData<T>> checked = upstream.map(new TransToResult<T>());
            final Single<RespData<T>> onIo = checked.subscribeOn(Schedulers.io());
            return onIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
/**
 * Builds a function that, for a token response, fetches the user profile
 * with a bearer header built from the response's access token and pairs the
 * token response with the fetched user.
 *
 * @param service API used to fetch the user profile
 * @return function mapping a token response to a (token response, user) pair source
 */
@NonNull
private Function<TokenResponse, SingleSource<? extends Pair<TokenResponse, User>>>
        mapUserProfileToAuth(RedditAuthApi service) {
    return tokenResponse -> {
        final String authorization = "Bearer " + tokenResponse.accessToken;
        return service.fetchMe(authorization)
                .map(profile -> new Pair<>(tokenResponse, profile));
    };
}
/**
 * Returns the time zone for the given location.
 *
 * <p>The location is first resolved to coordinates; the latitude and
 * longitude of the first result are then used to look up the time zone.
 *
 * @param location the location - a city, country or village
 * @return a Single emitting the time zone of the location
 */
@Override
public Single<TimeZone> getTimeZoneByLocationName(String location) {
    final Function<CoordResponse, SingleSource<? extends TimeZone>> lookUpTimeZone =
            new Function<CoordResponse, SingleSource<? extends TimeZone>>() {
                @Override
                public SingleSource<? extends TimeZone> apply(@NonNull CoordResponse coords) throws Exception {
                    // Only the first geocoding result is considered.
                    final String latitude = coords.results.get(0).geometry.location.lat.toString();
                    final String longitude = coords.results.get(0).geometry.location.lng.toString();
                    return timeZoneDbApi.getTimeZone(latitude, longitude, TIME_ZONE_DB_API_KEY);
                }
            };

    return googleGeoApi
            .getCoordForLocation(location, GOOGLE_GEO_API_KEY)
            .flatMap(lookUpTimeZone);
}
/**
 * Search.
 *
 * <p>On subscription the request parameters are assembled and the sed/sig
 * pair is obtained from {@code ServiceSecurity}; the search endpoint is then
 * called with that pair, and the result is composed with {@code retry()}.
 *
 * @param recherche the search text
 * @param filter    filters to apply
 * @param count     number of results per page
 * @param page      page number
 * @return a Single emitting the search response
 */
public Single<AllocineResponse> search(final String recherche, final List<String> filter, final int count, final int page) {
    final SingleOnSubscribe<Pair<String, String>> signer = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String query = ServiceSecurity.construireParams(false,
                    AllocineService.Q, recherche.replace(" ", "+"),
                    AllocineService.FILTER, filter,
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(query, sed)));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends AllocineResponse>> request =
            new Function<Pair<String, String>, SingleSource<? extends AllocineResponse>>() {
                @Override
                public SingleSource<? extends AllocineResponse> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.search(recherche, ServiceSecurity.applatir(filter), count, page,
                            signature.first, signature.second);
                }
            };

    return Single.create(signer)
            .flatMap(request)
            .compose(this.<AllocineResponse>retry());
}
/**
 * Search returning the small response variant.
 *
 * <p>On subscription the request parameters are assembled and the sed/sig
 * pair is obtained from {@code ServiceSecurity}; the small-search endpoint is
 * then called with that pair, and the result is composed with {@code retry()}.
 *
 * @param recherche the search text
 * @param filter    filters to apply
 * @param count     number of results per page
 * @param page      page number
 * @return a Single emitting the small search response
 */
public Single<AllocineResponseSmall> searchSmall(final String recherche, final List<String> filter, final int count, final int page) {
    final SingleOnSubscribe<Pair<String, String>> signer = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String query = ServiceSecurity.construireParams(false,
                    AllocineService.Q, recherche.replace(" ", "+"),
                    AllocineService.FILTER, filter,
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(query, sed)));
        }
    };
    final Function<Pair<String, String>, SingleSource<AllocineResponseSmall>> request =
            new Function<Pair<String, String>, SingleSource<AllocineResponseSmall>>() {
                @Override
                public SingleSource<AllocineResponseSmall> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.searchSmall(recherche, ServiceSecurity.applatir(filter), count, page,
                            signature.first, signature.second);
                }
            };

    return Single.create(signer)
            .flatMap(request)
            .compose(this.<AllocineResponseSmall>retry());
}
/**
 * Information about a movie.
 *
 * <p>On subscription the request is signed via {@code ServiceSecurity}
 * (using {@code FILTER_MOVIE} as filter); the movie endpoint is then called
 * and the movie is extracted from the response. The result is composed with
 * {@code retry()}.
 *
 * @param idFilm  the movie code
 * @param profile the profile whose value is sent with the request
 * @return a Single emitting the movie
 */
public Single<Movie> movie(final String idFilm, final Profile profile) {
    final String filter = FILTER_MOVIE;

    final SingleOnSubscribe<Pair<String, String>> signer = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String query = ServiceSecurity.construireParams(false,
                    AllocineService.CODE, idFilm,
                    AllocineService.PROFILE, profile.getValue(),
                    AllocineService.FILTER, filter
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(query, sed)));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends Movie>> request =
            new Function<Pair<String, String>, SingleSource<? extends Movie>>() {
                @Override
                public SingleSource<? extends Movie> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.movie(idFilm, profile.getValue(), filter, signature.first, signature.second)
                            .map(new Function<AllocineResponse, Movie>() {
                                @Override
                                public Movie apply(AllocineResponse response) throws Exception {
                                    return response.getMovie();
                                }
                            });
                }
            };

    return Single.create(signer)
            .flatMap(request)
            .compose(this.<Movie>retry());
}
/**
 * Information about a theater.
 *
 * <p>On subscription the request is signed via {@code ServiceSecurity}; the
 * theater endpoint is then called and the theater is extracted from the
 * response. The result is composed with {@code retry()}.
 *
 * @param idCinema the theater code
 * @param profile  the profile value sent with the request
 * @param filter   the filter value sent with the request
 * @return a Single emitting the theater
 */
public Single<Theater> theater(final String idCinema, final String profile, final String filter) {
    final SingleOnSubscribe<Pair<String, String>> signer = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String query = ServiceSecurity.construireParams(false,
                    AllocineService.CODE, idCinema,
                    AllocineService.PROFILE, profile,
                    AllocineService.FILTER, filter
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(query, sed)));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends Theater>> request =
            new Function<Pair<String, String>, SingleSource<? extends Theater>>() {
                @Override
                public SingleSource<? extends Theater> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.theater(idCinema, profile, filter, signature.first, signature.second)
                            .map(new Function<AllocineResponse, Theater>() {
                                @Override
                                public Theater apply(AllocineResponse response) throws Exception {
                                    return response.getTheater();
                                }
                            });
                }
            };

    return Single.create(signer)
            .flatMap(request)
            .compose(this.<Theater>retry());
}
/**
 * Information about a person.
 *
 * <p>On subscription the request is signed via {@code ServiceSecurity}; the
 * person endpoint is then called and the person is extracted from the
 * response. The result is composed with {@code retry()}.
 *
 * @param idPerson the person code
 * @param profile  the profile value sent with the request
 * @param filter   the filter value sent with the request
 * @return a Single emitting the person
 */
public Single<PersonFull> person(final String idPerson, final String profile, final String filter) {
    final SingleOnSubscribe<Pair<String, String>> signer = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String query = ServiceSecurity.construireParams(false,
                    AllocineService.CODE, idPerson,
                    AllocineService.PROFILE, profile,
                    AllocineService.FILTER, filter
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(query, sed)));
        }
    };
    final Function<Pair<String, String>, SingleSource<PersonFull>> request =
            new Function<Pair<String, String>, SingleSource<PersonFull>>() {
                @Override
                public SingleSource<PersonFull> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.person(idPerson, profile, filter, signature.first, signature.second)
                            .map(new Function<AllocineResponse, PersonFull>() {
                                @Override
                                public PersonFull apply(AllocineResponse response) throws Exception {
                                    return response.getPerson();
                                }
                            });
                }
            };

    return Single.create(signer)
            .flatMap(request)
            .compose(this.<PersonFull>retry());
}
/**
 * Filmography of a person.
 *
 * <p>On subscription the request is signed via {@code ServiceSecurity}; the
 * filmography endpoint is then called and the participation list of the
 * response's person is extracted. The result is composed with {@code retry()}.
 *
 * @param idPerson the person code
 * @param profile  the profile value sent with the request
 * @param filter   the filter value sent with the request
 * @return a Single emitting the person's participations
 */
public Single<List<Participation>> filmography(final String idPerson, final String profile, final String filter) {
    final SingleOnSubscribe<Pair<String, String>> signer = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String query = ServiceSecurity.construireParams(false,
                    AllocineService.CODE, idPerson,
                    AllocineService.PROFILE, profile,
                    AllocineService.FILTER, filter
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(query, sed)));
        }
    };
    final Function<Pair<String, String>, SingleSource<List<Participation>>> request =
            new Function<Pair<String, String>, SingleSource<List<Participation>>>() {
                @Override
                public SingleSource<List<Participation>> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.filmography(idPerson, profile, filter, signature.first, signature.second)
                            .map(new Function<AllocineResponse, List<Participation>>() {
                                @Override
                                public List<Participation> apply(AllocineResponse response) throws Exception {
                                    return response.getPerson().getParticipation();
                                }
                            });
                }
            };

    return Single.create(signer)
            .flatMap(request)
            .compose(this.<List<Participation>>retry());
}
/**
 * Lists movies matching the given filters.
 *
 * <p>The filter values are collected eagerly. On subscription the request is
 * signed via {@code ServiceSecurity}; the movie-list endpoint is then called
 * and the movies are extracted from the response feed. The result is composed
 * with {@code retry()}.
 *
 * @param filter  filters whose values are sent with the request
 * @param profile the profile whose value is sent with the request
 * @param order   the ordering whose value is sent with the request
 * @param count   number of results per page
 * @param page    page number
 * @return a Single emitting the list of movies
 */
public Single<List<Movie>> movieList(List<MovieListFilter> filter, final Profile profile, final MovieListOrder order, final int count, final int page) {
    final List<String> filterString = new ArrayList<>();
    for (MovieListFilter entry : filter) {
        filterString.add(entry.getValue());
    }

    final SingleOnSubscribe<Pair<String, String>> signer = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String query = ServiceSecurity.construireParams(true,
                    AllocineService.FILTER, filterString,
                    AllocineService.PROFILE, profile.getValue(),
                    AllocineService.ORDER, order.getValue(),
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(query, sed)));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends List<Movie>>> request =
            new Function<Pair<String, String>, SingleSource<? extends List<Movie>>>() {
                @Override
                public SingleSource<? extends List<Movie>> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.movieList(ServiceSecurity.applatir(filterString), profile.getValue(), order.getValue(), count, page, signature.first, signature.second)
                            .map(new Function<AllocineResponse, List<Movie>>() {
                                @Override
                                public List<Movie> apply(AllocineResponse response) throws Exception {
                                    return response.getFeed().getMovie();
                                }
                            });
                }
            };

    return Single.create(signer)
            .flatMap(request)
            .compose(this.<List<Movie>>retry());
}
/**
 * Lists persons matching the given filters.
 *
 * <p>The filter values are collected eagerly. On subscription the request is
 * signed via {@code ServiceSecurity}; the person-list endpoint is then called
 * and the persons are extracted from the response feed. The result is
 * composed with {@code retry()}.
 *
 * @param filter  filters whose values are sent with the request
 * @param profile the profile whose value is sent with the request
 * @param count   number of results per page
 * @param page    page number
 * @return a Single emitting the list of persons
 */
public Single<List<PersonFull>> starsList(final List<PersonListFilter> filter, final Profile profile, final int count, final int page) {
    final List<String> filterString = new ArrayList<>();
    for (PersonListFilter entry : filter) {
        filterString.add(entry.getValue());
    }

    final SingleOnSubscribe<Pair<String, String>> signer = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String query = ServiceSecurity.construireParams(true,
                    AllocineService.FILTER, filterString,
                    AllocineService.PROFILE, profile.getValue(),
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(query, sed)));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends List<PersonFull>>> request =
            new Function<Pair<String, String>, SingleSource<? extends List<PersonFull>>>() {
                @Override
                public SingleSource<? extends List<PersonFull>> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.personList(ServiceSecurity.applatir(filterString), profile.getValue(), count, page, signature.first, signature.second)
                            .map(new Function<AllocineResponse, List<PersonFull>>() {
                                @Override
                                public List<PersonFull> apply(AllocineResponse response) throws Exception {
                                    return response.getFeed().getPerson();
                                }
                            });
                }
            };

    return Single.create(signer)
            .flatMap(request)
            .compose(this.<List<PersonFull>>retry());
}
/**
 * Lists theaters for a zip code.
 *
 * <p>On subscription the request is signed via {@code ServiceSecurity}; the
 * theater-list endpoint is then called and the theaters are extracted from
 * the response feed. The result is composed with {@code retry()}.
 *
 * @param zip   the zip code sent with the request
 * @param count number of results per page
 * @param page  page number
 * @return a Single emitting the list of theaters
 */
public Single<List<Theater>> theaterList(final String zip, final int count, final int page) {
    final SingleOnSubscribe<Pair<String, String>> signer = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String query = ServiceSecurity.construireParams(false,
                    AllocineService.ZIP, zip,
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(query, sed)));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends List<Theater>>> request =
            new Function<Pair<String, String>, SingleSource<? extends List<Theater>>>() {
                @Override
                public SingleSource<? extends List<Theater>> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.theaterlist(zip, count, page, signature.first, signature.second)
                            .map(new Function<AllocineResponse, List<Theater>>() {
                                @Override
                                public List<Theater> apply(AllocineResponse response) throws Exception {
                                    return response.getFeed().getTheater();
                                }
                            });
                }
            };

    return Single.create(signer)
            .flatMap(request)
            .compose(this.<List<Theater>>retry());
}
/**
 * Lists theaters around a coordinate.
 *
 * <p>On subscription the request is signed via {@code ServiceSecurity}; the
 * theater-list endpoint is then called and the theaters are extracted from
 * the response feed. The result is composed with {@code retry()}.
 *
 * @param lat    latitude sent with the request
 * @param lng    longitude sent with the request
 * @param radius radius sent with the request
 * @param count  number of results per page
 * @param page   page number
 * @return a Single emitting the list of theaters
 */
public Single<List<Theater>> theaterList(final String lat, final String lng, final int radius, final int count, final int page) {
    final SingleOnSubscribe<Pair<String, String>> signer = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String query = ServiceSecurity.construireParams(false,
                    AllocineService.LAT, lat,
                    AllocineService.LONG, lng,
                    AllocineService.RADIUS, String.valueOf(radius),
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.PAGE, String.valueOf(page)
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(query, sed)));
        }
    };
    final Function<Pair<String, String>, SingleSource<? extends List<Theater>>> request =
            new Function<Pair<String, String>, SingleSource<? extends List<Theater>>>() {
                @Override
                public SingleSource<? extends List<Theater>> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.theaterlist(lat, lng, radius, count, page, signature.first, signature.second)
                            .map(new Function<AllocineResponse, List<Theater>>() {
                                @Override
                                public List<Theater> apply(AllocineResponse response) throws Exception {
                                    return response.getFeed().getTheater();
                                }
                            });
                }
            };

    return Single.create(signer)
            .flatMap(request)
            .compose(this.<List<Theater>>retry());
}
/**
 * Lists the media of a movie.
 *
 * <p>The subject is {@code "movie:" + code} and the media format is
 * {@code "mp4"}. On subscription the request is signed via
 * {@code ServiceSecurity}; the video-list endpoint is then called and the
 * media are extracted from the response feed. The result is composed with
 * {@code retry()}.
 *
 * @param code  the movie code
 * @param count number of results requested
 * @return a Single emitting the list of media
 */
public Single<List<Media>> videoList(final String code, final int count) {
    final String subject = "movie:" + code;
    final String mediafmt = "mp4";

    final SingleOnSubscribe<Pair<String, String>> signer = new SingleOnSubscribe<Pair<String, String>>() {
        @Override
        public void subscribe(SingleEmitter<Pair<String, String>> emitter) throws Exception {
            final String query = ServiceSecurity.construireParams(false,
                    AllocineService.SUBJECT, subject,
                    AllocineService.COUNT, String.valueOf(count),
                    AllocineService.MEDIAFMT, mediafmt
            );
            final String sed = ServiceSecurity.getSED();
            emitter.onSuccess(Pair.create(sed, ServiceSecurity.getSIG(query, sed)));
        }
    };
    final Function<Pair<String, String>, SingleSource<List<Media>>> request =
            new Function<Pair<String, String>, SingleSource<List<Media>>>() {
                @Override
                public SingleSource<List<Media>> apply(Pair<String, String> signature) throws Exception {
                    return allocineService.videoList(subject, count, mediafmt, signature.first, signature.second)
                            .map(new Function<AllocineResponse, List<Media>>() {
                                @Override
                                public List<Media> apply(AllocineResponse response) throws Exception {
                                    return response.getFeed().getMedia();
                                }
                            });
                }
            };

    return Single.create(signer)
            .flatMap(request)
            .compose(this.<List<Media>>retry());
}
/**
 * Builds a transformer that retries the upstream when it fails with an
 * HTTP 403.
 *
 * <p>Each subscription may retry up to {@code MAX_COUNT} times, waiting
 * {@code DELAY_SECOND} seconds before each retry. Any other failure, or a
 * 403 after the attempts are used up, is propagated downstream.
 *
 * @param <T> type of the item emitted by the upstream
 * @return the retrying transformer
 */
private <T> SingleTransformer<T, T> retry() {
    return new SingleTransformer<T, T>() {
        @Override
        public SingleSource<T> apply(Single<T> upstream) {
            return upstream.retryWhen(new Function<Flowable<Throwable>, Publisher<Object>>() {
                private static final int MAX_COUNT = 3;
                private static final int DELAY_SECOND = 10;

                @Override
                public Publisher<Object> apply(Flowable<Throwable> throwableFlowable) throws Exception {
                    // This handler is invoked once per subscription, so the counter
                    // lives in the function created here. Keeping it on the outer
                    // handler would share it between subscriptions of the same Single
                    // and permanently disable retries once it reached MAX_COUNT.
                    return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
                        private int count = 0;

                        @Override
                        public Publisher<?> apply(Throwable throwable) throws Exception {
                            if (count++ < MAX_COUNT && throwable instanceof HttpException) {
                                final HttpException httpException = (HttpException) throwable;
                                if (httpException.code() == 403) {
                                    // Emitting a value after the delay triggers the re-subscription.
                                    return Flowable.timer(DELAY_SECOND, TimeUnit.SECONDS);
                                }
                            }
                            return Flowable.error(throwable);
                        }
                    });
                }
            });
        }
    };
}
/**
 * Creates a transformer that subscribes the upstream on the io scheduler,
 * observes it on the Android main thread, and passes the subscription's
 * disposable to this presenter's {@code call} method.
 *
 * @param <R> type of the item emitted by the upstream
 * @return the scheduling transformer
 */
public <R> SingleTransformer<? super R, ? extends R> compose() {
    final SingleTransformer<R, R> scheduling = upstream -> upstream
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe(AbstractPresenter.this::call);
    return scheduling;
}