org.apache.hadoop.mapred.FileSplit#getLength() source code examples

Listed below are code examples that use org.apache.hadoop.mapred.FileSplit#getLength(); you can also follow the project links to view the full source on GitHub.
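
As the examples below show, getLength() is almost always paired with getStart() to derive the byte range [start, start + length) that a record reader is responsible for. A minimal, self-contained sketch of that pattern (class and field names are illustrative, not taken from any of the projects below):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

public class SplitBoundsExample {
    private long start;
    private long end;
    private FSDataInputStream in;

    // Open the split's file and position the stream at the first byte of the split.
    public SplitBoundsExample(Configuration conf, FileSplit split) throws IOException {
        this.start = split.getStart();              // first byte of this split
        this.end = start + split.getLength();       // first byte past this split
        Path file = split.getPath();
        FileSystem fs = file.getFileSystem(conf);
        this.in = fs.open(file);
        this.in.seek(start);                        // read records until the position reaches end
    }
}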

Example 1  Project: Hive-XML-SerDe   File: SplittableXmlInputFormat.java
private InputStream getInputStream(JobConf jobConf, FileSplit split) throws IOException, ClassNotFoundException {
    FSDataInputStream fsin = null;

    // open the file and seek to the start of the split
    long splitStart = split.getStart();
    long splitEnd = splitStart + split.getLength();
    Path file = split.getPath();
    FileSystem fs = file.getFileSystem(jobConf);
    fsin = fs.open(split.getPath());
    fsin.seek(splitStart);

    Configuration conf = new Configuration();
    CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
    CompressionCodec codec = compressionCodecFactory.getCodec(split.getPath());
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
        return ((SplittableCompressionCodec) codec).createInputStream(fsin,
            decompressor,
            splitStart,
            splitEnd,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
    } else {
        return codec.createInputStream(fsin, decompressor);
    }
}
 
@Override
public RecordReader<NullWritable,ColumnAndIndex> getRecordReader( final InputSplit split, final JobConf job, final Reporter reporter ) throws IOException {
  FileSplit fileSplit = (FileSplit)split;
  Path path = fileSplit.getPath();
  FileSystem fs = path.getFileSystem( job );
  long fileLength = fs.getLength( path );
  long start = fileSplit.getStart();
  long length = fileSplit.getLength();
  InputStream in = fs.open( path );
  IJobReporter jobReporter = new HadoopJobReporter( reporter );
  jobReporter.setStatus( String.format( "Read file : %s" , path.toString() ) );
  HiveReaderSetting hiveConfig = new HiveReaderSetting( fileSplit , job );
  if ( hiveConfig.isVectorMode() ){
    IVectorizedReaderSetting vectorizedSetting = new HiveVectorizedReaderSetting( fileSplit , job , hiveConfig );
    return (RecordReader)new MDSHiveDirectVectorizedReader( in , fileLength , start , length , vectorizedSetting , jobReporter );
  }
  else{
    return new MDSHiveLineReader( in , fileLength , start , length , hiveConfig , jobReporter , spreadCounter );
  }
}
 
Example 3  Project: hadoop-gpu   File: LineDocRecordReader.java
/**
 * Constructor
 * @param job
 * @param split  
 * @throws IOException
 */
public LineDocRecordReader(Configuration job, FileSplit split)
    throws IOException {
  long start = split.getStart();
  long end = start + split.getLength();
  final Path file = split.getPath();

  // open the file and seek to the start of the split
  FileSystem fs = file.getFileSystem(job);
  FSDataInputStream fileIn = fs.open(split.getPath());
  InputStream in = fileIn;
  boolean skipFirstLine = false;
  if (start != 0) {
    skipFirstLine = true; // wait till BufferedInputStream to skip
    --start;
    fileIn.seek(start);
  }

  this.in = new BufferedInputStream(in);
  if (skipFirstLine) { // skip first line and re-establish "start".
    start += LineDocRecordReader.readData(this.in, null, EOL);
  }
  this.start = start;
  this.pos = start;
  this.end = end;
}
 
public ParsedRecordReader ( FileSplit split,
    Configuration conf,
    Class<? extends Parser> parser_class,
    Trees args ) throws IOException {
    start = split.getStart();
    end = start + split.getLength();
    Path file = split.getPath();
    FileSystem fs = file.getFileSystem(conf);
    fsin = fs.open(split.getPath());
    try {
        parser = parser_class.newInstance();
    } catch (Exception ex) {
        throw new Error("Unrecognized parser:"+parser_class);
    }
    parser.initialize(args);
    parser.open(fsin,start,end);
    result = null;
}
 
Example 5  Project: ignite   File: HadoopV1Splitter.java
/**
 * @param clsName Input split class name.
 * @param in Input stream.
 * @param hosts Optional hosts.
 * @return File block or {@code null} if it is not a {@link FileSplit} instance.
 * @throws IgniteCheckedException If failed.
 */
@Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
    @Nullable String[] hosts) throws IgniteCheckedException {
    if (!FileSplit.class.getName().equals(clsName))
        return null;

    FileSplit split = U.newInstance(FileSplit.class);

    try {
        split.readFields(in);
    }
    catch (IOException e) {
        throw new IgniteCheckedException(e);
    }

    if (hosts == null)
        hosts = EMPTY_HOSTS;

    return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
}
 
Example 6  Project: Hive-XML-SerDe   File: XmlInputFormat.java
public XmlRecordReader(FileSplit input, JobConf jobConf) throws IOException {
    Configuration conf = jobConf;
    this.startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
    this.endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
    FileSplit split = (FileSplit) input;

    Path file = split.getPath();
    CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
    CompressionCodec codec = compressionCodecs.getCodec(file);
    FileSystem fs = file.getFileSystem(conf);
    if (codec != null) {
        this.fsin = new DataInputStream(codec.createInputStream(fs.open(file)));
        //Data is read only in the first split; the other splits are invalidated.
        //This is to avoid reading duplicate data for compressed files.
        this.start = (split.getStart() == 0) ? 0 : Long.MAX_VALUE;
        this.end = Long.MAX_VALUE;
    } else {
        this.start = split.getStart();
        this.end = this.start + split.getLength();
        FSDataInputStream fileIn = fs.open(file);
        fileIn.seek(this.start);
        this.fsin = fileIn;
    }
    this.recordStartPos = this.start;
    this.pos = this.start;
}
 
Example 7  Project: RDFS   File: LineDocRecordReader.java
/**
 * Constructor
 * @param job
 * @param split  
 * @throws IOException
 */
public LineDocRecordReader(Configuration job, FileSplit split)
    throws IOException {
  long start = split.getStart();
  long end = start + split.getLength();
  final Path file = split.getPath();

  // open the file and seek to the start of the split
  FileSystem fs = file.getFileSystem(job);
  FSDataInputStream fileIn = fs.open(split.getPath());
  InputStream in = fileIn;
  boolean skipFirstLine = false;
  if (start != 0) {
    skipFirstLine = true; // wait till BufferedInputStream to skip
    --start;
    fileIn.seek(start);
  }

  this.in = new BufferedInputStream(in);
  if (skipFirstLine) { // skip first line and re-establish "start".
    start += LineDocRecordReader.readData(this.in, null, EOL);
  }
  this.start = start;
  this.pos = start;
  this.end = end;
}
 
Example 8  Project: hive-dwrf   File: OrcInputFormat.java
@Override
public RecordReader<NullWritable, OrcLazyRow>
    getRecordReader(InputSplit inputSplit, JobConf conf,
                    Reporter reporter) throws IOException {
  ReaderWriterProfiler.setProfilerOptions(conf);
  FileSplit fileSplit = (FileSplit) inputSplit;
  Path path = fileSplit.getPath();
  FileSystem fs = path.getFileSystem(conf);
  reporter.setStatus(fileSplit.toString());

  return new OrcRecordReader(
      OrcFile.createReader(fs, path, conf),
      conf,
      fileSplit.getStart(),
      fileSplit.getLength()
  );
}
 
Example 9  Project: pxf   File: JsonRecordReader.java
/**
 * Create new multi-line json object reader.
 *
 * @param conf  Hadoop context
 * @param split HDFS split to start the reading from
 * @throws IOException IOException when reading the file
 */
public JsonRecordReader(JobConf conf, FileSplit split) throws IOException {

    this.jsonMemberName = conf.get(RECORD_MEMBER_IDENTIFIER);
    this.maxObjectLength = conf.getInt(RECORD_MAX_LENGTH, Integer.MAX_VALUE);

    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(conf);
    final CompressionCodec codec = compressionCodecs.getCodec(file);

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream fileIn = fs.open(split.getPath());
    if (codec != null) {
        is = codec.createInputStream(fileIn);
        start = 0;
        end = Long.MAX_VALUE;
    } else {
        if (start != 0) {
            fileIn.seek(start);
        }
        is = fileIn;
    }
    parser = new PartitionedJsonParser(is);
    this.pos = start;
}
 
Example 10  Project: hudi   File: HoodieRealtimeFileSplit.java
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, String maxCommitTime)
    throws IOException {
  super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
  this.deltaLogPaths = deltaLogPaths;
  this.maxCommitTime = maxCommitTime;
  this.basePath = basePath;
}
 
Example 11  Project: anthelion   File: ArcRecordReader.java
/**
 * Constructor that sets the configuration and file split.
 * 
 * @param conf The job configuration.
 * @param split The file split to read from.
 * 
 * @throws IOException  If an IO error occurs while initializing file split.
 */
public ArcRecordReader(Configuration conf, FileSplit split)
  throws IOException {

  Path path = split.getPath();
  FileSystem fs = path.getFileSystem(conf);
  fileLen = fs.getFileStatus(split.getPath()).getLen();
  this.conf = conf;
  this.in = fs.open(split.getPath());
  this.splitStart = split.getStart();
  this.splitEnd = splitStart + split.getLength();
  this.splitLen = split.getLength();
  in.seek(splitStart);
}
 
Example 12  Project: parquet-mr   File: ParquetRecordReaderWrapper.java
/**
 * gets a ParquetInputSplit corresponding to a split given by Hive
 *
 * @param oldSplit The split given by Hive
 * @param conf The JobConf of the Hive job
 * @return a ParquetInputSplit corresponding to the oldSplit
 * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
 */
protected ParquetInputSplit getSplit(
    final InputSplit oldSplit,
    final JobConf conf
    ) throws IOException {
  if (oldSplit instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) oldSplit;
    final long splitStart = fileSplit.getStart();
    final long splitLength = fileSplit.getLength();
    final Path finalPath = fileSplit.getPath();
    final JobConf cloneJob = hiveBinding.pushProjectionsAndFilters(conf, finalPath.getParent());

    final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath, SKIP_ROW_GROUPS);
    final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
    final ReadContext readContext =
        new DataWritableReadSupport()
          .init(cloneJob, fileMetaData.getKeyValueMetaData(), fileMetaData.getSchema());

    schemaSize = MessageTypeParser.parseMessageType(
          readContext.getReadSupportMetadata().get(DataWritableReadSupport.HIVE_SCHEMA_KEY)
        ).getFieldCount();
     return new ParquetInputSplit(
              finalPath,
              splitStart,
              splitStart + splitLength,
              splitLength,
              fileSplit.getLocations(),
              null);
  } else {
    throw new IllegalArgumentException("Unknown split type: " + oldSplit);
  }
}
 
protected AvroAsTextRecordReaderCopy(FileReader<T> reader, FileSplit split)
        throws IOException {

    this.reader = reader;
    reader.sync(split.getStart());                    // sync to start
    this.start = reader.tell();
    this.end = split.getStart() + split.getLength();
}
 
Example 14  Project: nutch-htmlunit   File: ArcRecordReader.java
/**
 * Constructor that sets the configuration and file split.
 * 
 * @param conf The job configuration.
 * @param split The file split to read from.
 * 
 * @throws IOException  If an IO error occurs while initializing file split.
 */
public ArcRecordReader(Configuration conf, FileSplit split)
  throws IOException {

  Path path = split.getPath();
  FileSystem fs = path.getFileSystem(conf);
  fileLen = fs.getFileStatus(split.getPath()).getLen();
  this.conf = conf;
  this.in = fs.open(split.getPath());
  this.splitStart = split.getStart();
  this.splitEnd = splitStart + split.getLength();
  this.splitLen = split.getLength();
  in.seek(splitStart);
}
 
Example 15  Project: kylin-on-parquet-v2   File: HiveMRInput.java
@Override
public String getInputSplitSignature(InputSplit inputSplit) {
    FileSplit baseSplit = (FileSplit) ((HCatSplit) inputSplit).getBaseSplit();
    //file name(for intermediate table) + start pos + length
    return baseSplit.getPath().getName() + "_" + baseSplit.getStart() + "_" + baseSplit.getLength();
}
 
Example 16  Project: pxf   File: ChunkRecordReader.java
/**
 * Constructs a ChunkRecordReader instance.
 *
 * @param job the job configuration
 * @param split contains the file name, begin byte of the split and the
 *            bytes length
 * @throws IOException if an I/O error occurs when accessing the file or
 *             creating input stream to read from it
 */
public ChunkRecordReader(Configuration job, FileSplit split)
        throws IOException, IncompatibleInputStreamException {
    maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    validateLength(maxLineLength);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);
    codec = compressionCodecs.getCodec(file);

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file, ChunkReader.DEFAULT_BUFFER_SIZE);
    fileLength = getInputStream().getFileLength();
    if (isCompressedInput()) {
        decompressor = CodecPool.getDecompressor(codec);
        if (codec instanceof SplittableCompressionCodec) {
            final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
                    fileIn, decompressor, start, end,
                    SplittableCompressionCodec.READ_MODE.BYBLOCK);
            in = new ChunkReader(cIn);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn; // take pos from compressed stream
        } else {
            in = new ChunkReader(codec.createInputStream(fileIn,
                    decompressor));
            filePosition = fileIn;
        }
    } else {
        fileIn.seek(start);
        in = new ChunkReader(fileIn);
        filePosition = fileIn;
    }
    /*
     * If this is not the first split, we always throw away first record
     * because we always (except the last split) read one extra line in
     * next() method.
     */
    if (start != 0) {
        start += in.readLine(new ChunkWritable(), maxBytesToConsume(start));
    }
    this.pos = start;
}
 
/**
* Creates an Abstract Record Reader for Ethereum blocks
* @param split Split to use (assumed to be a file split)
* @param job Configuration:
* io.file.buffer.size: size of the in-memory buffer, taken from the given Configuration. If io.file.buffer.size is not specified, the default buffer size is used. Furthermore, one may specify hadoopcryptoledger.ethereumblockinputformat.maxblocksize, which defines the maximum size an Ethereum block may have (by default 1M). If you want to experiment with performance using DirectByteBuffer instead of HeapByteBuffer, you can use "hadoopcryptoledeger.ethereumblockinputformat.usedirectbuffer" (default: false). Note that it might have unwanted consequences such as circumventing Yarn memory management. The option is experimental and might be removed in future versions.
* @param reporter Reporter
*
* @throws java.io.IOException in case of errors reading from the file stream provided by Hadoop
*/
public AbstractEthereumRecordReader(FileSplit split, JobConf job, Reporter reporter) throws IOException {
    LOG.debug("Reading configuration");
    // parse configuration
    this.reporter = reporter;
    this.conf = job;
    this.maxSizeEthereumBlock = conf.getInt(AbstractEthereumRecordReader.CONF_MAXBLOCKSIZE, AbstractEthereumRecordReader.DEFAULT_MAXSIZE_ETHEREUMBLOCK);
    this.bufferSize = conf.getInt(AbstractEthereumRecordReader.CONF_BUFFERSIZE, AbstractEthereumRecordReader.DEFAULT_BUFFERSIZE);
    this.useDirectBuffer = conf.getBoolean(AbstractEthereumRecordReader.CONF_USEDIRECTBUFFER, AbstractEthereumRecordReader.DEFAULT_USEDIRECTBUFFER);
    // Initialize start and end of split
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    codec = new CompressionCodecFactory(job).getCodec(file);
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    // open stream
    if (isCompressedInput()) { // decompress
        LOG.debug("Decompressing file");
        decompressor = CodecPool.getDecompressor(codec);
        if (codec instanceof SplittableCompressionCodec) {
            LOG.debug("SplittableCompressionCodec");
            final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.CONTINUOUS);
            ebr = new EthereumBlockReader(cIn, this.maxSizeEthereumBlock, this.bufferSize, this.useDirectBuffer);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn; // take pos from compressed stream
        } else {
            LOG.debug("Not-splitable compression codec");
            ebr = new EthereumBlockReader(codec.createInputStream(fileIn, decompressor), this.maxSizeEthereumBlock, this.bufferSize, this.useDirectBuffer);
            filePosition = fileIn;
        }
    } else {
        LOG.debug("Processing file without compression");
        fileIn.seek(start);
        ebr = new EthereumBlockReader(fileIn, this.maxSizeEthereumBlock, this.bufferSize, this.useDirectBuffer);
        filePosition = fileIn;
    }
    // initialize reader
    this.reporter.setStatus("Ready to read");
}
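
The Javadoc above documents the configuration keys this constructor reads. A hypothetical usage sketch, with the key strings copied verbatim from that comment (the project's own CONF_* constants remain the authoritative names):

import org.apache.hadoop.mapred.JobConf;

public class EthereumReaderConfigSketch {
    public static JobConf configure() {
        JobConf job = new JobConf();
        // in-memory buffer size for reading the split; a default is used if unset
        job.setInt("io.file.buffer.size", 64 * 1024);
        // maximum size an Ethereum block may have (default 1M according to the Javadoc)
        job.setInt("hadoopcryptoledger.ethereumblockinputformat.maxblocksize", 1024 * 1024);
        // experimental: DirectByteBuffer instead of HeapByteBuffer (key copied as written above)
        job.setBoolean("hadoopcryptoledeger.ethereumblockinputformat.usedirectbuffer", false);
        return job;
    }
}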
 
/**
* Creates an Abstract Record Reader for Bitcoin blocks
* @param split Split to use (assumed to be a file split)
* @param job Configuration:
* io.file.buffer.size: size of the in-memory buffer, taken from the given Configuration. If io.file.buffer.size is not specified, the default buffer size (the maximum size of a Bitcoin block) is used. The configuration hadoopcryptoledger.bitcoinblockinputformat.filter.magic allows specifying the magic identifier of the block. The magic is a comma-separated list of hex values (e.g. F9BEB4D9,FABFB5DA,0B110907,0B110907). The default magic is always F9BEB4D9. One needs to specify at least one magic, otherwise it will be difficult to find blocks in splits. Furthermore, one may specify hadoopcryptoledger.bitcoinblockinputformat.maxblocksize, which defines the maximum size a Bitcoin block may have (by default 8M). If you want to experiment with performance using DirectByteBuffer instead of HeapByteBuffer, you can use "hadoopcryptoledeger.bitcoinblockinputformat.usedirectbuffer" (default: false). Note that it might have unwanted consequences such as circumventing Yarn memory management. The option is experimental and might be removed in future versions.
* @param reporter Reporter
*
* @throws java.io.IOException in case of errors reading from the file stream provided by Hadoop
* @throws org.zuinnote.hadoop.bitcoin.format.exception.HadoopCryptoLedgerConfigurationException in case of an invalid HadoopCryptoLedger-specific configuration of the inputformat
* @throws org.zuinnote.hadoop.bitcoin.format.exception.BitcoinBlockReadException in case the Bitcoin data contains invalid blocks (e.g. magic might be different)
*/
public AbstractBitcoinRecordReader(FileSplit split, JobConf job, Reporter reporter) throws IOException, HadoopCryptoLedgerConfigurationException, BitcoinBlockReadException {
    LOG.debug("Reading configuration");
    // parse configuration
    this.reporter = reporter;
    this.conf = job;
    this.maxSizeBitcoinBlock = conf.getInt(AbstractBitcoinRecordReader.CONF_MAXBLOCKSIZE, AbstractBitcoinRecordReader.DEFAULT_MAXSIZE_BITCOINBLOCK);
    this.bufferSize = conf.getInt(AbstractBitcoinRecordReader.CONF_BUFFERSIZE, AbstractBitcoinRecordReader.DEFAULT_BUFFERSIZE);
    this.specificMagic = conf.get(AbstractBitcoinRecordReader.CONF_FILTERMAGIC);
    // we need to provide at least one magic
    if ((this.specificMagic == null) || (this.specificMagic.length() == 0)) {
        this.specificMagic = AbstractBitcoinRecordReader.DEFAULT_MAGIC;
    }
    if ((this.specificMagic != null) && (this.specificMagic.length() > 0)) {
        this.specificMagicStringArray = specificMagic.split(",");
        specificMagicByteArray = new byte[specificMagicStringArray.length][4]; // each magic is always 4 bytes
        for (int i = 0; i < specificMagicStringArray.length; i++) {
            byte[] currentMagicNo = BitcoinUtil.convertHexStringToByteArray(specificMagicStringArray[i]);
            if (currentMagicNo.length != 4) {
                throw new HadoopCryptoLedgerConfigurationException("Error: Configuration. Magic number has not a length of 4 bytes. Index: " + i);
            }
            specificMagicByteArray[i] = currentMagicNo;
        }
    }
    this.useDirectBuffer = conf.getBoolean(AbstractBitcoinRecordReader.CONF_USEDIRECTBUFFER, AbstractBitcoinRecordReader.DEFAULT_USEDIRECTBUFFER);
    this.readAuxPOW = conf.getBoolean(AbstractBitcoinRecordReader.CONF_READAUXPOW, AbstractBitcoinRecordReader.DEFAULT_READAUXPOW);
    // Initialize start and end of split
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    codec = new CompressionCodecFactory(job).getCodec(file);
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    // open stream
    if (isCompressedInput()) { // decompress
        LOG.debug("Decompressing file");
        decompressor = CodecPool.getDecompressor(codec);
        if (codec instanceof SplittableCompressionCodec) {
            LOG.debug("SplittableCompressionCodec");
            final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.CONTINUOUS);
            bbr = new BitcoinBlockReader(cIn, this.maxSizeBitcoinBlock, this.bufferSize, this.specificMagicByteArray, this.useDirectBuffer, this.readAuxPOW);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn; // take pos from compressed stream
        } else {
            LOG.debug("Not-splitable compression codec");
            bbr = new BitcoinBlockReader(codec.createInputStream(fileIn, decompressor), this.maxSizeBitcoinBlock, this.bufferSize, this.specificMagicByteArray, this.useDirectBuffer, this.readAuxPOW);
            filePosition = fileIn;
        }
    } else {
        LOG.debug("Processing file without compression");
        fileIn.seek(start);
        bbr = new BitcoinBlockReader(fileIn, this.maxSizeBitcoinBlock, this.bufferSize, this.specificMagicByteArray, this.useDirectBuffer, this.readAuxPOW);
        filePosition = fileIn;
    }
    // initialize reader
    // seek to block start (for the case a block overlaps a split)
    LOG.debug("Seeking to block start");
    this.reporter.setStatus("Seeking Block start");
    bbr.seekBlockStart();
    this.reporter.setStatus("Ready to read");
}
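
The same idea applies to the Bitcoin reader; the magic filter described above is a comma-separated list of 4-byte hex values. Again a hypothetical sketch, with key strings taken verbatim from the comment:

import org.apache.hadoop.mapred.JobConf;

public class BitcoinReaderConfigSketch {
    public static JobConf configure() {
        JobConf job = new JobConf();
        // comma-separated 4-byte magic values identifying blocks (default F9BEB4D9)
        job.set("hadoopcryptoledger.bitcoinblockinputformat.filter.magic", "F9BEB4D9,FABFB5DA");
        // maximum size a Bitcoin block may have (default 8M according to the Javadoc)
        job.setInt("hadoopcryptoledger.bitcoinblockinputformat.maxblocksize", 8 * 1024 * 1024);
        return job;
    }
}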
 
Example 19  Project: CopybookInputFormat   File: CopybookRecordReader.java
public CopybookRecordReader(FileSplit genericSplit, JobConf job)
    throws IOException {
  try {
    String cblPath = job.get(Const.COPYBOOK_INPUTFORMAT_CBL_HDFS_PATH_CONF);

    if (cblPath == null) {
      if (job != null) {
        MapWork mrwork = Utilities.getMapWork(job);

        if (mrwork == null) {
          System.out.println("When running a client side hive job you have to set \"copybook.inputformat.cbl.hdfs.path\" before executing the query." );
          System.out.println("When running a MR job we can get this from the hive TBLProperties" );
        }
        Map<String, PartitionDesc> map = mrwork.getPathToPartitionInfo();
        
        for (Map.Entry<String, PartitionDesc> pathsAndParts : map.entrySet()) {
          System.out.println("Hey");
          Properties props = pathsAndParts.getValue().getProperties();
          cblPath = props
              .getProperty(Const.COPYBOOK_INPUTFORMAT_CBL_HDFS_PATH_CONF);
          break;
        }
      }
    }

    FileSystem fs = FileSystem.get(job);
    BufferedInputStream inputStream = new BufferedInputStream(
        fs.open(new Path(cblPath)));
    CobolCopybookLoader copybookInt = new CobolCopybookLoader();
    externalRecord = copybookInt
        .loadCopyBook(inputStream, "RR", CopybookLoader.SPLIT_NONE, 0,
            "cp037", Convert.FMT_MAINFRAME, 0, null);

    int fileStructure = Constants.IO_FIXED_LENGTH;

    for (ExternalField field : externalRecord.getRecordFields()) {
      recordByteLength += field.getLen();
    }

    // jump to the point in the split that the first whole record of split
    // starts at
    FileSplit split = (FileSplit) genericSplit;

    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    BufferedInputStream fileIn = new BufferedInputStream(fs.open(split
        .getPath()));

    if (start != 0) {
      pos = start - (start % recordByteLength) + recordByteLength;

      fileIn.skip(pos);
    }

    ret = LineIOProvider.getInstance().getLineReader(
        fileStructure,
        LineIOProvider.getInstance().getLineProvider(fileStructure));

    ret.open(fileIn, externalRecord);
  } catch (Exception e) {
    e.printStackTrace();
  } 

}
 
Example 20  Project: kangaroo   File: S3InputFormatUtils.java
@Override
public org.apache.hadoop.mapreduce.InputSplit apply(final InputSplit input) {
    final FileSplit split = (FileSplit) input;
    return new org.apache.hadoop.mapreduce.lib.input.FileSplit(split.getPath(), split.getStart(),
            split.getLength(), S3_SPLIT_HOST);
}