Source code examples for the class org.apache.hadoop.mapreduce.ClusterMetrics

The following examples show how the org.apache.hadoop.mapreduce.ClusterMetrics API is used in real projects; each snippet names the project and file it was taken from, and the full source can be found in the corresponding GitHub repository.
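Before the examples, here is a minimal, self-contained sketch of the client-side pattern that most of the snippets below follow: obtain a ClusterMetrics snapshot through org.apache.hadoop.mapreduce.Cluster#getClusterStatus() and read the slot counters from it. This sketch is not taken from any of the projects below; the class name ClusterMetricsDemo is invented for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;

public class ClusterMetricsDemo {
  public static void main(String[] args) throws IOException, InterruptedException {
    // Connect to the cluster described by the local Hadoop configuration files.
    Cluster cluster = new Cluster(new Configuration());
    try {
      // Cluster#getClusterStatus() returns a point-in-time ClusterMetrics snapshot.
      ClusterMetrics metrics = cluster.getClusterStatus();
      int freeMapSlots = metrics.getMapSlotCapacity() - metrics.getOccupiedMapSlots();
      int freeReduceSlots = metrics.getReduceSlotCapacity() - metrics.getOccupiedReduceSlots();
      System.out.println("task trackers: " + metrics.getTaskTrackerCount()
          + ", free map slots: " + freeMapSlots
          + ", free reduce slots: " + freeReduceSlots);
    } finally {
      cluster.close();
    }
  }
}

The per-project examples below use the same accessors, either on the client side (JobClient, DistSum) or to build the metrics object on the server side (JobTracker, LocalJobRunner, ResourceMgrDelegate).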

Example 1  Project: hadoop  File: JobClient.java
/**
 * Get status information about the Map-Reduce cluster.
 *  
 * @return the status information about the Map-Reduce cluster as an object
 *         of {@link ClusterStatus}.
 * @throws IOException
 */
public ClusterStatus getClusterStatus() throws IOException {
  try {
    return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
      public ClusterStatus run() throws IOException, InterruptedException {
        ClusterMetrics metrics = cluster.getClusterStatus();
        return new ClusterStatus(metrics.getTaskTrackerCount(), metrics
          .getBlackListedTaskTrackerCount(), cluster
          .getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
          metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
          metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(),
          metrics.getDecommissionedTaskTrackerCount(), metrics
            .getGrayListedTaskTrackerCount());
      }
    });
  } catch (InterruptedException ie) {
    throw new IOException(ie);
  }
}
 
Example 2  Project: hadoop  File: JobClient.java
/**
 * Get status information about the Map-Reduce cluster.
 *  
 * @param  detailed if true then get a detailed status including the
 *         tracker names
 * @return the status information about the Map-Reduce cluster as an object
 *         of {@link ClusterStatus}.
 * @throws IOException
 */
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
  try {
    return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
      public ClusterStatus run() throws IOException, InterruptedException {
        ClusterMetrics metrics = cluster.getClusterStatus();
        return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
          arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
          cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
          metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
          metrics.getReduceSlotCapacity(),
          cluster.getJobTrackerStatus());
      }
    });
  } catch (InterruptedException ie) {
    throw new IOException(ie);
  }
}
 
Example 3  Project: hadoop  File: DistSum.java
/**
 * Choose a Machine in runtime according to the cluster status.
 */
private Machine chooseMachine(Configuration conf) throws IOException {
  final int parts = conf.getInt(N_PARTS, Integer.MAX_VALUE);
  try {
    for(;; Thread.sleep(2000)) {
      //get cluster status
      final ClusterMetrics status = cluster.getClusterStatus();
      final int m = 
        status.getMapSlotCapacity() - status.getOccupiedMapSlots();
      final int r = 
        status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
      if (m >= parts || r >= parts) {
        //favor ReduceSide machine
        final Machine value = r >= parts?
            ReduceSide.INSTANCE: MapSide.INSTANCE;
        Util.out.println("  " + this + " is " + value + " (m=" + m + ", r=" + r + ")");
        return value;
      }
    }
  } catch (InterruptedException e) {
    throw new IOException(e);
  }    
}
 
Example 4  Project: big-c  File: JobClient.java
/**
 * Get status information about the Map-Reduce cluster.
 *  
 * @return the status information about the Map-Reduce cluster as an object
 *         of {@link ClusterStatus}.
 * @throws IOException
 */
public ClusterStatus getClusterStatus() throws IOException {
  try {
    return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
      public ClusterStatus run() throws IOException, InterruptedException {
        ClusterMetrics metrics = cluster.getClusterStatus();
        return new ClusterStatus(metrics.getTaskTrackerCount(), metrics
          .getBlackListedTaskTrackerCount(), cluster
          .getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
          metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
          metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(),
          metrics.getDecommissionedTaskTrackerCount(), metrics
            .getGrayListedTaskTrackerCount());
      }
    });
  } catch (InterruptedException ie) {
    throw new IOException(ie);
  }
}
 
Example 5  Project: big-c  File: JobClient.java
/**
 * Get status information about the Map-Reduce cluster.
 *  
 * @param  detailed if true then get a detailed status including the
 *         tracker names
 * @return the status information about the Map-Reduce cluster as an object
 *         of {@link ClusterStatus}.
 * @throws IOException
 */
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
  try {
    return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
      public ClusterStatus run() throws IOException, InterruptedException {
        ClusterMetrics metrics = cluster.getClusterStatus();
        return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
          arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
          cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
          metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
          metrics.getReduceSlotCapacity(),
          cluster.getJobTrackerStatus());
      }
    });
  } catch (InterruptedException ie) {
    throw new IOException(ie);
  }
}
 
Example 6  Project: big-c  File: DistSum.java
/**
 * Choose a Machine in runtime according to the cluster status.
 */
private Machine chooseMachine(Configuration conf) throws IOException {
  final int parts = conf.getInt(N_PARTS, Integer.MAX_VALUE);
  try {
    for(;; Thread.sleep(2000)) {
      //get cluster status
      final ClusterMetrics status = cluster.getClusterStatus();
      final int m = 
        status.getMapSlotCapacity() - status.getOccupiedMapSlots();
      final int r = 
        status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
      if (m >= parts || r >= parts) {
        //favor ReduceSide machine
        final Machine value = r >= parts?
            ReduceSide.INSTANCE: MapSide.INSTANCE;
        Util.out.println("  " + this + " is " + value + " (m=" + m + ", r=" + r + ")");
        return value;
      }
    }
  } catch (InterruptedException e) {
    throw new IOException(e);
  }    
}
 
Example 7  Project: hadoop  File: ResourceMgrDelegate.java
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  try {
    YarnClusterMetrics metrics = client.getYarnClusterMetrics();
    ClusterMetrics oldMetrics =
        new ClusterMetrics(1, 1, 1, 1, 1, 1,
            metrics.getNumNodeManagers() * 10,
            metrics.getNumNodeManagers() * 2, 1,
            metrics.getNumNodeManagers(), 0, 0);
    return oldMetrics;
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
 
Example 8  Project: big-c  File: ResourceMgrDelegate.java
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  try {
    YarnClusterMetrics metrics = client.getYarnClusterMetrics();
    ClusterMetrics oldMetrics =
        new ClusterMetrics(1, 1, 1, 1, 1, 1,
            metrics.getNumNodeManagers() * 10,
            metrics.getNumNodeManagers() * 2, 1,
            metrics.getNumNodeManagers(), 0, 0);
    return oldMetrics;
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
 
Example 9  Project: RDFS  File: JobTracker.java
public synchronized ClusterMetrics getClusterMetrics() {
  return new ClusterMetrics(totalMaps,
    totalReduces, occupiedMapSlots, occupiedReduceSlots,
    reservedMapSlots, reservedReduceSlots,
    totalMapTaskCapacity, totalReduceTaskCapacity,
    totalSubmissions,
    taskTrackers.size() - getBlacklistedTrackerCount(),
    getBlacklistedTrackerCount(), getExcludedNodes().size());
}
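Judging from the call site in Example 9 above and the LocalJobRunner and ignite examples further down, the 12-argument ClusterMetrics constructor appears to take, in order: running maps, running reduces, occupied map slots, occupied reduce slots, reserved map slots, reserved reduce slots, map slot capacity, reduce slot capacity, total job submissions, tracker count, blacklisted trackers, and decommissioned trackers. The small sketch below (not from any of the listed projects; all values hypothetical) pairs a few of those positions with the getters used elsewhere on this page; treat the parameter order as an inference from these call sites rather than official documentation.

import org.apache.hadoop.mapreduce.ClusterMetrics;

public class ClusterMetricsConstructorSketch {
  public static void main(String[] args) {
    // Hypothetical values; argument order inferred from the call sites on this page:
    // runningMaps, runningReduces, occupiedMapSlots, occupiedReduceSlots,
    // reservedMapSlots, reservedReduceSlots, mapSlotCapacity, reduceSlotCapacity,
    // totalJobSubmissions, trackerCount, blacklistedTrackers, decommissionedTrackers
    ClusterMetrics metrics = new ClusterMetrics(3, 1, 6, 1, 0, 0, 20, 10, 42, 5, 0, 0);

    // These getters are the ones used in the surrounding examples.
    System.out.println("running maps       = " + metrics.getRunningMaps());       // 3
    System.out.println("occupied map slots = " + metrics.getOccupiedMapSlots());  // 6
    System.out.println("map slot capacity  = " + metrics.getMapSlotCapacity());   // 20
    System.out.println("reserved map slots = " + metrics.getReservedMapSlots());  // 0
    System.out.println("task tracker count = " + metrics.getTaskTrackerCount());  // 5
    System.out.println("blacklisted        = " + metrics.getBlackListedTaskTrackerCount()); // 0
  }
}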
 
Example 10  Project: incubator-tez  File: ResourceMgrDelegate.java
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  YarnClusterMetrics metrics;
  try {
    metrics = client.getYarnClusterMetrics();
  } catch (YarnException e) {
    throw new IOException(e);
  }
  ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, 
      metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
      metrics.getNumNodeManagers(), 0, 0);
  return oldMetrics;
}
 
Example 11  Project: tez  File: ResourceMgrDelegate.java
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  YarnClusterMetrics metrics;
  try {
    metrics = client.getYarnClusterMetrics();
  } catch (YarnException e) {
    throw new IOException(e);
  }
  ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, 
      metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
      metrics.getNumNodeManagers(), 0, 0);
  return oldMetrics;
}
 
Example 12  Project: hadoop  File: YARNRunner.java
@Override
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  return resMgrDelegate.getClusterMetrics();
}
 
Example 13  Project: hadoop  File: LocalJobRunner.java
public ClusterMetrics getClusterMetrics() {
  int numMapTasks = map_tasks.get();
  int numReduceTasks = reduce_tasks.get();
  return new ClusterMetrics(numMapTasks, numReduceTasks, numMapTasks,
      numReduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
}
 
Example 14  Project: big-c  File: YARNRunner.java
@Override
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  return resMgrDelegate.getClusterMetrics();
}
 
Example 15  Project: big-c  File: LocalJobRunner.java
public ClusterMetrics getClusterMetrics() {
  int numMapTasks = map_tasks.get();
  int numReduceTasks = reduce_tasks.get();
  return new ClusterMetrics(numMapTasks, numReduceTasks, numMapTasks,
      numReduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
}
 
Example 16  Project: ignite  File: HadoopClientProtocol.java
/** {@inheritDoc} */
@Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
    return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0);
}
 
Example 17  Project: RDFS  File: TestClusterStatus.java
public void testClusterMetrics() throws IOException, InterruptedException {
  assertEquals("tasktracker count doesn't match", trackers.length,
    client.getClusterStatus().getTaskTrackers());
  
  List<TaskStatus> list = new ArrayList<TaskStatus>();

  // create a map task status, which uses 2 slots. 
  int mapSlotsPerTask = 2;
  addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.RUNNING);
  
  // create a reduce task status, which uses 1 slot.
  int reduceSlotsPerTask = 1;
  addReduceTaskAttemptToList(list, 
      reduceSlotsPerTask, TaskStatus.State.RUNNING);
  
  // create TaskTrackerStatus and send heartbeats
  sendHeartbeats(list);

  // assert ClusterMetrics
  ClusterMetrics metrics = jobTracker.getClusterMetrics();
  assertEquals("occupied map slots do not match", mapSlotsPerTask,
    metrics.getOccupiedMapSlots());
  assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
    metrics.getOccupiedReduceSlots());
  assertEquals("map slot capacities do not match",
    mapSlotsPerTracker * trackers.length,
    metrics.getMapSlotCapacity());
  assertEquals("reduce slot capacities do not match",
    reduceSlotsPerTracker * trackers.length,
    metrics.getReduceSlotCapacity());
  assertEquals("running map tasks do not match", 1,
    metrics.getRunningMaps());
  assertEquals("running reduce tasks do not match", 1,
    metrics.getRunningReduces());
  
  // assert the values in ClusterStatus also
  ClusterStatus stat = client.getClusterStatus();
  assertEquals("running map tasks do not match", 1,
    stat.getMapTasks());
  assertEquals("running reduce tasks do not match", 1,
    stat.getReduceTasks());
  assertEquals("map slot capacities do not match",
    mapSlotsPerTracker * trackers.length,
    stat.getMaxMapTasks());
  assertEquals("reduce slot capacities do not match",
    reduceSlotsPerTracker * trackers.length,
    stat.getMaxReduceTasks());
  
  // send a heartbeat finishing only a map and check
  // counts are updated.
  list.clear();
  addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.SUCCEEDED);
  addReduceTaskAttemptToList(list, 
      reduceSlotsPerTask, TaskStatus.State.RUNNING);
  sendHeartbeats(list);
  metrics = jobTracker.getClusterMetrics();
  assertEquals(0, metrics.getOccupiedMapSlots());
  assertEquals(reduceSlotsPerTask, metrics.getOccupiedReduceSlots());
  
  // send a heartbeat finishing the reduce task also.
  list.clear();
  addReduceTaskAttemptToList(list, 
      reduceSlotsPerTask, TaskStatus.State.SUCCEEDED);
  sendHeartbeats(list);
  metrics = jobTracker.getClusterMetrics();
  assertEquals(0, metrics.getOccupiedReduceSlots());
}
 
Example 18  Project: RDFS  File: TestClusterStatus.java
public void testReservedSlots() throws IOException {
  JobConf conf = mr.createJobConf();

  conf.setNumReduceTasks(1);
  conf.setSpeculativeExecution(false);
  
  //Set task tracker objects for reservation.
  TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
  TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
  TaskTrackerStatus status1 = new TaskTrackerStatus(
      trackers[0],JobInProgress.convertTrackerNameToHostName(
          trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
  TaskTrackerStatus status2 = new TaskTrackerStatus(
      trackers[1],JobInProgress.convertTrackerNameToHostName(
          trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
  tt1.setStatus(status1);
  tt2.setStatus(status2);
  
  fakeJob = new FakeJobInProgress(new JobID("jt", 1), new JobConf(conf),
                  jobTracker);
  
  sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
  sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
  responseId++; 
  ClusterMetrics metrics = jobTracker.getClusterMetrics();
  assertEquals("reserved map slots do not match", 
    2, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match", 
    2, metrics.getReservedReduceSlots());

  // redo to test re-reservations.
  sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
  sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
  responseId++; 
  metrics = jobTracker.getClusterMetrics();
  assertEquals("reserved map slots do not match", 
      4, metrics.getReservedMapSlots());
  assertEquals("reserved reduce slots do not match", 
      4, metrics.getReservedReduceSlots());

  // undo reservations now.
  scheduler.setUnreserveSlots(true);
  sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
  sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
  responseId++;
  metrics = jobTracker.getClusterMetrics();
  assertEquals("map slots should have been unreserved",
      0, metrics.getReservedMapSlots());
  assertEquals("reduce slots should have been unreserved",
      0, metrics.getReservedReduceSlots());
}
 
Example 19  Project: RDFS  File: MockSimulatorJobTracker.java
@Override
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  throw new UnsupportedOperationException();
}
 
Example 20  Project: incubator-tez  File: YARNRunner.java
@Override
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  return resMgrDelegate.getClusterMetrics();
}
 
Example 21  Project: tez  File: YARNRunner.java
@Override
public ClusterMetrics getClusterMetrics() throws IOException,
    InterruptedException {
  return resMgrDelegate.getClusterMetrics();
}
 
Example 22  Project: hadoop  File: ClientProtocol.java
/**
 * Get the current status of the cluster
 * 
 * @return summary of the state of the cluster
 */
public ClusterMetrics getClusterMetrics() 
throws IOException, InterruptedException;
 
Example 23  Project: big-c  File: ClientProtocol.java
/**
 * Get the current status of the cluster
 * 
 * @return summary of the state of the cluster
 */
public ClusterMetrics getClusterMetrics() 
throws IOException, InterruptedException;
 