下面列出了com.amazonaws.auth.AnonymousAWSCredentials#org.apache.nifi.util.TestRunners 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testRecordSplitDatafileOutputWithMultipleRecords() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
runner.setProperty(SplitAvro.OUTPUT_SIZE, "20");
runner.enqueue(users.toByteArray());
runner.run();
runner.assertTransferCount(SplitAvro.REL_SPLIT, 5);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT.key(), "5");
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 20, true);
}
@Test
public void testNullValues() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "true");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
runner.setProperty(ValidateCsv.SCHEMA, "Null, Null, Null");
runner.enqueue("#Name,Birthdate,Weight\nJohn,\"\",63.2\nBob,,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("#Name,Birthdate,Weight\nJohn,,63.2\nBob,,45.0");
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
}
@Test
public void testMetadataKeyFilter() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractMediaMetadata());
runner.setProperty(ExtractMediaMetadata.METADATA_KEY_FILTER, "(X-Parsed.*)");
runner.setProperty(ExtractMediaMetadata.METADATA_KEY_PREFIX, "txt.");
runner.assertValid();
runner.enqueue(new File("target/test-classes/textFile.txt").toPath());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractMediaMetadata.SUCCESS, 1);
runner.assertTransferCount(ExtractMediaMetadata.FAILURE, 0);
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ExtractMediaMetadata.SUCCESS);
MockFlowFile flowFile0 = successFiles.get(0);
flowFile0.assertAttributeExists("filename");
flowFile0.assertAttributeEquals("filename", "textFile.txt");
flowFile0.assertAttributeExists("txt.X-Parsed-By");
assertTrue(flowFile0.getAttribute("txt.X-Parsed-By").contains("org.apache.tika.parser.DefaultParser"));
assertTrue(flowFile0.getAttribute("txt.X-Parsed-By").contains("org.apache.tika.parser.txt.TXTParser"));
flowFile0.assertAttributeNotExists("txt.Content-Encoding");
}
@Test
public void validateFullConfigWithUserLib() throws Exception {
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService("cfProvider", cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "myhost:1234");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH,
new File("test-lib").getAbsolutePath()); // see README in 'test-lib' dir for more info
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
"org.apache.nifi.jms.testcflib.TestConnectionFactory");
runner.setProperty(cfProvider, "Foo", "foo");
runner.setProperty(cfProvider, "Bar", "3");
runner.enableControllerService(cfProvider);
runner.assertValid(cfProvider);
ConnectionFactory cf = cfProvider.getConnectionFactory();
assertNotNull(cf);
assertEquals("org.apache.nifi.jms.testcflib.TestConnectionFactory", cf.getClass().getName());
assertEquals("myhost", this.get("getHost", cf));
assertEquals(1234, ((Integer) this.get("getPort", cf)).intValue());
assertEquals("foo", this.get("getFoo", cf));
assertEquals(3, ((Integer) this.get("getBar", cf)).intValue());
}
@Test
public void testTryToDeleteNotExistingFile() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object());
runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, REGION);
runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "no-such-a-file");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
}
@Test
public void testStateMigratedFromCacheService() throws InitializationException {
final ConcreteListProcessor proc = new ConcreteListProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
final DistributedCache cache = new DistributedCache();
runner.addControllerService("cache", cache);
runner.enableControllerService(cache);
runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
final String serviceState = "{\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}";
final String cacheKey = runner.getProcessor().getIdentifier() + ".lastListingTime./path";
cache.stored.put(cacheKey, serviceState);
runner.run();
final MockStateManager stateManager = runner.getStateManager();
final Map<String, String> expectedState = new HashMap<>();
// Ensure only timestamp is migrated
expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
}
@Test
public void testTransformInputWithSortr() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
runner.setValidateExpressionUsage(false);
runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformJSON.SORTR);
runner.enqueue(JSON_INPUT);
runner.run();
runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");
Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray()));
Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/sortrOutput.json")));
String transformedJsonString = JsonUtils.toJsonString(transformedJson);
String compareJsonString = JsonUtils.toJsonString(compareJson);
assertTrue(compareJsonString.equals(transformedJsonString));
}
@Test
public void testMatchesMultipleXmlAttribute() throws XPathFactoryConfigurationException, IOException {
final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateXQuery());
testRunner.setProperty(EvaluateXQuery.DESTINATION, EvaluateXQuery.DESTINATION_ATTRIBUTE);
testRunner.setProperty("some.property", "//fruit/name");
testRunner.enqueue(XML_SNIPPET);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(EvaluateXQuery.REL_MATCH, 1);
final MockFlowFile out = testRunner.getFlowFilesForRelationship(EvaluateXQuery.REL_MATCH).get(0);
for (int i = 0; i < fruitNames.length; i++) {
final String outXml = out.getAttribute("some.property." + ((int) i + 1));
String expectedXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><name xmlns:ns=\"http://namespace/1\">" + fruitNames[i] + "</name>";
assertEquals(expectedXml, outXml.trim());
}
testRunner.getFlowFilesForRelationship(EvaluateXQuery.REL_MATCH).get(0).assertContentEquals(XML_SNIPPET);
}
@Test
public void testExpressionLiteralDelete() {
final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "${literal('attribute\\.'):append(${literal(6)})}");
final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute.1", "value.1");
attributes.put("attribute.2", "value.2");
attributes.put("attribute.6", "value.6");
attributes.put("sampleSize", "value.size");
attributes.put("sample.1", "value.sample.1");
attributes.put("simple.1", "value.simple.1");
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
result.get(0).assertAttributeEquals("attribute.1", "value.1");
result.get(0).assertAttributeExists("attribute.2");
result.get(0).assertAttributeNotExists("attribute.6");
result.get(0).assertAttributeExists("sampleSize");
result.get(0).assertAttributeExists("sample.1");
result.get(0).assertAttributeExists("simple.1");
}
@Test
public void testDeleteFolderNoExpressionLanguage() throws IOException {
// Prepares for this test
putTestFile("folder/delete-me", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object());
runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, REGION);
runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(DeleteS3Object.KEY, "folder/delete-me");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "a-different-name");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
}
@Test
public void testSimpleSuccess() throws IOException {
final File sourceFile = new File("target/1.txt");
final byte[] content = "Hello, World!".getBytes();
Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_NONE.getValue());
runner.enqueue(new byte[0]);
runner.run();
runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content);
assertTrue(sourceFile.exists());
}
@Test
public void testIgnoreZeroCaptureGroupProperty() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new ExtractText());
testRunner.setProperty(ExtractText.INCLUDE_CAPTURE_GROUP_ZERO, "false");
final String attributeKey = "regex.result";
testRunner.setProperty(attributeKey, "(?s)(.*)");
testRunner.enqueue(SAMPLE_STRING.getBytes("UTF-8"));
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ExtractText.REL_MATCH, 1);
final MockFlowFile out = testRunner.getFlowFilesForRelationship(ExtractText.REL_MATCH).get(0);
// Ensure the zero capture group is not in the resultant attributes
out.assertAttributeNotExists(attributeKey + ".0");
out.assertAttributeEquals(attributeKey, SAMPLE_STRING);
}
@Test
public void validateComplexPartialMatchDemarcatedMessages() {
String topicName = "validateComplexPartialMatchDemarcatedMessages";
PutKafka putKafka = new PutKafka();
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PutKafka.TOPIC, topicName);
runner.setProperty(PutKafka.CLIENT_NAME, "foo");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠");
runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("Goodbye僠<僠WILDBOOMSTUFF僠>僠", new String(consumer.next().message(), StandardCharsets.UTF_8));
runner.shutdown();
}
@Test
public void testPutInFolder() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
runner.assertValid();
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/1.txt");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
}
@Test
public void testFetchElasticsearchOnTriggerWithFields() throws IOException {
runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.assertValid();
runner.setProperty(FetchElasticsearchHttp.FIELDS, "id,, userinfo.location");
runner.assertValid();
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testClusterMonitorInactivityOnPrimaryNode() throws Exception {
final TestableProcessor processor = new TestableProcessor(10000);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setClustered(true);
runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive
runner.run();
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
final List<MockFlowFile> inactiveFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
assertEquals(1, inactiveFiles.size());
final MockFlowFile inactiveFile = inactiveFiles.get(0);
assertNotNull(inactiveFile.getAttribute("inactivityStartMillis"));
assertNotNull(inactiveFile.getAttribute("inactivityDurationMillis"));
runner.clearTransferState();
}
@Test
public void testStringHashStringRangeGetOnlyHashWithRangeValueNoRangeNameFailure() {
final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
getRunner.enqueue(new byte[] {});
getRunner.run(1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
}
}
@Test
public void testAttribute_singleUserDefinedAttribute() throws Exception {
final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, TEST_ATTRIBUTE_KEY);
testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
FlowFile ff = session.create();
ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);
testRunner.enqueue(ff);
testRunner.run();
testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
.assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);
testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);
String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
.get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);
ObjectMapper mapper = new ObjectMapper();
Map<String, String> val = mapper.readValue(json, HashMap.class);
assertTrue(val.get(TEST_ATTRIBUTE_KEY).equals(TEST_ATTRIBUTE_VALUE));
assertTrue(val.size() == 1);
}
@Test
public void testMp3() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ExtractMediaMetadata());
runner.setProperty(ExtractMediaMetadata.METADATA_KEY_FILTER, "");
runner.setProperty(ExtractMediaMetadata.METADATA_KEY_PREFIX, "mp3.");
runner.assertValid();
runner.enqueue(new File("target/test-classes/testMP3id3v1.mp3").toPath());
runner.run();
runner.assertAllFlowFilesTransferred(ExtractMediaMetadata.SUCCESS, 1);
runner.assertTransferCount(ExtractMediaMetadata.FAILURE, 0);
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ExtractMediaMetadata.SUCCESS);
MockFlowFile flowFile0 = successFiles.get(0);
flowFile0.assertAttributeExists("filename");
flowFile0.assertAttributeEquals("filename", "testMP3id3v1.mp3");
flowFile0.assertAttributeExists("mp3.Content-Type");
assertTrue(flowFile0.getAttribute("mp3.Content-Type").startsWith("audio/mpeg"));
flowFile0.assertAttributeExists("mp3.X-Parsed-By");
assertTrue(flowFile0.getAttribute("mp3.X-Parsed-By").contains("org.apache.tika.parser.DefaultParser"));
assertTrue(flowFile0.getAttribute("mp3.X-Parsed-By").contains("org.apache.tika.parser.mp3.Mp3Parser"));
flowFile0.assertAttributeExists("mp3.title");
flowFile0.assertAttributeEquals("mp3.title", "Test Title");
}
@Test
public void testContentFetched() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
assertFalse(proc.closed);
runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world");
}
@Test
public void testQueryElasticsearchOnTriggerWithServerFailAfterSuccess() throws IOException {
QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
processor.setStatus(100, "Should fail", 2);
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
runner.run(1, true, true);
// This test generates a HTTP 100 "Should fail"
runner.assertTransferCount(QueryElasticsearchHttp.REL_SUCCESS, 2);
runner.assertTransferCount(QueryElasticsearchHttp.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(
QueryElasticsearchHttp.REL_FAILURE).get(0);
assertNotNull(out);
}
@Test
public void testEmptyContent() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
runner.assertNotValid();
runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
runner.assertValid();
runner.enqueue(streamFor(""));
runner.run();
long converted = runner.getCounterValue("Converted records");
long errors = runner.getCounterValue("Conversion errors");
Assert.assertEquals("Should convert 0 rows", 0, converted);
Assert.assertEquals("Should reject 0 row", 0, errors);
runner.assertTransferCount("success", 0);
runner.assertTransferCount("failure", 1);
runner.assertTransferCount("incompatible", 0);
MockFlowFile incompatible = runner.getFlowFilesForRelationship("failure").get(0);
Assert.assertEquals("Should set an error message",
"No incoming records", incompatible.getAttribute("errors"));
}
@Test
public void testClusterMonitorActive() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
runner.setClustered(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
// This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
runner.enqueue("Incoming data");
runner.run();
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertNotNull("Latest timestamp should be persisted", updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
// Should be null because COPY_ATTRIBUTES is null.
assertNull(updatedState.get("key1"));
assertNull(updatedState.get("key2"));
}
@Before
public void init() throws IOException, InitializationException {
startServer();
broker = "ssl://localhost:8883";
testRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
final StandardSSLContextService sslService = new StandardSSLContextService();
Map<String, String> sslProperties = createSslProperties();
testRunner.addControllerService("ssl-context", sslService, sslProperties);
testRunner.enableControllerService(sslService);
testRunner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
}
@Test
public void testSimpleDefaultContains() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
runner.setProperty("simple", "middle");
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
runner.run();
runner.assertTransferCount("simple", 1);
runner.assertTransferCount("unmatched", 1);
runner.assertTransferCount("original", 1);
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("simple").get(0);
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
}
@Test
public void testDifferentKeyPassword() {
try {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final SSLContextService service = new StandardSSLContextService();
final Map<String, String> properties = new HashMap<String, String>();
properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/diffpass-ks.jks");
properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "storepassword");
properties.put(StandardSSLContextService.KEY_PASSWORD.getName(), "keypassword");
properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
runner.addControllerService("test-diff-keys", service, properties);
runner.enableControllerService(service);
runner.setProperty("SSL Context Svc ID", "test-diff-keys");
runner.assertValid();
Assert.assertNotNull(service);
assertTrue(service instanceof StandardSSLContextService);
SSLContextService sslService = service;
sslService.createSSLContext(ClientAuth.NONE);
} catch (Exception e) {
System.out.println(e);
Assert.fail("Should not have thrown a exception " + e.getMessage());
}
}
@Test
public void testClusterMonitorActiveCopyAttribute() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100));
runner.setClustered(true);
runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
// This has to be very small threshold, otherwise, MonitorActivity skip persisting state.
runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
final HashMap<String, String> attributes = new HashMap<>();
attributes.put("key1", "value1");
attributes.put("key2", "value2");
runner.enqueue("Incoming data", attributes);
runner.run();
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
final StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
assertNotNull("Latest timestamp should be persisted", updatedState.get(MonitorActivity.STATE_KEY_LATEST_SUCCESS_TRANSFER));
assertEquals("value1", updatedState.get("key1"));
assertEquals("value2", updatedState.get("key2"));
}
@Ignore
@Test
public void testExecuteTouch() throws Exception {
File testFile = new File("target/test.txt");
testFile.delete();
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
controller.setValidateExpressionUsage(false);
controller.enqueue(dummy.toPath());
controller.enqueue(dummy.toPath());
controller.enqueue(dummy.toPath());
controller.enqueue(dummy.toPath());
controller.enqueue(dummy.toPath());
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target/xx1");
controller.setThreadCount(6);
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "c:\\cygwin\\bin\\touch");
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "test.txt");
controller.assertValid();
controller.run(6);
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
assertEquals(5, flowFiles.size());
assertEquals(0, flowFiles.get(0).getSize());
}
@Test
public void testWithDelimiterAndNotEnoughMessages() {
final List<String> messages = new ArrayList<>();
messages.add("Hello");
messages.add("Good-bye");
final TestableProcessor proc = new TestableProcessor(null, messages);
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
runner.setProperty(GetKafka.TOPIC, "testX");
runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
runner.setProperty(GetKafka.BATCH_SIZE, "3");
runner.run();
runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
mff.assertContentEquals("Hello\nGood-bye");
}
@Test
public void testGetIgniteCacheDefaultConfOneFlowFileWithPlainKey() throws IOException, InterruptedException {
getRunner = TestRunners.newTestRunner(getIgniteCache);
getRunner.setProperty(GetIgniteCache.IGNITE_CACHE_ENTRY_KEY, "mykey");
getRunner.assertValid();
getRunner.enqueue(new byte[] {});
getIgniteCache.initialize(getRunner.getProcessContext());
getIgniteCache.getIgniteCache().put("mykey", "test".getBytes());
getRunner.run(1, false, true);
getRunner.assertAllFlowFilesTransferred(GetIgniteCache.REL_SUCCESS, 1);
List<MockFlowFile> getSucessfulFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS);
assertEquals(1, getSucessfulFlowFiles.size());
List<MockFlowFile> getFailureFlowFiles = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_FAILURE);
assertEquals(0, getFailureFlowFiles.size());
final MockFlowFile getOut = getRunner.getFlowFilesForRelationship(GetIgniteCache.REL_SUCCESS).get(0);
getOut.assertContentEquals("test".getBytes());
getRunner.shutdown();
}