The following examples show how org.apache.hadoop.fs.Path#getPathWithoutSchemeAndAuthority() is used in real open source projects.
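Before the project examples, here is a minimal, self-contained sketch of what the method does: it strips the scheme and authority from a fully qualified path and keeps only the path component. The hdfs://namenode:8020 authority and the file path below are placeholder values.

import org.apache.hadoop.fs.Path;

public class PathStripDemo {
  public static void main(String[] args) {
    // a fully qualified path carries a scheme ("hdfs") and an authority ("namenode:8020")
    Path qualified = new Path("hdfs://namenode:8020/user/alice/data.parquet");

    // getPathWithoutSchemeAndAuthority keeps only the path component
    Path stripped = Path.getPathWithoutSchemeAndAuthority(qualified);

    System.out.println(stripped); // prints /user/alice/data.parquet
  }
}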
private FileStatus fixFileStatus(String endpoint, FileStatus status) throws IOException {
  final Path remotePath = Path.getPathWithoutSchemeAndAuthority(status.getPath());
  if (status.isDirectory()) {
    return new PDFSFileStatus(makeQualified(remotePath), status);
  }
  String basename = remotePath.getName();
  boolean hidden = isHidden(basename);
  StringBuilder sb = new StringBuilder();
  if (hidden) {
    sb.append(basename.charAt(0));
  }
  sb.append(endpoint).append('@');
  sb.append(hidden ? basename.substring(1) : basename);
  return new PDFSFileStatus(makeQualified(new Path(remotePath.getParent(), sb.toString())), status);
}
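For illustration only, the basename rewrite applied above can be sketched in isolation. The endpoint value is a placeholder, the PDFS-specific classes are omitted, and isHidden is assumed to simply test for a leading dot.

// a hidden file ".stats.json" handled by endpoint "10.0.0.5:1234"
// becomes ".10.0.0.5:1234@stats.json"; the leading dot keeps it hidden
String basename = ".stats.json";
String endpoint = "10.0.0.5:1234";
boolean hidden = basename.startsWith(".");
StringBuilder sb = new StringBuilder();
if (hidden) {
  sb.append(basename.charAt(0));
}
sb.append(endpoint).append('@');
sb.append(hidden ? basename.substring(1) : basename);
System.out.println(sb); // .10.0.0.5:1234@stats.json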
public static String makeQualifiedPathInHBaseCluster(String inPath) {
  Path path = new Path(inPath);
  path = Path.getPathWithoutSchemeAndAuthority(path);
  FileSystem fs = HadoopUtil.getFileSystem(path, getCurrentHBaseConfiguration()); // Must be HBase's FS, not working FS
  return fs.makeQualified(path).toString();
}
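A hypothetical call; both the input path and the printed result are only illustrative, since the actual scheme and authority come from the HBase cluster's configuration.

String qualified = makeQualifiedPathInHBaseCluster("/kylin/kylin_metadata/coprocessor/x.jar");
// e.g. "hdfs://hbase-nn:8020/kylin/kylin_metadata/coprocessor/x.jar"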
/**
 * Constructs a relative path from a full child path and a base path, or returns the child path if it is already relative.
 *
 * @param baseDir base path (the part of the path that should be cut off from the child path)
 * @param childPath full absolute path
 * @return relative path
 */
public static Path relativize(Path baseDir, Path childPath) {
  Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(childPath);
  Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(baseDir);
  // Since Hadoop's Path has no relativize(), we use URI.relativize() to get the relative path
  Path relativeFilePath = new Path(basePathWithoutSchemeAndAuthority.toUri()
      .relativize(fullPathWithoutSchemeAndAuthority.toUri()));
  if (relativeFilePath.isAbsolute()) {
    throw new IllegalStateException(String.format("Path %s is not a subpath of %s.",
        basePathWithoutSchemeAndAuthority.toUri().getPath(), fullPathWithoutSchemeAndAuthority.toUri().getPath()));
  }
  return relativeFilePath;
}
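A usage sketch, assuming the helper above is in scope; the namenode authority and warehouse paths are placeholders.

Path base = new Path("hdfs://nn:8020/warehouse/db1");
Path child = new Path("hdfs://nn:8020/warehouse/db1/table1/part-00000");

// scheme and authority are stripped from both sides, then URI.relativize() does the work
Path rel = relativize(base, child);
System.out.println(rel); // table1/part-00000

// a child outside the base directory would trigger the IllegalStateException above:
// relativize(base, new Path("hdfs://nn:8020/other/dir"));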
private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
  if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
    logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri());
    output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n");
    for (String path : oldHdfsPaths) {
      if (path.endsWith("*"))
        path = path.substring(0, path.length() - 1);
      Path oldPath = Path.getPathWithoutSchemeAndAuthority(new Path(path));
      if (fileSystem.exists(oldPath)) {
        fileSystem.delete(oldPath, true);
        logger.debug("HDFS path " + oldPath + " is dropped.");
        output.append("HDFS path " + oldPath + " is dropped.\n");
      } else {
        logger.debug("HDFS path " + oldPath + " not exists.");
        output.append("HDFS path " + oldPath + " not exists.\n");
      }
      // If HBase was deployed on another cluster, the job dir is empty and should be dropped,
      // because the rowkey_stats and hfile dirs are both dropped.
      if (fileSystem.listStatus(oldPath.getParent()).length == 0) {
        Path emptyJobPath = new Path(JobBuilderSupport.getJobWorkingDir(config, getJobId()));
        emptyJobPath = Path.getPathWithoutSchemeAndAuthority(emptyJobPath);
        if (fileSystem.exists(emptyJobPath)) {
          fileSystem.delete(emptyJobPath, true);
          logger.debug("HDFS path " + emptyJobPath + " is empty and dropped.");
          output.append("HDFS path " + emptyJobPath + " is empty and dropped.\n");
        }
      }
    }
  }
}
public static FileSystem getFileSystemInHBaseCluster(String inPath) {
  Path path = new Path(inPath);
  path = Path.getPathWithoutSchemeAndAuthority(path);
  FileSystem fs = HadoopUtil.getFileSystem(path, getCurrentHBaseConfiguration()); // Must be HBase's FS, not working FS
  return fs;
}
@Override
public void transition(LocalizedResource rsrc, ResourceEvent event) {
  ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event;
  rsrc.localPath =
      Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
  rsrc.size = locEvent.getSize();
  for (ContainerId container : rsrc.ref) {
    rsrc.dispatcher.getEventHandler().handle(
        new ContainerResourceLocalizedEvent(
            container, rsrc.rsrc, rsrc.localPath));
  }
}
public static void copyInit(FileSystem fs, Path path) throws IOException {
  path = Path.getPathWithoutSchemeAndAuthority(path);
  Path pathP = path.getParent();
  if (!fs.exists(pathP)) {
    fs.mkdirs(pathP);
  }
  if (fs.exists(path)) {
    logger.warn("path {} already existed and will be deleted", path);
    HadoopUtil.deletePath(fs.getConf(), path);
  }
}
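A usage sketch with a default Configuration and a placeholder target path; HadoopUtil.deletePath is the project's own helper and is not shown here.

FileSystem fs = FileSystem.get(new Configuration());
Path target = new Path("/tmp/output/part-00000");
// creates /tmp/output if it is missing and deletes any stale file already at the target
copyInit(fs, target);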
public static RemotePath getRemotePath(Path path) throws IOException {
  final String basename = path.getName();
  boolean hidden = isHidden(basename);
  Matcher matcher = BASENAME_SPLIT_PATTERN.matcher(hidden ? basename.substring(1) : basename);
  if (!matcher.matches()) {
    throw new IllegalArgumentException("Cannot parse basename for path " + path);
  }
  final String remoteBasename = matcher.group(2);
  return new RemotePath(
      matcher.group(1),
      new Path(Path.getPathWithoutSchemeAndAuthority(path.getParent()), hidden ? basename.charAt(0) + remoteBasename : remoteBasename));
}
/**
 * Given a base path and a partition path, returns the partition path relative to the base path.
 */
public static String getRelativePartitionPath(Path basePath, Path partitionPath) {
  basePath = Path.getPathWithoutSchemeAndAuthority(basePath);
  partitionPath = Path.getPathWithoutSchemeAndAuthority(partitionPath);
  String partitionFullPath = partitionPath.toString();
  int partitionStartIndex = partitionFullPath.indexOf(basePath.getName(),
      basePath.getParent() == null ? 0 : basePath.getParent().toString().length());
  // Partition-Path could be empty for non-partitioned tables
  return partitionStartIndex + basePath.getName().length() == partitionFullPath.length() ? ""
      : partitionFullPath.substring(partitionStartIndex + basePath.getName().length() + 1);
}
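A usage sketch with placeholder table and partition paths.

Path base = new Path("hdfs://nn:8020/warehouse/hudi_table");
Path partition = new Path("hdfs://nn:8020/warehouse/hudi_table/2021/01/15");
System.out.println(getRelativePartitionPath(base, partition)); // 2021/01/15

// for a non-partitioned table the partition path equals the base path and "" is returned
System.out.println(getRelativePartitionPath(base, base)); // prints an empty line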
Map<Path, ArrayList<String>> removeScheme(Map<Path, ArrayList<String>> pathToAliases) {
  Map<Path, ArrayList<String>> result = new HashMap<>();
  for (Map.Entry<Path, ArrayList<String>> entry : pathToAliases.entrySet()) {
    Path newKey = Path.getPathWithoutSchemeAndAuthority(entry.getKey());
    StringInternUtils.internUriStringsInPath(newKey);
    result.put(newKey, entry.getValue());
  }
  return result;
}
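A usage sketch; the table location is a placeholder, and StringInternUtils is the project's own string-interning helper.

Map<Path, ArrayList<String>> pathToAliases = new HashMap<>();
pathToAliases.put(new Path("hdfs://nn:8020/warehouse/db1/t1"),
    new ArrayList<>(Arrays.asList("t1")));

Map<Path, ArrayList<String>> stripped = removeScheme(pathToAliases);
// the key is now the bare path: {/warehouse/db1/t1=[t1]}
System.out.println(stripped);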
protected final Path pushdownPath(String resPath) {
  Path p = new Path(pushdownRootPath() + resPath);
  return Path.getPathWithoutSchemeAndAuthority(p);
}
/**
 * Reads the parquet metadata from a file.
 *
 * @param path path to the metadata file
 * @param dirsOnly true to read a {@link Metadata#METADATA_DIRECTORIES_FILENAME} file,
 *                 false to read a {@link Metadata#OLD_METADATA_FILENAME} file
 * @param metaContext current metadata context
 * @param fs file system the metadata file is read from
 */
private void readBlockMeta(Path path, boolean dirsOnly, MetadataContext metaContext, FileSystem fs) {
  Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
  Path metadataParentDir = Path.getPathWithoutSchemeAndAuthority(path.getParent());
  String metadataParentDirPath = metadataParentDir.toUri().getPath();
  ObjectMapper mapper = new ObjectMapper();
  final SimpleModule serialModule = new SimpleModule();
  serialModule.addDeserializer(SchemaPath.class, new SchemaPath.De());
  serialModule.addKeyDeserializer(Metadata_V2.ColumnTypeMetadata_v2.Key.class, new Metadata_V2.ColumnTypeMetadata_v2.Key.DeSerializer());
  serialModule.addKeyDeserializer(Metadata_V3.ColumnTypeMetadata_v3.Key.class, new Metadata_V3.ColumnTypeMetadata_v3.Key.DeSerializer());
  serialModule.addKeyDeserializer(ColumnTypeMetadata_v4.Key.class, new ColumnTypeMetadata_v4.Key.DeSerializer());
  AfterburnerModule module = new AfterburnerModule();
  module.setUseOptimizedBeanDeserializer(true);
  boolean isFileMetadata = path.toString().endsWith(METADATA_FILENAME);
  boolean isSummaryFile = path.toString().endsWith(METADATA_SUMMARY_FILENAME);
  mapper.registerModule(serialModule);
  mapper.registerModule(module);
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  try (InputStream is = fs.open(path)) {
    boolean alreadyCheckedModification;
    boolean newMetadata = false;
    alreadyCheckedModification = metaContext.getStatus(metadataParentDirPath);
    if (dirsOnly) {
      parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class);
      if (timer != null) {
        logger.debug("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
        timer.stop();
      }
      parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath);
      if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) {
        parquetTableMetadataDirs =
            (createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null, true)).getRight();
        newMetadata = true;
      }
    } else {
      if (isFileMetadata) {
        parquetTableMetadata.assignFiles((mapper.readValue(is, FileMetadata.class)).getFiles());
        if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(4, 0)) >= 0) {
          ((ParquetTableMetadata_v4) parquetTableMetadata).updateRelativePaths(metadataParentDirPath);
        }
        if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) {
          parquetTableMetadata =
              (createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null, true)).getLeft();
          newMetadata = true;
        }
      } else if (isSummaryFile) {
        MetadataSummary metadataSummary = mapper.readValue(is, Metadata_V4.MetadataSummary.class);
        ParquetTableMetadata_v4 parquetTableMetadata_v4 = new ParquetTableMetadata_v4(metadataSummary);
        parquetTableMetadata = (ParquetTableMetadataBase) parquetTableMetadata_v4;
      } else {
        parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class);
        if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0) {
          ((Metadata_V3.ParquetTableMetadata_v3) parquetTableMetadata).updateRelativePaths(metadataParentDirPath);
        }
        if (!alreadyCheckedModification && tableModified((parquetTableMetadata.getDirectories()), path, metadataParentDir, metaContext, fs)) {
          parquetTableMetadata =
              (createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null, true)).getLeft();
          newMetadata = true;
        }
      }
      if (timer != null) {
        logger.debug("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
        timer.stop();
      }
      if (!isSummaryFile) {
        // DRILL-5009: Remove the RowGroup if it is empty
        List<? extends ParquetFileMetadata> files = parquetTableMetadata.getFiles();
        if (files != null) {
          for (ParquetFileMetadata file : files) {
            List<? extends RowGroupMetadata> rowGroups = file.getRowGroups();
            rowGroups.removeIf(r -> r.getRowCount() == 0);
          }
        }
      }
      if (newMetadata) {
        // if new metadata files were created, invalidate the existing metadata context
        metaContext.clear();
      }
    }
  } catch (IOException e) {
    logger.error("Failed to read '{}' metadata file", path, e);
    metaContext.setMetadataCacheCorrupted(true);
  }
}
protected PDFSDistributedTask(Path path) {
  this.path = Path.getPathWithoutSchemeAndAuthority(path);
}
@Test
public void testImportDFSAutoID() throws Exception {
  String cmd = "IMPORT -host localhost -username admin -password admin"
      + " -input_file_path " + Constants.TEST_PATH.toUri() + "/agg/agg3.xml"
      + " -input_file_type documents"
      + " -hadoop_conf_dir " + Constants.HADOOP_CONF_DIR
      + " -output_uri_replace " + Path.getPathWithoutSchemeAndAuthority(Constants.TEST_PATH) + "/agg,''"
      + " -port " + Constants.port + " -database " + Constants.testDb;
  String[] args = cmd.split(" ");
  assertFalse(args.length == 0);
  Utils.clearDB(Utils.getTestDbXccUri(), Constants.testDb);
  String[] expandedArgs = null;
  expandedArgs = OptionsFileUtil.expandArguments(args);
  ContentPump.runCommand(expandedArgs);
  cmd = "EXPORT -host localhost -username admin -password admin"
      + " -output_file_path /sample-agg"
      + " -output_type document"
      + " -hadoop_conf_dir " + Constants.HADOOP_CONF_DIR
      + " -port " + Constants.port + " -database " + Constants.testDb;
  args = cmd.split(" ");
  assertFalse(args.length == 0);
  expandedArgs = null;
  expandedArgs = OptionsFileUtil.expandArguments(args);
  ContentPump.runCommand(expandedArgs);
  cmd = "IMPORT -host localhost -username admin -password"
      + " admin -input_file_path " + "/sample-agg/agg3.xml"
      + " -input_file_type aggregates"
      + " -hadoop_conf_dir " + Constants.HADOOP_CONF_DIR
      + " -port " + Constants.port + " -database " + Constants.testDb;
  args = cmd.split(" ");
  assertFalse(args.length == 0);
  Utils.clearDB(Utils.getTestDbXccUri(), Constants.testDb);
  expandedArgs = null;
  expandedArgs = OptionsFileUtil.expandArguments(args);
  ContentPump.runCommand(expandedArgs);
  ResultSequence result = Utils.runQuery(
      Utils.getTestDbXccUri(), "fn:count(fn:collection())");
  assertTrue(result.hasNext());
  assertEquals("2", result.next().asString());
  Utils.closeSession();
}
public void setLocalPath(Path localPath) {
  this.localPath = Path.getPathWithoutSchemeAndAuthority(localPath);
}