下面列出了 org.reactivestreams.Processor 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Signals the same failure to a serialized {@code UnicastProcessor} from two threads and
 * checks that the subscriber ends up subscribed and failed with an {@code Exception}
 * carrying the message "boom".
 */
@Test(invocationCount = 50)
public void verifyOnErrorThreadSafety() {
    final Processor<Integer, Integer> processor = UnicastProcessor.<Integer> create().serialized();
    MultiAssertSubscriber<Integer> subscriber = MultiAssertSubscriber.create(100);
    Exception failure = new Exception("boom");
    processor.subscribe(subscriber);

    // Two independent threads both try to terminate the processor with the failure.
    new Thread(() -> processor.onError(failure)).start();
    new Thread(() -> processor.onError(failure)).start();

    subscriber.await();
    subscriber.assertSubscribed();
    subscriber.assertHasFailedWith(Exception.class, "boom");
}
/**
 * Runs one thread that emits an item and then completes against another thread that only
 * completes, and checks that the subscriber completes successfully. The subscriber may have
 * received nothing; if it received anything, it must be exactly the item {@code 1}.
 */
@Test(invocationCount = 20)
public void verifyOnNextOnCompleteThreadSafety() {
    final Processor<Integer, Integer> processor = UnicastProcessor.<Integer> create().serialized();
    MultiAssertSubscriber<Integer> subscriber = MultiAssertSubscriber.create(100);
    processor.subscribe(subscriber);

    Runnable emitThenComplete = () -> {
        processor.onNext(1);
        processor.onComplete();
    };
    Runnable completeOnly = () -> processor.onComplete();
    new Thread(emitThenComplete).start();
    new Thread(completeOnly).start();

    subscriber.await()
            .assertSubscribed()
            .assertCompletedSuccessfully();

    // An empty result is accepted (presumably when the bare completion wins the race).
    if (subscriber.items().size() == 0) {
        return;
    }
    assertThat(subscriber.items()).containsExactly(1);
}
/**
 * Runs one thread that emits an item and then completes against another thread that hands
 * the processor a new empty subscription, and checks that the subscriber still completes
 * successfully. The subscriber may have received nothing; if it received anything, it must
 * be exactly the item {@code 1}.
 */
@Test(invocationCount = 20)
public void verifyOnSubscribeOnCompleteThreadSafety() {
    final Processor<Integer, Integer> processor = UnicastProcessor.<Integer> create().serialized();
    MultiAssertSubscriber<Integer> subscriber = MultiAssertSubscriber.create(100);
    processor.subscribe(subscriber);

    Runnable emitThenComplete = () -> {
        processor.onNext(1);
        processor.onComplete();
    };
    Runnable resubscribe = () -> processor.onSubscribe(new Subscriptions.EmptySubscription());
    new Thread(emitThenComplete).start();
    new Thread(resubscribe).start();

    subscriber.await()
            .assertSubscribed()
            .assertCompletedSuccessfully();

    // Nothing received is acceptable; otherwise only the single emitted item is allowed.
    if (subscriber.items().size() == 0) {
        return;
    }
    assertThat(subscriber.items()).containsExactly(1);
}
/**
 * Sends three add commands to a {@code GSet} (elements "1", "2" and "1" again) and expects
 * the set to hold two elements, the subscriber to have seen two values, and the stream to
 * be neither completed nor failed.
 */
@Test
public void shouldHandleAddCommands() {
    // given: a set fed by a replay processor and observed by a test subscriber
    final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final GSet<String> set = new GSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);

    final GSet.AddCommand<String> addOne = new GSet.AddCommand<>(set.getCrdtId(), "1");
    final GSet.AddCommand<String> addTwo = new GSet.AddCommand<>(set.getCrdtId(), "2");
    final GSet.AddCommand<String> addOneAgain = new GSet.AddCommand<>(set.getCrdtId(), "1");

    // when: all three commands are delivered in order
    commands.onNext(addOne);
    commands.onNext(addTwo);
    commands.onNext(addOneAgain);

    // then: the repeated element shows up only once
    assertThat(set, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
/**
 * Sends a remove command for "1" to a {@code TwoPSet} followed by two add commands for the
 * same element, and expects the set to stay empty, the subscriber to have seen exactly one
 * value, and the stream to be neither completed nor failed.
 */
@Test
public void shouldHandleRemoveCommandArrivesBeforeAddCommand() {
    // given: a set fed by a replay processor and observed by a test subscriber
    final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final TwoPSet<String> set = new TwoPSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);

    final TwoPSet.RemoveCommand<String> removeOne = new TwoPSet.RemoveCommand<>(set.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> addOne = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> addOneAgain = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");

    // when: the removal is delivered before either addition
    commands.onNext(removeOne);
    commands.onNext(addOne);
    commands.onNext(addOneAgain);

    // then: the element never becomes visible in the set
    assertThat(set, empty());
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
/**
 * Sends three add commands to a {@code TwoPSet} (elements "1", "2" and "1" again) and
 * expects the set to hold two elements, the subscriber to have seen two values, and the
 * stream to be neither completed nor failed.
 */
@Test
public void shouldHandleAddCommands() {
    // given: a set fed by a replay processor and observed by a test subscriber
    final Processor<TwoPSet.TwoPSetCommand<String>, TwoPSet.TwoPSetCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final TwoPSet<String> set = new TwoPSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);

    final TwoPSet.AddCommand<String> addOne = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");
    final TwoPSet.AddCommand<String> addTwo = new TwoPSet.AddCommand<>(set.getCrdtId(), "2");
    final TwoPSet.AddCommand<String> addOneAgain = new TwoPSet.AddCommand<>(set.getCrdtId(), "1");

    // when: all three commands are delivered in order
    commands.onNext(addOne);
    commands.onNext(addTwo);
    commands.onNext(addOneAgain);

    // then: the repeated element shows up only once
    assertThat(set, hasSize(2));
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
/**
 * Builds a subscriber for the given graph by walking its stages in order.
 * <p>
 * Stages whose operator is a {@code ProcessorOperator} are applied to the flow one after
 * another. The first stage whose operator is a {@code TerminalOperator} ends the walk: its
 * completion stage is paired with the entry processor and returned.
 *
 * @param graph the graph whose stages are applied
 * @return the entry processor together with the completion stage of the terminal stage
 * @throws UnsupportedStageException if a stage maps to neither kind of operator
 * @throws IllegalArgumentException if the stages run out before a terminal stage is found
 */
@Override
public <T, R> SubscriberWithCompletionStage<T, R> buildSubscriber(Graph graph) {
    Processor<T, T> entry = new ConnectableProcessor<>();
    Flowable<T> flowable = Flowable.fromPublisher(entry);

    for (Stage stage : graph.getStages()) {
        Operator operator = Stages.lookup(stage);

        if (operator instanceof ProcessorOperator) {
            flowable = applyProcessors(flowable, stage, (ProcessorOperator) operator);
            continue;
        }
        if (!(operator instanceof TerminalOperator)) {
            throw new UnsupportedStageException(stage);
        }

        // Terminal stage reached: wire it up and hand back the entry point with its result.
        CompletionStage<R> result = applySubscriber(Transformer.apply(flowable), stage,
                (TerminalOperator) operator);
        return new DefaultSubscriberWithCompletionStage<>(entry, result);
    }

    throw new IllegalArgumentException("The graph does not have a valid final stage");
}
/**
 * Sends an event to the processor registered for the given queue key combined with this
 * sender's {@code mKey}.
 *
 * @param key   the queue key; a clone of it is bound to {@code mKey} when that is a
 *              {@code String} or an {@code Integer}
 * @param event the event to send; it is passed through {@code mCast.cast} when
 *              {@code mCast} is set
 * @return {@code true} if a processor existed and the event was handed to it,
 *         {@code false} otherwise
 */
private boolean sendToKeyBoundBus(RxQueueKey key, Object event)
{
    // Bind the id on a clone rather than on the caller's key
    // (presumably because withId mutates the key - confirm against RxQueueKey).
    RxQueueKey boundKey = key.clone();
    if (mKey instanceof String)
    {
        boundKey.withId((String) mKey);
    }
    else if (mKey instanceof Integer)
    {
        boundKey.withId((Integer) mKey);
    }

    Processor processor = RxBus.getInstance().getProcessor(boundKey, false);

    // only send event, if processor exists => this means someone has at least once subscribed to it
    if (processor == null)
    {
        return false;
    }

    processor.onNext(mCast == null ? event : mCast.cast(event));
    return true;
}
/**
 * Terminates this subscriber with an error.
 * <p>
 * If {@code done} is already set, the error is only reported through
 * {@code Operators.onErrorDropped} with the context of {@code actual}. Otherwise
 * {@code done} is set, the error is passed to every processor obtained by iterating over
 * {@code this}, {@code clear()} is called, the error is stored in {@code error}, and
 * {@code drain()} is invoked.
 *
 * @param t the error to propagate
 */
@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t, actual.currentContext());
return;
}
done = true;
// Fail every processor currently held by this container before dropping them.
for (Processor<T, T> w : this) {
w.onError(t);
}
clear();
// The error is recorded only after the processors were failed and cleared, then drained.
error = t;
drain();
}
/**
 * Delivers the very same add command instance twice to a {@code GSet} and expects the set
 * to hold one element, the subscriber to have seen one value, and the stream to be neither
 * completed nor failed.
 */
@Test
public void shouldHandleDuplicateCommands() {
    // given: a set fed by a replay processor and observed by a test subscriber
    final Processor<GSet.AddCommand<String>, GSet.AddCommand<String>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final GSet<String> set = new GSet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);

    final GSet.AddCommand<String> addOne = new GSet.AddCommand<>(set.getCrdtId(), "1");

    // when: the identical command object arrives twice
    commands.onNext(addOne);
    commands.onNext(addOne);

    // then: the second delivery has no visible effect
    assertThat(set, hasSize(1));
    assertThat(observer.valueCount(), is(1));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
/**
 * Adds a random UUID to a {@code USet} and then removes it again, expecting the set to end
 * up empty, the subscriber to have seen two values, and the stream to be neither completed
 * nor failed.
 */
@Test
public void shouldHandleRemoveCommands() {
    // given: a set fed by a replay processor and observed by a test subscriber
    final UUID element = UUID.randomUUID();
    final Processor<USet.USetCommand<UUID>, USet.USetCommand<UUID>> commands = ReplayProcessor.create();
    final TestSubscriber<CrdtCommand> observer = TestSubscriber.create();
    final USet<UUID> set = new USet<>("ID_1");
    set.subscribeTo(commands);
    set.subscribe(observer);

    final USet.AddCommand<UUID> addElement = new USet.AddCommand<>(set.getCrdtId(), element);
    final USet.RemoveCommand<UUID> removeElement = new USet.RemoveCommand<>(set.getCrdtId(), element);

    // when: the element is added and then removed
    commands.onNext(addElement);
    commands.onNext(removeElement);

    // then: the set is empty again and both commands were published
    assertThat(set, empty());
    assertThat(observer.valueCount(), is(2));
    observer.assertNotComplete();
    observer.assertNoErrors();
}
/**
 * Sends an event to the processor registered for the given queue key as it is, without
 * binding any id to it.
 *
 * @param key   the queue key used to look up the processor
 * @param event the event to send; it is passed through {@code mCast.cast} when
 *              {@code mCast} is set
 * @return {@code true} if a processor existed and the event was handed to it,
 *         {@code false} otherwise
 */
private boolean sendToUnboundBus(RxQueueKey key, Object event)
{
    Processor processor = RxBus.getInstance().getProcessor(key, false);

    // only send event, if processor exists => this means someone has at least once subscribed to it
    if (processor == null)
    {
        return false;
    }

    processor.onNext(mCast == null ? event : mCast.cast(event));
    return true;
}
/**
 * Handles a failure signal.
 * <p>
 * The upstream reference is swapped to {@code CANCELLED}; if it already was
 * {@code CANCELLED} nothing else happens. Otherwise the current processor, if any, is
 * detached and failed, and the failure is then passed to {@code downstream}.
 *
 * @param failure the failure to propagate
 */
@Override
public void onFailure(Throwable failure) {
    if (upstream.getAndSet(CANCELLED) == CANCELLED) {
        return;
    }

    Processor<T, T> current = processor;
    if (current != null) {
        // Detach the processor before signalling it.
        processor = null;
        current.onError(failure);
    }
    downstream.onFailure(failure);
}
/**
 * Handles a completion signal.
 * <p>
 * The upstream reference is swapped to {@code CANCELLED}; if it already was
 * {@code CANCELLED} nothing else happens. Otherwise the current processor, if any, is
 * detached and completed, and the completion is then passed to {@code downstream}.
 */
@Override
public void onCompletion() {
    if (upstream.getAndSet(CANCELLED) == CANCELLED) {
        return;
    }

    Processor<T, T> current = processor;
    if (current != null) {
        // Detach the processor before signalling it.
        processor = null;
        current.onComplete();
    }
    downstream.onCompletion();
}
/**
 * Subscribes a serialized {@code UnicastProcessor} to a range of 1 to 10 and checks that the
 * subscriber receives all ten items and completes. Two more signals are sent afterwards;
 * no assertion follows them.
 */
@Test
public void testWithMultipleItems() {
    Processor<Integer, Integer> processor = UnicastProcessor.<Integer> create().serialized();
    MultiAssertSubscriber<Integer> subscriber = MultiAssertSubscriber.create(10);
    processor.subscribe(subscriber);

    Multi.createFrom().range(1, 11).subscribe(processor);

    subscriber.assertReceived(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    subscriber.assertCompletedSuccessfully();

    // Signals after the terminal event; the test only requires that they do not throw.
    processor.onNext(11);
    processor.onComplete();
}
/**
 * Creates a line consumer that forwards every line to the given processor and completes the
 * processor once {@code numLines} lines have been forwarded.
 *
 * @param lines    the processor receiving the lines
 * @param numLines the number of lines after which the processor is completed
 * @return a consumer forwarding lines to {@code lines}
 */
private Consumer<String> toSink(Processor<String, String> lines, int numLines) {
    final AtomicInteger remaining = new AtomicInteger(numLines);
    return line -> {
        lines.onNext(line);
        boolean lastLine = remaining.decrementAndGet() == 0;
        if (lastLine) {
            lines.onComplete();
        }
    };
}
/**
 * Completes a serialized {@code UnicastProcessor}, then hands it a mocked subscription and
 * verifies that the subscription gets cancelled.
 */
@Test
public void testSubscriptionAfterTerminalEvent() {
    Subscription lateSubscription = mock(Subscription.class);
    final Processor<Integer, Integer> processor = UnicastProcessor.<Integer> create().serialized();

    processor.onComplete();
    processor.onSubscribe(lateSubscription);

    verify(lateSubscription).cancel();
}
/**
 * Builds the processor connecting the "count" channel to the "sink" channel: every incoming
 * integer is incremented by one, emitted twice, and converted to its string form.
 *
 * @return the processor turning integers into strings
 */
@Incoming("count")
@Outgoing("sink")
public Processor<Integer, String> process() {
    return ReactiveStreams.<Integer> builder()
            .map(value -> value + 1)
            .flatMapRsPublisher(incremented -> Flowable.just(incremented, incremented))
            .map(duplicated -> Integer.toString(duplicated))
            .buildRs();
}
/**
 * Terminates this subscriber with an error.
 * <p>
 * If {@code done} is already set, the error is only reported through
 * {@code UnsignalledExceptions.onErrorDropped}. Otherwise the subscriber is marked as done,
 * the current window (if any) is detached and failed, and the error is passed to
 * {@code actual}.
 *
 * @param t the error to propagate
 */
@Override
public void onError(Throwable t) {
    if (done) {
        UnsignalledExceptions.onErrorDropped(t);
        return;
    }
    // Record the terminal state. Without this the guard above can never trigger from this
    // path, and a repeated onError would be forwarded to the downstream a second time.
    done = true;

    Processor<T, T> w = window;
    if (w != null) {
        window = null;
        w.onError(t);
    }
    actual.onError(t);
}
/**
 * Builds the processor for the pre-processing acknowledgment channel: every incoming
 * message is expanded into two new messages carrying the same payload, and
 * {@code processed} is called for each outgoing message.
 *
 * @return the processor duplicating each incoming message
 */
@Incoming(PRE_ACKNOWLEDGMENT)
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
@Outgoing("sink-" + PRE_ACKNOWLEDGMENT)
public Processor<Message<String>, Message<String>> processorWithPreAck() {
    return ReactiveStreams.<Message<String>> builder()
            .flatMap(incoming -> ReactiveStreams.of(
                    Message.of(incoming.getPayload()),
                    Message.of(incoming.getPayload())))
            .peek(outgoing -> processed(PRE_ACKNOWLEDGMENT, outgoing))
            .buildRs();
}
/**
 * Builds the processor connecting channel "A" to channel "AA": the integer payload of each
 * incoming message is converted to a string and wrapped in a new message.
 *
 * @return the processor turning integer messages into string messages
 */
@Incoming("A")
@Outgoing("AA")
public Processor<Message<Integer>, Message<String>> process() {
    return ReactiveStreams.<Message<Integer>> builder()
            .map(incoming -> Message.of(Integer.toString(incoming.getPayload())))
            .buildRs();
}
/**
 * Builds the processor connecting the "count" channel to the "sink" channel: the payload of
 * each incoming message is extracted, incremented by one, emitted twice, converted to a
 * string, and wrapped in a new message.
 *
 * @return the processor turning integer messages into string messages
 */
@Incoming("count")
@Outgoing("sink")
public Processor<Message<Integer>, Message<String>> process() {
    return ReactiveStreams.<Message<Integer>> builder()
            .map(Message::getPayload)
            .map(value -> value + 1)
            .flatMapRsPublisher(incremented -> Flowable.just(incremented, incremented))
            .map(duplicated -> Integer.toString(duplicated))
            .map(Message::of)
            .buildRs();
}
/**
 * Restricts the given processor to items whose runtime class is assignable to
 * {@code filterClass}.
 *
 * @param processor   the processor to filter; it is cast to {@code Flowable}, so it must
 *                    also be a {@code Flowable}
 * @param filterClass the class the passed-through items must be assignable to
 * @return the filtered flow, typed to {@code T} through an unchecked conversion
 */
@Override
@SuppressWarnings("unchecked")
protected <T> Flowable<T> filter(Processor processor, final Class<T> filterClass) {
    Predicate matchesFilterClass = new Predicate() {
        @Override
        public boolean test(Object candidate) {
            return filterClass.isAssignableFrom(candidate.getClass());
        }
    };
    return ((Flowable) processor).filter(matchesFilterClass);
}
/**
 * Checks the StatsD line published for a gauge under each {@code StatsdFlavor}.
 * <p>
 * A gauge named "my.gauge" with tag "my.tag"="val" and value 2 is registered on a registry
 * whose line sink feeds a processor; after the clock advances by one step the processor must
 * emit exactly the flavor-specific line and then complete.
 *
 * @param flavor the StatsD flavor under test
 */
@ParameterizedTest
@EnumSource(StatsdFlavor.class)
void gaugeLineProtocol(StatsdFlavor flavor) {
    final StatsdConfig config = configWithFlavor(flavor);
    final AtomicInteger gaugeValue = new AtomicInteger(2);

    // Expected wire format for each flavor.
    String expectedLine = null;
    switch (flavor) {
        case ETSY:
            expectedLine = "myGauge.myTag.val.statistic.value:2|g";
            break;
        case DATADOG:
            expectedLine = "my.gauge:2|g|#statistic:value,my.tag:val";
            break;
        case TELEGRAF:
            expectedLine = "my_gauge,statistic=value,my_tag=val:2|g";
            break;
        case SYSDIG:
            expectedLine = "my.gauge#statistic=value,my.tag=val:2|g";
            break;
        default:
            fail("Unexpected flavor");
    }

    StepVerifier
            .withVirtualTime(() -> {
                final Processor<String, String> emittedLines = lineProcessor();
                registry = StatsdMeterRegistry.builder(config)
                        .clock(clock)
                        .lineSink(toSink(emittedLines))
                        .build();
                registry.gauge("my.gauge", Tags.of("my.tag", "val"), gaugeValue);
                return emittedLines;
            })
            .then(() -> clock.add(config.step()))
            .thenAwait(config.step())
            .expectNext(expectedLine)
            .verifyComplete();
}
/**
 * Creates a response that also carries the WebSocket frame processor and the handshaker
 * factory.
 *
 * @param version           the HTTP version, passed to the superclass
 * @param status            the response status, passed to the superclass
 * @param validateHeaders   header validation flag, passed to the superclass
 * @param processor         the processor of WebSocket frames stored on this response
 * @param handshakerFactory the handshaker factory stored on this response
 */
public DefaultWebSocketHttpResponse(HttpVersion version, HttpResponseStatus status, boolean validateHeaders,
        Processor<WebSocketFrame, WebSocketFrame> processor,
        WebSocketServerHandshakerFactory handshakerFactory) {
    super(version, status, validateHeaders);
    this.handshakerFactory = handshakerFactory;
    this.processor = processor;
}
/**
 * Creates a processing stage backed by the Reactive Streams processor of the given stage.
 * <p>
 * The returned stage defers the wiring: when the resulting flow is subscribed, a flow is
 * created from the processor and the source is then subscribed to that processor.
 *
 * @param engine the engine (not used by this method)
 * @param stage  the processor stage; neither it nor its processor may be {@code null}
 * @return the processing stage routing a source through the processor
 * @throws NullPointerException if {@code stage} or its processor is {@code null}
 */
@Override
public <I, O> ProcessingStage<I, O> create(Engine engine, Stage.ProcessorStage stage) {
    Stage.ProcessorStage checkedStage = Objects.requireNonNull(stage);
    Processor<I, O> processor = Casts.cast(Objects.requireNonNull(checkedStage.getRsProcessor()));

    return upstream -> Flowable.defer(() -> {
        // Build the output flow first, then connect the source to the processor's input side.
        Flowable<O> output = Flowable.fromPublisher(processor);
        upstream.safeSubscribe(processor);
        return output;
    });
}
/**
 * Wraps the given processor in a publisher obtained by running it as a source into a
 * fan-out publisher sink with the materializer {@code mat}.
 *
 * @param processor the processor to wrap
 * @return the publisher produced by the materialized stream
 */
@Override
@SuppressWarnings("unchecked")
protected Publisher hide(Processor processor) {
    Object fanoutPublisher = Source
            .fromPublisher(processor)
            .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);
    return (Publisher) fanoutPublisher;
}
/**
 * Builds the processor for the default acknowledgment channel: every incoming message is
 * expanded into two new messages carrying the same payload, the incoming message's
 * {@code ack()} is invoked when that inner two-element stream completes, and
 * {@code processed} is called for each outgoing message.
 *
 * @return the processor duplicating each incoming message
 */
@Incoming(DEFAULT_ACKNOWLEDGMENT)
@Outgoing("sink-" + DEFAULT_ACKNOWLEDGMENT)
public Processor<Message<String>, Message<String>> processorWithAutoAck() {
    return ReactiveStreams.<Message<String>> builder()
            .flatMap(incoming -> ReactiveStreams
                    .of(Message.of(incoming.getPayload()), Message.of(incoming.getPayload()))
                    .onComplete(() -> incoming.ack()))
            .peek(outgoing -> processed(DEFAULT_ACKNOWLEDGMENT, outgoing))
            .buildRs();
}
/**
 * Builds the processor for the manual acknowledgment channel: every incoming message is
 * acknowledged first, then expanded into two new messages carrying the same payload, and
 * {@code processed} is called for each outgoing message.
 *
 * @return the processor acknowledging and duplicating each incoming message
 */
@Incoming(MANUAL_ACKNOWLEDGMENT)
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
@Outgoing("sink-" + MANUAL_ACKNOWLEDGMENT)
public Processor<Message<String>, Message<String>> processorWithAck() {
    return ReactiveStreams.<Message<String>> builder()
            // Continue with the original message once its acknowledgment has completed.
            .flatMapCompletionStage(incoming -> incoming.ack().thenApply(ignored -> incoming))
            .flatMap(acked -> ReactiveStreams.of(
                    Message.of(acked.getPayload()),
                    Message.of(acked.getPayload())))
            .peek(outgoing -> processed(MANUAL_ACKNOWLEDGMENT, outgoing))
            .buildRs();
}
/**
 * Materializes the given flow between a subscriber source and a fan-out publisher sink and
 * exposes the resulting pair as a single {@code Processor}.
 *
 * @param flow         the flow to run between the subscriber and the publisher
 * @param materializer the materializer used to run the stream
 * @param <In>         the type of elements accepted by the processor
 * @param <Out>        the type of elements emitted by the processor
 * @return a processor delegating to the materialized subscriber and publisher
 */
public static <In, Out> Processor<In, Out> flowToProcessor(Flow<In, Out, ?> flow, Materializer materializer) {
    Pair<Subscriber<In>, Publisher<Out>> materialized =
            Source.<In>asSubscriber()
                    .via(flow)
                    .toMat(Sink.<Out>asPublisher(AsPublisher.WITH_FANOUT), Keep.<Subscriber<In>, Publisher<Out>>both())
                    .run(materializer);

    Subscriber<In> inlet = materialized.first();
    Publisher<Out> outlet = materialized.second();
    return new DelegateProcessor<>(inlet, outlet);
}