Listed below are example usages of the org.apache.hadoop.mapred.KeyValueTextInputFormat API class, collected from open-source projects; the full source of each example can be viewed on GitHub.
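Before the collected examples, here is a minimal, self-contained driver sketch for the class itself (the class and path names are hypothetical, and the separator key shown is the old-API property "key.value.separator.in.input.line", with tab as the default). KeyValueTextInputFormat splits each input line at the first separator and hands the mapper Text key / Text value pairs:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;

public class KeyValueDriver { // hypothetical driver class
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(KeyValueDriver.class);
    conf.setJobName("keyvalue-example");
    // Split each line at the first tab: everything before it becomes the key,
    // everything after it the value (both delivered to the mapper as Text).
    conf.setInputFormat(KeyValueTextInputFormat.class);
    conf.set("key.value.separator.in.input.line", "\t");
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf); // identity mapper/reducer run by default
  }
}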
public void testAddInputPathWithMapper() {
  final JobConf conf = new JobConf();
  MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
      MapClass.class);
  MultipleInputs.addInputPath(conf, new Path("/bar"),
      KeyValueTextInputFormat.class, MapClass2.class);

  final Map<Path, InputFormat> inputs = MultipleInputs.getInputFormatMap(conf);
  final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
      .getMapperTypeMap(conf);

  assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
  assertEquals(KeyValueTextInputFormat.class,
      inputs.get(new Path("/bar")).getClass());
  assertEquals(MapClass.class, maps.get(new Path("/foo")));
  assertEquals(MapClass2.class, maps.get(new Path("/bar")));
}
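Under the hood, MultipleInputs.addInputPath serializes each path/format (and optional mapper) pair into the JobConf, and getInputFormatMap/getMapperTypeMap parse them back out; the assertions above simply verify that round trip.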
public void testAddInputPathWithFormat() {
  final JobConf conf = new JobConf();
  MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class);
  MultipleInputs.addInputPath(conf, new Path("/bar"),
      KeyValueTextInputFormat.class);

  final Map<Path, InputFormat> inputs = MultipleInputs.getInputFormatMap(conf);

  assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
  assertEquals(KeyValueTextInputFormat.class,
      inputs.get(new Path("/bar")).getClass());
}
@Test
public void testCreateJob() throws IOException {
  JobConf job;
  ArrayList<String> dummyArgs = new ArrayList<String>();
  dummyArgs.add("-input");   dummyArgs.add("dummy");
  dummyArgs.add("-output");  dummyArgs.add("dummy");
  dummyArgs.add("-mapper");  dummyArgs.add("dummy");
  dummyArgs.add("-reducer"); dummyArgs.add("dummy");
  ArrayList<String> args;

  args = new ArrayList<String>(dummyArgs);
  args.add("-inputformat");
  args.add("org.apache.hadoop.mapred.KeyValueTextInputFormat");
  job = StreamJob.createJob(args.toArray(new String[] {}));
  assertEquals(KeyValueTextInputFormat.class, job.getInputFormat().getClass());

  args = new ArrayList<String>(dummyArgs);
  args.add("-inputformat");
  args.add("org.apache.hadoop.mapred.SequenceFileInputFormat");
  job = StreamJob.createJob(args.toArray(new String[] {}));
  assertEquals(SequenceFileInputFormat.class, job.getInputFormat().getClass());

  args = new ArrayList<String>(dummyArgs);
  args.add("-inputformat");
  args.add("org.apache.hadoop.mapred.KeyValueTextInputFormat");
  args.add("-inputreader");
  args.add("StreamXmlRecordReader,begin=<doc>,end=</doc>");
  job = StreamJob.createJob(args.toArray(new String[] {}));
  assertEquals(StreamInputFormat.class, job.getInputFormat().getClass());
}
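Note what the last case demonstrates: even though -inputformat names KeyValueTextInputFormat, supplying -inputreader makes Streaming switch the job to its own StreamInputFormat, which is what instantiates the requested StreamXmlRecordReader.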
private InputSplit[] getSplits(JobConf conf, int numSplits, String path)
    throws Exception {
  FileInputFormat.setInputPaths(conf, new Path(path));
  if (inputFormat == null) {
    inputFormat = inputFormatClass.newInstance();
    String inputFormatClassName = inputFormatClass.getName();
    // Both formats implement JobConfigurable, so a reflectively created
    // instance has to be configured by hand before it can compute splits.
    if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) {
      ((TextInputFormat) inputFormat).configure(conf);
    } else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) {
      ((KeyValueTextInputFormat) inputFormat).configure(conf);
    }
  }
  return inputFormat.getSplits(conf, numSplits);
}
@Override
public int run(String[] args) throws Exception {
  if (args.length != 4) {
    printUsage();
  }
  Path userPath = new Path(args[0]);
  Path commentPath = new Path(args[1]);
  Path outputDir = new Path(args[2]);
  String joinType = args[3];

  JobConf conf = new JobConf("CompositeJoin");
  conf.setJarByClass(CompositeUserJoin.class);
  conf.setMapperClass(CompositeMapper.class);
  conf.setNumReduceTasks(0);

  // Set the input format class to a CompositeInputFormat class.
  // The CompositeInputFormat will parse all of our input files and output
  // records to our mapper.
  conf.setInputFormat(CompositeInputFormat.class);

  // The composite input format join expression will set how the records
  // are going to be read in, and in what input format.
  conf.set("mapred.join.expr", CompositeInputFormat.compose(joinType,
      KeyValueTextInputFormat.class, userPath, commentPath));

  TextOutputFormat.setOutputPath(conf, outputDir);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);

  RunningJob job = JobClient.runJob(conf);
  while (!job.isComplete()) {
    Thread.sleep(1000);
  }
  return job.isSuccessful() ? 0 : 1;
}
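For reference, CompositeInputFormat.compose builds the join expression string from the join type and paths; with hypothetical paths it comes out roughly as inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,"/users"),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,"/comments")). The usual caveat applies: a composite (map-side) join requires every input to be sorted by key and partitioned identically, since no reduce phase runs here (setNumReduceTasks(0)).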
public void testSplitting() throws Exception {
  JobConf conf = new JobConf();
  MiniDFSCluster dfs = null;
  try {
    dfs = new MiniDFSCluster.Builder(conf).numDataNodes(4)
        .racks(new String[] { "/rack0", "/rack0", "/rack1", "/rack1" })
        .hosts(new String[] { "host0", "host1", "host2", "host3" })
        .build();
    FileSystem fs = dfs.getFileSystem();

    Path path = getPath("/foo/bar", fs);
    Path path2 = getPath("/foo/baz", fs);
    Path path3 = getPath("/bar/bar", fs);
    Path path4 = getPath("/bar/baz", fs);

    final int numSplits = 100;
    MultipleInputs.addInputPath(conf, path, TextInputFormat.class,
        MapClass.class);
    MultipleInputs.addInputPath(conf, path2, TextInputFormat.class,
        MapClass2.class);
    MultipleInputs.addInputPath(conf, path3, KeyValueTextInputFormat.class,
        MapClass.class);
    MultipleInputs.addInputPath(conf, path4, TextInputFormat.class,
        MapClass2.class);

    DelegatingInputFormat inFormat = new DelegatingInputFormat();
    InputSplit[] splits = inFormat.getSplits(conf, numSplits);

    int[] bins = new int[3];
    for (InputSplit split : splits) {
      assertTrue(split instanceof TaggedInputSplit);
      final TaggedInputSplit tis = (TaggedInputSplit) split;
      int index = -1;
      if (tis.getInputFormatClass().equals(KeyValueTextInputFormat.class)) {
        // path3
        index = 0;
      } else if (tis.getMapperClass().equals(MapClass.class)) {
        // path
        index = 1;
      } else {
        // path2 and path4
        index = 2;
      }
      bins[index]++;
    }

    // Each bin is a unique combination of a Mapper and InputFormat, and
    // DelegatingInputFormat should split each bin into numSplits splits,
    // regardless of the number of paths that use that Mapper/InputFormat.
    for (int count : bins) {
      assertEquals(numSplits, count);
    }
  } finally {
    if (dfs != null) {
      dfs.shutdown();
    }
  }
}