下面列出了 io.reactivex.rxjava3.core.Observable#subscribe() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Subscribes to the sequential observable with only an {@code onNext} consumer.
 *
 * <p>Verifies that five values were collected, that the tracer reports exactly one
 * finished span, and that no scope is active afterwards.
 */
@Test
public void consumerTest(final MockTracer tracer) {
    final Observable<Integer> source = createSequentialObservable(tracer, false);
    final List<Integer> received = new ArrayList<>();

    final Disposable subscription = source.subscribe(consumer(received));
    logger.fine(String.valueOf(subscription));

    assertEquals(5, received.size());
    assertEquals(1, tracer.finishedSpans().size());
    assertNull(tracer.scopeManager().active());
}
/**
 * Subscribes to the sequential observable with an {@code onNext} consumer and an
 * {@code onError} handler.
 *
 * <p>Verifies that five values were collected, exactly one span was finished, the
 * error list stayed empty, and no scope is active afterwards.
 */
@Test
public void consumerTest2(final MockTracer tracer) {
    final Observable<Integer> source = createSequentialObservable(tracer, false);
    final List<Integer> received = new ArrayList<>();
    final Consumer<Integer> valueSink = consumer(received);
    final List<String> errors = new ArrayList<>();

    source.subscribe(valueSink, onError(errors));

    assertEquals(5, received.size());
    assertEquals(1, tracer.finishedSpans().size());
    assertTrue(errors.isEmpty());
    assertNull(tracer.scopeManager().active());
}
/**
 * Checks that, with a default value configured, completion of the upstream lets a later
 * subscriber see the default again followed by the freshly read start value.
 */
@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
    // Mutable seed list: its single element is swapped later in the test.
    List<String> seed = new ArrayList<>();
    seed.add("initA");

    PublishSubject<String> source = PublishSubject.create();
    Observable<String> shared =
        source.startWithIterable(seed).compose(ReplayingShare.createWithDefault("default"));

    TestObserver<String> first = new TestObserver<>();
    shared.subscribe(first);
    first.assertValues("default", "initA");

    TestObserver<String> second = new TestObserver<>();
    shared.subscribe(second);
    // NOTE(review): this re-checks the first observer after the second one subscribed;
    // possibly the second observer was meant to be asserted here as well - confirm.
    first.assertValues("default", "initA");

    source.onComplete();
    first.assertComplete();
    second.assertComplete();

    // Change the seed so a new subscription is distinguishable from a replay.
    seed.set(0, "initB");

    TestObserver<String> third = new TestObserver<>();
    shared.subscribe(third);
    third.assertValues("default", "initB");
}
/**
 * Demo entry point: looks up the {@code "rxCache"} bean from an annotation-config
 * application context built from {@code ConfigWithGuava}, saves a {@code User} under the
 * key {@code "test"}, loads it back as an observable and prints its name and password.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    ApplicationContext context = new AnnotationConfigApplicationContext(ConfigWithGuava.class);
    RxCache cache = (RxCache) context.getBean("rxCache");

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);
    loaded.subscribe(entry -> {
        User stored = entry.getData();
        System.out.println(stored.name);
        System.out.println(stored.password);
    });
}
/**
 * Ignored test (see the {@code @Ignore} reason). The request-related pieces are only
 * present as comments; what remains checks the values seen by two observers of a shared
 * subject.
 */
@Ignore("No backpressure in Observable")
@Test public void backpressureHonoredWhenCached() {
    PublishSubject<String> source = PublishSubject.create();
    Observable<String> shared = source.compose(ReplayingShare.<String>instance());

    TestObserver<String> first = new TestObserver<>();
    shared.subscribe(first);
    first.assertNoValues();

    source.onNext("Foo");
    first.assertValues("Foo");

    TestObserver<String> second = new TestObserver<>(/*0*/);
    shared.subscribe(second);
    second.assertNoValues();

    source.onNext("Bar"); // Replace the cached value...
    first.assertValues("Foo", "Bar");

    //second.requestMore(1); // ...and ensure new requests see it.
    second.assertValues("Bar");
}
/**
 * Demo entry point: configures {@code RxCache} with a {@code MapDBImpl(100)} memory layer,
 * saves a {@code User} under the key {@code "test"}, loads it back as an observable and
 * prints its name and password.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    RxCache.config(new RxCache.Builder().memory(new MapDBImpl(100)));
    RxCache cache = RxCache.getRxCache();

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);
    loaded.subscribe(entry -> {
        User stored = entry.getData();
        System.out.println(stored.name);
        System.out.println(stored.password);
    });
}
/**
 * Demo entry point: configures {@code RxCache} with a {@code CaffeineImpl(100)} memory
 * layer, saves a {@code User} under the key {@code "test"}, loads it back as an observable
 * and prints its name and password.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    RxCache.config(new RxCache.Builder().memory(new CaffeineImpl(100)));
    RxCache cache = RxCache.getRxCache();

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);
    loaded.subscribe(entry -> {
        User stored = entry.getData();
        System.out.println(stored.name);
        System.out.println(stored.password);
    });
}
/**
 * Demo entry point: configures {@code RxCache} with a {@code GuavaCacheImpl(100)} memory
 * layer, saves a {@code User} under the key {@code "test"}, loads it back as an observable
 * and prints its name and password.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    RxCache.config(new RxCache.Builder().memory(new GuavaCacheImpl(100)));
    RxCache cache = RxCache.getRxCache();

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);
    loaded.subscribe(entry -> {
        User stored = entry.getData();
        System.out.println(stored.name);
        System.out.println(stored.password);
    });
}
/**
 * Checks that after the upstream completes, a later subscriber receives the freshly read
 * start value rather than the one seen before completion.
 */
@Test public void completeClearsCacheAndResubscribes() {
    // Mutable seed list: its single element is swapped later in the test.
    List<String> seed = new ArrayList<>();
    seed.add("initA");

    PublishSubject<String> source = PublishSubject.create();
    Observable<String> shared =
        source.startWithIterable(seed).compose(ReplayingShare.<String>instance());

    TestObserver<String> first = new TestObserver<>();
    shared.subscribe(first);
    first.assertValues("initA");

    TestObserver<String> second = new TestObserver<>();
    shared.subscribe(second);
    // NOTE(review): this re-checks the first observer after the second one subscribed;
    // possibly the second observer was meant to be asserted here as well - confirm.
    first.assertValues("initA");

    source.onComplete();
    first.assertComplete();
    second.assertComplete();

    // Change the seed so a new subscription is distinguishable from a replay.
    seed.set(0, "initB");

    TestObserver<String> third = new TestObserver<>();
    shared.subscribe(third);
    third.assertValues("initB");
}
/**
 * Demo entry point: configures {@code RxCache} with a builder that has no further options
 * set, saves a {@code User} under the key {@code "test"}, loads it back as an observable
 * and prints its name and password.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    RxCache.config(new RxCache.Builder());
    RxCache cache = RxCache.getRxCache();

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);
    loaded.subscribe(entry -> {
        User stored = entry.getData();
        System.out.println(stored.name);
        System.out.println(stored.password);
    });
}
/**
 * Checks that, with a default value configured, an upstream error reaches both current
 * observers and that a later subscriber sees the default again followed by the freshly
 * read start value.
 */
@Test public void errorClearsCacheAndResubscribesStartingWithDefault() {
    // Mutable seed list: its single element is swapped later in the test.
    List<String> seed = new ArrayList<>();
    seed.add("initA");

    PublishSubject<String> source = PublishSubject.create();
    Observable<String> shared =
        source.startWithIterable(seed).compose(ReplayingShare.createWithDefault("default"));

    TestObserver<String> first = new TestObserver<>();
    shared.subscribe(first);
    first.assertValues("default", "initA");

    TestObserver<String> second = new TestObserver<>();
    shared.subscribe(second);
    // NOTE(review): this re-checks the first observer after the second one subscribed;
    // possibly the second observer was meant to be asserted here as well - confirm.
    first.assertValues("default", "initA");

    RuntimeException failure = new RuntimeException();
    source.onError(failure);
    first.assertError(failure);
    second.assertError(failure);

    // Change the seed so a new subscription is distinguishable from a replay.
    seed.set(0, "initB");

    TestObserver<String> third = new TestObserver<>();
    shared.subscribe(third);
    third.assertValues("default", "initB");
}
/**
 * Checks that an observer disposed before it subscribes receives no values, even though a
 * value was emitted to an earlier subscription.
 */
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
    PublishSubject<String> source = PublishSubject.create();
    Observable<String> shared = source.compose(ReplayingShare.<String>instance());

    // An initial subscription is in place when the value is emitted.
    shared.subscribe();
    source.onNext("something to cache");

    TestObserver<String> disposedObserver = new TestObserver<>();
    disposedObserver.dispose();
    shared.subscribe(disposedObserver);

    disposedObserver.assertNoValues();
}
/**
 * Creates an {@code AsyncResponseImpl} for the message and wires it to the observable.
 *
 * <p>Each emitted value is passed to {@code resume}; an error is forwarded to
 * {@code handleThrowable} together with the response.
 *
 * @param inMessage message the async response is constructed from
 * @param obs observable to subscribe to
 * @return the async response created for {@code inMessage}
 * @throws IllegalStateException if {@code subscribe} returns {@code null}
 */
protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) {
    final AsyncResponseImpl response = new AsyncResponseImpl(inMessage);
    final Disposable subscription = obs.subscribe(
        value -> response.resume(value),
        error -> handleThrowable(response, error));
    if (subscription != null) {
        return response;
    }
    throw new IllegalStateException("Subscribe did not return a Disposable");
}
/**
 * Checks that an observer subscribing after a value was emitted immediately receives that
 * value.
 */
@Test public void initialValueToNewSubscriber() {
    PublishSubject<String> source = PublishSubject.create();
    Observable<String> shared = source.compose(ReplayingShare.<String>instance());

    TestObserver<String> early = new TestObserver<>();
    shared.subscribe(early);
    early.assertNoValues();

    source.onNext("Foo");
    early.assertValues("Foo");

    TestObserver<String> late = new TestObserver<>();
    shared.subscribe(late);
    late.assertValues("Foo");
}
/**
 * Creates the sequential observable for the given tracer and subscribes an observer built
 * from {@code name} and {@code result}.
 *
 * @param name name passed to the {@code observer} factory
 * @param result list passed to the {@code observer} factory
 * @param tracer tracer passed to {@code createSequentialObservable}
 */
private static void executeSequentialObservable(final String name, final List<Integer> result, final MockTracer tracer) {
    final Observable<Integer> source = createSequentialObservable(tracer, false);
    final Observer<Integer> sink = observer(name, result);
    source.subscribe(sink);
}
/**
 * Demo entry point: configures {@code RxCache} with a {@code DirectBufferMemoryImpl(3)}
 * memory layer, saves four users under the keys {@code "test1"} to {@code "test4"}, then
 * loads {@code "test1"} and, if an observable is returned, prints that user's name and
 * password.
 *
 * <p>NOTE(review): four entries are saved into a memory layer constructed with 3;
 * presumably this exercises eviction - confirm.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    RxCache.config(new RxCache.Builder().memory(new DirectBufferMemoryImpl(3)));
    RxCache cache = RxCache.getRxCache();

    // Parallel arrays: keys[i] is the cache key for the user named names[i].
    String[] keys = {"test1", "test2", "test3", "test4"};
    String[] names = {"tony1", "tony2", "tony3", "tony4"};
    for (int i = 0; i < keys.length; i++) {
        User user = new User();
        user.name = names[i];
        user.password = "123456";
        cache.save(keys[i], user);
    }

    Observable<Record<User>> loaded = cache.load2Observable("test1", User.class);
    if (loaded == null) {
        return;
    }
    loaded.subscribe(entry -> {
        User stored = entry.getData();
        System.out.println(stored.name);
        System.out.println(stored.password);
    });
}
/**
 * Saves a two-entry {@code Map<String, User>} under the key {@code "map"}, loads it back
 * using a parameterized {@code Type} built with {@code TypeBuilder}, and prints the name
 * and password of the users stored under {@code "u1"} and {@code "u2"}.
 *
 * @param rxCache cache used for the save and the load
 */
private static void testMap(RxCache rxCache) {
    Map<String, User> users = new HashMap<>();

    User first = new User();
    first.name = "tonyMap1";
    first.password = "map1123456";
    users.put("u1", first);

    User second = new User();
    second.name = "tonyMap12";
    second.password = "map12345";
    users.put("u2", second);

    rxCache.save("map", users);

    // Describes Map<String, User> so the load can be given the full generic type.
    Type mapType = TypeBuilder
        .newInstance(Map.class)
        .addTypeParam(String.class)
        .addTypeParam(User.class)
        .build();

    Observable<Record<Map<String, User>>> loaded = rxCache.load2Observable("map", mapType);
    loaded.subscribe(entry -> {
        Map<String, User> stored = entry.getData();
        if (!Preconditions.isNotBlank(stored)) {
            return;
        }
        for (String key : new String[] {"u1", "u2"}) {
            User user = stored.get(key);
            System.out.println(user.name);
            System.out.println(user.password);
        }
    });
}
/**
 * Demo entry point: creates the directory {@code "aaa"} if it does not exist, configures
 * {@code RxCache} with a {@code DiskImpl} persistence layer whose {@code GsonConverter} is
 * given an {@code AES128Encryptor}, saves a {@code User} under the key {@code "test"},
 * loads it back as an observable and prints its name and password.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    File cacheDir = new File("aaa");
    boolean dirMissing = !cacheDir.exists();
    if (dirMissing) {
        cacheDir.mkdir();
    }

    AES128Encryptor aes = new AES128Encryptor("abcdefghijklmnop");
    DiskImpl disk = new DiskImpl(cacheDir, new GsonConverter(aes));
    RxCache.config(new RxCache.Builder().persistence(disk));
    RxCache cache = RxCache.getRxCache();

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);
    loaded.subscribe(entry -> {
        User stored = entry.getData();
        System.out.println(stored.name);
        System.out.println(stored.password);
    });
}
/**
 * Demo entry point: creates the directory {@code "aaa"} if it does not exist, configures
 * {@code RxCache} with an {@code OkioImpl} persistence layer on that directory, saves a
 * {@code User} under the key {@code "test"}, loads it back as an observable and prints its
 * name and password.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    File cacheDir = new File("aaa");
    boolean dirMissing = !cacheDir.exists();
    if (dirMissing) {
        cacheDir.mkdir();
    }

    OkioImpl okio = new OkioImpl(cacheDir);
    RxCache.config(new RxCache.Builder().persistence(okio));
    RxCache cache = RxCache.getRxCache();

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);
    loaded.subscribe(entry -> {
        User stored = entry.getData();
        System.out.println(stored.name);
        System.out.println(stored.password);
    });
}
/**
 * Demo entry point: creates the directory {@code "aaa"} if it does not exist, configures
 * {@code RxCache} with a {@code DiskImpl} persistence layer on that directory, saves a
 * {@code User} under the key {@code "test"}, loads it back as an observable and prints its
 * name and password.
 *
 * @param args command-line arguments (not read)
 */
public static void main(String[] args) {
    File cacheDir = new File("aaa");
    boolean dirMissing = !cacheDir.exists();
    if (dirMissing) {
        cacheDir.mkdir();
    }

    DiskImpl disk = new DiskImpl(cacheDir);
    RxCache.config(new RxCache.Builder().persistence(disk));
    RxCache cache = RxCache.getRxCache();

    User tony = new User();
    tony.name = "tony";
    tony.password = "123456";
    cache.save("test", tony);

    Observable<Record<User>> loaded = cache.load2Observable("test", User.class);
    loaded.subscribe(entry -> {
        User stored = entry.getData();
        System.out.println(stored.name);
        System.out.println(stored.password);
    });
}