The following are code examples of org.apache.hadoop.mapreduce.Job#getJobID(), each drawn from an open-source project.
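Before the individual examples, here is a minimal, self-contained sketch of the behaviour every snippet below relies on: getJobID() returns null until the job has been submitted, and only carries a cluster-assigned ID afterwards. The class name, paths and job name here are illustrative and not taken from any of the projects quoted below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobIdSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "job-id-sketch");
    job.setJarByClass(JobIdSketch.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Before submission no ID has been assigned yet.
    System.out.println("before submit: " + job.getJobID());   // prints null

    job.submit();                          // non-blocking submission
    JobID id = job.getJobID();             // now populated, e.g. job_1700000000000_0001
    System.out.println("submitted as: " + id);

    job.waitForCompletion(false);          // block until the job finishes (false = no progress output)
  }
}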
private void startGetJobIdThread(Job job) {
  Thread thread = new Thread(new Runnable() {
    @Override
    public void run() {
      while (true) {
        if (job.getJobID() != null) {
          LogUtils.info("map reducer jobId:" + job.getJobID());
          break;
        }
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  });
  thread.setDaemon(true);
  thread.setName("getJobId");
  thread.start();
}
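A hypothetical caller for the helper above, only to show where the polling pays off: the watcher is started before the blocking waitForCompletion(true), so the daemon thread logs the JobID as soon as submission assigns it. Mapper, reducer and path configuration are elided.

Job job = Job.getInstance(new Configuration(), "watched-job");
// ... set mapper, reducer, input and output paths here ...
startGetJobIdThread(job);      // daemon thread from the snippet above
job.waitForCompletion(true);   // blocks; the watcher logs the ID once it is non-null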
@Override
public Metrics copy() throws CircusTrainException {
  LOG.info("Copying table data.");
  LOG.debug("Invoking S3MapReduceCp: {} -> {}", sourceDataBaseLocation, replicaDataLocation);
  S3MapReduceCpOptions s3MapReduceCpOptions = parseCopierOptions(copierOptions);
  LOG.debug("Invoking S3MapReduceCp with options: {}", s3MapReduceCpOptions);
  try {
    Enum<?> counter = Counter.BYTESCOPIED;
    Job job = executor.exec(conf, s3MapReduceCpOptions);
    registerRunningJobMetrics(job, counter);
    if (!job.waitForCompletion(true)) {
      throw new IOException(
          "S3MapReduceCp failure: Job " + job.getJobID() + " has failed: " + job.getStatus().getFailureInfo());
    }
    return new JobMetrics(job, counter);
  } catch (Exception e) {
    cleanUpReplicaDataLocation();
    throw new CircusTrainException("Unable to copy file(s)", e);
  }
}
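The failure message above is assembled from two calls of the stock org.apache.hadoop.mapreduce.Job API; a hypothetical helper isolating just that part (the method name is made up):

private static String describeFailure(Job job) throws IOException, InterruptedException {
  // getJobID() and getStatus().getFailureInfo() are both plain Hadoop calls.
  return "Job " + job.getJobID() + " has failed: " + job.getStatus().getFailureInfo();
}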
/**
 * Generates job stats for the given job and its job description.
 */
public static JobStats generateJobStats(Job job, JobStory jobdesc) {
  int seq = GridmixJob.getJobSeqId(job);
  // bail out if job description is missing for a job to be simulated
  if (seq >= 0 && jobdesc == null) {
    throw new IllegalArgumentException("JobStory not available for job "
        + job.getJobID());
  }
  int maps = -1;
  int reds = -1;
  if (jobdesc != null) {
    // Note that the ZombieJob will return a >= 0 value
    maps = jobdesc.getNumberMaps();
    reds = jobdesc.getNumberReduces();
  }
  return new JobStats(maps, reds, job);
}
@Override
public void open(String uId) throws Exception {
  this.hash = uId.hashCode();
  Job job = ((ConfigurableHDFSFileSink<K, V>) getWriteOperation().getSink()).jobInstance();
  FileOutputFormat.setOutputPath(job, new Path(path));
  // Each Writer is responsible for writing one bundle of elements and is represented by one
  // unique Hadoop task based on uId/hash. All tasks share the same job ID. Since Dataflow
  // handles retrying of failed bundles, each task has one attempt only.
  JobID jobId = job.getJobID();
  TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
  configure(job);
  context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
  FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
  recordWriter = outputFormat.getRecordWriter(context);
  outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
}
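For reference, the ID plumbing used above can be reproduced with stock Hadoop classes alone; the job identifier and task number below are made up:

// JobID(String jtIdentifier, int id), TaskID and TaskAttemptID are plain Hadoop API.
JobID jobId = new JobID("20240101120000", 1);
TaskID taskId = new TaskID(jobId, TaskType.REDUCE, 42);
TaskAttemptID attemptId = new TaskAttemptID(taskId, 0);
TaskAttemptContext context = new TaskAttemptContextImpl(new Configuration(), attemptId);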
public static JobStepStatusEnum checkStatus(Job job, StringBuilder output) {
  if (job == null || job.getJobID() == null) {
    output.append("Skip status check with empty job id..\n");
    return JobStepStatusEnum.WAITING;
  }
  JobStepStatusEnum status = null;
  try {
    switch (job.getStatus().getState()) {
      case SUCCEEDED:
        status = JobStepStatusEnum.FINISHED;
        break;
      case FAILED:
        status = JobStepStatusEnum.ERROR;
        break;
      case KILLED:
        status = JobStepStatusEnum.KILLED;
        break;
      case RUNNING:
        status = JobStepStatusEnum.RUNNING;
        break;
      case PREP:
        status = JobStepStatusEnum.WAITING;
        break;
      default:
        throw new IllegalStateException();
    }
  } catch (Exception e) {
    logger.error("error check status", e);
    output.append("Exception: ").append(e.getLocalizedMessage()).append("\n");
    status = JobStepStatusEnum.ERROR;
  }
  return status;
}
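A hypothetical polling loop around the helper above; the five-second interval is arbitrary and the surrounding method is assumed to declare InterruptedException:

StringBuilder output = new StringBuilder();
JobStepStatusEnum status = checkStatus(job, output);
while (status == JobStepStatusEnum.WAITING || status == JobStepStatusEnum.RUNNING) {
  Thread.sleep(5000L);
  status = checkStatus(job, output);
}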
@Override
public Metrics copy() throws CircusTrainException {
  LOG.info("Copying table data.");
  LOG.debug("Invoking DistCp: {} -> {}", sourceDataBaseLocation, replicaDataLocation);
  DistCpOptions distCpOptions = parseCopierOptions(copierOptions);
  LOG.debug("Invoking DistCp with options: {}", distCpOptions);
  CircusTrainCopyListing.setAsCopyListingClass(conf);
  CircusTrainCopyListing.setRootPath(conf, sourceDataBaseLocation);
  try {
    distCpOptions.setBlocking(false);
    Job job = executor.exec(conf, distCpOptions);
    String counter = String
        .format("%s_BYTES_WRITTEN", replicaDataLocation.toUri().getScheme().toUpperCase(Locale.ROOT));
    registerRunningJobMetrics(job, counter);
    if (!job.waitForCompletion(true)) {
      throw new IOException(
          "DistCp failure: Job " + job.getJobID() + " has failed: " + job.getStatus().getFailureInfo());
    }
    return new JobMetrics(job, FileSystemCounter.class.getName(), counter);
  } catch (Exception e) {
    cleanUpReplicaDataLocation();
    throw new CircusTrainException("Unable to copy file(s)", e);
  }
}
@SuppressWarnings("rawtypes")
@Test
public void testInputFormat() throws Exception {
  RyaStatement input = RyaStatement.builder()
      .setSubject(new RyaIRI("http://www.google.com"))
      .setPredicate(new RyaIRI("http://some_other_uri"))
      .setObject(new RyaIRI("http://www.yahoo.com"))
      .setColumnVisibility(new byte[0])
      .setValue(new byte[0])
      .build();
  apiImpl.add(input);
  Job jobConf = Job.getInstance();
  GraphXEdgeInputFormat.setMockInstance(jobConf, instance.getInstanceName());
  GraphXEdgeInputFormat.setConnectorInfo(jobConf, username, password);
  GraphXEdgeInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
  GraphXEdgeInputFormat.setInputTableName(jobConf, table);
  GraphXEdgeInputFormat.setInputTableName(jobConf, table);
  GraphXEdgeInputFormat.setScanIsolation(jobConf, false);
  GraphXEdgeInputFormat.setLocalIterators(jobConf, false);
  GraphXEdgeInputFormat.setOfflineTableScan(jobConf, false);
  GraphXEdgeInputFormat inputFormat = new GraphXEdgeInputFormat();
  JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
  List<InputSplit> splits = inputFormat.getSplits(context);
  Assert.assertEquals(1, splits.size());
  TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(),
      new TaskAttemptID(new TaskID(), 1));
  RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
  RecordReader ryaStatementRecordReader = (RecordReader) reader;
  ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
  List<Edge> results = new ArrayList<Edge>();
  while (ryaStatementRecordReader.nextKeyValue()) {
    Edge writable = (Edge) ryaStatementRecordReader.getCurrentValue();
    long srcId = writable.srcId();
    long destId = writable.dstId();
    RyaTypeWritable rtw = null;
    Object text = ryaStatementRecordReader.getCurrentKey();
    Edge<RyaTypeWritable> edge = new Edge<RyaTypeWritable>(srcId, destId, rtw);
    results.add(edge);
    System.out.println(text);
  }
  System.out.println(results.size());
  System.out.println(results);
  Assert.assertTrue(results.size() == 2);
}
@Test
public void testInputFormat() throws Exception {
  RyaStatement input = RyaStatement.builder()
      .setSubject(new RyaIRI("http://www.google.com"))
      .setPredicate(new RyaIRI("http://some_other_uri"))
      .setObject(new RyaIRI("http://www.yahoo.com"))
      .setColumnVisibility(new byte[0])
      .setValue(new byte[0])
      .build();
  apiImpl.add(input);
  Job jobConf = Job.getInstance();
  RyaInputFormat.setMockInstance(jobConf, instance.getInstanceName());
  RyaInputFormat.setConnectorInfo(jobConf, username, password);
  RyaInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
  AccumuloInputFormat.setInputTableName(jobConf, table);
  AccumuloInputFormat.setInputTableName(jobConf, table);
  AccumuloInputFormat.setScanIsolation(jobConf, false);
  AccumuloInputFormat.setLocalIterators(jobConf, false);
  AccumuloInputFormat.setOfflineTableScan(jobConf, false);
  RyaInputFormat inputFormat = new RyaInputFormat();
  JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
  List<InputSplit> splits = inputFormat.getSplits(context);
  Assert.assertEquals(1, splits.size());
  TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(),
      new TaskAttemptID(new TaskID(), 1));
  RecordReader<Text, RyaStatementWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
  RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader) reader;
  ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
  List<RyaStatement> results = new ArrayList<RyaStatement>();
  while (ryaStatementRecordReader.nextKeyValue()) {
    RyaStatementWritable writable = ryaStatementRecordReader.getCurrentValue();
    RyaStatement value = writable.getRyaStatement();
    Text text = ryaStatementRecordReader.getCurrentKey();
    RyaStatement stmt = RyaStatement.builder()
        .setSubject(value.getSubject())
        .setPredicate(value.getPredicate())
        .setObject(value.getObject())
        .setContext(value.getContext())
        .setQualifier(value.getQualifer())
        .setColumnVisibility(value.getColumnVisibility())
        .setValue(value.getValue())
        .build();
    results.add(stmt);
    System.out.println(text);
    System.out.println(value);
  }
  Assert.assertTrue(results.size() == 2);
  Assert.assertTrue(results.contains(input));
}
@Test
public void testInputFormat() throws Exception {
  final RyaStatement input = RyaStatement.builder()
      .setSubject(new RyaIRI("http://www.google.com"))
      .setPredicate(new RyaIRI("http://some_other_uri"))
      .setObject(new RyaIRI("http://www.yahoo.com"))
      .setColumnVisibility(new byte[0])
      .setValue(new byte[0])
      .build();
  apiImpl.add(input);
  final Job jobConf = Job.getInstance();
  GraphXInputFormat.setMockInstance(jobConf, instance.getInstanceName());
  GraphXInputFormat.setConnectorInfo(jobConf, username, password);
  GraphXInputFormat.setInputTableName(jobConf, table);
  GraphXInputFormat.setInputTableName(jobConf, table);
  GraphXInputFormat.setScanIsolation(jobConf, false);
  GraphXInputFormat.setLocalIterators(jobConf, false);
  GraphXInputFormat.setOfflineTableScan(jobConf, false);
  final GraphXInputFormat inputFormat = new GraphXInputFormat();
  final JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
  final List<InputSplit> splits = inputFormat.getSplits(context);
  Assert.assertEquals(1, splits.size());
  final TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(),
      new TaskAttemptID(new TaskID(), 1));
  final RecordReader<Object, RyaTypeWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
  final RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader) reader;
  ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
  final List<RyaType> results = new ArrayList<RyaType>();
  System.out.println("before while");
  while (ryaStatementRecordReader.nextKeyValue()) {
    System.out.println("in while");
    final RyaTypeWritable writable = ryaStatementRecordReader.getCurrentValue();
    final RyaType value = writable.getRyaType();
    final Object text = ryaStatementRecordReader.getCurrentKey();
    final RyaType type = new RyaType();
    final String validatedLanguage = LiteralLanguageUtils.validateLanguage(value.getLanguage(), value.getDataType());
    type.setData(value.getData());
    type.setDataType(value.getDataType());
    type.setLanguage(validatedLanguage);
    results.add(type);
    System.out.println(value.getData());
    System.out.println(value.getDataType());
    System.out.println(results);
    System.out.println(type);
    System.out.println(text);
    System.out.println(value);
  }
  System.out.println("after while");
  System.out.println(results.size());
  System.out.println(results);
  // Assert.assertTrue(results.size() == 2);
  // Assert.assertTrue(results.contains(input));
}
private String getJobInfo(Job job) {
  return "jobName: " + job.getJobName() + ", jobId: " + job.getJobID();
}
/**
 * Submit a map/reduce job for estimating Pi.
 *
 * @return the ID of the submitted job
 */
public static JobID submitPiEstimationMRApp(String jobName, int numMaps, long numPoints,
    Path tmpDir, Configuration conf) throws IOException, ClassNotFoundException, InterruptedException {
  Job job = new Job(conf);
  // setup job conf
  job.setJobName(jobName);
  job.setJarByClass(QuasiMonteCarlo.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputKeyClass(BooleanWritable.class);
  job.setOutputValueClass(LongWritable.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setMapperClass(QmcMapper.class);
  job.setReducerClass(QmcReducer.class);
  job.setNumReduceTasks(1);
  // turn off speculative execution, because DFS doesn't handle
  // multiple writers to the same file.
  job.setSpeculativeExecution(false);
  // setup input/output directories
  final Path inDir = new Path(tmpDir, "in");
  final Path outDir = new Path(tmpDir, "out");
  FileInputFormat.setInputPaths(job, inDir);
  FileOutputFormat.setOutputPath(job, outDir);
  final FileSystem fs = FileSystem.get(conf);
  if (fs.exists(tmpDir)) {
    fs.delete(tmpDir, true);
    // throw new IOException("Tmp directory " + fs.makeQualified(tmpDir)
    //     + " already exists. Please remove it first.");
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Cannot create input directory " + inDir);
  }
  // try {
  // generate an input file for each map task
  for (int i = 0; i < numMaps; ++i) {
    final Path file = new Path(inDir, "part" + i);
    final LongWritable offset = new LongWritable(i * numPoints);
    final LongWritable size = new LongWritable(numPoints);
    final SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, file,
        LongWritable.class, LongWritable.class, CompressionType.NONE);
    try {
      writer.append(offset, size);
    } finally {
      writer.close();
    }
    System.out.println("Wrote input for Map #" + i);
  }
  // start a map/reduce job
  System.out.println("Starting Job");
  final long startTime = System.currentTimeMillis();
  job.submit();
  // final double duration = (System.currentTimeMillis() - startTime)/1000.0;
  // System.out.println("Job Finished in " + duration + " seconds");
  return job.getJobID();
  // } finally {
  //   fs.delete(tmpDir, true);
  // }
}
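A sketch of what a caller might do with the returned JobID: resolve it back to a Job through org.apache.hadoop.mapreduce.Cluster and inspect it. The job name, map count, point count and temporary path below are made up.

Configuration conf = new Configuration();
JobID id = submitPiEstimationMRApp("pi-estimate", 10, 1000000L, new Path("/tmp/pi"), conf);
Job running = new Cluster(conf).getJob(id);   // null if the cluster no longer knows the job
if (running != null) {
  System.out.println("tracking URL: " + running.getTrackingURL());
}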
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
  String ks = jobConf.get(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME);
  String cf = jobConf.get(AbstractColumnSerDe.CASSANDRA_CF_NAME);
  int slicePredicateSize = jobConf.getInt(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE,
      AbstractColumnSerDe.DEFAULT_SLICE_PREDICATE_SIZE);
  int sliceRangeSize = jobConf.getInt(
      AbstractColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE,
      AbstractColumnSerDe.DEFAULT_RANGE_BATCH_SIZE);
  int splitSize = jobConf.getInt(
      AbstractColumnSerDe.CASSANDRA_SPLIT_SIZE,
      AbstractColumnSerDe.DEFAULT_SPLIT_SIZE);
  String cassandraColumnMapping = jobConf.get(AbstractColumnSerDe.CASSANDRA_COL_MAPPING);
  int rpcPort = jobConf.getInt(AbstractColumnSerDe.CASSANDRA_PORT, 9160);
  String host = jobConf.get(AbstractColumnSerDe.CASSANDRA_HOST);
  String partitioner = jobConf.get(AbstractColumnSerDe.CASSANDRA_PARTITIONER);
  if (cassandraColumnMapping == null) {
    throw new IOException("cassandra.columns.mapping required for Cassandra Table.");
  }
  SliceRange range = new SliceRange();
  range.setStart(new byte[0]);
  range.setFinish(new byte[0]);
  range.setReversed(false);
  range.setCount(slicePredicateSize);
  SlicePredicate predicate = new SlicePredicate();
  predicate.setSlice_range(range);
  ConfigHelper.setInputRpcPort(jobConf, "" + rpcPort);
  ConfigHelper.setInputInitialAddress(jobConf, host);
  ConfigHelper.setInputPartitioner(jobConf, partitioner);
  ConfigHelper.setInputSlicePredicate(jobConf, predicate);
  ConfigHelper.setInputColumnFamily(jobConf, ks, cf);
  ConfigHelper.setRangeBatchSize(jobConf, sliceRangeSize);
  ConfigHelper.setInputSplitSize(jobConf, splitSize);
  Job job = new Job(jobConf);
  JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID());
  Path[] tablePaths = FileInputFormat.getInputPaths(jobContext);
  List<org.apache.hadoop.mapreduce.InputSplit> splits = getSplits(jobContext);
  InputSplit[] results = new InputSplit[splits.size()];
  for (int i = 0; i < splits.size(); ++i) {
    HiveCassandraStandardSplit csplit = new HiveCassandraStandardSplit(
        (ColumnFamilySplit) splits.get(i), cassandraColumnMapping, tablePaths[0]);
    csplit.setKeyspace(ks);
    csplit.setColumnFamily(cf);
    csplit.setRangeBatchSize(sliceRangeSize);
    csplit.setSplitSize(splitSize);
    csplit.setHost(host);
    csplit.setPort(rpcPort);
    csplit.setSlicePredicateSize(slicePredicateSize);
    csplit.setPartitioner(partitioner);
    csplit.setColumnMapping(cassandraColumnMapping);
    results[i] = csplit;
  }
  return results;
}
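Note that new Job(conf) and new JobContext(...) above only compile against older Hadoop releases in which JobContext is a concrete class; on Hadoop 2 and later the equivalent construction would look roughly like this sketch (not part of the original code):

Job job = Job.getInstance(jobConf);
JobContext jobContext = new JobContextImpl(job.getConfiguration(), job.getJobID());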
private static String getJobInfo(Job job) {
  return "jobName: " + job.getJobName() + ", jobId: " + job.getJobID();
}