下面列出了io.reactivex.Maybe#subscribe ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static void main(String[] args) {
// TODO Auto-generated method stub
Maybe<List<String>> month_maybe = Maybe.create(emitter -> {
try {
String[] monthArray = { "Jan", "Feb", "Mar", "Apl", "May", "Jun", "July", "Aug", "Sept", "Oct", "Nov",
"Dec" };
List<String> months = Arrays.asList(monthArray);
if (months != null && !months.isEmpty()) {
emitter.onSuccess(months);
} else {
emitter.onComplete();
}
} catch (Exception e) {
emitter.onError(e);
}
});
month_maybe.subscribe(s->System.out.println(s));
}
@Override
public void onNext(T t) {
if (finished) {
return;
}
queue.offer(t);
Maybe<? extends T> maybe;
try {
maybe = valueToInsert.apply(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// we cancel upstream ourselves because the
// error did not originate from source
upstream.cancel();
onError(e);
return;
}
ValueToInsertObserver<T> o = new ValueToInsertObserver<T>(this);
if (DisposableHelper.set(valueToInsertObserver, o)) {
// note that at this point we have to cover o being disposed
// from another thread so the Observer class needs
// to handle dispose being called before/during onSubscribe
maybe.subscribe(o);
}
drain();
}
@Test
public void testMaybe() {
NullableTCK obj = new NullableTCK(new NullableTCKImpl());
List<String> result = new ArrayList<>();
List<Throwable> failure = new ArrayList<>();
AtomicInteger completions = new AtomicInteger();
Maybe<String> maybeNotNull = obj.rxMethodWithNullableStringHandlerAsyncResult(true);
maybeNotNull.subscribe(result::add, failure::add, completions::incrementAndGet);
assertEquals(Collections.singletonList("the_string_value"), result);
assertEquals(Collections.emptyList(), failure);
assertEquals(0, completions.get());
result.clear();
maybeNotNull = obj.rxMethodWithNullableStringHandlerAsyncResult(false);
maybeNotNull.subscribe(result::add, failure::add, completions::incrementAndGet);
assertEquals(Collections.emptyList(), result);
assertEquals(Collections.emptyList(), failure);
assertEquals(1, completions.get());
result.clear();
}
@SuppressWarnings("CheckResult")
@Test
public void shouldExecuteAsBlockingAfterSubscription() {
//noinspection unchecked
final PreparedMaybeOperation<String, String, String> preparedOperation = mock(PreparedMaybeOperation.class);
String expectedResult = "test";
when(preparedOperation.executeAsBlocking()).thenReturn(expectedResult);
TestObserver<String> testObserver = new TestObserver<String>();
verifyZeroInteractions(preparedOperation);
Maybe<String> maybe = Maybe.create(new MaybeOnSubscribeExecuteAsBlocking<String, String, String>(preparedOperation));
verifyZeroInteractions(preparedOperation);
maybe.subscribe(testObserver);
testObserver.assertValue(expectedResult);
testObserver.assertNoErrors();
testObserver.assertComplete();
verify(preparedOperation).executeAsBlocking();
}
@SuppressWarnings("unchecked")
@Override
public <T> CompletionStage<T> toCompletionStage(Maybe instance) {
CompletableFuture<T> future = new CompletableFuture<>();
Maybe<T> s = Objects.requireNonNull(instance);
//noinspection ResultOfMethodCallIgnored
s.subscribe(
future::complete,
future::completeExceptionally,
() -> future.complete(null));
return future;
}
@Test
public void testToMaybeObserverSuccess() {
Promise<String> promise = Promise.promise();
MaybeObserver<String> observer = MaybeHelper.toObserver(promise);
Maybe<String> s = Maybe.just("foobar");
s.subscribe(observer);
assertTrue(promise.future().succeeded());
assertSame("foobar", promise.future().result());
}
@Test
public void testToMaybeObserverEmpty() {
Promise<String> promise = Promise.promise();
MaybeObserver<String> observer = MaybeHelper.toObserver(promise);
Maybe<String> s = Maybe.empty();
s.subscribe(observer);
assertTrue(promise.future().succeeded());
assertNull(promise.future().result());
}
@Test
public void testToMaybeObserverFailure() {
Promise<String> promise = Promise.promise();
MaybeObserver<String> observer = MaybeHelper.toObserver(promise);
RuntimeException cause = new RuntimeException();
Maybe<String> s = Maybe.error(cause);
s.subscribe(observer);
assertTrue(promise.future().failed());
assertSame(cause, promise.future().cause());
}
@Test
public void testMaybeReportingSubscribeUncheckedException() {
RuntimeException cause = new RuntimeException();
MethodWithMaybeString meth = new MethodWithMaybeString(handler -> {
throw cause;
});
Maybe<String> single = meth.rxDoSomethingWithMaybeResult();
single.subscribe(result -> fail(), err -> testComplete(), this::fail);
await();
}
@Test
public void testNullableTypeVariableParamByVoidArg() {
MethodWithNullableTypeVariableParamByVoidArg abc = MethodWithNullableTypeVariableParamByVoidArg.newInstance(handler -> handler.handle(Future.succeededFuture()));
Maybe<Void> maybe = abc.rxDoSomethingWithMaybeResult();
AtomicInteger count = new AtomicInteger();
maybe.subscribe(o -> fail(), err -> fail(err.getMessage()), count::incrementAndGet);
assertEquals(1, count.get());
}
@Test
public void getExistedObjectExecuteAsMaybe() {
final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(1)
.subscribe(changesTestSubscriber);
TestItem expectedItem = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, expectedItem.toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());
Maybe<TestItem> testItemMaybe = storIOContentResolver
.get()
.object(TestItem.class)
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.where(TestItem.COLUMN_VALUE + "=?")
.whereArgs("value")
.build())
.prepare()
.asRxMaybe();
TestObserver<TestItem> testObserver = new TestObserver<TestItem>();
testItemMaybe.subscribe(testObserver);
testObserver.awaitTerminalEvent(5, SECONDS);
testObserver.assertNoErrors();
List<TestItem> emmitedItems = testObserver.values();
assertThat(emmitedItems.size()).isEqualTo(1);
assertThat(expectedItem.equalsWithoutId(emmitedItems.get(0))).isTrue();
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
@Test
public void getNonExistedObjectExecuteAsMaybe() {
final TestSubscriber<Changes> changesTestSubscriber = new TestSubscriber<Changes>();
storIOContentResolver
.observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
.take(1)
.subscribe(changesTestSubscriber);
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value").toContentValues());
Maybe<TestItem> testItemMaybe = storIOContentResolver
.get()
.object(TestItem.class)
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.where(TestItem.COLUMN_VALUE + "=?")
.whereArgs("some value")
.build())
.prepare()
.asRxMaybe();
TestObserver<TestItem> testObserver = new TestObserver<TestItem>();
testItemMaybe.subscribe(testObserver);
testObserver.awaitTerminalEvent(5, SECONDS);
testObserver.assertNoValues();
testObserver.assertNoErrors();
changesTestSubscriber.awaitTerminalEvent(60, SECONDS);
changesTestSubscriber.assertNoErrors();
changesTestSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
public MaybeAdaptor(Maybe<T> observable)
{
this.subscription = observable.subscribe(this::complete, this::completeExceptionally, this::completeEmpty);
}
@Override
public void handle(RoutingContext context) {
// Even if a scope parameter is present in the Request Object value, a scope parameter MUST always be passed
// using the OAuth 2.0 request syntax containing the openid scope value to indicate to the underlying OAuth 2.0
// logic that this is an OpenID Connect request.
String scope = context.request().getParam(io.gravitee.am.common.oauth2.Parameters.SCOPE);
HashSet<String> scopes = scope != null && !scope.isEmpty() ? new HashSet<>(Arrays.asList(scope.split("\\s+"))) : null;
if (scopes == null || !scopes.contains(Scope.OPENID.getKey())) {
context.next();
return;
}
// if there is no request or request_uri parameters, continue
if ((context.request().getParam(Parameters.REQUEST) == null || context.request().getParam(Parameters.REQUEST).isEmpty())
&& ((context.request().getParam(Parameters.REQUEST_URI) == null || context.request().getParam(Parameters.REQUEST_URI).isEmpty()))) {
context.next();
return;
}
// check request object parameters
checkRequestObjectParameters(context);
// Proceed request and request_uri parameters
Maybe<JWT> requestObject = null;
if (context.request().getParam(Parameters.REQUEST) != null) {
requestObject = handleRequestObjectValue(context);
} else if (context.request().getParam(Parameters.REQUEST_URI) != null) {
requestObject = handleRequestObjectURI(context);
}
requestObject
.subscribe(
jwt -> {
try {
// Check OAuth2 parameters
checkOAuthParameters(context, jwt);
overrideRequestParameters(context, jwt);
context.next();
} catch (Exception ex) {
context.fail(ex);
}
},
context::fail,
() -> context.next());
}
public static <T> void subscribe(Maybe<T> obs, TestSubscriber<T> sub) {
obs.subscribe(sub::onNext, sub::onError, sub::onCompleted);
}
@SuppressWarnings("CheckResult")
@Test
public void shouldCompleteIfNullOccurred() {
//noinspection unchecked
final PreparedMaybeOperation<String, String, String> preparedOperation = mock(PreparedMaybeOperation.class);
when(preparedOperation.executeAsBlocking()).thenReturn(null);
TestObserver<String> testObserver = new TestObserver<String>();
verifyZeroInteractions(preparedOperation);
Maybe<String> maybe = Maybe.create(new MaybeOnSubscribeExecuteAsBlocking<String, String, String>(preparedOperation));
verifyZeroInteractions(preparedOperation);
maybe.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
verify(preparedOperation).executeAsBlocking();
}
@SuppressWarnings("CheckResult")
@Test
public void shouldCallOnErrorIfExceptionOccurred() {
//noinspection unchecked
final PreparedMaybeOperation<Object, Object, Object> preparedOperation = mock(PreparedMaybeOperation.class);
StorIOException expectedException = new StorIOException("test exception");
when(preparedOperation.executeAsBlocking()).thenThrow(expectedException);
TestObserver<Object> testObserver = new TestObserver<Object>();
Maybe<Object> maybe = Maybe.create(new MaybeOnSubscribeExecuteAsBlocking<Object, Object, Object>(preparedOperation));
verifyZeroInteractions(preparedOperation);
maybe.subscribe(testObserver);
testObserver.assertError(expectedException);
testObserver.assertNotComplete();
verify(preparedOperation).executeAsBlocking();
}