下面列出了 org.apache.hadoop.mapreduce.ClusterMetrics 类的 API 用法示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Get status information about the Map-Reduce cluster.
 *
 * <p>The lookup runs as {@code clientUgi} via {@code doAs}. The result is
 * assembled from the tracker counts and slot figures of the
 * {@link ClusterMetrics} returned by {@code cluster.getClusterStatus()},
 * together with the task tracker expiry interval and the job tracker status
 * read from {@code cluster}.
 *
 * @return the status information about the Map-Reduce cluster as an object
 *         of {@link ClusterStatus}.
 * @throws IOException if one of the underlying cluster calls throws it, or if
 *         the call is interrupted; in the latter case the thread's interrupt
 *         flag is restored and the {@link InterruptedException} is the cause
 */
public ClusterStatus getClusterStatus() throws IOException {
  try {
    return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
      public ClusterStatus run() throws IOException, InterruptedException {
        ClusterMetrics metrics = cluster.getClusterStatus();
        return new ClusterStatus(
            metrics.getTaskTrackerCount(),
            metrics.getBlackListedTaskTrackerCount(),
            cluster.getTaskTrackerExpiryInterval(),
            metrics.getOccupiedMapSlots(),
            metrics.getOccupiedReduceSlots(),
            metrics.getMapSlotCapacity(),
            metrics.getReduceSlotCapacity(),
            cluster.getJobTrackerStatus(),
            metrics.getDecommissionedTaskTrackerCount(),
            metrics.getGrayListedTaskTrackerCount());
      }
    });
  } catch (InterruptedException ie) {
    // Wrapping alone would swallow the interrupt; restore the flag so code
    // further up the stack can still observe that the thread was interrupted.
    Thread.currentThread().interrupt();
    throw new IOException(ie);
  }
}
/**
 * Get status information about the Map-Reduce cluster, including the names of
 * the active and blacklisted task trackers.
 *
 * <p>The lookup runs as {@code clientUgi} via {@code doAs}. Tracker lists come
 * from {@code cluster.getActiveTaskTrackers()} and
 * {@code cluster.getBlackListedTaskTrackers()}; slot figures come from the
 * {@link ClusterMetrics} returned by {@code cluster.getClusterStatus()}.
 *
 * @param detailed if true then get a detailed status including the
 *        tracker names. NOTE(review): the method body never reads this flag,
 *        so the detailed form is built for both {@code true} and {@code false}.
 * @return the status information about the Map-Reduce cluster as an object
 *         of {@link ClusterStatus}.
 * @throws IOException if one of the underlying cluster calls throws it, or if
 *         the call is interrupted; in the latter case the thread's interrupt
 *         flag is restored and the {@link InterruptedException} is the cause
 */
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
  try {
    return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
      public ClusterStatus run() throws IOException, InterruptedException {
        ClusterMetrics metrics = cluster.getClusterStatus();
        return new ClusterStatus(
            arrayToStringList(cluster.getActiveTaskTrackers()),
            arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
            cluster.getTaskTrackerExpiryInterval(),
            metrics.getOccupiedMapSlots(),
            metrics.getOccupiedReduceSlots(),
            metrics.getMapSlotCapacity(),
            metrics.getReduceSlotCapacity(),
            cluster.getJobTrackerStatus());
      }
    });
  } catch (InterruptedException ie) {
    // Restore the interrupt flag before translating the exception so the
    // interruption is not lost to callers that only see an IOException.
    Thread.currentThread().interrupt();
    throw new IOException(ie);
  }
}
/**
 * Choose a Machine in runtime according to the cluster status.
 *
 * <p>Polls {@code cluster.getClusterStatus()} and computes the free map slots
 * ({@code m}) and free reduce slots ({@code r}) as capacity minus occupied.
 * As soon as either is at least the {@code N_PARTS} value read from
 * {@code conf}, a machine is returned: {@code ReduceSide.INSTANCE} when the
 * free reduce slots suffice, otherwise {@code MapSide.INSTANCE}. Between
 * unsuccessful polls the thread sleeps for 2000 ms.
 *
 * @param conf configuration from which {@code N_PARTS} is read; the fallback
 *        passed to {@code getInt} is {@link Integer#MAX_VALUE}
 * @return the chosen machine
 * @throws IOException if the cluster status call throws it, or if the thread
 *         is interrupted while sleeping; in the latter case the interrupt flag
 *         is restored and the {@link InterruptedException} is the cause
 */
private Machine chooseMachine(Configuration conf) throws IOException {
  final int parts = conf.getInt(N_PARTS, Integer.MAX_VALUE);
  try {
    while (true) {
      //get cluster status
      final ClusterMetrics status = cluster.getClusterStatus();
      final int m =
          status.getMapSlotCapacity() - status.getOccupiedMapSlots();
      final int r =
          status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
      if (m >= parts || r >= parts) {
        //favor ReduceSide machine
        final Machine value = r >= parts?
            ReduceSide.INSTANCE: MapSide.INSTANCE;
        Util.out.println(" " + this + " is " + value + " (m=" + m + ", r=" + r + ")");
        return value;
      }
      // not enough free slots yet; wait before polling again
      Thread.sleep(2000);
    }
  } catch (InterruptedException e) {
    // Keep the interruption visible to callers instead of silently
    // converting it into a plain IOException.
    Thread.currentThread().interrupt();
    throw new IOException(e);
  }
}
/**
 * Get status information about the Map-Reduce cluster.
 *
 * <p>Executed as {@code clientUgi} through {@code doAs}. Tracker counts and
 * slot usage/capacity are taken from the {@link ClusterMetrics} returned by
 * {@code cluster.getClusterStatus()}; the task tracker expiry interval and job
 * tracker status are read from {@code cluster} directly.
 *
 * @return the status information about the Map-Reduce cluster as an object
 *         of {@link ClusterStatus}.
 * @throws IOException if an underlying cluster call throws it, or wrapping an
 *         {@link InterruptedException} if the call is interrupted (the
 *         thread's interrupt flag is set again before throwing)
 */
public ClusterStatus getClusterStatus() throws IOException {
  final PrivilegedExceptionAction<ClusterStatus> fetchStatus =
      new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(),
              metrics.getBlackListedTaskTrackerCount(),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(),
              metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount(),
              metrics.getGrayListedTaskTrackerCount());
        }
      };
  try {
    return clientUgi.doAs(fetchStatus);
  } catch (InterruptedException ie) {
    // Catching InterruptedException clears the interrupted state; set it
    // again so the interruption is not lost behind the IOException.
    Thread.currentThread().interrupt();
    throw new IOException(ie);
  }
}
/**
 * Get status information about the Map-Reduce cluster, listing active and
 * blacklisted task trackers rather than only their counts.
 *
 * <p>Executed as {@code clientUgi} through {@code doAs}. The tracker arrays
 * from {@code cluster} are converted with {@code arrayToStringList} and
 * {@code arrayToBlackListInfo}; slot usage and capacity come from the
 * {@link ClusterMetrics} returned by {@code cluster.getClusterStatus()}.
 *
 * @param detailed if true then get a detailed status including the
 *        tracker names. NOTE(review): this argument is not used anywhere in
 *        the body; the same detailed status is returned regardless of its value.
 * @return the status information about the Map-Reduce cluster as an object
 *         of {@link ClusterStatus}.
 * @throws IOException if an underlying cluster call throws it, or wrapping an
 *         {@link InterruptedException} if the call is interrupted (the
 *         thread's interrupt flag is set again before throwing)
 */
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
  final PrivilegedExceptionAction<ClusterStatus> fetchStatus =
      new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(),
              metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      };
  try {
    return clientUgi.doAs(fetchStatus);
  } catch (InterruptedException ie) {
    // Set the interrupt flag again: the catch cleared it, and callers only
    // see an IOException from here on.
    Thread.currentThread().interrupt();
    throw new IOException(ie);
  }
}
/**
 * Choose a Machine in runtime according to the cluster status.
 *
 * <p>Repeatedly reads {@code cluster.getClusterStatus()} and derives the free
 * map slots ({@code m}) and free reduce slots ({@code r}) as capacity minus
 * occupied. The loop ends once either count reaches the {@code N_PARTS} value
 * from {@code conf}; {@code ReduceSide.INSTANCE} is preferred whenever the free
 * reduce slots are sufficient, otherwise {@code MapSide.INSTANCE} is used.
 * The thread sleeps 2000 ms between attempts.
 *
 * @param conf configuration supplying {@code N_PARTS}; {@code getInt} is
 *        called with {@link Integer#MAX_VALUE} as its second argument
 * @return the chosen machine
 * @throws IOException if the cluster status call throws it, or wrapping an
 *         {@link InterruptedException} if the sleep is interrupted (the
 *         thread's interrupt flag is set again before throwing)
 */
private Machine chooseMachine(Configuration conf) throws IOException {
  final int parts = conf.getInt(N_PARTS, Integer.MAX_VALUE);
  try {
    for(;; Thread.sleep(2000)) {
      //get cluster status
      final ClusterMetrics status = cluster.getClusterStatus();
      final int m =
          status.getMapSlotCapacity() - status.getOccupiedMapSlots();
      final int r =
          status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
      if (m >= parts || r >= parts) {
        //favor ReduceSide machine
        final Machine value = r >= parts?
            ReduceSide.INSTANCE: MapSide.INSTANCE;
        Util.out.println(" " + this + " is " + value + " (m=" + m + ", r=" + r + ")");
        return value;
      }
    }
  } catch (InterruptedException e) {
    // Thread.sleep cleared the interrupted state when it threw; set it again
    // so the caller can still tell that the thread was interrupted.
    Thread.currentThread().interrupt();
    throw new IOException(e);
  }
}
/**
 * Builds a {@link ClusterMetrics} from the YARN cluster metrics reported by
 * {@code client}.
 *
 * <p>Only the node manager count is taken from YARN: it is passed as-is, and
 * scaled by 10 and by 2, for three of the constructor arguments. All other
 * arguments are the fixed placeholders 1 or 0.
 * NOTE(review): which argument position means what is defined by the
 * {@code ClusterMetrics} constructor, which is not visible here.
 *
 * @return the synthesized cluster metrics
 * @throws IOException wrapping any {@code YarnException} raised by the client
 * @throws InterruptedException declared by the signature
 */
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  final YarnClusterMetrics yarnMetrics;
  try {
    yarnMetrics = client.getYarnClusterMetrics();
  } catch (YarnException e) {
    throw new IOException(e);
  }
  return new ClusterMetrics(1, 1, 1, 1, 1, 1,
      yarnMetrics.getNumNodeManagers() * 10,
      yarnMetrics.getNumNodeManagers() * 2,
      1,
      yarnMetrics.getNumNodeManagers(),
      0, 0);
}
/**
 * Translates the YARN cluster metrics obtained from {@code client} into a
 * {@link ClusterMetrics} instance.
 *
 * <p>The node manager count is the only live value used (once unscaled, once
 * multiplied by 10 and once multiplied by 2); the remaining constructor
 * arguments are the constants 1 or 0.
 *
 * @return the synthesized cluster metrics
 * @throws IOException wrapping any {@code YarnException} raised while
 *         fetching the YARN metrics
 * @throws InterruptedException declared by the signature
 */
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  try {
    final YarnClusterMetrics nodeStats = client.getYarnClusterMetrics();
    return new ClusterMetrics(
        1, 1, 1, 1, 1, 1,
        nodeStats.getNumNodeManagers() * 10,
        nodeStats.getNumNodeManagers() * 2,
        1,
        nodeStats.getNumNodeManagers(),
        0, 0);
  } catch (YarnException yarnFailure) {
    throw new IOException(yarnFailure);
  }
}
/**
 * Takes a snapshot of the current cluster counters as a
 * {@link ClusterMetrics} object while holding this object's monitor.
 *
 * <p>The non-blacklisted tracker count is computed as the size of
 * {@code taskTrackers} minus the blacklisted tracker count; the excluded node
 * count is the size of {@code getExcludedNodes()}.
 *
 * @return a new metrics object built from the current field values
 */
public synchronized ClusterMetrics getClusterMetrics() {
  final int nonBlacklistedTrackers =
      taskTrackers.size() - getBlacklistedTrackerCount();
  final int blacklistedTrackers = getBlacklistedTrackerCount();
  final int excludedNodeCount = getExcludedNodes().size();
  return new ClusterMetrics(
      totalMaps, totalReduces,
      occupiedMapSlots, occupiedReduceSlots,
      reservedMapSlots, reservedReduceSlots,
      totalMapTaskCapacity, totalReduceTaskCapacity,
      totalSubmissions,
      nonBlacklistedTrackers, blacklistedTrackers, excludedNodeCount);
}
/**
 * Builds a {@link ClusterMetrics} from the YARN cluster metrics reported by
 * {@code client}.
 *
 * <p>The node manager count feeds three constructor arguments (multiplied by
 * 10, multiplied by 2, and unchanged); every other argument is a fixed 1 or 0.
 *
 * @return the synthesized cluster metrics
 * @throws IOException wrapping any {@code YarnException} raised by the client
 * @throws InterruptedException declared by the signature
 */
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  try {
    final YarnClusterMetrics yarnMetrics = client.getYarnClusterMetrics();
    return new ClusterMetrics(1, 1, 1, 1, 1, 1,
        yarnMetrics.getNumNodeManagers() * 10,
        yarnMetrics.getNumNodeManagers() * 2,
        1,
        yarnMetrics.getNumNodeManagers(),
        0, 0);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
/**
 * Translates the YARN cluster metrics obtained from {@code client} into a
 * {@link ClusterMetrics} instance.
 *
 * <p>Apart from the node manager count (used unscaled, times 10 and times 2),
 * all constructor arguments are the constants 1 or 0.
 *
 * @return the synthesized cluster metrics
 * @throws IOException wrapping any {@code YarnException} raised while
 *         fetching the YARN metrics
 * @throws InterruptedException declared by the signature
 */
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  final YarnClusterMetrics nodeStats;
  try {
    nodeStats = client.getYarnClusterMetrics();
  } catch (YarnException yarnFailure) {
    throw new IOException(yarnFailure);
  }
  return new ClusterMetrics(
      1, 1, 1, 1, 1, 1,
      nodeStats.getNumNodeManagers() * 10,
      nodeStats.getNumNodeManagers() * 2,
      1,
      nodeStats.getNumNodeManagers(),
      0, 0);
}
/**
 * Returns the cluster metrics by delegating to {@code resMgrDelegate}.
 *
 * @return the value returned by {@code resMgrDelegate.getClusterMetrics()}
 * @throws IOException if the delegate call throws it
 * @throws InterruptedException if the delegate call throws it
 */
@Override
public ClusterMetrics getClusterMetrics() throws IOException,
InterruptedException {
return resMgrDelegate.getClusterMetrics();
}
/**
 * Builds a {@link ClusterMetrics} from the current {@code map_tasks} and
 * {@code reduce_tasks} counters and the size of {@code jobs}.
 *
 * <p>Each counter is read once and passed twice (first/third and
 * second/fourth constructor argument); the remaining arguments are constants.
 *
 * @return a new metrics object reflecting the counters at call time
 */
public ClusterMetrics getClusterMetrics() {
  final int mapCount = map_tasks.get();
  final int reduceCount = reduce_tasks.get();
  final int jobCount = jobs.size();
  return new ClusterMetrics(
      mapCount, reduceCount,
      mapCount, reduceCount,
      0, 0,
      1, 1,
      jobCount,
      1, 0, 0);
}
/**
 * Forwards the request to {@code resMgrDelegate} and returns its result
 * unchanged.
 *
 * @return the value returned by {@code resMgrDelegate.getClusterMetrics()}
 * @throws IOException if the delegate call throws it
 * @throws InterruptedException if the delegate call throws it
 */
@Override
public ClusterMetrics getClusterMetrics() throws IOException,
InterruptedException {
return resMgrDelegate.getClusterMetrics();
}
/**
 * Reports metrics derived from the {@code map_tasks} and {@code reduce_tasks}
 * counters and the number of entries in {@code jobs}.
 *
 * <p>The map counter is used for the first and third constructor arguments,
 * the reduce counter for the second and fourth; all other arguments except
 * the job count are constants.
 *
 * @return a new metrics object reflecting the counters at call time
 */
public ClusterMetrics getClusterMetrics() {
  final int currentMaps = map_tasks.get();
  final int currentReduces = reduce_tasks.get();
  return new ClusterMetrics(
      currentMaps,
      currentReduces,
      currentMaps,
      currentReduces,
      0, 0, 1, 1,
      jobs.size(),
      1, 0, 0);
}
/** {@inheritDoc} */
@Override
public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
  // Fixed placeholder figures: nothing is read from any cluster state.
  final ClusterMetrics fixedMetrics =
      new ClusterMetrics(
          0, 0, 0, 0, 0, 0,
          1000, 1000,
          1,
          100,
          0, 0);
  return fixedMetrics;
}
/**
 * Verifies that slot and task counts reported through
 * {@code jobTracker.getClusterMetrics()} and {@code client.getClusterStatus()}
 * follow the task statuses sent via heartbeats.
 *
 * <p>Steps: report one running map attempt (2 slots) and one running reduce
 * attempt (1 slot) and check occupied slots, capacities and running task
 * counts; then report the map as succeeded and check that the occupied map
 * slots drop to 0 while the reduce slot stays occupied; finally report the
 * reduce as succeeded and check that the occupied reduce slots drop to 0.
 *
 * @throws IOException if a cluster call throws it
 * @throws InterruptedException if a cluster call throws it
 */
public void testClusterMetrics() throws IOException, InterruptedException {
  assertEquals("tasktracker count doesn't match", trackers.length,
      client.getClusterStatus().getTaskTrackers());

  List<TaskStatus> list = new ArrayList<TaskStatus>();

  // create a map task status, which uses 2 slots.
  int mapSlotsPerTask = 2;
  addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.RUNNING);

  // create a reduce task status, which uses 1 slot.
  int reduceSlotsPerTask = 1;
  addReduceTaskAttemptToList(list,
      reduceSlotsPerTask, TaskStatus.State.RUNNING);

  // create TaskTrackerStatus and send heartbeats
  sendHeartbeats(list);

  // assert ClusterMetrics
  ClusterMetrics metrics = jobTracker.getClusterMetrics();
  assertEquals("occupied map slots do not match", mapSlotsPerTask,
      metrics.getOccupiedMapSlots());
  assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
      metrics.getOccupiedReduceSlots());
  assertEquals("map slot capacities do not match",
      mapSlotsPerTracker * trackers.length,
      metrics.getMapSlotCapacity());
  assertEquals("reduce slot capacities do not match",
      reduceSlotsPerTracker * trackers.length,
      metrics.getReduceSlotCapacity());
  assertEquals("running map tasks do not match", 1,
      metrics.getRunningMaps());
  assertEquals("running reduce tasks do not match", 1,
      metrics.getRunningReduces());

  // assert the values in ClusterStatus also
  ClusterStatus stat = client.getClusterStatus();
  assertEquals("running map tasks do not match", 1,
      stat.getMapTasks());
  assertEquals("running reduce tasks do not match", 1,
      stat.getReduceTasks());
  assertEquals("map slot capacities do not match",
      mapSlotsPerTracker * trackers.length,
      stat.getMaxMapTasks());
  assertEquals("reduce slot capacities do not match",
      reduceSlotsPerTracker * trackers.length,
      stat.getMaxReduceTasks());

  // send a heartbeat finishing only a map and check
  // counts are updated.
  list.clear();
  addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.SUCCEEDED);
  addReduceTaskAttemptToList(list,
      reduceSlotsPerTask, TaskStatus.State.RUNNING);
  sendHeartbeats(list);
  metrics = jobTracker.getClusterMetrics();
  // These checks carry messages like the earlier ones, so a failure report
  // says which counter was wrong.
  assertEquals("occupied map slots do not match", 0,
      metrics.getOccupiedMapSlots());
  assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
      metrics.getOccupiedReduceSlots());

  // send a heartbeat finishing the reduce task also.
  list.clear();
  addReduceTaskAttemptToList(list,
      reduceSlotsPerTask, TaskStatus.State.SUCCEEDED);
  sendHeartbeats(list);
  metrics = jobTracker.getClusterMetrics();
  assertEquals("occupied reduce slots do not match", 0,
      metrics.getOccupiedReduceSlots());
}
/**
 * Checks that the reserved map/reduce slot counts in
 * {@code jobTracker.getClusterMetrics()} track reservations made across
 * heartbeats from the first two trackers.
 *
 * <p>Expected counts: 2/2 after the first round of heartbeats, 4/4 after a
 * second round, and 0/0 after a round sent once
 * {@code scheduler.setUnreserveSlots(true)} has been called.
 *
 * @throws IOException if a job tracker call throws it
 */
public void testReservedSlots() throws IOException {
  JobConf jobConf = mr.createJobConf();
  jobConf.setNumReduceTasks(1);
  jobConf.setSpeculativeExecution(false);

  // Set task tracker objects for reservation.
  TaskTracker firstTracker = jobTracker.getTaskTracker(trackers[0]);
  TaskTracker secondTracker = jobTracker.getTaskTracker(trackers[1]);
  TaskTrackerStatus firstStatus = new TaskTrackerStatus(
      trackers[0],
      JobInProgress.convertTrackerNameToHostName(trackers[0]),
      0, new ArrayList<TaskStatus>(), 0, 2, 2);
  TaskTrackerStatus secondStatus = new TaskTrackerStatus(
      trackers[1],
      JobInProgress.convertTrackerNameToHostName(trackers[1]),
      0, new ArrayList<TaskStatus>(), 0, 2, 2);
  firstTracker.setStatus(firstStatus);
  secondTracker.setStatus(secondStatus);

  fakeJob = new FakeJobInProgress(
      new JobID("jt", 1), new JobConf(jobConf), jobTracker);

  // Round 1: both trackers heartbeat under the same response id.
  sendHeartBeat(jobTracker, firstStatus, false, true, trackers[0], responseId);
  sendHeartBeat(jobTracker, secondStatus, false, true, trackers[1], responseId);
  responseId++;
  ClusterMetrics clusterMetrics = jobTracker.getClusterMetrics();
  assertEquals("reserved map slots do not match",
      2, clusterMetrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match",
      2, clusterMetrics.getReservedReduceSlots());

  // Round 2: redo to test re-reservations.
  sendHeartBeat(jobTracker, firstStatus, false, true, trackers[0], responseId);
  sendHeartBeat(jobTracker, secondStatus, false, true, trackers[1], responseId);
  responseId++;
  clusterMetrics = jobTracker.getClusterMetrics();
  assertEquals("reserved map slots do not match",
      4, clusterMetrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match",
      4, clusterMetrics.getReservedReduceSlots());

  // Round 3: undo reservations now.
  scheduler.setUnreserveSlots(true);
  sendHeartBeat(jobTracker, firstStatus, false, true, trackers[0], responseId);
  sendHeartBeat(jobTracker, secondStatus, false, true, trackers[1], responseId);
  responseId++;
  clusterMetrics = jobTracker.getClusterMetrics();
  assertEquals("map slots should have been unreserved",
      0, clusterMetrics.getReservedMapSlots());
  assertEquals("reduce slots should have been unreserved",
      0, clusterMetrics.getReservedReduceSlots());
}
/**
 * Not supported by this implementation: the body unconditionally throws.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 * @throws IOException declared by the signature; not thrown by this body
 * @throws InterruptedException declared by the signature; not thrown by this body
 */
@Override
public ClusterMetrics getClusterMetrics() throws IOException,
InterruptedException {
throw new UnsupportedOperationException();
}
/**
 * Obtains the cluster metrics from {@code resMgrDelegate}; no additional
 * processing is done here.
 *
 * @return the value returned by {@code resMgrDelegate.getClusterMetrics()}
 * @throws IOException if the delegate call throws it
 * @throws InterruptedException if the delegate call throws it
 */
@Override
public ClusterMetrics getClusterMetrics() throws IOException,
InterruptedException {
return resMgrDelegate.getClusterMetrics();
}
/**
 * Pass-through to {@code resMgrDelegate.getClusterMetrics()}.
 *
 * @return the metrics object produced by the delegate
 * @throws IOException if the delegate call throws it
 * @throws InterruptedException if the delegate call throws it
 */
@Override
public ClusterMetrics getClusterMetrics() throws IOException,
InterruptedException {
return resMgrDelegate.getClusterMetrics();
}
/**
 * Get the current status of the cluster
 *
 * @return summary of the state of the cluster
 * @throws IOException declared for implementations; the conditions are
 *         implementation-specific
 * @throws InterruptedException declared for implementations; the conditions
 *         are implementation-specific
 */
public ClusterMetrics getClusterMetrics()
throws IOException, InterruptedException;
/**
 * Get the current status of the cluster
 *
 * @return summary of the state of the cluster
 * @throws IOException may be thrown by implementations; this declaration
 *         does not define when
 * @throws InterruptedException may be thrown by implementations; this
 *         declaration does not define when
 */
public ClusterMetrics getClusterMetrics()
throws IOException, InterruptedException;