下面列出了org.apache.hadoop.fs.CacheFlag#org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
static void assertReports(int numDatanodes, DatanodeReportType type,
DFSClient client, List<DataNode> datanodes, String bpid) throws IOException {
final DatanodeInfo[] infos = client.datanodeReport(type);
assertEquals(numDatanodes, infos.length);
final DatanodeStorageReport[] reports = client.getDatanodeStorageReport(type);
assertEquals(numDatanodes, reports.length);
for(int i = 0; i < infos.length; i++) {
assertEquals(infos[i], reports[i].getDatanodeInfo());
final DataNode d = findDatanode(infos[i].getDatanodeUuid(), datanodes);
if (bpid != null) {
//check storage
final StorageReport[] computed = reports[i].getStorageReports();
Arrays.sort(computed, CMP);
final StorageReport[] expected = d.getFSDataset().getStorageReports(bpid);
Arrays.sort(expected, CMP);
assertEquals(expected.length, computed.length);
for(int j = 0; j < expected.length; j++) {
assertEquals(expected[j].getStorage().getStorageID(),
computed[j].getStorage().getStorageID());
}
}
}
}
public HashMap<String, Integer> getNumberOfDataDirsPerHost(){
HashMap<String, Integer> disksPerHost = new HashMap<>();
try {
@SuppressWarnings("resource")
DFSClient dfsClient = new DFSClient(NameNode.getAddress(getConf()), getConf());
DatanodeStorageReport[] datanodeStorageReports = dfsClient.getDatanodeStorageReport(DatanodeReportType.ALL);
for (DatanodeStorageReport datanodeStorageReport : datanodeStorageReports) {
disksPerHost.put(
datanodeStorageReport.getDatanodeInfo().getHostName(),
datanodeStorageReport.getStorageReports().length);
}
} catch (IOException e) {
LOG.warn("number of data directories (disks) per node could not be collected (requieres higher privilegies).");
}
return disksPerHost;
}
static void assertReports(int numDatanodes, DatanodeReportType type,
DFSClient client, List<DataNode> datanodes, String bpid) throws IOException {
final DatanodeInfo[] infos = client.datanodeReport(type);
assertEquals(numDatanodes, infos.length);
final DatanodeStorageReport[] reports = client.getDatanodeStorageReport(type);
assertEquals(numDatanodes, reports.length);
for(int i = 0; i < infos.length; i++) {
assertEquals(infos[i], reports[i].getDatanodeInfo());
final DataNode d = findDatanode(infos[i].getDatanodeUuid(), datanodes);
if (bpid != null) {
//check storage
final StorageReport[] computed = reports[i].getStorageReports();
Arrays.sort(computed, CMP);
final StorageReport[] expected = d.getFSDataset().getStorageReports(bpid);
Arrays.sort(expected, CMP);
assertEquals(expected.length, computed.length);
for(int j = 0; j < expected.length; j++) {
assertEquals(expected[j].getStorage().getStorageID(),
computed[j].getStorage().getStorageID());
}
}
}
}
public DatanodeInfo[] datanodeReport(DatanodeReportType type)
throws IOException {
checkOpen();
TraceScope scope = Trace.startSpan("datanodeReport", traceSampler);
try {
return namenode.getDatanodeReport(type);
} finally {
scope.close();
}
}
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {
checkOpen();
TraceScope scope =
Trace.startSpan("datanodeStorageReport", traceSampler);
try {
return namenode.getDatanodeStorageReport(type);
} finally {
scope.close();
}
}
/** Handle fsck request */
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response
) throws IOException {
@SuppressWarnings("unchecked")
final Map<String,String[]> pmap = request.getParameterMap();
final PrintWriter out = response.getWriter();
final InetAddress remoteAddress =
InetAddress.getByName(request.getRemoteAddr());
final ServletContext context = getServletContext();
final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
final UserGroupInformation ugi = getUGI(request, conf);
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
final FSNamesystem namesystem = nn.getNamesystem();
final BlockManager bm = namesystem.getBlockManager();
final int totalDatanodes =
namesystem.getNumberOfDatanodes(DatanodeReportType.LIVE);
new NamenodeFsck(conf, nn,
bm.getDatanodeManager().getNetworkTopology(), pmap, out,
totalDatanodes, remoteAddress).fsck();
return null;
}
});
} catch (InterruptedException e) {
response.sendError(400, e.getMessage());
}
}
@Override // ClientProtocol
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
throws IOException {
checkNNStartup();
DatanodeInfo results[] = namesystem.datanodeReport(type);
return results;
}
@Override // ClientProtocol
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {
checkNNStartup();
final DatanodeStorageReport[] reports = namesystem.getDatanodeStorageReport(type);
return reports;
}
/** @return list of datanodes where decommissioning is in progress. */
public List<DatanodeDescriptor> getDecommissioningNodes() {
// There is no need to take namesystem reader lock as
// getDatanodeListForReport will synchronize on datanodeMap
// A decommissioning DN may be "alive" or "dead".
return getDatanodeListForReport(DatanodeReportType.DECOMMISSIONING);
}
/** Fetch live and dead datanodes. */
public void fetchDatanodes(final List<DatanodeDescriptor> live,
final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
if (live == null && dead == null) {
throw new HadoopIllegalArgumentException("Both live and dead lists are null");
}
// There is no need to take namesystem reader lock as
// getDatanodeListForReport will synchronize on datanodeMap
final List<DatanodeDescriptor> results =
getDatanodeListForReport(DatanodeReportType.ALL);
for(DatanodeDescriptor node : results) {
if (isDatanodeDead(node)) {
if (dead != null) {
dead.add(node);
}
} else {
if (live != null) {
live.add(node);
}
}
}
if (removeDecommissionNode) {
if (live != null) {
removeDecomNodeFromList(live);
}
if (dead != null) {
removeDecomNodeFromList(dead);
}
}
}
public static DatanodeReportTypeProto
convert(DatanodeReportType t) {
switch (t) {
case ALL: return DatanodeReportTypeProto.ALL;
case LIVE: return DatanodeReportTypeProto.LIVE;
case DEAD: return DatanodeReportTypeProto.DEAD;
case DECOMMISSIONING: return DatanodeReportTypeProto.DECOMMISSIONING;
default:
throw new IllegalArgumentException("Unexpected data type report:" + t);
}
}
public static DatanodeReportType
convert(DatanodeReportTypeProto t) {
switch (t) {
case ALL: return DatanodeReportType.ALL;
case LIVE: return DatanodeReportType.LIVE;
case DEAD: return DatanodeReportType.DEAD;
case DECOMMISSIONING: return DatanodeReportType.DECOMMISSIONING;
default:
throw new IllegalArgumentException("Unexpected data type report:" + t);
}
}
@Override
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
throws IOException {
GetDatanodeReportRequestProto req = GetDatanodeReportRequestProto
.newBuilder()
.setType(PBHelper.convert(type)).build();
try {
return PBHelper.convert(
rpcProxy.getDatanodeReport(null, req).getDiList());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public DatanodeStorageReport[] getDatanodeStorageReport(DatanodeReportType type)
throws IOException {
final GetDatanodeStorageReportRequestProto req
= GetDatanodeStorageReportRequestProto.newBuilder()
.setType(PBHelper.convert(type)).build();
try {
return PBHelper.convertDatanodeStorageReports(
rpcProxy.getDatanodeStorageReport(null, req).getDatanodeStorageReportsList());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
private static String getRegisteredDatanodeUid(
MiniDFSCluster cluster, int nnIndex) {
List<DatanodeDescriptor> registeredDatanodes = cluster.getNamesystem(nnIndex)
.getBlockManager().getDatanodeManager()
.getDatanodeListForReport(DatanodeReportType.ALL);
assertEquals(1, registeredDatanodes.size());
return registeredDatanodes.get(0).getDatanodeUuid();
}
private String decommissionNode(FSNamesystem namesystem, DFSClient client,
FileSystem localFileSys, int nodeIndex) throws IOException {
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
String nodename = info[nodeIndex].getXferAddr();
decommissionNode(namesystem, localFileSys, nodename);
return nodename;
}
/**
* Wait until balanced: each datanode gives utilization within
* BALANCE_ALLOWED_VARIANCE of average
* @throws IOException
* @throws TimeoutException
*/
private void waitForBalancer(long totalUsedSpace, long totalCapacity)
throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: System.currentTimeMillis() + timeout;
final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
boolean balanced;
do {
DatanodeInfo[] datanodeReport =
client.getDatanodeReport(DatanodeReportType.ALL);
assertEquals(datanodeReport.length, cluster.getDataNodes().size());
balanced = true;
for (DatanodeInfo datanode : datanodeReport) {
double nodeUtilization = ((double)datanode.getDfsUsed())
/ datanode.getCapacity();
if (Math.abs(avgUtilization - nodeUtilization) >
BALANCE_ALLOWED_VARIANCE) {
balanced = false;
if (System.currentTimeMillis() > failtime) {
throw new TimeoutException(
"Rebalancing expected avg utilization to become "
+ avgUtilization + ", but on datanode " + datanode
+ " it remains at " + nodeUtilization
+ " after more than " + TIMEOUT + " msec.");
}
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
break;
}
}
} while (!balanced);
}
public void testHostsFile(int numNameNodes) throws IOException,
InterruptedException {
int numDatanodes = 1;
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
.numDataNodes(numDatanodes).setupHostsFile(true).build();
cluster.waitActive();
// Now empty hosts file and ensure the datanode is disallowed
// from talking to namenode, resulting in it's shutdown.
ArrayList<String>list = new ArrayList<String>();
final String bogusIp = "127.0.30.1";
list.add(bogusIp);
writeConfigFile(hostsFile, list);
for (int j = 0; j < numNameNodes; j++) {
refreshNodes(cluster.getNamesystem(j), conf);
DFSClient client = getDfsClient(cluster.getNameNode(j), conf);
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
for (int i = 0 ; i < 5 && info.length != 0; i++) {
LOG.info("Waiting for datanode to be marked dead");
Thread.sleep(HEARTBEAT_INTERVAL * 1000);
info = client.datanodeReport(DatanodeReportType.LIVE);
}
assertEquals("Number of live nodes should be 0", 0, info.length);
// Test that non-live and bogus hostnames are considered "dead".
// The dead report should have an entry for (1) the DN that is
// now considered dead because it is no longer allowed to connect
// and (2) the bogus entry in the hosts file (these entries are
// always added last)
info = client.datanodeReport(DatanodeReportType.DEAD);
assertEquals("There should be 2 dead nodes", 2, info.length);
DatanodeID id = cluster.getDataNodes().get(0).getDatanodeId();
assertEquals(id.getHostName(), info[0].getHostName());
assertEquals(bogusIp, info[1].getHostName());
}
}
public DatanodeInfo[] datanodeReport(DatanodeReportType type)
throws IOException {
checkOpen();
TraceScope scope = Trace.startSpan("datanodeReport", traceSampler);
try {
return namenode.getDatanodeReport(type);
} finally {
scope.close();
}
}
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {
checkOpen();
TraceScope scope =
Trace.startSpan("datanodeStorageReport", traceSampler);
try {
return namenode.getDatanodeStorageReport(type);
} finally {
scope.close();
}
}
/** Handle fsck request */
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response
) throws IOException {
@SuppressWarnings("unchecked")
final Map<String,String[]> pmap = request.getParameterMap();
final PrintWriter out = response.getWriter();
final InetAddress remoteAddress =
InetAddress.getByName(request.getRemoteAddr());
final ServletContext context = getServletContext();
final Configuration conf = NameNodeHttpServer.getConfFromContext(context);
final UserGroupInformation ugi = getUGI(request, conf);
try {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
final FSNamesystem namesystem = nn.getNamesystem();
final BlockManager bm = namesystem.getBlockManager();
final int totalDatanodes =
namesystem.getNumberOfDatanodes(DatanodeReportType.LIVE);
new NamenodeFsck(conf, nn,
bm.getDatanodeManager().getNetworkTopology(), pmap, out,
totalDatanodes, remoteAddress).fsck();
return null;
}
});
} catch (InterruptedException e) {
response.sendError(400, e.getMessage());
}
}
@Override // ClientProtocol
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
throws IOException {
checkNNStartup();
DatanodeInfo results[] = namesystem.datanodeReport(type);
return results;
}
@Override // ClientProtocol
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {
checkNNStartup();
final DatanodeStorageReport[] reports = namesystem.getDatanodeStorageReport(type);
return reports;
}
/** @return list of datanodes where decommissioning is in progress. */
public List<DatanodeDescriptor> getDecommissioningNodes() {
// There is no need to take namesystem reader lock as
// getDatanodeListForReport will synchronize on datanodeMap
// A decommissioning DN may be "alive" or "dead".
return getDatanodeListForReport(DatanodeReportType.DECOMMISSIONING);
}
/** Fetch live and dead datanodes. */
public void fetchDatanodes(final List<DatanodeDescriptor> live,
final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
if (live == null && dead == null) {
throw new HadoopIllegalArgumentException("Both live and dead lists are null");
}
// There is no need to take namesystem reader lock as
// getDatanodeListForReport will synchronize on datanodeMap
final List<DatanodeDescriptor> results =
getDatanodeListForReport(DatanodeReportType.ALL);
for(DatanodeDescriptor node : results) {
if (isDatanodeDead(node)) {
if (dead != null) {
dead.add(node);
}
} else {
if (live != null) {
live.add(node);
}
}
}
if (removeDecommissionNode) {
if (live != null) {
removeDecomNodeFromList(live);
}
if (dead != null) {
removeDecomNodeFromList(dead);
}
}
}
public static DatanodeReportTypeProto
convert(DatanodeReportType t) {
switch (t) {
case ALL: return DatanodeReportTypeProto.ALL;
case LIVE: return DatanodeReportTypeProto.LIVE;
case DEAD: return DatanodeReportTypeProto.DEAD;
case DECOMMISSIONING: return DatanodeReportTypeProto.DECOMMISSIONING;
default:
throw new IllegalArgumentException("Unexpected data type report:" + t);
}
}
public static DatanodeReportType
convert(DatanodeReportTypeProto t) {
switch (t) {
case ALL: return DatanodeReportType.ALL;
case LIVE: return DatanodeReportType.LIVE;
case DEAD: return DatanodeReportType.DEAD;
case DECOMMISSIONING: return DatanodeReportType.DECOMMISSIONING;
default:
throw new IllegalArgumentException("Unexpected data type report:" + t);
}
}
@Override
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
throws IOException {
GetDatanodeReportRequestProto req = GetDatanodeReportRequestProto
.newBuilder()
.setType(PBHelper.convert(type)).build();
try {
return PBHelper.convert(
rpcProxy.getDatanodeReport(null, req).getDiList());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public DatanodeStorageReport[] getDatanodeStorageReport(DatanodeReportType type)
throws IOException {
final GetDatanodeStorageReportRequestProto req
= GetDatanodeStorageReportRequestProto.newBuilder()
.setType(PBHelper.convert(type)).build();
try {
return PBHelper.convertDatanodeStorageReports(
rpcProxy.getDatanodeStorageReport(null, req).getDatanodeStorageReportsList());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
private static String getRegisteredDatanodeUid(
MiniDFSCluster cluster, int nnIndex) {
List<DatanodeDescriptor> registeredDatanodes = cluster.getNamesystem(nnIndex)
.getBlockManager().getDatanodeManager()
.getDatanodeListForReport(DatanodeReportType.ALL);
assertEquals(1, registeredDatanodes.size());
return registeredDatanodes.get(0).getDatanodeUuid();
}