下面列出了 rx.functions.Action0 API 的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a {@link LegacyTcpPushServer} fed by a nested stream of grouped observables that is
 * expected never to complete.
 *
 * <p>If the source does complete, the string {@code "ILLEGAL_STATE_COMPLETED"} is emitted on the
 * server-signal subject and an {@link IllegalStateException} is thrown from the completion
 * callback. Source errors are forwarded to the same subject through {@code onError}.
 *
 * @param config server configuration; its name labels the trigger and the failure message
 * @param go nested stream of grouped observables handed to {@code ObservableTrigger.oogo}
 * @param groupExpirySeconds passed through to {@code ObservableTrigger.oogo}
 * @param keyEncoder passed through to {@code ObservableTrigger.oogo}
 * @param hashFunction passed through to {@code ObservableTrigger.oogo}
 * @return a server built from the trigger, the configuration and the signal subject
 */
public static <K, V> LegacyTcpPushServer<KeyValuePair<K, V>> infiniteStreamLegacyTcpNestedGroupedObservable(ServerConfig<KeyValuePair<K, V>> config,
        Observable<Observable<GroupedObservable<K, V>>> go,
        long groupExpirySeconds, final Func1<K, byte[]> keyEncoder,
        HashFunction hashFunction) {
    final String name = config.getName();
    final PublishSubject<String> signals = PublishSubject.create();

    // Completion is illegal for an infinite stream: signal it, then fail loudly.
    Action0 rejectCompletion = new Action0() {
        @Override
        public void call() {
            signals.onNext("ILLEGAL_STATE_COMPLETED");
            throw new IllegalStateException("OnComplete signal received, Server: " + name + " is pushing an infinite stream, should not complete");
        }
    };

    // Source failures are surfaced to whoever observes the server signals.
    Action1<Throwable> forwardError = new Action1<Throwable>() {
        @Override
        public void call(Throwable failure) {
            signals.onError(failure);
        }
    };

    PushTrigger<KeyValuePair<K, V>> pushTrigger = ObservableTrigger.oogo(
            name, go, rejectCompletion, forwardError, groupExpirySeconds, keyEncoder, hashFunction);
    return new LegacyTcpPushServer<KeyValuePair<K, V>>(pushTrigger, config, signals);
}
/**
 * Creates a {@link LegacyTcpPushServer} fed by a nested stream of {@code MantisGroup} items that
 * is expected never to complete.
 *
 * <p>If the source does complete, the string {@code "ILLEGAL_STATE_COMPLETED"} is emitted on the
 * server-signal subject and an {@link IllegalStateException} is thrown from the completion
 * callback. Source errors are forwarded to the same subject through {@code onError}.
 *
 * @param config server configuration; its name labels the trigger and the failure message
 * @param go nested stream of Mantis groups handed to {@code ObservableTrigger.oomgo}
 * @param groupExpirySeconds passed through to {@code ObservableTrigger.oomgo}
 * @param keyEncoder passed through to {@code ObservableTrigger.oomgo}
 * @param hashFunction passed through to {@code ObservableTrigger.oomgo}
 * @return a server built from the trigger, the configuration and the signal subject
 */
public static <K, V> LegacyTcpPushServer<KeyValuePair<K, V>> infiniteStreamLegacyTcpNestedMantisGroup(ServerConfig<KeyValuePair<K, V>> config,
        Observable<Observable<MantisGroup<K, V>>> go,
        long groupExpirySeconds, final Func1<K, byte[]> keyEncoder,
        HashFunction hashFunction) {
    final String name = config.getName();
    final PublishSubject<String> signals = PublishSubject.create();

    // Completion is illegal for an infinite stream: signal it, then fail loudly.
    Action0 rejectCompletion = new Action0() {
        @Override
        public void call() {
            signals.onNext("ILLEGAL_STATE_COMPLETED");
            throw new IllegalStateException("OnComplete signal received, Server: " + name + " is pushing an infinite stream, should not complete");
        }
    };

    // Source failures are surfaced to whoever observes the server signals.
    Action1<Throwable> forwardError = new Action1<Throwable>() {
        @Override
        public void call(Throwable failure) {
            signals.onError(failure);
        }
    };

    PushTrigger<KeyValuePair<K, V>> pushTrigger = ObservableTrigger.oomgo(
            name, go, rejectCompletion, forwardError, groupExpirySeconds, keyEncoder, hashFunction);
    return new LegacyTcpPushServer<KeyValuePair<K, V>>(pushTrigger, config, signals);
}
/**
 * Runs a count query, takes a single item from it, and then requires both latches exposed by
 * the connection provider to release within six seconds each.
 */
@Test
public void testTwoConnectionsOpenedAndClosedWhenTakeOneUsedWithSelectThatReturnsOneRow()
        throws InterruptedException {
    CountDownConnectionProvider provider = new CountDownConnectionProvider(1, 1);
    Database database = new Database(provider);

    // Only logs; present so the completion path is exercised before take(1) cuts the stream.
    Action0 logCompletion = new Action0() {
        @Override
        public void call() {
            System.out.println("completed");
        }
    };

    database.select("select count(*) from person")
            .getAs(Long.class)
            .doOnCompleted(logCompletion)
            .take(1)
            .toBlocking()
            .single();

    assertTrue(provider.getsLatch().await(6, TimeUnit.SECONDS));
    assertTrue(provider.closesLatch().await(6, TimeUnit.SECONDS));
}
/**
 * Replaces the worker held in {@code workerRef} with a fresh one and schedules a cache reset
 * on it after the given delay.
 *
 * <p>The swap is done with a compare-and-set loop. The previous worker, if present, is
 * unsubscribed once the swap succeeds. A {@code null} value in {@code workerRef} means the
 * sequence is finished and nothing is scheduled.
 *
 * @param duration delay before the reset runs
 * @param unit unit of {@code duration}
 * @param scheduler scheduler that supplies the new worker
 * @param cacheRef holder of the cached observable to reset
 * @param workerRef holder of the currently active worker
 */
private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
        final Scheduler scheduler, final AtomicReference<CachedObservable<T>> cacheRef,
        final AtomicReference<Optional<Worker>> workerRef) {
    Action0 action = new Action0() {
        @Override
        public void call() {
            cacheRef.get().reset();
        }
    };
    // CAS loop to cancel the current worker and create a new one
    while (true) {
        Optional<Worker> wOld = workerRef.get();
        if (wOld == null) {
            // we are finished
            return;
        }
        Worker fresh = scheduler.createWorker();
        if (workerRef.compareAndSet(wOld, Optional.of(fresh))) {
            if (wOld.isPresent()) {
                wOld.get().unsubscribe();
            }
            fresh.schedule(action, duration, unit);
            return;
        }
        // Lost the race: release the worker created for this attempt so it does not leak
        // before retrying with the latest reference value.
        fresh.unsubscribe();
    }
}
/**
 * Wraps {@code unsubscribe} in a {@link Subscription} that always runs it on the main looper.
 *
 * <p>When unsubscription happens on the main looper the action runs inline. Otherwise it is
 * scheduled on a worker from {@code AndroidSchedulers.mainThread()}, and that worker is
 * unsubscribed right after the action has run.
 *
 * @param unsubscribe the action to run on unsubscription
 * @return a subscription that triggers the action
 */
private static Subscription unsubscribeInUiThread(final Action0 unsubscribe) {
    Action0 runOnMain = new Action0() {
        @Override public void call() {
            if (Looper.getMainLooper() != Looper.myLooper()) {
                // Off the main looper: hop over and release the worker afterwards.
                final Scheduler.Worker mainWorker = AndroidSchedulers.mainThread().createWorker();
                mainWorker.schedule(new Action0() {
                    @Override public void call() {
                        unsubscribe.call();
                        mainWorker.unsubscribe();
                    }
                });
                return;
            }
            unsubscribe.call();
        }
    };
    return Subscriptions.create(runOnMain);
}
/**
 * Returns the observed result as a {@link Single} that falls back to a timeout handler when no
 * value arrives within the given duration.
 *
 * <p>On timeout the operation is reported to {@code MemcachedConnection.opTimedOut} and, when
 * non-null, marked as timed out. Depending on {@code throwException} the fallback either fails
 * with a {@code CheckedOperationTimeoutException} or succeeds with the current content of
 * {@code objRef}.
 *
 * @param duration how long to wait before switching to the fallback
 * @param units unit of {@code duration}
 * @param throwException whether a timeout is reported as an error
 * @param hasZF not read by the active code in this method
 * @param scheduler scheduler used for the timeout
 * @return the resulting single
 */
public Single<T> get(long duration, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) {
    final Single<T> source = observe();
    final Single<T> onTimeout = Single.create(subscriber -> {
        // whenever timeout occurs, continuous timeout counter will increase by 1.
        MemcachedConnection.opTimedOut(op);
        if (op != null) {
            op.timeOut();
        }
        if (throwException) {
            subscriber.onError(new CheckedOperationTimeoutException("Timed out waiting for operation", op));
            return;
        }
        if (isCancelled()) {
            // Intentionally empty: the counter that used to be bumped here is disabled.
        }
        subscriber.onSuccess(objRef.get());
    });
    // The after-terminate hook is a deliberate no-op.
    return source.timeout(duration, units, onTimeout, scheduler).doAfterTerminate(() -> { });
}
/**
 * Builds a {@link DynamicConnectionSet} whose connections are created by copying the supplied
 * builder and pointing the copy at each endpoint.
 *
 * @param config template configuration copied for every endpoint
 * @param maxTimeBeforeDisconnectSec passed to the connection set together with
 *     {@code MIN_TIME_SEC_DEFAULT}
 * @param inputQueue queue handed to {@code RemoteObservable.connectToMGO} for every connection
 * @return the connection set
 */
public static <K, V> DynamicConnectionSet<MantisGroup<K, V>> createMGO(
        final ConnectToGroupedObservable.Builder<K, V> config, int maxTimeBeforeDisconnectSec, final SpscArrayQueue<MantisGroup<?, ?>> inputQueue) {
    Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<MantisGroup<K, V>>> connectionFactory
            = new Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<MantisGroup<K, V>>>() {
        @Override
        public RemoteRxConnection<MantisGroup<K, V>> call(Endpoint target, Action0 onDisconnect,
                PublishSubject<Integer> closeTrigger) {
            // Start from the template, then override host, port, id and the per-connection hooks.
            ConnectToGroupedObservable.Builder<K, V> endpointConfig =
                    new ConnectToGroupedObservable.Builder<K, V>(config);
            endpointConfig
                    .host(target.getHost())
                    .port(target.getPort())
                    .closeTrigger(closeTrigger)
                    .connectionDisconnectCallback(onDisconnect)
                    .slotId(target.getSlotId());
            return RemoteObservable.connectToMGO(endpointConfig.build(), inputQueue);
        }
    };
    return new DynamicConnectionSet<MantisGroup<K, V>>(
            connectionFactory, MIN_TIME_SEC_DEFAULT, maxTimeBeforeDisconnectSec);
}
/**
 * Creates a scroll listener wired to the given adapter, request and callbacks.
 *
 * <p>Besides storing its arguments, the constructor creates a new {@code MultiLangPatch} and an
 * empty end-of-list callback list, and clears the callback and server-cache flags.
 *
 * @param baseAdapter adapter stored in {@code adapter}
 * @param v7request request stored in {@code v7request}
 * @param successRequestListener callback stored for successful responses
 * @param errorRequestListener callback stored for failed requests
 * @param visibleThreshold value stored in {@code visibleThreshold}
 * @param bypassCache value stored in {@code bypassCache}
 * @param onFirstLoadListener callback stored for the first load
 * @param onEndOfListReachedListener callback stored for the end of the list
 */
public <T extends BaseV7EndlessResponse> EndlessRecyclerOnScrollListener(BaseAdapter baseAdapter,
        V7<T, ? extends Endless> v7request, Action1<T> successRequestListener,
        ErrorRequestListener errorRequestListener, int visibleThreshold, boolean bypassCache,
        BooleanAction<T> onFirstLoadListener, Action0 onEndOfListReachedListener) {
    // Collaborators created here.
    this.multiLangPatch = new MultiLangPatch();
    this.onEndlessFinishList = new LinkedList<>();

    // Request wiring.
    this.adapter = baseAdapter;
    this.v7request = v7request;
    this.visibleThreshold = visibleThreshold;
    this.bypassCache = bypassCache;

    // Callbacks.
    this.successRequestListener = successRequestListener;
    this.errorRequestListener = errorRequestListener;
    this.onFirstLoadListener = onFirstLoadListener;
    this.onEndOfListReachedListener = onEndOfListReachedListener;

    // Initial state flags.
    this.firstCallbackCalled = false;
    this.endCallbackCalled = false;
    this.bypassServerCache = false;
}
/**
 * Observes the value stored under {@code key}.
 *
 * <p>On subscription a data-change listener for the key is registered and the value currently
 * read from the chest is emitted. Each later change is emitted while the subscriber is still
 * subscribed. Unsubscribing removes the listener. The stream is composed with
 * {@code applySchedulers()}.
 *
 * @param key key to read and watch
 * @return an observable of the values for {@code key}
 */
public <T> Observable<T> get(final String key) {
    Observable.OnSubscribe<T> source = new Observable.OnSubscribe<T>() {
        @Override
        public void call(final Subscriber<? super T> subscriber) {
            final DataChangeCallback<T> listener = new DataChangeCallback<T>(key) {
                @Override
                public void onDataChange(T value) {
                    if (subscriber.isUnsubscribed()) {
                        return;
                    }
                    subscriber.onNext(value);
                }
            };
            Chest.this.addOnDataChangeListener(listener);

            // Detach the listener as soon as the subscriber goes away.
            Action0 detach = new Action0() {
                @Override
                public void call() {
                    Chest.this.removeListener(listener);
                }
            };
            subscriber.add(Subscriptions.create(detach));

            // Seed the stream with the value that is stored right now.
            subscriber.onNext(Chest.this.<T>read(key));
        }
    };
    return Observable.create(source).compose(this.<T>applySchedulers());
}
/**
 * Requests prayer times for {@code code} and decorates the stream with bookkeeping.
 *
 * <p>Every emitted context is remembered in {@code mLastPrayerContext} and broadcast. The
 * loading flag is set on subscription and cleared when the stream terminates.
 *
 * @param code argument forwarded to the prayer downloader
 * @return the decorated observable
 */
private Observable<PrayerContext> updatePrayerContext(String code) {
    Timber.i("Updating preferred prayer context");
    Observable<PrayerContext> prayerTimes = mPrayerDownloader.getPrayerTimes(code);

    Action1<PrayerContext> rememberAndBroadcast = new Action1<PrayerContext>() {
        @Override
        public void call(PrayerContext latest) {
            mLastPrayerContext = latest;
            broadcastPrayerContext(latest);
        }
    };
    Action0 markLoading = new Action0() {
        @Override
        public void call() {
            mIsLoading.set(true);
        }
    };
    Action0 markIdle = new Action0() {
        @Override
        public void call() {
            mIsLoading.set(false);
        }
    };

    return prayerTimes
            .doOnNext(rememberAndBroadcast)
            .doOnSubscribe(markLoading)
            .doOnTerminate(markIdle);
}
/**
 * Sets server-sent-event headers on {@code response} and returns an observable that, once
 * subscribed, writes a {@code data:line N} event to the response every 100 milliseconds.
 *
 * <p>Each subscription gets its own worker, and that worker is registered with the subscriber
 * so the periodic writes stop when the subscriber unsubscribes. The line counter is shared by
 * all subscriptions of the returned observable.
 *
 * @param response the HTTP response to write events to
 * @return an observable that never emits or terminates by itself
 */
public Observable<Void> sendInfiniteStream(final HttpServerResponse<ByteBuf> response) {
    response.getHeaders().add(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream");
    response.getHeaders().add(HttpHeaders.Names.TRANSFER_ENCODING, "chunked");
    return Observable.create(new OnSubscribe<Void>() {
        final AtomicLong counter = new AtomicLong();

        @Override
        public void call(Subscriber<? super Void> subscriber) {
            // One worker per subscription; tying it to the subscriber cancels the periodic
            // task on unsubscribe instead of writing to the response forever.
            final Worker worker = Schedulers.computation().createWorker();
            subscriber.add(worker);
            worker.schedulePeriodically(
                    new Action0() {
                        @Override
                        public void call() {
                            System.out.println("In infinte stream");
                            byte[] contentBytes = ("data:" + "line " + counter.getAndIncrement() + "\n\n").getBytes();
                            response.writeBytes(contentBytes);
                            response.flush();
                        }
                    },
                    0,
                    100,
                    TimeUnit.MILLISECONDS
            );
        }
    });
}
/**
 * Wraps {@code action} so every run is recorded in the {@code ActionMetrics} registered under
 * {@code actionName}.
 *
 * <p>The metrics object is created on first use with the root name
 * {@code metricNameRoot + ".eventLoop." + actionName}. An {@link Exception} thrown by the action
 * is recorded as a failure and not rethrown. In all cases the run is finished in the metrics and
 * {@code actionsRemaining} is decremented.
 *
 * @param actionName name used to look up or create the metrics
 * @param action the action to run
 * @return the instrumented action
 */
private Action0 instrumentedAction(String actionName, Action0 action) {
    return () -> {
        ActionMetrics metrics = this.actionMetrics.computeIfAbsent(
                actionName,
                name -> SpectatorExt.actionMetrics(
                        metricNameRoot + ".eventLoop." + name, Collections.emptyList(), registry));
        long startedAt = metrics.start();
        try {
            action.call();
            metrics.success();
        } catch (Exception failure) {
            // Recorded only; the wrapped action's failure is not propagated.
            metrics.failure(failure);
        } finally {
            metrics.finish(startedAt);
            actionsRemaining.decrementAndGet();
        }
    };
}
/**
 * Applies the pending update request and then refreshes this secret.
 *
 * <p>When a set request is pending, the resource is created first; otherwise the chain starts
 * from this instance. On completion the pending set request is cleared and a new update request
 * builder is prepared for this vault URI and name.
 *
 * @return an observable of the refreshed secret
 */
@Override
public Observable<Secret> updateResourceAsync() {
    Observable<Secret> start = Observable.just((Secret) this);
    if (setSecretRequest != null) {
        start = createResourceAsync();
    }

    Func1<Secret, Observable<SecretBundle>> sendUpdate = new Func1<Secret, Observable<SecretBundle>>() {
        @Override
        public Observable<SecretBundle> call(Secret ignored) {
            return Observable.from(vault.client().updateSecretAsync(updateSecretRequest.build(), null));
        }
    };
    Func1<SecretBundle, Observable<Secret>> reload = new Func1<SecretBundle, Observable<Secret>>() {
        @Override
        public Observable<Secret> call(SecretBundle ignored) {
            return refreshAsync();
        }
    };
    Action0 resetRequests = new Action0() {
        @Override
        public void call() {
            setSecretRequest = null;
            updateSecretRequest = new UpdateSecretRequest.Builder(vault.vaultUri(), name());
        }
    };

    return start.flatMap(sendUpdate).flatMap(reload).doOnCompleted(resetRequests);
}
/**
 * Subscribes to {@code exampleObservable} and keeps the subscription in {@code resumeSub}.
 *
 * <p>Each emitted integer is shown in {@code mOutputTextView3}; errors go to
 * {@code errorHandler}; unsubscription is logged at debug level.
 */
@Override
protected void onResume() {
    super.onResume();

    Action0 logUnsubscribe = new Action0() {
        @Override
        public void call() {
            Log.d(TAG, "Called unsubscribe OnPause()");
        }
    };
    Action1<Integer> showValue = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            mOutputTextView3.setText(Integer.toString(value) + " OnResume()");
        }
    };

    resumeSub = exampleObservable
            .doOnUnsubscribe(logUnsubscribe)
            .subscribe(showValue, errorHandler);
}
/**
 * Creates a {@link Transformer} that runs the given {@link Action0} once for every effect it
 * receives from the upstream effects observable.
 *
 * <p>The action is adapted to a consumer that ignores the effect value and is handed, together
 * with the scheduler, to {@code fromConsumer}. An {@link Exception} thrown by the action is
 * rethrown wrapped through {@code OnErrorThrowable.from}.
 *
 * @param doEffect {@link Action0} to be run every time the effect is requested
 * @param scheduler the {@link Scheduler} forwarded to {@code fromConsumer}; may be null
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type; these transformers do not produce events, but still need to share
 *     the same Event type
 * @return a {@link Transformer} that can be used with a {@link SubtypeEffectHandlerBuilder}
 */
static <F, E> Transformer<F, E> fromAction(
        final Action0 doEffect, @Nullable final Scheduler scheduler) {
    final Action1<F> runIgnoringEffect = new Action1<F>() {
        @Override
        public void call(F ignoredEffect) {
            try {
                doEffect.call();
            } catch (Exception failure) {
                throw OnErrorThrowable.from(failure);
            }
        }
    };
    return fromConsumer(runIgnoringEffect, scheduler);
}
/**
 * Loads a single user by name and drives the view through loading, content and error states.
 *
 * <p>The request is subscribed on the I/O scheduler and observed on the Android main thread.
 * The resulting subscription is added to {@code mCompositeSubscription}.
 *
 * @param name argument forwarded to the repository API
 */
public void getSingleUser(String name) {
    Action0 showLoading = new Action0() {
        @Override
        public void call() {
            getMvpView().showLoading();
        }
    };
    Action0 hideLoading = new Action0() {
        @Override
        public void call() {
            getMvpView().dismissLoading();
        }
    };
    ResponseObserver<User> renderResult = new ResponseObserver<User>() {
        @Override
        public void onError(Throwable failure) {
            getMvpView().showError(failure);
        }
        @Override
        public void onSuccess(User loaded) {
            getMvpView().showContent(loaded);
        }
    };

    mCompositeSubscription.add(
            mRepoApi.getSingleUser(name)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnSubscribe(showLoading)
                    .doOnTerminate(hideLoading)
                    .subscribe(renderResult));
}
/**
 * Creates a {@link PushServerSse} fed by an observable that is expected never to complete.
 *
 * <p>If the source does complete, the string {@code "ILLEGAL_STATE_COMPLETED"} is emitted on the
 * server-signal subject and an {@link IllegalStateException} is thrown from the completion
 * callback. Source errors are forwarded to the same subject through {@code onError}.
 *
 * @param config server configuration; its name labels the trigger and the failure message
 * @param o source observable handed to {@code ObservableTrigger.o}
 * @param requestPreprocessor passed through to the server
 * @param requestPostprocessor passed through to the server
 * @param subscribeProcessor passed through to the server
 * @param state passed through to the server
 * @param supportLegacyMetrics passed through to the server
 * @return the configured server
 */
public static <T, S> PushServerSse<T, S> infiniteStreamSse(ServerConfig<T> config, Observable<T> o,
        Func2<Map<String, List<String>>, S, Void> requestPreprocessor,
        Func2<Map<String, List<String>>, S, Void> requestPostprocessor,
        final Func2<Map<String, List<String>>, S, Void> subscribeProcessor,
        S state, boolean supportLegacyMetrics) {
    final String name = config.getName();
    final PublishSubject<String> signals = PublishSubject.create();

    // Completion is illegal for an infinite stream: signal it, then fail loudly.
    Action0 rejectCompletion = new Action0() {
        @Override
        public void call() {
            signals.onNext("ILLEGAL_STATE_COMPLETED");
            throw new IllegalStateException("OnComplete signal received, Server: " + name + " is pushing an infinite stream, should not complete");
        }
    };

    // Source failures are surfaced to whoever observes the server signals.
    Action1<Throwable> forwardError = new Action1<Throwable>() {
        @Override
        public void call(Throwable failure) {
            signals.onError(failure);
        }
    };

    PushTrigger<T> pushTrigger = ObservableTrigger.o(name, o, rejectCompletion, forwardError);
    return new PushServerSse<T, S>(pushTrigger, config, signals,
            requestPreprocessor, requestPostprocessor,
            subscribeProcessor, state, supportLegacyMetrics);
}
/**
 * Loads one page of topics and appends it to the list.
 *
 * <p>The refresh indicator is shown before the request starts and hidden again when the request
 * ends, whether it completes or fails. The request is subscribed on the I/O scheduler and its
 * callbacks run on the Android main thread.
 */
private void getData() {
    recyclerView.getSwipeToRefresh().setRefreshing(true);
    topicChoice.getTopicChoice(cate,start,limit)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<TopicModel>() {
                @Override
                public void call(TopicModel topicModel) {
                    // NOTE(review): yields one extra page when the total is an exact multiple
                    // of limit; confirm whether callers rely on that.
                    pagecount=Integer.valueOf(topicModel.getTotalCount())/limit+1;
                    datums.addAll(topicModel.getData());
                    adapter.notifyDataSetChanged();
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    // The completion callback never runs after an error, so the indicator has
                    // to be hidden here as well or it would spin forever.
                    recyclerView.getSwipeToRefresh().setRefreshing(false);
                }
            }, new Action0() {
                @Override
                public void call() {
                    recyclerView.getSwipeToRefresh().setRefreshing(false);
                }
            });
}
/**
 * Checks that a subject which has been subscribed to does not report an error after the
 * configured period has elapsed on the test scheduler.
 *
 * <p>The subscriber must be terminated within the wait period, and that termination must not
 * carry an error.
 */
@Test
public void testNoTimeoutPostSubscription() throws Exception {
    TestScheduler testScheduler = Schedulers.test();
    UnicastAutoReleaseSubject<String> subject = UnicastAutoReleaseSubject.create(1, TimeUnit.DAYS, testScheduler);
    subject.onNext("Start the timeout now."); // Since the timeout is scheduled only after content arrival.
    final AtomicReference<Throwable> errorOnSubscribe = new AtomicReference<Throwable>();
    final CountDownLatch latch = new CountDownLatch(1);
    subject.subscribe(Actions.empty(), new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            errorOnSubscribe.set(throwable);
            latch.countDown();
        }
    }, new Action0() {
        @Override
        public void call() {
            latch.countDown();
        }
    });
    testScheduler.advanceTimeBy(1, TimeUnit.DAYS);
    subject.onCompleted();
    // Fail explicitly when no terminal event arrives, rather than passing on a timed-out wait.
    Assert.assertTrue("Subscriber was not terminated within the wait period.",
            latch.await(1, TimeUnit.MINUTES));
    Assert.assertNull("Subscription got an error.", errorOnSubscribe.get());
}
/**
 * Posts every batch of samples as JSON to the REST endpoint and reports whether all responses
 * were successful.
 *
 * <p>A default asynchronous HTTP client is created and started when this method is called. It
 * is closed when the resulting stream terminates, on completion or on error.
 *
 * @param samples batches of samples to post
 * @param metrics registry supplying the "posts", "responses" and "samples-completed" meters
 * @return an observable emitting the result of the {@code all(successful())} check
 */
private Observable<Boolean> restPoster(Observable<List<Sample>> samples, MetricRegistry metrics) {
    final CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
    httpClient.start();
    return samples
            // turn each batch into json
            .map(toJSON())
            // meter them as they go into the post code
            .map(meter(metrics.meter("posts"), String.class))
            // post the json to the REST server
            .mergeMap(postJSON(m_restUrl, httpClient))
            // meter the responses
            .map(meter(metrics.meter("responses"), ObservableHttpResponse.class))
            // count sample completions
            .map(meter(metrics.meter("samples-completed"), m_samplesPerBatch, ObservableHttpResponse.class))
            // make sure every request has a successful return code
            .all(successful())
            // release the client on completion AND on error, so a failed run does not leak it
            .doOnTerminate(new Action0() {
                @Override
                public void call() {
                    try {
                        httpClient.close();
                    } catch (IOException e) {
                        System.err.println("Failed to close httpClient!");
                        e.printStackTrace();
                    }
                }
            });
}
/**
 * Asks the light service to turn {@code light} off and updates the holder with the light's
 * status once the call completes.
 *
 * <p>Nothing is sent when the light has no status; a warning is logged instead.
 *
 * @param holder view holder passed to {@code updateLightStatus}
 * @param light the light to turn off
 */
private void turnLightOff(LightViewHolder holder, LightModel light) {
    log.debug("turn off light {}", light.getUuid());
    if (light.getStatus() != null) {
        lightService.turnOff(light)
                // The status is read when the call completes, not when it is issued.
                .doOnCompleted(() -> updateLightStatus(holder, light.getStatus()))
                .subscribe();
        return;
    }
    log.warn("light status is null: unable to turn off");
}
/**
 * Starts geo-fence tracking for the subscriber.
 *
 * <p>Every location from {@code locationSource} is written to GeoFire under {@code userKey}.
 * Every list of places from {@code geoFenceSource} replaces the active listeners with one
 * query per place. On unsubscription both source subscriptions are released, listening stops
 * and the stored location is removed.
 *
 * @param subscriber receiver handed to {@code listen} for each geo query
 */
@Override public void call(final Subscriber<? super GeoFenceEvent> subscriber) {
    Preconditions.checkNotNull(locationSource, "Location source must not be null");
    Preconditions.checkNotNull(geoFenceSource, "Geo fence source must not be null");
    // subscribe() is used instead of forEach() so the subscriptions can be attached to the
    // subscriber; otherwise the sources keep pushing updates after the location is removed.
    subscriber.add(locationSource.subscribe(new Action1<LatLng>() {
        @Override public void call(LatLng latLng) {
            geoFire.setLocation(userKey, latLng.toGeoLocation());
        }
    }));
    subscriber.add(geoFenceSource.subscribe(new Action1<List<Place>>() {
        @Override public void call(List<Place> places) {
            // Drop the listeners of the previous place list before registering the new ones.
            stopListen();
            for (Place place : places) {
                listen(
                    new GeoQueryWrapper<>(geoFire.queryAtLocation(place.toGeoLocation(), place.getRad()),
                        place), subscriber);
            }
        }
    }));
    subscriber.add(Subscriptions.create(new Action0() {
        @Override public void call() {
            stopListen();
            geoFire.removeLocation(userKey);
        }
    }));
}
/**
 * Publishes the inner observable {@code o} on {@code subject}, decorated with termination
 * handling.
 *
 * <p>On error: the optional error callback runs, the failure is logged, the key is removed from
 * {@code takeUntilSubjects}, and the error is propagated to the outer subject. On completion:
 * the optional success callback runs, the key is removed, and the outer subject is completed
 * once {@code counts.incrementTerminalCountAndCheck()} returns true.
 *
 * @param key identifier of the inner observable
 * @param o the inner observable to publish
 * @param errorCallback invoked first on error; may be null
 * @param successCallback invoked first on completion; may be null
 */
private synchronized void publishWithCallbacks(final String key, Observable<T> o,
        final Action1<Throwable> errorCallback, final Action0 successCallback) {
    final Action1<Throwable> onInnerError = new Action1<Throwable>() {
        @Override
        public void call(Throwable failure) {
            if (errorCallback != null) {
                errorCallback.call(failure);
            }
            logger.error("Inner observable with key: " + key + " terminated with onError, calling onError() on outer observable." + failure.getMessage(), failure);
            takeUntilSubjects.remove(key);
            subject.onError(failure);
        }
    };
    final Action0 onInnerCompleted = new Action0() {
        @Override
        public void call() {
            if (successCallback != null) {
                successCallback.call();
            }
            logger.debug("Inner observable with key: " + key + " completed, incrementing terminal count.");
            takeUntilSubjects.remove(key);
            if (!counts.incrementTerminalCountAndCheck()) {
                return;
            }
            logger.debug("All inner observables terminated, calling onCompleted() on outer observable.");
            subject.onCompleted();
        }
    };
    subject.onNext(o.doOnError(onInnerError).doOnCompleted(onInnerCompleted));
}
/**
 * Tells the listener, via {@code dispatchOnMainThread}, that a search for {@code query} has
 * started. Does nothing when no listener is set.
 *
 * @param query the query passed to {@code onSearchStarted}
 */
private void notifyStarted(final String query) {
    if (mListener != null) {
        Action0 announce = new Action0() {
            @Override
            public void call() {
                mListener.onSearchStarted(query);
            }
        };
        dispatchOnMainThread(announce);
    }
}
/**
 * Schedules {@code action} to run after the given delay.
 *
 * <p>The action is first passed through {@code schedulersHook.onSchedule}, then wrapped in a
 * {@code TimedAction}, registered in {@code actions}, and scheduled with the delay converted to
 * milliseconds.
 *
 * @param action the work to run
 * @param delayTime delay before the action runs
 * @param unit unit of {@code delayTime}
 * @return the timed action, which doubles as the subscription
 */
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    final Action0 hooked = schedulersHook.onSchedule(action);
    final long millis = unit.toMillis(delayTime);

    final TimedAction pending = new TimedAction(hooked, 0);
    actions.put(pending, DUMB);
    pending.schedule(millis);
    return pending;
}
/**
 * Publishes a topic through the model and registers the subscription with {@code mRxManager}.
 *
 * <p>The view is notified of the request start when the stream is subscribed; the subscription
 * itself is made on the Android main thread scheduler. Results go to
 * {@code mTopicDetailObserver}.
 *
 * @param title forwarded to the model
 * @param body forwarded to the model
 * @param categoryId forwarded to the model
 */
@Override
public void publishTopic(String title, String body, String categoryId) {
    Action0 announceStart = new Action0() {
        @Override
        public void call() {
            mView.onRequestStart();
        }
    };
    mRxManager.add(
            mModel.publishTopic(title, body, categoryId)
                    .doOnSubscribe(announceStart)
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .subscribe(mTopicDetailObserver));
}
/**
 * Wraps the scheduled action in a {@code TraceAction} unless it is already one.
 *
 * <p>The action is first passed to the delegate hook, when one is set. If the action, or the
 * delegate's result, is already a {@code TraceAction}, it is returned without further wrapping.
 *
 * @param action the action about to be scheduled
 * @return the action to schedule
 */
@Override
public Action0 onSchedule(Action0 action) {
    if (action instanceof TraceAction) {
        return action;
    }
    Action0 wrappedAction = this.delegate != null ? this.delegate.onSchedule(action)
            : action;
    if (wrappedAction instanceof TraceAction) {
        // Return the delegate's result: handing back the raw action here would silently
        // discard whatever the delegate hook applied.
        return wrappedAction;
    }
    return super.onSchedule(
            new TraceAction(this.tracer, wrappedAction, this.threadsToSample));
}
/**
 * Routes every event of the merged observable into the slotting strategy and ties the
 * subscription's lifetime to the presence of connections.
 *
 * <p>Values are written with {@code writeOnSlot(null, value)}; errors and completion are
 * broadcast to all connections. The merged stream is subscribed from the callback registered
 * with {@code registerDoAfterFirstConnectionAdded} and unsubscribed from the callback registered
 * with {@code registerDoAfterLastConnectionRemoved}.
 *
 * @param o the nested observable to merge
 */
private void applySlottingSideEffectToObservable(Observable<Observable<T>> o) {
    final Observer<T> slotWriter = new Observer<T>() {
        @Override
        public void onCompleted() {
            slottingStrategy.completeAllConnections();
        }
        @Override
        public void onError(Throwable failure) {
            slottingStrategy.errorAllConnections(failure);
        }
        @Override
        public void onNext(T item) {
            slottingStrategy.writeOnSlot(null, item);
        }
    };
    final Observable<T> slotted = Observable.merge(o).doOnEach(slotWriter);

    // Holds the live subscription between the two lifecycle callbacks below.
    final MutableReference<Subscription> active = new MutableReference<>();

    Action0 start = new Action0() {
        @Override
        public void call() {
            active.setValue(slotted.subscribe());
        }
    };
    Action0 stop = new Action0() {
        @Override
        public void call() {
            active.getValue().unsubscribe();
        }
    };
    slottingStrategy.registerDoAfterFirstConnectionAdded(start);
    slottingStrategy.registerDoAfterLastConnectionRemoved(stop);
}
/**
 * Writes {@code events} to the connection, checking at intervals that the connection is still
 * usable.
 *
 * <p>Each call increments {@code checkIsOpenCounter}. When the previous counter value is a
 * multiple of {@code CHECK_IS_OPEN_INTERVAL} and the connection has a close issued or an
 * inactive channel, the write is skipped: the referenced subscription is unsubscribed and, when
 * a slotting strategy is given, the endpoint is removed from it. In every other case the write
 * is delegated to {@code checkWriteableAndWrite}.
 *
 * @param connection the client connection
 * @param events events to write
 * @param subReference holder of the subscription to cancel on a dead connection
 * @param onSuccessfulWriteCallback forwarded to {@code checkWriteableAndWrite}
 * @param onFailedWriteCallback forwarded to {@code checkWriteableAndWrite}
 * @param slottingStrategyReference strategy to release the endpoint from; may be null
 * @param endpoint the endpoint to release
 * @return the result of {@code checkWriteableAndWrite}, or false when the connection was found
 *     closed or inactive
 */
<T> boolean safeWrite(final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection,
        List<RemoteRxEvent> events,
        final MutableReference<Subscription> subReference,
        Action0 onSuccessfulWriteCallback,
        final Action1<Throwable> onFailedWriteCallback,
        final SlottingStrategy<T> slottingStrategyReference,
        final WritableEndpoint<T> endpoint) {
    final boolean livenessCheckDue =
            checkIsOpenCounter.getAndIncrement() % CHECK_IS_OPEN_INTERVAL == 0;

    if (livenessCheckDue
            && (connection.isCloseIssued() || !connection.getChannel().isActive())) {
        logger.warn("Detected closed or inactive client connection, force unsubscribe.");
        subReference.getValue().unsubscribe();
        // release slot
        if (slottingStrategyReference != null) {
            logger.info("Removing slot for endpoint: " + endpoint);
            boolean removed = slottingStrategyReference.removeConnection(endpoint);
            if (!removed) {
                logger.error("Failed to remove endpoint from slot, endpoint: " + endpoint);
            }
        }
        return false;
    }

    return checkWriteableAndWrite(connection, events,
            onSuccessfulWriteCallback, onFailedWriteCallback);
}
/**
 * Registers a subscription on {@code subscriber} that calls {@code closeQuietly(state)} when
 * the subscriber unsubscribes.
 *
 * @param subscriber the subscriber to attach the cleanup to
 * @param state the state passed to {@code closeQuietly}
 */
private static <T> void setupUnsubscription(Subscriber<T> subscriber, final State state) {
    Action0 releaseState = new Action0() {
        @Override
        public void call() {
            closeQuietly(state);
        }
    };
    subscriber.add(Subscriptions.create(releaseState));
}