下面列出了 org.apache.hadoop.fs.HardLink 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Breaks the hard links on this replica's block file and meta file when
 * either of them currently has more than {@code numLinks} links, then marks
 * the replica as unlinked. The per-file work is delegated to
 * {@code unlinkFile}.
 *
 * @param numLinks link count above which a file is unlinked
 * @return {@code false} if the replica was already marked unlinked;
 *         {@code true} otherwise, whether or not a file had to be unlinked
 * @throws IOException if the block file or the volume is missing, or if a
 *         called operation fails
 */
public boolean unlinkBlock(int numLinks) throws IOException {
  if (isUnlinked()) {
    return false;
  }

  final File blockFile = getBlockFile();
  if (blockFile == null || getVolume() == null) {
    throw new IOException("detachBlock:Block not found. " + this);
  }
  final File metaFile = getMetaFile();

  // Block data first: only files shared by more than numLinks links are touched.
  final boolean blockIsShared = HardLink.getLinkCount(blockFile) > numLinks;
  if (blockIsShared) {
    DataNode.LOG.info("CopyOnWrite for block " + this);
    unlinkFile(blockFile, this);
  }

  // Then the meta file, under the same rule.
  final boolean metaIsShared = HardLink.getLinkCount(metaFile) > numLinks;
  if (metaIsShared) {
    unlinkFile(metaFile, this);
  }

  setUnlinked();
  return true;
}
/**
 * Breaks the hard links on this replica's block file and meta file when
 * either of them currently has more than {@code numLinks} links, then marks
 * the replica as unlinked. The per-file work is delegated to
 * {@code unlinkFile}.
 *
 * @param numLinks link count above which a file is unlinked
 * @return {@code false} if the replica was already marked unlinked;
 *         {@code true} otherwise, whether or not a file had to be unlinked
 * @throws IOException if the block file or the volume is missing, or if a
 *         called operation fails
 */
public boolean unlinkBlock(int numLinks) throws IOException {
  if (isUnlinked()) {
    return false;
  }

  final File blockFile = getBlockFile();
  if (blockFile == null || getVolume() == null) {
    throw new IOException("detachBlock:Block not found. " + this);
  }
  final File metaFile = getMetaFile();

  // Block data first: only files shared by more than numLinks links are touched.
  final boolean blockIsShared = HardLink.getLinkCount(blockFile) > numLinks;
  if (blockIsShared) {
    DataNode.LOG.info("CopyOnWrite for block " + this);
    unlinkFile(blockFile, this);
  }

  // Then the meta file, under the same rule.
  final boolean metaIsShared = HardLink.getLinkCount(metaFile) > numLinks;
  if (metaIsShared) {
    unlinkFile(metaFile, this);
  }

  setUnlinked();
  return true;
}
/**
 * Asserts the hard-link state of a source block file and a destination
 * block file. Each datanode is looked up in {@code dnMap} by the port of
 * the corresponding {@code DatanodeInfo}.
 *
 * @param srcInfo datanode info of the source
 * @param dstInfo datanode info of the destination
 * @param srcNamespaceId namespace id used to resolve the source block file
 * @param srcBlock source block
 * @param dstNamespaceId namespace id used to resolve the destination block file
 * @param dstBlock destination block
 * @param hardlink when {@code true} both files must have a link count
 *        greater than 1; when {@code false} both must have exactly 1
 * @throws IOException if resolving a block file or reading a link count fails
 */
private void verifyHardLinks(DatanodeInfo srcInfo, DatanodeInfo dstInfo,
    int srcNamespaceId, Block srcBlock, int dstNamespaceId, Block dstBlock,
    boolean hardlink) throws IOException {
  // Source side.
  final DataNode sourceNode = dnMap.get(srcInfo.getPort());
  final File sourceFile =
      sourceNode.data.getBlockFile(srcNamespaceId, srcBlock);
  LOG.warn("Link count for : " + sourceFile + " is : "
      + HardLink.getLinkCount(sourceFile));
  if (!hardlink) {
    assertEquals(1, HardLink.getLinkCount(sourceFile));
  } else {
    assertTrue(HardLink.getLinkCount(sourceFile) > 1);
  }

  // Destination side.
  final DataNode targetNode = dnMap.get(dstInfo.getPort());
  final File targetFile =
      targetNode.data.getBlockFile(dstNamespaceId, dstBlock);
  if (!hardlink) {
    assertEquals(1, HardLink.getLinkCount(targetFile));
  } else {
    assertTrue(HardLink.getLinkCount(targetFile) > 1);
  }
}
/**
 * Detaches this block's data file and meta file when either of them
 * currently has more than {@code numLinks} hard links, then marks the block
 * as detached. The per-file work is delegated to {@code detachFile}.
 *
 * @param namespaceId namespace id passed through to {@code detachFile}
 * @param block the block being detached
 * @param numLinks link count above which a file is detached
 * @return {@code false} if the block was already marked detached;
 *         {@code true} otherwise
 * @throws IOException if the block file, the volume or the meta file is
 *         missing, or if a called operation fails
 */
boolean detachBlock(int namespaceId, Block block, int numLinks) throws IOException {
  if (isDetached()) {
    return false;
  }
  if (file == null || volume == null) {
    throw new IOException("detachBlock:Block not found. " + block);
  }

  final File metaFile = FSDataset.getMetaFile(file, block);
  if (metaFile == null) {
    throw new IOException("Meta file not found for block " + block);
  }

  // Data file first.
  final boolean dataIsShared = HardLink.getLinkCount(file) > numLinks;
  if (dataIsShared) {
    DataNode.LOG.info("CopyOnWrite for block " + block);
    detachFile(namespaceId, file, block);
  }

  // Then the meta file.
  final boolean metaIsShared = HardLink.getLinkCount(metaFile) > numLinks;
  if (metaIsShared) {
    detachFile(namespaceId, metaFile, block);
  }

  setDetached();
  return true;
}
/**
 * Hard-links the finalized and RBW block directories of {@code fromDir}
 * into the matching directories of {@code toDir}, then logs the link
 * statistics.
 *
 * @param datanode datanode passed through to {@code DataStorage.linkBlocks}
 * @param fromDir directory where the snapshot is stored
 * @param toDir the current data directory
 * @throws IOException if an error occurs during hard-linking
 */
private void linkAllBlocks(DataNode datanode, File fromDir, File toDir)
    throws IOException {
  final int layoutVersion = this.getLayoutVersion();
  final HardLink linker = new HardLink();

  // Finalized blocks.
  final File finalizedSource =
      new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED);
  final File finalizedTarget =
      new File(toDir, DataStorage.STORAGE_DIR_FINALIZED);
  DataStorage.linkBlocks(datanode, finalizedSource, finalizedTarget,
      layoutVersion, linker);

  // RBW blocks.
  final File rbwSource = new File(fromDir, DataStorage.STORAGE_DIR_RBW);
  final File rbwTarget = new File(toDir, DataStorage.STORAGE_DIR_RBW);
  DataStorage.linkBlocks(datanode, rbwSource, rbwTarget, layoutVersion,
      linker);

  LOG.info(linker.linkStats.report());
}
/**
 * Hard-links all finalized and RBW blocks from {@code fromDir} into
 * {@code toDir}, choosing the source directories according to whether the
 * on-disk layout version supports the APPEND_RBW_DIR feature, then logs the
 * link statistics.
 *
 * @param datanode datanode passed through to {@code linkBlocks}
 * @param fromDir The directory where the 'from' snapshot is stored
 * @param fromBbwDir In HDFS 1.x, the directory where blocks
 *                   that are under construction are stored.
 * @param toDir The current data directory
 *
 * @throws IOException If an error occurs during hard-linking
 */
private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir,
    File toDir) throws IOException {
  final HardLink linker = new HardLink();
  final int layoutVersion = this.getLayoutVersion();

  // Both layouts link into the same destination sub-directories.
  final File finalizedTarget = new File(toDir, STORAGE_DIR_FINALIZED);
  final File rbwTarget = new File(toDir, STORAGE_DIR_RBW);

  final boolean hasRbwDir = DataNodeLayoutVersion.supports(
      LayoutVersion.Feature.APPEND_RBW_DIR, layoutVersion);
  if (hasRbwDir) {
    // Source keeps finalized blocks in tmpDir/finalized ...
    linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED),
        finalizedTarget, layoutVersion, linker);
    // ... and rbw blocks in tmpDir/rbw.
    linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW), rbwTarget,
        layoutVersion, linker);
  } else {
    // Pre-RBW version: finalized blocks sit directly in tmpDir.
    linkBlocks(datanode, fromDir, finalizedTarget, layoutVersion, linker);
    if (fromBbwDir.exists()) {
      /*
       * The 'blocksBeingWritten' directory from HDFS 1.x has to end up in
       * the rbw directory. It is a little messy, because blocksBeingWritten
       * was NOT underneath the 'current' directory in those releases. See
       * HDFS-3731 for details.
       */
      linkBlocks(datanode, fromBbwDir, rbwTarget, layoutVersion, linker);
    }
  }

  LOG.info(linker.linkStats.report());
}
/**
 * Creates a hard link {@code dst} pointing at {@code src}.
 * Uses {@code link0} with the absolute paths of both files when
 * {@code nativeLoaded} is set, and {@code HardLink.createHardLink}
 * otherwise.
 *
 * @param src the existing file to link to
 * @param dst the link to create
 * @throws IOException if the link cannot be created
 */
public static void link(File src, File dst) throws IOException {
  if (nativeLoaded) {
    link0(src.getAbsolutePath(), dst.getAbsolutePath());
    return;
  }
  // No native support loaded: fall back to the HardLink utility.
  HardLink.createHardLink(src, dst);
}
/**
 * Hard-links the finalized and RBW block directories of {@code fromDir}
 * into the matching directories of {@code toDir}, then logs the link
 * statistics.
 *
 * @param datanode datanode passed through to {@code DataStorage.linkBlocks}
 * @param fromDir directory where the snapshot is stored
 * @param toDir the current data directory
 * @throws IOException if an error occurs during hard-linking
 */
private void linkAllBlocks(DataNode datanode, File fromDir, File toDir)
    throws IOException {
  final int layoutVersion = this.getLayoutVersion();
  final HardLink linker = new HardLink();

  // Finalized blocks.
  final File finalizedSource =
      new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED);
  final File finalizedTarget =
      new File(toDir, DataStorage.STORAGE_DIR_FINALIZED);
  DataStorage.linkBlocks(datanode, finalizedSource, finalizedTarget,
      layoutVersion, linker);

  // RBW blocks.
  final File rbwSource = new File(fromDir, DataStorage.STORAGE_DIR_RBW);
  final File rbwTarget = new File(toDir, DataStorage.STORAGE_DIR_RBW);
  DataStorage.linkBlocks(datanode, rbwSource, rbwTarget, layoutVersion,
      linker);

  LOG.info(linker.linkStats.report());
}
/**
 * Hard-links all finalized and RBW blocks from {@code fromDir} into
 * {@code toDir}, choosing the source directories according to whether the
 * on-disk layout version supports the APPEND_RBW_DIR feature, then logs the
 * link statistics.
 *
 * @param datanode datanode passed through to {@code linkBlocks}
 * @param fromDir The directory where the 'from' snapshot is stored
 * @param fromBbwDir In HDFS 1.x, the directory where blocks
 *                   that are under construction are stored.
 * @param toDir The current data directory
 *
 * @throws IOException If an error occurs during hard-linking
 */
private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir,
    File toDir) throws IOException {
  final HardLink linker = new HardLink();
  final int layoutVersion = this.getLayoutVersion();

  // Both layouts link into the same destination sub-directories.
  final File finalizedTarget = new File(toDir, STORAGE_DIR_FINALIZED);
  final File rbwTarget = new File(toDir, STORAGE_DIR_RBW);

  final boolean hasRbwDir = DataNodeLayoutVersion.supports(
      LayoutVersion.Feature.APPEND_RBW_DIR, layoutVersion);
  if (hasRbwDir) {
    // Source keeps finalized blocks in tmpDir/finalized ...
    linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED),
        finalizedTarget, layoutVersion, linker);
    // ... and rbw blocks in tmpDir/rbw.
    linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW), rbwTarget,
        layoutVersion, linker);
  } else {
    // Pre-RBW version: finalized blocks sit directly in tmpDir.
    linkBlocks(datanode, fromDir, finalizedTarget, layoutVersion, linker);
    if (fromBbwDir.exists()) {
      /*
       * The 'blocksBeingWritten' directory from HDFS 1.x has to end up in
       * the rbw directory. It is a little messy, because blocksBeingWritten
       * was NOT underneath the 'current' directory in those releases. See
       * HDFS-3731 for details.
       */
      linkBlocks(datanode, fromBbwDir, rbwTarget, layoutVersion, linker);
    }
  }

  LOG.info(linker.linkStats.report());
}
/**
 * Creates a hard link {@code dst} pointing at {@code src}.
 * Uses {@code link0} with the absolute paths of both files when
 * {@code nativeLoaded} is set, and {@code HardLink.createHardLink}
 * otherwise.
 *
 * @param src the existing file to link to
 * @param dst the link to create
 * @throws IOException if the link cannot be created
 */
public static void link(File src, File dst) throws IOException {
  if (nativeLoaded) {
    link0(src.getAbsolutePath(), dst.getAbsolutePath());
    return;
  }
  // No native support loaded: fall back to the HardLink utility.
  HardLink.createHardLink(src, dst);
}
/**
 * Verifies that the tests load the Solr-modified copies of the listed
 * Hadoop classes by looking up the public marker field named by
 * {@code SOLR_HACK_FOR_CLASS_VERIFICATION_FIELD} on each of them. The test
 * is failed as soon as one class lacks the field.
 */
private static void checkOverriddenHadoopClasses() {
  List<Class<?>> modifiedHadoopClasses = Arrays.asList(BlockPoolSlice.class, DiskChecker.class,
      FileUtil.class, HardLink.class, HttpServer2.class, NameNodeResourceChecker.class, RawLocalFileSystem.class);
  for (Class<?> candidate : modifiedHadoopClasses) {
    final String nullFieldMessage =
        "Field on " + candidate.getCanonicalName() + " should not have been null";
    try {
      // getField throws when the marker is absent, which is the real check.
      LuceneTestCase.assertNotNull(nullFieldMessage,
          candidate.getField(SOLR_HACK_FOR_CLASS_VERIFICATION_FIELD));
    } catch (NoSuchFieldException e) {
      LuceneTestCase.fail("Expected to load Solr modified Hadoop class " + candidate.getCanonicalName() +
          " , but it was not found.");
    }
  }
}
/**
 * Creates a hard link {@code dst} pointing at {@code src}.
 * Uses {@code link0} with the absolute paths of both files when
 * {@code nativeLoaded} is set, and {@code HardLink.createHardLink}
 * otherwise.
 *
 * @param src the existing file to link to
 * @param dst the link to create
 * @throws IOException if the link cannot be created
 */
public static void link(File src, File dst) throws IOException {
  if (nativeLoaded) {
    link0(src.getAbsolutePath(), dst.getAbsolutePath());
    return;
  }
  // No native support loaded: fall back to the HardLink utility.
  HardLink.createHardLink(src, dst);
}
/**
 * Hard-links the block files found under {@code from} into {@code to}.
 * <p>
 * The individual link operations are first collected through
 * {@code linkBlocksHelper}; duplicate entries are detected and removed; the
 * remaining links are then created by a fixed-size pool of worker threads,
 * each of which handles one contiguous slice of the collected list.
 *
 * @param datanode datanode whose configuration supplies the worker count
 * @param from source directory
 * @param to destination directory
 * @param oldLV layout version being upgraded from
 * @param hl hard-link helper passed through to {@code linkBlocksHelper}
 * @throws IOException propagated from collecting the links or from waiting
 *         on the link tasks
 */
static void linkBlocks(DataNode datanode, File from, File to, int oldLV,
    HardLink hl) throws IOException {
  // When upgrading from a version older than the one that introduced the
  // block ID-based layout AND the target is the finalized directory, the old
  // flat layout has to be converted to the block ID-based one.
  final boolean upgradeToIdBasedLayout =
      oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo().
          getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED);

  final ArrayList<LinkArgs> pendingLinks = Lists.newArrayList();
  linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to,
      pendingLinks);

  // Detect and remove duplicate entries.
  final ArrayList<LinkArgs> duplicateLinks = findDuplicateEntries(pendingLinks);
  if (!duplicateLinks.isEmpty()) {
    LOG.error("There are " + duplicateLinks.size() + " duplicate block " +
        "entries within the same volume.");
    removeDuplicateEntries(pendingLinks, duplicateLinks);
  }

  final int workerCount = datanode.getConf().getInt(
      DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
      DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
  final ExecutorService workerPool = Executors.newFixedThreadPool(workerCount);

  // Each task links one slice of at most sliceSize consecutive entries.
  final int sliceSize = pendingLinks.size() / workerCount + 1;
  final List<Future<Void>> submitted = Lists.newArrayList();
  for (int offset = 0; offset < pendingLinks.size(); offset += sliceSize) {
    final int sliceStart = offset;
    submitted.add(workerPool.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        final int sliceEnd = Math.min(sliceStart + sliceSize,
            pendingLinks.size());
        for (int index = sliceStart; index < sliceEnd; index++) {
          final LinkArgs entry = pendingLinks.get(index);
          NativeIO.link(entry.src, entry.dst);
        }
        return null;
      }
    }));
  }

  // No further tasks will be submitted; wait for every slice to finish.
  workerPool.shutdown();
  for (Future<Void> task : submitted) {
    Futures.get(task, IOException.class);
  }
}
/**
 * Hard-links the block files found under {@code from} into {@code to}.
 * <p>
 * The individual link operations are first collected through
 * {@code linkBlocksHelper}; duplicate entries are detected and removed; the
 * remaining links are then created by a fixed-size pool of worker threads,
 * each of which handles one contiguous slice of the collected list.
 *
 * @param datanode datanode whose configuration supplies the worker count
 * @param from source directory
 * @param to destination directory
 * @param oldLV layout version being upgraded from
 * @param hl hard-link helper passed through to {@code linkBlocksHelper}
 * @throws IOException propagated from collecting the links or from waiting
 *         on the link tasks
 */
static void linkBlocks(DataNode datanode, File from, File to, int oldLV,
    HardLink hl) throws IOException {
  // When upgrading from a version older than the one that introduced the
  // block ID-based layout AND the target is the finalized directory, the old
  // flat layout has to be converted to the block ID-based one.
  final boolean upgradeToIdBasedLayout =
      oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo().
          getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED);

  final ArrayList<LinkArgs> pendingLinks = Lists.newArrayList();
  linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to,
      pendingLinks);

  // Detect and remove duplicate entries.
  final ArrayList<LinkArgs> duplicateLinks = findDuplicateEntries(pendingLinks);
  if (!duplicateLinks.isEmpty()) {
    LOG.error("There are " + duplicateLinks.size() + " duplicate block " +
        "entries within the same volume.");
    removeDuplicateEntries(pendingLinks, duplicateLinks);
  }

  final int workerCount = datanode.getConf().getInt(
      DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
      DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
  final ExecutorService workerPool = Executors.newFixedThreadPool(workerCount);

  // Each task links one slice of at most sliceSize consecutive entries.
  final int sliceSize = pendingLinks.size() / workerCount + 1;
  final List<Future<Void>> submitted = Lists.newArrayList();
  for (int offset = 0; offset < pendingLinks.size(); offset += sliceSize) {
    final int sliceStart = offset;
    submitted.add(workerPool.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        final int sliceEnd = Math.min(sliceStart + sliceSize,
            pendingLinks.size());
        for (int index = sliceStart; index < sliceEnd; index++) {
          final LinkArgs entry = pendingLinks.get(index);
          NativeIO.link(entry.src, entry.dst);
        }
        return null;
      }
    }));
  }

  // No further tasks will be submitted; wait for every slice to finish.
  workerPool.shutdown();
  for (Future<Void> task : submitted) {
    Futures.get(task, IOException.class);
  }
}
/**
 * Copies a file as fast as possible. Tries to do a hardlink instead of a copy
 * if the hardlink parameter is specified and hard-linking of block copies is
 * enabled ({@code shouldHardLinkBlockCopy}). If the hard link attempt fails
 * for any reason, the failure is logged and a regular channel-to-channel
 * copy is performed instead.
 *
 * @param src
 *          the source file for copying
 * @param dst
 *          the destination file for copying
 * @param hardlink
 *          whether or not to attempt a hardlink
 * @throws IOException
 *           if either file is null, if the regular copy fails, or if the
 *           copy stops making progress before all bytes are transferred
 */
public void copyFile(File src, File dst, boolean hardlink) throws IOException {
  if (src == null || dst == null) {
    throw new IOException("src/dst file is null");
  }
  try {
    if (hardlink && shouldHardLinkBlockCopy) {
      // Remove destination before hard linking, since this file might already
      // exist and a hardlink would fail as a result.
      if (dst.exists()) {
        if (!dst.delete()) {
          throw new IOException("Deletion of file : " + dst + " failed");
        }
      }
      HardLink.createHardLink(src, dst);
      DataNode.LOG.info("Hard Link Created from : " + src + " to " + dst);
      return;
    }
  } catch (IOException e) {
    // Hard-linking is best effort: keep the cause in the log and fall
    // through to the regular copy below.
    DataNode.LOG.warn("Hard link failed from : " + src + " to " + dst
        + " continuing with regular file copy", e);
  }
  FileChannel input = null;
  FileChannel output = null;
  try {
    // This improves copying performance a lot, it uses native buffers
    // for copying.
    input = new FileInputStream(src).getChannel();
    output = new FileOutputStream(dst).getChannel();
    long bytesLeft = input.size();
    long position = 0;
    while (bytesLeft > 0) {
      long bytesWritten = output.transferFrom(input, position, bytesLeft);
      if (bytesWritten <= 0) {
        // transferFrom may legitimately return 0, e.g. when the source has
        // no bytes left because it shrank after size() was read. Without
        // this check the loop would never terminate.
        throw new IOException("Copy from " + src + " to " + dst
            + " made no progress at offset " + position + " with "
            + bytesLeft + " bytes left");
      }
      bytesLeft -= bytesWritten;
      position += bytesWritten;
    }
    if (datanode.syncOnClose) {
      output.force(true);
    }
  } finally {
    // Close the output even when closing the input throws, so that no file
    // descriptor is leaked.
    try {
      if (input != null) {
        input.close();
      }
    } finally {
      if (output != null) {
        output.close();
      }
    }
  }
}