The following examples show how to use the org.bson.BsonValue API class, or you can follow the links to view the source code on GitHub.
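Most of the snippets below follow the same pattern: fetch a BsonValue, check its concrete type, and then convert it with the matching as*() accessor. A minimal, self-contained sketch of that idiom (the document and field names here are invented for illustration):
import org.bson.BsonDocument;
import org.bson.BsonValue;

public class BsonValueIdiomSketch {
    public static void main(String[] args) {
        BsonDocument doc = BsonDocument.parse("{name: 'widget', count: 3, meta: {tag: 'a'}}");

        BsonValue name = doc.get("name");
        if (name != null && name.isString()) {
            System.out.println(name.asString().getValue()); // widget
        }

        BsonValue count = doc.get("count");
        if (count != null && count.isNumber()) {
            System.out.println(count.asNumber().intValue()); // 3
        }

        BsonValue meta = doc.get("meta");
        if (meta != null && meta.isDocument()) {
            System.out.println(meta.asDocument().toJson());
        }

        // Calling the wrong as*() conversion throws BsonInvalidOperationException,
        // which is why the snippets below check the type first.
    }
}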
/**
* Un-assigns a key/value property from the element. The object value of the
* removed property is returned.
*
* @param key the key of the property to remove from the element
* @return the object value associated with that key prior to removal. Should be an
* instance of BsonValue
*/
@Override
public <T> T removeProperty(final String key) {
try {
BsonValue value = getProperty(key);
BsonDocument filter = new BsonDocument();
filter.put(Tokens.ID, new BsonString(this.id));
BsonDocument update = new BsonDocument();
update.put("$unset", new BsonDocument(key, new BsonNull()));
if (this instanceof ChronoVertex) {
graph.getVertexCollection().updateOne(filter, update, new UpdateOptions().upsert(true));
return (T) value;
} else {
graph.getEdgeCollection().updateOne(filter, update, new UpdateOptions().upsert(true));
return (T) value;
}
} catch (MongoWriteException e) {
throw e;
}
}
static List<QuantityElementType> putQuantityElementTypeList(BsonArray quantityDBList) {
List<QuantityElementType> qetList = new ArrayList<QuantityElementType>();
for (int i = 0; i < quantityDBList.size(); i++) {
QuantityElementType qet = new QuantityElementType();
BsonDocument quantityDBObject = quantityDBList.get(i).asDocument();
BsonValue epcClassObject = quantityDBObject.get("epcClass");
BsonValue quantity = quantityDBObject.get("quantity");
BsonValue uom = quantityDBObject.get("uom");
if (epcClassObject != null) {
qet.setEpcClass(epcClassObject.asString().getValue());
if (quantity != null) {
double quantityDouble = quantity.asDouble().getValue();
qet.setQuantity(BigDecimal.valueOf(quantityDouble));
}
if (uom != null)
qet.setUom(uom.asString().getValue());
qetList.add(qet);
}
}
return qetList;
}
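For context, the quantity elements consumed above look like the documents built below. This is only a sketch of the expected input shape; the epcClass and uom values are made up:
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonString;

public class QuantityListInputSketch {
    public static void main(String[] args) {
        BsonDocument element = new BsonDocument()
                .append("epcClass", new BsonString("urn:epc:class:lgtin:4012345.012345.998877"))
                .append("quantity", new BsonDouble(200.5))
                .append("uom", new BsonString("KGM"));
        BsonArray quantityDBList = new BsonArray();
        quantityDBList.add(element);
        // passing quantityDBList to putQuantityElementTypeList(...) above would yield
        // one QuantityElementType with epcClass, quantity and uom populated
        System.out.println(quantityDBList);
    }
}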
private BsonDocument convertToExtensionDocument(Map<String, String> namespaces, BsonDocument extension) {
BsonDocument ext = new BsonDocument();
for (String key : extension.keySet()) {
String[] namespaceAndKey = key.split("#");
if (namespaceAndKey.length != 2)
continue;
String namespace = namespaceAndKey[0];
if (!namespaces.containsKey(namespace))
continue;
ext.put("@" + encodeMongoObjectKey(namespace), new BsonString(namespaces.get(namespace)));
BsonValue extValue = extension.get(key);
if (extValue instanceof BsonDocument) {
ext.put(encodeMongoObjectKey(key), convertToExtensionDocument(namespaces, extValue.asDocument()));
} else {
ext.put(encodeMongoObjectKey(key), extValue);
}
}
return ext;
}
@Override
public BsonValue toBson(final Object data) {
if (data instanceof BigDecimal) {
if (format.equals(Format.DECIMAL128)) {
return new BsonDecimal128(new Decimal128((BigDecimal) data));
}
if (format.equals(Format.LEGACYDOUBLE)) {
return new BsonDouble(((BigDecimal) data).doubleValue());
}
}
throw new DataException(
"Error: decimal conversion not possible when data is of type "
+ data.getClass().getName()
+ " and format is "
+ format);
}
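The two branches above map a BigDecimal either to the lossless Decimal128 representation or to a legacy double. A standalone sketch of both conversions, independent of the converter's Format enum:
import java.math.BigDecimal;
import org.bson.BsonDecimal128;
import org.bson.BsonDouble;
import org.bson.BsonValue;
import org.bson.types.Decimal128;

public class DecimalConversionSketch {
    public static void main(String[] args) {
        BigDecimal price = new BigDecimal("19.99");

        // lossless: what the DECIMAL128 branch produces
        BsonValue asDecimal128 = new BsonDecimal128(new Decimal128(price));
        // potentially lossy: what the LEGACYDOUBLE branch produces
        BsonValue asDouble = new BsonDouble(price.doubleValue());

        System.out.println(asDecimal128.asDecimal128().getValue()); // 19.99
        System.out.println(asDouble.asDouble().getValue());         // 19.99
    }
}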
/**
* Returns a synthesized change event for a remote document.
*
* @param ns the namespace where the document lives.
* @param documentId the _id of the document.
* @param document the remote document.
* @return a synthesized change event for a remote document.
*/
private CompactChangeEvent<BsonDocument> getSynthesizedRemoteChangeEventForDocument(
final MongoNamespace ns,
final BsonValue documentId,
final BsonDocument document
) {
// a. When the document is looked up, if it cannot be found the synthesized change event is a
// DELETE, otherwise it's a REPLACE.
if (document == null) {
return ChangeEvents.compactChangeEventForLocalDelete(
documentId,
false // when synthesizing remote events, writes shouldn't be pending
);
}
return ChangeEvents.compactChangeEventForLocalReplace(documentId, document, false);
}
private void handleWildcard(
final String firstPart, final String otherParts, final BsonDocument doc) {
Iterator<Map.Entry<String, BsonValue>> iter = doc.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, BsonValue> entry = iter.next();
BsonValue value = entry.getValue();
// NOTE: never try to remove the _id field
if (entry.getKey().equals(ID_FIELD)) {
continue;
}
if (firstPart.equals(FieldProjector.DOUBLE_WILDCARD)) {
iter.remove();
} else if (firstPart.equals(FieldProjector.SINGLE_WILDCARD)) {
if (!value.isDocument()) {
iter.remove();
} else if (!otherParts.isEmpty()) {
doProjection(otherParts, (BsonDocument) value);
}
}
}
}
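The wildcard handler above removes entries through the entrySet iterator, which is the safe way to drop fields from a BsonDocument while walking it. A minimal sketch of that removal pattern with invented field names:
import java.util.Iterator;
import java.util.Map;
import org.bson.BsonDocument;
import org.bson.BsonValue;

public class WildcardRemovalSketch {
    public static void main(String[] args) {
        BsonDocument doc = BsonDocument.parse("{_id: 1, a: 2, b: {c: 3}}");
        Iterator<Map.Entry<String, BsonValue>> iter = doc.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, BsonValue> entry = iter.next();
            // keep _id and sub-documents, drop every other top-level field
            if (!entry.getKey().equals("_id") && !entry.getValue().isDocument()) {
                iter.remove();
            }
        }
        System.out.println(doc.toJson()); // only _id and b remain
    }
}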
private void doRenaming(final String field, final BsonDocument doc) {
BsonDocument modifications = new BsonDocument();
Iterator<Map.Entry<String, BsonValue>> iter = doc.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, BsonValue> entry = iter.next();
String oldKey = entry.getKey();
BsonValue value = entry.getValue();
String newKey = renamed(field, oldKey);
if (!oldKey.equals(newKey)) {
// IF NEW KEY ALREADY EXISTS WE THEN DON'T RENAME
// AS IT WOULD CAUSE OTHER DATA TO BE SILENTLY OVERWRITTEN
// WHICH IS ALMOST NEVER WHAT YOU WANT
// MAYBE LOG WARNING HERE?
doc.computeIfAbsent(newKey, k -> modifications.putIfAbsent(k, value));
iter.remove();
}
if (value.isDocument()) {
String pathToField = field + SUB_FIELD_DOT_SEPARATOR + newKey;
doRenaming(pathToField, value.asDocument());
}
}
doc.putAll(modifications);
}
/**
* Generates a change event for a local update of a document in the given namespace referring
* to the given document _id.
*
* @param documentId the _id of the document that was updated.
* @param updateDescription the update specifier.
* @param documentVersion the version document for the document version after this update.
* @param documentHash the hash of this document (sanitized) after the update
* @param writePending whether this change event stems from a local write that has not yet been committed
* @return a change event for a local update of a document in the given namespace referring
* to the given document _id.
*/
static CompactChangeEvent<BsonDocument> compactChangeEventForLocalUpdate(
final BsonValue documentId,
final UpdateDescription updateDescription,
final BsonDocument documentVersion,
final Long documentHash,
final boolean writePending
) {
return new CompactChangeEvent<>(
OperationType.UPDATE,
null,
new BsonDocument("_id", documentId),
updateDescription,
(documentVersion == null)
? null
: DocumentVersionInfo.Version.fromBsonDocument(documentVersion),
documentHash,
writePending);
}
@Test
@DisplayName("test PartialKeyStrategy with Block List")
void testPartialKeyStrategyBlockList() {
BsonDocument keyDoc = BsonDocument.parse("{keyPart1: 123, keyPart2: 'ABC', keyPart3: true}");
BsonDocument expected = BsonDocument.parse("{keyPart2: 'ABC', keyPart3: true}");
MongoSinkTopicConfig cfg =
createTopicConfig(
format(
"{'%s': '%s', '%s': 'keyPart1'}",
DOCUMENT_ID_STRATEGY_PARTIAL_KEY_PROJECTION_TYPE_CONFIG,
BLOCKLIST,
DOCUMENT_ID_STRATEGY_PARTIAL_KEY_PROJECTION_LIST_CONFIG));
IdStrategy ids = new PartialKeyStrategy();
ids.configure(cfg);
SinkDocument sd = new SinkDocument(keyDoc, null);
BsonValue id = ids.generateId(sd, null);
assertAll(
"id checks",
() -> assertTrue(id instanceof BsonDocument),
() -> assertEquals(expected, id.asDocument()));
assertEquals(new BsonDocument(), ids.generateId(new SinkDocument(null, null), null));
}
boolean wrapForRecovery(final Callable<Boolean> callable) {
final List<BsonValue> idsAsList = new ArrayList<>();
idsAsList.addAll(ids);
final List<BsonDocument> oldDocs = localCollection.find(
new BsonDocument("_id", new BsonDocument("$in", new BsonArray(idsAsList)))
).into(new ArrayList<>());
if (oldDocs.size() > 0) {
undoCollection.insertMany(oldDocs);
}
boolean result;
try {
result = callable.call();
} catch (Exception e) {
result = false;
}
if (result && oldDocs.size() > 0) {
undoCollection.deleteMany(new Document("_id", new Document("$in", ids)));
}
return result;
}
private List<BsonDocument> processFiles(final BsonArray bsonArray, final List<BsonDocument> documents) {
for (BsonValue rawDocument : bsonArray.getValues()) {
if (rawDocument.isDocument()) {
BsonDocument document = rawDocument.asDocument();
if (document.get("length").isInt32()) {
document.put("length", new BsonInt64(document.getInt32("length").getValue()));
}
if (document.containsKey("metadata") && document.getDocument("metadata").isEmpty()) {
document.remove("metadata");
}
if (document.containsKey("aliases") && document.getArray("aliases").getValues().size() == 0) {
document.remove("aliases");
}
if (document.containsKey("contentType") && document.getString("contentType").getValue().length() == 0) {
document.remove("contentType");
}
documents.add(document);
}
}
return documents;
}
private static Uni<Void> persistOrUpdate(ReactiveMongoCollection collection, List<Object> entities) {
//this will be an ordered bulk: it's less performant than an unordered one, but it fails fast at the first failed write
List<WriteModel> bulk = new ArrayList<>();
for (Object entity : entities) {
//we first transform the entity into a document
BsonDocument document = getBsonDocument(collection, entity);
//then we read its id field and build a new document containing only that id, which becomes our replace query
BsonValue id = document.get(ID);
if (id == null) {
//insert with autogenerated ID
bulk.add(new InsertOneModel(entity));
} else {
//insert with user provided ID or update
BsonDocument query = new BsonDocument().append(ID, id);
bulk.add(new ReplaceOneModel(query, entity,
new ReplaceOptions().upsert(true)));
}
}
return collection.bulkWrite(bulk).onItem().ignore().andContinueWithNull();
}
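A rough synchronous-driver equivalent of the insert-or-replace decision above, assuming the entities are already BsonDocument instances; this is a sketch of the same BsonValue id check, not the Panache implementation:
import java.util.ArrayList;
import java.util.List;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.WriteModel;

public final class UpsertByIdSketch {
    // insert when "_id" is absent, otherwise replace (upsert) the document with that _id
    static void persistOrUpdate(MongoCollection<BsonDocument> collection, List<BsonDocument> entities) {
        List<WriteModel<BsonDocument>> bulk = new ArrayList<>();
        for (BsonDocument entity : entities) {
            BsonValue id = entity.get("_id");
            if (id == null) {
                bulk.add(new InsertOneModel<>(entity));
            } else {
                BsonDocument query = new BsonDocument("_id", id);
                bulk.add(new ReplaceOneModel<>(query, entity, new ReplaceOptions().upsert(true)));
            }
        }
        collection.bulkWrite(bulk); // ordered by default: stops at the first failed write
    }
}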
@Override
public BsonValue visit(Constant constant) {
Object value = constant.value();
if (value == null) {
return BsonNull.VALUE;
}
if (value instanceof Iterable) {
return Filters.in("ignore", (Iterable<?>) value)
.toBsonDocument(BsonDocument.class, codecRegistry)
.get("ignore").asDocument()
.get("$in").asArray();
}
return Filters.eq("ignore", value)
.toBsonDocument(BsonDocument.class, codecRegistry)
.get("ignore");
}
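The "ignore" field name is just a carrier: Filters encodes the Java value through the codec registry, and the visitor extracts the resulting BsonValue. A small standalone sketch of the same trick with the driver's default registry:
import java.util.Arrays;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.codecs.configuration.CodecRegistry;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.model.Filters;

public class ConstantEncodingSketch {
    public static void main(String[] args) {
        CodecRegistry registry = MongoClientSettings.getDefaultCodecRegistry();

        // scalar value -> encoded BsonValue (here a BsonInt32)
        BsonValue scalar = Filters.eq("ignore", 42)
                .toBsonDocument(BsonDocument.class, registry)
                .get("ignore");

        // iterable -> the encoded $in array, as in visit(Constant) above
        BsonValue array = Filters.in("ignore", Arrays.asList(1, 2, 3))
                .toBsonDocument(BsonDocument.class, registry)
                .get("ignore").asDocument()
                .get("$in").asArray();

        System.out.println(scalar.getBsonType() + " / " + array);
    }
}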
/**
* Serializes this change event into a {@link BsonDocument}.
* @return the serialized document.
*/
public BsonDocument toBsonDocument() {
final BsonDocument asDoc = new BsonDocument();
asDoc.put(Fields.OPERATION_TYPE_FIELD, new BsonString(getOperationType().toRemote()));
asDoc.put(Fields.DOCUMENT_KEY_FIELD, getDocumentKey());
if (getFullDocument() != null && (getFullDocument() instanceof BsonValue)
&& ((BsonValue) getFullDocument()).isDocument()) {
asDoc.put(Fields.FULL_DOCUMENT_FIELD, (BsonValue) getFullDocument());
}
if (getUpdateDescription() != null) {
asDoc.put(Fields.UPDATE_DESCRIPTION_FIELD, getUpdateDescription().toBsonDocument());
}
if (stitchDocumentVersion != null) {
asDoc.put(Fields.STITCH_DOCUMENT_VERSION_FIELD, stitchDocumentVersion.toBsonDocument());
}
if (stitchDocumentHash != null) {
asDoc.put(Fields.STITCH_DOCUMENT_HASH_FIELD, new BsonInt64(stitchDocumentHash));
}
asDoc.put(Fields.WRITE_PENDING_FIELD, new BsonBoolean(hasUncommittedWrites()));
return asDoc;
}
/**
* Unilaterally merge an update description into this update description.
* @param otherDescription the update description to merge into this
* @return this merged update description
*/
public UpdateDescription merge(@Nullable final UpdateDescription otherDescription) {
if (otherDescription != null) {
// use removeIf so entries are never removed from a collection while iterating over it
this.updatedFields.entrySet()
.removeIf(entry -> otherDescription.removedFields.contains(entry.getKey()));
this.removedFields.removeIf(otherDescription.updatedFields::containsKey);
this.removedFields.addAll(otherDescription.removedFields);
this.updatedFields.putAll(otherDescription.updatedFields);
}
return this;
}
public Stream<ChronoEdge> getChronoEdgeStream(String key, Object value) {
ElementHelper.validateProperty(null, key, value);
HashSet<ChronoEdge> ret = new HashSet<ChronoEdge>();
MongoCursor<BsonDocument> cursor = edges.find(Tokens.FLT_EDGE_FIELD_NOT_INCLUDED.append(key, (BsonValue) value))
.projection(Tokens.PRJ_ONLY_ID).iterator();
while (cursor.hasNext()) {
BsonDocument v = cursor.next();
ret.add(new ChronoEdge(v.getString(Tokens.ID).getValue(), this));
}
return ret.parallelStream();
}
/**
* Serializes this change event into a {@link BsonDocument}.
* @return the serialized document.
*/
@Override
public BsonDocument toBsonDocument() {
final BsonDocument asDoc = new BsonDocument();
asDoc.put(Fields.ID_FIELD, id);
asDoc.put(Fields.OPERATION_TYPE_FIELD, new BsonString(getOperationType().toRemote()));
final BsonDocument nsDoc = new BsonDocument();
nsDoc.put(Fields.NS_DB_FIELD, new BsonString(ns.getDatabaseName()));
nsDoc.put(Fields.NS_COLL_FIELD, new BsonString(getNamespace().getCollectionName()));
asDoc.put(Fields.NS_FIELD, nsDoc);
asDoc.put(Fields.DOCUMENT_KEY_FIELD, getDocumentKey());
if (getFullDocument() != null && (getFullDocument() instanceof BsonValue)
&& ((BsonValue) getFullDocument()).isDocument()) {
asDoc.put(Fields.FULL_DOCUMENT_FIELD, (BsonValue) getFullDocument());
}
if (getUpdateDescription() != null) {
asDoc.put(Fields.UPDATE_DESCRIPTION_FIELD, getUpdateDescription().toBsonDocument());
}
asDoc.put(Fields.WRITE_PENDING_FIELD, new BsonBoolean(hasUncommittedWrites()));
return asDoc;
}
static BsonDocument getQueryObject(String[] fieldArr, BsonArray paramArray) {
BsonArray orQueries = new BsonArray();
for (String field : fieldArr) {
Iterator<BsonValue> paramIterator = paramArray.iterator();
BsonArray pureStringParamArray = new BsonArray();
while (paramIterator.hasNext()) {
BsonValue param = paramIterator.next();
if (param instanceof BsonRegularExpression) {
BsonDocument regexQuery = new BsonDocument(field, new BsonDocument("$regex", param));
orQueries.add(regexQuery);
} else {
pureStringParamArray.add(param);
}
}
if (pureStringParamArray.size() != 0) {
BsonDocument stringInQueries = new BsonDocument(field, new BsonDocument("$in", pureStringParamArray));
orQueries.add(stringInQueries);
}
}
if (orQueries.size() != 0) {
BsonDocument queryObject = new BsonDocument();
queryObject.put("$or", orQueries);
return queryObject;
} else {
return null;
}
}
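A usage sketch for the query builder above, assuming it is called from the same class (and that the surrounding class already imports the org.bson types); the field names and EPC values are invented:
// hypothetical helper in the same class as getQueryObject above
static BsonDocument buildExampleQuery() {
    BsonArray params = new BsonArray();
    params.add(new BsonString("urn:epc:id:sgtin:4012345.011122.25"));
    params.add(new BsonRegularExpression("^urn:epc:id:sgtin:4012345\\..*"));
    // yields a {"$or": [...]} document with a $regex clause and an $in clause per field
    return getQueryObject(new String[] {"epcList.epc", "childEPCs.epc"}, params);
}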
@Override
public Task<Void> syncOne(final BsonValue id) {
return this.dispatcher.dispatchTask(new Callable<Void>() {
@Override
public Void call() throws Exception {
proxy.syncOne(id);
return null;
}
});
}
@Override
public Task<Void> desyncMany(final BsonValue... ids) {
return this.dispatcher.dispatchTask(new Callable<Void>() {
@Override
public Void call() throws Exception {
proxy.desyncMany(ids);
return null;
}
});
}
@Override
public BsonValue generateId(SinkDocument doc, SinkRecord orig) {
//NOTE: this has to operate on a clone because
//otherwise it would interfere with further projections
//happening later in the chain e.g. for value fields
SinkDocument clone = doc.clone();
fieldProjector.process(clone, orig);
//NOTE: If there is no value doc present the strategy
//simply returns an empty BSON document by default.
return clone.getValueDoc().orElseGet(() -> new BsonDocument());
}
private static JsonObject convertUpsertId(BsonValue upsertId) {
JsonObject jsonUpsertId;
if (upsertId != null) {
JsonObjectCodec jsonObjectCodec = new JsonObjectCodec(new JsonObject());
BsonDocument upsertIdDocument = new BsonDocument();
upsertIdDocument.append(ID_FIELD, upsertId);
BsonDocumentReader bsonDocumentReader = new BsonDocumentReader(upsertIdDocument);
jsonUpsertId = jsonObjectCodec.decode(bsonDocumentReader, DecoderContext.builder().build());
} else {
jsonUpsertId = null;
}
return jsonUpsertId;
}
private static Set<String> addEPCtoSet(Set<String> epcSet, BsonArray epcList) {
Iterator<BsonValue> epcIterator = epcList.iterator();
while (epcIterator.hasNext()) {
BsonDocument epcDocument = epcIterator.next().asDocument();
epcSet.add(epcDocument.getString("epc").getValue());
}
return epcSet;
}
private static Document assembleDocument(final CharSequence key, final BsonValue value, final BsonArray grants,
final BsonArray revokes) {
return new Document().append(FIELD_INTERNAL_KEY, key.toString())
.append(FIELD_INTERNAL_VALUE, value)
.append(FIELD_GRANTED, grants)
.append(FIELD_REVOKED, revokes);
}
@Override
public Publisher<BsonBinary> encrypt(final BsonValue value, final EncryptOptions options) {
return new ObservableToPublisher<BsonBinary>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<BsonBinary>>(){
@Override
public void apply(final com.mongodb.async.SingleResultCallback<BsonBinary> callback) {
wrapped.encrypt(value, options, callback);
}
}));
}
public RemoteInsertManyResult decode(
final BsonReader reader,
final DecoderContext decoderContext
) {
final BsonDocument document = (new BsonDocumentCodec()).decode(reader, decoderContext);
keyPresent(Fields.INSERTED_IDS_FIELD, document);
final BsonArray arr = document.getArray(Fields.INSERTED_IDS_FIELD);
final Map<Long, BsonValue> insertedIds = new HashMap<>();
for (int i = 0; i < arr.size(); i++) {
insertedIds.put((long) i, arr.get(i));
}
return new RemoteInsertManyResult(insertedIds);
}
public BsonValue getDocumentId() {
docLock.readLock().lock();
try {
return documentId;
} finally {
docLock.readLock().unlock();
}
}
@Override
public Publisher<Success> uploadFromStream(final ClientSession clientSession, final BsonValue id, final String filename,
final com.mongodb.reactivestreams.client.gridfs.AsyncInputStream source,
final GridFSUploadOptions options) {
return new ObservableToPublisher<Success>(com.mongodb.async.client.Observables.observe(
new Block<com.mongodb.async.SingleResultCallback<Success>>() {
@Override
public void apply(final com.mongodb.async.SingleResultCallback<Success> callback) {
wrapped.uploadFromStream(clientSession.getWrapped(), id, filename, toCallbackAsyncInputStream(source), options,
voidToSuccessCallback(callback));
}
}));
}
private Map<String, BsonValue> getExtensionMap(List<ECReportMemberField> fields) {
Map<String, BsonValue> extMap = new HashMap<String, BsonValue>();
for (int l = 0; l < fields.size(); l++) {
ECReportMemberField field = fields.get(l);
String key = field.getName();
String value = field.getValue();
String[] valArr = value.split("\\^");
if (valArr.length != 2) {
extMap.put(key, new BsonString(value));
continue;
}
try {
String type = valArr[1];
if (type.equals("int")) {
extMap.put(key, new BsonInt32(Integer.parseInt(valArr[0])));
} else if (type.equals("long")) {
extMap.put(key, new BsonInt64(Long.parseLong(valArr[0])));
} else if (type.equals("double")) {
extMap.put(key, new BsonDouble(Double.parseDouble(valArr[0])));
} else if (type.equals("boolean")) {
extMap.put(key, new BsonBoolean(Boolean.parseBoolean(valArr[0])));
} else if (type.equals("dateTime")) {
extMap.put(key, new BsonDateTime(Long.parseLong(valArr[0])));
} else {
extMap.put(key, new BsonString(valArr[0]));
}
} catch (NumberFormatException e) {
extMap.put(key, new BsonString(valArr[0]));
}
}
return extMap;
}
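The method above relies on an implicit "<literal>^<type>" convention in the field value. A short sketch showing, with invented sample values, which BsonValue each suffix maps to:
import org.bson.BsonInt32;
import org.bson.BsonValue;

public class TypedExtensionValueSketch {
    public static void main(String[] args) {
        // "15^int"                 -> new BsonInt32(15)
        // "15^long"                -> new BsonInt64(15L)
        // "20.5^double"            -> new BsonDouble(20.5)
        // "true^boolean"           -> BsonBoolean.TRUE
        // "1524041000000^dateTime" -> new BsonDateTime(1524041000000L)
        // no (or unknown) suffix   -> BsonString of the raw literal
        String raw = "15^int";
        String[] valArr = raw.split("\\^");
        BsonValue parsed = new BsonInt32(Integer.parseInt(valArr[0]));
        System.out.println(parsed);
    }
}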
@Override
public Observable<Success> rename(final BsonValue id, final String newFilename) {
return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() {
@Override
public void apply(final SingleResultCallback<Success> callback) {
wrapped.rename(id, newFilename, voidToSuccessCallback(callback));
}
}), observableAdapter);
}