Below are example usages of java.util.stream.Stream#peek(), drawn from real projects.
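Before the project examples, a minimal sketch (not taken from any project below) showing the key property of peek: it is an intermediate operation, so the action it registers only runs when a terminal operation such as collect() pulls elements through the pipeline.
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PeekBasics {
    public static void main(String[] args) {
        // peek registers a side effect for each element as it flows by;
        // nothing runs until the terminal collect() consumes the stream
        List<String> upper = Stream.of("alice", "bob")
                .peek(name -> System.out.println("saw: " + name))
                .map(String::toUpperCase)
                .collect(Collectors.toList());
        System.out.println(upper); // [ALICE, BOB]
    }
}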
@Override
public Stream<ConceptMap> get(GraqlGet query, boolean explain) {
//NB: we need distinct as projection can produce duplicates
Stream<ConceptMap> answers = match(query.match())
.map(ans -> ans.project(query.vars()))
.distinct();
answers = filter(query, answers);
if (explain) {
// record the explanations if the user indicated they will retrieve them
answers = answers.peek(answer -> explanationCache.record(answer, answer.explanation()));
} else {
// null out the explanations if the user does not want them
answers = answers.map(answer -> new ConceptMap(answer.map(), null, answer.getPattern()));
}
return answers;
}
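The method above uses peek to record each answer's explanation in a cache while the stream is being consumed. A minimal, self-contained sketch of this record-while-streaming pattern (the map and names here are illustrative, not from the project):
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RecordWhileStreaming {
    public static void main(String[] args) {
        Map<String, Integer> cache = new ConcurrentHashMap<>();
        // record each element in a side cache while the pipeline transforms it
        List<Integer> lengths = Stream.of("a", "bb", "ccc")
                .peek(s -> cache.putIfAbsent(s, s.length()))
                .map(String::length)
                .collect(Collectors.toList());
        System.out.println(lengths); // [1, 2, 3]
        System.out.println(cache);   // {a=1, bb=2, ccc=3} (order may vary)
    }
}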
/**
* Search for folders.
*
* @param name the folder name to search for.
* @param strict strict mode means the given name must match the full folder name.
* @return the folders whose name contains (or, in strict mode, equals) the given name.
*/
@RequestMapping(value = "/folders/search", method = GET)
@ApiOperation(value = "Search Folders with parameter as part of the name")
@Timed
public Stream<Folder> search(@RequestParam(required = false, defaultValue = "") final String name,
@RequestParam(required = false, defaultValue = "false") final Boolean strict,
@RequestParam(required = false) final String path) {
Stream<Folder> folders;
if (path == null) {
folders = folderRepository.searchFolders(name, strict);
} else {
folders = folderRepository.searchFolders(name, strict).filter(f -> f.getPath().equals(path));
}
AtomicInteger foldersFound = new AtomicInteger(0);
folders = folders.peek(folder -> {
    folder.setNbPreparations(folderRepository.count(folder.getId(), PREPARATION));
    foldersFound.incrementAndGet();
});
// peek is lazy: the counter only reflects the real count once the caller has
// consumed the stream, so log it in a close handler instead of eagerly (which would log 0)
return folders.onClose(() -> LOGGER.info("Found {} folder(s) searching for {}", foldersFound.get(), name));
}
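Note the counting pattern: because peek is lazy, an AtomicInteger incremented inside it stays at 0 until a terminal operation consumes the stream, which is why the log call above is deferred to a close handler. A small demonstration of the pitfall (illustrative names):
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class LazyCounterPitfall {
    public static void main(String[] args) {
        AtomicInteger seen = new AtomicInteger();
        Stream<String> s = Stream.of("x", "y", "z").peek(e -> seen.incrementAndGet());
        System.out.println(seen.get()); // 0 -- the peek action has not run yet
        s.forEach(e -> { });            // terminal operation pulls elements through
        System.out.println(seen.get()); // 3
    }
}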
@Override
public Stream<Metric> read(RawMetric rm) {
SourceType sourceType = rm.getSourceType();
Reader<RawMetric, Metric> r = getReader(sourceType);
if (r != null) {
try {
Stream<Metric> stream = r.read(rm);
// These enrichment steps live here because the reader is the single place that
// bridges the two generic types, RawMetric and Metric; outside of it there is
// no way to access both at once.
// Enrichment 1: if the tags do not already carry an ip, attach the client ip that reported the metric
String clientIp = rm.getHead().getClientIp();
if (clientIp != null) {
stream = stream.peek(m -> m.getTags().putIfAbsent("ip", clientIp));
}
// Enrichment 2: align timestamps of metrics that were uploaded in a non-standard way
stream = stream.peek(m -> fixTimestamp(rm, m));
// Enrichment 3: propagate the debugId
String debugId = rm.getHead().getDebugId();
if (debugId != null) {
stream = stream.peek(m -> m.setDebugId(debugId));
}
// Enrichment 4: merge in the extra tags
stream = stream.peek(m -> MetricImporterUtils.mergeWithExtraTags(m, rm.getExtraTags()));
return stream;
} catch (Exception e) {
// Since stream operations are lazy, the operations above do not throw here;
// exceptions only surface when individual elements are processed
LOGGER.warn("Failed to read metric", e);
return Stream.empty();
}
} else {
LOGGER.warn("找不到 {} 的reader", sourceType);
return Stream.empty();
}
}
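As the comments note, the try/catch above only guards building the pipeline; an exception thrown inside a peek action surfaces later, at the terminal operation. A minimal illustration:
import java.util.stream.Stream;

class LazyExceptions {
    public static void main(String[] args) {
        // building the pipeline never throws, even though the action will fail
        Stream<String> s = Stream.of("ok", "boom")
                .peek(v -> { if (v.equals("boom")) throw new IllegalStateException(v); });
        try {
            s.forEach(System.out::println); // throws here, while "boom" is processed
        } catch (IllegalStateException e) {
            System.out.println("caught at consumption time: " + e.getMessage());
        }
    }
}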
@Override
public Stream<ConceptMap> stream(GraqlInsert query, boolean infer, boolean explain) {
checkMutationAllowed();
Stream<ConceptMap> inserted = executor(infer).insert(query, explain);
Stream<ConceptMap> explicitlyPersisted = inserted.peek(conceptMap -> {
// explicitly mark all inferred concepts required by the insert for persistence;
// we can skip this potentially expensive check if no facts were inferred to begin with
if (transactionCache.anyFactsInferred()) {
markConceptsForPersistence(conceptMap.concepts());
}
});
return explicitlyPersisted;
}
/**
* Get a dataSet by id.
* Convert metadata and records from {@link Dataset} to {@link DataSet}
*
* @param id the dataset to fetch
* @param fullContent whether the full dataset is needed or only a sample (see the sample limit in the dataset: 10k rows)
* @param withRowValidityMarker perform a quality analysis on the dataset records
* @param filter TQL filter for content
* @return the converted {@link DataSet}, or null if no dataset exists for the given id
*/
public DataSet getDataSet(String id, boolean fullContent, boolean withRowValidityMarker, String filter) {
DataSet dataset = new DataSet();
// convert metadata
Dataset metadata = dataCatalogClient.getMetadata(id);
if (metadata == null) {
return null;
}
final Schema dataSetSchema = dataCatalogClient.getDataSetSchema(id);
DataSetMetadata dataSetMetadata = toDataSetMetadata(metadata, dataSetSchema);
dataset.setMetadata(dataSetMetadata);
// convert records
final RowMetadata rowMetadata = dataSetMetadata.getRowMetadata();
Stream<GenericRecord> dataSetContent =
dataCatalogClient.getDataSetContent(id, limit(fullContent), dataSetSchema);
Stream<DataSetRow> records = toDataSetRows(dataSetContent, rowMetadata);
if (withRowValidityMarker) {
records = records.peek(addValidity(rowMetadata.getColumns()));
}
if (filter != null) {
records = records.filter(filterService.build(filter, rowMetadata));
}
dataset.setRecords(records);
// DataSet specifics
if (!fullContent) {
dataSetMetadata
.getContent()
.getLimit()
.ifPresent(limit -> dataset.setRecords(dataset.getRecords().limit(limit)));
}
return dataset;
}
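The method builds its pipeline conditionally by reassigning the stream variable; every intermediate operation returns a new stream, so stages can be attached only when the corresponding flag asks for them. A generic sketch of the same pattern (names are illustrative):
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ConditionalPipeline {
    static List<String> process(List<String> input, boolean validate, String filter) {
        Stream<String> records = input.stream();
        if (validate) {
            // attach a validity-marking stage only when requested
            records = records.peek(r -> System.out.println("validating: " + r));
        }
        if (filter != null) {
            records = records.filter(r -> r.contains(filter));
        }
        return records.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(process(List.of("apple", "banana"), true, "an")); // [banana]
    }
}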
/**
* Get folders. If parentId is supplied, it is used as a filter.
*
* @param parentId the parent folder id parameter
* @return direct sub folders for the given id.
*/
//@formatter:off
@RequestMapping(value = "/folders", method = GET)
@ApiOperation(value = "List children folders of the parameter if null list root children.", notes = "List all child folders of the one as parameter")
@Timed
public Stream<Folder> list(@RequestParam(required = false) @ApiParam(value = "Parent id filter.") String parentId,
@RequestParam(defaultValue = "lastModificationDate") @ApiParam(value = "Sort key (by name or date).") Sort sort,
@RequestParam(defaultValue = "desc") @ApiParam(value = "Order for sort key (desc or asc).") Order order) {
//@formatter:on
Stream<Folder> children;
if (parentId != null) {
if (!folderRepository.exists(parentId)) {
throw new TDPException(FOLDER_NOT_FOUND, build().put("id", parentId));
}
children = folderRepository.children(parentId);
} else {
// This will list all folders
children = folderRepository.searchFolders("", false);
}
final AtomicInteger folderCount = new AtomicInteger();
// update the number of preparations in each child folder
children = children.peek(f -> {
    final long count = folderRepository.count(f.getId(), PREPARATION);
    f.setNbPreparations(count);
    folderCount.incrementAndGet();
});
// peek is lazy: the count is only meaningful after the stream has been consumed,
// so log it in a close handler instead of eagerly
children = children.onClose(() -> LOGGER.info("Found {} children for parentId: {}", folderCount.get(), parentId));
// sort the folders
return children.sorted(getFolderComparator(sort, order));
}
private Collection<? extends Generator<?>> getCodeGenerators() {
if (codeGenerators == null) {
String outputDirectoryOption = processingEnv.getOptions().get("codegen.output");
if (outputDirectoryOption != null) {
outputDirectory = new File(outputDirectoryOption);
if (!outputDirectory.exists()) {
if (!outputDirectory.mkdirs()) {
processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, "Output directory " + outputDirectoryOption + " does not exist");
}
}
if (!outputDirectory.isDirectory()) {
processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, "Output directory " + outputDirectoryOption + " is not a directory");
}
}
// load GeneratorLoader by ServiceLoader
Stream<GeneratorLoader> serviceLoader = StreamSupport.stream(
    ServiceLoader.load(GeneratorLoader.class, CodeGenProcessor.class.getClassLoader()).spliterator(), false);
Stream<GeneratorLoader> loaders = Stream.of(new CheatsheetGenLoader(), new DataObjectHelperGenLoader());
Stream<Generator<?>> generators = Stream.concat(serviceLoader, loaders).flatMap(l -> l.loadGenerators(processingEnv));
Predicate<Generator> filter = filterGenerators();
if (filter != null) {
generators = generators.filter(filter);
}
generators = generators.peek(gen -> {
gen.load(processingEnv);
processingEnv.getMessager().printMessage(Diagnostic.Kind.NOTE, "Loaded " + gen.name + " code generator");
});
relocations = processingEnv.getOptions()
.entrySet()
.stream()
.filter(e -> e.getKey().startsWith("codegen.output."))
.collect(Collectors.toMap(
e -> e.getKey().substring("codegen.output.".length()),
Map.Entry::getValue)
);
codeGenerators = generators.collect(Collectors.toList());
}
return codeGenerators;
}
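The snippet adapts a ServiceLoader into a Stream via StreamSupport.stream; on Java 9 and later, ServiceLoader.stream() offers the same thing directly (yielding ServiceLoader.Provider handles). A minimal sketch of the pre-Java-9 idiom used above (MyService is a placeholder interface, not from the project):
import java.util.ServiceLoader;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class ServiceLoaderStreams {
    interface MyService { }

    static Stream<MyService> providers() {
        // adapt the ServiceLoader's Iterable into a sequential Stream
        return StreamSupport.stream(
                ServiceLoader.load(MyService.class, ServiceLoaderStreams.class.getClassLoader()).spliterator(),
                false);
    }
}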
@Test
void givenStringStream_whenCallingPeekOnly_thenNoElementProcessed() {
    // given
    StringBuilder out = new StringBuilder();
    Stream<String> nameStream = Stream.of("Alice", "Bob", "Chuck");
    // when -- peek is intermediate and no terminal operation follows
    nameStream.peek(out::append);
    // then -- nothing was appended because the pipeline never executed
    assertThat(out.toString()).isEmpty();
}
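For contrast, the same pipeline does run the peek action once a terminal operation is attached; a companion test in the same style (assuming the same JUnit and AssertJ imports):
@Test
void givenStringStream_whenCallingPeekThenForEach_thenElementsProcessed() {
    // given
    StringBuilder out = new StringBuilder();
    Stream<String> nameStream = Stream.of("Alice", "Bob", "Chuck");
    // when -- forEach is a terminal operation, so peek's action runs for every element
    nameStream.peek(out::append).forEach(name -> { });
    // then
    assertThat(out.toString()).isEqualTo("AliceBobChuck");
}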