下面列出了io.reactivex.ObservableEmitter#isDisposed ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void subscribe(@NonNull ObservableEmitter<T> subscriber) throws Exception {
try {
T data = execute();
if (!subscriber.isDisposed()) {
subscriber.onNext(data);
}
} catch (Throwable e) {
HttpLog.e(e.getMessage());
if (!subscriber.isDisposed()) {
subscriber.onError(e);
}
Exceptions.throwIfFatal(e);
//RxJavaPlugins.onError(e);
return;
}
if (!subscriber.isDisposed()) {
subscriber.onComplete();
}
}
/**
* @param emitter
*/
@Override
public void subscribe(final ObservableEmitter<FirebaseAuth> emitter) {
final FirebaseAuth.AuthStateListener listener = new FirebaseAuth.AuthStateListener() {
@Override
public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) {
if (!emitter.isDisposed()) {
emitter.onNext(firebaseAuth);
}
}
};
instance.addAuthStateListener(listener);
emitter.setDisposable(Disposables.fromAction(new Action() {
@Override
public void run() throws Exception {
instance.removeAuthStateListener(listener);
}
}));
}
DisposableObserver<? super T> disposableWrapper(final ObservableEmitter<? super T> emitter) {
return new DisposableObserver<T>() {
@Override public void onNext(@NonNull T t) {
if (!emitter.isDisposed()) {
emitter.onNext(t);
}
}
@Override public void onError(@NonNull Throwable e) {
if (!emitter.isDisposed()) {
emitter.onError(e);
}
}
@Override public void onComplete() {
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}
};
}
private AuthenticationCallback createAuthenticationCallback(final ObservableEmitter<T> emitter) {
return new AuthenticationCallback() {
@Override
public void onAuthenticationError(int errMsgId, CharSequence errString) {
if (!emitter.isDisposed()) {
emitter.onError(new FingerprintAuthenticationException(errString));
}
}
@Override
public void onAuthenticationFailed() {
FingerprintObservable.this.onAuthenticationFailed(emitter);
}
@Override
public void onAuthenticationHelp(int helpMsgId, CharSequence helpString) {
FingerprintObservable.this.onAuthenticationHelp(emitter, helpMsgId, helpString.toString());
}
@Override
public void onAuthenticationSucceeded(AuthenticationResult result) {
FingerprintObservable.this.onAuthenticationSucceeded(emitter, result);
}
};
}
@Override
public void subscribe(final ObservableEmitter<View> emitter) throws Exception {
View.OnClickListener listener = new View.OnClickListener() {
@Override
public void onClick(View v) {
if (!emitter.isDisposed()) {
emitter.onNext(view);
}
}
};
view.setOnClickListener(listener);
}
@Override
public void subscribe(final ObservableEmitter<FeatureEvent> subscriber) {
verifyMainThread();
FeatureEventListener listener =
event -> {
if (!subscriber.isDisposed()) {
subscriber.onNext(event);
}
};
subscriber.setCancellable(() -> featureController.removeFeatureEventListener(listener));
featureController.addFeatureEventListener(listener);
}
private void unzip(File mInput, File mOutput, ObservableEmitter<Long> emitter) throws IOException {
Enumeration<ZipEntry> entries; //含有被压缩的文件对象(目录或文件)
ZipFile zip = null;
try {
zip = new ZipFile(mInput, "GBK");
long uncompressedSize = getOriginalSize(zip);
//publishProgress(0L, uncompressedSize);
emitter.onNext(uncompressedSize);
entries = zip.getEntries();
while (entries.hasMoreElements()) {
if (emitter.isDisposed())
break;
ZipEntry entry = entries.nextElement();
if (entry.isDirectory()) {
continue; //目录内的文件对象会在下次遍历中会出现
}
File destination = new File(mOutput, entry.getName()); //每个文件的存放路径,包括文件夹内的
if (!destination.getParentFile().exists()) {
//noinspection ResultOfMethodCallIgnored
destination.getParentFile().mkdirs();
}
ProgressReportingOutputStream outStream = new ProgressReportingOutputStream(destination, emitter);
save(zip.getInputStream(entry), outStream);
outStream.close();
}
} finally {
try {
if (zip != null) {
zip.close();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private void unRar(File mInput, File mOutput, final ObservableEmitter<Long> emitter) throws IOException, RarException {
Archive rarFile = null;
try {
rarFile = new Archive(mInput, new UnrarCallback() {
@Override
public boolean isNextVolumeReady(File file) {
return false;
}
@Override
public void volumeProgressChanged(long l, long l1) {
emitter.onNext(l);
}
});
long uncompressedSize = getOriginalSize(rarFile);
//publishProgress(0L, uncompressedSize);
emitter.onNext(uncompressedSize);
for (int i = 0; i < rarFile.getFileHeaders().size(); i++) {
if (emitter.isDisposed())
break;
FileHeader fh = rarFile.getFileHeaders().get(i);
String entryPath;
if (fh.isUnicode()) {
entryPath = fh.getFileNameW().trim();
} else {
entryPath = fh.getFileNameString().trim();
}
entryPath = entryPath.replaceAll("\\\\", "/");
File file = new File(mOutput, entryPath);
if (fh.isDirectory()) {
//noinspection ResultOfMethodCallIgnored
file.mkdirs();
} else {
File parent = file.getParentFile();
if (parent != null && !parent.exists()) {
//noinspection ResultOfMethodCallIgnored
parent.mkdirs();
}
FileOutputStream fileOut = new FileOutputStream(file);
rarFile.extractFile(fh, fileOut);
fileOut.close();
}
}
emitter.onComplete();
} finally {
if (rarFile != null) {
try {
rarFile.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@SuppressWarnings({"PMD.AvoidInstantiatingObjectsInLoops", "PMD.CyclomaticComplexity"})
private void executeSmsSending(ObservableEmitter<SmsSendingState> e, String number,
List<String> smsParts, int timeoutSeconds) {
final long timeStarted = System.currentTimeMillis();
SendingStateReceiver stateReceiver = new SendingStateReceiver(timeStarted,
timeoutSeconds, sendSmsAction);
context.registerReceiver(stateReceiver, new IntentFilter(sendSmsAction));
int sentNumber = 0;
sendSmsToOS(stateReceiver, number, smsParts);
int totalMessages = smsParts.size();
e.onNext(new SmsSendingState(0, totalMessages));
while (stateReceiver.smsResultsWaiting() > 0 && !stateReceiver.isError() &&
Utility.timeLeft(timeStarted, timeoutSeconds) > 0 && !e.isDisposed()) {
// wait until timeout passes, response comes, or request disposed
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
if (!e.isDisposed()) {
e.onError(ie);
}
Utility.unregisterReceiver(context, stateReceiver);
return;
}
int currentSentNumber = totalMessages - stateReceiver.smsResultsWaiting();
if (currentSentNumber != sentNumber) {
sentNumber = currentSentNumber;
e.onNext(new SmsSendingState(sentNumber, totalMessages));
}
}
Utility.unregisterReceiver(context, stateReceiver);
if (e.isDisposed()) {
return;
}
if (stateReceiver.smsResultsWaiting() == 0 && !stateReceiver.isError()) {
e.onNext(new SmsSendingState(totalMessages, totalMessages));
e.onComplete();
} else if (stateReceiver.isError()) {
e.onError(new ReceivedErrorException(stateReceiver.getErrorCode()));
} else {
e.onError(new ResultResponseException(ResultResponseIssue.TIMEOUT));
}
}