下面列出了 io.reactivex.Flowable#fromIterable() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Iterates over the task's elements and processes each one.
 * <p>
 * The source is built from the task's array when {@code isArray()} is true, otherwise from
 * its iterable. Every element is mapped through {@code doInIOThread}, the stream is composed
 * with {@code RxSchedulerUtils._io_main_f()} (per the original documentation: processing on
 * the IO thread, delivery on the UI thread), and each result is handed to
 * {@code doInUIThread}.
 *
 * @param rxIteratorTask the task supplying the elements and both per-element callbacks
 * @param errorConsumer  receives any error raised by the stream
 * @return the {@link Disposable} of the subscription
 */
public static <T, R> Disposable executeRxIteratorTask(final RxIteratorTask<T, R> rxIteratorTask, @NonNull Consumer<Throwable> errorConsumer) {
    // Choose the element source according to how the task stores its data.
    final Flowable<T> source;
    if (rxIteratorTask.isArray()) {
        source = Flowable.fromArray(rxIteratorTask.getArray());
    } else {
        source = Flowable.fromIterable(rxIteratorTask.getIterable());
    }

    // Per-element transformation step.
    final Function<T, R> processElement = new Function<T, R>() {
        @Override
        public R apply(T element) throws Exception {
            return rxIteratorTask.doInIOThread(element);
        }
    };

    // Per-result delivery step.
    final Consumer<R> deliverResult = new Consumer<R>() {
        @Override
        public void accept(R result) throws Exception {
            rxIteratorTask.doInUIThread(result);
        }
    };

    return source
            .map(processElement)
            .compose(RxSchedulerUtils.<R>_io_main_f())
            .subscribe(deliverResult, errorConsumer);
}
/**
 * Answers the request with a stream of messages carrying increasing integers, starting at 0.
 * <p>
 * Each call to the iterable's {@code iterator()} starts a fresh counter. The iterator reports
 * more elements until the counter reaches {@code Integer.MAX_VALUE}, and prints every value
 * that is a multiple of {@code PRINT_EVERY}. The {@code request} argument is not read.
 */
@Override
public Flowable<Message> oneToMany(Single<Message> request) {
    // Produce increasing integers as fast as the subscriber requests them.
    final Iterable<Message> numberedMessages = () -> {
        final AtomicInteger counter = new AtomicInteger(0);
        return new Iterator<Message>() {
            @Override
            public boolean hasNext() {
                return counter.get() < Integer.MAX_VALUE;
            }

            @Override
            public Message next() {
                final int current = counter.getAndIncrement();
                if (current % PRINT_EVERY == 0) {
                    System.out.println(current);
                }
                return Message.newBuilder().setNumber(current).build();
            }
        };
    };
    return Flowable.fromIterable(numberedMessages);
}
/**
 * Adapts a {@code Flowable}-backed publisher through the adapter registered for
 * {@code Flux} and checks that the result is a {@code Flux} emitting the same elements.
 */
@Test
public void publisherToFlux() {
    List<Integer> expected = Arrays.asList(1, 2, 3);
    Publisher<Integer> publisher = Flowable.fromIterable(expected);

    Object adapted = getAdapter(Flux.class).fromPublisher(publisher);
    assertTrue(adapted instanceof Flux);

    // Collect what the adapted Flux emits, waiting at most one second.
    List<Integer> emitted = ((Flux<Integer>) adapted).collectList().block(Duration.ofMillis(1000));
    assertEquals(expected, emitted);
}
/**
 * Adapts a {@code Flowable}-backed publisher through the adapter registered for
 * {@code rx.Observable} and checks that the result is an {@code rx.Observable} emitting
 * the same elements.
 */
@Test
public void publisherToRxObservable() {
    List<Integer> expected = Arrays.asList(1, 2, 3);
    Publisher<Integer> publisher = Flowable.fromIterable(expected);

    Object adapted = getAdapter(rx.Observable.class).fromPublisher(publisher);
    assertTrue(adapted instanceof rx.Observable);

    // Gather all emissions into a single list and block for it.
    Object emitted = ((rx.Observable<?>) adapted).toList().toBlocking().first();
    assertEquals(expected, emitted);
}
/**
 * Looks up every key of the operation in {@code store} and publishes the values found.
 * Keys for which the store returns {@code null} are skipped.
 *
 * @param op the get-by-key operation whose {@code keys()} are looked up
 * @return a publisher emitting the non-null values, in key iteration order
 */
private Publisher<?> getByKey(StandardOperations.GetByKey op) {
    List<Object> found = new ArrayList<>();
    for (Object key : op.keys()) {
        Object stored = store.get(key);
        if (stored == null) {
            // Nothing stored under this key; leave it out of the result.
            continue;
        }
        found.add(stored);
    }
    return Flowable.fromIterable(found);
}
/**
 * Verifies that the adapter looked up for {@code Flux} turns a {@code Flowable}-backed
 * publisher into a {@code Flux} that emits the original sequence.
 */
@Test
public void publisherToFlux() {
    List<Integer> numbers = Arrays.asList(1, 2, 3);
    Publisher<Integer> origin = Flowable.fromIterable(numbers);

    Object converted = getAdapter(Flux.class).fromPublisher(origin);
    assertTrue(converted instanceof Flux);

    // Block for at most one second while the converted Flux is collected.
    List<Integer> received = ((Flux<Integer>) converted).collectList().block(Duration.ofMillis(1000));
    assertEquals(numbers, received);
}
/**
 * Verifies that the adapter looked up for {@code rx.Observable} turns a
 * {@code Flowable}-backed publisher into an {@code rx.Observable} that emits the
 * original sequence.
 */
@Test
public void publisherToRxObservable() {
    List<Integer> numbers = Arrays.asList(1, 2, 3);
    Publisher<Integer> origin = Flowable.fromIterable(numbers);

    Object converted = getAdapter(rx.Observable.class).fromPublisher(origin);
    assertTrue(converted instanceof rx.Observable);

    // Collect every emission into one list and wait for it.
    Object received = ((rx.Observable<?>) converted).toList().toBlocking().first();
    assertEquals(numbers, received);
}
/**
 * Verifies that the adapter looked up for {@code io.reactivex.Observable} turns a
 * {@code Flowable}-backed publisher into an {@code io.reactivex.Observable} that emits
 * the original sequence.
 */
@Test
public void publisherToReactivexObservable() {
    List<Integer> numbers = Arrays.asList(1, 2, 3);
    Publisher<Integer> origin = Flowable.fromIterable(numbers);

    Object converted = getAdapter(io.reactivex.Observable.class).fromPublisher(origin);
    assertTrue(converted instanceof io.reactivex.Observable);

    // Collect every emission into one list and block until it is available.
    Object received = ((io.reactivex.Observable<?>) converted).toList().blockingGet();
    assertEquals(numbers, received);
}
/**
 * Converts a response entity into a stream of its data entries.
 * <p>
 * A {@code null} entity, a {@code null} data list and an empty data list all produce the
 * same error: an {@code ApiException} carrying {@code CodeConstant.CODE_EMPTY}.
 *
 * @param gaoxiaoEntity the response entity, possibly {@code null}
 * @return a publisher emitting each data entry, or an error publisher when there is no data
 */
private Publisher<GaoxiaoEntity.DataEntity> handleException(GaoxiaoEntity gaoxiaoEntity) {
    final List<GaoxiaoEntity.DataEntity> entries =
            gaoxiaoEntity == null ? null : gaoxiaoEntity.getData();

    if (entries != null && !entries.isEmpty()) {
        return Flowable.fromIterable(entries);
    }
    return Flowable.error(new ApiException(CodeConstant.CODE_EMPTY, "数据为空,请求个毛线!"));
}
/**
 * Turns a response entity into a stream of its data entries, or into an error stream
 * when there is nothing to emit.
 * <p>
 * The error is an {@code ApiException} with {@code CodeConstant.CODE_EMPTY}; it is produced
 * for a {@code null} entity as well as for a {@code null} or empty data list.
 *
 * @param gaoxiaoEntity the response entity, possibly {@code null}
 * @return a publisher of the data entries, or an error publisher
 */
private Publisher<GaoxiaoEntity.DataEntity> handleException(GaoxiaoEntity gaoxiaoEntity) {
    List<GaoxiaoEntity.DataEntity> payload = null;
    if (gaoxiaoEntity != null) {
        payload = gaoxiaoEntity.getData();
    }

    boolean nothingToEmit = payload == null || payload.isEmpty();
    if (nothingToEmit) {
        return Flowable.error(new ApiException(CodeConstant.CODE_EMPTY, "数据为空,请求个毛线!"));
    }
    return Flowable.fromIterable(payload);
}
/**
 * Tests that there is only 1 thread running inside the shared streaming batch pool at all
 * times.
 * <p>
 * Two windows are driven from two separate threads while the mocked
 * {@code subtransExecutor.execute} records an error whenever the pool's active count is not 1.
 * The test passes when no error was recorded.
 */
@Test
public void testSharedStreamingBatchPoolExecution() throws Exception {
  // The mocked execute() callback runs off the test thread, so the error list must be
  // thread-safe for its writes to be reliably visible to the final assertion.
  final List<String> errors = java.util.Collections.synchronizedList( new ArrayList<String>() );

  // Only 1 thread should be present in the pool at a given time.
  System.setProperty( Const.SHARED_STREAMING_BATCH_POOL_SIZE, "1" );

  RowMetaInterface rowMeta = new RowMeta();
  rowMeta.addValueMeta( new ValueMetaString( "field" ) );
  Result mockResult = new Result();
  mockResult.setRows( Arrays.asList( new RowMetaAndData( rowMeta, "queen" ), new RowMetaAndData( rowMeta, "king" ) ) );

  FixedTimeStreamWindow<List> window1 = new FixedTimeStreamWindow<>( subtransExecutor, rowMeta, 0, 10, 10 );
  FixedTimeStreamWindow<List> window2 = new FixedTimeStreamWindow<>( subtransExecutor, rowMeta, 0, 10, 10 );
  Flowable flowable = Flowable.fromIterable( singletonList( asList( "v1", "v2" ) ) );

  // Read the pool through reflection so its active thread count can be inspected.
  Field field = window1.getClass().getDeclaredField( "sharedStreamingBatchPool" );
  field.setAccessible( true );
  ThreadPoolExecutor sharedStreamingBatchPool = (ThreadPoolExecutor) field.get( window1 );

  when( subtransExecutor.getPrefetchCount() ).thenReturn( 1000 );
  when( subtransExecutor.execute( any() ) ).thenAnswer( ( InvocationOnMock invocation ) -> {
    //The active count should always be 1.
    if ( sharedStreamingBatchPool.getActiveCount() != 1 ) {
      errors.add( "Error: Active count should have been 1 at all times. Current value: " + sharedStreamingBatchPool.getActiveCount() );
    }
    return Optional.of( mockResult );
  } );

  Thread bufferThread1 = new Thread( new BufferThread( window1, flowable, mockResult ) );
  bufferThread1.start();
  Thread bufferThread2 = new Thread( new BufferThread( window2, flowable, mockResult ) );
  bufferThread2.start();

  // The buffer threads are not joined; wait a fixed time for them to do their work.
  Thread.sleep( 10000 );
  assertEquals( 0, errors.size() );
}
/**
 * Source annotated as the outgoing {@code "strings"} stream: emits every entry of
 * {@code VALUES}.
 *
 * @return a publisher over the entries of {@code VALUES}
 */
@Outgoing("strings")
public Publisher<String> strings() {
    final Flowable<String> values = Flowable.fromIterable(VALUES);
    return values;
}
/**
 * Creates a publisher stage that emits the elements carried by the given stage.
 *
 * @param engine not read by this method
 * @param stage  the stage whose elements are emitted; must not be {@code null}, and neither
 *               may its elements
 * @return a stage that, when invoked, builds a {@code Flowable} over the elements
 * @throws NullPointerException if {@code stage} or its elements are {@code null}
 */
@SuppressWarnings("unchecked")
@Override
public <O> PublisherStage<O> create(Engine engine, Stage.Of stage) {
    // Validate the stage first, then the elements it carries.
    Objects.requireNonNull(stage);
    Iterable<O> elements = (Iterable<O>) Objects.requireNonNull(stage.getElements());
    return () -> Flowable.fromIterable(elements);
}
/**
 * Runs the prepared query and exposes its result as a {@code Flowable}.
 *
 * @param preparedQuery the query builder whose {@code query()} result is emitted
 * @return a {@code Flowable} over the elements returned by {@code preparedQuery.query()}
 */
public <T> Flowable<T> query(ProviderCompartment.QueryBuilder<T> preparedQuery) {
    final Flowable<T> rows = Flowable.fromIterable(preparedQuery.query());
    return rows;
}
/**
 * Builds the fixed test vector for the event-stream signing test: the request body chunks,
 * the HTTP request to sign, the request body publisher, and the messages expected after
 * signing.
 */
TestVector generateTestVector() {
    return new TestVector() {
        private final List<String> requestBody = Lists.newArrayList("A", "B", "C");

        @Override
        public List<String> requestBody() {
            return requestBody;
        }

        @Override
        public SdkHttpFullRequest.Builder httpFullRequest() {
            //Header signature: "79f246d8652f08dd3cfaf84cc0d8b4fcce032332c78d43ea1ed6f4f6586ab59d";
            //Signing key: "29dc0a760fed568677d74136ad02d315a07d31b8f321f5c43350f284dac892c";
            final SdkHttpFullRequest.Builder request = SdkHttpFullRequest.builder()
                    .method(SdkHttpMethod.POST)
                    .putHeader("Host", "demo.us-east-1.amazonaws.com")
                    .putHeader("content-encoding", "application/vnd.amazon.eventstream")
                    .putHeader("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-EVENTS")
                    .encodedPath("/streaming")
                    .protocol("https")
                    .host("demo.us-east-1.amazonaws.com");
            return request;
        }

        @Override
        public AsyncRequestBody requestBodyPublisher() {
            // One UTF-8 encoded buffer per request body chunk.
            List<ByteBuffer> frames = requestBody.stream()
                    .map(chunk -> ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8)))
                    .collect(Collectors.toList());
            return AsyncRequestBody.fromPublisher(Flowable.fromIterable(frames));
        }

        @Override
        public Flowable<Message> expectedMessagePublisher() {
            // Hex-encoded signatures, one per expected message.
            Flowable<String> signatures = Flowable.just(
                    "7aabf85b765e6a4d0d500b6e968657b14726fa3e1eb7e839302728ffd77629a5",
                    "f72aa9642f571d24a6e1ae42f10f073ad9448d8a028b6bcd82da081335adda02",
                    "632af120435b57ec241d8bfbb12e496dfd5e2730a1a02ac0ab6eaa230ae02e9a",
                    "c6f679ddb3af68f5e82f0cf6761244cb2338cf11e7d01a24130aea1b7c17e53e");

            // The last data frame is empty.
            Flowable<String> emptyFinalFrame = Flowable.just("");
            Flowable<String> payloads = Flowable.fromIterable(requestBody).concatWith(emptyFinalFrame);

            BiFunction<String, String, Message> toSignedMessage = new BiFunction<String, String, Message>() {
                // Starts at 1 because the first Instant was used to sign the request.
                private int idx = 1;

                @Override
                public Message apply(String signatureHex, String payloadText) throws Exception {
                    Map<String, HeaderValue> headers = new HashMap<>();
                    headers.put(EVENT_STREAM_DATE,
                            HeaderValue.fromTimestamp(SIGNING_INSTANTS.get(idx++)));
                    headers.put(EVENT_STREAM_SIGNATURE,
                            HeaderValue.fromByteArray(BinaryUtils.fromHex(signatureHex)));
                    return new Message(headers, payloadText.getBytes(StandardCharsets.UTF_8));
                }
            };

            return signatures.zipWith(payloads, toSignedMessage);
        }
    };
}
/**
 * Exposes the held {@code keys} as a stream.
 *
 * @return a {@code Flowable} emitting each entry of {@code keys}
 */
@Override
public Flowable<JWK> keys() {
    final Flowable<JWK> keyStream = Flowable.fromIterable(keys);
    return keyStream;
}
/**
 * Streams every entry of {@code keys}.
 *
 * @return a {@code Flowable} over the entries of {@code keys}
 */
@Override
public Flowable<JWK> keys() {
    final Flowable<JWK> jwks = Flowable.fromIterable(keys);
    return jwks;
}
/**
 * Iterates the cursor as entities of the given class and exposes them as a stream.
 *
 * @param entityClass the entity class passed to {@code cursor.iterate}
 * @return a {@code Flowable} over the result of {@code cursor.iterate(entityClass)}
 */
public <T> Flowable<T> iterate(Class<T> entityClass) {
    final Flowable<T> entities = Flowable.fromIterable(cursor.iterate(entityClass));
    return entities;
}
/**
 * Queries the provider at {@code uri} for entities of the given class and exposes the
 * result as a stream.
 *
 * @param entityClass the entity class passed to {@code provider.query}
 * @return a {@code Flowable} over the query result
 */
public <T> Flowable<T> query(Class<T> entityClass) {
    final Flowable<T> entities = Flowable.fromIterable(provider.query(uri, entityClass).query());
    return entities;
}
/**
 * Queries the provider at {@code uri} for entities of the given class, restricted by a
 * selection, and exposes the result as a stream.
 *
 * @param entityClass the entity class passed to {@code provider.query}
 * @param selection   the selection handed to {@code withSelection}
 * @param args        the selection arguments handed to {@code withSelection}
 * @return a {@code Flowable} over the query result
 */
public <T> Flowable<T> query(Class<T> entityClass, String selection, String... args) {
    final Flowable<T> entities = Flowable.fromIterable(
            provider.query(uri, entityClass).withSelection(selection, args).query());
    return entities;
}