下面列出了com.mongodb.DBCollection#ID_FIELD_NAME 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
protected static BsonDocument generateFilterDoc(BsonDocument keyDoc, BsonDocument valueDoc, OperationType opType) {
if (keyDoc.keySet().isEmpty()) {
if (opType.equals(OperationType.CREATE) || opType.equals(OperationType.READ)) {
//create: no PK info in keyDoc -> generate ObjectId
return new BsonDocument(DBCollection.ID_FIELD_NAME,new BsonObjectId());
}
//update or delete: no PK info in keyDoc -> take everything in 'before' field
try {
BsonDocument filter = valueDoc.getDocument(JSON_DOC_BEFORE_FIELD);
if (filter.isEmpty())
throw new BsonInvalidOperationException("value doc before field is empty");
return filter;
} catch(BsonInvalidOperationException exc) {
throw new DataException("error: value doc 'before' field is empty or has invalid type" +
" for update/delete operation which seems severely wrong -> defensive actions taken!",exc);
}
}
//build filter document composed of all PK columns
BsonDocument pk = new BsonDocument();
for (String f : keyDoc.keySet()) {
pk.put(f,keyDoc.get(f));
}
return new BsonDocument(DBCollection.ID_FIELD_NAME,pk);
}
@Override
public WriteModel<BsonDocument> perform(SinkDocument doc) {
BsonDocument valueDoc = doc.getValueDoc().orElseThrow(
() -> new DataException("error: value doc must not be missing for insert operation")
);
try {
BsonDocument insertDoc = BsonDocument.parse(
valueDoc.get(JSON_DOC_FIELD_PATH).asString().getValue()
);
return new ReplaceOneModel<>(
new BsonDocument(DBCollection.ID_FIELD_NAME,
insertDoc.get(DBCollection.ID_FIELD_NAME)),
insertDoc,
UPDATE_OPTIONS
);
} catch(Exception exc) {
throw new DataException(exc);
}
}
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {
BsonDocument kd = document.getKeyDoc().orElseThrow(
() -> new DataException("error: cannot build the WriteModel since"
+ " the key document was missing unexpectedly")
);
//NOTE: fallback for backwards / deprecation compatibility
if(idStrategy == null) {
return kd.containsKey(DBCollection.ID_FIELD_NAME)
? new DeleteOneModel<>(kd)
: new DeleteOneModel<>(new BsonDocument(DBCollection.ID_FIELD_NAME,kd));
}
//NOTE: current design doesn't allow to access original SinkRecord (= null)
BsonValue _id = idStrategy.generateId(document,null);
return new DeleteOneModel<>(
new BsonDocument(DBCollection.ID_FIELD_NAME,_id)
);
}
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {
BsonDocument vd = document.getValueDoc().orElseThrow(
() -> new DataException("error: cannot build the WriteModel since"
+ " the value document was missing unexpectedly")
);
BsonDateTime dateTime = new BsonDateTime(Instant.now().toEpochMilli());
return new UpdateOneModel<>(
new BsonDocument(DBCollection.ID_FIELD_NAME,
vd.get(DBCollection.ID_FIELD_NAME)),
new BsonDocument("$set", vd.append(FIELDNAME_MODIFIED_TS, dateTime))
.append("$setOnInsert", new BsonDocument(FIELDNAME_INSERTED_TS, dateTime)),
UPDATE_OPTIONS
);
}
@Test
@DisplayName("when valid cdc event with single field PK then correct DeleteOneModel")
public void testValidSinkDocumentSingleFieldPK() {
BsonDocument filterDoc =
new BsonDocument(DBCollection.ID_FIELD_NAME,
new BsonDocument("id",new BsonInt32(1004)));
BsonDocument keyDoc = new BsonDocument("id",new BsonInt32(1004));
BsonDocument valueDoc = new BsonDocument("op",new BsonString("d"));
WriteModel<BsonDocument> result =
RDBMS_DELETE.perform(new SinkDocument(keyDoc,valueDoc));
assertTrue(result instanceof DeleteOneModel,
() -> "result expected to be of type DeleteOneModel");
DeleteOneModel<BsonDocument> writeModel =
(DeleteOneModel<BsonDocument>) result;
assertTrue(writeModel.getFilter() instanceof BsonDocument,
() -> "filter expected to be of type BsonDocument");
assertEquals(filterDoc,writeModel.getFilter());
}
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {
BsonDocument vd = document.getValueDoc().orElseThrow(
() -> new DataException("error: cannot build the WriteModel since"
+ " the value document was missing unexpectedly")
);
return new ReplaceOneModel<>(
new BsonDocument(DBCollection.ID_FIELD_NAME,
vd.get(DBCollection.ID_FIELD_NAME)),
vd,
UPDATE_OPTIONS);
}
@Test
@DisplayName("when valid cdc event with compound PK then correct DeleteOneModel")
public void testValidSinkDocumentCompoundPK() {
BsonDocument filterDoc =
new BsonDocument(DBCollection.ID_FIELD_NAME,
new BsonDocument("idA",new BsonInt32(123))
.append("idB",new BsonString("ABC")));
BsonDocument keyDoc = new BsonDocument("idA",new BsonInt32(123))
.append("idB",new BsonString("ABC"));
BsonDocument valueDoc = new BsonDocument("op",new BsonString("d"));
WriteModel<BsonDocument> result =
RDBMS_DELETE.perform(new SinkDocument(keyDoc,valueDoc));
assertTrue(result instanceof DeleteOneModel,
() -> "result expected to be of type DeleteOneModel");
DeleteOneModel<BsonDocument> writeModel =
(DeleteOneModel<BsonDocument>) result;
assertTrue(writeModel.getFilter() instanceof BsonDocument,
() -> "filter expected to be of type BsonDocument");
assertEquals(filterDoc,writeModel.getFilter());
}
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document, SinkRecord record) {
BsonDocument vd = document.getValueDoc().orElseThrow(
() -> new DataException("error: cannot build the WriteModel since"
+ " the value document was missing unexpectedly")
);
//1) add kafka coordinates to the value document
//NOTE: future versions might allow to configure the fieldnames
//via external configuration properties, for now this is pre-defined.
vd.append(FIELD_KAFKA_COORDS, new BsonDocument(
FIELD_TOPIC, new BsonString(record.topic()))
.append(FIELD_PARTITION, new BsonInt32(record.kafkaPartition()))
.append(FIELD_OFFSET, new BsonInt64(record.kafkaOffset()))
);
//2) build the conditional update pipeline based on Kafka coordinates
//which makes sure that in case records get replayed - e.g. either due to
//uncommitted offsets or newly started connectors with different names -
//that stale data never overwrites newer data which was previously written
//to the sink already.
List<Bson> conditionalUpdatePipeline = new ArrayList<>();
conditionalUpdatePipeline.add(new BsonDocument("$replaceRoot",
new BsonDocument("newRoot", new BsonDocument("$cond",
new BsonDocument("if", new BsonDocument("$and",
new BsonArray(Arrays.asList(
new BsonDocument("$eq", new BsonArray(Arrays.asList(
new BsonString("$$ROOT." + FIELD_KAFKA_COORDS + "." + FIELD_TOPIC),
new BsonString(record.topic())))),
new BsonDocument("$eq", new BsonArray(Arrays.asList(
new BsonString("$$ROOT." + FIELD_KAFKA_COORDS + "." + FIELD_PARTITION),
new BsonInt32(record.kafkaPartition())))),
new BsonDocument("$gte", new BsonArray(Arrays.asList(
new BsonString("$$ROOT." + FIELD_KAFKA_COORDS + "." + FIELD_OFFSET),
new BsonInt64(record.kafkaOffset()))))
))))
.append("then", new BsonString("$$ROOT"))
.append("else", vd)
))
));
return new UpdateOneModel<>(
new BsonDocument(DBCollection.ID_FIELD_NAME, vd.get(DBCollection.ID_FIELD_NAME)),
conditionalUpdatePipeline,
UPDATE_OPTIONS
);
}
@Test
@DisplayName("when valid cdc event with single field PK then correct ReplaceOneModel")
public void testValidSinkDocumentSingleFieldPK() {
BsonDocument filterDoc =
new BsonDocument(DBCollection.ID_FIELD_NAME,
new BsonDocument("id",new BsonInt32(1004)));
BsonDocument replacementDoc =
new BsonDocument(DBCollection.ID_FIELD_NAME,
new BsonDocument("id",new BsonInt32(1004)))
.append("first_name",new BsonString("Anne"))
.append("last_name",new BsonString("Kretchmar"))
.append("email",new BsonString("[email protected]"));
BsonDocument keyDoc = new BsonDocument("id",new BsonInt32(1004));
BsonDocument valueDoc = new BsonDocument("op",new BsonString("u"))
.append("after",new BsonDocument("id",new BsonInt32(1004))
.append("first_name",new BsonString("Anne"))
.append("last_name",new BsonString("Kretchmar"))
.append("email",new BsonString("[email protected]")));
WriteModel<BsonDocument> result =
RDBMS_UPDATE.perform(new SinkDocument(keyDoc,valueDoc));
assertTrue(result instanceof ReplaceOneModel,
() -> "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel =
(ReplaceOneModel<BsonDocument>) result;
assertEquals(replacementDoc,writeModel.getReplacement(),
()-> "replacement doc not matching what is expected");
assertTrue(writeModel.getFilter() instanceof BsonDocument,
() -> "filter expected to be of type BsonDocument");
assertEquals(filterDoc,writeModel.getFilter());
assertTrue(writeModel.getOptions().isUpsert(),
() -> "replacement expected to be done in upsert mode");
}
@Test
@DisplayName("when valid cdc event with compound PK then correct ReplaceOneModel")
public void testValidSinkDocumentCompoundPK() {
BsonDocument filterDoc =
new BsonDocument(DBCollection.ID_FIELD_NAME,
new BsonDocument("idA",new BsonInt32(123))
.append("idB",new BsonString("ABC")));
BsonDocument replacementDoc =
new BsonDocument(DBCollection.ID_FIELD_NAME,
new BsonDocument("idA",new BsonInt32(123))
.append("idB",new BsonString("ABC")))
.append("number", new BsonDouble(567.89))
.append("active", new BsonBoolean(true));
BsonDocument keyDoc = new BsonDocument("idA",new BsonInt32(123))
.append("idB",new BsonString("ABC"));
BsonDocument valueDoc = new BsonDocument("op",new BsonString("c"))
.append("after",new BsonDocument("idA",new BsonInt32(123))
.append("idB",new BsonString("ABC"))
.append("number", new BsonDouble(567.89))
.append("active", new BsonBoolean(true)));
WriteModel<BsonDocument> result =
RDBMS_UPDATE.perform(new SinkDocument(keyDoc,valueDoc));
assertTrue(result instanceof ReplaceOneModel,
() -> "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel =
(ReplaceOneModel<BsonDocument>) result;
assertEquals(replacementDoc,writeModel.getReplacement(),
()-> "replacement doc not matching what is expected");
assertTrue(writeModel.getFilter() instanceof BsonDocument,
() -> "filter expected to be of type BsonDocument");
assertEquals(filterDoc,writeModel.getFilter());
assertTrue(writeModel.getOptions().isUpsert(),
() -> "replacement expected to be done in upsert mode");
}
@Test
@DisplayName("when valid cdc event with single field PK then correct ReplaceOneModel")
public void testValidSinkDocumentSingleFieldPK() {
BsonDocument filterDoc =
new BsonDocument(DBCollection.ID_FIELD_NAME,
new BsonDocument("id",new BsonInt32(1004)));
BsonDocument replacementDoc =
new BsonDocument(DBCollection.ID_FIELD_NAME,
new BsonDocument("id",new BsonInt32(1004)))
.append("first_name",new BsonString("Anne"))
.append("last_name",new BsonString("Kretchmar"))
.append("email",new BsonString("[email protected]"));
BsonDocument keyDoc = new BsonDocument("id",new BsonInt32(1004));
BsonDocument valueDoc = new BsonDocument("op",new BsonString("c"))
.append("after",new BsonDocument("id",new BsonInt32(1004))
.append("first_name",new BsonString("Anne"))
.append("last_name",new BsonString("Kretchmar"))
.append("email",new BsonString("[email protected]")));
WriteModel<BsonDocument> result =
RDBMS_INSERT.perform(new SinkDocument(keyDoc,valueDoc));
assertTrue(result instanceof ReplaceOneModel,
() -> "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel =
(ReplaceOneModel<BsonDocument>) result;
assertEquals(replacementDoc,writeModel.getReplacement(),
()-> "replacement doc not matching what is expected");
assertTrue(writeModel.getFilter() instanceof BsonDocument,
() -> "filter expected to be of type BsonDocument");
assertEquals(filterDoc,writeModel.getFilter());
assertTrue(writeModel.getOptions().isUpsert(),
() -> "replacement expected to be done in upsert mode");
}
@Test
@DisplayName("when valid cdc event with compound PK then correct ReplaceOneModel")
public void testValidSinkDocumentCompoundPK() {
BsonDocument filterDoc =
new BsonDocument(DBCollection.ID_FIELD_NAME,
new BsonDocument("idA",new BsonInt32(123))
.append("idB",new BsonString("ABC")));
BsonDocument replacementDoc =
new BsonDocument(DBCollection.ID_FIELD_NAME,
new BsonDocument("idA",new BsonInt32(123))
.append("idB",new BsonString("ABC")))
.append("number", new BsonDouble(567.89))
.append("active", new BsonBoolean(true));
BsonDocument keyDoc = new BsonDocument("idA",new BsonInt32(123))
.append("idB",new BsonString("ABC"));
BsonDocument valueDoc = new BsonDocument("op",new BsonString("c"))
.append("after",new BsonDocument("idA",new BsonInt32(123))
.append("idB",new BsonString("ABC"))
.append("number", new BsonDouble(567.89))
.append("active", new BsonBoolean(true)));
WriteModel<BsonDocument> result =
RDBMS_INSERT.perform(new SinkDocument(keyDoc,valueDoc));
assertTrue(result instanceof ReplaceOneModel,
() -> "result expected to be of type ReplaceOneModel");
ReplaceOneModel<BsonDocument> writeModel =
(ReplaceOneModel<BsonDocument>) result;
assertEquals(replacementDoc,writeModel.getReplacement(),
()-> "replacement doc not matching what is expected");
assertTrue(writeModel.getFilter() instanceof BsonDocument,
() -> "filter expected to be of type BsonDocument");
assertEquals(filterDoc,writeModel.getFilter());
assertTrue(writeModel.getOptions().isUpsert(),
() -> "replacement expected to be done in upsert mode");
}