类rx.functions.Action0源码实例Demo

下面列出了 rx.functions.Action0 的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: mantis   文件: PushServers.java
/**
 * Creates a {@link LegacyTcpPushServer} fed by a nested observable of grouped observables.
 *
 * <p>The stream is treated as infinite: the completion callback handed to the trigger publishes
 * {@code "ILLEGAL_STATE_COMPLETED"} on the server-signal subject and then throws an
 * {@link IllegalStateException}. The error callback forwards the throwable to the same subject.
 *
 * @param config             server configuration; its name is used in the trigger and error message
 * @param go                 nested source of grouped observables
 * @param groupExpirySeconds passed through to {@code ObservableTrigger.oogo}
 * @param keyEncoder         passed through to {@code ObservableTrigger.oogo}
 * @param hashFunction       passed through to {@code ObservableTrigger.oogo}
 * @return the configured push server
 */
public static <K, V> LegacyTcpPushServer<KeyValuePair<K, V>> infiniteStreamLegacyTcpNestedGroupedObservable(ServerConfig<KeyValuePair<K, V>> config,
                                                                                                            Observable<Observable<GroupedObservable<K, V>>> go,
                                                                                                            long groupExpirySeconds, final Func1<K, byte[]> keyEncoder,
                                                                                                            HashFunction hashFunction) {
    final PublishSubject<String> signals = PublishSubject.create();
    final String name = config.getName();

    // Completion is illegal for an infinite stream: signal it, then fail loudly.
    final Action0 rejectCompletion = () -> {
        signals.onNext("ILLEGAL_STATE_COMPLETED");
        throw new IllegalStateException("OnComplete signal received, Server: " + name + " is pushing an infinite stream, should not complete");
    };

    // Errors are simply propagated on the signal subject.
    final Action1<Throwable> forwardError = t -> signals.onError(t);

    final PushTrigger<KeyValuePair<K, V>> pushTrigger = ObservableTrigger.oogo(
            name, go, rejectCompletion, forwardError, groupExpirySeconds, keyEncoder, hashFunction);
    return new LegacyTcpPushServer<KeyValuePair<K, V>>(pushTrigger, config, signals);
}
 
源代码2 项目: mantis   文件: PushServers.java
/**
 * Creates a {@link LegacyTcpPushServer} fed by a nested observable of {@link MantisGroup}s.
 *
 * <p>The stream is treated as infinite: the completion callback handed to the trigger publishes
 * {@code "ILLEGAL_STATE_COMPLETED"} on the server-signal subject and then throws an
 * {@link IllegalStateException}. The error callback forwards the throwable to the same subject.
 *
 * @param config             server configuration; its name is used in the trigger and error message
 * @param go                 nested source of Mantis groups
 * @param groupExpirySeconds passed through to {@code ObservableTrigger.oomgo}
 * @param keyEncoder         passed through to {@code ObservableTrigger.oomgo}
 * @param hashFunction       passed through to {@code ObservableTrigger.oomgo}
 * @return the configured push server
 */
public static <K, V> LegacyTcpPushServer<KeyValuePair<K, V>> infiniteStreamLegacyTcpNestedMantisGroup(ServerConfig<KeyValuePair<K, V>> config,
                                                                                                      Observable<Observable<MantisGroup<K, V>>> go,
                                                                                                      long groupExpirySeconds, final Func1<K, byte[]> keyEncoder,
                                                                                                      HashFunction hashFunction) {
    final PublishSubject<String> signals = PublishSubject.create();
    final String name = config.getName();

    // Completion is illegal for an infinite stream: signal it, then fail loudly.
    final Action0 rejectCompletion = () -> {
        signals.onNext("ILLEGAL_STATE_COMPLETED");
        throw new IllegalStateException("OnComplete signal received, Server: " + name + " is pushing an infinite stream, should not complete");
    };

    // Errors are simply propagated on the signal subject.
    final Action1<Throwable> forwardError = t -> signals.onError(t);

    final PushTrigger<KeyValuePair<K, V>> pushTrigger = ObservableTrigger.oomgo(
            name, go, rejectCompletion, forwardError, groupExpirySeconds, keyEncoder, hashFunction);
    return new LegacyTcpPushServer<KeyValuePair<K, V>>(pushTrigger, config, signals);
}
 
源代码3 项目: rxjava-jdbc   文件: DatabaseTestBase.java
/**
 * Runs a single-row select with {@code take(1)} and then waits (up to 6 seconds each) on the
 * connection provider's "gets" and "closes" latches.
 */
@Test
public void testTwoConnectionsOpenedAndClosedWhenTakeOneUsedWithSelectThatReturnsOneRow()
        throws InterruptedException {
    // Side effect only: prints a marker when the upstream completes.
    final Action0 logCompletion = new Action0() {
        @Override
        public void call() {
            System.out.println("completed");
        }
    };

    final CountDownConnectionProvider provider = new CountDownConnectionProvider(1, 1);
    final Database database = new Database(provider);

    database.select("select count(*) from person")
            .getAs(Long.class)
            .doOnCompleted(logCompletion)
            .take(1)
            .toBlocking()
            .single();

    final boolean connectionsObtained = provider.getsLatch().await(6, TimeUnit.SECONDS);
    assertTrue(connectionsObtained);
    final boolean connectionsClosed = provider.closesLatch().await(6, TimeUnit.SECONDS);
    assertTrue(connectionsClosed);
}
 
源代码4 项目: rxjava-extras   文件: Obs.java
/**
 * Replaces the worker held in {@code workerRef} with a fresh one and schedules a cache reset on
 * it after the given delay.
 *
 * <p>A {@code null} value in {@code workerRef} is treated as "finished" and nothing is scheduled.
 * The previous worker, if present, is unsubscribed once the swap succeeds. A worker created for
 * a compare-and-set attempt that loses the race is unsubscribed immediately so it is not leaked.
 *
 * @param duration  delay before the reset runs
 * @param unit      unit of {@code duration}
 * @param scheduler scheduler used to create the new worker
 * @param cacheRef  holder of the cached observable to reset
 * @param workerRef holder of the currently scheduled worker
 */
private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
        final Scheduler scheduler, final AtomicReference<CachedObservable<T>> cacheRef,
        final AtomicReference<Optional<Worker>> workerRef) {

    Action0 action = new Action0() {
        @Override
        public void call() {
            cacheRef.get().reset();
        }
    };
    // CAS loop to cancel the current worker and create a new one
    while (true) {
        Optional<Worker> wOld = workerRef.get();
        if (wOld == null) {
            // we are finished
            return;
        }
        Worker worker = scheduler.createWorker();
        if (workerRef.compareAndSet(wOld, Optional.of(worker))) {
            if (wOld.isPresent()) {
                wOld.get().unsubscribe();
            }
            worker.schedule(action, duration, unit);
            break;
        }
        // Lost the race against a concurrent update: release the worker we just created
        // before retrying, otherwise it would never be unsubscribed.
        worker.unsubscribe();
    }
}
 
源代码5 项目: RxSerach   文件: BroadcastObservable.java
/**
 * Wraps {@code unsubscribe} in a {@link Subscription} that always runs it on the main looper's
 * thread: directly when already on that thread, otherwise via a main-thread scheduler worker
 * that is released after the action has run.
 */
private static Subscription unsubscribeInUiThread(final Action0 unsubscribe) {
    final Action0 runOnMainThread = new Action0() {
        @Override public void call() {
            final boolean alreadyOnMainThread = Looper.myLooper() == Looper.getMainLooper();
            if (alreadyOnMainThread) {
                unsubscribe.call();
                return;
            }

            // Hop to the main thread, then dispose of the one-shot worker.
            final Scheduler.Worker mainWorker = AndroidSchedulers.mainThread().createWorker();
            mainWorker.schedule(new Action0() {
                @Override public void call() {
                    unsubscribe.call();
                    mainWorker.unsubscribe();
                }
            });
        }
    };
    return Subscriptions.create(runOnMainThread);
}
 
源代码6 项目: EVCache   文件: EVCacheOperationFuture.java
/**
 * Returns a {@link Single} for this operation's result with a timeout applied.
 *
 * <p>If the timeout fires, the fallback {@code Single} below is used: it records the timeout on
 * the operation and then either errors with {@link CheckedOperationTimeoutException} or emits
 * whatever value {@code objRef} currently holds.
 *
 * @param duration       timeout length
 * @param units          unit of {@code duration}
 * @param throwException when {@code true}, a timeout is reported as an error; otherwise the
 *                       current value of {@code objRef} is emitted
 * @param hasZF          not read by any live code in this method (only by commented-out metrics)
 * @param scheduler      scheduler passed to {@code timeout}
 * @return the timed {@link Single}
 */
public Single<T> get(long duration, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) {
    return observe().timeout(duration, units, Single.create(subscriber -> {
        // whenever timeout occurs, continuous timeout counter will increase by 1.
        // NOTE(review): opTimedOut(op) is invoked before the null check on op below — confirm
        // it tolerates a null operation.
        MemcachedConnection.opTimedOut(op);
        if (op != null) op.timeOut();
        //if (!hasZF) EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-get-CheckedOperationTimeout", DataSourceType.COUNTER).increment();
        if (throwException) {
            subscriber.onError(new CheckedOperationTimeoutException("Timed out waiting for operation", op));
        } else {
            // The body of this check is empty apart from the disabled metrics call.
            if (isCancelled()) {
                //if (hasZF) EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-get-Cancelled", DataSourceType.COUNTER).increment();
            }
            subscriber.onSuccess(objRef.get());
        }
    }), scheduler).doAfterTerminate(new Action0() {
        @Override
        public void call() {
            // Intentionally empty: no work is performed after termination.

        }
    }
    );
}
 
源代码7 项目: mantis   文件: DynamicConnectionSet.java
/**
 * Builds a {@link DynamicConnectionSet} whose connections are created per endpoint from a copy
 * of {@code config}, overriding host, port, close trigger, disconnect callback and slot id.
 *
 * @param config                     template configuration copied for every endpoint
 * @param maxTimeBeforeDisconnectSec passed to the {@link DynamicConnectionSet} constructor
 * @param inputQueue                 queue handed to {@code RemoteObservable.connectToMGO}
 * @return the connection set
 */
public static <K, V> DynamicConnectionSet<MantisGroup<K, V>> createMGO(
        final ConnectToGroupedObservable.Builder<K, V> config, int maxTimeBeforeDisconnectSec, final SpscArrayQueue<MantisGroup<?, ?>> inputQueue) {
    final Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<MantisGroup<K, V>>> connectionFactory =
            (endpoint, disconnectCallback, closeConnectionTrigger) -> {
                // Start from the shared template and specialise it for this endpoint.
                final ConnectToGroupedObservable.Builder<K, V> endpointConfig =
                        new ConnectToGroupedObservable.Builder<K, V>(config);
                endpointConfig
                        .host(endpoint.getHost())
                        .port(endpoint.getPort())
                        .closeTrigger(closeConnectionTrigger)
                        .connectionDisconnectCallback(disconnectCallback)
                        .slotId(endpoint.getSlotId());
                return RemoteObservable.connectToMGO(endpointConfig.build(), inputQueue);
            };
    return new DynamicConnectionSet<MantisGroup<K, V>>(
            connectionFactory, MIN_TIME_SEC_DEFAULT, maxTimeBeforeDisconnectSec);
}
 
/**
 * Full constructor: stores every collaborator and option, creates the internal helper objects,
 * and starts with all callback/bypass flags cleared.
 */
public <T extends BaseV7EndlessResponse> EndlessRecyclerOnScrollListener(BaseAdapter baseAdapter,
    V7<T, ? extends Endless> v7request, Action1<T> successRequestListener,
    ErrorRequestListener errorRequestListener, int visibleThreshold, boolean bypassCache,
    BooleanAction<T> onFirstLoadListener, Action0 onEndOfListReachedListener) {
  // Internally created helpers.
  this.multiLangPatch = new MultiLangPatch();
  this.onEndlessFinishList = new LinkedList<>();

  // Request and its target adapter.
  this.adapter = baseAdapter;
  this.v7request = v7request;
  this.visibleThreshold = visibleThreshold;
  this.bypassCache = bypassCache;

  // Listeners.
  this.successRequestListener = successRequestListener;
  this.errorRequestListener = errorRequestListener;
  this.onFirstLoadListener = onFirstLoadListener;
  this.onEndOfListReachedListener = onEndOfListReachedListener;

  // Initial state.
  this.firstCallbackCalled = false;
  this.endCallbackCalled = false;
  bypassServerCache = false;
}
 
源代码9 项目: Iron   文件: Chest.java
/**
 * Observes the value stored under {@code key}.
 *
 * <p>On subscription a data-change listener is registered, the current value is read and
 * emitted, and every later change is emitted while the subscriber is still subscribed. The
 * listener is removed when the subscriber unsubscribes.
 *
 * @param key storage key to observe
 * @return an observable of values for {@code key}, composed with {@code applySchedulers()}
 */
public <T> Observable<T> get(final String key) {
    final Observable.OnSubscribe<T> source = new Observable.OnSubscribe<T>() {
        @Override
        public void call(final Subscriber<? super T> subscriber) {
            final DataChangeCallback<T> listener = new DataChangeCallback<T>(key) {
                @Override
                public void onDataChange(T value) {
                    if (subscriber.isUnsubscribed()) {
                        return;
                    }
                    subscriber.onNext(value);
                }
            };
            Chest.this.addOnDataChangeListener(listener);

            // Detach the listener as soon as the subscriber goes away.
            final Action0 detachListener = new Action0() {
                @Override
                public void call() {
                    Chest.this.removeListener(listener);
                }
            };
            subscriber.add(Subscriptions.create(detachListener));

            // Seed the stream with the value currently stored.
            subscriber.onNext(Chest.this.<T>read(key));
        }
    };
    return Observable.create(source).compose(this.<T>applySchedulers());
}
 
源代码10 项目: android   文件: PrayerManager.java
/**
 * Downloads prayer times for {@code code}; each result is cached in {@code mLastPrayerContext}
 * and broadcast. {@code mIsLoading} is set on subscribe and cleared on termination.
 */
private Observable<PrayerContext> updatePrayerContext(String code) {
    Timber.i("Updating preferred prayer context");

    final Action1<PrayerContext> cacheAndBroadcast = new Action1<PrayerContext>() {
        @Override
        public void call(PrayerContext downloaded) {
            mLastPrayerContext = downloaded;
            broadcastPrayerContext(downloaded);
        }
    };
    final Action0 markLoading = new Action0() {
        @Override
        public void call() {
            mIsLoading.set(true);
        }
    };
    final Action0 markIdle = new Action0() {
        @Override
        public void call() {
            mIsLoading.set(false);
        }
    };

    return mPrayerDownloader.getPrayerTimes(code)
            .doOnNext(cacheAndBroadcast)
            .doOnSubscribe(markLoading)
            .doOnTerminate(markIdle);
}
 
源代码11 项目: mantis   文件: RequestProcessor.java
/**
 * Streams an endless sequence of server-sent events ({@code data:line N}) to {@code response},
 * one every 100 ms, after setting the event-stream/chunked headers.
 *
 * <p>Each subscription gets its own scheduler worker, which is registered with the subscriber so
 * the periodic task stops when the subscriber unsubscribes. Previously a single worker was shared
 * by all subscriptions and never released. The line counter is shared across subscriptions of
 * the returned observable.
 *
 * @param response HTTP response to write the events to
 * @return an observable that never emits; subscribing starts the periodic writes
 */
public Observable<Void> sendInfiniteStream(final HttpServerResponse<ByteBuf> response) {
    response.getHeaders().add(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream");
    response.getHeaders().add(HttpHeaders.Names.TRANSFER_ENCODING, "chunked");

    return Observable.create(new OnSubscribe<Void>() {
        final AtomicLong counter = new AtomicLong();

        @Override
        public void call(Subscriber<? super Void> subscriber) {
            final Worker worker = Schedulers.computation().createWorker();
            // Tie the worker's lifetime to the subscription so the periodic task is cancelled
            // on unsubscribe instead of running forever.
            subscriber.add(worker);
            worker.schedulePeriodically(
                    new Action0() {
                        @Override
                        public void call() {
                            System.out.println("In infinte stream");
                            // Explicit charset: do not depend on the platform default.
                            byte[] contentBytes = ("data:" + "line " + counter.getAndIncrement() + "\n\n")
                                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
                            response.writeBytes(contentBytes);
                            response.flush();
                        }
                    },
                    0,
                    100,
                    TimeUnit.MILLISECONDS
            );
        }
    });
}
 
/**
 * Decorates {@code action} with per-name metrics.
 *
 * <p>The returned action lazily creates the {@link ActionMetrics} for {@code actionName},
 * records success or failure of the wrapped call (exceptions are recorded, not rethrown), and
 * always finishes the measurement and decrements {@code actionsRemaining}.
 *
 * @param actionName name used to key and name the metrics
 * @param action     the action to run
 * @return the instrumented action
 */
private Action0 instrumentedAction(String actionName, Action0 action) {
    return () -> {
        final ActionMetrics metrics = this.actionMetrics.computeIfAbsent(
                actionName,
                name -> SpectatorExt.actionMetrics(
                        metricNameRoot + ".eventLoop." + actionName, Collections.emptyList(), registry));

        final long startedAt = metrics.start();
        try {
            action.call();
            metrics.success();
        } catch (Exception failure) {
            metrics.failure(failure);
        } finally {
            metrics.finish(startedAt);
            actionsRemaining.decrementAndGet();
        }
    };
}
 
源代码13 项目: azure-libraries-for-java   文件: SecretImpl.java
/**
 * Applies the pending update request to the vault and refreshes this secret.
 *
 * <p>If a set-secret request is pending, the resource is created first; otherwise the chain
 * starts from this instance. On completion the pending set request is cleared and a fresh
 * update-request builder is installed.
 */
@Override
public Observable<Secret> updateResourceAsync() {
    final Observable<Secret> start;
    if (setSecretRequest == null) {
        start = Observable.just((Secret) this);
    } else {
        start = createResourceAsync();
    }

    final Func1<Secret, Observable<SecretBundle>> sendUpdate = new Func1<Secret, Observable<SecretBundle>>() {
        @Override
        public Observable<SecretBundle> call(Secret ignored) {
            return Observable.from(vault.client().updateSecretAsync(updateSecretRequest.build(), null));
        }
    };
    final Func1<SecretBundle, Observable<Secret>> reload = new Func1<SecretBundle, Observable<Secret>>() {
        @Override
        public Observable<Secret> call(SecretBundle ignored) {
            return refreshAsync();
        }
    };
    final Action0 resetPendingRequests = new Action0() {
        @Override
        public void call() {
            setSecretRequest = null;
            updateSecretRequest = new UpdateSecretRequest.Builder(vault.vaultUri(), name());
        }
    };

    return start.flatMap(sendUpdate).flatMap(reload).doOnCompleted(resetPendingRequests);
}
 
源代码14 项目: AnDevCon-RxPatterns   文件: Example6.java
/**
 * Subscribes to {@code exampleObservable} and keeps the subscription in {@code resumeSub}.
 * Each value is shown in the third output view; unsubscription is logged.
 */
@Override
protected void onResume() {
    super.onResume();

    final Action0 logUnsubscribe = new Action0() {
        @Override
        public void call() {
            Log.d(TAG, "Called unsubscribe OnPause()");
        }
    };
    final Action1<Integer> showValue = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            final String label = Integer.toString(value) + " OnResume()";
            mOutputTextView3.setText(label);
        }
    };

    resumeSub = exampleObservable
            .doOnUnsubscribe(logUnsubscribe)
            .subscribe(showValue, errorHandler);
}
 
源代码15 项目: mobius   文件: Transformers.java
/**
 * Creates a {@link Transformer} that runs the given {@link Action0} every time an effect arrives
 * from the upstream effects observable. The work is delegated to {@code fromConsumer} with the
 * supplied {@link Scheduler}; the effect value itself is ignored.
 *
 * <p>Any {@link Exception} thrown by the action is rethrown wrapped via
 * {@link OnErrorThrowable#from(Throwable)}.
 *
 * @param doEffect {@link Action0} to be run every time the effect is requested
 * @param scheduler the {@link Scheduler} passed on to {@code fromConsumer}; may be null
 * @param <F> the type of Effect this transformer handles
 * @param <E> these transformers are for effects that do not result in any events; however, they
 *     still need to share the same Event type
 * @return a {@link Transformer} that can be used with a {@link SubtypeEffectHandlerBuilder}
 */
static <F, E> Transformer<F, E> fromAction(
    final Action0 doEffect, @Nullable final Scheduler scheduler) {
  final Action1<F> runIgnoringEffect =
      new Action1<F>() {
        @Override
        public void call(F ignoredEffect) {
          try {
            doEffect.call();
          } catch (Exception failure) {
            throw OnErrorThrowable.from(failure);
          }
        }
      };
  return fromConsumer(runIgnoringEffect, scheduler);
}
 
源代码16 项目: GithubApp   文件: UserPresenter.java
/**
 * Loads the user called {@code name} on the IO scheduler and delivers the outcome to the view on
 * the main thread. The view's loading indicator is shown on subscribe and dismissed on
 * termination; the subscription is tracked in {@code mCompositeSubscription}.
 */
public void getSingleUser(String name) {
    final Action0 showLoading = new Action0() {
        @Override
        public void call() {
            getMvpView().showLoading();
        }
    };
    final Action0 hideLoading = new Action0() {
        @Override
        public void call() {
            getMvpView().dismissLoading();
        }
    };
    final ResponseObserver<User> viewUpdater = new ResponseObserver<User>() {

        @Override
        public void onError(Throwable e) {
            getMvpView().showError(e);
        }

        @Override
        public void onSuccess(User user) {
            getMvpView().showContent(user);
        }
    };

    mCompositeSubscription.add(
            mRepoApi.getSingleUser(name)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnSubscribe(showLoading)
                    .doOnTerminate(hideLoading)
                    .subscribe(viewUpdater));
}
 
源代码17 项目: mantis   文件: PushServers.java
/**
 * Creates a {@link PushServerSse} that pushes the items of {@code o}.
 *
 * <p>The stream is treated as infinite: the completion callback handed to the trigger publishes
 * {@code "ILLEGAL_STATE_COMPLETED"} on the server-signal subject and then throws an
 * {@link IllegalStateException}. The error callback forwards the throwable to the same subject.
 * The processors, {@code state} and {@code supportLegacyMetrics} are passed unchanged to the
 * {@link PushServerSse} constructor.
 */
public static <T, S> PushServerSse<T, S> infiniteStreamSse(ServerConfig<T> config, Observable<T> o,
                                                           Func2<Map<String, List<String>>, S, Void> requestPreprocessor,
                                                           Func2<Map<String, List<String>>, S, Void> requestPostprocessor,
                                                           final Func2<Map<String, List<String>>, S, Void> subscribeProcessor,
                                                           S state, boolean supportLegacyMetrics) {
    final String name = config.getName();
    final PublishSubject<String> signals = PublishSubject.create();

    // Completion is illegal for an infinite stream: signal it, then fail loudly.
    final Action0 rejectCompletion = () -> {
        signals.onNext("ILLEGAL_STATE_COMPLETED");
        throw new IllegalStateException("OnComplete signal received, Server: " + name + " is pushing an infinite stream, should not complete");
    };

    // Errors are simply propagated on the signal subject.
    final Action1<Throwable> forwardError = t -> signals.onError(t);

    final PushTrigger<T> pushTrigger = ObservableTrigger.o(name, o, rejectCompletion, forwardError);

    return new PushServerSse<T, S>(
            pushTrigger, config, signals,
            requestPreprocessor, requestPostprocessor,
            subscribeProcessor, state, supportLegacyMetrics);
}
 
源代码18 项目: MaterialWpp   文件: TopicChoiceFragment.java
/**
 * Loads one page of topics for the current category and appends it to the list.
 *
 * <p>The swipe-to-refresh indicator is shown while loading and hidden when the request finishes,
 * whether it completes or fails. Previously a failure left the indicator spinning forever,
 * because the completion callback is not invoked after an error.
 */
private void getData() {
    recyclerView.getSwipeToRefresh().setRefreshing(true);
    topicChoice.getTopicChoice(cate,start,limit)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<TopicModel>() {
                @Override
                public void call(TopicModel topicModel) {

                    pagecount=Integer.valueOf(topicModel.getTotalCount())/limit+1;
                    datums.addAll(topicModel.getData());
                    adapter.notifyDataSetChanged();

                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    // An error terminates the stream without onCompleted, so the indicator
                    // must be cleared here as well.
                    recyclerView.getSwipeToRefresh().setRefreshing(false);
                }
            }, new Action0() {
                @Override
                public void call() {
                    recyclerView.getSwipeToRefresh().setRefreshing(false);

                }
            });

}
 
/**
 * Verifies that a subject which already has a subscriber is not failed by its auto-release
 * timeout: after advancing the test scheduler by the full timeout and completing the subject,
 * the subscriber must have terminated without receiving an error.
 */
@Test
public void testNoTimeoutPostSubscription() throws Exception {
    TestScheduler testScheduler = Schedulers.test();
    UnicastAutoReleaseSubject<String> subject = UnicastAutoReleaseSubject.create(1, TimeUnit.DAYS, testScheduler);
    subject.onNext("Start the timeout now."); // Since the timeout is scheduled only after content arrival.
    final AtomicReference<Throwable> errorOnSubscribe = new AtomicReference<Throwable>();
    final CountDownLatch latch = new CountDownLatch(1);
    subject.subscribe(Actions.empty(), new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            errorOnSubscribe.set(throwable);
            latch.countDown();
        }
    }, new Action0() {
        @Override
        public void call() {
            latch.countDown();
        }
    });

    testScheduler.advanceTimeBy(1, TimeUnit.DAYS);
    subject.onCompleted();

    // Fail explicitly if no terminal event arrived; otherwise a timed-out wait would leave
    // errorOnSubscribe null and the test would pass without having verified anything.
    Assert.assertTrue("Subscriber did not terminate in time.", latch.await(1, TimeUnit.MINUTES));

    Assert.assertNull("Subscription got an error.", errorOnSubscribe.get());
}
 
源代码20 项目: newts   文件: ImportRunner.java
/**
 * Posts each batch of samples as JSON to the REST endpoint and reports whether every request
 * returned a successful status.
 *
 * <p>A dedicated async HTTP client is started for the pipeline and closed when the pipeline
 * terminates, on completion or on error. Previously it was closed only on completion, so a
 * failed import leaked the client.
 *
 * @param samples batches of samples to post
 * @param metrics registry used for the "posts", "responses" and "samples-completed" meters
 * @return an observable of the overall success flag produced by {@code all(successful())}
 */
private Observable<Boolean> restPoster(Observable<List<Sample>> samples,  MetricRegistry metrics) {

        final CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
        httpClient.start();


        return samples

                // turn each batch into json
                .map(toJSON())

                // meter them as they go into the post code
                .map(meter(metrics.meter("posts"), String.class))

                // post the json to the REST server
                .mergeMap(postJSON(m_restUrl, httpClient))

                // meter the responses
                .map(meter(metrics.meter("responses"), ObservableHttpResponse.class))
                
                // count sample completions
                .map(meter(metrics.meter("samples-completed"), m_samplesPerBatch, ObservableHttpResponse.class))

                // make sure every request has a successful return code
                .all(successful())
                
                // release the client on any terminal event (onCompleted or onError)
                .doOnTerminate(new Action0() {

                    @Override
                    public void call() {
                        try {
                            httpClient.close();
                        } catch (IOException e) {
                            System.err.println("Failed to close httpClient!");
                            e.printStackTrace();
                        }
                    }
                    
                });
    }
 
源代码21 项目: openwebnet-android   文件: DeviceListAdapter.java
/**
 * Asks the light service to switch {@code light} off and, once that completes, refreshes the
 * holder with the light's status. Does nothing (beyond a warning) when the status is null.
 */
private void turnLightOff(LightViewHolder holder, LightModel light) {
    log.debug("turn off light {}", light.getUuid());

    final boolean statusKnown = light.getStatus() != null;
    if (statusKnown) {
        // The status is re-read when the action runs, i.e. after the service call completes.
        final Action0 refreshHolder = () -> updateLightStatus(holder, light.getStatus());
        lightService.turnOff(light)
                .doOnCompleted(refreshHolder)
                .subscribe();
    } else {
        log.warn("light status is null: unable to turn off");
    }
}
 
源代码22 项目: android-rxgeofence   文件: RxGeoFenceOnSubscribe.java
/**
 * Wires the location and geo-fence sources to GeoFire for the given subscriber.
 *
 * <p>Every location update is written to GeoFire under {@code userKey}. Every new list of
 * places replaces the active geo queries. When the subscriber unsubscribes, both source
 * subscriptions are cancelled, listening stops, and the stored location is removed.
 *
 * <p>The sources were previously consumed with {@code forEach}, whose subscriptions were never
 * tied to the subscriber, so they kept writing locations and re-creating queries after
 * unsubscription.
 */
@Override public void call(final Subscriber<? super GeoFenceEvent> subscriber) {
  Preconditions.checkNotNull(locationSource, "Location source must not be null");
  Preconditions.checkNotNull(geoFenceSource, "Geo fence source must not be null");

  // Tie the location feed to the subscriber's lifetime.
  subscriber.add(locationSource.subscribe(new Action1<LatLng>() {
    @Override public void call(LatLng latLng) {
      geoFire.setLocation(userKey, latLng.toGeoLocation());
    }
  }));

  // Tie the geo-fence feed to the subscriber's lifetime as well.
  subscriber.add(geoFenceSource.subscribe(new Action1<List<Place>>() {
    @Override public void call(List<Place> places) {
      stopListen();
      for (Place place : places) {
        listen(
            new GeoQueryWrapper<>(geoFire.queryAtLocation(place.toGeoLocation(), place.getRad()),
                place), subscriber);
      }
    }
  }));

  subscriber.add(Subscriptions.create(new Action0() {
    @Override public void call() {
      stopListen();
      geoFire.removeLocation(userKey);
    }
  }));
}
 
源代码23 项目: mantis   文件: MergedObservable.java
/**
 * Publishes {@code o} on the outer subject, decorated with termination handling.
 *
 * <p>On error: the optional error callback runs, the failure is logged, the key's take-until
 * entry is removed and the outer subject is failed. On completion: the optional success callback
 * runs, the take-until entry is removed and, if the terminal count check reports that all inner
 * observables are done, the outer subject is completed.
 *
 * @param key             identifier of the inner observable
 * @param o               inner observable to publish
 * @param errorCallback   invoked first on error; may be null
 * @param successCallback invoked first on completion; may be null
 */
private synchronized void publishWithCallbacks(final String key, Observable<T> o,
                                               final Action1<Throwable> errorCallback, final Action0 successCallback) {
    final Action1<Throwable> onInnerError = t1 -> {
        if (errorCallback != null) {
            errorCallback.call(t1);
        }
        logger.error("Inner observable with key: " + key + " terminated with onError, calling onError() on outer observable." + t1.getMessage(), t1);
        takeUntilSubjects.remove(key);
        subject.onError(t1);
    };

    final Action0 onInnerCompleted = () -> {
        if (successCallback != null) {
            successCallback.call();
        }
        logger.debug("Inner observable with key: " + key + " completed, incrementing terminal count.");
        takeUntilSubjects.remove(key);
        final boolean allTerminated = counts.incrementTerminalCountAndCheck();
        if (allTerminated) {
            logger.debug("All inner observables terminated, calling onCompleted() on outer observable.");
            subject.onCompleted();
        }
    };

    subject.onNext(o.doOnError(onInnerError).doOnCompleted(onInnerCompleted));
}
 
/**
 * Tells the listener, via {@code dispatchOnMainThread}, that a search for {@code query} has
 * started. No-op when no listener is set at call time.
 */
private void notifyStarted(final String query) {
    if (mListener != null) {
        final Action0 notifyListener = new Action0() {
            @Override
            public void call() {
                mListener.onSearchStarted(query);
            }
        };
        dispatchOnMainThread(notifyListener);
    }
}
 
源代码25 项目: vertx-rx   文件: ContextScheduler.java
/**
 * Schedules {@code action} to run once after the given delay.
 *
 * <p>The action is first passed through the schedulers hook, wrapped in a {@code TimedAction}
 * (created with a second argument of 0), registered in {@code actions}, and then scheduled.
 *
 * @return the timed action, which doubles as the subscription
 */
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
  final Action0 hookedAction = schedulersHook.onSchedule(action);
  final long delayInMillis = unit.toMillis(delayTime);

  final TimedAction timedAction = new TimedAction(hookedAction, 0);
  actions.put(timedAction, DUMB);
  timedAction.schedule(delayInMillis);
  return timedAction;
}
 
源代码26 项目: Elephant   文件: TopicPublishPresenter.java
/**
 * Publishes a topic through the model and registers the subscription with {@code mRxManager}.
 * The view is told the request has started when the stream is subscribed; {@code subscribeOn}
 * selects the Android main-thread scheduler for the subscription.
 */
@Override
public void publishTopic(String title, String body, String categoryId) {
    final Action0 notifyRequestStart = new Action0() {
        @Override
        public void call() {
            mView.onRequestStart();
        }
    };

    mRxManager.add(
            mModel.publishTopic(title, body, categoryId)
                    .doOnSubscribe(notifyRequestStart)
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .subscribe(mTopicDetailObserver));
}
 
/**
 * Wraps a scheduled action in a {@code TraceAction} unless it is already traced.
 *
 * <p>The action is returned untouched when it is itself a {@code TraceAction}, or when the
 * delegate hook (if any) turns it into one. In the latter case the original action is returned,
 * not the delegate's result.
 */
@Override
public Action0 onSchedule(Action0 action) {
	if (action instanceof TraceAction) {
		return action;
	}

	final Action0 delegated;
	if (this.delegate == null) {
		delegated = action;
	}
	else {
		delegated = this.delegate.onSchedule(action);
	}
	if (delegated instanceof TraceAction) {
		return action;
	}

	final TraceAction traced = new TraceAction(this.tracer, delegated, this.threadsToSample);
	return super.onSchedule(traced);
}
 
源代码28 项目: mantis   文件: ServeNestedObservable.java
/**
 * Connects the merged inner observables to the slotting strategy.
 *
 * <p>Values are written to slots, and completion/error are fanned out to all connections. The
 * merged stream is subscribed when the strategy reports the first connection and unsubscribed
 * when it reports that the last connection was removed.
 */
private void applySlottingSideEffectToObservable(Observable<Observable<T>> o) {

        // Routes every notification of the merged stream into the slotting strategy.
        final Observer<T> slotWriter = new Observer<T>() {
            @Override
            public void onCompleted() {
                slottingStrategy.completeAllConnections();
            }

            @Override
            public void onError(Throwable e) {
                slottingStrategy.errorAllConnections(e);
            }

            @Override
            public void onNext(T value) {
                slottingStrategy.writeOnSlot(null, value);
            }
        };
        final Observable<T> withSideEffects = Observable.merge(o).doOnEach(slotWriter);

        final MutableReference<Subscription> subscriptionRef = new MutableReference<>();

        final Action0 subscribeOnFirstConnection = () -> {
            subscriptionRef.setValue(withSideEffects.subscribe());
        };
        slottingStrategy.registerDoAfterFirstConnectionAdded(subscribeOnFirstConnection);

        final Action0 unsubscribeOnLastRemoval = () -> {
            subscriptionRef.getValue().unsubscribe();
        };
        slottingStrategy.registerDoAfterLastConnectionRemoved(unsubscribeOnLastRemoval);
    }
 
源代码29 项目: mantis   文件: SafeWriter.java
/**
 * Writes {@code events} to {@code connection}, periodically checking that the connection is
 * still usable.
 *
 * <p>Every {@code CHECK_IS_OPEN_INTERVAL}-th call (per {@code checkIsOpenCounter}) checks whether
 * a close has been issued or the channel is inactive. If so, the subscription in
 * {@code subReference} is unsubscribed, the endpoint's slot is released when a slotting strategy
 * is supplied, and {@code false} is returned without writing. In all other cases the result of
 * {@code checkWriteableAndWrite} is returned.
 *
 * @return {@code false} when a dead connection was detected; otherwise the outcome of the write
 */
<T> boolean safeWrite(final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection,
                      List<RemoteRxEvent> events,
                      final MutableReference<Subscription> subReference,
                      Action0 onSuccessfulWriteCallback,
                      final Action1<Throwable> onFailedWriteCallback,
                      final SlottingStrategy<T> slottingStrategyReference,
                      final WritableEndpoint<T> endpoint) {
    final boolean probeConnection = checkIsOpenCounter.getAndIncrement() % CHECK_IS_OPEN_INTERVAL == 0;

    if (probeConnection && (connection.isCloseIssued() || !connection.getChannel().isActive())) {
        logger.warn("Detected closed or inactive client connection, force unsubscribe.");
        subReference.getValue().unsubscribe();
        // release slot
        if (slottingStrategyReference != null) {
            logger.info("Removing slot for endpoint: " + endpoint);
            final boolean removed = slottingStrategyReference.removeConnection(endpoint);
            if (!removed) {
                logger.error("Failed to remove endpoint from slot,  endpoint: " + endpoint);
            }
        }
        return false;
    }

    // Connection is (assumed to be) healthy: attempt the write.
    return checkWriteableAndWrite(connection, events,
            onSuccessfulWriteCallback, onFailedWriteCallback);
}
 
源代码30 项目: rxjava-jdbc   文件: QuerySelectOnSubscribe.java
/**
 * Registers a subscription on {@code subscriber} that calls {@code closeQuietly(state)} when the
 * subscriber unsubscribes.
 */
private static <T> void setupUnsubscription(Subscriber<T> subscriber, final State state) {
    final Action0 releaseState = new Action0() {
        @Override
        public void call() {
            closeQuietly(state);
        }
    };
    subscriber.add(Subscriptions.create(releaseState));
}