下面列出了com.fasterxml.jackson.core.util.BufferRecycler#org.bson.BsonDocument 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Inserts an entity carrying a {@link Date}, reads it back through the repository and then
 * inspects the raw BSON document stored in the collection.
 */
@Test
public void withDate() {
    final long millis = new Date().getTime();
    final ObjectId objectId = ObjectId.get();

    final Jackson expected = ImmutableJackson.builder()
            .id(objectId)
            .prop1("prop1")
            .prop2("22")
            .date(new Date(millis))
            .build();

    repository.insert(expected).getUnchecked();
    check(collection.count()).is(1L);

    // The entity read back through the repository must match the inserted one.
    final Jackson actual = repository.findAll().fetchAll().getUnchecked().get(0);
    check(expected).is(actual);

    // Look at the raw stored document: field names, date value and object id.
    final BsonDocument stored = collection.find().first();
    check(stored.keySet()).hasContentInAnyOrder("_id", "prop1", "prop2", "date", "uuid");
    check(stored.get("date").asDateTime().getValue()).is(millis);
    check(stored.get("_id").asObjectId().getValue()).is(objectId);
}
/**
 * Queries {@code collection} for documents whose {@code version} field equals the given
 * version and deserializes each of them from its JSON form into {@code clz}.
 *
 * <p>When a document cannot be deserialized the error is logged and the mapping function
 * yields {@code null} for that document instead of aborting the whole query.
 *
 * @param collection the collection to query
 * @param clz the target type for deserialization
 * @param version the version value to filter on
 * @param <T> element type of the returned list
 * @return the mapped query results
 */
private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz, String version) {
    BsonDocument versionFilter = new BsonDocument("version", new BsonString(version));
    List<T> items = new LinkedList<T>();
    collection.find(versionFilter).map((Document raw) -> {
        String json = raw.toJson();
        try {
            return mapper.readValue(json, clz);
        } catch (IOException e) {
            LOG.error("deserialize config item failed!", e);
            return null;
        }
    }).into(items);
    return items;
}
/**
 * Converts a BSON array of quantity documents into {@code QuantityElementType} objects.
 *
 * <p>Each array element is read as a document. Elements without an {@code epcClass} field
 * are skipped; {@code quantity} and {@code uom} are copied only when present.
 *
 * @param quantityDBList the stored quantity documents
 * @return the converted elements, in array order
 */
static List<QuantityElementType> putQuantityElementTypeList(BsonArray quantityDBList) {
    List<QuantityElementType> converted = new ArrayList<QuantityElementType>();
    for (BsonValue entry : quantityDBList) {
        BsonDocument entryDoc = entry.asDocument();
        BsonValue epcClass = entryDoc.get("epcClass");
        if (epcClass == null) {
            // Without an epcClass the entry is not emitted at all.
            continue;
        }

        QuantityElementType element = new QuantityElementType();
        element.setEpcClass(epcClass.asString().getValue());

        BsonValue quantity = entryDoc.get("quantity");
        if (quantity != null) {
            element.setQuantity(BigDecimal.valueOf(quantity.asDouble().getValue()));
        }

        BsonValue uom = entryDoc.get("uom");
        if (uom != null) {
            element.setUom(uom.asString().getValue());
        }

        converted.add(element);
    }
    return converted;
}
/**
 * Dispatches a GridFS test action to the matching {@code do*} helper.
 *
 * <p>An empty action document is a no-op. Otherwise the {@code operation} field selects the
 * helper, which receives the action's {@code arguments} document and the assertion.
 *
 * @param action the action document (may be empty)
 * @param assertion the assertion document handed to the selected helper
 * @throws IllegalArgumentException if the operation name is not one of the supported ones
 * @throws Throwable whatever the selected helper throws
 */
private void actionGridFS(final BsonDocument action, final BsonDocument assertion) throws Throwable {
    if (action.isEmpty()) {
        return;
    }

    final String operation = action.getString("operation").getValue();

    // "arguments" is looked up only once an operation matched, so an unknown operation is
    // always reported as IllegalArgumentException.
    if ("delete".equals(operation)) {
        doDelete(action.getDocument("arguments"), assertion);
        return;
    }
    if ("download".equals(operation)) {
        doDownload(action.getDocument("arguments"), assertion);
        return;
    }
    if ("download_by_name".equals(operation)) {
        doDownloadByName(action.getDocument("arguments"), assertion);
        return;
    }
    if ("upload".equals(operation)) {
        doUpload(action.getDocument("arguments"), assertion);
        return;
    }
    throw new IllegalArgumentException("Unknown operation: " + operation);
}
/**
 * Feeds an update ("u") event carrying a JSON patch to {@code UPDATE} and verifies that the
 * result is an {@code UpdateOneModel} with the expected update and filter documents.
 */
@Test
@DisplayName("when valid doc change cdc event then correct UpdateOneModel")
void testValidSinkDocumentForUpdate() {
    BsonDocument key = BsonDocument.parse("{id: '1234'}");
    BsonDocument value = new BsonDocument("op", new BsonString("u"));
    value.append("patch", new BsonString(UPDATE_DOC.toJson()));
    SinkDocument sinkDocument = new SinkDocument(key, value);

    WriteModel<BsonDocument> result = UPDATE.perform(sinkDocument);
    assertTrue(result instanceof UpdateOneModel, "result expected to be of type UpdateOneModel");

    UpdateOneModel<BsonDocument> updateModel = (UpdateOneModel<BsonDocument>) result;
    assertEquals(UPDATE_DOC, updateModel.getUpdate(), "update doc not matching what is expected");
    assertTrue(
            updateModel.getFilter() instanceof BsonDocument,
            "filter expected to be of type BsonDocument");
    assertEquals(FILTER_DOC, updateModel.getFilter());
}
/**
 * Turns a CDC sink document into a write model.
 *
 * <p>A document whose key contains {@code JSON_ID_FIELD} and whose value is missing or empty
 * is treated as a tombstone and skipped. Every other document is delegated to the CDC
 * operation selected from its value document.
 *
 * @param doc the sink document to handle
 * @return the resulting write model, or an empty optional for a tombstone
 * @throws DataException if the key document is missing
 */
@Override
public Optional<WriteModel<BsonDocument>> handle(final SinkDocument doc) {
    final BsonDocument keyDoc = doc.getKeyDoc().orElseThrow(
            () -> new DataException("Error: key document must not be missing for CDC mode"));
    // A missing value document is handled like an empty one.
    final BsonDocument valueDoc = doc.getValueDoc().orElseGet(BsonDocument::new);

    final boolean tombstone = keyDoc.containsKey(JSON_ID_FIELD) && valueDoc.isEmpty();
    if (tombstone) {
        LOGGER.debug("skipping debezium tombstone event for kafka topic compaction");
        return Optional.empty();
    }

    LOGGER.debug("key: " + keyDoc.toString());
    LOGGER.debug("value: " + valueDoc.toString());

    final WriteModel<BsonDocument> writeModel = getCdcOperation(valueDoc).perform(doc);
    return Optional.of(writeModel);
}
/**
 * Stores the given quantity elements under the {@code quantityList} key of {@code base}.
 *
 * <p>Every element becomes a document with an {@code epcClass} string plus, when they are
 * non-null, a {@code quantity} double and a {@code uom} string.
 *
 * @param base the document to add the list to (modified in place)
 * @param quantityList the quantity elements to convert
 * @return {@code base}, to allow chaining
 */
public BsonDocument putQuantityList(BsonDocument base, List<QuantityElement> quantityList) {
    BsonArray entries = new BsonArray();
    for (QuantityElement source : quantityList) {
        BsonDocument entry = new BsonDocument();
        entry.put("epcClass", new BsonString(source.getEpcClass()));
        // quantity and uom are optional and only written when set.
        if (source.getQuantity() != null) {
            entry.put("quantity", new BsonDouble(source.getQuantity()));
        }
        if (source.getUom() != null) {
            entry.put("uom", new BsonString(source.getUom()));
        }
        entries.add(entry);
    }
    base.put("quantityList", entries);
    return base;
}
/**
 * Creates a change event.
 *
 * <p>The operation type, full document, document key, update description and the
 * uncommitted-writes flag are passed on to the superclass constructor; the event id and the
 * namespace are stored on this instance.
 *
 * @param id the id of the change event
 * @param operationType the operation type represented by the change event
 * @param fullDocument the full document at some point after the change is applied
 * @param ns the namespace (database and collection) of the document
 * @param documentKey the id of the underlying document that changed
 * @param updateDescription the description of what has changed (for updates only)
 * @param hasUncommittedWrites whether this represents a local uncommitted write
 */
public ChangeEvent(
    final BsonDocument id,
    final OperationType operationType,
    final DocumentT fullDocument,
    final MongoNamespace ns,
    final BsonDocument documentKey,
    final UpdateDescription updateDescription,
    final boolean hasUncommittedWrites
) {
    super(operationType, fullDocument, documentKey, updateDescription, hasUncommittedWrites);
    this.ns = ns;
    this.id = id;
}
/**
 * Loads the first schedule state when sorting by {@code generateTime} in descending order,
 * then completes it via {@code addDetailForScheduleState} using the state's version
 * (sub-parts such as spoutSpecs/alertSpecs).
 *
 * <p>If the stored document cannot be deserialized the error is logged and the state is
 * treated as absent.
 *
 * @return the completed latest ScheduleState, or {@code null} when none is available
 */
@Override
public ScheduleState getScheduleState() {
    BsonDocument newestFirst = new BsonDocument("generateTime", new BsonInt32(-1));
    ScheduleState latest = scheduleStates.find().sort(newestFirst).map((Document raw) -> {
        String json = raw.toJson();
        try {
            return mapper.readValue(json, ScheduleState.class);
        } catch (IOException e) {
            LOG.error("deserialize config item failed!", e);
            return null;
        }
    }).first();

    if (latest == null) {
        return null;
    }
    // Based on the version, add content from the collections of spoutSpecs/alertSpecs/etc.
    return addDetailForScheduleState(latest, latest.getVersion());
}
/**
 * Returns the {@code ID_FIELD} value already present in the key or value document, as
 * selected by {@code where}.
 *
 * @param doc the sink document to read the id from
 * @param orig the original sink record (not used by this strategy)
 * @return the id found in the selected document
 * @throws DataException if the selected document is absent, has no id field, or the id is a
 *     {@code BsonNull}
 */
@Override
public BsonValue generateId(final SinkDocument doc, final SinkRecord orig) {
    // VALUE is tested first so that, as before, it wins should both comparisons hold.
    final Optional<BsonDocument> source;
    if (where.equals(ProvidedIn.VALUE)) {
        source = doc.getValueDoc();
    } else if (where.equals(ProvidedIn.KEY)) {
        source = doc.getKeyDoc();
    } else {
        source = Optional.empty();
    }

    // Optional.map drops a null lookup result, so a missing field ends up in orElseThrow too.
    final BsonValue id = source
            .map(d -> d.get(ID_FIELD))
            .orElseThrow(() -> new DataException(
                    "Error: provided id strategy is used but the document structure either contained"
                        + " no _id field or it was null"));

    if (id instanceof BsonNull) {
        throw new DataException(
                "Error: provided id strategy used but the document structure contained an _id of type BsonNull");
    }
    return id;
}
/**
 * Collects the distinct {@code Tokens.TIMESTAMP} values of both the edge-event and the
 * vertex-event collections that match the range filter built from the two bounds
 * ({@code Tokens.FC.$gt} for {@code startTime}, {@code Tokens.FC.$lt} for {@code endTime}).
 *
 * @param startTime lower bound of the range; must not be {@code null} (it is unboxed)
 * @param endTime upper bound of the range; must not be {@code null} (it is unboxed)
 * @return the matching timestamps from both collections, sorted and de-duplicated
 */
public TreeSet<Long> getTimestamps(Long startTime, Long endTime) {
    TreeSet<Long> timestampSet = new TreeSet<Long>();
    Function<BsonDateTime, Long> mapper = new Function<BsonDateTime, Long>() {
        @Override
        public Long apply(BsonDateTime val) {
            return val.getValue();
        }
    };

    // The same range filter applies to both collections, so build it once.
    BsonDocument range = new BsonDocument(Tokens.TIMESTAMP,
            new BsonDocument(Tokens.FC.$gt.toString(), new BsonDateTime(startTime))
                    .append(Tokens.FC.$lt.toString(), new BsonDateTime(endTime)));

    // Both queries write straight into the result set. The former intermediate vertex set was
    // never filled, so merging it afterwards was a no-op and has been removed.
    edgeEvents.distinct(Tokens.TIMESTAMP, BsonDateTime.class)
            .filter(range)
            .map(mapper).into(timestampSet);
    vertexEvents.distinct(Tokens.TIMESTAMP, BsonDateTime.class)
            .filter(range)
            .map(mapper).into(timestampSet);
    return timestampSet;
}
/**
 * Renders this update description as an update document.
 *
 * <p>{@code $set} holds the updated fields and is present only when there are any;
 * {@code $unset} maps every removed field to {@code true} and is present only when at least
 * one field was removed.
 *
 * @return an update document with the appropriate $set and $unset documents.
 */
public BsonDocument toUpdateDocument() {
    final BsonDocument result = new BsonDocument();

    if (!this.updatedFields.isEmpty()) {
        result.append("$set", this.updatedFields);
    }

    final BsonDocument unsetDoc = new BsonDocument();
    for (final String removedField : this.removedFields) {
        unsetDoc.put(removedField, new BsonBoolean(true));
    }
    if (!unsetDoc.isEmpty()) {
        result.append("$unset", unsetDoc);
    }

    return result;
}
/**
 * Builds a {@code DeleteOneModel} for the given sink document.
 *
 * <p>With an id strategy configured, the filter is {@code {_id field: generated id}}.
 * Without one (legacy fallback), the key document is used as the filter if it already has
 * the id field, otherwise the whole key document becomes the id value.
 *
 * @param document the sink document describing the deletion
 * @return the delete model
 * @throws DataException if the key document is missing
 */
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {
    final BsonDocument keyDoc = document.getKeyDoc().orElseThrow(
            () -> new DataException("error: cannot build the WriteModel since"
                    + " the key document was missing unexpectedly"));

    if (idStrategy != null) {
        //NOTE: current design doesn't allow to access original SinkRecord (= null)
        final BsonValue generatedId = idStrategy.generateId(document, null);
        return new DeleteOneModel<>(new BsonDocument(DBCollection.ID_FIELD_NAME, generatedId));
    }

    //NOTE: fallback for backwards / deprecation compatibility
    final BsonDocument filter = keyDoc.containsKey(DBCollection.ID_FIELD_NAME)
            ? keyDoc
            : new BsonDocument(DBCollection.ID_FIELD_NAME, keyDoc);
    return new DeleteOneModel<>(filter);
}
/**
 * Normalizes the document entries of {@code bsonArray} and appends them to {@code documents}.
 *
 * <p>Non-document array values are ignored. For each document:
 * <ul>
 *   <li>an int32 {@code length} is widened to int64;</li>
 *   <li>an empty {@code metadata} document, an empty {@code aliases} array and an empty
 *       {@code contentType} string are removed.</li>
 * </ul>
 * Documents are modified in place. A document without a {@code length} field is accepted and
 * left without one, consistent with how the other optional fields are treated.
 *
 * @param bsonArray the raw file documents
 * @param documents the list the normalized documents are appended to
 * @return {@code documents}
 */
private List<BsonDocument> processFiles(final BsonArray bsonArray, final List<BsonDocument> documents) {
    for (BsonValue rawDocument : bsonArray.getValues()) {
        if (!rawDocument.isDocument()) {
            continue;
        }
        BsonDocument document = rawDocument.asDocument();

        // Guard the lookup: get("length") returns null for a missing key.
        if (document.containsKey("length") && document.get("length").isInt32()) {
            document.put("length", new BsonInt64(document.getInt32("length").getValue()));
        }
        if (document.containsKey("metadata") && document.getDocument("metadata").isEmpty()) {
            document.remove("metadata");
        }
        if (document.containsKey("aliases") && document.getArray("aliases").getValues().isEmpty()) {
            document.remove("aliases");
        }
        if (document.containsKey("contentType") && document.getString("contentType").getValue().isEmpty()) {
            document.remove("contentType");
        }
        documents.add(document);
    }
    return documents;
}
/**
 * Prepares the shared fixtures: one record with a single string field expressed as a JSON
 * string, a schema-backed struct and a map, the BSON document expected for it, and the
 * value-to-schema combinations (schema is {@code null} for the JSON string and the map).
 */
@BeforeAll
public static void initializeTestData() {
    final String fieldName = "myField";
    final String fieldValue = "some text";

    EXPECTED_BSON_DOC = new BsonDocument(fieldName, new BsonString(fieldValue));

    JSON_STRING_1 = "{\"myField\":\"some text\"}";

    OBJ_SCHEMA_1 = SchemaBuilder.struct().field(fieldName, Schema.STRING_SCHEMA);
    OBJ_STRUCT_1 = new Struct(OBJ_SCHEMA_1).put(fieldName, fieldValue);

    OBJ_MAP_1 = new LinkedHashMap<>();
    OBJ_MAP_1.put(fieldName, fieldValue);

    // Each value is paired with the schema it is converted with.
    combinations = new HashMap<>();
    combinations.put(JSON_STRING_1, null);
    combinations.put(OBJ_STRUCT_1, OBJ_SCHEMA_1);
    combinations.put(OBJ_MAP_1, null);
}
/**
 * Adds the {@code epcClass} string of every document in {@code epcList} to {@code epcSet}.
 *
 * @param epcSet the set to add to (modified in place)
 * @param epcList array whose elements are documents carrying an {@code epcClass} string
 * @return {@code epcSet}
 */
private static Set<String> addEPCClasstoSet(Set<String> epcSet, BsonArray epcList) {
    for (BsonValue entry : epcList) {
        String epcClass = entry.asDocument().getString("epcClass").getValue();
        epcSet.add(epcClass);
    }
    return epcSet;
}
/**
 * Creates a transformation event for the given event time and time-zone offset (both passed
 * to the superclass) with all lists, maps and BSON documents initialized empty.
 *
 * @param eventTime the event time
 * @param eventTimeZoneOffset the time-zone offset of the event time
 */
public TransformationEvent(long eventTime, String eventTimeZoneOffset) {
    super(eventTime, eventTimeZoneOffset);

    // Inputs and outputs of the transformation.
    this.inputEPCList = new ArrayList<String>();
    this.outputEPCList = new ArrayList<String>();
    this.inputQuantityList = new ArrayList<QuantityElement>();
    this.outputQuantityList = new ArrayList<QuantityElement>();

    // Keyed multi-value attributes.
    this.bizTransactionList = new HashMap<String, List<String>>();
    this.sourceList = new HashMap<String, List<String>>();
    this.destinationList = new HashMap<String, List<String>>();

    // Namespaces and free-form BSON payloads.
    this.namespaces = new HashMap<String, String>();
    this.ilmds = new BsonDocument();
    this.extensions = new BsonDocument();
}
/**
 * Creates an insert-many operation.
 *
 * @param namespace the namespace stored for this operation
 * @param documents the documents stored for this operation (the list is kept by reference,
 *     not copied)
 */
InsertManyOperation(final MongoNamespace namespace, final List<BsonDocument> documents) {
    this.documents = documents;
    this.namespace = namespace;
}
/**
 * Builds the change event describing a local deletion of the document with the given _id in
 * the given namespace.
 *
 * <p>The event has an empty id document, no full document and no update description.
 *
 * @param namespace the namespace where the document was deleted.
 * @param documentId the _id of the document that was deleted.
 * @param writePending passed to the event as its uncommitted-writes flag.
 * @return a DELETE change event whose document key is {@code {_id: documentId}}.
 */
static ChangeEvent<BsonDocument> changeEventForLocalDelete(
    final MongoNamespace namespace,
    final BsonValue documentId,
    final boolean writePending
) {
    final BsonDocument eventId = new BsonDocument();
    final BsonDocument documentKey = new BsonDocument("_id", documentId);
    return new ChangeEvent<>(
        eventId, OperationType.DELETE, null, namespace, documentKey, null, writePending);
}
/**
 * Stores the business transactions under the {@code bizTransactionList} key of {@code base}.
 *
 * <p>The map is flattened: every (key, list element) pair becomes its own single-field
 * document {@code {key: element}} in the resulting array.
 *
 * @param base the document to add the list to (modified in place)
 * @param bizTransactionList map from key to the values recorded for it
 * @return {@code base}, to allow chaining
 */
public BsonDocument putBizTransactionList(BsonDocument base, Map<String, List<String>> bizTransactionList) {
    BsonArray flattened = new BsonArray();
    for (Map.Entry<String, List<String>> entry : bizTransactionList.entrySet()) {
        String type = entry.getKey();
        for (String value : entry.getValue()) {
            flattened.add(new BsonDocument(type, new BsonString(value)));
        }
    }
    base.put("bizTransactionList", flattened);
    return base;
}
/**
 * A delete ("d") event with an empty key document and an empty {@code before} document must
 * be rejected with a {@code DataException}.
 */
@Test
@DisplayName("when key doc and value 'before' field both empty then DataException")
public void testEmptyKeyDocAndEmptyValueBeforeField() {
    final BsonDocument emptyKey = new BsonDocument();
    final BsonDocument deleteValue = new BsonDocument("op", new BsonString("d"))
            .append("before", new BsonDocument());

    // The SinkDocument is still created inside the lambda, i.e. under assertThrows.
    assertThrows(DataException.class,
            () -> RDBMS_DELETE.perform(new SinkDocument(emptyKey, deleteValue)));
}
/**
 * Supplies one {@code {type name, sample value}} pair per BSON type under test.
 *
 * <p>Before returning, verifies that every {@code org.bson.BsonType} constant is either
 * listed in {@code MongoBsonTranslator.deprecatedTypes} or has a row whose name equals the
 * constant's name, so that a newly added type cannot go untested unnoticed.
 *
 * @return the parameter rows
 */
@Parameterized.Parameters(name = "{index}: Collection {0}")
public static Collection<Object[]> data() {
    Collection<Object[]> allTests = Arrays.asList(new Object[][]{
        {"ARRAY", new org.bson.BsonArray()},
        {"BINARY", new org.bson.BsonBinary(new BigInteger("123456789123abcdef456789", 16).toByteArray())},
        {"BOOLEAN", org.bson.BsonBoolean.TRUE},
        {"DATE_TIME", new org.bson.BsonDateTime(1482000000)},
        {"DB_POINTER", new org.bson.BsonDbPointer("namespace", new org.bson.types.ObjectId("123456789123abcdef456789"))},
        {"DECIMAL128", new org.bson.BsonDecimal128(new org.bson.types.Decimal128(18283340L))},
        {"DOCUMENT", new org.bson.BsonDocument("key", new org.bson.BsonInt32(456))},
        {"DOUBLE", new org.bson.BsonDouble(2.3)},
        {"INT32", new org.bson.BsonInt32(2)},
        {"INT64", new org.bson.BsonInt64(2L)},
        {"JAVASCRIPT", new org.bson.BsonJavaScript("alert(\"hello\");")},
        {"JAVASCRIPT_WITH_SCOPE", new org.bson.BsonJavaScriptWithScope("alert(\"hello\");", new org.bson.BsonDocument("key", new org.bson.BsonInt32(456)))},
        {"MAX_KEY", new org.bson.BsonMaxKey()},
        {"MIN_KEY", new org.bson.BsonMinKey()},
        {"NULL", new org.bson.BsonNull()},
        {"OBJECT_ID", new org.bson.BsonObjectId(new org.bson.types.ObjectId("123456789123abcdef456789"))},
        {"REGULAR_EXPRESSION", new org.bson.BsonRegularExpression("hello", "is")},
        {"STRING", new org.bson.BsonString("hello")},
        {"SYMBOL", new org.bson.BsonSymbol("hello")},
        {"TIMESTAMP", new org.bson.BsonTimestamp()},
        {"UNDEFINED", new org.bson.BsonUndefined()}
    });

    // Completeness check: every BsonType must be deprecated or have a row above.
    for (org.bson.BsonType type : org.bson.BsonType.values()) {
        boolean covered = MongoBsonTranslator.deprecatedTypes.contains(type)
                || allTests.stream().anyMatch(row -> type.name().equals(row[0]));
        assertTrue(type + " type is never tested", covered);
    }
    return allTests;
}
/**
 * Converts an error declaration into its BSON form.
 *
 * <p>{@code declarationTime} is always written (epoch milliseconds as a BSON date-time).
 * {@code reason} is written when non-null, {@code correctiveEventIDs} when at least one id
 * is present, and {@code any} when {@code getAnyMap} yields a non-empty document.
 *
 * @param edt the error declaration to convert
 * @return the BSON document describing the declaration
 */
static BsonDocument getErrorDeclaration(ErrorDeclarationType edt) {
    BsonDocument declaration = new BsonDocument();

    long declaredAtMillis = edt.getDeclarationTime().toGregorianCalendar().getTimeInMillis();
    declaration.put("declarationTime", new BsonDateTime(declaredAtMillis));

    // (Optional) reason
    if (edt.getReason() != null) {
        declaration.put("reason", new BsonString(edt.getReason()));
    }

    // (Optional) correctiveEventIDs - omitted when the id list is empty
    if (edt.getCorrectiveEventIDs() != null) {
        CorrectiveEventIDsType correctiveIds = edt.getCorrectiveEventIDs();
        BsonArray correctiveIdArray = new BsonArray();
        for (String correctiveId : correctiveIds.getCorrectiveEventID()) {
            correctiveIdArray.add(new BsonString(correctiveId));
        }
        if (!correctiveIdArray.isEmpty()) {
            declaration.put("correctiveEventIDs", correctiveIdArray);
        }
    }

    // (Optional) extension content - omitted when nothing was converted
    if (edt.getAny() != null) {
        BsonDocument anyContent = getAnyMap(edt.getAny());
        if (anyContent != null && !anyContent.isEmpty()) {
            declaration.put("any", anyContent);
        }
    }

    return declaration;
}
/**
 * Builds a sink document fixture whose key and value both contain nested sub-documents; the
 * value additionally nests a binary field and an array of key/value documents.
 *
 * @return the fixture parsed from the JSON below
 */
private static SinkDocument buildSinkDocumentNestedStruct() {
    final String keyJson =
        "{ _id: 'ABC-123', myInt: 42, "
            + "subDoc1: { myString: 'BSON1', myBoolean: false }, subDoc2: { myString: 'BSON2', myBoolean: true } }";
    final String valueJson =
        "{ _id: 'XYZ-789', myBoolean: true, "
            + "subDoc1: { myFieldA: 'some text', myFieldB: 12.34, subSubDoc: { myString: 'some text', myInt: 0, myBoolean: false } }, "
            + "subDoc2: { myFieldA: 'some text', myFieldB: 12.34, "
            + " subSubDoc: { myBytes: { $binary: 'eHl6', $type: '00' }, "
            + " myArray: [{ key: 'abc', value: 123 }, { key: 'xyz', value: 987 }] } } }";

    final BsonDocument key = BsonDocument.parse(keyJson);
    final BsonDocument value = BsonDocument.parse(valueJson);
    return new SinkDocument(key, value);
}
/**
 * Creates a {@code BsonParser} over the binary encoding of the given document.
 *
 * <p>The document is encoded into an in-memory buffer with the codec obtained from the
 * default codec registry, and the resulting bytes are wrapped in a binary reader that is
 * handed to the parser.
 *
 * @param bson the document to parse
 * @return a parser reading the encoded document
 */
static BsonParser createParser(BsonDocument bson) {
    // Encode the document to bytes.
    BasicOutputBuffer encoded = new BasicOutputBuffer();
    BsonBinaryWriter writer = new BsonBinaryWriter(encoded);
    EncoderContext encoderContext = EncoderContext.builder().build();
    CodecRegistry codecs = MongoClientSettings.getDefaultCodecRegistry();
    codecs.get(BsonDocument.class).encode(writer, bson, encoderContext);

    // Hand the bytes to a parser through a binary reader.
    ByteBuffer bytes = ByteBuffer.wrap(encoded.toByteArray());
    BsonBinaryReader reader = new BsonBinaryReader(bytes);
    IOContext ioContext = new IOContext(new BufferRecycler(), null, false);
    return new BsonParser(ioContext, 0, reader);
}
/**
 * A deserialiser configured to read one integer key must expose the document's int32 value
 * under that key in the resulting record.
 */
@Test
public void deserialise_integer() {
    final int expected = random.nextInt();
    final BsonDocument source = new BsonDocument("my-value", new BsonInt32(expected));

    final Key<Integer> integerKey = Key.named("my-value");
    final Record record = BsonRecordDeserialiser.builder()
            .readInteger(integerKey)
            .get()
            .apply(source);

    assertEquals("wrong int value", expected, (int) integerKey.get(record).get());
}
/**
 * Returns a copy of the given document with its version field set to the new version; the
 * input document itself is not written to by this method.
 *
 * @param document the document to attach a new version to.
 * @param newVersion the version to attach to the document
 * @return a copy of {@code document} carrying {@code newVersion} under
 *     {@code DOCUMENT_VERSION_FIELD}.
 */
private static BsonDocument withNewVersion(
    final BsonDocument document,
    final BsonDocument newVersion
) {
    final BsonDocument versioned = BsonUtils.copyOfDocument(document);
    versioned.put(DOCUMENT_VERSION_FIELD, newVersion);
    return versioned;
}
/**
 * Builds the delete model for a CDC delete event.
 *
 * <p>The key document's {@code JSON_ID_FIELD} string is embedded unquoted into
 * {@code {ID_FIELD: ...}} and the result is parsed as the delete filter, so the string is
 * interpreted as JSON rather than as a plain string value.
 *
 * @param doc the sink document describing the deletion
 * @return a {@code DeleteOneModel} filtering on the parsed id
 * @throws DataException if the key document is missing, or wrapping any failure while
 *     reading or parsing the id
 */
@Override
public WriteModel<BsonDocument> perform(final SinkDocument doc) {
    final BsonDocument keyDoc = doc.getKeyDoc().orElseThrow(
            () -> new DataException("Error: key doc must not be missing for delete operation"));
    try {
        final String idJson = keyDoc.getString(JSON_ID_FIELD).getValue();
        final BsonDocument filter = BsonDocument.parse(format("{%s: %s}", ID_FIELD, idJson));
        return new DeleteOneModel<>(filter);
    } catch (Exception exc) {
        throw new DataException(exc);
    }
}
/**
 * Returns the updated fields of a change in the requested representation.
 *
 * <ul>
 *   <li>Without BSON conversion the {@code updatedFields} document is returned as is.</li>
 *   <li>With conversion but no full document, {@code updatedFields} is converted via
 *       {@code MongoTypeUtil.convertBson}.</li>
 *   <li>With conversion and a full document, each updated key is mapped to the value the
 *       full document holds for it ({@code null} if it holds none).</li>
 * </ul>
 *
 * @param fullDocument the full document, may be {@code null}
 * @param updatedFields the updated fields as BSON
 * @param bsonConversion whether BSON values should be converted
 * @return the updated fields as a map
 */
static Map getUpdatedFields(Document fullDocument, BsonDocument updatedFields, boolean bsonConversion) {
    if (!bsonConversion) {
        return updatedFields;
    }
    if (fullDocument == null) {
        return (Map) MongoTypeUtil.convertBson(updatedFields);
    }

    // Take the values from the full document rather than from the BSON update.
    HashMap<String, Object> fromFullDocument = new HashMap<>();
    for (String key : updatedFields.keySet()) {
        fromFullDocument.put(key, fullDocument.get(key));
    }
    return fromFullDocument;
}
/**
 * A deserialiser configured to read one string key must expose the document's string value
 * under that key in the resulting record.
 */
@Test
public void deserialise_string() {
    final String expected = UUID.randomUUID().toString();
    final BsonDocument source = new BsonDocument("my-value", new BsonString(expected));

    final Key<String> stringKey = Key.named("my-value");
    final Record record = BsonRecordDeserialiser.builder()
            .readString(stringKey)
            .get()
            .apply(source);

    assertEquals("wrong string value", expected, stringKey.get(record).get());
}