下面列出了 io.reactivex.ObservableEmitter 的 API 类实例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Click handler bound to the cancel and confirm buttons.
 *
 * <p>Cancel navigates back immediately. Confirm emits the result of {@code generateUri()}
 * through an Observable composed with {@code RxHelper.rxSchedulerHelper()}, posts it on the
 * RxBus under {@code RX_BUS_CODE_HEAD_IMAGE_URI}, and then navigates back.
 *
 * <p>NOTE(review): the subscription has no error consumer and its Disposable is not kept.
 *
 * @param view the view that was clicked
 */
@OnClick({R.id.tv_cancel, R.id.tv_ok})
public void onClick(View view) {
    final int clickedId = view.getId();
    if (clickedId == R.id.tv_cancel) {
        onBackPressedSupport();
        return;
    }
    if (clickedId != R.id.tv_ok) {
        // Any other id is ignored, exactly like a switch without a default branch.
        return;
    }

    final ObservableOnSubscribe<Uri> uriSource = new ObservableOnSubscribe<Uri>() {
        @Override
        public void subscribe(ObservableEmitter<Uri> emitter) throws
                Exception {
            emitter.onNext(generateUri());
            emitter.onComplete();
        }
    };
    final Consumer<Uri> publishAndClose = new Consumer<Uri>() {
        @Override
        public void accept(Uri headImageUri) throws Exception {
            RxEventHeadBean headEvent = new RxEventHeadBean(headImageUri);
            RxBus.get().send(RX_BUS_CODE_HEAD_IMAGE_URI, headEvent);
            onBackPressedSupport();
        }
    };

    Observable.create(uriSource)
            .compose(RxHelper.<Uri>rxSchedulerHelper())
            .subscribe(publishAndClose);
}
/**
 * Subscribes to the connected-GATT source and forwards its outcome to {@code emitter}.
 *
 * <p>The queue is released when the subscription finishes for any reason (via
 * {@code doFinally}); when {@code autoConnect} is set it is additionally released right away.
 *
 * @param emitter               receives the connected {@link BluetoothGatt} or the error
 * @param queueReleaseInterface handle used to release the operation queue
 */
@Override
protected void protectedRun(final ObservableEmitter<BluetoothGatt> emitter, final QueueReleaseInterface queueReleaseInterface) {
    final DisposableSingleObserver<BluetoothGatt> gattObserver = getConnectedBluetoothGatt()
            .compose(wrapWithTimeoutWhenNotAutoconnecting())
            // when there are no subscribers there is no point of continuing work -> next will be disconnect operation
            .doFinally(new Action() {
                @Override
                public void run() {
                    queueReleaseInterface.release();
                }
            })
            .subscribeWith(disposableSingleObserverFromEmitter(emitter));
    // Disposing the emitter also disposes the GATT subscription.
    emitter.setDisposable(gattObserver);

    if (!autoConnect) {
        return;
    }
    // with autoConnect the connection may be established after a really long time
    queueReleaseInterface.release();
}
/**
 * Applies {@code opEntryInfo.mode} to the given op of a package and emits the result.
 *
 * <p>A null result completes without emitting. A result that carries an exception is
 * rethrown wrapped in a plain {@link Exception}. The stream is retried up to 5 times, but only
 * when the failure itself is an {@link IOException} or a {@link NullPointerException}.
 *
 * @param context     context used to obtain the {@code AppOpsx} instance
 * @param pkgName     package whose op mode is changed
 * @param opEntryInfo supplies the op and the mode to apply
 * @return an Observable emitting at most one {@code OpsResult}
 */
public static Observable<OpsResult> setMode(final Context context, final String pkgName,
        final OpEntryInfo opEntryInfo) {
    final ObservableOnSubscribe<OpsResult> applyMode = new ObservableOnSubscribe<OpsResult>() {
        @Override
        public void subscribe(ObservableEmitter<OpsResult> emitter) throws Exception {
            final OpsResult result = AppOpsx.getInstance(context)
                    .setOpsMode(pkgName, opEntryInfo.opEntry.getOp(), opEntryInfo.mode);
            if (result != null && result.getException() != null) {
                throw new Exception(result.getException());
            }
            if (result != null) {
                emitter.onNext(result);
            }
            emitter.onComplete();
        }
    };
    final Predicate<Throwable> isRetryable = new Predicate<Throwable>() {
        @Override
        public boolean test(Throwable failure) throws Exception {
            if (failure instanceof IOException) {
                return true;
            }
            return failure instanceof NullPointerException;
        }
    };
    return Observable.create(applyMode).retry(5, isRetryable);
}
/**
 * Bridges foreground/background callbacks from {@code recognizer} into the emitter.
 *
 * <p>The cancellation hook is installed before the listener is registered and the
 * recognizer is started; on cancellation the listener is removed and the recognizer stopped.
 *
 * @param appStateEmitter receives {@code FOREGROUND} / {@code BACKGROUND} values
 */
@Override
public void subscribe(@NonNull final ObservableEmitter<AppState> appStateEmitter) throws Exception {
    final AppStateListener stateForwarder = new AppStateListener() {
        @Override
        public void onAppDidEnterForeground() {
            appStateEmitter.onNext(FOREGROUND);
        }

        @Override
        public void onAppDidEnterBackground() {
            appStateEmitter.onNext(BACKGROUND);
        }
    };

    final Cancellable teardown = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            recognizer.removeListener(stateForwarder);
            recognizer.stop();
        }
    };
    appStateEmitter.setCancellable(teardown);

    recognizer.addListener(stateForwarder);
    recognizer.start();
}
/**
 * Returns an Observable of events of the given {@code eventType}.
 *
 * <p>If a sticky event is stored for that type, it is merged into the returned stream so a
 * new subscriber receives it as well; otherwise only the bus events of that type are returned.
 * The lookup runs while holding the lock on {@code mStickyEventMap}.
 *
 * @param eventType class of the events to observe
 * @param <T>       event type
 * @return the bus events of that type, optionally merged with the stored sticky event
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (mStickyEventMap) {
        final Observable<T> liveEvents = mBus.ofType(eventType);
        final Object stickyEvent = mStickyEventMap.get(eventType);
        if (stickyEvent == null) {
            return liveEvents;
        }
        // Emits the stored event once per subscriber and intentionally never completes.
        final Observable<T> stickyReplay = Observable.create(new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> stickyEmitter) throws Exception {
                stickyEmitter.onNext(eventType.cast(stickyEvent));
            }
        });
        return Observable.merge(liveEvents, stickyReplay);
    }
}
/**
 * Adapts an {@link ApolloQueryWatcher} into an asynchronous Observable.
 *
 * <p>Each response delivered to the watcher callback is forwarded as an item and a failure is
 * forwarded as an error, both only while the emitter is not disposed.
 *
 * @param watcher the ApolloQueryWatcher to convert.
 * @param <T> the value type
 * @return the converted Observable
 * @throws NullPointerException if watcher == null
 */
@NotNull
@CheckReturnValue
public static <T> Observable<Response<T>> from(@NotNull final ApolloQueryWatcher<T> watcher) {
    checkNotNull(watcher, "watcher == null");
    return Observable.create(new ObservableOnSubscribe<Response<T>>() {
        @Override
        public void subscribe(final ObservableEmitter<Response<T>> downstream) throws Exception {
            cancelOnObservableDisposed(downstream, watcher);

            final ApolloCall.Callback<T> forwardingCallback = new ApolloCall.Callback<T>() {
                @Override
                public void onResponse(@NotNull Response<T> response) {
                    if (downstream.isDisposed()) {
                        return;
                    }
                    downstream.onNext(response);
                }

                @Override
                public void onFailure(@NotNull ApolloException failure) {
                    Exceptions.throwIfFatal(failure);
                    if (downstream.isDisposed()) {
                        return;
                    }
                    downstream.onError(failure);
                }
            };
            watcher.enqueueAndWatch(forwardingCallback);
        }
    });
}
/**
 * Demo: emits a single value and logs the thread names seen by the source and the consumer.
 * No scheduler is applied to the chain, and the stream is never completed.
 */
public static void demo1() {
    final ObservableOnSubscribe<Integer> source = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> out) throws Exception {
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
            Log.d(TAG, "emit 1");
            out.onNext(1);
        }
    };

    Observable.create(source).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer value) throws Exception {
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
            Log.d(TAG, "onNext: " + value);
        }
    });
}
/**
 * Runs {@code runnable} only if the source at index {@code myI} is, or now becomes, the one
 * selected in {@code usedObservable}.
 *
 * <p>While {@code usedObservable} still holds -1 the caller claims it: the index is stored, the
 * runnable is run, this source's Disposable is taken out of {@code disposables} and installed
 * as the emitter's cancellable. Calls from any other index are ignored afterwards.
 *
 * @param emitter        downstream emitter whose cancellable is replaced by the winner
 * @param usedObservable index of the selected source, or -1 when none is selected yet
 * @param disposables    per-source disposables, indexed like the sources
 * @param myI            index of the calling source
 * @param runnable       action to run if this source is the selected one
 */
private synchronized <T> void doIfUsedObservable(ObservableEmitter<T> emitter, AtomicInteger usedObservable, AtomicReferenceArray<Disposable> disposables, int myI, Runnable runnable) {
    final int selected = usedObservable.get();
    if (selected != -1) {
        // A source has already been chosen; only that source may proceed.
        if (selected == myI) {
            runnable.run();
        }
        return;
    }

    // We are the first!
    usedObservable.set(myI);
    runnable.run();

    // Kill all of the others.
    final Disposable ownDisposable = disposables.get(myI);
    disposables.set(myI, null);
    // Calling this calls the previous one, so all of the others get disposed.
    emitter.setCancellable(() -> {
        if (ownDisposable == null) {
            return;
        }
        ownDisposable.dispose();
    });
}
/**
 * Builds an Observable that extracts {@code itemCount} frames, one every 500 ms of media
 * time, and emits each one as a {@code ThumbnailBitmap} tagged with its index.
 *
 * <p>The frame variant (original size, scaled, or center-cropped) is chosen from
 * {@code mSizeType} for every frame. Work is subscribed on {@code Schedulers.io()} because
 * frame extraction is slow.
 *
 * @param mmrc      retriever the frames are read from
 * @param itemCount number of frames to extract
 * @return an Observable emitting the frames in index order, then completing
 */
private Observable<ThumbnailBitmap> obtainThumbnail(MediaMetadataRetrieverCompat mmrc, int itemCount) {
    // One frame is taken every 500 milliseconds.
    final long frameIntervalMs = 500;
    final int thumbSize = getResources().getDimensionPixelSize(R.dimen.thumbnail_size);

    // Fetching frames is time-consuming, so it has to run on a worker thread.
    final ObservableOnSubscribe<ThumbnailBitmap> frameSource = frames -> {
        int index = 0;
        while (index < itemCount) {
            final long positionMs = index * frameIntervalMs;
            final Bitmap frame;
            if (ORIGINAL_SIZE == mSizeType) {
                frame = mmrc.getFrameAtTime(positionMs, mFetchFrameOption);
            } else if (FLOOR_SCALE == mSizeType) {
                frame = mmrc.getScaledFrameAtTime(positionMs, mFetchFrameOption, thumbSize, thumbSize);
            } else {
                frame = mmrc.getCenterCropFrameAtTime(positionMs, mFetchFrameOption, thumbSize, thumbSize);
            }
            frames.onNext(new ThumbnailBitmap(index, frame));
            index++;
        }
        frames.onComplete();
    };
    return Observable.create(frameSource).subscribeOn(Schedulers.io());
}
/**
 * Wraps a callback-style action into an Observable of {@code Result}.
 *
 * <p>Both outcomes are delivered as items: success as {@code new Result<T>(value)} and failure
 * as {@code new Result<T>(exception)}. The stream itself never signals an error or completion
 * from here. The delegate is installed as the emitter's disposable before the action starts.
 *
 * @param subscribe the action to run for each subscriber
 * @param <T>       success value type
 * @return an Observable emitting one {@code Result} per callback invocation
 */
public static <T> Observable<Result<T>> observableWrapped(final OnSubscribeAction<T> subscribe) {
    final ObservableOnSubscribe<Result<T>> source = new ObservableOnSubscribe<Result<T>>() {
        @Override
        public void subscribe(final ObservableEmitter<Result<T>> downstream) throws Exception {
            final ActionDelegate<T> resultForwarder = new ActionDelegate<T>() {
                @Override
                public void onSuccess(T value) {
                    downstream.onNext(new Result<T>(value));
                }

                @Override
                public void onError(Exception failure) {
                    downstream.onNext(new Result<T>(failure));
                }
            };
            final RxActionDelegate<T> rxDelegate = new RxActionDelegate<>(resultForwarder);
            downstream.setDisposable(rxDelegate);
            subscribe.subscribe(rxDelegate);
        }
    };
    return Observable.<Result<T>>create(source);
}
/**
 * Downloads the resource at {@code url} and decodes it into a {@link Bitmap}.
 *
 * <p>The request is executed synchronously on the subscribing thread. On success exactly one
 * bitmap is emitted followed by completion. If the payload cannot be decoded the stream fails
 * with a {@link NullPointerException}; I/O failures propagate as thrown.
 *
 * @param url address of the image to fetch
 * @return an Observable emitting the decoded bitmap once, then completing
 */
public static Observable<Bitmap> getBitmap(final String url) {
    return Observable.create(new ObservableOnSubscribe<Bitmap>() {
        @Override
        public void subscribe(ObservableEmitter<Bitmap> emitter) throws Exception {
            Request request = new Request.Builder()
                    .url(url)
                    .get()
                    .build();
            Response response = OK_HTTP_CLIENT.newCall(request).execute();
            InputStream inputStream = response.body().byteStream();
            Bitmap bitmap;
            try {
                bitmap = BitmapFactory.decodeStream(inputStream);
            } finally {
                // Always release the response stream, even when decoding throws.
                inputStream.close();
            }
            if (bitmap == null) {
                emitter.onError(new NullPointerException("Bitmap fetched from url is null"));
                // Nothing may be emitted after a terminal event.
                return;
            }
            emitter.onNext(bitmap);
            emitter.onComplete();
        }
    });
}
/**
 * Attaches a snapshot listener to {@code mDocumentReference} that is backed by the emitter.
 *
 * <p>Disposing the emitter removes the listener registration and clears the listener's
 * emitter reference; repeated dispose calls are no-ops.
 *
 * @param e emitter handed to the {@code DocEventListener}
 */
@Override
public void subscribe(ObservableEmitter<DocumentSnapshot> e) throws Exception {
    DocEventListener snapshotListener = new DocEventListener(e);
    ListenerRegistration listenerRegistration = mDocumentReference.addSnapshotListener(snapshotListener);

    e.setDisposable(new Disposable() {
        // Set once the registration has been removed.
        boolean released = false;

        @Override
        public boolean isDisposed() {
            return released;
        }

        @Override
        public void dispose() {
            if (isDisposed()) {
                return;
            }
            listenerRegistration.remove();
            snapshotListener.emitter = null;
            released = true;
        }
    });
}
/**
 * Demo of {@code concatMap}: emits 1, 2, 3, maps each value to three identical strings
 * delayed by 10 ms, and logs every string. The source never completes.
 */
public static void demo3() {
    final ObservableOnSubscribe<Integer> numbers = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> out) throws Exception {
            out.onNext(1);
            out.onNext(2);
            out.onNext(3);
        }
    };

    final Function<Integer, ObservableSource<String>> toDelayedLabels =
            new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer number) throws Exception {
                    final List<String> labels = new ArrayList<>();
                    int remaining = 3;
                    while (remaining > 0) {
                        labels.add("I am value " + number);
                        remaining--;
                    }
                    return Observable.fromIterable(labels).delay(10, TimeUnit.MILLISECONDS);
                }
            };

    Observable.create(numbers)
            .concatMap(toDelayedLabels)
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String label) throws Exception {
                    Log.d(TAG, label);
                }
            });
}
/**
 * Scrapes the category links from the page at {@code Constants.API_URL_READ} and emits them
 * as one list.
 *
 * <p>Every {@code a} element under {@code div#xiandu_cat} becomes a {@code ReadTypeBean}
 * holding its text and absolute href. An {@link IOException} while fetching is reported through
 * {@code onError} and nothing else is emitted afterwards.
 *
 * @param subscriber receives the list followed by completion, or the I/O error
 */
@Override
public void subscribe(ObservableEmitter<List<ReadTypeBean>> subscriber) throws Exception {
    List<ReadTypeBean> datas = new ArrayList<>();
    try {
        Document doc = Jsoup.connect(Constants.API_URL_READ).get();
        Elements tads = doc.select("div#xiandu_cat").select("a");
        for (Element tad : tads) {
            ReadTypeBean bean = new ReadTypeBean();
            bean.setTitle(tad.text()); // link text is the title
            bean.setUrl(tad.absUrl("href")); // absUrl resolves the href to an absolute address
            datas.add(bean);
            Log.v("Jsoup","title= "+bean.getTitle()+" url= "+bean.getUrl());
        }
    } catch (IOException e) {
        subscriber.onError(e);
        // onError is terminal: do not fall through to onNext/onComplete.
        return;
    }
    subscriber.onNext(datas);
    subscriber.onComplete();
}
/**
 * Checks whether Google Play services are available for the referenced activity.
 *
 * <p>Emits {@code true} and completes when the status is {@code ConnectionResult.SUCCESS};
 * otherwise fails with {@code PlayServicesNotAvailableException}.
 *
 * <p>NOTE(review): when {@code activityReference.get()} returns null nothing is emitted and
 * the stream never terminates; confirm that this is intended.
 *
 * @return an Observable reporting Play services availability
 */
private Observable<Boolean> checkPlayServicesAvailable() {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            final Activity host = activityReference.get();
            if (host == null) {
                return;
            }
            final GoogleApiAvailability availability = GoogleApiAvailability.getInstance();
            final int statusCode = availability.isGooglePlayServicesAvailable(host);
            if (statusCode == ConnectionResult.SUCCESS) {
                emitter.onNext(true);
                emitter.onComplete();
                return;
            }
            emitter.onError(new PlayServicesNotAvailableException());
        }
    });
}
/**
 * Posts the credentials to the login endpoint and emits the response text.
 *
 * <p>The response body is read as a string with all CR/LF line breaks removed, emitted once,
 * and the stream then completes. A null response body fails the stream. Work is subscribed on
 * {@code Schedulers.io()} and results are observed on the Android main thread.
 *
 * <p>NOTE(review): the endpoint URL uses plain {@code http://}, so the credentials are sent
 * unencrypted.
 *
 * @param account     value sent as {@code username}
 * @param oldPassword value sent as {@code password}
 * @return an Observable emitting the sanitized response text once, then completing
 */
@Override
public Observable<String> login(final String account, final String oldPassword) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String path = "http://36078d58.nat123.cc/AndroidRegisterAndLogin_war/login";
            Map<String, String> params = new HashMap<>();
            params.put("username", account);
            params.put("password", oldPassword);
            ResponseBody responseBody = executeHttp(path, params);
            if (responseBody == null) {
                emitter.onError(new Throwable("响应体为空"));
                return;
            }
            final String result;
            try {
                result = responseBody.string().replaceAll("(\\\r\\\n|\\\r|\\\n|\\\n\\\r)", "");
            } finally {
                // Release the body even if reading it fails.
                responseBody.close();
            }
            emitter.onNext(result);
            // Single-shot request: signal completion so subscribers can finish.
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Extracts an archive on an I/O thread and reports through an Observable of {@code Long}.
 *
 * <p>{@code "zip"} is handled by {@code unzip} and {@code "rar"} by {@code unRar}; any other
 * type fails the stream. {@code mProgress} is reset to 0 before the Observable is built.
 * Results are observed on the Android main thread.
 *
 * @param type      archive type, {@code "zip"} or {@code "rar"}
 * @param original  path of the archive file
 * @param purpose   path of the output location
 * @param deleteZip not read by this method
 * @return an Observable driven by the chosen extraction routine
 */
@Override
public Observable<Long> decompression(final String type, String original, String purpose, boolean deleteZip) {
    final File archive = new File(original);
    final File destination = new File(purpose);
    mProgress = 0L;

    final ObservableOnSubscribe<Long> extraction = new ObservableOnSubscribe<Long>() {
        @Override
        public void subscribe(ObservableEmitter<Long> progressEmitter) throws Exception {
            switch (type) {
                case "rar":
                    unRar(archive, destination, progressEmitter);
                    break;
                case "zip":
                    unzip(archive, destination, progressEmitter);
                    break;
                default:
                    progressEmitter.onError(new Throwable("无法解压该类型的压缩文件"));
                    break;
            }
        }
    };
    return Observable.create(extraction)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Creates an Observable of item-range change events from a {@code PowerAdapter}.
 *
 * <p>On subscription (verified via {@code verifyMainThread()}) a data observer is registered
 * on the adapter; it is unregistered again when the subscription is disposed.
 *
 * @param adapter adapter to observe, must not be null
 * @return an Observable emitting a {@code ChangeEvent} for each item-range change
 */
@CheckResult
@NonNull
public static Observable<ChangeEvent> changes(@NonNull final PowerAdapter adapter) {
    checkNotNull(adapter, "adapter");
    final ObservableOnSubscribe<ChangeEvent> source = new ObservableOnSubscribe<ChangeEvent>() {
        @Override
        public void subscribe(final ObservableEmitter<ChangeEvent> downstream) throws Exception {
            verifyMainThread();
            final DataObserver changeForwarder = new Observer() {
                @Override
                public void onItemRangeChanged(int positionStart, int itemCount, @Nullable Object payload) {
                    downstream.onNext(new ChangeEvent(positionStart, itemCount, payload));
                }
            };
            adapter.registerDataObserver(changeForwarder);

            final MainThreadDisposable unregisterOnDispose = new MainThreadDisposable() {
                @Override
                protected void onDispose() {
                    adapter.unregisterDataObserver(changeForwarder);
                }
            };
            downstream.setDisposable(unregisterOnDispose);
        }
    };
    return Observable.create(source);
}
/**
 * Requests the given permissions and reports the outcome as an Observable.
 *
 * <p>An accepted result is emitted and followed by completion; any other result fails the
 * stream with {@code new Error(result)}.
 *
 * @param permissions permissions to ask for
 * @return an Observable emitting the accepted {@code PermissionResult}
 */
public Observable<PermissionResult> request(final List<String> permissions) {
    return Observable.create(new ObservableOnSubscribe<PermissionResult>() {
        @Override
        public void subscribe(final ObservableEmitter<PermissionResult> downstream) throws Exception {
            final ResponseCallback outcomeForwarder = new ResponseCallback() {
                @Override
                public void onResponse(PermissionResult result) {
                    if (!result.isAccepted()) {
                        downstream.onError(new Error(result));
                        return;
                    }
                    downstream.onNext(result);
                    downstream.onComplete();
                }
            };
            runtimePermission
                    .request(permissions)
                    .onResponse(outcomeForwarder)
                    .ask();
        }
    });
}
/**
 * After {@code startup()} emits, starts ranging in {@code getRegion()} and emits an
 * {@code RxBeaconRange} for every ranging callback.
 *
 * <p>NOTE(review): no cancellation hook is registered here, so the range notifier added to
 * {@code beaconManager} is not removed when the subscription is disposed.
 *
 * @return an Observable of ranging results
 */
public Observable<RxBeaconRange> beaconsInRegion() {
    final Function<Boolean, ObservableSource<RxBeaconRange>> startRanging =
            new Function<Boolean, ObservableSource<RxBeaconRange>>() {
                @Override
                public ObservableSource<RxBeaconRange> apply(@NonNull Boolean started) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<RxBeaconRange>() {
                        @Override
                        public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> rangeEmitter) throws Exception {
                            final RangeNotifier rangeForwarder = new RangeNotifier() {
                                @Override
                                public void didRangeBeaconsInRegion(Collection<Beacon> beacons, Region region) {
                                    rangeEmitter.onNext(new RxBeaconRange(beacons, region));
                                }
                            };
                            beaconManager.addRangeNotifier(rangeForwarder);
                            beaconManager.startRangingBeaconsInRegion(getRegion());
                        }
                    });
                }
            };
    return startup().flatMap(startRanging);
}
/**
 * Emits the last known location from the fused location provider.
 *
 * <p>A non-null location is emitted as an item and a failure is forwarded as an error. A null
 * location emits nothing, and the stream is never completed from here. Work is subscribed on
 * {@code Schedulers.io()} and observed on the Android main thread.
 *
 * @return an Observable of the last known {@code Location}
 */
@Override
public Observable<Location> getLocationObservable() {
    final ObservableOnSubscribe<Location> lastLocationSource = new ObservableOnSubscribe<Location>() {
        @Override
        public void subscribe(final ObservableEmitter<Location> source) throws Exception {
            final OnSuccessListener<Location> onLocation = new OnSuccessListener<Location>() {
                @Override
                public void onSuccess(Location lastKnown) {
                    if (lastKnown == null) {
                        return;
                    }
                    source.onNext(lastKnown);
                }
            };
            final OnFailureListener onFailed = new OnFailureListener() {
                @Override
                public void onFailure(@NonNull Exception failure) {
                    source.onError(failure);
                }
            };
            FusedLocationProviderClient fusedClient = LocationServices.getFusedLocationProviderClient(context);
            fusedClient.getLastLocation()
                    .addOnSuccessListener(onLocation)
                    .addOnFailureListener(onFailed);
        }
    };
    return Observable.create(lastLocationSource)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Splits {@code msg} into chunks, stores each chunk as a sending message, and sends it on an
 * I/O thread.
 *
 * <p>For a friend that is online the chunk is sent directly; for a friend that is offline it
 * is handed to the offline bot. A resulting id greater than zero marks the stored message as
 * sending and starts its timer; otherwise the stored message is marked as failed. For a
 * non-friend nothing is sent, so the message is always marked as failed.
 *
 * @param receiverKey key of the recipient
 * @param onLine      whether the recipient is currently online
 * @param msg         full message text, split by {@code splitMessage}
 * @param msgType     message type stored and sent with each chunk
 * @param isFriend    whether the recipient is a friend
 */
private static void sendMessage(final ContactsKey receiverKey, final boolean onLine,
        final String msg, final MessageType msgType, final boolean isFriend) {
    final ToxManager manager = ToxManager.getManager();
    final InfoRepository infoRepo = State.infoRepo();
    for (final String chunk : splitMessage(msg)) {
        final ContactsKey selfKey = manager.toxBase.getSelfKey();
        final ToxNickname selfName = manager.toxBase.getName();
        // The last argument is -2 because -1 cannot be used here.
        final long dbId = infoRepo.addMessage(receiverKey, selfKey, selfName, chunk,
                GlobalParams.SEND_ING, true, msgType, -2);

        final ObservableOnSubscribe<Object> sendTask = new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> done) throws Exception {
                long messageId = 0;
                if (isFriend && onLine) {
                    messageId = manager.toxBase.friendSendMessage(receiverKey,
                            ToxFriendMessage.unsafeFromValue(chunk.getBytes()),
                            MessageType.toToxMessageType(msgType));
                } else if (isFriend) {
                    messageId = manager.toxBase.generateUniqueId(receiverKey);
                    ContactsKey botKey =
                            new ToxAddress(GlobalParams.OFFLINE_BOT_TOK_ID).getKey();
                    // The id returned by the offline send is ignored.
                    LogUtil.i(TAG, "botKey:" + botKey.getKey() + ",offlineMsgId:" + messageId);
                    manager.toxBase.friendSendMessageOffline(botKey,
                            OfflineBuilder.offlineMsgSend(messageId, receiverKey.getKey(), chunk));
                }

                if (messageId <= 0) {
                    infoRepo.setMessageFailByDbId(dbId);
                } else {
                    infoRepo.setMessageSending(messageId, dbId);
                    MsgTimer.startTimer(messageId);
                }
                done.onComplete();
            }
        };
        Observable.create(sendTask).subscribeOn(Schedulers.io()).subscribe();
    }
}
/**
 * Records in the database, on an I/O thread, that the notification for a post was shown.
 *
 * <p>The subscription is added to {@code mCompositeDisposable}; errors are only logged.
 *
 * @param postId id of the post whose notification was shown
 */
@Override
public void notificationShownForPost(final int postId) {
    final ObservableOnSubscribe<Void> markShown = new ObservableOnSubscribe<Void>() {
        @Override
        public void subscribe(ObservableEmitter<Void> done) throws Exception {
            PredatorDatabase.getInstance()
                    .setNotificationShownForPost(postId);
            done.onComplete();
        }
    };
    Observable<Void> markShownObservable = Observable.create(markShown)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    final DisposableObserver<Void> logOnlyObserver = new DisposableObserver<Void>() {
        @Override
        public void onNext(Void ignored) {
        }

        @Override
        public void onError(Throwable e) {
            Logger.e(TAG, "onError: " + e.getMessage(), e);
        }

        @Override
        public void onComplete() {
            // Done
        }
    };
    mCompositeDisposable.add(markShownObservable.subscribeWith(logOnlyObserver));
}
/**
 * Runs {@code handler} and relays its outcome to {@code emitter}.
 *
 * <p>A non-null result is emitted before completion; a null result completes without an
 * item. Any {@link Throwable} raised along the way is forwarded through {@code onError}.
 *
 * @param emitter receives the result, completion, or error
 * @param handler produces the value to emit
 * @param <T>     value type
 */
private <T> void subscribe(ObservableEmitter<T> emitter, OnHandler<T> handler) {
    try {
        // Run the handler.
        final T result = handler.onHandler();
        final boolean hasResult = result != null;
        if (hasResult) {
            // Deliver the result.
            emitter.onNext(result);
        }
        emitter.onComplete();
    } catch (Throwable failure) {
        emitter.onError(failure);
    }
}
/**
 * Registers an event executor with the plugin manager that forwards matching events to the
 * emitter.
 *
 * <p>Events whose class is not assignable to {@code eventClazz} are skipped. Matching events
 * are emitted inside {@code context.runWithSender(...)} using the sender resolved by
 * {@code EventUtil.getSender}.
 *
 * @param observableEmitter receives each matching event
 */
@Override
public void subscribe(ObservableEmitter<T> observableEmitter) {
    plugin.getServer().getPluginManager().registerEvent(eventClazz, listener, observeEvent.priority(), (l, event) -> {
        if (!eventClazz.isAssignableFrom(event.getClass())) {
            return;
        }
        // Guarded by the isAssignableFrom check above.
        T typedEvent = (T) event;
        context.runWithSender(EventUtil.getSender(typedEvent), () -> observableEmitter.onNext(typedEvent));
    }, plugin, observeEvent.ignoreCancelled());
}
/**
 * Reads a string set from {@code sharedPreferences} when subscribed.
 *
 * @param key          preference key to read
 * @param defaultValue value passed to {@code getStringSet} as the fallback
 * @return an Observable emitting the stored set once, then completing
 */
public Observable<Set<String>> getString(final String key, final Set<String> defaultValue) {
    final ObservableOnSubscribe<Set<String>> readPreference = new ObservableOnSubscribe<Set<String>>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Set<String>> emitter) throws Exception {
            final Set<String> stored = sharedPreferences.getStringSet(key, defaultValue);
            emitter.onNext(stored);
            emitter.onComplete();
        }
    };
    return Observable.create(readPreference);
}
/**
 * Decrypts {@code encryptedString} with the cipher from the authentication result and emits
 * the outcome.
 *
 * <p>On success an {@code AUTHENTICATED} result carrying the decrypted characters is emitted
 * and the stream completes. Any exception is logged and forwarded as the error mapped by
 * {@code cipherProvider.mapCipherFinalOperationException}.
 *
 * @param emitter receives the decryption result or the mapped error
 * @param result  authentication result providing the crypto object
 */
@Override
protected void onAuthenticationSucceeded(ObservableEmitter<FingerprintDecryptionResult> emitter, AuthenticationResult result) {
    try {
        final Cipher authenticatedCipher = result.getCryptoObject().getCipher();
        final byte[] decrypted = authenticatedCipher.doFinal(encodingProvider.decode(encryptedString));
        final FingerprintDecryptionResult decryptionResult =
                new FingerprintDecryptionResult(FingerprintResult.AUTHENTICATED, null, ConversionUtils.toChars(decrypted));
        emitter.onNext(decryptionResult);
        emitter.onComplete();
    } catch (Exception failure) {
        Logger.error("Unable to decrypt given value. RxFingerprint is only able to decrypt values previously encrypted by RxFingerprint with the same encryption mode.", failure);
        emitter.onError(cipherProvider.mapCipherFinalOperationException(failure));
    }
}
/**
 * Produces a fake page of ten news items to simulate a network request.
 *
 * <p>The call sleeps for one second and then, at random, either fails the stream or emits the
 * page model and completes. If the sleep is interrupted the model is marked with error 111
 * and the thread's interrupt flag is restored. The raw model is passed through
 * {@code RxHelper.handleResult()}.
 *
 * @param page page number written into the model and the item titles
 * @return an Observable emitting the simulated page, or failing at random
 */
public static Observable<IPageModel<News>> loadNewsList(final int page) {
    return Observable.create(new ObservableOnSubscribe<DemoHttpModel<IPageModel<News>>>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<DemoHttpModel<IPageModel<News>>> subscriber) throws Exception {
            DemoHttpModel<IPageModel<News>> baseModel = new DemoHttpModel<>();
            baseModel.error = 0;
            DemoPageModel<News> pageModel = new DemoPageModel<>();
            pageModel.currentPage = page;
            pageModel.totalPage = 10;
            pageModel.dataList = new ArrayList<>();
            for (int i = 0; i < 10; ++i) {
                News standard = new News();
                standard.setTitle("新闻标题" + page + "" + i);
                standard.setDesc("征收标准:自2015年9月1日起,适当调整我省水资源费征收标准。淮河流域及合肥市、滁州市地表水资源费征收标准为每立方米0.12元;其他地区为每立方米0.08元。其中,水力发电用水水资源费征收标准0.003元/千万时,贯流式火电为0.001元/千万时,抽水蓄能电站发电循环用水量暂不征收水资源费。");
                standard.setImgPath("http://ds.cdncache.org/avatar-50/532/164695.jpg");
                pageModel.dataList.add(standard);
            }
            baseModel.results = pageModel;
            try {
                // Simulated network latency.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the code that owns this thread.
                Thread.currentThread().interrupt();
                baseModel.error = 111;
                baseModel.msg = "超时";
            }
            if (Math.random() > 0.5) {
                // onError is terminal: nothing else may be signalled afterwards.
                subscriber.onError(new Throwable());
                return;
            }
            subscriber.onNext(baseModel);
            subscriber.onComplete();
        }
    }).compose(RxHelper.<IPageModel<News>>handleResult());
}
/**
 * Inserts a "read" record for {@code key} into the {@code TABLE_GANKIO_DAY} table.
 *
 * <p>The value returned by {@code insertRead} is emitted once and the stream completes. The
 * chain is composed with {@code RxHelper.rxSchedulerHelper()}.
 *
 * @param key identifier of the item to mark as read
 * @return an Observable emitting the result of the insert
 */
@Override
public Observable<Boolean> recordItemIsRead(final String key) {
    final ObservableOnSubscribe<Boolean> markRead = new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> resultEmitter) throws Exception {
            resultEmitter.onNext(
                    DBUtils.getDB(AppUtils.getContext())
                            .insertRead(DBConfig.TABLE_GANKIO_DAY, key, ItemState.STATE_IS_READ));
            resultEmitter.onComplete();
        }
    };
    return Observable.create(markRead).compose(RxHelper.<Boolean>rxSchedulerHelper());
}
/**
 * Downloads {@code url} into {@code file}, optionally reporting progress.
 *
 * <p>When {@code returnProgress} is true every progress callback is emitted as a
 * {@code Progress}. After a successful download a final {@code new Progress()} is emitted and
 * the stream completes. A failure is logged with its stack trace and forwarded through
 * {@code onError}; nothing else is signalled afterwards.
 *
 * @param url            address to download from
 * @param file           destination file
 * @param returnProgress whether intermediate progress updates are emitted
 * @return an Observable of progress updates ending with completion or an error
 */
public static Observable<Progress> download(final String url,
        final File file,
        final boolean returnProgress) {
    return Observable.create(new ObservableOnSubscribe<Progress>() {
        @Override
        public void subscribe(final ObservableEmitter<Progress> emitter) throws Exception {
            OkHttpClient httpClient = addProgressResponseListener(new OkHttpClient(), new ProgressResponseBody.ProgressListener() {
                @Override
                public void onProgress(long currentBytes, long contentLength, boolean done) {
                    if (returnProgress) {
                        emitter.onNext(new Progress(currentBytes, contentLength, done));
                    }
                }
            });
            try {
                download(httpClient, url, file);
                emitter.onNext(new Progress());
            } catch (Exception e) {
                // Pass the throwable itself: e.getMessage() may be null, which Log.e rejects.
                Log.e(TAG, "download failed: " + url, e);
                emitter.onError(e);
                // onError is terminal: do not follow it with onComplete.
                return;
            }
            emitter.onComplete();
        }
    });
}