java.util.stream.Stream#peek ( )源码实例Demo

下面列出了java.util.stream.Stream#peek ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: grakn   文件: QueryExecutorImpl.java
@Override
public Stream<ConceptMap> get(GraqlGet query, boolean explain) {
    //NB: we need distinct as projection can produce duplicates
    Stream<ConceptMap> answers = match(query.match())
            .map(ans -> ans.project(query.vars()))
            .distinct();

    answers = filter(query, answers);
    if (explain) {
        // record the explanations if the user indicated they will retrieved them
        answers = answers.peek(answer -> explanationCache.record(answer, answer.explanation()));
    } else {
        // null out the explanations if the user does not want the explanation
        answers = answers.map(answer -> new ConceptMap(answer.map(), null, answer.getPattern()));
    }
    return answers;
}
 
源代码2 项目: data-prep   文件: FolderService.java
/**
 * Search for folders.
 *
 * @param name the folder name to search.
 * @param strict strict mode means the name is the full name.
 * @return the folders whose part of their name match the given path.
 */
@RequestMapping(value = "/folders/search", method = GET)
@ApiOperation(value = "Search Folders with parameter as part of the name")
@Timed
public Stream<Folder> search(@RequestParam(required = false, defaultValue = "") final String name,
        @RequestParam(required = false, defaultValue = "false") final Boolean strict,
        @RequestParam(required = false) final String path) {
    Stream<Folder> folders;
    if (path == null) {
        folders = folderRepository.searchFolders(name, strict);
    } else {
        folders = folderRepository.searchFolders(name, strict).filter(f -> f.getPath().equals(path));
    }

    AtomicInteger foldersFound = new AtomicInteger(0);
    folders = folders.peek(folder -> {
        folder.setNbPreparations(folderRepository.count(folder.getId(), PREPARATION));
        foldersFound.incrementAndGet();
    });

    LOGGER.info("Found {} folder(s) searching for {}", foldersFound, name);

    return folders;
}
 
源代码3 项目: sofa-lookout   文件: DefaultReaderManager.java
@Override
public Stream<Metric> read(RawMetric rm) {
    SourceType sourceType = rm.getSourceType();
    Reader<RawMetric, Metric> r = getReader(sourceType);
    if (r != null) {
        try {
            Stream<Metric> stream = r.read(rm);

            // 是所以将这几个增强逻辑放在这里这是因为 reader 是连接 前后两种泛型的地方 RawMetric Metric
            // 出了这个地方就没有办法同时获取到两者了

            // 增强1: 如果tags不携带ip, 就将上报metric的客户端ip附在tags上
            String clientIp = rm.getHead().getClientIp();
            if (clientIp != null) {
                stream = stream.peek(m -> m.getTags().putIfAbsent("ip", clientIp));
            }

            // 增强2: 非标准上传的metrics对齐时间戳
            stream = stream.peek(m -> fixTimestamp(rm, m));

            // 增强3: 传递debugId
            String debugId = rm.getHead().getDebugId();
            if (debugId != null) {
                stream = stream.peek(m -> m.setDebugId(debugId));
            }

            // 增强4: 合并tags
            stream = stream.peek(m -> MetricImporterUtils.mergeWithExtraTags(m, rm.getExtraTags()));

            return stream;
        } catch (Exception e) {
            // 由于stream的op是lazy的, 因此同上上述操作不会抛异常, 而是等待具体的对象被处理的时候才会抛异常
            LOGGER.warn("读metric时失败", e);
            return Stream.empty();
        }
    } else {
        LOGGER.warn("找不到 {} 的reader", sourceType);
        return Stream.empty();
    }
}
 
源代码4 项目: grakn   文件: TransactionImpl.java
@Override
public Stream<ConceptMap> stream(GraqlInsert query, boolean infer, boolean explain) {
    checkMutationAllowed();
    Stream<ConceptMap> inserted = executor(infer).insert(query, explain);

    Stream<ConceptMap> explicitlyPersisted = inserted.peek(conceptMap -> {
        // mark all inferred concepts that are required for the insert for persistence explicitly
        // can avoid this potentially expensive check if there aren't any inferred concepts to start with
        if (transactionCache.anyFactsInferred()) {
            markConceptsForPersistence(conceptMap.concepts());
        }
    });

    return explicitlyPersisted;
}
 
源代码5 项目: data-prep   文件: DatasetClient.java
/**
 * Get a dataSet by id.
 * Convert metadata and records from {@link Dataset} to {@link DataSet}
 *
 * @param id the dataset to fetch
 * @param fullContent we need the full dataset or a sample (see sample limit in datset: 10k rows)
 * @param withRowValidityMarker perform a quality analysis on the dataset records
 * @param filter TQL filter for content
 */
public DataSet getDataSet(String id, boolean fullContent, boolean withRowValidityMarker, String filter) {
    DataSet dataset = new DataSet();
    // convert metadata
    Dataset metadata = dataCatalogClient.getMetadata(id);
    if (metadata == null) {
        return null;
    }
    final Schema dataSetSchema = dataCatalogClient.getDataSetSchema(id);
    DataSetMetadata dataSetMetadata = toDataSetMetadata(metadata, dataSetSchema);
    dataset.setMetadata(dataSetMetadata);

    // convert records
    final RowMetadata rowMetadata = dataSetMetadata.getRowMetadata();

    Stream<GenericRecord> dataSetContent =
            dataCatalogClient.getDataSetContent(id, limit(fullContent), dataSetSchema);
    Stream<DataSetRow> records = toDataSetRows(dataSetContent, rowMetadata);
    if (withRowValidityMarker) {
        records = records.peek(addValidity(rowMetadata.getColumns()));
    }
    if (filter != null) {
        records = records.filter(filterService.build(filter, rowMetadata));
    }
    dataset.setRecords(records);

    // DataSet specifics
    if (!fullContent) {
        dataSetMetadata
                .getContent()
                .getLimit()
                .ifPresent(limit -> dataset.setRecords(dataset.getRecords().limit(limit)));
    }
    return dataset;
}
 
源代码6 项目: data-prep   文件: FolderService.java
/**
 * Get folders. If parentId is supplied, it will be used as filter.
 *
 * @param parentId the parent folder id parameter
 * @return direct sub folders for the given id.
 */
//@formatter:off
@RequestMapping(value = "/folders", method = GET)
@ApiOperation(value = "List children folders of the parameter if null list root children.", notes = "List all child folders of the one as parameter")
@Timed
public Stream<Folder> list(@RequestParam(required = false) @ApiParam(value = "Parent id filter.") String parentId,
                               @RequestParam(defaultValue = "lastModificationDate") @ApiParam(value = "Sort key (by name or date).") Sort sort,
                               @RequestParam(defaultValue = "desc") @ApiParam(value = "Order for sort key (desc or asc).") Order order) {
//@formatter:on

    Stream<Folder> children;
    if (parentId != null) {
        if (!folderRepository.exists(parentId)) {
            throw new TDPException(FOLDER_NOT_FOUND, build().put("id", parentId));
        }
        children = folderRepository.children(parentId);
    } else {
        // This will list all folders
        children = folderRepository.searchFolders("", false);
    }

    final AtomicInteger folderCount = new AtomicInteger();

    // update the number of preparations in each children
    children = children.peek(f -> {
        final long count = folderRepository.count(f.getId(), PREPARATION);
        f.setNbPreparations(count);
        folderCount.addAndGet(1);
    });

    LOGGER.info("Found {} children for parentId: {}", folderCount.get(), parentId);

    // sort the folders
    return children.sorted(getFolderComparator(sort, order));
}
 
源代码7 项目: vertx-codegen   文件: CodeGenProcessor.java
private Collection<? extends Generator<?>> getCodeGenerators() {
  if (codeGenerators == null) {
    String outputDirectoryOption = processingEnv.getOptions().get("codegen.output");
    if (outputDirectoryOption != null) {
      outputDirectory = new File(outputDirectoryOption);
      if (!outputDirectory.exists()) {
        if (!outputDirectory.mkdirs()) {
          processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, "Output directory " + outputDirectoryOption + " does not exist");
        }
      }
      if (!outputDirectory.isDirectory()) {
        processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, "Output directory " + outputDirectoryOption + " is not a directory");
      }
    }
    // load GeneratorLoader by ServiceLoader
    Stream<GeneratorLoader> serviceLoader = StreamSupport.stream(ServiceLoader.load(GeneratorLoader.class, CodeGenProcessor.class.getClassLoader()).spliterator(), false);
    Stream<GeneratorLoader> loaders = Stream.of(new CheatsheetGenLoader(), new DataObjectHelperGenLoader());
    Stream<Generator<?>> generators = Stream.concat(serviceLoader, loaders).flatMap(l -> l.loadGenerators(processingEnv));
    Predicate<Generator> filter = filterGenerators();
    if (filter != null) {
      generators = generators.filter(filter);
    }
    generators = generators.peek(gen -> {
      gen.load(processingEnv);
      processingEnv.getMessager().printMessage(Diagnostic.Kind.NOTE, "Loaded " + gen.name + " code generator");
    });
    relocations = processingEnv.getOptions()
      .entrySet()
      .stream()
      .filter(e -> e.getKey().startsWith("codegen.output."))
      .collect(Collectors.toMap(
        e -> e.getKey().substring("codegen.output.".length()),
        Map.Entry::getValue)
      );

    codeGenerators = generators.collect(Collectors.toList());
  }
  return codeGenerators;
}
 
源代码8 项目: tutorials   文件: PeekUnitTest.java
@Test
void givenStringStream_whenCallingPeekOnly_thenNoElementProcessed() {
    // given
    Stream<String> nameStream = Stream.of("Alice", "Bob", "Chuck");

    // when
    nameStream.peek(out::append);

    // then
    assertThat(out.toString()).isEmpty();
}