io.reactivex.functions.Cancellable#io.objectbox.reactive.DataObserver源码实例Demo

下面列出了 io.reactivex.functions.Cancellable#io.objectbox.reactive.DataObserver 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: weather   文件: MultipleDaysFragment.java
/**
 * Subscribes to the stored multiple-days weather query (observer scheduled via
 * {@code AndroidScheduler.mainThread()}) and, for every non-empty result, refreshes the
 * item adapter after a 500 ms delay.
 */
private void showStoredMultipleDaysWeather() {
  Query<MultipleDaysWeather> storedWeatherQuery = DbUtil.getMultipleDaysWeatherQuery(multipleDaysWeatherBox);
  DataObserver<List<MultipleDaysWeather>> storedWeatherObserver = new DataObserver<List<MultipleDaysWeather>>() {
    @Override
    public void onData(@NonNull List<MultipleDaysWeather> data) {
      if (data.isEmpty()) {
        return;
      }
      final Handler delayHandler = new Handler();
      // Drops the first entry and shows the remaining ones once the delay has elapsed.
      Runnable refreshAdapter = new Runnable() {
        @Override
        public void run() {
          data.remove(0);
          mItemAdapter.clear();
          mItemAdapter.add(data);
        }
      };
      delayHandler.postDelayed(refreshAdapter, 500);
    }
  };
  storedWeatherQuery.subscribe().on(AndroidScheduler.mainThread()).observer(storedWeatherObserver);
}
 
源代码2 项目: weather   文件: MainActivity.java
/**
 * Observes the stored current-weather query (registered with {@code subscriptions}) and binds
 * the first result to the temperature, description, humidity and wind views before starting
 * the weather animation. Empty results are ignored.
 */
private void showStoredCurrentWeather() {
  Query<CurrentWeather> currentWeatherQuery = DbUtil.getCurrentWeatherQuery(currentWeatherBox);
  DataObserver<List<CurrentWeather>> currentWeatherObserver = new DataObserver<List<CurrentWeather>>() {
    @Override
    public void onData(@NonNull List<CurrentWeather> data) {
      if (data.isEmpty()) {
        return;
      }
      hideEmptyLayout();
      CurrentWeather latest = data.get(0);
      Locale locale = Locale.getDefault();
      String windFormat = getResources().getString(R.string.wind_unit_label);
      String tempText = String.format(locale, "%.0f°", latest.getTemp());
      String humidityText = String.format(locale, "%d%%", latest.getHumidity());
      String windText = String.format(locale, windFormat, latest.getWindSpeed());
      if (isLoad) {
        tempTextView.setText(tempText);
        descriptionTextView.setText(AppUtil.getWeatherStatus(latest.getWeatherId(), AppUtil.isRTL(MainActivity.this)));
        humidityTextView.setText(humidityText);
        windTextView.setText(windText);
      } else {
        tempTextView.setCurrentText(tempText);
        descriptionTextView.setCurrentText(AppUtil.getWeatherStatus(latest.getWeatherId(), AppUtil.isRTL(MainActivity.this)));
        humidityTextView.setCurrentText(humidityText);
        windTextView.setCurrentText(windText);
      }
      animationView.setAnimation(AppUtil.getWeatherAnimation(latest.getWeatherId()));
      animationView.playAnimation();
    }
  };
  currentWeatherQuery.subscribe(subscriptions).on(AndroidScheduler.mainThread()).observer(currentWeatherObserver);
}
 
源代码3 项目: ObjectBoxRxJava   文件: RxBoxStore.java
/**
 * Creates an Observable that reports data changes of the given store.
 * After a transaction is committed, it emits the classes whose Objects were changed.
 * Disposing the Observable cancels the underlying store subscription.
 */
public static <T> Observable<Class> observable(final BoxStore boxStore) {
    return Observable.create(new ObservableOnSubscribe<Class>() {
        @Override
        public void subscribe(final ObservableEmitter<Class> emitter) throws Exception {
            DataObserver<Class> changeForwarder = new DataObserver<Class>() {
                @Override
                public void onData(Class data) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(data);
                }
            };
            final DataSubscription storeSubscription = boxStore.subscribe().observer(changeForwarder);
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    storeSubscription.cancel();
                }
            });
        }
    });
}
 
源代码4 项目: ObjectBoxRxJava   文件: RxQuery.java
/**
 * Subscribes to the query and forwards each element of a delivered result list to the
 * emitter, completing it afterwards. Emission stops as soon as the emitter is cancelled;
 * cancelling the emitter also cancels the query subscription.
 */
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
    DataObserver<List<T>> itemForwarder = new DataObserver<List<T>>() {
        @Override
        public void onData(List<T> data) {
            for (T item : data) {
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onNext(item);
            }
            if (emitter.isCancelled()) {
                return;
            }
            emitter.onComplete();
        }
    };
    final DataSubscription querySubscription = query.subscribe().observer(itemForwarder);
    emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            querySubscription.cancel();
        }
    });
}
 
源代码5 项目: ObjectBoxRxJava   文件: RxQuery.java
/**
 * Creates an Observable that emits the Query results as Lists.
 * It never completes, so updates keep arriving when the underlying data changes.
 * Disposing the Observable cancels the query subscription.
 */
public static <T> Observable<List<T>> observable(final Query<T> query) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<T>> emitter) throws Exception {
            DataObserver<List<T>> resultForwarder = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> data) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onNext(data);
                }
            };
            final DataSubscription querySubscription = query.subscribe().observer(resultForwarder);
            emitter.setCancellable(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    querySubscription.cancel();
                }
            });
        }
    });
}
 
源代码6 项目: ObjectBoxRxJava   文件: RxQuery.java
/**
 * Creates a Single that emits one Query result as a List.
 */
public static <T> Single<List<T>> single(final Query<T> query) {
    return Single.create(new SingleOnSubscribe<List<T>>() {
        @Override
        public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
            DataObserver<List<T>> resultForwarder = new DataObserver<List<T>>() {
                @Override
                public void onData(List<T> data) {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onSuccess(data);
                }
            };
            // no need to cancel, single never subscribes
            query.subscribe().single().observer(resultForwarder);
        }
    });
}
 
源代码7 项目: objectbox-java   文件: QueryPublisher.java
/**
 * Adds the given observer to this publisher. When it is the first observer, a weak,
 * changes-only subscription for the box's entity class is created on the store so that
 * {@link #publish()} runs whenever that class reports a change.
 *
 * @param observer observer to register
 * @param param    not used by this method
 * @throws IllegalStateException if no observers are registered but an entity class
 *                               subscription already exists
 */
@Override
public synchronized void subscribe(DataObserver<List<T>> observer, @Nullable Object param) {
    final BoxStore boxStore = box.getStore();
    if (objectClassObserver == null) {
        objectClassObserver = changedClass -> publish();
    }
    final boolean isFirstObserver = observers.isEmpty();
    if (isFirstObserver) {
        if (objectClassSubscription != null) {
            throw new IllegalStateException("Existing subscription found");
        }

        // Weak: Query references QueryPublisher, which references objectClassObserver.
        // Query's DataSubscription references QueryPublisher, which references Query.
        // --> Query and its DataSubscription keep objectClassSubscription alive.
        // --> If both are gone, the app could not possibly unsubscribe.
        // --> OK for objectClassSubscription to be GCed and thus unsubscribed?
        // --> However, still subscribed observers to the query will NOT be notified anymore.
        objectClassSubscription = boxStore.subscribe(box.getEntityClass())
                .weak()
                .onlyChanges()
                .observer(objectClassObserver);
    }
    observers.add(observer);
}
 
源代码8 项目: weather   文件: MainActivity.java
/**
 * Observes the stored five-day weather query (registered with {@code subscriptions}).
 * For a non-empty result the first entry is removed from the list and kept as
 * {@code todayFiveDayWeather}; the remaining entries replace the adapter content.
 */
private void showStoredFiveDayWeather() {
  Query<FiveDayWeather> fiveDayQuery = DbUtil.getFiveDayWeatherQuery(fiveDayWeatherBox);
  DataObserver<List<FiveDayWeather>> fiveDayObserver = new DataObserver<List<FiveDayWeather>>() {
    @Override
    public void onData(@NonNull List<FiveDayWeather> data) {
      if (data.isEmpty()) {
        return;
      }
      todayFiveDayWeather = data.remove(0);
      mItemAdapter.clear();
      mItemAdapter.add(data);
    }
  };
  fiveDayQuery.subscribe(subscriptions).on(AndroidScheduler.mainThread()).observer(fiveDayObserver);
}
 
源代码9 项目: weather   文件: HourlyActivity.java
/**
 * Observes the hourly items stored for the current {@code fiveDayWeather} id. A non-empty
 * result replaces the adapter content and is passed to {@code setChartValues}.
 */
private void showItemHourlyDB() {
  Query<ItemHourlyDB> hourlyQuery = DbUtil.getItemHourlyDBQuery(itemHourlyDBBox, fiveDayWeather.getId());
  DataObserver<List<ItemHourlyDB>> hourlyObserver = new DataObserver<List<ItemHourlyDB>>() {
    @Override
    public void onData(@NonNull List<ItemHourlyDB> data) {
      if (data.isEmpty()) {
        return;
      }
      mItemAdapter.clear();
      mItemAdapter.add(data);
      setChartValues(data);
    }
  };
  hourlyQuery.subscribe().on(AndroidScheduler.mainThread()).observer(hourlyObserver);
}
 
源代码10 项目: objectbox-java   文件: QueryPublisher.java
/**
 * Schedules a task through the store's {@code internalScheduleThread} that runs the query
 * and passes the result to the given observer only.
 *
 * @param observer observer receiving the query result
 * @param param    not used by this method
 */
@Override
public void publishSingle(final DataObserver<List<T>> observer, @Nullable Object param) {
    box.getStore().internalScheduleThread(() -> observer.onData(query.find()));
}
 
源代码11 项目: objectbox-java   文件: QueryPublisher.java
/**
 * Schedules a task through the store's {@code internalScheduleThread} that runs the query
 * once and passes the same result list to every registered observer.
 */
void publish() {
    box.getStore().internalScheduleThread(() -> {
        final List<T> queryResult = query.find();
        for (DataObserver<List<T>> subscriber : observers) {
            subscriber.onData(queryResult);
        }
    });
}
 
源代码12 项目: objectbox-java   文件: QueryPublisher.java
/**
 * Removes the given observer from this publisher. When no observers remain, the entity
 * class subscription (if any) is cancelled and cleared.
 *
 * @param observer observer to remove
 * @param param    not used by this method
 */
@Override
public synchronized void unsubscribe(DataObserver<List<T>> observer, @Nullable Object param) {
    DataPublisherUtils.removeObserverFromCopyOnWriteSet(observers, observer);
    // The subscription field is null when nothing was subscribed yet or when a previous
    // unsubscribe already cancelled it; guard so such calls do not throw a NullPointerException.
    if (observers.isEmpty() && objectClassSubscription != null) {
        objectClassSubscription.cancel();
        objectClassSubscription = null;
    }
}
 
源代码13 项目: objectbox-java   文件: ObjectClassPublisher.java
/**
 * Schedules a task through the store's {@code internalScheduleThread} that passes entity
 * classes to the given observer: only {@code forClass} when it is non-null, otherwise every
 * class returned by {@code boxStore.getAllEntityClasses()}. A RuntimeException thrown by the
 * observer is handed to {@code handleObserverException} for the class being published.
 *
 * @param observer observer to notify
 * @param forClass the single class to publish, or null for all entity classes
 */
@Override
public void publishSingle(final DataObserver<Class> observer, @Nullable final Object forClass) {
    boxStore.internalScheduleThread(() -> {
        final Collection<Class<?>> classesToPublish;
        if (forClass == null) {
            classesToPublish = boxStore.getAllEntityClasses();
        } else {
            classesToPublish = Collections.singletonList((Class<?>) forClass);
        }
        for (Class<?> publishedClass : classesToPublish) {
            try {
                observer.onData(publishedClass);
            } catch (RuntimeException e) {
                handleObserverException(publishedClass);
            }
        }
    });
}
 
源代码14 项目: objectbox-java   文件: ObjectClassPublisher.java
/**
 * Drains {@code changesQueue}: each polled array of affected entity type IDs is processed in
 * queue order, and for every ID the observers registered for it are called with the
 * corresponding entity class. The loop ends when the queue is empty, at which point
 * {@code changePublisherRunning} is reset.
 */
@Override
public void run() {
    try {
        while (true) {
            // We do not join all available array, just in case the app relies on a specific order
            int[] entityTypeIdsAffected;
            // Poll and reset the running flag under the same lock on changesQueue.
            synchronized (changesQueue) {
                entityTypeIdsAffected = changesQueue.pollFirst();
                if (entityTypeIdsAffected == null) {
                    changePublisherRunning = false;
                    break;
                }
            }
            for (int entityTypeId : entityTypeIdsAffected) {
                Collection<DataObserver<Class>> observers = observersByEntityTypeId.get(entityTypeId);
                if (observers != null && !observers.isEmpty()) {
                    Class<?> objectClass = boxStore.getEntityClassOrThrow(entityTypeId);
                    // The try wraps the whole observer loop: a RuntimeException from one observer
                    // skips the remaining observers of this class and goes to handleObserverException.
                    try {
                        for (DataObserver<Class> observer : observers) {
                            observer.onData(objectClass);
                        }
                    } catch (RuntimeException e) {
                        handleObserverException(objectClass);
                    }
                }
            }
        }
    } finally {
        // Just in Case of exceptions; it's better done within synchronized for regular cases
        changePublisherRunning = false;
    }
}
 
源代码15 项目: objectbox-java   文件: ObjectClassObserverTest.java
/**
 * Repeatedly subscribes observers that each hold a large byte array, to detect observers
 * being leaked. Non-weak subscriptions are cancelled explicitly after each iteration.
 *
 * @param wrapped whether a {@code TestScheduler} is set on the subscription builder
 * @param weak    whether the subscription is made weak (and then not cancelled explicitly)
 */
public void testForObserverLeaks(boolean wrapped, boolean weak) {
    final int chunkSizeMB = 16; // 16 is faster than 64 & 128 (~0,3s instead of ~1s) and a bit faster than 8 and 32
    // Allocation would sum up to 70 GB in total when observer is not unsubscribed
    long maxMB = Math.min(Runtime.getRuntime().maxMemory() / (1024 * 1024), 70L * 1024);
    int iterations = (int) (maxMB / chunkSizeMB + 1);
    for (int iteration = 0; iteration < iterations; iteration++) {
        // Use a Scheduler to ensure wrapped observer is used
        SubscriptionBuilder<Class> builder = store.subscribe().onlyChanges();
        if (weak) {
            builder.weak();
        }
        if (wrapped) {
            builder.on(new TestScheduler());
        }
        DataSubscription leakCandidate = builder.observer(new DataObserver<Class>() {
            byte[] bigMemory = new byte[chunkSizeMB * 1024 * 1024];

            @Override
            public void onData(Class data) {
                bigMemory[0] ^= 1;
            }
        });
        if (weak) {
            continue;
        }
        leakCandidate.cancel();
    }
}
 
源代码16 项目: ObjectBoxRxJava   文件: FakeQueryPublisher.java
/**
 * Adds the observer to this publisher's observer collection.
 *
 * @param observer observer to register
 * @param param    not used
 */
@Override
public synchronized void subscribe(DataObserver<List<T>> observer, Object param) {
    this.observers.add(observer);
}
 
源代码17 项目: ObjectBoxRxJava   文件: FakeQueryPublisher.java
/**
 * Passes the current {@code queryResult} to the given observer, directly on the calling thread.
 *
 * @param observer observer to notify
 * @param param    not used
 */
@Override
public void publishSingle(final DataObserver<List<T>> observer, Object param) {
    observer.onData(this.queryResult);
}
 
源代码18 项目: ObjectBoxRxJava   文件: FakeQueryPublisher.java
/**
 * Passes the current {@code queryResult} to every registered observer.
 */
public void publish() {
    for (DataObserver<List<T>> subscriber : this.observers) {
        subscriber.onData(this.queryResult);
    }
}
 
源代码19 项目: ObjectBoxRxJava   文件: FakeQueryPublisher.java
/**
 * Removes the observer from this publisher's observer collection via
 * {@code DataPublisherUtils.removeObserverFromCopyOnWriteSet}.
 *
 * @param observer observer to remove
 * @param param    not used
 */
@Override
public synchronized void unsubscribe(DataObserver<List<T>> observer, Object param) {
    DataPublisherUtils.removeObserverFromCopyOnWriteSet(this.observers, observer);
}
 
源代码20 项目: objectbox-java   文件: ObjectClassPublisher.java
/**
 * Removes the observer from the set of observers registered for the given entity type ID.
 * Does nothing when no observer set exists for that ID.
 *
 * @param observer     observer to remove
 * @param entityTypeId entity type ID the observer was registered for
 */
private void unsubscribe(DataObserver<Class> observer, int entityTypeId) {
    Set<DataObserver<Class>> observers = observersByEntityTypeId.get(entityTypeId);
    // The lookup can yield null (run() null-checks the same lookup), so skip the removal
    // instead of relying on the helper to tolerate a null set.
    if (observers == null) {
        return;
    }
    DataPublisherUtils.removeObserverFromCopyOnWriteSet(observers, observer);
}
 
源代码21 项目: objectbox-java   文件: FakeQueryPublisher.java
/**
 * Adds the observer to this publisher's observer collection.
 *
 * @param observer observer to register
 * @param param    not used
 */
@Override
public synchronized void subscribe(DataObserver<List<T>> observer, @Nullable Object param) {
    this.observers.add(observer);
}
 
源代码22 项目: objectbox-java   文件: FakeQueryPublisher.java
/**
 * Passes the current {@code queryResult} to the given observer, directly on the calling thread.
 *
 * @param observer observer to notify
 * @param param    not used
 */
@Override
public void publishSingle(final DataObserver<List<T>> observer, @Nullable Object param) {
    observer.onData(this.queryResult);
}
 
源代码23 项目: objectbox-java   文件: FakeQueryPublisher.java
/**
 * Passes the current {@code queryResult} to every registered observer.
 */
public void publish() {
    for (DataObserver<List<T>> subscriber : this.observers) {
        subscriber.onData(this.queryResult);
    }
}
 
源代码24 项目: objectbox-java   文件: FakeQueryPublisher.java
/**
 * Removes the observer from this publisher's observer collection via
 * {@code DataPublisherUtils.removeObserverFromCopyOnWriteSet}.
 *
 * @param observer observer to remove
 * @param param    not used
 */
@Override
public synchronized void unsubscribe(DataObserver<List<T>> observer, @Nullable Object param) {
    DataPublisherUtils.removeObserverFromCopyOnWriteSet(this.observers, observer);
}
 
源代码25 项目: objectbox-java   文件: FakeQueryPublisher.java
/**
 * Adds the observer to this publisher's observer collection.
 *
 * @param observer observer to register
 * @param param    not used
 */
@Override
public synchronized void subscribe(DataObserver<List<T>> observer, Object param) {
    this.observers.add(observer);
}
 
源代码26 项目: objectbox-java   文件: FakeQueryPublisher.java
/**
 * Passes the current {@code queryResult} to the given observer, directly on the calling thread.
 *
 * @param observer observer to notify
 * @param param    not used
 */
@Override
public void publishSingle(final DataObserver<List<T>> observer, Object param) {
    observer.onData(this.queryResult);
}
 
源代码27 项目: objectbox-java   文件: FakeQueryPublisher.java
/**
 * Passes the current {@code queryResult} to every registered observer.
 */
public void publish() {
    for (DataObserver<List<T>> subscriber : this.observers) {
        subscriber.onData(this.queryResult);
    }
}
 
源代码28 项目: objectbox-java   文件: FakeQueryPublisher.java
/**
 * Removes the observer from this publisher's observer collection via
 * {@code DataPublisherUtils.removeObserverFromCopyOnWriteSet}.
 *
 * @param observer observer to remove
 * @param param    not used
 */
@Override
public synchronized void unsubscribe(DataObserver<List<T>> observer, Object param) {
    DataPublisherUtils.removeObserverFromCopyOnWriteSet(this.observers, observer);
}