下面列出了org.apache.hadoop.io.MapFile#DATA_FILE_NAME 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Lists the job's inputs, replacing every directory entry with the status of the
 * {@code MapFile.DATA_FILE_NAME} file inside it (directories are treated as MapFiles).
 *
 * @param job the job configuration used to list the inputs and to resolve file systems
 * @return the statuses returned by the superclass, with each directory swapped for its data file
 * @throws IOException if the superclass listing or a data-file status lookup fails
 */
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
  FileStatus[] statuses = super.listStatus(job);
  for (int idx = 0; idx < statuses.length; idx++) {
    FileStatus current = statuses[idx];
    if (!current.isDirectory()) {
      // Plain file: keep the entry untouched.
      continue;
    }
    // Directory: point the entry at the data file it contains.
    Path mapFileData = new Path(current.getPath(), MapFile.DATA_FILE_NAME);
    FileSystem owningFs = current.getPath().getFileSystem(job);
    statuses[idx] = owningFs.getFileStatus(mapFileData);
  }
  return statuses;
}
/**
 * Returns the input listing with MapFile directories resolved to their data files.
 *
 * <p>Every entry reported as a directory by the superclass listing is overwritten, in place,
 * with the status of {@code <directory>/MapFile.DATA_FILE_NAME}; all other entries are kept.
 *
 * @param job the job configuration used for listing and for obtaining each path's file system
 * @return the (possibly rewritten) array produced by {@code super.listStatus(job)}
 * @throws IOException if listing fails or a data file's status cannot be read
 */
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
  final FileStatus[] inputs = super.listStatus(job);
  int position = 0;
  while (position < inputs.length) {
    final FileStatus entry = inputs[position];
    if (entry.isDirectory()) {
      // A directory input is a MapFile; read from its data file instead.
      final Path dataPath = new Path(entry.getPath(), MapFile.DATA_FILE_NAME);
      final FileSystem fileSystem = entry.getPath().getFileSystem(job);
      inputs[position] = fileSystem.getFileStatus(dataPath);
    }
    position++;
  }
  return inputs;
}
/**
 * Runs {@code MapFile.fix} on the {@code ast} and {@code commit} MapFile directories found
 * under {@code SEQ_FILE_PATH}, using {@code LongWritable} keys and {@code BytesWritable} values.
 *
 * <p>Prints a message and returns without doing anything when {@code SEQ_FILE_PATH} is empty.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if obtaining the file system or fixing a MapFile fails
 */
public static void main(String[] args) throws Exception {
  System.out.println("generating data and index file");
  if (SEQ_FILE_PATH.isEmpty()) {
    System.out.println("Missing path to sequence file. Please specify it in the properties file.");
    return;
  }
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  try {
    for (String name : new String[]{"ast", "commit"}) {
      Path dataFile = new Path(SEQ_FILE_PATH + "/" + name + "/" + MapFile.DATA_FILE_NAME);
      // MapFile.fix operates on the MapFile directory, i.e. the parent of the data file.
      MapFile.fix(fs, dataFile.getParent(), LongWritable.class, BytesWritable.class, false, conf);
    }
  } finally {
    // Close the file system even when fixing one of the MapFiles throws.
    fs.close();
  }
}
/**
 * Creates a source for the MapFile directory {@code dir} on the given file system.
 *
 * <p>The directory is resolved against a path built from {@code fs.getUri()}; the data, index
 * and {@code "metadata"} paths are then derived as children of that resolved directory.
 *
 * @param fs the file system holding the MapFile; its configuration is retained as well
 * @param dir the MapFile directory, resolved against the file system's URI
 * @throws IOException declared by the constructor signature
 */
public MapFileSource(FileSystem fs, Path dir) throws IOException {
  m_fs = fs;
  m_config = fs.getConf();

  // Resolve the directory against the file system's own URI.
  m_dir = new Path(new Path(fs.getUri()), dir);
  m_dirStr = m_dir.toUri().getPath();

  // Well-known children of a MapFile directory, plus this class's metadata file.
  m_metaPath = new Path(m_dir, "metadata");
  m_indexPath = new Path(m_dir, MapFile.INDEX_FILE_NAME);
  m_dataPath = new Path(m_dir, MapFile.DATA_FILE_NAME);
}
/**
 * Lists the job's inputs with block locations, replacing every directory entry with the
 * located status of the {@code MapFile.DATA_FILE_NAME} file inside it.
 *
 * @param job the job configuration used to list the inputs and to resolve file systems
 * @return the statuses returned by the superclass, with each directory swapped for its data file
 * @throws IOException if the superclass listing or the data-file lookup fails
 */
@Override
protected LocatedFileStatus[] listLocatedStatus(JobConf job) throws IOException {
  LocatedFileStatus[] files = super.listLocatedStatus(job);
  for (int i = 0; i < files.length; i++) {
    FileStatus file = files[i];
    // isDirectory() replaces the deprecated isDir().
    if (file.isDirectory()) { // it's a MapFile
      Path dataFile = new Path(file.getPath(), MapFile.DATA_FILE_NAME);
      FileSystem fs = file.getPath().getFileSystem(job);
      // use the data file; the first entry of the listing for that path is taken
      files[i] = fs.listLocatedStatus(dataFile).next();
    }
  }
  return files;
}
/**
 * Lists the job's inputs, substituting the data file for every MapFile directory.
 *
 * <p>An entry for which {@code isDir()} is true is overwritten with the status of the
 * {@code MapFile.DATA_FILE_NAME} file located directly inside that directory.
 *
 * @param job the job configuration used to list the inputs and to resolve file systems
 * @return the array from {@code super.listStatus(job)} after the in-place substitutions
 * @throws IOException if listing fails or a data file's status cannot be obtained
 */
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
  FileStatus[] listing = super.listStatus(job);
  for (int slot = 0; slot < listing.length; slot++) {
    FileStatus candidate = listing[slot];
    if (!candidate.isDir()) {
      // Not a MapFile directory: leave the entry as reported.
      continue;
    }
    Path insideMapFile = new Path(candidate.getPath(), MapFile.DATA_FILE_NAME);
    FileSystem resolvedFs = candidate.getPath().getFileSystem(job);
    // Replace the directory entry by its data file.
    listing[slot] = resolvedFs.getFileStatus(insideMapFile);
  }
  return listing;
}
/**
 * Builds the input splits for the configured zoom level, one {@code TiledInputSplit} per
 * partition listed in the splits file.
 *
 * <p>Each split wraps a {@code FileSplit} over the partition's {@code MapFile.DATA_FILE_NAME}
 * file (offset 0, length 0, no hosts) and carries the partition's start and end tile ids, the
 * zoom level and the tile size reported by the pyramid metadata. When the input format context
 * specifies bounds, partitions whose tile range lies entirely above or entirely below those
 * bounds are left out.
 *
 * @param context the job context supplying the configuration
 * @return the splits to process, in the order the partitions appear in the splits file
 * @throws IOException if the metadata or the splits file cannot be read
 */
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException
{
  final long startedAt = System.currentTimeMillis();
  Configuration conf = context.getConfiguration();

  // MrGeo requires TiledInputSplit instances, so the start and end tile ids of every
  // split are needed. They come from the partition info stored in the splits file.
  ImageInputFormatContext ifContext = ImageInputFormatContext.load(conf);
  final int zoom = ifContext.getZoomLevel();
  final int tilesize = ifContext.getTileSize();

  HdfsMrsImageDataProvider dp = createHdfsMrsImageDataProvider(context.getConfiguration());
  final Path zoomDir = new Path(dp.getResourcePath(true), "" + zoom);

  MrsPyramidMetadataReader metadataReader = dp.getMetadataReader();
  MrsPyramidMetadata metadata = metadataReader.read();

  org.mrgeo.hdfs.tile.FileSplit partitionFile = createFileSplit();
  partitionFile.readSplits(zoomDir);
  FileSplitInfo[] partitions = (FileSplitInfo[]) partitionFile.getSplits();

  List<InputSplit> result = new ArrayList<>(partitions.length);
  Bounds requestedBounds = ifContext.getBounds();
  for (FileSplitInfo partition : partitions)
  {
    Path partitionDir = new Path(zoomDir, partition.getName());
    Path dataFile = new Path(partitionDir, MapFile.DATA_FILE_NAME);
    long endTileId = partition.getEndId();
    long startTileId = partition.getStartId();

    if (requestedBounds != null)
    {
      // HDFS-specific shortcut: drop partitions that cannot intersect the requested
      // bounds so they are never processed.
      Tile firstTile = TMSUtils.tileid(startTileId, zoom);
      Bounds firstTileBounds = TMSUtils.tileBounds(firstTile, zoom, tilesize);
      Tile lastTile = TMSUtils.tileid(endTileId, zoom);
      Bounds lastTileBounds = TMSUtils.tileBounds(lastTile, zoom, tilesize);
      if (firstTileBounds.s > requestedBounds.n || lastTileBounds.n < requestedBounds.s)
      {
        // Entirely above or entirely below the requested bounds.
        continue;
      }
    }

    // Reached when no bounds were requested, or when the partition may intersect them.
    result.add(new TiledInputSplit(new FileSplit(dataFile, 0, 0, null), startTileId, endTileId,
        zoom, metadata.getTilesize()));
  }

  // Debugging hint: walking the result and logging every place where a split's start id
  // exceeds the previous split's end id + 1 reveals gaps, which can then be compared with
  // the partition's actual index file to see whether tiles exist in those gaps.

  final long finishedAt = System.currentTimeMillis();
  log.info("Time to generate splits: " + (finishedAt - startedAt) + " ms");
  return result;
}