下面列出了org.junit.contrib.java.lang.system.Assertion#org.apache.hadoop.fs.FileSystem 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Writes files into the in-memory file system and checks that the checksum file that
 * accompanies a written file disappears when the file is deleted, then checks that the
 * root listing is empty after the final delete.
 */
public void testDeletionOfCheckSum() throws Exception {
  final Configuration conf = new Configuration();
  final URI ramUri = URI.create("ramfs://mapoutput" + "_tmp");
  final InMemoryFileSystem ramFs = (InMemoryFileSystem) FileSystem.get(ramUri, conf);

  // Single file: the checksum companion must exist after the write and be gone after the delete.
  Path target = new Path("/file_1");
  ramFs.reserveSpaceWithCheckSum(target, 1024);
  FSDataOutputStream stream = ramFs.create(target);
  stream.write("testing".getBytes());
  stream.close();
  final boolean checksumWritten = ramFs.exists(ramFs.getChecksumFile(target));
  assertTrue("checksum exists", checksumWritten);
  ramFs.delete(target, true);
  final boolean checksumLeftBehind = ramFs.exists(ramFs.getChecksumFile(target));
  assertTrue("checksum deleted", !checksumLeftBehind);

  // check for directories getting deleted.
  // NOTE(review): the first name is spelled "/tesdir" while the second is "/testdir";
  // both spellings are kept exactly as they were.
  final String[] nestedFiles = {"/tesdir/file_1", "/testdir/file_2"};
  for (String nestedFile : nestedFiles) {
    target = new Path(nestedFile);
    ramFs.reserveSpaceWithCheckSum(target, 1024);
    stream = ramFs.create(target);
    stream.write("testing".getBytes());
    stream.close();
  }

  // Only the last file written is deleted explicitly before the root is listed.
  ramFs.delete(target, true);
  final int rootEntries = ramFs.listStatus(new Path("/")).length;
  assertTrue("nothing in the namespace", rootEntries == 0);
}
/**
 * Creates the record writer for this task: resolves the task's default work file with a
 * ".txt" extension, creates it (without overwriting an existing file) and wraps the stream
 * in a {@code DocumentInfoRecordWriter}.
 *
 * @param job the task attempt whose configuration and work directory are used
 * @return a writer bound to the newly created output file
 * @throws IOException if the file system cannot be obtained or the file cannot be created
 */
@Override
public RecordWriter<NullWritable, List<DocumentInfo>> getRecordWriter(TaskAttemptContext job)
    throws IOException {
  final Configuration jobConf = job.getConfiguration();
  // Output directory plus the task-specific file name.
  final Path outputFile = getDefaultWorkFile(job, ".txt");
  final FileSystem outputFs = outputFile.getFileSystem(jobConf);
  // overwrite == false: creation fails rather than replacing an existing file.
  return new DocumentInfoRecordWriter(outputFs.create(outputFile, false));
}
/**
 * Saves a data set to a file under {@code exportBaseDirectory} and returns the file's URI.
 *
 * <p>The file name is built from the partition index, {@code jvmuid} and the output count.
 * A "/" is inserted after the base directory unless it already ends with "/" or "\".
 *
 * @param dataSet      the data set to write
 * @param partitionIdx index of the partition being exported
 * @param outputCount  running count used to keep file names distinct
 * @return the URI of the written file, as a string
 * @throws Exception if the URI is malformed or the write fails
 */
private String export(DataSet dataSet, int partitionIdx, int outputCount) throws Exception {
  final String fileName = "dataset_" + partitionIdx + jvmuid + "_" + outputCount + ".bin";

  final boolean endsWithSeparator =
      exportBaseDirectory.endsWith("/") || exportBaseDirectory.endsWith("\\");
  final String separator = endsWithSeparator ? "" : "/";
  final URI target = new URI(exportBaseDirectory + separator + fileName);

  // Fall back to the default Hadoop configuration when none was supplied.
  final Configuration hadoopConf;
  if (conf == null) {
    hadoopConf = DefaultHadoopConfig.get();
  } else {
    hadoopConf = conf.getValue().getConfiguration();
  }

  final FileSystem targetFs = FileSystem.get(target, hadoopConf);
  try (FSDataOutputStream sink = targetFs.create(new Path(target))) {
    dataSet.save(sink);
  }
  return target.toString();
}
/**
 * Builds the {@link OwnerAndPermission} to apply to a replicated path, starting from the
 * origin path's status and applying the {@link PreserveAttributes} rules of the copy
 * configuration.
 *
 * <p>Owner and permission are taken from the origin only when the corresponding preserve
 * option is set; otherwise they are {@code null}. The group is the configured target group
 * when present, else the origin's group when group preservation is on, else {@code null}.
 *
 * @param fs                file system holding the origin path
 * @param path              origin path
 * @param copyConfiguration supplies the preserve rules, target group and copy context
 * @return the resolved owner, group and permission
 * @throws IOException if the origin path has no file status
 */
public static OwnerAndPermission resolveReplicatedOwnerAndPermission(FileSystem fs, Path path,
    CopyConfiguration copyConfiguration) throws IOException {
  final PreserveAttributes preserveRules = copyConfiguration.getPreserve();
  final Optional<FileStatus> originStatus = copyConfiguration.getCopyContext().getFileStatus(fs, path);
  if (!originStatus.isPresent()) {
    throw new IOException(String.format("Origin path %s does not exist.", path));
  }

  // An explicitly configured target group wins over the preserved origin group.
  String resolvedGroup = null;
  if (copyConfiguration.getTargetGroup().isPresent()) {
    resolvedGroup = copyConfiguration.getTargetGroup().get();
  } else if (preserveRules.preserve(Option.GROUP)) {
    resolvedGroup = originStatus.get().getGroup();
  }

  String resolvedOwner = null;
  if (preserveRules.preserve(Option.OWNER)) {
    resolvedOwner = originStatus.get().getOwner();
  }

  return new OwnerAndPermission(resolvedOwner, resolvedGroup,
      preserveRules.preserve(Option.PERMISSION) ? originStatus.get().getPermission() : null);
}
/**
 * Prepares this reader for the given split: loads the compression metadata for the split's
 * file, opens a compressed reader on it, seeks to the split start and initialises the
 * column-family metadata from the task context.
 *
 * @param inputSplit the split to read; must be an {@code SSTableSplit}
 * @param context    task context supplying the configuration
 * @throws IOException if no compression metadata is found for the file, or on I/O failure
 */
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
  this.split = (SSTableSplit) inputSplit;

  final FileSystem splitFs = FileSystem.get(this.split.getPath().toUri(), context.getConfiguration());
  final String splitFile = split.getPath().toString();
  final CompressionMetadata metadata = CompressionMetadata.create(splitFile, splitFs);
  if (metadata == null) {
    throw new IOException("Compression metadata for file " + split.getPath() + " not found, cannot run");
  }

  // Open the file and position the reader at the first byte of this split.
  this.reader = CompressedRandomAccessReader.open(split.getPath(), metadata, false, splitFs);
  this.reader.seek(split.getStart());

  this.cfMetaData = initializeCfMetaData(context);
}
/**
 * Creates a record reader configured from job settings under the {@code CONF_NS} prefix.
 *
 * <p>Settings read here: "begin" and "end" (record markers), "maxrec" (default 50 * 1000),
 * "lookahead" (default twice the max record size) and "slowmatch" (default false).
 *
 * @param in       input stream, passed to the superclass
 * @param split    file split, passed to the superclass
 * @param reporter progress reporter, passed to the superclass
 * @param job      job configuration, passed to the superclass
 * @param fs       file system, passed to the superclass
 * @throws IOException declared by the signature; which call raises it is not visible here
 */
public StreamXmlRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
JobConf job, FileSystem fs) throws IOException {
super(in, split, reporter, job, fs);
// NOTE(review): checkJobGet presumably fails when the setting is missing -- confirm.
beginMark_ = checkJobGet(CONF_NS + "begin");
endMark_ = checkJobGet(CONF_NS + "end");
maxRecSize_ = job_.getInt(CONF_NS + "maxrec", 50 * 1000);
// The lookahead default is derived from the max record size read on the previous line.
lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_);
synched_ = false;
slowMatch_ = job_.getBoolean(CONF_NS + "slowmatch", false);
// Patterns are only built when slow matching is enabled.
if (slowMatch_) {
beginPat_ = makePatternCDataOrMark(beginMark_);
endPat_ = makePatternCDataOrMark(endMark_);
}
init();
}
/**
 * Sums the sizes of the files to be bulk loaded and rejects the load as soon as the running
 * total exceeds the space still available under the quota snapshot (limit minus usage).
 *
 * @param fs    file system used to look up each file's size
 * @param paths paths of the files to be bulk loaded
 * @return the total size of all files
 * @throws SpaceLimitingException if a file's size cannot be read, or if the running total
 *         exceeds the available space
 */
@Override
public long computeBulkLoadSize(FileSystem fs, List<String> paths) throws SpaceLimitingException {
  // Computed once up front to save arithmetic inside the loop.
  final long sizeAvailableForBulkLoads = quotaSnapshot.getLimit() - quotaSnapshot.getUsage();
  long size = 0L;
  for (String path : paths) {
    try {
      size += getFileSize(fs, path);
    } catch (IOException e) {
      throw new SpaceLimitingException(
          getPolicyName(), "Could not verify length of file to bulk load: " + path, e);
    }
    // Checked after every file so the load is rejected without sizing the remaining files.
    if (size > sizeAvailableForBulkLoads) {
      throw new SpaceLimitingException(getPolicyName(), "Bulk load of " + paths
          + " is disallowed because the file(s) exceed the limits of a space quota.");
    }
  }
  return size;
}
/**
 * Prepares the input and output directories for a job: removes the output directory if it
 * exists, creates the input directory if it is missing, and writes {@code numMaps} input
 * files named "part-0", "part-1", ... each containing the same three lines of text.
 *
 * @param inDir   input directory to populate
 * @param outDir  output directory to clear
 * @param numMaps number of input files to create
 * @throws Exception on any file system failure
 */
static void createInputOutPutFolder(Path inDir, Path outDir, int numMaps)
    throws Exception {
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    // try-with-resources so the stream is closed even when the write fails.
    try (DataOutputStream file = fs.create(new Path(inDir, "part-" + i))) {
      file.writeBytes(input);
    }
  }
}
/**
 * Loads HFiles for the given table through {@code loadHFiles}, then verifies that the bulk
 * load staging directory contains no entry named "DONOTERASE", and finally deletes the table.
 *
 * @param testName       name of the test, forwarded to {@code loadHFiles}
 * @param htd            descriptor of the table under test
 * @param preCreateTable forwarded to {@code loadHFiles}
 * @param tableSplitKeys forwarded to {@code loadHFiles}
 * @param hfileRanges    forwarded to {@code loadHFiles}
 * @param useMap         forwarded to {@code loadHFiles}
 * @param copyFiles      forwarded to {@code loadHFiles}
 * @param depth          forwarded to {@code loadHFiles}
 * @throws Exception on any failure during load, verification or cleanup
 */
private void runTest(String testName, TableDescriptor htd, boolean preCreateTable,
    byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth)
    throws Exception {
  loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
      useMap, true, copyFiles, 0, 1000, depth);

  final TableName tableName = htd.getTableName();

  // verify staging folder has been cleaned up
  Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()),
      HConstants.BULKLOAD_STAGING_DIR_NAME);
  FileSystem fs = util.getTestFileSystem();
  if (fs.exists(stagingBasePath)) {
    FileStatus[] files = fs.listStatus(stagingBasePath);
    for (FileStatus file : files) {
      // Compare by value: `!=` on strings tests reference identity, which is true for any
      // name read from the file system, so the old assertion could never fail.
      assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
          !"DONOTERASE".equals(file.getPath().getName()));
    }
  }

  util.deleteTable(tableName);
}
/**
 * Returns the file system to use for trash operations.
 *
 * <p>Works on a copy of the given configuration. When {@code NameNode.getDNProtocolAddress}
 * yields an address, the default file system URI is rebuilt with that address's host and
 * port and the file system for that URI is returned; otherwise the configuration's default
 * file system is returned.
 *
 * @param conf base configuration; it is copied, not modified
 * @return the file system for trash operations
 * @throws IOException if the service URI cannot be built or the file system cannot be obtained
 */
private static FileSystem getTrashFileSystem(Configuration conf) throws IOException {
  conf = new Configuration(conf);
  conf.set("fs.shell.delete.classname",
      "org.apache.hadoop.fs.TrashPolicyDefault.deleteCheckpoint");

  InetSocketAddress serviceAddress = NameNode.getDNProtocolAddress(conf);
  if (serviceAddress == null) {
    return FileSystem.get(conf);
  }

  URI defaultUri = FileSystem.getDefaultUri(conf);
  URI serviceUri;
  try {
    // Same URI as the default one, but pointing at the service address's host and port.
    serviceUri = new URI(defaultUri.getScheme(), defaultUri.getUserInfo(),
        serviceAddress.getHostName(), serviceAddress.getPort(),
        defaultUri.getPath(), defaultUri.getQuery(),
        defaultUri.getFragment());
  } catch (URISyntaxException uex) {
    // Keep the original exception as the cause so the offending URI part is not lost.
    throw new IOException("Failed to initialize a uri for trash FS", uex);
  }
  Path trashFsPath = new Path(serviceUri.toString());
  return trashFsPath.getFileSystem(conf);
}
/**
 * Writes a file via {@code writeFileNotOnShardServer}, moves it into a shard directory and
 * then, for a sequence of replication factors, asserts that the file's blocks exist on the
 * shard server after each change.
 */
@Test
public void test2() throws IOException, InterruptedException {
  final FileSystem fileSystem = _miniCluster.getFileSystem();
  final Path root = new Path(fileSystem.getUri().toString() + "/");
  fileSystem.mkdirs(new Path(root, "/test/table/shard-00000000"));

  final String shardServer = "host4.foo.com";
  final Path written = writeFileNotOnShardServer(fileSystem, "/testfile", shardServer);
  final Path shardFile = new Path(root, "/test/table/shard-00000000/test2");
  fileSystem.rename(written, shardFile);

  // Raise and lower the replication factor; the blocks must be on the shard server each time.
  final int[] replicationSteps = {2, 4, 5, 1};
  for (int replication : replicationSteps) {
    setReplication(fileSystem, shardFile, replication);
    assertBlocksExistOnShardServer(fileSystem, shardFile, shardServer);
  }
}
/**
 * Reads the metadata file that belongs to the data operator's file, if it exists, and copies
 * the data type, value type (for non-frame data) and dimensions onto the operator.
 *
 * <p>Dimensions are parsed from the metadata only for matrix and frame data; otherwise both
 * are set to 0. Any exception is rethrown wrapped in a {@code DMLRuntimeException}.
 *
 * @param dop the data operator to update
 */
private static void tryReadMetaDataFileDataCharacteristics( DataOp dop )
{
  try
  {
    // Name of the metadata file that accompanies the operator's data file.
    final String metaFileName = DataExpression.getMTDFileName(dop.getFileName());
    final Path metaPath = new Path(metaFileName);
    // NOTE(review): the file system is closed at the end of this block -- confirm the helper
    // does not hand out a shared instance.
    try( FileSystem metaFs = IOUtilFunctions.getFileSystem(metaFileName) ) {
      if( !metaFs.exists(metaPath) ) {
        return;
      }
      try( BufferedReader reader = new BufferedReader(new InputStreamReader(metaFs.open(metaPath))) ) {
        final JSONObject meta = JSONHelper.parse(reader);

        final String dataTypeName = String.valueOf(meta.get(DataExpression.DATATYPEPARAM)).toUpperCase();
        final DataType dataType = DataType.valueOf(dataTypeName);
        dop.setDataType(dataType);

        if( dataType != DataType.FRAME ) {
          final String valueTypeName = String.valueOf(meta.get(DataExpression.VALUETYPEPARAM)).toUpperCase();
          dop.setValueType(ValueType.valueOf(valueTypeName));
        }

        // Row/column counts are only read for matrix and frame data.
        final boolean hasDimensions = dataType == DataType.MATRIX || dataType == DataType.FRAME;
        dop.setDim1(hasDimensions ? Long.parseLong(meta.get(DataExpression.READROWPARAM).toString()) : 0);
        dop.setDim2(hasDimensions ? Long.parseLong(meta.get(DataExpression.READCOLPARAM).toString()) : 0);
      }
    }
  }
  catch(Exception ex) {
    throw new DMLRuntimeException(ex);
  }
}
/**
 * One-time fixture: obtains the raw local file system, starts a mini Tez test service
 * cluster sized to half of the JVM's maximum memory, and builds the job configuration from
 * the cluster configuration plus the cluster-specific settings, with Tez local mode enabled
 * and the staging directory set.
 */
@BeforeClass
public static void setup() throws Exception {
  localFs = FileSystem.getLocal(clusterConf).getRaw();

  final long maxHeap = Runtime.getRuntime().maxMemory();
  final String clusterName = TestExternalTezServices.class.getSimpleName();
  final long halfOfMaxHeap = (long) (maxHeap * 0.5d);
  tezTestServiceCluster = MiniTezTestServiceCluster.create(clusterName, 3, halfOfMaxHeap, 1);
  tezTestServiceCluster.init(clusterConf);
  tezTestServiceCluster.start();
  LOG.info("MiniTezTestServer started");

  // Start from the cluster configuration and layer the cluster-specific entries on top.
  confForJobs = new Configuration(clusterConf);
  for (Map.Entry<String, String> setting : tezTestServiceCluster.getClusterSpecificConfiguration()) {
    confForJobs.set(setting.getKey(), setting.getValue());
  }
  confForJobs.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
  confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, STAGING_DIR.toString());
}
/**
 * Starts a mini DFS cluster and runs the fs-option, conf-option and property-option checks
 * against it with a "-mkdir /data" command; the cluster is shut down afterwards.
 */
@Test
public void testDFSCommand() throws IOException {
  MiniDFSCluster cluster = null;
  try {
    final Configuration conf = new HdfsConfiguration();
    cluster = new MiniDFSCluster.Builder(conf).build();
    final String namenode = FileSystem.getDefaultUri(conf).toString();

    // The first two slots are left null here; only the command and its argument are set.
    final String[] args = {null, null, "-mkdir", "/data"};
    testFsOption(args, namenode);
    testConfOption(args, namenode);
    testPropertyOption(args, namenode);
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Reads every job output file in a directory and returns their contents concatenated, with
 * each line terminated by "\n".
 *
 * <p>Files are selected with {@code Utils.OutputFileUtils.OutputFilesFilter} and read in the
 * order returned by the file system listing.
 *
 * @param outDir directory containing the output files
 * @param conf   configuration used to obtain the file system
 * @return the concatenated lines of all output files
 * @throws IOException if listing or reading fails
 */
public static String readOutput(Path outDir, Configuration conf)
    throws IOException {
  FileSystem fs = outDir.getFileSystem(conf);
  // Local, single-threaded accumulator: no need for StringBuffer's synchronization.
  StringBuilder result = new StringBuilder();

  Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
      new Utils.OutputFileUtils.OutputFilesFilter()));
  for (Path outputFile : fileList) {
    LOG.info("Path" + ": "+ outputFile);
    // try-with-resources so the reader is closed even when a read fails.
    try (BufferedReader file =
        new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
      for (String line = file.readLine(); line != null; line = file.readLine()) {
        result.append(line);
        result.append("\n");
      }
    }
  }
  return result.toString();
}
/**
 * Reads a file to the end in {@code BUFFER_SIZE} chunks, discarding the data, and prints the
 * measurements collected between {@code resetMeasurements()} and {@code printMeasurements()}.
 *
 * @param fs   file system to read from
 * @param f    file to read
 * @param name label printed before reading starts
 * @param conf not used by this method
 * @throws IOException if opening or reading the file fails
 */
private void readFile(FileSystem fs,
                      Path f,
                      String name,
                      Configuration conf
                      ) throws IOException {
  System.out.print("Reading " + name);
  resetMeasurements();
  // try-with-resources so the stream is closed even when a read fails.
  try (InputStream in = fs.open(f)) {
    byte[] data = new byte[BUFFER_SIZE];
    int bytesRead = 0;
    // read() returns a negative value at end of stream.
    while (bytesRead >= 0) {
      bytesRead = in.read(data);
    }
  }
  printMeasurements();
}
/**
 * Writes one file into a source directory and one into a destination directory, then
 * renames the source directory onto the destination and asserts that the rename is reported
 * as unsuccessful.
 */
@Override
public void testRenameDirIntoExistingDir() throws Throwable {
  describe("Verify renaming a dir into an existing dir puts the files"
      +" from the source dir into the existing dir"
      +" and leaves existing files alone");
  final FileSystem fs = getFileSystem();

  // Source directory holding a 256-byte file.
  final Path sourceDir = path("source");
  final Path sourceFile = new Path(sourceDir, "source-256.txt");
  final byte[] sourceBytes = dataset(256, 'a', 'z');
  writeDataset(fs, sourceFile, sourceBytes, sourceBytes.length, 1024, false);

  // Destination directory that already holds a 512-byte file.
  final Path destDir = path("dest");
  final Path destFile = new Path(destDir, "dest-512.txt");
  final byte[] destBytes = dataset(512, 'A', 'Z');
  writeDataset(fs, destFile, destBytes, destBytes.length, 1024, false);
  assertIsFile(destFile);

  final boolean renamed = fs.rename(sourceDir, destDir);
  assertFalse("s3a doesn't support rename to non-empty directory", renamed);
}
/**
 * Creates a directory (and any missing parents) with read/write/execute permission for
 * user, group and others.
 *
 * <p>The file system obtained from {@code conf} is stored in the {@code fs} field and closed
 * before returning, whether or not the directory was created.
 * NOTE(review): {@code FileSystem.get} may return a shared, cached instance; closing it can
 * affect other users of that instance -- confirm this is intended.
 *
 * @param pathFolder path of the directory to create
 * @return {@code true} if {@code mkdirs} reported success; {@code false} if it reported
 *         failure or any exception occurred
 */
public boolean createFolder(String pathFolder) {
  try {
    fs = FileSystem.get(conf);
    try {
      Path path = new Path(pathFolder);
      // Report what mkdirs actually returned instead of assuming success.
      return fs.mkdirs(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
    } finally {
      // Close on both the success and the failure path.
      fs.close();
    }
  } catch (Exception e) {
    e.printStackTrace();
    return false;
  }
}
/**
 * Recursively deletes the bulk load output directory and logs a warning when the file
 * system reports that nothing was deleted.
 *
 * @throws IOException if the file system cannot be obtained or the delete fails
 */
protected void deleteBulkLoadDirectory() throws IOException {
  final Path bulkDir = getBulkOutputDir();
  final FileSystem bulkFs = FileSystem.get(bulkDir.toUri(), conf);
  if (bulkFs.delete(bulkDir, true)) {
    return;
  }
  LOG.warn("Could not delete " + bulkDir);
}
/**
 * Writes a reference file for a daughter region into the store directory of the table's
 * first column family and returns the path of that file.
 *
 * <p>The file name is the current time in milliseconds, a dot, and the parent region's
 * encoded name.
 *
 * @param services master services providing the root directory and file system
 * @param td       table descriptor; its first column family determines the store directory
 * @param parent   parent region
 * @param daughter daughter region the reference is written for
 * @param midkey   split key the reference is created from
 * @param top      {@code true} for a top reference, {@code false} for a bottom reference
 * @return path of the written reference file
 * @throws IOException if writing the reference fails
 */
private Path createReferences(final MasterServices services, final TableDescriptor td,
    final RegionInfo parent, final RegionInfo daughter, final byte[] midkey, final boolean top)
    throws IOException {
  final Path rootDir = services.getMasterFileSystem().getRootDir();
  final Path tableDir = CommonFSUtils.getTableDir(rootDir, parent.getTable());
  final Path storeDir = HStore.getStoreHomedir(tableDir, daughter, td.getColumnFamilies()[0].getName());

  final Reference reference;
  if (top) {
    reference = Reference.createTopReference(midkey);
  } else {
    reference = Reference.createBottomReference(midkey);
  }

  // Reference name has this format: StoreFile#REF_NAME_PARSER
  final long timestamp = System.currentTimeMillis();
  final Path referencePath = new Path(storeDir, timestamp + "." + parent.getEncodedName());

  final FileSystem masterFs = services.getMasterFileSystem().getFileSystem();
  reference.write(masterFs, referencePath);
  return referencePath;
}
/**
 * Tells whether a file may be split: files with no matching compression codec are
 * splittable, and compressed files are splittable only when their codec is a
 * {@code SplittableCompressionCodec}.
 *
 * @param fs   file system whose configuration is used to look up codecs
 * @param file file to check
 * @return {@code true} if the file can be split
 */
protected boolean isSplitable(FileSystem fs, Path file) {
  final CompressionCodecFactory codecFactory = new CompressionCodecFactory(fs.getConf());
  final CompressionCodec matchedCodec = codecFactory.getCodec(file);
  return matchedCodec == null || matchedCodec instanceof SplittableCompressionCodec;
}
/**
 * Regenerates "gold" files on the local file system: for every argument whose file name
 * starts with "input", converts the histogram file to a CDF and writes it as pretty-printed
 * JSON to a sibling file whose name has "input" replaced by "gold". Other files are skipped
 * with a message on stderr.
 *
 * @param args paths of the input files
 * @throws IOException if reading an input or writing a gold file fails
 */
public static void main(String[] args) throws IOException {
  final Configuration conf = new Configuration();
  final FileSystem lfs = FileSystem.getLocal(conf);

  for (String arg : args) {
    Path filePath = new Path(arg).makeQualified(lfs);
    String fileName = filePath.getName();
    if (!fileName.startsWith("input")) {
      System.err.println("Input file not started with \"input\". File "+fileName+" skipped.");
      continue;
    }

    LoggedDiscreteCDF newResult = histogramFileToCDF(filePath, lfs);
    String testName = fileName.substring("input".length());
    Path goldFilePath = new Path(filePath.getParent(), "gold"+testName);

    ObjectMapper mapper = new ObjectMapper();
    JsonFactory factory = mapper.getJsonFactory();
    FSDataOutputStream ostream = lfs.create(goldFilePath, true);
    JsonGenerator gen = null;
    try {
      gen = factory.createJsonGenerator(ostream, JsonEncoding.UTF8);
      gen.useDefaultPrettyPrinter();
      gen.writeObject(newResult);
    } finally {
      // Release the output even when serialization fails. The generator is closed when it
      // was created (as the original did on success); otherwise the raw stream is closed.
      if (gen != null) {
        gen.close();
      } else {
        ostream.close();
      }
    }
  }
}
/**
 * Resolves the directory path for a record by looking up, in {@code dirPathCache}, the
 * directory path string computed by the record writer manager for the record and its date.
 *
 * @param fs         not used by this method
 * @param recordDate date associated with the record
 * @param record     the record being written
 * @return the cached path for the record's directory
 * @throws StageException the cache loader's own {@code StageException}, or a
 *         {@code HADOOPFS_24} error wrapping any other loader failure
 */
@Override
public Path getPath(FileSystem fs, Date recordDate, Record record) throws StageException, IOException {
  try {
    final String dirPath = recordWriterManager.getDirPath(recordDate, record);
    return dirPathCache.get(dirPath);
  } catch (ExecutionException ex) {
    // Surface a StageException raised while loading as-is; wrap anything else.
    final Throwable cause = ex.getCause();
    if (cause instanceof StageException) {
      throw (StageException) cause;
    }
    throw new StageException(Errors.HADOOPFS_24, ex.toString(), ex);
  }
}
/**
 * Runs a word count job over a small three-line input under a randomly named directory in
 * /tmp and checks both that the job completes successfully and that the output matches the
 * expected word counts. The temporary directory is deleted at the end of a successful run.
 */
@Test
public void testNewApis() throws Exception {
  Random r = new Random(System.currentTimeMillis());
  Path tmpBaseDir = new Path("/tmp/wc-" + r.nextInt());
  final Path inDir = new Path(tmpBaseDir, "input");
  final Path outDir = new Path(tmpBaseDir, "output");
  String input = "The quick brown fox\nhas many silly\nred fox sox\n";
  FileSystem inFs = inDir.getFileSystem(conf);
  FileSystem outFs = outDir.getFileSystem(conf);
  outFs.delete(outDir, true);
  if (!inFs.mkdirs(inDir)) {
    throw new IOException("Mkdirs failed to create " + inDir.toString());
  }
  // try-with-resources so the input file is closed even when the write fails.
  try (DataOutputStream file = inFs.create(new Path(inDir, "part-0"))) {
    file.writeBytes(input);
  }

  Job job = Job.getInstance(conf, "word count");
  job.setJarByClass(TestLocalModeWithNewApis.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, inDir);
  FileOutputFormat.setOutputPath(job, outDir);
  // Expected value first, so a failure reports "expected true but was false".
  assertEquals(true, job.waitForCompletion(true));

  String output = readOutput(outDir, conf);
  assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
      "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", output);

  outFs.delete(tmpBaseDir, true);
}
/**
 * Copies a file into a destination directory, keeping its name, then applies the given
 * replication factor and {@code APP_FILE_PERMISSION} to the copy.
 *
 * @param destDir     directory to copy into
 * @param srcPath     file to copy; the source is not deleted
 * @param replication replication factor to set on the copy
 * @return path of the copied file
 * @throws IOException if the copy or the attribute changes fail
 */
private Path copyFileToRemote(
    Path destDir,
    Path srcPath,
    Short replication) throws IOException {
  final FileSystem targetFs = destDir.getFileSystem(hadoopConf);
  final FileSystem sourceFs = srcPath.getFileSystem(hadoopConf);
  final Path uploaded = new Path(destDir, srcPath.getName());

  LOGGER.info("Uploading resource " + srcPath + " to " + uploaded);
  // deleteSource == false: the original file stays in place.
  FileUtil.copy(sourceFs, srcPath, targetFs, uploaded, false, hadoopConf);

  targetFs.setReplication(uploaded, replication);
  targetFs.setPermission(uploaded, APP_FILE_PERMISSION);
  return uploaded;
}
/**
 * Lists the entries under the job attempt path that are accepted by
 * {@code CommittedTaskFilter}, i.e. the locations where committed task output is stored.
 *
 * @param context the context of the current job
 * @return the matching file statuses
 * @throws IOException if the file system cannot be obtained or the listing fails
 */
private FileStatus[] getAllCommittedTaskPaths(JobContext context)
    throws IOException {
  final Path attemptDir = getJobAttemptPath(context);
  final FileSystem attemptFs = attemptDir.getFileSystem(context.getConfiguration());
  final FileStatus[] committed = attemptFs.listStatus(attemptDir, new CommittedTaskFilter());
  return committed;
}
/**
 * Returns the default block size for a path by resolving it through {@code fsState} and
 * asking the resolved target file system about the remaining path.
 *
 * @param f path to resolve
 * @return the target file system's default block size for the resolved path
 * @throws NotInMountpointException if resolution fails with {@code FileNotFoundException}
 */
@Override
public long getDefaultBlockSize(Path f) {
  try {
    final String uriPath = getUriPath(f);
    final InodeTree.ResolveResult<FileSystem> resolved = fsState.resolve(uriPath, true);
    final long blockSize = resolved.targetFileSystem.getDefaultBlockSize(resolved.remainingPath);
    return blockSize;
  } catch (FileNotFoundException e) {
    throw new NotInMountpointException(f, "getDefaultBlockSize");
  }
}
/**
 * Checks that {@code BucketingSink.createHadoopFileSystem} returns a file system whose URI
 * scheme is "hdfs" when given an hdfs:// path and no extra configuration.
 */
@Test
public void testInitForHadoopFileSystem() throws Exception {
  final Path hdfsPath = new Path("hdfs://localhost:51234/some/path/");
  final FileSystem created = BucketingSink.createHadoopFileSystem(hdfsPath, null);
  final String scheme = created.getUri().getScheme();
  assertEquals("hdfs", scheme);
}
/**
 * Disabled test: creates a file through the instrumented file system using the
 * (path, replication, progress) overload and checks that the create timer recorded exactly
 * one call. The test root is cleaned up afterwards.
 */
@Test(enabled = false)
public void testCreate9() throws IOException, URISyntaxException {
  final HDFSRoot testRoot = new HDFSRoot("/tmp/create");
  final MetricsFileSystemInstrumentation instrumentedFs =
      (MetricsFileSystemInstrumentation) FileSystem.get(new URI(instrumentedURI), new Configuration());

  final Path createdFile = new Path("/tmp/create/newFile");
  final FSDataOutputStream out = instrumentedFs.create(createdFile, (short)2, null);
  Assert.assertEquals(instrumentedFs.createTimer.getCount(), 1);
  out.close();

  testRoot.cleanupRoot();
}
/**
 * Fetches a delegation token from the default file system of the given configuration and
 * adds it to the credentials, keyed by the token's service.
 *
 * <p>The renewer is obtained from {@code getMRTokenRenewerInternal} using a fresh
 * {@code JobConf}, not the configuration passed in.
 *
 * @param conf configuration used to obtain the file system
 * @param cred credentials that receive the token
 * @throws IOException if the file system cannot be obtained or returns no token
 */
private static void getHdfsToken(Configuration conf, Credentials cred) throws IOException {
  FileSystem fs = FileSystem.get(conf);
  LOG.info("Getting DFS token from " + fs.getUri());
  Token<?> fsToken = fs.getDelegationToken(getMRTokenRenewerInternal(new JobConf()).toString());
  if (fsToken == null) {
    // The message previously ended after "for " without naming the file system.
    LOG.error("Failed to fetch DFS token for " + fs.getUri());
    throw new IOException("Failed to fetch DFS token.");
  }
  LOG.info("Created DFS token: " + fsToken.toString());
  LOG.info("Token kind: " + fsToken.getKind());
  LOG.info("Token id: " + Arrays.toString(fsToken.getIdentifier()));
  LOG.info("Token service: " + fsToken.getService());

  cred.addToken(fsToken.getService(), fsToken);
}