下面列出了怎么用org.apache.hadoop.fs.FSDataOutputStream的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Regression test: when the BlockSender decides a single packet is enough,
 * the earlier pktSize computation (driven by transferToAllowed) could yield
 * a buffer too small for the buffer-copy that partial chunks require.
 */
public void testUnfinishedBlockPacketBufferOverrun() throws IOException {
  // Sanity check: the root must exist and be a directory.
  final Path root = new Path("/");
  System.out.println("Path : \"" + root.toString() + "\"");
  System.out.println(fileSystem.getFileStatus(root).isDir());
  assertTrue("/ should be a directory",
      fileSystem.getFileStatus(root).isDir());

  // Create a file in the root and keep its stream open while writing.
  final Path unfinished = new Path("/unfinished-block");
  final FSDataOutputStream out =
      TestFileCreation.createFile(fileSystem, unfinished, 1);

  // Write one byte less than a checksum chunk, then sync.
  final int checksumChunk = conf.getInt("io.bytes.per.checksum", 512);
  final int payloadSize = checksumChunk - 1;
  writeFileAndSync(out, payloadSize);

  // A reader must be able to see the data before the writer closes the file.
  checkCanRead(fileSystem, unfinished, payloadSize);
  out.close();
}
/**
 * Creates, writes, closes and verifies {@code numberOfFiles} files in turn,
 * incrementing {@code stamp} each time a file has been processed.
 */
@Override
public void run() {
  System.out.println("Workload starting ");
  int fileIndex = 0;
  while (fileIndex < numberOfFiles) {
    final Path target = new Path(id + "." + fileIndex);
    try {
      System.out.println("Workload processing file " + target);
      final FSDataOutputStream out = createFile(fs, target, replication);
      // Reach through to the wrapped DFS stream to slow the writer down artificially.
      final DFSOutputStream wrapped = (DFSOutputStream) out.getWrappedStream();
      wrapped.setArtificialSlowdown(1000);
      writeFile(out, myseed);
      out.close();
      checkFile(fs, target, replication, numBlocks, fileSize, myseed);
    } catch (Throwable failure) {
      System.out.println("Workload exception " + failure);
      // Always fails, carrying the failure's description as the message.
      assertTrue(failure.toString(), false);
    }
    // Bump the stamp to signal that one more file is done.
    synchronized (this) {
      stamp++;
    }
    fileIndex++;
  }
}
/**
 * Writes the given checkpoint to the file whose path is derived from the
 * checkpoint id, component and instance id in {@code info}.
 *
 * @param info identifies the checkpoint id, component and instance
 * @param checkpoint the checkpoint whose payload is written to the file
 * @throws StatefulStorageException if creating, writing or closing the file fails
 */
@Override
public void storeCheckpoint(CheckpointInfo info, Checkpoint checkpoint)
    throws StatefulStorageException {
  Path path = new Path(getCheckpointPath(info.getCheckpointId(),
                                         info.getComponent(),
                                         info.getInstanceId()));

  // We need to ensure the existence of directories structure,
  // since it is not guaranteed that FileSystem.create(..) always creates parents' dirs.
  String checkpointDir = getCheckpointDir(info.getCheckpointId(),
                                          info.getComponent());
  createDir(checkpointDir);

  // try-with-resources closes the stream on every path; a failing close() is
  // reported as a persistence failure instead of being silently ignored,
  // because buffered data may only reach the file system when the stream closes.
  try (FSDataOutputStream out = fileSystem.create(path)) {
    checkpoint.getCheckpoint().writeTo(out);
  } catch (IOException e) {
    throw new StatefulStorageException("Failed to persist", e);
  }
}
/**
 * Persists a mocked two-column data store (ten records reported) and checks
 * that a stream is returned and that it reports 232 bytes written.
 */
@Test
public void testPersistDataStore() {
  IDataStore dataStore = Mockito.mock(IDataStore.class);
  IMetaData metaData = Mockito.mock(IMetaData.class);
  IRecord record = Mockito.mock(IRecord.class);
  IField fieldInt = Mockito.mock(IField.class);
  IField fieldStr = Mockito.mock(IField.class);

  // Data store: every index yields the same mocked record.
  Mockito.when(dataStore.getMetaData()).thenReturn(metaData);
  Mockito.when(dataStore.getRecordAt(Mockito.anyInt())).thenReturn(record);
  Mockito.when(dataStore.getRecordsCount()).thenReturn(10L);

  // Metadata: two columns, an Integer and a String.
  Mockito.when(metaData.getFieldCount()).thenReturn(2);
  Mockito.when(metaData.getFieldName(1)).thenReturn("column_Int");
  Mockito.when(metaData.getFieldName(2)).thenReturn("column_Str");
  Mockito.when(metaData.getFieldType(1)).thenReturn(Integer.class);
  Mockito.when(metaData.getFieldType(2)).thenReturn(String.class);

  // Record fields and their values.
  Mockito.when(record.getFieldAt(1)).thenReturn(fieldInt);
  Mockito.when(record.getFieldAt(2)).thenReturn(fieldStr);
  Mockito.when(fieldInt.getValue()).thenReturn(Integer.valueOf(1));
  Mockito.when(fieldStr.getValue()).thenReturn("test");

  FSDataOutputStream fsOS = (FSDataOutputStream) hdfsManager.persistDataStore(dataStore, "test_table", "signature_xyz");
  assertNotNull(fsOS);
  // Expected value goes first so a failure message reads correctly.
  assertEquals(232, fsOS.size());
}
/**
 * (Re)initializes {@code conf}, {@code path} and {@code fs}, then writes
 * {@code count} key/value records to the file named after {@code compress}.
 *
 * @param count number of records to append
 * @param compress compression name; used as the file suffix and passed to the writer
 * @throws IOException if the file cannot be created or written
 */
void createFile(int count, String compress) throws IOException {
  conf = new Configuration();
  path = new Path(ROOT, outputFile + "." + compress);
  fs = path.getFileSystem(conf);

  FSDataOutputStream stream = fs.create(path);
  Writer recordWriter = new Writer(stream, BLOCK_SIZE, compress, comparator, conf);

  int index = 0;
  while (index < count) {
    byte[] keyBytes = composeSortedKey(KEY, count, index).getBytes();
    byte[] valueBytes = (VALUE + index).getBytes();
    recordWriter.append(keyBytes, valueBytes);
    index++;
  }

  // Close the record writer before the stream it writes to.
  recordWriter.close();
  stream.close();
}
/**
 * Converts the buffered records into a single {@code DataSet}, saves it under
 * {@code outputDir} and clears the buffer. Does nothing while the buffer is
 * empty, or while it holds fewer than {@code batchSize} records and this is
 * not the final record.
 *
 * @param list buffered records; cleared after a batch has been written
 * @param finalRecord true to force out a partial batch
 * @throws Exception if building or saving the data set fails
 */
private void processBatchIfRequired(List<List<Writable>> list, boolean finalRecord) throws Exception {
  boolean stillFilling = list.size() < batchSize && !finalRecord;
  if (list.isEmpty() || stillFilling) {
    return;
  }

  RecordReader reader = new CollectionRecordReader(list);
  RecordReaderDataSetIterator batches = new RecordReaderDataSetIterator(reader, null, batchSize, labelIndex, labelIndex, numPossibleLabels, -1, regression);
  DataSet batch = batches.next();

  // Each output file gets a fresh sequence number.
  String outputName = "dataset_" + uid + "_" + (outputCount++) + ".bin";
  URI target = new URI(outputDir.getPath() + "/" + outputName);

  Configuration hadoopConf;
  if (conf == null) {
    hadoopConf = DefaultHadoopConfig.get();
  } else {
    hadoopConf = conf.getValue().getConfiguration();
  }

  FileSystem targetFs = FileSystem.get(target, hadoopConf);
  try (FSDataOutputStream out = targetFs.create(new Path(target))) {
    batch.save(out);
  }

  list.clear();
}
/** @throws Exception If failed. */
@Test
public void testSetWorkingDirectory() throws Exception {
  Path workDir = new Path("/tmp/nested/dir");
  Path relativeFile = new Path("file");

  fs.mkdirs(workDir);
  fs.setWorkingDirectory(workDir);

  // Create an empty file through its relative path.
  fs.create(relativeFile).close();

  // The file must be found under the working directory set above.
  Path resolved = fs.getFileStatus(new Path(workDir, relativeFile)).getPath();
  assertTrue(resolved.toString().contains("/tmp/nested/dir/file"));
}
@Test
public void outOfBandFolder_siblingCreate() throws Exception {
  // NOTE: manual use of CloudBlockBlob targets working directory explicitly.
  // WASB driver methods prepend working directory implicitly.
  String shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
  String workingDir = "user/" + shortUserName + "/";

  // Create an empty blob directly through the blob reference, bypassing the file system.
  CloudBlockBlob blob = testAccount.getBlobReference(workingDir
      + "testFolder3/a/input/file");
  blob.openOutputStream().close();
  assertTrue(fs.exists(new Path("testFolder3/a/input/file")));

  // Creating a sibling file through the file system must succeed.
  Path sibling = new Path("testFolder3/a/input/file2");
  fs.create(sibling).close();
}
/**
 * Writes {@code numPartitions} consecutive IFile segments into one file, each
 * holding {@code numKeysPerPartition} sequential integer key/value pairs, and
 * records an index entry for every segment.
 *
 * @param conf configuration passed to each segment writer
 * @param fs file system on which the file is created
 * @param path file to create
 * @param numPartitions number of segments to write
 * @param numKeysPerPartition number of records per segment
 * @param startKey first key; keys keep increasing across segments
 * @return the file path together with one index record per segment
 * @throws IOException if the file cannot be created or written
 */
private SrcFileInfo createFile(Configuration conf, FileSystem fs, Path path, int numPartitions,
    int numKeysPerPartition, int startKey) throws IOException {
  FSDataOutputStream out = fs.create(path);

  SrcFileInfo result = new SrcFileInfo();
  result.indexedRecords = new TezIndexRecord[numPartitions];
  result.path = path;

  int nextKey = startKey;
  for (int partition = 0; partition < numPartitions; partition++) {
    // Stream position before this segment's writer is created.
    long segmentStart = out.getPos();
    IFile.Writer segmentWriter =
        new IFile.Writer(conf, out, IntWritable.class, IntWritable.class, null, null, null);

    int written = 0;
    while (written < numKeysPerPartition) {
      segmentWriter.append(new IntWritable(nextKey), new IntWritable(nextKey));
      nextKey++;
      written++;
    }
    segmentWriter.close();

    // Lengths are read after the writer has been closed.
    long rawLength = segmentWriter.getRawLength();
    long compressedLength = segmentWriter.getCompressedLength();
    result.indexedRecords[partition] = new TezIndexRecord(segmentStart, rawLength, compressedLength);
  }

  out.close();
  return result;
}
/**
 * Verifies that a file living in a sticky-bit directory can still be
 * appended to by a different user, provided the file permissions allow it.
 */
private void confirmCanAppend(Configuration conf, Path p) throws Exception {
  // First user creates the file and opens up its permissions.
  Path ownedFile = new Path(p, "foo");
  writeFile(hdfsAsUser1, ownedFile);
  hdfsAsUser1.setPermission(ownedFile, new FsPermission((short) 0777));

  // Second user appends to that very same file.
  Path sameFile = new Path(p, "foo");
  FSDataOutputStream appender = null;
  try {
    appender = hdfsAsUser2.append(sameFile);
    appender.write("Some more data".getBytes());
    appender.close();
    // Closed normally: clear the reference so the cleanup below has nothing to do.
    appender = null;
  } finally {
    IOUtils.cleanup(null, appender);
  }
}
/**
 * Checks that a datanode's blocks-scheduled counter is 1 while a file with a
 * synced block is open, and drops back to 0 once the file is closed.
 *
 * @throws IOException if the cluster or file operations fail
 */
public void testBlocksScheduledCounter() throws IOException {
  MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1,
                                              true, null);
  // Shut the cluster down on every path so a failing assertion does not leak it.
  try {
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();

    //open a file an write a few bytes:
    FSDataOutputStream out = fs.create(new Path("/testBlockScheduledCounter"));
    for (int i=0; i<1024; i++) {
      out.write(i);
    }
    // flush to make sure a block is allocated.
    ((DFSOutputStream)(out.getWrappedStream())).sync();

    // The same list is passed for both arguments; only its first entry is inspected.
    ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
    cluster.getNameNode().namesystem.DFSNodesStatus(dnList, dnList);
    DatanodeDescriptor dn = dnList.get(0);

    assertEquals(1, dn.getBlocksScheduled());

    // close the file and the counter should go to zero.
    out.close();
    assertEquals(0, dn.getBlocksScheduled());
  } finally {
    cluster.shutdown();
  }
}
/**
 * Writes the table's rows to the stream as delimited text, one row per line
 * with no trailing newline after the last row, then closes the writer.
 *
 * @param stream destination stream; closed together with the writer
 * @param dataTable table whose rows are written
 * @param delimiter separator placed between the cells of a row
 * @param encoding character encoding of the output
 * @throws Exception if writing fails
 */
private void writeTableToStream(FSDataOutputStream stream, Table dataTable, String delimiter, Charset encoding) throws Exception {
  BufferedWriter bufferedWriter = new BufferedWriter(
      new OutputStreamWriter(stream, encoding));
  List<List<String>> data = dataTable.getData();
  int rowsSinceFlush = 0;
  for (int i = 0; i < data.size(); i++) {
    List<String> row = data.get(i);
    StringBuilder sBuilder = new StringBuilder();
    for (int j = 0; j < row.size(); j++) {
      sBuilder.append(row.get(j));
      if (j != row.size() - 1) {
        sBuilder.append(delimiter);
      }
    }
    if (i != data.size() - 1) {
      sBuilder.append("\n");
    }
    bufferedWriter.append(sBuilder.toString());
    rowsSinceFlush++;
    if (rowsSinceFlush > ROW_BUFFER) {
      bufferedWriter.flush();
      // Restart the count; without this every later row would trigger a flush.
      rowsSinceFlush = 0;
    }
  }
  bufferedWriter.close();
}
/**
 * Creates a new UTF-8 text file and writes each element of {@code inputData}
 * followed by a newline character.
 *
 * @param fs file system on which the file is created
 * @param fileName target file name; on Windows, backslashes are replaced by forward slashes
 * @param inputData lines to write
 * @throws IOException if the file already exists, or if creating or writing it fails
 */
static public void createInputFile(FileSystem fs, String fileName,
    String[] inputData) throws IOException {
  if(Util.WINDOWS){
    fileName = fileName.replace('\\','/');
  }
  Path target = new Path(fileName);
  if(fs.exists(target)) {
    throw new IOException("File " + fileName + " already exists on the FileSystem");
  }
  FSDataOutputStream stream = fs.create(target);
  PrintWriter pw = new PrintWriter(new OutputStreamWriter(stream, "UTF-8"));
  try {
    for (String line : inputData) {
      pw.print(line);
      pw.print("\n");
    }
  } finally {
    pw.close();
  }
  // PrintWriter never throws IOException; it only records that an error occurred.
  // Surface it so a truncated or missing file is not mistaken for success.
  if (pw.checkError()) {
    throw new IOException("Failed to write input file " + fileName);
  }
}
/**
 * Writes {@code BLOCK_SIZE} bytes to the stream, flushes it, and then polls
 * the block locations of {@code file} until more blocks are reported than the
 * stream position implied before the write.
 *
 * @param file path whose block locations are polled
 * @param stm open stream to write to
 * @param size not used by this method
 * @throws IOException if writing, flushing or the location lookup fails
 */
void writeFile(Path file, FSDataOutputStream stm, int size)
    throws IOException {
  final long blocksBefore = stm.getPos() / BLOCK_SIZE;

  TestFileCreation.writeFile(stm, BLOCK_SIZE);
  // need to make sure the full block is completely flushed to the DataNodes
  // (see FSOutputSummer#flush)
  stm.flush();

  // wait until the block is allocated by DataStreamer
  int blocksAfter = 0;
  while (blocksAfter <= blocksBefore) {
    BlockLocation[] located = DFSClientAdapter.getDFSClient(hdfs).getBlockLocations(
        file.toString(), 0L, BLOCK_SIZE*NUM_BLOCKS);
    if (located == null) {
      blocksAfter = 0;
    } else {
      blocksAfter = located.length;
    }
  }
}
/**
 * Serializes a fixed sequence of events through an {@code EventWriter} into
 * an in-memory buffer.
 *
 * @return the bytes produced by the writer
 * @throws Exception if building or writing any event fails
 */
private byte[] getEvents() throws Exception {
  ByteArrayOutputStream sink = new ByteArrayOutputStream();
  FileSystem.Statistics stats = new FileSystem.Statistics("scheme");
  FSDataOutputStream fsOutput = new FSDataOutputStream(sink, stats);
  EventWriter eventWriter = new EventWriter(fsOutput);

  // Job-level and task-level events.
  eventWriter.write(getJobPriorityChangedEvent());
  eventWriter.write(getJobStatusChangedEvent());
  eventWriter.write(getTaskUpdatedEvent());
  eventWriter.write(getReduceAttemptKilledEvent());
  eventWriter.write(getJobKilledEvent());

  // Setup-attempt events.
  eventWriter.write(getSetupAttemptStartedEvent());
  eventWriter.write(getTaskAttemptFinishedEvent());
  eventWriter.write(getSetupAttemptFieledEvent());
  eventWriter.write(getSetupAttemptKilledEvent());

  // Cleanup-attempt events.
  eventWriter.write(getCleanupAttemptStartedEvent());
  eventWriter.write(getCleanupAttemptFinishedEvent());
  eventWriter.write(getCleanupAttemptFiledEvent());
  eventWriter.write(getCleanupAttemptKilledEvent());

  eventWriter.flush();
  eventWriter.close();

  return sink.toByteArray();
}
/**
 * Writes a small file, renames it into another directory and checks that the
 * content read back from the new location matches what was written.
 */
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testRenameFile() throws Exception {
  assumeRenameSupported();

  final Path source = new Path("/test/alice/file");
  final Path destination = new Path("/test/bob/file");
  fs.mkdirs(destination.getParent());

  // Write the payload to the source file.
  final byte[] message = "Some data".getBytes();
  final FSDataOutputStream out = fs.create(source);
  out.write(message);
  out.close();
  assertTrue(fs.exists(source));

  rename(source, destination, true, false, true);

  // Read the renamed file back with a single read call.
  final byte[] readBuffer = new byte[512];
  final FSDataInputStream in = fs.open(destination);
  final int bytesRead = in.read(readBuffer);
  in.close();

  // Trim the buffer to the number of bytes actually read before comparing.
  final byte[] received = new byte[bytesRead];
  System.arraycopy(readBuffer, 0, received, 0, bytesRead);
  assertEquals(new String(message), new String(received));
}
/**
 * Opens the same file twice, the second time with overwrite enabled, and
 * checks that only the second writer's byte ends up in the file.
 */
@Test
public void testOverwrite() throws IOException {
  Path target = new Path("/" + name.getMethodName());
  FSDataOutputStream firstWriter = FS.create(target);
  FSDataOutputStream overwritingWriter = FS.create(target, true);

  firstWriter.write(2);
  overwritingWriter.write(1);

  try {
    // A successful close is acceptable too, so nothing is asserted here; what
    // matters is that the file content checked below is correct.
    firstWriter.close();
  } catch (FileNotFoundException fnfe) {
    // hadoop3 throws one of these.
  } catch (RemoteException re) {
    // expected
    assertThat(re.unwrapRemoteException(), instanceOf(LeaseExpiredException.class));
  }

  overwritingWriter.close();

  try (FSDataInputStream in = FS.open(target)) {
    // Exactly one byte, the one written by the overwriting stream.
    assertEquals(1, in.read());
    assertEquals(-1, in.read());
  }
}
/**
 * Configures an incremental-load job for the given HBase table and writes the
 * resulting job configuration as XML to {@code hbaseConfPath}.
 *
 * @param hbaseTableName name of the HBase table; also used as the job name
 * @throws IOException if the table cannot be opened or closed, or the file cannot be written
 */
private void exportHBaseConfiguration(String hbaseTableName) throws IOException {
  Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
  HadoopUtil.healSickConfig(hbaseConf);
  Job job = Job.getInstance(hbaseConf, hbaseTableName);
  HTable table = new HTable(hbaseConf, hbaseTableName);
  // Release the table handle on every path; it was previously never closed.
  try {
    HFileOutputFormat3.configureIncrementalLoadMap(job, table);

    logger.info("Saving HBase configuration to {}", hbaseConfPath);
    FileSystem fs = HadoopUtil.getWorkingFileSystem();
    FSDataOutputStream out = null;
    try {
      out = fs.create(new Path(hbaseConfPath));
      job.getConfiguration().writeXml(out);
    } finally {
      IOUtils.closeQuietly(out);
    }
  } finally {
    table.close();
  }
}
/**
 * Opens an output stream for {@code path} that is backed by a temporary file
 * in the staging directory. The staging directory is created if it is missing.
 *
 * @throws IOException if the staging path is not a directory or the temporary file cannot be created
 */
@Override
public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)
        throws IOException
{
    // Ignore the overwrite flag, since Presto always writes to unique file names.
    // Checking for file existence can break read-after-write consistency.

    boolean stagingMissing = !stagingDirectory.exists();
    if (stagingMissing) {
        createDirectories(stagingDirectory.toPath());
    }
    if (!stagingDirectory.isDirectory()) {
        throw new IOException("Configured staging path is not a directory: " + stagingDirectory);
    }

    File tempFile = createTempFile(stagingDirectory.toPath(), "presto-s3-", ".tmp").toFile();
    String key = keyFromPath(qualifiedPath(path));

    PrestoS3OutputStream s3Stream = new PrestoS3OutputStream(
            s3,
            getBucketName(uri),
            key,
            tempFile,
            sseEnabled,
            sseType,
            sseKmsKeyId,
            multiPartUploadMinFileSize,
            multiPartUploadMinPartSize,
            s3AclType,
            requesterPaysEnabled,
            s3StorageClass);
    return new FSDataOutputStream(s3Stream, statistics);
}
/** @throws Exception If failed. */
@Test
public void testSetPermissionCheckNonRecursiveness() throws Exception {
  Path fsHome = new Path(primaryFsUri);
  Path tmpDir = new Path(fsHome, "/tmp");
  Path nestedFile = new Path(fsHome, "/tmp/my");

  // Create an empty file with the default permissions.
  fs.create(nestedFile, EnumSet.noneOf(CreateFlag.class),
      Options.CreateOpts.perms(FsPermission.getDefault())).close();

  // Change permissions on the parent directory only.
  FsPermission dirPermission = new FsPermission((short)123);
  fs.setPermission(tmpDir, dirPermission);

  // The directory has the new permissions; the file keeps the defaults.
  assertEquals(dirPermission, fs.getFileStatus(tmpDir).getPermission());
  assertEquals(FsPermission.getDefault(), fs.getFileStatus(nestedFile).getPermission());
}
/**
 * Creates the pending clean files for {@code instantTime}, then writes a
 * cleaner file in the table's meta folder containing serialized clean metadata
 * for one randomly chosen default partition.
 *
 * @param metaClient meta client passed to the pending-clean-file helper
 * @param basePath table base path
 * @param instantTime instant the clean belongs to
 * @param configuration configuration used to obtain the file system
 * @throws IOException if any file cannot be written
 */
public static void createCleanFiles(HoodieTableMetaClient metaClient, String basePath,
    String instantTime, Configuration configuration)
    throws IOException {
  createPendingCleanFiles(metaClient, instantTime);

  String metaFolder = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME;
  Path commitFile = new Path(metaFolder + "/" + HoodieTimeline.makeCleanerFileName(instantTime));
  FileSystem fs = FSUtils.getFs(basePath, configuration);

  try (FSDataOutputStream os = fs.create(commitFile, true)) {
    // Stats for a randomly picked partition, with empty file lists.
    String partitionPath = DEFAULT_PARTITION_PATHS[rand.nextInt(DEFAULT_PARTITION_PATHS.length)];
    HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
        partitionPath, new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), instantTime);

    // Create the clean metadata
    HoodieCleanMetadata cleanMetadata =
        CleanerUtils.convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));

    // Serialize the metadata and write it to the cleaner file.
    byte[] serialized = TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata).get();
    os.write(serialized);
  }
}
/**
 * Creates a file of {@code length} random bytes, feeding the same bytes into
 * the shared {@code md5} digest, and returns the resulting digest.
 *
 * @param fs file system on which the file is created
 * @param name file to create; creation is requested without overwrite
 * @param length number of bytes to write
 * @param replication replication factor
 * @param blocksize block size
 * @return digest of the bytes written
 * @throws IOException if the file cannot be created or written
 */
private static byte[] createFile(FileSystem fs, Path name, long length,
    short replication, long blocksize) throws IOException {
  final FSDataOutputStream out = fs.create(name, false, 4096,
      replication, blocksize);
  try {
    long remaining = length;
    while (remaining > 0) {
      ran.nextBytes(buffer);
      // Write a full buffer, or only what is left for the last chunk.
      final int chunk = (int) Math.min(remaining, buffer.length);
      out.write(buffer, 0, chunk);
      md5.update(buffer, 0, chunk);
      remaining -= chunk;
    }
  } finally {
    IOUtils.closeStream(out);
  }
  return md5.digest();
}
/** @throws Exception If failed. */
@Test
public void testSetOwnerCheckNonRecursiveness() throws Exception {
  Path fsHome = new Path(primaryFsUri);
  Path tmpDir = new Path(fsHome, "/tmp");
  Path nestedFile = new Path(fsHome, "/tmp/my");

  // Create an empty file with the default permissions.
  fs.create(nestedFile, EnumSet.noneOf(CreateFlag.class),
      Options.CreateOpts.perms(FsPermission.getDefault())).close();

  // Give the file and its parent directory different owners and groups.
  fs.setOwner(nestedFile, "fUser", "fGroup");
  fs.setOwner(tmpDir, "dUser", "dGroup");

  // Changing the directory's owner must not have touched the file's.
  assertEquals("dUser", fs.getFileStatus(tmpDir).getOwner());
  assertEquals("dGroup", fs.getFileStatus(tmpDir).getGroup());

  assertEquals("fUser", fs.getFileStatus(nestedFile).getOwner());
  assertEquals("fGroup", fs.getFileStatus(nestedFile).getGroup());
}
/**
 * Writes every {@code short} value, shifted left by {@code shift} bits, as a
 * variable-length long, reads the values back and checks them, then deletes
 * the file.
 *
 * @param shift number of bits each value is shifted left before encoding
 * @return length of the file that held the encoded values
 * @throws IOException if writing, reading or deleting the file fails
 */
private long writeAndVerify(int shift) throws IOException {
  FSDataOutputStream out = fs.create(path);
  for (int i = Short.MIN_VALUE; i <= Short.MAX_VALUE; ++i) {
    Utils.writeVLong(out, ((long) i) << shift);
  }
  out.close();
  FSDataInputStream in = fs.open(path);
  for (int i = Short.MIN_VALUE; i <= Short.MAX_VALUE; ++i) {
    long n = Utils.readVLong(in);
    // Expected value first, so a failure reports expected/actual the right way round.
    Assert.assertEquals(((long) i) << shift, n);
  }
  in.close();
  long ret = fs.getFileStatus(path).getLen();
  fs.delete(path, false);
  return ret;
}
/**
 * Creates (overwriting) a file and writes {@code fileSize} pseudo-random
 * bytes generated from {@code seed}. The stream is returned still open.
 *
 * @param fileSys file system on which the file is created
 * @param name file to create
 * @param repl replication factor
 * @return the open output stream, after the data has been written
 * @throws IOException if the file cannot be created or written
 */
private FSDataOutputStream writeFile(FileSystem fileSys, Path name, int repl)
    throws IOException {
  int ioBufferSize = fileSys.getConf()
      .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
  FSDataOutputStream out =
      fileSys.create(name, true, ioBufferSize, (short) repl, blockSize);

  // Same seed, same content on every run.
  byte[] payload = new byte[fileSize];
  new Random(seed).nextBytes(payload);
  out.write(payload);
  return out;
}
/**
 * Starts a cluster with {@code SlowBlockPlacementPolicy}, creates a file,
 * starts a {@code DeleteThread} for it and expects the following write and
 * hsync to fail with a {@code FileNotFoundException} naming the file.
 *
 * @param hasSnapshot whether to snapshot the root after the file is created
 */
private void testDeleteAddBlockRace(boolean hasSnapshot) throws Exception {
  try {
    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        SlowBlockPlacementPolicy.class, BlockPlacementPolicy.class);
    cluster = new MiniDFSCluster.Builder(conf).build();
    FileSystem fs = cluster.getFileSystem();

    final String fileName = "/testDeleteAddBlockRace";
    Path filePath = new Path(fileName);
    FSDataOutputStream out = fs.create(filePath);

    if (hasSnapshot) {
      Path root = new Path("/");
      SnapshotTestHelper.createSnapshot((DistributedFileSystem) fs, root, "s1");
    }

    Thread deleter = new DeleteThread(fs, filePath);
    deleter.start();

    try {
      // Write some data and sync so that a block has to be allocated.
      byte[] payload = new byte[32];
      out.write(payload, 0, 32);
      out.hsync();
      Assert.fail("Should have failed.");
    } catch (FileNotFoundException expected) {
      GenericTestUtils.assertExceptionContains(filePath.getName(), expected);
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Creates (overwriting) a file, fills it with {@code TestCheckpoint.fileSize}
 * pseudo-random bytes generated from {@code TestCheckpoint.seed}, and closes it.
 *
 * @param fileSys file system on which the file is created
 * @param name file to create
 * @param repl replication factor
 * @throws IOException if the file cannot be created or written
 */
static void writeFile(FileSystem fileSys, Path name, int repl)
    throws IOException {
  int ioBufferSize = fileSys.getConf()
      .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
  FSDataOutputStream out =
      fileSys.create(name, true, ioBufferSize, (short) repl, blockSize);

  // Same seed, same content on every run.
  byte[] payload = new byte[TestCheckpoint.fileSize];
  new Random(TestCheckpoint.seed).nextBytes(payload);
  out.write(payload);
  out.close();
}
/**
 * For every dimension that has a top-k spec, selects the top k dimension
 * values per configured metric and, if any were collected, writes them to the
 * top-k values file under the configured output path.
 *
 * @param context task context; not used by this method
 * @throws IOException if the output file cannot be created, written or closed
 * @throws InterruptedException declared by the overridden method
 */
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {

  for (String dimension : dimensionNames) {

    LOGGER.info("{} records passed metric threshold for dimension {}", thresholdPassCount.get(dimension), dimension);

    // Get top k
    TopKDimensionToMetricsSpec topkSpec = topKDimensionToMetricsSpecMap.get(dimension);
    if (topkSpec != null && topkSpec.getDimensionName() != null && topkSpec.getTopk() != null) {

      // Get top k for each metric specified
      Map<String, Integer> topkMetricsMap = topkSpec.getTopk();
      for (Entry<String, Integer> topKEntry : topkMetricsMap.entrySet()) {

        String metric = topKEntry.getKey();
        int k = topKEntry.getValue();
        // Bounded queue: holds at most k entries.
        MinMaxPriorityQueue<DimensionValueMetricPair> topKQueue = MinMaxPriorityQueue.maximumSize(k).create();

        Map<Object, Number[]> dimensionToMetricsMap = dimensionNameToValuesMap.get(dimension);
        for (Entry<Object, Number[]> entry : dimensionToMetricsMap.entrySet()) {
          topKQueue.add(new DimensionValueMetricPair(entry.getKey(), entry.getValue()[metricToIndexMapping.get(metric)]));
        }
        LOGGER.info("Picking Top {} values for {} based on Metric {} : {}", k, dimension, metric, topKQueue);
        for (DimensionValueMetricPair pair : topKQueue) {
          topkDimensionValues.addValue(dimension, String.valueOf(pair.getDimensionValue()));
        }
      }
    }
  }

  if (topkDimensionValues.getTopKDimensions().size() > 0) {
    String topkValuesPath = configuration.get(TOPK_PHASE_OUTPUT_PATH.toString());
    LOGGER.info("Writing top k values to {}",topkValuesPath);
    // Join parent and child through Path rather than File.separator, which is
    // the local platform's separator and not necessarily the file system's.
    Path topkValuesFile = new Path(topkValuesPath, ThirdEyeConstants.TOPK_VALUES_FILE);
    // try-with-resources closes the stream even when serialization fails.
    try (FSDataOutputStream topKDimensionValuesOutputStream = fileSystem.create(topkValuesFile)) {
      OBJECT_MAPPER.writeValue((DataOutput) topKDimensionValuesOutputStream, topkDimensionValues);
    }
  }
}
/**
 * Writes a small CSV sample to the DFS, runs the CSV schema command against
 * it with a local output path, and checks the exit code and generated schema.
 */
@Test
public void testHDFSCSVSchemaToFile() throws Exception {
  final String csvSample = "hdfs:/tmp/sample/users.csv";

  // Stage the sample: a header line and one data row.
  FSDataOutputStream sampleStream = getDFS()
      .create(new Path(csvSample), true /* overwrite */);
  OutputStreamWriter sampleWriter = new OutputStreamWriter(sampleStream, "utf8");
  sampleWriter.append("id, username, email\n");
  sampleWriter.append("1, test, [email protected]\n");
  sampleWriter.close();

  // Schema the command is expected to produce.
  Schema expectedSchema = SchemaBuilder.record("User").fields()
      .optionalLong("id")
      .optionalString("username")
      .optionalString("email")
      .endRecord();

  CSVSchemaCommand command = new CSVSchemaCommand(console);
  command.setConf(getConfiguration());
  command.samplePaths = Lists.newArrayList(csvSample);
  command.outputPath = "target/csv2.avsc";
  command.recordName = "User";

  int exitCode = command.run();
  Assert.assertEquals("Should return success code", 0, exitCode);

  String written = Files.toString(
      new File("target/csv2.avsc"), BaseCommand.UTF8);
  Assert.assertTrue("File should contain pretty printed schema",
      TestUtil.matchesSchema(expectedSchema).matches(written));
  verifyNoMoreInteractions(console);
}
/**
 * Creates (or overwrites) {@code file}, creating parent directories as
 * requested through the create options, and fills it with {@code fileSize}
 * copies of the character 'a'.
 */
private void genFile(Path file, long fileSize) throws IOException {
  FSDataOutputStream out = fc.create(file,
      EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
      CreateOpts.createParent(), CreateOpts.bufferSize(4096),
      CreateOpts.repFac((short) 3));

  // One byte per iteration until the requested size has been written.
  long remaining = fileSize;
  while (remaining > 0) {
    out.writeByte('a');
    remaining--;
  }
  out.close();
}