下面列出了io.reactivex.functions.Cancellable#io.objectbox.reactive.DataObserver 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Subscribes to the stored {@link MultipleDaysWeather} query and shows its results in
 * {@code mItemAdapter}.
 *
 * <p>Results are observed via {@code AndroidScheduler.mainThread()}. For every non-empty result,
 * the adapter update is posted with a 500 ms delay; the first entry is removed from the list
 * before the remaining entries are added to the adapter.
 */
private void showStoredMultipleDaysWeather() {
    Query<MultipleDaysWeather> storedDaysQuery = DbUtil.getMultipleDaysWeatherQuery(multipleDaysWeatherBox);
    storedDaysQuery.subscribe()
            .on(AndroidScheduler.mainThread())
            .observer(new DataObserver<List<MultipleDaysWeather>>() {
                @Override
                public void onData(@NonNull List<MultipleDaysWeather> data) {
                    if (data.isEmpty()) {
                        // Nothing stored yet; leave the adapter as it is.
                        return;
                    }
                    final Handler delayedUpdater = new Handler();
                    delayedUpdater.postDelayed(() -> {
                        // Drop the first entry, then replace the adapter content with the rest.
                        data.remove(0);
                        mItemAdapter.clear();
                        mItemAdapter.add(data);
                    }, 500);
                }
            });
}
/**
 * Subscribes to the stored {@link CurrentWeather} query and renders the first result.
 *
 * <p>The subscription is created with {@code subscriptions} and observed via
 * {@code AndroidScheduler.mainThread()}. When data is available the empty layout is hidden, the
 * temperature, description, humidity and wind views are filled, and the weather animation is
 * started. Depending on {@code isLoad}, the text is applied with {@code setText} or with
 * {@code setCurrentText}.
 */
private void showStoredCurrentWeather() {
    Query<CurrentWeather> currentQuery = DbUtil.getCurrentWeatherQuery(currentWeatherBox);
    currentQuery.subscribe(subscriptions)
            .on(AndroidScheduler.mainThread())
            .observer(new DataObserver<List<CurrentWeather>>() {
                @Override
                public void onData(@NonNull List<CurrentWeather> data) {
                    if (data.isEmpty()) {
                        return;
                    }
                    hideEmptyLayout();
                    CurrentWeather latest = data.get(0);
                    // Read the flag once so all four views are updated the same way.
                    final boolean useSetText = isLoad;

                    String tempText = String.format(Locale.getDefault(), "%.0f°", latest.getTemp());
                    if (useSetText) {
                        tempTextView.setText(tempText);
                    } else {
                        tempTextView.setCurrentText(tempText);
                    }

                    String descriptionText =
                            AppUtil.getWeatherStatus(latest.getWeatherId(), AppUtil.isRTL(MainActivity.this));
                    if (useSetText) {
                        descriptionTextView.setText(descriptionText);
                    } else {
                        descriptionTextView.setCurrentText(descriptionText);
                    }

                    String humidityText = String.format(Locale.getDefault(), "%d%%", latest.getHumidity());
                    if (useSetText) {
                        humidityTextView.setText(humidityText);
                    } else {
                        humidityTextView.setCurrentText(humidityText);
                    }

                    String windText = String.format(Locale.getDefault(),
                            getResources().getString(R.string.wind_unit_label), latest.getWindSpeed());
                    if (useSetText) {
                        windTextView.setText(windText);
                    } else {
                        windTextView.setCurrentText(windText);
                    }

                    animationView.setAnimation(AppUtil.getWeatherAnimation(latest.getWeatherId()));
                    animationView.playAnimation();
                }
            });
}
/**
 * Wraps the data-change notifications of a {@link BoxStore} in an RxJava {@link Observable}.
 * Each value received by the store's {@link DataObserver} is forwarded to the emitter as long as
 * the emitter is not disposed.
 *
 * @param boxStore the store to subscribe to
 * @return an Observable emitting the classes delivered by the store subscription; cancelling it
 *         cancels the underlying {@link DataSubscription}
 */
public static <T> Observable<Class> observable(final BoxStore boxStore) {
    return Observable.create(new ObservableOnSubscribe<Class>() {
        @Override
        public void subscribe(final ObservableEmitter<Class> emitter) throws Exception {
            DataObserver<Class> forwarder = new DataObserver<Class>() {
                @Override
                public void onData(Class data) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(data);
                }
            };
            final DataSubscription storeSubscription = boxStore.subscribe().observer(forwarder);
            // Tie the ObjectBox subscription to the lifetime of the Rx subscription.
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    storeSubscription.cancel();
                }
            });
        }
    });
}
/**
 * Subscribes to {@code query} and forwards every item of each delivered result list to
 * {@code emitter}, completing the emitter after the last item of a list.
 *
 * <p>Emission stops as soon as the emitter reports it is cancelled. Cancelling the emitter
 * cancels the query subscription.
 *
 * @param query   the query to observe
 * @param emitter the emitter that receives the individual items
 */
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
    DataObserver<List<T>> itemForwarder = new DataObserver<List<T>>() {
        @Override
        public void onData(List<T> data) {
            for (T item : data) {
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onNext(item);
            }
            if (emitter.isCancelled()) {
                return;
            }
            emitter.onComplete();
        }
    };
    final DataSubscription querySubscription = query.subscribe().observer(itemForwarder);
    emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            querySubscription.cancel();
        }
    });
}
/**
 * Wraps a {@link Query} subscription in an RxJava {@link Observable} that emits each query result
 * as a List. This method never calls {@code onComplete}, so the Observable keeps delivering
 * results for as long as the query subscription delivers them.
 *
 * @param query the query to observe
 * @return an Observable of result lists; cancelling it cancels the underlying
 *         {@link DataSubscription}
 */
public static <T> Observable<List<T>> observable(final Query<T> query) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<T>> emitter) throws Exception {
            DataObserver<List<T>> resultForwarder = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> data) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(data);
                }
            };
            final DataSubscription querySubscription = query.subscribe().observer(resultForwarder);
            // Cancel the query subscription together with the Rx subscription.
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    querySubscription.cancel();
                }
            });
        }
    });
}
/**
 * Wraps a one-shot {@link Query} subscription in an RxJava {@link Single} that emits one query
 * result as a List.
 *
 * @param query the query to run
 * @return a Single that succeeds with the result list unless the emitter is already disposed
 */
public static <T> Single<List<T>> single(final Query<T> query) {
    return Single.create(new SingleOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
            DataObserver<List<T>> resultForwarder = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> data) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onSuccess(data);
                }
            };
            // no need to cancel, single never subscribes
            query.subscribe().single().observer(resultForwarder);
        }
    });
}
/**
 * Registers {@code observer} for query results.
 *
 * <p>The class-change observer is created on first use; it calls {@link #publish()} whenever it
 * is notified. When the observer set is empty at the time of the call, a weak, changes-only
 * subscription for the box's entity class is created on the store.
 *
 * @param observer the observer to add
 * @param param    not used by this method
 * @throws IllegalStateException if no observers are registered but a class subscription
 *                               already exists
 */
@Override
public synchronized void subscribe(DataObserver<List<T>> observer, @Nullable Object param) {
    final BoxStore owningStore = box.getStore();
    if (objectClassObserver == null) {
        objectClassObserver = changedClass -> publish();
    }
    final boolean firstObserver = observers.isEmpty();
    if (firstObserver) {
        if (objectClassSubscription != null) {
            throw new IllegalStateException("Existing subscription found");
        }

        // Weak: Query references QueryPublisher, which references objectClassObserver.
        // Query's DataSubscription references QueryPublisher, which references Query.
        // --> Query and its DataSubscription keep objectClassSubscription alive.
        // --> If both are gone, the app could not possibly unsubscribe.
        // --> OK for objectClassSubscription to be GCed and thus unsubscribed?
        // --> However, still subscribed observers to the query will NOT be notified anymore.
        objectClassSubscription = owningStore.subscribe(box.getEntityClass())
                .weak()
                .onlyChanges()
                .observer(objectClassObserver);
    }
    observers.add(observer);
}
/**
 * Subscribes to the stored {@link FiveDayWeather} query and shows its results.
 *
 * <p>The subscription is created with {@code subscriptions} and observed via
 * {@code AndroidScheduler.mainThread()}. For a non-empty result, the first entry is removed
 * from the list and kept in {@code todayFiveDayWeather}; the remaining entries replace the
 * content of {@code mItemAdapter}.
 */
private void showStoredFiveDayWeather() {
    Query<FiveDayWeather> fiveDayQuery = DbUtil.getFiveDayWeatherQuery(fiveDayWeatherBox);
    fiveDayQuery.subscribe(subscriptions)
            .on(AndroidScheduler.mainThread())
            .observer(new DataObserver<List<FiveDayWeather>>() {
                @Override
                public void onData(@NonNull List<FiveDayWeather> data) {
                    if (data.isEmpty()) {
                        return;
                    }
                    todayFiveDayWeather = data.remove(0);
                    mItemAdapter.clear();
                    mItemAdapter.add(data);
                }
            });
}
/**
 * Subscribes to the {@link ItemHourlyDB} query for the id of {@code fiveDayWeather} and shows
 * its results.
 *
 * <p>Results are observed via {@code AndroidScheduler.mainThread()}. A non-empty result replaces
 * the content of {@code mItemAdapter} and is passed to {@code setChartValues}.
 */
private void showItemHourlyDB() {
    Query<ItemHourlyDB> hourlyQuery = DbUtil.getItemHourlyDBQuery(itemHourlyDBBox, fiveDayWeather.getId());
    hourlyQuery.subscribe()
            .on(AndroidScheduler.mainThread())
            .observer(new DataObserver<List<ItemHourlyDB>>() {
                @Override
                public void onData(@NonNull List<ItemHourlyDB> data) {
                    if (data.isEmpty()) {
                        return;
                    }
                    mItemAdapter.clear();
                    mItemAdapter.add(data);
                    setChartValues(data);
                }
            });
}
/**
 * Runs the query once and delivers the result to a single observer. The work is handed to the
 * store via {@code internalScheduleThread}.
 *
 * @param observer the observer that receives the query result
 * @param param    not used by this method
 */
@Override
public void publishSingle(final DataObserver<List<T>> observer, @Nullable Object param) {
    box.getStore().internalScheduleThread(() -> observer.onData(query.find()));
}
/**
 * Runs the query once and delivers the same result list to every registered observer. The work
 * is handed to the store via {@code internalScheduleThread}.
 */
void publish() {
    box.getStore().internalScheduleThread(() -> {
        final List<T> latestResult = query.find();
        for (DataObserver<List<T>> subscriber : observers) {
            subscriber.onData(latestResult);
        }
    });
}
/**
 * Removes {@code observer} from the registered observers and, once no observers remain, cancels
 * and clears the class subscription.
 *
 * <p>The class subscription may be absent when no observers remain (for example when this
 * method is called again after the last observer was already removed), so it is checked for
 * {@code null} before being cancelled.
 *
 * @param observer the observer to remove
 * @param param    not used by this method
 */
@Override
public synchronized void unsubscribe(DataObserver<List<T>> observer, @Nullable Object param) {
    DataPublisherUtils.removeObserverFromCopyOnWriteSet(observers, observer);
    // Guard against a missing subscription: a repeated or unmatched unsubscribe must not throw.
    if (observers.isEmpty() && objectClassSubscription != null) {
        objectClassSubscription.cancel();
        objectClassSubscription = null;
    }
}
/**
 * Delivers entity classes to a single observer. The work is handed to the store via
 * {@code internalScheduleThread}.
 *
 * <p>When {@code forClass} is non-null only that class is delivered; otherwise every class
 * returned by {@code boxStore.getAllEntityClasses()} is delivered. A {@link RuntimeException}
 * thrown by the observer is passed on to {@code handleObserverException} for the affected class
 * and delivery continues with the next class.
 *
 * @param observer the observer to notify
 * @param forClass the single class to publish, or {@code null} for all entity classes
 */
@Override
public void publishSingle(final DataObserver<Class> observer, @Nullable final Object forClass) {
    boxStore.internalScheduleThread(() -> {
        Collection<Class<?>> classesToPublish;
        if (forClass == null) {
            classesToPublish = boxStore.getAllEntityClasses();
        } else {
            classesToPublish = Collections.singletonList((Class<?>) forClass);
        }
        for (Class<?> publishedClass : classesToPublish) {
            try {
                observer.onData(publishedClass);
            } catch (RuntimeException e) {
                handleObserverException(publishedClass);
            }
        }
    });
}
/**
 * Drains {@code changesQueue} and notifies the observers registered for each affected entity
 * type ID.
 *
 * <p>The loop ends when the queue is empty; in that case {@code changePublisherRunning} is set
 * to {@code false} while still holding the lock on {@code changesQueue}.
 */
@Override
public void run() {
try {
while (true) {
// The queued arrays are deliberately not merged into one, just in case the app relies on a
// specific order
int[] entityTypeIdsAffected;
synchronized (changesQueue) {
entityTypeIdsAffected = changesQueue.pollFirst();
if (entityTypeIdsAffected == null) {
// Queue is drained: clear the running flag under the queue lock and stop.
changePublisherRunning = false;
break;
}
}
for (int entityTypeId : entityTypeIdsAffected) {
Collection<DataObserver<Class>> observers = observersByEntityTypeId.get(entityTypeId);
if (observers != null && !observers.isEmpty()) {
Class<?> objectClass = boxStore.getEntityClassOrThrow(entityTypeId);
try {
for (DataObserver<Class> observer : observers) {
observer.onData(objectClass);
}
} catch (RuntimeException e) {
// The try wraps the whole loop, so a throwing observer also skips the remaining observers
// of this entity type.
handleObserverException(objectClass);
}
}
}
}
} finally {
// Just in Case of exceptions; it's better done within synchronized for regular cases
changePublisherRunning = false;
}
}
/**
 * Repeatedly subscribes observers that each hold a large byte array, to check that observers do
 * not stay referenced.
 *
 * @param wrapped if {@code true}, a {@code TestScheduler} is set on the subscription builder
 * @param weak    if {@code true}, the subscription is made weak and is not cancelled explicitly;
 *                otherwise each subscription is cancelled right after it is created
 */
public void testForObserverLeaks(boolean wrapped, boolean weak) {
    final int chunkSizeMB = 16; // 16 is faster than 64 & 128 (~0,3s instead of ~1s) and a bit faster than 8 and 32
    // Allocation would sum up to 70 GB in total when observer is not unsubscribed
    long heapLimitMB = Runtime.getRuntime().maxMemory() / (1024 * 1024);
    long maxMB = Math.min(heapLimitMB, 70L * 1024);
    int runs = (int) (maxMB / chunkSizeMB + 1);
    for (int run = 0; run < runs; run++) {
        // Use a Scheduler to ensure wrapped observer is used
        SubscriptionBuilder<Class> builder = store.subscribe().onlyChanges();
        if (weak) {
            builder.weak();
        }
        if (wrapped) {
            builder.on(new TestScheduler());
        }
        // The observer is created inline so that no local variable keeps a reference to it.
        DataSubscription subscription = builder.observer(new DataObserver<Class>() {
            byte[] bigMemory = new byte[chunkSizeMB * 1024 * 1024];

            @Override
            public void onData(Class data) {
                bigMemory[0] ^= 1;
            }
        });
        if (!weak) {
            subscription.cancel();
        }
    }
}
/**
 * Adds {@code observer} to the registered observers.
 *
 * @param observer the observer to add
 * @param param    not used by this method
 */
@Override
public synchronized void subscribe(DataObserver<List<T>> observer, Object param) {
    observers.add(observer);
}
/**
 * Delivers the current {@code queryResult} directly to the given observer.
 *
 * @param observer the observer to notify
 * @param param    not used by this method
 */
@Override
public void publishSingle(DataObserver<List<T>> observer, Object param) {
    observer.onData(queryResult);
}
/**
 * Delivers the current {@code queryResult} to every registered observer.
 */
public void publish() {
    for (DataObserver<List<T>> subscriber : observers) {
        subscriber.onData(queryResult);
    }
}
/**
 * Removes {@code observer} from the registered observers by delegating to
 * {@code DataPublisherUtils.removeObserverFromCopyOnWriteSet}.
 *
 * @param observer the observer to remove
 * @param param    not used by this method
 */
@Override
public synchronized void unsubscribe(DataObserver<List<T>> observer, Object param) {
    DataPublisherUtils.removeObserverFromCopyOnWriteSet(observers, observer);
}
/**
 * Removes {@code observer} from the observers registered for one entity type ID.
 *
 * @param observer     the observer to remove
 * @param entityTypeId the entity type ID whose observer set is looked up in
 *                     {@code observersByEntityTypeId}
 */
private void unsubscribe(DataObserver<Class> observer, int entityTypeId) {
    Set<DataObserver<Class>> observersForType = observersByEntityTypeId.get(entityTypeId);
    DataPublisherUtils.removeObserverFromCopyOnWriteSet(observersForType, observer);
}
/**
 * Adds {@code observer} to the registered observers.
 *
 * @param observer the observer to add
 * @param param    not used by this method; may be {@code null}
 */
@Override
public synchronized void subscribe(DataObserver<List<T>> observer, @Nullable Object param) {
    observers.add(observer);
}
/**
 * Delivers the current {@code queryResult} directly to the given observer.
 *
 * @param observer the observer to notify
 * @param param    not used by this method; may be {@code null}
 */
@Override
public void publishSingle(DataObserver<List<T>> observer, @Nullable Object param) {
    observer.onData(queryResult);
}
/**
 * Delivers the current {@code queryResult} to every registered observer.
 */
public void publish() {
    for (DataObserver<List<T>> subscriber : observers) {
        subscriber.onData(queryResult);
    }
}
/**
 * Removes {@code observer} from the registered observers by delegating to
 * {@code DataPublisherUtils.removeObserverFromCopyOnWriteSet}.
 *
 * @param observer the observer to remove
 * @param param    not used by this method; may be {@code null}
 */
@Override
public synchronized void unsubscribe(DataObserver<List<T>> observer, @Nullable Object param) {
    DataPublisherUtils.removeObserverFromCopyOnWriteSet(observers, observer);
}
/**
 * Adds {@code observer} to the registered observers.
 *
 * @param observer the observer to add
 * @param param    not used by this method
 */
@Override
public synchronized void subscribe(DataObserver<List<T>> observer, Object param) {
    observers.add(observer);
}
/**
 * Delivers the current {@code queryResult} directly to the given observer.
 *
 * @param observer the observer to notify
 * @param param    not used by this method
 */
@Override
public void publishSingle(DataObserver<List<T>> observer, Object param) {
    observer.onData(queryResult);
}
/**
 * Delivers the current {@code queryResult} to every registered observer.
 */
public void publish() {
    for (DataObserver<List<T>> subscriber : observers) {
        subscriber.onData(queryResult);
    }
}
/**
 * Removes {@code observer} from the registered observers by delegating to
 * {@code DataPublisherUtils.removeObserverFromCopyOnWriteSet}.
 *
 * @param observer the observer to remove
 * @param param    not used by this method
 */
@Override
public synchronized void unsubscribe(DataObserver<List<T>> observer, Object param) {
    DataPublisherUtils.removeObserverFromCopyOnWriteSet(observers, observer);
}