io.reactivex.Observable#subscribe ( )源码实例Demo

下面列出了io.reactivex.Observable#subscribe ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: rxjava2-lab   文件: Code4.java
/**
 * Builds an observable that emits three hero names followed by a completion
 * signal, then subscribes with handlers for items, errors and completion.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String... args) {
    String[] heroes = {"Black Canary", "Catwoman", "Elektra"};

    Observable<String> heroStream = Observable.create(emitter -> {
        // Push each name downstream in declaration order.
        for (String hero : heroes) {
            emitter.onNext(hero);
        }
        // Tell the subscriber that nothing else will follow.
        emitter.onComplete();
    });

    heroStream.subscribe(
        hero -> System.out.println("Received: " + hero),
        failure -> System.out.println("BOOM"),
        () -> System.out.println("Completion"));
}
 
源代码2 项目: a   文件: ReplaceRulePresenter.java
/**
 * Imports replace rules from the given text and reports the outcome through the view.
 *
 * @param text the raw text handed to {@code ReplaceRuleManager.importReplaceRule}
 */
@Override
public void importDataS(String text) {
    Observable<Boolean> importTask = ReplaceRuleManager.importReplaceRule(text);
    // No observable was returned: report the failure and stop.
    if (importTask == null) {
        mView.toast("导入失败");
        return;
    }

    importTask.subscribe(new MyObserver<Boolean>() {
        @Override
        public void onNext(Boolean imported) {
            // The emitted flag itself is not inspected; any emission refreshes the view.
            mView.refresh();
            mView.toast("导入成功");
        }

        @Override
        public void onError(Throwable e) {
            mView.toast("格式不对");
        }
    });
}
 
/**
 * Emits a fixed set of month labels through a hand-built observable and prints
 * each one as it arrives.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Observable<String> monthObservable = Observable.create(emitter -> {
        String[] monthLabels = {
            "Jan", "Feb", "Mar", "Apl", "May", "Jun",
            "July", "Aug", "Sept", "Oct", "Nov", "Dec"
        };
        try {
            for (String label : monthLabels) {
                emitter.onNext(label);
            }
            emitter.onComplete();
        } catch (Exception e) {
            // Route anything thrown while emitting to the subscriber's error channel.
            emitter.onError(e);
        }
    });

    monthObservable.subscribe(label -> System.out.println(label));
}
 
/**
 * Creates an observable over five month labels and subscribes to it twice,
 * printing every item received by each subscription.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    List<String> months = Arrays.asList("Jan", "Feb", "Mar", "Apl", "May");

    Observable<Object> monthSource = Observable.create(emitter -> {
        try {
            // Printed each time the emission callback is invoked.
            System.out.println("creating ");
            months.forEach(emitter::onNext);
            emitter.onComplete();
        } catch (Exception e) {
            emitter.onError(e);
        }
    });

    // Two separate subscribe calls on the same observable.
    monthSource.subscribe(month -> System.out.println("month:-" + month));
    monthSource.subscribe(month -> System.out.println("month:-" + month));
}
 
/**
 * Emits the words "One" to "Five" from a hand-built observable and attaches
 * two observers: one behind a filter, one unfiltered.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Observable<String> numberWords = Observable.create(emitter -> {
        System.out.println(" ** Starting Emitting **");
        for (String word : new String[] {"One", "Two", "Three", "Four", "Five"}) {
            emitter.onNext(word);
        }
        // Note: no completion signal is sent after the last word.
    });

    // First observer: only receives words containing a lowercase "o".
    numberWords
        .filter(word -> word.contains("o"))
        .subscribe(word -> System.out.println("Observer 1 Received:" + word));

    // Second observer: receives every word.
    numberWords.subscribe(word -> System.out.println("Observer 2 Received:" + word));
}
 
源代码6 项目: AndroidSamples   文件: DeleteButtonView.java
/**
 * Creates the view, prepares its timer, and subscribes a handler to an observable
 * whose emitter is stored in {@code mEmitter}.
 *
 * <p>The handler treats the value 1 as "delete the character before the cursor"
 * and any other value as "cancel the timer".
 *
 * @param context the context, also kept in {@code mContext}
 * @param attrs   the attribute set forwarded to the superclass; may be null
 */
public DeleteButtonView(Context context, @Nullable AttributeSet attrs) {
    super(context, attrs);
    mContext = context;
    mTimer = new Timer();

    // Capture the emitter so values can be pushed from elsewhere in the class.
    Observable<Integer> commands = Observable.create(emitter -> mEmitter = emitter);
    Disposable subscription = commands.subscribe(command -> {
        if (command != 1) {
            mTimer.cancel();
            return;
        }
        int cursor = mEditText.getSelectionStart();
        // Nothing to delete when the cursor sits at the very start.
        if (cursor > 0) {
            mEditText.getText().delete(cursor - 1, cursor);
        }
    });
}
 
源代码7 项目: a   文件: SourceEditActivity.java
/**
 * Handles the result of the QR request: imports book sources from the returned
 * "result" extra and shows the first imported source.
 *
 * @param requestCode the request code the activity was started with
 * @param resultCode  the result code reported by the finished activity
 * @param data        the result intent; ignored when null
 */
@Override
protected void onActivityResult(int requestCode, int resultCode, Intent data) {
    super.onActivityResult(requestCode, resultCode, data);
    // Only a successful QR result that carries data is of interest.
    if (requestCode != REQUEST_QR || resultCode != RESULT_OK || data == null) {
        return;
    }

    Observable<List<BookSourceBean>> importTask =
            BookSourceManager.importSource(data.getStringExtra("result"));
    if (importTask == null) {
        toast("导入失败");
        return;
    }

    importTask.subscribe(new MyObserver<List<BookSourceBean>>() {
        @SuppressLint("DefaultLocale")
        @Override
        public void onNext(List<BookSourceBean> bookSourceBeans) {
            int imported = bookSourceBeans.size();
            if (imported == 0) {
                toast("未导入");
                return;
            }
            // With several sources, announce the count before showing the first one.
            if (imported > 1) {
                toast(String.format("导入成功%d个书源, 显示第一个", imported));
            }
            setText(bookSourceBeans.get(0));
        }

        @Override
        public void onError(Throwable e) {
            toast(e.getLocalizedMessage());
        }
    });
}
 
源代码8 项目: rxjava2-lab   文件: Code11_Solution.java
/**
 * Subscribes twice to the observable returned by {@code getHeroesNames()},
 * printing each value with a per-subscriber prefix and dumping any error.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Observable<String> heroNames = getHeroesNames();

    heroNames.subscribe(
        name -> System.out.println("Subscriber 1: " + name),
        error -> error.printStackTrace());

    heroNames.subscribe(
        name -> System.out.println("Subscriber 2: " + name),
        error -> error.printStackTrace());
}
 
/**
 * Buffers the source from {@code getObservable()} into overlapping lists and
 * hands them to the observer from {@code getObserver()}.
 */
private void doSomeWork() {
    // buffer(3, 1): every list holds at most three items, and a new list starts
    // one item after the previous one. For a source of one..five this gives:
    //   1 - one, two, three
    //   2 - two, three, four
    //   3 - three, four, five
    //   4 - four, five
    //   5 - five
    getObservable()
            .buffer(3, 1)
            .subscribe(getObserver());
}
 
源代码10 项目: java-specialagent   文件: RxJava2Test.java
/**
 * Subscribes with all four callbacks (onNext, onError, onComplete, onSubscribe)
 * and verifies what each of them recorded, plus the tracer state afterwards.
 *
 * @param tracer the mock tracer used to build the observable and inspect spans
 */
@Test
public void consumerTest4(final MockTracer tracer) {
  final String subscribed = "subscribed";

  // One collector per callback so each can be asserted independently.
  final List<Integer> result = new ArrayList<>();
  final List<String> subscribeList = new ArrayList<>();
  final List<String> completeList = new ArrayList<>();
  final List<String> errorList = new ArrayList<>();

  final Observable<Integer> observable = createSequentialObservable(tracer, false);
  final Consumer<Integer> onNext = consumer(result);
  final Consumer<Object> onSubscribe = disposable -> subscribeList.add(subscribed);

  observable.subscribe(onNext, onError(errorList), onComplete(completeList, tracer), onSubscribe);

  // Items.
  assertEquals(5, result.size());

  // Completion.
  assertEquals(1, completeList.size());
  assertTrue(completeList.contains(COMPLETED));

  // Subscription.
  assertEquals(1, subscribeList.size());
  assertTrue(subscribeList.contains(subscribed));

  // Errors.
  assertTrue(errorList.isEmpty());

  // Tracing: one finished span and no scope left active.
  final List<MockSpan> spans = tracer.finishedSpans();
  assertEquals(1, spans.size());

  assertNull(tracer.scopeManager().active());
}
 
/**
 * Creates an observable that emits one message containing a random number and
 * then completes, and subscribes to it twice.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Parameterized instead of a raw Observable so the emitted type is checked
    // by the compiler and the subscriber lambdas receive a String.
    // NOTE(review): the message text says "Hot", yet Math.random() is evaluated
    // inside the create callback — confirm the wording is intended.
    Observable<String> observable = Observable.create(emitter -> {
        emitter.onNext("I am Hot Observable " + Math.random() * 100);
        emitter.onComplete();
    });

    observable.subscribe(message -> System.out.println("message:-" + message));
    observable.subscribe(message -> System.out.println("message:-" + message));
}
 
源代码12 项目: akarnokd-misc   文件: BehaviorMulticast.java
/**
 * Feeds a {@code scan}-accumulated view of a {@code PublishSubject} into a
 * {@code BehaviorSubject}, then subscribes to that subject twice, one subscriber
 * after the other. The trailing comments record the value each step is expected
 * to print.
 */
@Test
public void test() {
    Subject<String> subject = PublishSubject.create();
    // Running concatenation of everything pushed into subject, seeded with "zero".
    Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b);
    Subject<String> multicast = BehaviorSubject.create();

    // multicast is used as the observer here, so it receives what the scan emits.
    observable.subscribe(multicast);

    Disposable first = multicast.subscribe(System.out::println); // "zero"
    subject.onNext("one"); // "zero, one"
    // Drop the first subscriber before the second one attaches.
    first.dispose();
    Disposable second = multicast.subscribe(System.out::println); // "zero, one"
    subject.onNext("two"); // "zero, one, two"
    second.dispose();
}
 
源代码13 项目: RxJava2RetrofitDemo   文件: MainPresenter.java
/**
 * Starts the two demo observables in parallel via {@code zip} and, once they
 * complete, enters the home activity after whatever remains of the configured
 * splash time.
 *
 * @param savedInstanceState the saved state forwarded to the superclass
 */
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    // The zipped value itself is irrelevant (always 0); only completion matters.
    Observable<Integer> zip = Observable
            .zip(getDemoObservable(), getDemoObservable(), (i, i2) -> 0)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io());
    final long startTime = System.currentTimeMillis();
    zip.subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            bindDisposable(d);
        }

        @Override
        public void onNext(Integer value) {
            // Intentionally ignored: the zipped placeholder carries no information.
        }

        @Override
        public void onError(Throwable e) {
            // Previously empty: a failure left no trace at all. Log it so the
            // problem is at least visible.
            // NOTE(review): the home activity is still not entered on error —
            // confirm whether that is the intended behavior.
            Logger.d("--->> onError after " + (System.currentTimeMillis() - startTime) + " : " + e);
        }

        @Override
        public void onComplete() {
            // Measure once so the logged value and the computed delay agree.
            long elapsed = System.currentTimeMillis() - startTime;
            Logger.d("--->> onCompleted bind time " + elapsed);
            // Never schedule with a negative delay when loading outlasted the splash time.
            long delayTime = Math.max(0, BuildConfig.SPLASH_TIME - elapsed);
            new WeakHandler(message -> {//TODO refactor
                mView.enterHomeActivity();
                return true;
            }).sendEmptyMessageDelayed(0, delayTime);
        }
    });
}
 
源代码14 项目: objectbox-java   文件: QueryObserverTest.java
/**
 * Subscribes this test instance (cast to {@code Observer}) to the observable built
 * by {@code RxQuery.observable} and checks the recorded changes twice: right after
 * subscribing, and again after a new query result is published.
 */
@Test
public void testObservable() {
    Observable observable = RxQuery.observable(mockQuery.getQuery());
    // The test object itself is the observer; presumably its callbacks fill
    // receivedChanges and count down latch — verify against the class fields.
    observable.subscribe((Observer) this);
    // NOTE(review): the second argument looks like a timeout — confirm its unit.
    assertLatchCountedDown(latch, 2);
    // Initial delivery: exactly one change set, and it is empty.
    assertEquals(1, receivedChanges.size());
    assertEquals(0, receivedChanges.get(0).size());
    assertNull(error);

    // Reset the recording state, then publish a new result.
    latch = new CountDownLatch(1);
    receivedChanges.clear();
    publisher.setQueryResult(listResult);
    publisher.publish();

    assertLatchCountedDown(latch, 5);
    // After publishing: again one change set, now with two entries.
    assertEquals(1, receivedChanges.size());
    assertEquals(2, receivedChanges.get(0).size());

    // The observable must not have signalled completion.
    assertEquals(0, completedCount.get());

    // Disabled follow-up check kept from the original ("Unsubscribe?"):
    //        receivedChanges.clear();
    //        latch = new CountDownLatch(1);
    //        assertLatchCountedDown(latch, 5);
    //
    //        assertEquals(1, receivedChanges.size());
    //        assertEquals(3, receivedChanges.get(0).size());
}
 
/**
 * Subscribes to an empty observable with handlers for items, errors and
 * completion, each of which prints what it received.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Observable.<String>empty().subscribe(
            item -> System.out.println("we got" + item),
            error -> System.out.print(error),
            () -> System.out.print("I am Done!! Completed normally"));
}
 
/**
 * Starts a one-second timer on the computation scheduler, maps its tick to four
 * fixed {@code long} values, and prints every callback the observer receives.
 * The main thread then sleeps so the scheduled work has time to run.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Uppercase L suffixes: a lowercase 'l' is easily misread as the digit 1.
    Observable<Long> observable = Observable
            .timer(1, TimeUnit.SECONDS, Schedulers.computation())
            .flatMap(tick -> Observable.just(12L, 13L, 300L, 400L));

    observable.subscribe(new Observer<Long>() {

        @Override
        public void onSubscribe(Disposable arg0) {
            System.out.println("subscribe");
        }

        @Override
        public void onNext(Long value) {
            System.out.println(value);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("completed");
        }
    });

    try {
        // The timer is scheduled on the computation scheduler, so main must
        // stay alive long enough for the callbacks to print.
        Thread.sleep(80000);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Restore the interrupt flag instead of silently clearing it.
        Thread.currentThread().interrupt();
    }
}
 
/**
 * Subscribes an observer to {@code Observable.rangeLong(1, 3)}; the observer
 * prints the disposable's state and disposes it as soon as it is subscribed.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Observer<Long> disposingObserver = new Observer<Long>() {

        @Override
        public void onSubscribe(Disposable disposable) {
            // Report the state first, then cancel the subscription right away.
            System.out.println(disposable.isDisposed());
            disposable.dispose();
        }

        @Override
        public void onNext(Long value) {
            System.out.println(String.valueOf(value));
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("on complete");
        }
    };

    Observable.rangeLong(1L, 3L).subscribe(disposingObserver);
}
 
源代码18 项目: My-MVP   文件: NetUnitTest.java
/**
 * Requests "10/1" from the Gank API through Retrofit and runs {@code doAssert}
 * on the first entry of the returned results.
 */
@Test
public void netTest() {
    Observable<GankAndroid> androidFeed = initRetrofit()
            .create(GankApi.class)
            .getData("10/1");

    // NOTE(review): the error handler only prints, so a failed request does not
    // appear to fail this test — confirm whether that is intended.
    androidFeed.subscribe(
            gankAndroid -> doAssert(gankAndroid.getResults().get(0)),
            throwable -> System.out.println("fail"));
}
 
源代码19 项目: akarnokd-misc   文件: TrackedRefcount.java
/**
 * Exercises a {@code replay(1).refCount()} pipeline on top of a
 * {@code PublishSubject}: two subscribers attach and are cleared, then two more
 * attach, with values pushed in between. Every stage prints a marker so the
 * connect/subscribe/dispose sequence can be read from the output.
 *
 * @throws Exception if one of the sleeps is interrupted
 */
@Test
public void test() throws Exception {
    RxJavaAssemblyTracking.enable();
    
    System.out.println("start");

    PublishSubject<String> stringsEmitter = PublishSubject.create();

    // The first doOnSubscribe sits before replay/refCount, the second one after
    // it, so the two markers are printed at different points of the chain.
    Observable<String> combineSource = stringsEmitter
            .doOnSubscribe(d -> System.out.println("Connected!"))
            .replay(1)
            .refCount()
            .doOnSubscribe(d -> System.out.println("Subscribed!"))
            .doOnDispose(() -> System.out.println("Disposed!"));

    // Collects the first pair of subscriptions so both can be dropped at once.
    CompositeDisposable c = new CompositeDisposable();

    c.add(
            combineSource
                    //.subscribeOn(Schedulers.io())
                    .subscribe((string) -> System.out.println("A1:" + string))
    );
    c.add(
            combineSource
                    //.subscribeOn(Schedulers.io())
                    .subscribe((string) -> System.out.println("B1:" + string))
    );

    stringsEmitter.onNext("s1");

    Thread.sleep(100);
    // Disposes A1 and B1 together.
    c.clear();

    Thread.sleep(100);

    // Separator between the first and the second round of subscribers.
    System.out.println("---");

    // Second round: these subscribers also report errors and completion.
    combineSource
        //.subscribeOn(Schedulers.io())
        //.subscribe((string) -> System.out.println("A2:" + string));
            .subscribe((string) -> System.out.println("A2:" + string), Throwable::printStackTrace, () -> System.out.println("Done A2"));

    combineSource
        //.subscribeOn(Schedulers.io())
            .subscribe((string) -> System.out.println("B2:" + string), Throwable::printStackTrace, () -> System.out.println("Done B2"));

    Thread.sleep(1000);

    stringsEmitter.onNext("s2");

    Thread.sleep(1000);

    stringsEmitter.onNext("s3");
    stringsEmitter.onNext("s4");
}
 
/**
 * Obtains the car's brand observable before the brand is set, sets the brand,
 * and only then subscribes the observer from {@code getObserver()}.
 */
private void doSomeWork() {

        Car car = new Car();

        // Obtained while the brand has not been set yet.
        Observable<String> brandDeferObservable = car.brandDeferObservable();

        car.setBrand("BMW");  // Even though the brand is set after the Observable was created,
        // the subscriber will still receive "BMW" as the brand.
        // Without defer, the brand received would have been null.

        brandDeferObservable
                .subscribe(getObserver());
    }