类io.reactivex.ObservableEmitter源码实例Demo

下面列出了怎么用io.reactivex.ObservableEmitter的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: YiZhi   文件: HeadSettingActivity.java
@OnClick({R.id.tv_cancel, R.id.tv_ok})
public void onClick(View view) {
    switch (view.getId()) {
        case R.id.tv_cancel:
            onBackPressedSupport();
            break;
        case R.id.tv_ok:
            Observable.create(new ObservableOnSubscribe<Uri>() {
                @Override
                public void subscribe(ObservableEmitter<Uri> e) throws
                        Exception {
                    e.onNext(generateUri());
                    e.onComplete();
                }
            }).compose(RxHelper.<Uri>rxSchedulerHelper())
                    .subscribe(new Consumer<Uri>() {
                        @Override
                        public void accept(Uri uri) throws Exception {
                            RxEventHeadBean rxEventHeadBean = new RxEventHeadBean(uri);
                            RxBus.get().send(RX_BUS_CODE_HEAD_IMAGE_URI, rxEventHeadBean);
                            onBackPressedSupport();
                        }
                    });
            break;
    }
}
 
源代码2 项目: RxAndroidBle   文件: ConnectOperation.java
@Override
protected void protectedRun(final ObservableEmitter<BluetoothGatt> emitter, final QueueReleaseInterface queueReleaseInterface) {
    final Action queueReleaseAction = new Action() {
        @Override
        public void run() {
            queueReleaseInterface.release();
        }
    };
    final DisposableSingleObserver<BluetoothGatt> disposableGattObserver = getConnectedBluetoothGatt()
            .compose(wrapWithTimeoutWhenNotAutoconnecting())
            // when there are no subscribers there is no point of continuing work -> next will be disconnect operation
            .doFinally(queueReleaseAction)
            .subscribeWith(disposableSingleObserverFromEmitter(emitter));
    emitter.setDisposable(disposableGattObserver);

    if (autoConnect) {
        // with autoConnect the connection may be established after a really long time
        queueReleaseInterface.release();
    }
}
 
源代码3 项目: AppOpsX   文件: Helper.java
public static Observable<OpsResult> setMode(final Context context, final String pkgName,
    final OpEntryInfo opEntryInfo) {

  return Observable.create(new ObservableOnSubscribe<OpsResult>() {
    @Override
    public void subscribe(ObservableEmitter<OpsResult> e) throws Exception {

      OpsResult opsForPackage = AppOpsx.getInstance(context)
          .setOpsMode(pkgName, opEntryInfo.opEntry.getOp(), opEntryInfo.mode);
      if (opsForPackage != null) {
        if (opsForPackage.getException() == null) {
          e.onNext(opsForPackage);
        } else {
          throw new Exception(opsForPackage.getException());
        }
      }
      e.onComplete();

    }
  }).retry(5, new Predicate<Throwable>() {
    @Override
    public boolean test(Throwable throwable) throws Exception {
      return throwable instanceof IOException || throwable instanceof NullPointerException;
    }
  });
}
 
@Override
public void subscribe(@NonNull final ObservableEmitter<AppState> appStateEmitter) throws Exception {
  final AppStateListener appStateListener = new AppStateListener() {
    @Override
    public void onAppDidEnterForeground() {
      appStateEmitter.onNext(FOREGROUND);
    }

    @Override
    public void onAppDidEnterBackground() {
      appStateEmitter.onNext(BACKGROUND);
    }
  };

  appStateEmitter.setCancellable(new Cancellable() {
    @Override public void cancel() throws Exception {
      recognizer.removeListener(appStateListener);
      recognizer.stop();
    }
  });

  recognizer.addListener(appStateListener);
  recognizer.start();
}
 
源代码5 项目: AndroidWallet   文件: RxBus.java
/**
 * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (mStickyEventMap) {
        Observable<T> observable = mBus.ofType(eventType);
        final Object event = mStickyEventMap.get(eventType);

        if (event != null) {
            return Observable.merge(observable, Observable.create(new ObservableOnSubscribe<T>() {
                @Override
                public void subscribe(ObservableEmitter<T> emitter) throws Exception {
                    emitter.onNext(eventType.cast(event));
                }
            }));
        } else {
            return observable;
        }
    }
}
 
源代码6 项目: apollo-android   文件: Rx2Apollo.java
/**
 * Converts an {@link ApolloQueryWatcher} to an asynchronous Observable.
 *
 * @param watcher the ApolloQueryWatcher to convert.
 * @param <T>     the value type
 * @return the converted Observable
 * @throws NullPointerException if watcher == null
 */
@NotNull
@CheckReturnValue
public static <T> Observable<Response<T>> from(@NotNull final ApolloQueryWatcher<T> watcher) {
  checkNotNull(watcher, "watcher == null");
  return Observable.create(new ObservableOnSubscribe<Response<T>>() {
    @Override public void subscribe(final ObservableEmitter<Response<T>> emitter) throws Exception {
      cancelOnObservableDisposed(emitter, watcher);

      watcher.enqueueAndWatch(new ApolloCall.Callback<T>() {
        @Override public void onResponse(@NotNull Response<T> response) {
          if (!emitter.isDisposed()) {
            emitter.onNext(response);
          }
        }

        @Override public void onFailure(@NotNull ApolloException e) {
          Exceptions.throwIfFatal(e);
          if (!emitter.isDisposed()) {
            emitter.onError(e);
          }
        }
      });
    }
  });
}
 
源代码7 项目: RxJava2Demo   文件: ChapterTwo.java
public static void demo1() {
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
            Log.d(TAG, "emit 1");
            emitter.onNext(1);
        }
    });

    Consumer<Integer> consumer = new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
            Log.d(TAG, "onNext: " + integer);
        }
    };

    observable.subscribe(consumer);
}
 
源代码8 项目: burstkit4j   文件: CompositeBurstNodeService.java
private synchronized <T> void doIfUsedObservable(ObservableEmitter<T> emitter, AtomicInteger usedObservable, AtomicReferenceArray<Disposable> disposables, int myI, Runnable runnable) {
    int used = usedObservable.get();
    if (used == -1) {
        // We are the first!
        usedObservable.set(myI);
        runnable.run();
        // Kill all of the others.
        Disposable myDisposable = disposables.get(myI);
        disposables.set(myI, null);
        emitter.setCancellable(() -> {
            if (myDisposable != null) {
                myDisposable.dispose();
            }
        }); // Calling this calls the previous one, so all of the others get disposed.
    } else if (used == myI) {
        // We are the used observable.
        runnable.run();
    }
}
 
private Observable<ThumbnailBitmap> obtainThumbnail(MediaMetadataRetrieverCompat mmrc, int itemCount) {
    //每500毫秒取一帧
    final long interval = 500;
    final int size = getResources().getDimensionPixelSize(R.dimen.thumbnail_size);
    //取帧是耗时的操作,需要放在子线程
    return Observable.create(new ObservableOnSubscribe<ThumbnailBitmap>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<ThumbnailBitmap> s) throws Exception {
            //异步获取缩略图
            for (int i = 0; i < itemCount; i++) {
                long millis = i * interval;
                Bitmap atTime;
                if (ORIGINAL_SIZE == mSizeType) {
                    atTime = mmrc.getFrameAtTime(millis, mFetchFrameOption);
                } else if (FLOOR_SCALE == mSizeType) {
                    atTime = mmrc.getScaledFrameAtTime(millis, mFetchFrameOption, size, size);
                } else {
                    atTime = mmrc.getCenterCropFrameAtTime(millis, mFetchFrameOption, size, size);
                }
                s.onNext(new ThumbnailBitmap(i, atTime));
            }
            s.onComplete();
        }
    }).subscribeOn(Schedulers.io());
}
 
源代码10 项目: pandroid   文件: RxActionDelegate.java
public static <T> Observable<Result<T>> observableWrapped(final OnSubscribeAction<T> subscribe) {
    return Observable.<Result<T>>create(new ObservableOnSubscribe<Result<T>>() {
        @Override
        public void subscribe(final ObservableEmitter<Result<T>> emitter) throws Exception {
            final RxActionDelegate<T> delegate = new RxActionDelegate<>(new ActionDelegate<T>() {
                @Override
                public void onSuccess(T result) {
                    emitter.onNext(new Result<T>(result));
                }

                @Override
                public void onError(Exception e) {
                    emitter.onNext(new Result<T>(e));
                }
            });
            emitter.setDisposable(delegate);
            subscribe.subscribe(delegate);
        }
    });
}
 
源代码11 项目: Capstone-Project   文件: ImageFetcher.java
public static Observable<Bitmap> getBitmap(final String url) {
    return Observable.create(new ObservableOnSubscribe<Bitmap>() {
        @Override
        public void subscribe(ObservableEmitter<Bitmap> emitter) throws Exception {
            Request request = new Request.Builder()
                    .url(url)
                    .get()
                    .build();

            Response response = OK_HTTP_CLIENT.newCall(request).execute();

            InputStream inputStream = response.body().byteStream();
            Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
            inputStream.close();

            if (bitmap == null) {
                emitter.onError(new NullPointerException("Bitmap fetched from url is null"));
            } else {
                emitter.onNext(bitmap);
            }
            emitter.onNext(bitmap);
        }
    });
}
 
@Override
public void subscribe(ObservableEmitter<DocumentSnapshot> e) throws Exception {
    DocEventListener listener = new DocEventListener(e);
    ListenerRegistration registration = mDocumentReference.addSnapshotListener(listener);
    e.setDisposable(new Disposable() {
        boolean disposed = false;
        @Override
        public void dispose() {
            if (!isDisposed()) {
                registration.remove();
                listener.emitter = null;
                disposed = true;
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    });
}
 
源代码13 项目: RxJava2Demo   文件: ChapterThree.java
public static void demo3() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).concatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("I am value " + integer);
            }
            return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, s);
        }
    });
}
 
源代码14 项目: GankGirl   文件: ReadTadPagePresenter.java
@Override
public void subscribe(ObservableEmitter<List<ReadTypeBean>> subscriber) throws Exception {
    List<ReadTypeBean> datas = new ArrayList<>();
    try {
        Document doc = Jsoup.connect(Constants.API_URL_READ).get();
        Elements tads = doc.select("div#xiandu_cat").select("a");
        for (Element tad : tads) {
            ReadTypeBean bean = new ReadTypeBean();
            bean.setTitle(tad.text());//获取标题
            bean.setUrl(tad.absUrl("href"));//absUrl可以获取地址的绝对路径
            datas.add(bean);
            Log.v("Jsoup","title= "+bean.getTitle()+"   url= "+bean.getUrl());
        }
    } catch (IOException e) {
        subscriber.onError(e);
    }

    subscriber.onNext(datas);
    subscriber.onComplete();
}
 
源代码15 项目: RxGps   文件: RxGps.java
private Observable<Boolean> checkPlayServicesAvailable() {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
            final Activity activity = activityReference.get();
            if (activity != null) {
                final GoogleApiAvailability apiAvailability = GoogleApiAvailability.getInstance();
                final int status = apiAvailability.isGooglePlayServicesAvailable(activity);

                if (status != ConnectionResult.SUCCESS) {
                    e.onError(new PlayServicesNotAvailableException());
                } else {
                    e.onNext(true);
                    e.onComplete();
                }
            }
        }
    });
}
 
源代码16 项目: Dainty   文件: LoginRegisterModel.java
@Override
public Observable<String> login(final String account, final String oldPassword) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String path = "http://36078d58.nat123.cc/AndroidRegisterAndLogin_war/login";
            Map<String, String> params = new HashMap<>();
            params.put("username", account);
            params.put("password", oldPassword);

            ResponseBody responseBody=executeHttp(path,params);
            if (responseBody != null) {
                String result = responseBody.string().replaceAll("(\\\r\\\n|\\\r|\\\n|\\\n\\\r)", "");
                emitter.onNext(result);
                responseBody.close();
            } else {
                emitter.onError(new Throwable("响应体为空"));
            }
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
 
源代码17 项目: Dainty   文件: DecompressionModel.java
@Override
public Observable<Long> decompression(final String type, String original, String purpose, boolean deleteZip) {
    final File mInput = new File(original);
    final File mOutput = new File(purpose);
    mProgress = 0L;
    return Observable.create(new ObservableOnSubscribe<Long>() {
        @Override
        public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
            switch (type) {
                case "zip":
                    unzip(mInput, mOutput, emitter);
                    break;
                case "rar":
                    unRar(mInput, mOutput, emitter);
                    break;
                default:
                    emitter.onError(new Throwable("无法解压该类型的压缩文件"));
                    break;
            }
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
 
源代码18 项目: power-adapters   文件: RxPowerAdapter.java
@CheckResult
@NonNull
public static Observable<ChangeEvent> changes(@NonNull final PowerAdapter adapter) {
    checkNotNull(adapter, "adapter");
    return Observable.create(new ObservableOnSubscribe<ChangeEvent>() {
        @Override
        public void subscribe(final ObservableEmitter<ChangeEvent> emitter) throws Exception {
            verifyMainThread();
            final DataObserver dataObserver = new Observer() {
                @Override
                public void onItemRangeChanged(int positionStart, int itemCount, @Nullable Object payload) {
                    emitter.onNext(new ChangeEvent(positionStart, itemCount, payload));
                }
            };
            adapter.registerDataObserver(dataObserver);
            emitter.setDisposable(new MainThreadDisposable() {
                @Override
                protected void onDispose() {
                    adapter.unregisterDataObserver(dataObserver);
                }
            });
        }
    });
}
 
源代码19 项目: RuntimePermission   文件: RxPermissions.java
public Observable<PermissionResult> request(final List<String> permissions) {
    return Observable.create(new ObservableOnSubscribe<PermissionResult>() {
        @Override
        public void subscribe(final ObservableEmitter<PermissionResult> emitter) throws Exception {
            runtimePermission
                    .request(permissions)
                    .onResponse(new ResponseCallback() {
                        @Override
                        public void onResponse(PermissionResult result) {
                            if (result.isAccepted()) {
                                emitter.onNext(result);
                                emitter.onComplete();
                            } else {
                                emitter.onError(new Error(result));
                            }
                        }
                    }).ask();
        }
    });
}
 
源代码20 项目: RxBeacon   文件: RxBeacon.java
public Observable<RxBeaconRange> beaconsInRegion() {
    return startup()
            .flatMap(new Function<Boolean, ObservableSource<RxBeaconRange>>() {
                @Override
                public ObservableSource<RxBeaconRange> apply(@NonNull Boolean aBoolean) throws Exception {
                    return Observable.create(new ObservableOnSubscribe<RxBeaconRange>() {
                        @Override
                        public void subscribe(@NonNull final ObservableEmitter<RxBeaconRange> objectObservableEmitter) throws Exception {
                            beaconManager.addRangeNotifier(new RangeNotifier() {
                                @Override
                                public void didRangeBeaconsInRegion(Collection<Beacon> collection, Region region) {
                                    objectObservableEmitter.onNext(new RxBeaconRange(collection, region));
                                }
                            });
                            beaconManager.startRangingBeaconsInRegion(getRegion());
                        }
                    });
                }
            });
}
 
源代码21 项目: Nibo   文件: LocationRepository.java
@Override
public Observable<Location> getLocationObservable() {
    return Observable.create(new ObservableOnSubscribe<Location>() {
        @Override
        public void subscribe(final ObservableEmitter<Location> source) throws Exception {
            FusedLocationProviderClient locationClient = LocationServices.getFusedLocationProviderClient(context);
            locationClient.getLastLocation().addOnSuccessListener(new OnSuccessListener<Location>() {
                @Override
                public void onSuccess(Location location) {
                    if (location != null) {
                        source.onNext(location);
                    }
                }
            }).addOnFailureListener(new OnFailureListener() {
                @Override
                public void onFailure(@NonNull Exception e) {
                    source.onError(e);
                }
            });
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
 
源代码22 项目: Tok-Android   文件: MsgHelper.java
private static void sendMessage(final ContactsKey receiverKey, final boolean onLine,
    final String msg, final MessageType msgType, final boolean isFriend) {
    final ToxManager manager = ToxManager.getManager();
    final InfoRepository infoRepo = State.infoRepo();
    for (final String splitMsg : splitMessage(msg)) {
        ContactsKey senderKey = manager.toxBase.getSelfKey();
        ToxNickname senderName = manager.toxBase.getName();
        final long dbId = infoRepo.addMessage(receiverKey, senderKey, senderName, splitMsg,
            GlobalParams.SEND_ING, true, msgType, -2);//can't -1
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                long id = 0;
                if (isFriend) {
                    if (onLine) {
                        id = manager.toxBase.friendSendMessage(receiverKey,
                            ToxFriendMessage.unsafeFromValue(splitMsg.getBytes()),
                            MessageType.toToxMessageType(msgType));
                    } else {
                        id = manager.toxBase.generateUniqueId(receiverKey);
                        ContactsKey botKey =
                            new ToxAddress(GlobalParams.OFFLINE_BOT_TOK_ID).getKey();
                        //ignore the id
                        LogUtil.i(TAG, "botKey:" + botKey.getKey() + ",offlineMsgId:" + id);
                        manager.toxBase.friendSendMessageOffline(botKey,
                            OfflineBuilder.offlineMsgSend(id, receiverKey.getKey(), splitMsg));
                    }
                }

                if (id > 0) {
                    infoRepo.setMessageSending(id, dbId);
                    MsgTimer.startTimer(id);
                } else {
                    infoRepo.setMessageFailByDbId(dbId);
                }
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io()).subscribe();
    }
}
 
源代码23 项目: Capstone-Project   文件: PostsPresenter.java
@Override
public void notificationShownForPost(final int postId) {
    Observable<Void> clearPostsObservable = Observable.create(new ObservableOnSubscribe<Void>() {
        @Override
        public void subscribe(ObservableEmitter<Void> emitter) throws Exception {
            PredatorDatabase.getInstance()
                    .setNotificationShownForPost(postId);
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    mCompositeDisposable.add(clearPostsObservable.subscribeWith(new DisposableObserver<Void>() {
        @Override
        public void onComplete() {
            // Done
        }

        @Override
        public void onError(Throwable e) {
            Logger.e(TAG, "onError: " + e.getMessage(), e);
        }

        @Override
        public void onNext(Void aVoid) {

        }
    }));
}
 
源代码24 项目: xposed-rimet   文件: RimetLocalSource.java
private <T> void subscribe(ObservableEmitter<T> emitter, OnHandler<T> handler) {

        try {
            // 开始处理
            T value = handler.onHandler();

            if (value != null) {
                // 返回结果
                emitter.onNext(value);
            }
            emitter.onComplete();
        } catch (Throwable tr) {
            emitter.onError(tr);
        }
    }
 
源代码25 项目: mcspring-boot   文件: EventEmitter.java
@Override
public void subscribe(ObservableEmitter<T> observableEmitter) {
    val pluginManager = plugin.getServer().getPluginManager();
    pluginManager.registerEvent(eventClazz, listener, observeEvent.priority(), (l, event) -> {
        if (eventClazz.isAssignableFrom(event.getClass())) {
            T emittedEvent = (T) event;
            context.runWithSender(EventUtil.getSender(emittedEvent), () -> observableEmitter.onNext(emittedEvent));
        }
    }, plugin, observeEvent.ignoreCancelled());
}
 
源代码26 项目: RxSharedPreferences   文件: RxSharedPreferences.java
public Observable<Set<String>> getString(final String key, final Set<String> defaultValue) {
    return Observable.create(new ObservableOnSubscribe<Set<String>>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Set<String>> e) throws Exception {
            e.onNext(sharedPreferences.getStringSet(key, defaultValue));
            e.onComplete();
        }
    });
}
 
源代码27 项目: RxFingerprint   文件: RsaDecryptionObservable.java
@Override
protected void onAuthenticationSucceeded(ObservableEmitter<FingerprintDecryptionResult> emitter, AuthenticationResult result) {
	try {
		Cipher cipher = result.getCryptoObject().getCipher();
		byte[] bytes = cipher.doFinal(encodingProvider.decode(encryptedString));

		emitter.onNext(new FingerprintDecryptionResult(FingerprintResult.AUTHENTICATED, null, ConversionUtils.toChars(bytes)));
		emitter.onComplete();
	} catch (Exception e) {
		Logger.error("Unable to decrypt given value. RxFingerprint is only able to decrypt values previously encrypted by RxFingerprint with the same encryption mode.", e);
		emitter.onError(cipherProvider.mapCipherFinalOperationException(e));
	}

}
 
源代码28 项目: smart-farmer-android   文件: FakeLoader.java
public static Observable<IPageModel<News>> loadNewsList(final int page) {
    return Observable.create(new ObservableOnSubscribe<DemoHttpModel<IPageModel<News>>>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<DemoHttpModel<IPageModel<News>>> subscriber) throws Exception {
            DemoHttpModel<IPageModel<News>> baseModel = new DemoHttpModel<>();
            baseModel.error = 0;
            DemoPageModel<News> pageModel = new DemoPageModel<>();
            pageModel.currentPage = page;
            pageModel.totalPage = 10;
            pageModel.dataList = new ArrayList<>();
            for (int i = 0; i < 10; ++i) {
                News standard = new News();
                standard.setTitle("新闻标题" + page + "" + i);
                standard.setDesc("征收标准:自2015年9月1日起,适当调整我省水资源费征收标准。淮河流域及合肥市、滁州市地表水资源费征收标准为每立方米0.12元;其他地区为每立方米0.08元。其中,水力发电用水水资源费征收标准0.003元/千万时,贯流式火电为0.001元/千万时,抽水蓄能电站发电循环用水量暂不征收水资源费。");
                standard.setImgPath("http://ds.cdncache.org/avatar-50/532/164695.jpg");
                pageModel.dataList.add(standard);
            }
            baseModel.results = pageModel;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                baseModel.error = 111;
                baseModel.msg = "超时";
            }

            if (Math.random() > 0.5) {
                subscriber.onError(new Throwable());
            } else {
                subscriber.onNext(baseModel);
            }

            subscriber.onComplete();
        }
    }).compose(RxHelper.<IPageModel<News>>handleResult());
}
 
源代码29 项目: YiZhi   文件: GankIoDayModel.java
@Override
public Observable<Boolean> recordItemIsRead(final String key) {
    return Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            emitter.onNext(DBUtils.getDB(AppUtils.getContext()).insertRead(DBConfig
                    .TABLE_GANKIO_DAY, key, ItemState.STATE_IS_READ));
            emitter.onComplete();
        }
    }).compose(RxHelper.<Boolean>rxSchedulerHelper());
}
 
源代码30 项目: tysq-android   文件: DownloadHelper.java
public static Observable<Progress> download(final String url,
                                            final File file,
                                            final boolean returnProgress) {
    return Observable.create(new ObservableOnSubscribe<Progress>() {
        @Override
        public void subscribe(ObservableEmitter<Progress> emitter) throws Exception {

            OkHttpClient httpClient = addProgressResponseListener(new OkHttpClient(), new ProgressResponseBody.ProgressListener() {
                @Override
                public void onProgress(long currentBytes, long contentLength, boolean done) {
                    if (returnProgress) {
                        emitter.onNext(new Progress(currentBytes, contentLength, done));
                    }
                }
            });

            try {
                download(httpClient, url, file);

                emitter.onNext(new Progress());

            } catch (Exception e) {
                Log.e(TAG, e.getMessage());
                emitter.onError(e);
            }

            emitter.onComplete();
        }
    });
}
 
 类所在包
 同包方法