类io.reactivex.Notification源码实例Demo

下面列出了怎么用io.reactivex.Notification的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: rxjava2-jdbc   文件: TransactedCallableBuilder.java
private static <T> Flowable<Tx<T>> inTransaction(CallableBuilder b,
        Function<Single<Connection>, Flowable<Notification<T>>> f) {
    return Flowable.defer(() -> {
        AtomicReference<Connection> con = new AtomicReference<Connection>();
        // set the atomic reference when transactedConnection emits
        Single<Connection> transactedConnection = b.connection //
                .map(c -> Util.toTransactedConnection(con, c));
        return f.apply(transactedConnection) //
                .<Tx<T>>flatMap(n -> Tx.toTx(n, con.get(), b.db)) //
                .doOnNext(tx -> {
                    if (tx.isComplete()) {
                        ((TxImpl<T>) tx).connection().commit();
                    }
                });
    });
}
 
源代码2 项目: rxjava2-jdbc   文件: Call.java
static Flowable<Notification<TupleN<Object>>> createWithNParameters( //
        Single<Connection> connection, //
        String sql, //
        Flowable<List<Object>> parameterGroups, //
        List<ParameterPlaceholder> parameterPlaceholders, //
        List<Class<?>> outClasses) {
    return connection //
            .toFlowable() //
            .flatMap( //
                    con -> createWithParameters( //
                            con, //
                            sql, //
                            parameterGroups, //
                            parameterPlaceholders, //
                            (stmt, parameters) -> createWithNParameters(stmt, parameters, parameterPlaceholders,
                                    outClasses)));
}
 
源代码3 项目: rxjava2-jdbc   文件: Call.java
private static <T1> Flowable<Notification<CallableResultSet1<T1>>> createWithOneResultSet(Connection con,
        String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        Function<? super ResultSet, ? extends T1> f1, int fetchSize) {
    log.debug("Update.create {}", sql);
    Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet1<T1>>>> flowableFactory = //
            stmt -> parameterGroups //
                    .flatMap(parameters -> {
                        List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
                                parameters);
                        Flowable<T1> flowable1 = createFlowable(stmt, f1);
                        return Single.just(new CallableResultSet1<T1>(outputValues, flowable1)).toFlowable();
                    }) //
                    .materialize() //
                    .doOnComplete(() -> Util.commit(stmt.stmt)) //
                    .doOnError(e -> Util.rollback(stmt.stmt));
    Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
    return Flowable.using(resourceFactory, flowableFactory, disposer, true);
}
 
源代码4 项目: rxjava2-jdbc   文件: Call.java
private static <T1, T2> Flowable<Notification<CallableResultSet2<T1, T2>>> createWithTwoResultSets(Connection con,
        String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        Function<? super ResultSet, ? extends T1> f1, Function<? super ResultSet, ? extends T2> f2, int fetchSize) {
    Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet2<T1, T2>>>> flowableFactory = //
            stmt -> parameterGroups //
                    .flatMap(parameters -> {
                        List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
                                parameters);
                        final Flowable<T1> flowable1 = createFlowable(stmt, f1);
                        stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
                        final Flowable<T2> flowable2 = createFlowable(stmt, f2);
                        return Single.just(new CallableResultSet2<T1, T2>(outputValues, flowable1, flowable2))
                                .toFlowable();
                    }) //
                    .materialize() //
                    .doOnComplete(() -> Util.commit(stmt.stmt)) //
                    .doOnError(e -> Util.rollback(stmt.stmt));
    Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
    return Flowable.using(resourceFactory, flowableFactory, disposer, true);
}
 
源代码5 项目: rxjava2-jdbc   文件: Call.java
private static <T1, T2, T3> Flowable<Notification<CallableResultSet3<T1, T2, T3>>> createWithThreeResultSets(
        Connection con, String sql, Flowable<List<Object>> parameterGroups,
        List<ParameterPlaceholder> parameterPlaceholders, Function<? super ResultSet, ? extends T1> f1,
        Function<? super ResultSet, ? extends T2> f2, Function<? super ResultSet, ? extends T3> f3, int fetchSize) {
    Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSet3<T1, T2, T3>>>> flowableFactory = //
            stmt -> parameterGroups //
                    .flatMap(parameters -> {
                        List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
                                parameters);
                        final Flowable<T1> flowable1 = createFlowable(stmt, f1);
                        stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
                        final Flowable<T2> flowable2 = createFlowable(stmt, f2);
                        stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT);
                        final Flowable<T3> flowable3 = createFlowable(stmt, f3);
                        return Single.just(
                                new CallableResultSet3<T1, T2, T3>(outputValues, flowable1, flowable2, flowable3))
                                .toFlowable();
                    }) //
                    .materialize() //
                    .doOnComplete(() -> Util.commit(stmt.stmt)) //
                    .doOnError(e -> Util.rollback(stmt.stmt));
    Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
    return Flowable.using(resourceFactory, flowableFactory, disposer, true);
}
 
源代码6 项目: rxjava2-jdbc   文件: Call.java
private static Flowable<Notification<CallableResultSetN>> createWithNResultSets(Connection con, String sql,
        Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        List<Function<? super ResultSet, ?>> functions, int fetchSize) {
    Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<CallableResultSetN>>> flowableFactory = //
            stmt -> parameterGroups //
                    .flatMap(parameters -> {
                        List<Object> outputValues = executeAndReturnOutputValues(parameterPlaceholders, stmt,
                                parameters);
                        List<Flowable<?>> flowables = Lists.newArrayList();
                        int i = 0;
                        do {
                            Function<? super ResultSet, ?> f = functions.get(i);
                            flowables.add(createFlowable(stmt, f));
                            i++;
                        } while (stmt.stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT));
                        return Single.just(new CallableResultSetN(outputValues, flowables)).toFlowable();
                    }) //
                    .materialize() //
                    .doOnComplete(() -> Util.commit(stmt.stmt)) //
                    .doOnError(e -> Util.rollback(stmt.stmt));
    Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
    return Flowable.using(resourceFactory, flowableFactory, disposer, true);
}
 
源代码7 项目: RxWindowIfChanged   文件: WindowIfChangedTest.java
@Test public void completeCompletesInner() {
  Observable<Message> messages = Observable.just(new Message("Bob", "Hello"));
  final AtomicInteger seen = new AtomicInteger();
  WindowIfChanged.create(messages, userSelector)
      .switchMap(
          new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
            @Override public Observable<Notification<String>> apply(
                GroupedObservable<String, Message> group) {
              final int count = seen.incrementAndGet();
              return group.map(new Function<Message, String>() {
                @Override public String apply(Message message) throws Exception {
                  return count + " " + message;
                }
              }).materialize();
            }
          })
      .test()
      .assertValues( //
          Notification.createOnNext("1 Bob Hello"), //
          Notification.<String>createOnComplete()) //
      .assertComplete();
}
 
源代码8 项目: RxWindowIfChanged   文件: WindowIfChangedTest.java
@Test public void errorCompletesInner() {
  RuntimeException error = new RuntimeException("boom!");
  Observable<Message> messages = Observable.just( //
      Notification.createOnNext(new Message("Bob", "Hello")),
      Notification.createOnError(error)
  ).dematerialize();
  final AtomicInteger seen = new AtomicInteger();
  WindowIfChanged.create(messages, userSelector)
      .switchMap(
          new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
            @Override public Observable<Notification<String>> apply(
                GroupedObservable<String, Message> group) {
              final int count = seen.incrementAndGet();
              return group.map(new Function<Message, String>() {
                @Override public String apply(Message message) throws Exception {
                  return count + " " + message;
                }
              }).materialize();
            }
          })
      .test()
      .assertValues( //
          Notification.createOnNext("1 Bob Hello"), //
          Notification.<String>createOnComplete()) //
      .assertError(error);
}
 
源代码9 项目: RxAndroid-Sample   文件: ExampleUnitTest.java
@Test
public void testMaterializeObservable() throws InterruptedException {


    Observable.just("A", "B", "C", "D", "E", "F")
            .materialize()
            .subscribe(new Observer<Notification<String>>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Notification<String> stringNotification) {
                    /*
                     * From the notification object, we can check if the
                     * emitted item is:
                     * isOnNext() or isOnError() or isOnComplete()
                     *
                     * Here we can basically fetch items that are successful
                     * & omit items that resulted in error.
                     *
                     *  */
                    System.out.println(stringNotification.getValue());
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}
 
源代码10 项目: AssistantBySDK   文件: ExampleUnitTest.java
private Observable<Notification<String>> start(final String msg){

        return Observable.create(new ObservableOnSubscribe<Notification<String>>() {
            @Override
            public void subscribe(final ObservableEmitter<Notification<String>> e) throws Exception {
                start_(msg, new CallBack() {
                    @Override
                    public void onCompleted(int code) {
                        e.onNext(Notification.createOnNext(code+""));
                        e.onComplete();
                    }
                });
            }
        }).subscribeOn(Schedulers.computation());
    }
 
源代码11 项目: retrocache   文件: RecordingObserver.java
private Notification<T> takeNotification() {
    Notification<T> notification = events.pollFirst();
    if (notification == null) {
        throw new AssertionError("No event found!");
    }
    return notification;
}
 
源代码12 项目: retrocache   文件: RecordingObserver.java
public T takeValue() {
    Notification<T> notification = takeNotification();
    assertThat(notification.isOnNext())
            .as("Expected onNext event but was " + notification)
            .isTrue();
    return notification.getValue();
}
 
源代码13 项目: retrocache   文件: RecordingObserver.java
public Throwable takeError() {
    Notification<T> notification = takeNotification();
    assertThat(notification.isOnError())
            .as("Expected onError event but was " + notification)
            .isTrue();
    return notification.getError();
}
 
源代码14 项目: retrocache   文件: RecordingObserver.java
public void assertComplete() {
    Notification<T> notification = takeNotification();
    assertThat(notification.isOnComplete())
            .as("Expected onCompleted event but was " + notification)
            .isTrue();
    assertNoEvents();
}
 
源代码15 项目: retrocache   文件: RecordingSubscriber.java
private Notification<T> takeNotification() {
    Notification<T> notification = events.pollFirst();
    if (notification == null) {
        throw new AssertionError("No event found!");
    }
    return notification;
}
 
源代码16 项目: retrocache   文件: RecordingSubscriber.java
public T takeValue() {
    Notification<T> notification = takeNotification();
    assertThat(notification.isOnNext())
            .as("Expected onNext event but was " + notification)
            .isTrue();
    return notification.getValue();
}
 
源代码17 项目: retrocache   文件: RecordingSubscriber.java
public Throwable takeError() {
    Notification<T> notification = takeNotification();
    assertThat(notification.isOnError())
            .as("Expected onError event but was " + notification)
            .isTrue();
    return notification.getError();
}
 
源代码18 项目: retrocache   文件: RecordingSubscriber.java
public void assertComplete() {
    Notification<T> notification = takeNotification();
    assertThat(notification.isOnComplete())
            .as("Expected onCompleted event but was " + notification)
            .isTrue();
    assertNoEvents();
}
 
源代码19 项目: retrocache   文件: RecordingCompletableObserver.java
private Notification<?> takeNotification() {
    Notification<?> notification = events.pollFirst();
    if (notification == null) {
        throw new AssertionError("No event found!");
    }
    return notification;
}
 
源代码20 项目: retrocache   文件: RecordingCompletableObserver.java
public Throwable takeError() {
    Notification<?> notification = takeNotification();
    assertThat(notification.isOnError())
            .as("Expected onError event but was " + notification)
            .isTrue();
    return notification.getError();
}
 
源代码21 项目: retrocache   文件: RecordingCompletableObserver.java
public void assertComplete() {
    Notification<?> notification = takeNotification();
    assertThat(notification.isOnComplete())
            .as("Expected onCompleted event but was " + notification)
            .isTrue();
    assertNoEvents();
}
 
源代码22 项目: retrocache   文件: RecordingSingleObserver.java
private Notification<T> takeNotification() {
    Notification<T> notification = events.pollFirst();
    if (notification == null) {
        throw new AssertionError("No event found!");
    }
    return notification;
}
 
源代码23 项目: retrocache   文件: RecordingSingleObserver.java
public T takeValue() {
    Notification<T> notification = takeNotification();
    assertThat(notification.isOnNext())
            .as("Expected onNext event but was " + notification)
            .isTrue();
    return notification.getValue();
}
 
源代码24 项目: retrocache   文件: RecordingSingleObserver.java
public Throwable takeError() {
    Notification<T> notification = takeNotification();
    assertThat(notification.isOnError())
            .as("Expected onError event but was " + notification)
            .isTrue();
    return notification.getError();
}
 
源代码25 项目: retrocache   文件: RecordingMaybeObserver.java
private Notification<T> takeNotification() {
    Notification<T> notification = events.pollFirst();
    if (notification == null) {
        throw new AssertionError("No event found!");
    }
    return notification;
}
 
源代码26 项目: retrocache   文件: RecordingMaybeObserver.java
public T takeValue() {
    Notification<T> notification = takeNotification();
    assertThat(notification.isOnNext())
            .as("Expected onNext event but was " + notification)
            .isTrue();
    return notification.getValue();
}
 
源代码27 项目: retrocache   文件: RecordingMaybeObserver.java
public Throwable takeError() {
    Notification<T> notification = takeNotification();
    assertThat(notification.isOnError())
            .as("Expected onError event but was " + notification)
            .isTrue();
    return notification.getError();
}
 
源代码28 项目: rxjava2-jdbc   文件: Update.java
static Flowable<Notification<Integer>> create(Single<Connection> connection,
                                              Flowable<List<Object>> parameterGroups, String sql, int batchSize,
                                              boolean eagerDispose, int queryTimeoutSec) {
    return connection //
            .toFlowable() //
            .flatMap(con -> create(con, sql, parameterGroups, batchSize, eagerDispose, queryTimeoutSec), true,
                    1);
}
 
源代码29 项目: rxjava2-jdbc   文件: Update.java
private static Flowable<Notification<Integer>> executeFinalBatch(NamedPreparedStatement ps,
        Notification<Integer> n, boolean outstandingBatch) throws SQLException {
    if (n.isOnComplete() && outstandingBatch) {
        log.debug("executing final batch");
        return toFlowable(ps.ps.executeBatch()) //
                .map(x -> Notification.createOnNext(x)) //
                .concatWith(Flowable.just(n));
    } else {
        return Flowable.just(n);
    }
}
 
源代码30 项目: rxjava2-jdbc   文件: Call.java
static <T1, T2> Flowable<Notification<Tuple2<T1, T2>>> createWithTwoOutParameters(Single<Connection> connection,
        String sql, Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        Class<T1> cls1, Class<T2> cls2) {
    return connection.toFlowable().flatMap(con -> createWithParameters(con, sql, parameterGroups,
            parameterPlaceholders,
            (stmt, parameters) -> createWithTwoParameters(stmt, parameters, parameterPlaceholders, cls1, cls2)));
}
 
 类所在包
 类方法
 同包方法