org.apache.hadoop.mapred.TaskAttemptContextImpl#org.apache.hadoop.yarn.api.records.Container源码实例Demo

下面列出了org.apache.hadoop.mapred.TaskAttemptContextImpl#org.apache.hadoop.yarn.api.records.Container 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: big-c   文件: TestAllocateResponse.java
@SuppressWarnings("deprecation")
@Test
public void testAllocateResponseWithoutIncDecContainers() {
  AllocateResponse r =
      AllocateResponse.newInstance(3, new ArrayList<ContainerStatus>(),
          new ArrayList<Container>(), new ArrayList<NodeReport>(), null,
          AMCommand.AM_RESYNC, 3, null, new ArrayList<NMToken>(), null, null);

  // serde
  AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto();
  r = new AllocateResponsePBImpl(p);

  // check value
  Assert.assertEquals(0, r.getIncreasedContainers().size());
  Assert.assertEquals(0, r.getDecreasedContainers().size());
}
 
源代码2 项目: hadoop   文件: TestAMRMClientAsync.java
@Test (timeout = 10000)
public void testAMRMClientAsyncShutDown() throws Exception {
  Configuration conf = new Configuration();
  TestCallbackHandler callbackHandler = new TestCallbackHandler();
  @SuppressWarnings("unchecked")
  AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);

  createAllocateResponse(new ArrayList<ContainerStatus>(),
    new ArrayList<Container>(), null);
  when(client.allocate(anyFloat())).thenThrow(
    new ApplicationAttemptNotFoundException("app not found, shut down"));

  AMRMClientAsync<ContainerRequest> asyncClient =
      AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
  asyncClient.init(conf);
  asyncClient.start();

  asyncClient.registerApplicationMaster("localhost", 1234, null);

  Thread.sleep(50);

  verify(client, times(1)).allocate(anyFloat());
  asyncClient.stop();
}
 
源代码3 项目: hadoop   文件: TestRMAppAttemptTransitions.java
@Test
public void testFinishingExpire() {
  Container amContainer = allocateApplicationAttempt();
  launchApplicationAttempt(amContainer);
  runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
  FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
  String trackingUrl = "mytrackingurl";
  String diagnostics = "Successful";
  unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
      diagnostics);
  applicationAttempt.handle(
      new RMAppAttemptEvent(
          applicationAttempt.getAppAttemptId(),
          RMAppAttemptEventType.EXPIRE));
  testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
      diagnostics, 0, false);
}
 
源代码4 项目: hadoop   文件: NMClientAsyncImpl.java
public void startContainerAsync(
    Container container, ContainerLaunchContext containerLaunchContext) {
  if (containers.putIfAbsent(container.getId(),
      new StatefulContainer(this, container.getId())) != null) {
    callbackHandler.onStartContainerError(container.getId(),
        RPCUtil.getRemoteException("Container " + container.getId() +
            " is already started or scheduled to start"));
  }
  try {
    events.put(new StartContainerEvent(container, containerLaunchContext));
  } catch (InterruptedException e) {
    LOG.warn("Exception when scheduling the event of starting Container " +
        container.getId());
    callbackHandler.onStartContainerError(container.getId(), e);
  }
}
 
源代码5 项目: incubator-tez   文件: YarnTaskSchedulerService.java
private boolean canAssignTaskToContainer(
    CookieContainerRequest cookieContainerRequest, Container container) {
  HeldContainer heldContainer = heldContainers.get(container.getId());
  if (heldContainer == null || heldContainer.isNew()) { // New container.
    return true;
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Trying to match task to a held container, "
          + " containerId=" + heldContainer.container.getId());
    }
    if (containerSignatureMatcher.isSuperSet(heldContainer
        .getFirstContainerSignature(), cookieContainerRequest.getCookie()
        .getContainerSignature())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Matched delayed container to task"
          + " containerId=" + heldContainer.container.getId());
      }
      return true;
    }
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Failed to match delayed container to task"
      + " containerId=" + heldContainer.container.getId());
  }
  return false;
}
 
源代码6 项目: big-c   文件: TestRMAppAttemptTransitions.java
@Test
public void testFinishingExpire() {
  Container amContainer = allocateApplicationAttempt();
  launchApplicationAttempt(amContainer);
  runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
  FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
  String trackingUrl = "mytrackingurl";
  String diagnostics = "Successful";
  unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
      diagnostics);
  applicationAttempt.handle(
      new RMAppAttemptEvent(
          applicationAttempt.getAppAttemptId(),
          RMAppAttemptEventType.EXPIRE));
  testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
      diagnostics, 0, false);
}
 
源代码7 项目: tez   文件: YarnTaskSchedulerService.java
/**
 * Tries assigning the list of specified containers. Optionally, release
 * containers or add them to the delayed container queue.
 *
 * The flags apply to all containers in the specified lists. So, separate
 * calls should be made based on the expected behaviour.
 *
 * @param containers
 *          The list of containers to be assigned. The list *may* be modified
 *          in place based on allocations and releases.
 * @return Assignments.
 */
private synchronized Map<CookieContainerRequest, Container>
    assignNewlyAllocatedContainers(Iterable<Container> containers) {

  boolean amInCompletionState = (getContext().getAMState() == AMState.COMPLETED);
  Map<CookieContainerRequest, Container> assignedContainers =
      new HashMap<CookieContainerRequest, Container>();

  if (!amInCompletionState) {
    assignNewContainersWithLocation(containers,
        NODE_LOCAL_ASSIGNER, assignedContainers);
    assignNewContainersWithLocation(containers,
        RACK_LOCAL_ASSIGNER, assignedContainers);
    assignNewContainersWithLocation(containers,
        NON_LOCAL_ASSIGNER, assignedContainers);
  }

  // Release any unassigned containers given by the RM
  if (containers.iterator().hasNext()) {
    LOG.info("Releasing newly assigned containers which could not be allocated");
  }
  releaseUnassignedContainers(containers);

  return assignedContainers;
}
 
源代码8 项目: incubator-tez   文件: YarnTaskSchedulerService.java
private void pushNewContainerToDelayed(List<Container> containers){
  long expireTime = -1;
  if (idleContainerTimeoutMin > 0) {
    long currentTime = System.currentTimeMillis();
    expireTime = currentTime + idleContainerTimeoutMin;
  }

  synchronized (delayedContainerManager) {
    for (Container container : containers) {
      if (heldContainers.put(container.getId(), new HeldContainer(container,
          -1, expireTime, null)) != null) {
        throw new TezUncheckedException("New container " + container.getId()
            + " is already held.");
      }
      long nextScheduleTime = delayedContainerManager.maxScheduleTimeSeen;
      if (delayedContainerManager.maxScheduleTimeSeen == -1) {
        nextScheduleTime = System.currentTimeMillis();
      }
      Resources.addTo(allocatedResources, container.getResource());
      delayedContainerManager.addDelayedContainer(container,
        nextScheduleTime + 1);
    }
  }
  delayedContainerManager.triggerScheduling(false);      
}
 
源代码9 项目: hadoop   文件: RMContainerAllocator.java
@SuppressWarnings("unchecked")
private void containerAssigned(Container allocated, 
                                ContainerRequest assigned) {
  // Update resource requests
  decContainerReq(assigned);

  // send the container-assigned event to task attempt
  eventHandler.handle(new TaskAttemptContainerAssignedEvent(
      assigned.attemptID, allocated, applicationACLs));

  assignedRequests.add(allocated, assigned.attemptID);

  if (LOG.isDebugEnabled()) {
    LOG.info("Assigned container (" + allocated + ") "
        + " to task " + assigned.attemptID + " on node "
        + allocated.getNodeId().toString());
  }
}
 
源代码10 项目: hadoop   文件: RMContainerAllocator.java
private ContainerRequest assignWithoutLocality(Container allocated) {
  ContainerRequest assigned = null;
  
  Priority priority = allocated.getPriority();
  if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
    LOG.info("Assigning container " + allocated + " to fast fail map");
    assigned = assignToFailedMap(allocated);
  } else if (PRIORITY_REDUCE.equals(priority)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Assigning container " + allocated + " to reduce");
    }
    assigned = assignToReduce(allocated);
  }
    
  return assigned;
}
 
源代码11 项目: Hi-WAY   文件: StaticScheduler.java
@Override
public TaskInstance getTask(Container container) {
	numberOfRemainingTasks--;
	numberOfRunningTasks++;
	String node = container.getNodeId().getHost();

	if (HiWayConfiguration.verbose)
		WorkflowDriver.writeToStdout("Looking for task on container " + container.getId() + " on node " + node + "; Queue:" + queues.get(node).toString());

	TaskInstance task = queues.get(node).remove();

	WorkflowDriver.writeToStdout("Assigned task " + task + " to container " + container.getId() + "@" + node);
	task.incTries();

	return task;
}
 
源代码12 项目: incubator-tez   文件: YarnTaskSchedulerService.java
public void doBookKeepingForAssignedContainer(
    CookieContainerRequest assigned, Container container,
    String matchedLocation, boolean honorLocalityFlags) {
  if (assigned == null) {
    return;
  }
  Object task = getTask(assigned);
  assert task != null;

  LOG.info("Assigning container to task"
    + ", container=" + container
    + ", task=" + task
    + ", containerHost=" + container.getNodeId().getHost()
    + ", localityMatchType=" + locality
    + ", matchedLocation=" + matchedLocation
    + ", honorLocalityFlags=" + honorLocalityFlags
    + ", reusedContainer="
    + containerAssignments.containsKey(container.getId())
    + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
    + ", containerResourceMemory=" + container.getResource().getMemory()
    + ", containerResourceVCores="
    + container.getResource().getVirtualCores());

  assignContainer(task, container, assigned);
}
 
源代码13 项目: big-c   文件: TestSystemMetricsPublisher.java
private static RMAppAttempt createRMAppAttempt(
    ApplicationAttemptId appAttemptId) {
  RMAppAttempt appAttempt = mock(RMAppAttempt.class);
  when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
  when(appAttempt.getHost()).thenReturn("test host");
  when(appAttempt.getRpcPort()).thenReturn(-100);
  Container container = mock(Container.class);
  when(container.getId())
      .thenReturn(ContainerId.newContainerId(appAttemptId, 1));
  when(appAttempt.getMasterContainer()).thenReturn(container);
  when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
  when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
  when(appAttempt.getOriginalTrackingUrl()).thenReturn(
      "test original tracking url");
  return appAttempt;
}
 
@Test
public void testDoesntCallGetContainersFromPreviousAttemptsMethodIfAbsent() {
	final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector =
		new RegisterApplicationMasterResponseReflector(LOG, HasMethod.class);

	final List<Container> containersFromPreviousAttemptsUnsafe =
		registerApplicationMasterResponseReflector.getContainersFromPreviousAttemptsUnsafe(new Object());

	assertThat(containersFromPreviousAttemptsUnsafe, empty());
}
 
源代码15 项目: flink   文件: YarnResourceManager.java
private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
	final List<Container> containersFromPreviousAttempts =
		registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);

	log.info("Recovered {} containers from previous attempts ({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);

	for (final Container container : containersFromPreviousAttempts) {
		workerNodeMap.put(new ResourceID(container.getId().toString()), new YarnWorkerNode(container));
	}
}
 
源代码16 项目: big-c   文件: TestRMAppAttemptTransitions.java
@Test
public void testUnregisterToSuccessfulFinishing() {
  Container amContainer = allocateApplicationAttempt();
  launchApplicationAttempt(amContainer);
  runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
  unregisterApplicationAttempt(amContainer,
      FinalApplicationStatus.SUCCEEDED, "mytrackingurl", "Successful");
}
 
源代码17 项目: hadoop   文件: TestContainerAllocation.java
@Test
public void testContainerTokenGeneratedOnPullRequest() throws Exception {
  MockRM rm1 = new MockRM(conf);
  rm1.start();
  MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
  RMApp app1 = rm1.submitApp(200);
  MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
  // request a container.
  am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
  ContainerId containerId2 =
      ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
  rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);

  RMContainer container =
      rm1.getResourceScheduler().getRMContainer(containerId2);
  // no container token is generated.
  Assert.assertEquals(containerId2, container.getContainerId());
  Assert.assertNull(container.getContainer().getContainerToken());

  // acquire the container.
  List<Container> containers =
      am1.allocate(new ArrayList<ResourceRequest>(),
        new ArrayList<ContainerId>()).getAllocatedContainers();
  Assert.assertEquals(containerId2, containers.get(0).getId());
  // container token is generated.
  Assert.assertNotNull(containers.get(0).getContainerToken());
  rm1.stop();
}
 
源代码18 项目: big-c   文件: TestSchedulerApplicationAttempt.java
private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id,
    Resource resource) {
  ContainerId containerId = ContainerId.newContainerId(appAttId, id);
  RMContainer rmContainer = mock(RMContainer.class);
  Container container = mock(Container.class);
  when(container.getResource()).thenReturn(resource);
  when(container.getNodeId()).thenReturn(nodeId);
  when(rmContainer.getContainer()).thenReturn(container);
  when(rmContainer.getContainerId()).thenReturn(containerId);
  return rmContainer;
}
 
源代码19 项目: hadoop   文件: SchedulerNode.java
/**
 * The Scheduler has allocated containers on this node to the given
 * application.
 * 
 * @param rmContainer
 *          allocated container
 */
public synchronized void allocateContainer(RMContainer rmContainer) {
  Container container = rmContainer.getContainer();
  deductAvailableResource(container.getResource());
  ++numContainers;

  launchedContainers.put(container.getId(), rmContainer);

  LOG.info("Assigned container " + container.getId() + " of capacity "
      + container.getResource() + " on host " + rmNode.getNodeAddress()
      + ", which has " + numContainers + " containers, "
      + getUsedResource() + " used and " + getAvailableResource()
      + " available after allocation");
}
 
源代码20 项目: hadoop   文件: ApplicationMaster.java
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
  LOG.info("Got response from RM for container ask, allocatedCnt="
      + allocatedContainers.size());
  numAllocatedContainers.addAndGet(allocatedContainers.size());
  for (Container allocatedContainer : allocatedContainers) {
    LOG.info("Launching shell command on a new container."
        + ", containerId=" + allocatedContainer.getId()
        + ", containerNode=" + allocatedContainer.getNodeId().getHost()
        + ":" + allocatedContainer.getNodeId().getPort()
        + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
        + ", containerResourceMemory"
        + allocatedContainer.getResource().getMemory()
        + ", containerResourceVirtualCores"
        + allocatedContainer.getResource().getVirtualCores()
        + ", containerResourceGpuCores"
        + allocatedContainer.getResource().getGpuCores());
    // + ", containerToken"
    // +allocatedContainer.getContainerToken().getIdentifier().toString());

    LaunchContainerRunnable runnableLaunchContainer =
        new LaunchContainerRunnable(allocatedContainer, containerListener);
    Thread launchThread = new Thread(runnableLaunchContainer);

    // launch and start the container on a separate thread to keep
    // the main thread unblocked
    // as all containers may not be allocated at one go.
    launchThreads.add(launchThread);
    launchThread.start();
  }
}
 
源代码21 项目: hadoop   文件: RMContainerAllocator.java
@Override
protected synchronized void heartbeat() throws Exception {
  scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
  List<Container> allocatedContainers = getResources();
  if (allocatedContainers != null && allocatedContainers.size() > 0) {
    scheduledRequests.assign(allocatedContainers);
  }

  int completedMaps = getJob().getCompletedMaps();
  int completedTasks = completedMaps + getJob().getCompletedReduces();
  if ((lastCompletedTasks != completedTasks) ||
        (scheduledRequests.maps.size() > 0)) {
    lastCompletedTasks = completedTasks;
    recalculateReduceSchedule = true;
  }

  if (recalculateReduceSchedule) {
    preemptReducesIfNeeded();
    scheduleReduces(
        getJob().getTotalMaps(), completedMaps,
        scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
        assignedRequests.maps.size(), assignedRequests.reduces.size(),
        mapResourceRequest, reduceResourceRequest,
        pendingReduces.size(), 
        maxReduceRampupLimit, reduceSlowStart);
    recalculateReduceSchedule = false;
  }

  scheduleStats.updateAndLogIfChanged("After Scheduling: ");
}
 
源代码22 项目: hadoop   文件: NMClientAsyncImpl.java
public StartContainerEvent(Container container,
    ContainerLaunchContext containerLaunchContext) {
  super(container.getId(), container.getNodeId(),
      container.getContainerToken(), ContainerEventType.START_CONTAINER);
  this.container = container;
  this.containerLaunchContext = containerLaunchContext;
}
 
源代码23 项目: big-c   文件: CapacityScheduler.java
@Lock(CapacityScheduler.class)
@Override
protected synchronized void completedContainer(RMContainer rmContainer,
    ContainerStatus containerStatus, RMContainerEventType event) {
  if (rmContainer == null) {
    LOG.info("Null container completed...");
    return;
  }
  
  Container container = rmContainer.getContainer();
  
  // Get the application for the finished container
  FiCaSchedulerApp application =
      getCurrentAttemptForContainer(container.getId());
  ApplicationId appId =
      container.getId().getApplicationAttemptId().getApplicationId();
  if (application == null) {
    LOG.info("Container " + container + " of" + " unknown application "
        + appId + " completed or suspended with event " + event);
    return;
  }
  
  // Get the node on which the container was allocated
  FiCaSchedulerNode node = getNode(container.getNodeId());
  
  // Inform the queue
  LeafQueue queue = (LeafQueue)application.getQueue();
  queue.completedContainer(clusterResource, application, node, 
      rmContainer, containerStatus, event, null, true);

  LOG.info("Application attempt " + application.getApplicationAttemptId()
      + " released container " + container.getId() + " on node: " + node
      + " with event: " + event);
}
 
源代码24 项目: tez   文件: YarnTaskSchedulerService.java
@VisibleForTesting
void addDelayedContainer(Container container,
    long nextScheduleTime) {
  HeldContainer delayedContainer = heldContainers.get(container.getId());
  if (delayedContainer == null) {
    LOG.warn("Attempting to add a non-running container to the"
        + " delayed container list, containerId=" + container.getId());
    return;
  } else {
    delayedContainer.setNextScheduleTime(nextScheduleTime);
  }
  if (maxScheduleTimeSeen < nextScheduleTime) {
    maxScheduleTimeSeen = nextScheduleTime;
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Adding container to delayed queue"
      + ", containerId=" + delayedContainer.getContainer().getId()
      + ", nextScheduleTime=" + delayedContainer.getNextScheduleTime()
      + ", containerExpiry=" + delayedContainer.getContainerExpiryTime());
  }
  boolean added =  false;
  synchronized(this) {
    added = delayedContainers.offer(delayedContainer);
    if (drainedDelayedContainersForTest != null) {
      synchronized (drainedDelayedContainersForTest) {
        drainedDelayedContainersForTest.set(false);
      }
    }
    this.notify();
  }
  if (!added) {
    releaseUnassignedContainers(Lists.newArrayList(container));
  }
}
 
源代码25 项目: TensorFlowOnYARN   文件: ApplicationMaster.java
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
  allocatedContainerNum.addAndGet(allocatedContainers.size());
  ApplicationMaster.this.allocatedContainers.addAll(allocatedContainers);
  if (allocatedContainerNum.get() == args.totalContainerNum) {
    startAllContainers();
  }
}
 
源代码26 项目: TensorFlowOnYARN   文件: LaunchContainerThread.java
public LaunchContainerThread(Container container, ApplicationMaster appMaster,
    TFTaskInfo taskInfo, ClusterSpec clusterSpec, long containerMemory,
    String tfLib, String tfJar) {
  this.container = container;
  this.appMaster = appMaster;
  this.taskInfo = taskInfo;
  this.clusterSpec = clusterSpec;
  this.containerMemory = containerMemory;
  this.tfLib = tfLib;
  this.tfJar = tfJar;
}
 
源代码27 项目: tez   文件: YarnTaskSchedulerService.java
private boolean canAssignTaskToContainer(
    CookieContainerRequest cookieContainerRequest, Container container) {
  HeldContainer heldContainer = heldContainers.get(container.getId());
  Object task = getTask(cookieContainerRequest);
  if (task instanceof TaskAttempt
      && ((TaskAttempt) task).getTask() != null
      && ((TaskAttempt) task).getTask().getNodesWithRunningAttempts().contains(container.getNodeId())) {
    return false;
  }
  if (heldContainer == null || heldContainer.isNew()) { // New container.
    return true;
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Trying to match task to a held container, "
          + " containerId=" + heldContainer.container.getId());
    }
    if (containerSignatureMatcher.isSuperSet(heldContainer
        .getLastAssignedContainerSignature(), cookieContainerRequest.getCookie()
        .getContainerSignature())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Matched delayed container to task"
          + " containerId=" + heldContainer.container.getId());
      }
      return true;
    }
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Failed to match delayed container to task"
      + " containerId=" + heldContainer.container.getId());
  }
  return false;
}
 
源代码28 项目: big-c   文件: TestLeafQueue.java
static LeafQueue stubLeafQueue(LeafQueue queue) {
  
  // Mock some methods for ease in these unit tests
  
  // 1. LeafQueue.createContainer to return dummy containers
  doAnswer(
      new Answer<Container>() {
        @Override
        public Container answer(InvocationOnMock invocation) 
            throws Throwable {
          final FiCaSchedulerApp application = 
              (FiCaSchedulerApp)(invocation.getArguments()[0]);
          final ContainerId containerId =                 
              TestUtils.getMockContainerId(application);

          Container container = TestUtils.getMockContainer(
              containerId,
              ((FiCaSchedulerNode)(invocation.getArguments()[1])).getNodeID(), 
              (Resource)(invocation.getArguments()[2]),
              ((Priority)invocation.getArguments()[3]));
          return container;
        }
      }
    ).
    when(queue).createContainer(
            any(FiCaSchedulerApp.class), 
            any(FiCaSchedulerNode.class), 
            any(Resource.class),
            any(Priority.class)
            );
  
  // 2. Stub out LeafQueue.parent.completedContainer
  CSQueue parent = queue.getParent();
  doNothing().when(parent).completedContainer(
      any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), 
      any(RMContainer.class), any(ContainerStatus.class), 
      any(RMContainerEventType.class), any(CSQueue.class), anyBoolean());
  
  return queue;
}
 
源代码29 项目: big-c   文件: Application.java
public synchronized List<Container> getResources() throws IOException {
  if(LOG.isDebugEnabled()) {
    LOG.debug("getResources begin:" + " application=" + applicationId
      + " #ask=" + ask.size());

    for (ResourceRequest request : ask) {
      LOG.debug("getResources:" + " application=" + applicationId
        + " ask-request=" + request);
    }
  }
  
  // Get resources from the ResourceManager
  Allocation allocation = resourceManager.getResourceScheduler().allocate(
      applicationAttemptId, new ArrayList<ResourceRequest>(ask),
      new ArrayList<ContainerId>(), null, null);
  System.out.println("-=======" + applicationAttemptId);
  System.out.println("----------" + resourceManager.getRMContext().getRMApps()
      .get(applicationId).getRMAppAttempt(applicationAttemptId));
  List<Container> containers = allocation.getContainers();

  // Clear state for next interaction with ResourceManager
  ask.clear();
  
  if(LOG.isDebugEnabled()) {
    LOG.debug("getResources() for " + applicationId + ":"
      + " ask=" + ask.size() + " recieved=" + containers.size());
  }
  
  return containers;
}
 
源代码30 项目: Hi-WAY   文件: NMCallbackHandler.java
@Override
public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
	Container container = containers.get(containerId);
	if (container != null) {
		am.getNmClientAsync().getContainerStatusAsync(containerId, container.getNodeId());
	}
}