Below are code examples of com.google.common.collect.Streams#concat(), gathered from open-source projects. Follow the links to view the source on GitHub, or leave a comment on the right.
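Before the project samples, here is a minimal, self-contained sketch (the class name StreamsConcatDemo is ours; everything else is the documented Guava API). Streams.concat accepts any number of streams and returns a single stream that lazily yields the elements of each input in order:
import com.google.common.collect.Streams;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamsConcatDemo {
  public static void main(String[] args) {
    // Streams.concat is variadic, unlike java.util.stream.Stream.concat which takes exactly two.
    Stream<String> combined = Streams.concat(Stream.of("a"), Stream.of("b", "c"), Stream.of("d"));
    System.out.println(combined.collect(Collectors.toList())); // prints [a, b, c, d]
  }
}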
/**
* Creates a string array of status values.
*
* <p>The spec indicates that OK should be listed as "active". We use the "inactive" status to
* indicate deleted objects, and as directed by the profile, the "removed" status to indicate
* redacted objects.
*/
private static ImmutableSet<RdapStatus> makeStatusValueList(
ImmutableSet<StatusValue> statusValues, boolean isRedacted, boolean isDeleted) {
Stream<RdapStatus> stream =
statusValues
.stream()
.map(status -> STATUS_TO_RDAP_STATUS_MAP.getOrDefault(status, RdapStatus.OBSCURED));
if (isRedacted) {
stream = Streams.concat(stream, Stream.of(RdapStatus.REMOVED));
}
if (isDeleted) {
stream =
Streams.concat(
stream.filter(not(RdapStatus.ACTIVE::equals)),
Stream.of(RdapStatus.INACTIVE));
}
return stream
.sorted(Ordering.natural().onResultOf(RdapStatus::getDisplayName))
.collect(toImmutableSet());
}
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
//Using Collection
Stream<Integer> streamFromCollection = Streams.stream(numbers);
//Using Iterator
Stream<Integer> streamFromIterator = Streams.stream(numbers.iterator());
//Using Iterable
Stream<Integer> streamFromIterable = Streams.stream((Iterable<Integer>) numbers);
//Using Optional
Stream<Integer> streamFromOptional = Streams.stream(Optional.of(1));
//Using OptionalLong to LongStream
LongStream streamFromOptionalLong = Streams.stream(OptionalLong.of(1));
//Using OptionalInt to IntStream
IntStream streamFromOptionalInt = Streams.stream(OptionalInt.of(1));
//Using OptionalDouble to DoubleStream
DoubleStream streamFromOptionalDouble = Streams.stream(OptionalDouble.of(1.0));
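//A stream can be traversed only once, so each of these streams may feed at most one concat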
Stream<Integer> concatenatedStreams = Streams.concat(streamFromCollection, streamFromIterable, streamFromIterator);
List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//findLast returns Optional.of(10), the last element of this ordered stream
Optional<Integer> lastItem = Streams.findLast(integers.stream());
//zip pairs up elements until the shorter stream is exhausted: "candy:$1", "chocolate:$2", "bar:$3"
Stream<String> pricedItems = Streams.zip(Stream.of("candy", "chocolate", "bar"), Stream.of("$1", "$2", "$3"), (arg1, arg2) -> arg1 + ":" + arg2);
}
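A note on the adapters used above: the Collection overload of Streams.stream is deprecated in recent Guava versions, since Collection.stream() does the same thing directly; the Iterator, Iterable, and Optional overloads (including the primitive OptionalInt/OptionalLong/OptionalDouble variants) remain the idiomatic way to bridge those types into streams.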
/** Returns Buck's classpath. */
public static ImmutableList<Path> getClasspath() throws IOException {
String classpathFromEnv = getBuckClasspathFromEnvVarOrNull();
if (classpathFromEnv != null) {
Optional<String> buckTestClasspath = getBuckTestClasspath();
Stream<Path> classpathStream =
Arrays.stream(classpathFromEnv.split(File.pathSeparator)).map(Paths::get);
if (buckTestClasspath.isPresent()) {
classpathStream =
Streams.concat(
classpathStream, readClasspaths(Paths.get(buckTestClasspath.get())).stream());
}
return classpathStream.collect(ImmutableList.toImmutableList());
}
return getBuckClasspathForIntellij();
}
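// Builds the PreOutVrf -> PreOutInterface edges by concatenating one stream per flow disposition.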
@Nonnull
@VisibleForTesting
Stream<Edge> generateRules_PreOutVrf_PreOutInterfaceDisposition(
Predicate<String> includedNode,
StateExprConstructor2 preOutVrf,
StateExprConstructor2 preOutInterfaceDeliveredToSubnet,
StateExprConstructor2 preOutInterfaceExitsNetwork,
StateExprConstructor2 preOutInterfaceInsufficientInfo,
StateExprConstructor2 preOutInterfaceNeighborUnreachable) {
return Streams.concat(
generateRules_PreOutVrf_PreOutInterfaceDisposition(
includedNode, _deliveredToSubnetBDDs, preOutVrf, preOutInterfaceDeliveredToSubnet),
generateRules_PreOutVrf_PreOutInterfaceDisposition(
includedNode, _exitsNetworkBDDs, preOutVrf, preOutInterfaceExitsNetwork),
generateRules_PreOutVrf_PreOutInterfaceDisposition(
includedNode, _insufficientInfoBDDs, preOutVrf, preOutInterfaceInsufficientInfo),
generateRules_PreOutVrf_PreOutInterfaceDisposition(
includedNode, _neighborUnreachableBDDs, preOutVrf, preOutInterfaceNeighborUnreachable));
}
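// Merges every per-transition rule generator into the complete edge stream of the analysis graph.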
private Stream<Edge> generateEdges() {
return Streams.concat(
generateRules_PreInInterface_NodeDropAclIn(),
generateRules_PreInInterface_PostInInterface(),
generateRules_PreInInterface_PbrFibLookup(),
generateRules_PostInInterface_NodeDropAclIn(),
generateRules_PostInInterface_PostInVrf(),
generateRules_PbrFibLookup_InterfaceAccept(),
generateRules_PbrFibLookup_PreOutVrf(),
generateRules_PreOutEdge_NodeDropAclOut(),
generateRules_PreOutEdge_PreOutEdgePostNat(),
generateRules_PreOutEdgePostNat_NodeDropAclOut(),
generateRules_PreOutEdgePostNat_PreInInterface(),
generateRules_PreOutInterfaceDisposition_NodeInterfaceDisposition(),
generateRules_PreOutInterfaceDisposition_NodeDropAclOut(),
generateRules_VrfAccept_NodeAccept(),
generateFibRules());
}
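// Flattens the inbound and outbound interceptor lists for every MQTT packet type into one stream.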
private @NotNull Stream<Interceptor> streamAllInterceptors() {
return Streams.concat(
publishInbounds.stream(), publishOutbounds.stream(),
pubackInbounds.stream(), pubackOutbounds.stream(),
pubrecInbounds.stream(), pubrecOutbounds.stream(),
pubrelInbounds.stream(), pubrelOutbounds.stream(),
pubcompInbounds.stream(), pubcompOutbounds.stream(),
subscribeInbounds.stream(), subackOutbounds.stream(),
unsubscribeInbounds.stream(), unsubackOutbounds.stream(),
disconnectInbounds.stream(), disconnectOutbounds.stream(),
pingReqInbounds.stream(), pingRespOutbounds.stream());
}
private static Stream<HashCode> testHashes() {
// Hash codes of zeros of various length
Stream<HashCode> zeroHashCodes = IntStream.of(1, 2, 16, 32)
.mapToObj(byte[]::new)
.map(HashCode::fromBytes);
// Non-zero 32-byte SHA-256 hash codes
Stream<HashCode> sha256HashCodes = Stream.of(
"",
"a",
"hello"
).map(s -> Hashing.sha256().hashString(s, StandardCharsets.UTF_8));
return Streams.concat(zeroHashCodes, sha256HashCodes);
}
@Test
public void concatStreamsOfTypeLongStream() {
LongStream combinedStreams = Streams.concat(LongStream.range(1, 21), LongStream.range(21, 40));
assertNotNull(combinedStreams);
assertStreamEquals(combinedStreams, LongStream.range(1, 40));
}
@Test
public void concatStreamsOfTypeIntStream() {
IntStream combinedStreams = Streams.concat(IntStream.range(1, 20), IntStream.range(21, 40));
assertNotNull(combinedStreams);
assertStreamEquals(combinedStreams, IntStream.concat(IntStream.range(1, 20), IntStream.range(21, 40)));
}
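// Collects the access constraints declared for the entity itself and for each of its ancestors.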
protected Stream<AccessConstraint> getConstraints(MetaClass metaClass) {
UserSession userSession = userSessionSource.getUserSession();
MetaClass mainMetaClass = extendedEntities.getOriginalOrThisMetaClass(metaClass);
ConstraintsContainer setOfConstraints = userSession.getConstraints();
Stream<AccessConstraint> constraints = setOfConstraints.findConstraintsByEntity(mainMetaClass.getName());
for (MetaClass parent : mainMetaClass.getAncestors()) {
constraints = Streams.concat(constraints, setOfConstraints.findConstraintsByEntity(parent.getName()));
}
return constraints;
}
/**
* Collects the NAT rules from this Vsys and merges the common pre-/post-rulebases from Panorama.
*/
@SuppressWarnings("PMD.CloseResource") // PMD has a bug for this pattern.
private Stream<NatRule> getAllNatRules(Vsys vsys) {
Stream<NatRule> pre =
_panorama == null
? Stream.of()
: _panorama.getPreRulebase().getNatRules().values().stream();
Stream<NatRule> post =
_panorama == null
? Stream.of()
: _panorama.getPostRulebase().getNatRules().values().stream();
Stream<NatRule> rules = vsys.getRulebase().getNatRules().values().stream();
return Streams.concat(pre, rules, post);
}
/**
* Creates a stream of map entries whose elements are those of the first stream followed by those of the second
* stream.
*
* @param a the first stream of entries
* @param b the second stream of entries
* @param <K> the key type
* @param <V> the value type
* @return the concatenation of the two input streams
*/
public static <K, V> MapStream<K, V> concat(
MapStream<? extends K, ? extends V> a,
MapStream<? extends K, ? extends V> b) {
@SuppressWarnings("unchecked")
MapStream<K, V> kvMapStream = new MapStream<>(Streams.concat(
(Stream<? extends Map.Entry<K, V>>) a,
(Stream<? extends Map.Entry<K, V>>) b));
return kvMapStream;
}
/** Generate edges to each disposition. Depends on final nodes. */
private Stream<Edge> generateDispositionEdges(Set<String> finalNodes) {
return Streams.concat(
generateRules_NodeAccept_Accept(finalNodes),
generateRules_NodeDropAclIn_DropAclIn(finalNodes),
generateRules_NodeDropAclOut_DropAclOut(finalNodes),
generateRules_NodeDropNoRoute_DropNoRoute(finalNodes),
generateRules_NodeDropNullRoute_DropNullRoute(finalNodes),
generateRules_NodeInterfaceDeliveredToSubnet_DeliveredToSubnet(finalNodes),
generateRules_NodeInterfaceExitsNetwork_ExitsNetwork(finalNodes),
generateRules_NodeInterfaceInsufficientInfo_InsufficientInfo(finalNodes),
generateRules_NodeInterfaceNeighborUnreachable_NeighborUnreachable(finalNodes));
}
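// Builds the reachability and loop-detection analyses over a shared core edge list, so the
// common edges are generated only once.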
private BDDReachabilityAndLoopDetectionAnalysis bddReachabilityAndLoopDetectionAnalysis(
IpSpaceAssignment srcIpSpaceAssignment,
AclLineMatchExpr initialHeaderSpace,
Set<String> forbiddenTransitNodes,
Set<String> requiredTransitNodes,
Set<String> finalNodes,
Set<FlowDisposition> actions) {
BDD initialHeaderSpaceBdd = computeInitialHeaderSpaceBdd(initialHeaderSpace);
BDD finalHeaderSpaceBdd = computeFinalHeaderSpaceBdd(initialHeaderSpaceBdd);
Map<StateExpr, BDD> roots = rootConstraints(srcIpSpaceAssignment, initialHeaderSpaceBdd, false);
List<Edge> sharedEdges =
Stream.concat(generateEdges(), generateRootEdges(roots)).collect(Collectors.toList());
Stream<Edge> reachabilityEdges =
Streams.concat(
sharedEdges.stream(),
generateDispositionEdges(finalNodes),
generateQueryEdges(actions));
reachabilityEdges = instrumentForbiddenTransitNodes(forbiddenTransitNodes, reachabilityEdges);
reachabilityEdges = instrumentRequiredTransitNodes(requiredTransitNodes, reachabilityEdges);
BDDLoopDetectionAnalysis loopDetectionAnalysis =
new BDDLoopDetectionAnalysis(_bddPacket, sharedEdges.stream(), roots.keySet());
BDDReachabilityAnalysis reachabilityAnalysis =
new BDDReachabilityAnalysis(
_bddPacket, roots.keySet(), reachabilityEdges, finalHeaderSpaceBdd);
return new BDDReachabilityAndLoopDetectionAnalysis(reachabilityAnalysis, loopDetectionAnalysis);
}
public static Stream<Edge> sessionInstrumentation(
BDDPacket bddPacket,
Map<String, Configuration> configs,
Map<String, BDDSourceManager> srcMgrs,
LastHopOutgoingInterfaceManager lastHopMgr,
Map<String, BDDOutgoingOriginalFlowFilterManager> outgoingOriginalFlowFilterMgrs,
Map<String, Map<String, Supplier<BDD>>> filterBdds,
Stream<Edge> originalEdges,
Map<String, List<BDDFirewallSessionTraceInfo>> initializedSessions,
BDDFibGenerator bddFibGenerator) {
SessionInstrumentation instrumentation =
new SessionInstrumentation(
bddPacket, configs, srcMgrs, lastHopMgr, outgoingOriginalFlowFilterMgrs, filterBdds);
Stream<Edge> newEdges = instrumentation.computeNewEdges(initializedSessions);
/* Instrument the original graph by adding an additional constraint to the out-edges from
* PreInInterface. Those edges are for non-session flows, and the constraint ensures only
* non-session flows can traverse those edges.
*/
Stream<Edge> instrumentedEdges =
constrainOutEdges(
originalEdges, computeNonSessionOutEdgeConstraints(initializedSessions, configs));
Stream<Edge> sessionFibLookupEdges =
computeSessionFibLookupSubgraph(initializedSessions, bddFibGenerator);
return Streams.concat(newEdges, instrumentedEdges, sessionFibLookupEdges);
}
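// Gathers the success, drop-acl-in, drop-acl-out, and FIB-lookup edges contributed by one session.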
@Nonnull
private Stream<Edge> computeNewEdges(BDDFirewallSessionTraceInfo sessionInfo) {
return Streams.concat(
computeNewSuccessEdges(sessionInfo),
nodeDropAclInEdges(sessionInfo),
nodeDropAclOutEdges(sessionInfo),
fibLookupSessionEdges(sessionInfo));
}
@Test
public void concatStreamsOfSameType() {
List<Integer> oddNumbers = Arrays.asList(1, 3, 5, 7, 9, 11, 13, 15, 17, 19);
List<Integer> evenNumbers = Arrays.asList(2, 4, 6, 8, 10, 12, 14, 16, 18, 20);
Stream<Integer> combinedStreams = Streams.concat(oddNumbers.stream(), evenNumbers.stream());
assertNotNull(combinedStreams);
assertStreamEquals(combinedStreams, Stream.concat(oddNumbers.stream(), evenNumbers.stream()));
}
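// Pages through the search results in MAX_BATCH_SIZE chunks, concatenating each page of hits
// onto the accumulated stream until a short page signals the end of the results.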
private Stream<Object> search(EntityType entityType, Query<Entity> q, int offset, int pageSize) {
QueryBuilder query = contentGenerators.createQuery(q, entityType);
Sort sort = q.getSort() != null ? contentGenerators.createSorts(q.getSort(), entityType) : null;
Index index = contentGenerators.createIndex(entityType);
Stream<SearchHit> searchHits = Stream.empty();
boolean done = false;
int currentOffset;
int i = 0;
while (!done) {
int batchSize = pageSize < MAX_BATCH_SIZE && pageSize != 0 ? pageSize : MAX_BATCH_SIZE;
currentOffset = offset + (i * MAX_BATCH_SIZE);
SearchHits currentSearchHits =
clientFacade.search(query, currentOffset, batchSize, sort, index);
searchHits = Streams.concat(searchHits, currentSearchHits.getHits().stream());
if (currentSearchHits.getHits().size() < MAX_BATCH_SIZE) {
done = true;
}
if (pageSize != 0) pageSize -= MAX_BATCH_SIZE;
i++;
}
return toEntityIds(entityType, searchHits.map(SearchHit::getId));
}
@Override
public void create(Environment environment) {
final Config config = environment.config();
final String schedulerServiceBaseUrl = get(config, config::getString, SCHEDULER_SERVICE_BASE_URL)
.orElse(DEFAULT_SCHEDULER_SERVICE_BASE_URL);
final Duration runningStateTtl = get(config, config::getString, STYX_RUNNING_STATE_TTL_CONFIG)
.map(Duration::parse)
.orElse(DEFAULT_STYX_RUNNING_STATE_TTL);
var secretWhitelist =
get(config, config::getStringList, STYX_SECRET_WHITELIST).map(Set::copyOf).orElse(Set.of());
final Stats stats = statsFactory.apply(environment);
final Storage storage = MeteredStorageProxy.instrument(storageFactory.apply(environment, stats), stats, time);
final BiConsumer<Optional<Workflow>, Optional<Workflow>> workflowConsumer =
workflowConsumerFactory.apply(environment, stats);
// N.B. if we need to forward a request to a scheduler that is behind an nginx, we CANNOT
// use rc.requestScopedClient() and at the same time inherit all headers from the original
// request, because the request-scoped client would add the Authorization header again, which
// results in duplicated headers, and that would make nginx unhappy. This has been fixed
// in a later Apollo version.
final ServiceAccountUsageAuthorizer serviceAccountUsageAuthorizer =
serviceAccountUsageAuthorizerFactory.apply(config, serviceName);
final WorkflowActionAuthorizer workflowActionAuthorizer =
new WorkflowActionAuthorizer(storage, serviceAccountUsageAuthorizer);
var workflowValidator = new ExtendedWorkflowValidator(
new BasicWorkflowValidator(new DockerImageValidator()), runningStateTtl, secretWhitelist);
final WorkflowResource workflowResource = new WorkflowResource(storage, workflowValidator,
new WorkflowInitializer(storage, time), workflowConsumer, workflowActionAuthorizer);
final BackfillResource backfillResource = new BackfillResource(schedulerServiceBaseUrl, storage,
workflowValidator, time, workflowActionAuthorizer);
final StatusResource statusResource = new StatusResource(storage, serviceAccountUsageAuthorizer);
environment.closer().register(backfillResource);
environment.closer().register(statusResource);
final ResourceResource resourceResource = new ResourceResource(storage);
final SchedulerProxyResource schedulerProxyResource = new SchedulerProxyResource(
schedulerServiceBaseUrl, environment.client());
final Supplier<StyxConfig> configSupplier =
new CachedSupplier<>(storage::config, Instant::now);
final Supplier<Set<String>> clientBlacklistSupplier =
() -> configSupplier.get().clientBlacklist();
final RequestAuthenticator requestAuthenticator = new RequestAuthenticator(authenticatorFactory.apply(
AuthenticatorConfiguration.fromConfig(config, serviceName)));
final Stream<Route<AsyncHandler<Response<ByteString>>>> routes = Streams.concat(
workflowResource.routes(requestAuthenticator),
backfillResource.routes(requestAuthenticator),
resourceResource.routes(),
statusResource.routes(),
schedulerProxyResource.routes()
);
environment.routingEngine()
.registerAutoRoute(Route.sync("GET", "/ping", rc -> "pong"))
.registerRoutes(Api.withCommonMiddleware(routes, clientBlacklistSupplier,
requestAuthenticator, serviceName));
}
public static <T> Stream<T> concatStreams(Stream<T> stream1, Stream<T> stream2, Stream<T> stream3) {
return Streams.concat(stream1, stream2, stream3);
}
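Unlike java.util.stream.Stream.concat, which accepts exactly two streams, Streams.concat is variadic and also provides overloads for IntStream, LongStream, and DoubleStream, which is why the helper above (and the examples throughout) can merge any number of streams in a single call. Keep in mind that the returned stream takes ownership of its inputs: once concatenated, the argument streams must not be consumed again.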