下面列出了 org.apache.hadoop.mapred.TaskAttemptContextImpl#org.apache.hadoop.yarn.api.records.Container 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds an {@link AllocateResponse} whose increased/decreased container
 * arguments are {@code null}, round-trips it through its protobuf form and
 * checks that both lists are reported as empty afterwards.
 */
@SuppressWarnings("deprecation")
@Test
public void testAllocateResponseWithoutIncDecContainers() {
  final ArrayList<ContainerStatus> completedStatuses =
      new ArrayList<ContainerStatus>();
  final ArrayList<Container> allocated = new ArrayList<Container>();
  final ArrayList<NodeReport> updatedNodes = new ArrayList<NodeReport>();
  final ArrayList<NMToken> nmTokens = new ArrayList<NMToken>();

  final AllocateResponse original = AllocateResponse.newInstance(
      3, completedStatuses, allocated, updatedNodes, null,
      AMCommand.AM_RESYNC, 3, null, nmTokens, null, null);

  // Serialize to the proto and rebuild the record from it.
  final AllocateResponseProto proto =
      ((AllocateResponsePBImpl) original).getProto();
  final AllocateResponse restored = new AllocateResponsePBImpl(proto);

  // Neither list was supplied, so both must come back empty.
  Assert.assertEquals(0, restored.getIncreasedContainers().size());
  Assert.assertEquals(0, restored.getDecreasedContainers().size());
}
/**
 * Starts an async AM-RM client on top of a mocked client whose
 * {@code allocate} call always throws
 * {@link ApplicationAttemptNotFoundException}, and verifies that
 * {@code allocate} was invoked exactly once before the client is stopped.
 */
@Test (timeout = 10000)
public void testAMRMClientAsyncShutDown() throws Exception {
  final Configuration configuration = new Configuration();
  final TestCallbackHandler handler = new TestCallbackHandler();

  @SuppressWarnings("unchecked")
  AMRMClient<ContainerRequest> mockedClient = mock(AMRMClientImpl.class);

  // The returned response is not used by this test.
  createAllocateResponse(new ArrayList<ContainerStatus>(),
      new ArrayList<Container>(), null);

  final ApplicationAttemptNotFoundException notFound =
      new ApplicationAttemptNotFoundException("app not found, shut down");
  when(mockedClient.allocate(anyFloat())).thenThrow(notFound);

  final AMRMClientAsync<ContainerRequest> asyncAmRmClient =
      AMRMClientAsync.createAMRMClientAsync(mockedClient, 10, handler);
  asyncAmRmClient.init(configuration);
  asyncAmRmClient.start();
  asyncAmRmClient.registerApplicationMaster("localhost", 1234, null);

  // Wait briefly before checking how often allocate was called.
  Thread.sleep(50);

  verify(mockedClient, times(1)).allocate(anyFloat());
  asyncAmRmClient.stop();
}
/**
 * Drives an application attempt through allocate, launch, run and
 * unregister, then delivers an EXPIRE event and checks the attempt's
 * finished state against the values used at unregistration.
 */
@Test
public void testFinishingExpire() {
  final Container masterContainer = allocateApplicationAttempt();
  launchApplicationAttempt(masterContainer);
  runApplicationAttempt(masterContainer, "host", 8042, "oldtrackingurl",
      false);

  final FinalApplicationStatus expectedStatus =
      FinalApplicationStatus.SUCCEEDED;
  final String expectedTrackingUrl = "mytrackingurl";
  final String expectedDiagnostics = "Successful";
  unregisterApplicationAttempt(masterContainer, expectedStatus,
      expectedTrackingUrl, expectedDiagnostics);

  final RMAppAttemptEvent expireEvent = new RMAppAttemptEvent(
      applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE);
  applicationAttempt.handle(expireEvent);

  testAppAttemptFinishedState(masterContainer, expectedStatus,
      expectedTrackingUrl, expectedDiagnostics, 0, false);
}
/**
 * Schedules an asynchronous start of the given container.
 *
 * <p>If a container with the same id is already tracked, the callback
 * handler is told via {@code onStartContainerError}; the start event is
 * still enqueued afterwards, as before. If the calling thread is interrupted
 * while enqueueing, the failure is reported to the callback handler and the
 * thread's interrupt status is restored so callers can observe it.
 *
 * @param container the container to start
 * @param containerLaunchContext the launch context to start it with
 */
public void startContainerAsync(
    Container container, ContainerLaunchContext containerLaunchContext) {
  if (containers.putIfAbsent(container.getId(),
      new StatefulContainer(this, container.getId())) != null) {
    callbackHandler.onStartContainerError(container.getId(),
        RPCUtil.getRemoteException("Container " + container.getId() +
            " is already started or scheduled to start"));
  }
  try {
    events.put(new StartContainerEvent(container, containerLaunchContext));
  } catch (InterruptedException e) {
    LOG.warn("Exception when scheduling the event of starting Container " +
        container.getId());
    callbackHandler.onStartContainerError(container.getId(), e);
    // Catching InterruptedException clears the flag; set it again so the
    // interruption is not silently lost for the calling thread.
    Thread.currentThread().interrupt();
  }
}
/**
 * Decides whether the task behind the given request may run in the given
 * container.
 *
 * <p>A container that is not held, or is held but still new, always
 * qualifies. Otherwise the held container's first container signature must
 * be a superset of the request's signature according to
 * {@code containerSignatureMatcher}.
 *
 * @param cookieContainerRequest the request whose cookie carries the
 *        required container signature
 * @param container the candidate container
 * @return {@code true} if the task can be assigned to the container
 */
private boolean canAssignTaskToContainer(
    CookieContainerRequest cookieContainerRequest, Container container) {
  final HeldContainer held = heldContainers.get(container.getId());
  if (held == null || held.isNew()) {
    // New container.
    return true;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Trying to match task to a held container, "
        + " containerId=" + held.container.getId());
  }

  final boolean signatureMatches = containerSignatureMatcher.isSuperSet(
      held.getFirstContainerSignature(),
      cookieContainerRequest.getCookie().getContainerSignature());

  if (!signatureMatches) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Failed to match delayed container to task"
          + " containerId=" + held.container.getId());
    }
    return false;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Matched delayed container to task"
        + " containerId=" + held.container.getId());
  }
  return true;
}
/**
 * Unregisters a running application attempt as SUCCEEDED, sends it an
 * EXPIRE event, and asserts the resulting finished state.
 */
@Test
public void testFinishingExpire() {
  final Container am = allocateApplicationAttempt();
  launchApplicationAttempt(am);
  runApplicationAttempt(am, "host", 8042, "oldtrackingurl", false);

  final FinalApplicationStatus status = FinalApplicationStatus.SUCCEEDED;
  final String url = "mytrackingurl";
  final String diag = "Successful";
  unregisterApplicationAttempt(am, status, url, diag);

  // Expire the attempt after it has unregistered.
  final RMAppAttemptEvent expiry = new RMAppAttemptEvent(
      applicationAttempt.getAppAttemptId(),
      RMAppAttemptEventType.EXPIRE);
  applicationAttempt.handle(expiry);

  testAppAttemptFinishedState(am, status, url, diag, 0, false);
}
/**
 * Tries to assign the given newly allocated containers and hands whatever is
 * left to {@code releaseUnassignedContainers}.
 *
 * <p>Unless the AM state is {@code COMPLETED}, assignment is attempted with
 * the node-local, rack-local and non-local assigners, in that order.
 *
 * @param containers
 *          The containers to be assigned. The original documentation notes
 *          that this collection *may* be modified in place based on
 *          allocations and releases.
 * @return the assignments that were made, keyed by request.
 */
private synchronized Map<CookieContainerRequest, Container>
    assignNewlyAllocatedContainers(Iterable<Container> containers) {
  final Map<CookieContainerRequest, Container> assignments =
      new HashMap<CookieContainerRequest, Container>();

  final boolean stillAssigning =
      getContext().getAMState() != AMState.COMPLETED;
  if (stillAssigning) {
    assignNewContainersWithLocation(
        containers, NODE_LOCAL_ASSIGNER, assignments);
    assignNewContainersWithLocation(
        containers, RACK_LOCAL_ASSIGNER, assignments);
    assignNewContainersWithLocation(
        containers, NON_LOCAL_ASSIGNER, assignments);
  }

  // Release any unassigned containers given by the RM
  final boolean anyLeft = containers.iterator().hasNext();
  if (anyLeft) {
    LOG.info("Releasing newly assigned containers which could not be allocated");
  }
  releaseUnassignedContainers(containers);

  return assignments;
}
/**
 * Registers each given container as a new held container and adds it to the
 * delayed container manager, then triggers scheduling.
 *
 * <p>The expiry time is {@code -1} unless {@code idleContainerTimeoutMin} is
 * positive, in which case it is the current time plus that timeout.
 *
 * @param containers the newly received containers
 * @throws TezUncheckedException if one of the containers is already held
 */
private void pushNewContainerToDelayed(List<Container> containers){
  final long expiry = (idleContainerTimeoutMin > 0)
      ? System.currentTimeMillis() + idleContainerTimeoutMin
      : -1;

  synchronized (delayedContainerManager) {
    for (Container fresh : containers) {
      if (heldContainers.put(fresh.getId(),
          new HeldContainer(fresh, -1, expiry, null)) != null) {
        throw new TezUncheckedException("New container " + fresh.getId()
            + " is already held.");
      }

      // Use the latest schedule time seen so far, or "now" if none is
      // recorded yet (-1).
      final long baseScheduleTime =
          (delayedContainerManager.maxScheduleTimeSeen == -1)
              ? System.currentTimeMillis()
              : delayedContainerManager.maxScheduleTimeSeen;

      Resources.addTo(allocatedResources, fresh.getResource());
      delayedContainerManager.addDelayedContainer(fresh,
          baseScheduleTime + 1);
    }
  }

  delayedContainerManager.triggerScheduling(false);
}
/**
 * Records that an allocated container has been assigned to a request.
 *
 * <p>Decrements the outstanding request, notifies the task attempt through
 * the event handler, and tracks the assignment in {@code assignedRequests}.
 *
 * @param allocated the container that was allocated
 * @param assigned the request the container was assigned to
 */
@SuppressWarnings("unchecked")
private void containerAssigned(Container allocated,
    ContainerRequest assigned) {
  // Update resource requests
  decContainerReq(assigned);

  // send the container-assigned event to task attempt
  eventHandler.handle(new TaskAttemptContainerAssignedEvent(
      assigned.attemptID, allocated, applicationACLs));

  assignedRequests.add(allocated, assigned.attemptID);

  if (LOG.isDebugEnabled()) {
    // Log at the level the guard checks for; the message was previously
    // emitted at INFO even though it was only built when DEBUG was enabled.
    LOG.debug("Assigned container (" + allocated + ") "
        + " to task " + assigned.attemptID + " on node "
        + allocated.getNodeId().toString());
  }
}
/**
 * Assigns a container purely on its priority, ignoring locality.
 *
 * @param allocated the allocated container
 * @return the request chosen by {@code assignToFailedMap} or
 *         {@code assignToReduce}, or {@code null} when the container's
 *         priority matches neither the fast-fail-map nor the reduce priority
 */
private ContainerRequest assignWithoutLocality(Container allocated) {
  final Priority containerPriority = allocated.getPriority();

  if (PRIORITY_FAST_FAIL_MAP.equals(containerPriority)) {
    LOG.info("Assigning container " + allocated + " to fast fail map");
    return assignToFailedMap(allocated);
  }

  if (PRIORITY_REDUCE.equals(containerPriority)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Assigning container " + allocated + " to reduce");
    }
    return assignToReduce(allocated);
  }

  return null;
}
/**
 * Takes the next task from the queue of the node hosting the given container.
 *
 * <p>Updates the remaining/running task counters, removes the head of the
 * host's queue, and increments that task's try count.
 *
 * @param container the container that will run the task
 * @return the task removed from the host's queue
 */
@Override
public TaskInstance getTask(Container container) {
  numberOfRemainingTasks--;
  numberOfRunningTasks++;

  final String host = container.getNodeId().getHost();

  if (HiWayConfiguration.verbose) {
    WorkflowDriver.writeToStdout("Looking for task on container "
        + container.getId() + " on node " + host + "; Queue:"
        + queues.get(host).toString());
  }

  final TaskInstance next = queues.get(host).remove();
  WorkflowDriver.writeToStdout("Assigned task " + next + " to container "
      + container.getId() + "@" + host);

  next.incTries();
  return next;
}
/**
 * Logs and records the assignment of a container to the task behind the
 * given request. Does nothing when {@code assigned} is {@code null}.
 *
 * @param assigned the request that was matched, may be {@code null}
 * @param container the container being assigned
 * @param matchedLocation the location that was matched (logged only)
 * @param honorLocalityFlags the locality flag setting (logged only)
 */
public void doBookKeepingForAssignedContainer(
    CookieContainerRequest assigned, Container container,
    String matchedLocation, boolean honorLocalityFlags) {
  if (assigned != null) {
    final Object task = getTask(assigned);
    assert task != null;

    final String summary = "Assigning container to task"
        + ", container=" + container
        + ", task=" + task
        + ", containerHost=" + container.getNodeId().getHost()
        + ", localityMatchType=" + locality
        + ", matchedLocation=" + matchedLocation
        + ", honorLocalityFlags=" + honorLocalityFlags
        + ", reusedContainer="
        + containerAssignments.containsKey(container.getId())
        + ", delayedContainers="
        + delayedContainerManager.delayedContainers.size()
        + ", containerResourceMemory=" + container.getResource().getMemory()
        + ", containerResourceVCores="
        + container.getResource().getVirtualCores();
    LOG.info(summary);

    assignContainer(task, container, assigned);
  }
}
/**
 * Creates a mocked {@link RMAppAttempt} with fixed test values for host, RPC
 * port, diagnostics and tracking URLs, and a mocked master container whose
 * id is container 1 of the given attempt.
 *
 * @param appAttemptId the attempt id the mock reports
 * @return the stubbed attempt mock
 */
private static RMAppAttempt createRMAppAttempt(
    ApplicationAttemptId appAttemptId) {
  final RMAppAttempt attemptMock = mock(RMAppAttempt.class);
  when(attemptMock.getAppAttemptId()).thenReturn(appAttemptId);
  when(attemptMock.getHost()).thenReturn("test host");
  when(attemptMock.getRpcPort()).thenReturn(-100);

  // Master container reported by the attempt.
  final Container masterContainer = mock(Container.class);
  final ContainerId masterContainerId =
      ContainerId.newContainerId(appAttemptId, 1);
  when(masterContainer.getId()).thenReturn(masterContainerId);
  when(attemptMock.getMasterContainer()).thenReturn(masterContainer);

  when(attemptMock.getDiagnostics()).thenReturn("test diagnostics info");
  when(attemptMock.getTrackingUrl()).thenReturn("test tracking url");
  when(attemptMock.getOriginalTrackingUrl())
      .thenReturn("test original tracking url");

  return attemptMock;
}
/**
 * Passes a plain {@code Object} to a reflector built for
 * {@code HasMethod.class} and expects an empty container list back.
 */
@Test
public void testDoesntCallGetContainersFromPreviousAttemptsMethodIfAbsent() {
  final RegisterApplicationMasterResponseReflector reflector =
      new RegisterApplicationMasterResponseReflector(LOG, HasMethod.class);

  final Object responseWithoutMethod = new Object();
  final List<Container> recovered =
      reflector.getContainersFromPreviousAttemptsUnsafe(responseWithoutMethod);

  assertThat(recovered, empty());
}
/**
 * Reads the containers from previous attempts out of the registration
 * response and records each one in {@code workerNodeMap}, keyed by a
 * {@link ResourceID} built from the container id's string form.
 *
 * @param registerApplicationMasterResponse the registration response
 */
private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
  final List<Container> recovered =
      registerApplicationMasterResponseReflector
          .getContainersFromPreviousAttempts(registerApplicationMasterResponse);

  log.info("Recovered {} containers from previous attempts ({}).", recovered.size(), recovered);

  for (final Container previous : recovered) {
    final ResourceID workerId = new ResourceID(previous.getId().toString());
    workerNodeMap.put(workerId, new YarnWorkerNode(previous));
  }
}
/**
 * Runs an application attempt through allocate, launch and run, then
 * unregisters it with a SUCCEEDED final status.
 */
@Test
public void testUnregisterToSuccessfulFinishing() {
  final Container masterContainer = allocateApplicationAttempt();
  launchApplicationAttempt(masterContainer);
  runApplicationAttempt(masterContainer, "host", 8042, "oldtrackingurl",
      false);

  final FinalApplicationStatus succeeded = FinalApplicationStatus.SUCCEEDED;
  unregisterApplicationAttempt(masterContainer, succeeded, "mytrackingurl",
      "Successful");
}
/**
 * Checks that a container in ALLOCATED state carries no container token, and
 * that the token is present once the AM pulls the container through a
 * subsequent allocate call.
 *
 * <p>The MockRM is stopped in a {@code finally} block so that a failed wait
 * or assertion does not leave it running.
 */
@Test
public void testContainerTokenGeneratedOnPullRequest() throws Exception {
  MockRM rm1 = new MockRM(conf);
  rm1.start();
  try {
    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
    RMApp app1 = rm1.submitApp(200);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // request a container.
    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
    ContainerId containerId2 =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);

    RMContainer container =
        rm1.getResourceScheduler().getRMContainer(containerId2);
    // no container token is generated.
    Assert.assertEquals(containerId2, container.getContainerId());
    Assert.assertNull(container.getContainer().getContainerToken());

    // acquire the container.
    List<Container> containers =
        am1.allocate(new ArrayList<ResourceRequest>(),
            new ArrayList<ContainerId>()).getAllocatedContainers();
    Assert.assertEquals(containerId2, containers.get(0).getId());
    // container token is generated.
    Assert.assertNotNull(containers.get(0).getContainerToken());
  } finally {
    // Always shut the RM down, even when the test body fails.
    rm1.stop();
  }
}
/**
 * Creates a mocked {@link RMContainer} wrapping a mocked {@link Container}.
 *
 * <p>The inner container reports the given resource and the {@code nodeId}
 * field; the RM container reports a container id built from the attempt id
 * and {@code id}.
 *
 * @param appAttId the application attempt the container id belongs to
 * @param id the numeric container id
 * @param resource the resource the container reports
 * @return the stubbed RM container mock
 */
private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id,
    Resource resource) {
  final ContainerId mockedId = ContainerId.newContainerId(appAttId, id);
  final RMContainer rmContainerMock = mock(RMContainer.class);
  final Container innerContainer = mock(Container.class);

  when(innerContainer.getResource()).thenReturn(resource);
  when(innerContainer.getNodeId()).thenReturn(nodeId);

  when(rmContainerMock.getContainer()).thenReturn(innerContainer);
  when(rmContainerMock.getContainerId()).thenReturn(mockedId);

  return rmContainerMock;
}
/**
 * Records a container that the scheduler has allocated on this node.
 *
 * <p>Deducts the container's resource from the available resource,
 * increments the container count, and registers the container under its id
 * in {@code launchedContainers}.
 *
 * @param rmContainer
 *          allocated container
 */
public synchronized void allocateContainer(RMContainer rmContainer) {
  final Container allocated = rmContainer.getContainer();

  deductAvailableResource(allocated.getResource());
  numContainers++;
  launchedContainers.put(allocated.getId(), rmContainer);

  final String summary = "Assigned container " + allocated.getId()
      + " of capacity " + allocated.getResource()
      + " on host " + rmNode.getNodeAddress()
      + ", which has " + numContainers + " containers, "
      + getUsedResource() + " used and " + getAvailableResource()
      + " available after allocation";
  LOG.info(summary);
}
/**
 * Handles containers newly allocated by the RM.
 *
 * <p>Adds the batch size to {@code numAllocatedContainers}, logs the details
 * of every container, and starts one launcher thread per container. Each
 * thread is also recorded in {@code launchThreads}.
 *
 * @param allocatedContainers the containers handed out by the RM
 */
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
  LOG.info("Got response from RM for container ask, allocatedCnt="
      + allocatedContainers.size());
  numAllocatedContainers.addAndGet(allocatedContainers.size());
  for (Container allocatedContainer : allocatedContainers) {
    // Every field is logged as key=value; the three resource fields were
    // previously missing the '=' separator.
    LOG.info("Launching shell command on a new container."
        + ", containerId=" + allocatedContainer.getId()
        + ", containerNode=" + allocatedContainer.getNodeId().getHost()
        + ":" + allocatedContainer.getNodeId().getPort()
        + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
        + ", containerResourceMemory="
        + allocatedContainer.getResource().getMemory()
        + ", containerResourceVirtualCores="
        + allocatedContainer.getResource().getVirtualCores()
        + ", containerResourceGpuCores="
        + allocatedContainer.getResource().getGpuCores());

    LaunchContainerRunnable runnableLaunchContainer =
        new LaunchContainerRunnable(allocatedContainer, containerListener);
    Thread launchThread = new Thread(runnableLaunchContainer);

    // launch and start the container on a separate thread to keep
    // the main thread unblocked
    // as all containers may not be allocated at one go.
    launchThreads.add(launchThread);
    launchThread.start();
  }
}
/**
 * Performs one scheduling round.
 *
 * <p>Fetches newly allocated containers and assigns them to scheduled
 * requests. The reduce schedule is recalculated when the recalculation flag
 * is set, either already on entry or because the completed-task count
 * changed or map requests are still scheduled. Scheduling statistics are
 * logged before and after.
 *
 * @throws Exception if obtaining resources or scheduling fails
 */
@Override
protected synchronized void heartbeat() throws Exception {
  scheduleStats.updateAndLogIfChanged("Before Scheduling: ");

  final List<Container> newlyAllocated = getResources();
  if (newlyAllocated != null && !newlyAllocated.isEmpty()) {
    scheduledRequests.assign(newlyAllocated);
  }

  final int mapsDone = getJob().getCompletedMaps();
  final int tasksDone = mapsDone + getJob().getCompletedReduces();

  final boolean progressChanged = lastCompletedTasks != tasksDone;
  if (progressChanged || scheduledRequests.maps.size() > 0) {
    lastCompletedTasks = tasksDone;
    recalculateReduceSchedule = true;
  }

  if (recalculateReduceSchedule) {
    preemptReducesIfNeeded();
    scheduleReduces(
        getJob().getTotalMaps(),
        mapsDone,
        scheduledRequests.maps.size(),
        scheduledRequests.reduces.size(),
        assignedRequests.maps.size(),
        assignedRequests.reduces.size(),
        mapResourceRequest,
        reduceResourceRequest,
        pendingReduces.size(),
        maxReduceRampupLimit,
        reduceSlowStart);
    recalculateReduceSchedule = false;
  }

  scheduleStats.updateAndLogIfChanged("After Scheduling: ");
}
/**
 * Creates a START_CONTAINER event for the given container.
 *
 * <p>The container's id, node id and container token are passed to the
 * superclass constructor; the container and its launch context are kept on
 * the event.
 *
 * @param container the container to start
 * @param containerLaunchContext the launch context to start it with
 */
public StartContainerEvent(Container container,
    ContainerLaunchContext containerLaunchContext) {
  super(
      container.getId(),
      container.getNodeId(),
      container.getContainerToken(),
      ContainerEventType.START_CONTAINER);
  this.containerLaunchContext = containerLaunchContext;
  this.container = container;
}
/**
 * Handles a completed container by informing the leaf queue of the owning
 * application attempt.
 *
 * <p>Returns after logging when the container is {@code null} or when no
 * current attempt is found for it.
 *
 * @param rmContainer the completed container, may be {@code null}
 * @param containerStatus the status passed on to the queue
 * @param event the event that caused the completion
 */
@Lock(CapacityScheduler.class)
@Override
protected synchronized void completedContainer(RMContainer rmContainer,
    ContainerStatus containerStatus, RMContainerEventType event) {
  if (rmContainer == null) {
    LOG.info("Null container completed...");
    return;
  }

  final Container finished = rmContainer.getContainer();

  // Get the application for the finished container
  final FiCaSchedulerApp owningApp =
      getCurrentAttemptForContainer(finished.getId());
  final ApplicationId owningAppId =
      finished.getId().getApplicationAttemptId().getApplicationId();
  if (owningApp == null) {
    LOG.info("Container " + finished + " of" + " unknown application "
        + owningAppId + " completed or suspended with event " + event);
    return;
  }

  // Get the node on which the container was allocated
  final FiCaSchedulerNode hostNode = getNode(finished.getNodeId());

  // Inform the queue
  final LeafQueue leafQueue = (LeafQueue) owningApp.getQueue();
  leafQueue.completedContainer(clusterResource, owningApp, hostNode,
      rmContainer, containerStatus, event, null, true);

  LOG.info("Application attempt " + owningApp.getApplicationAttemptId()
      + " released container " + finished.getId() + " on node: " + hostNode
      + " with event: " + event);
}
/**
 * Adds a held container to the delayed-container queue with the given next
 * schedule time.
 *
 * If the container is not present in {@code heldContainers}, a warning is
 * logged and nothing else happens. Otherwise the held container's next
 * schedule time is set, {@code maxScheduleTimeSeen} is raised if needed, the
 * container is offered to {@code delayedContainers}, and this object's
 * monitor is notified. A container the queue rejects is passed to
 * {@code releaseUnassignedContainers}.
 *
 * @param container the container to delay
 * @param nextScheduleTime the time recorded as the container's next
 *        schedule time
 */
@VisibleForTesting
void addDelayedContainer(Container container,
long nextScheduleTime) {
HeldContainer delayedContainer = heldContainers.get(container.getId());
if (delayedContainer == null) {
// Only containers already tracked in heldContainers can be delayed.
LOG.warn("Attempting to add a non-running container to the"
+ " delayed container list, containerId=" + container.getId());
return;
} else {
delayedContainer.setNextScheduleTime(nextScheduleTime);
}
// Track the largest schedule time handed out so far.
if (maxScheduleTimeSeen < nextScheduleTime) {
maxScheduleTimeSeen = nextScheduleTime;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding container to delayed queue"
+ ", containerId=" + delayedContainer.getContainer().getId()
+ ", nextScheduleTime=" + delayedContainer.getNextScheduleTime()
+ ", containerExpiry=" + delayedContainer.getContainerExpiryTime());
}
boolean added = false;
// Enqueue and notify while holding this object's monitor.
synchronized(this) {
added = delayedContainers.offer(delayedContainer);
// Test hook: mark the queue as not drained, if the flag is in use.
if (drainedDelayedContainersForTest != null) {
synchronized (drainedDelayedContainersForTest) {
drainedDelayedContainersForTest.set(false);
}
}
// Wake a thread waiting on this object's monitor.
this.notify();
}
// The queue refused the container, so hand it back for release.
if (!added) {
releaseUnassignedContainers(Lists.newArrayList(container));
}
}
/**
 * Collects newly allocated containers and starts all containers once the
 * running total equals {@code args.totalContainerNum}.
 *
 * @param allocatedContainers the containers handed out in this callback
 */
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
  final int batchSize = allocatedContainers.size();
  allocatedContainerNum.addAndGet(batchSize);

  // The parameter shadows the enclosing ApplicationMaster's field.
  ApplicationMaster.this.allocatedContainers.addAll(allocatedContainers);

  final boolean allReceived =
      allocatedContainerNum.get() == args.totalContainerNum;
  if (allReceived) {
    startAllContainers();
  }
}
/**
 * Creates a launcher for one container; every argument is stored in the
 * field of the same name.
 *
 * @param container the container to launch in
 * @param appMaster the owning application master
 * @param taskInfo the task information for this container
 * @param clusterSpec the cluster specification
 * @param containerMemory the container memory value
 * @param tfLib the TensorFlow library value
 * @param tfJar the TensorFlow jar value
 */
public LaunchContainerThread(Container container, ApplicationMaster appMaster,
    TFTaskInfo taskInfo, ClusterSpec clusterSpec, long containerMemory,
    String tfLib, String tfJar) {
  // Owner and target container.
  this.appMaster = appMaster;
  this.container = container;

  // Task and cluster description.
  this.taskInfo = taskInfo;
  this.clusterSpec = clusterSpec;

  // Launch settings.
  this.containerMemory = containerMemory;
  this.tfLib = tfLib;
  this.tfJar = tfJar;
}
/**
 * Decides whether the task behind the given request may run in the given
 * container.
 *
 * <p>Assignment is refused when the task is a {@code TaskAttempt} whose
 * task already has a running attempt on the container's node. Otherwise a
 * container that is not held, or is held but new, always qualifies, and a
 * held container qualifies only if its last assigned container signature is
 * a superset of the request's signature.
 *
 * @param cookieContainerRequest the request whose cookie carries the
 *        required container signature
 * @param container the candidate container
 * @return {@code true} if the task can be assigned to the container
 */
private boolean canAssignTaskToContainer(
    CookieContainerRequest cookieContainerRequest, Container container) {
  final HeldContainer held = heldContainers.get(container.getId());
  final Object candidate = getTask(cookieContainerRequest);

  if (candidate instanceof TaskAttempt) {
    final TaskAttempt attempt = (TaskAttempt) candidate;
    if (attempt.getTask() != null
        && attempt.getTask().getNodesWithRunningAttempts()
            .contains(container.getNodeId())) {
      return false;
    }
  }

  if (held == null || held.isNew()) {
    // New container.
    return true;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Trying to match task to a held container, "
        + " containerId=" + held.container.getId());
  }

  final boolean signatureMatches = containerSignatureMatcher.isSuperSet(
      held.getLastAssignedContainerSignature(),
      cookieContainerRequest.getCookie().getContainerSignature());

  if (!signatureMatches) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Failed to match delayed container to task"
          + " containerId=" + held.container.getId());
    }
    return false;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Matched delayed container to task"
        + " containerId=" + held.container.getId());
  }
  return true;
}
/**
 * Stubs the given queue for use in unit tests.
 *
 * <p>{@code createContainer} is answered with a mock container built from
 * the call's arguments, and {@code completedContainer} on the queue's parent
 * is stubbed to do nothing.
 *
 * @param queue the queue to stub
 * @return the same queue instance
 */
static LeafQueue stubLeafQueue(LeafQueue queue) {
  // Mock some methods for ease in these unit tests

  // 1. LeafQueue.createContainer to return dummy containers
  final Answer<Container> dummyContainerAnswer = new Answer<Container>() {
    @Override
    public Container answer(InvocationOnMock invocation)
        throws Throwable {
      final FiCaSchedulerApp app =
          (FiCaSchedulerApp) (invocation.getArguments()[0]);
      final ContainerId mockId = TestUtils.getMockContainerId(app);
      return TestUtils.getMockContainer(
          mockId,
          ((FiCaSchedulerNode) (invocation.getArguments()[1])).getNodeID(),
          (Resource) (invocation.getArguments()[2]),
          ((Priority) invocation.getArguments()[3]));
    }
  };
  doAnswer(dummyContainerAnswer)
      .when(queue)
      .createContainer(
          any(FiCaSchedulerApp.class),
          any(FiCaSchedulerNode.class),
          any(Resource.class),
          any(Priority.class));

  // 2. Stub out LeafQueue.parent.completedContainer
  final CSQueue parentQueue = queue.getParent();
  doNothing().when(parentQueue).completedContainer(
      any(Resource.class),
      any(FiCaSchedulerApp.class),
      any(FiCaSchedulerNode.class),
      any(RMContainer.class),
      any(ContainerStatus.class),
      any(RMContainerEventType.class),
      any(CSQueue.class),
      anyBoolean());

  return queue;
}
/**
 * Sends the pending asks to the resource scheduler and returns the
 * containers it allocated.
 *
 * <p>The pending asks are cleared after the call. Diagnostic output goes to
 * the logger at debug level only; it was previously printed unconditionally
 * to standard output.
 *
 * @return the containers returned by the scheduler's allocation
 * @throws IOException declared by the signature
 */
public synchronized List<Container> getResources() throws IOException {
  if(LOG.isDebugEnabled()) {
    LOG.debug("getResources begin:" + " application=" + applicationId
        + " #ask=" + ask.size());

    for (ResourceRequest request : ask) {
      LOG.debug("getResources:" + " application=" + applicationId
          + " ask-request=" + request);
    }
  }

  // Get resources from the ResourceManager
  Allocation allocation = resourceManager.getResourceScheduler().allocate(
      applicationAttemptId, new ArrayList<ResourceRequest>(ask),
      new ArrayList<ContainerId>(), null, null);

  if (LOG.isDebugEnabled()) {
    // Look up the attempt only when the result will actually be logged.
    LOG.debug("getResources: applicationAttemptId=" + applicationAttemptId);
    LOG.debug("getResources: rmAppAttempt="
        + resourceManager.getRMContext().getRMApps()
            .get(applicationId).getRMAppAttempt(applicationAttemptId));
  }

  List<Container> containers = allocation.getContainers();

  // Clear state for next interaction with ResourceManager
  ask.clear();

  if(LOG.isDebugEnabled()) {
    LOG.debug("getResources() for " + applicationId + ":"
        + " ask=" + ask.size() + " recieved=" + containers.size());
  }

  return containers;
}
/**
 * Requests the status of a container that has just started, provided the
 * container id is known in {@code containers}.
 *
 * @param containerId the id of the started container
 * @param allServiceResponse the service responses (not used here)
 */
@Override
public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
  final Container started = containers.get(containerId);
  if (started == null) {
    // Unknown container: nothing to query.
    return;
  }
  am.getNmClientAsync().getContainerStatusAsync(containerId, started.getNodeId());
}