下面列出了org.mockito.internal.stubbing.answers.ThrowsException#org.reactivestreams.Subscriber 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testSuccess() throws Exception {
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
Subscriber<Object> o = TestHelper.mockSubscriber();
TestSubscriber<Object> ts = new TestSubscriber<Object>(o);
Flowable.fromFuture(future).subscribe(ts);
ts.dispose();
verify(o, times(1)).onNext(value);
verify(o, times(1)).onComplete();
verify(o, never()).onError(any(Throwable.class));
verify(future, never()).cancel(anyBoolean());
}
@Test
public void testSubscribeAfterCompleted() {
AsyncProcessor<String> subject = AsyncProcessor.create();
Subscriber<String> observer = TestHelper.mockSubscriber();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onComplete();
subject.subscribe(observer);
verify(observer, times(1)).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
/**
* Interprets the contents as NotificationLite objects and calls
* the appropriate Subscriber method.
*
* @param <U> the target type
* @param subscriber the subscriber to emit the events to
* @return true if a terminal event has been reached
*/
public <U> boolean accept(Subscriber<? super U> subscriber) {
Object[] a = head;
final int c = capacity;
while (a != null) {
for (int i = 0; i < c; i++) {
Object o = a[i];
if (o == null) {
break;
}
if (NotificationLite.acceptFull(o, subscriber)) {
return true;
}
}
a = (Object[])a[c];
}
return false;
}
@Test
public void testZipIterableSameSize() {
PublishProcessor<String> r1 = PublishProcessor.create();
/* define a Subscriber to receive aggregated events */
Subscriber<String> o = TestHelper.mockSubscriber();
InOrder io = inOrder(o);
Iterable<String> r2 = Arrays.asList("1", "2", "3");
r1.zipWith(r2, zipr2).subscribe(o);
r1.onNext("one-");
r1.onNext("two-");
r1.onNext("three-");
r1.onComplete();
io.verify(o).onNext("one-1");
io.verify(o).onNext("two-2");
io.verify(o).onNext("three-3");
io.verify(o).onComplete();
verify(o, never()).onError(any(Throwable.class));
}
@Test
public void testTimedFirstNames(){
employeeBatchStreamServiceImpl.getTimedFirstNames().subscribe(new Subscriber<String>(){
@Override
public void onComplete() { }
@Override
public void onError(Throwable arg0) {
System.out.println("time is out....");
}
@Override
public void onNext(String data) {
System.out.println(data);
}
@Override
public void onSubscribe(Subscription subs) {
subs.request(Long.MAX_VALUE);
}
});
}
@Override
public void subscribe(Subscriber<? super Buffer> subscriber) {
ByteBufToBufferSubscriber byteBufToBufferSubscriber = new ByteBufToBufferSubscriber(subscriber);
eventLoop.submit(() -> contentProducer.onSubscribed(byteBufToBufferSubscriber));
byteBufToBufferSubscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
eventLoop.submit(() -> contentProducer.request(n));
}
@Override
public void cancel() {
eventLoop.submit(contentProducer::unsubscribe);
}
});
}
@Test
public void testCancelledBeforeSubscribe() throws Exception {
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
CancellationException e = new CancellationException("unit test synthetic cancellation");
when(future.get()).thenThrow(e);
Subscriber<Object> o = TestHelper.mockSubscriber();
TestSubscriber<Object> ts = new TestSubscriber<Object>(o);
ts.dispose();
Flowable.fromFuture(future).subscribe(ts);
ts.assertNoErrors();
ts.assertNotComplete();
}
@Test
public void takeAll() {
Subscriber<Object> o = TestHelper.mockSubscriber();
Flowable.just(1, 2).takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer v) {
return false;
}
}).subscribe(o);
verify(o).onNext(1);
verify(o).onNext(2);
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
private boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> subscriber, Queue<T> q) {
if (cancelled) {
q.clear();
downstream = null;
return true;
}
if (d && empty) {
Throwable e = error;
downstream = null;
if (e != null) {
subscriber.onError(e);
} else {
subscriber.onComplete();
}
return true;
}
return false;
}
@SuppressWarnings("unchecked")
@Test
public void testSinkUsingString() {
String topic = UUID.randomUUID().toString();
AtomicInteger expected = new AtomicInteger(0);
usage.consumeStrings(topic, 10, 10, TimeUnit.SECONDS,
v -> expected.getAndIncrement());
Map<String, Object> config = new HashMap<>();
config.put("address", topic);
EventBusSink sink = new EventBusSink(vertx,
new VertxEventBusConnectorOutgoingConfiguration(new MapBasedConfig(config)));
SubscriberBuilder<? extends Message<?>, Void> subscriber = sink.sink();
Multi.createFrom().range(0, 10)
.map(i -> Integer.toString(i))
.map(Message::of)
.subscribe((Subscriber<Message<?>>) subscriber.build());
await().untilAtomic(expected, is(10));
assertThat(expected).hasValue(10);
}
@SuppressWarnings("SubscriberImplementation")
@Incoming("G")
public Subscriber<Integer> consume() {
return new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(10);
}
@Override
public void onNext(Integer integer) {
getItems().add(integer);
}
@Override
public void onError(Throwable throwable) {
// Ignored
}
@Override
public void onComplete() {
// Ignored
}
};
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> chunks) {
Mono<Void> writeCompletion = Mono.create(sink -> {
logger.debug("Subscribing to body publisher");
Subscriber<DataBuffer> subscriber = new WriteStreamSubscriber.Builder<HttpClientRequest, DataBuffer>()
.writeStream(delegate)
.endHook(sink)
.nextHandler((stream, value) -> stream.write(bufferConverter.toBuffer(value)))
.build();
chunks.subscribe(subscriber);
});
Mono<Void> endCompletion = Mono.create(sink -> {
logger.debug("Completing request after writing");
delegate.end();
sink.success();
});
return doCommit(() -> writeCompletion.then(endCompletion));
}
public <R> Disposable subscribe(DisposableSubscriber<R> subscriber, FlowableTransformer<T, R> transformer)
{
Flowable flowable = build(false);
if (transformer != null)
flowable = flowable.compose(transformer);
Subscriber<R> actualSubscriber = subscriber;
if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled)
actualSubscriber = RxBusUtil.wrapSubscriber(subscriber, mQueuer);
flowable = applySchedular(flowable);
Disposable disposable = (DisposableSubscriber)flowable.subscribeWith(actualSubscriber);
if (mBoundObject != null)
RxDisposableManager.addDisposable(mBoundObject, disposable);
return disposable;
}
@Override
public void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) {
throw new NullPointerException("The subscriber must not be `null`");
}
Function<? super Throwable, ? extends Publisher<? extends T>> next = failure -> {
if (predicate.test(failure)) {
Throwable throwable = mapper.apply(failure);
if (throwable == null) {
return Multi.createFrom().failure(new NullPointerException(MAPPER_RETURNED_NULL));
} else {
return Multi.createFrom().failure(throwable);
}
}
return Multi.createFrom().failure(failure);
};
Multi<T> op = Infrastructure.onMultiCreation(new MultiOnFailureResumeOp<>(upstream(), next));
op.subscribe(subscriber);
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
assertEquals(activePublishers.incrementAndGet(), 1);
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
if (published.compareAndSet(false, true)) {
getExecutorService().schedule(() -> {
subscriber.onNext(id);
getExecutorService().schedule(() -> {
activePublishers.decrementAndGet();
subscriber.onComplete();
}, 100, TimeUnit.MILLISECONDS);
}, 100, TimeUnit.MILLISECONDS);
}
}
@Override
public void cancel() {
}
});
}
@Override
public void onNext(Message<?> message) {
if (isCancelled()) {
return;
}
Subscriber<? super Message<?>> subscriber = this.downstream.get();
send(sender, message, durable, ttl, configuredAddress, useAnonymousSender, configuration)
.subscribe().with(
res -> {
subscriber.onNext(res);
if (requested.decrementAndGet() == 0) { // no more credit, request more
onNoMoreCredit();
}
},
subscriber::onError);
}
@Test
public void overflowFastpathDelayError() {
new ParallelFlowable<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer>[] subscribers) {
subscribers[0].onSubscribe(new BooleanSubscription());
subscribers[0].onNext(1);
subscribers[0].onNext(2);
}
@Override
public int parallelism() {
return 1;
}
}
.sequentialDelayError(1)
.test(0)
.requestMore(1)
.assertFailure(MissingBackpressureException.class, 1);
}
@SuppressWarnings("unchecked")
public void subscribe(@NonNull Subscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}
int n = subscribers.length;
Subscriber<? super T>[] parents = new Subscriber[n];
for (int i = 0; i < n; i++) {
Subscriber<? super T> a = subscribers[i];
if (a instanceof ConditionalSubscriber) {
parents[i] = new LifeConditionalSubscriber<>((ConditionalSubscriber<? super T>) a, scope);
} else {
parents[i] = new LifeSubscriber<>(a, scope);
}
}
ParallelFlowable<T> upStream = this.upStream;
if (onMain) upStream = upStream.runOn(AndroidSchedulers.mainThread());
upStream.subscribe(parents);
}
@Override
protected void subscribeActual(Subscriber<? super T> s) {
if (s instanceof ConditionalSubscriber) {
ConditionalSubscriber<? super T> cs = (ConditionalSubscriber<? super T>) s;
source.subscribe(new DistinctUntilChangedConditionalSubscriber<T, K>(cs, keySelector, comparer));
} else {
source.subscribe(new DistinctUntilChangedSubscriber<T, K>(s, keySelector, comparer));
}
}
@Test
public void testTakeLast2() {
Flowable<String> w = Flowable.just("one");
Flowable<String> take = w.takeLast(10);
Subscriber<String> observer = TestHelper.mockSubscriber();
take.subscribe(observer);
verify(observer, times(1)).onNext("one");
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Override
public void subscribe(
Subscriber<? super Message<?>> subscriber) {
if (!downstream.compareAndSet(null, subscriber)) {
Subscriptions.fail(subscriber, ex.illegalStateOnlyOneSubscriber());
} else {
if (subscription.get() != null) {
subscriber.onSubscribe(this);
}
}
}
@SuppressWarnings("SubscriberImplementation")
@Override
public <T> Subscriber<? super T> onSubscription(Publisher<? extends T> instance, Subscriber<? super T> subscriber) {
Executor executor = THREAD_CONTEXT.currentContextExecutor();
return new Subscriber<T>() {
@Override
public void onSubscribe(Subscription subscription) {
executor.execute(() -> subscriber.onSubscribe(subscription));
}
@Override
public void onNext(T item) {
executor.execute(() -> subscriber.onNext(item));
}
@Override
public void onError(Throwable failure) {
executor.execute(() -> subscriber.onError(failure));
}
@Override
public void onComplete() {
executor.execute(subscriber::onComplete);
}
};
}
@Test
public void testDematerialize3() {
Exception exception = new Exception("test");
Flowable<Integer> observable = Flowable.error(exception);
Flowable<Integer> dematerialize = observable.materialize().dematerialize();
Subscriber<Integer> observer = TestHelper.mockSubscriber();
dematerialize.subscribe(observer);
verify(observer, times(1)).onError(exception);
verify(observer, times(0)).onComplete();
verify(observer, times(0)).onNext(any(Integer.class));
}
/**
* Mocks a subscriber and prepares it to request {@code Long.MAX_VALUE}.
*
* @param <T> the value type
* @return the mocked subscriber
*/
@SuppressWarnings("unchecked")
public static <T> Subscriber<T> subscriber() {
Subscriber<T> w = mock(Subscriber.class);
Mockito.doAnswer((Answer<Object>) a -> {
Subscription s = a.getArgument(0);
s.request(Long.MAX_VALUE);
return null;
}).when(w).onSubscribe(any());
return w;
}
@Override
public void subscribe(Subscriber<? super T> s) {
String sName = s == null ? "null" : s.getClass().getName();
log("invoke subscribe with subscriber " + sName);
publisher.subscribe(new SubscriberProbe<>(s, name, start));
log("finish subscribe");
}
@SuppressWarnings("unchecked")
@Test
public void errorConditionalBackpressured() {
TestSubscriber<Object> ts = new TestSubscriber<Object>(0L);
Flowable.error(new TestException())
.parallel(1)
.runOn(ImmediateThinScheduler.INSTANCE)
.filter(Functions.alwaysTrue())
.subscribe(new Subscriber[] { ts });
ts
.assertFailure(TestException.class);
}
@SuppressWarnings("unchecked")
@Test
public void nextCancelRaceBackpressuredConditional() {
for (int i = 0; i < 1000; i++) {
final PublishProcessor<Integer> pp = PublishProcessor.create();
final TestSubscriber<Integer> ts = TestSubscriber.create(0L);
pp.parallel(1)
.runOn(Schedulers.computation())
.filter(Functions.alwaysTrue())
.subscribe(new Subscriber[] { ts });
Runnable r1 = new Runnable() {
@Override
public void run() {
pp.onNext(1);
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
ts.cancel();
}
};
TestCommonHelper.race(r1, r2);
}
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
int count = 0, a = 0, b = 1;
while (count < 50) {
int sum = a + b;
subscriber.onNext(b);
a = b;
b = sum;
count++;
}
subscriber.onComplete();
}
@Test
public void testSkipEmptyStream() {
Flowable<String> w = Flowable.empty();
Flowable<String> skip = w.skip(1);
Subscriber<String> observer = TestHelper.mockSubscriber();
skip.subscribe(observer);
verify(observer, never()).onNext(any(String.class));
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
// Technically, cancellation from the result subscriber should be propagated
// to the upstream subscription. In practice, HttpHandler server adapters
// don't have a reason to cancel the result subscription.
this.resultPublisher.subscribe(subscriber);
}