类io.reactivex.functions.Cancellable源码实例Demo

下面列出了怎么用io.reactivex.functions.Cancellable的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: mobius   文件: RxEventSources.java
/**
 * Wraps the given event source in an {@link Observable}.
 *
 * <p>On subscription the event source is subscribed to and every value it delivers is passed to
 * the observer through {@code onNext}. When the observer is disposed, the subscription to the
 * event source is disposed as well.
 *
 * @param eventSource the event source to expose as an observable
 * @param <E> the type of the events
 * @return an Observable that relays the events of {@code eventSource}
 */
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
  final ObservableOnSubscribe<E> onSubscribe =
      new ObservableOnSubscribe<E>() {
        @Override
        public void subscribe(final ObservableEmitter<E> emitter) throws Exception {
          // Hands each event from the source straight to the downstream observer.
          final Consumer<E> forwarder =
              new Consumer<E>() {
                @Override
                public void accept(E value) {
                  emitter.onNext(value);
                }
              };
          final Disposable sourceSubscription = eventSource.subscribe(forwarder);

          // Release the event source subscription once the observer is gone.
          final Cancellable teardown =
              new Cancellable() {
                @Override
                public void cancel() throws Exception {
                  sourceSubscription.dispose();
                }
              };
          emitter.setCancellable(teardown);
        }
      };
  return Observable.create(onSubscribe);
}
 
源代码2 项目: ObjectBoxRxJava   文件: RxBoxStore.java
/**
 * Returns an Observable that notifies about data changes in the given store.
 * After a transaction has been committed, the classes whose objects changed are emitted.
 *
 * <p>The data subscription on the store is cancelled when the observer is disposed.
 *
 * @param boxStore the store to observe
 * @return an Observable emitting the classes reported by the store's data subscription
 */
public static <T> Observable<Class> observable(final BoxStore boxStore) {
    final ObservableOnSubscribe<Class> source = new ObservableOnSubscribe<Class>() {
        @Override
        public void subscribe(final ObservableEmitter<Class> emitter) throws Exception {
            final DataObserver<Class> relay = new DataObserver<Class>() {
                @Override
                public void onData(Class data) {
                    // Drop notifications that arrive after the observer was disposed.
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(data);
                }
            };
            final DataSubscription subscription = boxStore.subscribe().observer(relay);

            final Cancellable teardown = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    subscription.cancel();
                }
            };
            emitter.setCancellable(teardown);
        }
    };
    return Observable.create(source);
}
 
源代码3 项目: ObjectBoxRxJava   文件: RxQuery.java
/**
 * Subscribes to the query and pushes each element of a delivered result list to the emitter,
 * completing the emitter after the last element.
 *
 * <p>Emission stops as soon as the emitter reports that it was cancelled. The data subscription
 * is cancelled when the emitter is cancelled.
 *
 * @param query the query to subscribe to
 * @param emitter the emitter receiving the individual list items
 */
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
    final DataObserver<List<T>> itemRelay = new DataObserver<List<T>>() {
        @Override
        public void onData(List<T> data) {
            for (T item : data) {
                // Re-check before every item so a cancellation mid-list is honoured.
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onNext(item);
            }
            if (emitter.isCancelled()) {
                return;
            }
            emitter.onComplete();
        }
    };
    final DataSubscription subscription = query.subscribe().observer(itemRelay);

    final Cancellable teardown = new Cancellable() {
        @Override
        public void cancel() throws Exception {
            subscription.cancel();
        }
    };
    emitter.setCancellable(teardown);
}
 
源代码4 项目: ObjectBoxRxJava   文件: RxQuery.java
/**
 * Returns an Observable that emits the results of the query as Lists.
 * It never completes, so updates keep arriving whenever the underlying data changes.
 *
 * <p>The data subscription on the query is cancelled when the observer is disposed.
 *
 * @param query the query to observe
 * @return an Observable emitting each result list delivered by the query's data subscription
 */
public static <T> Observable<List<T>> observable(final Query<T> query) {
    final ObservableOnSubscribe<List<T>> source = new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<T>> emitter) throws Exception {
            final DataObserver<List<T>> relay = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> data) {
                    // Drop results that arrive after the observer was disposed.
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(data);
                }
            };
            final DataSubscription subscription = query.subscribe().observer(relay);

            final Cancellable teardown = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    subscription.cancel();
                }
            };
            emitter.setCancellable(teardown);
        }
    };
    return Observable.create(source);
}
 
/**
 * Returns an Observable that enqueues the given operation when it is subscribed to.
 *
 * <p>If {@code shouldRun} is false, an Observable that signals {@code disconnectionException}
 * is returned instead. Disposing the observer removes the entry from the queue if it is still
 * there.
 *
 * @param operation the operation to enqueue
 * @return an Observable bound to the queued operation, or an error Observable
 */
@Override
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public synchronized <T> Observable<T> queue(final Operation<T> operation) {
    if (shouldRun) {
        return Observable.create(new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> emitter) {
                final FIFORunnableEntry entry = new FIFORunnableEntry<>(operation, emitter);

                // Install the removal hook before the entry is added to the queue.
                final Cancellable removeFromQueue = new Cancellable() {
                    @Override
                    public void cancel() {
                        final boolean removed = queue.remove(entry);
                        if (removed) {
                            logOperationRemoved(operation);
                        }
                    }
                };
                emitter.setCancellable(removeFromQueue);

                logOperationQueued(operation);
                queue.add(entry);
            }
        });
    }

    return Observable.error(disconnectionException);
}
 
源代码6 项目: RxAndroidBle   文件: ScanOperation.java
/**
 * Starts a scan and wires its stop to the disposal of the emitter.
 *
 * <p>The stop hook is installed before the scan is started. If starting the scan reports
 * failure or throws, a {@link BleScanException} with {@code BLUETOOTH_CANNOT_START} is passed
 * to the emitter via {@code tryOnError}. The queue is released in every case.
 *
 * @param emitter the emitter handed to the scan callback and notified of start failures
 * @param queueReleaseInterface released once the start attempt has finished
 */
@Override
protected final void protectedRun(final ObservableEmitter<SCAN_RESULT_TYPE> emitter, QueueReleaseInterface queueReleaseInterface) {

    final SCAN_CALLBACK_TYPE callback = createScanCallback(emitter);

    try {
        final Cancellable stopHook = new Cancellable() {
            @Override
            public void cancel() {
                RxBleLog.i("Scan operation is requested to stop.");
                stopScan(rxBleAdapterWrapper, callback);
            }
        };
        emitter.setCancellable(stopHook);

        RxBleLog.i("Scan operation is requested to start.");
        final boolean started = startScan(rxBleAdapterWrapper, callback);
        if (!started) {
            final BleScanException cannotStart = new BleScanException(BleScanException.BLUETOOTH_CANNOT_START);
            emitter.tryOnError(cannotStart);
        }
    } catch (Throwable error) {
        RxBleLog.w(error, "Error while calling the start scan function");
        emitter.tryOnError(new BleScanException(BleScanException.BLUETOOTH_CANNOT_START, error));
    } finally {
        // Always release so the queue is freed whether or not the scan started.
        queueReleaseInterface.release();
    }
}
 
/**
 * Returns an Observable of the value of {@code locationServicesStatus.isLocationProviderOk()}.
 *
 * <p>The current value is emitted on subscription, and the value is re-read and emitted each
 * time a {@code LocationManager.MODE_CHANGED_ACTION} broadcast is received. Consecutive equal
 * values are suppressed. The broadcast receiver is unregistered when the observer is disposed.
 *
 * @return an Observable of distinct consecutive location-provider states
 */
public Observable<Boolean> get() {
    final ObservableOnSubscribe<Boolean> source = new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(final ObservableEmitter<Boolean> emitter) {
            final boolean currentValue = locationServicesStatus.isLocationProviderOk();
            final BroadcastReceiver modeChangeReceiver = new BroadcastReceiver() {
                @Override
                public void onReceive(Context context, Intent intent) {
                    // Re-read the status on every broadcast; the intent itself is not inspected.
                    emitter.onNext(locationServicesStatus.isLocationProviderOk());
                }
            };
            emitter.onNext(currentValue);

            final IntentFilter modeChangedFilter = new IntentFilter(LocationManager.MODE_CHANGED_ACTION);
            context.registerReceiver(modeChangeReceiver, modeChangedFilter);

            final Cancellable unregister = new Cancellable() {
                @Override
                public void cancel() {
                    context.unregisterReceiver(modeChangeReceiver);
                }
            };
            emitter.setCancellable(unregister);
        }
    };

    return Observable.create(source)
            .distinctUntilChanged()
            .subscribeOn(Schedulers.trampoline())
            .unsubscribeOn(Schedulers.trampoline());
}
 
/**
 * Connects the emitter to the recognizer: {@code FOREGROUND} is emitted when the app enters the
 * foreground and {@code BACKGROUND} when it enters the background.
 *
 * <p>The listener is added and the recognizer started on subscription. On disposal the
 * listener is removed and the recognizer stopped.
 *
 * @param appStateEmitter the emitter receiving the app state changes
 */
@Override
public void subscribe(@NonNull final ObservableEmitter<AppState> appStateEmitter) throws Exception {
  final AppStateListener stateRelay = new AppStateListener() {
    @Override
    public void onAppDidEnterBackground() {
      appStateEmitter.onNext(BACKGROUND);
    }

    @Override
    public void onAppDidEnterForeground() {
      appStateEmitter.onNext(FOREGROUND);
    }
  };

  // The teardown hook is installed before the listener is registered.
  final Cancellable teardown = new Cancellable() {
    @Override
    public void cancel() throws Exception {
      recognizer.removeListener(stateRelay);
      recognizer.stop();
    }
  };
  appStateEmitter.setCancellable(teardown);

  recognizer.addListener(stateRelay);
  recognizer.start();
}
 
源代码9 项目: rxfirebase   文件: RxValue.java
/**
 * Returns a Single that registers a one-shot value listener on the given query.
 *
 * <p>The listener is obtained from {@code listener(emit)} and attached with
 * {@code addListenerForSingleValueEvent} on subscription. It is removed from the query when
 * the Single is disposed.
 *
 * @param query the query to attach the single-value listener to
 * @return a Single driven by the listener created for its emitter
 */
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
    final SingleOnSubscribe<DataSnapshot> source = new SingleOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(
                @NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
            final ValueEventListener valueListener = listener(emit);

            // The removal hook is installed before the listener is attached.
            final Cancellable detach = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(valueListener);
                }
            };
            emit.setCancellable(detach);

            query.addListenerForSingleValueEvent(valueListener);
        }
    };
    return Single.create(source);
}
 
源代码10 项目: rxfirebase   文件: RxDatabaseReference.java
/**
 * Returns a Single that registers a one-shot value listener on the given query.
 *
 * <p>The listener is obtained from {@code listener(emit)} and attached with
 * {@code addListenerForSingleValueEvent} on subscription. It is removed from the query when
 * the Single is disposed.
 *
 * @param query the query to attach the single-value listener to
 * @return a Single driven by the listener created for its emitter
 */
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
    final SingleOnSubscribe<DataSnapshot> source = new SingleOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(
                @NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
            final ValueEventListener valueListener = listener(emit);

            // The removal hook is installed before the listener is attached.
            final Cancellable detach = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(valueListener);
                }
            };
            emit.setCancellable(detach);

            query.addListenerForSingleValueEvent(valueListener);
        }
    };
    return Single.create(source);
}
 
源代码11 项目: RxFingerprint   文件: FingerprintObservable.java
/**
 * Starts a fingerprint authentication for the given emitter.
 *
 * <p>If the fingerprint API wrapper reports that it is unavailable, a
 * {@link FingerprintUnavailableException} is signalled and nothing else happens. Otherwise a
 * fresh cancellation signal is created and authentication is started; disposing the emitter
 * cancels that signal unless it has already been cancelled.
 *
 * @param emitter the emitter handed to the authentication callback and crypto object setup
 */
@Override
@RequiresPermission(USE_FINGERPRINT)
@RequiresApi(Build.VERSION_CODES.M)
public void subscribe(ObservableEmitter<T> emitter) throws Exception {
	if (fingerprintApiWrapper.isUnavailable()) {
		emitter.onError(new FingerprintUnavailableException("Fingerprint authentication is not available on this device! Ensure that the device has a Fingerprint sensor and enrolled Fingerprints by calling RxFingerprint#isAvailable(Context) first"));
		return;
	}

	AuthenticationCallback authCallback = createAuthenticationCallback(emitter);
	cancellationSignal = fingerprintApiWrapper.createCancellationSignal();
	CryptoObject crypto = initCryptoObject(emitter);
	//noinspection MissingPermission
	fingerprintApiWrapper.getFingerprintManager().authenticate(crypto, cancellationSignal, 0, authCallback, null);

	final Cancellable abortAuthentication = new Cancellable() {
		@Override
		public void cancel() throws Exception {
			// Nothing to do when there is no signal or it has been cancelled already.
			if (cancellationSignal == null || cancellationSignal.isCanceled()) {
				return;
			}
			cancellationSignal.cancel();
		}
	};
	emitter.setCancellable(abortAuthentication);
}
 
源代码12 项目: android-mvvm   文件: FieldUtils.java
/**
 * Turns an ObservableField into an Observable of its values. The field's current value is
 * emitted on subscription unless it is null; afterwards the field's value is emitted on every
 * property-changed callback. Note that setting a null value inside the ObservableField (except
 * for the initial value) throws a NullPointerException.
 *
 * <p>The property-changed callback is removed from the field when the observer is disposed.
 *
 * @param field the field to observe
 * @return Observable that contains the latest value in the ObservableField
 */
@NonNull
public static <T> Observable<T> toObservable(@NonNull final ObservableField<T> field) {

    final ObservableOnSubscribe<T> source = new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(final ObservableEmitter<T> emitter) throws Exception {
            final T current = field.get();
            if (current != null) {
                emitter.onNext(current);
            }

            final OnPropertyChangedCallback changeRelay = new OnPropertyChangedCallback() {
                @Override
                public void onPropertyChanged(android.databinding.Observable observable, int i) {
                    // Always re-read from the field; the callback arguments are not used.
                    emitter.onNext(field.get());
                }
            };
            field.addOnPropertyChangedCallback(changeRelay);

            final Cancellable detach = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    field.removeOnPropertyChangedCallback(changeRelay);
                }
            };
            emitter.setCancellable(detach);
        }
    };
    return Observable.create(source);
}
 
源代码13 项目: rxfirebase   文件: RxValue.java
/**
 * Returns an Observable that emits every data snapshot delivered to a value event listener
 * attached to the given query.
 *
 * <p>A cancellation reported by the listener is signalled as an error built from
 * {@code DatabaseError.toException()}. Callbacks arriving after disposal are ignored, and the
 * listener is removed from the query when the observer is disposed.
 *
 * @param query the query to attach the value event listener to
 * @return an Observable of the snapshots received for {@code query}
 */
@NonNull
@CheckReturnValue
public static Observable<DataSnapshot> changes(@NonNull final Query query) {
    final ObservableOnSubscribe<DataSnapshot> source = new ObservableOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(
                @NonNull final ObservableEmitter<DataSnapshot> emit) throws Exception {
            final ValueEventListener valueListener = new ValueEventListener() {
                @Override
                public void onDataChange(DataSnapshot dataSnapshot) {
                    if (emit.isDisposed()) {
                        return;
                    }
                    emit.onNext(dataSnapshot);
                }

                @Override
                public void onCancelled(DatabaseError e) {
                    if (emit.isDisposed()) {
                        return;
                    }
                    emit.onError(e.toException());
                }
            };

            // The removal hook is installed before the listener is attached.
            final Cancellable detach = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(valueListener);
                }
            };
            emit.setCancellable(detach);

            query.addValueEventListener(valueListener);
        }
    };
    return Observable.create(source);
}
 
源代码14 项目: rxfirebase   文件: RxDatabaseReference.java
/**
 * Returns an Observable that emits every data snapshot delivered to a value event listener
 * attached to the given query.
 *
 * <p>A cancellation reported by the listener is signalled as an error built from
 * {@code DatabaseError.toException()}. Callbacks arriving after disposal are ignored, and the
 * listener is removed from the query when the observer is disposed.
 *
 * @param query the query to attach the value event listener to
 * @return an Observable of the snapshots received for {@code query}
 */
@NonNull
@CheckReturnValue
public static Observable<DataSnapshot> changes(@NonNull final Query query) {
    final ObservableOnSubscribe<DataSnapshot> source = new ObservableOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(
                @NonNull final ObservableEmitter<DataSnapshot> emit) throws Exception {
            final ValueEventListener valueListener = new ValueEventListener() {
                @Override
                public void onDataChange(DataSnapshot dataSnapshot) {
                    if (emit.isDisposed()) {
                        return;
                    }
                    emit.onNext(dataSnapshot);
                }

                @Override
                public void onCancelled(DatabaseError e) {
                    if (emit.isDisposed()) {
                        return;
                    }
                    emit.onError(e.toException());
                }
            };

            // The removal hook is installed before the listener is attached.
            final Cancellable detach = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(valueListener);
                }
            };
            emit.setCancellable(detach);

            query.addValueEventListener(valueListener);
        }
    };
    return Observable.create(source);
}
 
源代码15 项目: RxBroadcast   文件: RxBroadcast.java
/**
 * Builds an Observable that emits every intent received by a broadcast receiver which is
 * registered through the given registrar strategy.
 *
 * <p>For ordered broadcasts the abort strategy is invoked after the intent has been emitted.
 * The receiver is unregistered through the registrar strategy when the observer is disposed.
 *
 * @param broadcastRegistrarStrategy registers and unregisters the receiver
 * @param orderedBroadcastAbortStrategy consulted for each ordered broadcast
 * @return an Observable of the received intents
 */
private static Observable<Intent> createBroadcastObservable(
        final BroadcastRegistrarStrategy broadcastRegistrarStrategy,
        final OrderedBroadcastAbortStrategy orderedBroadcastAbortStrategy) {
    final ObservableOnSubscribe<Intent> source = new ObservableOnSubscribe<Intent>() {

        @Override
        public void subscribe(final ObservableEmitter<Intent> intentEmitter) throws Exception {
            final BroadcastReceiver receiver = new BroadcastReceiver() {
                @Override
                public void onReceive(Context context, Intent intent) {
                    intentEmitter.onNext(intent);

                    if (!isOrderedBroadcast()) {
                        return;
                    }
                    // "this" is the receiver itself, wrapped so the strategy can act on it.
                    orderedBroadcastAbortStrategy.handleOrderedBroadcast(
                            context,
                            intent,
                            BroadcastReceiverAbortProxy.create(this));
                }
            };

            // The unregister hook is installed before the receiver is registered.
            final Cancellable unregister = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    broadcastRegistrarStrategy.unregisterBroadcastReceiver(receiver);
                }
            };
            intentEmitter.setCancellable(unregister);

            broadcastRegistrarStrategy.registerBroadcastReceiver(receiver);
        }
    };
    return Observable.create(source);
}
 
源代码16 项目: mobius   文件: RxMobiusLoop.java
/**
 * Turns a stream of events into a stream of models by running a loop per subscription.
 *
 * <p>On subscription a loop is started from {@code startModel} (together with
 * {@code startEffects} when those are not null). Every model the loop reports is emitted, and
 * every upstream event is dispatched into the loop. An upstream error is signalled wrapped in
 * an {@link UnrecoverableIncomingException}. On disposal the loop is disposed first, then the
 * upstream subscription.
 *
 * @param events the upstream events to feed into the loop
 * @return an ObservableSource of the models produced by the loop
 */
@Override
public ObservableSource<M> apply(final Observable<E> events) {
  final ObservableOnSubscribe<M> onSubscribe =
      new ObservableOnSubscribe<M>() {
        @Override
        public void subscribe(final ObservableEmitter<M> emitter) throws Exception {
          final MobiusLoop<M, E, ?> loop;
          if (startEffects != null) {
            loop = loopFactory.startFrom(startModel, startEffects);
          } else {
            loop = loopFactory.startFrom(startModel);
          }

          // Models flow from the loop to the downstream observer.
          final com.spotify.mobius.functions.Consumer<M> modelRelay =
              new com.spotify.mobius.functions.Consumer<M>() {
                @Override
                public void accept(M newModel) {
                  emitter.onNext(newModel);
                }
              };
          loop.observe(modelRelay);

          // Events flow from upstream into the loop.
          final Consumer<E> eventDispatcher =
              new Consumer<E>() {
                @Override
                public void accept(E event) throws Exception {
                  loop.dispatchEvent(event);
                }
              };
          final Consumer<Throwable> upstreamFailure =
              new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                  emitter.onError(new UnrecoverableIncomingException(throwable));
                }
              };
          final Disposable upstreamSubscription =
              events.subscribe(eventDispatcher, upstreamFailure);

          final Cancellable teardown =
              new Cancellable() {
                @Override
                public void cancel() throws Exception {
                  loop.dispose();
                  upstreamSubscription.dispose();
                }
              };
          emitter.setCancellable(teardown);
        }
      };
  return Observable.create(onSubscribe);
}
 
源代码17 项目: mobius   文件: RxConnectables.java
/**
 * Wraps a {@link Connectable} in an {@link ObservableTransformer}.
 *
 * <p>For each subscription the connectable is connected with a consumer that emits its output
 * downstream. Upstream items are passed to the resulting connection, and upstream errors and
 * completion are forwarded to the downstream observer. On disposal the upstream subscription
 * is disposed first, then the connection.
 *
 * @param connectable the connectable to expose as a transformer
 * @param <I> the input type of the connectable
 * @param <O> the output type of the connectable
 * @return a transformer routing upstream items through {@code connectable}
 */
public static <I, O> ObservableTransformer<I, O> toTransformer(
    final Connectable<I, O> connectable) {
  return new ObservableTransformer<I, O>() {
    @Override
    public ObservableSource<O> apply(final Observable<I> upstream) {
      final ObservableOnSubscribe<O> onSubscribe =
          new ObservableOnSubscribe<O>() {
            @Override
            public void subscribe(final ObservableEmitter<O> emitter) throws Exception {
              // Output of the connectable goes to the downstream observer.
              final Consumer<O> outputRelay =
                  new Consumer<O>() {
                    @Override
                    public void accept(O value) {
                      emitter.onNext(value);
                    }
                  };
              final Connection<I> connection = connectable.connect(outputRelay);

              final io.reactivex.functions.Consumer<I> onItem =
                  new io.reactivex.functions.Consumer<I>() {
                    @Override
                    public void accept(I f) {
                      connection.accept(f);
                    }
                  };
              final io.reactivex.functions.Consumer<Throwable> onFailure =
                  new io.reactivex.functions.Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                      emitter.onError(throwable);
                    }
                  };
              final Action onDone =
                  new Action() {
                    @Override
                    public void run() {
                      emitter.onComplete();
                    }
                  };
              final Disposable upstreamSubscription =
                  upstream.subscribe(onItem, onFailure, onDone);

              final Cancellable teardown =
                  new Cancellable() {
                    @Override
                    public void cancel() throws Exception {
                      upstreamSubscription.dispose();
                      connection.dispose();
                    }
                  };
              emitter.setCancellable(teardown);
            }
          };
      return Observable.create(onSubscribe);
    }
  };
}
 
源代码18 项目: rxjava2-extras   文件: TransformerStateMachine.java
/**
 * Not supported by this emitter.
 *
 * @param c ignored
 * @throws UnsupportedOperationException on every call
 */
@Override
public void setCancellable(Cancellable c) {
    final UnsupportedOperationException unsupported = new UnsupportedOperationException();
    throw unsupported;
}
 
/**
 * Checks that {@code setCancellable} was called on {@code mockEmitter} with a
 * {@link Cancellable} argument.
 */
@Test
public void setsCancellable() {
  // The matcher must stay inline in the verified call; which Cancellable was passed is not checked.
  verify(mockEmitter).setCancellable(any(Cancellable.class));
}
 
源代码20 项目: sqlbrite   文件: BriteContentResolver.java
/**
 * Creates an observable that notifies its subscribers with a {@linkplain Query query} to
 * execute. Subscribers are responsible for <b>always</b> closing the {@link Cursor} instance
 * returned from the {@link Query}.
 * <p>
 * Subscribers receive an immediate notification for the initial data and a further
 * notification each time the data of the supplied {@code uri} changes. Unsubscribe when updates
 * to the query are no longer wanted.
 * <p>
 * Because content resolver triggers are inherently asynchronous, items emitted from the
 * returned observable use the {@link Scheduler} supplied to
 * {@link SqlBrite#wrapContentProvider}. For consistency, the immediate notification sent on
 * subscribe uses this scheduler too. As a result, calling
 * {@link Observable#subscribeOn subscribeOn} on the returned observable has no effect.
 * <p>
 * Note: To skip the immediate notification and only receive the notifications for later data
 * changes, call {@code skip(1)} on the returned observable.
 * <p>
 * <b>Warning:</b> this method does not perform the query! The operation only occurs by
 * subscribing to the returned {@link Observable}.
 *
 * @see ContentResolver#query(Uri, String[], String, String[], String)
 * @see ContentResolver#registerContentObserver(Uri, boolean, ContentObserver)
 */
@CheckResult @NonNull
public QueryObservable createQuery(@NonNull final Uri uri, @Nullable final String[] projection,
    @Nullable final String selection, @Nullable final String[] selectionArgs, @Nullable
    final String sortOrder, final boolean notifyForDescendents) {
  final Query query = new Query() {
    @Override public Cursor run() {
      final long began = nanoTime();
      final Cursor result =
          contentResolver.query(uri, projection, selection, selectionArgs, sortOrder);

      if (logging) {
        // Timing is only computed when logging is enabled.
        final long elapsedMillis = NANOSECONDS.toMillis(nanoTime() - began);
        log("QUERY (%sms)\n  uri: %s\n  projection: %s\n  selection: %s\n  selectionArgs: %s\n  "
                + "sortOrder: %s\n  notifyForDescendents: %s", elapsedMillis, uri,
            Arrays.toString(projection), selection, Arrays.toString(selectionArgs), sortOrder,
            notifyForDescendents);
      }

      return result;
    }
  };

  final ObservableOnSubscribe<Query> onSubscribe = new ObservableOnSubscribe<Query>() {
    @Override public void subscribe(final ObservableEmitter<Query> e) throws Exception {
      final ContentObserver changeObserver = new ContentObserver(contentObserverHandler) {
        @Override public void onChange(boolean selfChange) {
          if (e.isDisposed()) {
            return;
          }
          e.onNext(query);
        }
      };
      contentResolver.registerContentObserver(uri, notifyForDescendents, changeObserver);

      final Cancellable unregister = new Cancellable() {
        @Override public void cancel() throws Exception {
          contentResolver.unregisterContentObserver(changeObserver);
        }
      };
      e.setCancellable(unregister);

      // Trigger initial query.
      if (e.isDisposed()) {
        return;
      }
      e.onNext(query);
    }
  };

  return Observable.create(onSubscribe) //
      .observeOn(scheduler) //
      .compose(queryTransformer) // Apply the user's query transformer.
      .to(QUERY_OBSERVABLE);
}
 
 类所在包
 同包方法