org.apache.hadoop.mapred.SequenceFileInputFormat#getSplits() 源码实例 Demo

下面列出了 org.apache.hadoop.mapred.SequenceFileInputFormat#getSplits() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: incubator-tez   文件: MapUtils.java
/**
 * Writes a small sequence file and returns the first input split computed for it.
 *
 * <p>Sets {@code workDir} as the input path of {@code job}, writes ten
 * {@code <LongWritable, Text>} records to {@code file} (each key is a random int in
 * {@code [0, 1000)}, the values count down from {@code "10"} to {@code "1"}), and then
 * asks a {@link SequenceFileInputFormat} for splits with a hint of one split.
 *
 * @param fs file system used to create the sequence file
 * @param workDir directory registered as the job's input path; presumably {@code file}
 *     is located under it, which is not checked here
 * @param job job configuration; its input paths are set to {@code workDir}
 * @param file path of the sequence file to create
 * @return the first element of the array returned by {@code getSplits}
 * @throws IOException propagated from writing the file or computing the splits
 */
private static InputSplit
createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file)
    throws IOException {
  FileInputFormat.setInputPaths(job, workDir);

  LOG.info("Generating data at path: " + file);
  // Create a file with ten entries; the writer is closed even if an append fails.
  @SuppressWarnings("deprecation")
  SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, job, file,
          LongWritable.class, Text.class);
  try {
    Random r = new Random(System.currentTimeMillis());
    LongWritable key = new LongWritable();
    Text value = new Text();
    for (int i = 10; i > 0; i--) {
      key.set(r.nextInt(1000));
      value.set(Integer.toString(i));
      writer.append(key, value);
      LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
    }
  } finally {
    writer.close();
  }

  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(job, 1);
  InputSplit first = splits[0];

  // Diagnostics only: a split without location hints must not make this helper fail.
  String[] locations = first.getLocations();
  int numLocations = (locations == null) ? 0 : locations.length;
  String firstLocation = (numLocations > 0) ? locations[0] : "<none>";
  // getLength() is the size of the split, so it is reported as "len" rather than "off".
  System.err.println("#split = " + splits.length + " ; " +
      "#locs = " + numLocations + "; " +
      "loc = " + firstLocation + "; " +
      "len = " + first.getLength() + "; " +
      "file = " + ((FileSplit) first).getPath());
  return first;
}
 
源代码2 项目: tez   文件: MapUtils.java
/**
 * Writes a sequence file with {@code numKVs} records and returns the first input split
 * computed for it.
 *
 * <p>Sets {@code workDir} as the input path of {@code job}, writes {@code numKVs}
 * {@code <LongWritable, Text>} records to {@code file} (each key is a random int in
 * {@code [0, 1000)}, the values count down from {@code numKVs} to {@code 1}), and then
 * asks a {@link SequenceFileInputFormat} for splits with a hint of one split.
 *
 * @param fs file system used to create the sequence file
 * @param workDir directory registered as the job's input path; presumably {@code file}
 *     is located under it, which is not checked here
 * @param job job configuration; its input paths are set to {@code workDir}
 * @param file path of the sequence file to create
 * @param numKVs number of records to write; no record is written when it is not positive
 * @return the first element of the array returned by {@code getSplits}
 * @throws IOException propagated from writing the file or computing the splits
 */
private static InputSplit
createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file, int numKVs)
    throws IOException {
  FileInputFormat.setInputPaths(job, workDir);

  LOG.info("Generating data at path: " + file);
  // Create a file with numKVs entries; the writer is closed even if an append fails.
  @SuppressWarnings("deprecation")
  SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, job, file,
          LongWritable.class, Text.class);
  try {
    Random r = new Random(System.currentTimeMillis());
    LongWritable key = new LongWritable();
    Text value = new Text();
    for (int i = numKVs; i > 0; i--) {
      key.set(r.nextInt(1000));
      value.set(Integer.toString(i));
      writer.append(key, value);
      LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
    }
  } finally {
    writer.close();
  }

  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(job, 1);
  InputSplit first = splits[0];

  // Diagnostics only: a split without location hints must not make this helper fail.
  String[] locations = first.getLocations();
  int numLocations = (locations == null) ? 0 : locations.length;
  String firstLocation = (numLocations > 0) ? locations[0] : "<none>";
  // getLength() is the size of the split, so it is reported as "len" rather than "off".
  System.err.println("#split = " + splits.length + " ; " +
      "#locs = " + numLocations + "; " +
      "loc = " + firstLocation + "; " +
      "len = " + first.getLength() + "; " +
      "file = " + ((FileSplit) first).getPath());
  return first;
}
 
源代码3 项目: tez   文件: TestMultiMRInput.java
/**
 * Hands one serialized split to a {@code MultiMRInput} created for a single physical
 * input and verifies the resulting readers through {@code assertReaders}.
 */
@Test(timeout = 5000)
public void testSingleSplit() throws Exception {
  final Path inputDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
  final JobConf conf = new JobConf(defaultConf);
  conf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(conf, inputDir);

  InputContext context = createTezInputContext(conf, new Configuration(false));
  MultiMRInput multiInput = new MultiMRInput(context, 1);
  multiInput.initialize();

  // The input data is generated only after the input has been initialized.
  AtomicLong totalLength = new AtomicLong();
  LinkedHashMap<LongWritable, Text> expected = createSplits(1, inputDir, conf, totalLength);

  InputSplit[] computed =
      new SequenceFileInputFormat<LongWritable, Text>().getSplits(conf, 1);
  assertEquals(1, computed.length);

  // Serialize the only split into the payload of a data-information event.
  List<Event> events = new ArrayList<Event>();
  events.add(InputDataInformationEvent.createWithSerializedPayload(0,
      MRInputHelpers.createSplitProto(computed[0]).toByteString().asReadOnlyByteBuffer()));
  multiInput.handleEvents(events);

  assertReaders(multiInput, expected, 1, totalLength.get());
}
 
源代码4 项目: tez   文件: TestMultiMRInput.java
/**
 * Hands two serialized splits to a {@code MultiMRInput} created for two physical inputs
 * and verifies the resulting readers through {@code assertReaders}.
 */
@Test(timeout = 5000)
public void testMultipleSplits() throws Exception {
  final Path inputDir = new Path(TEST_ROOT_DIR, "testMultipleSplits");
  final JobConf conf = new JobConf(defaultConf);
  conf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(conf, inputDir);

  InputContext context = createTezInputContext(conf, new Configuration(false));
  MultiMRInput multiInput = new MultiMRInput(context, 2);
  multiInput.initialize();

  // The input data is generated only after the input has been initialized.
  AtomicLong totalLength = new AtomicLong();
  LinkedHashMap<LongWritable, Text> expected = createSplits(2, inputDir, conf, totalLength);

  InputSplit[] computed =
      new SequenceFileInputFormat<LongWritable, Text>().getSplits(conf, 2);
  assertEquals(2, computed.length);

  // One event per split, in split order.
  // NOTE(review): both events are created with index 0 - confirm this is intended.
  List<Event> events = new ArrayList<Event>();
  events.add(InputDataInformationEvent.createWithSerializedPayload(0,
      MRInputHelpers.createSplitProto(computed[0]).toByteString().asReadOnlyByteBuffer()));
  events.add(InputDataInformationEvent.createWithSerializedPayload(0,
      MRInputHelpers.createSplitProto(computed[1]).toByteString().asReadOnlyByteBuffer()));
  multiInput.handleEvents(events);

  assertReaders(multiInput, expected, 2, totalLength.get());
}
 
源代码5 项目: tez   文件: TestMultiMRInput.java
/**
 * Sends two events to a {@code MultiMRInput} created for a single physical input and
 * expects {@code handleEvents} to reject them with an "Unexpected event" message.
 */
@Test(timeout = 5000)
public void testExtraEvents() throws Exception {
  final Path inputDir = new Path(TEST_ROOT_DIR, "testExtraEvents");
  final JobConf conf = new JobConf(defaultConf);
  conf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(conf, inputDir);

  InputContext context = createTezInputContext(conf, new Configuration(false));
  MultiMRInput multiInput = new MultiMRInput(context, 1);
  multiInput.initialize();

  // Only the generated files matter here; the returned data and length are not used.
  createSplits(1, inputDir, conf, new AtomicLong());

  InputSplit[] computed =
      new SequenceFileInputFormat<LongWritable, Text>().getSplits(conf, 1);
  assertEquals(1, computed.length);

  // The same split is serialized twice, giving one event more than the input was sized for.
  MRSplitProto proto = MRInputHelpers.createSplitProto(computed[0]);
  List<Event> events = new ArrayList<Event>();
  events.add(InputDataInformationEvent.createWithSerializedPayload(0,
      proto.toByteString().asReadOnlyByteBuffer()));
  events.add(InputDataInformationEvent.createWithSerializedPayload(1,
      proto.toByteString().asReadOnlyByteBuffer()));

  try {
    multiInput.handleEvents(events);
    fail("Expecting Exception due to too many events");
  } catch (Exception expected) {
    String message = expected.getMessage();
    assertTrue(message.contains(
        "Unexpected event. All physical sources already initialized"));
  }
}
 
源代码6 项目: incubator-tez   文件: TestMultiMRInput.java
/**
 * Hands one serialized split to a {@code MultiMRInput} with a single physical input and
 * checks that every record the reader returns matches the generated data.
 */
@Test(timeout = 5000)
public void testSingleSplit() throws Exception {
  final Path inputDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
  final JobConf conf = new JobConf(defaultConf);
  conf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(conf, inputDir);

  // The user payload carries the input format name and the serialized configuration.
  MRInputUserPayloadProto.Builder payloadBuilder = MRInputUserPayloadProto.newBuilder();
  payloadBuilder.setInputFormatName(SequenceFileInputFormat.class.getName());
  payloadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(conf));
  byte[] userPayload = payloadBuilder.build().toByteArray();

  TezInputContext context = createTezInputContext(userPayload);

  MultiMRInput multiInput = new MultiMRInput();
  multiInput.setNumPhysicalInputs(1);
  multiInput.initialize(context);

  LinkedHashMap<LongWritable, Text> expected =
      createInputData(localFs, inputDir, conf, "file1", 0, 10);

  InputSplit[] computed =
      new SequenceFileInputFormat<LongWritable, Text>().getSplits(conf, 1);
  assertEquals(1, computed.length);

  List<Event> events = new ArrayList<Event>();
  events.add(new RootInputDataInformationEvent(0,
      MRHelpers.createSplitProto(computed[0]).toByteArray()));
  multiInput.handleEvents(events);

  // Each record read is removed from the expected data and its value compared.
  // NOTE(review): nothing checks that the expected data is empty once reading is done.
  int readersSeen = 0;
  for (KeyValueReader kvReader : multiInput.getKeyValueReaders()) {
    readersSeen++;
    while (kvReader.next()) {
      if (expected.isEmpty()) {
        fail("Found more records than expected");
      }
      Object currentKey = kvReader.getCurrentKey();
      Object currentValue = kvReader.getCurrentValue();
      assertEquals(currentValue, expected.remove(currentKey));
    }
  }
  assertEquals(1, readersSeen);
}
 
源代码7 项目: incubator-tez   文件: TestMultiMRInput.java
/**
 * Hands two serialized splits to a {@code MultiMRInput} with two physical inputs and
 * checks that every record the readers return matches the generated data.
 */
@Test(timeout = 5000)
public void testMultipleSplits() throws Exception {
  final Path inputDir = new Path(TEST_ROOT_DIR, "testMultipleSplits");
  final JobConf conf = new JobConf(defaultConf);
  conf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(conf, inputDir);

  // The user payload carries the input format name and the serialized configuration.
  MRInputUserPayloadProto.Builder payloadBuilder = MRInputUserPayloadProto.newBuilder();
  payloadBuilder.setInputFormatName(SequenceFileInputFormat.class.getName());
  payloadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(conf));
  byte[] userPayload = payloadBuilder.build().toByteArray();

  TezInputContext context = createTezInputContext(userPayload);

  MultiMRInput multiInput = new MultiMRInput();
  multiInput.setNumPhysicalInputs(2);
  multiInput.initialize(context);

  // Two input files; their contents are merged into a single expected data set.
  LinkedHashMap<LongWritable, Text> firstFileData =
      createInputData(localFs, inputDir, conf, "file1", 0, 10);
  LinkedHashMap<LongWritable, Text> secondFileData =
      createInputData(localFs, inputDir, conf, "file2", 10, 20);
  LinkedHashMap<LongWritable, Text> expected = new LinkedHashMap<LongWritable, Text>();
  expected.putAll(firstFileData);
  expected.putAll(secondFileData);

  InputSplit[] computed =
      new SequenceFileInputFormat<LongWritable, Text>().getSplits(conf, 2);
  assertEquals(2, computed.length);

  // One event per split, in split order.
  // NOTE(review): both events are created with index 0 - confirm this is intended.
  List<Event> events = new ArrayList<Event>();
  events.add(new RootInputDataInformationEvent(0,
      MRHelpers.createSplitProto(computed[0]).toByteArray()));
  events.add(new RootInputDataInformationEvent(0,
      MRHelpers.createSplitProto(computed[1]).toByteArray()));
  multiInput.handleEvents(events);

  // Each record read is removed from the expected data and its value compared.
  // NOTE(review): nothing checks that the expected data is empty once reading is done.
  int readersSeen = 0;
  for (KeyValueReader kvReader : multiInput.getKeyValueReaders()) {
    readersSeen++;
    while (kvReader.next()) {
      if (expected.isEmpty()) {
        fail("Found more records than expected");
      }
      Object currentKey = kvReader.getCurrentKey();
      Object currentValue = kvReader.getCurrentValue();
      assertEquals(currentValue, expected.remove(currentKey));
    }
  }
  assertEquals(2, readersSeen);
}
 
源代码8 项目: incubator-tez   文件: TestMultiMRInput.java
/**
 * Sends two events to a {@code MultiMRInput} with a single physical input and expects
 * {@code handleEvents} to reject them with an "Unexpected event" message.
 */
@Test(timeout = 5000)
public void testExtraEvents() throws Exception {
  final Path inputDir = new Path(TEST_ROOT_DIR, "testExtraEvents");
  final JobConf conf = new JobConf(defaultConf);
  conf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(conf, inputDir);

  // The user payload carries the input format name and the serialized configuration.
  MRInputUserPayloadProto.Builder payloadBuilder = MRInputUserPayloadProto.newBuilder();
  payloadBuilder.setInputFormatName(SequenceFileInputFormat.class.getName());
  payloadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(conf));
  byte[] userPayload = payloadBuilder.build().toByteArray();

  TezInputContext context = createTezInputContext(userPayload);

  MultiMRInput multiInput = new MultiMRInput();
  multiInput.setNumPhysicalInputs(1);
  multiInput.initialize(context);

  // Only the generated file matters here; the returned data is not used.
  createInputData(localFs, inputDir, conf, "file1", 0, 10);

  InputSplit[] computed =
      new SequenceFileInputFormat<LongWritable, Text>().getSplits(conf, 1);
  assertEquals(1, computed.length);

  // The same split is serialized twice, giving one event more than the input was sized for.
  MRSplitProto proto = MRHelpers.createSplitProto(computed[0]);
  List<Event> events = new ArrayList<Event>();
  events.add(new RootInputDataInformationEvent(0, proto.toByteArray()));
  events.add(new RootInputDataInformationEvent(1, proto.toByteArray()));

  try {
    multiInput.handleEvents(events);
    fail("Expecting Exception due to too many events");
  } catch (Exception expected) {
    String message = expected.getMessage();
    assertTrue(message.contains(
        "Unexpected event. All physical sources already initialized"));
  }
}