下面列出了怎么用com.mongodb.Function的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public ScheduleState getScheduleState(String versionId) {
BsonDocument doc = new BsonDocument();
doc.append("version", new BsonString(versionId));
ScheduleState state = scheduleStates.find(doc).map(new Function<Document, ScheduleState>() {
@Override
public ScheduleState apply(Document t) {
String json = t.toJson();
try {
return mapper.readValue(json, ScheduleState.class);
} catch (IOException e) {
LOG.error("deserialize config item failed!", e);
}
return null;
}
}).first();
if (state != null) {
// based on version, to add content from collections of spoutSpecs/alertSpecs/etc..
state = addDetailForScheduleState(state, versionId);
}
return state;
}
/**
* get the basic ScheduleState, and then based on the version to get all sub-part(spoutSpecs/alertSpecs/etc)
* to form a completed ScheduleState.
* @return the latest ScheduleState
*/
@Override
public ScheduleState getScheduleState() {
BsonDocument sort = new BsonDocument();
sort.append("generateTime", new BsonInt32(-1));
ScheduleState state = scheduleStates.find().sort(sort).map(new Function<Document, ScheduleState>() {
@Override
public ScheduleState apply(Document t) {
String json = t.toJson();
try {
return mapper.readValue(json, ScheduleState.class);
} catch (IOException e) {
LOG.error("deserialize config item failed!", e);
}
return null;
}
}).first();
if (state != null) {
String version = state.getVersion();
// based on version, to add content from collections of spoutSpecs/alertSpecs/etc..
state = addDetailForScheduleState(state, version);
}
return state;
}
private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz, String version) {
BsonDocument doc = new BsonDocument();
doc.append("version", new BsonString(version));
List<T> result = new LinkedList<T>();
collection.find(doc).map(new Function<Document, T>() {
@Override
public T apply(Document t) {
String json = t.toJson();
try {
return mapper.readValue(json, clz);
} catch (IOException e) {
LOG.error("deserialize config item failed!", e);
}
return null;
}
}).into(result);
return result;
}
private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz) {
List<T> result = new LinkedList<T>();
collection.find().map(new Function<Document, T>() {
@Override
public T apply(Document t) {
String json = t.toJson();
try {
return mapper.readValue(json, clz);
} catch (IOException e) {
LOG.error("deserialize config item failed!", e);
}
return null;
}
}).into(result);
return result;
}
/**
* Return non-redundant timestamps of all graph element events
*
* @return HashSet<Long> timestamps
*/
public TreeSet<Long> getTimestamps() {
TreeSet<Long> timestampSet = new TreeSet<Long>();
Function<BsonDateTime, Long> mapper = new Function<BsonDateTime, Long>() {
@Override
public Long apply(BsonDateTime val) {
return val.getValue();
}
};
edgeEvents.distinct(Tokens.TIMESTAMP, BsonDateTime.class)
.filter(new BsonDocument(Tokens.TIMESTAMP, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(timestampSet);
Set<Long> vtSet = new TreeSet<Long>();
vertexEvents.distinct(Tokens.TIMESTAMP, BsonDateTime.class)
.filter(new BsonDocument(Tokens.TIMESTAMP, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(vtSet);
timestampSet.addAll(vtSet);
return timestampSet;
}
public TreeSet<Long> getTimestamps(Long startTime, Long endTime) {
TreeSet<Long> timestampSet = new TreeSet<Long>();
Function<BsonDateTime, Long> mapper = new Function<BsonDateTime, Long>() {
@Override
public Long apply(BsonDateTime val) {
return val.getValue();
}
};
edgeEvents.distinct(Tokens.TIMESTAMP, BsonDateTime.class)
.filter(new BsonDocument(Tokens.TIMESTAMP,
new BsonDocument(Tokens.FC.$gt.toString(), new BsonDateTime(startTime))
.append(Tokens.FC.$lt.toString(), new BsonDateTime(endTime))))
.map(mapper).into(timestampSet);
Set<Long> vtSet = new TreeSet<Long>();
vertexEvents.distinct(Tokens.TIMESTAMP, BsonDateTime.class)
.filter(new BsonDocument(Tokens.TIMESTAMP,
new BsonDocument(Tokens.FC.$gt.toString(), new BsonDateTime(startTime))
.append(Tokens.FC.$lt.toString(), new BsonDateTime(endTime))))
.map(mapper).into(timestampSet);
timestampSet.addAll(vtSet);
return timestampSet;
}
public HashSet<Long> getTimestampsHashSet() {
HashSet<Long> timestampSet = new HashSet<Long>();
Function<BsonDateTime, Long> mapper = new Function<BsonDateTime, Long>() {
@Override
public Long apply(BsonDateTime val) {
return val.getValue();
}
};
edges.distinct(Tokens.TIMESTAMP, BsonDateTime.class)
.filter(new BsonDocument(Tokens.TIMESTAMP, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(timestampSet);
return timestampSet;
}
<T> Stream<T> getNamespaces(MongoConnection connection, ConsumerRegistry registry, Function<String[], T> f) {
Set<String> producerDbName = new HashSet<>();
for (String dbName : client.listDatabaseNames()) {
producerDbName.add(dbName);
}
checkOplog(producerDbName);
Set<Consumer> consumers = registry.outputSink(connection).keySet();
return consumers.stream().map(Consumer::getRepos).flatMap(Set::stream).flatMap(s -> {
if (!producerDbName.contains(s.getName())) {
throw new InvalidConfigException("No such repo(" + s.getName() + ") in " + connection);
}
Set<String> producerCollectionName = new HashSet<>();
for (String collectionName : client.getDatabase(s.getName()).listCollectionNames()) {
producerCollectionName.add(collectionName);
}
List<Entity> entities = s.getEntities();
ArrayList<T> res = new ArrayList<>(entities.size());
for (Entity entity : entities) {
if (!producerCollectionName.contains(entity.getName())) {
throw new InvalidConfigException("No such collection(" + s.getName() + "." + entity.getName() + ") in " + connection);
}
res.add(f.apply(new String[]{s.getName(), entity.getName()}));
}
return res.stream();
});
}
CoreRemoteMongoMappingCursor(
final CoreRemoteMongoCursor<T> proxied,
final Function<T, U> mapper
) {
notNull("proxied", proxied);
notNull("mapper", mapper);
this.proxied = proxied;
this.mapper = mapper;
}
public CoreRemoteMappingIterable(
final CoreRemoteMongoIterable<U> iterable,
final Function<U, V> mapper
) {
this.iterable = iterable;
this.mapper = mapper;
}
/**
* Return an iterable to all the vertices in the graph. If this is not possible
* for the implementation, then an UnsupportedOperationException can be thrown.
*
* @return an iterable reference to all vertices in the graph
*/
public Iterable<ChronoVertex> getChronoVertices() {
HashSet<String> idSet = new HashSet<String>();
Function<BsonString, String> mapper = new Function<BsonString, String>() {
@Override
public String apply(BsonString val) {
return val.getValue();
}
};
HashSet<String> outV = new HashSet<String>();
edges.distinct(Tokens.OUT_VERTEX, BsonString.class)
.filter(new BsonDocument(Tokens.OUT_VERTEX, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(outV);
idSet.addAll(outV);
HashSet<String> inV = new HashSet<String>();
edges.distinct(Tokens.IN_VERTEX, BsonString.class)
.filter(new BsonDocument(Tokens.IN_VERTEX, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(inV);
idSet.addAll(inV);
MongoCursor<BsonDocument> vi = vertices.find(Tokens.FLT_VERTEX_FIELD_NOT_INCLUDED)
.projection(Tokens.PRJ_ONLY_ID).iterator();
while (vi.hasNext()) {
BsonDocument d = vi.next();
idSet.add(d.getString(Tokens.ID).getValue());
}
HashSet<String> vertex = new HashSet<String>();
vertices.distinct(Tokens.VERTEX, BsonString.class)
.filter(new BsonDocument(Tokens.VERTEX, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(vertex);
idSet.addAll(vertex);
return idSet.parallelStream().map(s -> new ChronoVertex(s, this)).collect(Collectors.toSet());
}
/**
* Return an iterable to all the vertices in the graph. If this is not possible
* for the implementation, then an UnsupportedOperationException can be thrown.
*
* @return an iterable reference to all vertices in the graph
*/
public Set<ChronoVertex> getChronoVertexSet() {
HashSet<String> idSet = new HashSet<String>();
Function<BsonDocument, String> mapper = new Function<BsonDocument, String>() {
@Override
public String apply(BsonDocument val) {
return val.getString(Tokens.ID).getValue();
}
};
HashSet<String> outV = new HashSet<String>();
ArrayList<BsonDocument> outVQuery = new ArrayList<BsonDocument>();
outVQuery.add(new BsonDocument("$group", new BsonDocument(Tokens.ID, new BsonString("$" + Tokens.OUT_VERTEX))));
edges.aggregate(outVQuery).map(mapper).into(outV);
HashSet<String> inV = new HashSet<String>();
ArrayList<BsonDocument> inVQuery = new ArrayList<BsonDocument>();
inVQuery.add(new BsonDocument("$group", new BsonDocument(Tokens.ID, new BsonString("$" + Tokens.IN_VERTEX))));
edges.aggregate(inVQuery).map(mapper).into(inV);
idSet.addAll(inV);
MongoCursor<BsonDocument> vi = vertices.find(Tokens.FLT_VERTEX_FIELD_NOT_INCLUDED)
.projection(Tokens.PRJ_ONLY_ID).iterator();
while (vi.hasNext()) {
BsonDocument d = vi.next();
idSet.add(d.getString(Tokens.ID).getValue());
}
return idSet.parallelStream().map(s -> new ChronoVertex(s, this)).collect(Collectors.toSet());
}
public Set<ChronoVertex> getOutVertexSet() {
Function<BsonDocument, String> mapper = new Function<BsonDocument, String>() {
@Override
public String apply(BsonDocument val) {
return val.getString(Tokens.ID).getValue();
}
};
HashSet<String> outV = new HashSet<String>();
ArrayList<BsonDocument> outVQuery = new ArrayList<BsonDocument>();
outVQuery.add(new BsonDocument("$group", new BsonDocument(Tokens.ID, new BsonString("$" + Tokens.OUT_VERTEX))));
edges.aggregate(outVQuery).map(mapper).into(outV);
return outV.parallelStream().map(s -> new ChronoVertex(s, this)).collect(Collectors.toSet());
}
/**
* Return an iterable to all the vertices in the graph. If this is not possible
* for the implementation, then an UnsupportedOperationException can be thrown.
*
* @return an iterable reference to all vertices in the graph
*/
public Stream<ChronoVertex> getChronoVertexStream(boolean isParallel) {
HashSet<String> idSet = new HashSet<String>();
Function<BsonString, String> mapper = new Function<BsonString, String>() {
@Override
public String apply(BsonString val) {
return val.getValue();
}
};
HashSet<String> outV = new HashSet<String>();
edges.distinct(Tokens.OUT_VERTEX, BsonString.class)
.filter(new BsonDocument(Tokens.OUT_VERTEX, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(outV);
idSet.addAll(outV);
HashSet<String> inV = new HashSet<String>();
edges.distinct(Tokens.IN_VERTEX, BsonString.class)
.filter(new BsonDocument(Tokens.IN_VERTEX, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(inV);
idSet.addAll(inV);
MongoCursor<BsonDocument> vi = vertices.find(Tokens.FLT_VERTEX_FIELD_NOT_INCLUDED)
.projection(Tokens.PRJ_ONLY_ID).iterator();
while (vi.hasNext()) {
BsonDocument d = vi.next();
idSet.add(d.getString(Tokens.ID).getValue());
}
HashSet<String> vertex = new HashSet<String>();
vertices.distinct(Tokens.VERTEX, BsonString.class)
.filter(new BsonDocument(Tokens.VERTEX, new BsonDocument(Tokens.FC.$ne.toString(), new BsonNull())))
.map(mapper).into(vertex);
idSet.addAll(vertex);
if (isParallel)
return idSet.parallelStream().map(s -> new ChronoVertex(s, this)).collect(Collectors.toSet())
.parallelStream();
else
return idSet.parallelStream().map(s -> new ChronoVertex(s, this)).collect(Collectors.toSet()).stream();
}
public Set<VertexEvent> getOutVertexEventSet(final AC tt) {
while (true) {
try {
// db.tEdgeEvents.aggregate([{$match:{"_o":"1","_t":{ $lt : ISODate(0)
// }}},{$project:{"_i":1,"_t":1,"_id":0}},{$group:{"_id":"$_i", "_mt": {$min:
// "$_t"}}}])
BsonDocument match = new BsonDocument("$match",
new BsonDocument(Tokens.OUT_VERTEX, new BsonString(vertex.toString())).append(Tokens.TIMESTAMP,
new BsonDocument("$gt", new BsonDateTime(timestamp))));
BsonDocument project = new BsonDocument("$project",
new BsonDocument(Tokens.IN_VERTEX, new BsonBoolean(true))
.append(Tokens.TIMESTAMP, new BsonBoolean(true))
.append(Tokens.ID, new BsonBoolean(false)));
BsonDocument group = new BsonDocument("$group",
new BsonDocument(Tokens.ID, new BsonString("$" + Tokens.IN_VERTEX)).append(Tokens.TIMESTAMP,
new BsonDocument("$min", new BsonString("$" + Tokens.TIMESTAMP))));
ArrayList<BsonDocument> aggregateQuery = new ArrayList<BsonDocument>();
aggregateQuery.add(match);
aggregateQuery.add(project);
aggregateQuery.add(group);
HashSet<VertexEvent> ret = new HashSet<VertexEvent>();
Function<BsonDocument, VertexEvent> mapper = new Function<BsonDocument, VertexEvent>() {
@Override
public VertexEvent apply(BsonDocument d) {
String inV = d.getString(Tokens.ID).getValue();
Long t = d.getDateTime(Tokens.TIMESTAMP).getValue();
return new VertexEvent(graph, new ChronoVertex(inV, graph), t);
}
};
vertex.graph.getEdgeEvents().aggregate(aggregateQuery).map(mapper).into(ret);
return ret;
} catch (MongoCursorNotFoundException e1) {
System.out.println(e1.getErrorMessage());
}
}
}
public static <X> MongoIterable<X> iterate(Collection<X> result)
{
return new MongoIterable<X>()
{
@Override
public MongoCursor<X> iterator()
{
return new StubbingCursor<>(result.iterator());
}
@Override
public X first()
{
throw new UnsupportedOperationException();
}
@Override
public <U> MongoIterable<U> map(Function<X, U> function)
{
throw new UnsupportedOperationException();
}
@Override
public void forEach(Block<? super X> block)
{
throw new UnsupportedOperationException();
}
@Override
public <A extends Collection<? super X>> A into(A objects)
{
throw new UnsupportedOperationException();
}
@Override
public MongoIterable<X> batchSize(int i)
{
return this;
}
};
}
@Override
public <U> RemoteMongoIterable<U> map(final Function<ResultT, U> mapper) {
return new RemoteMongoIterableImpl<>(proxy.map(mapper));
}
@Override
public <U> RemoteMongoIterable<U> map(final Function<ResultT, U> mapper) {
return new RemoteMongoIterableImpl<>(proxy.map(mapper), dispatcher);
}
@Override
public <W> CoreRemoteMongoIterable<W> map(final Function<V, W> newMap) {
return new CoreRemoteMappingIterable<>(this, newMap);
}
/**
* Maps this iterable from the source document type to the target document type.
*
* @param mapper a function that maps from the source to the target document type
* @param <U> the target document type
* @return an iterable which maps T to U
*/
<U> RemoteMongoIterable<U> map(Function<ResultT, U> mapper);
/**
* Maps this iterable from the source document type to the target document type.
*
* @param mapper a function that maps from the source to the target document type
* @param <U> the target document type
* @return an iterable which maps T to U
*/
<U> RemoteMongoIterable<U> map(Function<ResultT, U> mapper);
/**
* Maps this iterable from the source document type to the target document type.
*
* @param mapper a function that maps from the source to the target document type
* @param <U> the target document type
* @return an iterable which maps T to U
*/
<U> CoreRemoteMongoIterable<U> map(final Function<ResultT, U> mapper);
/**
* Maps this iterable from the source document type to the target document type.
*
* @param mapper a function that maps from the source to the target document type
* @param <U> the target document type
* @return an iterable which maps T to U
*/
public <U> CoreRemoteMongoIterable<U> map(final Function<ResultT, U> mapper) {
return new CoreRemoteMappingIterable<>(this, mapper);
}
/**
* Maps this iterable from the source document type to the target document type.
*
* @param mapper a function that maps from the source to the target document type
* @param <U> the target document type
* @return an iterable which maps T to U
*/
public <U> CoreRemoteMongoIterable<U> map(final Function<ResultT, U> mapper) {
return new CoreRemoteMappingIterable<>(this, mapper);
}