The examples below show how the org.apache.hadoop.mapreduce.JobID API class is used in practice; the original source for each snippet is available on GitHub.
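Before the project snippets, here is a minimal, self-contained sketch of how a JobID is constructed and round-tripped through its canonical string form (the timestamp value is purely illustrative):
import org.apache.hadoop.mapreduce.JobID;

public class JobIdDemo {
  public static void main(String[] args) {
    // Build a JobID from a jobtracker/cluster identifier and a sequential job number.
    JobID jobId = new JobID("1392893907493", 1);
    System.out.println(jobId);                   // job_1392893907493_0001
    System.out.println(jobId.getJtIdentifier()); // 1392893907493
    System.out.println(jobId.getId());           // 1

    // Parse the canonical string form back into an equal JobID instance.
    JobID parsed = JobID.forName("job_1392893907493_0001");
    System.out.println(jobId.equals(parsed));    // true
  }
}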
@Test (timeout=5000)
public void testJobInfo() throws IOException {
JobID jid = new JobID("001", 1);
Text user = new Text("User");
Path path = new Path("/tmp/test");
JobInfo info = new JobInfo(jid, user, path);
ByteArrayOutputStream out = new ByteArrayOutputStream();
info.write(new DataOutputStream(out));
JobInfo copyinfo = new JobInfo();
copyinfo.readFields(new DataInputStream(new ByteArrayInputStream(out
.toByteArray())));
assertEquals(info.getJobID().toString(), copyinfo.getJobID().toString());
assertEquals(info.getJobSubmitDir().getName(), copyinfo.getJobSubmitDir()
.getName());
assertEquals(info.getUser().toString(), copyinfo.getUser().toString());
}
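// Auxiliary-service hook: deserializes the shuffle secret shipped in the application's
// service data and records it under a JobID derived from the ApplicationId.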
@Override
public void initializeApplication(ApplicationInitializationContext context) {
String user = context.getUser();
ApplicationId appId = context.getApplicationId();
ByteBuffer secret = context.getApplicationDataForService();
// TODO these bytes should be versioned
try {
Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
// TODO: Once Shuffle is out of NM, this can use MR APIs
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
recordJobShuffleInfo(jobId, user, jt);
} catch (IOException e) {
LOG.error("Error during initApp", e);
// TODO add API to AuxiliaryServices to report failures
}
}
@Test
public void testMaxBlockLocationsNewSplits() throws Exception {
TEST_DIR.mkdirs();
try {
Configuration conf = new Configuration();
conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
Path submitDir = new Path(TEST_DIR.getAbsolutePath());
FileSystem fs = FileSystem.getLocal(conf);
FileSplit split = new FileSplit(new Path("/some/path"), 0, 1,
new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
JobSplitWriter.createSplitFiles(submitDir, conf, fs,
new FileSplit[] { split });
JobSplit.TaskSplitMetaInfo[] infos =
SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
submitDir);
assertEquals("unexpected number of splits", 1, infos.length);
assertEquals("unexpected number of split locations",
4, infos[0].getLocations().length);
} finally {
FileUtil.fullyDelete(TEST_DIR);
}
}
/**
* Test the getters of TaskAttemptFinishedEvent and TaskAttemptFinished.
*
* @throws Exception
*/
@Test(timeout = 10000)
public void testTaskAttemptFinishedEvent() throws Exception {
JobID jid = new JobID("001", 1);
TaskID tid = new TaskID(jid, TaskType.REDUCE, 2);
TaskAttemptID taskAttemptId = new TaskAttemptID(tid, 3);
Counters counters = new Counters();
TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId,
TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS",
counters);
assertEquals(test.getAttemptId().toString(), taskAttemptId.toString());
assertEquals(test.getCounters(), counters);
assertEquals(test.getFinishTime(), 123L);
assertEquals(test.getHostname(), "HOSTNAME");
assertEquals(test.getRackName(), "RAKNAME");
assertEquals(test.getState(), "STATUS");
assertEquals(test.getTaskId(), tid);
assertEquals(test.getTaskStatus(), "TEST");
assertEquals(test.getTaskType(), TaskType.REDUCE);
}
public synchronized ClientServiceDelegate getClient(JobID jobId) {
if (hsProxy == null) {
try {
hsProxy = instantiateHistoryProxy();
} catch (IOException e) {
LOG.warn("Could not connect to History server.", e);
throw new YarnRuntimeException("Could not connect to History server.", e);
}
}
ClientServiceDelegate client = cache.get(jobId);
if (client == null) {
client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
cache.put(jobId, client);
}
return client;
}
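// Converts the MR JobID to its YARN record form, fetches a window of task attempt
// completion events over the client protocol, and maps the result back to MR types.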
public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
.toYarn(arg0);
GetTaskAttemptCompletionEventsRequest request = recordFactory
.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
request.setJobId(jobID);
request.setFromEventId(arg1);
request.setMaxEvents(arg2);
List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list =
((GetTaskAttemptCompletionEventsResponse) invoke(
"getTaskAttemptCompletionEvents", GetTaskAttemptCompletionEventsRequest.class, request)).
getCompletionEventList();
return TypeConverter
.fromYarn(list
.toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
}
@Test(timeout=60000)
public void testJobKillTimeout() throws Exception {
long timeToWaitBeforeHardKill =
10000 + MRJobConfig.DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS;
conf.setLong(MRJobConfig.MR_AM_HARD_KILL_TIMEOUT_MS,
timeToWaitBeforeHardKill);
clientDelegate = mock(ClientServiceDelegate.class);
doAnswer(
new Answer<ClientServiceDelegate>() {
@Override
public ClientServiceDelegate answer(InvocationOnMock invocation)
throws Throwable {
return clientDelegate;
}
}
).when(clientCache).getClient(any(JobID.class));
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
long startTimeMillis = System.currentTimeMillis();
yarnRunner.killJob(jobId);
assertTrue("killJob should have waited at least " + timeToWaitBeforeHardKill
+ " ms.", System.currentTimeMillis() - startTimeMillis
>= timeToWaitBeforeHardKill);
}
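// Claims a task id by repeatedly picking a random candidate and trying to create the
// corresponding lock file; the first successful file creation wins the lock.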
@Override
public TaskID acquireTaskIdLock(Configuration conf) {
JobID jobId = HadoopFormats.getJobId(conf);
boolean lockAcquired = false;
int taskIdCandidate = 0;
while (!lockAcquired) {
taskIdCandidate = RANDOM_GEN.nextInt(Integer.MAX_VALUE);
Path path =
new Path(
locksDir,
String.format(LOCKS_DIR_TASK_PATTERN, getJobJtIdentifier(conf), taskIdCandidate));
lockAcquired = tryCreateFile(conf, path);
}
return HadoopFormats.createTaskID(jobId, taskIdCandidate);
}
/**
* Create an event to record successful job completion
* @param id Job ID
* @param finishTime Finish time of the job
* @param finishedMaps The number of finished maps
* @param finishedReduces The number of finished reduces
* @param failedMaps The number of failed maps
* @param failedReduces The number of failed reduces
* @param mapCounters Map Counters for the job
* @param reduceCounters Reduce Counters for the job
* @param totalCounters Total Counters for the job
*/
public JobFinishedEvent(JobID id, long finishTime,
int finishedMaps, int finishedReduces,
int failedMaps, int failedReduces,
Counters mapCounters, Counters reduceCounters,
Counters totalCounters) {
this.jobId = id;
this.finishTime = finishTime;
this.finishedMaps = finishedMaps;
this.finishedReduces = finishedReduces;
this.failedMaps = failedMaps;
this.failedReduces = failedReduces;
this.mapCounters = mapCounters;
this.reduceCounters = reduceCounters;
this.totalCounters = totalCounters;
}
public JobStatus getJobStatus(JobID oldJobID) throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
JobStatus jobStatus = null;
if (report != null) {
if (StringUtils.isEmpty(report.getJobFile())) {
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
report.setJobFile(jobFile);
}
String historyTrackingUrl = report.getTrackingUrl();
String url = StringUtils.isNotEmpty(historyTrackingUrl)
? historyTrackingUrl : trackingUrl;
jobStatus = TypeConverter.fromYarn(report, url);
}
return jobStatus;
}
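// Reads the split meta info from the job submit directory and converts each split's
// preferred locations into a TaskLocationHint.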
private List<TaskLocationHint> getMapLocationHintsFromInputSplits(JobID jobId,
FileSystem fs, Configuration conf,
String jobSubmitDir) throws IOException {
TaskSplitMetaInfo[] splitsInfo =
SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf,
new Path(jobSubmitDir));
int splitsCount = splitsInfo.length;
List<TaskLocationHint> locationHints =
new ArrayList<TaskLocationHint>(splitsCount);
for (int i = 0; i < splitsCount; ++i) {
TaskLocationHint locationHint =
TaskLocationHint.createTaskLocationHint(
new HashSet<String>(
Arrays.asList(splitsInfo[i].getLocations())), null
);
locationHints.add(locationHint);
}
return locationHints;
}
@Test
public void testQueueNamePercentEncoding() throws IOException {
JobIndexInfo info = new JobIndexInfo();
JobID oldJobId = JobID.forName(JOB_ID);
JobId jobId = TypeConverter.toYarn(oldJobId);
info.setJobId(jobId);
info.setSubmitTime(Long.parseLong(SUBMIT_TIME));
info.setUser(USER_NAME);
info.setJobName(JOB_NAME);
info.setFinishTime(Long.parseLong(FINISH_TIME));
info.setNumMaps(Integer.parseInt(NUM_MAPS));
info.setNumReduces(Integer.parseInt(NUM_REDUCES));
info.setJobStatus(JOB_STATUS);
info.setQueueName(QUEUE_NAME_WITH_DELIMITER);
info.setJobStartTime(Long.parseLong(JOB_START_TIME));
String jobHistoryFile = FileNameIndexUtils.getDoneFileName(info);
Assert.assertTrue("Queue name not encoded correctly into job history file",
jobHistoryFile.contains(QUEUE_NAME_WITH_DELIMITER_ESCAPE));
}
@Override
public void open(String uId) throws Exception {
this.hash = uId.hashCode();
Job job = ((ConfigurableHDFSFileSink<K, V>) getWriteOperation().getSink()).jobInstance();
FileOutputFormat.setOutputPath(job, new Path(path));
// Each Writer is responsible for writing one bundle of elements and is represented by one
// unique Hadoop task based on uId/hash. All tasks share the same job ID. Since Dataflow
// handles retrying of failed bundles, each task has one attempt only.
JobID jobId = job.getJobID();
TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
configure(job);
context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
recordWriter = outputFormat.getRecordWriter(context);
outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
}
/**
* Mask the job ID part in a {@link TaskAttemptID}.
*
* @param attemptId
* raw {@link TaskAttemptID} read from trace
* @return masked {@link TaskAttemptID} with empty {@link JobID}.
*/
private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
JobID jobId = new JobID();
TaskType taskType = attemptId.getTaskType();
TaskID taskId = attemptId.getTaskID();
return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), taskType,
taskId.getId(), attemptId.getId());
}
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String finishTime = line.get("FINISH_TIME");
String status = line.get("JOB_STATUS");
String finishedMaps = line.get("FINISHED_MAPS");
String finishedReduces = line.get("FINISHED_REDUCES");
String failedMaps = line.get("FAILED_MAPS");
String failedReduces = line.get("FAILED_REDUCES");
String counters = line.get("COUNTERS");
if (status != null && status.equalsIgnoreCase("success")
&& finishTime != null && finishedMaps != null
&& finishedReduces != null) {
return new JobFinishedEvent(jobID, Long.parseLong(finishTime), Integer
.parseInt(finishedMaps), Integer.parseInt(finishedReduces), Integer
.parseInt(failedMaps), Integer.parseInt(failedReduces), null, null,
maybeParseCounters(counters));
}
return null;
}
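// Reflectively instantiates whichever JobContext implementation is on the classpath
// (the hadoop2 JobContextImpl, falling back to the hadoop1 JobContext).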
public static JobContext newJobContext(Configuration conf, JobID jobId) {
try {
Class clazz = firstAvailableClass(
"org.apache.hadoop.mapreduce.task.JobContextImpl", // hadoop2, hadoop2-yarn
"org.apache.hadoop.mapreduce.JobContext"); // hadoop1
Constructor constructor = clazz.getDeclaredConstructor(Configuration.class, JobID.class);
return (JobContext) constructor.newInstance(conf, jobId);
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
LOG.error("to use hadoop classes, we need them in the classpath " + e.getMessage());
throw new DeepInstantiationException(
"to use hadoop classes, we need them in the classpath " + e.getMessage());
}
}
@Override
protected void serviceStart() throws Exception {
scheduler = createSchedulerProxy();
JobID id = TypeConverter.fromYarn(this.applicationId);
JobId jobId = TypeConverter.toYarn(id);
job = context.getJob(jobId);
register();
startAllocatorThread();
super.serviceStart();
}
@Test
public void testListAttemptIdsWithValidInput() throws Exception {
JobID jobId = JobID.forName(jobIdStr);
Cluster mockCluster = mock(Cluster.class);
Job job = mock(Job.class);
CLI cli = spy(new CLI());
doReturn(mockCluster).when(cli).createCluster();
when(job.getTaskReports(TaskType.MAP)).thenReturn(
getTaskReports(jobId, TaskType.MAP));
when(job.getTaskReports(TaskType.REDUCE)).thenReturn(
getTaskReports(jobId, TaskType.REDUCE));
when(mockCluster.getJob(jobId)).thenReturn(job);
int retCode_MAP = cli.run(new String[] { "-list-attempt-ids", jobIdStr,
"MAP", "running" });
// testing case insensitive behavior
int retCode_map = cli.run(new String[] { "-list-attempt-ids", jobIdStr,
"map", "running" });
int retCode_REDUCE = cli.run(new String[] { "-list-attempt-ids", jobIdStr,
"REDUCE", "running" });
int retCode_completed = cli.run(new String[] { "-list-attempt-ids",
jobIdStr, "REDUCE", "completed" });
assertEquals("MAP is a valid input,exit code should be 0", 0, retCode_MAP);
assertEquals("map is a valid input,exit code should be 0", 0, retCode_map);
assertEquals("REDUCE is a valid input,exit code should be 0", 0,
retCode_REDUCE);
assertEquals(
"REDUCE and completed are a valid inputs to -list-attempt-ids,exit code should be 0",
0, retCode_completed);
verify(job, times(2)).getTaskReports(TaskType.MAP);
verify(job, times(2)).getTaskReports(TaskType.REDUCE);
}
private void writeMetrics(QueryMetric updatedQueryMetric, List<QueryMetric> storedQueryMetrics, Date lastUpdated, boolean delete) throws Exception {
LiveContextWriter contextWriter = null;
MapContext<Text,RawRecordContainer,Text,Mutation> context = null;
try {
contextWriter = new LiveContextWriter();
contextWriter.setup(conf, false);
TaskAttemptID taskId = new TaskAttemptID(new TaskID(new JobID(JOB_ID, 1), TaskType.MAP, 1), 1);
context = new MapContextImpl<>(conf, taskId, null, recordWriter, null, reporter, null);
for (QueryMetric storedQueryMetric : storedQueryMetrics) {
AbstractColumnBasedHandler<Key> handler = new ContentQueryMetricsHandler<>();
handler.setup(context);
Multimap<BulkIngestKey,Value> r = getEntries(handler, updatedQueryMetric, storedQueryMetric, lastUpdated, delete);
try {
if (r != null) {
contextWriter.write(r, context);
}
if (handler.getMetadata() != null) {
contextWriter.write(handler.getMetadata().getBulkMetadata(), context);
}
} finally {
contextWriter.commit(context);
}
}
} finally {
if (contextWriter != null && context != null) {
contextWriter.cleanup(context);
}
}
}
public void setDatum(Object oDatum) {
this.datum = (JobFinished) oDatum;
this.jobId = JobID.forName(datum.jobid.toString());
this.finishTime = datum.finishTime;
this.finishedMaps = datum.finishedMaps;
this.finishedReduces = datum.finishedReduces;
this.failedMaps = datum.failedMaps;
this.failedReduces = datum.failedReduces;
this.mapCounters = EventReader.fromAvro(datum.mapCounters);
this.reduceCounters = EventReader.fromAvro(datum.reduceCounters);
this.totalCounters = EventReader.fromAvro(datum.totalCounters);
}
@Override
public void handle(ContainerAllocatorEvent event) {
ContainerId cId =
ContainerId.newContainerId(getContext().getApplicationAttemptId(),
containerCount++);
NodeId nodeId = NodeId.newInstance(NM_HOST, NM_PORT);
Resource resource = Resource.newInstance(1234, 2, 2);
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, nodeId.toString(), "user",
resource, System.currentTimeMillis() + 10000, 42, 42,
Priority.newInstance(0), 0);
Token containerToken = newContainerToken(nodeId, "password".getBytes(),
containerTokenIdentifier);
Container container = Container.newInstance(cId, nodeId,
NM_HOST + ":" + NM_HTTP_PORT, resource, null, containerToken);
JobID id = TypeConverter.fromYarn(applicationId);
JobId jobId = TypeConverter.toYarn(id);
getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
100)));
getContext().getEventHandler().handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.MAP,
100)));
getContext().getEventHandler().handle(
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null));
}
/**
* Resolves to the provided {@link #TEMP_GCS_PATH} or falls back to a temporary path based on {@link
* #GCS_BUCKET} and {@code jobId}.
*
* @param conf the configuration to fetch the keys from.
* @param jobId the ID of the job requesting a working path. Optional (could be {@code null}) if
* {@link #TEMP_GCS_PATH} is provided.
* @return the temporary directory path.
* @throws IOException if the file system of the derived working path isn't GCS.
*/
public static String getTemporaryPathRoot(Configuration conf, @Nullable JobID jobId)
throws IOException {
// Try using the temporary gcs path.
String pathRoot = conf.get(BigQueryConfiguration.TEMP_GCS_PATH.getKey());
if (Strings.isNullOrEmpty(pathRoot)) {
checkNotNull(jobId, "jobId is required if '%s' is not set", TEMP_GCS_PATH.getKey());
logger.atInfo().log(
"Fetching key '%s' since '%s' isn't set explicitly.",
GCS_BUCKET.getKey(), TEMP_GCS_PATH.getKey());
String gcsBucket = conf.get(GCS_BUCKET.getKey());
if (Strings.isNullOrEmpty(gcsBucket)) {
throw new IOException(
"Must supply a value for configuration setting: " + GCS_BUCKET.getKey());
}
pathRoot = String.format("gs://%s/hadoop/tmp/bigquery/%s", gcsBucket, jobId);
}
logger.atInfo().log("Using working path: '%s'", pathRoot);
Path workingPath = new Path(pathRoot);
FileSystem fs = workingPath.getFileSystem(conf);
Preconditions.checkState("gs".equals(fs.getScheme()), "Export FS must be GCS ('gs' scheme).");
return pathRoot;
}
@Test(timeout = 10000)
public void testJobQueueChange() throws Exception {
org.apache.hadoop.mapreduce.JobID jid = new JobID("001", 1);
JobQueueChangeEvent test = new JobQueueChangeEvent(jid,
"newqueue");
assertEquals(test.getJobId().toString(), jid.toString());
assertEquals(test.getJobQueueName(), "newqueue");
}
public JobID getNewJobID() throws IOException, InterruptedException {
try {
this.application =
client.createApplication().getNewApplicationResponse();
} catch (YarnException e) {
throw new IOException(e);
}
this.applicationId = this.application.getApplicationId();
return TypeConverter.fromYarn(applicationId);
}
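// Removes the job's shuffle token and user mapping, and deletes the persisted entry
// from the state store when one is configured.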
private void removeJobShuffleInfo(JobID jobId) throws IOException {
String jobIdStr = jobId.toString();
secretManager.removeTokenForJob(jobIdStr);
userRsrc.remove(jobIdStr);
if (stateDb != null) {
try {
stateDb.delete(bytes(jobIdStr));
} catch (DBException e) {
throw new IOException("Unable to remove " + jobId
+ " from state store", e);
}
}
}
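// Wraps the supplied Configuration in a JobConf when necessary and captures the job's
// credentials and the current user.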
public JobContextImpl(Configuration conf, JobID jobId) {
if (conf instanceof JobConf) {
this.conf = (JobConf)conf;
} else {
this.conf = new JobConf(conf);
}
this.jobId = jobId;
this.credentials = this.conf.getCredentials();
try {
this.ugi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test(timeout = 5000)
public void testTaskID() throws IOException, InterruptedException {
JobID jobid = new JobID("1014873536921", 6);
TaskID tid = new TaskID(jobid, TaskType.MAP, 0);
org.apache.hadoop.mapred.TaskID tid1 =
org.apache.hadoop.mapred.TaskID.downgrade(tid);
org.apache.hadoop.mapred.TaskReport treport =
new org.apache.hadoop.mapred.TaskReport(tid1, 0.0f,
State.FAILED.toString(), null, TIPStatus.FAILED, 100, 100,
new org.apache.hadoop.mapred.Counters());
Assert
.assertEquals(treport.getTaskId(), "task_1014873536921_0006_m_000000");
Assert.assertEquals(treport.getTaskID().toString(),
"task_1014873536921_0006_m_000000");
}
@Override
protected Job createJob(Configuration conf, JobStateInternal forcedState,
String diagnostic) {
JobImpl jobImpl = mock(JobImpl.class);
when(jobImpl.getInternalState()).thenReturn(this.jobStateInternal);
when(jobImpl.getAllCounters()).thenReturn(new Counters());
JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID);
when(jobImpl.getID()).thenReturn(jobId);
((AppContext) getContext())
.getAllJobs().put(jobImpl.getID(), jobImpl);
return jobImpl;
}