io.reactivex.rxjava3.core.Observable#subscribe() 源码实例 Demo

下面列出了 io.reactivex.rxjava3.core.Observable#subscribe() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: java-specialagent   文件: RxJava3Test.java
/**
 * Subscribes to the observable from {@code createSequentialObservable(tracer, false)} with
 * only an {@code onNext} consumer, then checks that five items were collected, that exactly
 * one span was finished and that no scope is left active.
 *
 * @param tracer mock tracer passed to the observable factory and inspected afterwards
 */
@Test
public void consumerTest(final MockTracer tracer) {
  final Observable<Integer> source = createSequentialObservable(tracer, false);
  final List<Integer> received = new ArrayList<>();

  final Disposable subscription = source.subscribe(consumer(received));
  logger.fine(String.valueOf(subscription));

  // All emitted items must have reached the consumer.
  assertEquals(5, received.size());

  // Exactly one finished span is expected from the tracer.
  assertEquals(1, tracer.finishedSpans().size());

  // No scope may remain active once the subscription has run.
  assertNull(tracer.scopeManager().active());
}
 
源代码2 项目: java-specialagent   文件: RxJava3Test.java
/**
 * Subscribes with both an {@code onNext} consumer and an {@code onError} handler, then checks
 * that five items were collected, one span was finished, no error was recorded and no scope
 * is left active.
 *
 * @param tracer mock tracer passed to the observable factory and inspected afterwards
 */
@Test
public void consumerTest2(final MockTracer tracer) {
  final Observable<Integer> source = createSequentialObservable(tracer, false);
  final List<Integer> received = new ArrayList<>();
  final List<String> errors = new ArrayList<>();

  source.subscribe(consumer(received), onError(errors));

  // All emitted items must have reached the consumer.
  assertEquals(5, received.size());

  // Exactly one finished span is expected from the tracer.
  assertEquals(1, tracer.finishedSpans().size());

  // The error handler must not have been invoked.
  assertTrue(errors.isEmpty());

  // No scope may remain active once the subscription has run.
  assertNull(tracer.scopeManager().active());
}
 
/**
 * Expects that, with {@code ReplayingShare.createWithDefault("default")}, completion of the
 * upstream clears the cached item so that a subscriber arriving afterwards sees the default
 * value followed by whatever a fresh upstream subscription emits.
 *
 * <p>The {@code start} list is mutated after completion so that a re-subscription to
 * {@code startWithIterable(start)} is observable as {@code "initB"} rather than {@code "initA"}.
 */
@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed =
      upstream.startWithIterable(start).compose(ReplayingShare.createWithDefault("default"));

  TestObserver<String> observer1 = new TestObserver<>();
  replayed.subscribe(observer1);
  observer1.assertValues("default", "initA");

  TestObserver<String> observer2 = new TestObserver<>();
  replayed.subscribe(observer2);
  // The second subscriber joins while "initA" is the most recent item, so it is expected to
  // receive exactly that cached item (previously this subscriber was never asserted on).
  observer2.assertValues("initA");
  // The first subscriber must not have received anything new from the second subscription.
  observer1.assertValues("default", "initA");

  upstream.onComplete();
  observer1.assertComplete();
  observer2.assertComplete();

  // Change what the upstream emits on its next subscription.
  start.set(0, "initB");

  TestObserver<String> observer3 = new TestObserver<>();
  replayed.subscribe(observer3);
  observer3.assertValues("default", "initB");
}
 
源代码4 项目: RxCache   文件: TestWithGuava.java
/**
 * Demo entry point: builds an {@code AnnotationConfigApplicationContext} from
 * {@code ConfigWithGuava}, fetches the {@code "rxCache"} bean, saves a user under the key
 * {@code "test"}, loads it back as an Observable and prints the user's name and password.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    ApplicationContext context = new AnnotationConfigApplicationContext(ConfigWithGuava.class);
    RxCache cache = (RxCache) context.getBean("rxCache");

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);

    // Print the fields of the user carried by each emitted record.
    loaded.subscribe(record -> {
        User cached = record.getData();
        System.out.println(cached.name);
        System.out.println(cached.password);
    });
}
 
/**
 * Ignored test: checks that a second subscriber only sees the most recent item once it
 * arrives. Disabled because, per the {@code @Ignore} reason, Observable has no backpressure.
 *
 * <p>NOTE(review): the commented-out {@code 0} initial request and {@code requestMore(1)}
 * call look like leftovers from a backpressure-aware variant of this test — confirm.
 */
@Ignore("No backpressure in Observable")
@Test public void backpressureHonoredWhenCached() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestObserver<String> first = new TestObserver<>();
  shared.subscribe(first);
  first.assertNoValues();

  source.onNext("Foo");
  first.assertValues("Foo");

  TestObserver<String> second = new TestObserver<>(/*0*/);
  shared.subscribe(second);
  second.assertNoValues();

  // Replace the cached value...
  source.onNext("Bar");
  first.assertValues("Foo", "Bar");

  // ...and ensure new requests see it.
  //second.requestMore(1);
  second.assertValues("Bar");
}
 
源代码6 项目: RxCache   文件: TestMapDB.java
/**
 * Demo entry point: configures RxCache with a {@code MapDBImpl(100)} memory implementation,
 * saves a user under the key {@code "test"}, loads it back as an Observable and prints the
 * user's name and password.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    RxCache.config(new RxCache.Builder().memory(new MapDBImpl(100)));
    RxCache cache = RxCache.getRxCache();

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);

    // Print the fields of the user carried by each emitted record.
    loaded.subscribe(record -> {
        User cached = record.getData();
        System.out.println(cached.name);
        System.out.println(cached.password);
    });
}
 
源代码7 项目: RxCache   文件: TestCaffeine.java
/**
 * Demo entry point: configures RxCache with a {@code CaffeineImpl(100)} memory implementation,
 * saves a user under the key {@code "test"}, loads it back as an Observable and prints the
 * user's name and password.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    RxCache.config(new RxCache.Builder().memory(new CaffeineImpl(100)));
    RxCache cache = RxCache.getRxCache();

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);

    // Print the fields of the user carried by each emitted record.
    loaded.subscribe(record -> {
        User cached = record.getData();
        System.out.println(cached.name);
        System.out.println(cached.password);
    });
}
 
源代码8 项目: RxCache   文件: TestGuavaCache.java
/**
 * Demo entry point: configures RxCache with a {@code GuavaCacheImpl(100)} memory
 * implementation, saves a user under the key {@code "test"}, loads it back as an Observable
 * and prints the user's name and password.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    RxCache.config(new RxCache.Builder().memory(new GuavaCacheImpl(100)));
    RxCache cache = RxCache.getRxCache();

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);

    // Print the fields of the user carried by each emitted record.
    loaded.subscribe(record -> {
        User cached = record.getData();
        System.out.println(cached.name);
        System.out.println(cached.password);
    });
}
 
/**
 * Expects that completion of the upstream clears the cached item, so that a subscriber
 * arriving afterwards only sees what a fresh upstream subscription emits.
 *
 * <p>The {@code start} list is mutated after completion so that a re-subscription to
 * {@code startWithIterable(start)} is observable as {@code "initB"} rather than {@code "initA"}.
 */
@Test public void completeClearsCacheAndResubscribes() {
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed = upstream.startWithIterable(start).compose(ReplayingShare.<String>instance());

  TestObserver<String> observer1 = new TestObserver<>();
  replayed.subscribe(observer1);
  observer1.assertValues("initA");

  TestObserver<String> observer2 = new TestObserver<>();
  replayed.subscribe(observer2);
  // The second subscriber joins while "initA" is the most recent item, so it is expected to
  // receive exactly that cached item (previously this subscriber was never asserted on).
  observer2.assertValues("initA");
  // The first subscriber must not have received anything new from the second subscription.
  observer1.assertValues("initA");

  upstream.onComplete();
  observer1.assertComplete();
  observer2.assertComplete();

  // Change what the upstream emits on its next subscription.
  start.set(0, "initB");

  TestObserver<String> observer3 = new TestObserver<>();
  replayed.subscribe(observer3);
  observer3.assertValues("initB");
}
 
源代码10 项目: RxCache   文件: Test.java
/**
 * Demo entry point: configures RxCache with an unmodified {@code RxCache.Builder}, saves a
 * user under the key {@code "test"}, loads it back as an Observable and prints the user's
 * name and password.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    RxCache.config(new RxCache.Builder());
    RxCache cache = RxCache.getRxCache();

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);

    // Print the fields of the user carried by each emitted record.
    loaded.subscribe(record -> {
        User cached = record.getData();
        System.out.println(cached.name);
        System.out.println(cached.password);
    });
}
 
/**
 * Expects that, with {@code ReplayingShare.createWithDefault("default")}, an upstream error
 * clears the cached item so that a subscriber arriving afterwards sees the default value
 * followed by whatever a fresh upstream subscription emits.
 *
 * <p>The {@code start} list is mutated after the error so that a re-subscription to
 * {@code startWithIterable(start)} is observable as {@code "initB"} rather than {@code "initA"}.
 */
@Test public void errorClearsCacheAndResubscribesStartingWithDefault() {
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed =
      upstream.startWithIterable(start).compose(ReplayingShare.createWithDefault("default"));

  TestObserver<String> observer1 = new TestObserver<>();
  replayed.subscribe(observer1);
  observer1.assertValues("default", "initA");

  TestObserver<String> observer2 = new TestObserver<>();
  replayed.subscribe(observer2);
  // The second subscriber joins while "initA" is the most recent item, so it is expected to
  // receive exactly that cached item (previously this subscriber was never asserted on).
  observer2.assertValues("initA");
  // The first subscriber must not have received anything new from the second subscription.
  observer1.assertValues("default", "initA");

  RuntimeException r = new RuntimeException();
  upstream.onError(r);
  observer1.assertError(r);
  observer2.assertError(r);

  // Change what the upstream emits on its next subscription.
  start.set(0, "initB");

  TestObserver<String> observer3 = new TestObserver<>();
  replayed.subscribe(observer3);
  observer3.assertValues("default", "initB");
}
 
/**
 * Expects that an observer which is already disposed before it subscribes receives no
 * values, even though an item was emitted while another subscription was open.
 */
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  // Open a subscription first, then push an item through it.
  shared.subscribe();
  source.onNext("something to cache");

  // Dispose the observer up front; subscribing afterwards must not deliver anything.
  TestObserver<String> disposedObserver = new TestObserver<>();
  disposedObserver.dispose();
  shared.subscribe(disposedObserver);
  disposedObserver.assertNoValues();
}
 
源代码13 项目: cxf   文件: ReactiveIOInvoker.java
/**
 * Creates an {@code AsyncResponseImpl} for the message and subscribes to the observable so
 * that each emitted value is passed to {@code resume} and each error to
 * {@code handleThrowable}.
 *
 * @param inMessage message the asynchronous response is created for
 * @param obs observable to subscribe to
 * @return the asynchronous response wired to the observable
 * @throws IllegalStateException if {@code subscribe} returns {@code null}
 */
protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) {
    final AsyncResponseImpl response = new AsyncResponseImpl(inMessage);
    final Disposable subscription = obs.subscribe(
        value -> response.resume(value),
        error -> handleThrowable(response, error));
    if (subscription != null) {
        return response;
    }
    throw new IllegalStateException("Subscribe did not return a Disposable");
}
 
/**
 * Expects that a subscriber arriving after an item was emitted immediately receives that
 * item, while the earlier subscriber saw nothing until the emission happened.
 */
@Test public void initialValueToNewSubscriber() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  // The first subscriber joins before anything has been emitted.
  TestObserver<String> early = new TestObserver<>();
  shared.subscribe(early);
  early.assertNoValues();

  source.onNext("Foo");
  early.assertValues("Foo");

  // The late subscriber is expected to get the most recent item on subscription.
  TestObserver<String> late = new TestObserver<>();
  shared.subscribe(late);
  late.assertValues("Foo");
}
 
源代码15 项目: java-specialagent   文件: RxJava3Test.java
/**
 * Subscribes the observer built by {@code observer(name, result)} to the observable built by
 * {@code createSequentialObservable(tracer, false)}.
 *
 * @param name name forwarded to the observer factory
 * @param result list forwarded to the observer factory
 * @param tracer mock tracer forwarded to the observable factory
 */
private static void executeSequentialObservable(final String name, final List<Integer> result, final MockTracer tracer) {
  createSequentialObservable(tracer, false).subscribe(observer(name, result));
}
 
源代码16 项目: RxCache   文件: TestOffHeap.java
/**
 * Demo entry point: configures RxCache with a {@code DirectBufferMemoryImpl(3)} memory
 * implementation, saves four users under the keys {@code test1}..{@code test4}, then loads
 * {@code test1} and, if an Observable is returned, prints that user's name and password.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    RxCache.config(new RxCache.Builder().memory(new DirectBufferMemoryImpl(3)));
    RxCache cache = RxCache.getRxCache();

    // Saves one entry more than the 3 passed to DirectBufferMemoryImpl
    // (presumably its capacity — TODO confirm).
    for (int i = 1; i <= 4; i++) {
        User user = new User();
        user.name = "tony" + i;
        user.password = "123456";
        cache.save("test" + i, user);
    }

    Observable<Record<User>> loaded = cache.load2Observable("test1", User.class);
    if (loaded == null) {
        return;
    }

    // Print the fields of the user carried by each emitted record.
    loaded.subscribe(record -> {
        User cached = record.getData();
        System.out.println(cached.name);
        System.out.println(cached.password);
    });
}
 
源代码17 项目: RxCache   文件: TestDiskImplWithMoshi.java
/**
 * Saves a two-entry {@code Map<String, User>} under the key {@code "map"}, loads it back
 * using a type built with {@code TypeBuilder}, and prints the name and password of the
 * entries {@code "u1"} and {@code "u2"} when {@code Preconditions.isNotBlank} accepts the map.
 *
 * @param rxCache cache instance used for both the save and the load
 */
private static void testMap(RxCache rxCache) {
    User first = new User();
    first.name = "tonyMap1";
    first.password = "map1123456";

    User second = new User();
    second.name = "tonyMap12";
    second.password = "map12345";

    Map<String, User> users = new HashMap<>();
    users.put("u1", first);
    users.put("u2", second);
    rxCache.save("map", users);

    // Describe Map<String, User> so the cache can load the value with its type parameters.
    Type mapType = TypeBuilder
            .newInstance(Map.class)
            .addTypeParam(String.class)
            .addTypeParam(User.class)
            .build();

    Observable<Record<Map<String, User>>> loaded = rxCache.load2Observable("map", mapType);

    loaded.subscribe(record -> {
        Map<String, User> data = record.getData();
        if (!Preconditions.isNotBlank(data)) {
            return;
        }

        User user = data.get("u1");
        System.out.println(user.name);
        System.out.println(user.password);

        User user2 = data.get("u2");
        System.out.println(user2.name);
        System.out.println(user2.password);
    });
}
 
源代码18 项目: RxCache   文件: TestDiskImplWithEncrypt.java
/**
 * Demo entry point: configures RxCache with a {@code DiskImpl} persistence layer rooted at
 * the directory {@code "aaa"} and using a {@code GsonConverter} with an
 * {@code AES128Encryptor}, saves a user under the key {@code "test"}, loads it back as an
 * Observable and prints the user's name and password.
 *
 * @param args command-line arguments (unused)
 * @throws IllegalStateException if the cache directory does not exist and cannot be created
 */
public static void main(String[] args) {

    File cacheDirectory = new File("aaa");

    // File.mkdir() signals failure only through its return value; fail fast here instead of
    // handing an unusable path (missing, or an existing non-directory) to the disk cache.
    if (!cacheDirectory.isDirectory() && !cacheDirectory.mkdir()) {
        throw new IllegalStateException(
                "Cannot create cache directory: " + cacheDirectory.getAbsolutePath());
    }

    AES128Encryptor encryptor = new AES128Encryptor("abcdefghijklmnop");
    DiskImpl diskImpl = new DiskImpl(cacheDirectory, new GsonConverter(encryptor));

    RxCache.config(new RxCache.Builder().persistence(diskImpl));

    RxCache rxCache = RxCache.getRxCache();

    User u = new User();
    u.name = "tony";
    u.password = "123456";
    rxCache.save("test", u);

    Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

    // Print the fields of the user carried by each emitted record.
    observable.subscribe(record -> {
        User user = record.getData();
        System.out.println(user.name);
        System.out.println(user.password);
    });
}
 
源代码19 项目: RxCache   文件: TestOkio.java
/**
 * Demo entry point: configures RxCache with an {@code OkioImpl} persistence layer rooted at
 * the directory {@code "aaa"}, saves a user under the key {@code "test"}, loads it back as
 * an Observable and prints the user's name and password.
 *
 * @param args command-line arguments (unused)
 * @throws IllegalStateException if the cache directory does not exist and cannot be created
 */
public static void main(String[] args) {

    File cacheDirectory = new File("aaa");

    // File.mkdir() signals failure only through its return value; fail fast here instead of
    // handing an unusable path (missing, or an existing non-directory) to the disk cache.
    if (!cacheDirectory.isDirectory() && !cacheDirectory.mkdir()) {
        throw new IllegalStateException(
                "Cannot create cache directory: " + cacheDirectory.getAbsolutePath());
    }

    OkioImpl okioImpl = new OkioImpl(cacheDirectory);

    RxCache.config(new RxCache.Builder().persistence(okioImpl));

    RxCache rxCache = RxCache.getRxCache();

    User u = new User();
    u.name = "tony";
    u.password = "123456";
    rxCache.save("test", u);

    Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

    // Print the fields of the user carried by each emitted record.
    observable.subscribe(record -> {
        User user = record.getData();
        System.out.println(user.name);
        System.out.println(user.password);
    });
}
 
源代码20 项目: RxCache   文件: TestDiskImpl.java
/**
 * Demo entry point: configures RxCache with a {@code DiskImpl} persistence layer rooted at
 * the directory {@code "aaa"}, saves a user under the key {@code "test"}, loads it back as
 * an Observable and prints the user's name and password.
 *
 * @param args command-line arguments (unused)
 * @throws IllegalStateException if the cache directory does not exist and cannot be created
 */
public static void main(String[] args) {

    File cacheDirectory = new File("aaa");

    // File.mkdir() signals failure only through its return value; fail fast here instead of
    // handing an unusable path (missing, or an existing non-directory) to the disk cache.
    if (!cacheDirectory.isDirectory() && !cacheDirectory.mkdir()) {
        throw new IllegalStateException(
                "Cannot create cache directory: " + cacheDirectory.getAbsolutePath());
    }

    DiskImpl diskImpl = new DiskImpl(cacheDirectory);

    RxCache.config(new RxCache.Builder().persistence(diskImpl));

    RxCache rxCache = RxCache.getRxCache();

    User u = new User();
    u.name = "tony";
    u.password = "123456";
    rxCache.save("test", u);

    Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

    // Print the fields of the user carried by each emitted record.
    observable.subscribe(record -> {
        User user = record.getData();
        System.out.println(user.name);
        System.out.println(user.password);
    });
}