The following examples show how to use the org.apache.hadoop.mapred.InputFormat API class; follow each example's link to view the full source on GitHub.
private org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> getRecordReader(
final InputFormat<BytesWritable, BytesWritable> inputFormat,
final JobConf jobConf) throws ExecutionSetupException {
try {
final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName);
return ugi.doAs(new PrivilegedExceptionAction<org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable>>() {
@Override
public org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> run() throws Exception {
return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
}
});
} catch (IOException | InterruptedException e) {
throw new ExecutionSetupException(
String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
split.getPath(), split.getStart(), split.getLength()), e);
}
}
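Stripped of the Drill-specific wiring, the impersonation pattern above reduces to a few lines. A minimal sketch, assuming a hypothetical proxy user name and that inputFormat, split, and jobConf are already in scope:
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.security.UserGroupInformation;

// Run getRecordReader() inside doAs() so file access checks are evaluated
// against the proxied user, not the process owner.
UserGroupInformation ugi = UserGroupInformation.createProxyUser(
    "queryUser" /* hypothetical */, UserGroupInformation.getLoginUser());
RecordReader<BytesWritable, BytesWritable> reader = ugi.doAs(
    (PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>) () ->
        inputFormat.getRecordReader(split, jobConf, Reporter.NULL));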
public InternalHiveSplitFactory(
FileSystem fileSystem,
String partitionName,
InputFormat<?, ?> inputFormat,
Properties schema,
List<HivePartitionKey> partitionKeys,
TupleDomain<HiveColumnHandle> effectivePredicate,
BooleanSupplier partitionMatchSupplier,
TableToPartitionMapping tableToPartitionMapping,
Optional<BucketConversion> bucketConversion,
boolean forceLocalScheduling,
boolean s3SelectPushdownEnabled)
{
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
this.partitionName = requireNonNull(partitionName, "partitionName is null");
this.inputFormat = requireNonNull(inputFormat, "inputFormat is null");
this.schema = requireNonNull(schema, "schema is null");
this.partitionKeys = requireNonNull(partitionKeys, "partitionKeys is null");
pathDomain = getPathDomain(requireNonNull(effectivePredicate, "effectivePredicate is null"));
this.partitionMatchSupplier = requireNonNull(partitionMatchSupplier, "partitionMatchSupplier is null");
this.tableToPartitionMapping = requireNonNull(tableToPartitionMapping, "tableToPartitionMapping is null");
this.bucketConversion = requireNonNull(bucketConversion, "bucketConversion is null");
this.forceLocalScheduling = forceLocalScheduling;
this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
}
/**
* From each split sampled, take the first numSamples / numSplits records.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToSample;
int samplesPerSplit = numSamples / splitsToSample;
long records = 0;
for (int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
samples.add(key);
key = reader.createKey();
++records;
if ((i+1) * samplesPerSplit <= records) {
break;
}
}
reader.close();
}
return (K[])samples.toArray();
}
/**
* For each split sampled, emit when the ratio of the number of records
* retained to the total record count is less than the specified
* frequency.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>();
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToSample;
long records = 0;
long kept = 0;
for (int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
++records;
if ((double) kept / records < freq) {
++kept;
samples.add(key);
key = reader.createKey();
}
}
reader.close();
}
return (K[])samples.toArray();
}
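Both samplers are shaped like the Sampler implementations nested in org.apache.hadoop.mapred.lib.InputSampler, whose usual role is to compute split points for TotalOrderPartitioner. A hedged usage sketch with illustrative values:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

JobConf job = new JobConf();
job.setInputFormat(SequenceFileInputFormat.class);
// Sample up to 1000 keys from at most 10 splits, then write the
// partition file that TotalOrderPartitioner reads at runtime.
InputSampler.Sampler<Text, Text> sampler =
    new InputSampler.SplitSampler<>(1000, 10);
TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/partitions"));
InputSampler.writePartitionFile(job, sampler);
job.setPartitionerClass(TotalOrderPartitioner.class);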
/**
* Get the {@link InputFormat} class for the given table and partition definitions. If inputFormat explicitly
* names a class, that class is used; otherwise the class is resolved through storageHandlerName.
* @param jobConf configuration handed to the storage handler lookup
* @param inputFormat explicitly specified InputFormat class name, if any
* @param storageHandlerName storage handler class name to resolve the InputFormat from, if any
* @return the resolved InputFormat class
* @throws Exception if neither source yields a class
*/
public static final Class<? extends InputFormat<?, ?>> getInputFormatClass(final JobConf jobConf, Optional<String> inputFormat,
Optional<String> storageHandlerName) throws Exception {
if (inputFormat.isPresent()) {
return (Class<? extends InputFormat<?, ?>>) Class.forName(inputFormat.get());
}
if (storageHandlerName.isPresent()) {
try (final ContextClassLoaderSwapper swapper = ContextClassLoaderSwapper.newInstance()) {
// HiveUtils.getStorageHandler() depends on the current context classloader if you query an HBase table
// and don't have an HBase session open.
final HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(jobConf, storageHandlerName.get());
return (Class<? extends InputFormat<?, ?>>) storageHandler.getInputFormatClass();
}
}
throw new ExecutionSetupException("Unable to get Hive table InputFormat class. There is neither " +
"InputFormat class explicitly specified nor a StorageHandler class provided.");
}
/**
* Get paths from a Hive location using the provided input format.
*/
public static Set<Path> getPaths(InputFormat<?, ?> inputFormat, Path location) throws IOException {
JobConf jobConf = new JobConf(getHadoopConfiguration());
Set<Path> paths = Sets.newHashSet();
FileInputFormat.addInputPaths(jobConf, location.toString());
InputSplit[] splits = inputFormat.getSplits(jobConf, 1000);
for (InputSplit split : splits) {
if (!(split instanceof FileSplit)) {
throw new IOException("Not a file split. Found " + split.getClass().getName());
}
FileSplit fileSplit = (FileSplit) split;
paths.add(fileSplit.getPath());
}
return paths;
}
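For example, pointing the helper at a partition directory of SequenceFiles (the path is illustrative) returns the set of concrete files backing the location:
// Expands the location into the files that splits would be generated from.
Set<Path> files = getPaths(new SequenceFileInputFormat<Text, Text>(),
    new Path("/warehouse/db/tbl/part=1"));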
/**
* When impersonation is not possible and when last modified times are not available,
* {@link HiveReaderProto.FileSystemPartitionUpdateKey} should not be generated.
*
* @param hiveStorageCapabilities The capabilities of the storage mechanism.
* @param format The file input format.
* @return true if FSUpdateKeys should be generated. False if not.
*/
public static boolean shouldGenerateFileSystemUpdateKeys(final HiveStorageCapabilities hiveStorageCapabilities,
final InputFormat<?, ?> format) {
if (!hiveStorageCapabilities.supportsImpersonation() && !hiveStorageCapabilities.supportsLastModifiedTime()) {
return false;
}
// Files in a filesystem have last modified times and filesystem permissions. Generate
// FileSystemPartitionUpdateKeys for formats representing files. Subclasses of FileInputFormat
// as well as OrcInputFormat represent files.
if ((format instanceof FileInputFormat) || (format instanceof OrcInputFormat)) {
return true;
}
return false;
}
private static List<InputSplit> getInputSplits(final InputFormat<?, ?> format, final JobConf job) {
InputSplit[] inputSplits;
try {
// Parquet logic in hive-3.1.1 does not check recursively by default.
job.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
inputSplits = format.getSplits(job, 1);
} catch (IOException e) {
throw new RuntimeException(e);
}
if (null == inputSplits) {
return Collections.emptyList();
} else {
return Arrays.asList(inputSplits);
}
}
private void runImportRCFile(ExaIterator ctx, List<HCatTableColumn> columns, List<HCatTableColumn> partitionColumns, List<OutputColumnSpec> outputColumns, String file) throws Exception {
List<HCatSerDeParameter> serDeParameters = new ArrayList<>();
serDeParameters.add(new HCatSerDeParameter("serialization.format", "1"));
String inputFormatClassName = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
String serDeClassName = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
String hdfsUser = "hdfs";
boolean useKerberos = false;
List<String> hdfsServers = new ArrayList<>();
hdfsServers.add("file:///");
final Configuration conf = new Configuration();
FileSystem fs = HdfsService.getFileSystem(hdfsServers, conf);
InputFormat<?, ?> inputFormat = (InputFormat<?, ?>) UdfUtils.getInstanceByName(inputFormatClassName);
AbstractSerDe serDe = (AbstractSerDe) UdfUtils.getInstanceByName(serDeClassName);
HdfsSerDeImportService.importFile(fs, file, partitionColumns, inputFormat, serDe, serDeParameters, hdfsServers, hdfsUser, columns, outputColumns, useKerberos, false, ctx);
}
public void testAddInputPathWithMapper() {
final JobConf conf = new JobConf();
MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
MapClass.class);
MultipleInputs.addInputPath(conf, new Path("/bar"),
KeyValueTextInputFormat.class, MapClass2.class);
final Map<Path, InputFormat> inputs = MultipleInputs
.getInputFormatMap(conf);
final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
.getMapperTypeMap(conf);
assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
.getClass());
assertEquals(MapClass.class, maps.get(new Path("/foo")));
assertEquals(MapClass2.class, maps.get(new Path("/bar")));
}
public static boolean isCompressionCodecSupported(InputFormat<?, ?> inputFormat, Path path)
{
if (inputFormat instanceof TextInputFormat) {
return getCompressionCodec((TextInputFormat) inputFormat, path)
.map(codec -> (codec instanceof GzipCodec) || (codec instanceof BZip2Codec))
.orElse(false); // TODO (https://github.com/prestosql/presto/issues/2475) fix S3 Select when file not compressed
}
return false;
}
private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inputFormat)
{
return Arrays.stream(inputFormat.getClass().getAnnotations())
.map(Annotation::annotationType)
.map(Class::getSimpleName)
.anyMatch(name -> name.equals("UseFileSplitsFromInputFormat"));
}
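The check matches the annotation by simple name, so it works without a compile-time dependency on the library that defines it (Hive, in Presto's case). A self-contained sketch of the same by-name test against a hypothetical marker annotation:
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;

@Retention(RetentionPolicy.RUNTIME)
@interface UseFileSplitsFromInputFormat {}

@UseFileSplitsFromInputFormat
class CustomInputFormat {}

class AnnotationCheckDemo {
    public static void main(String[] args) {
        // Compare simple names only, mirroring the method above.
        boolean useFileSplits = Arrays.stream(CustomInputFormat.class.getAnnotations())
            .map(Annotation::annotationType)
            .map(Class::getSimpleName)
            .anyMatch("UseFileSplitsFromInputFormat"::equals);
        System.out.println(useFileSplits); // true
    }
}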
/**
* Use this before submitting a TableMap job. It will
* appropriately set up the JobConf.
*
* @param table The table name to read from.
* @param columns The columns to scan.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job configuration to adjust.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
*/
public static void initTableMapJob(String table, String columns,
Class<? extends TableMap> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
Class<? extends InputFormat> inputFormat) {
job.setInputFormat(inputFormat);
job.setMapOutputValueClass(outputValueClass);
job.setMapOutputKeyClass(outputKeyClass);
job.setMapperClass(mapper);
job.setStrings("io.serializations", job.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
FileInputFormat.addInputPaths(job, table);
job.set(TableInputFormat.COLUMN_LIST, columns);
if (addDependencyJars) {
try {
addDependencyJars(job);
} catch (IOException e) {
LOG.error("IOException encountered while adding dependency jars", e);
}
}
try {
initCredentials(job);
} catch (IOException ioe) {
// just spit out the stack trace? really?
LOG.error("IOException encountered while initializing credentials", ioe);
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout=10000)
public void testGroupedSplitSize() throws IOException {
JobConf job = new JobConf(defaultConf);
InputFormat mockWrappedFormat = mock(InputFormat.class);
TezGroupedSplitsInputFormat<LongWritable , Text> format =
new TezGroupedSplitsInputFormat<LongWritable, Text>();
format.setConf(job);
format.setInputFormat(mockWrappedFormat);
job.setLong(TezConfiguration.TEZ_AM_GROUPING_SPLIT_MAX_SIZE, 500 * 1000 * 1000L);
job.setLong(TezConfiguration.TEZ_AM_GROUPING_SPLIT_MIN_SIZE, 50 * 1000 * 1000L);
InputSplit mockSplit1 = mock(InputSplit.class);
when(mockSplit1.getLength()).thenReturn(10 * 1000 * 1000L);
when(mockSplit1.getLocations()).thenReturn(null);
int numSplits = 100;
InputSplit[] mockSplits = new InputSplit[numSplits];
for (int i=0; i<numSplits; i++) {
mockSplits[i] = mockSplit1;
}
when(mockWrappedFormat.getSplits((JobConf)anyObject(), anyInt())).thenReturn(mockSplits);
// desired splits not set. We end up choosing min/max split size based on
// total data and num original splits. In this case, min size will be hit
InputSplit[] splits = format.getSplits(job, 0);
Assert.assertEquals(25, splits.length);
// split too big. override with max
format.setDesiredNumberOfSplits(1);
splits = format.getSplits(job, 0);
Assert.assertEquals(4, splits.length);
// splits too small. override with min
format.setDesiredNumberOfSplits(1000);
splits = format.getSplits(job, 0);
Assert.assertEquals(25, splits.length);
}
@SuppressWarnings({"unchecked", "RedundantCast"})
private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
throws ClassNotFoundException
{
// CDH uses different names for Parquet
if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName) ||
"parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
return MapredParquetInputFormat.class;
}
Class<?> clazz = conf.getClassByName(inputFormatName);
return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
}
public static boolean isSplittable(InputFormat<?, ?> inputFormat, FileSystem fileSystem, Path path)
{
// ORC uses a custom InputFormat but is always splittable
if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
return true;
}
// use reflection to find the protected isSplitable() method (note Hadoop's single-'t' spelling) on FileInputFormat or a superclass
Method method = null;
for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
try {
method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
break;
}
catch (NoSuchMethodException ignored) {
}
}
if (method == null) {
return false;
}
try {
method.setAccessible(true);
return (boolean) method.invoke(inputFormat, fileSystem, path);
}
catch (InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
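The reflection walk is needed because FileInputFormat declares the method as protected. The non-reflective alternative is to subclass; a sketch that disables splitting entirely:
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TextInputFormat;

public class WholeFileTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        return false; // always read each file as a single split
    }
}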
/**
* Creates the partition InputFormat.
*
* @param inputFormatName input format class name
* @param jobConf configuration data for the Hadoop framework
* @return a {@link org.apache.hadoop.mapred.InputFormat} derived object
* @throws Exception if failed to create input format
*/
public static InputFormat<?, ?> makeInputFormat(String inputFormatName,
JobConf jobConf)
throws Exception {
Class<?> c = Class.forName(inputFormatName, true,
JavaUtils.getClassLoader());
InputFormat<?, ?> inputFormat = (InputFormat<?, ?>) c.getDeclaredConstructor().newInstance();
if ("org.apache.hadoop.mapred.TextInputFormat".equals(inputFormatName)) {
// TextInputFormat needs a special configuration
((TextInputFormat) inputFormat).configure(jobConf);
}
return inputFormat;
}
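An illustrative call; the helper instantiates the format reflectively and applies the extra configure() step that TextInputFormat requires:
InputFormat<?, ?> format = makeInputFormat("org.apache.hadoop.mapred.TextInputFormat", new JobConf());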
@Test(enabled = true, dependsOnMethods = {"testWriteLongData"})
public void testReadLongData() throws Exception {
long sum = 0L;
long reccnt = 0L;
File folder = new File(m_workdir.toString());
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
&& listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
&& listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
FileSplit split = new FileSplit(
new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
InputFormat<NullWritable, MneDurableInputValue<Long>> inputFormat =
new MneInputFormat<MneDurableInputValue<Long>, Long>();
RecordReader<NullWritable, MneDurableInputValue<Long>> reader =
inputFormat.getRecordReader(split, m_conf, null);
MneDurableInputValue<Long> mdval = null;
NullWritable mdkey = reader.createKey();
while (true) {
mdval = reader.createValue();
if (reader.next(mdkey, mdval)) {
sum += mdval.getValue();
++reccnt;
} else {
break;
}
}
reader.close();
}
}
AssertJUnit.assertEquals(m_sum, sum);
AssertJUnit.assertEquals(m_reccnt, reccnt);
System.out.println(String.format("The checksum of long data is %d", sum));
}
@SuppressWarnings({ "unchecked", "rawtypes" })
void initInputFormatFromSplit(TezGroupedSplit split) throws TezException {
if (wrappedInputFormat == null) {
Class<? extends InputFormat> clazz = (Class<? extends InputFormat>)
getClassFromName(split.wrappedInputFormatName);
try {
wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
} catch (Exception e) {
throw new TezException(e);
}
}
}
/**
* Convenience method for constructing composite formats.
* Given operation (op), Object class (inf), set of paths (p) return:
* {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
*/
public static String compose(String op, Class<? extends InputFormat> inf,
String... path) {
final String infname = inf.getName();
StringBuffer ret = new StringBuffer(op + '(');
for (String p : path) {
compose(infname, p, ret);
ret.append(',');
}
ret.setCharAt(ret.length() - 1, ')');
return ret.toString();
}
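For instance, composing an outer join over two directories (paths are illustrative) produces an expression that CompositeInputFormat can parse:
String expr = compose("outer", SequenceFileInputFormat.class, "/data/a", "/data/b");
// expr is roughly:
//   outer(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat,"/data/a"),
//         tbl(org.apache.hadoop.mapred.SequenceFileInputFormat,"/data/b"))
conf.set("mapred.join.expr", expr);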
@SuppressWarnings("unchecked")
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
Reporter reporter) throws IOException {
// Find the InputFormat and then the RecordReader from the
// TaggedInputSplit.
TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
.newInstance(taggedInputSplit.getInputFormatClass(), conf);
return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
reporter);
}
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
inputSplitClass = (Class<? extends InputSplit>) readClass(in);
inputSplit = (InputSplit) ReflectionUtils
.newInstance(inputSplitClass, conf);
inputSplit.readFields(in);
inputFormatClass = (Class<? extends InputFormat>) readClass(in);
mapperClass = (Class<? extends Mapper>) readClass(in);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout=10000)
public void testGroupedSplitWithDuplicates() throws IOException {
JobConf job = new JobConf(defaultConf);
InputFormat mockWrappedFormat = mock(InputFormat.class);
TezGroupedSplitsInputFormat<LongWritable , Text> format =
new TezGroupedSplitsInputFormat<LongWritable, Text>();
format.setConf(job);
format.setInputFormat(mockWrappedFormat);
// put multiple splits with multiple copies in the same location
String[] locations = {"common", "common", "common"};
int numSplits = 3;
InputSplit[] mockSplits = new InputSplit[numSplits];
for (int i=0; i<numSplits; i++) {
InputSplit mockSplit = mock(InputSplit.class);
when(mockSplit.getLength()).thenReturn(10 * 1000 * 1000L);
when(mockSplit.getLocations()).thenReturn(locations);
mockSplits[i] = mockSplit;
}
when(mockWrappedFormat.getSplits((JobConf)anyObject(), anyInt())).thenReturn(mockSplits);
format.setDesiredNumberOfSplits(1);
InputSplit[] splits = format.getSplits(job, 1);
Assert.assertEquals(1, splits.length);
TezGroupedSplit split = (TezGroupedSplit) splits[0];
// all 3 splits are present
Assert.assertEquals(numSplits, split.wrappedSplits.size());
Set<InputSplit> splitSet = Sets.newHashSet(split.wrappedSplits);
Assert.assertEquals(numSplits, splitSet.size());
}
public void testAddInputPathWithFormat() {
final JobConf conf = new JobConf();
MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class);
MultipleInputs.addInputPath(conf, new Path("/bar"),
KeyValueTextInputFormat.class);
final Map<Path, InputFormat> inputs = MultipleInputs
.getInputFormatMap(conf);
assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
.getClass());
}
/**
* Add a {@link Path} with a custom {@link InputFormat} to the list of
* inputs for the map-reduce job.
*
* @param conf The configuration of the job
* @param path {@link Path} to be added to the list of inputs for the job
* @param inputFormatClass {@link InputFormat} class to use for this path
*/
public static void addInputPath(JobConf conf, Path path,
Class<? extends InputFormat> inputFormatClass) {
String inputFormatMapping = path.toString() + ";"
+ inputFormatClass.getName();
String inputFormats = conf.get("mapreduce.input.multipleinputs.dir.formats");
conf.set("mapreduce.input.multipleinputs.dir.formats",
inputFormats == null ? inputFormatMapping : inputFormats + ","
+ inputFormatMapping);
conf.setInputFormat(DelegatingInputFormat.class);
}
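Typical driver usage mirrors the tests above. Note that the mapping is serialized with ';' and ',' as delimiters, so input paths containing those characters are not supported:
JobConf conf = new JobConf();
MultipleInputs.addInputPath(conf, new Path("/logs/text"), TextInputFormat.class);
MultipleInputs.addInputPath(conf, new Path("/logs/kv"), KeyValueTextInputFormat.class);
// conf's InputFormat is now DelegatingInputFormat, which routes each split
// to the format registered for its path.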