io.reactivex.rxjava3.functions.Function#io.reactivex.rxjava3.core.Observer源码实例Demo

下面列出了 io.reactivex.rxjava3.functions.Function#io.reactivex.rxjava3.core.Observer 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: extentreports-java   文件: JsonFormatter.java
/**
 * Supplies the observer through which report updates are received.
 *
 * <p>Each {@link ReportEntity} delivered via {@code onNext} is handed to
 * {@code flush}; the subscription, error and completion signals are ignored.
 *
 * @return a newly created observer on every call
 */
@Override
public Observer<ReportEntity> getReportObserver() {
    final Observer<ReportEntity> reportObserver = new Observer<ReportEntity>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            // The subscription handle is not retained.
        }

        @Override
        public void onNext(ReportEntity reportEntity) {
            flush(reportEntity);
        }

        @Override
        public void onError(Throwable error) {
            // NOTE(review): upstream errors are dropped silently here - confirm this is intended.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };
    return reportObserver;
}
 
源代码2 项目: extentreports-java   文件: ExtentSparkReporter.java
/**
 * Supplies the observer through which report updates are received.
 *
 * <p>The subscription handle is passed to {@code start} and each
 * {@link ReportEntity} delivered via {@code onNext} is handed to {@code flush};
 * error and completion signals are ignored.
 *
 * @return a newly created observer on every call
 */
@Override
public Observer<ReportEntity> getReportObserver() {
    final Observer<ReportEntity> reportObserver = new Observer<ReportEntity>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            start(subscription);
        }

        @Override
        public void onNext(ReportEntity reportEntity) {
            flush(reportEntity);
        }

        @Override
        public void onError(Throwable error) {
            // NOTE(review): upstream errors are dropped silently here - confirm this is intended.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };
    return reportObserver;
}
 
/**
 * Supplies an observer that merely records what it receives.
 *
 * <p>The subscription handle is stored in {@code disp} and the most recently
 * delivered {@link ReportEntity} in {@code entity}; error and completion
 * signals are ignored.
 *
 * @return a newly created observer on every call
 */
@Override
public Observer<ReportEntity> getReportObserver() {
    final Observer<ReportEntity> reportObserver = new Observer<ReportEntity>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            disp = subscription;
        }

        @Override
        public void onNext(ReportEntity reportEntity) {
            // Each delivery overwrites the previously stored entity.
            entity = reportEntity;
        }

        @Override
        public void onError(Throwable error) {
            // NOTE(review): upstream errors are dropped silently here - confirm this is intended.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };
    return reportObserver;
}
 
源代码4 项目: java-specialagent   文件: RxJava3Test.java
/**
 * Creates an observer that logs and collects every item it receives.
 *
 * @param name label prefixed to each log line
 * @param result list to which every received item is appended
 * @return a new observer; errors have their stack trace printed, completion is logged
 */
private static <T>Observer<T> observer(final String name, final List<T> result) {
  final Observer<T> collector = new Observer<T>() {
    @Override
    public void onSubscribe(final Disposable subscription) {
      // The subscription handle is not used.
    }

    @Override
    public void onNext(final T item) {
      logger.fine(name + ": " + item);
      result.add(item);
    }

    @Override
    public void onError(final Throwable error) {
      error.printStackTrace();
    }

    @Override
    public void onComplete() {
      logger.fine(name + ": onComplete");
    }
  };
  return collector;
}
 
源代码5 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * An observer subscribed before any value is accepted must see the default
 * value followed by every later value, exactly once each, with no terminal event.
 */
@Test
public void testThatSubscriberReceivesDefaultValueAndSubsequentEvents() {
    final BehaviorRelay<String> relay = BehaviorRelay.createDefault("default");
    final Observer<String> mockObserver = TestHelper.mockObserver();
    relay.subscribe(mockObserver);

    for (String emitted : new String[] {"one", "two", "three"}) {
        relay.accept(emitted);
    }

    for (String expected : new String[] {"default", "one", "two", "three"}) {
        verify(mockObserver, times(1)).onNext(expected);
    }
    verify(mockObserver, Mockito.never()).onError(any(Throwable.class));
    verify(mockObserver, Mockito.never()).onComplete();
}
 
源代码6 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * An observer subscribed after a value was accepted must see that latest value
 * (not the default) followed by every later value, with no terminal event.
 */
@Test
public void testThatSubscriberReceivesLatestAndThenSubsequentEvents() {
    final BehaviorRelay<String> relay = BehaviorRelay.createDefault("default");

    // Accept a value before anyone subscribes.
    relay.accept("one");

    final Observer<String> mockObserver = TestHelper.mockObserver();
    relay.subscribe(mockObserver);

    for (String emitted : new String[] {"two", "three"}) {
        relay.accept(emitted);
    }

    verify(mockObserver, Mockito.never()).onNext("default");
    for (String expected : new String[] {"one", "two", "three"}) {
        verify(mockObserver, times(1)).onNext(expected);
    }
    verify(mockObserver, Mockito.never()).onError(any(Throwable.class));
    verify(mockObserver, Mockito.never()).onComplete();
}
 
源代码7 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * An observer subscribed before any value is accepted must see the default
 * value followed by every later value, exactly once each, with no terminal event.
 */
@Test
public void testThatSubscriberReceivesDefaultValueAndSubsequentEvents() {
    final BehaviorRelay<String> relay = BehaviorRelay.createDefault("default");
    final Observer<String> mockObserver = TestHelper.mockObserver();
    relay.subscribe(mockObserver);

    for (String emitted : new String[] {"one", "two", "three"}) {
        relay.accept(emitted);
    }

    for (String expected : new String[] {"default", "one", "two", "three"}) {
        verify(mockObserver, times(1)).onNext(expected);
    }
    verify(mockObserver, Mockito.never()).onError(any(Throwable.class));
    verify(mockObserver, Mockito.never()).onComplete();
}
 
源代码8 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * An observer subscribed after a value was accepted must see that latest value
 * (not the default) followed by every later value, with no terminal event.
 */
@Test
public void testThatSubscriberReceivesLatestAndThenSubsequentEvents() {
    final BehaviorRelay<String> relay = BehaviorRelay.createDefault("default");

    // Accept a value before anyone subscribes.
    relay.accept("one");

    final Observer<String> mockObserver = TestHelper.mockObserver();
    relay.subscribe(mockObserver);

    for (String emitted : new String[] {"two", "three"}) {
        relay.accept(emitted);
    }

    verify(mockObserver, Mockito.never()).onNext("default");
    for (String expected : new String[] {"one", "two", "three"}) {
        verify(mockObserver, times(1)).onNext(expected);
    }
    verify(mockObserver, Mockito.never()).onError(any(Throwable.class));
    verify(mockObserver, Mockito.never()).onComplete();
}
 
源代码9 项目: java-specialagent   文件: RxJava3AgentIntercept.java
/**
 * Intercepts a subscribe call and substitutes tracing wrappers where applicable.
 *
 * <p>Returns {@code NULL} when {@code arg0} is {@code null}, when its class name
 * starts with {@code io.reactivex.rxjava3.internal}, when it is already a
 * {@link TracingConsumer}, or when it is neither an {@link Observer} nor a
 * {@link Consumer}. An {@link Observer} is returned wrapped in a
 * {@link TracingObserver}. For a {@link Consumer}, a {@link TracingConsumer} is
 * built from the first {@code argc} arguments (1 to 3), subscribed directly on
 * {@code thiz}, and {@code null} is returned.
 *
 * <p>NOTE(review): the {@code NULL} sentinel and a plain {@code null} are
 * distinct return values; presumably the caller treats them differently - confirm.
 *
 * @param thiz the intercepted instance; cast to {@link Observable} on the consumer path
 * @param argc how many of {@code arg0..arg2} were supplied by the intercepted call
 * @param arg0 first argument of the intercepted call
 * @param arg1 second argument; cast to an error {@link Consumer} when {@code argc >= 2}
 * @param arg2 third argument; cast to an {@link Action} when {@code argc == 3}
 * @return {@code NULL}, a {@link TracingObserver}, or {@code null} as described above
 */
@SuppressWarnings("unchecked")
public static Object enter(final Object thiz, final int argc, final Object arg0, final Object arg1, final Object arg2) {
  if (arg0 == null)
    return NULL;

  if (arg0.getClass().getName().startsWith("io.reactivex.rxjava3.internal") || arg0 instanceof TracingConsumer)
    return NULL;

  // Tracing is switched on the first time a relevant argument is seen.
  if (!isTracingEnabled) {
    isTracingEnabled = true;
    TracingRxJava3Utils.enableTracing();
  }

  if (arg0 instanceof Observer)
    return new TracingObserver<>((Observer<?>)arg0, "observer", GlobalTracer.get());

  if (!(arg0 instanceof Consumer))
    return NULL;

  final Consumer<Object> onNext = (Consumer<Object>)arg0;
  final TracingConsumer<Object> tracingConsumer;
  switch (argc) {
    case 1:
      tracingConsumer = new TracingConsumer<>(onNext, "consumer", GlobalTracer.get());
      break;
    case 2:
      tracingConsumer = new TracingConsumer<>(onNext, (Consumer<Throwable>)arg1, "consumer", GlobalTracer.get());
      break;
    case 3:
      tracingConsumer = new TracingConsumer<>(onNext, (Consumer<Throwable>)arg1, (Action)arg2, "consumer", GlobalTracer.get());
      break;
    default:
      // Unsupported argument count: nothing is subscribed.
      tracingConsumer = null;
      break;
  }

  if (tracingConsumer != null)
    ((Observable<Object>)thiz).subscribe(tracingConsumer);

  return null;
}
 
源代码10 项目: RxRelay   文件: BehaviorRelay.java
/**
 * Wraps the observer in a {@code BehaviorDisposable}, hands it to the observer
 * via {@code onSubscribe}, and registers it with this relay.
 *
 * <p>If the wrapper is already cancelled after registration it is removed
 * again; otherwise {@code emitFirst()} is invoked on it.
 *
 * @param observer the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    final BehaviorDisposable<T> disposable = new BehaviorDisposable<T>(observer, this);
    observer.onSubscribe(disposable);
    add(disposable);

    if (!disposable.cancelled) {
        disposable.emitFirst();
        return;
    }
    // Cancelled by the time it was registered: undo the registration.
    remove(disposable);
}
 
源代码11 项目: RxRelay   文件: ReplayRelay.java
/**
 * Wraps the observer in a {@code ReplayDisposable}, hands it to the observer
 * via {@code onSubscribe}, and replays the buffer to it unless it is cancelled.
 *
 * @param observer the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    final ReplayDisposable<T> disposable = new ReplayDisposable<T>(observer, this);
    observer.onSubscribe(disposable);

    // Cancelled during onSubscribe: neither register nor replay.
    if (disposable.cancelled) {
        return;
    }

    // Registered successfully but cancelled in the meantime: undo and stop.
    if (add(disposable) && disposable.cancelled) {
        remove(disposable);
        return;
    }

    buffer.replay(disposable);
}
 
源代码12 项目: RxRelay   文件: PublishRelay.java
/**
 * Wraps the observer in a {@code PublishDisposable}, hands it to the observer
 * via {@code onSubscribe}, and registers it with this relay.
 *
 * @param t the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> t) {
    final PublishDisposable<T> subscription = new PublishDisposable<T>(t, this);
    t.onSubscribe(subscription);
    add(subscription);

    if (!subscription.isDisposed()) {
        return;
    }
    // If disposal raced with a successful add, the earlier remove() had no
    // effect, so it has to be repeated here.
    remove(subscription);
}
 
源代码13 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * Repeatedly takes the first element of the relay through a flatMap and checks
 * that each turn delivers exactly the doubled value followed by completion.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    // FIXME was plain null which is not allowed
    final BehaviorRelay<String> relay = BehaviorRelay.createDefault("null");

    // Maps a value to a single-item observable of "value, value".
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String t1) {
            return Observable.just(t1 + ", " + t1);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mockObserver = TestHelper.mockObserver();
        final InOrder ordered = inOrder(mockObserver);
        final String value = String.valueOf(turn);
        relay.accept(value);
        System.out.printf("Turn: %d%n", turn);

        // Forwards every signal to the mock so it can be verified afterwards.
        final DefaultObserver<String> forwarder = new DefaultObserver<String>() {
            @Override
            public void onNext(String item) {
                mockObserver.onNext(item);
            }

            @Override
            public void onError(Throwable error) {
                mockObserver.onError(error);
            }

            @Override
            public void onComplete() {
                mockObserver.onComplete();
            }
        };

        relay.firstElement()
            .toObservable()
            .flatMap(duplicate)
            .subscribe(forwarder);

        ordered.verify(mockObserver).onNext(value + ", " + value);
        ordered.verify(mockObserver).onComplete();
        verify(mockObserver, never()).onError(any(Throwable.class));
    }
}
 
源代码14 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * After {@code take(1)} has delivered the default value and completed, the
 * relay must no longer report any subscribers.
 */
@Test
public void testTakeOneSubscriber() {
    final BehaviorRelay<Integer> relay = BehaviorRelay.createDefault(1);
    final Observer<Object> mockObserver = TestHelper.mockObserver();

    relay.take(1).subscribe(mockObserver);

    verify(mockObserver, times(1)).onNext(1);
    verify(mockObserver, times(1)).onComplete();
    verify(mockObserver, never()).onError(any(Throwable.class));

    assertEquals(0, relay.subscriberCount());
    assertFalse(relay.hasObservers());
}
 
源代码15 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * The disposable handed to {@code onSubscribe} must start out not disposed and
 * report disposed immediately after {@code dispose()} is called on it.
 */
@Test
public void innerDisposed() {
    final Observer<Object> disposingObserver = new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            assertFalse(subscription.isDisposed());

            subscription.dispose();

            assertTrue(subscription.isDisposed());
        }

        @Override
        public void onNext(Object value) {
            // Not relevant for this test.
        }

        @Override
        public void onError(Throwable error) {
            // Not relevant for this test.
        }

        @Override
        public void onComplete() {
            // Not relevant for this test.
        }
    };

    BehaviorRelay.create()
    .subscribe(disposingObserver);
}
 
源代码16 项目: RxRelay   文件: PublishRelayTest.java
/**
 * Should be able to unsubscribe all Subscribers, have it stop emitting, then
 * subscribe new ones and have it start emitting again.
 */
@Test
public void testReSubscribe() {
    final PublishRelay<Integer> relay = PublishRelay.create();

    // First subscriber: receives 1 and nothing else.
    final Observer<Integer> firstMock = TestHelper.mockObserver();
    final TestObserver<Integer> firstObserver = new TestObserver<Integer>(firstMock);
    relay.subscribe(firstObserver);
    relay.accept(1);

    final InOrder firstOrder = inOrder(firstMock);
    firstOrder.verify(firstMock).onNext(1);
    firstOrder.verifyNoMoreInteractions();

    // Unsubscribe, then emit while nobody is listening.
    firstObserver.dispose();
    relay.accept(2);

    // Second subscriber: receives only the value emitted after it subscribed.
    final Observer<Integer> secondMock = TestHelper.mockObserver();
    final TestObserver<Integer> secondObserver = new TestObserver<Integer>(secondMock);
    relay.subscribe(secondObserver);
    relay.accept(3);

    final InOrder secondOrder = inOrder(secondMock);
    secondOrder.verify(secondMock).onNext(3);
    secondOrder.verifyNoMoreInteractions();

    secondObserver.dispose();
}
 
源代码17 项目: RxRelay   文件: BehaviorRelay.java
/**
 * Wraps the observer in a {@code BehaviorDisposable}, hands it to the observer
 * via {@code onSubscribe}, and registers it with this relay.
 *
 * <p>If the wrapper is already cancelled after registration it is removed
 * again; otherwise {@code emitFirst()} is invoked on it.
 *
 * @param observer the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    final BehaviorDisposable<T> disposable = new BehaviorDisposable<T>(observer, this);
    observer.onSubscribe(disposable);
    add(disposable);

    if (!disposable.cancelled) {
        disposable.emitFirst();
        return;
    }
    // Cancelled by the time it was registered: undo the registration.
    remove(disposable);
}
 
源代码18 项目: RxRelay   文件: ReplayRelay.java
/**
 * Wraps the observer in a {@code ReplayDisposable}, hands it to the observer
 * via {@code onSubscribe}, and replays the buffer to it unless it is cancelled.
 *
 * @param observer the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    final ReplayDisposable<T> disposable = new ReplayDisposable<T>(observer, this);
    observer.onSubscribe(disposable);

    // Cancelled during onSubscribe: neither register nor replay.
    if (disposable.cancelled) {
        return;
    }

    // Registered successfully but cancelled in the meantime: undo and stop.
    if (add(disposable) && disposable.cancelled) {
        remove(disposable);
        return;
    }

    buffer.replay(disposable);
}
 
源代码19 项目: RxRelay   文件: PublishRelay.java
/**
 * Wraps the observer in a {@code PublishDisposable}, hands it to the observer
 * via {@code onSubscribe}, and registers it with this relay.
 *
 * @param t the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> t) {
    final PublishDisposable<T> subscription = new PublishDisposable<T>(t, this);
    t.onSubscribe(subscription);
    add(subscription);

    if (!subscription.isDisposed()) {
        return;
    }
    // If disposal raced with a successful add, the earlier remove() had no
    // effect, so it has to be repeated here.
    remove(subscription);
}
 
源代码20 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * Repeatedly takes the first element of the relay through a flatMap and checks
 * that each turn delivers exactly the doubled value followed by completion.
 */
@Test(timeout = 1000)
public void testUnsubscriptionCase() {
    // FIXME was plain null which is not allowed
    final BehaviorRelay<String> relay = BehaviorRelay.createDefault("null");

    // Maps a value to a single-item observable of "value, value".
    final Function<String, Observable<String>> duplicate = new Function<String, Observable<String>>() {
        @Override
        public Observable<String> apply(String t1) {
            return Observable.just(t1 + ", " + t1);
        }
    };

    for (int turn = 0; turn < 10; turn++) {
        final Observer<Object> mockObserver = TestHelper.mockObserver();
        final InOrder ordered = inOrder(mockObserver);
        final String value = String.valueOf(turn);
        relay.accept(value);
        System.out.printf("Turn: %d%n", turn);

        // Forwards every signal to the mock so it can be verified afterwards.
        final DefaultObserver<String> forwarder = new DefaultObserver<String>() {
            @Override
            public void onNext(String item) {
                mockObserver.onNext(item);
            }

            @Override
            public void onError(Throwable error) {
                mockObserver.onError(error);
            }

            @Override
            public void onComplete() {
                mockObserver.onComplete();
            }
        };

        relay.firstElement()
            .toObservable()
            .flatMap(duplicate)
            .subscribe(forwarder);

        ordered.verify(mockObserver).onNext(value + ", " + value);
        ordered.verify(mockObserver).onComplete();
        verify(mockObserver, never()).onError(any(Throwable.class));
    }
}
 
源代码21 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * After {@code take(1)} has delivered the default value and completed, the
 * relay must no longer report any subscribers.
 */
@Test
public void testTakeOneSubscriber() {
    final BehaviorRelay<Integer> relay = BehaviorRelay.createDefault(1);
    final Observer<Object> mockObserver = TestHelper.mockObserver();

    relay.take(1).subscribe(mockObserver);

    verify(mockObserver, times(1)).onNext(1);
    verify(mockObserver, times(1)).onComplete();
    verify(mockObserver, never()).onError(any(Throwable.class));

    assertEquals(0, relay.subscriberCount());
    assertFalse(relay.hasObservers());
}
 
源代码22 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * The disposable handed to {@code onSubscribe} must start out not disposed and
 * report disposed immediately after {@code dispose()} is called on it.
 */
@Test
public void innerDisposed() {
    final Observer<Object> disposingObserver = new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable subscription) {
            assertFalse(subscription.isDisposed());

            subscription.dispose();

            assertTrue(subscription.isDisposed());
        }

        @Override
        public void onNext(Object value) {
            // Not relevant for this test.
        }

        @Override
        public void onError(Throwable error) {
            // Not relevant for this test.
        }

        @Override
        public void onComplete() {
            // Not relevant for this test.
        }
    };

    BehaviorRelay.create()
    .subscribe(disposingObserver);
}
 
源代码23 项目: RxRelay   文件: PublishRelayTest.java
/**
 * Should be able to unsubscribe all Subscribers, have it stop emitting, then
 * subscribe new ones and have it start emitting again.
 */
@Test
public void testReSubscribe() {
    final PublishRelay<Integer> relay = PublishRelay.create();

    // First subscriber: receives 1 and nothing else.
    final Observer<Integer> firstMock = TestHelper.mockObserver();
    final TestObserver<Integer> firstObserver = new TestObserver<Integer>(firstMock);
    relay.subscribe(firstObserver);
    relay.accept(1);

    final InOrder firstOrder = inOrder(firstMock);
    firstOrder.verify(firstMock).onNext(1);
    firstOrder.verifyNoMoreInteractions();

    // Unsubscribe, then emit while nobody is listening.
    firstObserver.dispose();
    relay.accept(2);

    // Second subscriber: receives only the value emitted after it subscribed.
    final Observer<Integer> secondMock = TestHelper.mockObserver();
    final TestObserver<Integer> secondObserver = new TestObserver<Integer>(secondMock);
    relay.subscribe(secondObserver);
    relay.accept(3);

    final InOrder secondOrder = inOrder(secondMock);
    secondOrder.verify(secondMock).onNext(3);
    secondOrder.verifyNoMoreInteractions();

    secondObserver.dispose();
}
 
源代码24 项目: resilience4j   文件: ObserverCircuitBreaker.java
/**
 * Subscribes the downstream observer to the upstream only if the circuit
 * breaker grants a permission.
 *
 * <p>Without a permission the downstream receives an empty disposable followed
 * by the exception built by {@code createCallNotPermittedException}.
 *
 * @param downstream the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> downstream) {
    if (!circuitBreaker.tryAcquirePermission()) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(createCallNotPermittedException(circuitBreaker));
        return;
    }
    upstream.subscribe(new CircuitBreakerObserver(downstream));
}
 
源代码25 项目: resilience4j   文件: ObserverBulkhead.java
/**
 * Subscribes the downstream observer to the upstream only if the bulkhead
 * grants a permission.
 *
 * <p>Without a permission the downstream receives an empty disposable followed
 * by the exception built by {@code BulkheadFullException.createBulkheadFullException}.
 *
 * @param downstream the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> downstream) {
    if (!bulkhead.tryAcquirePermission()) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(BulkheadFullException.createBulkheadFullException(bulkhead));
        return;
    }
    upstream.subscribe(new BulkheadObserver(downstream));
}
 
源代码26 项目: resilience4j   文件: ObserverRateLimiter.java
/**
 * Subscribes the downstream observer to the upstream according to the wait
 * duration returned by {@code rateLimiter.reservePermission()}.
 *
 * <p>A negative duration rejects the subscription with the exception built by
 * {@code createRequestNotPermitted}; zero subscribes immediately; a positive
 * duration defers the upstream subscription by that many nanoseconds.
 *
 * @param downstream the observer being subscribed
 */
@Override
protected void subscribeActual(Observer<? super T> downstream) {
    final long waitNanos = rateLimiter.reservePermission();

    if (waitNanos < 0) {
        downstream.onSubscribe(EmptyDisposable.INSTANCE);
        downstream.onError(createRequestNotPermitted(rateLimiter));
        return;
    }

    if (waitNanos == 0) {
        upstream.subscribe(new RateLimiterObserver(downstream));
        return;
    }

    // NOTE(review): the timer's disposable is not kept, so the delayed
    // subscription does not appear to be cancellable from here - confirm.
    Completable.timer(waitNanos, TimeUnit.NANOSECONDS)
        .subscribe(() -> upstream.subscribe(new RateLimiterObserver(downstream)));
}
 
源代码27 项目: extentreports-java   文件: ObservableList.java
/**
 * Subscribes the given observer to the underlying {@code observable}.
 *
 * @param observer the observer to attach
 */
public void subscribe(Observer<T> observer) {
    this.observable.subscribe(observer);
}
 
源代码28 项目: java-specialagent   文件: RxJava3Test.java
/**
 * Builds the sequential observable for {@code tracer} and subscribes a
 * logging, collecting observer to it.
 *
 * @param name label used by the observer when logging
 * @param result list that receives every emitted item
 * @param tracer tracer passed to {@code createSequentialObservable}
 */
private static void executeSequentialObservable(final String name, final List<Integer> result, final MockTracer tracer) {
  createSequentialObservable(tracer, false).subscribe(observer(name, result));
}
 
/**
 * Subscribes to {@code source} through a {@code SubscriberAdapter} that wraps
 * the given observer together with {@code EMPTY_DISPOSABLE}.
 *
 * @param observer the observer being subscribed
 */
@Override
protected void subscribeActual(@NonNull Observer<? super Object> observer) {
    final SubscriberAdapter adapter = new SubscriberAdapter(observer, EMPTY_DISPOSABLE);
    source.subscribe(adapter);
}
 
/**
 * Stores the wrapped observer and the disposable associated with it.
 *
 * @param observer the observer this adapter wraps
 * @param disposable the disposable kept alongside the observer
 */
private SubscriberAdapter(Observer<? super Object> observer,Disposable disposable) {
    this.disposable = disposable;
    this.observer = observer;
}