下面列出了io.reactivex.Flowable#just ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* This sample performs a count, inserts data and performs a count again using reactive operator chaining. It prints
* the two counts ({@code 4} and {@code 6}) to the console.
*/
@Test
public void shouldInsertAndCountData() {
Flowable<Person> people = Flowable.just(new Person("Hank", "Schrader", 43), //
new Person("Mike", "Ehrmantraut", 62));
repository.count() //
.doOnSuccess(System.out::println) //
.toFlowable() //
.switchMap(count -> repository.saveAll(people)) //
.lastElement() //
.toSingle() //
.flatMap(v -> repository.count()) //
.doOnSuccess(System.out::println) //
.test() //
.awaitCount(1) //
.assertValue(6L) //
.assertNoErrors() //
.awaitTerminalEvent();
}
Flowable<CDAContentType> cacheTypeWithId(String id) {
CDAContentType contentType = cache.types().get(id);
if (contentType == null) {
return observe(CDAContentType.class)
.one(id)
.map(new Function<CDAContentType, CDAContentType>() {
@Override
public CDAContentType apply(CDAContentType resource) {
if (resource != null) {
cache.types().put(resource.id(), resource);
}
return resource;
}
}
);
}
return Flowable.just(contentType);
}
@Test
public void successfulStreaming_shouldNotInvokeChannelRead() {
Flowable<HttpContent> testPublisher = Flowable.just(fullHttpResponse);
StreamedHttpResponse streamedHttpResponse = new DefaultStreamedHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.ACCEPTED,
testPublisher);
ResponseHandler.PublisherAdapter publisherAdapter = new ResponseHandler.PublisherAdapter(streamedHttpResponse,
ctx,
requestContext,
executeFuture
);
TestSubscriber subscriber = new TestSubscriber();
publisherAdapter.subscribe(subscriber);
verify(ctx, times(0)).read();
verify(ctx, times(0)).close();
assertThat(subscriber.isCompleted).isEqualTo(true);
verify(channelPool).release(channel);
executeFuture.join();
assertThat(executeFuture).isCompleted();
}
@Override
public Publisher<Message> createFailedPublisher() {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Flowable<Message> request = Flowable.just(toMessage(TckService.KABOOM));
Single<Message> publisher = request.as(stub::manyToOne);
return publisher.toFlowable();
}
@Override
public Publisher<NewsLetter> createPublisher(long elements) {
return new NewsPreparationOperator(
Flowable.just(new News()),
"test"
);
}
private Publisher<WriteResult> insert(StandardOperations.Insert op) {
if (op.values().isEmpty()) {
return Flowable.just(WriteResult.empty());
}
final Map<Object, Object> toInsert = op.values().stream().collect(Collectors.toMap(keyExtractor::extract, x -> x));
toInsert.forEach((k, v) -> {
Object result = store.putIfAbsent(k, v);
if (result != null) {
throw new BackendException(String.format("Duplicate key %s for %s", k, entityType()));
}
});
return Flowable.just(WriteResult.unknown());
}
@Incoming("intermediate")
@Outgoing("sink")
public Publisher<Message<String>> process(Message<String> input) {
return Flowable.just(
input.withMetadata(input.getMetadata().with(new MessageTest.MyMetadata<>("hello"))),
input.withMetadata(input.getMetadata().with(new MessageTest.MyMetadata<>("hello"))));
}
@SuppressWarnings("unchecked")
@Test
public void test() {
Flowable<Integer> a = Flowable.just(0, 2, 3, 6);
Flowable<Integer> b = Flowable.just(1, 2, 3, 4, 5, 6);
Flowable.defer(() -> {
boolean[] skip = { false };
return Flowables.<Pair>orderedMerge(
a.<Pair>map(Left::new), b.<Pair>map(Right::new)
)
.distinctUntilChanged()
.buffer(2, 1)
.flatMapIterable(buf -> {
if (skip[0]) {
skip[0] = false;
return Collections.emptyList();
}
if (buf.size() == 2) {
if (buf.get(0).value == buf.get(1).value) {
skip[0] = true;
return Collections.singletonList(new Both(buf.get(0).value));
}
return buf.subList(0, 1);
}
return buf;
});
})
.subscribe(System.out::println);
}
@Test
public void testParallelism() throws Exception
{
Flowable<Integer> flux = Flowable.just(1, 2, 3);
Set<String> threadNames = Collections.synchronizedSet(new TreeSet<>());
AtomicInteger count = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(3);
flux
// Uncomment line below for failure
.replay(1).autoConnect()
.parallel(3)
.runOn(Schedulers.io())
.doOnNext(i ->
{
threadNames.add(Thread.currentThread()
.getName());
count.incrementAndGet();
latch.countDown();
tryToSleep(1000);
})
.sequential()
.subscribe();
latch.await(3, TimeUnit.SECONDS);
Assert.assertEquals("Multithreaded count", 3, count.get());
Assert.assertEquals("Multithreaded threads", 3, threadNames.size());
}
@Override
public Publisher<Message> createFailedPublisher() {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Flowable<Message> request = Flowable.just(toMessage(TckService.KABOOM));
Single<Message> publisher = request.hide().as(stub::manyToOne);
return publisher.toFlowable();
}
@Test
public void manyToMany() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Flowable<HelloRequest> req = Flowable.just(HelloRequest.getDefaultInstance());
Flowable<HelloResponse> resp = req.compose(stub::sayHelloBothStream);
TestSubscriber<HelloResponse> test = resp.test();
test.awaitTerminalEvent(3, TimeUnit.SECONDS);
test.assertError(t -> t instanceof StatusRuntimeException);
test.assertError(t -> ((StatusRuntimeException)t).getStatus().getCode() == Status.Code.INTERNAL);
}
@Test
public void testSerializeAndDeserializeOfNonEmptyStream() {
File file = new File("target/temp1");
file.delete();
Flowable<Integer> source = Flowable.just(1, 2, 3);
Serialized.write(source, file).subscribe();
assertTrue(file.exists());
assertTrue(file.length() > 0);
List<Integer> list = Serialized.<Integer>read(file).toList().blockingGet();
assertEquals(Arrays.asList(1, 2, 3), list);
}
static <T> Flowable<Tx<T>> toTx(Notification<T> n, Connection con, Database db) {
if (n.isOnComplete())
return Flowable.just(new TxImpl<T>(con, null, null, true, db));
else if (n.isOnNext())
return Flowable.just(new TxImpl<T>(con, n.getValue(), null, false, db));
else
return Flowable.error(n.getError());
}
@Override
public Flowable<List<DataElementOperand>> getGreyFields(String sectionName) {
if (!sectionName.isEmpty() && !sectionName.equals("NO_SECTION"))
return d2.dataSetModule().sections().withGreyedFields().byDataSetUid().eq(dataSetUid).byDisplayName().eq(sectionName).one().get()
.map(Section::greyedFields).toFlowable();
else
return Flowable.just(new ArrayList<>());
}
@GET
@Path("echo")
public Flowable<String> echo(@QueryParam("message") String message) {
return Flowable.just(message);
}
public <T> Flowable<T> delete(T entity) {
provider.delete(uri, entity);
return Flowable.just(entity);
}
@Test(expected = RuntimeException.class)
public void testMalformedAtTheEndReport() {
Flowable<byte[]> src = Flowable.just(new byte[] { (byte) 0xc2 });
CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
decode(src, charsetDecoder).blockingSingle();
}
@Override
public Flowable<Map<String, List<List<Pair<CategoryOption, Category>>>>> getCatOptions(String sectionName, String catCombo) {
Map<String, List<List<Pair<CategoryOption, Category>>>> map = new HashMap<>();
map.put(catCombo, getMap(catCombo));
return Flowable.just(map);
}
@Outgoing("b")
public Publisher<String> produce() {
return Flowable.just("d", "e", "f");
}
@Outgoing("v")
public Flowable<String> produce() {
return Flowable.just("a", "b", "c");
}