类org.reactivestreams.Processor源码实例Demo

下面列出了怎么用 org.reactivestreams.Processor 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。

源代码1 项目: smallrye-mutiny   文件: SerializedProcessorTest.java
/**
 * Signals {@code onError} with the same exception from two separate threads on a
 * serialized processor, then waits for the subscriber and asserts that it was subscribed
 * and failed with an {@link Exception} whose message is "boom".
 */
@Test(invocationCount = 50)
public void verifyOnErrorThreadSafety() {
    final Processor<Integer, Integer> serialized = UnicastProcessor.<Integer> create().serialized();
    MultiAssertSubscriber<Integer> downstream = MultiAssertSubscriber.create(100);
    serialized.subscribe(downstream);

    Exception boom = new Exception("boom");

    // Both threads race to deliver the very same terminal failure.
    new Thread(() -> serialized.onError(boom)).start();
    new Thread(() -> serialized.onError(boom)).start();

    downstream.await();
    downstream
            .assertSubscribed()
            .assertHasFailedWith(Exception.class, "boom");
}
 
源代码2 项目: smallrye-mutiny   文件: SerializedProcessorTest.java
/**
 * Races one thread that emits an item and completes against a second thread that only
 * completes. The subscriber must end up subscribed and completed; if any item was
 * received, it must be exactly the single value 1.
 */
@Test(invocationCount = 20)
public void verifyOnNextOnCompleteThreadSafety() {
    final Processor<Integer, Integer> serialized = UnicastProcessor.<Integer> create().serialized();
    MultiAssertSubscriber<Integer> downstream = MultiAssertSubscriber.create(100);
    serialized.subscribe(downstream);

    Runnable emitThenComplete = () -> {
        serialized.onNext(1);
        serialized.onComplete();
    };
    Runnable completeOnly = () -> serialized.onComplete();

    new Thread(emitThenComplete).start();
    new Thread(completeOnly).start();

    downstream.await();
    downstream
            .assertSubscribed()
            .assertCompletedSuccessfully();

    // The completion may win the race, in which case no item is delivered at all.
    if (!downstream.items().isEmpty()) {
        assertThat(downstream.items()).containsExactly(1);
    }
}
 
源代码3 项目: smallrye-mutiny   文件: SerializedProcessorTest.java
/**
 * Races one thread that emits an item and completes against a second thread that calls
 * {@code onSubscribe} with a fresh empty subscription. The subscriber must end up
 * subscribed and completed; if any item was received, it must be exactly the value 1.
 */
@Test(invocationCount = 20)
public void verifyOnSubscribeOnCompleteThreadSafety() {
    final Processor<Integer, Integer> serialized = UnicastProcessor.<Integer> create().serialized();
    MultiAssertSubscriber<Integer> downstream = MultiAssertSubscriber.create(100);
    serialized.subscribe(downstream);

    Runnable emitThenComplete = () -> {
        serialized.onNext(1);
        serialized.onComplete();
    };
    Runnable subscribeAgain = () -> serialized.onSubscribe(new Subscriptions.EmptySubscription());

    new Thread(emitThenComplete).start();
    new Thread(subscribeAgain).start();

    downstream.await();
    downstream
            .assertSubscribed()
            .assertCompletedSuccessfully();

    // Nothing is asserted about the items unless at least one was delivered.
    if (!downstream.items().isEmpty()) {
        assertThat(downstream.items()).containsExactly(1);
    }
}
 
源代码4 项目: wurmloch-crdt   文件: GSetTest.java
/**
 * Pushes three add commands into a {@code GSet} (elements "1", "2" and "1" again) and
 * asserts that the set holds two elements, the subscriber saw two values, and the
 * subscriber neither completed nor received an error.
 */
@Test
public void shouldHandleAddCommands() {
    // given:
    final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final GSet<String> gSet = new GSet<>("ID_1");
    gSet.subscribeTo(commands);
    gSet.subscribe(observer);

    final GSet.AddCommand<String> addOne = new GSet.AddCommand<>(gSet.getCrdtId(), "1");
    final GSet.AddCommand<String> addTwo = new GSet.AddCommand<>(gSet.getCrdtId(), "2");
    final GSet.AddCommand<String> addOneAgain = new GSet.AddCommand<>(gSet.getCrdtId(), "1");

    // when:
    commands.onNext(addOne);
    commands.onNext(addTwo);
    commands.onNext(addOneAgain);

    // then:
    assertThat(gSet, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码5 项目: wurmloch-crdt   文件: TwoPSetTest.java
/**
 * Pushes a remove command for "1" followed by two add commands for "1" into a
 * {@code TwoPSet} and asserts that the set stays empty, the subscriber saw exactly one
 * value, and the subscriber neither completed nor received an error.
 */
@Test
public void shouldHandleRemoveCommandArrivesBeforeAddCommand() {
    // given:
    final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final TwoPSet<String> twoPSet = new TwoPSet<>("ID_1");
    twoPSet.subscribeTo(commands);
    twoPSet.subscribe(observer);

    final TwoPSet.RemoveCommand<String> removeFirst = new TwoPSet.RemoveCommand<>(twoPSet.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> lateAdd = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> lateAddAgain = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");

    // when:
    commands.onNext(removeFirst);
    commands.onNext(lateAdd);
    commands.onNext(lateAddAgain);

    // then:
    assertThat(twoPSet, empty());
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码6 项目: wurmloch-crdt   文件: TwoPSetTest.java
/**
 * Pushes three add commands into a {@code TwoPSet} (elements "1", "2" and "1" again) and
 * asserts that the set holds two elements, the subscriber saw two values, and the
 * subscriber neither completed nor received an error.
 */
@Test
public void shouldHandleAddCommands() {
    // given:
    final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final TwoPSet<String> twoPSet = new TwoPSet<>("ID_1");
    twoPSet.subscribeTo(commands);
    twoPSet.subscribe(observer);

    final TwoPSet.AddCommand<String> addOne = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> addTwo = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "2");
    final TwoPSet.AddCommand<String> addOneAgain = new TwoPSet.AddCommand<>(twoPSet.getCrdtId(), "1");

    // when:
    commands.onNext(addOne);
    commands.onNext(addTwo);
    commands.onNext(addOneAgain);

    // then:
    assertThat(twoPSet, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
/**
 * Builds a subscriber for the given graph by walking its stages in order.
 *
 * <p>Stages whose operator is a {@code ProcessorOperator} are applied to the running
 * {@code Flowable}; the first stage whose operator is a {@code TerminalOperator} ends the
 * walk and produces the result.
 *
 * @param graph the graph whose stages are traversed
 * @return a subscriber wrapping the entry processor together with the completion stage
 *         produced by the terminal stage
 * @throws UnsupportedStageException if a stage maps to neither kind of operator
 * @throws IllegalArgumentException if the stages run out before a terminal stage is found
 */
@Override
public <T, R> SubscriberWithCompletionStage<T, R> buildSubscriber(Graph graph) {
    Processor<T, T> entry = new ConnectableProcessor<>();
    Flowable<T> pipeline = Flowable.fromPublisher(entry);

    for (Stage stage : graph.getStages()) {
        Operator operator = Stages.lookup(stage);

        if (operator instanceof ProcessorOperator) {
            pipeline = applyProcessors(pipeline, stage, (ProcessorOperator) operator);
            continue;
        }

        if (operator instanceof TerminalOperator) {
            CompletionStage<R> completion = applySubscriber(Transformer.apply(pipeline), stage,
                    (TerminalOperator) operator);
            return new DefaultSubscriberWithCompletionStage<>(entry, completion);
        }

        throw new UnsupportedStageException(stage);
    }

    throw new IllegalArgumentException("The graph does not have a valid final stage");
}
 
源代码8 项目: RxBus2   文件: RxBusSenderBuilder.java
/**
 * Sends the event to the processor registered for a copy of {@code key} that has been
 * bound to {@code mKey} (when {@code mKey} is a String or an Integer).
 *
 * @param key   the base queue key; it is cloned before an id is applied
 * @param event the event to emit; it is passed through {@code mCast} when one is set
 * @return {@code true} if a processor was found and the event was handed to it,
 *         {@code false} if no processor exists for the bound key
 */
private boolean sendToKeyBoundBus(RxQueueKey key, Object event)
{
    RxQueueKey boundKey = key.clone();
    if (mKey instanceof String)
        boundKey.withId((String)mKey);
    else if (mKey instanceof Integer)
        boundKey.withId((Integer)mKey);

    Processor target = RxBus.getInstance().getProcessor(boundKey, false);

    // only send event, if processor exists => this means someone has at least once subscribed to it
    if (target == null)
        return false;

    if (mCast == null)
        target.onNext(event);
    else
        target.onNext(mCast.cast(event));
    return true;
}
 
源代码9 项目: reactor-core   文件: FluxWindow.java
/**
 * Handles an error signal: forwards it to every processor this object iterates over,
 * clears this container, records the error and triggers a drain.
 *
 * <p>If {@code done} is already set, the error is passed to
 * {@code Operators.onErrorDropped} and nothing else happens.
 *
 * @param t the error being signalled
 */
@Override
public void onError(Throwable t) {
	if (done) {
		// A terminal signal was already seen; route this error to the dropped-error hook.
		Operators.onErrorDropped(t, actual.currentContext());
		return;
	}
	// Mark termination before forwarding so a later onError takes the branch above.
	done = true;

	// Propagate the error to each processor held by this object.
	for (Processor<T, T> w : this) {
		w.onError(t);
	}
	clear();

	// NOTE(review): presumably drain() is what delivers the stored error to `actual` — confirm.
	error = t;
	drain();
}
 
源代码10 项目: wurmloch-crdt   文件: GSetTest.java
/**
 * Pushes the very same add command twice into a {@code GSet} and asserts that the set
 * holds one element, the subscriber saw one value, and the subscriber neither completed
 * nor received an error.
 */
@Test
public void shouldHandleDuplicateCommands() {
    // given:
    final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final GSet<String> gSet = new GSet<>("ID_1");
    gSet.subscribeTo(commands);
    gSet.subscribe(observer);

    final GSet.AddCommand<String> addOne = new GSet.AddCommand<>(gSet.getCrdtId(), "1");

    // when:
    commands.onNext(addOne);
    commands.onNext(addOne);

    // then:
    assertThat(gSet, hasSize(1));
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码11 项目: wurmloch-crdt   文件: USetTest.java
/**
 * Pushes an add command and then a remove command for the same random UUID into a
 * {@code USet} and asserts that the set is empty, the subscriber saw two values, and the
 * subscriber neither completed nor received an error.
 */
@Test
public void shouldHandleRemoveCommands() {
    // given:
    final UUID element = UUID.randomUUID();
    final Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final USet<UUID> uSet = new USet<>("ID_1");
    uSet.subscribeTo(commands);
    uSet.subscribe(observer);

    final USet.AddCommand<UUID> add = new USet.AddCommand<>(uSet.getCrdtId(), element);
    final USet.RemoveCommand<UUID> remove = new USet.RemoveCommand<>(uSet.getCrdtId(), element);

    // when:
    commands.onNext(add);
    commands.onNext(remove);

    // then:
    assertThat(uSet, empty());
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
 
源代码12 项目: RxBus2   文件: RxBusSenderBuilder.java
/**
 * Sends the event to the processor registered for {@code key}, if there is one.
 *
 * @param key   the queue key used to look up the processor
 * @param event the event to emit; it is passed through {@code mCast} when one is set
 * @return {@code true} if a processor was found and the event was handed to it,
 *         {@code false} if no processor exists for the key
 */
private boolean sendToUnboundBus(RxQueueKey key, Object event)
{
    Processor target = RxBus.getInstance().getProcessor(key, false);

    // only send event, if processor exists => this means someone has at least once subscribed to it
    if (target == null)
        return false;

    if (mCast == null)
        target.onNext(event);
    else
        target.onNext(mCast.cast(event));
    return true;
}
 
源代码13 项目: smallrye-mutiny   文件: MultiWindowOp.java
/**
 * Handles an upstream failure. The upstream reference is swapped to {@code CANCELLED};
 * if it already held {@code CANCELLED} nothing happens. Otherwise the current processor
 * (if any) is detached and failed, and the failure is then forwarded downstream.
 *
 * @param failure the failure to propagate
 */
@Override
public void onFailure(Throwable failure) {
    if (upstream.getAndSet(CANCELLED) == CANCELLED) {
        return;
    }

    Processor<T, T> current = processor;
    if (current != null) {
        // Detach the field before signalling the processor.
        processor = null;
        current.onError(failure);
    }
    downstream.onFailure(failure);
}
 
源代码14 项目: smallrye-mutiny   文件: MultiWindowOp.java
/**
 * Handles upstream completion. The upstream reference is swapped to {@code CANCELLED};
 * if it already held {@code CANCELLED} nothing happens. Otherwise the current processor
 * (if any) is detached and completed, and completion is then forwarded downstream.
 */
@Override
public void onCompletion() {
    if (upstream.getAndSet(CANCELLED) == CANCELLED) {
        return;
    }

    Processor<T, T> current = processor;
    if (current != null) {
        // Detach the field before signalling the processor.
        processor = null;
        current.onComplete();
    }

    downstream.onCompletion();
}
 
源代码15 项目: smallrye-mutiny   文件: SerializedProcessorTest.java
/**
 * Subscribes a serialized processor to the range 1..10 and asserts that the subscriber
 * received those ten items and completed. An extra {@code onNext} and {@code onComplete}
 * are then sent to the processor; nothing is asserted after them.
 */
@Test
public void testWithMultipleItems() {
    Processor<Integer, Integer> serialized = UnicastProcessor.<Integer> create().serialized();
    MultiAssertSubscriber<Integer> downstream = MultiAssertSubscriber.create(10);
    serialized.subscribe(downstream);

    Multi.createFrom().range(1, 11).subscribe(serialized);

    downstream.assertReceived(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    downstream.assertCompletedSuccessfully();

    // Signals sent after the terminal event; the test only requires that they do not throw.
    serialized.onNext(11);
    serialized.onComplete();
}
 
源代码16 项目: micrometer   文件: StatsdMeterRegistryTest.java
/**
 * Creates a line sink that forwards every line to the given processor and completes the
 * processor once the countdown started at {@code numLines} reaches zero.
 *
 * @param lines    the processor receiving each line
 * @param numLines the initial value of the countdown
 * @return a consumer that pushes lines into {@code lines}
 */
private Consumer<String> toSink(Processor<String, String> lines, int numLines) {
    final AtomicInteger remaining = new AtomicInteger(numLines);
    return line -> {
        lines.onNext(line);
        boolean wasLast = remaining.decrementAndGet() == 0;
        if (wasLast) {
            lines.onComplete();
        }
    };
}
 
源代码17 项目: smallrye-mutiny   文件: SerializedProcessorTest.java
/**
 * Completes a serialized processor, then hands it a mocked subscription through
 * {@code onSubscribe} and verifies that the subscription was cancelled.
 */
@Test
public void testSubscriptionAfterTerminalEvent() {
    final Processor<Integer, Integer> serialized = UnicastProcessor.<Integer> create().serialized();
    Subscription lateSubscription = mock(Subscription.class);

    serialized.onComplete();
    serialized.onSubscribe(lateSubscription);

    verify(lateSubscription).cancel();
}
 
/**
 * Builds a processor that adds one to each incoming integer, emits the incremented value
 * twice, and converts each emission to its decimal string form.
 *
 * @return the processor linking the "count" channel to the "sink" channel
 */
@Incoming("count")
@Outgoing("sink")
public Processor<Integer, String> process() {
    return ReactiveStreams.<Integer> builder()
            .map(count -> count + 1)
            .flatMapRsPublisher(incremented -> Flowable.just(incremented, incremented))
            .map(value -> Integer.toString(value))
            .buildRs();
}
 
源代码19 项目: reactive-streams-commons   文件: PublisherWindow.java
/**
 * Propagates an upstream error to the currently open window (if any) and then to the
 * downstream subscriber.
 *
 * <p>The subscriber is marked as done before anything is forwarded, so a later
 * {@code onError} call is routed to {@code UnsignalledExceptions.onErrorDropped} instead
 * of reaching the window or {@code actual} a second time.
 *
 * @param t the error signalled by upstream
 */
@Override
public void onError(Throwable t) {
    if (done) {
        UnsignalledExceptions.onErrorDropped(t);
        return;
    }
    // Record termination first; without this flag the guard above could never trigger
    // from this method and a repeated error would be delivered downstream again.
    done = true;

    Processor<T, T> w = window;
    if (w != null) {
        // Detach the window before signalling it.
        window = null;
        w.onError(t);
    }

    actual.onError(t);
}
 
/**
 * Builds a processor that turns every incoming message into two new messages carrying the
 * same payload and reports each resulting message through {@code processed} under the
 * {@code PRE_ACKNOWLEDGMENT} key.
 *
 * @return the processor for the pre-processing acknowledgment channel
 */
@Incoming(PRE_ACKNOWLEDGMENT)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
@Outgoing("sink-" + PRE_ACKNOWLEDGMENT)
public Processor<Message<String>, Message<String>> processorWithPreAck() {
    return ReactiveStreams.<Message<String>> builder()
            .flatMap(incoming -> ReactiveStreams.of(
                    Message.of(incoming.getPayload()),
                    Message.of(incoming.getPayload())))
            .peek(duplicate -> processed(PRE_ACKNOWLEDGMENT, duplicate))
            .buildRs();
}
 
源代码21 项目: quarkus   文件: ProcessorSignatureTest.java
/**
 * Builds a processor that replaces each incoming message with a new message whose payload
 * is the decimal string form of the original integer payload.
 *
 * @return the processor linking channel "A" to channel "AA"
 */
@Incoming("A")
@Outgoing("AA")
public Processor<Message<Integer>, Message<String>> process() {
    return ReactiveStreams.<Message<Integer>> builder()
            .map(incoming -> {
                String text = Integer.toString(incoming.getPayload());
                return Message.of(text);
            })
            .buildRs();
}
 
/**
 * Builds a processor that unwraps each message payload, adds one, emits the incremented
 * value twice, converts each emission to its decimal string form and wraps it in a new
 * message.
 *
 * @return the processor linking the "count" channel to the "sink" channel
 */
@Incoming("count")
@Outgoing("sink")
public Processor<Message<Integer>, Message<String>> process() {
    return ReactiveStreams.<Message<Integer>> builder()
            .map(Message::getPayload)
            .map(count -> count + 1)
            .flatMapRsPublisher(incremented -> Flowable.just(incremented, incremented))
            .map(value -> Integer.toString(value))
            .map(Message::of)
            .buildRs();
}
 
源代码23 项目: RHub   文件: RxJava2ProcProxy.java
/**
 * Casts the processor to a {@code Flowable} and keeps only the items whose runtime class
 * is assignable to {@code filterClass}.
 *
 * @param processor   the processor to filter; it is cast to {@code Flowable}
 * @param filterClass the class that emitted items must be assignable to
 * @return the filtered flowable
 */
@Override
@SuppressWarnings("unchecked")
protected <T> Flowable<T> filter(Processor processor, final Class<T> filterClass) {
    final Predicate matchesFilterClass = new Predicate() {
        @Override
        public boolean test(Object item) {
            return filterClass.isAssignableFrom(item.getClass());
        }
    };
    return ((Flowable) processor).filter(matchesFilterClass);
}
 
源代码24 项目: micrometer   文件: StatsdMeterRegistryTest.java
/**
 * For each StatsD flavor, registers a gauge named "my.gauge" with tag "my.tag=val" backed
 * by an {@code AtomicInteger} holding 2, advances the clock by one step and expects the
 * flavor-specific line to be emitted, followed by completion.
 *
 * @param flavor the StatsD flavor under test
 */
@ParameterizedTest
@EnumSource(StatsdFlavor.class)
void gaugeLineProtocol(StatsdFlavor flavor) {
    final StatsdConfig config = configWithFlavor(flavor);
    final AtomicInteger gaugeValue = new AtomicInteger(2);

    // Expected wire format, one per supported flavor.
    String expectedLine = null;
    switch (flavor) {
        case ETSY:
            expectedLine = "myGauge.myTag.val.statistic.value:2|g";
            break;
        case DATADOG:
            expectedLine = "my.gauge:2|g|#statistic:value,my.tag:val";
            break;
        case TELEGRAF:
            expectedLine = "my_gauge,statistic=value,my_tag=val:2|g";
            break;
        case SYSDIG:
            expectedLine = "my.gauge#statistic=value,my.tag=val:2|g";
            break;
        default:
            fail("Unexpected flavor");
    }

    StepVerifier
            .withVirtualTime(() -> {
                final Processor<String, String> sinkLines = lineProcessor();
                registry = StatsdMeterRegistry.builder(config)
                        .clock(clock)
                        .lineSink(toSink(sinkLines))
                        .build();

                registry.gauge("my.gauge", Tags.of("my.tag", "val"), gaugeValue);
                return sinkLines;
            })
            .then(() -> clock.add(config.step()))
            .thenAwait(config.step())
            .expectNext(expectedLine)
            .verifyComplete();
}
 
/**
 * Creates a response that carries the WebSocket frame processor and the handshaker
 * factory alongside the HTTP version, status and header-validation flag passed to the
 * superclass.
 *
 * @param version           the HTTP version, passed to the superclass
 * @param status            the response status, passed to the superclass
 * @param validateHeaders   the header-validation flag, passed to the superclass
 * @param processor         the WebSocket frame processor stored on this response
 * @param handshakerFactory the handshaker factory stored on this response
 */
public DefaultWebSocketHttpResponse(HttpVersion version, HttpResponseStatus status, boolean validateHeaders,
                                    Processor<WebSocketFrame, WebSocketFrame> processor,
                                    WebSocketServerHandshakerFactory handshakerFactory) {
    super(version, status, validateHeaders);
    this.handshakerFactory = handshakerFactory;
    this.processor = processor;
}
 
/**
 * Creates a processing stage backed by the Reactive Streams processor held by the stage.
 *
 * <p>The returned function defers the wiring: on each deferred evaluation it wraps the
 * processor as a {@code Flowable}, subscribes the processor to the source and returns the
 * wrapped flowable.
 *
 * @param engine the engine (not used by this implementation)
 * @param stage  the processor stage; must not be {@code null} and must hold a non-null processor
 * @return the processing stage
 * @throws NullPointerException if {@code stage} or its processor is {@code null}
 */
@Override
public <I, O> ProcessingStage<I, O> create(Engine engine, Stage.ProcessorStage stage) {
    Objects.requireNonNull(stage);
    Processor<I, O> rsProcessor = Casts.cast(Objects.requireNonNull(stage.getRsProcessor()));

    return upstream -> Flowable.defer(() -> {
        Flowable<O> output = Flowable.fromPublisher(rsProcessor);
        upstream.safeSubscribe(rsProcessor);
        return output;
    });
}
 
源代码27 项目: RHub   文件: AkkaProcProxy.java
/**
 * Wraps the processor in an Akka {@code Source} and runs it into a fan-out publisher sink
 * using the {@code mat} materializer, returning the resulting publisher.
 *
 * @param processor the processor to expose
 * @return the publisher produced by running the source
 */
@Override
@SuppressWarnings("unchecked")
protected Publisher hide(Processor processor) {
    Source upstream = Source.fromPublisher(processor);
    return (Publisher) upstream.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);
}
 
/**
 * Builds a processor that turns every incoming message into two new messages carrying the
 * same payload, calls {@code ack} on the incoming message when that inner stream
 * completes, and reports each resulting message through {@code processed} under the
 * {@code DEFAULT_ACKNOWLEDGMENT} key.
 *
 * @return the processor for the default acknowledgment channel
 */
@Incoming(DEFAULT_ACKNOWLEDGMENT)
@Outgoing("sink-" + DEFAULT_ACKNOWLEDGMENT)
public Processor<Message<String>, Message<String>> processorWithAutoAck() {
    return ReactiveStreams.<Message<String>> builder()
            .flatMap(incoming -> ReactiveStreams
                    .of(Message.of(incoming.getPayload()), Message.of(incoming.getPayload()))
                    .onComplete(incoming::ack))
            .peek(duplicate -> processed(DEFAULT_ACKNOWLEDGMENT, duplicate))
            .buildRs();
}
 
/**
 * Builds a processor that first acknowledges each incoming message, then turns it into
 * two new messages carrying the same payload and reports each resulting message through
 * {@code processed} under the {@code MANUAL_ACKNOWLEDGMENT} key.
 *
 * @return the processor for the manual acknowledgment channel
 */
@Incoming(MANUAL_ACKNOWLEDGMENT)
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
@Outgoing("sink-" + MANUAL_ACKNOWLEDGMENT)
public Processor<Message<String>, Message<String>> processorWithAck() {
    return ReactiveStreams.<Message<String>> builder()
            .flatMapCompletionStage(incoming -> incoming.ack().thenApply(ignored -> incoming))
            .flatMap(acked -> ReactiveStreams.of(
                    Message.of(acked.getPayload()),
                    Message.of(acked.getPayload())))
            .peek(duplicate -> processed(MANUAL_ACKNOWLEDGMENT, duplicate))
            .buildRs();
}
 
源代码30 项目: netty-reactive-streams   文件: AkkaStreamsUtil.java
/**
 * Materializes an Akka {@code Flow} between a subscriber source and a fan-out publisher
 * sink, and wraps the resulting subscriber/publisher pair in a {@code DelegateProcessor}.
 *
 * @param flow         the flow to run
 * @param materializer the materializer used to run the stream
 * @param <In>         the element type accepted by the flow
 * @param <Out>        the element type emitted by the flow
 * @return a processor delegating to the materialized subscriber and publisher
 */
public static <In, Out> Processor<In, Out> flowToProcessor(Flow<In, Out, ?> flow, Materializer materializer) {
    Pair<Subscriber<In>, Publisher<Out>> ends =
            Source.<In>asSubscriber()
                    .via(flow)
                    .toMat(Sink.<Out>asPublisher(AsPublisher.WITH_FANOUT), Keep.<Subscriber<In>, Publisher<Out>>both())
                    .run(materializer);

    Subscriber<In> inlet = ends.first();
    Publisher<Out> outlet = ends.second();
    return new DelegateProcessor<>(inlet, outlet);
}