Listed below are example usages of org.apache.hadoop.mapred.SequenceFileInputFormat#getSplits(); the full source files can also be viewed on GitHub.
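Before the full examples, here is a minimal sketch of the core call pattern. It is illustrative only: the input directory, the LongWritable/Text key-value types, and the split-count hint are assumptions rather than values taken from the examples below, and the usual org.apache.hadoop.mapred and org.apache.hadoop.io imports are assumed.

// Hypothetical helper: list the splits SequenceFileInputFormat computes for a directory.
private static void listSplits() throws IOException {
  JobConf job = new JobConf();
  FileInputFormat.setInputPaths(job, new Path("/tmp/seq-input")); // illustrative path
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  // The second argument is only a hint for the desired number of splits;
  // the actual count depends on file sizes and block boundaries.
  InputSplit[] splits = format.getSplits(job, 2);
  for (InputSplit split : splits) {
    System.out.println(((FileSplit) split).getPath() + " : " + split.getLength());
  }
}
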
private static InputSplit createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file)
    throws IOException {
  FileInputFormat.setInputPaths(job, workDir);
  LOG.info("Generating data at path: " + file);
  // create a file with 10 entries
  @SuppressWarnings("deprecation")
  SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, job, file, LongWritable.class, Text.class);
  try {
    Random r = new Random(System.currentTimeMillis());
    LongWritable key = new LongWritable();
    Text value = new Text();
    for (int i = 10; i > 0; i--) {
      key.set(r.nextInt(1000));
      value.set(Integer.toString(i));
      writer.append(key, value);
      LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
    }
  } finally {
    writer.close();
  }
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(job, 1);
  System.err.println("#split = " + splits.length + " ; " +
      "#locs = " + splits[0].getLocations().length + "; " +
      "loc = " + splits[0].getLocations()[0] + "; " +
      "off = " + splits[0].getLength() + "; " +
      "file = " + ((FileSplit) splits[0]).getPath());
  return splits[0];
}

private static InputSplit createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file,
    int numKVs) throws IOException {
  FileInputFormat.setInputPaths(job, workDir);
  LOG.info("Generating data at path: " + file);
  // create a file with numKVs entries
  @SuppressWarnings("deprecation")
  SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, job, file, LongWritable.class, Text.class);
  try {
    Random r = new Random(System.currentTimeMillis());
    LongWritable key = new LongWritable();
    Text value = new Text();
    for (int i = numKVs; i > 0; i--) {
      key.set(r.nextInt(1000));
      value.set(Integer.toString(i));
      writer.append(key, value);
      LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
    }
  } finally {
    writer.close();
  }
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(job, 1);
  System.err.println("#split = " + splits.length + " ; " +
      "#locs = " + splits[0].getLocations().length + "; " +
      "loc = " + splits[0].getLocations()[0] + "; " +
      "off = " + splits[0].getLength() + "; " +
      "file = " + ((FileSplit) splits[0]).getPath());
  return splits[0];
}

@Test(timeout = 5000)
public void testSingleSplit() throws Exception {
  Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);
  InputContext inputContext = createTezInputContext(jobConf, new Configuration(false));
  MultiMRInput input = new MultiMRInput(inputContext, 1);
  input.initialize();
  AtomicLong inputLength = new AtomicLong();
  LinkedHashMap<LongWritable, Text> data = createSplits(1, workDir, jobConf, inputLength);
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(jobConf, 1);
  assertEquals(1, splits.length);
  MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]);
  InputDataInformationEvent event =
      InputDataInformationEvent.createWithSerializedPayload(0,
          splitProto.toByteString().asReadOnlyByteBuffer());
  List<Event> eventList = new ArrayList<Event>();
  eventList.add(event);
  input.handleEvents(eventList);
  assertReaders(input, data, 1, inputLength.get());
}

@Test(timeout = 5000)
public void testMultipleSplits() throws Exception {
  Path workDir = new Path(TEST_ROOT_DIR, "testMultipleSplits");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);
  InputContext inputContext = createTezInputContext(jobConf, new Configuration(false));
  MultiMRInput input = new MultiMRInput(inputContext, 2);
  input.initialize();
  AtomicLong inputLength = new AtomicLong();
  LinkedHashMap<LongWritable, Text> data = createSplits(2, workDir, jobConf, inputLength);
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(jobConf, 2);
  assertEquals(2, splits.length);
  MRSplitProto splitProto1 = MRInputHelpers.createSplitProto(splits[0]);
  InputDataInformationEvent event1 =
      InputDataInformationEvent.createWithSerializedPayload(0,
          splitProto1.toByteString().asReadOnlyByteBuffer());
  MRSplitProto splitProto2 = MRInputHelpers.createSplitProto(splits[1]);
  InputDataInformationEvent event2 =
      InputDataInformationEvent.createWithSerializedPayload(0,
          splitProto2.toByteString().asReadOnlyByteBuffer());
  List<Event> eventList = new ArrayList<Event>();
  eventList.add(event1);
  eventList.add(event2);
  input.handleEvents(eventList);
  assertReaders(input, data, 2, inputLength.get());
}

@Test(timeout = 5000)
public void testExtraEvents() throws Exception {
  Path workDir = new Path(TEST_ROOT_DIR, "testExtraEvents");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);
  InputContext inputContext = createTezInputContext(jobConf, new Configuration(false));
  MultiMRInput input = new MultiMRInput(inputContext, 1);
  input.initialize();
  createSplits(1, workDir, jobConf, new AtomicLong());
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(jobConf, 1);
  assertEquals(1, splits.length);
  MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]);
  InputDataInformationEvent event1 =
      InputDataInformationEvent.createWithSerializedPayload(0,
          splitProto.toByteString().asReadOnlyByteBuffer());
  InputDataInformationEvent event2 =
      InputDataInformationEvent.createWithSerializedPayload(1,
          splitProto.toByteString().asReadOnlyByteBuffer());
  List<Event> eventList = new ArrayList<Event>();
  eventList.add(event1);
  eventList.add(event2);
  try {
    input.handleEvents(eventList);
    fail("Expecting Exception due to too many events");
  } catch (Exception e) {
    assertTrue(e.getMessage().contains(
        "Unexpected event. All physical sources already initialized"));
  }
}

@Test(timeout = 5000)
public void testSingleSplit() throws Exception {
  Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);
  MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
  builder.setInputFormatName(SequenceFileInputFormat.class.getName());
  builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
  byte[] payload = builder.build().toByteArray();
  TezInputContext inputContext = createTezInputContext(payload);
  MultiMRInput input = new MultiMRInput();
  input.setNumPhysicalInputs(1);
  input.initialize(inputContext);
  List<Event> eventList = new ArrayList<Event>();
  String file1 = "file1";
  LinkedHashMap<LongWritable, Text> data1 =
      createInputData(localFs, workDir, jobConf, file1, 0, 10);
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(jobConf, 1);
  assertEquals(1, splits.length);
  MRSplitProto splitProto = MRHelpers.createSplitProto(splits[0]);
  RootInputDataInformationEvent event =
      new RootInputDataInformationEvent(0, splitProto.toByteArray());
  eventList.clear();
  eventList.add(event);
  input.handleEvents(eventList);
  int readerCount = 0;
  for (KeyValueReader reader : input.getKeyValueReaders()) {
    readerCount++;
    while (reader.next()) {
      if (data1.size() == 0) {
        fail("Found more records than expected");
      }
      Object key = reader.getCurrentKey();
      Object val = reader.getCurrentValue();
      assertEquals(val, data1.remove(key));
    }
  }
  assertEquals(1, readerCount);
}

@Test(timeout = 5000)
public void testMultipleSplits() throws Exception {
  Path workDir = new Path(TEST_ROOT_DIR, "testMultipleSplits");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);
  MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
  builder.setInputFormatName(SequenceFileInputFormat.class.getName());
  builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
  byte[] payload = builder.build().toByteArray();
  TezInputContext inputContext = createTezInputContext(payload);
  MultiMRInput input = new MultiMRInput();
  input.setNumPhysicalInputs(2);
  input.initialize(inputContext);
  List<Event> eventList = new ArrayList<Event>();
  LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
  String file1 = "file1";
  LinkedHashMap<LongWritable, Text> data1 =
      createInputData(localFs, workDir, jobConf, file1, 0, 10);
  String file2 = "file2";
  LinkedHashMap<LongWritable, Text> data2 =
      createInputData(localFs, workDir, jobConf, file2, 10, 20);
  data.putAll(data1);
  data.putAll(data2);
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(jobConf, 2);
  assertEquals(2, splits.length);
  MRSplitProto splitProto1 = MRHelpers.createSplitProto(splits[0]);
  RootInputDataInformationEvent event1 =
      new RootInputDataInformationEvent(0, splitProto1.toByteArray());
  MRSplitProto splitProto2 = MRHelpers.createSplitProto(splits[1]);
  RootInputDataInformationEvent event2 =
      new RootInputDataInformationEvent(0, splitProto2.toByteArray());
  eventList.clear();
  eventList.add(event1);
  eventList.add(event2);
  input.handleEvents(eventList);
  int readerCount = 0;
  for (KeyValueReader reader : input.getKeyValueReaders()) {
    readerCount++;
    while (reader.next()) {
      if (data.size() == 0) {
        fail("Found more records than expected");
      }
      Object key = reader.getCurrentKey();
      Object val = reader.getCurrentValue();
      assertEquals(val, data.remove(key));
    }
  }
  assertEquals(2, readerCount);
}

@Test(timeout = 5000)
public void testExtraEvents() throws Exception {
  Path workDir = new Path(TEST_ROOT_DIR, "testExtraEvents");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);
  MRInputUserPayloadProto.Builder builder = MRInputUserPayloadProto.newBuilder();
  builder.setInputFormatName(SequenceFileInputFormat.class.getName());
  builder.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf));
  byte[] payload = builder.build().toByteArray();
  TezInputContext inputContext = createTezInputContext(payload);
  MultiMRInput input = new MultiMRInput();
  input.setNumPhysicalInputs(1);
  input.initialize(inputContext);
  List<Event> eventList = new ArrayList<Event>();
  String file1 = "file1";
  createInputData(localFs, workDir, jobConf, file1, 0, 10);
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(jobConf, 1);
  assertEquals(1, splits.length);
  MRSplitProto splitProto = MRHelpers.createSplitProto(splits[0]);
  RootInputDataInformationEvent event1 =
      new RootInputDataInformationEvent(0, splitProto.toByteArray());
  RootInputDataInformationEvent event2 =
      new RootInputDataInformationEvent(1, splitProto.toByteArray());
  eventList.clear();
  eventList.add(event1);
  eventList.add(event2);
  try {
    input.handleEvents(eventList);
    fail("Expecting Exception due to too many events");
  } catch (Exception e) {
    assertTrue(e.getMessage().contains(
        "Unexpected event. All physical sources already initialized"));
  }
}

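The tests above hand the splits to Tez's MultiMRInput. Outside of that harness, a split returned by getSplits() is normally consumed through the format's record reader. Below is a minimal sketch under the same assumptions as the examples (LongWritable/Text records, the usual org.apache.hadoop.mapred imports plus RecordReader and Reporter); dumpSplit is a hypothetical helper name, not part of the source above.

// Hypothetical helper: read every record of one split back with the mapred API.
private static void dumpSplit(SequenceFileInputFormat<LongWritable, Text> format,
    InputSplit split, JobConf job) throws IOException {
  RecordReader<LongWritable, Text> reader =
      format.getRecordReader(split, job, Reporter.NULL);
  try {
    LongWritable key = reader.createKey();
    Text value = reader.createValue();
    while (reader.next(key, value)) {
      System.out.println(key.get() + " -> " + value);
    }
  } finally {
    reader.close();
  }
}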