下面列出了 java.util.HashSet#parallelStream() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Runs a geospatial {@code $near} query against the vertex collection and
 * streams the matching vertices.
 *
 * <p>
 * The queried field should be indexed by {@code 2dsphere}, for example:
 *
 * <pre>
 * db.vertices.createIndex({"urn:oliot:ubv:mda:gps" : "2dsphere"})
 * </pre>
 *
 * The filter that is sent has the following shape:
 *
 * <pre>
 * db.vertices.find({ key : { $near : { $geometry : { type : "Point", coordinates : [ lon, lat ] },
 *                                      $maxDistance : radius } } })
 * </pre>
 *
 * @param key    name of the vertex field to query (should be indexed by 2dsphere)
 * @param lon    first coordinate of the centre point (longitude)
 * @param lat    second coordinate of the centre point (latitude)
 * @param radius value passed as {@code $maxDistance}, in metres
 * @return a parallel stream over the vertices whose ids were returned by the query
 */
public Stream<ChronoVertex> getChronoVertexStream(String key, double lon, double lat, double radius) {
    BsonArray coordinates = new BsonArray();
    coordinates.add(new BsonDouble(lon));
    coordinates.add(new BsonDouble(lat));
    BsonDocument geometry = new BsonDocument();
    geometry.put("type", new BsonString("Point"));
    geometry.put("coordinates", coordinates);
    BsonDocument near = new BsonDocument();
    near.put("$geometry", geometry);
    near.put("$maxDistance", new BsonDouble(radius));
    BsonDocument geoquery = new BsonDocument();
    geoquery.put("$near", near);
    BsonDocument queryDoc = new BsonDocument();
    queryDoc.put(key, geoquery);

    HashSet<ChronoVertex> ret = new HashSet<ChronoVertex>();
    // try-with-resources releases the cursor even if iteration fails part-way.
    try (MongoCursor<BsonDocument> cursor = vertices.find(queryDoc).projection(Tokens.PRJ_ONLY_ID).iterator()) {
        while (cursor.hasNext()) {
            BsonDocument v = cursor.next();
            ret.add(new ChronoVertex(v.getString(Tokens.ID).getValue(), this));
        }
    }
    return ret.parallelStream();
}
/**
 * Streams the vertices returned by querying the vertex collection with the
 * base filter {@code Tokens.FLT_VERTEX_FIELD_NOT_INCLUDED} extended by
 * {@code key = value}.
 *
 * @param key        property name added to the filter
 * @param value      property value; it is cast to {@code BsonValue}
 * @param isParallel {@code true} to return a parallel stream, {@code false}
 *                   for a sequential one
 * @return a stream over the matching vertices, collected into a set first
 * @throws ClassCastException if {@code value} is not a {@code BsonValue}
 */
public Stream<ChronoVertex> getChronoVertexStream(String key, Object value, boolean isParallel) {
    ElementHelper.validateProperty(null, key, value);
    // BsonDocument.append mutates its receiver. Appending directly to the
    // shared Tokens filter would make every later query inherit this key, so
    // the filter is copied first.
    // NOTE(review): assumes Tokens.FLT_VERTEX_FIELD_NOT_INCLUDED is a BsonDocument - confirm.
    BsonDocument filter = Tokens.FLT_VERTEX_FIELD_NOT_INCLUDED.clone().append(key, (BsonValue) value);

    HashSet<ChronoVertex> ret = new HashSet<ChronoVertex>();
    // try-with-resources releases the cursor even if iteration fails part-way.
    try (MongoCursor<BsonDocument> cursor = vertices.find(filter).projection(Tokens.PRJ_ONLY_ID).iterator()) {
        while (cursor.hasNext()) {
            BsonDocument v = cursor.next();
            ret.add(new ChronoVertex(v.getString(Tokens.ID).getValue(), this));
        }
    }
    if (isParallel) {
        return ret.parallelStream();
    }
    return ret.stream();
}
/**
 * Returns a stream over all the edges in the graph.
 *
 * <p>
 * Every document of the edge collection is read with the
 * {@code Tokens.PRJ_ONLY_OUTV_LABEL_INV} projection, and the edge id is
 * rebuilt as {@code outV + "|" + label + "|" + inV}.
 *
 * @param isParallel {@code true} to return a parallel stream, {@code false}
 *                   for a sequential one
 * @return a stream over all edges in the graph, collected into a set first
 */
public Stream<ChronoEdge> getChronoEdgeStream(boolean isParallel) {
    HashSet<ChronoEdge> ret = new HashSet<ChronoEdge>();
    // try-with-resources releases the cursor even if iteration fails part-way.
    try (MongoCursor<BsonDocument> cursor = edges.find().projection(Tokens.PRJ_ONLY_OUTV_LABEL_INV).iterator()) {
        while (cursor.hasNext()) {
            BsonDocument v = cursor.next();
            String outV = v.getString(Tokens.OUT_VERTEX).getValue();
            String label = v.getString(Tokens.LABEL).getValue();
            String inV = v.getString(Tokens.IN_VERTEX).getValue();
            String id = outV + "|" + label + "|" + inV;
            ret.add(new ChronoEdge(id, outV, inV, label, this));
        }
    }
    if (isParallel) {
        return ret.parallelStream();
    }
    return ret.stream();
}
/**
 * Streams the edges returned by querying the edge collection with the base
 * filter {@code Tokens.FLT_EDGE_FIELD_NOT_INCLUDED} extended by
 * {@code key = value}.
 *
 * @param key        property name added to the filter
 * @param value      property value; it is cast to {@code BsonValue}
 * @param isParallel {@code true} to return a parallel stream, {@code false}
 *                   for a sequential one
 * @return a stream over the matching edges, collected into a set first
 * @throws ClassCastException if {@code value} is not a {@code BsonValue}
 */
public Stream<ChronoEdge> getChronoEdgeStream(String key, Object value, boolean isParallel) {
    ElementHelper.validateProperty(null, key, value);
    // BsonDocument.append mutates its receiver. Appending directly to the
    // shared Tokens filter would make every later query inherit this key, so
    // the filter is copied first.
    // NOTE(review): assumes Tokens.FLT_EDGE_FIELD_NOT_INCLUDED is a BsonDocument - confirm.
    BsonDocument filter = Tokens.FLT_EDGE_FIELD_NOT_INCLUDED.clone().append(key, (BsonValue) value);

    HashSet<ChronoEdge> ret = new HashSet<ChronoEdge>();
    // try-with-resources releases the cursor even if iteration fails part-way.
    try (MongoCursor<BsonDocument> cursor = edges.find(filter).projection(Tokens.PRJ_ONLY_ID).iterator()) {
        while (cursor.hasNext()) {
            BsonDocument v = cursor.next();
            ret.add(new ChronoEdge(v.getString(Tokens.ID).getValue(), this));
        }
    }
    if (isParallel) {
        return ret.parallelStream();
    }
    return ret.stream();
}
/**
 * Collects the edges whose in-vertex field equals this object's string form
 * and returns them as a stream.
 *
 * @param labels       edge labels matched with {@code $in}; {@code null} or
 *                     empty means the label is not filtered
 * @param branchFactor limit applied to the query; {@code Integer.MAX_VALUE}
 *                     means no limit is applied
 * @param setParallel  {@code true} to return a parallel stream, {@code false}
 *                     for a sequential one
 * @return a stream over the matching edges, gathered into a {@code HashSet} first
 */
private Stream<ChronoEdge> getInChronoEdgeStream(final BsonArray labels, final int branchFactor,
        final boolean setParallel) {
    final BsonDocument query = new BsonDocument(Tokens.IN_VERTEX, new BsonString(this.toString()));
    if (labels != null && !labels.isEmpty()) {
        query.put(Tokens.LABEL, new BsonDocument(Tokens.FC.$in.toString(), labels));
    }

    // Only the id is projected; the limit is skipped when branchFactor is "unbounded".
    final Iterator<BsonDocument> docs = (branchFactor == Integer.MAX_VALUE)
            ? graph.getEdgeCollection().find(query).projection(Tokens.PRJ_ONLY_ID).iterator()
            : graph.getEdgeCollection().find(query).projection(Tokens.PRJ_ONLY_ID).limit(branchFactor).iterator();

    final HashSet<ChronoEdge> found = new HashSet<ChronoEdge>();
    docs.forEachRemaining(doc -> found.add(new ChronoEdge(doc.getString(Tokens.ID).getValue(), this.graph)));

    return setParallel ? found.parallelStream() : found.stream();
}
/**
 * Follows the edges whose out-vertex field equals this object's string form
 * and streams the vertices at their other end.
 *
 * <p>
 * The target vertex id is the third {@code |}-separated component of each
 * edge id.
 *
 * @param labels       edge labels matched with {@code $in}; {@code null} or
 *                     empty means the label is not filtered
 * @param branchFactor limit applied to the edge query;
 *                     {@code Integer.MAX_VALUE} means no limit is applied
 * @param setParallel  {@code true} to return a parallel stream, {@code false}
 *                     for a sequential one
 * @return a stream over the reached vertices, gathered into a {@code HashSet} first
 */
private Stream<ChronoVertex> getOutChronoVertexStream(BsonArray labels, final int branchFactor,
        final boolean setParallel) {
    final BsonDocument query = new BsonDocument(Tokens.OUT_VERTEX, new BsonString(this.toString()));
    if (labels != null && !labels.isEmpty()) {
        query.put(Tokens.LABEL, new BsonDocument(Tokens.FC.$in.toString(), labels));
    }

    // Only the id is projected; the limit is skipped when branchFactor is "unbounded".
    final Iterator<BsonDocument> docs = (branchFactor == Integer.MAX_VALUE)
            ? graph.getEdgeCollection().find(query).projection(Tokens.PRJ_ONLY_ID).iterator()
            : graph.getEdgeCollection().find(query).projection(Tokens.PRJ_ONLY_ID).limit(branchFactor).iterator();

    final HashSet<ChronoVertex> reached = new HashSet<ChronoVertex>();
    docs.forEachRemaining(doc -> {
        final String edgeId = doc.getString(Tokens.ID).getValue();
        reached.add(new ChronoVertex(edgeId.split("\\|")[2], this.graph));
    });

    return setParallel ? reached.parallelStream() : reached.stream();
}
/**
 * Follows the edges whose in-vertex field equals this object's string form
 * and streams the vertices at their other end.
 *
 * <p>
 * The source vertex id is the first {@code |}-separated component of each
 * edge id.
 *
 * @param labels       edge labels matched with {@code $in}; {@code null} or
 *                     empty means the label is not filtered
 * @param branchFactor limit applied to the edge query;
 *                     {@code Integer.MAX_VALUE} means no limit is applied
 * @param setParallel  {@code true} to return a parallel stream, {@code false}
 *                     for a sequential one
 * @return a stream over the reached vertices, gathered into a {@code HashSet} first
 */
private Stream<ChronoVertex> getInChronoVertexStream(final BsonArray labels, final int branchFactor,
        final boolean setParallel) {
    final BsonDocument query = new BsonDocument(Tokens.IN_VERTEX, new BsonString(this.toString()));
    if (labels != null && !labels.isEmpty()) {
        query.put(Tokens.LABEL, new BsonDocument(Tokens.FC.$in.toString(), labels));
    }

    // Only the id is projected; the limit is skipped when branchFactor is "unbounded".
    final Iterator<BsonDocument> docs = (branchFactor == Integer.MAX_VALUE)
            ? graph.getEdgeCollection().find(query).projection(Tokens.PRJ_ONLY_ID).iterator()
            : graph.getEdgeCollection().find(query).projection(Tokens.PRJ_ONLY_ID).limit(branchFactor).iterator();

    final HashSet<ChronoVertex> reached = new HashSet<ChronoVertex>();
    docs.forEachRemaining(doc -> {
        final String edgeId = doc.getString(Tokens.ID).getValue();
        reached.add(new ChronoVertex(edgeId.split("\\|")[0], this.graph));
    });

    return setParallel ? reached.parallelStream() : reached.stream();
}
/**
 * Streams, always in parallel, the vertices returned by querying the vertex
 * collection with the base filter
 * {@code Tokens.FLT_VERTEX_FIELD_NOT_INCLUDED} extended by {@code key = value}.
 *
 * @param key   property name added to the filter
 * @param value property value; it is cast to {@code BsonValue}
 * @return a parallel stream over the matching vertices, collected into a set first
 * @throws ClassCastException if {@code value} is not a {@code BsonValue}
 */
public Stream<ChronoVertex> getChronoVertexStream(String key, Object value) {
    ElementHelper.validateProperty(null, key, value);
    // BsonDocument.append mutates its receiver. Appending directly to the
    // shared Tokens filter would make every later query inherit this key, so
    // the filter is copied first.
    // NOTE(review): assumes Tokens.FLT_VERTEX_FIELD_NOT_INCLUDED is a BsonDocument - confirm.
    BsonDocument filter = Tokens.FLT_VERTEX_FIELD_NOT_INCLUDED.clone().append(key, (BsonValue) value);

    HashSet<ChronoVertex> ret = new HashSet<ChronoVertex>();
    // try-with-resources releases the cursor even if iteration fails part-way.
    try (MongoCursor<BsonDocument> cursor = vertices.find(filter).projection(Tokens.PRJ_ONLY_ID).iterator()) {
        while (cursor.hasNext()) {
            BsonDocument v = cursor.next();
            ret.add(new ChronoVertex(v.getString(Tokens.ID).getValue(), this));
        }
    }
    return ret.parallelStream();
}
/**
 * Streams, always in parallel, the edges returned by querying the edge
 * collection with the base filter {@code Tokens.FLT_EDGE_FIELD_NOT_INCLUDED}
 * extended by {@code key = value}.
 *
 * @param key   property name added to the filter
 * @param value property value; it is cast to {@code BsonValue}
 * @return a parallel stream over the matching edges, collected into a set first
 * @throws ClassCastException if {@code value} is not a {@code BsonValue}
 */
public Stream<ChronoEdge> getChronoEdgeStream(String key, Object value) {
    ElementHelper.validateProperty(null, key, value);
    // BsonDocument.append mutates its receiver. Appending directly to the
    // shared Tokens filter would make every later query inherit this key, so
    // the filter is copied first.
    // NOTE(review): assumes Tokens.FLT_EDGE_FIELD_NOT_INCLUDED is a BsonDocument - confirm.
    BsonDocument filter = Tokens.FLT_EDGE_FIELD_NOT_INCLUDED.clone().append(key, (BsonValue) value);

    HashSet<ChronoEdge> ret = new HashSet<ChronoEdge>();
    // try-with-resources releases the cursor even if iteration fails part-way.
    try (MongoCursor<BsonDocument> cursor = edges.find(filter).projection(Tokens.PRJ_ONLY_ID).iterator()) {
        while (cursor.hasNext()) {
            BsonDocument v = cursor.next();
            ret.add(new ChronoEdge(v.getString(Tokens.ID).getValue(), this));
        }
    }
    return ret.parallelStream();
}
/**
 * Resets the traversal state so that it starts from a single element.
 *
 * <p>
 * When {@code starts} is a {@code ChronoGraph}, {@code ChronoVertex},
 * {@code ChronoEdge}, {@code VertexEvent}, {@code EdgeEvent} or
 * {@code EPCTime}, the stream is a one-element stream over it and the element
 * class is its runtime class. For any other value the stream and element
 * class are not assigned here.
 *
 * @param collection     collection stored for later use
 * @param starts         start element of the traversal
 * @param setParallel    {@code true} to create a parallel stream
 * @param setPathEnabled {@code true} to seed the path map with the one-element
 *                       path {@code [starts]}
 * @param elementClass   not read by this method
 */
public void func1(final MongoCollection collection, final Object starts, final boolean setParallel,
        final boolean setPathEnabled, final Class elementClass) {
    currentPath = new HashMap<Object, Object>();

    final boolean supportedStart = starts instanceof ChronoGraph || starts instanceof ChronoVertex
            || starts instanceof ChronoEdge || starts instanceof VertexEvent || starts instanceof EdgeEvent
            || starts instanceof EPCTime;
    if (supportedStart) {
        final HashSet seed = new HashSet();
        seed.add(starts);
        stream = setParallel ? seed.parallelStream() : seed.stream();
        this.elementClass = starts.getClass();

        if (setPathEnabled) {
            // Initial path: a set holding one single-element list, keyed by the start element.
            final HashSet paths = new HashSet();
            final List firstPath = new ArrayList();
            firstPath.add(starts);
            paths.add(firstPath);
            currentPath.put(starts, paths);
        }
    }

    stepList = new ArrayList<Step>();
    stepIndex = new HashMap<String, Integer>();
    this.isPathEnabled = setPathEnabled;
    this.isParallel = setParallel;
    this.loopCount = 0;
    listElementClass = null;
    this.collection = collection;
}
private NaiveTraversalEngine(final MongoCollection collection, final Object starts, final boolean setParallel,
final int loopCount, final boolean setPathEnabled, final Class elementClass, final Class listElementClass,
Map currentPath) {
// previousPath = new HashMap<Object, Object>();
// if (currentPath != null)
// this.currentPath = new HashMap(currentPath);
// else
this.currentPath = currentPath;
// Initialize Stream and Path
if (starts instanceof Stream) {
this.elementClass = elementClass;
// if (setPathEnabled) {
// if (setParallel) {
// stream = ((Set) ((Stream) starts).map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).parallelStream();
// } else {
// stream = ((Set) ((Stream) starts).map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).stream();
// }
// } else {
stream = (Stream) starts;
// }
} else if (starts instanceof Collection) {
this.elementClass = listElementClass;
// if (setPathEnabled) {
// if (setParallel) {
// stream = ((Set) ((Collection) starts).parallelStream().map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).parallelStream();
// } else {
// stream = ((Set) ((Collection) starts).parallelStream().map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).stream();
// }
// } else {
if (setParallel) {
stream = ((Collection) starts).parallelStream();
} else {
stream = ((Collection) starts).stream();
}
// }
} else if (starts instanceof ChronoGraph || starts instanceof ChronoVertex || starts instanceof ChronoEdge
|| starts instanceof VertexEvent || starts instanceof EdgeEvent || starts instanceof EPCTime) {
HashSet set = new HashSet();
set.add(starts);
if (setParallel == true)
stream = set.parallelStream();
else
stream = set.stream();
this.elementClass = starts.getClass();
// if (setPathEnabled) {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(starts);
// initPathSet.add(list);
// this.currentPath.put(starts, initPathSet);
// }
}
stepList = new ArrayList<Step>();
stepIndex = new HashMap<String, Integer>();
this.loopCount = loopCount;
this.isPathEnabled = setPathEnabled;
this.isParallel = setParallel;
this.listElementClass = null;
this.collection = collection;
}
private ExternalTraversalEngine(final MongoCollection collection, final Object starts, final boolean setParallel,
final int loopCount, final boolean setPathEnabled, final Class elementClass, final Class listElementClass,
Map currentPath) {
// previousPath = new HashMap<Object, Object>();
// if (currentPath != null)
// this.currentPath = new HashMap(currentPath);
// else
this.currentPath = currentPath;
// Initialize Stream and Path
if (starts instanceof Stream) {
this.elementClass = elementClass;
// if (setPathEnabled) {
// if (setParallel) {
// stream = ((Set) ((Stream) starts).map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).parallelStream();
// } else {
// stream = ((Set) ((Stream) starts).map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).stream();
// }
// } else {
stream = (Stream) starts;
// }
} else if (starts instanceof Collection) {
this.elementClass = listElementClass;
// if (setPathEnabled) {
// if (setParallel) {
// stream = ((Set) ((Collection) starts).parallelStream().map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).parallelStream();
// } else {
// stream = ((Set) ((Collection) starts).parallelStream().map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).stream();
// }
// } else {
if (setParallel) {
stream = ((Collection) starts).parallelStream();
} else {
stream = ((Collection) starts).stream();
}
// }
} else if (starts instanceof ChronoGraph || starts instanceof ChronoVertex || starts instanceof ChronoEdge
|| starts instanceof VertexEvent || starts instanceof EdgeEvent || starts instanceof EPCTime) {
HashSet set = new HashSet();
set.add(starts);
if (setParallel == true)
stream = set.parallelStream();
else
stream = set.stream();
this.elementClass = starts.getClass();
// if (setPathEnabled) {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(starts);
// initPathSet.add(list);
// this.currentPath.put(starts, initPathSet);
// }
}
stepList = new ArrayList<Step>();
stepIndex = new HashMap<String, Integer>();
this.loopCount = loopCount;
this.isPathEnabled = setPathEnabled;
this.isParallel = setParallel;
this.listElementClass = null;
this.collection = collection;
}
private TraversalEngine(final ChronoGraph g, final Object starts, final boolean setParallel, final int loopCount,
final boolean setPathEnabled, final Class elementClass, final Class listElementClass, Map currentPath) {
// previousPath = new HashMap<Object, Object>();
// if (currentPath != null)
// this.currentPath = new HashMap(currentPath);
// else
this.currentPath = currentPath;
// Initialize Stream and Path
if (starts instanceof Stream) {
this.elementClass = elementClass;
// if (setPathEnabled) {
// if (setParallel) {
// stream = ((Set) ((Stream) starts).map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).parallelStream();
// } else {
// stream = ((Set) ((Stream) starts).map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).stream();
// }
// } else {
stream = (Stream) starts;
// }
} else if (starts instanceof Collection) {
this.elementClass = listElementClass;
// if (setPathEnabled) {
// if (setParallel) {
// stream = ((Set) ((Collection) starts).parallelStream().map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).parallelStream();
// } else {
// stream = ((Set) ((Collection) starts).parallelStream().map(element -> {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(element);
// initPathSet.add(list);
// this.currentPath.put(element, initPathSet);
// return element;
// }).collect(Collectors.toSet())).stream();
// }
// } else {
if (setParallel) {
stream = ((Collection) starts).parallelStream();
} else {
stream = ((Collection) starts).stream();
}
// }
} else if (starts instanceof ChronoGraph || starts instanceof ChronoVertex || starts instanceof ChronoEdge
|| starts instanceof VertexEvent || starts instanceof EdgeEvent) {
HashSet set = new HashSet();
set.add(starts);
if (setParallel == true)
stream = set.parallelStream();
else
stream = set.stream();
this.elementClass = starts.getClass();
// if (setPathEnabled) {
// HashSet initPathSet = new HashSet();
// List list = new ArrayList();
// list.add(starts);
// initPathSet.add(list);
// this.currentPath.put(starts, initPathSet);
// }
}
stepList = new ArrayList<Step>();
stepIndex = new HashMap<String, Integer>();
this.loopCount = loopCount;
this.isPathEnabled = setPathEnabled;
this.isParallel = setParallel;
this.listElementClass = null;
this.g = g;
}
/**
 * Collects the edges whose out-vertex field equals this object's string form
 * and returns them as a stream.
 *
 * <p>
 * Only the label and in-vertex fields are projected. Each edge is rebuilt
 * with the id {@code outV + "|" + label + "|" + inV}.
 *
 * <p>
 * If the driver throws {@code MongoCursorNotFoundException}, the error message
 * is printed and the whole query is retried from scratch. There is no bound on
 * the number of retries.
 *
 * <p>
 * NOTE(review): {@code labels} and {@code branchFactor} are not applied by
 * this implementation, unlike the sibling in-edge and vertex stream methods.
 * An earlier version that honoured them had been left here commented out.
 * Confirm whether ignoring them is intended.
 *
 * @param labels       currently unused
 * @param branchFactor currently unused
 * @param setParallel  {@code true} to return a parallel stream, {@code false}
 *                     for a sequential one
 * @return a stream over the outgoing edges, gathered into a {@code HashSet} first
 */
private Stream<ChronoEdge> getOutChronoEdgeStream(final BsonArray labels, final int branchFactor,
        final boolean setParallel) {
    // String form of this object, used both as the out-vertex filter value and as the edge's out-vertex id.
    final String outV = this.toString();
    while (true) {
        try {
            HashSet<ChronoEdge> edgeSet = new HashSet<ChronoEdge>();
            Iterator<BsonDocument> it = graph.getEdgeCollection()
                    .find(new BsonDocument(Tokens.OUT_VERTEX, new BsonString(outV)))
                    .projection(new BsonDocument(Tokens.LABEL, new BsonBoolean(true))
                            .append(Tokens.IN_VERTEX, new BsonBoolean(true))
                            .append(Tokens.ID, new BsonBoolean(false)))
                    .iterator();
            while (it.hasNext()) {
                BsonDocument doc = it.next();
                String inV = doc.getString(Tokens.IN_VERTEX).getValue();
                String label = doc.getString(Tokens.LABEL).getValue();
                edgeSet.add(new ChronoEdge(outV + "|" + label + "|" + inV, outV, inV, label, graph));
            }
            if (setParallel) {
                return edgeSet.parallelStream();
            }
            return edgeSet.stream();
        } catch (MongoCursorNotFoundException e1) {
            // Report the failure and fall through to retry the query with a fresh result set.
            System.out.println(e1.getErrorMessage());
        }
    }
}