类org.reactivestreams.Subscription源码实例Demo

下面列出了怎么用org.reactivestreams.Subscription的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: reactor-core   文件: FluxBufferBoundary.java
/**
 * Handles an error coming from the boundary ("other") source.
 * <p>
 * The upstream subscription slot is atomically swapped for the cancelled marker. If the
 * slot already held that marker, the error is handed to
 * {@code Operators.onErrorDropped}. Otherwise the current buffer is detached, the previous
 * subscription (if any) is cancelled, the error is forwarded downstream and the detached
 * buffer is passed to {@code Operators.onDiscardMultiple}.
 *
 * @param t the error to propagate
 */
void otherError(Throwable t){
	Subscription previous = S.getAndSet(this, Operators.cancelledSubscription());
	if (previous == Operators.cancelledSubscription()) {
		// Slot already held the cancelled marker: nothing left to notify.
		Operators.onErrorDropped(t, this.ctx);
		return;
	}

	C detached;
	synchronized (this) {
		detached = buffer;
		buffer = null;
	}

	if (previous != null) {
		previous.cancel();
	}

	actual.onError(t);
	Operators.onDiscardMultiple(detached, this.ctx);
}
 
源代码2 项目: reactor-core   文件: MonoSubscribeOn.java
/**
 * Forwards downstream demand to the upstream subscription through {@code trySchedule}.
 * <p>
 * If the upstream subscription is not yet available, the demand is accumulated in
 * {@code REQUESTED}. When the subscription shows up in the meantime, the accumulated total
 * is drained and that total is scheduled (not just this call's {@code n}), so demand
 * added by concurrent callers is not lost.
 *
 * @param n the requested amount; ignored when {@code Operators.validate} rejects it
 */
@Override
public void request(long n) {
	if (Operators.validate(n)) {
		Subscription a = s;
		if (a != null) {
			trySchedule(n, a);
		}
		else {
			Operators.addCap(REQUESTED, this, n);
			// Re-read: the subscription may have been set while we were accumulating.
			a = s;
			if (a != null) {
				long r = REQUESTED.getAndSet(this, 0L);
				if (r != 0L) {
					// Forward everything that was drained, which may exceed n.
					trySchedule(r, a);
				}
			}
		}
	}
}
 
源代码3 项目: camunda-bpm-reactor   文件: SubscriberBarrier.java
/**
 * Accepts the upstream subscription once; any later subscription is cancelled.
 * Failures raised while handling the subscription are routed to {@code doError} after
 * {@code Exceptions.throwIfFatal} has had a chance to rethrow them.
 *
 * @param s the upstream subscription, must not be {@code null}
 */
@Override
public final void onSubscribe(Subscription s) {
  if (null == s) {
    throw SpecificationExceptions.spec_2_13_exception();
  }

  try {
    if (subscription == null) {
      subscription = s;
      doSubscribe(this);
    } else {
      // Already subscribed: reject the extra subscription.
      s.cancel();
    }
  } catch (Throwable failure) {
    Exceptions.throwIfFatal(failure);
    doError(failure);
  }
}
 
源代码4 项目: reactor-core   文件: FluxZipTest.java
/**
 * Verifies the attributes a {@code FluxZip.ZipSingleSubscriber} reports through
 * {@code scan} after it has been subscribed and after it has received one value.
 */
@Test
public void scanSingleSubscriber() {
    CoreSubscriber<Integer> downstream = new LambdaSubscriber<>(null, e -> {}, null, null);
    FluxZip.ZipSingleCoordinator<Integer, Integer> coordinator =
            new FluxZip.ZipSingleCoordinator<Integer, Integer>(downstream, new Object[1], 1, i -> 5);
    FluxZip.ZipSingleSubscriber<Integer> inner = new FluxZip.ZipSingleSubscriber<>(coordinator, 0);
    Subscription upstream = Operators.emptySubscription();
    inner.onSubscribe(upstream);

    // Structural attributes: parent is the upstream subscription, actual is the coordinator.
    Assertions.assertThat(inner.scan(Scannable.Attr.PARENT)).isSameAs(upstream);
    Assertions.assertThat(inner.scan(Scannable.Attr.ACTUAL)).isSameAs(coordinator);

    // State attributes after a single value has been delivered.
    inner.onNext(7);
    Assertions.assertThat(inner.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);
    Assertions.assertThat(inner.scan(Scannable.Attr.TERMINATED)).isTrue();
    Assertions.assertThat(inner.scan(Scannable.Attr.CANCELLED)).isTrue();
}
 
/**
 * Verifies that a {@code MultiAssertSubscriber} forwards its request to the upstream
 * subscription and records the received items and the completion.
 */
@Test
public void testItemsAndCompletion() {
    MultiAssertSubscriber<String> testSubscriber = MultiAssertSubscriber.create();
    Subscription upstream = mock(Subscription.class);

    testSubscriber.assertNotTerminated();
    testSubscriber.onSubscribe(upstream);

    // The demand must be passed through to the mocked upstream.
    testSubscriber.request(2);
    verify(upstream).request(2);
    testSubscriber.assertSubscribed();

    testSubscriber.onNext("a");
    testSubscriber.onNext("b");
    testSubscriber.onComplete();

    testSubscriber.assertReceived("a", "b")
            .assertCompletedSuccessfully()
            .assertHasNotFailed();
}
 
/**
 * Hands the delegate a wrapping subscription that tracks downstream demand.
 * <p>
 * Each valid request is added to {@code downstreamDemand}. While the upstream is still
 * active the request is forwarded to it; once {@code upstreamDone} is set, the request
 * triggers {@code sendTrailingEmptyFrame()} instead. Cancellation is passed straight
 * through to the upstream subscription.
 *
 * @param s the upstream subscription being wrapped
 */
@Override
public void onSubscribe(Subscription s) {
    Subscription demandTracking = new Subscription() {
        @Override
        public void request(long n) {
            if (n <= 0) {
                throw new IllegalArgumentException("n > 0 required but it was " + n);
            }

            downstreamDemand.getAndAdd(n);

            if (!upstreamDone.get()) {
                s.request(n);
            } else {
                sendTrailingEmptyFrame();
            }
        }

        @Override
        public void cancel() {
            s.cancel();
        }
    };
    delegate.onSubscribe(demandTracking);
}
 
/**
 * Verifies that {@code await()} returns once a failure is signalled from another thread
 * and that the failure type and message are recorded.
 */
@Test
public void testAwaitOnFailure() {
    MultiAssertSubscriber<String> testSubscriber = MultiAssertSubscriber.create();
    Subscription upstream = mock(Subscription.class);

    testSubscriber.onSubscribe(upstream);
    testSubscriber.request(2);

    // Emit two items followed by an error from a separate thread.
    Thread emitter = new Thread(() -> {
        testSubscriber.onNext("1");
        testSubscriber.onNext("2");
        testSubscriber.onError(new Exception("boom"));
    });
    emitter.start();

    testSubscriber.await();
    testSubscriber.assertHasFailedWith(Exception.class, "boom");
}
 
源代码8 项目: reactor-core   文件: MonoThenIgnoreTest.java
/**
 * Verifies the attributes a {@code MonoIgnoreThen.ThenAcceptInner} reports through
 * {@code scan}: parent, actual, and the TERMINATED / CANCELLED flags before and after
 * the matching signal.
 */
@Test
public void scanThenAcceptInner() {
	CoreSubscriber<String> downstream = new LambdaMonoSubscriber<>(null, e -> {}, null, null);
	MonoIgnoreThen.ThenIgnoreMain<String> coordinator =
			new MonoIgnoreThen.ThenIgnoreMain<>(downstream, new Publisher[0], Mono.just("foo"));

	MonoIgnoreThen.ThenAcceptInner<String> inner = new MonoIgnoreThen.ThenAcceptInner<>(coordinator);
	Subscription upstream = Operators.emptySubscription();
	inner.onSubscribe(upstream);

	assertThat(inner.scan(Scannable.Attr.PARENT)).isSameAs(upstream);
	assertThat(inner.scan(Scannable.Attr.ACTUAL)).isSameAs(coordinator);

	// TERMINATED flips once an error has been delivered.
	assertThat(inner.scan(Scannable.Attr.TERMINATED)).isFalse();
	inner.onError(new IllegalStateException("boom"));
	assertThat(inner.scan(Scannable.Attr.TERMINATED)).isTrue();

	// CANCELLED flips once cancel() has been called.
	assertThat(inner.scan(Scannable.Attr.CANCELLED)).isFalse();
	inner.cancel();
	assertThat(inner.scan(Scannable.Attr.CANCELLED)).isTrue();
}
 
/**
 * Subscribes to the publisher with a subscriber that cancels the subscription as soon as
 * it is received, and cancels again on every later signal.
 *
 * @param publisher the publisher whose subscription should be rejected
 */
private void rejectSubscription(SdkPublisher<SubscribeToShardEventStream> publisher) {
    Subscriber<SubscribeToShardEventStream> rejecting = new Subscriber<SubscribeToShardEventStream>() {
        Subscription upstream;

        @Override
        public void onSubscribe(Subscription s) {
            upstream = s;
            upstream.cancel();
        }

        @Override
        public void onNext(SubscribeToShardEventStream event) {
            upstream.cancel();
        }

        @Override
        public void onError(Throwable t) {
            upstream.cancel();
        }

        @Override
        public void onComplete() {
            upstream.cancel();
        }
    };
    publisher.subscribe(rejecting);
}
 
源代码10 项目: reactor-core   文件: FluxSampleTimeoutTest.java
/**
 * Verifies the attributes a {@code FluxSampleTimeout.SampleTimeoutMain} reports through
 * {@code scan}: parent, actual, requested demand, buffered count, error, and the
 * TERMINATED / CANCELLED flags before and after completion.
 */
@Test
public void scanMain() {
    CoreSubscriber<Integer> downstream = new LambdaSubscriber<>(null, e -> {}, null, null);
    FluxSampleTimeout.SampleTimeoutMain<Integer, Integer> main =
    		new FluxSampleTimeout.SampleTimeoutMain<>(downstream, i -> Flux.just(i),
    				Queues.<SampleTimeoutOther<Integer, Integer>>one().get());
    Subscription upstream = Operators.emptySubscription();
    main.onSubscribe(upstream);

    Assertions.assertThat(main.scan(Scannable.Attr.PARENT)).isSameAs(upstream);
    Assertions.assertThat(main.scan(Scannable.Attr.ACTUAL)).isSameAs(downstream);

    // Demand is read straight from the 'requested' field.
    main.requested = 35;
    Assertions.assertThat(main.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(35L);

    // One queued inner subscriber shows up as one buffered element.
    main.queue.add(new FluxSampleTimeout.SampleTimeoutOther<Integer, Integer>(main, 1, 0));
    Assertions.assertThat(main.scan(Scannable.Attr.BUFFERED)).isEqualTo(1);

    Assertions.assertThat(main.scan(Scannable.Attr.CANCELLED)).isFalse();
    Assertions.assertThat(main.scan(Scannable.Attr.TERMINATED)).isFalse();

    main.error = new IllegalStateException("boom");
    Assertions.assertThat(main.scan(Scannable.Attr.ERROR)).hasMessage("boom");

    main.onComplete();
    Assertions.assertThat(main.scan(Scannable.Attr.TERMINATED)).isTrue();
    Assertions.assertThat(main.scan(Scannable.Attr.CANCELLED)).isTrue();
}
 
源代码11 项目: smallrye-mutiny   文件: Subscriptions.java
/**
 * Requests from the Subscription held in {@code field} when one is present; otherwise
 * adds the amount to {@code requested} so it can be requested once the field is set.
 * <p>
 * After accumulating, the field is read again: if a Subscription has appeared in the
 * meantime, the accumulated total is drained and requested from it.
 *
 * @param field the target field that may already contain a Subscription
 * @param requested the accumulator for demand that could not be forwarded yet
 * @param requests the request amount; non-positive amounts are only forwarded when a
 *        Subscription is already present, never accumulated
 */
public static void requestIfNotNullOrAccumulate(AtomicReference<Subscription> field, AtomicLong requested, long requests) {
    Subscription current = field.get();
    if (current != null) {
        current.request(requests);
        return;
    }
    if (requests <= 0) {
        return;
    }

    add(requested, requests);

    // The Subscription may have been set while the demand was being accumulated.
    current = field.get();
    if (current == null) {
        return;
    }
    long pending = requested.getAndSet(0L);
    if (pending != 0L) {
        current.request(pending);
    }
}
 
/**
 * Smoke test: a {@code WrappedSubscription} around a no-op subscription can be created,
 * requested from and cancelled without failing.
 */
@Test
public void testWrappedSubscription() {
    Subscription noOp = new Subscription() {
        @Override
        public void request(long n) {
            // intentionally empty
        }

        @Override
        public void cancel() {
            // intentionally empty
        }
    };

    WrappedSubscription wrapper = new WrappedSubscription(noOp, null);
    assertThat(wrapper).isNotNull();
    wrapper.request(10);
    wrapper.cancel();
}
 
源代码13 项目: reactor-core   文件: MonoUsingWhenTest.java
/**
 * Verifies the attributes a {@code ResourceSubscriber} reports through {@code scan}:
 * parent, actual, prefetch, and TERMINATED following the {@code resourceProvided} flag.
 * CANCELLED is expected to be unsupported ({@code scanUnsafe} returns {@code null}).
 */
@Test
public void scanResourceSubscriber() {
	CoreSubscriber<Integer> downstream = new LambdaSubscriber<>(null, e -> {}, null, null);
	ResourceSubscriber<String, Integer> resourceSubscriber = new ResourceSubscriber<>(downstream,
			s -> Mono.just(s.length()), Mono::just, (res, err) -> Mono.just(res), Mono::just, true);
	final Subscription upstream = Operators.emptySubscription();
	resourceSubscriber.onSubscribe(upstream);

	assertThat(resourceSubscriber.scan(Attr.PARENT)).as("PARENT").isSameAs(upstream);
	assertThat(resourceSubscriber.scan(Attr.ACTUAL)).as("ACTUAL").isSameAs(downstream);

	assertThat(resourceSubscriber.scan(Attr.PREFETCH)).as("PREFETCH").isEqualTo(Integer.MAX_VALUE);

	// TERMINATED mirrors the resourceProvided flag.
	assertThat(resourceSubscriber.scan(Attr.TERMINATED)).as("TERMINATED").isFalse();
	resourceSubscriber.resourceProvided = true;
	assertThat(resourceSubscriber.scan(Attr.TERMINATED)).as("TERMINATED resourceProvided").isTrue();

	assertThat(resourceSubscriber.scanUnsafe(Attr.CANCELLED)).as("CANCELLED not supported").isNull();
}
 
/**
 * Relies on implementations not issuing an HTTP request before
 * {@link Subscription#request(long)} is called. A client span only makes sense for a
 * remote operation, so none should be created when no network request will happen.
 * This test cancels the subscription immediately and checks that the server saw no
 * request; the absence of a span is proven by the post-conditions.
 */
@Test
public void cancelledSubscription_doesntTrace() throws Exception {
	CountDownLatch cancelled = new CountDownLatch(1);

	BaseSubscriber<Integer> cancellingSubscriber = new BaseSubscriber<Integer>() {
		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			// Cancel before any demand is signalled, then release the waiting test thread.
			subscription.cancel();
			cancelled.countDown();
		}
	};

	getMono(client, "/foo").subscribe(cancellingSubscriber);

	cancelled.await();

	assertThat(server.getRequestCount()).isZero();
	// post-conditions will prove no span was created
}
 
/**
 * Forwards the request to the upstream subscription when it is available; otherwise
 * accumulates it in {@code REQUESTED}. If the subscription appears while accumulating,
 * the accumulated total is drained and requested from it.
 *
 * @param n the requested amount
 */
@Override
public void request(long n) {
    Subscription upstream = s;
    if (upstream != null) {
        upstream.request(n);
        return;
    }

    BackpressureHelper.getAndAddCap(REQUESTED, this, n);

    // Re-read: the subscription may have been set in the meantime.
    upstream = s;
    if (upstream == null) {
        return;
    }

    long pending = REQUESTED.getAndSet(this, 0L);
    if (pending != 0L) {
        upstream.request(pending);
    }
}
 
/**
 * Verifies that an error signalled by the stream publisher completes the future
 * returned by {@code prepare()} exceptionally.
 */
@Test
public void errorInStream_completesFuture() {
    Path targetFile = testFs.getPath("test_file.txt");
    FileAsyncResponseTransformer xformer = new FileAsyncResponseTransformer(targetFile);

    CompletableFuture prepareFuture = xformer.prepare();

    xformer.onResponse(new Object());
    xformer.onStream(subscriber -> {
        // A subscription that ignores both demand and cancellation.
        Subscription noOp = new Subscription() {
            @Override
            public void request(long l) {
            }

            @Override
            public void cancel() {
            }
        };
        subscriber.onSubscribe(noOp);

        subscriber.onError(new RuntimeException("Something went wrong"));
    });

    assertThat(prepareFuture.isCompletedExceptionally()).isTrue();
}
 
源代码17 项目: reactor-core   文件: FluxGroupByTest.java
/**
 * Verifies the attributes a {@code FluxGroupBy.GroupByMain} reports through
 * {@code scan}: actual, parent, requested demand, prefetch, buffered count, the
 * CANCELLED / TERMINATED flags, and the error before and after it is set.
 */
@Test
public void scanMain() {
	CoreSubscriber<GroupedFlux<Integer, String>> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
	FluxGroupBy.GroupByMain<Integer, Integer, String> test = new FluxGroupBy.GroupByMain<>(actual,
			Queues.<GroupedFlux<Integer, String>>one().get(), Queues.one(), 123, i -> i % 5, i -> String.valueOf(i));
	Subscription sub = Operators.emptySubscription();
	test.onSubscribe(sub);

	assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
	assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(sub);
	assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(Long.MAX_VALUE);
	// Numeric attributes are compared by value: identity on boxed Integers only holds
	// for values inside the Integer cache and is not what this test means to check.
	assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(123);
	assertThat(test.scan(Scannable.Attr.BUFFERED)).isEqualTo(0);
	assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
	assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
	assertThat(test.scan(Scannable.Attr.ERROR)).isNull();
	test.error = new IllegalStateException("boom");
	assertThat(test.scan(Scannable.Attr.ERROR)).isSameAs(test.error);
}
 
源代码18 项目: aws-sdk-java-v2   文件: FlatteningSubscriber.java
/**
 * Stores the upstream subscription and hands the downstream subscriber a subscription
 * that translates its demand into batch requests.
 * <p>
 * Under {@code lock}, every request is added to {@code demand}. The first request asks
 * the upstream for one batch and sets {@code requestedNextBatch}; later requests call
 * {@code fulfillDemand()} instead. Cancellation is forwarded to the upstream.
 *
 * @param subscription the upstream subscription
 */
@Override
public void onSubscribe(Subscription subscription) {
    sourceSubscription = subscription;

    Subscription downstreamSubscription = new Subscription() {
        @Override
        public void request(long n) {
            synchronized (lock) {
                demand.addAndGet(n);
                if (requestedNextBatch) {
                    // A batch was already requested earlier; serve demand from there on.
                    fulfillDemand();
                } else {
                    // Initial request only: pull the first batch from the upstream.
                    requestedNextBatch = true;
                    sourceSubscription.request(1);
                }
            }
        }

        @Override
        public void cancel() {
            subscription.cancel();
        }
    };
    subscriber.onSubscribe(downstreamSubscription);
}
 
源代码19 项目: smallrye-mutiny   文件: SafeSubscriberTest.java
/**
 * Verifies that a {@code SafeSubscriber} still delivers {@code onError} to a downstream
 * whose {@code onError} itself throws, and that it reports itself as done afterwards.
 */
@Test
public void testThatDownstreamFailuresAreHandledInOnError() {
    Subscriber<String> downstream = mock(Subscriber.class);
    // The downstream blows up whenever it is told about an error.
    doAnswer(new ThrowsException(new IllegalStateException("boom"))).when(downstream).onError(any());

    Subscription upstream = mock(Subscription.class);

    SafeSubscriber<String> safeSubscriber = new SafeSubscriber<>(downstream);

    safeSubscriber.onSubscribe(upstream);
    verify(downstream, times(1)).onSubscribe(safeSubscriber);

    Exception failure = new Exception("boom");
    safeSubscriber.onError(failure);
    // called
    verify(downstream, times(1)).onError(failure);
    assertThat(safeSubscriber.isDone()).isTrue();
}
 
源代码20 项目: smallrye-mutiny   文件: WrappedSubscriptionTest.java
/**
 * Smoke test: wrapping a do-nothing subscription yields a non-null
 * {@code WrappedSubscription} on which {@code request} and {@code cancel} can be called.
 */
@Test
public void testWrappedSubscription() {
    Subscription inert = new Subscription() {
        @Override
        public void request(long n) {
            // nothing to do
        }

        @Override
        public void cancel() {
            // nothing to do
        }
    };

    WrappedSubscription underTest = new WrappedSubscription(inert, null);
    assertThat(underTest).isNotNull();
    underTest.request(10);
    underTest.cancel();
}
 
源代码21 项目: cyclops   文件: Spouts.java
/**
 * Builds a stream of increasing integers (starting at 1) emitted on the given cron
 * schedule.
 * <p>
 * A reactive subscriber is given a subscription that forwards demand to the scheduled
 * source and, on cancel, clears a flag that ends the source's {@code takeWhile}.
 * NOTE(review): demand is forwarded through a holder that is only filled after the
 * source has been connected; a request arriving earlier would hit a null entry.
 *
 * @param cron the cron expression passed to {@code schedule}
 * @param exec the executor the schedule runs on
 * @return the stream view of the reactive subscriber
 */
static  ReactiveSeq<Integer> interval(String cron,ScheduledExecutorService exec) {
    ReactiveSubscriber<Integer> bridge = reactiveSubscriber();
    AtomicBoolean open = new AtomicBoolean(true);
    // Single-slot holder so the anonymous subscription can reach the source subscription.
    Subscription[] source = {null};

    bridge.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
            source[0].request(n);
        }

        @Override
        public void cancel() {
            open.set(false);
        }
    });

    source[0] = ReactiveSeq.iterate(1, a -> a + 1)
                           .takeWhile(e -> open.get())
                           .schedule(cron, exec)
                           .connect()
                           .forEach(1, e -> bridge.onNext(e));

    return bridge.reactiveStream();
}
 
源代码22 项目: reactor-core   文件: FluxDistinctTest.java
/**
 * Verifies the attributes a {@code DistinctSubscriber} reports through {@code scan}:
 * parent, actual, and TERMINATED before and after an error.
 */
@Test
public void scanSubscriber() {
	CoreSubscriber<String> downstream = new LambdaSubscriber<>(null, e -> {}, null, null);
	DistinctSubscriber<String, Integer, Set<Integer>> distinct =
			new DistinctSubscriber<>(downstream, new HashSet<>(), String::hashCode, Set::add, Set::clear);
	Subscription upstream = Operators.emptySubscription();
	distinct.onSubscribe(upstream);

	assertThat(distinct.scan(Scannable.Attr.PARENT)).isSameAs(upstream);
	assertThat(distinct.scan(Scannable.Attr.ACTUAL)).isSameAs(downstream);

	// TERMINATED flips once an error has been delivered.
	assertThat(distinct.scan(Scannable.Attr.TERMINATED)).isFalse();
	distinct.onError(new IllegalStateException("boom"));
	assertThat(distinct.scan(Scannable.Attr.TERMINATED)).isTrue();
}
 
源代码23 项目: reactor-core   文件: FluxSampleTimeout.java
/**
 * Completes the main sequence. If the current {@code other} is a
 * {@code SampleTimeoutOther}, it is cancelled and then completed before this subscriber
 * is marked done and drained.
 */
@Override
public void onComplete() {
	Subscription current = other;
	if (current instanceof FluxSampleTimeout.SampleTimeoutOther) {
		SampleTimeoutOther<?, ?> pendingTimeout = (SampleTimeoutOther<?, ?>) current;
		pendingTimeout.cancel();
		pendingTimeout.onComplete();
	}
	done = true;
	drain();
}
 
/**
 * Demonstrates broadcasting a generated Fibonacci sequence to two subscribers through
 * {@code publish().autoConnect(2)}: one subscriber requests a single element, the other
 * subscribes with a plain consumer. Output is printed; nothing is asserted.
 */
@Test
public void testPublishBroadcast() throws Exception{
    Flux<Long> generator = Flux.generate(() -> Tuples.<Long,
            Long>of(0L, 1L), (state, sink) -> {
        if (state.getT1() < 0) {
            sink.complete();
        } else {
            sink.next(state.getT1());
        }
        System.out.println("generating next of "+ state.getT2());

        return Tuples.of(state.getT2(), state.getT1() + state.getT2());
    });

    // Connect to the generator only once two subscribers are present.
    Flux<Long> broadcast = generator.doFinally(x -> {
        System.out.println("Closing ");
    }).publish().autoConnect(2);

    BaseSubscriber<Long> firstSubscriber = new BaseSubscriber<Long>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(1);
        }

        @Override
        protected void hookOnNext(Long value) {
            System.out.println("1st: "+value);
        }
    };
    broadcast.subscribe(firstSubscriber);

    broadcast.subscribe(x -> System.out.println("2nd : "+x));
    Thread.sleep(500);
}
 
源代码25 项目: cyclops   文件: FlatMapTest.java
/**
 * Creates a stream backed by a reactive subscriber that is fed from a separate thread.
 * <p>
 * The subscription handed to the subscriber only records demand in a counter. The
 * feeder thread spins until demand is available, emits the values 1 and 2 (consuming
 * one unit of demand per value) and then completes the subscriber.
 *
 * @return the stream view of the reactive subscriber
 */
private Stream<Integer> nextAsyncRS() {
    ReactiveSubscriber<Integer> target = Spouts.reactiveSubscriber();
    AtomicLong outstanding = new AtomicLong(0);
    int id = start.incrementAndGet();

    target.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
            outstanding.addAndGet(n);
        }

        @Override
        public void cancel() {
        }

        public String toString(){
            return "subscription " + id;
        }
    });

    Thread feeder = new Thread(() -> {
        int emitted = 0;
        while (emitted < 2) {
            // Busy-wait until the consumer has signalled demand.
            if (outstanding.get() > 0) {
                emitted += 1;
                target.onNext(emitted);
                outstanding.decrementAndGet();
            }
        }
        target.onComplete();
    });
    feeder.start();

    return target.reactiveStream();
}
 
源代码26 项目: ditto   文件: SubscriptionActor.java
/**
 * Accepts the first subscription, tells the sender that the subscription was created
 * and unstashes all stashed messages. Any further subscription is cancelled.
 *
 * @param subscription the incoming subscription
 */
private void onSubscribe(final Subscription subscription) {
    if (this.subscription == null) {
        this.subscription = subscription;
        sender.tell(getSubscriptionCreated(), ActorRef.noSender());
        unstashAll();
    } else {
        // A subscription is already held: reject the new one.
        subscription.cancel();
    }
}
 
源代码27 项目: reactor-core   文件: ParallelSource.java
/**
 * Receives the upstream subscription and picks the queue that will feed the rails.
 * <p>
 * Nothing happens unless {@code Operators.validate} accepts the subscription. When the
 * upstream is a {@code Fuseable.QueueSubscription}, fusion is negotiated and, for the
 * SYNC and ASYNC modes, the upstream itself is used as the queue. In every other case
 * a queue is obtained from {@code queueSupplier}.
 *
 * @param s the upstream subscription
 */
@Override
public void onSubscribe(Subscription s) {
	if (Operators.validate(this.s, s)) {
		this.s = s;

		if (s instanceof Fuseable.QueueSubscription) {
			@SuppressWarnings("unchecked")
			Fuseable.QueueSubscription<T> qs = (Fuseable.QueueSubscription<T>) s;
			
			// Ask for any fusion mode, flagging this operator as a thread barrier.
			int m = qs.requestFusion(Fuseable.ANY | Fuseable.THREAD_BARRIER);
			
			if (m == Fuseable.SYNC) {
				// SYNC: the upstream is the queue; mark done and drain right away.
				// No request is issued to the upstream on this path.
				sourceMode = m;
				queue = qs;
				done = true;
				setupSubscribers();
				drain();
				return;
			} else
			if (m == Fuseable.ASYNC) {
				// ASYNC: the upstream is the queue, but demand still has to be signalled.
				sourceMode = m;
				queue = qs;
				
				setupSubscribers();
				
				s.request(Operators.unboundedOrPrefetch(prefetch));
				
				return;
			}
		}
		
		// Not fuseable, or neither SYNC nor ASYNC was granted: use a queue from the supplier.
		queue = queueSupplier.get();
		
		setupSubscribers();
		
		s.request(Operators.unboundedOrPrefetch(prefetch));
	}
}
 
源代码28 项目: vertx-web   文件: ApolloWSHandlerTest.java
/**
 * Builds a publisher that registers its subscription in {@code subscriptionRef} and
 * then emits five maps of the form {@code {"count": n}} for n = 0..4, ignoring demand.
 * <p>
 * When the {@code finite} argument is true the publisher completes and clears the
 * reference; otherwise the reference is cleared on cancel. Any unexpected state of the
 * reference fails the test.
 *
 * @param env the data fetching environment providing the {@code finite} argument
 * @return the counter publisher
 */
private Publisher<Map<String, Object>> getCounter(DataFetchingEnvironment env) {
  boolean finite = env.getArgument("finite");
  return subscriber -> {
    Subscription registered = new Subscription() {
      @Override
      public void request(long n) {
      }

      @Override
      public void cancel() {
        // The reference must still point at this subscription when it is cancelled.
        boolean cleared = subscriptionRef.compareAndSet(this, null);
        if (!cleared) {
          fail();
        }
      }
    };

    boolean installed = subscriptionRef.compareAndSet(null, registered);
    if (!installed) {
      fail();
    }
    subscriber.onSubscribe(registered);

    for (int num = 0; num < 5; num++) {
      Map<String, Object> counter = new HashMap<>();
      counter.put("count", num);
      subscriber.onNext(counter);
    }

    if (finite) {
      subscriber.onComplete();
      boolean cleared = subscriptionRef.compareAndSet(registered, null);
      if (!cleared) {
        fail();
      }
    }
  };
}
 
/**
 * Registers {@code /large-stream}, a service that streams response headers followed by
 * 1 KiB data chunks and completes once more than 10 * 1024 items have been emitted,
 * and sets the request timeout to 30 seconds.
 * <p>
 * The item counter is advanced per emitted item, so the headers are sent exactly once
 * even when the first {@code request(n)} asks for more than one item.
 *
 * @param sb the server builder to configure
 */
@Override
protected void configure(ServerBuilder sb) throws Exception {
    sb.service("/large-stream", new AbstractHttpService() {
        @Override
        protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) throws Exception {
            return HttpResponse.of(s -> s.onSubscribe(new Subscription() {
                // Number of items (headers + data chunks) emitted so far.
                int count;

                @Override
                public void request(long n) {
                    // long index: demand is a long and must not overflow an int counter.
                    for (long i = 0; i < n; i++) {
                        if (count == 0) {
                            s.onNext(ResponseHeaders.of(HttpStatus.OK));
                        } else {
                            s.onNext(HttpData.wrap(new byte[1024]));
                        }
                        // Count inside the loop so only the very first item is the headers.
                        count++;
                    }
                    // 10MB
                    if (count > 1024 * 10) {
                        s.onComplete();
                    }
                }

                @Override
                public void cancel() {
                }
            }));
        }
    });
    sb.requestTimeout(Duration.of(30, ChronoUnit.SECONDS));
}
 
/**
 * Stores the upstream as a {@code QueueSubscription}, records the start time in
 * nanoseconds and subscribes the downstream to this instance. Does nothing when
 * {@code Operators.validate} rejects the subscription.
 *
 * @param s the upstream subscription
 */
@Override
@SuppressWarnings("unchecked")
public void onSubscribe(Subscription s) {
  if (!Operators.validate(this.s, s)) {
    return;
  }
  this.s = (QueueSubscription<T>) s;
  this.start = System.nanoTime();

  actual.onSubscribe(this);
}