The following examples show how to use the org.apache.hadoop.fs.FileChecksum API class; you can also follow the links to view the source code on GitHub.
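Before the project-specific examples, here is a minimal sketch of the core API (the configuration and path below are placeholders): FileSystem.getFileChecksum(Path) returns a FileChecksum, or null when the file system does not provide checksums, and the result exposes getAlgorithmName(), getLength(), and getBytes().

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;

public class FileChecksumExample {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); // picks up fs.defaultFS
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/tmp/example.txt"); // hypothetical path
FileChecksum checksum = fs.getFileChecksum(path);
if (checksum == null) {
// e.g. a file system with no checksum support
System.out.println("No checksum available for " + path);
} else {
System.out.println(checksum.getAlgorithmName() + " ("
+ checksum.getLength() + " bytes): "
+ StringUtils.byteToHexString(checksum.getBytes()));
}
fs.close();
}
}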
public PathMetadata(
Path location,
long lastModifiedTimestamp,
FileChecksum checksum,
List<PathMetadata> childrenMetadata) {
this.location = location.toUri().toString();
this.lastModifiedTimestamp = lastModifiedTimestamp;
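// Flatten the FileChecksum into algorithm name, length, and raw bytes; a null
// checksum (e.g. for a directory, as in the apply() example below) leaves them unset.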
if (checksum == null) {
checkSumAlgorithmName = null;
checkSumLength = 0;
this.checksum = null;
} else {
checkSumAlgorithmName = checksum.getAlgorithmName();
checkSumLength = checksum.getLength();
this.checksum = checksum.getBytes();
}
this.childrenMetadata = childrenMetadata == null ? ImmutableList.<PathMetadata> of()
: ImmutableList.copyOf(childrenMetadata);
}
private void testChecksum() throws Exception {
if (!isLocalFS()) {
FileSystem fs = FileSystem.get(getProxiedFSConf());
fs.mkdirs(getProxiedFSTestDir());
Path path = new Path(getProxiedFSTestDir(), "foo.txt");
OutputStream os = fs.create(path);
os.write(1);
os.close();
FileChecksum hdfsChecksum = fs.getFileChecksum(path);
fs.close();
fs = getHttpFSFileSystem();
FileChecksum httpChecksum = fs.getFileChecksum(path);
fs.close();
Assert.assertEquals(httpChecksum.getAlgorithmName(), hdfsChecksum.getAlgorithmName());
Assert.assertEquals(httpChecksum.getLength(), hdfsChecksum.getLength());
Assert.assertArrayEquals(httpChecksum.getBytes(), hdfsChecksum.getBytes());
}
}
private FileChecksum getFileChecksum(String f) throws IOException {
final HttpURLConnection connection = openConnection(
"/fileChecksum" + ServletUtil.encodePath(f),
"ugi=" + getEncodedUgiParameter());
try {
final XMLReader xr = XMLReaderFactory.createXMLReader();
xr.setContentHandler(this);
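// Parsing drives this object's SAX callbacks, which populate the
// filechecksum field returned at the end of the method.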
xr.parse(new InputSource(connection.getInputStream()));
} catch(SAXException e) {
final Exception embedded = e.getException();
if (embedded instanceof IOException) {
throw (IOException)embedded;
}
throw new IOException("invalid xml directory content", e);
} finally {
connection.disconnect();
}
return filechecksum;
}
@Test
public void testGetFileChecksum() throws IOException, URISyntaxException {
// Create two different files in HDFS
fileSystemTestHelper.createFile(fHdfs, someFile);
fileSystemTestHelper.createFile(fHdfs, fileSystemTestHelper
.getTestRootPath(fHdfs, someFile + "other"), 1, 512);
// Get checksum through ViewFS
FileChecksum viewFSCheckSum = vfs.getFileChecksum(
new Path("/vfstmp/someFileForTestGetFileChecksum"));
// Get checksum through HDFS.
FileChecksum hdfsCheckSum = fHdfs.getFileChecksum(
new Path(someFile));
// Get checksum of different file in HDFS
FileChecksum otherHdfsFileCheckSum = fHdfs.getFileChecksum(
new Path(someFile+"other"));
// Checksums of the same file (obtained through HDFS and through ViewFS) should match
assertEquals("HDFS and ViewFS checksums were not the same", viewFSCheckSum,
hdfsCheckSum);
// Checksum of different files should be different.
assertFalse("Some other HDFS file which should not have had the same " +
"checksum as viewFS did!", viewFSCheckSum.equals(otherHdfsFileCheckSum));
}
public void testGetFileChecksum(final Path foo, final int appendLength)
throws Exception {
final int appendRounds = 16;
FileChecksum[] fc = new FileChecksum[appendRounds + 1];
DFSTestUtil.createFile(dfs, foo, appendLength, REPLICATION, 0L);
fc[0] = dfs.getFileChecksum(foo);
for (int i = 0; i < appendRounds; i++) {
DFSTestUtil.appendFile(dfs, foo, appendLength);
fc[i + 1] = dfs.getFileChecksum(foo);
}
for (int i = 0; i < appendRounds + 1; i++) {
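// getFileChecksum(path, length) checksums only the first `length` bytes of the
// file, so each range checksum should equal the snapshot taken right after the
// corresponding append.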
FileChecksum checksum = dfs.getFileChecksum(foo, appendLength * (i+1));
Assert.assertEquals(fc[i], checksum);
}
}
private long copyToFile(Path targetPath, FileSystem targetFS,
FileStatus sourceFileStatus, long sourceOffset, Mapper.Context context,
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
throws IOException {
FsPermission permission = FsPermission.getFileDefault().applyUMask(
FsPermission.getUMask(targetFS.getConf()));
final OutputStream outStream;
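// On OVERWRITE, create the target with the source's checksum options (see
// getChecksumOpt below) so a later source/target checksum comparison can match.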
if (action == FileAction.OVERWRITE) {
final short repl = getReplicationFactor(fileAttributes, sourceFileStatus,
targetFS, targetPath);
final long blockSize = getBlockSize(fileAttributes, sourceFileStatus,
targetFS, targetPath);
FSDataOutputStream out = targetFS.create(targetPath, permission,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
BUFFER_SIZE, repl, blockSize, context,
getChecksumOpt(fileAttributes, sourceChecksum));
outStream = new BufferedOutputStream(out);
} else {
outStream = new BufferedOutputStream(targetFS.append(targetPath,
BUFFER_SIZE));
}
return copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE,
context);
}
private void compareCheckSums(FileSystem sourceFS, Path source,
FileChecksum sourceChecksum, FileSystem targetFS, Path target)
throws IOException {
if (!DistCpUtils.checksumsAreEqual(sourceFS, source, sourceChecksum,
targetFS, target)) {
StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ")
.append(source).append(" and ").append(target).append(".");
if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) {
errorMessage.append(" Source and target differ in block-size.")
.append(" Use -pb to preserve block-sizes during copy.")
.append(" Alternatively, skip checksum-checks altogether, using -skipCrc.")
.append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)");
}
throw new IOException(errorMessage.toString());
}
}
@Override
protected void processPath(PathData item) throws IOException {
if (item.stat.isDirectory()) {
throw new PathIsDirectoryException(item.toString());
}
FileChecksum checksum = item.fs.getFileChecksum(item.path);
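// A null checksum means the file system does not implement checksums
// (FileSystem's base implementation returns null).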
if (checksum == null) {
out.printf("%s\tNONE\t%n", item.toString());
} else {
String checksumString = StringUtils.byteToHexString(
checksum.getBytes(), 0, checksum.getLength());
out.printf("%s\t%s\t%s%n",
item.toString(), checksum.getAlgorithmName(),
checksumString);
}
}
/**
* Check if the two files are equal by looking at the file length,
* and at the checksum (if user has specified the verifyChecksum flag).
*/
private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
// Not matching length
if (inputStat.getLen() != outputStat.getLen()) return false;
// Mark files as equals, since user asked for no checksum verification
if (!verifyChecksum) return true;
// If checksums are not available, files are not the same.
FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
if (inChecksum == null) return false;
FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
if (outChecksum == null) return false;
return inChecksum.equals(outChecksum);
}
@Override
public FileChecksum getFileChecksum(Path hadoopPath) throws IOException {
long startTime = System.nanoTime();
Preconditions.checkArgument(hadoopPath != null, "hadoopPath must not be null");
checkOpen();
URI gcsPath = getGcsPath(hadoopPath);
final FileInfo fileInfo = getGcsFs().getFileInfo(gcsPath);
if (!fileInfo.exists()) {
throw new FileNotFoundException(
String.format(
"%s not found: %s", fileInfo.isDirectory() ? "Directory" : "File", hadoopPath));
}
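// checksumType is configurable (see fs.gs.checksum.type in the test below)
// and selects the algorithm reported by the connector, e.g. CRC32C or MD5.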
FileChecksum checksum = getFileChecksum(checksumType, fileInfo);
logger.atFinest().log(
"getFileChecksum(hadoopPath: %s [gcsPath: %s]): %s", hadoopPath, gcsPath, checksum);
long duration = System.nanoTime() - startTime;
increment(Counter.GET_FILE_CHECKSUM);
increment(Counter.GET_FILE_CHECKSUM_TIME, duration);
return checksum;
}
private static void testFileChecksum(
GcsFileChecksumType checksumType, Function<String, byte[]> checksumFn) throws Exception {
Configuration config = getConfigurationWithImplementation();
config.set("fs.gs.checksum.type", checksumType.name());
GoogleHadoopFileSystem myGhfs = new GoogleHadoopFileSystem();
myGhfs.initialize(ghfs.getUri(), config);
URI fileUri = GoogleCloudStorageFileSystemIntegrationTest.getTempFilePath();
Path filePath = ghfsHelper.castAsHadoopPath(fileUri);
String fileContent = "foo-testFileChecksum-" + checksumType;
ghfsHelper.writeFile(filePath, fileContent, 1, /* overwrite= */ true);
FileChecksum fileChecksum = myGhfs.getFileChecksum(filePath);
assertThat(fileChecksum.getAlgorithmName()).isEqualTo(checksumType.getAlgorithmName());
assertThat(fileChecksum.getLength()).isEqualTo(checksumType.getByteLength());
assertThat(fileChecksum.getBytes()).isEqualTo(checksumFn.apply(fileContent));
assertThat(fileChecksum.toString())
.contains(String.format("%s: ", checksumType.getAlgorithmName()));
// Cleanup.
assertThat(ghfs.delete(filePath, /* recursive= */ true)).isTrue();
}
private FileChecksum getFileChecksum(String f) throws IOException {
final HttpURLConnection connection = openConnection(
"/fileChecksum" + f, "ugi=" + ugi);
try {
final XMLReader xr = XMLReaderFactory.createXMLReader();
xr.setContentHandler(this);
connection.setRequestMethod("GET");
connection.connect();
xr.parse(new InputSource(connection.getInputStream()));
} catch(SAXException e) {
final Exception embedded = e.getException();
if (embedded instanceof IOException) {
throw (IOException)embedded;
}
throw new IOException("invalid xml directory content", e);
} finally {
connection.disconnect();
}
return filechecksum;
}
/**
* @param filePath the file to checksum
* @return the checksum bytes, or an empty byte array if the file system
* provides no checksum for the file
* @throws NoSuchAlgorithmException
* @throws IOException
*/
private byte[] checksumFile(Path filePath) throws NoSuchAlgorithmException, IOException {
FileChecksum checksum = fs.getFileChecksum(filePath);
if (checksum == null) {
return "".getBytes();
}
return checksum.getBytes();
}
@Override
public PathMetadata apply(@Nonnull Path location) {
try {
FileSystem fs = location.getFileSystem(conf);
FileStatus fileStatus = fs.getFileStatus(location);
FileChecksum checksum = null;
if (fileStatus.isFile()) {
checksum = fs.getFileChecksum(location);
}
long modificationTime = 0;
List<PathMetadata> childPathDescriptors = new ArrayList<>();
if (fileStatus.isDirectory()) {
FileStatus[] childStatuses = fs.listStatus(location);
for (FileStatus childStatus : childStatuses) {
childPathDescriptors.add(apply(childStatus.getPath()));
}
} else {
modificationTime = fileStatus.getModificationTime();
}
return new PathMetadata(location, modificationTime, checksum, childPathDescriptors);
} catch (IOException e) {
throw new CircusTrainException("Unable to compute digest for location " + location.toString(), e);
}
}
@Override
public FileChecksum getFileChecksum(Path f) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.GETFILECHECKSUM.toString());
HttpURLConnection conn =
getConnection(Operation.GETFILECHECKSUM.getMethod(), params, f, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
final JSONObject json = (JSONObject) ((JSONObject)
HttpFSUtils.jsonParse(conn)).get(FILE_CHECKSUM_JSON);
return new FileChecksum() {
@Override
public String getAlgorithmName() {
return (String) json.get(CHECKSUM_ALGORITHM_JSON);
}
@Override
public int getLength() {
return ((Long) json.get(CHECKSUM_LENGTH_JSON)).intValue();
}
@Override
public byte[] getBytes() {
return StringUtils.hexStringToByte((String) json.get(CHECKSUM_BYTES_JSON));
}
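// This checksum is a read-only view over the parsed JSON response, so
// Writable serialization is intentionally unsupported.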
@Override
public void write(DataOutput out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void readFields(DataInput in) throws IOException {
throw new UnsupportedOperationException();
}
};
}
/**
* Converts a <code>FileChecksum</code> object into a JSON array
* object.
*
* @param checksum file checksum.
*
* @return The JSON representation of the file checksum.
*/
@SuppressWarnings({"unchecked"})
private static Map fileChecksumToJSON(FileChecksum checksum) {
Map json = new LinkedHashMap();
json.put(HttpFSFileSystem.CHECKSUM_ALGORITHM_JSON, checksum.getAlgorithmName());
json.put(HttpFSFileSystem.CHECKSUM_BYTES_JSON,
org.apache.hadoop.util.StringUtils.byteToHexString(checksum.getBytes()));
json.put(HttpFSFileSystem.CHECKSUM_LENGTH_JSON, checksum.getLength());
Map response = new LinkedHashMap();
response.put(HttpFSFileSystem.FILE_CHECKSUM_JSON, json);
return response;
}
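Assuming the CHECKSUM_ALGORITHM_JSON, CHECKSUM_BYTES_JSON, and CHECKSUM_LENGTH_JSON constants carry the WebHDFS property names algorithm, bytes, and length, the returned map serializes to a payload of the form {"FileChecksum":{"algorithm":"MD5-of-0MD5-of-512CRC32","bytes":"<hex digest>","length":28}}; the algorithm and length shown here are illustrative.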
@Test
public void testEncryptedReadAfterNameNodeRestart() throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = getFileSystem(conf);
writeTestDataToFile(fs);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
fs.close();
cluster.shutdown();
setEncryptionConfigKeys(conf);
cluster = new MiniDFSCluster.Builder(conf)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.format(false)
.startupOption(StartupOption.REGULAR)
.build();
fs = getFileSystem(conf);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
fs.close();
cluster.restartNameNode();
fs = getFileSystem(conf);
assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
fs.close();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testGetFileChecksum() throws Exception {
final String f = "/testGetFileChecksum";
final Path p = new Path(f);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
try {
cluster.waitActive();
//create a file
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, p, 1L << 20, (short)3, 20100402L);
//get checksum
final FileChecksum cs1 = fs.getFileChecksum(p);
assertTrue(cs1 != null);
//stop the first datanode
final List<LocatedBlock> locatedblocks = DFSClient.callGetBlockLocations(
cluster.getNameNodeRpc(), f, 0, Long.MAX_VALUE)
.getLocatedBlocks();
final DatanodeInfo first = locatedblocks.get(0).getLocations()[0];
cluster.stopDataNode(first.getXferAddr());
//get checksum again
final FileChecksum cs2 = fs.getFileChecksum(p);
assertEquals(cs1, cs2);
} finally {
cluster.shutdown();
}
}
/**
* @return the checksum spec of the source checksum if checksum type should be
* preserved
*/
private ChecksumOpt getChecksumOpt(EnumSet<FileAttribute> fileAttributes,
FileChecksum sourceChecksum) {
if (fileAttributes.contains(FileAttribute.CHECKSUMTYPE)
&& sourceChecksum != null) {
return sourceChecksum.getChecksumOpt();
}
return null;
}
private FileAction checkUpdate(FileSystem sourceFS, FileStatus source,
Path target) throws IOException {
final FileStatus targetFileStatus;
try {
targetFileStatus = targetFS.getFileStatus(target);
} catch (FileNotFoundException e) {
return FileAction.OVERWRITE;
}
if (targetFileStatus != null && !overWrite) {
if (canSkip(sourceFS, source, targetFileStatus)) {
return FileAction.SKIP;
} else if (append) {
long targetLen = targetFileStatus.getLen();
if (targetLen < source.getLen()) {
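// Checksum only the first targetLen bytes of the source; if that matches the
// target's full checksum, the target is a verified prefix and can be appended to.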
FileChecksum sourceChecksum = sourceFS.getFileChecksum(
source.getPath(), targetLen);
if (sourceChecksum != null
&& sourceChecksum.equals(targetFS.getFileChecksum(target))) {
// We require that the checksum is not null. Thus currently only
// DistributedFileSystem is supported
return FileAction.APPEND;
}
}
}
}
return FileAction.OVERWRITE;
}
@Override
public FileChecksum getFileChecksum(final Path f)
throws AccessControlException, FileNotFoundException,
IOException {
InodeTree.ResolveResult<FileSystem> res =
fsState.resolve(getUriPath(f), true);
return res.targetFileSystem.getFileChecksum(res.remainingPath);
}
@Override
public FileChecksum getFileChecksum(final Path f)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
InodeTree.ResolveResult<AbstractFileSystem> res =
fsState.resolve(getUriPath(f), true);
return res.targetFileSystem.getFileChecksum(res.remainingPath);
}