org.mockito.internal.stubbing.answers.ThrowsException#org.reactivestreams.Subscriber源码实例Demo

下面列出了 org.mockito.internal.stubbing.answers.ThrowsException#org.reactivestreams.Subscriber 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: RxJava3-preview   文件: FlowableToFutureTest.java
/**
 * Subscribes to a Flowable built from a mocked Future whose {@code get()} returns a
 * value, then disposes the TestSubscriber. Expects exactly one onNext with that value,
 * exactly one onComplete, no onError, and no call to {@code Future.cancel}.
 */
@Test
public void testSuccess() throws Exception {
    Object expected = new Object();

    @SuppressWarnings("unchecked")
    Future<Object> source = mock(Future.class);
    when(source.get()).thenReturn(expected);

    Subscriber<Object> downstream = TestHelper.mockSubscriber();
    TestSubscriber<Object> wrapper = new TestSubscriber<Object>(downstream);

    Flowable.fromFuture(source).subscribe(wrapper);

    // Dispose after subscribing; the Future must still not have been cancelled.
    wrapper.dispose();

    verify(downstream, times(1)).onNext(expected);
    verify(downstream, times(1)).onComplete();
    verify(downstream, never()).onError(any(Throwable.class));
    verify(source, never()).cancel(anyBoolean());
}
 
源代码2 项目: RxJava3-preview   文件: AsyncProcessorTest.java
/**
 * Feeds three values and a completion into an AsyncProcessor, subscribes afterwards,
 * and expects the late subscriber to see onNext("three") once, onComplete once and
 * no onError.
 */
@Test
public void testSubscribeAfterCompleted() {
    AsyncProcessor<String> processor = AsyncProcessor.create();
    Subscriber<String> lateSubscriber = TestHelper.mockSubscriber();

    // Push all values and terminate before anyone subscribes.
    for (String item : new String[] { "one", "two", "three" }) {
        processor.onNext(item);
    }
    processor.onComplete();

    processor.subscribe(lateSubscriber);

    verify(lateSubscriber, times(1)).onNext("three");
    verify(lateSubscriber, Mockito.never()).onError(any(Throwable.class));
    verify(lateSubscriber, times(1)).onComplete();
}
 
/**
 * Interprets the stored contents as NotificationLite objects and hands each one to
 * {@code NotificationLite.acceptFull} together with the given Subscriber.
 * <p>
 * The walk starts at {@code head}. Within a chunk, slots {@code 0..capacity-1} are
 * visited until a null slot is found; slot {@code capacity} is then followed as the
 * reference to the next chunk, and the walk ends when that reference is null.
 *
 * @param <U> the target type
 * @param subscriber the subscriber to emit the events to
 * @return true if a terminal event has been reached
 */
public <U> boolean accept(Subscriber<? super U> subscriber) {
    Object[] chunk = head;
    final int chunkSize = capacity;

    // The slot at index chunkSize links to the following chunk.
    for (; chunk != null; chunk = (Object[]) chunk[chunkSize]) {
        for (int index = 0; index < chunkSize; index++) {
            Object item = chunk[index];
            if (item == null) {
                // First empty slot: nothing more stored in this chunk.
                break;
            }
            if (NotificationLite.acceptFull(item, subscriber)) {
                return true;
            }
        }
    }
    return false;
}
 
源代码4 项目: RxJava3-preview   文件: FlowableZipIterableTest.java
/**
 * Zips a PublishProcessor emitting three strings with a three-element Iterable using
 * {@code zipr2}, and expects the combined values "one-1", "two-2", "three-3" followed
 * by onComplete, in that order, with no onError.
 */
@Test
public void testZipIterableSameSize() {
    PublishProcessor<String> left = PublishProcessor.create();
    Iterable<String> right = Arrays.asList("1", "2", "3");

    /* mocked Subscriber that receives the aggregated events */
    Subscriber<String> downstream = TestHelper.mockSubscriber();
    InOrder ordered = inOrder(downstream);

    left.zipWith(right, zipr2).subscribe(downstream);

    for (String prefix : new String[] { "one-", "two-", "three-" }) {
        left.onNext(prefix);
    }
    left.onComplete();

    ordered.verify(downstream).onNext("one-1");
    ordered.verify(downstream).onNext("two-2");
    ordered.verify(downstream).onNext("three-3");
    ordered.verify(downstream).onComplete();

    verify(downstream, never()).onError(any(Throwable.class));
}
 
/**
 * Subscribes to {@code getTimedFirstNames()} with unbounded demand and prints every
 * received item; an error signal prints a fixed message instead. The test makes no
 * assertions.
 */
@Test
public void testTimedFirstNames(){
	employeeBatchStreamServiceImpl.getTimedFirstNames().subscribe(new Subscriber<String>(){

		@Override
		public void onSubscribe(Subscription subscription) {
			// Request everything up front.
			subscription.request(Long.MAX_VALUE);
		}

		@Override
		public void onNext(String firstName) {
			System.out.println(firstName);
		}

		@Override
		public void onError(Throwable failure) {
			System.out.println("time is out....");
		}

		@Override
		public void onComplete() {
			// nothing to do on completion
		}

	});
}
 
源代码6 项目: styx   文件: NettyByteStream.java
/**
 * Wraps the given subscriber in a ByteBufToBufferSubscriber, registers that wrapper
 * with the content producer on the event loop, and then hands the wrapper a
 * Subscription whose request/cancel calls are forwarded to the content producer via
 * the event loop.
 *
 * @param subscriber the downstream subscriber of Buffer items
 */
@Override
public void subscribe(Subscriber<? super Buffer> subscriber) {
    ByteBufToBufferSubscriber adapter = new ByteBufToBufferSubscriber(subscriber);

    // Every interaction with contentProducer is submitted to the event loop.
    Subscription eventLoopSubscription = new Subscription() {
        @Override
        public void request(long demand) {
            eventLoop.submit(() -> contentProducer.request(demand));
        }

        @Override
        public void cancel() {
            eventLoop.submit(contentProducer::unsubscribe);
        }
    };

    eventLoop.submit(() -> contentProducer.onSubscribed(adapter));
    adapter.onSubscribe(eventLoopSubscription);
}
 
源代码7 项目: RxJava3-preview   文件: FlowableToFutureTest.java
/**
 * Disposes the TestSubscriber before subscribing it to a Flowable built from a mocked
 * Future whose {@code get()} throws CancellationException, and expects the subscriber
 * to have received neither an error nor a completion.
 */
@Test
public void testCancelledBeforeSubscribe() throws Exception {
    @SuppressWarnings("unchecked")
    Future<Object> source = mock(Future.class);
    when(source.get()).thenThrow(new CancellationException("unit test synthetic cancellation"));

    Subscriber<Object> downstream = TestHelper.mockSubscriber();
    TestSubscriber<Object> disposedSubscriber = new TestSubscriber<Object>(downstream);

    // Dispose first, subscribe second.
    disposedSubscriber.dispose();
    Flowable.fromFuture(source).subscribe(disposedSubscriber);

    disposedSubscriber.assertNoErrors();
    disposedSubscriber.assertNotComplete();
}
 
/**
 * Applies {@code takeUntil} with a predicate that always returns false to a two-item
 * Flowable and expects both items, then onComplete, and no onError.
 */
@Test
public void takeAll() {
    // Stop condition that never fires.
    Predicate<Integer> neverStop = new Predicate<Integer>() {
        @Override
        public boolean test(Integer value) {
            return false;
        }
    };
    Subscriber<Object> downstream = TestHelper.mockSubscriber();

    Flowable.just(1, 2).takeUntil(neverStop).subscribe(downstream);

    verify(downstream).onNext(1);
    verify(downstream).onNext(2);
    verify(downstream, never()).onError(any(Throwable.class));
    verify(downstream).onComplete();
}
 
/**
 * Decides whether the emission loop should stop.
 * <p>
 * If {@code cancelled} is set, the queue is cleared and {@code downstream} is nulled.
 * Otherwise, when both {@code d} and {@code empty} are true, {@code downstream} is
 * nulled and the subscriber receives onError (if {@code error} is non-null) or
 * onComplete.
 *
 * @param d the caller's "done" flag
 * @param empty whether the caller found the queue empty
 * @param subscriber the subscriber that receives the terminal signal
 * @param q the queue to clear on cancellation
 * @return true if the caller should stop, false if it may continue
 */
private boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> subscriber, Queue<T> q) {
    if (cancelled) {
        q.clear();
        downstream = null;
        return true;
    }

    if (!d || !empty) {
        // Either not done yet or items are still pending.
        return false;
    }

    Throwable failure = error;
    downstream = null;
    if (failure == null) {
        subscriber.onComplete();
    } else {
        subscriber.onError(failure);
    }
    return true;
}
 
/**
 * Sends the strings "0".."9" as messages through an EventBusSink configured with a
 * random address, and waits until the consumer registered on that address has
 * counted ten deliveries.
 */
@SuppressWarnings("unchecked")
@Test
public void testSinkUsingString() {
    String address = UUID.randomUUID().toString();

    // Count every string consumed from the address.
    AtomicInteger received = new AtomicInteger(0);
    usage.consumeStrings(address, 10, 10, TimeUnit.SECONDS,
            v -> received.getAndIncrement());

    Map<String, Object> settings = new HashMap<>();
    settings.put("address", address);
    VertxEventBusConnectorOutgoingConfiguration outgoing =
            new VertxEventBusConnectorOutgoingConfiguration(new MapBasedConfig(settings));
    EventBusSink sink = new EventBusSink(vertx, outgoing);

    SubscriberBuilder<? extends Message<?>, Void> builder = sink.sink();
    Multi.createFrom().range(0, 10)
            .map(i -> Integer.toString(i))
            .map(Message::of)
            .subscribe((Subscriber<Message<?>>) builder.build());

    await().untilAtomic(received, is(10));
    assertThat(received).hasValue(10);
}
 
源代码11 项目: quarkus   文件: PublisherSignatureTest.java
/**
 * Returns a Subscriber for channel "G" that requests 10 items on subscription and
 * adds every received integer to {@code getItems()}. Error and completion signals
 * are ignored.
 *
 * @return the subscriber consuming the incoming integers
 */
@SuppressWarnings("SubscriberImplementation")
@Incoming("G")
public Subscriber<Integer> consume() {
    return new Subscriber<Integer>() {
        @Override
        public void onComplete() {
            // Ignored
        }

        @Override
        public void onError(Throwable failure) {
            // Ignored
        }

        @Override
        public void onNext(Integer item) {
            getItems().add(item);
        }

        @Override
        public void onSubscribe(Subscription upstream) {
            upstream.request(10);
        }
    };
}
 
源代码12 项目: vertx-spring-boot   文件: VertxClientHttpRequest.java
/**
 * Builds a two-step Mono: the first step subscribes a WriteStreamSubscriber to the
 * body publisher so that each chunk is converted and written to the delegate request;
 * the second step ends the delegate request. The chained Mono is passed to
 * {@code doCommit}.
 *
 * @param chunks the publisher of body data buffers
 * @return the result of {@code doCommit} for the write-then-end sequence
 */
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> chunks) {
    Mono<Void> bodyWritten = Mono.create(completion -> {
        logger.debug("Subscribing to body publisher");
        Subscriber<DataBuffer> bodySubscriber =
            new WriteStreamSubscriber.Builder<HttpClientRequest, DataBuffer>()
                .writeStream(delegate)
                .endHook(completion)
                .nextHandler((stream, value) -> stream.write(bufferConverter.toBuffer(value)))
                .build();
        chunks.subscribe(bodySubscriber);
    });

    Mono<Void> requestEnded = Mono.create(completion -> {
        logger.debug("Completing request after writing");
        delegate.end();
        completion.success();
    });

    return doCommit(() -> bodyWritten.then(requestEnded));
}
 
源代码13 项目: RxBus2   文件: RxBusBuilder.java
/**
 * Builds the bus Flowable, optionally applies the transformer, optionally wraps the
 * subscriber for queue safety, applies the scheduler and subscribes. If a bound
 * object is set, the resulting Disposable is registered with RxDisposableManager.
 *
 * @param subscriber the subscriber that receives the (transformed) events
 * @param transformer optional transformer applied to the Flowable; skipped when null
 * @return the Disposable obtained from subscribing
 */
public <R> Disposable subscribe(DisposableSubscriber<R> subscriber, FlowableTransformer<T, R> transformer)
{
    Flowable stream = build(false);
    if (transformer != null)
    {
        stream = stream.compose(transformer);
    }

    Subscriber<R> target = subscriber;
    if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled)
    {
        target = RxBusUtil.wrapSubscriber(subscriber, mQueuer);
    }

    stream = applySchedular(stream);

    // NOTE(review): the cast assumes subscribeWith returns a DisposableSubscriber even
    // when the subscriber was wrapped above - confirm against RxBusUtil.wrapSubscriber.
    Disposable handle = (DisposableSubscriber)stream.subscribeWith(target);
    if (mBoundObject != null)
    {
        RxDisposableManager.addDisposable(mBoundObject, handle);
    }
    return handle;
}
 
源代码14 项目: smallrye-mutiny   文件: MultiMapOnFailure.java
/**
 * Subscribes the given subscriber to the upstream through a failure-resume operator
 * whose resume function always produces a failing Multi: failures matching the
 * predicate are replaced by the mapper's result (or by a NullPointerException when
 * the mapper returns null); all other failures are propagated unchanged.
 *
 * @param subscriber the downstream subscriber, must not be null
 * @throws NullPointerException if {@code subscriber} is null
 */
@Override
public void subscribe(Subscriber<? super T> subscriber) {
    if (subscriber == null) {
        throw new NullPointerException("The subscriber must not be `null`");
    }

    Function<? super Throwable, ? extends Publisher<? extends T>> next = failure -> {
        if (!predicate.test(failure)) {
            // Not a failure we map: pass it through untouched.
            return Multi.createFrom().failure(failure);
        }
        Throwable mapped = mapper.apply(failure);
        if (mapped != null) {
            return Multi.createFrom().failure(mapped);
        }
        return Multi.createFrom().failure(new NullPointerException(MAPPER_RETURNED_NULL));
    };

    Multi<T> op = Infrastructure.onMultiCreation(new MultiOnFailureResumeOp<>(upstream(), next));
    op.subscribe(subscriber);
}
 
源代码15 项目: smallrye-mutiny   文件: ScheduledPublisher.java
/**
 * Increments the active-publisher counter and asserts that it equals 1, then hands
 * the subscriber a Subscription. The first {@code request} call schedules emission of
 * {@code id} after 100 ms, followed 100 ms later by decrementing the counter and
 * signalling completion. Later requests and {@code cancel} do nothing.
 *
 * @param subscriber the subscriber to serve
 */
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
    assertEquals(activePublishers.incrementAndGet(), 1);
    subscriber.onSubscribe(new Subscription() {
        @Override
        public void cancel() {
            // cancellation is not supported
        }

        @Override
        public void request(long demand) {
            if (!published.compareAndSet(false, true)) {
                // Only the first request triggers the emission.
                return;
            }
            getExecutorService().schedule(() -> {
                subscriber.onNext(id);
                getExecutorService().schedule(() -> {
                    activePublishers.decrementAndGet();
                    subscriber.onComplete();
                }, 100, TimeUnit.MILLISECONDS);
            }, 100, TimeUnit.MILLISECONDS);
        }
    });
}
 
/**
 * Sends the message unless this instance is cancelled. On a successful send the
 * result is forwarded to the downstream subscriber and the {@code requested} counter
 * is decremented; when it reaches zero {@code onNoMoreCredit()} is invoked. A send
 * failure is forwarded to the downstream subscriber's onError.
 *
 * @param message the message to send
 */
@Override
public void onNext(Message<?> message) {
    if (!isCancelled()) {
        Subscriber<? super Message<?>> subscriber = this.downstream.get();
        send(sender, message, durable, ttl, configuredAddress, useAnonymousSender, configuration)
                .subscribe().with(
                        sent -> {
                            subscriber.onNext(sent);
                            // no more credit, request more
                            if (requested.decrementAndGet() == 0) {
                                onNoMoreCredit();
                            }
                        },
                        subscriber::onError);
    }
}
 
源代码17 项目: RxJava3-preview   文件: ParallelJoinTest.java
/**
 * Uses a single-rail ParallelFlowable that emits two items right after onSubscribe,
 * joins it with {@code sequentialDelayError(1)}, subscribes with zero initial demand
 * and then requests one item. Expects a MissingBackpressureException failure after
 * the value 1.
 */
@Test
public void overflowFastpathDelayError() {
    new ParallelFlowable<Integer>() {
        @Override
        public int parallelism() {
            return 1;
        }

        @Override
        public void subscribe(Subscriber<? super Integer>[] subscribers) {
            Subscriber<? super Integer> rail = subscribers[0];
            rail.onSubscribe(new BooleanSubscription());
            // Emit both items immediately after onSubscribe.
            rail.onNext(1);
            rail.onNext(2);
        }
    }
    .sequentialDelayError(1)
    .test(0)
    .requestMore(1)
    .assertFailure(MissingBackpressureException.class, 1);
}
 
源代码18 项目: rxjava-RxLife   文件: ParallelFlowableLife.java
/**
 * Wraps each incoming subscriber in a lifecycle-aware subscriber bound to
 * {@code scope} (a conditional variant is used for ConditionalSubscriber instances)
 * and subscribes the wrappers to the upstream, which is first moved to the Android
 * main thread when {@code onMain} is set. Returns without subscribing if
 * {@code validate} rejects the array.
 *
 * @param subscribers the subscribers to wrap and subscribe
 */
@SuppressWarnings("unchecked")
public void subscribe(@NonNull Subscriber<? super T>[] subscribers) {
    if (!validate(subscribers)) {
        return;
    }

    final int count = subscribers.length;
    Subscriber<? super T>[] wrapped = new Subscriber[count];

    for (int index = 0; index < count; index++) {
        Subscriber<? super T> original = subscribers[index];
        if (!(original instanceof ConditionalSubscriber)) {
            wrapped[index] = new LifeSubscriber<>(original, scope);
        } else {
            wrapped[index] = new LifeConditionalSubscriber<>((ConditionalSubscriber<? super T>) original, scope);
        }
    }

    ParallelFlowable<T> source = this.upStream;
    if (onMain) {
        source = source.runOn(AndroidSchedulers.mainThread());
    }
    source.subscribe(wrapped);
}
 
/**
 * Subscribes to the source with a distinct-until-changed subscriber wrapping
 * {@code s}; the conditional variant is used when {@code s} is a
 * ConditionalSubscriber.
 *
 * @param s the downstream subscriber
 */
@Override
protected void subscribeActual(Subscriber<? super T> s) {
    if (!(s instanceof ConditionalSubscriber)) {
        source.subscribe(new DistinctUntilChangedSubscriber<T, K>(s, keySelector, comparer));
        return;
    }

    ConditionalSubscriber<? super T> conditional = (ConditionalSubscriber<? super T>) s;
    source.subscribe(new DistinctUntilChangedConditionalSubscriber<T, K>(conditional, keySelector, comparer));
}
 
源代码20 项目: RxJava3-preview   文件: FlowableTakeLastTest.java
/**
 * Applies {@code takeLast(10)} to a single-item Flowable and expects that item once,
 * onComplete once and no onError.
 */
@Test
public void testTakeLast2() {
    Flowable<String> single = Flowable.just("one");
    Flowable<String> lastTen = single.takeLast(10);
    Subscriber<String> downstream = TestHelper.mockSubscriber();

    lastTen.subscribe(downstream);

    verify(downstream, times(1)).onNext("one");
    verify(downstream, never()).onError(any(Throwable.class));
    verify(downstream, times(1)).onComplete();
}
 
源代码21 项目: smallrye-reactive-messaging   文件: KafkaSink.java
/**
 * Registers the given subscriber as the single downstream. If a downstream is
 * already set, the new subscriber is failed with an "only one subscriber" error.
 * Otherwise, when a subscription is already present, the subscriber immediately
 * receives {@code onSubscribe(this)}.
 *
 * @param subscriber the subscriber to register
 */
@Override
public void subscribe(
        Subscriber<? super Message<?>> subscriber) {
    if (downstream.compareAndSet(null, subscriber)) {
        if (subscription.get() != null) {
            subscriber.onSubscribe(this);
        }
        return;
    }

    // A downstream was already registered.
    Subscriptions.fail(subscriber, ex.illegalStateOnlyOneSubscriber());
}
 
/**
 * Captures the current context executor from {@code THREAD_CONTEXT} and returns a
 * Subscriber that forwards every signal to the given subscriber by running it on
 * that executor.
 *
 * @param <T> the item type
 * @param instance the publisher being subscribed to (not used by this method)
 * @param subscriber the subscriber to delegate to
 * @return the wrapping subscriber
 */
@SuppressWarnings("SubscriberImplementation")
@Override
public <T> Subscriber<? super T> onSubscription(Publisher<? extends T> instance, Subscriber<? super T> subscriber) {
    // Captured once here, at wrapping time, and reused for every signal.
    Executor contextExecutor = THREAD_CONTEXT.currentContextExecutor();
    return new Subscriber<T>() {

        @Override
        public void onComplete() {
            contextExecutor.execute(subscriber::onComplete);
        }

        @Override
        public void onError(Throwable error) {
            contextExecutor.execute(() -> subscriber.onError(error));
        }

        @Override
        public void onNext(T element) {
            contextExecutor.execute(() -> subscriber.onNext(element));
        }

        @Override
        public void onSubscribe(Subscription upstream) {
            contextExecutor.execute(() -> subscriber.onSubscribe(upstream));
        }
    };
}
 
/**
 * Materializes and then dematerializes an error Flowable and expects the original
 * exception to reach the subscriber once, with no onComplete and no onNext.
 */
@Test
public void testDematerialize3() {
    Exception failure = new Exception("test");
    Flowable<Integer> failing = Flowable.error(failure);
    Flowable<Integer> roundTripped = failing.materialize().dematerialize();

    Subscriber<Integer> downstream = TestHelper.mockSubscriber();
    roundTripped.subscribe(downstream);

    verify(downstream, times(1)).onError(failure);
    verify(downstream, times(0)).onComplete();
    verify(downstream, times(0)).onNext(any(Integer.class));
}
 
源代码24 项目: smallrye-mutiny   文件: Mocks.java
/**
 * Creates a Mockito mock of {@code Subscriber} whose {@code onSubscribe} is stubbed
 * to request {@code Long.MAX_VALUE} from the Subscription it receives.
 *
 * @param <T> the value type
 * @return the mocked subscriber
 */
@SuppressWarnings("unchecked")
public static <T> Subscriber<T> subscriber() {
    Subscriber<T> mocked = mock(Subscriber.class);

    Mockito.doAnswer((Answer<Object>) invocation -> {
        // The first argument of onSubscribe is the Subscription.
        Subscription upstream = invocation.getArgument(0);
        upstream.request(Long.MAX_VALUE);
        return null;
    }).when(mocked).onSubscribe(any());

    return mocked;
}
 
源代码25 项目: aws-sdk-java-v2   文件: PublisherProbe.java
/**
 * Logs the subscriber's class name (or "null"), subscribes a SubscriberProbe wrapping
 * it to the underlying publisher, and logs again once the subscribe call returns.
 *
 * @param s the subscriber to probe; may be null as far as the logging is concerned
 */
@Override
public void subscribe(Subscriber<? super T> s) {
    String subscriberName;
    if (s == null) {
        subscriberName = "null";
    } else {
        subscriberName = s.getClass().getName();
    }

    log("invoke subscribe with subscriber " + subscriberName);
    publisher.subscribe(new SubscriberProbe<>(s, name, start));
    log("finish subscribe");
}
 
源代码26 项目: RxJava3-preview   文件: ParallelRunOnTest.java
/**
 * Runs an error Flowable through parallel(1), runOn with the immediate thin
 * scheduler and an always-true filter, subscribes a TestSubscriber created with zero
 * initial request, and expects it to fail with TestException.
 */
@SuppressWarnings("unchecked")
@Test
public void errorConditionalBackpressured() {
    // Created with an initial request of 0.
    TestSubscriber<Object> subscriber = new TestSubscriber<Object>(0L);

    Flowable.error(new TestException())
            .parallel(1)
            .runOn(ImmediateThinScheduler.INSTANCE)
            .filter(Functions.alwaysTrue())
            .subscribe(new Subscriber[] { subscriber });

    subscriber.assertFailure(TestException.class);
}
 
源代码27 项目: RxJava3-preview   文件: ParallelRunOnTest.java
/**
 * Repeats 1000 times: sets up a PublishProcessor flowing through parallel(1), runOn
 * on the computation scheduler and an always-true filter into a TestSubscriber with
 * zero initial request, then races an onNext on the processor against a cancel on
 * the subscriber. The test makes no assertions on the outcome.
 */
@SuppressWarnings("unchecked")
@Test
public void nextCancelRaceBackpressuredConditional() {
    for (int round = 0; round < 1000; round++) {
        final PublishProcessor<Integer> processor = PublishProcessor.create();
        final TestSubscriber<Integer> subscriber = TestSubscriber.create(0L);

        processor.parallel(1)
                .runOn(Schedulers.computation())
                .filter(Functions.alwaysTrue())
                .subscribe(new Subscriber[] { subscriber });

        Runnable emit = new Runnable() {
            @Override
            public void run() {
                processor.onNext(1);
            }
        };
        Runnable cancel = new Runnable() {
            @Override
            public void run() {
                subscriber.cancel();
            }
        };

        TestCommonHelper.race(emit, cancel);
    }
}
 
/**
 * Emits 50 values of the sequence 1, 1, 2, 3, 5, ... (each value is the sum of the
 * two before it) to the subscriber and then signals completion.
 * <p>
 * NOTE(review): the sums are computed with {@code int} arithmetic, so from the 47th
 * value onwards the result exceeds {@code Integer.MAX_VALUE} and wraps around.
 * NOTE(review): no {@code onSubscribe} call is made before the first {@code onNext}
 * and demand is not tracked - confirm this is intentional for this publisher.
 *
 * @param subscriber the subscriber that receives the values
 */
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
    int previous = 0;
    int current = 1;
    for (int emitted = 0; emitted < 50; emitted++) {
        subscriber.onNext(current);
        int next = previous + current;
        previous = current;
        current = next;
    }

    subscriber.onComplete();
}
 
源代码29 项目: RxJava3-preview   文件: FlowableSkipTest.java
/**
 * Applies {@code skip(1)} to an empty Flowable and expects only onComplete: no
 * onNext and no onError.
 */
@Test
public void testSkipEmptyStream() {
    Flowable<String> empty = Flowable.empty();
    Flowable<String> skipped = empty.skip(1);
    Subscriber<String> downstream = TestHelper.mockSubscriber();

    skipped.subscribe(downstream);

    verify(downstream, never()).onNext(any(String.class));
    verify(downstream, never()).onError(any(Throwable.class));
    verify(downstream, times(1)).onComplete();
}
 
/**
 * Subscribes the given subscriber directly to the result publisher.
 * <p>
 * Strictly speaking, a cancellation coming from the result subscriber ought to be
 * propagated to the upstream subscription. In practice, HttpHandler server adapters
 * have no reason to cancel the result subscription, so this is not done here.
 *
 * @param subscriber the subscriber of the completion/error result
 */
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
	resultPublisher.subscribe(subscriber);
}