下面列出了com.amazonaws.auth.PropertiesCredentials#org.apache.nifi.util.file.FileUtils 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Writes one event to a journal that has a table of contents, then reads the journal
 * back and checks that exactly that one event is recovered from block 0.
 *
 * @throws IOException if the journal or TOC cannot be written, read, or cleaned up
 */
@Test
public void testSimpleWriteWithToc() throws IOException {
    final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
    final File storageDir = journalFile.getParentFile();
    try {
        final File tocFile = TocUtil.getTocFile(journalFile);
        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
        final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024);
        try {
            writer.writeHeader(1L);
            writer.writeRecord(createEvent());
        } finally {
            // Close even when a write fails so the journal is not left open during cleanup.
            writer.close();
        }

        final TocReader tocReader = new StandardTocReader(tocFile);
        try (final FileInputStream fis = new FileInputStream(journalFile);
             final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
            assertEquals(0, reader.getBlockIndex());
            reader.skipToBlock(0);

            final StandardProvenanceEventRecord recovered = reader.nextRecord();
            assertNotNull(recovered);
            assertEquals("nifi://unit-test", recovered.getTransitUri());

            // Only one record was written, so the reader must now be exhausted.
            assertNull(reader.nextRecord());
        }
    } finally {
        // Remove the per-test directory even when an assertion fails; skip it if nothing was created.
        if (storageDir.exists()) {
            FileUtils.deleteFile(storageDir, true);
        }
    }
}
/**
 * Writes one event to a ".gz" journal that has a table of contents, then reads the
 * journal back and checks that exactly that one event is recovered from block 0.
 *
 * @throws IOException if the journal or TOC cannot be written, read, or cleaned up
 */
@Test
public void testSingleRecordCompressed() throws IOException {
    final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
    final File storageDir = journalFile.getParentFile();
    try {
        final File tocFile = TocUtil.getTocFile(journalFile);
        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
        final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
        try {
            writer.writeHeader(1L);
            writer.writeRecord(createEvent());
        } finally {
            // Close even when a write fails so the journal is not left open during cleanup.
            writer.close();
        }

        final TocReader tocReader = new StandardTocReader(tocFile);
        try (final FileInputStream fis = new FileInputStream(journalFile);
             final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
            assertEquals(0, reader.getBlockIndex());
            reader.skipToBlock(0);

            final StandardProvenanceEventRecord recovered = reader.nextRecord();
            assertNotNull(recovered);
            assertEquals("nifi://unit-test", recovered.getTransitUri());

            // Only one record was written, so the reader must now be exhausted.
            assertNull(reader.nextRecord());
        }
    } finally {
        // Remove the per-test directory even when an assertion fails; skip it if nothing was created.
        if (storageDir.exists()) {
            FileUtils.deleteFile(storageDir, true);
        }
    }
}
/**
 * Remembers the headers and body of the incoming POST in {@code lastPostHeaders} and
 * {@code lastPost}, then answers with an OK status.
 *
 * @param request the POST whose headers and body are captured
 * @param response receives the OK status code
 * @throws ServletException declared by the overridden method
 * @throws IOException if the request body cannot be read
 */
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
    // Headers are read with getHeader(name); multi-valued headers are intentionally not special-cased.
    final Enumeration<String> names = request.getHeaderNames();
    lastPostHeaders = new HashMap<>();
    while (names.hasMoreElements()) {
        final String name = names.nextElement();
        final String value = request.getHeader(name);
        lastPostHeaders.put(name, value);
    }

    final ByteArrayOutputStream body = new ByteArrayOutputStream();
    try {
        StreamUtils.copy(request.getInputStream(), body);
        this.lastPost = body.toByteArray();
    } finally {
        FileUtils.closeQuietly(body);
    }

    final int okCode = Status.OK.getStatusCode();
    response.setStatus(okCode);
}
/**
 * Captures the request headers into {@code lastPostHeaders} and the request body into
 * {@code lastPost}, then sets the response status to OK.
 *
 * @param request the POST being captured
 * @param response the response on which the OK status is set
 * @throws ServletException declared by the overridden method
 * @throws IOException if copying the request body fails
 */
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
    final Enumeration<String> headerIterator = request.getHeaderNames();

    // A fresh map per request; each header is looked up with getHeader(name), with no
    // special handling for headers that carry several values.
    lastPostHeaders = new HashMap<>();
    while (headerIterator.hasMoreElements()) {
        final String headerName = headerIterator.nextElement();
        lastPostHeaders.put(headerName, request.getHeader(headerName));
    }

    final ByteArrayOutputStream captured = new ByteArrayOutputStream();
    try {
        StreamUtils.copy(request.getInputStream(), captured);
        this.lastPost = captured.toByteArray();
    } finally {
        FileUtils.closeQuietly(captured);
    }

    response.setStatus(Status.OK.getStatusCode());
}
/**
 * Makes sure the restore directory is usable and synchronizes the primary configuration
 * file with the same-named copy inside the restore directory.
 *
 * @throws IOException propagated from the FileUtils directory check or sync
 * @throws IllegalStateException if the configuration file lives in the restore directory itself
 */
private void syncWithRestoreDirectory() throws IOException {
    // sanity check that restore directory is a directory, creating it if necessary
    FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);

    // Resolve to an absolute file before asking for the parent: File.getParentFile() returns
    // null for a bare relative name such as "firewall.txt", which previously caused an NPE here.
    final File primaryDirectory = config.getAbsoluteFile().getParentFile();

    // check that restore directory is not the same as the primary directory
    // (a null parent means the path has no containing directory, so it cannot match)
    if (primaryDirectory != null && primaryDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) {
        throw new IllegalStateException(
                String.format("Cluster firewall configuration file '%s' cannot be in the restore directory '%s' ",
                        config.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
    }

    // the restore copy will have same file name, but reside in a different directory
    final File restoreFile = new File(restoreDirectory, config.getName());

    // sync the primary copy with the restore copy
    FileUtils.syncWithRestore(config, restoreFile, logger);
}
/**
 * Copies the gzip-decompressed contents of {@code flowXml} to the given stream while
 * holding the read lock. Nothing is written when the file is missing or empty.
 *
 * @param os destination for the decompressed flow
 * @throws IOException if the file cannot be read or the copy fails
 */
@Override
public void copyCurrentFlow(final OutputStream os) throws IOException {
    readLock.lock();
    try {
        // Short-circuits so the size is only queried for a file that exists.
        final boolean hasContent = Files.exists(flowXml) && Files.size(flowXml) != 0;
        if (hasContent) {
            try (final InputStream compressed = Files.newInputStream(flowXml, StandardOpenOption.READ);
                 final InputStream decompressed = new GZIPInputStream(compressed)) {
                FileUtils.copy(decompressed, os);
            }
        }
    } finally {
        readLock.unlock();
    }
}
/**
 * Deletes any "target/&lt;test class name&gt;" directory left by an earlier run and builds
 * a fresh input operator, operator context, and collector sink for the next test.
 *
 * @throws IOException if the old directory cannot be deleted
 */
@Before
public void setup() throws IOException {
    final String appPath = "target/" + getClass().getSimpleName();

    final File staleDir = new File(appPath);
    if (staleDir.exists()) {
        FileUtils.deleteFile(staleDir, true);
    }

    final Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
    attributes.put(DAG.APPLICATION_PATH, appPath);
    context = mockOperatorContext(12345, attributes);

    sink = new CollectorTestSink<>();
    builder = new MockSiteToSiteClient.Builder();
    windowDataManager = new FSWindowDataManager();

    operator = new NiFiSinglePortInputOperator(builder, windowDataManager);
    operator.outputPort.setSink(sink);
}
/**
 * Deletes any "target/&lt;test class name&gt;" directory left by an earlier run and builds
 * a fresh output operator and operator context for the next test.
 *
 * @throws IOException if the old directory cannot be deleted
 */
@Before
public void setup() throws IOException {
    final String appPath = "target/" + getClass().getSimpleName();

    final File staleDir = new File(appPath);
    if (staleDir.exists()) {
        FileUtils.deleteFile(staleDir, true);
    }

    final Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
    attributes.put(DAG.APPLICATION_PATH, appPath);
    context = mockOperatorContext(12345, attributes);

    windowDataManager = new FSWindowDataManager();
    stsBuilder = new MockSiteToSiteClient.Builder();
    dpBuilder = new StringNiFiDataPacketBuilder();

    operator = new NiFiSinglePortOutputOperator(stsBuilder, dpBuilder, windowDataManager, 1);
}
/**
 * Returns the capacity that FileUtils reports for the storage directory registered
 * under the given container name.
 *
 * @param containerName key into the configured storage directories
 * @return the reported capacity, never zero
 * @throws IllegalArgumentException if no storage directory is registered under that name
 * @throws IOException if the reported capacity is zero
 */
@Override
public long getContainerCapacity(final String containerName) throws IOException {
    final Map<String, File> storageDirectories = configuration.getStorageDirectories();
    final File storageDirectory = storageDirectories.get(containerName);
    if (storageDirectory == null) {
        throw new IllegalArgumentException("There is no defined container with name " + containerName);
    }

    final long totalBytes = FileUtils.getContainerCapacity(storageDirectory.toPath());
    if (totalBytes == 0) {
        throw new IOException("System returned total space of the partition for " + containerName + " is zero byte. "
                + "Nifi can not create a zero sized provenance repository.");
    }
    return totalBytes;
}
/**
 * Looks up the storage directory for the named container and returns the capacity
 * that FileUtils reports for it.
 *
 * @param containerName key into the configured storage directories
 * @return the reported capacity, never zero
 * @throws IllegalArgumentException if no storage directory is registered under that name
 * @throws IOException if the reported capacity is zero
 */
@Override
public long getContainerCapacity(final String containerName) throws IOException {
    final Map<String, File> directoriesByName = config.getStorageDirectories();
    final File directory = directoriesByName.get(containerName);
    if (directory == null) {
        throw new IllegalArgumentException("There is no defined container with name " + containerName);
    }

    final long reported = FileUtils.getContainerCapacity(directory.toPath());
    if (reported != 0) {
        return reported;
    }
    throw new IOException("System returned total space of the partition for " + containerName + " is zero byte. "
            + "Nifi can not create a zero sized provenance repository.");
}
/**
 * Writes one event to a journal that has a table of contents and delegates to
 * {@code assertRecoveredRecord} to verify what can be read back.
 *
 * @throws IOException if the journal or TOC cannot be written, read, or cleaned up
 */
@Test
public void testSimpleWriteWithToc() throws IOException {
    final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
    final File storageDir = journalFile.getParentFile();
    try {
        final File tocFile = TocUtil.getTocFile(journalFile);
        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
        final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024);
        try {
            writer.writeHeader(1L);
            writer.writeRecord(createEvent());
        } finally {
            // Close even when a write fails so the journal is not left open during cleanup.
            writer.close();
        }

        final TocReader tocReader = new StandardTocReader(tocFile);
        final String expectedTransitUri = "nifi://unit-test";
        final int expectedBlockIndex = 0;
        assertRecoveredRecord(journalFile, tocReader, expectedTransitUri, expectedBlockIndex);
    } finally {
        // Remove the per-test directory even when an assertion fails; skip it if nothing was created.
        if (storageDir.exists()) {
            FileUtils.deleteFile(storageDir, true);
        }
    }
}
/**
 * Writes one event to a ".gz" journal that has a table of contents and delegates to
 * {@code assertRecoveredRecord} to verify what can be read back.
 *
 * @throws IOException if the journal or TOC cannot be written, read, or cleaned up
 */
@Test
public void testSingleRecordCompressed() throws IOException {
    final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
    final File storageDir = journalFile.getParentFile();
    try {
        final File tocFile = TocUtil.getTocFile(journalFile);
        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
        final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
        try {
            writer.writeHeader(1L);
            writer.writeRecord(createEvent());
        } finally {
            // Close even when a write fails so the journal is not left open during cleanup.
            writer.close();
        }

        final TocReader tocReader = new StandardTocReader(tocFile);
        assertRecoveredRecord(journalFile, tocReader, "nifi://unit-test", 0);
    } finally {
        // Remove the per-test directory even when an assertion fails; skip it if nothing was created.
        if (storageDir.exists()) {
            FileUtils.deleteFile(storageDir, true);
        }
    }
}
/**
 * Stores the headers of the incoming POST in {@code lastPostHeaders} and its body in
 * {@code lastPost}, then replies with an OK status.
 *
 * @param request the POST whose headers and body are stored
 * @param response receives the OK status code
 * @throws ServletException declared by the overridden method
 * @throws IOException if the request body cannot be read
 */
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
    // Each header is fetched with getHeader(name); headers with several values get no special treatment.
    final Enumeration<String> remaining = request.getHeaderNames();
    lastPostHeaders = new HashMap<>();
    while (remaining.hasMoreElements()) {
        final String key = remaining.nextElement();
        lastPostHeaders.put(key, request.getHeader(key));
    }

    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try {
        StreamUtils.copy(request.getInputStream(), buffer);
        this.lastPost = buffer.toByteArray();
    } finally {
        FileUtils.closeQuietly(buffer);
    }

    response.setStatus(Status.OK.getStatusCode());
}
/**
 * Keeps a copy of the POST's headers ({@code lastPostHeaders}) and body
 * ({@code lastPost}) for later reference and responds with an OK status.
 *
 * @param request the POST being recorded
 * @param response the response on which the OK status is set
 * @throws ServletException declared by the overridden method
 * @throws IOException if copying the request body fails
 */
@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
    final Enumeration<String> headerCursor = request.getHeaderNames();

    // Start from an empty map on every request. Values come from getHeader(name), and
    // headers with multiple values are intentionally not handled specially.
    lastPostHeaders = new HashMap<>();
    while (headerCursor.hasMoreElements()) {
        final String current = headerCursor.nextElement();
        final String currentValue = request.getHeader(current);
        lastPostHeaders.put(current, currentValue);
    }

    final ByteArrayOutputStream payload = new ByteArrayOutputStream();
    try {
        StreamUtils.copy(request.getInputStream(), payload);
        this.lastPost = payload.toByteArray();
    } finally {
        FileUtils.closeQuietly(payload);
    }

    final int status = Status.OK.getStatusCode();
    response.setStatus(status);
}
/**
 * Verifies that the restore directory is usable and synchronizes the primary configuration
 * file with the same-named copy inside the restore directory.
 *
 * @throws IOException propagated from the FileUtils directory check or sync
 * @throws IllegalStateException if the configuration file lives in the restore directory itself
 */
private void syncWithRestoreDirectory() throws IOException {
    // sanity check that restore directory is a directory, creating it if necessary
    FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);

    // File.getParentFile() returns null when the path has no parent component (for example a
    // bare relative file name), so resolve to an absolute file first to avoid an NPE.
    final File primaryDirectory = config.getAbsoluteFile().getParentFile();

    // check that restore directory is not the same as the primary directory
    // (a null parent means the path has no containing directory, so it cannot match)
    final boolean sameDirectory = primaryDirectory != null
            && primaryDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath());
    if (sameDirectory) {
        throw new IllegalStateException(
                String.format("Cluster firewall configuration file '%s' cannot be in the restore directory '%s' ",
                        config.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
    }

    // the restore copy will have same file name, but reside in a different directory
    final File restoreFile = new File(restoreDirectory, config.getName());

    // sync the primary copy with the restore copy
    FileUtils.syncWithRestore(config, restoreFile, logger);
}
/**
 * Returns the capacity that FileUtils reports for the path registered under the
 * given container name.
 *
 * @param containerName key into the {@code containers} map
 * @return the reported capacity, never zero
 * @throws IllegalArgumentException if no path is registered under that name
 * @throws IOException if the reported capacity is zero
 */
@Override
public long getContainerCapacity(final String containerName) throws IOException {
    final Path containerPath = containers.get(containerName);
    if (containerPath != null) {
        final long totalBytes = FileUtils.getContainerCapacity(containerPath);
        if (totalBytes != 0) {
            return totalBytes;
        }
        throw new IOException("System returned total space of the partition for " + containerName + " is zero byte. "
                + "Nifi can not create a zero sized FileSystemRepository.");
    }
    throw new IllegalArgumentException("No container exists with name " + containerName);
}
/**
 * Streams the gzip-decompressed contents of {@code flowXml} to the caller's stream under
 * the read lock. Returns without writing when the file is absent or has zero length.
 *
 * @param os destination for the decompressed flow
 * @throws IOException if the file cannot be read or the copy fails
 */
@Override
public void copyCurrentFlow(final OutputStream os) throws IOException {
    readLock.lock();
    try {
        if (Files.exists(flowXml) && Files.size(flowXml) > 0 || Files.exists(flowXml) && Files.size(flowXml) < 0) {
            try (final InputStream fileIn = Files.newInputStream(flowXml, StandardOpenOption.READ);
                 final InputStream unzipped = new GZIPInputStream(fileIn)) {
                FileUtils.copy(unzipped, os);
            }
        }
    } finally {
        readLock.unlock();
    }
}
public SSLContextFactory(final NiFiProperties properties) throws NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, KeyStoreException, UnrecoverableKeyException {
keystore = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE);
keystorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD));
keystoreType = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE);
truststore = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE);
truststorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD));
truststoreType = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE);
// prepare the keystore
final KeyStore keyStore = KeyStoreUtils.getKeyStore(keystoreType);
final FileInputStream keyStoreStream = new FileInputStream(keystore);
try {
keyStore.load(keyStoreStream, keystorePass);
} finally {
FileUtils.closeQuietly(keyStoreStream);
}
final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keyStore, keystorePass);
// prepare the truststore
final KeyStore trustStore = KeyStoreUtils.getTrustStore(truststoreType);
final FileInputStream trustStoreStream = new FileInputStream(truststore);
try {
trustStore.load(trustStoreStream, truststorePass);
} finally {
FileUtils.closeQuietly(trustStoreStream);
}
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
keyManagers = keyManagerFactory.getKeyManagers();
trustManagers = trustManagerFactory.getTrustManagers();
}
/**
 * Repeatedly asks the index manager to remove the given index, sleeping five seconds
 * between attempts, until it succeeds or MAX_DELETE_INDEX_WAIT_SECONDS have elapsed.
 * On success the directory is deleted from disk (a failure to delete is only logged)
 * and the directory manager is told to drop it.
 *
 * @param indexDirectory the index directory to remove
 * @return {@code true} if the index manager removed the index; {@code false} if the
 *         wait elapsed or the thread was interrupted while waiting
 */
protected boolean tryDeleteIndex(final File indexDirectory) {
    final long maxWaitNanos = TimeUnit.SECONDS.toNanos(MAX_DELETE_INDEX_WAIT_SECONDS);
    final long startNanos = System.nanoTime();

    boolean removed = false;
    while (System.nanoTime() - startNanos < maxWaitNanos) {
        removed = indexManager.removeIndex(indexDirectory);
        if (removed) {
            break;
        }

        try {
            Thread.sleep(5000L);
        } catch (final InterruptedException ie) {
            logger.debug("Interrupted when trying to remove index {} from IndexManager; will not remove index", indexDirectory);
            // Restore the flag so callers further up can see the interruption.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    if (!removed) {
        logger.warn("The Lucene Index located at {} has expired and contains no Provenance Events that still exist in the respository. "
                + "However, the directory could not be deleted because it is still actively being used. Will continue to try to delete "
                + "in a subsequent maintenance cycle.", indexDirectory);
        return false;
    }

    try {
        FileUtils.deleteFile(indexDirectory, true);
        logger.debug("Successfully deleted directory {}", indexDirectory);
    } catch (final IOException e) {
        // A failed on-disk delete is logged only; the bookkeeping below still runs.
        logger.warn("The Lucene Index located at " + indexDirectory + " has expired and contains no Provenance Events that still exist in the respository. "
                + "However, the directory could not be deleted.", e);
    }

    directoryManager.deleteDirectory(indexDirectory);
    logger.info("Successfully removed expired Lucene Index {}", indexDirectory);
    return true;
}
/**
 * Writes ten events to a ".gz" journal and verifies that all ten can be read back in
 * order with the expected event id, transit URI, and updated attributes.
 *
 * @throws IOException if the journal or TOC cannot be written, read, or cleaned up
 */
@Test
public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
    final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
    final File storageDir = journalFile.getParentFile();
    try {
        final File tocFile = TocUtil.getTocFile(journalFile);
        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);

        // new block each 100 bytes
        // NOTE(review): assumes the last createWriter argument is the block size in bytes - confirm
        final RecordWriter writer = createWriter(journalFile, tocWriter, true, 100);
        try {
            writer.writeHeader(1L);
            for (int i = 0; i < 10; i++) {
                writer.writeRecord(createEvent());
            }
        } finally {
            // Close even when a write fails so the journal is not left open during cleanup.
            writer.close();
        }

        final TocReader tocReader = new StandardTocReader(tocFile);
        try (final FileInputStream fis = new FileInputStream(journalFile);
             final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
            for (int i = 0; i < 10; i++) {
                final StandardProvenanceEventRecord recovered = reader.nextRecord();
                assertNotNull(recovered);
                assertEquals(i, recovered.getEventId());
                assertEquals("nifi://unit-test", recovered.getTransitUri());

                final Map<String, String> updatedAttrs = recovered.getUpdatedAttributes();
                assertNotNull(updatedAttrs);
                assertEquals(2, updatedAttrs.size());
                assertEquals("1.txt", updatedAttrs.get("filename"));
                assertTrue(updatedAttrs.containsKey("uuid"));
            }

            // All ten records have been consumed, so the reader must now be exhausted.
            assertNull(reader.nextRecord());
        }
    } finally {
        // Remove the per-test directory even when an assertion fails; skip it if nothing was created.
        if (storageDir.exists()) {
            FileUtils.deleteFile(storageDir, true);
        }
    }
}
/**
 * Borrows two writers for the same index directory at the same time, indexes one
 * document through each, and checks that a searcher borrowed afterwards sees both.
 *
 * @throws IOException if indexing, searching, or removing the index directory fails
 */
@Test
public void testMultipleWritersSimultaneouslySameIndex() throws IOException {
    final SimpleIndexManager indexManager = new SimpleIndexManager(new RepositoryConfiguration());
    final File indexDir = new File("target/" + UUID.randomUUID().toString());

    try {
        final Document firstDoc = new Document();
        firstDoc.add(new StringField("id", "1", Store.YES));
        final Document secondDoc = new Document();
        secondDoc.add(new StringField("id", "2", Store.YES));

        // Both writers are held at once before either is returned.
        final EventIndexWriter firstWriter = indexManager.borrowIndexWriter(indexDir);
        final EventIndexWriter secondWriter = indexManager.borrowIndexWriter(indexDir);
        firstWriter.index(firstDoc, 1000);
        secondWriter.index(secondDoc, 1000);
        indexManager.returnIndexWriter(secondWriter);
        indexManager.returnIndexWriter(firstWriter);

        final EventIndexSearcher searcher = indexManager.borrowIndexSearcher(indexDir);
        final TopDocs hits = searcher.getIndexSearcher().search(new MatchAllDocsQuery(), 2);
        assertEquals(2, hits.totalHits);
        indexManager.returnIndexSearcher(searcher);
    } finally {
        FileUtils.deleteFile(indexDir, true);
    }
}
/**
 * Sets the {@code derby.stream.error.file} system property to target/derby.log and
 * removes any test database left at DB_LOCATION by an earlier run.
 *
 * @throws IOException declared for the test framework; deletion failures are ignored
 */
@BeforeClass
public static void setupBeforeClass() throws IOException {
    System.setProperty("derby.stream.error.file", "target/derby.log");

    final File previousDb = new File(DB_LOCATION);
    try {
        FileUtils.deleteFile(previousDb, true);
    } catch (final IOException ignored) {
        // Best effort: the database may simply not have existed yet.
    }
}
/**
 * Requests a Derby shutdown for the database at DB_LOCATION and then removes the
 * database files.
 *
 * @throws Exception if the shutdown request fails with anything other than
 *         {@link SQLNonTransientConnectionException}
 */
@AfterClass
public static void cleanUpAfterClass() throws Exception {
    final String shutdownUrl = "jdbc:derby:" + DB_LOCATION + ";shutdown=true";
    try {
        DriverManager.getConnection(shutdownUrl);
    } catch (final SQLNonTransientConnectionException ignored) {
        // Expected: this is what happens at Derby shutdown.
    }

    final File testDb = new File(DB_LOCATION);
    try {
        FileUtils.deleteFile(testDb, true);
    } catch (final IOException ignored) {
        // Best effort: the database may not have existed.
    }
}
/**
 * Points the {@code derby.stream.error.file} system property at target/derby.log and
 * deletes any database directory already present at DB_LOCATION.
 *
 * @throws IOException declared for the test framework; deletion failures are ignored
 */
@BeforeClass
public static void setupBeforeClass() throws IOException {
    System.setProperty("derby.stream.error.file", "target/derby.log");

    try {
        FileUtils.deleteFile(new File(DB_LOCATION), true);
    } catch (final IOException ignored) {
        // Nothing to do: the database may not have existed.
    }
}
/**
 * Issues the Derby shutdown connection request for DB_LOCATION, then deletes the
 * database directory.
 *
 * @throws Exception if the shutdown request fails with anything other than
 *         {@link SQLNonTransientConnectionException}
 */
@AfterClass
public static void cleanUpAfterClass() throws Exception {
    try {
        final String url = "jdbc:derby:" + DB_LOCATION + ";shutdown=true";
        DriverManager.getConnection(url);
    } catch (final SQLNonTransientConnectionException ignored) {
        // Nothing to do: this exception is what happens at Derby shutdown.
    }

    try {
        FileUtils.deleteFile(new File(DB_LOCATION), true);
    } catch (final IOException ignored) {
        // Nothing to do: the database may not have existed.
    }
}
/**
 * Configures the {@code derby.stream.error.file} system property (target/derby.log)
 * and clears out any database found at DB_LOCATION before the tests run.
 *
 * @throws IOException declared for the test framework; deletion failures are ignored
 */
@BeforeClass
public static void setupBeforeClass() throws IOException {
    System.setProperty("derby.stream.error.file", "target/derby.log");

    final File leftoverDb = new File(DB_LOCATION);
    try {
        FileUtils.deleteFile(leftoverDb, true);
    } catch (final IOException ignored) {
        // Ignored on purpose: there may have been no database to remove.
    }
}
/**
 * Shuts down the Derby database at DB_LOCATION via a shutdown connection request and
 * removes its files afterwards.
 *
 * @throws Exception if the shutdown request fails with anything other than
 *         {@link SQLNonTransientConnectionException}
 */
@AfterClass
public static void cleanUpAfterClass() throws Exception {
    final String shutdownRequest = "jdbc:derby:" + DB_LOCATION + ";shutdown=true";
    try {
        DriverManager.getConnection(shutdownRequest);
    } catch (final SQLNonTransientConnectionException ignored) {
        // Ignored on purpose: this is what happens at Derby shutdown.
    }

    final File finishedDb = new File(DB_LOCATION);
    try {
        FileUtils.deleteFile(finishedDb, true);
    } catch (final IOException ignored) {
        // Ignored on purpose: there may have been no database to remove.
    }
}
/**
 * Runs ExecuteFlumeSink configured with the "hdfs" sink type against the bundled
 * records.txt resource and checks that exactly one visible file is produced whose MD5
 * digest matches the source data.
 *
 * @throws IOException if the temporary folder, test resource, or destination file cannot be accessed
 */
@Test
@Ignore("Does not work on Windows")
public void testHdfsSink() throws IOException {
    File destDir = temp.newFolder("hdfs");

    TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class);
    runner.setProperty(ExecuteFlumeSink.SINK_TYPE, "hdfs");
    runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG,
        "tier1.sinks.sink-1.hdfs.path = " + destDir.toURI().toString() + "\n" +
        "tier1.sinks.sink-1.hdfs.fileType = DataStream\n" +
        "tier1.sinks.sink-1.hdfs.serializer = TEXT\n" +
        "tier1.sinks.sink-1.serializer.appendNewline = false"
    );
    try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
        runner.enqueue(inputStream, attributes);
        runner.run();
    }

    File[] files = destDir.listFiles((FilenameFilter)HiddenFileFilter.VISIBLE);
    // listFiles returns null when the directory cannot be listed; fail with a clear
    // message instead of a NullPointerException on files.length.
    Assert.assertNotNull("Unable to list destination directory " + destDir, files);
    assertEquals("Unexpected number of destination files.", 1, files.length);
    File dst = files[0];

    byte[] expectedMd5;
    try (InputStream md5Stream = getClass().getResourceAsStream("/testdata/records.txt")) {
        expectedMd5 = FileUtils.computeMd5Digest(md5Stream);
    }
    byte[] actualMd5 = FileUtils.computeMd5Digest(dst);
    Assert.assertArrayEquals("Destination file doesn't match source data", expectedMd5, actualMd5);
}
/**
 * Copies the bundled records.txt into a spool directory, runs ExecuteFlumeSource with
 * the "spooldir" source type pointed at it, and checks that four flow files with the
 * contents "record 1" through "record 4" reach the SUCCESS relationship.
 *
 * @throws IOException if the spool directory or test resource cannot be prepared
 */
@Test
@Ignore("Does not work on Windows")
public void testSourceWithConfig() throws IOException {
    File spoolDirectory = temp.newFolder("spooldir");
    File dst = new File(spoolDirectory, "records.txt");
    FileUtils.copyFile(getClass().getResourceAsStream("/testdata/records.txt"), dst, true, false);

    TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSource.class);
    runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, "spooldir");
    // NOTE(review): the config property is taken from ExecuteFlumeSink although the runner
    // wraps ExecuteFlumeSource - confirm this is intended.
    runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG,
        "tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath());
    runner.run(1, false, true);

    // Because the spool directory source is an event driven source, it may take some time for flow files to get
    // produced. The loop below sleeps at most 9 times for 500 ms (about 4.5 seconds) and bails out early once
    // four flow files are available. If it takes longer than that then there is likely a bug.
    int numWaits = 10;
    while (runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS).size() < 4 && --numWaits > 0) {
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException ex) {
            logger.warn("Sleep interrupted");
            // Restore the interrupt flag and stop waiting; the assertions below report the outcome.
            Thread.currentThread().interrupt();
            break;
        }
    }
    runner.shutdown();

    runner.assertTransferCount(ExecuteFlumeSource.SUCCESS, 4);
    int i = 1;
    for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS)) {
        flowFile.assertContentEquals("record " + i);
        i++;
    }
}
/**
 * Copies the gzip-decompressed contents of {@code flowXmlPath} to the given stream.
 * Nothing is written when {@code isFlowPresent()} reports that no flow is present.
 *
 * @param os destination for the decompressed flow
 * @throws IOException if the file cannot be read or the copy fails
 */
@Override
public synchronized void load(final OutputStream os) throws IOException {
    if (isFlowPresent()) {
        try (final InputStream compressed = Files.newInputStream(flowXmlPath, StandardOpenOption.READ);
             final InputStream decompressed = new GZIPInputStream(compressed)) {
            FileUtils.copy(decompressed, os);
        }
    }
}