com.amazonaws.auth.PropertiesCredentials#org.apache.nifi.util.file.FileUtils源码实例Demo

下面列出了com.amazonaws.auth.PropertiesCredentials#org.apache.nifi.util.file.FileUtils 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

/**
 * Writes a header and a single event to a journal that has a table of contents, then reads the
 * journal back and checks that exactly that one event is returned.
 *
 * @throws IOException if writing, reading, or removing the temporary storage directory fails
 */
@Test
public void testSimpleWriteWithToc() throws IOException {
    final File journal = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
    final File toc = TocUtil.getTocFile(journal);

    // Write phase: one header, one event, then close so everything is on disk.
    final RecordWriter recordWriter = createWriter(journal, new StandardTocWriter(toc, false, false), false, 1024 * 1024);
    recordWriter.writeHeader(1L);
    recordWriter.writeRecord(createEvent());
    recordWriter.close();

    // Read phase: the reader starts at block 0 and must yield the event that was written.
    final TocReader tocIndex = new StandardTocReader(toc);
    try (final FileInputStream journalIn = new FileInputStream(journal);
        final RecordReader recordReader = createReader(journalIn, journal.getName(), tocIndex, 2048)) {
        assertEquals(0, recordReader.getBlockIndex());
        recordReader.skipToBlock(0);

        final StandardProvenanceEventRecord firstEvent = recordReader.nextRecord();
        assertNotNull(firstEvent);
        assertEquals("nifi://unit-test", firstEvent.getTransitUri());

        // Only one event was written, so nothing else may follow.
        assertNull(recordReader.nextRecord());
    }

    // Remove the per-test directory that holds the journal.
    FileUtils.deleteFile(journal.getParentFile(), true);
}
 
/**
 * Writes a header and a single event to a ".gz" journal (createWriter is called with its boolean
 * argument set to true), then reads the journal back and checks that exactly that one event is
 * returned.
 *
 * @throws IOException if writing, reading, or removing the temporary storage directory fails
 */
@Test
public void testSingleRecordCompressed() throws IOException {
    final File journal = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
    final File toc = TocUtil.getTocFile(journal);

    // Write phase: one header, one event, then close so everything is on disk.
    final RecordWriter recordWriter = createWriter(journal, new StandardTocWriter(toc, false, false), true, 8192);
    recordWriter.writeHeader(1L);
    recordWriter.writeRecord(createEvent());
    recordWriter.close();

    // Read phase: the reader starts at block 0 and must yield the event that was written.
    final TocReader tocIndex = new StandardTocReader(toc);
    try (final FileInputStream journalIn = new FileInputStream(journal);
        final RecordReader recordReader = createReader(journalIn, journal.getName(), tocIndex, 2048)) {
        assertEquals(0, recordReader.getBlockIndex());
        recordReader.skipToBlock(0);

        final StandardProvenanceEventRecord firstEvent = recordReader.nextRecord();
        assertNotNull(firstEvent);
        assertEquals("nifi://unit-test", firstEvent.getTransitUri());

        // Only one event was written, so nothing else may follow.
        assertNull(recordReader.nextRecord());
    }

    // Remove the per-test directory that holds the journal.
    FileUtils.deleteFile(journal.getParentFile(), true);
}
 
源代码3 项目: localization_nifi   文件: CaptureServlet.java
/**
 * Captures the headers and the body of an incoming POST into {@code lastPostHeaders} and
 * {@code lastPost}, then sets the response status to {@code Status.OK}.
 *
 * @param request the incoming POST request whose headers and body are captured
 * @param response the response on which the status code is set
 * @throws ServletException declared by the overridden method
 * @throws IOException if reading the request body fails
 */
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
    // Capture all the headers for reference. Headers with multiple values intentionally get no
    // special handling, for clarity: only the value returned by getHeader() is kept.
    // The map is filled locally and published only once complete, so that anything reading
    // lastPostHeaders never observes a partially populated map.
    final HashMap<String, String> capturedHeaders = new HashMap<>();
    final Enumeration<String> headerNames = request.getHeaderNames();
    while (headerNames.hasMoreElements()) {
        final String headerName = headerNames.nextElement();
        capturedHeaders.put(headerName, request.getHeader(headerName));
    }
    lastPostHeaders = capturedHeaders;

    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
        StreamUtils.copy(request.getInputStream(), baos);
        this.lastPost = baos.toByteArray();
    } finally {
        FileUtils.closeQuietly(baos);
    }
    response.setStatus(Status.OK.getStatusCode());
}
 
源代码4 项目: localization_nifi   文件: CaptureServlet.java
/**
 * Captures the headers and the body of an incoming POST into {@code lastPostHeaders} and
 * {@code lastPost}, then sets the response status to {@code Status.OK}.
 *
 * @param request the incoming POST request whose headers and body are captured
 * @param response the response on which the status code is set
 * @throws ServletException declared by the overridden method
 * @throws IOException if reading the request body fails
 */
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
    // Capture all the headers for reference. Headers with multiple values intentionally get no
    // special handling, for clarity: only the value returned by getHeader() is kept.
    // The map is filled locally and published only once complete, so that anything reading
    // lastPostHeaders never observes a partially populated map.
    final HashMap<String, String> capturedHeaders = new HashMap<>();
    final Enumeration<String> headerNames = request.getHeaderNames();
    while (headerNames.hasMoreElements()) {
        final String headerName = headerNames.nextElement();
        capturedHeaders.put(headerName, request.getHeader(headerName));
    }
    lastPostHeaders = capturedHeaders;

    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
        StreamUtils.copy(request.getInputStream(), baos);
        this.lastPost = baos.toByteArray();
    } finally {
        FileUtils.closeQuietly(baos);
    }
    response.setStatus(Status.OK.getStatusCode());
}
 
/**
 * Synchronizes the primary configuration file with a same-named copy in the restore directory.
 *
 * @throws IOException propagated from the FileUtils directory check or sync call
 * @throws IllegalStateException if the configuration file's parent directory is the restore directory
 */
private void syncWithRestoreDirectory() throws IOException {
    // The restore directory must be a usable directory (created if necessary) before syncing.
    FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);

    // The primary copy may not live in the restore directory itself.
    final String primaryDirPath = config.getParentFile().getAbsolutePath();
    final String restoreDirPath = restoreDirectory.getAbsolutePath();
    if (primaryDirPath.equals(restoreDirPath)) {
        final String message = String.format("Cluster firewall configuration file '%s' cannot be in the restore directory '%s' ",
                config.getAbsolutePath(), restoreDirPath);
        throw new IllegalStateException(message);
    }

    // Same file name as the primary copy, different directory.
    final File restoreCopy = new File(restoreDirectory, config.getName());
    FileUtils.syncWithRestore(config, restoreCopy, logger);
}
 
源代码6 项目: localization_nifi   文件: StandardFlowService.java
/**
 * Copies the gunzipped content of the flow file to the given stream while holding the read
 * lock. Nothing is written when the flow file is missing or empty.
 *
 * @param os the stream that receives the decompressed flow
 * @throws IOException if the flow file cannot be inspected, read, or decompressed, or the copy fails
 */
@Override
public void copyCurrentFlow(final OutputStream os) throws IOException {
    readLock.lock();
    try {
        final boolean flowAvailable = Files.exists(flowXml) && Files.size(flowXml) != 0;
        if (flowAvailable) {
            try (final InputStream compressed = Files.newInputStream(flowXml, StandardOpenOption.READ);
                    final InputStream decompressed = new GZIPInputStream(compressed)) {
                FileUtils.copy(decompressed, os);
            }
        }
    } finally {
        readLock.unlock();
    }
}
 
/**
 * Prepares a fresh fixture: clears the per-class directory under "target/", builds a mocked
 * operator context pointing at it, and wires a new input operator to a collecting sink.
 *
 * @throws IOException if an existing directory cannot be deleted
 */
@Before
public void setup() throws IOException
{
  final String appPath = "target/" + this.getClass().getSimpleName();

  // Start from a clean directory if an earlier run left one behind.
  final File staleDir = new File(appPath);
  if (staleDir.exists()) {
    FileUtils.deleteFile(staleDir, true);
  }

  final Attribute.AttributeMap attrs = new Attribute.AttributeMap.DefaultAttributeMap();
  attrs.put(DAG.APPLICATION_PATH, appPath);
  context = mockOperatorContext(12345, attrs);

  sink = new CollectorTestSink<>();
  builder = new MockSiteToSiteClient.Builder();
  windowDataManager = new FSWindowDataManager();

  operator = new NiFiSinglePortInputOperator(builder, windowDataManager);
  operator.outputPort.setSink(sink);
}
 
/**
 * Prepares a fresh fixture: clears the per-class directory under "target/", builds a mocked
 * operator context pointing at it, and creates a new output operator.
 *
 * @throws IOException if an existing directory cannot be deleted
 */
@Before
public void setup() throws IOException
{
  final String appPath = "target/" + this.getClass().getSimpleName();

  // Start from a clean directory if an earlier run left one behind.
  final File staleDir = new File(appPath);
  if (staleDir.exists()) {
    FileUtils.deleteFile(staleDir, true);
  }

  final Attribute.AttributeMap attrs = new Attribute.AttributeMap.DefaultAttributeMap();
  attrs.put(DAG.APPLICATION_PATH, appPath);
  context = mockOperatorContext(12345, attrs);

  windowDataManager = new FSWindowDataManager();
  stsBuilder = new MockSiteToSiteClient.Builder();
  dpBuilder = new StringNiFiDataPacketBuilder();

  operator = new NiFiSinglePortOutputOperator(stsBuilder, dpBuilder, windowDataManager, 1);
}
 
源代码9 项目: nifi   文件: PersistentProvenanceRepository.java
/**
 * Returns the capacity reported by FileUtils for the storage directory registered under the
 * given container name.
 *
 * @param containerName name of a configured storage directory
 * @return the non-zero capacity reported for that directory
 * @throws IllegalArgumentException if no storage directory is registered under that name
 * @throws IOException if the reported capacity is zero, or the capacity lookup fails
 */
@Override
public long getContainerCapacity(final String containerName) throws IOException {
    final File containerDir = configuration.getStorageDirectories().get(containerName);
    if (containerDir == null) {
        throw new IllegalArgumentException("There is no defined container with name " + containerName);
    }

    final long totalBytes = FileUtils.getContainerCapacity(containerDir.toPath());
    if (totalBytes == 0) {
        throw new IOException("System returned total space of the partition for " + containerName + " is zero byte. "
                + "Nifi can not create a zero sized provenance repository.");
    }
    return totalBytes;
}
 
源代码10 项目: nifi   文件: WriteAheadProvenanceRepository.java
/**
 * Returns the capacity reported by FileUtils for the storage directory registered under the
 * given container name.
 *
 * @param containerName name of a configured storage directory
 * @return the non-zero capacity reported for that directory
 * @throws IllegalArgumentException if no storage directory is registered under that name
 * @throws IOException if the reported capacity is zero, or the capacity lookup fails
 */
@Override
public long getContainerCapacity(final String containerName) throws IOException {
    final File containerDir = config.getStorageDirectories().get(containerName);
    if (containerDir == null) {
        throw new IllegalArgumentException("There is no defined container with name " + containerName);
    }

    final long totalBytes = FileUtils.getContainerCapacity(containerDir.toPath());
    if (totalBytes == 0) {
        throw new IOException("System returned total space of the partition for " + containerName + " is zero byte. "
                + "Nifi can not create a zero sized provenance repository.");
    }
    return totalBytes;
}
 
源代码11 项目: nifi   文件: AbstractTestRecordReaderWriter.java
/**
 * Writes a header and a single event to a journal that has a table of contents, then delegates
 * to assertRecoveredRecord to verify what can be read back.
 *
 * @throws IOException if writing, verifying, or removing the temporary storage directory fails
 */
@Test
public void testSimpleWriteWithToc() throws IOException {
    final File journal = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
    final File toc = TocUtil.getTocFile(journal);

    // Write phase: one header, one event, then close so everything is on disk.
    final RecordWriter recordWriter = createWriter(journal, new StandardTocWriter(toc, false, false), false, 1024 * 1024);
    recordWriter.writeHeader(1L);
    recordWriter.writeRecord(createEvent());
    recordWriter.close();

    // Verification: expected transit URI "nifi://unit-test", expected block index 0.
    assertRecoveredRecord(journal, new StandardTocReader(toc), "nifi://unit-test", 0);

    // Remove the per-test directory that holds the journal.
    FileUtils.deleteFile(journal.getParentFile(), true);
}
 
源代码12 项目: nifi   文件: AbstractTestRecordReaderWriter.java
/**
 * Writes a header and a single event to a ".gz" journal (createWriter is called with its boolean
 * argument set to true), then delegates to assertRecoveredRecord to verify what can be read back.
 *
 * @throws IOException if writing, verifying, or removing the temporary storage directory fails
 */
@Test
public void testSingleRecordCompressed() throws IOException {
    final File journal = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
    final File toc = TocUtil.getTocFile(journal);
    final TocWriter tocOut = new StandardTocWriter(toc, false, false);

    // Write phase: one header, one event, then close so everything is on disk.
    final RecordWriter recordWriter = createWriter(journal, tocOut, true, 8192);
    recordWriter.writeHeader(1L);
    recordWriter.writeRecord(createEvent());
    recordWriter.close();

    // Verification against the values the single written event is expected to carry.
    final TocReader tocIndex = new StandardTocReader(toc);
    final String expectedTransitUri = "nifi://unit-test";
    final int expectedBlockIndex = 0;
    assertRecoveredRecord(journal, tocIndex, expectedTransitUri, expectedBlockIndex);

    // Remove the per-test directory that holds the journal.
    FileUtils.deleteFile(journal.getParentFile(), true);
}
 
源代码13 项目: nifi   文件: CaptureServlet.java
/**
 * Captures the headers and the body of an incoming POST into {@code lastPostHeaders} and
 * {@code lastPost}, then sets the response status to {@code Status.OK}.
 *
 * @param request the incoming POST request whose headers and body are captured
 * @param response the response on which the status code is set
 * @throws ServletException declared by the overridden method
 * @throws IOException if reading the request body fails
 */
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
    // Capture all the headers for reference. Headers with multiple values intentionally get no
    // special handling, for clarity: only the value returned by getHeader() is kept.
    // The map is filled locally and published only once complete, so that anything reading
    // lastPostHeaders never observes a partially populated map.
    final HashMap<String, String> capturedHeaders = new HashMap<>();
    final Enumeration<String> headerNames = request.getHeaderNames();
    while (headerNames.hasMoreElements()) {
        final String headerName = headerNames.nextElement();
        capturedHeaders.put(headerName, request.getHeader(headerName));
    }
    lastPostHeaders = capturedHeaders;

    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
        StreamUtils.copy(request.getInputStream(), baos);
        this.lastPost = baos.toByteArray();
    } finally {
        FileUtils.closeQuietly(baos);
    }
    response.setStatus(Status.OK.getStatusCode());
}
 
源代码14 项目: nifi   文件: CaptureServlet.java
/**
 * Captures the headers and the body of an incoming POST into {@code lastPostHeaders} and
 * {@code lastPost}, then sets the response status to {@code Status.OK}.
 *
 * @param request the incoming POST request whose headers and body are captured
 * @param response the response on which the status code is set
 * @throws ServletException declared by the overridden method
 * @throws IOException if reading the request body fails
 */
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
    // Capture all the headers for reference. Headers with multiple values intentionally get no
    // special handling, for clarity: only the value returned by getHeader() is kept.
    // The map is filled locally and published only once complete, so that anything reading
    // lastPostHeaders never observes a partially populated map.
    final HashMap<String, String> capturedHeaders = new HashMap<>();
    final Enumeration<String> headerNames = request.getHeaderNames();
    while (headerNames.hasMoreElements()) {
        final String headerName = headerNames.nextElement();
        capturedHeaders.put(headerName, request.getHeader(headerName));
    }
    lastPostHeaders = capturedHeaders;

    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
        StreamUtils.copy(request.getInputStream(), baos);
        this.lastPost = baos.toByteArray();
    } finally {
        FileUtils.closeQuietly(baos);
    }
    response.setStatus(Status.OK.getStatusCode());
}
 
源代码15 项目: nifi   文件: FileBasedClusterNodeFirewall.java
/**
 * Synchronizes the primary configuration file with a same-named copy in the restore directory.
 *
 * @throws IOException propagated from the FileUtils directory check or sync call
 * @throws IllegalStateException if the configuration file's parent directory is the restore directory
 */
private void syncWithRestoreDirectory() throws IOException {
    // The restore directory must be a usable directory (created if necessary) before syncing.
    FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);

    // The primary copy may not live in the restore directory itself.
    final String primaryDirPath = config.getParentFile().getAbsolutePath();
    final String restoreDirPath = restoreDirectory.getAbsolutePath();
    if (primaryDirPath.equals(restoreDirPath)) {
        final String message = String.format("Cluster firewall configuration file '%s' cannot be in the restore directory '%s' ",
                config.getAbsolutePath(), restoreDirPath);
        throw new IllegalStateException(message);
    }

    // Same file name as the primary copy, different directory.
    final File restoreCopy = new File(restoreDirectory, config.getName());
    FileUtils.syncWithRestore(config, restoreCopy, logger);
}
 
源代码16 项目: nifi   文件: FileSystemRepository.java
/**
 * Returns the capacity reported by FileUtils for the path registered under the given
 * container name.
 *
 * @param containerName name of a registered container
 * @return the non-zero capacity reported for the container's path
 * @throws IllegalArgumentException if no container is registered under that name
 * @throws IOException if the reported capacity is zero, or the capacity lookup fails
 */
@Override
public long getContainerCapacity(final String containerName) throws IOException {
    final Path containerPath = containers.get(containerName);
    if (containerPath != null) {
        final long totalBytes = FileUtils.getContainerCapacity(containerPath);
        if (totalBytes != 0) {
            return totalBytes;
        }

        throw new IOException("System returned total space of the partition for " + containerName + " is zero byte. "
                + "Nifi can not create a zero sized FileSystemRepository.");
    }

    throw new IllegalArgumentException("No container exists with name " + containerName);
}
 
源代码17 项目: nifi   文件: StandardFlowService.java
/**
 * Copies the gunzipped content of the flow file to the given stream while holding the read
 * lock. Nothing is written when the flow file is missing or empty.
 *
 * @param os the stream that receives the decompressed flow
 * @throws IOException if the flow file cannot be inspected, read, or decompressed, or the copy fails
 */
@Override
public void copyCurrentFlow(final OutputStream os) throws IOException {
    readLock.lock();
    try {
        final boolean flowAvailable = Files.exists(flowXml) && Files.size(flowXml) != 0;
        if (flowAvailable) {
            try (final InputStream compressed = Files.newInputStream(flowXml, StandardOpenOption.READ);
                    final InputStream decompressed = new GZIPInputStream(compressed)) {
                FileUtils.copy(decompressed, os);
            }
        }
    } finally {
        readLock.unlock();
    }
}
 
源代码18 项目: localization_nifi   文件: SSLContextFactory.java
/**
 * Reads the keystore and truststore settings from the given properties, loads both stores from
 * disk, and keeps the resulting key managers and trust managers.
 *
 * @param properties source of the keystore/truststore location, password, and type settings
 * @throws NoSuchAlgorithmException declared for the store loading and factory lookups
 * @throws CertificateException declared for the store loading
 * @throws FileNotFoundException if a store file cannot be opened
 * @throws IOException if a store cannot be read
 * @throws KeyStoreException declared for the store lookups and factory initialization
 * @throws UnrecoverableKeyException declared for the key manager factory initialization
 */
public SSLContextFactory(final NiFiProperties properties) throws NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, KeyStoreException, UnrecoverableKeyException {
    // Keystore settings.
    keystore = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE);
    keystorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD));
    keystoreType = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE);

    // Truststore settings.
    truststore = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE);
    truststorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD));
    truststoreType = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE);

    // Load the keystore from disk and initialize a key manager factory with it.
    final KeyStore loadedKeyStore = KeyStoreUtils.getKeyStore(keystoreType);
    final FileInputStream keystoreIn = new FileInputStream(keystore);
    try {
        loadedKeyStore.load(keystoreIn, keystorePass);
    } finally {
        FileUtils.closeQuietly(keystoreIn);
    }
    final String keyAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
    final KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyAlgorithm);
    kmf.init(loadedKeyStore, keystorePass);

    // Load the truststore from disk and initialize a trust manager factory with it.
    final KeyStore loadedTrustStore = KeyStoreUtils.getTrustStore(truststoreType);
    final FileInputStream truststoreIn = new FileInputStream(truststore);
    try {
        loadedTrustStore.load(truststoreIn, truststorePass);
    } finally {
        FileUtils.closeQuietly(truststoreIn);
    }
    final String trustAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
    final TrustManagerFactory tmf = TrustManagerFactory.getInstance(trustAlgorithm);
    tmf.init(loadedTrustStore);

    keyManagers = kmf.getKeyManagers();
    trustManagers = tmf.getTrustManagers();
}
 
源代码19 项目: localization_nifi   文件: LuceneEventIndex.java
/**
 * Tries to remove the given index from the index manager, retrying every five seconds until it
 * succeeds or MAX_DELETE_INDEX_WAIT_SECONDS have elapsed, and on success deletes the directory.
 *
 * @param indexDirectory directory of the index to remove
 * @return {@code true} if the index manager removed the index (even if deleting the files then
 *         failed); {@code false} if it did not, or if the thread was interrupted while waiting
 */
protected boolean tryDeleteIndex(final File indexDirectory) {
    final long startNanos = System.nanoTime();
    final long maxWaitNanos = TimeUnit.SECONDS.toNanos(MAX_DELETE_INDEX_WAIT_SECONDS);

    boolean removed = false;
    while (!removed && System.nanoTime() - startNanos < maxWaitNanos) {
        removed = indexManager.removeIndex(indexDirectory);
        if (removed) {
            break;
        }

        // Not removable yet: pause before the next attempt.
        try {
            Thread.sleep(5000L);
        } catch (final InterruptedException ie) {
            logger.debug("Interrupted when trying to remove index {} from IndexManager; will not remove index", indexDirectory);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    if (!removed) {
        logger.warn("The Lucene Index located at {} has expired and contains no Provenance Events that still exist in the respository. "
            + "However, the directory could not be deleted because it is still actively being used. Will continue to try to delete "
            + "in a subsequent maintenance cycle.", indexDirectory);
        return false;
    }

    // The index manager has let go of the index; a failure to delete the files is only logged.
    try {
        FileUtils.deleteFile(indexDirectory, true);
        logger.debug("Successfully deleted directory {}", indexDirectory);
    } catch (final IOException e) {
        logger.warn("The Lucene Index located at " + indexDirectory + " has expired and contains no Provenance Events that still exist in the respository. "
            + "However, the directory could not be deleted.", e);
    }

    directoryManager.deleteDirectory(indexDirectory);
    logger.info("Successfully removed expired Lucene Index {}", indexDirectory);
    return true;
}
 
/**
 * Writes ten events to a ".gz" journal and reads them back in order, checking each event's id,
 * transit URI, and updated attributes.
 *
 * @throws IOException if writing, reading, or removing the temporary storage directory fails
 */
@Test
public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
    final int eventCount = 10;
    final File journal = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
    final File toc = TocUtil.getTocFile(journal);
    final TocWriter tocOut = new StandardTocWriter(toc, false, false);

    // The last argument is 100; presumably a small block size so the events span several
    // blocks, as the test name suggests -- TODO confirm the unit against createWriter.
    final RecordWriter recordWriter = createWriter(journal, tocOut, true, 100);
    recordWriter.writeHeader(1L);
    for (int written = 0; written < eventCount; written++) {
        recordWriter.writeRecord(createEvent());
    }
    recordWriter.close();

    final TocReader tocIndex = new StandardTocReader(toc);
    try (final FileInputStream journalIn = new FileInputStream(journal);
        final RecordReader recordReader = createReader(journalIn, journal.getName(), tocIndex, 2048)) {
        for (int expectedId = 0; expectedId < eventCount; expectedId++) {
            final StandardProvenanceEventRecord event = recordReader.nextRecord();
            System.out.println(event);
            assertNotNull(event);
            assertEquals(expectedId, event.getEventId());
            assertEquals("nifi://unit-test", event.getTransitUri());

            final Map<String, String> updated = event.getUpdatedAttributes();
            assertNotNull(updated);
            assertEquals(2, updated.size());
            assertEquals("1.txt", updated.get("filename"));
            assertTrue(updated.containsKey("uuid"));
        }

        // All written events have been consumed; the reader must now be exhausted.
        assertNull(recordReader.nextRecord());
    }

    // Remove the per-test directory that holds the journal.
    FileUtils.deleteFile(journal.getParentFile(), true);
}
 
源代码21 项目: localization_nifi   文件: TestSimpleIndexManager.java
/**
 * Borrows two writers for the same index directory at once, indexes one document through each,
 * and checks that a searcher borrowed afterwards sees both documents.
 *
 * @throws IOException if indexing, searching, or removing the index directory fails
 */
@Test
public void testMultipleWritersSimultaneouslySameIndex() throws IOException {
    final SimpleIndexManager manager = new SimpleIndexManager(new RepositoryConfiguration());
    final File indexDir = new File("target/" + UUID.randomUUID().toString());
    try {
        // Both writers are held at the same time, for the same directory.
        final EventIndexWriter firstWriter = manager.borrowIndexWriter(indexDir);
        final EventIndexWriter secondWriter = manager.borrowIndexWriter(indexDir);

        final Document firstDoc = new Document();
        firstDoc.add(new StringField("id", "1", Store.YES));
        firstWriter.index(firstDoc, 1000);

        final Document secondDoc = new Document();
        secondDoc.add(new StringField("id", "2", Store.YES));
        secondWriter.index(secondDoc, 1000);

        manager.returnIndexWriter(secondWriter);
        manager.returnIndexWriter(firstWriter);

        // A match-all query must find both documents.
        final EventIndexSearcher indexSearcher = manager.borrowIndexSearcher(indexDir);
        final TopDocs results = indexSearcher.getIndexSearcher().search(new MatchAllDocsQuery(), 2);
        assertEquals(2, results.totalHits);
        manager.returnIndexSearcher(indexSearcher);
    } finally {
        FileUtils.deleteFile(indexDir, true);
    }
}
 
源代码22 项目: localization_nifi   文件: TestListDatabaseTables.java
/**
 * Points the Derby error log at "target/derby.log" and removes any database left at
 * DB_LOCATION by an earlier run.
 *
 * @throws IOException declared by the signature; a failed delete is deliberately ignored
 */
@BeforeClass
public static void setupBeforeClass() throws IOException {
    System.setProperty("derby.stream.error.file", "target/derby.log");

    // Best-effort removal of a previous test database.
    try {
        FileUtils.deleteFile(new File(DB_LOCATION), true);
    } catch (IOException ignored) {
        // Deliberately ignored: the database may simply not have existed.
    }
}
 
源代码23 项目: localization_nifi   文件: TestListDatabaseTables.java
/**
 * Shuts down the Derby database at DB_LOCATION and then removes its files.
 *
 * @throws Exception if the shutdown request fails with anything other than the expected
 *         SQLNonTransientConnectionException
 */
@AfterClass
public static void cleanUpAfterClass() throws Exception {
    final String shutdownUrl = "jdbc:derby:" + DB_LOCATION + ";shutdown=true";
    try {
        DriverManager.getConnection(shutdownUrl);
    } catch (SQLNonTransientConnectionException expected) {
        // Deliberately ignored: this exception is what a Derby shutdown produces.
    }

    // Best-effort removal of the test database.
    try {
        FileUtils.deleteFile(new File(DB_LOCATION), true);
    } catch (IOException ignored) {
        // Deliberately ignored: the database may simply not have existed.
    }
}
 
源代码24 项目: localization_nifi   文件: QueryDatabaseTableTest.java
/**
 * Points the Derby error log at "target/derby.log" and removes any database left at
 * DB_LOCATION by an earlier run.
 *
 * @throws IOException declared by the signature; a failed delete is deliberately ignored
 */
@BeforeClass
public static void setupBeforeClass() throws IOException {
    System.setProperty("derby.stream.error.file", "target/derby.log");

    // Best-effort removal of a previous test database.
    try {
        FileUtils.deleteFile(new File(DB_LOCATION), true);
    } catch (IOException ignored) {
        // Deliberately ignored: the database may simply not have existed.
    }
}
 
源代码25 项目: localization_nifi   文件: QueryDatabaseTableTest.java
/**
 * Shuts down the Derby database at DB_LOCATION and then removes its files.
 *
 * @throws Exception if the shutdown request fails with anything other than the expected
 *         SQLNonTransientConnectionException
 */
@AfterClass
public static void cleanUpAfterClass() throws Exception {
    final String shutdownUrl = "jdbc:derby:" + DB_LOCATION + ";shutdown=true";
    try {
        DriverManager.getConnection(shutdownUrl);
    } catch (SQLNonTransientConnectionException expected) {
        // Deliberately ignored: this exception is what a Derby shutdown produces.
    }

    // Best-effort removal of the test database.
    try {
        FileUtils.deleteFile(new File(DB_LOCATION), true);
    } catch (IOException ignored) {
        // Deliberately ignored: the database may simply not have existed.
    }
}
 
源代码26 项目: localization_nifi   文件: TestGenerateTableFetch.java
/**
 * Points the Derby error log at "target/derby.log" and removes any database left at
 * DB_LOCATION by an earlier run.
 *
 * @throws IOException declared by the signature; a failed delete is deliberately ignored
 */
@BeforeClass
public static void setupBeforeClass() throws IOException {
    System.setProperty("derby.stream.error.file", "target/derby.log");

    // Best-effort removal of a previous test database.
    try {
        FileUtils.deleteFile(new File(DB_LOCATION), true);
    } catch (IOException ignored) {
        // Deliberately ignored: the database may simply not have existed.
    }
}
 
源代码27 项目: localization_nifi   文件: TestGenerateTableFetch.java
/**
 * Shuts down the Derby database at DB_LOCATION and then removes its files.
 *
 * @throws Exception if the shutdown request fails with anything other than the expected
 *         SQLNonTransientConnectionException
 */
@AfterClass
public static void cleanUpAfterClass() throws Exception {
    final String shutdownUrl = "jdbc:derby:" + DB_LOCATION + ";shutdown=true";
    try {
        DriverManager.getConnection(shutdownUrl);
    } catch (SQLNonTransientConnectionException expected) {
        // Deliberately ignored: this exception is what a Derby shutdown produces.
    }

    // Best-effort removal of the test database.
    try {
        FileUtils.deleteFile(new File(DB_LOCATION), true);
    } catch (IOException ignored) {
        // Deliberately ignored: the database may simply not have existed.
    }
}
 
源代码28 项目: localization_nifi   文件: ExecuteFlumeSinkTest.java
/**
 * Runs ExecuteFlumeSink with an "hdfs" sink writing into a temporary folder and checks that
 * exactly one visible file is produced whose MD5 digest matches the source test data.
 *
 * @throws IOException if the temporary folder, the test data, or the output file cannot be accessed
 */
@Test
@Ignore("Does not work on Windows")
public void testHdfsSink() throws IOException {
    final File hdfsDir = temp.newFolder("hdfs");

    final String flumeConfig =
        "tier1.sinks.sink-1.hdfs.path = " + hdfsDir.toURI().toString() + "\n" +
        "tier1.sinks.sink-1.hdfs.fileType = DataStream\n" +
        "tier1.sinks.sink-1.hdfs.serializer = TEXT\n" +
        "tier1.sinks.sink-1.serializer.appendNewline = false";

    final TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class);
    runner.setProperty(ExecuteFlumeSink.SINK_TYPE, "hdfs");
    runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG, flumeConfig);

    // Feed the test records through the processor once.
    try (InputStream records = getClass().getResourceAsStream("/testdata/records.txt")) {
        final Map<String, String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put(CoreAttributes.FILENAME.key(), "records.txt");
        runner.enqueue(records, flowFileAttributes);
        runner.run();
    }

    final File[] written = hdfsDir.listFiles((FilenameFilter)HiddenFileFilter.VISIBLE);
    assertEquals("Unexpected number of destination files.", 1, written.length);

    // Compare the digest of the source data with the digest of the single output file.
    final byte[] sourceDigest;
    try (InputStream records = getClass().getResourceAsStream("/testdata/records.txt")) {
        sourceDigest = FileUtils.computeMd5Digest(records);
    }
    final byte[] writtenDigest = FileUtils.computeMd5Digest(written[0]);
    Assert.assertArrayEquals("Destination file doesn't match source data", sourceDigest, writtenDigest);
}
 
源代码29 项目: localization_nifi   文件: ExecuteFlumeSourceTest.java
/**
 * Copies the test records into a spool directory, runs ExecuteFlumeSource as a "spooldir"
 * source against it, and checks that four flow files with contents "record 1" through
 * "record 4" arrive on the SUCCESS relationship.
 *
 * @throws IOException if the spool directory or the copied test data cannot be created
 */
@Test
@Ignore("Does not work on Windows")
public void testSourceWithConfig() throws IOException {
    File spoolDirectory = temp.newFolder("spooldir");
    File dst = new File(spoolDirectory, "records.txt");
    FileUtils.copyFile(getClass().getResourceAsStream("/testdata/records.txt"), dst, true, false);

    TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSource.class);
    runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, "spooldir");
    runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG,
        "tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath());
    runner.run(1, false, true);
    // Because the spool directory source is an event driven source, it may take some time for flow files to get
    // produced. Wait for at most nine 500 ms sleeps (about 4.5 seconds), bailing out early if possible. If it
    // takes longer than that then there is likely a bug.
    int numWaits = 10;
    while (runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS).size() < 4 && --numWaits > 0) {
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException ex) {
            logger.warn("Sleep interrupted");
            // Restore the interrupt flag so callers can still see it, and stop waiting: with the
            // flag set, further sleeps would fail immediately anyway.
            Thread.currentThread().interrupt();
            break;
        }
    }
    runner.shutdown();
    runner.assertTransferCount(ExecuteFlumeSource.SUCCESS, 4);
    int i = 1;
    for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS)) {
        flowFile.assertContentEquals("record " + i);
        i++;
    }
}
 
/**
 * Copies the gunzipped content of the flow file to the given stream. Nothing is written when
 * isFlowPresent() reports that there is no flow.
 *
 * @param os the stream that receives the decompressed flow
 * @throws IOException if the flow file cannot be read or decompressed, or the copy fails
 */
@Override
public synchronized void load(final OutputStream os) throws IOException {
    if (isFlowPresent()) {
        try (final InputStream compressed = Files.newInputStream(flowXmlPath, StandardOpenOption.READ);
                final InputStream decompressed = new GZIPInputStream(compressed)) {
            FileUtils.copy(decompressed, os);
        }
    }
}