The following lists example code and usage patterns for the org.apache.hadoop.mapreduce.InputFormat API class; you can also follow the links to view the source code on GitHub.
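Before the project-specific snippets below, here is a minimal, self-contained sketch of the typical new-API InputFormat read loop (getSplits, then createRecordReader, then iterate with nextKeyValue). The class name and input path in this sketch are illustrative assumptions, not taken from any of the projects listed on this page.

// Minimal sketch (assumed): driving an InputFormat by hand, outside a running MapReduce job.
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class InputFormatReadLoopSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    // "/tmp/input" is a placeholder path used only for this sketch.
    FileInputFormat.addInputPath(job, new Path("/tmp/input"));
    InputFormat<LongWritable, Text> inputFormat = new TextInputFormat();
    // 1. Ask the InputFormat how to partition the input.
    List<InputSplit> splits = inputFormat.getSplits(job);
    for (InputSplit split : splits) {
      // 2. Create and initialize a RecordReader per split.
      TaskAttemptContextImpl context =
          new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
      RecordReader<LongWritable, Text> reader = inputFormat.createRecordReader(split, context);
      reader.initialize(split, context);
      // 3. Iterate key/value pairs exactly as the MapReduce framework would.
      while (reader.nextKeyValue()) {
        System.out.println(reader.getCurrentKey() + "\t" + reader.getCurrentValue());
      }
      reader.close();
    }
  }
}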
public static double getTotalMapInputMB(Job job)
throws ClassNotFoundException, IOException, InterruptedException, JobException {
if (job == null) {
throw new JobException("Job is null");
}
long mapInputBytes = 0;
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
for (InputSplit split : input.getSplits(job)) {
mapInputBytes += split.getLength();
}
// 0 input bytes is possible when the segment range hits no partition on a partitioned hive table (KYLIN-2470)
if (mapInputBytes == 0) {
logger.warn("Map input splits are 0 bytes, something is wrong?");
}
double totalMapInputMB = (double) mapInputBytes / 1024 / 1024;
return totalMapInputMB;
}
/**
* This test validates behavior of {@link
* HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when Hadoop
* InputFormat's {@link InputFormat#getSplits(JobContext) getSplits(JobContext)} returns a null
* value.
*/
@Test
public void testComputeSplitsIfGetSplitsReturnsNullValue() throws Exception {
InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
SerializableSplit mockInputSplit = Mockito.mock(SerializableSplit.class);
Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class))).thenReturn(null);
HadoopInputFormatBoundedSource<Text, Employee> hifSource =
new HadoopInputFormatBoundedSource<>(
serConf,
WritableCoder.of(Text.class),
AvroCoder.of(Employee.class),
null, // No key translation required.
null, // No value translation required.
mockInputSplit);
thrown.expect(IOException.class);
thrown.expectMessage("Error in computing splits, getSplits() returns null.");
hifSource.setInputFormatObj(mockInputFormat);
hifSource.computeSplitsIfNecessary();
}
void testInputFormat(Class<? extends InputFormat> clazz)
throws IOException, InterruptedException, ClassNotFoundException {
final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
job.setInputFormatClass(clazz);
job.setOutputFormatClass(NullOutputFormat.class);
job.setMapperClass(ExampleVerifier.class);
job.setNumReduceTasks(0);
LOG.debug("submitting job.");
assertTrue("job failed!", job.waitForCompletion(true));
assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
}
public HadoopElementIterator(final HadoopGraph graph) {
try {
this.graph = graph;
final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration());
final InputFormat<NullWritable, VertexWritable> inputFormat = ConfUtil.getReaderAsInputFormat(configuration);
if (inputFormat instanceof FileInputFormat) {
final Storage storage = FileSystemStorage.open(configuration);
if (!this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
return; // there is no input location and thus, no data (empty graph)
if (!Constants.getSearchGraphLocation(this.graph.configuration().getInputLocation(), storage).isPresent())
return; // there is no data at the input location (empty graph)
configuration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, Constants.getSearchGraphLocation(this.graph.configuration().getInputLocation(), storage).get());
}
final List<InputSplit> splits = inputFormat.getSplits(new JobContextImpl(configuration, new JobID(UUID.randomUUID().toString(), 1)));
for (final InputSplit split : splits) {
this.readers.add(inputFormat.createRecordReader(split, new TaskAttemptContextImpl(configuration, new TaskAttemptID())));
}
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
/**
* This test validates the behavior of {@link HadoopFormatIO.Read#validateTransform()
* Read.validateTransform()} when the input type of myKeyTranslate (a simple function supplied by
* the user for key translation) does not match the Hadoop InputFormat's key class (the property
* set in the configuration as "key.class").
*/
@Test
public void testReadValidationFailsWithWrongInputTypeKeyTranslationFunction() {
SimpleFunction<LongWritable, String> myKeyTranslateWithWrongInputType =
new SimpleFunction<LongWritable, String>() {
@Override
public String apply(LongWritable input) {
return input.toString();
}
};
HadoopFormatIO.Read<String, Employee> read =
HadoopFormatIO.<String, Employee>read()
.withConfiguration(serConf.get())
.withKeyTranslation(myKeyTranslateWithWrongInputType);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
String.format(
"Key translation's input type is not same as hadoop InputFormat : %s key "
+ "class : %s",
serConf.get().getClass("mapreduce.job.inputformat.class", InputFormat.class),
serConf.get().getClass("key.class", Object.class)));
read.validateTransform();
}
/**
* Returns the Hadoop configuration for reading data from Cassandra. To read data from Cassandra
* using HadoopFormatIO, the following properties must be set: InputFormat class, InputFormat key
* class, InputFormat value class, Thrift address, Thrift port, partitioner class, keyspace and
* column family name.
*/
private static Configuration getConfiguration(HadoopFormatIOTestOptions options) {
Configuration conf = new Configuration();
conf.set(CASSANDRA_THRIFT_PORT_PROPERTY, options.getCassandraServerPort().toString());
conf.set(CASSANDRA_THRIFT_ADDRESS_PROPERTY, options.getCassandraServerIp());
conf.set(CASSANDRA_PARTITIONER_CLASS_PROPERTY, CASSANDRA_PARTITIONER_CLASS_VALUE);
conf.set(CASSANDRA_KEYSPACE_PROPERTY, CASSANDRA_KEYSPACE);
conf.set(CASSANDRA_COLUMNFAMILY_PROPERTY, CASSANDRA_TABLE);
// Set user name and password if Cassandra instance has security configured.
conf.set(USERNAME, options.getCassandraUserName());
conf.set(PASSWORD, options.getCassandraPassword());
conf.set(INPUT_KEYSPACE_USERNAME_CONFIG, options.getCassandraUserName());
conf.set(INPUT_KEYSPACE_PASSWD_CONFIG, options.getCassandraPassword());
conf.setClass(
"mapreduce.job.inputformat.class",
org.apache.cassandra.hadoop.cql3.CqlInputFormat.class,
InputFormat.class);
conf.setClass("key.class", Long.class, Object.class);
conf.setClass("value.class", Row.class, Object.class);
return conf;
}
public static ScanMetrics runScanJob(ScanJob scanJob, Configuration conf, String confRootField,
org.apache.hadoop.conf.Configuration hadoopConf,
Class<? extends InputFormat> inputFormat)
throws IOException, InterruptedException, ClassNotFoundException {
ModifiableHadoopConfiguration scanConf =
ModifiableHadoopConfiguration.of(TitanHadoopConfiguration.MAPRED_NS, hadoopConf);
tryToLoadClassByName(scanJob);
// Set the ScanJob class
scanConf.set(TitanHadoopConfiguration.SCAN_JOB_CLASS, scanJob.getClass().getName());
String jobName = HadoopScanMapper.class.getSimpleName() + "[" + scanJob + "]";
return runJob(conf, confRootField, hadoopConf, inputFormat, jobName, HadoopScanMapper.class);
}
@SuppressWarnings("deprecation")
public static boolean runJob(Configuration conf,
Class<? extends InputFormat<?,?>> inputFormatClass,
Class<? extends Mapper<?,?,?,?>> mapperClass,
Class<? extends OutputFormat<?,?>> outputFormatClass) throws IOException,
InterruptedException, ClassNotFoundException {
Job job = new Job(conf);
job.setInputFormatClass(inputFormatClass);
job.setMapperClass(mapperClass);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(outputFormatClass);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
boolean ret = job.waitForCompletion(true);
// Hadoop 1.0 (and 0.20) have a nasty bug where the job committer is not called in
// LocalJobRunner
if (isHadoop1()) {
callOutputCommitter(job, outputFormatClass);
}
return ret;
}
private HadoopInputFormat<String, Long> setupHadoopInputFormat(InputFormat<String, Long> inputFormat, Job job,
RecordReader<String, Long> recordReader) {
HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat,
String.class, Long.class, job);
hadoopInputFormat.recordReader = recordReader;
return hadoopInputFormat;
}
protected int getMapInputSplitCount()
throws ClassNotFoundException, JobException, IOException, InterruptedException {
if (job == null) {
throw new JobException("Job is null");
}
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
return input.getSplits(job).size();
}
/**
* IndexableLoadFunc interface implementation
*/
@Override
public void initialize(Configuration conf) throws IOException {
try {
InputFormat inputFormat = this.getInputFormat();
TaskAttemptID id = HadoopShims.getNewTaskAttemptID();
if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY, System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
}
List<FileSplit> fileSplits = inputFormat.getSplits(HadoopShims.createJobContext(conf, null));
this.readers = new IndexedStorageRecordReader[fileSplits.size()];
int idx = 0;
Iterator<FileSplit> it = fileSplits.iterator();
while (it.hasNext()) {
FileSplit fileSplit = it.next();
TaskAttemptContext context = HadoopShims.createTaskAttemptContext(conf, id);
IndexedStorageRecordReader r = (IndexedStorageRecordReader) inputFormat.createRecordReader(fileSplit, context);
r.initialize(fileSplit, context);
this.readers[idx] = r;
idx++;
}
Arrays.sort(this.readers, this.readerComparator);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public InputFormat getInputFormat() {
LOG.info("[{}]: getInputFormat()", signature);
String location = locations.get(signature);
return new IcebergPigInputFormat(tables.get(location), signature);
}
@SuppressWarnings("rawtypes")
@Override
public InputFormat getInputFormat() throws IOException {
if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
return new Bzip2TextInputFormat();
} else {
return new PigTextInputFormat();
}
}
public <K, V, F extends InputFormat<K, V>, I> SourceTSet<I> createHadoopSource(
Configuration configuration, Class<F> inputFormat, int parallel,
MapFunc<I, Tuple<K, V>> mapFunc) {
SourceTSet<I> sourceT = new SourceTSet<>(this,
new HadoopSourceWithMap<>(configuration, inputFormat, mapFunc), parallel);
getGraph().addSourceTSet(sourceT);
return sourceT;
}
public <K, V,
F extends InputFormat<K, V>, K2, V2> KeyedSourceTSet<K2, V2> createKeyedHadoopSource(
Configuration configuration, Class<F> inputFormat, int parallel,
MapFunc<Tuple<K2, V2>, Tuple<K, V>> mapFunc) {
KeyedSourceTSet<K2, V2> sourceT = new KeyedSourceTSet<>(this,
new HadoopSourceWithMap<>(configuration, inputFormat, mapFunc), parallel);
getGraph().addSourceTSet(sourceT);
return sourceT;
}
@Override
protected Class<? extends InputFormat> getInputFormatClass()
throws ClassNotFoundException {
if (isHCatJob) {
return SqoopHCatUtilities.getInputFormatClass();
}
return super.getInputFormatClass();
}
@Override
public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class),
NullWritable.class,
VertexWritable.class)
.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())));
}
public JobBase(final SqoopOptions opts,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass) {
this.options = opts;
this.mapperClass = mapperClass;
this.inputFormatClass = inputFormatClass;
this.outputFormatClass = outputFormatClass;
isHCatJob = options.getHCatTableName() != null;
}
@SuppressWarnings("unchecked")
private HadoopInputFormatReader(
HadoopInputFormatBoundedSource<K, V> source,
@Nullable SimpleFunction keyTranslationFunction,
@Nullable SimpleFunction valueTranslationFunction,
SerializableSplit split,
InputFormat inputFormatObj,
TaskAttemptContext taskAttemptContext) {
this.source = source;
this.keyTranslationFunction = keyTranslationFunction;
this.valueTranslationFunction = valueTranslationFunction;
this.split = split;
this.inputFormatObj = inputFormatObj;
this.taskAttemptContext = taskAttemptContext;
}
/**
* Configure the InputFormat to use for the job.
*/
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol)
throws ClassNotFoundException, IOException {
//TODO: 'splitByCol' is import-job specific; lift it out of this API.
Class<? extends InputFormat> ifClass = getInputFormatClass();
LOG.debug("Using InputFormat: " + ifClass);
job.setInputFormatClass(ifClass);
}
@Override
public InputFormat getInputFormat() {
TableInputFormat inputFormat = new HBaseTableIFBuilder()
.withLimit(limit_)
.withGt(gt_)
.withGte(gte_)
.withLt(lt_)
.withLte(lte_)
.withConf(m_conf)
.build();
inputFormat.setScan(scan);
return inputFormat;
}
/**
* Creates an instance of the InputFormat class. The InputFormat class name is specified in the
* Hadoop configuration.
*/
@SuppressWarnings("WeakerAccess")
protected void createInputFormatInstance() throws IOException {
if (inputFormatObj == null) {
try {
taskAttemptContext = new TaskAttemptContextImpl(conf.get(), new TaskAttemptID());
inputFormatObj =
(InputFormat<?, ?>)
conf.get()
.getClassByName(conf.get().get("mapreduce.job.inputformat.class"))
.getConstructor()
.newInstance();
/*
* If InputFormat explicitly implements interface {@link Configurable}, then setConf()
* method of {@link Configurable} needs to be explicitly called to set all the
* configuration parameters. For example: InputFormat classes which implement Configurable
* are {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat DBInputFormat}, {@link
* org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat}, etc.
*/
if (Configurable.class.isAssignableFrom(inputFormatObj.getClass())) {
((Configurable) inputFormatObj).setConf(conf.get());
}
} catch (InstantiationException
| IllegalAccessException
| ClassNotFoundException
| NoSuchMethodException
| InvocationTargetException e) {
throw new IOException("Unable to create InputFormat object: ", e);
}
}
}
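As a side note on the Configurable handling above: Hadoop's ReflectionUtils.newInstance(Class, Configuration) performs the same setConf() call automatically (the document's other snippets use it for exactly this reason), so an equivalent, purely illustrative way to build the InputFormat instance would be the following sketch; the factory class name here is an assumption.

// Illustrative alternative (not the code above): let ReflectionUtils handle Configurable.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public final class InputFormatFactorySketch {
  public static InputFormat<?, ?> create(Configuration conf) throws ClassNotFoundException {
    Class<?> clazz = conf.getClassByName(conf.get("mapreduce.job.inputformat.class"));
    // ReflectionUtils.newInstance calls setConf() if the class implements Configurable.
    return (InputFormat<?, ?>) ReflectionUtils.newInstance(clazz, conf);
  }
}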
@Override
protected Class<? extends InputFormat> getInputFormatClass()
throws ClassNotFoundException {
Class<? extends InputFormat> configuredIF = super.getInputFormatClass();
if (null == configuredIF) {
return ExportInputFormat.class;
} else {
return configuredIF;
}
}
@Test(enabled = true, dependsOnMethods = { "testWriteLongData" })
public void testReadLongData() throws Exception {
long sum = 0L;
long reccnt = 0L;
File folder = new File(m_workdir.toString());
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
&& listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
&& listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
FileSplit split = new FileSplit(
new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
InputFormat<NullWritable, MneDurableInputValue<Long>> inputFormat =
new MneInputFormat<MneDurableInputValue<Long>, Long>();
RecordReader<NullWritable, MneDurableInputValue<Long>> reader =
inputFormat.createRecordReader(split, m_tacontext);
MneDurableInputValue<Long> mdval = null;
while (reader.nextKeyValue()) {
mdval = reader.getCurrentValue();
sum += mdval.getValue();
++reccnt;
}
reader.close();
}
}
AssertJUnit.assertEquals(m_sum, sum);
AssertJUnit.assertEquals(m_reccnt, reccnt);
System.out.println(String.format("The checksum of long data is %d", sum));
}
public ImportJobBase(final SqoopOptions opts,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass,
final ImportJobContext context) {
super(opts, mapperClass, inputFormatClass, outputFormatClass, context);
}
/**
* Constructs the DelegatingRecordReader.
*
* @param split TaggedInputSplit object
* @param context TaskAttemptContext object
*
* @throws IOException
* @throws InterruptedException
*/
@SuppressWarnings("unchecked")
public DelegatingRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
// Find the InputFormat and then the RecordReader from the
// TaggedInputSplit.
TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
.newInstance(taggedInputSplit.getInputFormatClass(), context
.getConfiguration());
originalRR = inputFormat.createRecordReader(taggedInputSplit
.getInputSplit(), context);
}
/**
* Allow the user to inject custom mapper, input, and output formats
* into the importTable() process.
*/
@Override
@SuppressWarnings("unchecked")
public void importTable(ImportJobContext context)
throws IOException, ImportException {
SqoopOptions options = context.getOptions();
Configuration conf = options.getConf();
Class<? extends Mapper> mapperClass = (Class<? extends Mapper>)
conf.getClass(MAPPER_KEY, Mapper.class);
Class<? extends InputFormat> ifClass = (Class<? extends InputFormat>)
conf.getClass(INPUT_FORMAT_KEY, TextInputFormat.class);
Class<? extends OutputFormat> ofClass = (Class<? extends OutputFormat>)
conf.getClass(OUTPUT_FORMAT_KEY, TextOutputFormat.class);
Class<? extends ImportJobBase> jobClass = (Class<? extends ImportJobBase>)
conf.getClass(IMPORT_JOB_KEY, ImportJobBase.class);
String tableName = context.getTableName();
// Instantiate the user's chosen ImportJobBase instance.
ImportJobBase importJob = ReflectionUtils.newInstance(jobClass, conf);
// And configure the dependencies to inject
importJob.setOptions(options);
importJob.setMapperClass(mapperClass);
importJob.setInputFormatClass(ifClass);
importJob.setOutputFormatClass(ofClass);
importJob.runImport(tableName, context.getJarFile(),
getSplitColumn(options, tableName), conf);
}
@Test
public void testRecordReaderInit() throws InterruptedException, IOException {
// Test that we properly initialize the child recordreader when
// CombineFileInputFormat and CombineFileRecordReader are used.
TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
Configuration conf1 = new Configuration();
conf1.set(DUMMY_KEY, "STATE1");
TaskAttemptContext context1 = new TaskAttemptContextImpl(conf1, taskId);
// This will create a CombineFileRecordReader that itself contains a
// DummyRecordReader.
InputFormat inputFormat = new ChildRRInputFormat();
Path [] files = { new Path("file1") };
long [] lengths = { 1 };
CombineFileSplit split = new CombineFileSplit(files, lengths);
RecordReader rr = inputFormat.createRecordReader(split, context1);
assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
// Verify that the initial configuration is the one being used.
// Right after construction the dummy key should have value "STATE1"
assertEquals("Invalid initial dummy key value", "STATE1",
rr.getCurrentKey().toString());
// Switch the active context for the RecordReader...
Configuration conf2 = new Configuration();
conf2.set(DUMMY_KEY, "STATE2");
TaskAttemptContext context2 = new TaskAttemptContextImpl(conf2, taskId);
rr.initialize(split, context2);
// And verify that the new context is updated into the child record reader.
assertEquals("Invalid secondary dummy key value", "STATE2",
rr.getCurrentKey().toString());
}
@Test
public void testReinit() throws Exception {
// Test that a split containing multiple files works correctly,
// with the child RecordReader getting its initialize() method
// called a second time.
TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
Configuration conf = new Configuration();
TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);
// This will create a CombineFileRecordReader that itself contains a
// DummyRecordReader.
InputFormat inputFormat = new ChildRRInputFormat();
Path [] files = { new Path("file1"), new Path("file2") };
long [] lengths = { 1, 1 };
CombineFileSplit split = new CombineFileSplit(files, lengths);
RecordReader rr = inputFormat.createRecordReader(split, context);
assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
// first initialize() call comes from MapTask. We'll do it here.
rr.initialize(split, context);
// First value is first filename.
assertTrue(rr.nextKeyValue());
assertEquals("file1", rr.getCurrentValue().toString());
// The inner RR will return false, because it only emits one (k, v) pair.
// But there's another sub-split to process. This returns true to us.
assertTrue(rr.nextKeyValue());
// And the 2nd rr will have its initialize method called correctly.
assertEquals("file2", rr.getCurrentValue().toString());
// But after both child RR's have returned their singleton (k, v), this
// should also return false.
assertFalse(rr.nextKeyValue());
}