Code examples for the class org.apache.hadoop.mapred.InputSplit

The examples below show how to use the org.apache.hadoop.mapred.InputSplit API, collected from real open-source projects; you can also follow the links to GitHub to view the original source code.
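
Before the per-project examples, here is a minimal end-to-end sketch of the pattern most of them follow: configure a JobConf, ask an InputFormat (TextInputFormat in this sketch) for its InputSplit[] via getSplits, then open a RecordReader for each split and iterate its records. The class name InputSplitUsageSketch and the path /tmp/input.txt are illustrative placeholders only and do not come from any of the projects listed below.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class InputSplitUsageSketch {

  public static void main(String[] args) throws IOException {
    // Hypothetical input path; replace with a real file or directory.
    Path input = new Path("/tmp/input.txt");

    JobConf job = new JobConf(InputSplitUsageSketch.class);
    FileInputFormat.setInputPaths(job, input);

    TextInputFormat format = new TextInputFormat();
    format.configure(job);

    // Ask the InputFormat to compute splits; the hint (1) is a minimum, not a guarantee.
    InputSplit[] splits = format.getSplits(job, 1);

    for (InputSplit split : splits) {
      System.out.println("split length=" + split.getLength()
          + " locations=" + String.join(",", split.getLocations()));

      // Read every record of the split with the matching RecordReader.
      RecordReader<LongWritable, Text> reader =
          format.getRecordReader(split, job, Reporter.NULL);
      LongWritable key = reader.createKey();
      Text value = reader.createValue();
      try {
        while (reader.next(key, value)) {
          // process (key, value) here
        }
      } finally {
        reader.close();
      }
    }
  }
}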

Example 1 Project: flink File: HadoopInputFormatTest.java
@Test
public void testOpenClose() throws Exception {
	DummyRecordReader recordReader = mock(DummyRecordReader.class);
	DummyInputFormat inputFormat = mock(DummyInputFormat.class);
	when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);

	HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
	hadoopInputFormat.open(getHadoopInputSplit());

	verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
	verify(recordReader, times(1)).createKey();
	verify(recordReader, times(1)).createValue();

	assertThat(hadoopInputFormat.fetched, is(false));

	hadoopInputFormat.close();
	verify(recordReader, times(1)).close();
}
 
Example 2 Project: flink File: HadoopInputFormatTest.java
@Test
public void testOpenWithConfigurableReader() throws Exception {
	ConfigurableDummyRecordReader recordReader = mock(ConfigurableDummyRecordReader.class);
	DummyInputFormat inputFormat = mock(DummyInputFormat.class);
	when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);

	HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
	hadoopInputFormat.open(getHadoopInputSplit());

	verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
	verify(recordReader, times(1)).setConf(any(JobConf.class));
	verify(recordReader, times(1)).createKey();
	verify(recordReader, times(1)).createValue();

	assertThat(hadoopInputFormat.fetched, is(false));

}
 
@Override
public RecordReader<Text, DynamoDBItemWritable> getRecordReader(InputSplit split, JobConf conf,
    Reporter reporter) throws
    IOException {
  reporter.progress();

  Map<String, String> columnMapping =
      HiveDynamoDBUtil.fromJsonString(conf.get(DynamoDBConstants.DYNAMODB_COLUMN_MAPPING));
  Map<String, String> hiveTypeMapping = HiveDynamoDBUtil.extractHiveTypeMapping(conf);
  DynamoDBQueryFilter queryFilter = getQueryFilter(conf, columnMapping, hiveTypeMapping);
  DynamoDBSplit bbSplit = (DynamoDBSplit) split;
  bbSplit.setDynamoDBFilterPushdown(queryFilter);

  Collection<String> attributes = (columnMapping == null ? null : columnMapping.values());
  DynamoDBRecordReaderContext context = buildHiveDynamoDBRecordReaderContext(bbSplit, conf,
      reporter, attributes);
  return new DefaultDynamoDBRecordReader(context);
}
 
Example 4 Project: hive-dwrf File: OrcInputFormat.java
@Override
public RecordReader<NullWritable, OrcLazyRow>
    getRecordReader(InputSplit inputSplit, JobConf conf,
                    Reporter reporter) throws IOException {
  ReaderWriterProfiler.setProfilerOptions(conf);
  FileSplit fileSplit = (FileSplit) inputSplit;
  Path path = fileSplit.getPath();
  FileSystem fs = path.getFileSystem(conf);
  reporter.setStatus(fileSplit.toString());

  return new OrcRecordReader(
      OrcFile.createReader(fs, path, conf),
      conf,
      fileSplit.getStart(),
      fileSplit.getLength()
  );
}
 
/**
 * Allocates the first available split into the evaluator.
 *
 * @param evaluatorId
 *          the evaluator id
 * @param value
 *          the queue of splits
 * @return a numberedSplit or null if it cannot find one
 */
protected NumberedSplit<InputSplit> allocateSplit(final String evaluatorId,
    final BlockingQueue<NumberedSplit<InputSplit>> value) {
  if (value == null) {
    LOG.log(Level.FINE, "Queue of splits is null. Returning null");
    return null;
  }
  while (true) {
    final NumberedSplit<InputSplit> split = value.poll();
    if (split == null) {
      return null;
    }
    if (value == unallocatedSplits || unallocatedSplits.remove(split)) {
      LOG.log(Level.FINE, "Found split-" + split.getIndex() + " in the queue");
      final NumberedSplit<InputSplit> old = evaluatorToSplits.putIfAbsent(evaluatorId, split);
      if (old != null) {
        throw new RuntimeException("Trying to assign different splits to the same evaluator is not supported");
      } else {
        LOG.log(Level.FINE, "Returning " + split.getIndex());
        return split;
      }
    }
  }
}
 
Example 6 Project: systemds File: FrameReaderTextCell.java
protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, 
		ValueType[] schema, String[] names, long rlen, long clen)
	throws IOException
{
	if( fs.isDirectory(path) ) {
		FileInputFormat.addInputPath(job, path);
		TextInputFormat informat = new TextInputFormat();
		informat.configure(job);
		InputSplit[] splits = informat.getSplits(job, 1);
		for(InputSplit split: splits)
			readTextCellFrameFromInputSplit(split, informat, job, dest);
	}
	else {
		readRawTextCellFrameFromHDFS(path, job, fs, dest, schema, names, rlen, clen);
	}
}
 
Example 7 Project: mr4c File: MR4CInputFormatTest.java
private void doTest(List<List<String>> frameSplits, int overlapBefore, int overlapAfter, Integer chunkSize) throws Exception {
	AlgorithmConfig algoConfig = m_mgr.getExecutionSource().getAlgorithmConfig();
	algoConfig.addDimension(new DimensionConfig("frame", true, overlapBefore, overlapAfter, null, chunkSize, false));
	algoConfig.addDimension(new DimensionConfig("type", false, 0, 0, null, null, false));

	Set<Set<DataKey>> expectedKeySplits = buildExpectedSplits(frameSplits);
	MR4CInputFormat format = new MR4CInputFormat();
	InputSplit[] splits = format.getSplits(m_mgr.getExecutionSource(), 4);

	Set<Set<DataKey>> actualKeySplits = new HashSet<Set<DataKey>>();
	for ( InputSplit split : splits ) {
		MR4CInputSplit bbSplit = (MR4CInputSplit) split;
		actualKeySplits.add(new HashSet<DataKey>(bbSplit.getKeys().getKeys()));
	}
	assertEquals(expectedKeySplits, actualKeySplits);
}
 
protected List<DataReaderFactory<ColumnarBatch>> getSplitsFactories(String query) {
  List<DataReaderFactory<ColumnarBatch>> tasks = new ArrayList<>();
  try {
    JobConf jobConf = JobUtil.createJobConf(options, query);
    LlapBaseInputFormat llapInputFormat = new LlapBaseInputFormat(false, Long.MAX_VALUE);
    //numSplits arg not currently supported, use 1 as dummy arg
    InputSplit[] splits = llapInputFormat.getSplits(jobConf, 1);
    for (InputSplit split : splits) {
      tasks.add(getDataReaderFactory(split, jobConf, getArrowAllocatorMax()));
    }
  } catch (IOException e) {
    LOG.error("Unable to submit query to HS2");
    throw new RuntimeException(e);
  }
  return tasks;
}
 
static RecordReader<NullWritable, DynamoDBItemWritable> getRecordReader(
    InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException {
  // CombineFileSplit indicates the new export format which includes a manifest file
  if (inputSplit instanceof CombineFileSplit) {
    int version = job.getInt(DynamoDBConstants.EXPORT_FORMAT_VERSION, -1);
    if (version != ExportManifestRecordWriter.FORMAT_VERSION) {
      throw new IOException("Unknown version: " + job.get(DynamoDBConstants
          .EXPORT_FORMAT_VERSION));
    }
    return new ImportCombineFileRecordReader((CombineFileSplit) inputSplit, job, reporter);
  } else if (inputSplit instanceof FileSplit) {
    // FileSplit indicates the old data pipeline format which doesn't include a manifest file
    Path path = ((FileSplit) inputSplit).getPath();
    return new ImportRecordReader(job, path);
  } else {
    throw new IOException("Expecting CombineFileSplit or FileSplit but the input split type is:"
        + " " + inputSplit.getClass());
  }
}
 
Example 10 Project: pxf File: HdfsDataFragmenter.java
protected List<InputSplit> getSplits(Path path) throws IOException {
    PxfInputFormat pxfInputFormat = new PxfInputFormat();
    PxfInputFormat.setInputPaths(jobConf, path);
    InputSplit[] splits = pxfInputFormat.getSplits(jobConf, 1);
    List<InputSplit> result = new ArrayList<>();

    /*
     * HD-2547: If the file is empty, an empty split is returned: no
     * locations and no length.
     */
    if (splits != null) {
        for (InputSplit split : splits) {
            if (split.getLength() > 0) {
                result.add(split);
            }
        }
    }

    return result;
}
 
@Test
public void readEthereumBlockInputFormatBlock3346406() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth3346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for block 3346406");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input split for block 3346406 contains at least one block");
  assertEquals(7, block.getEthereumTransactions().size(), "Block 3346406 must have 7 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 3346406");
  reader.close();
}
 
Example 12 Project: presto File: BackgroundHiveSplitLoader.java
private ListenableFuture<?> addSplitsToSource(InputSplit[] targetSplits, InternalHiveSplitFactory splitFactory)
        throws IOException
{
    ListenableFuture<?> lastResult = COMPLETED_FUTURE;
    for (InputSplit inputSplit : targetSplits) {
        Optional<InternalHiveSplit> internalHiveSplit = splitFactory.createInternalHiveSplit((FileSplit) inputSplit);
        if (internalHiveSplit.isPresent()) {
            lastResult = hiveSplitSource.addToQueue(internalHiveSplit.get());
        }
        if (stopped) {
            return COMPLETED_FUTURE;
        }
    }
    return lastResult;
}
 
Example 13 Project: tez File: TezGroupedSplit.java
@SuppressWarnings("unchecked")
@Override
public void readFields(DataInput in) throws IOException {
  wrappedInputFormatName = Text.readString(in);
  String inputSplitClassName = Text.readString(in);
  Class<? extends InputSplit> clazz = null;
  try {
    clazz = (Class<? extends InputSplit>)
    TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName);
  } catch (TezException e) {
    throw new IOException(e);
  }

  int numSplits = in.readInt();
  
  wrappedSplits = new ArrayList<InputSplit>(numSplits);
  for (int i=0; i<numSplits; ++i) {
    addSplit(readWrappedSplit(in, clazz));
  }
  
  long recordedLength = in.readLong();
  if(recordedLength != length) {
    throw new TezUncheckedException("Expected length: " + recordedLength
        + " actual length: " + length);
  }
  int numLocs = in.readInt();
  if (numLocs > 0) {
    locations = new String[numLocs];
    for (int i=0; i<numLocs; ++i) {
      locations[i] = Text.readString(in);
    }
  }
}
 
@Override
public RecordReader<BytesWritable, BitcoinTransaction> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
	// Create the reader; configuration or block-read errors are logged and null is returned
	try {
		return new BitcoinTransactionRecordReader((FileSplit) split, job, reporter);
	} catch (HadoopCryptoLedgerConfigurationException | BitcoinBlockReadException e) {
		LOGFI.error(e);
	}
	return null;
}
 
Example 15 Project: RDFS File: PipesNonJavaInputFormat.java
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  // Delegate the generation of input splits to the 'original' InputFormat
  return ReflectionUtils.newInstance(
      job.getClass("mapred.pipes.user.inputformat", 
                   TextInputFormat.class, 
                   InputFormat.class), job).getSplits(job, numSplits);
}
 
Example 16 Project: parquet-mr File: ParquetRecordReaderWrapper.java
public ParquetRecordReaderWrapper(
    final ParquetInputFormat<ArrayWritable> newInputFormat,
    final InputSplit oldSplit,
    final JobConf oldJobConf,
    final Reporter reporter)
        throws IOException, InterruptedException {
  this(newInputFormat, oldSplit, oldJobConf, reporter,
      (new HiveBindingFactory()).create());
}
 
Example 17 Project: hadoop File: BinaryProtocol.java
public void runMap(InputSplit split, int numReduces, 
                   boolean pipedInput) throws IOException {
  WritableUtils.writeVInt(stream, MessageType.RUN_MAP.code);
  writeObject(split);
  WritableUtils.writeVInt(stream, numReduces);
  WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
}
 
Example 18 Project: incubator-tez File: TezGroupedSplit.java
public void addSplit(InputSplit split) {
  wrappedSplits.add(split);
  try {
    length += split.getLength();
  } catch (Exception e) {
    throw new TezUncheckedException(e);
  }
}
 
Example 19 Project: gemfirexd-oss File: GFInputFormat.java
@Override
public RecordReader<GFKey, PersistedEventImpl> getRecordReader(
    InputSplit split, JobConf job, Reporter reporter) throws IOException {

  CombineFileSplit cSplit = (CombineFileSplit) split;
  AbstractGFRecordReader reader = new AbstractGFRecordReader();
  reader.initialize(cSplit, job);
  return reader;
}
 
Example 20 Project: gemfirexd-oss File: FragmenterJUnitTest.java
public void _testGetFragmentsWithTestInputFormat() throws Exception {
    int tableIndex = 0;
    String tableName = TestDataHelper.TABLE_NAMES[tableIndex];
    //String inputDataPath = 
    InputSplit[] fs = TestDataHelper.getSplits(tableName);
/*
    InputData inputData = new InputData(TestDataHelper.populateInputDataParam(tableIndex), null);
    GemFireXDFragmenter fragmenter = new GemFireXDFragmenter(inputData,
        new TestGemFireXDManager(inputData));

    FragmentsOutput fragments = fragmenter.GetFragments();
    verifyFragments(fs, fragments);*/
  }
 
Example 21 Project: big-c File: CompositeInputSplit.java
/**
 * Add an InputSplit to this collection.
 * @throws IOException If capacity was not specified during construction
 *                     or if capacity has been reached.
 */
public void add(InputSplit s) throws IOException {
  if (null == splits) {
    throw new IOException("Uninitialized InputSplit");
  }
  if (fill == splits.length) {
    throw new IOException("Too many splits");
  }
  splits[fill++] = s;
  totsize += s.getLength();
}
 
Example 22 Project: RDFS File: NLineInputFormat.java
public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit,
                                          JobConf job,
                                          Reporter reporter) 
throws IOException {
  reporter.setStatus(genericSplit.toString());
  return new LineRecordReader(job, (FileSplit) genericSplit);
}
 
@Override
public RecordReader<MRContainer, MRContainer> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
    StormEvaluator.load_source_dir();  // load the parsed source parameters from a file
    String path = ((FileSplit)split).getPath().toString();
    ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,Plan.conf);
    return new ParsedRecordReader((FileSplit)split,job,ds.parser,(Trees)ds.args);
}
 
Example 24 Project: Hive-XML-SerDe File: SplittableXmlInputFormat.java
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException {

    InputStream inputStream = null;
    try {
        inputStream = getInputStream(job, (FileSplit) inputSplit);
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    }
    long start = ((FileSplit) inputSplit).getStart();
    long end = start + inputSplit.getLength();

    return new HiveXmlRecordReader(job, inputStream, start, end);
}
 
Example 25 Project: tez File: MRInputHelpers.java
@SuppressWarnings({ "rawtypes", "unchecked" })
private static org.apache.hadoop.mapred.InputSplit[] generateOldSplits(
    JobConf jobConf, boolean groupSplits, boolean sortSplits, int numTasks)
    throws IOException {

  // This is the real InputFormat
  org.apache.hadoop.mapred.InputFormat inputFormat;
  try {
    inputFormat = jobConf.getInputFormat();
  } catch (Exception e) {
    throw new TezUncheckedException(e);
  }

  org.apache.hadoop.mapred.InputFormat finalInputFormat = inputFormat;

  if (groupSplits) {
    org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat groupedFormat =
        new org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat();
    groupedFormat.setConf(jobConf);
    groupedFormat.setInputFormat(inputFormat);
    groupedFormat.setDesiredNumberOfSplits(numTasks);
    finalInputFormat = groupedFormat;
  } else {
    finalInputFormat = inputFormat;
  }
  org.apache.hadoop.mapred.InputSplit[] splits = finalInputFormat
      .getSplits(jobConf, jobConf.getNumMapTasks());
  if (sortSplits) {
    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(splits, new OldInputSplitComparator());
  }
  return splits;
}
 
Example 26 Project: big-c File: CompositeInputSplit.java
/**
 * Collect a set of hosts from all child InputSplits.
 */
public String[] getLocations() throws IOException {
  HashSet<String> hosts = new HashSet<String>();
  for (InputSplit s : splits) {
    String[] hints = s.getLocations();
    if (hints != null && hints.length > 0) {
      for (String host : hints) {
        hosts.add(host);
      }
    }
  }
  return hosts.toArray(new String[hosts.size()]);
}
 
Example 27 Project: hadoop File: LoadGeneratorMR.java
public RecordReader<LongWritable, Text> getRecordReader(
    InputSplit ignored, JobConf conf, Reporter reporter) throws IOException {

  return new RecordReader<LongWritable, Text>() {

    boolean sentOneRecord = false;

    public boolean next(LongWritable key, Text value)
        throws IOException {
      key.set(1);
      value.set("dummy");
      if (sentOneRecord == false) { // first call
        sentOneRecord = true;
        return true;
      }
      return false; // we have sent one record - we are done
    }

    public LongWritable createKey() {
      return new LongWritable();
    }
    public Text createValue() {
      return new Text();
    }
    public long getPos() throws IOException {
      return 1;
    }
    public void close() throws IOException {
    }
    public float getProgress() throws IOException {
      return 1;
    }
  };
}
 
Example 28 Project: gemfirexd-oss File: GFInputFormat.java
@Override
public RecordReader<GFKey, PersistedEventImpl> getRecordReader(
    InputSplit split, JobConf job, Reporter reporter) throws IOException {

  CombineFileSplit cSplit = (CombineFileSplit) split;
  AbstractGFRecordReader reader = new AbstractGFRecordReader();
  reader.initialize(cSplit, job);
  return reader;
}
 
Example 29 Project: hive-dwrf File: TestInputOutputFormat.java
@Test
public void testEmptyFile() throws Exception {
  JobConf job = new JobConf(conf);
  Properties properties = new Properties();
  HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
  FileSinkOperator.RecordWriter writer =
      outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
          properties, Reporter.NULL);
  writer.close(true);
  properties.setProperty("columns", "x,y");
  properties.setProperty("columns.types", "int:int");
  SerDe serde = new OrcSerde();
  serde.initialize(conf, properties);
  InputFormat<?,?> in = new OrcInputFormat();
  FileInputFormat.setInputPaths(conf, testFilePath.toString());
  InputSplit[] splits = in.getSplits(conf, 1);
  assertEquals(1, splits.length);

  // read the whole file
  conf.set("hive.io.file.readcolumn.ids", "0,1");
  org.apache.hadoop.mapred.RecordReader reader =
      in.getRecordReader(splits[0], conf, Reporter.NULL);
  Object key = reader.createKey();
  Object value = reader.createValue();
  assertEquals(0.0, reader.getProgress(), 0.00001);
  assertEquals(0, reader.getPos());
  assertEquals(false, reader.next(key, value));
  reader.close();
  assertEquals(null, serde.getSerDeStats());
}
 
Example 30 Project: hadoop File: DBInputFormat.java
/** {@inheritDoc} */
public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
  List<org.apache.hadoop.mapreduce.InputSplit> newSplits = 
    super.getSplits(Job.getInstance(job));
  InputSplit[] ret = new InputSplit[newSplits.size()];
  int i = 0;
  for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
    org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split = 
        (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) s;
    ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
  }
  return ret;
}
 