The following examples show how org.apache.hadoop.yarn.api.records.ContainerId is used in real code; each snippet is taken from an open-source project.
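Before the snippets themselves, here is a minimal orientation sketch (not taken from any snippet below, values are arbitrary) of the ContainerId lifecycle they all share: a ContainerId is created from an ApplicationAttemptId, which in turn is created from an ApplicationId, and it can be round-tripped through its string form using the same factory and converter methods that appear throughout the examples.
// Minimal sketch (illustrative values): building a ContainerId and
// converting it to and from its string representation.
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
String asString = containerId.toString(); // container_<clusterTimestamp>_0001_01_000001
ContainerId parsed = ConverterUtils.toContainerId(asString); // parse the string form back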
public MockDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime,
boolean isSession, String workingDirectory, String[] localDirs, String[] logDirs,
AtomicBoolean launcherGoFlag, boolean initFailFlag, boolean startFailFlag,
Credentials credentials, String jobUserName, int handlerConcurrency, int numConcurrentContainers) {
super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(),
credentials, jobUserName, null);
shutdownHandler = new MockDAGAppMasterShutdownHandler();
this.launcherGoFlag = launcherGoFlag;
this.initFailFlag = initFailFlag;
this.startFailFlag = startFailFlag;
Preconditions.checkArgument(handlerConcurrency > 0);
this.handlerConcurrency = handlerConcurrency;
this.numConcurrentContainers = numConcurrentContainers;
}
@Test
public void testResourceDecreaseContext() {
ContainerId containerId = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(1234, 3), 3), 7);
Resource resource = Resource.newInstance(1023, 3);
ContainerResourceDecrease ctx = ContainerResourceDecrease.newInstance(
containerId, resource);
// get proto and recover to ctx
ContainerResourceDecreaseProto proto =
((ContainerResourceDecreasePBImpl) ctx).getProto();
ctx = new ContainerResourceDecreasePBImpl(proto);
// check values
Assert.assertEquals(ctx.getCapability(), resource);
Assert.assertEquals(ctx.getContainerId(), containerId);
}
@Test
public void testContainerReport() throws IOException, YarnException {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
GetContainerReportRequest request =
GetContainerReportRequest.newInstance(containerId);
GetContainerReportResponse response =
clientService.getContainerReport(request);
ContainerReport container = response.getContainerReport();
Assert.assertNotNull(container);
Assert.assertEquals(containerId, container.getContainerId());
Assert.assertEquals("http://0.0.0.0:8188/applicationhistory/logs/" +
"test host:100/container_0_0001_01_000001/" +
"container_0_0001_01_000001/user1", container.getLogUrl());
}
/**
* RecoveryEvents: TaskAttemptStartedEvent -> TaskAttemptFinishedEvent (KILLED)
* The attempt should be recovered to the KILLED state.
*/
@Test(timeout=5000)
public void testTARecoverFromKilled() {
initMockDAGRecoveryDataForTaskAttempt();
TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(
ta1t1v1Id, "v1", ta1LaunchTime, mock(ContainerId.class),
mock(NodeId.class), "", "", "");
TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
ta1t1v1Id, "v1", ta1FinishedTime, ta1FinishedTime,
TaskAttemptState.KILLED, null, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null,
null, null, 0L, null, 0L, null, null, null, null, null);
TaskAttemptRecoveryData taRecoveryData = new TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
doReturn(taRecoveryData).when(dagRecoveryData).getTaskAttemptRecoveryData(ta1t1v1Id);
dag.handle(new DAGEventRecoverEvent(dagId, dagRecoveryData));
dispatcher.await();
TaskImpl task = (TaskImpl)dag.getVertex(v1Id).getTask(t1v1Id);
TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(ta1t1v1Id);
assertEquals(TaskAttemptStateInternal.KILLED, taskAttempt.getInternalState());
assertEquals(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, taskAttempt.getTerminationCause());
historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
assertEquals(ta1LaunchTime, taskAttempt.getLaunchTime());
assertEquals(ta1FinishedTime, taskAttempt.getFinishTime());
}
@Test
public void testMassiveWriteContainerHistoryData() throws IOException {
LOG.info("Starting testMassiveWriteContainerHistoryData");
long mb = 1024 * 1024;
long usedDiskBefore = fs.getContentSummary(fsWorkingPath).getLength() / mb;
ApplicationId appId = ApplicationId.newInstance(0, 1);
writeApplicationStartData(appId);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
for (int i = 1; i <= 100000; ++i) {
ContainerId containerId = ContainerId.newContainerId(appAttemptId, i);
writeContainerStartData(containerId);
writeContainerFinishData(containerId);
}
writeApplicationFinishData(appId);
long usedDiskAfter = fs.getContentSummary(fsWorkingPath).getLength() / mb;
Assert.assertTrue((usedDiskAfter - usedDiskBefore) < 20);
}
private int runAndBlock(ContainerId cId, Map<String, String> launchCtxEnv, String... cmd) throws IOException {
String appId = "APP_" + System.currentTimeMillis();
Container container = mock(Container.class);
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
when(container.getContainerId()).thenReturn(cId);
when(container.getLaunchContext()).thenReturn(context);
when(cId.getApplicationAttemptId().getApplicationId().toString()).thenReturn(appId);
when(context.getEnvironment()).thenReturn(launchCtxEnv);
String script = writeScriptFile(launchCtxEnv, cmd);
Path scriptPath = new Path(script);
Path tokensPath = new Path("/dev/null");
Path workDir = new Path(workSpace.getAbsolutePath());
Path pidFile = new Path(workDir, "pid.txt");
exec.activateContainer(cId, pidFile);
return exec.launchContainer(container, scriptPath, tokensPath,
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
dirsHandler.getLogDirs());
}
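One note on the chained stubbing in the snippet above: when(cId.getApplicationAttemptId().getApplicationId().toString()) can only be stubbed if cId is a Mockito deep-stub mock; with a plain mock the intermediate calls would return null. A hedged sketch of the setup the surrounding test is assumed to use:
// Assumed setup for the cId passed into runAndBlock(): a deep-stub mock so the
// chained getApplicationAttemptId().getApplicationId().toString() call can be stubbed.
ContainerId cId = mock(ContainerId.class, Mockito.RETURNS_DEEP_STUBS);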
private void waitForContainerToFinishOnNM(ContainerId containerId) {
Context nmContext = yarnCluster.getNodeManager(0).getNMContext();
int interval = 4 * 60; // Roughly the maximum time for a container token to expire.
Assert.assertTrue(nmContext.getContainers().containsKey(containerId));
while ((interval-- > 0)
&& !nmContext.getContainers().get(containerId)
.cloneAndGetContainerStatus().getState()
.equals(ContainerState.COMPLETE)) {
try {
LOG.info("Waiting for " + containerId + " to complete.");
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
// Normally, containers are removed from the NM context only after they have
// been explicitly acked by the RM. Remove it manually here for testing.
yarnCluster.getNodeManager(0).getNodeStatusUpdater()
.addCompletedContainer(containerId);
nmContext.getContainers().remove(containerId);
}
@Override
public ApplicationAttemptReport createApplicationAttemptReport() {
this.readLock.lock();
ApplicationAttemptReport attemptReport = null;
try {
// The AM container may not be allocated yet, and an unmanaged AM does not
// have an AM container at all.
ContainerId amId =
masterContainer == null ? null : masterContainer.getId();
attemptReport = ApplicationAttemptReport.newInstance(this
.getAppAttemptId(), this.getHost(), this.getRpcPort(), this
.getTrackingUrl(), this.getOriginalTrackingUrl(), this.getDiagnostics(),
YarnApplicationAttemptState.valueOf(this.getState().toString()), amId);
} finally {
this.readLock.unlock();
}
return attemptReport;
}
@Override
public void taskSuccess(TaskInstance task, ContainerId containerId) {
try {
(new Data(task.getId() + "_reply", containerId.toString())).stageIn();
JSONObject reply = parseEffiFile(task.getId() + "_reply");
workflow.addReply(reply);
writeEntryToLog(new JsonReportEntry(task.getWorkflowId(), task.getTaskId(), task.getTaskName(), task.getLanguageLabel(), Long.valueOf(task.getId()),
null, JsonReportEntry.KEY_INVOC_OUTPUT, reply.toString()));
for (String fileName : RemoteWorkflow.getOutputSet(requests.get(task), reply)) {
files.put(fileName, new Data(fileName, containerId.toString()));
}
} catch (IOException | JSONException e) {
e.printStackTrace(System.out);
System.exit(-1);
}
}
@Public
@Unstable
public static ApplicationAttemptHistoryData newInstance(
ApplicationAttemptId appAttemptId, String host, int rpcPort,
ContainerId masterContainerId, String diagnosticsInfo,
String trackingURL, FinalApplicationStatus finalApplicationStatus,
YarnApplicationAttemptState yarnApplicationAttemptState) {
ApplicationAttemptHistoryData appAttemptHD =
new ApplicationAttemptHistoryData();
appAttemptHD.setApplicationAttemptId(appAttemptId);
appAttemptHD.setHost(host);
appAttemptHD.setRPCPort(rpcPort);
appAttemptHD.setMasterContainerId(masterContainerId);
appAttemptHD.setDiagnosticsInfo(diagnosticsInfo);
appAttemptHD.setTrackingURL(trackingURL);
appAttemptHD.setFinalApplicationStatus(finalApplicationStatus);
appAttemptHD.setYarnApplicationAttemptState(yarnApplicationAttemptState);
return appAttemptHD;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{ ").append(rsrc.toString()).append(",")
.append(getState() == ResourceState.LOCALIZED
? getLocalPath() + "," + getSize()
: "pending").append(",[");
try {
this.readLock.lock();
for (ContainerId c : ref) {
sb.append("(").append(c.toString()).append(")");
}
sb.append("],").append(getTimestamp()).append(",").append(getState())
.append("}");
return sb.toString();
} finally {
this.readLock.unlock();
}
}
@Test
public void testContainerIdWithEpoch() throws URISyntaxException {
ContainerId id = TestContainerId.newContainerId(0, 0, 0, 25645811);
String cid = ConverterUtils.toString(id);
assertEquals("container_0_0000_00_25645811", cid);
ContainerId gen = ConverterUtils.toContainerId(cid);
assertEquals(gen.toString(), id.toString());
long ts = System.currentTimeMillis();
ContainerId id2 =
TestContainerId.newContainerId(36473, 4365472, ts, 4298334883325L);
String cid2 = ConverterUtils.toString(id2);
assertEquals(
"container_e03_" + ts + "_36473_4365472_999799999997", cid2);
ContainerId gen2 = ConverterUtils.toContainerId(cid2);
assertEquals(gen2.toString(), id2.toString());
ContainerId id3 =
TestContainerId.newContainerId(36473, 4365472, ts, 844424930131965L);
String cid3 = ConverterUtils.toString(id3);
assertEquals(
"container_e767_" + ts + "_36473_4365472_1099511627773", cid3);
ContainerId gen3 = ConverterUtils.toContainerId(cid3);
assertEquals(gen3.toString(), id3.toString());
}
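The expected strings in this test follow from how the 64-bit container id is split: the upper bits carry the epoch (rendered as the e03 and e767 prefixes) and the lower 40 bits carry the sequence number. A small worked sketch of that split for the id2 value above (the 40-bit mask matches the values the test asserts, though the exact bit layout is an implementation detail of ContainerId):
// Worked example for id2: 4298334883325L splits into epoch 3 and sequence
// 999799999997, which is why the test expects "container_e03_..._999799999997".
long raw = 4298334883325L;
long epoch = raw >> 40;                 // = 3
long sequence = raw & ((1L << 40) - 1); // = 999799999997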
protected void releaseContainers(List<ContainerId> containers,
SchedulerApplicationAttempt attempt) {
for (ContainerId containerId : containers) {
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
< nmExpireInterval) {
LOG.info(containerId + " doesn't exist. Add the container"
+ " to the release request cache as it may still be recovering.");
synchronized (attempt) {
attempt.getPendingRelease().add(containerId);
}
} else {
RMAuditLogger.logFailure(attempt.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container", "Scheduler",
"Trying to release container not owned by app or with invalid id.",
attempt.getApplicationId(), containerId);
}
}
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(containerId,
SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
}
}
@Override
public void initiateStop() {
super.initiateStop();
LOG.debug("Initiating stop of task scheduler");
stopRequested = true;
List<ContainerId> releasedLaunchedContainers;
synchronized (this) {
releasedLaunchedContainers = new ArrayList<>(heldContainers.size());
List<HeldContainer> heldList = new ArrayList<>(heldContainers.values());
for (HeldContainer hc : heldList) {
if (releaseContainer(hc)) {
releasedLaunchedContainers.add(hc.getId());
}
}
List<Object> tasks = requestTracker.getTasks();
for (Object task : tasks) {
removeTaskRequest(task);
}
}
// perform app callback outside of locks
for (ContainerId id : releasedLaunchedContainers) {
getContext().containerBeingReleased(id);
}
}
private ContainerReport getContainer(
ContainerId containerId,
HashMap<ApplicationAttemptId, List<ContainerReport>> containersToAppAttemptMapping)
throws YarnException, IOException {
List<ContainerReport> containersForAppAttempt =
containersToAppAttemptMapping.get(containerId
.getApplicationAttemptId());
if (containersForAppAttempt == null) {
throw new ApplicationNotFoundException(containerId
.getApplicationAttemptId().getApplicationId() + " is not found ");
}
Iterator<ContainerReport> iterator = containersForAppAttempt.iterator();
while (iterator.hasNext()) {
ContainerReport next = iterator.next();
if (next.getContainerId().equals(containerId)) {
return next;
}
}
throw new ContainerNotFoundException(containerId + " is not found ");
}
/**
* Mark the container as inactive.
* This only has an effect if the container is still active; otherwise it is
* a no-op.
*/
public void deactivateContainer(ContainerId containerId) {
try {
writeLock.lock();
this.pidFiles.remove(containerId);
} finally {
writeLock.unlock();
}
}
@Override
public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
Container container = containers.get(containerId);
if (container != null) {
am.getNmClientAsync().getContainerStatusAsync(containerId, container.getNodeId());
}
}
private void launch() throws IOException, YarnException {
connect();
ContainerId masterContainerID = masterContainer.getId();
ApplicationSubmissionContext applicationContext =
application.getSubmissionContext();
LOG.info("Setting up container " + masterContainer
+ " for AM " + application.getAppAttemptId());
ContainerLaunchContext launchContext =
createAMContainerLaunchContext(applicationContext, masterContainerID);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(launchContext,
masterContainer.getContainerToken());
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
StartContainersResponse response =
containerMgrProxy.startContainers(allRequests);
if (response.getFailedRequests() != null
&& response.getFailedRequests().containsKey(masterContainerID)) {
Throwable t =
response.getFailedRequests().get(masterContainerID).deSerialize();
parseAndThrowException(t);
} else {
LOG.info("Done launching container " + masterContainer + " for AM "
+ application.getAppAttemptId());
}
}
public static ContainerStatus createContainerStatus(int id,
ContainerState containerState) {
ApplicationId applicationId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, 1);
ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, id);
ContainerStatus containerStatus =
BuilderUtils.newContainerStatus(containerId, containerState,
"test_containerStatus: id=" + id + ", containerState: "
+ containerState, 0);
return containerStatus;
}
@Override
public synchronized ContainerId getContainerId() {
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
if (this.containerId != null) {
return this.containerId;
}
if (!p.hasContainerId()) {
return null;
}
this.containerId = convertFromProtoFormat(p.getContainerId());
return this.containerId;
}
/**
* Process node labels update on a node.
*
* TODO: Currently the capacity scheduler kills containers on a node when the
* labels on that node change. This is a simple way to ensure guaranteed
* capacity for the labels of queues. Once YARN-2498 is completed, we can let
* the preemption policy decide whether such containers need to be killed or
* can be kept running.
*/
private synchronized void updateLabelsOnNode(NodeId nodeId,
Set<String> newLabels) {
FiCaSchedulerNode node = nodes.get(nodeId);
if (null == node) {
return;
}
// Labels are unchanged; no update is needed
if (node.getLabels().size() == newLabels.size()
&& node.getLabels().containsAll(newLabels)) {
return;
}
// Kill running containers since the labels have changed
for (RMContainer rmContainer : node.getRunningContainers()) {
ContainerId containerId = rmContainer.getContainerId();
completedContainer(rmContainer,
ContainerStatus.newInstance(containerId,
ContainerState.COMPLETE,
String.format(
"Container=%s killed since labels on the node=%s changed",
containerId.toString(), nodeId.toString()),
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL);
}
// Unreserve container on this node
RMContainer reservedContainer = node.getReservedContainer();
if (null != reservedContainer) {
dropContainerReservation(reservedContainer);
}
// Update node labels after we've done this
node.updateLabels(newLabels);
}
@Override
public void onContainerStopped(ContainerId containerId) {
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_STOPPED,
GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString());
}
LOGGER.info(String.format("Container %s has been stopped", containerId));
if (containerMap.isEmpty()) {
synchronized (allContainersStopped) {
allContainersStopped.notify();
}
}
}
@Override
public void setId(ContainerId id) {
maybeInitBuilder();
if (id == null)
builder.clearId();
this.containerId = id;
}
@Override
public void removeContainerToken(ContainerId containerId)
throws IOException {
String key = CONTAINER_TOKENS_KEY_PREFIX + containerId;
try {
db.delete(bytes(key));
} catch (DBException e) {
throw new IOException(e);
}
}
@Test
public void testLogDirWithDriveLetter() throws Exception {
//To verify that logs paths which include drive letters (Windows)
//do not lose their drive letter specification
LocalDirsHandlerService localDirs = mock(LocalDirsHandlerService.class);
List<String> logDirs = new ArrayList<String>();
logDirs.add("F:/nmlogs");
when(localDirs.getLogDirsForRead()).thenReturn(logDirs);
ApplicationIdPBImpl appId = mock(ApplicationIdPBImpl.class);
when(appId.toString()).thenReturn("app_id_1");
ApplicationAttemptIdPBImpl appAttemptId =
mock(ApplicationAttemptIdPBImpl.class);
when(appAttemptId.getApplicationId()).thenReturn(appId);
ContainerId containerId = mock(ContainerIdPBImpl.class);
when(containerId.getApplicationAttemptId()).thenReturn(appAttemptId);
List<File> logDirFiles = ContainerLogsUtils.getContainerLogDirs(
containerId, localDirs);
Assert.assertTrue("logDir lost drive letter " +
logDirFiles.get(0),
logDirFiles.get(0).toString().indexOf("F:" + File.separator +
"nmlogs") > -1);
}
public static Container newContainer(ContainerId containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority,
Token containerToken) {
Container container = recordFactory.newRecordInstance(Container.class);
container.setId(containerId);
container.setNodeId(nodeId);
container.setNodeHttpAddress(nodeHttpAddress);
container.setResource(resource);
container.setPriority(priority);
container.setContainerToken(containerToken);
return container;
}
public RMContainerReservedEvent(ContainerId containerId,
Resource reservedResource, NodeId reservedNode,
Priority reservedPriority) {
super(containerId, RMContainerEventType.RESERVED);
this.reservedResource = reservedResource;
this.reservedNode = reservedNode;
this.reservedPriority = reservedPriority;
}
public StatefulContainer(NMClientAsync client, ContainerId containerId) {
this.nmClientAsync = client;
this.containerId = containerId;
stateMachine = stateMachineFactory.make(this);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public ContainerLauncherEvent(TaskAttemptId taskAttemptID,
ContainerId containerID,
String containerMgrAddress,
Token containerToken,
ContainerLauncher.EventType type) {
super(type);
this.taskAttemptID = taskAttemptID;
this.containerID = containerID;
this.containerMgrAddress = containerMgrAddress;
this.containerToken = containerToken;
}
private void initSucceededContainers() {
if (this.succeededContainers != null)
return;
StartContainersResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerIdProto> list = p.getSucceededRequestsList();
this.succeededContainers = new ArrayList<ContainerId>();
for (ContainerIdProto c : list) {
this.succeededContainers.add(convertFromProtoFormat(c));
}
}