Listed below are examples showing how to use the org.apache.hadoop.mapred.JobConf API class, with typical usage patterns; you can also follow the links to view the full source code on GitHub.
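To get oriented before the collected examples, here is a minimal, hypothetical sketch (not one of the examples below; MyJob, its inner classes, and the paths are placeholders) of how a JobConf is typically created, configured, and submitted with the old mapred API:

// Classes come from org.apache.hadoop.fs, org.apache.hadoop.io and org.apache.hadoop.mapred.
// Build a JobConf from a base Configuration; the class argument locates the job jar.
JobConf job = new JobConf(new Configuration(), MyJob.class);
job.setJobName("my-job");                         // name shown in the JobTracker UI
job.setMapperClass(MyJob.MyMapper.class);         // old-API mapper (placeholder)
job.setReducerClass(MyJob.MyReducer.class);       // old-API reducer (placeholder)
job.setOutputKeyClass(Text.class);                // key/value types of the job output
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("/in"));    // placeholder input path
FileOutputFormat.setOutputPath(job, new Path("/out"));  // placeholder output path
JobClient.runJob(job);                            // submit and block until completion

Example 1 configures a map-side join test: it composes a mapreduce.join.expr expression over several SequenceFile sources with CompositeInputFormat.compose, runs the job with JobClient.runJob, and deletes the working directory afterwards.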
private static void joinAs(String jointype,
Class<? extends SimpleCheckerBase> c) throws Exception {
final int srcs = 4;
Configuration conf = new Configuration();
JobConf job = new JobConf(conf, c);
Path base = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
Path[] src = writeSimpleSrc(base, conf, srcs);
job.set("mapreduce.join.expr", CompositeInputFormat.compose(jointype,
SequenceFileInputFormat.class, src));
job.setInt("testdatamerge.sources", srcs);
job.setInputFormat(CompositeInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(base, "out"));
job.setMapperClass(c);
job.setReducerClass(c);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
JobClient.runJob(job);
base.getFileSystem(job).delete(base, true);
}
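Example 2 reads the task partition id from the JobConf obtained through a MapredContext, trying the old mapred.task.partition key first and falling back to mapreduce.task.partition.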
public static int getTaskId() {
MapredContext ctx = MapredContextAccessor.get();
if (ctx == null) {
throw new IllegalStateException("MapredContext is not set");
}
JobConf jobconf = ctx.getJobConf();
if (jobconf == null) {
throw new IllegalStateException("JobConf is not set");
}
int taskid = jobconf.getInt("mapred.task.partition", -1);
if (taskid == -1) {
taskid = jobconf.getInt("mapreduce.task.partition", -1);
if (taskid == -1) {
throw new IllegalStateException(
"Both mapred.task.partition and mapreduce.task.partition are not set: "
+ toString(jobconf));
}
}
return taskid;
}
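Example 3 computes combined splits over real HDFS files: it insists on a DistributedFileSystem, sizes the splits relative to dfs.block.size, and prints every CombineFileSplit returned by getSplits.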
private void splitRealFiles(String[] args) throws IOException {
JobConf conf = new JobConf();
FileSystem fs = FileSystem.get(conf);
if (!(fs instanceof DistributedFileSystem)) {
throw new IOException("Wrong file system: " + fs.getClass().getName());
}
int blockSize = conf.getInt("dfs.block.size", 128 * 1024 * 1024);
DummyInputFormat inFormat = new DummyInputFormat();
for (int i = 0; i < args.length; i++) {
inFormat.addInputPaths(conf, args[i]);
}
inFormat.setMinSplitSizeRack(blockSize);
inFormat.setMaxSplitSize(10 * blockSize);
InputSplit[] splits = inFormat.getSplits(conf, 1);
System.out.println("Total number of splits " + splits.length);
for (int i = 0; i < splits.length; ++i) {
CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
System.out.println("Split[" + i + "] " + fileSplit);
}
}
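Example 4 configures an Avro DataFileWriter from a JobConf: when compressed output is enabled it selects a codec (deflate with the configured level, or any codec name CodecFactory understands), sets the sync interval, and copies text and binary metadata from job properties carrying the AvroJob prefixes.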
static <K> void configureDataFileWriter(DataFileWriter<K> writer,
JobConf job) throws UnsupportedEncodingException {
if (FileOutputFormat.getCompressOutput(job)) {
int level = job.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
CodecFactory factory = codecName.equals(DEFLATE_CODEC) ?
CodecFactory.deflateCodec(level) : CodecFactory.fromString(codecName);
writer.setCodec(factory);
}
writer.setSyncInterval(job.getInt(org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY,
DEFAULT_SYNC_INTERVAL));
// copy metadata from job
for (Map.Entry<String,String> e : job) {
if (e.getKey().startsWith(AvroJob.TEXT_PREFIX))
writer.setMeta(e.getKey().substring(AvroJob.TEXT_PREFIX.length()),e.getValue());
if (e.getKey().startsWith(AvroJob.BINARY_PREFIX))
writer.setMeta(e.getKey().substring(AvroJob.BINARY_PREFIX.length()),
URLDecoder.decode(e.getValue(), "ISO-8859-1")
.getBytes("ISO-8859-1"));
}
}
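Example 5 appears to be the configure method of Hadoop Streaming's PipeReducer; job_ is the field assigned by super.configure(job). It disables the automatic processed-record counter and reads the reduce-side field separators as UTF-8 bytes.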
public void configure(JobConf job) {
super.configure(job);
// Disable automatic increment of the processed-record counter: for
// streaming, the number of processed records can differ from (be equal
// to or less than) the number of input records.
SkipBadRecords.setAutoIncrReducerProcCount(job, false);
skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
try {
reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
}
}
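Example 6 is the constructor of Flink's HadoopReduceCombineFunction, which wraps a Hadoop Reducer and a combining Reducer together with the JobConf that configures both, rejecting null arguments.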
/**
* Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
*
* @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
* @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
* @param conf The JobConf that is used to configure both Hadoop Reducers.
*/
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner, JobConf conf) {
if (hadoopReducer == null) {
throw new NullPointerException("Reducer may not be null.");
}
if (hadoopCombiner == null) {
throw new NullPointerException("Combiner may not be null.");
}
if (conf == null) {
throw new NullPointerException("JobConf may not be null.");
}
this.reducer = hadoopReducer;
this.combiner = hadoopCombiner;
this.jobConf = conf;
}
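Example 7 copies the heap-related JVM options (the generic, map-specific, and reduce-specific java opts) from an original job's configuration into a simulated job's configuration.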
@SuppressWarnings("deprecation")
protected static void configureTaskJVMOptions(Configuration originalJobConf,
Configuration simulatedJobConf){
// Get the heap related java opts used for the original job and set the
// same for the simulated job.
// set generic task heap options
configureTaskJVMMaxHeapOptions(originalJobConf, simulatedJobConf,
JobConf.MAPRED_TASK_JAVA_OPTS);
// set map task heap options
configureTaskJVMMaxHeapOptions(originalJobConf, simulatedJobConf,
MRJobConfig.MAP_JAVA_OPTS);
// set reduce task heap options
configureTaskJVMMaxHeapOptions(originalJobConf, simulatedJobConf,
MRJobConfig.REDUCE_JAVA_OPTS);
}
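Example 8 pre-fetches a JobTracker delegation token through a JobClient, logs its kind, id, and service, and adds it to the supplied Credentials.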
private static void getJtToken(Credentials cred) throws IOException {
try {
JobConf jobConf = new JobConf();
JobClient jobClient = new JobClient(jobConf);
LOG.info("Pre-fetching JT token from JobTracker");
Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(getMRTokenRenewerInternal(jobConf));
if (mrdt == null) {
LOG.error("Failed to fetch JT token");
throw new IOException("Failed to fetch JT token.");
}
LOG.info("Created JT token: " + mrdt.toString());
LOG.info("Token kind: " + mrdt.getKind());
LOG.info("Token id: " + Arrays.toString(mrdt.getIdentifier()));
LOG.info("Token service: " + mrdt.getService());
cred.addToken(mrdt.getService(), mrdt);
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
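Example 9 is a mapper configure method that reflectively instantiates an optional CompressionCodec named by the test.io.compression.class property.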
@Override // Mapper
public void configure(JobConf conf) {
super.configure(conf);
// grab compression
String compression = getConf().get("test.io.compression.class", null);
Class<? extends CompressionCodec> codec;
// try to initialize codec
try {
codec = (compression == null) ? null :
Class.forName(compression).asSubclass(CompressionCodec.class);
} catch(Exception e) {
throw new RuntimeException("Compression codec not found: ", e);
}
if(codec != null) {
compressionCodec = (CompressionCodec)
ReflectionUtils.newInstance(codec, getConf());
}
}
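Example 10 builds a ConfigExtractor from the JobConf and derives the numeric task id from the task attempt id, falling back to the legacy mapred.task.id key so branch-1/0.20 can run the same code.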
@Override // MapReduceBase
public void configure(JobConf conf) {
try {
config = new ConfigExtractor(conf);
ConfigExtractor.dumpOptions(config);
filesystem = config.getBaseDirectory().getFileSystem(conf);
} catch (Exception e) {
LOG.error("Unable to setup slive " + StringUtils.stringifyException(e));
throw new RuntimeException("Unable to setup slive configuration", e);
}
if(conf.get(MRJobConfig.TASK_ATTEMPT_ID) != null ) {
this.taskId = TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID))
.getTaskID().getId();
} else {
// So that branch-1/0.20 can run this same code as well
this.taskId = TaskAttemptID.forName(conf.get("mapred.task.id"))
.getTaskID().getId();
}
}
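Example 11 runs a job that is expected to fail (FailMapper plus an identity reducer) and polls RunningJob.isComplete in a sleep loop until the job finishes.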
static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
throws IOException {
conf.setJobName("test-job-fail");
conf.setMapperClass(FailMapper.class);
conf.setReducerClass(IdentityReducer.class);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
while (!job.isComplete()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
return job;
}
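Example 12 lists located file status for every input path: directories get a null block-location array, while regular files get their block locations from the file system.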
@Override
protected LocatedFileStatus[] listLocatedStatus(JobConf job) throws IOException {
Path[] files = getInputPaths(job);
LocatedFileStatus[] results = new LocatedFileStatus[files.length];
for (int i = 0; i < files.length; i++) {
Path p = files[i];
FileSystem fs = p.getFileSystem(job);
FileStatus stat = fs.getFileStatus(p);
if (stat.isDir()) {
results[i] = new LocatedFileStatus(stat, null);
} else {
results[i] = new LocatedFileStatus(stat,
fs.getFileBlockLocations(stat, 0, stat.getLen()));
}
}
return results;
}
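Example 13 writes one single-record SequenceFile control file per map task, closing the writer in a finally block on every iteration.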
public void control(JobConf fsConfig, String fileName)
throws IOException {
String name = fileName;
FileSystem fs = FileSystem.get(fsConfig);
SequenceFile.Writer write = null;
for (int i = 0; i < nmaps; i++) {
try {
Path controlFile = new Path(dfs_input, name + i);
write = SequenceFile.createWriter(fs, fsConfig, controlFile,
Text.class, Text.class, CompressionType.NONE);
write.append(new Text(name + i), new Text(workdir));
} finally {
if (write != null)
write.close();
write = null;
}
}
}
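Example 14 collects the configured move-to-latest topics from the JobConf into a Set, tolerating a null topic array.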
private Set<String> getMoveToLatestTopicsSet(JobConf conf) {
Set<String> topics = new HashSet<String>();
String[] arr = getMoveToLatestTopics(conf);
if (arr != null) {
for (String topic : arr) {
topics.add(topic);
}
}
return topics;
}
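Example 15 is a generic helper that resolves a command-line option value to a class via JobConf.getClassByName and casts it to the requested interface type.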
private static <InterfaceType>
Class<? extends InterfaceType> getClass(CommandLine cl, String key,
JobConf conf,
Class<InterfaceType> cls
) throws ClassNotFoundException {
return conf.getClassByName((String) cl.getOptionValue(key)).asSubclass(cls);
}
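Example 16 is a unit test verifying that CombineFileRecordReader reports progress for each file in a series of empty input files, using Mockito to count the reporter.progress() calls.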
@SuppressWarnings("unchecked")
@Test
public void testProgressIsReportedIfInputASeriesOfEmptyFiles() throws IOException, InterruptedException {
JobConf conf = new JobConf();
Path[] paths = new Path[3];
File[] files = new File[3];
long[] fileLength = new long[3];
try {
for (int i = 0; i < 3; i++) {
File dir = new File(outDir.toString());
dir.mkdir();
files[i] = new File(dir,"testfile"+i);
FileWriter fileWriter = new FileWriter(files[i]);
fileWriter.flush();
fileWriter.close();
fileLength[i] = i;
paths[i] = new Path(outDir+"/testfile"+i);
}
CombineFileSplit combineFileSplit = new CombineFileSplit(paths, fileLength);
TaskAttemptID taskAttemptID = Mockito.mock(TaskAttemptID.class);
TaskReporter reporter = Mockito.mock(TaskReporter.class);
TaskAttemptContextImpl taskAttemptContext =
    new TaskAttemptContextImpl(conf, taskAttemptID, reporter);
CombineFileRecordReader cfrr = new CombineFileRecordReader(combineFileSplit,
    taskAttemptContext, TextRecordReaderWrapper.class);
cfrr.initialize(combineFileSplit, taskAttemptContext);
verify(reporter).progress();
Assert.assertFalse(cfrr.nextKeyValue());
verify(reporter, times(3)).progress();
} finally {
FileUtil.fullyDelete(new File(outDir.toString()));
}
}
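Example 17 writes a password to a local file with permission 400, using the local FileSystem derived from the JobConf.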
private void writePasswordToLocalFile(String localPasswordFile,
byte[] password, JobConf conf) throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
Path localPath = new Path(localPasswordFile);
FSDataOutputStream out = FileSystem.create(localFs, localPath,
new FsPermission("400"));
out.write(password);
out.close();
}
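Example 18 looks like an elasticsearch-hadoop test: with es.index.read.missing.as.empty enabled, running a job against a non-existing index should succeed rather than fail.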
@Test
public void testSearchNonExistingIndex() throws Exception {
JobConf conf = createJobConf();
conf.setBoolean(ConfigurationOptions.ES_INDEX_READ_MISSING_AS_EMPTY, true);
conf.set(ConfigurationOptions.ES_RESOURCE, resource("foobar", "save", clusterInfo.getMajorVersion()));
JobClient.runJob(conf);
}
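Example 19 implements Configurable.setConf, reusing the configuration when it already is a JobConf and wrapping it in a new JobConf otherwise.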
public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
this.conf = (JobConf) conf;
} else {
this.conf = new JobConf(conf);
}
}
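Example 20 writes an empty file through a Hive RecordWriter: the serializer is initialized first because some SerDes (such as Avro) set a property in the schema, and the writer is closed explicitly since RecordWriter is not Closeable.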
private static void writeEmptyFile(ConnectorSession session, Path target, JobConf conf, Properties properties, String serDe, String outputFormatName)
{
// Some serializers such as Avro set a property in the schema.
initializeSerializer(conf, properties, serDe);
// The code below is not a try with resources because RecordWriter is not Closeable.
FileSinkOperator.RecordWriter recordWriter = HiveWriteUtils.createRecordWriter(target, conf, properties, outputFormatName, session);
try {
recordWriter.close(false);
}
catch (IOException e) {
throw new PrestoException(HIVE_WRITER_CLOSE_ERROR, "Error writing empty file to Hive", e);
}
}
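Example 21 returns a no-op RecordReader whose next method always signals end of input; keys and values are created reflectively from the configured classes.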
public RecordReader<K,V> getRecordReader(
InputSplit ignored, JobConf conf, Reporter reporter) {
return new RecordReader<K,V>() {
public boolean next(K key, V value) throws IOException { return false; }
public K createKey() {
return ReflectionUtils.newInstance(keyclass, null);
}
public V createValue() {
return ReflectionUtils.newInstance(valclass, null);
}
public long getPos() throws IOException { return 0L; }
public void close() throws IOException { }
public float getProgress() throws IOException { return 0.0f; }
};
}
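Example 22 initializes an output committer for either the new or the old MapReduce API and, when the committer is a FileOutputCommitter, points the work output path at the task attempt path.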
public void initCommitter(JobConf job, boolean useNewApi)
throws IOException, InterruptedException {
if (useNewApi) {
if (LOG.isDebugEnabled()) {
LOG.debug("using new api for output committer");
}
this.committer = newOutputFormat.getOutputCommitter(
newApiTaskAttemptContext);
} else {
this.committer = job.getOutputCommitter();
}
Path outputPath = FileOutputFormat.getOutputPath(job);
if (outputPath != null) {
if ((this.committer instanceof FileOutputCommitter)) {
FileOutputFormat.setWorkOutputPath(job,
((FileOutputCommitter) this.committer).getTaskAttemptPath(
oldApiTaskAttemptContext));
} else {
FileOutputFormat.setWorkOutputPath(job, outputPath);
}
}
if (useNewApi) {
this.committer.setupTask(newApiTaskAttemptContext);
} else {
this.committer.setupTask(oldApiTaskAttemptContext);
}
}
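Example 23 is a MiniCoronaCluster test that runs a one-map, one-reduce sleep job on a single task tracker and verifies the cluster manager metrics.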
public void testOneTaskWithOneTaskTracker() throws Exception {
LOG.info("Starting testOneTaskWithOneTaskTracker");
corona = new MiniCoronaCluster.Builder().numTaskTrackers(1).build();
JobConf conf = corona.createJobConf();
long start = System.currentTimeMillis();
runSleepJob(conf, 1, 1);
long end = System.currentTimeMillis();
new ClusterManagerMetricsVerifier(corona.getClusterManager(),
1, 1, 1, 1, 1, 1, 0, 0).verifyAll();
LOG.info("Time spent for testOneTaskWithOneTaskTracker:" +
(end - start));
}
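Example 24 reads the Parquet output compression codec from the JobConf, defaulting to GZIP when the ParquetOutputFormat.COMPRESSION key is absent.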
private static CompressionCodecName getCompression(JobConf configuration)
{
String compressionName = configuration.get(ParquetOutputFormat.COMPRESSION);
if (compressionName == null) {
return CompressionCodecName.GZIP;
}
return CompressionCodecName.valueOf(compressionName);
}
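Example 25 is a plain constructor that captures everything a text-read task needs: the input split, input format, JobConf, destination matrix block, dimensions, and MatrixMarket properties.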
public ReadTask( InputSplit split, TextInputFormat informat, JobConf job, MatrixBlock dest, long rlen, long clen, boolean mm, FileFormatPropertiesMM mmProps ) {
_split = split;
_sparse = dest.isInSparseFormat();
_informat = informat;
_job = job;
_dest = dest;
_rlen = rlen;
_clen = clen;
_matrixMarket = mm;
_mmProps = mmProps;
}
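Example 26 looks like the MapTaskImpl constructor from the MapReduce application master; it forwards the JobConf to the Task superclass and keeps the split meta info.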
public MapTaskImpl(JobId jobId, int partition, EventHandler eventHandler,
Path remoteJobConfFile, JobConf conf,
TaskSplitMetaInfo taskSplitMetaInfo,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
conf, taskAttemptListener, jobToken, credentials, clock,
appAttemptId, metrics, appContext);
this.taskSplitMetaInfo = taskSplitMetaInfo;
}
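Example 27 reads a CSV frame from HDFS: it clones the cached JobConf, validates the input file, computes missing dimensions from the file when necessary, allocates the output FrameBlock, and delegates to the sequential/parallel CSV reader.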
@Override
public final FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names,
long rlen, long clen)
throws IOException, DMLRuntimeException
{
//prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path( fname );
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
FileInputFormat.addInputPath(job, path);
//check existence and non-empty file
checkValidInputFile(fs, path);
//compute size if necessary
if( rlen <= 0 || clen <= 0 ) {
Pair<Integer,Integer> size = computeCSVSize(path, job, fs);
rlen = size.getKey();
clen = size.getValue();
}
//allocate output frame block
ValueType[] lschema = createOutputSchema(schema, clen);
String[] lnames = createOutputNames(names, clen);
FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);
//core read (sequential/parallel)
readCSVFrameFromHDFS(path, job, fs, ret, lschema, lnames, rlen, clen);
return ret;
}
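Example 28 builds a CombineFileSplit in which every file starts at offset zero and carries an empty location string.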
public CombineFileSplit(JobConf job, Path[] files, long[] lengths) {
long[] startoffset = new long[files.length];
for (int i = 0; i < startoffset.length; i++) {
startoffset[i] = 0;
}
String[] locations = new String[files.length];
for (int i = 0; i < locations.length; i++) {
locations[i] = "";
}
initSplit(job, files, startoffset, lengths, locations);
}
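Example 29 is a test setUp that starts a MiniDFSCluster with service-level authorization enabled and then a single-node MiniMRCluster on top of it.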
public void setUp() throws Exception {
// Read the testConfig.xml file
readTestConfigFile();
// Start up the mini dfs cluster
boolean success = false;
conf = new Configuration();
conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
HadoopPolicyProvider.class, PolicyProvider.class);
conf.setBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG,
true);
dfsCluster = new MiniDFSCluster(conf, 1, true, null);
namenode = conf.get("fs.default.name", "file:///");
clitestDataDir = new File(TEST_CACHE_DATA_DIR).
toURI().toString().replace(' ', '+');
username = System.getProperty("user.name");
FileSystem fs = dfsCluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(),
fs instanceof DistributedFileSystem);
dfs = (DistributedFileSystem) fs;
// Start up mini mr cluster
JobConf mrConf = new JobConf(conf);
mrCluster = new MiniMRCluster(1, dfsCluster.getFileSystem().getUri().toString(), 1,
null, null, mrConf);
jobtracker = mrCluster.createJobConf().get("mapred.job.tracker", "local");
success = true;
assertTrue("Error setting up Mini DFS & MR clusters", success);
}
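Example 30 is a setupJob implementation that creates the temporary output directory (FileOutputCommitter.TEMP_DIR_NAME) under the job's output path, logging an error if mkdirs fails.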
public void setupJob(JobConf conf) throws IOException {
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
FileSystem fileSys = tmpDir.getFileSystem(conf);
if (!fileSys.mkdirs(tmpDir)) {
LOG.error("Mkdirs failed to create " + tmpDir.toString());
}
}
}