下面列出了 io.reactivex.FlowableSubscriber#io.reactivex.internal.functions.ObjectHelper 的实例代码,你可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Hands out the next element of the backing JSON array.
 *
 * @return the element at the current index; {@code null} when the index has
 *         reached the array length or when the cast raised a
 *         {@link ClassCastException}
 */
@Override
public T poll() {
    int position = index;
    JSONArray source = array;
    if (position == source.length()) {
        // Every element has already been handed out.
        return null;
    }
    // Advance the cursor before reading so the element is consumed exactly once.
    index = position + 1;
    T element;
    try {
        element = (T) source.opt(position);
    } catch (ClassCastException e) {
        return null;
    }
    // A missing/null array element is rejected by ObjectHelper.requireNonNull.
    return ObjectHelper.requireNonNull(element, "The array element is null");
}
/**
 * Registers {@code subscriber} under the class of {@code observer} and subscribes it
 * to the bus for events of {@code subscriber.getEventClass()}.
 * <p>
 * Events are observed on the subscriber's scheduler, or on the Android main thread
 * when the subscriber has none. When the subscriber carries a filter, only events
 * accepted by that filter are delivered. The resulting disposable is stored in the
 * {@code CompositeDisposable} kept for the observer's class.
 *
 * @param observer   the object on whose behalf the subscription is made; its class is the registry key
 * @param subscriber the subscriber to register
 * @param <T>        the event type the subscriber receives
 * @throws IllegalArgumentException if {@code subscriber} is already registered for the observer's class
 */
public <T> void registerSubscriber(@NonNull Object observer, @NonNull CustomSubscriber<T> subscriber) {
    ObjectHelper.requireNonNull(observer, "Observer to register must not be null.");
    ObjectHelper.requireNonNull(subscriber, "Subscriber to register must not be null.");
    Class<?> observerClass = observer.getClass();

    SUBSCRIBERS.putIfAbsent(observerClass, new CopyOnWriteArraySet<CustomSubscriber<?>>());
    Set<CustomSubscriber<?>> subscribers = SUBSCRIBERS.get(observerClass);
    // Rely on the result of add() instead of contains()+add(): the duplicate check and
    // the insertion then happen in one step, so two concurrent registrations of the
    // same subscriber cannot both pass the check and both subscribe to the bus.
    if (!subscribers.add(subscriber)) {
        throw new IllegalArgumentException("Subscriber has already been registered.");
    }

    Observable<T> observable = bus.ofType(subscriber.getEventClass())
            .observeOn(subscriber.getScheduler() == null
                    ? AndroidSchedulers.mainThread() : subscriber.getScheduler());

    OBSERVERS.putIfAbsent(observerClass, new CompositeDisposable());
    CompositeDisposable composite = OBSERVERS.get(observerClass);
    composite.add(((subscriber.getFilter() == null) ? observable
            : observable.filter(subscriber.getFilter()))
            .subscribe(subscriber));
}
/**
 * Handles one upstream item by running it through the state transition.
 * <p>
 * The item is ignored when {@code done} is set or when {@code createdState()}
 * returns {@code false}. Otherwise the batch countdown is decremented, the
 * transition is applied to the current state, and {@code drain()} is invoked
 * unless {@code drainCalled} was set while the transition ran.
 *
 * @param t the upstream item handed to {@code transition.apply}
 */
@Override
public void onNext(In t) {
if (done) {
return;
}
// A state must exist before a transition can be applied; bail out if it could not be obtained.
if (!createdState()) {
return;
}
// When the countdown hits zero, flag requestsArrived and restart the countdown at requestBatchSize.
// NOTE(review): presumably drain() uses requestsArrived to request the next batch upstream — confirm.
if (--count == 0) {
requestsArrived = true;
count = requestBatchSize;
}
try {
// Reset the flag before the transition runs so that the check after the try block
// only sees a drain flagged during this call.
// NOTE(review): presumably the transition sets drainCalled through the emitter it receives (this) — confirm.
drainCalled = false;
state = ObjectHelper.requireNonNull(transition.apply(state, t, this),
"intermediate state cannot be null");
} catch (Throwable e) {
// Fatal errors are rethrown by throwIfFatal; anything else is routed to onError.
Exceptions.throwIfFatal(e);
onError(e);
return;
}
// Drain here only if drainCalled was not set during the transition.
if (!drainCalled) {
drain();
}
}
/**
 * Ensures that {@code state} holds a value, creating it from
 * {@code initialState} on first use.
 *
 * @return {@code true} if a state is available; {@code false} if creating it
 *         failed, in which case {@code done} is set and the error is passed
 *         to {@code onError_}
 */
private boolean createdState() {
    if (state != null) {
        // Already initialised by an earlier call.
        return true;
    }
    try {
        state = ObjectHelper.requireNonNull(initialState.call(),
                "initial state cannot be null");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        done = true;
        onError_(e);
        return false;
    }
    return true;
}
/**
 * Maps an upstream item and forwards the result downstream, releasing the
 * original item in every case.
 * <p>
 * If the sequence is already done the item is only released. If the mapper
 * throws or yields {@code null}, the upstream is cancelled, the item is
 * released, and the error is signalled downstream.
 *
 * @param t the upstream item
 */
@Override
public void onNext(T t) {
    if (done) {
        ResourceFlowable.releaseItem(t, release);
        return;
    }

    R mapped;
    try {
        mapped = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value");
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        upstream.cancel();
        ResourceFlowable.releaseItem(t, release);
        done = true;
        actual.onError(ex);
        return;
    }

    // The source item is released before the mapped value is emitted.
    ResourceFlowable.releaseItem(t, release);
    actual.onNext(mapped);
}
/**
 * Applies the mapper to an upstream item and emits the mapped value,
 * releasing the incoming item on every path.
 * <p>
 * Items arriving after the sequence is done are released without mapping.
 * A mapper failure (exception or {@code null} result) cancels the upstream,
 * releases the item, marks the sequence done and reports the error downstream.
 *
 * @param t the upstream item
 */
@Override
public void onNext(T t) {
    if (done) {
        ResourceFlowable.releaseItem(t, release);
        return;
    }

    R result;
    try {
        result = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value");
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        upstream.cancel();
        ResourceFlowable.releaseItem(t, release);
        done = true;
        actual.onError(ex);
        return;
    }

    // Release the input first, then hand the mapped value downstream.
    ResourceFlowable.releaseItem(t, release);
    actual.onNext(result);
}
/**
 * Subscribes using four separate callbacks wrapped in a {@code LambdaObserver}.
 * Each callback is validated with {@code ObjectHelper.requireNonNull} first.
 *
 * @param onNext      callback for items
 * @param onError     callback for the error signal
 * @param onComplete  callback for the completion signal
 * @param onSubscribe callback receiving the {@code Disposable} on subscription
 * @return the created observer, which callers can use to dispose the subscription
 */
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver<T> lambdaObserver =
            new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(lambdaObserver);
    return lambdaObserver;
}
/**
 * Subscribes with a single two-argument callback wrapped in a
 * {@code BiConsumerSingleObserver}.
 *
 * @param onCallback the callback taking a value and a {@code Throwable}
 * @return the created observer, usable as a {@code Disposable}
 */
public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback) {
    ObjectHelper.requireNonNull(onCallback, "onCallback is null");

    BiConsumerSingleObserver<T> callbackObserver = new BiConsumerSingleObserver<T>(onCallback);
    subscribe(callbackObserver);
    return callbackObserver;
}
/**
 * Subscribes with separate success and error callbacks wrapped in a
 * {@code ConsumerSingleObserver}.
 *
 * @param onSuccess callback for the success value
 * @param onError   callback for the error signal
 * @return the created observer, usable as a {@code Disposable}
 */
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
    ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
    ObjectHelper.requireNonNull(onError, "onError is null");

    ConsumerSingleObserver<T> consumerObserver =
            new ConsumerSingleObserver<T>(onSuccess, onError);
    subscribe(consumerObserver);
    return consumerObserver;
}
/**
 * Subscribes using four separate callbacks wrapped in a {@code LambdaSubscriber}.
 * Each callback is validated with {@code ObjectHelper.requireNonNull} first.
 *
 * @param onNext      callback for items
 * @param onError     callback for the error signal
 * @param onComplete  callback for the completion signal
 * @param onSubscribe callback receiving the {@code Subscription} on subscription
 * @return the created subscriber, usable as a {@code Disposable}
 */
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Subscription> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaSubscriber<T> lambdaSubscriber =
            new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(lambdaSubscriber);
    return lambdaSubscriber;
}
/**
 * Subscribes with success, error and completion callbacks wrapped in a
 * {@code MaybeCallbackObserver}.
 *
 * @param onSuccess  callback for the success value
 * @param onError    callback for the error signal
 * @param onComplete callback for the completion signal
 * @return the value returned by {@code subscribeWith} for the created observer
 */
public final Disposable subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError,
        Action onComplete) {
    ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");

    MaybeCallbackObserver<T> callbackObserver =
            new MaybeCallbackObserver<T>(onSuccess, onError, onComplete);
    return subscribeWith(callbackObserver);
}
/**
 * Subscribes with only a completion callback, wrapped in a
 * {@code CallbackCompletableObserver}.
 *
 * @param onComplete callback for the completion signal
 * @return the created observer, usable as a {@code Disposable}
 */
public final Disposable subscribe(final Action onComplete) {
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");

    CallbackCompletableObserver completionObserver = new CallbackCompletableObserver(onComplete);
    subscribe(completionObserver);
    return completionObserver;
}
/**
 * Subscribes with completion and error callbacks wrapped in a
 * {@code CallbackCompletableObserver}.
 * <p>
 * Note that {@code onError} is validated before {@code onComplete}.
 *
 * @param onComplete callback for the completion signal
 * @param onError    callback for the error signal
 * @return the created observer, usable as a {@code Disposable}
 */
public final Disposable subscribe(final Action onComplete, final Consumer<? super Throwable> onError) {
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");

    CallbackCompletableObserver callbackObserver =
            new CallbackCompletableObserver(onError, onComplete);
    subscribe(callbackObserver);
    return callbackObserver;
}
/**
 * Creates an {@code Observable} that emits {@code item} and then completes.
 *
 * @param item the single value to emit
 * @param <T>  the value type
 * @return an {@code Observable} built with {@code Observable.create}
 */
public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "The item is null");
    return Observable.create(emitter -> {
        // Emit the one value, then terminate the sequence.
        emitter.onNext(item);
        emitter.onComplete();
    });
}
/**
 * Subscribes a {@code MyLambdaSubscriber} built from the given callbacks to
 * {@code flowable}. All arguments are validated with
 * {@code ObjectHelper.requireNonNull} before the subscriber is created.
 *
 * @param flowable    the source to subscribe to
 * @param onNext      callback for items
 * @param onError     callback for the error signal
 * @param onComplete  callback for the completion signal
 * @param onSubscribe callback receiving the {@code Subscription}
 * @param <T>         the item type
 * @return the created subscriber, usable as a {@code Disposable}
 */
private static <T> Disposable subscribe(Flowable<T> flowable,
        Consumer<? super T> onNext,
        Consumer<? super Throwable> onError,
        Action onComplete,
        Consumer<? super Subscription> onSubscribe) {
    ObjectHelper.requireNonNull(flowable, "flowable is null");
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    MyLambdaSubscriber<T> lambdaSubscriber =
            new MyLambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
    flowable.subscribe(lambdaSubscriber);
    return lambdaSubscriber;
}
/**
 * Wraps a {@code JSONArray} in an {@code Observable}.
 *
 * @param json the array to expose
 * @param <T>  the element type expected by the caller
 * @return {@code empty()} for a zero-length array, otherwise a
 *         {@code JSONArrayObservable} passed through {@code RxJavaPlugins.onAssembly}
 */
public static <T> Observable<T> fromJsonArray(JSONArray json) {
    ObjectHelper.requireNonNull(json, "items is null");
    if (json.length() != 0) {
        return RxJavaPlugins.onAssembly(new JSONArrayObservable<T>(json));
    }
    // Nothing to emit.
    return empty();
}
/**
 * Creates a {@code CustomSubscriber} for the given event class and receiver.
 *
 * @param eventClass the event type to receive; must not be an interface
 * @param receiver   the consumer that handles the events
 * @param <T>        the event type
 * @return a new {@code CustomSubscriber}
 * @throws IllegalArgumentException if {@code eventClass} is an interface
 */
public <T> CustomSubscriber<T> obtainSubscriber(@NonNull Class<T> eventClass,
        @NonNull Consumer<T> receiver) {
    ObjectHelper.requireNonNull(eventClass, "Event class must not be null.");
    // The interface check deliberately runs before the receiver null check.
    if (eventClass.isInterface()) {
        throw new IllegalArgumentException("Event class must be on a concrete class type.");
    }
    ObjectHelper.requireNonNull(receiver, "Receiver must not be null.");

    return new CustomSubscriber<>(eventClass, receiver);
}
/**
 * Removes everything registered under the class of {@code observer}: its
 * {@code CompositeDisposable} is disposed and dropped from {@code OBSERVERS},
 * and its subscriber set, if any, is cleared and dropped from {@code SUBSCRIBERS}.
 * <p>
 * A missing {@code CompositeDisposable} is rejected by
 * {@code ObjectHelper.requireNonNull}.
 *
 * @param observer the object whose class was used as the registration key
 */
public void unregister(@NonNull Object observer) {
    ObjectHelper.requireNonNull(observer, "Observer to unregister must not be null.");

    CompositeDisposable disposables = OBSERVERS.get(observer.getClass());
    ObjectHelper.requireNonNull(disposables, "Missing observer, it was registered?");
    disposables.dispose();
    OBSERVERS.remove(observer.getClass());

    Set<CustomSubscriber<?>> registered = SUBSCRIBERS.get(observer.getClass());
    if (registered == null) {
        return;
    }
    registered.clear();
    SUBSCRIBERS.remove(observer.getClass());
}
/**
 * Creates a {@code ResourceFlowable} over the given items.
 * <p>
 * A single-element array is delegated to {@code just(item, release)}; any
 * other length is wrapped in a {@code ResourceFlowableArray}.
 *
 * @param release the consumer associated with the items for releasing them
 * @param items   the items to emit; the array itself must not be {@code null}
 * @param <T>     the item type
 * @return the resulting {@code ResourceFlowable}
 */
@SafeVarargs
public static <T> ResourceFlowable<T> fromArray(Consumer<? super T> release, T... items) {
    ObjectHelper.requireNonNull(release, "release is null");
    // Validate the array explicitly so a null argument fails with a descriptive
    // message, like the other factories, instead of a bare failure at items.length.
    ObjectHelper.requireNonNull(items, "items is null");
    int n = items.length;
    if (n == 1) {
        return just(items[0], release);
    }
    return new ResourceFlowableArray<>(items, release);
}
/**
 * Publishes {@code event} by passing it to {@code bus.onNext}.
 *
 * @param event the event to publish
 */
public void post(@NonNull Object event) {
    ObjectHelper.requireNonNull(event, "Event must not be null.");

    bus.onNext(event);
}
/**
 * Sets the filter of this subscriber.
 *
 * @param filter the predicate to store
 * @return this instance, for call chaining
 */
@SuppressWarnings("WeakerAccess")
public CustomSubscriber<T> withFilter(@NonNull Predicate<T> filter) {
    // requireNonNull hands back the validated argument, so check and store in one step.
    this.filter = ObjectHelper.requireNonNull(filter, "Filter must not be null.");
    return this;
}
/**
 * Sets the scheduler of this subscriber.
 *
 * @param scheduler the scheduler to store
 * @return this instance, for call chaining
 */
@SuppressWarnings("WeakerAccess")
public CustomSubscriber<T> withScheduler(@NonNull Scheduler scheduler) {
    // requireNonNull hands back the validated argument, so check and store in one step.
    this.scheduler = ObjectHelper.requireNonNull(scheduler, "Scheduler must not be null.");
    return this;
}
/**
 * Creates a {@code ResourceFlowable} for a single item.
 *
 * @param item    the item to wrap
 * @param release the consumer associated with the item for releasing it
 * @param <T>     the item type
 * @return a new {@code ResourceFlowableJust}
 */
public static <T> ResourceFlowable<T> just(T item, Consumer<? super T> release) {
    ObjectHelper.requireNonNull(item, "item is null");
    ObjectHelper.requireNonNull(release, "release is null");

    ResourceFlowable<T> single = new ResourceFlowableJust<>(item, release);
    return single;
}
/**
 * Creates a {@code ResourceFlowable} backed by an {@code Iterable}.
 *
 * @param items   the source of items
 * @param release the consumer associated with the items for releasing them
 * @param <T>     the item type
 * @return a new {@code ResourceFlowableIterable}
 */
public static <T> ResourceFlowable<T> fromIterable(Iterable<? extends T> items, Consumer<? super T> release) {
    ObjectHelper.requireNonNull(items, "items is null");
    ObjectHelper.requireNonNull(release, "release is null");

    ResourceFlowable<T> iterableSource = new ResourceFlowableIterable<>(items, release);
    return iterableSource;
}
/**
 * Creates a {@code ResourceFlowable} backed by a {@code Publisher}.
 *
 * @param source  the publisher to wrap
 * @param release the consumer associated with the items for releasing them
 * @param <T>     the item type
 * @return a new {@code ResourceFlowablePublisher}
 */
public static <T> ResourceFlowable<T> fromPublisher(Publisher<? extends T> source, Consumer<? super T> release) {
    ObjectHelper.requireNonNull(source, "source is null");
    ObjectHelper.requireNonNull(release, "release is null");

    ResourceFlowable<T> wrapped = new ResourceFlowablePublisher<>(source, release);
    return wrapped;
}
/**
 * Creates a {@code ResourceFlowable} whose actual source is supplied by a
 * {@code Callable}.
 *
 * @param call    the callable that supplies a {@code ResourceFlowable}
 * @param release the consumer associated with the items for releasing them
 * @param <T>     the item type
 * @return a new {@code ResourceFlowableDefer}
 */
public static <T> ResourceFlowable<T> defer(Callable<? extends ResourceFlowable<T>> call, Consumer<? super T> release) {
    ObjectHelper.requireNonNull(call, "call is null");
    ObjectHelper.requireNonNull(release, "release is null");

    ResourceFlowable<T> deferred = new ResourceFlowableDefer<>(call, release);
    return deferred;
}
/**
 * Returns a {@code ResourceFlowableObserveOn} wrapping this source with the
 * given scheduler and a buffer sized by {@code Flowable.bufferSize()}.
 *
 * @param scheduler the scheduler handed to the new operator
 * @return the wrapping {@code ResourceFlowable}
 */
public final ResourceFlowable<T> observeOn(ResourceScheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    ResourceFlowable<T> rescheduled =
            new ResourceFlowableObserveOn<>(this, scheduler, Flowable.bufferSize());
    return rescheduled;
}
/**
 * Returns a {@code ResourceFlowableWithRelease} wrapping this source with the
 * given release consumer.
 *
 * @param release the consumer handed to the new operator
 * @return the wrapping {@code ResourceFlowable}
 */
public final ResourceFlowable<T> withRelease(Consumer<? super T> release) {
    ObjectHelper.requireNonNull(release, "release is null");

    ResourceFlowable<T> rewrapped = new ResourceFlowableWithRelease<>(this, release);
    return rewrapped;
}
/**
 * Converts this source into a plain {@code Flowable} by wrapping it in a
 * {@code ResourceFlowableToFlowable} together with the given function.
 *
 * @param extract the function handed to the new operator
 * @param <R>     the element type of the resulting {@code Flowable}
 * @return the resulting {@code Flowable}
 */
public final <R> Flowable<R> toFlowable(Function<? super T, ? extends R> extract) {
    ObjectHelper.requireNonNull(extract, "extract is null");

    Flowable<R> plain = new ResourceFlowableToFlowable<>(this, extract);
    return plain;
}
/**
 * Returns a {@code ResourceFlowableMap} that wraps this source with the given
 * mapper and the release consumer for mapped values.
 *
 * @param mapper  the function applied to each item
 * @param release the consumer associated with the mapped values for releasing them
 * @param <R>     the mapped value type
 * @return the wrapping {@code ResourceFlowable}
 */
public final <R> ResourceFlowable<R> map(Function<? super T, ? extends R> mapper, Consumer<? super R> release) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.requireNonNull(release, "release is null");

    ResourceFlowable<R> mappedSource = new ResourceFlowableMap<>(this, mapper, release);
    return mappedSource;
}