The following lists example code that uses the org.apache.hadoop.fs.BlockLocation class.
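Before those snippets, here is a minimal, hypothetical sketch of the core workflow they share: ask a FileSystem for the BlockLocation entries covering a file's byte range, then read each entry's offset, length, hosts and names. The fs and path parameters are illustrative assumptions (an already-initialized FileSystem and an existing file), not code taken from the projects quoted below.

import java.io.IOException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal sketch: print every block of a file and the datanodes serving it.
public static void printBlockLocations(FileSystem fs, Path path) throws IOException {
  FileStatus status = fs.getFileStatus(path);
  BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
  for (BlockLocation block : blocks) {
    System.out.println("offset=" + block.getOffset()
        + " length=" + block.getLength()
        + " hosts=" + String.join(",", block.getHosts())     // datanode hostnames
        + " names=" + String.join(",", block.getNames()));   // hostname:port pairs
  }
}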
@Test
public void testOzoneManagerLocatedFileStatus() throws IOException {
String data = RandomStringUtils.randomAlphanumeric(20);
String filePath = RandomStringUtils.randomAlphanumeric(5);
Path path = createPath("/" + filePath);
try (FSDataOutputStream stream = fs.create(path)) {
stream.writeBytes(data);
}
FileStatus status = fs.getFileStatus(path);
assertTrue(status instanceof LocatedFileStatus);
LocatedFileStatus locatedFileStatus = (LocatedFileStatus) status;
assertTrue(locatedFileStatus.getBlockLocations().length >= 1);
for (BlockLocation blockLocation : locatedFileStatus.getBlockLocations()) {
assertTrue(blockLocation.getNames().length >= 1);
assertTrue(blockLocation.getHosts().length >= 1);
}
}
@SuppressWarnings("checkstyle:ParameterNumber")
public FileStatusAdapter(long length, Path path, boolean isdir,
short blockReplication, long blocksize, long modificationTime,
long accessTime, short permission, String owner,
String group, Path symlink, BlockLocation[] locations) {
this.length = length;
this.path = path;
this.isdir = isdir;
this.blockReplication = blockReplication;
this.blocksize = blocksize;
this.modificationTime = modificationTime;
this.accessTime = accessTime;
this.permission = permission;
this.owner = owner;
this.group = group;
this.symlink = symlink;
this.blockLocations = locations.clone();
}
private void verifyLocatedFileStatus(
JobConf conf, List<LocatedFileStatus> stats)
throws IOException {
if (!conf.getBoolean("mapred.fileinputformat.verifysplits", true)) {
return;
}
for (LocatedFileStatus stat: stats) {
long fileLen = stat.getLen();
long blockLenTotal = 0;
for (BlockLocation loc: stat.getBlockLocations()) {
blockLenTotal += loc.getLength();
}
if (blockLenTotal != fileLen) {
throw new IOException("Error while getting located status, " +
stat.getPath() + " has length " + fileLen + " but blocks total is " +
blockLenTotal);
}
}
}
private void waitForBlocks(FileSystem fileSys, Path name)
throws IOException {
// wait until we have at least one block in the file to read.
boolean done = false;
while (!done) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
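// interrupted while sleeping; ignore and poll again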
}
done = true;
BlockLocation[] locations = fileSys.getFileBlockLocations(
fileSys.getFileStatus(name), 0, blockSize);
if (locations.length < 1) {
done = false;
continue;
}
}
}
/**
* Get the host affinity for a row group.
*
* @param fileStatus the parquet file
* @param fs the file system that holds the parquet file
* @param start the start of the row group
* @param length the length of the row group
* @return host affinity for the row group
*/
private Map<String, Float> getHostAffinity(FileStatus fileStatus, FileSystem fs, long start, long length)
throws IOException {
BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length);
Map<String, Float> hostAffinityMap = Maps.newHashMap();
for (BlockLocation blockLocation : blockLocations) {
for (String host : blockLocation.getHosts()) {
Float currentAffinity = hostAffinityMap.get(host);
float blockStart = blockLocation.getOffset();
float blockEnd = blockStart + blockLocation.getLength();
float rowGroupEnd = start + length;
Float newAffinity = (blockLocation.getLength() - (blockStart < start ? start - blockStart : 0) -
(blockEnd > rowGroupEnd ? blockEnd - rowGroupEnd : 0)) / length;
if (currentAffinity != null) {
hostAffinityMap.put(host, currentAffinity + newAffinity);
} else {
hostAffinityMap.put(host, newAffinity);
}
}
}
return hostAffinityMap;
}
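To make the affinity formula above concrete, here is a small worked computation with hypothetical numbers (a row group spanning bytes [200, 500) and an HDFS block spanning [0, 300)); the variable names mirror the method above.

// Hypothetical numbers illustrating the affinity calculation:
long start = 200, length = 300;               // row group: bytes [200, 500)
float blockStart = 0, blockLen = 300;         // block location: bytes [0, 300)
float blockEnd = blockStart + blockLen;       // 300
float rowGroupEnd = start + length;           // 500
float affinity = (blockLen
    - (blockStart < start ? start - blockStart : 0)          // 200 bytes of the block precede the row group
    - (blockEnd > rowGroupEnd ? blockEnd - rowGroupEnd : 0)) // no bytes extend past it
    / length;                                                // 100 / 300 ≈ 0.33
// The block's hosts serve about a third of the row group, so each of those hosts
// gets 0.33 added to its entry in hostAffinityMap.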
/**
* Creates a mapping of hoplog to hdfs blocks on disk
*
* @param files
* list of hoplog file status objects
* @return a map from each hoplog's file status to its hdfs block locations
* @throws IOException
*/
public static Map<FileStatus, BlockLocation[]> getBlocks(Configuration config,
Collection<FileStatus> files) throws IOException {
Map<FileStatus, BlockLocation[]> blocks = new HashMap<FileStatus, BlockLocation[]>();
if (files == null || files.isEmpty()) {
return blocks;
}
FileSystem fs = files.iterator().next().getPath().getFileSystem(config);
for (FileStatus hoplog : files) {
long length = hoplog.getLen();
BlockLocation[] fileBlocks = fs.getFileBlockLocations(hoplog, 0, length);
blocks.put(hoplog, fileBlocks);
}
return blocks;
}
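A hypothetical call site for the helper above, assuming conf is a Configuration and hoplogDir is a Path to a directory of hoplog files:

// Illustrative usage of getBlocks; 'conf' and 'hoplogDir' are assumed to exist.
FileSystem fs = hoplogDir.getFileSystem(conf);
Collection<FileStatus> files = Arrays.asList(fs.listStatus(hoplogDir));
Map<FileStatus, BlockLocation[]> blocks = getBlocks(conf, files);
for (Map.Entry<FileStatus, BlockLocation[]> entry : blocks.entrySet()) {
  System.out.println(entry.getKey().getPath() + " spans "
      + entry.getValue().length + " hdfs block(s)");
}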
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
FileStatus mockFileStatus = mock(FileStatus.class);
when(mockFileStatus.getBlockSize()).thenReturn(splitSize);
Path mockPath = mock(Path.class);
FileSystem mockFs = mock(FileSystem.class);
BlockLocation[] blockLocations = mockBlockLocations(length, splitSize);
when(mockFs.getFileBlockLocations(mockFileStatus, 0, length)).thenReturn(
blockLocations);
when(mockPath.getFileSystem(any(Configuration.class))).thenReturn(mockFs);
when(mockFileStatus.getPath()).thenReturn(mockPath);
when(mockFileStatus.getLen()).thenReturn(length);
List<FileStatus> list = new ArrayList<FileStatus>();
list.add(mockFileStatus);
return list;
}
private static LocatedFileStatus locatedFileStatus(Path path, long fileLength)
{
return new LocatedFileStatus(
fileLength,
false,
0,
0L,
0L,
0L,
null,
null,
null,
null,
path,
new BlockLocation[] {new BlockLocation(new String[1], new String[] {"localhost"}, 0, fileLength)});
}
private static LocatedFileStatus locatedFileStatusWithNoBlocks(Path path)
{
return new LocatedFileStatus(
0L,
false,
0,
0L,
0L,
0L,
null,
null,
null,
null,
path,
new BlockLocation[] {});
}
@Override
public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException {
try {
statistics.incrementReadOps(1);
CrailBlockLocation[] _locations = dfs.lookup(path.toUri().getRawPath()).get().asFile().getBlockLocations(start, len);
BlockLocation[] locations = new BlockLocation[_locations.length];
for (int i = 0; i < locations.length; i++){
locations[i] = new BlockLocation();
locations[i].setOffset(_locations[i].getOffset());
locations[i].setLength(_locations[i].getLength());
locations[i].setNames(_locations[i].getNames());
locations[i].setHosts(_locations[i].getHosts());
locations[i].setTopologyPaths(_locations[i].getTopology());
}
return locations;
} catch(Exception e){
throw new IOException(e);
}
}
static void checkFullFile(FileSystem fs, Path name) throws IOException {
FileStatus stat = fs.getFileStatus(name);
BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
fileSize);
for (int idx = 0; idx < locations.length; idx++) {
String[] hosts = locations[idx].getNames();
for (int i = 0; i < hosts.length; i++) {
System.out.print( hosts[i] + " ");
}
System.out.println(" off " + locations[idx].getOffset() +
" len " + locations[idx].getLength());
}
byte[] expected = AppendTestUtil.randomBytes(seed, fileSize);
FSDataInputStream stm = fs.open(name);
byte[] actual = new byte[fileSize];
stm.readFully(0, actual);
checkData(actual, 0, expected, "Read 2");
stm.close();
}
@Test(timeout=180000)
public void testFavoredNodesEndToEnd() throws Exception {
//create 10 files with random preferred nodes
for (int i = 0; i < NUM_FILES; i++) {
Random rand = new Random(System.currentTimeMillis() + i);
//pass a new created rand so as to get a uniform distribution each time
//without too much collisions (look at the do-while loop in getDatanodes)
InetSocketAddress datanode[] = getDatanodes(rand);
Path p = new Path("/filename"+i);
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
4096, (short)3, 4096L, null, datanode);
out.write(SOME_BYTES);
out.close();
BlockLocation[] locations = getBlockLocations(p);
//verify the files got created in the right nodes
for (BlockLocation loc : locations) {
String[] hosts = loc.getNames();
String[] hosts1 = getStringForInetSocketAddrs(datanode);
assertTrue(compareNodes(hosts, hosts1));
}
}
}
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
try {
try (PreparedStatement statement = connection.prepareStatement("call SYSCS_UTIL.SYSCS_hdfs_OPERATION(?, ?)")) {
statement.setString(1, file.getPath().toUri().getPath());
statement.setString(2, "tokens");
try (ResultSet rs = statement.executeQuery()) {
List<HdfsProtos.LocatedBlockProto> protos = new ArrayList<>();
while (rs.next()) {
Blob blob = rs.getBlob(1);
byte[] bytes = blob.getBytes(1, (int) blob.length());
HdfsProtos.LocatedBlockProto lbp = HdfsProtos.LocatedBlockProto.parseFrom(bytes);
protos.add(lbp);
}
// TODO return DFSUtil.locatedBlocks2Locations(PBHelper.convertLocatedBlock(protos));
return null;
}
}
} catch (SQLException e) {
throw new IOException(e);
}
}
private static BlockLocation[] getBlockLocationsOutput(int fileSize,
int blockSize, long start, long len, String blockLocationHost)
throws Exception {
Configuration conf = new Configuration();
conf.set(NativeAzureFileSystem.AZURE_BLOCK_SIZE_PROPERTY_NAME, ""
+ blockSize);
if (blockLocationHost != null) {
conf.set(NativeAzureFileSystem.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
blockLocationHost);
}
AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount
.createMock(conf);
FileSystem fs = testAccount.getFileSystem();
Path testFile = createTestFile(fs, fileSize);
FileStatus stat = fs.getFileStatus(testFile);
BlockLocation[] locations = fs.getFileBlockLocations(stat, start, len);
testAccount.cleanup();
return locations;
}
/**
* Get the locations of the blocks that make up a file.
*/
public void getFileBlockLocation(String pathuri) {
try {
Path filePath = new Path(pathuri);
FileStatus fileStatus = fs.getFileStatus(filePath);
if (fileStatus.isDirectory()) {
System.out.println("**** getFileBlockLocations only for file");
return;
}
System.out.println(">>>> file block location:");
BlockLocation[] blkLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
for (BlockLocation currentLocation : blkLocations) {
String[] hosts = currentLocation.getHosts();
for (String host : hosts) {
System.out.println(">>>> host: " + host);
}
}
// get the last modification time
long modifyTime = fileStatus.getModificationTime();
Date d = new Date(modifyTime);
System.out.println(">>>> ModificationTime = " + d);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Read and write some JSON
* @throws IOException
*/
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testRWJson() throws IOException {
final String message = "{" +
" 'json': { 'i':43, 'b':true}," +
" 's':'string'" +
"}";
final Path filePath = new Path("/test/file.json");
writeTextFile(fs, filePath, message, false);
String readJson = readBytesToString(fs, filePath, message.length());
assertEquals(message,readJson);
//now find out where it is
FileStatus status = fs.getFileStatus(filePath);
BlockLocation[] locations = fs.getFileBlockLocations(status, 0, 10);
}
private void printFileLocations(FileStatus file)
throws IOException {
System.out.println(file.getPath() + " block locations:");
BlockLocation[] locations = fileSys.getFileBlockLocations(file, 0,
file.getLen());
for (int idx = 0; idx < locations.length; idx++) {
String[] loc = locations[idx].getNames();
System.out.print("Block[" + idx + "] : ");
for (int j = 0; j < loc.length; j++) {
System.out.print(loc[j] + " ");
}
System.out.println();
}
}
/**
* groups together all the data blocks for the same HDFS block
*
* @param rowGroupBlocks data blocks (row groups)
* @param hdfsBlocksArray hdfs blocks
* @param fileStatus the containing file
* @param requestedSchema the schema requested by the user
* @param readSupportMetadata the metadata provided by the readSupport implementation in init
* @param minSplitSize the mapred.min.split.size
* @param maxSplitSize the mapred.max.split.size
* @return the splits (one per HDFS block)
* @throws IOException If hosts can't be retrieved for the HDFS block
*/
static <T> List<ParquetInputSplit> generateSplits(
List<BlockMetaData> rowGroupBlocks,
BlockLocation[] hdfsBlocksArray,
FileStatus fileStatus,
String requestedSchema,
Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
List<SplitInfo> splitRowGroups =
generateSplitInfo(rowGroupBlocks, hdfsBlocksArray, minSplitSize, maxSplitSize);
//generate splits from rowGroups of each split
List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
for (SplitInfo splitInfo : splitRowGroups) {
ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, requestedSchema, readSupportMetadata);
resultSplits.add(split);
}
return resultSplits;
}
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testLocateNullStatus() throws Throwable {
describe("verify that a null filestatus maps to a null location array");
BlockLocation[] locations =
getFs().getFileBlockLocations((FileStatus) null, 0, 1);
assertNull(locations);
}
void collectFileCorruptBlocksInStripe(final DistributedFileSystem dfs,
final RaidInfo raidInfo, final Path filePath,
final HashMap<Integer, Integer> corruptBlocksPerStripe)
throws IOException {
// read conf
final int stripeBlocks = raidInfo.codec.stripeLength;
// figure out which blocks are missing/corrupted
final FileStatus fileStatus = dfs.getFileStatus(filePath);
final long blockSize = fileStatus.getBlockSize();
final long fileLength = fileStatus.getLen();
final long fileLengthInBlocks = RaidNode.numBlocks(fileStatus);
final long fileStripes = RaidNode.numStripes(fileLengthInBlocks,
stripeBlocks);
final BlockLocation[] fileBlocks =
dfs.getFileBlockLocations(fileStatus, 0, fileLength);
// figure out which stripes these corrupted blocks belong to
for (BlockLocation fileBlock: fileBlocks) {
int blockNo = (int) (fileBlock.getOffset() / blockSize);
final int stripe = blockNo / stripeBlocks;
if (this.isBlockCorrupt(fileBlock)) {
this.incCorruptBlocksPerStripe(corruptBlocksPerStripe, stripe);
if (LOG.isDebugEnabled()) {
LOG.debug("file " + filePath.toString() + " corrupt in block " +
blockNo + "/" + fileLengthInBlocks + ", stripe " + stripe +
"/" + fileStripes);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("file " + filePath.toString() + " OK in block " + blockNo +
"/" + fileLengthInBlocks + ", stripe " + stripe + "/" +
fileStripes);
}
}
}
checkParityBlocks(filePath, corruptBlocksPerStripe, blockSize, 0, fileStripes,
fileStripes, raidInfo);
}
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testLocateSingleFileBlocks() throws Throwable {
describe("verify that a file returns 1+ blocks");
FileStatus fileStatus = createFileAndGetStatus();
BlockLocation[] locations =
getFs().getFileBlockLocations(fileStatus, 0, 1);
assertNotEqual("No block locations supplied for " + fileStatus, 0,
locations.length);
for (BlockLocation location : locations) {
assertLocationValid(location);
}
}
/**
* Return an array containing hostnames, offset and size of
* portions of the given file. For WASB we'll just lie and give
* fake hosts to make sure we get many splits in MR jobs.
*/
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file,
long start, long len) throws IOException {
if (file == null) {
return null;
}
if ((start < 0) || (len < 0)) {
throw new IllegalArgumentException("Invalid start or len parameter");
}
if (file.getLen() < start) {
return new BlockLocation[0];
}
final String blobLocationHost = getConf().get(
AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
AZURE_BLOCK_LOCATION_HOST_DEFAULT);
final String[] name = { blobLocationHost };
final String[] host = { blobLocationHost };
long blockSize = file.getBlockSize();
if (blockSize <= 0) {
throw new IllegalArgumentException(
"The block size for the given file is not a positive number: "
+ blockSize);
}
int numberOfLocations = (int) (len / blockSize)
+ ((len % blockSize == 0) ? 0 : 1);
BlockLocation[] locations = new BlockLocation[numberOfLocations];
for (int i = 0; i < locations.length; i++) {
long currentOffset = start + (i * blockSize);
long currentLength = Math.min(blockSize, start + len - currentOffset);
locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
}
return locations;
}
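To illustrate the arithmetic in the WASB implementation above, here is a hypothetical walk-through with small numbers (start = 0, len = 250, blockSize = 100):

// Hypothetical values for the loop above: start = 0, len = 250, blockSize = 100.
long start = 0, len = 250, blockSize = 100;
int numberOfLocations = (int) (len / blockSize) + ((len % blockSize == 0) ? 0 : 1); // 3
for (int i = 0; i < numberOfLocations; i++) {
  long currentOffset = start + (i * blockSize);                          // 0, 100, 200
  long currentLength = Math.min(blockSize, start + len - currentOffset); // 100, 100, 50
}
// Three fake locations are produced, all reporting the configured blobLocationHost,
// so an MR job still gets one split per "block" even though Azure Blob Storage has
// no real block placement.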
@Test
public void testGetFileBlockLocations_shouldReturnLocalhost()
throws IOException, URISyntaxException {
Configuration config = GoogleHadoopFileSystemIntegrationHelper.getTestConfig();
GoogleHadoopFS ghfs = new GoogleHadoopFS(initUri, config);
URI testFile = initUri.resolve("/testGetFileBlockLocations_shouldReturnLocalhost");
Path testFilePath = new Path(testFile.toString());
gcsFsIHelper.writeTextFile(testFile.getAuthority(), testFile.getPath(), "file content");
BlockLocation[] fileBlockLocations = ghfs.getFileBlockLocations(testFilePath, 1, 1);
assertThat(fileBlockLocations).hasLength(1);
assertThat(fileBlockLocations[0].getHosts()).isEqualTo(new String[] {"localhost"});
}
@Test
public void testGetBlockLocations()
throws Exception
{
RubixConfig rubixConfig = new RubixConfig();
InternalNode coordinatorNode = new InternalNode(
"master",
URI.create("http://" + getLocalHost().getHostAddress() + ":8080"),
UNKNOWN,
true);
InternalNode workerNode1 = new InternalNode(
"worker1",
URI.create("http://127.0.0.2:8080"),
UNKNOWN,
false);
InternalNode workerNode2 = new InternalNode(
"worker2",
URI.create("http://127.0.0.3:8080"),
UNKNOWN,
false);
initializeRubix(rubixConfig, ImmutableList.of(coordinatorNode, workerNode1, workerNode2));
cachingFileSystem = getCachingFileSystem();
FileStatus file1 = new FileStatus(3, false, 0, 3, 0, new Path("aaa"));
FileStatus file2 = new FileStatus(3, false, 0, 3, 0, new Path("bbb"));
BlockLocation[] file1Locations = cachingFileSystem.getFileBlockLocations(file1, 0, 3);
BlockLocation[] file2Locations = cachingFileSystem.getFileBlockLocations(file2, 0, 3);
assertEquals(file1Locations.length, 1);
assertEquals(file2Locations.length, 1);
assertEquals(file1Locations[0].getHosts()[0], "127.0.0.3");
assertEquals(file2Locations[0].getHosts()[0], "127.0.0.2");
}
@Override
public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
throws IOException {
return new BlockLocation[] {
new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
new String[0], 0, len, false) };
}
protected int getBlockIndex(BlockLocation[] blkLocations,
long offset) {
for (int i = 0 ; i < blkLocations.length; i++) {
// is the offset inside this block?
if ((blkLocations[i].getOffset() <= offset) &&
(offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
return i;
}
}
BlockLocation last = blkLocations[blkLocations.length -1];
long fileLength = last.getOffset() + last.getLength() -1;
throw new IllegalArgumentException("Offset " + offset +
" is outside of file (0.." +
fileLength + ")");
}
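A hedged sketch of how getBlockIndex is typically used when building input splits (fs, status and splitStart are illustrative names, not part of the original snippet): find the block containing the split's starting offset and take that block's hosts as the split's preferred hosts.

// Illustrative usage of getBlockIndex; 'fs', 'status' and 'splitStart' are assumed.
BlockLocation[] blkLocations = fs.getFileBlockLocations(status, 0, status.getLen());
int blkIndex = getBlockIndex(blkLocations, splitStart);
String[] splitHosts = blkLocations[blkIndex].getHosts(); // preferred hosts for this split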
private String[] fakeRacks(BlockLocation[] blkLocations, int index)
throws IOException {
String[] allHosts = blkLocations[index].getHosts();
String[] allTopos = new String[allHosts.length];
for (int i = 0; i < allHosts.length; i++) {
allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
}
return allTopos;
}
private BlockLocation[] mockBlockLocations(long size, long splitSize) {
int numLocations = (int) (size / splitSize);
if (size % splitSize != 0)
numLocations++;
BlockLocation[] blockLocations = new BlockLocation[numLocations];
for (int i = 0; i < numLocations; i++) {
String[] names = new String[] { "b" + i };
String[] hosts = new String[] { "host" + i };
blockLocations[i] = new BlockLocation(names, hosts, i * splitSize,
Math.min(splitSize, size - (splitSize * i)));
}
return blockLocations;
}
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testLocateOutOfRangeSrc() throws Throwable {
describe("Seeking out of the file length returns an empty array");
BlockLocation[] locations =
getFs().getFileBlockLocations(createFileAndGetStatus(),
data.length + 100,
1);
assertEmptyBlockLocations(locations);
}