下面列出了com.amazonaws.auth.AnonymousAWSCredentials#org.apache.nifi.util.TestRunner 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Runs {@link PublishJMS} with a controller service that returns a Mockito-mocked
 * {@code ConnectionFactory}, then checks that nothing is routed to {@code REL_SUCCESS}
 * and that a FlowFile is present on {@code REL_FAILURE}.
 */
@Test
public void validateFailedPublishAndTransferToFailure() throws Exception {
    final String serviceId = "cfProvider";

    // Controller service stub that hands out the mocked connection factory
    final ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
    final PublishJMS processor = new PublishJMS();
    final TestRunner runner = TestRunners.newTestRunner(processor);
    final JMSConnectionFactoryProviderDefinition provider = mock(JMSConnectionFactoryProviderDefinition.class);
    when(provider.getIdentifier()).thenReturn(serviceId);
    when(provider.getConnectionFactory()).thenReturn(connectionFactory);

    runner.addControllerService(serviceId, provider);
    runner.enableControllerService(provider);

    runner.setProperty(PublishJMS.CF_SERVICE, serviceId);
    runner.setProperty(PublishJMS.DESTINATION, "fooQueue");

    runner.enqueue("Hello Joe".getBytes());
    runner.run();
    Thread.sleep(200);

    final boolean nothingSucceeded = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).isEmpty();
    assertTrue(nothingSucceeded);
    assertNotNull(runner.getFlowFilesForRelationship(PublishJMS.REL_FAILURE).get(0));
}
/**
 * Calls {@code scan} on the HBase client service with a string that is not a valid
 * filter expression; the test passes only if {@link IllegalArgumentException} is thrown.
 */
@Test(expected = IllegalArgumentException.class)
public void testScanWithInvalidFilter() throws InitializationException, IOException {
    final String tableName = "nifi";
    final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);

    // Mocked HBase table that reports the test table's name
    final Table mockTable = Mockito.mock(Table.class);
    when(mockTable.getName()).thenReturn(TableName.valueOf(tableName));

    // create the controller service and link it to the test processor
    final MockHBaseClientService mockService = configureHBaseClientService(runner, mockTable);
    runner.assertValid(mockService);

    // Result handler handed to scan(); no rows are expected to reach it
    final CollectingResultHandler resultHandler = new CollectingResultHandler();

    // Look the service up through the processor property rather than using mockService directly
    final HBaseClientService clientService = runner.getProcessContext()
            .getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
            .asControllerService(HBaseClientService.class);

    // The malformed filter should make scan() throw IllegalArgumentException
    final String invalidFilter = "this is not a filter";
    clientService.scan(tableName, new ArrayList<Column>(), invalidFilter, System.currentTimeMillis(), resultHandler);
}
/**
 * Verifies that a single attribute named in {@code ATTRIBUTES_LIST} is serialized as JSON
 * into the {@code JSON_ATTRIBUTE_NAME} attribute when the destination is
 * {@code DESTINATION_ATTRIBUTE}, and that the JSON holds exactly that one entry.
 *
 * @throws Exception if the processor run or the JSON parsing fails
 */
@Test
public void testAttribute_singleUserDefinedAttribute() throws Exception {
    final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON());
    testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, TEST_ATTRIBUTE_KEY);
    testRunner.setProperty(AttributesToJSON.DESTINATION, AttributesToJSON.DESTINATION_ATTRIBUTE);

    ProcessSession session = testRunner.getProcessSessionFactory().createSession();
    FlowFile ff = session.create();
    ff = session.putAttribute(ff, TEST_ATTRIBUTE_KEY, TEST_ATTRIBUTE_VALUE);

    testRunner.enqueue(ff);
    testRunner.run();

    // Assert the routing counts first: indexing into an empty success list would surface as
    // an IndexOutOfBoundsException instead of a readable assertion failure.
    testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1);
    testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0);

    testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0)
            .assertAttributeExists(AttributesToJSON.JSON_ATTRIBUTE_NAME);

    String json = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS)
            .get(0).getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME);

    ObjectMapper mapper = new ObjectMapper();
    Map<String, String> val = mapper.readValue(json, HashMap.class);

    // Compare from the expected value so a missing key fails the assertion rather than throwing NPE
    assertTrue(TEST_ATTRIBUTE_VALUE.equals(val.get(TEST_ATTRIBUTE_KEY)));
    assertTrue(val.size() == 1);
}
/**
 * Stubs the inotify event stream so that {@code poll} returns {@code null}, runs the
 * processor once, and checks that no FlowFile is routed to success and that the
 * cluster-scoped {@code last.tx.id} state value is {@code "-1"}.
 */
@Test
public void onTriggerShouldProperlyHandleANullEventBatch() throws Exception {
    // A null batch from poll() is the condition under test
    when(inotifyEventInputStream.poll(1000000L, TimeUnit.MICROSECONDS)).thenReturn(null);
    when(hdfsAdmin.getInotifyEventStream()).thenReturn(inotifyEventInputStream);

    final GetHDFSEvents eventsProcessor = new TestableGetHDFSEvents(kerberosProperties, hdfsAdmin);
    final TestRunner runner = TestRunners.newTestRunner(eventsProcessor);
    runner.setProperty(GetHDFSEvents.POLL_DURATION, "1 second");
    runner.setProperty(GetHDFSEvents.HDFS_PATH_TO_WATCH, "/some/path${now()}");

    runner.run();

    final List<MockFlowFile> emitted = runner.getFlowFilesForRelationship(GetHDFSEvents.REL_SUCCESS);
    assertEquals(0, emitted.size());
    assertEquals("-1",
            runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).get("last.tx.id"));
}
/**
 * Configures {@code PutDynamoDB} with a hash key name but no hash key value and expects
 * the FlowFile to be routed to failure carrying the
 * {@code DYNAMODB_HASH_KEY_VALUE_ERROR} attribute.
 */
@Test
public void testStringHashStringRangePutNoHashValueFailure() {
    final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class);

    // Credentials, region and table
    putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
    putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
    putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
    putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);

    // Key configuration: the hash key *value* is deliberately left unset
    putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
    putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
    putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
    putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document");

    final String payload = "{\"hello\": 2}";
    putRunner.enqueue(payload.getBytes());
    putRunner.run(1);

    putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1);

    final List<MockFlowFile> failed = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
    for (final MockFlowFile failedFlowFile : failed) {
        assertNotNull(failedFlowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR));
    }
}
/**
 * Checks the connection URL that {@link ConsumeIMAP} builds from its host, port, user,
 * password and folder properties, and that the private {@code displayUrl} field holds the
 * same URL with the password replaced by {@code [password]}.
 *
 * @throws Exception if the reflective access to {@code displayUrl} fails
 */
@Test
public void validateUrl() throws Exception {
    // displayUrl is declared on AbstractEmailProcessor and is read reflectively below
    Field displayUrlField = AbstractEmailProcessor.class.getDeclaredField("displayUrl");
    displayUrlField.setAccessible(true);

    AbstractEmailProcessor<? extends AbstractMailReceiver> consume = new ConsumeIMAP();
    TestRunner runner = TestRunners.newTestRunner(consume);
    runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com");
    runner.setProperty(ConsumeIMAP.PORT, "1234");
    runner.setProperty(ConsumeIMAP.USER, "jon");
    runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr");
    runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX");
    runner.setProperty(ConsumeIMAP.USE_SSL, "false");

    // The built URL embeds the configured user, password and host verbatim. The previous
    // expected value had its "password@host" portion mangled by e-mail obfuscation and could
    // never match; it is restored here from the properties set above.
    assertEquals("imap://jon:qhgwjgehr@foo.bar.com:1234/MYBOX", consume.buildUrl(runner.getProcessContext()));
    // buildUrl() is called first; the display variant is then read back from the field
    assertEquals("imap://jon:[password]@foo.bar.com:1234/MYBOX", displayUrlField.get(consume));
}
/**
 * Supplies two messages to a processor whose batch size is three and whose message
 * demarcator property is {@code "\\n"}, and expects one FlowFile on success whose content
 * is {@code "Hello\nGood-bye"}.
 */
@Test
public void testWithDelimiterAndNotEnoughMessages() {
    // Only two messages are available although the batch size below is three
    final List<String> availableMessages = new ArrayList<>();
    availableMessages.add("Hello");
    availableMessages.add("Good-bye");

    final TestableProcessor processor = new TestableProcessor(null, availableMessages);
    final TestRunner runner = TestRunners.newTestRunner(processor);

    runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
    runner.setProperty(GetKafka.TOPIC, "testX");
    runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
    runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
    runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
    runner.setProperty(GetKafka.BATCH_SIZE, "3");

    runner.run();

    runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
    final MockFlowFile merged = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
    merged.assertContentEquals("Hello\nGood-bye");
}
/**
 * Sets the attributes-to-log list to {@code foo} and the ignore regex to {@code foo.*},
 * then checks that none of the FlowFile's attribute values appear in the message returned
 * by {@code processFlowFile}.
 */
@Test
public void testLogPropertyCSVWithIgnoreRegex() {
    final LogAttribute processor = new LogAttribute();
    final TestRunner runner = TestRunners.newTestRunner(processor);
    final ProcessContext context = runner.getProcessContext();
    final ProcessSession session = runner.getProcessSessionFactory().createSession();
    final MockComponentLog componentLog = runner.getLogger();

    // The include list and the ignore regex select the same attribute, so nothing should be logged
    runner.setProperty(LogAttribute.ATTRIBUTES_TO_LOG_CSV, "foo");
    runner.setProperty(LogAttribute.ATTRIBUTES_TO_IGNORE_REGEX, "foo.*");

    final Map<String,String> flowFileAttributes = Maps.newHashMap();
    flowFileAttributes.put("foo", "foo-value");
    flowFileAttributes.put("bar", "bar-value");
    flowFileAttributes.put("foobaz", "foobaz-value");

    final MockFlowFile flowFile = runner.enqueue("content", flowFileAttributes);

    final String logged = processor.processFlowFile(
            componentLog, LogAttribute.DebugLevels.info, flowFile, session, context);

    assertThat(logged, not(containsString("foobaz-value")));
    assertThat(logged, not(containsString("foo-value")));
    assertThat(logged, not(containsString("bar-value")));
}
/**
 * Runs {@code ReplaceTextWithMapping} with the escaped-dollar mapping file against
 * {@code colors.txt} and compares the output to the expected text, in which the
 * replacements contain a literal {@code $1}.
 *
 * @throws IOException if the input file cannot be enqueued
 */
@Test
public void testEscapingDollarSign() throws IOException {
    final String mappingFile = Paths
            .get("src/test/resources/TestReplaceTextWithMapping/color-fruit-escaped-dollar-mapping.txt")
            .toFile()
            .getAbsolutePath();

    final TestRunner runner = TestRunners.newTestRunner(new ReplaceTextWithMapping());
    runner.setProperty(ReplaceTextWithMapping.REGEX, "-(.*?)-");
    runner.setProperty(ReplaceTextWithMapping.MATCHING_GROUP_FOR_LOOKUP_KEY, "1");
    runner.setProperty(ReplaceTextWithMapping.MAPPING_FILE, mappingFile);

    runner.enqueue(Paths.get("src/test/resources/TestReplaceTextWithMapping/colors.txt"));
    runner.run();

    runner.assertAllFlowFilesTransferred(ReplaceTextWithMapping.REL_SUCCESS, 1);
    final MockFlowFile replaced = runner.getFlowFilesForRelationship(ReplaceTextWithMapping.REL_SUCCESS).get(0);
    final String actual = new String(replaced.toByteArray());

    final String expected = "-roses- are -$1 apple-\n"
            + "violets are -$1 blueberry-\n"
            + "something else is -$1 grape-\n"
            + "I'm not good at writing poems";
    assertEquals(expected, actual);
}
/**
 * Runs the {@code counter.py} script through {@code ExecuteScript} with the python engine
 * and checks that the FlowFile is routed to {@code success} and that the
 * {@code SampleScriptCounter} counter reads 1.
 *
 * @throws Exception if the script run fails
 */
@Test
public void testCounter() throws Exception {
    final TestRunner runner = TestRunners.newTestRunner(new ExecuteScript());
    runner.setValidateExpressionUsage(false);

    // Script engine, script location and module directory
    runner.setProperty(SCRIPT_ENGINE, "python");
    runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "src/test/resources/executescript/counter/counter.py");
    runner.setProperty(ScriptingComponentUtils.MODULES, "src/test/resources/executescript");
    runner.assertValid();

    final byte[] input = "sample text".getBytes(StandardCharsets.UTF_8);
    runner.enqueue(input);
    runner.run();

    runner.assertAllFlowFilesTransferred("success", 1);

    final double observed = runner.getCounterValue("SampleScriptCounter");
    Assert.assertEquals(1d, observed, 0.01d);
}
/**
 * Creates a {@code PERSONS} table, configures {@code ConvertJSONToSQL} for INSERT with
 * {@code FAIL_UNMATCHED_COLUMN}, and expects {@code person-1.json} to be routed to failure.
 */
@Test
public void testInsertWithMissingColumnFail() throws InitializationException, ProcessException, SQLException, IOException {
    final TestRunner runner = TestRunners.newTestRunner(ConvertJSONToSQL.class);

    // Database lives in a "db" directory under the temporary folder
    final File dbDir = new File(folder.getRoot(), "db");
    final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
    runner.addControllerService("dbcp", service);
    runner.enableControllerService(service);

    // Create the target table; the statement closes before the connection
    try (final Connection conn = service.getConnection();
         final Statement stmt = conn.createStatement()) {
        stmt.executeUpdate("CREATE TABLE PERSONS (id integer, name varchar(100), code integer, generated_key integer primary key)");
    }

    runner.setProperty(ConvertJSONToSQL.CONNECTION_POOL, "dbcp");
    runner.setProperty(ConvertJSONToSQL.TABLE_NAME, "PERSONS");
    runner.setProperty(ConvertJSONToSQL.STATEMENT_TYPE, "INSERT");
    runner.setProperty(ConvertJSONToSQL.UNMATCHED_COLUMN_BEHAVIOR, ConvertJSONToSQL.FAIL_UNMATCHED_COLUMN);

    runner.enqueue(Paths.get("src/test/resources/TestConvertJSONToSQL/person-1.json"));
    runner.run();

    runner.assertAllFlowFilesTransferred(ConvertJSONToSQL.REL_FAILURE, 1);
}
/**
 * Enqueues two FlowFiles for the same table but different rows while the mock HBase client
 * has {@code setThrowException(true)}, and expects both on {@code REL_FAILURE} with no
 * provenance events recorded.
 */
@Test
public void testMultipleFlowFilesSameTableDifferentRowFailure() throws IOException, InitializationException {
    final String tableName = "nifi";
    final String columnFamily = "family1";
    final String columnQualifier = "qualifier1";
    final String firstRow = "row1";
    final String secondRow = "row2";

    final PutHBaseCell processor = new PutHBaseCell();
    final TestRunner runner = getTestRunnerWithEL(processor);

    // NOTE(review): presumably makes the mock client fail its puts — confirm against MockHBaseClientService
    final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
    hBaseClient.setThrowException(true);

    final String firstContent = "some content1";
    final Map<String, String> firstAttributes = getAttributeMapWithEL(tableName, firstRow, columnFamily, columnQualifier);
    runner.enqueue(firstContent.getBytes("UTF-8"), firstAttributes);

    final String secondContent = "some content1";
    final Map<String, String> secondAttributes = getAttributeMapWithEL(tableName, secondRow, columnFamily, columnQualifier);
    runner.enqueue(secondContent.getBytes("UTF-8"), secondAttributes);

    runner.run();

    runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 2);
    assertEquals(0, runner.getProvenanceEvents().size());
}
/**
 * Splits a 13-byte input on the byte sequence {@code FFFF} with {@code KEEP_SEQUENCE}
 * disabled and checks the two resulting splits and the fragment count on the original.
 */
@Test
public void testSmallSplits() throws IOException {
    final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
    runner.setProperty(SplitContent.KEEP_SEQUENCE, "false");
    runner.setProperty(SplitContent.BYTE_SEQUENCE.getName(), "FFFF");

    // Three consecutive 0xFF bytes sit in the middle of the input
    final byte[] input = new byte[]{1, 2, 3, 4, 5, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 5, 4, 3, 2, 1};
    runner.enqueue(input);
    runner.run();

    runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
    runner.getFlowFilesForRelationship(SplitContent.REL_ORIGINAL).get(0).assertAttributeEquals(FRAGMENT_COUNT, "2");
    runner.assertTransferCount(SplitContent.REL_SPLITS, 2);
    runner.assertQueueEmpty();

    final List<MockFlowFile> fragments = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
    final MockFlowFile leading = fragments.get(0);
    final MockFlowFile trailing = fragments.get(1);

    leading.assertContentEquals(new byte[]{1, 2, 3, 4, 5});
    // The third 0xFF is not part of the matched sequence and stays in the second split
    trailing.assertContentEquals(new byte[]{(byte) 0xFF, 5, 4, 3, 2, 1});
}
/**
 * Extracts metadata from an Avro payload built with zero users while {@code COUNT_ITEMS}
 * is enabled, and checks the schema fingerprint, type and name plus an item count of
 * {@code "0"}.
 */
@Test
public void testExtractionWithZeroUsers() throws IOException {
    final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());
    runner.setProperty(ExtractAvroMetadata.COUNT_ITEMS, "true");

    final Schema userSchema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
    final ByteArrayOutputStream avroBytes = getOutputStreamWithMultipleUsers(userSchema, 0);

    runner.enqueue(avroBytes.toByteArray());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);

    final MockFlowFile result = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
    result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR, "b2d1d8d3de2833ce");
    result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
    result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
    result.assertAttributeEquals(ExtractAvroMetadata.ITEM_COUNT_ATTR, "0");
}
/**
 * Publishes content containing one full match of the message delimiter and one partial
 * match ({@code WILDBOOMSTUFF} instead of {@code WILDSTUFF}), then checks that the consumer
 * receives two messages: the text before the real delimiter, and the remainder with the
 * partial match left intact.
 */
@Test
public void validateComplexPartialMatchDemarcatedMessages() {
    String topicName = "validateComplexPartialMatchDemarcatedMessages";
    PutKafka putKafka = new PutKafka();
    TestRunner runner = TestRunners.newTestRunner(putKafka);
    runner.setProperty(PutKafka.TOPIC, topicName);
    runner.setProperty(PutKafka.CLIENT_NAME, "foo");
    runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
    runner.setProperty(PutKafka.MESSAGE_DELIMITER, "僠<僠WILDSTUFF僠>僠");

    runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠WILDBOOMSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));

    try {
        // NOTE(review): the 'false' argument presumably leaves the processor running after the
        // iteration, which is why an explicit shutdown is needed — confirm against TestRunner.run
        runner.run(1, false);

        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
        ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
        assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
        assertEquals("Goodbye僠<僠WILDBOOMSTUFF僠>僠", new String(consumer.next().message(), StandardCharsets.UTF_8));
    } finally {
        // Shut the runner down even when an assertion above fails, so the processor is not
        // left running for the tests that follow.
        runner.shutdown();
    }
}
/**
 * Compresses {@code SampleFile.txt} with the deflate format at level 6 and filename
 * updating enabled, then compares content and filename against
 * {@code SampleFile.txt.zlib}.
 *
 * @throws Exception if the input file cannot be read or the run fails
 */
@Test
public void testDeflateCompress() throws Exception {
    final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);

    runner.setProperty(CompressContent.MODE, "compress");
    runner.setProperty(CompressContent.COMPRESSION_LEVEL, "6");
    runner.setProperty(CompressContent.COMPRESSION_FORMAT, "deflate");
    final boolean updateFilenameAccepted = runner.setProperty(CompressContent.UPDATE_FILENAME, "true").isValid();
    assertTrue(updateFilenameAccepted);

    runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
    runner.run();

    runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);

    final MockFlowFile compressed = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
    compressed.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt.zlib"));
    compressed.assertAttributeEquals("filename", "SampleFile.txt.zlib");
}
/**
 * Fetches a {@code .gz} file with the compression codec set to {@code NONE} and checks
 * that the FlowFile keeps the {@code .gz} filename and that its content equals the
 * {@code /testdata/randombytes-1.gz} resource byte for byte.
 *
 * @throws IOException if the expected resource cannot be read
 */
@Test
public void testInferCompressionCodecDisabled() throws IOException {
    GetHDFS proc = new TestableGetHDFS(kerberosProperties);
    TestRunner runner = TestRunners.newTestRunner(proc);
    // NOTE(review): the directory is set through PutHDFS.DIRECTORY on a GetHDFS processor —
    // presumably the same descriptor as GetHDFS.DIRECTORY; confirm.
    runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
    runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
    runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
    runner.setProperty(GetHDFS.COMPRESSION_CODEC, "NONE");
    runner.run();

    List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
    assertEquals(1, flowFiles.size());

    MockFlowFile flowFile = flowFiles.get(0);
    // assertEquals reports both values on mismatch and fails cleanly (instead of throwing
    // NullPointerException) when the filename attribute is missing.
    assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key()));

    InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
    flowFile.assertContentEquals(expected);
}
/**
 * Uploads three copies of the sample file with AES-256 server-side encryption selected and
 * checks that every FlowFile on success carries the matching {@code S3_SSE_ALGORITHM}
 * attribute.
 */
@Test
public void testSimplePutEncrypted() throws IOException {
    final int fileCount = 3;
    final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());

    runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
    runner.setProperty(PutS3Object.REGION, REGION);
    runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
    runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);

    // A dynamic property must also be accepted
    Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());

    // Same resource each time, distinguished by filename 0.txt, 1.txt, 2.txt
    for (int index = 0; index < fileCount; index++) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("filename", index + ".txt");
        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attributes);
    }
    runner.run(fileCount);

    runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, fileCount);

    final List<MockFlowFile> uploaded = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
    for (final MockFlowFile uploadedFile : uploaded) {
        uploadedFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
    }
}
/**
 * Lists two attributes that the FlowFile does not have, with core attributes excluded and
 * {@code NULL_VALUE_FOR_EMPTY_STRING} enabled, and expects the {@code CSVData} attribute to
 * be {@code "null,null"}.
 *
 * @throws Exception if the processor run fails
 */
@Test
public void testAttrListNoCoreNullTwoNewAttrToAttribute() throws Exception {
    final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToCSV());
    testRunner.setProperty(AttributesToCSV.DESTINATION, OUTPUT_NEW_ATTRIBUTE);
    testRunner.setProperty(AttributesToCSV.INCLUDE_CORE_ATTRIBUTES, "false");
    testRunner.setProperty(AttributesToCSV.NULL_VALUE_FOR_EMPTY_STRING, "true");

    // Neither attribute is set on the FlowFile enqueued below
    final String nonPresentAttributeKeys = "beach-type,beach-length";
    testRunner.setProperty(AttributesToCSV.ATTRIBUTES_LIST, nonPresentAttributeKeys);

    testRunner.enqueue(new byte[0]);
    testRunner.run();

    // Assert the routing counts first: indexing into an empty success list would surface as
    // an IndexOutOfBoundsException instead of a readable assertion failure.
    testRunner.assertTransferCount(AttributesToCSV.REL_SUCCESS, 1);
    testRunner.assertTransferCount(AttributesToCSV.REL_FAILURE, 0);

    testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS).get(0)
            .assertAttributeExists("CSVData");
    testRunner.getFlowFilesForRelationship(AttributesToCSV.REL_SUCCESS)
            .get(0).assertAttributeEquals("CSVData","null,null");
}
/**
 * Stubs the blob returned by storage so that its owner is an {@code Acl.Domain}, then checks
 * that the outgoing FlowFile carries the owner domain and an owner type of {@code "domain"}.
 *
 * @throws Exception if the processor run fails
 */
@Test
public void testAclAttributeDomain() throws Exception {
    reset(storage, blob);

    final PutGCSObject processor = getProcessor();
    final TestRunner runner = buildNewRunner(processor);
    addRequiredPropertiesToRunner(runner);
    runner.assertValid();

    // Every create() call yields the shared mock blob
    when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
            .thenReturn(blob);

    // The blob's owner is a domain entity
    final Acl.Domain ownerDomain = mock(Acl.Domain.class);
    when(ownerDomain.getDomain()).thenReturn(OWNER_DOMAIN);
    when(blob.getOwner()).thenReturn(ownerDomain);

    runner.enqueue("test");
    runner.run();

    runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS);
    runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1);

    final MockFlowFile uploaded = runner.getFlowFilesForRelationship(PutGCSObject.REL_SUCCESS).get(0);
    uploaded.assertAttributeEquals(OWNER_ATTR, OWNER_DOMAIN);
    uploaded.assertAttributeEquals(OWNER_TYPE_ATTR, "domain");
}
/**
 * Sets a custom transform class and module jar but selects the {@code DEFAULTR} transform,
 * then checks that the output matches {@code defaultrOutput.json} and that the MIME type
 * attribute is {@code application/json}.
 */
@Test
public void testTransformInputCustomTransformationIgnored() throws IOException {
    final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());

    final String customJarPath = "src/test/resources/TestJoltTransformJson/TestCustomJoltTransform.jar";
    final byte[] specBytes = Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/defaultrSpec.json"));
    final String spec = new String(specBytes);

    runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
    runner.setProperty(JoltTransformJSON.CUSTOM_CLASS,"TestCustomJoltTransform");
    runner.setProperty(JoltTransformJSON.MODULES,customJarPath);
    runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM,JoltTransformJSON.DEFAULTR);

    runner.enqueue(JSON_INPUT);
    runner.run();

    runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
    final MockFlowFile result = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
    result.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
    result.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");

    // Compare parsed JSON structures rather than raw text
    final Object actualJson = JsonUtils.jsonToObject(new ByteArrayInputStream(result.toByteArray()));
    final Object expectedJson = JsonUtils.jsonToObject(
            Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/defaultrOutput.json")));
    assertTrue(DIFFY.diff(expectedJson, actualJson).isEmpty());
}
/**
 * Registers a {@code SimpleKeyValueLookupService} with two key/value properties and checks
 * that both configured keys resolve to their values while an unknown key yields
 * {@code EMPTY_STRING}.
 */
@Test
public void testSimpleKeyValueLookupService() throws InitializationException {
    final String serviceId = "simple-key-value-lookup-service";
    final SimpleKeyValueLookupService lookupService = new SimpleKeyValueLookupService();

    final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
    runner.addControllerService(serviceId, lookupService);
    runner.setProperty(lookupService, "key1", "value1");
    runner.setProperty(lookupService, "key2", "value2");
    runner.enableControllerService(lookupService);
    runner.assertValid(lookupService);

    assertThat(lookupService, instanceOf(LookupService.class));

    // Configured keys
    final Optional<String> first = lookupService.lookup(Collections.singletonMap("key", "key1"));
    assertEquals(Optional.of("value1"), first);
    final Optional<String> second = lookupService.lookup(Collections.singletonMap("key", "key2"));
    assertEquals(Optional.of("value2"), second);

    // Key that was never configured
    final Optional<String> missing = lookupService.lookup(Collections.singletonMap("key", "key3"));
    assertEquals(EMPTY_STRING, missing);
}
/**
 * Configures the AWS credentials provider controller service with an access key and a
 * secret key and checks that the provider it exposes is a {@code StaticCredentialsProvider}.
 */
@Test
public void testKeysCredentialsProvider() throws Throwable {
    final String serviceId = "awsCredentialsProvider";
    final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class);

    final AWSCredentialsProviderControllerService controllerService = new AWSCredentialsProviderControllerService();
    runner.addControllerService(serviceId, controllerService);
    runner.setProperty(controllerService, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey");
    runner.setProperty(controllerService, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey");
    runner.enableControllerService(controllerService);
    runner.assertValid(controllerService);

    // Resolve the service through the lookup instead of using the local instance
    final AWSCredentialsProviderService resolved = (AWSCredentialsProviderService) runner.getProcessContext()
            .getControllerServiceLookup()
            .getControllerService(serviceId);
    Assert.assertNotNull(resolved);

    final AWSCredentialsProvider provider = resolved.getCredentialsProvider();
    Assert.assertNotNull(provider);
    assertEquals("credentials provider should be equal", StaticCredentialsProvider.class,
            provider.getClass());
}
/**
 * Merges three FlowFiles that share one filename into ZIP format and checks the resulting
 * counts: one merged, two failures and three originals.
 */
@Test
public void testZipException() throws IOException {
    final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
    runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
    runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP);

    // All three inputs carry the same filename
    final Map<String, String> sharedAttributes = new HashMap<>();
    sharedAttributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
    sharedAttributes.put("filename", "duplicate-filename.txt");

    runner.enqueue("Hello".getBytes("UTF-8"), sharedAttributes);
    runner.enqueue(", ".getBytes("UTF-8"), sharedAttributes);
    runner.enqueue("World!".getBytes("UTF-8"), sharedAttributes);

    runner.run();

    runner.assertQueueEmpty();
    runner.assertTransferCount(MergeContent.REL_MERGED, 1);
    runner.assertTransferCount(MergeContent.REL_FAILURE, 2);
    runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
}
/**
 * Splits the shared {@code users} payload with an output size of 20 and expects five
 * splits, one original with a fragment count of {@code "5"}, and no failures.
 */
@Test
public void testRecordSplitDatafileOutputWithMultipleRecords() throws IOException {
    final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
    runner.setProperty(SplitAvro.OUTPUT_SIZE, "20");

    runner.enqueue(users.toByteArray());
    runner.run();

    runner.assertTransferCount(SplitAvro.REL_SPLIT, 5);
    runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
    runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);

    runner.getFlowFilesForRelationship(SplitAvro.REL_ORIGINAL)
            .get(0)
            .assertAttributeEquals(FRAGMENT_COUNT.key(), "5");

    final List<MockFlowFile> splitFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
    checkDataFileSplitSize(splitFiles, 20, true);
}
/**
 * Puts {@code data} with the CSE-C encryption strategy, fetches it back through
 * {@code fetchEncryptedFlowFile}, and checks the strategy attribute, the round-tripped
 * content, and that the {@code x-amz-key} and {@code x-amz-iv} attributes are non-empty.
 *
 * @param data the bytes to upload and expect back after the fetch
 * @throws InitializationException declared for the runner/fetch helpers this method calls
 * @throws IOException declared for the runner/fetch helpers this method calls
 */
private void testEncryptionServiceWithClientSideCEncryptionStrategy(byte[] data) throws InitializationException, IOException {
    TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_CSE_C, randomKeyMaterial);

    final Map<String, String> attrs = new HashMap<>();
    attrs.put("filename", "test.txt");
    runner.enqueue(data, attrs);
    runner.assertValid();
    runner.run();
    runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);

    List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
    Assert.assertEquals(1, flowFiles.size());
    Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());

    MockFlowFile putSuccess = flowFiles.get(0);
    // Expected value goes first so a failure reads "expected:<CSE-C strategy> but was:<...>"
    Assert.assertEquals(AmazonS3EncryptionService.STRATEGY_NAME_CSE_C, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));

    // Fetch with the same attributes and key material, then verify the round trip
    MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_CSE_C, randomKeyMaterial);
    flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_CSE_C);
    flowFile.assertContentEquals(data);

    flowFile.assertAttributeExists("x-amz-key");
    flowFile.assertAttributeNotEquals("x-amz-key", "");
    flowFile.assertAttributeExists("x-amz-iv");
    flowFile.assertAttributeNotEquals("x-amz-iv", "");
}
/**
 * Sets {@code METADATA_KEYS} to a comma-and-space separated list that includes
 * {@code AVRO_SCHEMA_ATTR} and checks that the schema attributes are extracted, including
 * the full schema string.
 */
@Test
public void testExtractionWithMetadataKeysWhitespace() throws IOException {
    final TestRunner runner = TestRunners.newTestRunner(new ExtractAvroMetadata());

    // Keys are separated by ", " — the whitespace is the point of this test; the last key
    // exercises the dynamic avro.schema attribute
    final String metadataKeys = "foo, bar, " + AVRO_SCHEMA_ATTR;
    runner.setProperty(ExtractAvroMetadata.METADATA_KEYS, metadataKeys);

    final Schema userSchema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
    final ByteArrayOutputStream avroBytes = getOutputStreamWithOneUser(userSchema);

    runner.enqueue(avroBytes.toByteArray());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExtractAvroMetadata.REL_SUCCESS, 1);

    final MockFlowFile result = runner.getFlowFilesForRelationship(ExtractAvroMetadata.REL_SUCCESS).get(0);
    result.assertAttributeExists(ExtractAvroMetadata.SCHEMA_FINGERPRINT_ATTR);
    result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_TYPE_ATTR, Schema.Type.RECORD.getName());
    result.assertAttributeEquals(ExtractAvroMetadata.SCHEMA_NAME_ATTR, "User");
    result.assertAttributeEquals(AVRO_SCHEMA_ATTR, userSchema.toString());
}
/**
 * Applies the {@code SHIFTR} transform using {@code shiftrSpec.json} and checks that the
 * output matches {@code shiftrOutput.json} and that the MIME type attribute is
 * {@code application/json}.
 */
@Test
public void testTransformInputWithShiftr() throws IOException {
    final TestRunner runner = TestRunners.newTestRunner(new JoltTransformJSON());
    runner.setValidateExpressionUsage(false);

    final byte[] specBytes = Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformJson/shiftrSpec.json"));
    final String spec = new String(specBytes);
    runner.setProperty(JoltTransformJSON.JOLT_SPEC, spec);
    runner.setProperty(JoltTransformJSON.JOLT_TRANSFORM, JoltTransformJSON.SHIFTR);

    runner.enqueue(JSON_INPUT);
    runner.run();

    runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
    final MockFlowFile result = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).get(0);
    result.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
    result.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json");

    // Compare parsed JSON structures rather than raw text
    final Object actualJson = JsonUtils.jsonToObject(new ByteArrayInputStream(result.toByteArray()));
    final Object expectedJson = JsonUtils.jsonToObject(
            Files.newInputStream(Paths.get("src/test/resources/TestJoltTransformJson/shiftrOutput.json")));
    assertTrue(DIFFY.diff(expectedJson, actualJson).isEmpty());
}
/**
 * Evaluates two {@code boolean(...)} XQuery expressions into attributes: one selecting the
 * first {@code fruit} element and one selecting the 100th. Expects {@code "true"} and
 * {@code "false"} respectively.
 */
@Test
public void testCheckIfElementExists() throws XPathFactoryConfigurationException, IOException {
    final String existingKey = "xquery.result.exist.1";
    final String missingKey = "xquery.result.exist.2";

    final TestRunner testRunner = TestRunners.newTestRunner(new EvaluateXQuery());
    testRunner.setProperty(EvaluateXQuery.DESTINATION, EvaluateXQuery.DESTINATION_ATTRIBUTE);
    testRunner.setProperty(existingKey, "boolean(/*:fruitbasket/fruit[1])");
    testRunner.setProperty(missingKey, "boolean(/*:fruitbasket/fruit[100])");

    testRunner.enqueue(XML_SNIPPET);
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(EvaluateXQuery.REL_MATCH, 1);

    final MockFlowFile matched = testRunner.getFlowFilesForRelationship(EvaluateXQuery.REL_MATCH).get(0);
    matched.assertAttributeEquals(existingKey, "true");
    matched.assertAttributeEquals(missingKey, "false");
}
/**
 * Enqueues two FlowFiles that target the same table but different rows, and checks that
 * both reach success, that the mock client recorded two puts under the one table in
 * enqueue order, and that two provenance events were emitted.
 */
@Test
public void testMultipleFlowFilesSameTableDifferentRow() throws IOException, InitializationException {
    final String tableName = "nifi";
    final String columnFamily = "family1";
    final String columnQualifier = "qualifier1";
    final String firstRow = "row1";
    final String secondRow = "row2";

    final PutHBaseCell processor = new PutHBaseCell();
    final TestRunner runner = getTestRunnerWithEL(processor);
    final MockHBaseClientService hBaseClient = getHBaseClientService(runner);

    final String firstContent = "some content1";
    final Map<String, String> firstAttributes = getAttributeMapWithEL(tableName, firstRow, columnFamily, columnQualifier);
    runner.enqueue(firstContent.getBytes("UTF-8"), firstAttributes);

    final String secondContent = "some content1";
    final Map<String, String> secondAttributes = getAttributeMapWithEL(tableName, secondRow, columnFamily, columnQualifier);
    runner.enqueue(secondContent.getBytes("UTF-8"), secondAttributes);

    runner.run();

    runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
    final MockFlowFile firstOut = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
    firstOut.assertContentEquals(firstContent);

    // One table entry holding both puts
    assertNotNull(hBaseClient.getFlowFilePuts());
    assertEquals(1, hBaseClient.getFlowFilePuts().size());

    final List<PutFlowFile> recordedPuts = hBaseClient.getFlowFilePuts().get(tableName);
    assertEquals(2, recordedPuts.size());
    verifyPut(firstRow, columnFamily, columnQualifier, null, firstContent, recordedPuts.get(0));
    verifyPut(secondRow, columnFamily, columnQualifier, null, secondContent, recordedPuts.get(1));

    assertEquals(2, runner.getProvenanceEvents().size());
}