下面列出了java.util.stream.IntStream#forEach ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private static void test(IntStream keyInputStream, Stream<String> valueInputStream, BlockComparator comparator, Iterator<String> outputIterator)
{
BlockBuilder keysBlockBuilder = BIGINT.createBlockBuilder(null, INPUT_SIZE);
BlockBuilder valuesBlockBuilder = VARCHAR.createBlockBuilder(null, INPUT_SIZE);
keyInputStream.forEach(x -> BIGINT.writeLong(keysBlockBuilder, x));
valueInputStream.forEach(x -> VARCHAR.writeString(valuesBlockBuilder, x));
TypedKeyValueHeap heap = new TypedKeyValueHeap(comparator, BIGINT, VARCHAR, OUTPUT_SIZE);
heap.addAll(keysBlockBuilder, valuesBlockBuilder);
BlockBuilder resultBlockBuilder = VARCHAR.createBlockBuilder(null, OUTPUT_SIZE);
heap.popAll(resultBlockBuilder);
Block resultBlock = resultBlockBuilder.build();
assertEquals(resultBlock.getPositionCount(), OUTPUT_SIZE);
for (int i = 0; i < OUTPUT_SIZE; i++) {
assertEquals(VARCHAR.getSlice(resultBlock, i).toStringUtf8(), outputIterator.next());
}
}
private static void test(IntStream inputStream, BlockComparator comparator, PrimitiveIterator.OfInt outputIterator)
{
BlockBuilder blockBuilder = BIGINT.createBlockBuilder(null, INPUT_SIZE);
inputStream.forEach(x -> BIGINT.writeLong(blockBuilder, x));
TypedHeap heap = new TypedHeap(comparator, BIGINT, OUTPUT_SIZE);
heap.addAll(blockBuilder);
BlockBuilder resultBlockBuilder = BIGINT.createBlockBuilder(null, OUTPUT_SIZE);
heap.popAll(resultBlockBuilder);
Block resultBlock = resultBlockBuilder.build();
assertEquals(resultBlock.getPositionCount(), OUTPUT_SIZE);
for (int i = 0; i < OUTPUT_SIZE; i++) {
assertEquals(BIGINT.getLong(resultBlock, i), outputIterator.nextInt());
}
}
@Test
public void testParallelPartitionDrops()
{
int partitionsToDrop = 5;
IntStream dropThreadsConfig = IntStream.of(1, 2);
dropThreadsConfig.forEach(dropThreads -> {
countDownLatch = new CountDownLatch(dropThreads);
SemiTransactionalHiveMetastore semiTransactionalHiveMetastore = getSemiTransactionalHiveMetastoreWithDropExecutor(newFixedThreadPool(dropThreads));
IntStream.range(0, partitionsToDrop).forEach(i -> semiTransactionalHiveMetastore.dropPartition(SESSION,
"test",
"test",
ImmutableList.of(String.valueOf(i)),
true));
semiTransactionalHiveMetastore.commit();
});
}
private void updatePredictedCounts(){
StopWatch stopWatch = new StopWatch();
if (logger.isDebugEnabled()){
stopWatch.start();
}
IntStream intStream;
if (isParallel){
intStream = IntStream.range(0,numParameters).parallel();
} else {
intStream = IntStream.range(0,numParameters);
}
intStream.forEach(i -> this.predictedCounts.set(i, calPredictedCount(i)));
if (logger.isDebugEnabled()){
logger.debug("time spent on updatePredictedCounts = "+stopWatch);
}
}
public static void processBatches(@Nonnull IntStream stream, int batchSize, @Nonnull Consumer<TIntHashSet> consumer) {
Ref<TIntHashSet> batch = new Ref<>(new TIntHashSet());
stream.forEach(commit -> {
batch.get().add(commit);
if (batch.get().size() >= batchSize) {
try {
consumer.consume(batch.get());
}
finally {
batch.set(new TIntHashSet());
}
}
});
if (!batch.get().isEmpty()) {
consumer.consume(batch.get());
}
}
/**
* Returns a new list which is the result of applying the mapping function to values of the input list
* @param list the input list on which to apply the mapping function to all elements
* @param parallel true if parallel mapping should be used
* @param listMapper the list mapper function to apply to all values in input list
* @param <I> the input list type
* @param <O> the output list type
* @return the output list
*/
public static <I,O> List<O> apply(List<I> list, boolean parallel, ListMapper<I,O> listMapper) {
final int size = list.size();
final List<O> result = createList(list);
IntStream.range(0, size).forEach(i -> result.add(null));
final IntStream indexes = parallel ? IntStream.range(0, size).parallel() : IntStream.range(0, size);
indexes.forEach(index -> {
final I source = list.get(index);
final O target = listMapper.apply(index, source);
result.set(index, target);
});
return result;
}
@Test
public void testCountSlots() throws Exception {
SlotSynchronizer<Integer> slotSynchronizer = new SlotSynchronizer<>();
int count = new Random().nextInt(1000) + 1;
Collection<SlotSynchronizer<Integer>.Lock> locks = new ArrayList<>();
IntStream keysStream = IntStream.range(0, count);
keysStream.forEach(value -> locks.add(slotSynchronizer.lockSlot(value)));
Assert.assertEquals(count, slotSynchronizer.occupiedSlots());
locks.forEach(lock -> lock.unlock());
Assert.assertEquals(0, slotSynchronizer.occupiedSlots());
}
@Override
public FastRecommendation getRecommendation(int uidx, IntStream candidates) {
IntSet set = new IntOpenHashSet();
candidates.forEach(set::add);
return getRecommendation(uidx, set::contains);
}
public static void main(String[] args) {
Random random = new Random();
IntStream intStream = random.ints(10, 1, 7);
Iterator iterator = intStream.iterator();
while (iterator.hasNext()){
System.out.println("Random Number "+iterator.next());
}
intStream = random.ints(10, 1, 7);
intStream.forEach(value ->
System.out.println("Random Number "+value)
);
}
private void updateEmpiricalCounts(){
IntStream intStream;
if (isParallel){
intStream = IntStream.range(0, numParameters).parallel();
} else {
intStream = IntStream.range(0, numParameters);
}
intStream.forEach(this::calEmpiricalCount);
}
private void updateEmpiricalCounts(){
IntStream intStream;
if (isParallel){
intStream = IntStream.range(0, numParameters).parallel();
} else {
intStream = IntStream.range(0, numParameters);
}
intStream.forEach(this::calEmpiricalCount);
}
private void updateEmpiricalCounts(){
IntStream intStream;
if (isParallel){
intStream = IntStream.range(0, numParameters).parallel();
} else {
intStream = IntStream.range(0, numParameters);
}
intStream.forEach(this::calEmpiricalCount);
}
private void initEmpiricalCounts(){
IntStream intStream;
if (isParallel){
intStream = IntStream.range(0, numParameters).parallel();
} else {
intStream = IntStream.range(0, numParameters);
}
intStream.forEach(this::calEmpiricalCount);
}
private void updateEmpiricalCounts(){
IntStream intStream;
if (isParallel){
intStream = IntStream.range(0, numParameters).parallel();
} else {
intStream = IntStream.range(0, numParameters);
}
intStream.forEach(this::calEmpiricalCount);
}
private void updateEmpricalCounts(){
IntStream intStream;
if (isParallel){
intStream = IntStream.range(0, numParameters).parallel();
} else {
intStream = IntStream.range(0, numParameters);
}
intStream.forEach(i -> this.empiricalCounts.set(i, calEmpricalCount(i)));
}
private void updateClassProbMatrix(){
StopWatch stopWatch = new StopWatch();
stopWatch.start();
IntStream intStream;
if (isParallel){
intStream = IntStream.range(0,dataSet.getNumDataPoints()).parallel();
} else {
intStream = IntStream.range(0,dataSet.getNumDataPoints());
}
intStream.forEach(this::updateClassProbs);
this.isProbabilityCacheValid = true;
if (logger.isDebugEnabled()){
logger.debug("time spent on updateClassProbMatrix = "+stopWatch);
}
}
/**
* Retruns the integer bounds of a stream of ints
* @param stream the stream to compute bounds on
* @return the bounds for stream, empty if no data in stream
*/
public static Optional<Bounds<Integer>> ofInts(IntStream stream) {
final OfInts calculator = new OfInts();
stream.forEach(calculator::add);
return calculator.getBounds();
}
public static IntColumn create(String name, IntStream stream) {
IntArrayList list = new IntArrayList();
stream.forEach(list::add);
return new IntColumn(name, list);
}
public static IntColumn create(String name, IntStream stream) {
IntArrayList list = new IntArrayList();
stream.forEach(list::add);
return new IntColumn(name, list);
}
private TaskStateSnapshot executeAndWaitForCheckpoint(
long checkpointId,
TaskStateSnapshot initialSnapshot,
IntStream expectedRecords) throws Exception {
try (StreamTaskMailboxTestHarness<Integer> testHarness = createTestHarness(checkpointId, initialSnapshot)) {
// Add records to the splits.
MockSourceSplit split = getAndMaybeAssignSplit(testHarness);
// Add records to the split and update expected output.
addRecords(split, NUM_RECORDS);
// Process all the records.
processUntil(testHarness, () -> !testHarness.getStreamTask().inputProcessor.getAvailableFuture().isDone());
// Trigger a checkpoint.
CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
OneShotLatch waitForAcknowledgeLatch = new OneShotLatch();
testHarness.taskStateManager.setWaitForReportLatch(waitForAcknowledgeLatch);
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointId);
Future<Boolean> checkpointFuture =
testHarness
.getStreamTask()
.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, false);
// Wait until the checkpoint finishes.
// We have to mark the source reader as available here, otherwise the runMailboxStep() call after
// checkpiont is completed will block.
getSourceReaderFromTask(testHarness).markAvailable();
processUntil(testHarness, checkpointFuture::isDone);
waitForAcknowledgeLatch.await();
// Build expected output to verify the results
Queue<Object> expectedOutput = new LinkedList<>();
expectedRecords.forEach(r -> expectedOutput.offer(new StreamRecord<>(r, TimestampAssigner.NO_TIMESTAMP)));
// Add barrier to the expected output.
expectedOutput.add(new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions));
assertEquals(checkpointId, testHarness.taskStateManager.getReportedCheckpointId());
assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
return testHarness.taskStateManager.getLastJobManagerTaskStateSnapshot();
}
}