下面列出了io.reactivex.Single#defer ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
protected Single<VertxResteasyDeployment> setupResteasy(Class<?>... resourceOrProviderClasses) {
return Single.defer(() -> {
// Build the Jax-RS hello world deployment
VertxResteasyDeployment deployment = new VertxResteasyDeployment();
deployment.getDefaultContextObjects().put(Vertx.class, AppGlobals.get().getVertx());
deployment.getDefaultContextObjects().put(AppGlobals.class, AppGlobals.get());
return doOnPlugins(plugin -> plugin.deployToResteasy(deployment)).toSingle(() -> {
for(Class<?> klass : resourceOrProviderClasses) {
if(klass.isAnnotationPresent(Path.class))
deployment.getActualResourceClasses().add(klass);
if(klass.isAnnotationPresent(Provider.class))
deployment.getActualProviderClasses().add(klass);
}
try {
deployment.start();
}catch(ExceptionInInitializerError err) {
// rxjava behaves badly on LinkageError
RedpipeUtil.rethrow(err.getCause());
}
return deployment;
}).doOnError(t -> t.printStackTrace());
});
}
@SuppressWarnings("unchecked")
private Single<TxWithoutValue> build() {
return Single.defer(() -> {
AtomicReference<Connection> con = new AtomicReference<Connection>();
// set the atomic reference when transactedConnection emits
Single<Connection> transactedConnection = b.connection //
.map(c -> Util.toTransactedConnection(con, c));
return Call //
.createWithZeroOutParameters(transactedConnection, b.sql, parameterGroups(), b.params) //
.materialize() //
.filter(x -> !x.isOnNext()) //
.<TxWithoutValue>flatMap(n -> Tx.toTx(n, con.get(), b.db)) //
.doOnNext(tx -> {
if (tx.isComplete()) {
((TxImpl<Object>) tx).connection().commit();
}
}) //
.lastOrError();
});
}
public static Single<Boolean> resetStats() {
return Single.defer((Callable<SingleSource<Boolean>>) () -> {
BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
BriteDatabase.Transaction transaction = db.newTransaction();
ContentValues values = new ContentValues();
values.put(Board.KEY_ACCESS_COUNT, 0);
values.put(Board.KEY_LAST_ACCESSED, 0);
values.put(Board.KEY_POST_COUNT, 0);
int val = 0;
try {
val = db.update(Board.TABLE_NAME, SQLiteDatabase.CONFLICT_IGNORE, values, null, null);
transaction.markSuccessful();
} catch (Exception e) {
Log.e(LOG_TAG, "Error putting post options into the database", e);
} finally {
transaction.end();
}
return Single.just(val > 0);
});
}
public static <K extends BaseModel> Single<Boolean> updateTable(final K model) {
return Single.defer(() -> {
BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
BriteDatabase.Transaction transaction = db.newTransaction();
int val = 0;
try {
val = db.update(model.getTableName(), SQLiteDatabase.CONFLICT_REPLACE, model.toContentValues(), model.clause(), model.vals());
transaction.markSuccessful();
} catch (Exception e) {
Log.e(LOG_TAG, "Error putting model " + model.getClass().getSimpleName() + " into the database", e);
} finally {
transaction.end();
}
return Single.just(val > 0);
});
}
@Override
public Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
// 根据 https://zhuanlan.zhihu.com/p/40097338 对 Retrofit 进行的优化
if (method.getReturnType() == Observable.class) {
// 如果方法返回值是 Observable 的话,则包一层再返回,
// 只包一层 defer 由外部去控制耗时方法以及网络请求所处线程,
// 如此对原项目的影响为 0,且更可控。
return Observable.defer(() -> {
// 执行真正的 Retrofit 动态代理的方法
return (Observable) method.invoke(getRetrofitService(), args);
});
} else if (method.getReturnType() == Single.class) {
// 如果方法返回值是 Single 的话,则包一层再返回。
return Single.defer(() -> {
// 执行真正的 Retrofit 动态代理的方法
return (Single) method.invoke(getRetrofitService(), args);
});
}
// 返回值不是 Observable 或 Single 的话不处理。
return method.invoke(getRetrofitService(), args);
}
public static <T> Single<T> doInConnection(Func1<? super SQLConnection, ? extends Single<T>> func){
return Single.defer(() -> {
Single<SQLConnection> connection = getConnection();
return connection.flatMap(conn -> {
return func.call(conn).doAfterTerminate(() -> {
conn.close();
});
});
});
}
/**
* Sometimes it happens that the {@link BluetoothGatt} will receive all {@link BluetoothGattService}'s,
* {@link android.bluetooth.BluetoothGattCharacteristic}'s and {@link android.bluetooth.BluetoothGattDescriptor}
* but it won't receive the final callback that the service discovery was completed. This is a potential workaround.
* <p>
* There is a change in Android 7.0.0_r1 where all data is received at once - in this situation returned services size will be always 0
* https://android.googlesource.com/platform/frameworks/base/+/android-7.0.0_r1/core/java/android/bluetooth/BluetoothGatt.java#206
* https://android.googlesource.com/platform/frameworks/base/+/android-6.0.1_r72/core/java/android/bluetooth/BluetoothGatt.java#205
*
* @param bluetoothGatt the BluetoothGatt to use
* @param rxBleGattCallback the RxBleGattCallback to use
* @param timeoutScheduler the Scheduler for timeout to use
* @return Observable that may emit {@link RxBleDeviceServices} or {@link TimeoutException}
*/
@NonNull
@Override
protected Single<RxBleDeviceServices> timeoutFallbackProcedure(
final BluetoothGatt bluetoothGatt,
final RxBleGattCallback rxBleGattCallback,
final Scheduler timeoutScheduler
) {
return Single.defer(new Callable<SingleSource<? extends RxBleDeviceServices>>() {
@Override
public SingleSource<? extends RxBleDeviceServices> call() {
final List<BluetoothGattService> services = bluetoothGatt.getServices();
if (services.size() == 0) {
// if after the timeout services are empty we have no other option to declare a failed discovery
return Single.error(new BleGattCallbackTimeoutException(bluetoothGatt, BleGattOperationType.SERVICE_DISCOVERY));
} else {
/*
it is observed that usually the Android OS is returning services, characteristics and descriptors in a short period of time
if there are some services available we will wait for 5 more seconds just to be sure that
the timeout was not triggered right in the moment of filling the services and then emit a value.
*/
return Single
.timer(5, TimeUnit.SECONDS, timeoutScheduler)
.flatMap(new Function<Long, Single<RxBleDeviceServices>>() {
@Override
public Single<RxBleDeviceServices> apply(Long delayedSeconds) {
return Single.fromCallable(new Callable<RxBleDeviceServices>() {
@Override
public RxBleDeviceServices call() {
return new RxBleDeviceServices(bluetoothGatt.getServices());
}
});
}
});
}
}
});
}
@Override
public Single<Boolean> canReOpenEvent() {
return Single.defer(() -> Single.fromCallable(() -> d2.userModule().authorities()
.byName().in("F_UNCOMPLETE_EVENT", "ALL").one().blockingExists()
));
}
Single<WriteResult> insertBulk(List<ObjectNode> documents) {
return Single.defer(() -> insertBulkInternal(documents));
}