io.reactivex.Single#defer ( )源码实例Demo

下面列出了 io.reactivex.Single#defer() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: redpipe   文件: Server.java
/**
 * Lazily builds and starts a Resteasy deployment for the given classes.
 * <p>
 * Nothing happens until the returned {@code Single} is subscribed to. On subscription a new
 * {@link VertxResteasyDeployment} is created, every plugin gets a chance to contribute to it
 * through {@code deployToResteasy}, and once that plugin step completes the supplied classes
 * are registered and the deployment is started.
 *
 * @param resourceOrProviderClasses classes to register; a class annotated with {@code Path} is
 *                                  added as a resource, one annotated with {@code Provider} is
 *                                  added as a provider (a class carrying both is added to both)
 * @return a single emitting the started deployment; errors are printed to standard error
 *         before being propagated downstream
 */
protected Single<VertxResteasyDeployment> setupResteasy(Class<?>... resourceOrProviderClasses) {
	return Single.defer(() -> {
		// Build the JAX-RS deployment and expose Vertx and the globals as context objects
		VertxResteasyDeployment resteasy = new VertxResteasyDeployment();
		AppGlobals globals = AppGlobals.get();
		resteasy.getDefaultContextObjects().put(Vertx.class, globals.getVertx());
		resteasy.getDefaultContextObjects().put(AppGlobals.class, globals);

		return doOnPlugins(plugin -> plugin.deployToResteasy(resteasy))
				.toSingle(() -> {
					for (Class<?> candidate : resourceOrProviderClasses) {
						if (candidate.isAnnotationPresent(Path.class)) {
							resteasy.getActualResourceClasses().add(candidate);
						}
						if (candidate.isAnnotationPresent(Provider.class)) {
							resteasy.getActualProviderClasses().add(candidate);
						}
					}
					try {
						resteasy.start();
					} catch (ExceptionInInitializerError initError) {
						// rxjava behaves badly on LinkageError, so hand its cause to
						// RedpipeUtil.rethrow instead of letting the Error escape
						RedpipeUtil.rethrow(initError.getCause());
					}
					return resteasy;
				})
				.doOnError(Throwable::printStackTrace);
	});
}
 
源代码2 项目: rxjava2-jdbc   文件: TransactedCallableBuilder.java
/**
 * Builds the deferred transacted call.
 * <p>
 * Each subscription gets its own connection holder. The call's notifications are
 * materialized, value notifications are dropped, and the remaining ones are converted to
 * transaction objects via {@code Tx.toTx}. When the resulting transaction reports completion
 * its connection is committed. The last transaction object is emitted, or an error if there
 * is none.
 *
 * @return a single emitting the final {@code TxWithoutValue}
 */
@SuppressWarnings("unchecked")
private Single<TxWithoutValue> build() {
    return Single.defer(() -> {
        // the holder is set when transactedConnection emits
        AtomicReference<Connection> connectionRef = new AtomicReference<>();
        Single<Connection> transactedConnection =
                b.connection.map(c -> Util.toTransactedConnection(connectionRef, c));
        return Call
                .createWithZeroOutParameters(transactedConnection, b.sql, parameterGroups(), b.params)
                .materialize()
                .filter(notification -> !notification.isOnNext())
                .<TxWithoutValue>flatMap(notification -> Tx.toTx(notification, connectionRef.get(), b.db))
                .doOnNext(tx -> {
                    if (!tx.isComplete()) {
                        return;
                    }
                    ((TxImpl<Object>) tx).connection().commit();
                })
                .lastOrError();
    });
}
 
源代码3 项目: mimi-reader   文件: BoardTableConnection.java
/**
 * Resets the access count, last-accessed value and post count of the board table to zero.
 * <p>
 * The update is issued without a where clause (both where arguments are {@code null}) and
 * runs inside a database transaction when the returned {@code Single} is subscribed to.
 * Failures are treated as best-effort: they are logged and reported as {@code false} rather
 * than propagated as an error.
 *
 * @return a single emitting {@code true} when the update reported more than zero rows,
 *         {@code false} otherwise (including when the update failed)
 */
public static Single<Boolean> resetStats() {
    return Single.defer(() -> {
        ContentValues values = new ContentValues();
        values.put(Board.KEY_ACCESS_COUNT, 0);
        values.put(Board.KEY_LAST_ACCESSED, 0);
        values.put(Board.KEY_POST_COUNT, 0);

        BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
        // Open the transaction right before the try block so that it is always ended.
        BriteDatabase.Transaction transaction = db.newTransaction();
        int val = 0;
        try {
            val = db.update(Board.TABLE_NAME, SQLiteDatabase.CONFLICT_IGNORE, values, null, null);
            transaction.markSuccessful();
        } catch (Exception e) {
            // Deliberately best-effort: log and fall through to emit false.
            Log.e(LOG_TAG, "Error resetting board stats in the database", e);
        } finally {
            transaction.end();
        }

        return Single.just(val > 0);
    });
}
 
源代码4 项目: mimi-reader   文件: DatabaseUtils.java
/**
 * Updates the rows matching the given model inside a database transaction.
 * <p>
 * The table name, content values, where clause and where arguments are all taken from the
 * model. The work is performed when the returned {@code Single} is subscribed to. Any
 * exception raised by the update is logged and results in {@code false} being emitted
 * instead of an error.
 *
 * @param model the model supplying the table, values and selection for the update
 * @param <K>   the concrete model type
 * @return a single emitting {@code true} when the update reported more than zero rows
 */
public static <K extends BaseModel> Single<Boolean> updateTable(final K model) {
    return Single.defer(() -> {
        final BriteDatabase database = MimiApplication.getInstance().getBriteDatabase();
        final BriteDatabase.Transaction tx = database.newTransaction();
        boolean anyRowUpdated = false;
        try {
            final int affectedRows = database.update(
                    model.getTableName(),
                    SQLiteDatabase.CONFLICT_REPLACE,
                    model.toContentValues(),
                    model.clause(),
                    model.vals());
            anyRowUpdated = affectedRows > 0;
            tx.markSuccessful();
        } catch (Exception e) {
            Log.e(LOG_TAG, "Error putting model " + model.getClass().getSimpleName() + " into the database", e);
        } finally {
            tx.end();
        }

        return Single.just(anyRowUpdated);
    });
}
 
源代码5 项目: MVPArms   文件: RetrofitServiceProxyHandler.java
/**
 * Forwards a proxied call to the real Retrofit service.
 * <p>
 * Methods returning {@code Observable} or {@code Single} are wrapped in a single
 * {@code defer}, so the reflective call to the real service only happens on subscription and
 * the caller decides which thread the expensive work and the network request run on. All
 * other methods are invoked immediately.
 * <p>
 * Exceptions thrown by the target method are unwrapped from the
 * {@code InvocationTargetException} that reflection adds, so callers and subscribers see the
 * original failure instead of a reflection wrapper.
 *
 * @param proxy  the proxy instance the method was invoked on (unused)
 * @param method the interface method being invoked
 * @param args   the call arguments, or {@code null} when the method takes none
 * @return the deferred reactive wrapper, or the direct result for non-reactive methods
 * @throws Throwable whatever the underlying service method throws
 */
@Override
public Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {

    // Optimization of Retrofit based on https://zhuanlan.zhihu.com/p/40097338

    Class<?> returnType = method.getReturnType();

    if (returnType == Observable.class) {
        // Wrap Observable results in exactly one defer layer and leave threading to the
        // caller; this has no impact on existing code and is more controllable.
        return Observable.defer(() -> {
            // Run the real Retrofit dynamic-proxy method.
            return (Observable) invokeOnRetrofitService(method, args);
        });
    } else if (returnType == Single.class) {
        // Same treatment for Single results.
        return Single.defer(() -> {
            // Run the real Retrofit dynamic-proxy method.
            return (Single) invokeOnRetrofitService(method, args);
        });
    }

    // Return types other than Observable or Single are passed through untouched.
    return invokeOnRetrofitService(method, args);
}

/**
 * Reflectively invokes {@code method} on the real Retrofit service, rethrowing the target's
 * own exception instead of the {@code InvocationTargetException} wrapper.
 */
private Object invokeOnRetrofitService(Method method, @Nullable Object[] args) throws Exception {
    try {
        return method.invoke(getRetrofitService(), args);
    } catch (java.lang.reflect.InvocationTargetException e) {
        Throwable cause = e.getCause();
        if (cause instanceof Exception) {
            throw (Exception) cause;
        }
        if (cause instanceof Error) {
            throw (Error) cause;
        }
        // No usable cause (null or an exotic Throwable subtype): keep the wrapper.
        throw e;
    }
}
 
源代码6 项目: redpipe   文件: SQLUtil.java
/**
 * Runs {@code func} with a connection obtained from {@code getConnection()} and closes that
 * connection when the work is over.
 * <p>
 * The connection is acquired lazily, once per subscription. It is closed when the single
 * returned by {@code func} succeeds or fails, when the subscriber disposes before
 * termination, and when {@code func} itself throws instead of returning a single.
 *
 * @param func the work to perform with the connection
 * @param <T>  the type of the produced value
 * @return a single emitting the result of {@code func}
 */
public static <T> Single<T> doInConnection(Func1<? super SQLConnection, ? extends Single<T>> func){
	return Single.defer(() -> {
		Single<SQLConnection> connection = getConnection();
		return connection.flatMap(conn -> {
			// The inner defer turns a synchronous throw from func into an error signal that
			// still passes through doFinally; doFinally (unlike doAfterTerminate) also runs
			// on dispose, so the connection cannot leak on either path.
			return Single.<T>defer(() -> func.call(conn)).doFinally(() -> {
				conn.close();
			});
		});
	});
}
 
源代码7 项目: RxAndroidBle   文件: ServiceDiscoveryOperation.java
/**
 * Fallback for a service discovery that timed out.
 * <p>
 * Sometimes the {@link BluetoothGatt} receives all {@link BluetoothGattService}'s,
 * {@link android.bluetooth.BluetoothGattCharacteristic}'s and {@link android.bluetooth.BluetoothGattDescriptor}
 * but never gets the final callback saying that the service discovery was completed. This is a potential workaround.
 * <p>
 * Since Android 7.0.0_r1 all data is received at once - in that situation the returned services size will always be 0
 * https://android.googlesource.com/platform/frameworks/base/+/android-7.0.0_r1/core/java/android/bluetooth/BluetoothGatt.java#206
 * https://android.googlesource.com/platform/frameworks/base/+/android-6.0.1_r72/core/java/android/bluetooth/BluetoothGatt.java#205
 *
 * @param bluetoothGatt     the BluetoothGatt to use
 * @param rxBleGattCallback the RxBleGattCallback to use
 * @param timeoutScheduler  the Scheduler for timeout to use
 * @return Single that emits {@link RxBleDeviceServices} or fails with a {@link BleGattCallbackTimeoutException}
 */
@NonNull
@Override
protected Single<RxBleDeviceServices> timeoutFallbackProcedure(
        final BluetoothGatt bluetoothGatt,
        final RxBleGattCallback rxBleGattCallback,
        final Scheduler timeoutScheduler
) {
    return Single.defer(new Callable<SingleSource<? extends RxBleDeviceServices>>() {
        @Override
        public SingleSource<? extends RxBleDeviceServices> call() {
            final List<BluetoothGattService> discoveredSoFar = bluetoothGatt.getServices();
            if (!discoveredSoFar.isEmpty()) {
                /*
                The Android OS usually delivers services, characteristics and descriptors within a short period of time.
                When some services are already available, wait 5 more seconds to make sure the timeout
                did not fire right while the services were being filled in, and only then emit a value.
                 */
                final Single<Long> gracePeriod = Single.timer(5, TimeUnit.SECONDS, timeoutScheduler);
                return gracePeriod.flatMap(new Function<Long, Single<RxBleDeviceServices>>() {
                    @Override
                    public Single<RxBleDeviceServices> apply(Long ignoredTick) {
                        return Single.fromCallable(new Callable<RxBleDeviceServices>() {
                            @Override
                            public RxBleDeviceServices call() {
                                // re-read the services so the value reflects the state after the wait
                                return new RxBleDeviceServices(bluetoothGatt.getServices());
                            }
                        });
                    }
                });
            }
            // still no services after the timeout: nothing left to do but declare the discovery failed
            return Single.error(new BleGattCallbackTimeoutException(bluetoothGatt, BleGattOperationType.SERVICE_DISCOVERY));
        }
    });
}
 
/**
 * Checks whether the user holds an authority named {@code F_UNCOMPLETE_EVENT} or
 * {@code ALL}.
 * <p>
 * The blocking authority lookup runs once per subscription, on whichever thread the
 * subscription happens. {@code fromCallable} is already lazy, so no surrounding
 * {@code defer} is needed.
 *
 * @return a single emitting {@code true} when such an authority exists
 */
@Override
public Single<Boolean> canReOpenEvent() {
    return Single.fromCallable(() -> d2.userModule().authorities()
            .byName().in("F_UNCOMPLETE_EVENT", "ALL").one().blockingExists()
    );
}
 
源代码9 项目: immutables   文件: ElasticsearchOps.java
/**
 * Inserts the given documents in bulk, deferring all work until subscription.
 *
 * @param documents the documents handed to {@code insertBulkInternal}
 * @return a single emitting the result produced by {@code insertBulkInternal}
 */
Single<WriteResult> insertBulk(List<ObjectNode> documents) {
  return Single.defer(() -> {
    // evaluated per subscription, not when insertBulk is called
    return insertBulkInternal(documents);
  });
}