io.reactivex.Flowable#just() 源码实例 Demo

下面列出了 io.reactivex.Flowable#just() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

/**
 * Counts the stored entries, saves two additional {@link Person} objects and counts again, all within one reactive
 * operator chain. Both counts are printed to the console and the second count is asserted to be {@code 6}.
 */
@Test
public void shouldInsertAndCountData() {

	Flowable<Person> newcomers = Flowable.just(
			new Person("Hank", "Schrader", 43),
			new Person("Mike", "Ehrmantraut", 62));

	repository.count()
			.doOnSuccess(System.out::println)
			// Once the initial count has been emitted, switch over to saving the new entries.
			.toFlowable()
			.switchMap(initialCount -> repository.saveAll(newcomers))
			// Only continue after the last saved entity has been emitted, then count again.
			.lastElement()
			.toSingle()
			.flatMap(lastSaved -> repository.count())
			.doOnSuccess(System.out::println)
			.test()
			.awaitCount(1)
			.assertValue(6L)
			.assertNoErrors()
			.awaitTerminalEvent();
}
 
源代码2 项目: contentful.java   文件: CDAClient.java
/**
 * Resolves a content type by its id, consulting the cache before issuing a request.
 *
 * @param id the id of the content type to look up.
 * @return a flowable emitting the cached content type, or the fetched one (which is stored in the cache
 *     as it passes through).
 */
Flowable<CDAContentType> cacheTypeWithId(String id) {
  final CDAContentType cached = cache.types().get(id);
  if (cached != null) {
    // Cache hit: no request needed.
    return Flowable.just(cached);
  }

  // Cache miss: fetch the type and remember it on its way to the subscriber.
  final Function<CDAContentType, CDAContentType> storeInCache =
      new Function<CDAContentType, CDAContentType>() {
        @Override
        public CDAContentType apply(CDAContentType fetched) {
          if (fetched != null) {
            cache.types().put(fetched.id(), fetched);
          }
          return fetched;
        }
      };

  return observe(CDAContentType.class)
      .one(id)
      .map(storeInCache);
}
 
源代码3 项目: aws-sdk-java-v2   文件: PublisherAdapterTest.java
/**
 * Subscribes to a {@code PublisherAdapter} backed by a single-element publisher and verifies that the
 * channel context is neither read nor closed, the subscriber completes, the channel is released to the
 * pool and the execute future is completed.
 */
@Test
public void successfulStreaming_shouldNotInvokeChannelRead() {
    Flowable<HttpContent> contentPublisher = Flowable.just(fullHttpResponse);
    StreamedHttpResponse response =
        new DefaultStreamedHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.ACCEPTED, contentPublisher);
    ResponseHandler.PublisherAdapter adapter =
        new ResponseHandler.PublisherAdapter(response, ctx, requestContext, executeFuture);
    TestSubscriber recorder = new TestSubscriber();

    adapter.subscribe(recorder);

    // The context must be left untouched by a successful stream.
    verify(ctx, times(0)).read();
    verify(ctx, times(0)).close();
    assertThat(recorder.isCompleted).isEqualTo(true);
    verify(channelPool).release(channel);

    executeFuture.join();
    assertThat(executeFuture).isCompleted();
}
 
/**
 * Builds a publisher by sending a single {@code TckService.KABOOM} message through the stub's
 * many-to-one call and exposing the resulting {@link Single} as a flowable.
 * NOTE(review): presumably the service fails on the KABOOM value; confirm against TckService.
 */
@Override
public Publisher<Message> createFailedPublisher() {
    final RxTckGrpc.RxTckStub tckStub = RxTckGrpc.newRxStub(channel);
    final Flowable<Message> failingRequest = Flowable.just(toMessage(TckService.KABOOM));
    final Single<Message> response = failingRequest.as(tckStub::manyToOne);
    return response.toFlowable();
}
 
/**
 * Creates the publisher under test: a {@code NewsPreparationOperator} wrapping a single-element
 * source and labelled {@code "test"}.
 *
 * @param elements requested number of elements; not used by this implementation.
 */
@Override
public Publisher<NewsLetter> createPublisher(long elements) {
    Flowable<News> singleNews = Flowable.just(new News());
    return new NewsPreparationOperator(singleNews, "test");
}
 
源代码6 项目: immutables   文件: InMemoryBackend.java
/**
 * Inserts the values of the given operation into the store, keyed by the extracted key.
 *
 * @param op the insert operation carrying the values.
 * @return a publisher emitting {@code WriteResult.empty()} when there is nothing to insert,
 *     otherwise {@code WriteResult.unknown()}.
 * @throws BackendException if the store already holds an entry for one of the keys; entries
 *     stored before the conflict was detected are not rolled back.
 */
private Publisher<WriteResult> insert(StandardOperations.Insert op) {
  if (op.values().isEmpty()) {
    return Flowable.just(WriteResult.empty());
  }

  final Map<Object, Object> byKey = op.values().stream()
      .collect(Collectors.toMap(keyExtractor::extract, value -> value));

  for (Map.Entry<Object, Object> entry : byKey.entrySet()) {
    final Object previous = store.putIfAbsent(entry.getKey(), entry.getValue());
    if (previous != null) {
      throw new BackendException(String.format("Duplicate key %s for %s", entry.getKey(), entityType()));
    }
  }

  return Flowable.just(WriteResult.unknown());
}
 
/**
 * Turns each incoming message into two outgoing messages, each carrying the incoming metadata
 * extended with its own {@code MyMetadata("hello")} instance.
 *
 * @param input the message received on the "intermediate" channel.
 * @return a publisher emitting the two derived messages to the "sink" channel.
 */
@Incoming("intermediate")
@Outgoing("sink")
public Publisher<Message<String>> process(Message<String> input) {
    Message<String> first =
            input.withMetadata(input.getMetadata().with(new MessageTest.MyMetadata<>("hello")));
    Message<String> second =
            input.withMetadata(input.getMetadata().with(new MessageTest.MyMetadata<>("hello")));
    return Flowable.just(first, second);
}
 
源代码8 项目: akarnokd-misc   文件: JoinPairwise.java
/**
 * Joins two integer sources pairwise: values are wrapped as {@code Left}/{@code Right}, merged via
 * {@code Flowables.orderedMerge}, and adjacent items carrying the same value are collapsed into a
 * single {@code Both}. Every resulting item is printed to the console.
 */
@SuppressWarnings("unchecked")
@Test
public void test() {
    Flowable<Integer> a = Flowable.just(0, 2, 3, 6);
    Flowable<Integer> b = Flowable.just(1, 2, 3, 4, 5, 6);

    // defer gives each subscriber its own 'skip' flag.
    Flowable.defer(() -> {
        // Single-element array so the lambda below can mutate the flag.
        boolean[] skip = { false };
        return Flowables.<Pair>orderedMerge(
                a.<Pair>map(Left::new), b.<Pair>map(Right::new)
            )
            .distinctUntilChanged()
            // Overlapping windows of two consecutive items, advancing one item at a time.
            .buffer(2, 1)
            .flatMapIterable(buf -> {
                // The previous window produced a Both; this window starts with the second half
                // of that match, so it is dropped entirely.
                if (skip[0]) {
                    skip[0] = false;
                    return Collections.emptyList();
                }
                if (buf.size() == 2) {
                    // NOTE(review): '==' is a reference comparison if 'value' is a boxed type;
                    // confirm the declared type of Pair.value.
                    if (buf.get(0).value == buf.get(1).value) {
                        skip[0] = true;
                        return Collections.singletonList(new Both(buf.get(0).value));
                    }
                    // No match: emit only the first item; the second one reappears as the head
                    // of the next window.
                    return buf.subList(0, 1);
                }
                // Trailing window with a single item: emit it as is.
                return buf;
            });
    })
    .subscribe(System.out::println);
}
 
源代码9 项目: akarnokd-misc   文件: CacheParallel.java
/**
 * Runs three items through a replayed, auto-connected source on three parallel rails and checks
 * that all three items were processed and that three distinct thread names were recorded.
 */
@Test
public void testParallelism() throws Exception {
    Flowable<Integer> source = Flowable.just(1, 2, 3);

    Set<String> workerThreads = Collections.synchronizedSet(new TreeSet<>());
    AtomicInteger processed = new AtomicInteger();
    CountDownLatch allSeen = new CountDownLatch(3);

    source
        // The original author marked this replay/autoConnect step as the one triggering the failure.
        .replay(1).autoConnect()
        .parallel(3)
        .runOn(Schedulers.io())
        .doOnNext(item -> {
            workerThreads.add(Thread.currentThread().getName());
            processed.incrementAndGet();
            allSeen.countDown();

            tryToSleep(1000);
        })
        .sequential()
        .subscribe();

    // Wait up to three seconds for all items; the assertions below report any shortfall.
    allSeen.await(3, TimeUnit.SECONDS);

    Assert.assertEquals("Multithreaded count", 3, processed.get());
    Assert.assertEquals("Multithreaded threads", 3, workerThreads.size());
}
 
/**
 * Builds a publisher by sending a single, hidden {@code TckService.KABOOM} message through the
 * stub's many-to-one call and exposing the resulting {@link Single} as a flowable.
 * NOTE(review): presumably the service fails on the KABOOM value; confirm against TckService.
 */
@Override
public Publisher<Message> createFailedPublisher() {
    final RxTckGrpc.RxTckStub tckStub = RxTckGrpc.newRxStub(channel);
    final Flowable<Message> failingRequest = Flowable.just(toMessage(TckService.KABOOM));
    // hide() conceals the concrete source type from the downstream call.
    final Single<Message> response = failingRequest.hide().as(tckStub::manyToOne);
    return response.toFlowable();
}
 
/**
 * Sends one default request through the bidirectional streaming call and expects the response
 * stream to fail with a {@code StatusRuntimeException} whose status code is {@code INTERNAL}.
 */
@Test
public void manyToMany() {
    RxGreeterGrpc.RxGreeterStub greeter = RxGreeterGrpc.newRxStub(channel);
    Flowable<HelloRequest> requests = Flowable.just(HelloRequest.getDefaultInstance());
    Flowable<HelloResponse> responses = requests.compose(greeter::sayHelloBothStream);
    TestSubscriber<HelloResponse> subscriber = responses.test();

    subscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
    subscriber.assertError(error -> error instanceof StatusRuntimeException);
    subscriber.assertError(
        error -> ((StatusRuntimeException) error).getStatus().getCode() == Status.Code.INTERNAL);
}
 
源代码12 项目: rxjava2-extras   文件: SerializedTest.java
/**
 * Writes the integers 1, 2, 3 to {@code target/temp1} via {@code Serialized.write} and verifies
 * that the file exists, is non-empty and reads back as the same list.
 */
@Test
public void testSerializeAndDeserializeOfNonEmptyStream() {
    File target = new File("target/temp1");
    // Remove any leftover from a previous run; the result is intentionally not checked.
    target.delete();

    Flowable<Integer> numbers = Flowable.just(1, 2, 3);
    Serialized.write(numbers, target).subscribe();

    assertTrue(target.exists());
    assertTrue(target.length() > 0);

    List<Integer> restored = Serialized.<Integer>read(target).toList().blockingGet();
    assertEquals(Arrays.asList(1, 2, 3), restored);
}
 
源代码13 项目: rxjava2-jdbc   文件: Tx.java
/**
 * Converts a notification into a flowable of {@code Tx}.
 *
 * @param n the notification to convert.
 * @param con the connection passed to the created {@code TxImpl}.
 * @param db the database passed to the created {@code TxImpl}.
 * @return a single {@code TxImpl} built with a {@code true} flag and no value for a completion,
 *     a single {@code TxImpl} carrying the value (flag {@code false}) for an item, or a flowable
 *     signalling the notification's error otherwise.
 */
static <T> Flowable<Tx<T>> toTx(Notification<T> n, Connection con, Database db) {
    if (n.isOnComplete()) {
        return Flowable.just(new TxImpl<T>(con, null, null, true, db));
    }
    if (n.isOnNext()) {
        return Flowable.just(new TxImpl<T>(con, n.getValue(), null, false, db));
    }
    // Neither completion nor item: propagate the error.
    return Flowable.error(n.getError());
}
 
/**
 * Looks up the greyed fields of the named section of the current data set.
 *
 * @param sectionName display name of the section; an empty name or {@code "NO_SECTION"} yields
 *     an empty list without querying.
 * @return a flowable emitting the section's greyed fields, or an empty list.
 */
@Override
public Flowable<List<DataElementOperand>> getGreyFields(String sectionName) {
    if (sectionName.isEmpty() || sectionName.equals("NO_SECTION")) {
        // No real section selected: nothing can be greyed out.
        return Flowable.just(new ArrayList<>());
    }

    return d2.dataSetModule().sections().withGreyedFields()
            .byDataSetUid().eq(dataSetUid)
            .byDisplayName().eq(sectionName)
            .one().get()
            .map(Section::greyedFields)
            .toFlowable();
}
 
源代码15 项目: rx-jersey   文件: FlowableResourceTest.java
/**
 * Echoes the {@code message} query parameter back as a single-element flowable.
 *
 * @param message the value of the {@code message} query parameter.
 * @return a flowable emitting exactly that value.
 */
@GET
@Path("echo")
public Flowable<String> echo(@QueryParam("message") String message) {
    final Flowable<String> reply = Flowable.just(message);
    return reply;
}
 
源代码16 项目: RxCupboard   文件: RxContentProvider.java
/**
 * Deletes the entity through the provider and hands it back as a single-element flowable.
 * The deletion runs immediately when this method is called, not upon subscription.
 *
 * @param entity the entity to delete.
 * @return a flowable emitting the given entity.
 */
public <T> Flowable<T> delete(T entity) {
	provider.delete(uri, entity);
	final Flowable<T> deleted = Flowable.just(entity);
	return deleted;
}
 
源代码17 项目: rxjava2-extras   文件: StringsTest.java
/**
 * Decoding a stream that ends with the lone byte {@code 0xc2} using a UTF-8 decoder is expected
 * to fail with a {@link RuntimeException}.
 */
@Test(expected = RuntimeException.class)
public void testMalformedAtTheEndReport() {
    byte[] truncated = { (byte) 0xc2 };
    Flowable<byte[]> bytes = Flowable.just(truncated);
    CharsetDecoder utf8Decoder = Charset.forName("UTF-8").newDecoder();

    decode(bytes, utf8Decoder).blockingSingle();
}
 
/**
 * Builds a one-entry map from the category combo to the result of {@code getMap(catCombo)} and
 * emits it.
 *
 * @param sectionName the section name; not used by this implementation.
 * @param catCombo the category combo used both as map key and as lookup argument.
 * @return a flowable emitting the single-entry map.
 */
@Override
public Flowable<Map<String, List<List<Pair<CategoryOption, Category>>>>> getCatOptions(String sectionName, String catCombo) {
    final Map<String, List<List<Pair<CategoryOption, Category>>>> optionsByCombo = new HashMap<>();
    optionsByCombo.put(catCombo, getMap(catCombo));

    return Flowable.just(optionsByCombo);
}
 
/**
 * Supplies the items {@code "d"}, {@code "e"} and {@code "f"} to the outgoing channel "b".
 *
 * @return a publisher emitting the three strings in order.
 */
@Outgoing("b")
public Publisher<String> produce() {
    final Flowable<String> letters = Flowable.just("d", "e", "f");
    return letters;
}
 
/**
 * Supplies the items {@code "a"}, {@code "b"} and {@code "c"} to the outgoing channel "v".
 *
 * @return a flowable emitting the three strings in order.
 */
@Outgoing("v")
public Flowable<String> produce() {
    final Flowable<String> letters = Flowable.just("a", "b", "c");
    return letters;
}