下面列出了怎么用org.reactivestreams.Subscription的API类实例代码及写法,或者点击链接到github查看源代码。
void otherError(Throwable t){
Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
if(s != Operators.cancelledSubscription()) {
C b;
synchronized (this) {
b = buffer;
buffer = null;
}
if(s != null){
s.cancel();
}
actual.onError(t);
Operators.onDiscardMultiple(b, this.ctx);
return;
}
Operators.onErrorDropped(t, this.ctx);
}
@Override
public void request(long n) {
if (Operators.validate(n)) {
Subscription a = s;
if (a != null) {
trySchedule(n, a);
}
else {
Operators.addCap(REQUESTED, this, n);
a = s;
if (a != null) {
long r = REQUESTED.getAndSet(this, 0L);
if (r != 0L) {
trySchedule(n, a);
}
}
}
}
}
@Override
public final void onSubscribe(Subscription s) {
if (s == null) {
throw SpecificationExceptions.spec_2_13_exception();
}
try {
if (subscription != null) {
s.cancel();
return;
}
subscription = s;
doSubscribe(this);
} catch (Throwable throwable) {
Exceptions.throwIfFatal(throwable);
doError(throwable);
}
}
@Test
public void scanSingleSubscriber() {
CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
FluxZip.ZipSingleCoordinator<Integer, Integer> main =
new FluxZip.ZipSingleCoordinator<Integer, Integer>(actual, new Object[1], 1, i -> 5);
FluxZip.ZipSingleSubscriber<Integer> test = new FluxZip.ZipSingleSubscriber<>(main, 0);
Subscription parent = Operators.emptySubscription();
test.onSubscribe(parent);
Assertions.assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
Assertions.assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(main);
test.onNext(7);
Assertions.assertThat(test.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);
Assertions.assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue();
Assertions.assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}
@Test
public void testItemsAndCompletion() {
MultiAssertSubscriber<String> subscriber = MultiAssertSubscriber.create();
Subscription subscription = mock(Subscription.class);
subscriber.assertNotTerminated();
subscriber.onSubscribe(subscription);
subscriber.request(2);
verify(subscription).request(2);
subscriber.assertSubscribed();
subscriber.onNext("a");
subscriber.onNext("b");
subscriber.onComplete();
subscriber.assertReceived("a", "b")
.assertCompletedSuccessfully()
.assertHasNotFailed();
}
@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(new Subscription() {
@Override
public void request(long n) {
if (n <= 0) {
throw new IllegalArgumentException("n > 0 required but it was " + n);
}
downstreamDemand.getAndAdd(n);
if (upstreamDone.get()) {
sendTrailingEmptyFrame();
} else {
s.request(n);
}
}
@Override
public void cancel() {
s.cancel();
}
});
}
@Test
public void testAwaitOnFailure() {
MultiAssertSubscriber<String> subscriber = MultiAssertSubscriber.create();
Subscription subscription = mock(Subscription.class);
subscriber.onSubscribe(subscription);
subscriber.request(2);
new Thread(() -> {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onError(new Exception("boom"));
}).start();
subscriber.await();
subscriber.assertHasFailedWith(Exception.class, "boom");
}
@Test
public void scanThenAcceptInner() {
CoreSubscriber<String> actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null);
MonoIgnoreThen.ThenIgnoreMain<String> main = new MonoIgnoreThen.ThenIgnoreMain<>(actual, new Publisher[0], Mono.just("foo"));
MonoIgnoreThen.ThenAcceptInner<String> test = new MonoIgnoreThen.ThenAcceptInner<>(main);
Subscription parent = Operators.emptySubscription();
test.onSubscribe(parent);
assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(main);
assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
test.onError(new IllegalStateException("boom"));
assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue();
assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
test.cancel();
assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}
private void rejectSubscription(SdkPublisher<SubscribeToShardEventStream> publisher) {
publisher.subscribe(new Subscriber<SubscribeToShardEventStream>() {
Subscription localSub;
@Override
public void onSubscribe(Subscription s) {
localSub = s;
localSub.cancel();
}
@Override
public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
localSub.cancel();
}
@Override
public void onError(Throwable t) {
localSub.cancel();
}
@Override
public void onComplete() {
localSub.cancel();
}
});
}
@Test
public void scanMain() {
CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
FluxSampleTimeout.SampleTimeoutMain<Integer, Integer> test =
new FluxSampleTimeout.SampleTimeoutMain<>(actual, i -> Flux.just(i),
Queues.<SampleTimeoutOther<Integer, Integer>>one().get());
Subscription parent = Operators.emptySubscription();
test.onSubscribe(parent);
Assertions.assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
Assertions.assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
test.requested = 35;
Assertions.assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(35L);
test.queue.add(new FluxSampleTimeout.SampleTimeoutOther<Integer, Integer>(test, 1, 0));
Assertions.assertThat(test.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);
Assertions.assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
Assertions.assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
test.error = new IllegalStateException("boom");
Assertions.assertThat(test.scan(Scannable.Attr.ERROR)).hasMessage("boom");
test.onComplete();
Assertions.assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue();
Assertions.assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}
/**
* Atomically requests from the Subscription in the field if not null, otherwise accumulates
* the request amount in the requested field to be requested once the field is set to non-null.
*
* @param field the target field that may already contain a Subscription
* @param requested the current requested amount
* @param requests the request amount, positive (verified)
*/
public static void requestIfNotNullOrAccumulate(AtomicReference<Subscription> field, AtomicLong requested, long requests) {
Subscription subscription = field.get();
if (subscription != null) {
subscription.request(requests);
} else {
if (requests > 0) {
add(requested, requests);
subscription = field.get();
if (subscription != null) {
long r = requested.getAndSet(0L);
if (r != 0L) {
subscription.request(r);
}
}
}
}
}
@Test
public void testWrappedSubscription() {
Subscription subscription = new Subscription() {
@Override
public void request(long n) {
}
@Override
public void cancel() {
}
};
WrappedSubscription wrapped = new WrappedSubscription(subscription, null);
assertThat(wrapped).isNotNull();
wrapped.request(10);
wrapped.cancel();
}
@Test
public void scanResourceSubscriber() {
CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
ResourceSubscriber<String, Integer> op = new ResourceSubscriber<>(actual, s -> Mono.just(s.length()), Mono::just, (res, err) -> Mono.just(res), Mono::just, true);
final Subscription parent = Operators.emptySubscription();
op.onSubscribe(parent);
assertThat(op.scan(Attr.PARENT)).as("PARENT").isSameAs(parent);
assertThat(op.scan(Attr.ACTUAL)).as("ACTUAL").isSameAs(actual);
assertThat(op.scan(Attr.PREFETCH)).as("PREFETCH").isEqualTo(Integer.MAX_VALUE);
assertThat(op.scan(Attr.TERMINATED)).as("TERMINATED").isFalse();
op.resourceProvided = true;
assertThat(op.scan(Attr.TERMINATED)).as("TERMINATED resourceProvided").isTrue();
assertThat(op.scanUnsafe(Attr.CANCELLED)).as("CANCELLED not supported").isNull();
}
/**
* This assumes that implementations do not issue an HTTP request until
* {@link Subscription#request(long)} is called. Since a client span is only for
* remote operations, we should not create one when we know a network request won't
* happen. In this case, we ensure a canceled subscription doesn't end up traced.
*/
@Test
public void cancelledSubscription_doesntTrace() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.cancel();
latch.countDown();
}
};
getMono(client, "/foo").subscribe(subscriber);
latch.await();
assertThat(server.getRequestCount()).isZero();
// post-conditions will prove no span was created
}
@Override
public void request(long n) {
Subscription a = s;
if (a != null) {
a.request(n);
} else {
BackpressureHelper.getAndAddCap(REQUESTED, this, n);
a = s;
if (a != null) {
long r = REQUESTED.getAndSet(this, 0L);
if (r != 0L) {
a.request(r);
}
}
}
}
@Test
public void errorInStream_completesFuture() {
Path testPath = testFs.getPath("test_file.txt");
FileAsyncResponseTransformer xformer = new FileAsyncResponseTransformer(testPath);
CompletableFuture prepareFuture = xformer.prepare();
xformer.onResponse(new Object());
xformer.onStream(subscriber -> {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long l) {
}
@Override
public void cancel() {
}
});
subscriber.onError(new RuntimeException("Something went wrong"));
});
assertThat(prepareFuture.isCompletedExceptionally()).isTrue();
}
@Test
public void scanMain() {
CoreSubscriber<GroupedFlux<Integer, String>> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
FluxGroupBy.GroupByMain<Integer, Integer, String> test = new FluxGroupBy.GroupByMain<>(actual,
Queues.<GroupedFlux<Integer, String>>one().get(), Queues.one(), 123, i -> i % 5, i -> String.valueOf(i));
Subscription sub = Operators.emptySubscription();
test.onSubscribe(sub);
assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(sub);
assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(Long.MAX_VALUE);
assertThat(test.scan(Scannable.Attr.PREFETCH)).isSameAs(123);
assertThat(test.scan(Scannable.Attr.BUFFERED)).isSameAs(0);
assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
assertThat(test.scan(Scannable.Attr.ERROR)).isNull();
test.error = new IllegalStateException("boom");
assertThat(test.scan(Scannable.Attr.ERROR)).isSameAs(test.error);
}
@Override
public void onSubscribe(Subscription subscription) {
sourceSubscription = subscription;
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long l) {
synchronized (lock) {
demand.addAndGet(l);
// Execution goes into `if` block only once for the initial request
// After that requestedNextBatch is always true and more requests are made in fulfillDemand()
if (!requestedNextBatch) {
requestedNextBatch = true;
sourceSubscription.request(1);
} else {
fulfillDemand();
}
}
}
@Override
public void cancel() {
subscription.cancel();
}
});
}
@Test
public void testThatDownstreamFailuresAreHandledInOnError() {
Subscriber<String> subscriber = mock(Subscriber.class);
doAnswer(new ThrowsException(new IllegalStateException("boom"))).when(subscriber).onError(any());
Subscription subscription = mock(Subscription.class);
SafeSubscriber<String> safe = new SafeSubscriber<>(subscriber);
safe.onSubscribe(subscription);
verify(subscriber, times(1)).onSubscribe(safe);
Exception boom = new Exception("boom");
safe.onError(boom);
// called
verify(subscriber, times(1)).onError(boom);
assertThat(safe.isDone()).isTrue();
}
@Test
public void testWrappedSubscription() {
Subscription subscription = new Subscription() {
@Override
public void request(long n) {
}
@Override
public void cancel() {
}
};
WrappedSubscription wrapped = new WrappedSubscription(subscription, null);
assertThat(wrapped).isNotNull();
wrapped.request(10);
wrapped.cancel();
}
static ReactiveSeq<Integer> interval(String cron,ScheduledExecutorService exec) {
ReactiveSubscriber<Integer> sub = reactiveSubscriber();
AtomicBoolean isOpen = new AtomicBoolean(true);
Subscription[] s= {null};
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s[0].request(n);
}
@Override
public void cancel() {
isOpen.set(false);
}
});
s[0] = ReactiveSeq.iterate(1, a -> a + 1)
.takeWhile(e -> isOpen.get())
.schedule(cron, exec)
.connect()
.forEach(1, e -> sub.onNext(e));
return sub.reactiveStream();
}
@Test
public void scanSubscriber() {
CoreSubscriber<String> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
DistinctSubscriber<String, Integer, Set<Integer>> test =
new DistinctSubscriber<>(actual, new HashSet<>(), String::hashCode, Set::add, Set::clear);
Subscription parent = Operators.emptySubscription();
test.onSubscribe(parent);
assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
test.onError(new IllegalStateException("boom"));
assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue();
}
@Override
public void onComplete() {
Subscription o = other;
if (o instanceof FluxSampleTimeout.SampleTimeoutOther) {
SampleTimeoutOther<?, ?> os = (SampleTimeoutOther<?, ?>) o;
os.cancel();
os.onComplete();
}
done = true;
drain();
}
@Test
public void testPublishBroadcast() throws Exception{
Flux<Long> fibonacciGenerator = Flux.generate(() -> Tuples.<Long,
Long>of(0L, 1L), (state, sink) -> {
if (state.getT1() < 0)
sink.complete();
else
sink.next(state.getT1());
System.out.println("generating next of "+ state.getT2());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator=fibonacciGenerator.doFinally(x -> {
System.out.println("Closing ");
}).publish().autoConnect(2);
fibonacciGenerator.subscribe(new BaseSubscriber<Long>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(Long value) {
System.out.println("1st: "+value);
}
});
fibonacciGenerator.subscribe(x -> System.out.println("2nd : "+x));
Thread.sleep(500);
}
private Stream<Integer> nextAsyncRS() {
ReactiveSubscriber<Integer> sub = Spouts.reactiveSubscriber();
AtomicLong req = new AtomicLong(0);
int id = start.incrementAndGet();
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
req.addAndGet(n);
}
@Override
public void cancel() {
}
public String toString(){
return "subscription " + id;
}
});
new Thread(()->{
int sent=0;
while(sent<2){
if(req.get()>0){
sub.onNext( ++sent);
req.decrementAndGet();
}
}
sub.onComplete();
// Flux.just(1,2).forEachAsync(sub);
}).start();
return sub.reactiveStream();
}
private void onSubscribe(final Subscription subscription) {
if (this.subscription != null) {
subscription.cancel();
} else {
this.subscription = subscription;
sender.tell(getSubscriptionCreated(), ActorRef.noSender());
unstashAll();
}
}
@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
if (s instanceof Fuseable.QueueSubscription) {
@SuppressWarnings("unchecked")
Fuseable.QueueSubscription<T> qs = (Fuseable.QueueSubscription<T>) s;
int m = qs.requestFusion(Fuseable.ANY | Fuseable.THREAD_BARRIER);
if (m == Fuseable.SYNC) {
sourceMode = m;
queue = qs;
done = true;
setupSubscribers();
drain();
return;
} else
if (m == Fuseable.ASYNC) {
sourceMode = m;
queue = qs;
setupSubscribers();
s.request(Operators.unboundedOrPrefetch(prefetch));
return;
}
}
queue = queueSupplier.get();
setupSubscribers();
s.request(Operators.unboundedOrPrefetch(prefetch));
}
}
private Publisher<Map<String, Object>> getCounter(DataFetchingEnvironment env) {
boolean finite = env.getArgument("finite");
return subscriber -> {
Subscription subscription = new Subscription() {
@Override
public void request(long n) {
}
@Override
public void cancel() {
if (!subscriptionRef.compareAndSet(this, null)) {
fail();
}
}
};
if (!subscriptionRef.compareAndSet(null, subscription)) {
fail();
}
subscriber.onSubscribe(subscription);
IntStream.range(0, 5).forEach(num -> {
Map<String, Object> counter = new HashMap<>();
counter.put("count", num);
subscriber.onNext(counter);
});
if (finite) {
subscriber.onComplete();
if (!subscriptionRef.compareAndSet(subscription, null)) {
fail();
}
}
};
}
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb.service("/large-stream", new AbstractHttpService() {
@Override
protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) throws Exception {
return HttpResponse.of(s -> s.onSubscribe(new Subscription() {
int count;
@Override
public void request(long n) {
for (int i = 0; i < n; i++) {
if (count == 0) {
s.onNext(ResponseHeaders.of(HttpStatus.OK));
} else {
s.onNext(HttpData.wrap(new byte[1024]));
}
}
count += n;
// 10MB
if (count > 1024 * 10) {
s.onComplete();
}
}
@Override
public void cancel() {
}
}));
}
});
sb.requestTimeout(Duration.of(30, ChronoUnit.SECONDS));
}
@Override
@SuppressWarnings("unchecked")
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = (QueueSubscription<T>) s;
this.start = System.nanoTime();
actual.onSubscribe(this);
}
}