下面列出了 com.mongodb.client.model.DeleteOneModel 这个 API 类的实例代码及用法,也可以点击链接到 GitHub 查看源代码。
/**
 * Translates a delete change event into a MongoDB delete write model.
 *
 * @param doc the sink document holding the key and value documents of the event
 * @return a {@link DeleteOneModel} whose filter comes from {@code RdbmsHandler.generateFilterDoc}
 * @throws DataException if the key or value document is absent, or if building the filter fails
 */
@Override
public WriteModel<BsonDocument> perform(final SinkDocument doc) {
  // The key is checked before the value so the reported error matches the first missing part.
  final BsonDocument key =
      doc.getKeyDoc()
          .orElseThrow(
              () -> new DataException("Error: key doc must not be missing for delete operation"));
  final BsonDocument value =
      doc.getValueDoc()
          .orElseThrow(
              () ->
                  new DataException("Error: value doc must not be missing for delete operation"));
  try {
    return new DeleteOneModel<>(RdbmsHandler.generateFilterDoc(key, value, OperationType.DELETE));
  } catch (Exception exc) {
    // Any failure while deriving the filter is surfaced as a DataException.
    throw new DataException(exc);
  }
}
@Test
@DisplayName("when valid cdc event with single field PK then correct DeleteOneModel")
void testValidSinkDocumentSingleFieldPK() {
  // given: a delete event whose key consists of one field
  SinkDocument deleteEvent =
      new SinkDocument(BsonDocument.parse("{id: 1004}"), BsonDocument.parse("{op: 'd'}"));
  BsonDocument expectedFilter = BsonDocument.parse("{_id: {id: 1004}}");

  // when
  WriteModel<BsonDocument> actual = RDBMS_DELETE.perform(deleteEvent);

  // then: the key document is wrapped under _id in the delete filter
  assertTrue(actual instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");
  DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) actual;
  assertTrue(
      deleteModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(expectedFilter, deleteModel.getFilter());
}
@Test
@DisplayName("when valid cdc event with compound PK then correct DeleteOneModel")
void testValidSinkDocumentCompoundPK() {
  // given: a delete event whose key consists of two fields
  SinkDocument deleteEvent =
      new SinkDocument(
          BsonDocument.parse("{idA: 123, idB: 'ABC'}"), BsonDocument.parse("{op: 'd'}"));
  BsonDocument expectedFilter = BsonDocument.parse("{_id: {idA: 123, idB: 'ABC'}}");

  // when
  WriteModel<BsonDocument> actual = RDBMS_DELETE.perform(deleteEvent);

  // then: both key fields appear under _id in the delete filter
  assertTrue(actual instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");
  DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) actual;
  assertTrue(
      deleteModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(expectedFilter, deleteModel.getFilter());
}
@Test
@DisplayName("when valid cdc event without PK then correct DeleteOneModel")
void testValidSinkDocumentNoPK() {
  // given: an empty key document and a value document carrying the previous row state
  SinkDocument event =
      new SinkDocument(
          new BsonDocument(),
          BsonDocument.parse("{op: 'c', before: {text: 'misc', number: 9876, active: true}}"));
  BsonDocument expectedFilter =
      BsonDocument.parse("{text: 'misc', number: 9876, active: true}");

  // when
  WriteModel<BsonDocument> actual = RDBMS_DELETE.perform(event);

  // then: the filter equals the "before" document
  assertTrue(actual instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");
  DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) actual;
  assertTrue(
      deleteModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(expectedFilter, deleteModel.getFilter());
}
@Test
@DisplayName(
    "when sink document is valid for DeleteOneDefaultStrategy then correct DeleteOneModel")
void testDeleteOneDefaultStrategyWitValidSinkDocument() {
  // given: a sink document with a key and no value document
  SinkDocument keyOnly = new SinkDocument(BsonDocument.parse("{id: 1234}"), null);

  // when
  WriteModel<BsonDocument> actual = DELETE_ONE_DEFAULT_STRATEGY.createWriteModel(keyOnly);

  // then
  assertTrue(actual instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");
  DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) actual;
  assertTrue(
      deleteModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC_DELETE_DEFAULT, deleteModel.getFilter());
}
/**
 * Adds a new edit to the batch.
 *
 * <p>A {@code null} value queues a delete of the document whose id field equals {@code key}.
 * Any other value queues an upsert that sets the value field of that document.
 *
 * @param key the key to write
 * @param value the value to write. Null indicates we should delete this key
 * @return this
 */
public WriteBatch addEdit(byte[] key, byte[] value) {
  // Both kinds of edit address the same document, so the binary id is built once.
  BsonBinary id = new BsonBinary(key);
  if (value == null) {
    // The diamond lets the model's type argument be inferred from edits' element type,
    // avoiding the raw-typed local (and its unchecked conversion) used previously.
    edits.add(new DeleteOneModel<>(eq(MongoConstants.ID_FIELD_NAME, id)));
  } else {
    edits.add(
        new UpdateOneModel<>(
            eq(MongoConstants.ID_FIELD_NAME, id),
            Updates.set(MongoConstants.VALUE_FIELD_NAME, new BsonBinary(value)),
            // upsert: the update is configured to insert when no document matches the filter
            new UpdateOptions().upsert(true)));
  }
  return this;
}
@Test
void bulkWrite() {
    ReactiveMongoCollection<Document> testCollection = client.getDatabase(DATABASE).getCollection("test");

    // One bulk request mixing three inserts with an update, a delete and a replace.
    BulkWriteResult outcome = testCollection
            .bulkWrite(Arrays.asList(
                    new InsertOneModel<>(new Document("_id", 4)),
                    new InsertOneModel<>(new Document("_id", 5)),
                    new InsertOneModel<>(new Document("_id", 6)),
                    new UpdateOneModel<>(
                            new Document("_id", 1),
                            new Document("$set", new Document("x", 2))),
                    new DeleteOneModel<>(new Document("_id", 2)),
                    new ReplaceOneModel<>(
                            new Document("_id", 3),
                            new Document("_id", 3).append("x", 4))))
            .await()
            .indefinitely();

    assertThat(outcome.getDeletedCount()).isEqualTo(0);
    assertThat(outcome.getInsertedCount()).isEqualTo(3);
}
@Test
void bulkWriteWithOptions() {
    ReactiveMongoCollection<Document> testCollection = client.getDatabase(DATABASE).getCollection("test");

    // Same mixed request as the plain bulk write test, but with explicit ordered options.
    BulkWriteResult outcome = testCollection
            .bulkWrite(
                    Arrays.asList(
                            new InsertOneModel<>(new Document("_id", 4)),
                            new InsertOneModel<>(new Document("_id", 5)),
                            new InsertOneModel<>(new Document("_id", 6)),
                            new UpdateOneModel<>(
                                    new Document("_id", 1),
                                    new Document("$set", new Document("x", 2))),
                            new DeleteOneModel<>(new Document("_id", 2)),
                            new ReplaceOneModel<>(
                                    new Document("_id", 3),
                                    new Document("_id", 3).append("x", 4))),
                    new BulkWriteOptions().ordered(true))
            .await()
            .indefinitely();

    assertThat(outcome.getDeletedCount()).isEqualTo(0);
    assertThat(outcome.getInsertedCount()).isEqualTo(3);
}
/**
 * Translates a delete change event into a MongoDB delete write model.
 *
 * @param doc the sink document holding the key and value documents of the event
 * @return a {@link DeleteOneModel} whose filter comes from {@code RdbmsHandler.generateFilterDoc}
 * @throws DataException if the key or value document is absent, or if building the filter fails
 */
@Override
public WriteModel<BsonDocument> perform(SinkDocument doc) {

    // The key is checked before the value so the reported error matches the first missing part.
    BsonDocument key = doc.getKeyDoc()
            .orElseThrow(() -> new DataException("error: key doc must not be missing for delete operation"));

    BsonDocument value = doc.getValueDoc()
            .orElseThrow(() -> new DataException("error: value doc must not be missing for delete operation"));

    try {
        return new DeleteOneModel<>(
                RdbmsHandler.generateFilterDoc(key, value, OperationType.DELETE));
    } catch (Exception exc) {
        // Any failure while deriving the filter is surfaced as a DataException.
        throw new DataException(exc);
    }
}
/**
 * Builds a delete write model whose filter is parsed from the string id held in the key document.
 *
 * @param doc the sink document; only its key document is read
 * @return a {@link DeleteOneModel} filtering on the id field
 * @throws DataException if the key document is absent, or if reading or parsing the id fails
 */
@Override
public WriteModel<BsonDocument> perform(SinkDocument doc) {

    BsonDocument key = doc.getKeyDoc()
            .orElseThrow(() -> new DataException("error: key doc must not be missing for delete operation"));

    try {
        // The id is stored as a string and spliced verbatim into the JSON text that is parsed below.
        String idJson = key.getString(MongoDbHandler.JSON_ID_FIELD_PATH).getValue();
        String filterJson = "{" + DBCollection.ID_FIELD_NAME + ":" + idJson + "}";
        return new DeleteOneModel<>(BsonDocument.parse(filterJson));
    } catch (Exception exc) {
        throw new DataException(exc);
    }
}
/**
 * Creates the delete write model for a sink document based on its key document.
 *
 * @param document the sink document; its key document must be present
 * @return a {@link DeleteOneModel} filtering on the id field
 * @throws DataException if the key document is absent
 */
@Override
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {

    BsonDocument keyDoc = document.getKeyDoc().orElseThrow(
            () -> new DataException("error: cannot build the WriteModel since"
                    + " the key document was missing unexpectedly")
    );

    if (idStrategy != null) {
        // NOTE: current design doesn't allow to access original SinkRecord (= null)
        return new DeleteOneModel<>(
                new BsonDocument(DBCollection.ID_FIELD_NAME, idStrategy.generateId(document, null)));
    }

    // NOTE: fallback for backwards / deprecation compatibility.
    // A key that already carries the id field is used as the filter as-is;
    // otherwise the whole key document becomes the id value.
    BsonDocument filter = keyDoc.containsKey(DBCollection.ID_FIELD_NAME)
            ? keyDoc
            : new BsonDocument(DBCollection.ID_FIELD_NAME, keyDoc);
    return new DeleteOneModel<>(filter);
}
@Test
@DisplayName("when valid cdc event with single field PK then correct DeleteOneModel")
public void testValidSinkDocumentSingleFieldPK() {

    // given: a delete event whose key consists of one field
    SinkDocument deleteEvent = new SinkDocument(
            new BsonDocument("id", new BsonInt32(1004)),
            new BsonDocument("op", new BsonString("d")));

    BsonDocument expectedFilter = new BsonDocument(
            DBCollection.ID_FIELD_NAME,
            new BsonDocument("id", new BsonInt32(1004)));

    // when
    WriteModel<BsonDocument> actual = RDBMS_DELETE.perform(deleteEvent);

    // then: the key document is wrapped under the id field in the delete filter
    assertTrue(actual instanceof DeleteOneModel,
            () -> "result expected to be of type DeleteOneModel");

    DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) actual;

    assertTrue(deleteModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");

    assertEquals(expectedFilter, deleteModel.getFilter());
}
@Test
@DisplayName("when valid cdc event then correct DeleteOneModel")
public void testValidSinkDocument() {

    // given: a key document with a string id and no value document
    SinkDocument keyOnly = new SinkDocument(
            new BsonDocument("id", new BsonString("1004")), null);

    // when
    WriteModel<BsonDocument> actual = MONGODB_DELETE.perform(keyOnly);

    // then
    assertTrue(actual instanceof DeleteOneModel,
            () -> "result expected to be of type DeleteOneModel");

    DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) actual;

    assertTrue(deleteModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");

    assertEquals(FILTER_DOC, deleteModel.getFilter());
}
/**
 * Deletes the documents with the given ids in one unordered bulk write.
 *
 * @param ids the {@code _id} values of the documents to delete
 * @return the deleted count reported by the bulk write result
 */
@Override
public long bulkDelete(List<?> ids) {
    var stopWatch = new StopWatch();
    int idCount = ids.size();
    // Stays 0 when the bulk write throws, so the finally block still has a value to report.
    int deleted = 0;
    try {
        List<DeleteOneModel<T>> deletes = new ArrayList<>(idCount);
        for (Object documentId : ids) {
            deletes.add(new DeleteOneModel<>(Filters.eq("_id", documentId)));
        }
        deleted = collection()
                .bulkWrite(deletes, new BulkWriteOptions().ordered(false))
                .getDeletedCount();
        return deleted;
    } finally {
        // Tracking and logging run on both the success and the failure path.
        long elapsed = stopWatch.elapsed();
        ActionLogContext.track("mongo", elapsed, 0, deleted);
        logger.debug("bulkDelete, collection={}, ids={}, size={}, deletedRows={}, elapsed={}", collectionName, ids, idCount, deleted, elapsed);
        checkSlowOperation(elapsed);
    }
}
/**
 * Builds a delete write model whose filter is parsed from the string id held in the key document.
 *
 * @param doc the sink document; only its key document is read
 * @return a {@link DeleteOneModel} filtering on the id field
 * @throws DataException if the key document is absent, or if reading or parsing the id fails
 */
@Override
public WriteModel<BsonDocument> perform(final SinkDocument doc) {
  final BsonDocument key =
      doc.getKeyDoc()
          .orElseThrow(
              () -> new DataException("Error: key doc must not be missing for delete operation"));
  try {
    // The id is stored as a string and spliced verbatim into the JSON text that is parsed below.
    final String idJson = key.getString(JSON_ID_FIELD).getValue();
    final String filterJson = format("{%s: %s}", ID_FIELD, idJson);
    return new DeleteOneModel<>(BsonDocument.parse(filterJson));
  } catch (Exception exc) {
    throw new DataException(exc);
  }
}
@Test
@DisplayName("when valid cdc event then correct DeleteOneModel")
void testValidSinkDocument() {
  // given: a key document with a string id and no value document
  SinkDocument keyOnly = new SinkDocument(BsonDocument.parse("{id: '1234'}"), null);

  // when
  WriteModel<BsonDocument> actual = DELETE.perform(keyOnly);

  // then
  assertTrue(actual instanceof DeleteOneModel, "result expected to be of type DeleteOneModel");
  DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) actual;
  assertTrue(
      deleteModel.getFilter() instanceof BsonDocument,
      "filter expected to be of type BsonDocument");
  assertEquals(FILTER_DOC, deleteModel.getFilter());
}
@Test
@DisplayName(
    "test DeleteOneDefaultStrategy with custom config and sink records with keys and null values")
void testBuildDeleteOneModelsCustomConfigSinkRecordsWithKeyAndNullValuePresent() {
  MongoSinkTask task = new MongoSinkTask();

  // Topic config: full-key id strategy, replace-one write model, delete-on-null-values enabled.
  String configJson =
      format(
          "{'%s': '%s', '%s': '%s', '%s': %s}",
          DOCUMENT_ID_STRATEGY_CONFIG,
          FullKeyStrategy.class.getName(),
          WRITEMODEL_STRATEGY_CONFIG,
          ReplaceOneDefaultStrategy.class.getName(),
          DELETE_ON_NULL_VALUES_CONFIG,
          "true");
  MongoSinkTopicConfig topicConfig = createTopicConfig(configJson);

  // Records carry a struct key; no value is configured on the settings.
  TopicSettingsAndResults topicSettings = new TopicSettingsAndResults(TEST_TOPIC, 10, 0);
  Schema structKeySchema = SchemaBuilder.struct().field("myKeyField", Schema.STRING_SCHEMA);
  topicSettings.setKeySchema(structKeySchema);
  topicSettings.setKey(new Struct(structKeySchema).put("myKeyField", "ABCD-1234"));
  List<SinkRecord> records = createSinkRecordList(topicSettings);

  List<? extends WriteModel> models = task.buildWriteModel(topicConfig, records);

  assertNotNull(models, "WriteModel list was null");
  assertFalse(models.isEmpty(), "WriteModel list was empty");
  assertAll(
      "checking all generated WriteModel entries",
      models.stream()
          .map(
              model ->
                  () -> {
                    assertTrue(model instanceof DeleteOneModel);
                    DeleteOneModel<BsonDocument> deleteModel =
                        (DeleteOneModel<BsonDocument>) model;
                    BsonDocument actualFilter =
                        deleteModel.getFilter().toBsonDocument(BsonDocument.class, null);
                    assertEquals(
                        BsonDocument.parse("{_id: {myKeyField: 'ABCD-1234'}}"), actualFilter);
                  }));
}
@Test
@DisplayName("when valid cdc event with compound PK then correct DeleteOneModel")
public void testValidSinkDocumentCompoundPK() {

    // given: a delete event whose key consists of two fields
    SinkDocument deleteEvent = new SinkDocument(
            new BsonDocument("idA", new BsonInt32(123))
                    .append("idB", new BsonString("ABC")),
            new BsonDocument("op", new BsonString("d")));

    BsonDocument expectedFilter = new BsonDocument(
            DBCollection.ID_FIELD_NAME,
            new BsonDocument("idA", new BsonInt32(123))
                    .append("idB", new BsonString("ABC")));

    // when
    WriteModel<BsonDocument> actual = RDBMS_DELETE.perform(deleteEvent);

    // then: both key fields appear under the id field in the delete filter
    assertTrue(actual instanceof DeleteOneModel,
            () -> "result expected to be of type DeleteOneModel");

    DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) actual;

    assertTrue(deleteModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");

    assertEquals(expectedFilter, deleteModel.getFilter());
}
@Test
@DisplayName("when valid cdc event without PK then correct DeleteOneModel")
public void testValidSinkDocumentNoPK() {

    // given: an empty key document and a value document carrying the previous row state
    BsonDocument before = new BsonDocument("text", new BsonString("hohoho"))
            .append("number", new BsonInt32(9876))
            .append("active", new BsonBoolean(true));
    SinkDocument event = new SinkDocument(
            new BsonDocument(),
            new BsonDocument("op", new BsonString("c")).append("before", before));

    // The expected filter is built separately so it does not share an instance with the input.
    BsonDocument expectedFilter = new BsonDocument("text", new BsonString("hohoho"))
            .append("number", new BsonInt32(9876))
            .append("active", new BsonBoolean(true));

    // when
    WriteModel<BsonDocument> actual = RDBMS_DELETE.perform(event);

    // then: the filter equals the "before" document
    assertTrue(actual instanceof DeleteOneModel,
            () -> "result expected to be of type DeleteOneModel");

    DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) actual;

    assertTrue(deleteModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");

    assertEquals(expectedFilter, deleteModel.getFilter());
}
/**
 * Deletes a single synchronized document by its given id. No deletion will occur if the _id is
 * not being synchronized.
 *
 * <p>While holding the namespace's write lock, the document's synchronization config is looked
 * up and updated with a pending local-delete change event. The local delete, the change event
 * and the replacement of the persisted config are then added to the returned container.
 *
 * @param nsConfig the namespace synchronization config of the namespace where the document
 * lives.
 * @param documentId the _id of the document.
 * @param atVersion the version document handed to {@code setSomePendingWrites} together with
 * the delete event.
 * @return a container holding the writes described above, or {@code null} if no synchronization
 * config exists for {@code documentId} in this namespace.
 */
@CheckReturnValue
private @Nullable
LocalSyncWriteModelContainer deleteOneFromResolution(
final NamespaceSynchronizationConfig nsConfig,
final BsonValue documentId,
final BsonDocument atVersion
) {
final MongoNamespace namespace = nsConfig.getNamespace();
final ChangeEvent<BsonDocument> event;
// NOTE(review): the lock comes from the config looked up via syncConfig, not from the nsConfig
// argument — presumably the same object; confirm.
final Lock lock =
this.syncConfig.getNamespaceConfig(namespace).getLock().writeLock();
lock.lock();
final CoreDocumentSynchronizationConfig config;
try {
config = syncConfig.getSynchronizedDocument(namespace, documentId);
if (config == null) {
// Not synchronized: nothing to delete. The finally block still releases the lock.
return null;
}
event = ChangeEvents.changeEventForLocalDelete(namespace, documentId, true);
config.setSomePendingWrites(logicalT, atVersion, 0L, event);
} finally {
lock.unlock();
}
// From here on the lock is no longer held; config and event were assigned inside the locked
// section above.
final LocalSyncWriteModelContainer container = newWriteModelContainer(nsConfig);
container.addDocIDs(documentId);
container.addLocalWrite(new DeleteOneModel<>(getDocumentIdFilter(documentId)));
container.addLocalChangeEvent(event);
// Replace the stored config document with the updated config.
container.addConfigWrite(
new ReplaceOneModel<>(CoreDocumentSynchronizationConfig.getDocFilter(
namespace, config.getDocumentId()
), config));
return container;
}
@Test
@DisplayName("test build WriteModelCDC for Rdbms Delete")
void testBuildWriteModelCdcForRdbmsDelete() {
  final int firstId = 1234;
  final int recordCount = 5;
  final String topic = "dbserver1.catalogA.tableB";

  Schema keySchema = getRdbmsKeySchemaSample();
  Schema valueSchema = getRdbmsValueSchemaSample();

  // One delete event per id; the record offset is the id's distance from the first id.
  List<SinkRecord> deleteRecords =
      IntStream.range(firstId, firstId + recordCount)
          .mapToObj(
              id -> {
                Struct key = new Struct(keySchema).put("id", id);
                Struct before =
                    new Struct(valueSchema.field("before").schema())
                        .put("id", id)
                        .put("first_name", "Alice" + id)
                        .put("last_name", "in Wonderland")
                        .put("email", "alice" + id + "@wonder.land");
                Struct value =
                    new Struct(valueSchema)
                        .put("op", "d")
                        .put("before", before)
                        .put("after", null)
                        .put("source", "ignored");
                return new SinkRecord(topic, 0, keySchema, key, valueSchema, value, id - firstId);
              })
          .collect(Collectors.toList());

  MongoSinkTask task = new MongoSinkTask();
  String configJson =
      format(
          "{'%s': '%s', '%s': '%s'}",
          TOPICS_CONFIG,
          topic,
          CHANGE_DATA_CAPTURE_HANDLER_CONFIG,
          RdbmsHandler.class.getName());
  MongoSinkTopicConfig topicConfig =
      SinkTestHelper.createSinkConfig(configJson).getMongoSinkTopicConfig(topic);

  List<? extends WriteModel> models = task.buildWriteModelCDC(topicConfig, deleteRecords);

  assertNotNull(models, "WriteModel list was null");
  assertFalse(models.isEmpty(), "WriteModel list was empty");
  assertAll(
      "checking all generated WriteModel entries",
      IntStream.range(firstId, firstId + recordCount)
          .mapToObj(
              id ->
                  () -> {
                    int index = id - firstId;
                    WriteModel model = models.get(index);
                    assertNotNull(model, "WriteModel at index " + index + " must not be null");
                    assertTrue(model instanceof DeleteOneModel);
                    DeleteOneModel<BsonDocument> deleteModel =
                        (DeleteOneModel<BsonDocument>) model;
                    BsonDocument actualFilter =
                        deleteModel.getFilter().toBsonDocument(BsonDocument.class, null);
                    assertEquals(
                        BsonDocument.parse(format("{_id: {id: %s}}", id)), actualFilter);
                  }));
}
@Test
@DisplayName("test DeleteOneDefaultStrategy with custom config and sink records with keys and null values")
void testBuildDeleteOneModelsCustomConfigSinkRecordsWithKeyAndNullValuePresent() {

    MongoDbSinkTask task = new MongoDbSinkTask();

    // Task config: full-key id strategy, replace-one write model, delete-on-null-values enabled.
    Map<String, String> taskProps = new HashMap<>();
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_DOCUMENT_ID_STRATEGY_CONF, FullKeyStrategy.class.getName());
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_WRITEMODEL_STRATEGY, ReplaceOneDefaultStrategy.class.getName());
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_DELETE_ON_NULL_VALUES, "true");
    taskProps.put("topics", "foo");
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_COLLECTIONS_CONF, "foo-collection");
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_COLLECTION_CONF + "." + "foo", "foo-collection");
    task.start(taskProps);

    // Records carry a struct key; no value is configured on the settings.
    TopicSettingsAndResults topicSettings = new TopicSettingsAndResults("foo", "foo-collection", 10, 0);
    Schema structKeySchema = SchemaBuilder.struct().field("myKeyField", Schema.STRING_SCHEMA);
    topicSettings.setKeySchema(structKeySchema);
    topicSettings.setKey(new Struct(structKeySchema).put("myKeyField", "ABCD-1234"));
    List<SinkRecord> records = createSinkRecordList(topicSettings);

    List<? extends WriteModel> models = task.buildWriteModel(records, "blah-collection");

    assertNotNull(models, "WriteModel list was null");
    assertFalse(models.isEmpty(), "WriteModel list was empty");
    assertAll("checking all generated WriteModel entries",
            models.stream().map(model ->
                    () -> assertAll("assertions for single WriteModel",
                            () -> assertTrue(model instanceof DeleteOneModel),
                            () -> {
                                DeleteOneModel<BsonDocument> deleteModel =
                                        (DeleteOneModel<BsonDocument>) model;
                                BsonDocument actualFilter =
                                        deleteModel.getFilter().toBsonDocument(BsonDocument.class, null);
                                BsonDocument expectedFilter = new BsonDocument("_id",
                                        new BsonDocument("myKeyField", new BsonString("ABCD-1234")));
                                assertEquals(expectedFilter, actualFilter);
                            }
                    )
            )
    );
}
@Test
@DisplayName("test build WriteModelCDC for Rdbms Delete")
void testBuildWriteModelCdcForRdbmsDelete() {

    final int firstId = 1234;
    final int recordCount = 5;
    final String namespace = "dbserver1.catalogA.tableB";

    Schema keySchema = getRdbmsKeySchemaSample();
    Schema valueSchema = getRdbmsValueSchemaSample();

    // One delete event per id; the record offset is the id's distance from the first id.
    // NOTE: the "source" field is not populated since it is not used at all so far.
    List<SinkRecord> deleteRecords = IntStream.range(firstId, firstId + recordCount)
            .mapToObj(id -> {
                Struct key = new Struct(keySchema).put("id", id);
                Struct before = new Struct(valueSchema.field("before").schema())
                        .put("id", id)
                        .put("first_name", "Alice" + id)
                        .put("last_name", "in Wonderland")
                        .put("email", "alice" + id + "@wonder.land");
                Struct value = new Struct(valueSchema)
                        .put("op", "d")
                        .put("before", before)
                        .put("after", null);
                return new SinkRecord("test-topic", 0, keySchema, key, valueSchema, value, id - firstId);
            })
            .collect(Collectors.toList());

    MongoDbSinkTask task = new MongoDbSinkTask();
    Map<String, String> taskProps = new HashMap<>();
    taskProps.put("topics", namespace);
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_COLLECTIONS_CONF, namespace);
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_COLLECTION_CONF + "." + namespace, namespace);
    taskProps.put(MongoDbSinkConnectorConfig.MONGODB_CHANGE_DATA_CAPTURE_HANDLER + "." + namespace,
            RdbmsHandler.class.getName());
    task.start(taskProps);

    List<? extends WriteModel> models = task.buildWriteModelCDC(deleteRecords, namespace);

    assertNotNull(models, "WriteModel list was null");
    assertFalse(models.isEmpty(), "WriteModel list was empty");
    assertAll("checking all generated WriteModel entries",
            IntStream.range(firstId, firstId + recordCount).mapToObj(
                    id -> () -> {
                        int index = id - firstId;
                        WriteModel model = models.get(index);
                        assertNotNull(model, "WriteModel at index " + index + " must not be null");
                        assertTrue(model instanceof DeleteOneModel);
                        DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) model;
                        BsonDocument actualFilter =
                                deleteModel.getFilter().toBsonDocument(BsonDocument.class, null);
                        BsonDocument expectedFilter =
                                new BsonDocument("_id", new BsonDocument("id", new BsonInt32(id)));
                        assertEquals(expectedFilter, actualFilter);
                    }
            )
    );
}
@Test
@DisplayName("when sink document is valid for DeleteOneDefaultStrategy then correct DeleteOneModel")
public void testDeleteOneDefaultStrategyWitValidSinkDocument() {

    // given: a sink document with a key and no value document
    SinkDocument keyOnly = new SinkDocument(
            new BsonDocument("id", new BsonInt32(1004)), null);

    // when
    WriteModel<BsonDocument> actual = DELETE_ONE_DEFAULT_STRATEGY.createWriteModel(keyOnly);

    // then
    assertTrue(actual instanceof DeleteOneModel,
            () -> "result expected to be of type DeleteOneModel");

    DeleteOneModel<BsonDocument> deleteModel = (DeleteOneModel<BsonDocument>) actual;

    assertTrue(deleteModel.getFilter() instanceof BsonDocument,
            () -> "filter expected to be of type BsonDocument");

    assertEquals(FILTER_DOC_DELETE_DEFAULT, deleteModel.getFilter());
}
/**
 * Checks that a {@code MongoSearchUpdaterFlow} wrapped in a {@code RestartSink} keeps consuming
 * elements when every bulk write of the mocked collection fails.
 *
 * @param errorSupplier supplies the error signalled by the mocked bulk write publisher.
 * @throws Exception declared so the test body may propagate checked exceptions, e.g. from
 * waiting on the latch.
 */
@SuppressWarnings("unchecked")
private void testStreamRestart(final Supplier<Throwable> errorSupplier) throws Exception {
new TestKit(actorSystem) {{
// GIVEN: The persistence fails with an error on every write
final MongoDatabase db = Mockito.mock(MongoDatabase.class);
final MongoCollection<Document> collection = Mockito.mock(MongoCollection.class);
// Each subscriber is signalled a freshly supplied error as soon as it subscribes.
final Publisher<BulkWriteResult> publisher = s -> s.onError(errorSupplier.get());
Mockito.when(db.getCollection(Mockito.any())).thenReturn(collection);
Mockito.when(collection.bulkWrite(Mockito.any(), Mockito.any(BulkWriteOptions.class)))
.thenReturn(publisher);
// GIVEN: MongoSearchUpdaterFlow is wrapped inside a RestartSink
final MongoSearchUpdaterFlow flow = MongoSearchUpdaterFlow.of(db);
final Sink<Source<AbstractWriteModel, NotUsed>, ?> sink =
flow.start(1, 1, Duration.ZERO).to(Sink.ignore());
// Zero min/max backoff: no delay is configured between restarts.
final Sink<Source<AbstractWriteModel, NotUsed>, ?> restartSink =
RestartSink.withBackoff(Duration.ZERO, Duration.ZERO, 1.0, () -> sink);
// WHEN: Many changes stream through MongoSearchUpdaterFlow
final int numberOfChanges = 25;
final CountDownLatch latch = new CountDownLatch(numberOfChanges);
final AbstractWriteModel abstractWriteModel = Mockito.mock(AbstractWriteModel.class);
final WriteModel<Document> mongoWriteModel = new DeleteOneModel<>(new Document());
Mockito.when(abstractWriteModel.toMongo()).thenReturn(mongoWriteModel);
Source.repeat(Source.single(abstractWriteModel))
.take(numberOfChanges)
.buffer(1, OverflowStrategy.backpressure())
.map(source -> {
// Count every element that passes this stage on its way to the restart sink.
latch.countDown();
return source;
})
.runWith(restartSink, ActorMaterializer.create(actorSystem));
// THEN: MongoSearchUpdaterFlow should keep restarting and keep consuming changes from the stream
// The boolean result of await is ignored; the assertion below fails if the latch did not
// reach zero within the timeout.
latch.await(5L, TimeUnit.SECONDS);
assertThat(latch.getCount()).isZero();
}};
}
/**
 * Counts the edits in this batch that are deletes, i.e. instances of {@link DeleteOneModel}.
 *
 * @return Number of deletes
 */
public long getDeleteCount() {
  return this.edits.stream()
      .filter(DeleteOneModel.class::isInstance)
      .count();
}