下面列出了 io.reactivex.Notification 类的 API 使用示例代码及写法,您也可以点击链接到 GitHub 查看源代码。
/**
 * Defers creation of a flowable of {@code Tx} values produced from the notifications that
 * {@code f} emits when given a transacted connection.
 *
 * <p>On each subscription a fresh {@link AtomicReference} is created; it is handed to
 * {@code Util.toTransactedConnection} together with the connection emitted by
 * {@code b.connection}. Every notification from {@code f} is converted with {@code Tx.toTx}
 * using whatever the reference holds at that moment, and when a resulting {@code Tx} reports
 * {@code isComplete()} its connection is committed.
 *
 * @param b builder supplying {@code connection} and {@code db}
 * @param f function turning the transacted connection into a stream of notifications
 * @return a deferred flowable of {@code Tx} values
 */
private static <T> Flowable<Tx<T>> inTransaction(CallableBuilder b,
        Function<Single<Connection>, Flowable<Notification<T>>> f) {
    return Flowable.defer(() -> {
        // Holder passed to Util.toTransactedConnection and read back when building each Tx.
        final AtomicReference<Connection> holder = new AtomicReference<>();
        final Single<Connection> transacted =
                b.connection.map(c -> Util.toTransactedConnection(holder, c));
        final Flowable<Notification<T>> notifications = f.apply(transacted);
        return notifications
                .<Tx<T>>flatMap(n -> Tx.toTx(n, holder.get(), b.db))
                .doOnNext(tx -> {
                    if (tx.isComplete()) {
                        ((TxImpl<T>) tx).connection().commit();
                    }
                });
    });
}
/**
 * Converts the single connection to a flowable and, for the emitted connection, delegates to
 * {@code createWithParameters}, supplying a per-statement callback that calls the
 * four-argument {@code createWithNParameters} overload with {@code parameterPlaceholders}
 * and {@code outClasses}.
 *
 * @param connection            source of the connection
 * @param sql                   statement text passed through to {@code createWithParameters}
 * @param parameterGroups       parameter lists passed through to {@code createWithParameters}
 * @param parameterPlaceholders placeholder descriptions passed to both delegates
 * @param outClasses            classes handed to the per-statement callback
 * @return the flowable of notifications produced by {@code createWithParameters}
 */
static Flowable<Notification<TupleN<Object>>> createWithNParameters(
        Single<Connection> connection,
        String sql,
        Flowable<List<Object>> parameterGroups,
        List<ParameterPlaceholder> parameterPlaceholders,
        List<Class<?>> outClasses) {
    final Flowable<Connection> connections = connection.toFlowable();
    return connections.flatMap(
            c -> createWithParameters(
                    c,
                    sql,
                    parameterGroups,
                    parameterPlaceholders,
                    (statement, values) ->
                            createWithNParameters(statement, values, parameterPlaceholders, outClasses)));
}
/**
 * Builds a flowable that prepares a callable statement from {@code sql} and, for every
 * parameter group, calls {@code executeAndReturnOutputValues} and emits a
 * {@code CallableResultSet1} holding the output values plus a flowable created from the
 * statement with {@code f1}. The emissions are materialized into notifications.
 *
 * <p>The statement comes from {@code Util.prepareCall} and is released through
 * {@code Util.closeCallableStatementAndConnection}; {@code Flowable.using} is invoked with
 * its eager flag set to {@code true}.
 *
 * <p>NOTE(review): {@code fetchSize} is not referenced anywhere in this method.
 * <p>NOTE(review): the commit/rollback hooks are attached after {@code materialize()};
 * presumably upstream errors arrive there as error notifications followed by completion, so
 * confirm whether the rollback hook can actually fire.
 *
 * @param con                   connection used to prepare the call
 * @param sql                   statement text
 * @param parameterGroups       one list of parameter values per execution
 * @param parameterPlaceholders placeholder descriptions for the statement
 * @param f1                    mapper applied to the result set
 * @param fetchSize             unused here
 * @return materialized stream of {@code CallableResultSet1} values
 */
private static <T1> Flowable<Notification<CallableResultSet1<T1>>> createWithOneResultSet(Connection con,
        String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        Function<? super ResultSet, ? extends T1> f1, int fetchSize) {
    log.debug("Update.create {}", sql);
    final Callable<NamedCallableStatement> statementFactory =
            () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet1<T1>>>> resultsFactory =
            stmt -> {
                final Flowable<CallableResultSet1<T1>> results = parameterGroups.flatMap(params -> {
                    final List<Object> outs =
                            executeAndReturnOutputValues(parameterPlaceholders, stmt, params);
                    final Flowable<T1> rows = createFlowable(stmt, f1);
                    return Single.just(new CallableResultSet1<T1>(outs, rows)).toFlowable();
                });
                return results
                        .materialize()
                        .doOnComplete(() -> Util.commit(stmt.stmt))
                        .doOnError(e -> Util.rollback(stmt.stmt));
            };
    final Consumer<NamedCallableStatement> closer = Util::closeCallableStatementAndConnection;
    return Flowable.using(statementFactory, resultsFactory, closer, true);
}
/**
 * Builds a flowable that prepares a callable statement from {@code sql} and, for every
 * parameter group, calls {@code executeAndReturnOutputValues} and emits a
 * {@code CallableResultSet2}. Two flowables are created from the statement (with {@code f1}
 * and {@code f2}), with {@code getMoreResults(Statement.KEEP_CURRENT_RESULT)} called between
 * them. The emissions are materialized into notifications.
 *
 * <p>The statement comes from {@code Util.prepareCall} and is released through
 * {@code Util.closeCallableStatementAndConnection}; {@code Flowable.using} is invoked with
 * its eager flag set to {@code true}.
 *
 * <p>NOTE(review): {@code fetchSize} is not referenced anywhere in this method.
 * <p>NOTE(review): the commit/rollback hooks are attached after {@code materialize()};
 * presumably upstream errors arrive there as error notifications followed by completion, so
 * confirm whether the rollback hook can actually fire.
 *
 * @param con                   connection used to prepare the call
 * @param sql                   statement text
 * @param parameterGroups       one list of parameter values per execution
 * @param parameterPlaceholders placeholder descriptions for the statement
 * @param f1                    mapper for the first result set
 * @param f2                    mapper for the second result set
 * @param fetchSize             unused here
 * @return materialized stream of {@code CallableResultSet2} values
 */
private static <T1, T2> Flowable<Notification<CallableResultSet2<T1, T2>>> createWithTwoResultSets(Connection con,
        String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        Function<? super ResultSet, ? extends T1> f1, Function<? super ResultSet, ? extends T2> f2, int fetchSize) {
    final Callable<NamedCallableStatement> statementFactory =
            () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet2<T1, T2>>>> resultsFactory =
            stmt -> {
                final Flowable<CallableResultSet2<T1, T2>> results = parameterGroups.flatMap(params -> {
                    final List<Object> outs =
                            executeAndReturnOutputValues(parameterPlaceholders, stmt, params);
                    final Flowable<T1> first = createFlowable(stmt, f1);
                    // Advance to the next result while keeping the current one open.
                    stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
                    final Flowable<T2> second = createFlowable(stmt, f2);
                    return Single.just(new CallableResultSet2<T1, T2>(outs, first, second))
                            .toFlowable();
                });
                return results
                        .materialize()
                        .doOnComplete(() -> Util.commit(stmt.stmt))
                        .doOnError(e -> Util.rollback(stmt.stmt));
            };
    final Consumer<NamedCallableStatement> closer = Util::closeCallableStatementAndConnection;
    return Flowable.using(statementFactory, resultsFactory, closer, true);
}
/**
 * Builds a flowable that prepares a callable statement from {@code sql} and, for every
 * parameter group, calls {@code executeAndReturnOutputValues} and emits a
 * {@code CallableResultSet3}. Three flowables are created from the statement (with
 * {@code f1}, {@code f2} and {@code f3}), with
 * {@code getMoreResults(Statement.KEEP_CURRENT_RESULT)} called between consecutive ones.
 * The emissions are materialized into notifications.
 *
 * <p>The statement comes from {@code Util.prepareCall} and is released through
 * {@code Util.closeCallableStatementAndConnection}; {@code Flowable.using} is invoked with
 * its eager flag set to {@code true}.
 *
 * <p>NOTE(review): {@code fetchSize} is not referenced anywhere in this method.
 * <p>NOTE(review): the commit/rollback hooks are attached after {@code materialize()};
 * presumably upstream errors arrive there as error notifications followed by completion, so
 * confirm whether the rollback hook can actually fire.
 *
 * @param con                   connection used to prepare the call
 * @param sql                   statement text
 * @param parameterGroups       one list of parameter values per execution
 * @param parameterPlaceholders placeholder descriptions for the statement
 * @param f1                    mapper for the first result set
 * @param f2                    mapper for the second result set
 * @param f3                    mapper for the third result set
 * @param fetchSize             unused here
 * @return materialized stream of {@code CallableResultSet3} values
 */
private static <T1, T2, T3> Flowable<Notification<CallableResultSet3<T1, T2, T3>>> createWithThreeResultSets(
        Connection con, String sql, Flowable<List<Object>> parameterGroups,
        List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
        Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3, int fetchSize) {
    final Callable<NamedCallableStatement> statementFactory =
            () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet3<T1, T2, T3>>>> resultsFactory =
            stmt -> {
                final Flowable<CallableResultSet3<T1, T2, T3>> results = parameterGroups.flatMap(params -> {
                    final List<Object> outs =
                            executeAndReturnOutputValues(parameterPlaceholders, stmt, params);
                    final Flowable<T1> first = createFlowable(stmt, f1);
                    // Advance to the next result while keeping the current one open.
                    stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
                    final Flowable<T2> second = createFlowable(stmt, f2);
                    stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
                    final Flowable<T3> third = createFlowable(stmt, f3);
                    return Single.just(
                            new CallableResultSet3<T1, T2, T3>(outs, first, second, third))
                            .toFlowable();
                });
                return results
                        .materialize()
                        .doOnComplete(() -> Util.commit(stmt.stmt))
                        .doOnError(e -> Util.rollback(stmt.stmt));
            };
    final Consumer<NamedCallableStatement> closer = Util::closeCallableStatementAndConnection;
    return Flowable.using(statementFactory, resultsFactory, closer, true);
}
/**
 * Builds a flowable that prepares a callable statement from {@code sql} and, for every
 * parameter group, calls {@code executeAndReturnOutputValues} and emits a
 * {@code CallableResultSetN}. One flowable is created per result, pairing the i-th result
 * with {@code functions.get(i)}, and the loop continues for as long as
 * {@code getMoreResults(Statement.KEEP_CURRENT_RESULT)} returns {@code true}. The emissions
 * are materialized into notifications.
 *
 * <p>The statement comes from {@code Util.prepareCall} and is released through
 * {@code Util.closeCallableStatementAndConnection}; {@code Flowable.using} is invoked with
 * its eager flag set to {@code true}.
 *
 * <p>NOTE(review): {@code fetchSize} is not referenced anywhere in this method.
 * <p>NOTE(review): the index into {@code functions} is not bounds-checked against the number
 * of results the statement reports; confirm callers always supply enough mappers.
 * <p>NOTE(review): the commit/rollback hooks are attached after {@code materialize()};
 * presumably upstream errors arrive there as error notifications followed by completion, so
 * confirm whether the rollback hook can actually fire.
 *
 * @param con                   connection used to prepare the call
 * @param sql                   statement text
 * @param parameterGroups       one list of parameter values per execution
 * @param parameterPlaceholders placeholder descriptions for the statement
 * @param functions             mappers, one per expected result set, in order
 * @param fetchSize             unused here
 * @return materialized stream of {@code CallableResultSetN} values
 */
private static Flowable<Notification<CallableResultSetN>> createWithNResultSets(Connection con, String sql,
        Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        List<Function<? super ResultSet, ?>> functions, int fetchSize) {
    final Callable<NamedCallableStatement> statementFactory =
            () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSetN>>> resultsFactory =
            stmt -> {
                final Flowable<CallableResultSetN> results = parameterGroups.flatMap(params -> {
                    final List<Object> outs =
                            executeAndReturnOutputValues(parameterPlaceholders, stmt, params);
                    final List<Flowable<?>> perResult = Lists.newArrayList();
                    int index = 0;
                    boolean more = true;
                    // The first result is always processed; later ones only while the
                    // statement reports another result.
                    while (more) {
                        final Function<? super ResultSet, ?> mapper = functions.get(index);
                        perResult.add(createFlowable(stmt, mapper));
                        index++;
                        more = stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
                    }
                    return Single.just(new CallableResultSetN(outs, perResult)).toFlowable();
                });
                return results
                        .materialize()
                        .doOnComplete(() -> Util.commit(stmt.stmt))
                        .doOnError(e -> Util.rollback(stmt.stmt));
            };
    final Consumer<NamedCallableStatement> closer = Util::closeCallableStatementAndConnection;
    return Flowable.using(statementFactory, resultsFactory, closer, true);
}
/**
 * Feeds a single message through {@code WindowIfChanged.create}, switch-maps each emitted
 * group to its materialized, numbered string form, and asserts that the observer sees the
 * value notification {@code "1 Bob Hello"}, then a completion notification, and that the
 * outer stream completes.
 */
@Test public void completeCompletesInner() {
    final AtomicInteger groupsSeen = new AtomicInteger();
    final Observable<Message> messages = Observable.just(new Message("Bob", "Hello"));
    // Prefixes every message of a group with the group's ordinal, then materializes it.
    final Function<GroupedObservable<String, Message>, Observable<Notification<String>>> numberAndMaterialize =
            new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
                @Override public Observable<Notification<String>> apply(
                        GroupedObservable<String, Message> window) {
                    final int ordinal = groupsSeen.incrementAndGet();
                    return window.map(new Function<Message, String>() {
                        @Override public String apply(Message item) throws Exception {
                            return ordinal + " " + item;
                        }
                    }).materialize();
                }
            };
    WindowIfChanged.create(messages, userSelector)
            .switchMap(numberAndMaterialize)
            .test()
            .assertValues(
                    Notification.createOnNext("1 Bob Hello"),
                    Notification.<String>createOnComplete())
            .assertComplete();
}
/**
 * Builds a source by dematerializing one value notification followed by an error
 * notification, runs it through {@code WindowIfChanged.create}, and switch-maps each emitted
 * group to its materialized, numbered string form. Asserts that the observer sees
 * {@code "1 Bob Hello"}, then a completion notification, and that the outer stream
 * terminates with the same error instance.
 */
@Test public void errorCompletesInner() {
    final RuntimeException error = new RuntimeException("boom!");
    final Observable<Message> messages = Observable.just( //
        Notification.createOnNext(new Message("Bob", "Hello")),
        Notification.createOnError(error)
    ).dematerialize();
    final AtomicInteger groupsSeen = new AtomicInteger();
    // Prefixes every message of a group with the group's ordinal, then materializes it.
    final Function<GroupedObservable<String, Message>, Observable<Notification<String>>> numberAndMaterialize =
            new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
                @Override public Observable<Notification<String>> apply(
                        GroupedObservable<String, Message> window) {
                    final int ordinal = groupsSeen.incrementAndGet();
                    return window.map(new Function<Message, String>() {
                        @Override public String apply(Message item) throws Exception {
                            return ordinal + " " + item;
                        }
                    }).materialize();
                }
            };
    WindowIfChanged.create(messages, userSelector)
            .switchMap(numberAndMaterialize)
            .test()
            .assertValues(
                    Notification.createOnNext("1 Bob Hello"),
                    Notification.<String>createOnComplete())
            .assertError(error);
}
/**
 * Demonstrates {@code materialize()}: six strings are wrapped into {@code Notification}
 * objects and the observer prints the value carried by each value notification.
 *
 * <p>Only notifications for which {@code isOnNext()} is true are printed. The materialized
 * stream also delivers a terminal notification that carries no value, and printing
 * {@code getValue()} for it would output {@code null}.
 */
@Test
public void testMaterializeObservable() throws InterruptedException {
    Observable.just("A", "B", "C", "D", "E", "F")
            .materialize()
            .subscribe(new Observer<Notification<String>>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // Nothing to set up for this demonstration.
                }

                @Override
                public void onNext(Notification<String> stringNotification) {
                    /*
                     * A notification can be inspected with isOnNext(), isOnError() or
                     * isOnComplete(). Keep only the ones that carry an item so that
                     * terminal notifications are omitted from the output.
                     */
                    if (stringNotification.isOnNext()) {
                        System.out.println(stringNotification.getValue());
                    }
                }

                @Override
                public void onError(Throwable e) {
                    // Intentionally empty for this demonstration.
                }

                @Override
                public void onComplete() {
                    // Intentionally empty for this demonstration.
                }
            });
}
/**
 * Wraps the callback-based {@code start_} call in an observable subscribed on the
 * computation scheduler. When the callback's {@code onCompleted} fires, the integer code is
 * emitted as a string inside a value notification and the observable then completes.
 *
 * @param msg argument forwarded to {@code start_}
 * @return observable emitting a single notification with the completion code as text
 */
private Observable<Notification<String>> start(final String msg) {
    final ObservableOnSubscribe<Notification<String>> source =
            new ObservableOnSubscribe<Notification<String>>() {
                @Override
                public void subscribe(final ObservableEmitter<Notification<String>> emitter) throws Exception {
                    start_(msg, new CallBack() {
                        @Override
                        public void onCompleted(int code) {
                            emitter.onNext(Notification.createOnNext(String.valueOf(code)));
                            emitter.onComplete();
                        }
                    });
                }
            };
    return Observable.create(source).subscribeOn(Schedulers.computation());
}
/**
 * Removes and returns the first recorded event.
 *
 * @return the event polled from the front of {@code events}
 * @throws AssertionError if no event is available
 */
private Notification<T> takeNotification() {
    final Notification<T> head = events.pollFirst();
    if (head != null) {
        return head;
    }
    throw new AssertionError("No event found!");
}
/**
 * Takes the next recorded event, asserts it is an onNext event, and returns its value.
 *
 * @return the value carried by the next event
 */
public T takeValue() {
    final Notification<T> event = takeNotification();
    final boolean isValue = event.isOnNext();
    final String description = "Expected onNext event but was " + event;
    assertThat(isValue).as(description).isTrue();
    return event.getValue();
}
/**
 * Takes the next recorded event, asserts it is an onError event, and returns its error.
 *
 * @return the throwable carried by the next event
 */
public Throwable takeError() {
    final Notification<T> event = takeNotification();
    final boolean isError = event.isOnError();
    final String description = "Expected onError event but was " + event;
    assertThat(isError).as(description).isTrue();
    return event.getError();
}
/**
 * Takes the next recorded event, asserts it is a completion event, and then calls
 * {@code assertNoEvents()}.
 */
public void assertComplete() {
    final Notification<T> event = takeNotification();
    final boolean completed = event.isOnComplete();
    final String description = "Expected onCompleted event but was " + event;
    assertThat(completed).as(description).isTrue();
    assertNoEvents();
}
/**
 * Removes and returns the first recorded event.
 *
 * @return the event polled from the front of {@code events}
 * @throws AssertionError if no event is available
 */
private Notification<T> takeNotification() {
    final Notification<T> head = events.pollFirst();
    if (head != null) {
        return head;
    }
    throw new AssertionError("No event found!");
}
/**
 * Takes the next recorded event, asserts it is an onNext event, and returns its value.
 *
 * @return the value carried by the next event
 */
public T takeValue() {
    final Notification<T> event = takeNotification();
    final boolean isValue = event.isOnNext();
    final String description = "Expected onNext event but was " + event;
    assertThat(isValue).as(description).isTrue();
    return event.getValue();
}
/**
 * Takes the next recorded event, asserts it is an onError event, and returns its error.
 *
 * @return the throwable carried by the next event
 */
public Throwable takeError() {
    final Notification<T> event = takeNotification();
    final boolean isError = event.isOnError();
    final String description = "Expected onError event but was " + event;
    assertThat(isError).as(description).isTrue();
    return event.getError();
}
/**
 * Takes the next recorded event, asserts it is a completion event, and then calls
 * {@code assertNoEvents()}.
 */
public void assertComplete() {
    final Notification<T> event = takeNotification();
    final boolean completed = event.isOnComplete();
    final String description = "Expected onCompleted event but was " + event;
    assertThat(completed).as(description).isTrue();
    assertNoEvents();
}
/**
 * Removes and returns the first recorded event.
 *
 * @return the event polled from the front of {@code events}
 * @throws AssertionError if no event is available
 */
private Notification<?> takeNotification() {
    final Notification<?> head = events.pollFirst();
    if (head != null) {
        return head;
    }
    throw new AssertionError("No event found!");
}
/**
 * Takes the next recorded event, asserts it is an onError event, and returns its error.
 *
 * @return the throwable carried by the next event
 */
public Throwable takeError() {
    final Notification<?> event = takeNotification();
    final boolean isError = event.isOnError();
    final String description = "Expected onError event but was " + event;
    assertThat(isError).as(description).isTrue();
    return event.getError();
}
/**
 * Takes the next recorded event, asserts it is a completion event, and then calls
 * {@code assertNoEvents()}.
 */
public void assertComplete() {
    final Notification<?> event = takeNotification();
    final boolean completed = event.isOnComplete();
    final String description = "Expected onCompleted event but was " + event;
    assertThat(completed).as(description).isTrue();
    assertNoEvents();
}
/**
 * Removes and returns the first recorded event.
 *
 * @return the event polled from the front of {@code events}
 * @throws AssertionError if no event is available
 */
private Notification<T> takeNotification() {
    final Notification<T> head = events.pollFirst();
    if (head != null) {
        return head;
    }
    throw new AssertionError("No event found!");
}
/**
 * Takes the next recorded event, asserts it is an onNext event, and returns its value.
 *
 * @return the value carried by the next event
 */
public T takeValue() {
    final Notification<T> event = takeNotification();
    final boolean isValue = event.isOnNext();
    final String description = "Expected onNext event but was " + event;
    assertThat(isValue).as(description).isTrue();
    return event.getValue();
}
/**
 * Takes the next recorded event, asserts it is an onError event, and returns its error.
 *
 * @return the throwable carried by the next event
 */
public Throwable takeError() {
    final Notification<T> event = takeNotification();
    final boolean isError = event.isOnError();
    final String description = "Expected onError event but was " + event;
    assertThat(isError).as(description).isTrue();
    return event.getError();
}
/**
 * Removes and returns the first recorded event.
 *
 * @return the event polled from the front of {@code events}
 * @throws AssertionError if no event is available
 */
private Notification<T> takeNotification() {
    final Notification<T> head = events.pollFirst();
    if (head != null) {
        return head;
    }
    throw new AssertionError("No event found!");
}
/**
 * Takes the next recorded event, asserts it is an onNext event, and returns its value.
 *
 * @return the value carried by the next event
 */
public T takeValue() {
    final Notification<T> event = takeNotification();
    final boolean isValue = event.isOnNext();
    final String description = "Expected onNext event but was " + event;
    assertThat(isValue).as(description).isTrue();
    return event.getValue();
}
/**
 * Takes the next recorded event, asserts it is an onError event, and returns its error.
 *
 * @return the throwable carried by the next event
 */
public Throwable takeError() {
    final Notification<T> event = takeNotification();
    final boolean isError = event.isOnError();
    final String description = "Expected onError event but was " + event;
    assertThat(isError).as(description).isTrue();
    return event.getError();
}
/**
 * Converts the single connection to a flowable and flat-maps the emitted connection to the
 * connection-based {@code create} overload, passing the remaining arguments through.
 *
 * @param connection      source of the connection
 * @param parameterGroups parameter lists forwarded to the delegate
 * @param sql             statement text forwarded to the delegate
 * @param batchSize       forwarded to the delegate
 * @param eagerDispose    forwarded to the delegate
 * @param queryTimeoutSec forwarded to the delegate
 * @return the notifications produced by the delegate
 */
static Flowable<Notification<Integer>> create(Single<Connection> connection,
        Flowable<List<Object>> parameterGroups, String sql, int batchSize,
        boolean eagerDispose, int queryTimeoutSec) {
    final Flowable<Connection> connections = connection.toFlowable();
    return connections.flatMap(
            c -> create(c, sql, parameterGroups, batchSize, eagerDispose, queryTimeoutSec),
            true, // delay errors
            1); // max concurrency
}
/**
 * Passes the notification through unchanged unless it is a completion notification and a
 * batch is still outstanding. In that case {@code executeBatch()} is invoked on the wrapped
 * statement, its results are emitted as value notifications, and the original notification
 * is appended afterwards.
 *
 * @param ps               wrapper whose {@code ps} statement holds the pending batch
 * @param n                the notification being processed
 * @param outstandingBatch whether a batch still has to be executed
 * @return a flowable of the resulting notifications
 * @throws SQLException if executing the batch fails
 */
private static Flowable<Notification<Integer>> executeFinalBatch(NamedPreparedStatement ps,
        Notification<Integer> n, boolean outstandingBatch) throws SQLException {
    if (!n.isOnComplete() || !outstandingBatch) {
        return Flowable.just(n);
    }
    log.debug("executing final batch");
    final Flowable<Notification<Integer>> batchCounts = toFlowable(ps.ps.executeBatch())
            .map(count -> Notification.createOnNext(count));
    return batchCounts.concatWith(Flowable.just(n));
}
/**
 * Converts the single connection to a flowable and, for the emitted connection, delegates to
 * {@code createWithParameters}, supplying a per-statement callback that calls
 * {@code createWithTwoParameters} with {@code parameterPlaceholders}, {@code cls1} and
 * {@code cls2}.
 *
 * @param connection            source of the connection
 * @param sql                   statement text passed through to {@code createWithParameters}
 * @param parameterGroups       parameter lists passed through to {@code createWithParameters}
 * @param parameterPlaceholders placeholder descriptions passed to both delegates
 * @param cls1                  class handed to the callback for the first tuple element
 * @param cls2                  class handed to the callback for the second tuple element
 * @return the flowable of notifications produced by {@code createWithParameters}
 */
static <T1, T2> Flowable<Notification<Tuple2<T1, T2>>> createWithTwoOutParameters(Single<Connection> connection,
        String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        Class<T1> cls1, Class<T2> cls2) {
    final Flowable<Connection> connections = connection.toFlowable();
    return connections.flatMap(
            c -> createWithParameters(
                    c,
                    sql,
                    parameterGroups,
                    parameterPlaceholders,
                    (statement, values) ->
                            createWithTwoParameters(statement, values, parameterPlaceholders, cls1, cls2)));
}