io.reactivex.Flowable#fromIterable ( )源码实例Demo

下面列出了io.reactivex.Flowable#fromIterable ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: Collection-Android   文件: RxJavaUtils.java
/**
 * 遍历集合进行处理(IO线程处理,UI线程显示)
 *
 * @param rxIteratorTask
 * @param errorConsumer  出错的处理
 * @return
 */
public static <T, R> Disposable executeRxIteratorTask(final RxIteratorTask<T, R> rxIteratorTask, @NonNull Consumer<Throwable> errorConsumer) {
    Flowable<T> flowable = rxIteratorTask.isArray() ? Flowable.fromArray(rxIteratorTask.getArray()) : Flowable.fromIterable(rxIteratorTask.getIterable());
    return flowable.map(new Function<T, R>() {
        @Override
        public R apply(T t) throws Exception {
            return rxIteratorTask.doInIOThread(t);
        }
    }).compose(RxSchedulerUtils.<R>_io_main_f())
            .subscribe(new Consumer<R>() {
                @Override
                public void accept(R r) throws Exception {
                    rxIteratorTask.doInUIThread(r);
                }
            }, errorConsumer);
}
 
@Override
public Flowable<Message> oneToMany(Single<Message> request) {
    return Flowable.fromIterable(() -> {
        // Return increasing integers as fast as possible forever.
        AtomicInteger i = new AtomicInteger(0);
        return new Iterator<Message>() {
            @Override
            public boolean hasNext() {
                return i.get() < Integer.MAX_VALUE;
            }

            @Override
            public Message next() {
                int j = i.getAndIncrement();
                if (j % PRINT_EVERY == 0) {
                    System.out.println(j);
                }
                return Message.newBuilder().setNumber(j).build();
            }
        };
    });
}
 
@Test
public void publisherToFlux() {
	List<Integer> sequence = Arrays.asList(1, 2, 3);
	Publisher<Integer> source = Flowable.fromIterable(sequence);
	Object target = getAdapter(Flux.class).fromPublisher(source);
	assertTrue(target instanceof Flux);
	assertEquals(sequence, ((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000)));
}
 
@Test
public void publisherToRxObservable() {
	List<Integer> sequence = Arrays.asList(1, 2, 3);
	Publisher<Integer> source = Flowable.fromIterable(sequence);
	Object target = getAdapter(rx.Observable.class).fromPublisher(source);
	assertTrue(target instanceof rx.Observable);
	assertEquals(sequence, ((rx.Observable<?>) target).toList().toBlocking().first());
}
 
源代码5 项目: immutables   文件: InMemoryBackend.java
private Publisher<?> getByKey(StandardOperations.GetByKey op) {
  List<Object> result = new ArrayList<>();
  for (Object key: op.keys()) {
    Object value = store.get(key);
    if (value != null) {
      result.add(value);
    }
  }

  return Flowable.fromIterable(result);
}
 
@Test
public void publisherToFlux() {
	List<Integer> sequence = Arrays.asList(1, 2, 3);
	Publisher<Integer> source = Flowable.fromIterable(sequence);
	Object target = getAdapter(Flux.class).fromPublisher(source);
	assertTrue(target instanceof Flux);
	assertEquals(sequence, ((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000)));
}
 
@Test
public void publisherToRxObservable() {
	List<Integer> sequence = Arrays.asList(1, 2, 3);
	Publisher<Integer> source = Flowable.fromIterable(sequence);
	Object target = getAdapter(rx.Observable.class).fromPublisher(source);
	assertTrue(target instanceof rx.Observable);
	assertEquals(sequence, ((rx.Observable<?>) target).toList().toBlocking().first());
}
 
@Test
public void publisherToReactivexObservable() {
	List<Integer> sequence = Arrays.asList(1, 2, 3);
	Publisher<Integer> source = Flowable.fromIterable(sequence);
	Object target = getAdapter(io.reactivex.Observable.class).fromPublisher(source);
	assertTrue(target instanceof io.reactivex.Observable);
	assertEquals(sequence, ((io.reactivex.Observable<?>) target).toList().blockingGet());
}
 
源代码9 项目: CrazyDaily   文件: ContactUseCase.java
private Publisher<GaoxiaoEntity.DataEntity> handleException(GaoxiaoEntity gaoxiaoEntity) {
    if (gaoxiaoEntity == null) {
        return Flowable.error(new ApiException(CodeConstant.CODE_EMPTY, "数据为空,请求个毛线!"));
    }
    List<GaoxiaoEntity.DataEntity> data = gaoxiaoEntity.getData();
    if (data == null || data.isEmpty()) {
        return Flowable.error(new ApiException(CodeConstant.CODE_EMPTY, "数据为空,请求个毛线!"));
    }
    return Flowable.fromIterable(data);
}
 
源代码10 项目: CrazyDaily   文件: GaoxiaoUseCase.java
private Publisher<GaoxiaoEntity.DataEntity> handleException(GaoxiaoEntity gaoxiaoEntity) {
    if (gaoxiaoEntity == null) {
        return Flowable.error(new ApiException(CodeConstant.CODE_EMPTY, "数据为空,请求个毛线!"));
    }
    List<GaoxiaoEntity.DataEntity> data = gaoxiaoEntity.getData();
    if (data == null || data.isEmpty()) {
        return Flowable.error(new ApiException(CodeConstant.CODE_EMPTY, "数据为空,请求个毛线!"));
    }
    return Flowable.fromIterable(data);
}
 
源代码11 项目: pentaho-kettle   文件: FixedTimeStreamWindowTest.java
@Test
public void testSharedStreamingBatchPoolExecution() throws Exception {
  /*
  * Tests that there is only 1 thread running inside the pool at all times.
  * */

  final List<String> errors = new ArrayList<String>();

  // Only 1 thread should be present in the pool at a given time.
  System.setProperty( Const.SHARED_STREAMING_BATCH_POOL_SIZE, "1" );

  RowMetaInterface rowMeta = new RowMeta();
  rowMeta.addValueMeta( new ValueMetaString( "field" ) );
  Result mockResult = new Result();
  mockResult.setRows( Arrays.asList( new RowMetaAndData( rowMeta, "queen" ), new RowMetaAndData( rowMeta, "king" ) ) );

  FixedTimeStreamWindow<List> window1 = new FixedTimeStreamWindow<>( subtransExecutor, rowMeta, 0, 10, 10 );
  FixedTimeStreamWindow<List> window2 = new FixedTimeStreamWindow<>( subtransExecutor, rowMeta, 0, 10, 10 );
  Flowable flowable = Flowable.fromIterable( singletonList( asList( "v1", "v2" ) ) );

  Field field = window1.getClass().getDeclaredField( "sharedStreamingBatchPool" );
  field.setAccessible( true );
  ThreadPoolExecutor sharedStreamingBatchPool = (ThreadPoolExecutor) field.get( window1 );

  when( subtransExecutor.getPrefetchCount() ).thenReturn( 1000 );
  when( subtransExecutor.execute( any() ) ).thenAnswer( ( InvocationOnMock invocation ) -> {
    //The active count should always be 1.
    if ( sharedStreamingBatchPool.getActiveCount() != 1 ) {
      errors.add( "Error: Active count should have been 1 at all times. Current value: " + sharedStreamingBatchPool.getActiveCount() );
    }
    return Optional.of( mockResult );
  } );

  Thread bufferThread1 = new Thread( new BufferThread( window1, flowable, mockResult ) );
  bufferThread1.start();

  Thread bufferThread2 = new Thread( new BufferThread( window2, flowable, mockResult ) );
  bufferThread2.start();

  Thread.sleep( 10000 );
  assertEquals( 0, errors.size() );
}
 
@Outgoing("strings")
public Publisher<String> strings() {
  return Flowable.fromIterable(VALUES);
}
 
@SuppressWarnings("unchecked")
@Override
public <O> PublisherStage<O> create(Engine engine, Stage.Of stage) {
    Iterable<O> elements = (Iterable<O>) Objects.requireNonNull(Objects.requireNonNull(stage).getElements());
    return () -> Flowable.fromIterable(elements);
}
 
源代码14 项目: RxCupboard   文件: RxContentProvider.java
public <T> Flowable<T> query(ProviderCompartment.QueryBuilder<T> preparedQuery) {
	return Flowable.fromIterable(preparedQuery.query());
}
 
TestVector generateTestVector() {
    return new TestVector() {
        List<String> requestBody = Lists.newArrayList("A", "B", "C");

        @Override
        public List<String> requestBody() {
            return requestBody;
        }

        @Override
        public SdkHttpFullRequest.Builder httpFullRequest() {
            //Header signature: "79f246d8652f08dd3cfaf84cc0d8b4fcce032332c78d43ea1ed6f4f6586ab59d";
            //Signing key: "29dc0a760fed568677d74136ad02d315a07d31b8f321f5c43350f284dac892c";
            return SdkHttpFullRequest.builder()
                    .method(SdkHttpMethod.POST)
                    .putHeader("Host", "demo.us-east-1.amazonaws.com")
                    .putHeader("content-encoding", "application/vnd.amazon.eventstream")
                    .putHeader("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-EVENTS")
                    .encodedPath("/streaming")
                    .protocol("https")
                    .host("demo.us-east-1.amazonaws.com");
        }

        @Override
        public AsyncRequestBody requestBodyPublisher() {
            List<ByteBuffer> bodyBytes = requestBody.stream()
                    .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
                    .collect(Collectors.toList());

            Publisher<ByteBuffer> bodyPublisher = Flowable.fromIterable(bodyBytes);

            return AsyncRequestBody.fromPublisher(bodyPublisher);
        }

        @Override
        public Flowable<Message> expectedMessagePublisher() {
            Flowable<String> sigsHex = Flowable.just(
                    "7aabf85b765e6a4d0d500b6e968657b14726fa3e1eb7e839302728ffd77629a5",
                    "f72aa9642f571d24a6e1ae42f10f073ad9448d8a028b6bcd82da081335adda02",
                    "632af120435b57ec241d8bfbb12e496dfd5e2730a1a02ac0ab6eaa230ae02e9a",
                    "c6f679ddb3af68f5e82f0cf6761244cb2338cf11e7d01a24130aea1b7c17e53e");

            // The Last data frame is empty
            Flowable<String> payloads = Flowable.fromIterable(requestBody).concatWith(Flowable.just(""));

            return sigsHex.zipWith(payloads, new BiFunction<String, String, Message>() {
                        // The first Instant was used to sign the request
                        private int idx = 1;

                        @Override
                        public Message apply(String sig, String payload) throws Exception {
                            Map<String, HeaderValue> headers = new HashMap<>();
                            headers.put(EVENT_STREAM_DATE, HeaderValue.fromTimestamp(SIGNING_INSTANTS.get(idx++)));
                            headers.put(EVENT_STREAM_SIGNATURE,
                                    HeaderValue.fromByteArray(BinaryUtils.fromHex(sig)));
                            return new Message(headers, payload.getBytes(StandardCharsets.UTF_8));
                        }
                    }
            );
        }
    };
}
 
@Override
public Flowable<JWK> keys() {
    return Flowable.fromIterable(keys);
}
 
@Override
public Flowable<JWK> keys() {
    return Flowable.fromIterable(keys);
}
 
源代码18 项目: RxCupboard   文件: RxCursor.java
public <T> Flowable<T> iterate(Class<T> entityClass) {
	return Flowable.fromIterable(cursor.iterate(entityClass));
}
 
源代码19 项目: RxCupboard   文件: RxContentProvider.java
public <T> Flowable<T> query(Class<T> entityClass) {
	return Flowable.fromIterable(provider.query(uri, entityClass).query());
}
 
源代码20 项目: RxCupboard   文件: RxContentProvider.java
public <T> Flowable<T> query(Class<T> entityClass, String selection, String... args) {
	return Flowable.fromIterable(provider.query(uri, entityClass).withSelection(selection, args).query());
}