类io.reactivex.SingleSource源码实例Demo

下面列出了怎么用io.reactivex.SingleSource的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: ETHWallet   文件: SavePasswordOperator.java
@Override
    public SingleSource<Wallet> apply(Single<Wallet> upstream) {
        Wallet wallet = upstream.blockingGet();
        return Single.fromCallable(() -> wallet);
//        return passwordStore
//                .setPassword(wallet, password)
//                .onErrorResumeNext(err -> walletRepository.deleteWallet(wallet.getAddress())
//                        .lift(observer -> new DisposableCompletableObserver() {
//                            @Override
//                            public void onComplete() {
//                                observer.onError(err);
//                            }
//
//                            @Override
//                            public void onError(Throwable e) {
//                                observer.onError(e);
//                            }
//                        }))
//                .toSingle(() -> wallet);
    }
 
源代码2 项目: 1Rramp-Android   文件: JudgeSelectionActivity.java
private void setupSearch() {
  //Add text change watcher
  compositeDisposable.add(
    RxTextView.textChangeEvents(judgeSearchBar)
      .skipInitialValue()
      .debounce(300, TimeUnit.MILLISECONDS)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeWith(getSearchInputObserver()));

  compositeDisposable.add(publishSubject
    .debounce(300, TimeUnit.MILLISECONDS)
    .distinctUntilChanged()
    .switchMapSingle(new Function<String, SingleSource<LookupAccount>>() {
      @Override
      public SingleSource<LookupAccount> apply(String username) {
        return RetrofitServiceGenerator
          .getService()
          .getUsernames(URLS.STEEMIT_API_URL, SteemRequestBody.lookupAccounts(username))
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());
      }
    }).subscribeWith(usernamesResponseObserver()));

  hideSearchingProgress();
}
 
源代码3 项目: Collection-Android   文件: SchedulerTransformer.java
@Override
public SingleSource<T> apply(Single<T> upstream) {
    switch (mSchedulerType) {
        case _main:
            return upstream.observeOn(AndroidSchedulers.mainThread());
        case _io:
            return upstream.observeOn(RxSchedulerUtils.io(mIOExecutor));
        case _io_main:
            return upstream
                    .subscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .observeOn(AndroidSchedulers.mainThread());
        case _io_io:
            return upstream
                    .subscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .observeOn(RxSchedulerUtils.io(mIOExecutor));
        default:
            break;
    }
    return upstream;
}
 
@Override
public SingleSource<Wallet> apply(Single<Wallet> upstream) {
    Wallet wallet = upstream.blockingGet();
    return passwordStore
            .setPassword(wallet, password)
            .onErrorResumeNext(err -> walletRepository.deleteWallet(wallet.address, password)
                    .lift(observer -> new DisposableCompletableObserver() {
                        @Override
                        public void onComplete() {
                            observer.onError(err);
                        }

                        @Override
                        public void onError(Throwable e) {
                            observer.onError(e);
                        }
                    }))
            .toSingle(() -> wallet);
}
 
源代码5 项目: mimi-reader   文件: BoardTableConnection.java
public static Single<Boolean> resetStats() {
    return Single.defer((Callable<SingleSource<Boolean>>) () -> {
        BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
        BriteDatabase.Transaction transaction = db.newTransaction();

        ContentValues values = new ContentValues();

        values.put(Board.KEY_ACCESS_COUNT, 0);
        values.put(Board.KEY_LAST_ACCESSED, 0);
        values.put(Board.KEY_POST_COUNT, 0);

        int val = 0;
        try {
            val = db.update(Board.TABLE_NAME, SQLiteDatabase.CONFLICT_IGNORE, values, null, null);
            transaction.markSuccessful();
        } catch (Exception e) {
            Log.e(LOG_TAG, "Error putting post options into the database", e);
        } finally {
            transaction.end();
        }

        return Single.just(val > 0);
    });

}
 
源代码6 项目: RxAndroidBle   文件: RxBleConnectionImpl.java
@Override
public Single<byte[]> readDescriptor(@NonNull final UUID serviceUuid, @NonNull final UUID characteristicUuid,
                                     @NonNull final UUID descriptorUuid) {
    return discoverServices()
            .flatMap(new Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>>() {
                @Override
                public SingleSource<BluetoothGattDescriptor> apply(RxBleDeviceServices rxBleDeviceServices) {
                    return rxBleDeviceServices.getDescriptor(serviceUuid, characteristicUuid, descriptorUuid);
                }
            })
            .flatMap(new Function<BluetoothGattDescriptor, SingleSource<byte[]>>() {
                @Override
                public SingleSource<byte[]> apply(BluetoothGattDescriptor descriptor) {
                    return readDescriptor(descriptor);
                }
            });
}
 
源代码7 项目: RxAndroidBle   文件: RxBleConnectionImpl.java
@Override
public Completable writeDescriptor(
        @NonNull final UUID serviceUuid, @NonNull final UUID characteristicUuid, @NonNull final UUID descriptorUuid,
        @NonNull final byte[] data
) {
    return discoverServices()
            .flatMap(new Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>>() {
                @Override
                public SingleSource<BluetoothGattDescriptor> apply(RxBleDeviceServices rxBleDeviceServices) {
                    return rxBleDeviceServices.getDescriptor(serviceUuid, characteristicUuid, descriptorUuid);
                }
            })
            .flatMapCompletable(new Function<BluetoothGattDescriptor, CompletableSource>() {
                @Override
                public CompletableSource apply(BluetoothGattDescriptor bluetoothGattDescriptor) {
                    return writeDescriptor(bluetoothGattDescriptor, data);
                }
            });
}
 
源代码8 项目: RxAndroidBle   文件: RxBleConnectionMock.java
@Override
public Single<byte[]> readDescriptor(@NonNull final UUID serviceUuid, @NonNull final UUID characteristicUuid,
                                         @NonNull final UUID descriptorUuid) {
    return discoverServices()
            .flatMap(new Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>>() {
                @Override
                public SingleSource<BluetoothGattDescriptor> apply(RxBleDeviceServices rxBleDeviceServices) {
                    return rxBleDeviceServices.getDescriptor(serviceUuid, characteristicUuid, descriptorUuid);
                }
            })
            .map(new Function<BluetoothGattDescriptor, byte[]>() {
                @Override
                public byte[] apply(BluetoothGattDescriptor bluetoothGattDescriptor) {
                    return bluetoothGattDescriptor.getValue();
                }
            });
}
 
源代码9 项目: RxAndroidBle   文件: RxBleConnectionMock.java
@Override
public Completable writeDescriptor(@NonNull final UUID serviceUuid, @NonNull final UUID characteristicUuid,
                                          @NonNull final UUID descriptorUuid, @NonNull final byte[] data) {
    return discoverServices()
            .flatMap(new Function<RxBleDeviceServices, SingleSource<BluetoothGattDescriptor>>() {
                @Override
                public SingleSource<BluetoothGattDescriptor> apply(RxBleDeviceServices rxBleDeviceServices) {
                    return rxBleDeviceServices.getDescriptor(serviceUuid, characteristicUuid, descriptorUuid);
                }
            })
            .doOnSuccess(new Consumer<BluetoothGattDescriptor>() {
                @Override
                public void accept(BluetoothGattDescriptor bluetoothGattDescriptor) throws Exception {
                    bluetoothGattDescriptor.setValue(data);
                }
            })
            .toCompletable();
}
 
private Single<JWT> validateSignature(SignedJWT jwt, Client client) {
    return jwkService.getKeys(client)
            .switchIfEmpty(Maybe.error(new InvalidRequestObjectException()))
            .flatMap(new Function<JWKSet, MaybeSource<JWK>>() {
                @Override
                public MaybeSource<JWK> apply(JWKSet jwkSet) throws Exception {
                    return jwkService.getKey(jwkSet, jwt.getHeader().getKeyID());
                }
            })
            .switchIfEmpty(Maybe.error(new InvalidRequestObjectException()))
            .flatMapSingle(new Function<JWK, SingleSource<JWT>>() {
                @Override
                public SingleSource<JWT> apply(JWK jwk) throws Exception {
                    // 6.3.2.  Signed Request Object
                    // To perform Signature Validation, the alg Header Parameter in the
                    // JOSE Header MUST match the value of the request_object_signing_alg
                    // set during Client Registration
                    if (jwt.getHeader().getAlgorithm().getName().equals(client.getRequestObjectSigningAlg()) &&
                            jwsService.isValidSignature(jwt, jwk)) {
                        return Single.just(jwt);
                    } else {
                        return Single.error(new InvalidRequestObjectException("Invalid signature"));
                    }
                }
            });
}
 
源代码11 项目: Varis-Android   文件: BuildsDetailsPresenter.java
/**
 * Restarts build process
 */
public void restartBuild() {
    RequestBody emptyBody = RequestBody.create(MediaType.parse("application/json"), "");
    Disposable subscription = mTravisRestClient.getApiService()
            .restartBuild(mBuildId, emptyBody)
            .onErrorReturn(throwable -> new Object())
            .flatMap(new Function<Object, SingleSource<BuildDetails>>() {
                @Override
                public SingleSource<BuildDetails> apply(@NonNull Object o) throws Exception {
                    return mTravisRestClient.getApiService().getBuild(mRepoSlug, mBuildId);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe((buildDetails, throwable) -> {
                if (throwable == null) {
                    handleBuildDetails(buildDetails);
                } else {
                    handleLoadingFailed(throwable);
                }
            });

    mSubscriptions.add(subscription);
}
 
源代码12 项目: Varis-Android   文件: BuildsDetailsPresenter.java
/**
 * Cancels build process
 */
public void cancelBuild() {
    RequestBody emptyBody = RequestBody.create(MediaType.parse("application/json"), "");
    Disposable subscription = mTravisRestClient.getApiService()
            .cancelBuild(mBuildId, emptyBody)
            .onErrorReturn(throwable -> new Object())
            .flatMap(new Function<Object, SingleSource<BuildDetails>>() {
                @Override
                public SingleSource<BuildDetails> apply(@NonNull Object o) throws Exception {
                    return mTravisRestClient.getApiService().getBuild(mRepoSlug, mBuildId);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe((buildDetails, throwable) -> {
                if (throwable == null) {
                    handleBuildDetails(buildDetails);
                } else {
                    handleLoadingFailed(throwable);
                }
            });

    mSubscriptions.add(subscription);
}
 
源代码13 项目: vertx-rx   文件: SingleUnmarshaller.java
@Override
public SingleSource<T> apply(@NonNull Single<B> upstream) {
  Single<Buffer> unwrapped = upstream.map(unwrap::apply);
  Single<T> unmarshalled = unwrapped.flatMap(buffer -> {
    try {
      T obj;
      if (mapper != null) {
        JsonParser parser = mapper.getFactory().createParser(buffer.getBytes());
        obj = nonNull(mappedType) ? mapper.readValue(parser, mappedType) :
          mapper.readValue(parser, mappedTypeRef);
      } else {
        obj = getT(buffer, mappedType, mappedTypeRef);
      }
      return Single.just(obj);
    } catch (Exception e) {
      return Single.error(e);
    }
  });
  return unmarshalled;
}
 
源代码14 项目: tysq-android   文件: RxParser.java
/**
 * 拆壳
 *
 * @param <T>
 * @return
 */
public static <T> SingleTransformer<RespData<T>, T> handleSingleDataResult() {
    return new SingleTransformer<RespData<T>, T>() {
        @Override
        public SingleSource<T> apply(Single<RespData<T>> upstream) {
            return upstream
                    .map(new TransToData<T>())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }

    };
}
 
源代码15 项目: tysq-android   文件: RxParser.java
/**
 * 不拆壳
 *
 * @param <T>
 * @return
 */
public static <T> SingleTransformer<RespData<T>, RespData<T>> handleSingleToResult() {
    return new SingleTransformer<RespData<T>, RespData<T>>() {
        @Override
        public SingleSource<RespData<T>> apply(Single<RespData<T>> upstream) {
            return upstream
                    .map(new TransToResult<T>())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }

    };
}
 
源代码16 项目: auth   文件: LoginActivity.java
@NonNull
private Function<TokenResponse, SingleSource<? extends Pair<TokenResponse, User>>>
        mapUserProfileToAuth(RedditAuthApi service) {
    return response ->
            service.fetchMe("Bearer " + response.accessToken)
                    .map(user -> new Pair<>(response, user));
}
 
源代码17 项目: black-mirror   文件: LocationDataSource.java
/**
 * Zwraca strefę czasową na podstawie podanej lokalizacji.
   @param location Lokalizacja - miasto, kraj, wieś.
 */
@Override
public Single<TimeZone> getTimeZoneByLocationName(String location) {
    return googleGeoApi.getCoordForLocation(location, GOOGLE_GEO_API_KEY)
            .flatMap(new Function<CoordResponse, SingleSource<? extends TimeZone>>() {
                @Override
                public SingleSource<? extends TimeZone> apply(@NonNull CoordResponse coordResponse) throws Exception {
                    String lat = coordResponse.results.get(0).geometry.location.lat.toString();
                    String lng = coordResponse.results.get(0).geometry.location.lng.toString();
                    return timeZoneDbApi.getTimeZone(lat, lng, TIME_ZONE_DB_API_KEY);
                }
            });
}
 
源代码18 项目: Android-Allocine-Api   文件: AllocineApi.java
/**
 * Recherche
 */
public Single<AllocineResponse> search(final String recherche, final List<String> filter, final int count, final int page) {
    return Single
            .create(new SingleOnSubscribe<Pair<String, String>>() {
                @Override
                public void subscribe(SingleEmitter<Pair<String, String>> e) throws Exception {
                    final String params = ServiceSecurity.construireParams(false,
                            AllocineService.Q, "" + recherche.replace(" ", "+"),
                            AllocineService.FILTER, filter,
                            AllocineService.COUNT, "" + count,
                            AllocineService.PAGE, "" + page
                    );

                    final String sed = ServiceSecurity.getSED();
                    final String sig = ServiceSecurity.getSIG(params, sed);

                    e.onSuccess(Pair.create(sed, sig));
                }
            })
            .flatMap(new Function<Pair<String, String>, SingleSource<? extends AllocineResponse>>() {
                @Override
                public SingleSource<? extends AllocineResponse> apply(Pair<String, String> pair) throws Exception {
                    return allocineService.search(recherche, ServiceSecurity.applatir(filter), count, page, pair.first, pair.second);
                }
            })
            .compose(this.<AllocineResponse>retry());
}
 
源代码19 项目: Android-Allocine-Api   文件: AllocineApi.java
/**
 * Recherche
 */
public Single<AllocineResponseSmall> searchSmall(final String recherche, final List<String> filter, final int count, final int page) {
    return Single
            .create(new SingleOnSubscribe<Pair<String, String>>() {
                @Override
                public void subscribe(SingleEmitter<Pair<String, String>> e) throws Exception {
                    final String params = ServiceSecurity.construireParams(false,
                            AllocineService.Q, "" + recherche.replace(" ", "+"),
                            AllocineService.FILTER, filter,
                            AllocineService.COUNT, "" + count,
                            AllocineService.PAGE, "" + page
                    );

                    final String sed = ServiceSecurity.getSED();
                    final String sig = ServiceSecurity.getSIG(params, sed);

                    e.onSuccess(Pair.create(sed, sig));
                }
            })
            .flatMap(new Function<Pair<String, String>, SingleSource<AllocineResponseSmall>>() {
                @Override
                public SingleSource<AllocineResponseSmall> apply(Pair<String, String> pair) throws Exception {
                    return allocineService.searchSmall(recherche, ServiceSecurity.applatir(filter), count, page, pair.first, pair.second);
                }
            })
            .compose(this.<AllocineResponseSmall>retry());
}
 
源代码20 项目: Android-Allocine-Api   文件: AllocineApi.java
/**
 * Informations sur un film
 */
public Single<Movie> movie(final String idFilm, final Profile profile) {
    final String filter = FILTER_MOVIE;

    return Single
            .create(new SingleOnSubscribe<Pair<String, String>>() {
                @Override
                public void subscribe(SingleEmitter<Pair<String, String>> e) throws Exception {

                    final String params = ServiceSecurity.construireParams(false,
                            AllocineService.CODE, idFilm,
                            AllocineService.PROFILE, profile.getValue(),
                            AllocineService.FILTER, filter
                    );

                    final String sed = ServiceSecurity.getSED();
                    final String sig = ServiceSecurity.getSIG(params, sed);

                    e.onSuccess(Pair.create(sed, sig));
                }
            })
            .flatMap(new Function<Pair<String, String>, SingleSource<? extends Movie>>() {
                @Override
                public SingleSource<? extends Movie> apply(Pair<String, String> pair) throws Exception {
                    return allocineService.movie(idFilm, profile.getValue(), filter, pair.first, pair.second)
                            .map(new Function<AllocineResponse, Movie>() {
                                @Override
                                public Movie apply(AllocineResponse allocineResponse) throws Exception {
                                    return allocineResponse.getMovie();
                                }
                            });
                }
            })
            .compose(this.<Movie>retry());
}
 
源代码21 项目: Android-Allocine-Api   文件: AllocineApi.java
/**
 * Informations sur un film
 */
public Single<Theater> theater(final String idCinema, final String profile, final String filter) {
    return Single
            .create(new SingleOnSubscribe<Pair<String, String>>() {
                @Override
                public void subscribe(SingleEmitter<Pair<String, String>> e) throws Exception {
                    final String params = ServiceSecurity.construireParams(false,
                            AllocineService.CODE, idCinema,
                            AllocineService.PROFILE, profile,
                            AllocineService.FILTER, filter
                    );

                    final String sed = ServiceSecurity.getSED();
                    final String sig = ServiceSecurity.getSIG(params, sed);

                    e.onSuccess(Pair.create(sed, sig));
                }
            })
            .flatMap(new Function<Pair<String, String>, SingleSource<? extends Theater>>() {
                @Override
                public SingleSource<? extends Theater> apply(Pair<String, String> pair) throws Exception {
                    return allocineService.theater(idCinema, profile, filter, pair.first, pair.second)
                            .map(new Function<AllocineResponse, Theater>() {
                                @Override
                                public Theater apply(AllocineResponse allocineResponse) throws Exception {
                                    return allocineResponse.getTheater();
                                }
                            });
                }
            })
            .compose(this.<Theater>retry());
}
 
源代码22 项目: Android-Allocine-Api   文件: AllocineApi.java
/**
 * Informations sur une personne
 */
public Single<PersonFull> person(final String idPerson, final String profile, final String filter) {
    return Single
            .create(new SingleOnSubscribe<Pair<String, String>>() {
                @Override
                public void subscribe(SingleEmitter<Pair<String, String>> e) throws Exception {
                    final String params = ServiceSecurity.construireParams(false,
                            AllocineService.CODE, idPerson,
                            AllocineService.PROFILE, profile,
                            AllocineService.FILTER, filter
                    );

                    final String sed = ServiceSecurity.getSED();
                    final String sig = ServiceSecurity.getSIG(params, sed);

                    e.onSuccess(Pair.create(sed, sig));
                }
            })
            .flatMap(new Function<Pair<String, String>, SingleSource<PersonFull>>() {
                @Override
                public SingleSource<PersonFull> apply(Pair<String, String> pair) throws Exception {
                    return allocineService.person(idPerson, profile, filter, pair.first, pair.second)
                            .map(new Function<AllocineResponse, PersonFull>() {
                                @Override
                                public PersonFull apply(AllocineResponse allocineResponse) throws Exception {
                                    return allocineResponse.getPerson();
                                }
                            });
                }
            })
            .compose(this.<PersonFull>retry());
}
 
源代码23 项目: Android-Allocine-Api   文件: AllocineApi.java
/**
 * Filmographie d'une personne
 */
public Single<List<Participation>> filmography(final String idPerson, final String profile, final String filter) {
    return Single
            .create(new SingleOnSubscribe<Pair<String, String>>() {
                @Override
                public void subscribe(SingleEmitter<Pair<String, String>> e) throws Exception {
                    final String params = ServiceSecurity.construireParams(false,
                            AllocineService.CODE, idPerson,
                            AllocineService.PROFILE, profile,
                            AllocineService.FILTER, filter
                    );

                    final String sed = ServiceSecurity.getSED();
                    final String sig = ServiceSecurity.getSIG(params, sed);

                    e.onSuccess(Pair.create(sed, sig));
                }
            })
            .flatMap(new Function<Pair<String, String>, SingleSource<List<Participation>>>() {
                @Override
                public SingleSource<List<Participation>> apply(Pair<String, String> pair) throws Exception {
                    return allocineService.filmography(idPerson, profile, filter, pair.first, pair.second)
                            .map(new Function<AllocineResponse, List<Participation>>() {
                                @Override
                                public List<Participation> apply(AllocineResponse allocineResponse) throws Exception {
                                    return allocineResponse.getPerson().getParticipation();
                                }
                            });
                }
            })
            .compose(this.<List<Participation>>retry());
}
 
源代码24 项目: Android-Allocine-Api   文件: AllocineApi.java
public Single<List<Movie>> movieList(List<MovieListFilter> filter, final Profile profile, final MovieListOrder order, final int count, final int page) {
    final List<String> filterString = new ArrayList<>();
    for (MovieListFilter movieListFilter : filter) {
        filterString.add(movieListFilter.getValue());
    }

    return Single
            .create(new SingleOnSubscribe<Pair<String, String>>() {
                @Override
                public void subscribe(SingleEmitter<Pair<String, String>> e) throws Exception {
                    final String params = ServiceSecurity.construireParams(true,
                            AllocineService.FILTER, filterString,
                            AllocineService.PROFILE, profile.getValue(),
                            AllocineService.ORDER, order.getValue(),
                            AllocineService.COUNT, "" + count,
                            AllocineService.PAGE, "" + page
                    );

                    final String sed = ServiceSecurity.getSED();
                    final String sig = ServiceSecurity.getSIG(params, sed);

                    e.onSuccess(Pair.create(sed, sig));
                }
            })
            .flatMap(new Function<Pair<String, String>, SingleSource<? extends List<Movie>>>() {
                @Override
                public SingleSource<? extends List<Movie>> apply(Pair<String, String> pair) throws Exception {
                    return allocineService.movieList(ServiceSecurity.applatir(filterString), profile.getValue(), order.getValue(), count, page, pair.first, pair.second)
                            .map(new Function<AllocineResponse, List<Movie>>() {
                                @Override
                                public List<Movie> apply(AllocineResponse allocineResponse) throws Exception {
                                    return allocineResponse.getFeed().getMovie();
                                }
                            });
                }
            })
            .compose(this.<List<Movie>>retry());
}
 
源代码25 项目: Android-Allocine-Api   文件: AllocineApi.java
public Single<List<PersonFull>> starsList(final List<PersonListFilter> filter, final Profile profile, final int count, final int page) {
    final List<String> filterString = new ArrayList<>();
    for (PersonListFilter movieListFilter : filter) {
        filterString.add(movieListFilter.getValue());
    }

    return Single
            .create(new SingleOnSubscribe<Pair<String, String>>() {
                @Override
                public void subscribe(SingleEmitter<Pair<String, String>> e) throws Exception {
                    final String params = ServiceSecurity.construireParams(true,
                            AllocineService.FILTER, filterString,
                            AllocineService.PROFILE, profile.getValue(),
                            AllocineService.COUNT, "" + count,
                            AllocineService.PAGE, "" + page
                    );

                    final String sed = ServiceSecurity.getSED();
                    final String sig = ServiceSecurity.getSIG(params, sed);

                    e.onSuccess(Pair.create(sed, sig));
                }
            })
            .flatMap(new Function<Pair<String, String>, SingleSource<? extends List<PersonFull>>>() {
                @Override
                public SingleSource<? extends List<PersonFull>> apply(Pair<String, String> pair) throws Exception {
                    return allocineService.personList(ServiceSecurity.applatir(filterString), profile.getValue(), count, page, pair.first, pair.second)
                            .map(new Function<AllocineResponse, List<PersonFull>>() {
                                @Override
                                public List<PersonFull> apply(AllocineResponse allocineResponse) throws Exception {
                                    return allocineResponse.getFeed().getPerson();
                                }
                            });
                }
            })
            .compose(this.<List<PersonFull>>retry());
}
 
源代码26 项目: Android-Allocine-Api   文件: AllocineApi.java
public Single<List<Theater>> theaterList(final String zip, final int count, final int page) {
    return Single
            .create(new SingleOnSubscribe<Pair<String, String>>() {
                @Override
                public void subscribe(SingleEmitter<Pair<String, String>> e) throws Exception {
                    final String params = ServiceSecurity.construireParams(false,
                            AllocineService.ZIP, zip,
                            AllocineService.COUNT, "" + count,
                            AllocineService.PAGE, "" + page
                    );

                    final String sed = ServiceSecurity.getSED();
                    final String sig = ServiceSecurity.getSIG(params, sed);

                    e.onSuccess(Pair.create(sed, sig));
                }
            })
            .flatMap(new Function<Pair<String, String>, SingleSource<? extends List<Theater>>>() {
                @Override
                public SingleSource<? extends List<Theater>> apply(Pair<String, String> pair) throws Exception {
                    return allocineService.theaterlist(zip, count, page, pair.first, pair.second)
                            .map(new Function<AllocineResponse, List<Theater>>() {
                                @Override
                                public List<Theater> apply(AllocineResponse allocineResponse) throws Exception {
                                    return allocineResponse.getFeed().getTheater();
                                }
                            });
                }
            })
            .compose(this.<List<Theater>>retry());
}
 
源代码27 项目: Android-Allocine-Api   文件: AllocineApi.java
public Single<List<Theater>> theaterList(final String lat, final String lng, final int radius, final int count, final int page) {
    return Single
            .create(new SingleOnSubscribe<Pair<String, String>>() {
                @Override
                public void subscribe(SingleEmitter<Pair<String, String>> e) throws Exception {
                    final String params = ServiceSecurity.construireParams(false,
                            AllocineService.LAT, lat,
                            AllocineService.LONG, lng,
                            AllocineService.RADIUS, "" + radius,
                            AllocineService.COUNT, "" + count,
                            AllocineService.PAGE, "" + page
                    );

                    final String sed = ServiceSecurity.getSED();
                    final String sig = ServiceSecurity.getSIG(params, sed);


                    e.onSuccess(Pair.create(sed, sig));
                }
            })
            .flatMap(new Function<Pair<String, String>, SingleSource<? extends List<Theater>>>() {
                @Override
                public SingleSource<? extends List<Theater>> apply(Pair<String, String> pair) throws Exception {
                    return allocineService.theaterlist(lat, lng, radius, count, page, pair.first, pair.second)
                            .map(new Function<AllocineResponse, List<Theater>>() {
                                @Override
                                public List<Theater> apply(AllocineResponse allocineResponse) throws Exception {
                                    return allocineResponse.getFeed().getTheater();
                                }
                            });
                }
            })
            .compose(this.<List<Theater>>retry());
}
 
源代码28 项目: Android-Allocine-Api   文件: AllocineApi.java
public Single<List<Media>> videoList(final String code, final int count) {
    final String subject = "movie:" + code;
    final String mediafmt = "mp4";

    return Single
            .create(new SingleOnSubscribe<Pair<String, String>>() {
                @Override
                public void subscribe(SingleEmitter<Pair<String, String>> e) throws Exception {
                    final String params = ServiceSecurity.construireParams(false,
                            AllocineService.SUBJECT, subject,
                            AllocineService.COUNT, "" + count,
                            AllocineService.MEDIAFMT, mediafmt
                    );

                    final String sed = ServiceSecurity.getSED();
                    final String sig = ServiceSecurity.getSIG(params, sed);

                    e.onSuccess(Pair.create(sed, sig));
                }
            })
            .flatMap(new Function<Pair<String, String>, SingleSource<List<Media>>>() {
                @Override
                public SingleSource<List<Media>> apply(Pair<String, String> pair) throws Exception {
                    return allocineService.videoList(subject, count, mediafmt, pair.first, pair.second)
                            .map(new Function<AllocineResponse, List<Media>>() {
                                @Override
                                public List<Media> apply(AllocineResponse allocineResponse) throws Exception {
                                    return allocineResponse.getFeed().getMedia();
                                }
                            });
                }
            })
            .compose(this.<List<Media>>retry());


}
 
源代码29 项目: Android-Allocine-Api   文件: AllocineApi.java
private <T> SingleTransformer<T, T> retry() {
    return new SingleTransformer<T, T>() {
        @Override
        public SingleSource<T> apply(Single<T> upstream) {
            return upstream.retryWhen(new Function<Flowable<Throwable>, Publisher<Object>>() {

                private final int MAX_COUNT = 3;
                private int count = 0;

                private final int DELAY_SECOND = 10;

                @Override
                public Publisher<Object> apply(Flowable<Throwable> throwableFlowable) throws Exception {
                    return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
                        @Override
                        public Publisher<?> apply(Throwable throwable) throws Exception {
                            if (count++ < MAX_COUNT && throwable instanceof HttpException) {
                                final HttpException httpException = (HttpException) throwable;
                                if (httpException.code() == 403) {
                                    return Flowable.timer(DELAY_SECOND, TimeUnit.SECONDS);
                                }
                            }
                            return Flowable.error(throwable);
                        }
                    });
                }
            });
        }
    };
}
 
源代码30 项目: Open-Mam   文件: AbstractPresenter.java
public <R> SingleTransformer<? super R, ? extends R> compose() {
    return new SingleTransformer<R, R>() {
        @Override
        public SingleSource<R> apply(@NonNull Single<R> upstream) {
            return upstream
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnSubscribe(AbstractPresenter.this::call);
        }
    };
}
 
 类所在包
 同包方法