下面列出了org.apache.hadoop.fs.CacheFlag#org.apache.hadoop.hdfs.server.datanode.DataNode 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Serves a GET request for a file checksum. The file path is decoded from
 * the request, the checksum is fetched through a DFS client, and the
 * checksum (or the exception that occurred) is written to the response as
 * an XML document.
 *
 * @param request the incoming request carrying the encoded file path
 * @param response the response whose writer receives the XML output
 * @throws ServletException declared by the servlet contract
 * @throws IOException if writing the XML document fails
 */
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response
    ) throws ServletException, IOException {
  final PrintWriter writer = response.getWriter();
  final String filePath = ServletUtil.getDecodedPath(request, "/getFileChecksum");
  final XMLOutputter doc = new XMLOutputter(writer, "UTF-8");
  doc.declaration();

  final DataNode dn = (DataNode) getServletContext().getAttribute("datanode");
  final Configuration dnConf = new HdfsConfiguration(dn.getConf());

  try {
    final DFSClient client = DatanodeJspHelper.getDFSClient(
        request, dn, dnConf, getUGI(request, dnConf));
    final MD5MD5CRC32FileChecksum sum =
        client.getFileChecksum(filePath, Long.MAX_VALUE);
    MD5MD5CRC32FileChecksum.write(doc, sum);
  } catch (IOException failure) {
    // report the failure inside the XML document instead of propagating it
    writeXml(failure, filePath, doc);
  } catch (InterruptedException interrupted) {
    writeXml(interrupted, filePath, doc);
  }
  doc.endDocument();
}
/**
 * Multiple-NameNode version of injectBlocks.
 *
 * Injects the given blocks into the dataset of one DataNode, using the block
 * pool id reported by the selected namesystem, and then calls
 * scheduleAllBlockReport(0) on that DataNode.
 *
 * @param nameNodeIndex index of the NameNode whose block pool id is used
 * @param dataNodeIndex index into the DataNode list; valid values are
 *        0 .. dataNodes.size() - 1
 * @param blocksToInject the blocks to inject
 * @throws IndexOutOfBoundsException if dataNodeIndex is outside the list
 * @throws IOException if the DataNode's dataset is not a SimulatedFSDataset
 */
public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
    Iterable<Block> blocksToInject) throws IOException {
  // Valid indexes are 0 .. size-1, so an index equal to size is rejected too.
  if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
    throw new IndexOutOfBoundsException("dataNodeIndex=" + dataNodeIndex
        + ", number of DataNodes=" + dataNodes.size());
  }
  final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
  final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
  if (!(dataSet instanceof SimulatedFSDataset)) {
    throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
  }
  final String bpid = getNamesystem(nameNodeIndex).getBlockPoolId();
  final SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
  sdataset.injectBlocks(bpid, blocksToInject);
  // reuse the DataNode looked up above instead of indexing the list again
  dn.scheduleAllBlockReport(0);
}
/**
 * Delete the on-disk files that back a block.
 *
 * @param blockFile block file; when null there is nothing to delete and the
 *        call is treated as successful
 * @param metaFile block meta file; may be null, in which case only the block
 *        file is deleted
 * @param b a block (used only for the log message)
 * @return true if on-disk files are deleted; false otherwise
 */
private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
  if (blockFile == null) {
    DataNode.LOG.warn("No file exists for block: " + b);
    return true;
  }

  final boolean blockFileGone = blockFile.delete();
  if (!blockFileGone) {
    DataNode.LOG.warn("Not able to delete the block file: " + blockFile);
    return false;
  }

  // the block file is gone; the meta file (if there is one) goes next
  final boolean metaLeftBehind = metaFile != null && !metaFile.delete();
  if (metaLeftBehind) {
    DataNode.LOG.warn(
        "Not able to delete the meta block file: " + metaFile);
    return false;
  }
  return true;
}
/**
 * Multiple-NameNode version of injectBlocks.
 *
 * Injects the given blocks into the dataset of one DataNode, using the block
 * pool id reported by the selected namesystem, and then calls
 * scheduleAllBlockReport(0) on that DataNode.
 *
 * @param nameNodeIndex index of the NameNode whose block pool id is used
 * @param dataNodeIndex index into the DataNode list; valid values are
 *        0 .. dataNodes.size() - 1
 * @param blocksToInject the blocks to inject
 * @throws IndexOutOfBoundsException if dataNodeIndex is outside the list
 * @throws IOException if the DataNode's dataset is not a SimulatedFSDataset
 */
public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
    Iterable<Block> blocksToInject) throws IOException {
  // Valid indexes are 0 .. size-1, so an index equal to size is rejected too.
  if (dataNodeIndex < 0 || dataNodeIndex >= dataNodes.size()) {
    throw new IndexOutOfBoundsException("dataNodeIndex=" + dataNodeIndex
        + ", number of DataNodes=" + dataNodes.size());
  }
  final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
  final FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
  if (!(dataSet instanceof SimulatedFSDataset)) {
    throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
  }
  final String bpid = getNamesystem(nameNodeIndex).getBlockPoolId();
  final SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
  sdataset.injectBlocks(bpid, blocksToInject);
  // reuse the DataNode looked up above instead of indexing the list again
  dn.scheduleAllBlockReport(0);
}
/**
 * Create a heartbeat packet.
 *
 * The packet carries no chunks: its byte array is sized as
 * PKT_HEADER_LEN + SIZE_OF_INTEGER, and the checksum/data start and
 * position markers all point at the end of that array.
 */
Packet() {
  // sequence number reserved for heartbeats; no payload bookkeeping needed
  this.seqno = HEART_BEAT_SEQNO;
  this.lastPacketInBlock = false;
  this.offsetInBlock = 0;
  this.numChunks = 0;
  maxChunks = 0;

  buffer = null;
  final int headerOnlySize = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
  buf = new byte[headerOnlySize];

  // every marker sits at the end of the array
  dataStart = headerOnlySize;
  checksumStart = headerOnlySize;
  checksumPos = checksumStart;
  dataPos = dataStart;
}
/**
 * Find the metadata file for the specified block file.
 * Return the generation stamp from the name of the metafile.
 *
 * A metafile name is expected to look like
 * {@code <blockName>_<genstamp>.<ext>}, i.e. three '_'-separated parts
 * (blk, blkid, genstamp.meta).
 *
 * @param listdir names of the entries in the directory holding the block
 * @param blockName name of the block file, e.g. {@code blk_123}
 * @return the generation stamp parsed from the first matching metafile name,
 *         or Block.GRANDFATHER_GENERATION_STAMP if no metafile is found
 */
static long getGenerationStampFromFile(String[] listdir, String blockName) {
  // Require the '_' separator after the block name. A bare prefix match
  // would let blk_1 pick up blk_10_<genstamp>.meta and return the generation
  // stamp of a different block.
  final String metaPrefix = blockName + "_";
  for (String path : listdir) {
    if (!path.startsWith(metaPrefix)) {
      continue;
    }
    String[] vals = StringUtils.split(path, '_');
    if (vals.length != 3) { // blk, blkid, genstamp.meta
      continue;
    }
    String[] str = StringUtils.split(vals[2], '.');
    if (str.length != 2) {
      continue;
    }
    return Long.parseLong(str[0]);
  }
  DataNode.LOG.warn("Block " + blockName +
                    " does not have a metafile!");
  return Block.GRANDFATHER_GENERATION_STAMP;
}
/**
 * Builds a socket address on the local host with a randomly chosen port.
 * The port is re-drawn until it differs from the transfer port of every
 * entry in {@code datanodes}.
 *
 * @return local host address paired with the chosen port
 * @throws UnknownHostException if the local host cannot be resolved
 */
private InetSocketAddress getArbitraryLocalHostAddr()
    throws UnknownHostException {
  final Random rng = new Random(System.currentTimeMillis());
  int candidate = rng.nextInt(65535);
  boolean collided;
  do {
    collided = false;
    for (DataNode dn : datanodes) {
      if (dn.getXferAddress().getPort() == candidate) {
        // draw a new port; the outer loop re-checks it against all datanodes
        candidate = rng.nextInt(65535);
        collided = true;
      }
    }
  } while (collided);
  return new InetSocketAddress(InetAddress.getLocalHost(), candidate);
}
/**
 * Brings up a mini cluster, prepares blocks on the first DataNode's dataset
 * via {@code setup}, and runs the close checks against them. The cluster is
 * shut down whether or not the checks pass.
 */
@Test
public void testClose() throws Exception {
  final MiniDFSCluster miniCluster =
      new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
  try {
    miniCluster.waitActive();
    final DataNode firstDn = miniCluster.getDataNodes().get(0);
    final FsDatasetImpl fsDataset =
        (FsDatasetImpl) DataNodeTestUtils.getFSDataset(firstDn);

    // set up replicasMap
    final String poolId = miniCluster.getNamesystem().getBlockPoolId();
    final ExtendedBlock[] prepared = setup(poolId, fsDataset);

    // test close
    testClose(fsDataset, prepared);
  } finally {
    miniCluster.shutdown();
  }
}
/**
 * Start the data-node.
 *
 * The data directory is {@code data-<index>} under the directory named by
 * the {@code test.build.data} system property; it is stored in the given
 * configuration under {@code dfs.data.dir} before the node is created.
 *
 * @param index number used to build the data directory name
 * @param config configuration handed to the new DataNode (modified here)
 * @return the DataNode returned by DataNode.createDataNode
 * @throws IOException if creating the DataNode fails
 */
public DataNode startDataNode(int index, Configuration config)
    throws IOException {
  final File dataNodeDir =
      new File(System.getProperty("test.build.data"), "data-" + index);
  config.set("dfs.data.dir", dataNodeDir.getPath());
  // NOTE(review): the original comment says the config is modified with the
  // bound ports by the "NameNode"; presumably the DataNode created here is
  // meant - confirm.
  return DataNode.createDataNode(new String[0], config);
}
/**
 * Obtains a DFS client for the given request. The configuration and the
 * DataNode are read from servlet context attributes, and the caller's UGI
 * is derived from the request.
 *
 * @param request the request on whose behalf the client is created
 * @return the client produced by DatanodeJspHelper.getDFSClient
 * @throws IOException propagated from the UGI lookup or client creation
 * @throws InterruptedException propagated from client creation
 */
protected DFSClient getDFSClient(HttpServletRequest request)
    throws IOException, InterruptedException {
  final Configuration servletConf = (Configuration)
      getServletContext().getAttribute(JspHelper.CURRENT_CONF);
  final UserGroupInformation caller = getUGI(request, servletConf);
  final DataNode dn =
      (DataNode) getServletContext().getAttribute("datanode");
  return DatanodeJspHelper.getDFSClient(request, dn, servletConf, caller);
}
/**
 * Shutdown all DataNodes started by this class. The NameNode
 * is left running so that new DataNodes may be started.
 *
 * Nodes are removed from the list and shut down from the last index to
 * the first.
 */
public void shutdownDataNodes() {
  int idx = dataNodes.size();
  while (--idx >= 0) {
    System.out.println("Shutting down DataNode " + idx);
    dataNodes.remove(idx).datanode.shutdown();
    numDataNodes--;
  }
}
/**
 * check if a data directory is healthy
 * if some volumes failed - make sure to remove all the blocks that belong
 * to these volumes
 *
 * The volume check runs under the read lock and the removal from the volume
 * map under the write lock.
 *
 * @throws DiskErrorException if at least one failed volume was reported;
 *         the message lists the failed volumes
 */
public void checkDataDir() throws DiskErrorException {
  List<FSVolume> failed_vols = null;
  lock.readLock().lock();
  try {
    failed_vols = volumes.checkDirs();
  } finally {
    lock.readLock().unlock();
  }

  // if there are no failed volumes return
  if (failed_vols == null) {
    return;
  }

  // else remove related blocks
  final long startMs = System.currentTimeMillis();
  lock.writeLock().lock();
  try {
    volumeMap.removeUnhealthyVolumes(failed_vols);
  } finally {
    lock.writeLock().unlock();
  }
  final long elapsedMs = System.currentTimeMillis() - startMs;

  // No per-block counts are available at this point, so report the number
  // of failed volumes rather than block counters that would always be zero.
  DataNode.LOG.warn(">>>>>>>>>>>>Removed blocks of " + failed_vols.size()
      + " failed volume(s) (took " + elapsedMs + " millisecs)");

  // report the error
  StringBuilder sb = new StringBuilder();
  for (FSVolume fv : failed_vols) {
    sb.append(fv.toString()).append(';');
  }
  throw new DiskErrorException("DataNode failed volumes:" + sb);
}
/**
 * Verify JobTracker port usage.
 *
 * With a name-node and a data-node running, the job tracker is expected to
 * fail to start when either its RPC address or its HTTP address is taken
 * from the name-node's configuration, and to start when both addresses are
 * built from the test host constants with port 0.
 */
public void testJobTrackerPorts() throws Exception {
  final String trackerKey = "mapred.job.tracker";
  final String trackerHttpKey = "mapred.job.tracker.http.address";
  NameNode nameNode = null;
  DataNode dataNode = null;
  try {
    nameNode = hdfs.startNameNode();
    setDataNodePorts(hdfs.getConfig());
    dataNode = hdfs.startDataNode(1, hdfs.getConfig());

    final JobConf trackerConf = new JobConf(hdfs.getConfig());

    // start job tracker on the same port as name-node
    trackerConf.set(trackerKey,
        FileSystem.getDefaultUri(hdfs.getConfig()).toString());
    trackerConf.set(trackerHttpKey,
        TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
    assertFalse(canStartJobTracker(trackerConf)); // should fail

    // bind http server to the same port as name-node
    trackerConf.set(trackerKey, TestHDFSServerPorts.NAME_NODE_HOST + 0);
    trackerConf.set(trackerHttpKey,
        hdfs.getConfig().get("dfs.http.address"));
    assertFalse(canStartJobTracker(trackerConf)); // should fail again

    // both ports are different from the name-node ones
    trackerConf.set(trackerKey, TestHDFSServerPorts.NAME_NODE_HOST + 0);
    trackerConf.set(trackerHttpKey,
        TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
    assertTrue(canStartJobTracker(trackerConf)); // should start now
  } finally {
    hdfs.stopDataNode(dataNode);
    hdfs.stopNameNode(nameNode);
  }
}
/**
 * Polls once per second, up to ten times, until the given DataNode reports
 * that it is no longer up.
 *
 * @param dn the DataNode to watch
 * @throws InterruptedException if the sleeping thread is interrupted
 * @throws TimeoutException if the DataNode is still up after the last poll
 */
public static void waitForDatanodeDeath(DataNode dn)
    throws InterruptedException, TimeoutException {
  final int ATTEMPTS = 10;
  for (int attempt = 0; attempt < ATTEMPTS; attempt++) {
    Thread.sleep(1000);
    if (!dn.isDatanodeUp()) {
      // Returning here also covers the final attempt, so a DataNode that
      // goes down just before the deadline is not reported as a timeout.
      return;
    }
  }
  throw new TimeoutException("Timed out waiting for DN to die");
}
/**
 * Writes a small file at the given path, requests it through the name-node
 * URL, and checks that the connection ended up on the info port of the
 * DataNode whose transfer address matches the file's first block location.
 *
 * @param path file to create (deleted first if it already exists)
 * @throws IOException if file creation or the HTTP request fails
 */
private void testDataNodeRedirect(Path path) throws IOException {
  // Create the file
  if (hdfs.exists(path)) {
    hdfs.delete(path, true);
  }
  FSDataOutputStream out = hdfs.create(path, (short) 1);
  out.writeBytes("0123456789");
  out.close();

  // Get the path's block location so we can determine
  // if we were redirected to the right DN.
  BlockLocation[] locations = hdfs.getFileBlockLocations(path, 0, 10);
  String xferAddr = locations[0].getNames()[0];

  // Connect to the NN to get redirected
  URL u = hftpFs.getNamenodeURL(
      "/data" + ServletUtil.encodePath(path.toUri().getPath()),
      "ugi=userx,groupy");
  HttpURLConnection conn = (HttpURLConnection) u.openConnection();
  try {
    HttpURLConnection.setFollowRedirects(true);
    conn.connect();
    conn.getInputStream();

    boolean checked = false;
    // Find the datanode that has the block according to locations
    // and check that the URL was redirected to this DN's info port
    for (DataNode node : cluster.getDataNodes()) {
      DatanodeRegistration dnR = DataNodeTestUtils.getDNRegistrationForBP(node,
          blockPoolId);
      if (dnR.getXferAddr().equals(xferAddr)) {
        checked = true;
        assertEquals(dnR.getInfoPort(), conn.getURL().getPort());
      }
    }
    assertTrue("The test never checked that location of "
        + "the block and hftp desitnation are the same", checked);
  } finally {
    // release the connection even when an assertion above fails
    conn.disconnect();
  }
}
/**
 * Add a namenode to cluster and start it. Configuration of datanodes
 * in the cluster is refreshed to register with the new namenode.
 *
 * @param conf configuration to extend with the new nameservice id and to
 *        hand to the new namenode and the datanodes
 * @param namenodePort port passed on to initFederatedNamenodeAddress
 * @return newly started namenode
 * @throws IOException if the cluster is not federated, or if starting the
 *         namenode fails
 */
public NameNode addNameNode(Configuration conf, int namenodePort)
    throws IOException {
  if (!federation) {
    throw new IOException("cannot add namenode to non-federated cluster");
  }

  // grow the namenode array by one slot; the new namenode takes the last one
  final int nnIndex = nameNodes.length;
  final int numNameNodes = nameNodes.length + 1;
  NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes];
  System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length);
  nameNodes = newlist;

  final String nameserviceId = NAMESERVICE_ID_PREFIX + getNSId();
  // The property may be unset (null) or empty. Appending blindly would
  // produce "null,<id>" or ",<id>", so start a fresh list in that case.
  final String existingIds = conf.get(FSConstants.DFS_FEDERATION_NAMESERVICES);
  final String nameserviceIds =
      (existingIds == null || existingIds.isEmpty())
          ? nameserviceId
          : existingIds + "," + nameserviceId;
  conf.set(FSConstants.DFS_FEDERATION_NAMESERVICES, nameserviceIds);

  initFederatedNamenodeAddress(conf, nameserviceId, namenodePort);
  createFederatedNameNode(nnIndex, conf, numDataNodes, true, true,
      null, nameserviceId);

  // Refresh datanodes with the newly started namenode
  for (DataNodeProperties dn : dataNodes) {
    DataNode datanode = dn.datanode;
    datanode.refreshNamenodes(conf);
  }

  // Wait for new namenode to get registrations from all the datanodes
  waitActive(true, nnIndex);
  return nameNodes[nnIndex].nameNode;
}
/**
 * Check whether the datanode can be started.
 *
 * A datanode that was created is shut down again before returning.
 *
 * @param conf configuration used to create the datanode
 * @return false if creation fails with a BindException, true if the
 *         datanode was created
 * @throws IOException any other I/O failure during creation
 */
private boolean canStartDataNode(Configuration conf) throws IOException {
  DataNode probe = null;
  try {
    probe = DataNode.createDataNode(new String[0], conf);
    return true;
  } catch (java.net.BindException portTaken) {
    // a bind failure is the expected "cannot start" outcome; any other
    // IOException propagates to the caller
    return false;
  } finally {
    if (probe != null) {
      probe.shutdown();
    }
  }
}
/**
 * Returns the last located block of a file.
 *
 * @param namenode protocol handle used to query block locations
 * @param src path of the file
 * @return the final entry of the located-block list for the whole file
 * @throws IOException if the block-location query fails
 */
public static LocatedBlock getLastLocatedBlock(
    ClientProtocol namenode, String src
    ) throws IOException {
  // ask for the block locations of the entire file
  final List<LocatedBlock> located =
      namenode.getBlockLocations(src, 0, Long.MAX_VALUE).getLocatedBlocks();
  final int blockCount = located.size();
  DataNode.LOG.info("blocks.size()=" + blockCount);
  assertTrue(blockCount > 0);
  return located.get(blockCount - 1);
}
/**
 * Test recovery on restart OOB message. It also tests the delivery of
 * OOB ack originating from the primary datanode. Since there is only
 * one node in the cluster, failure of restart-recovery will fail the
 * test.
 */
@Test
public void testPipelineRecoveryOnOOB() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "15");
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();

    // create a file, then reopen it for append and push one byte out
    final FileSystem fs = cluster.getFileSystem();
    final Path target = new Path("dataprotocol2.dat");
    DFSTestUtil.createFile(fs, target, 10240L, (short) 1, 0L);
    final DFSOutputStream appendStream =
        (DFSOutputStream) fs.append(target).getWrappedStream();
    appendStream.write(1);
    appendStream.hflush();

    final DFSAdmin admin = new DFSAdmin(conf);
    final DataNode onlyDn = cluster.getDataNodes().get(0);
    final String ipcAddr = onlyDn.getDatanodeId().getIpcAddr(false);

    // issue shutdown to the datanode.
    final String[] shutdownArgs = {"-shutdownDatanode", ipcAddr, "upgrade"};
    Assert.assertEquals(0, admin.run(shutdownArgs));

    // Wait long enough to receive an OOB ack before closing the file.
    Thread.sleep(4000);

    // Restart the datanode
    cluster.restartDataNode(0, true);

    // The following forces a data packet and end of block packets to be sent.
    appendStream.close();
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Restart a datanode.
 *
 * A new DataNode is created from the stored configuration and arguments
 * and appended to the datanode list together with a copy of the
 * configuration taken before the DataNode was created.
 *
 * @param dnprop datanode's property
 * @return true if restarting is successful
 * @throws IOException if creating the DataNode fails
 */
public synchronized boolean restartDataNode(DataNodeProperties dnprop)
    throws IOException {
  final Configuration liveConf = dnprop.conf;
  final String[] startArgs = dnprop.dnArgs;
  // save cloned config
  final Configuration savedConf = new Configuration(liveConf);
  final DataNode restarted = DataNode.createDataNode(startArgs, liveConf);
  dataNodes.add(new DataNodeProperties(restarted, savedConf, startArgs));
  numDataNodes++;
  return true;
}
/**
 * Gets a list of the started DataNodes. May be empty.
 *
 * @return a new list holding the DataNode of every entry, in list order
 */
public ArrayList<DataNode> getDataNodes() {
  final int total = dataNodes.size();
  final ArrayList<DataNode> started = new ArrayList<DataNode>(total);
  int idx = 0;
  while (idx < total) {
    started.add(dataNodes.get(idx).datanode);
    idx++;
  }
  return started;
}
/**
 * Looks up a datanode by the listen port of its IPC server.
 *
 * @param ipcPort the IPC listen port to search for
 * @return the datanode having the ipc server listen port, or null if none
 *         of the started datanodes matches
 */
public DataNode getDataNode(int ipcPort) {
  DataNode match = null;
  for (DataNode candidate : getDataNodes()) {
    final int listenPort = candidate.ipcServer.getListenerAddress().getPort();
    if (listenPort == ipcPort) {
      match = candidate;
      break;
    }
  }
  return match;
}
/**
 * Shutdown all DataNodes started by this class. The NameNode
 * is left running so that new DataNodes may be started.
 *
 * Entries are taken off the end of the list one at a time, so the list is
 * empty once every shutdown call has returned.
 */
public void shutdownDataNodes() {
  int last = dataNodes.size() - 1;
  while (last >= 0) {
    LOG.info("Shutting down DataNode " + last);
    final DataNode victim = dataNodes.remove(last).datanode;
    victim.shutdown();
    numDataNodes--;
    last--;
  }
}
/**
 * Stops the datanode at the given position and removes it from the list.
 *
 * @param i index of the datanode to stop
 * @return the properties of the stopped datanode, or null if the index is
 *         outside the list
 */
public synchronized DataNodeProperties stopDataNode(int i) {
  final boolean outOfRange = i < 0 || i >= dataNodes.size();
  if (outOfRange) {
    return null;
  }
  final DataNodeProperties removed = dataNodes.remove(i);
  final DataNode stopping = removed.datanode;
  // the entry is already removed, so add one to report the previous total
  final int totalBefore = dataNodes.size() + 1;
  LOG.info("MiniDFSCluster Stopping DataNode "
      + stopping.getDisplayName()
      + " from a total of " + totalBefore
      + " datanodes.");
  stopping.shutdown();
  numDataNodes--;
  return removed;
}
/**
 * Sets the level of the loggers used by this test to Level.ALL. Each log is
 * cast to Log4JLogger to reach the underlying logger.
 */
private static void initLoggers() {
  final Level everything = Level.ALL;

  // namenode side
  ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(everything);
  ((Log4JLogger) LogFactory.getLog(FSNamesystem.class))
      .getLogger().setLevel(everything);

  // datanode, test and client side
  ((Log4JLogger) DataNode.LOG).getLogger().setLevel(everything);
  ((Log4JLogger) TestFiPipelines.LOG).getLogger().setLevel(everything);
  ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(everything);

  // fault-injection helpers and aspects
  ((Log4JLogger) FiTestUtil.LOG).getLogger().setLevel(everything);
  ((Log4JLogger) BlockReceiverAspects.LOG).getLogger().setLevel(everything);
  ((Log4JLogger) DFSClientAspects.LOG).getLogger().setLevel(everything);
}
/**
 * Returns the current set of datanodes
 *
 * @return a new array holding the DataNode of every entry, in list order
 */
DataNode[] listDataNodes() {
  final int total = dataNodes.size();
  final DataNode[] snapshot = new DataNode[total];
  int slot = 0;
  while (slot < total) {
    snapshot[slot] = dataNodes.get(slot).datanode;
    slot++;
  }
  return snapshot;
}
/**
 * Stops the first datanode whose transfer address equals the given name.
 *
 * @param dnName transfer address to look for
 * @return the result of stopDataNode(int) for the matching index; when no
 *         datanode matches, that method is called with -1
 */
public synchronized DataNodeProperties stopDataNode(String dnName) {
  int match = -1;
  final int total = dataNodes.size();
  for (int idx = 0; idx < total && match < 0; idx++) {
    final DataNode candidate = dataNodes.get(idx).datanode;
    LOG.info("DN name=" + dnName + " found DN=" + candidate +
        " with name=" + candidate.getDisplayName());
    if (dnName.equals(candidate.getDatanodeId().getXferAddr())) {
      match = idx;
    }
  }
  return stopDataNode(match);
}
/**
 * Derives {@code chunksPerPacket} and {@code packetSize} from the requested
 * packet and chunk sizes.
 *
 * One chunk occupies csize data bytes plus the checksum size. The header
 * takes PKT_HEADER_LEN + SIZE_OF_INTEGER bytes. The chunk count is the
 * space left after the header divided by the per-chunk size, rounded up,
 * and never less than one.
 *
 * @param psize requested packet size
 * @param csize data bytes per chunk
 */
private void computePacketChunkSize(int psize, int csize) {
  final int bytesPerChunk = csize + checksum.getChecksumSize();
  final int headerLen = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;

  final int roundedUpChunks =
      (psize - headerLen + bytesPerChunk - 1) / bytesPerChunk;
  chunksPerPacket = Math.max(roundedUpChunks, 1);
  packetSize = headerLen + bytesPerChunk * chunksPerPacket;

  if (DFSClient.LOG.isDebugEnabled()) {
    DFSClient.LOG.debug("computePacketChunkSize: src=" + src
        + ", chunkSize=" + bytesPerChunk
        + ", chunksPerPacket=" + chunksPerPacket
        + ", packetSize=" + packetSize);
  }
}
/**
 * Corrupt a block on a data node. Overwrite the start of the block file
 * with BLOCK_SIZE bytes whose values are the low-order byte of their index
 * (0, 1, 2, ...).
 *
 * @param block
 *          the ExtendedBlock to be corrupted
 * @param dn
 *          the data node where the block needs to be corrupted
 * @throws FileNotFoundException
 *           if the block file cannot be opened for writing
 * @throws IOException
 *           if writing to or closing the block file fails
 */
private static void corruptBlock(final ExtendedBlock block, final DataNode dn)
    throws FileNotFoundException, IOException {
  final File f = DataNodeTestUtils.getBlockFile(
      dn, block.getBlockPoolId(), block.getLocalBlock());

  // build the replacement content before the file is opened
  final byte[] bytes = new byte[(int) BLOCK_SIZE];
  for (int i = 0; i < bytes.length; i++) {
    bytes[i] = (byte) (i);
  }

  final RandomAccessFile raFile = new RandomAccessFile(f, "rw");
  try {
    raFile.write(bytes);
  } finally {
    // close the file even when the write fails, so the handle is not leaked
    raFile.close();
  }
}
/**
 * Adds a Block to the given set for every block file found directly in
 * {@code rbwDir}. Does nothing when the directory is not set or cannot be
 * listed.
 *
 * @param blockSet set that receives the discovered blocks
 */
void getBlocksBeingWrittenInfo(LightWeightHashSet<Block> blockSet) {
  if (rbwDir == null) {
    return;
  }
  final File[] entries = rbwDir.listFiles();
  if (entries == null) {
    return;
  }

  final String[] entryNames = getFileNames(entries);
  for (int idx = 0; idx < entries.length; idx++) {
    final File entry = entries[idx];
    // only plain entries whose name is a block file name are of interest
    if (entry.isDirectory() || !Block.isBlockFilename(entryNames[idx])) {
      continue;
    }

    // the generation stamp is taken from the matching metafile name
    final long genStamp = FSDataset.getGenerationStampFromFile(
        entryNames, entryNames[idx]);
    final Block found = new Block(entry, entry.length(), genStamp);

    // add this block to block set
    blockSet.add(found);
    if (DataNode.LOG.isDebugEnabled()) {
      DataNode.LOG.debug("recoverBlocksBeingWritten for block " + found);
    }
  }
}