Source Code Examples for the Class org.apache.hadoop.fs.FSDataInputStream

The following examples show how to use the org.apache.hadoop.fs.FSDataInputStream API; each example lists the project and file it was taken from, and the full source can be viewed on GitHub.
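
For orientation before the examples: the common pattern throughout is to obtain an FSDataInputStream from FileSystem.open(), use its Seekable and PositionedReadable methods (seek, read, readFully at an offset), and close it when done. Below is a minimal standalone sketch of that pattern; the path used here is a placeholder and is not taken from any of the projects listed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FSDataInputStreamSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/example.txt"); // placeholder path, not from the examples below
    FileSystem fs = path.getFileSystem(conf);

    // try-with-resources closes the stream even if a read fails
    try (FSDataInputStream in = fs.open(path)) {
      byte[] buf = new byte[(int) fs.getFileStatus(path).getLen()];
      in.readFully(0, buf);   // positional read: fills buf starting at file offset 0
      in.seek(0);             // the stream is also seekable for sequential reads
      int firstByte = in.read();
      System.out.println("first byte = " + firstByte);
    }
  }
}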

Example 1  Project: big-c  File: TestSmallBlock.java
private void checkFile(FileSystem fileSys, Path name) throws IOException {
  BlockLocation[] locations = fileSys.getFileBlockLocations(
      fileSys.getFileStatus(name), 0, fileSize);
  assertEquals("Number of blocks", fileSize, locations.length);
  FSDataInputStream stm = fileSys.open(name);
  byte[] expected = new byte[fileSize];
  if (simulatedStorage) {
    for (int i = 0; i < expected.length; ++i) {  
      expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
    }
  } else {
    Random rand = new Random(seed);
    rand.nextBytes(expected);
  }
  // do a sanity check. Read the file
  byte[] actual = new byte[fileSize];
  stm.readFully(0, actual);
  checkAndEraseData(actual, 0, expected, "Read Sanity Test");
  stm.close();
}
 
Example 2  Project: spork  File: Bzip2TextInputFormat.java
public BZip2LineRecordReader(Configuration job, FileSplit split)
throws IOException {
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());
    fileIn.seek(start);

    in = new CBZip2InputStream(fileIn, 9, end);
    if (start != 0) {
        // skip first line and re-establish "start".
        // LineRecordReader.readLine(this.in, null);
        readLine(this.in, null);
        start = in.getPos();
    }
    pos = in.getPos();
}
 
Example 3  Project: RDFS  File: TestDataJoin.java
private static void confirmOutput(Path out, JobConf job, int srcs)
    throws IOException {
  FileSystem fs = out.getFileSystem(job);
  FileStatus[] outlist = fs.listStatus(out);
  assertEquals(1, outlist.length);
  assertTrue(0 < outlist[0].getLen());
  FSDataInputStream in = fs.open(outlist[0].getPath());
  LineRecordReader rr = new LineRecordReader(in, 0, Integer.MAX_VALUE, job);
  LongWritable k = new LongWritable();
  Text v = new Text();
  int count = 0;
  while (rr.next(k, v)) {
    String[] vals = v.toString().split("\t");
    assertEquals(srcs + 1, vals.length);
    int[] ivals = new int[vals.length];
    for (int i = 0; i < vals.length; ++i)
      ivals[i] = Integer.parseInt(vals[i]);
    assertEquals(0, ivals[0] % (srcs * srcs));
    for (int i = 1; i < vals.length; ++i) {
      assertEquals((ivals[i] - (i - 1)) * srcs, 10 * ivals[0]);
    }
    ++count;
  }
  assertEquals(4, count);
}
 
Example 4  Project: RDFS  File: DistributedRaidFileSystem.java
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  // We want to use RAID logic only on instance of DFS.
  if (fs instanceof DistributedFileSystem) {
    DistributedFileSystem underlyingDfs = (DistributedFileSystem) fs;
    LocatedBlocks lbs =
        underlyingDfs.getLocatedBlocks(f, 0L, Long.MAX_VALUE);
    if (lbs != null) {
      // Use underlying filesystem if the file is under construction.
      if (!lbs.isUnderConstruction()) {
        // Use underlying filesystem if file length is 0.
        final long fileSize = getFileSize(lbs);
        if (fileSize > 0) {
          return new ExtFSDataInputStream(conf, this, f,
            fileSize, getBlockSize(lbs), bufferSize);
        }
      }
    }
  }
  return fs.open(f, bufferSize);
}
 
Example 5  Project: hadoop  File: TestDataJoin.java
private static void confirmOutput(Path out, JobConf job, int srcs)
    throws IOException {
  FileSystem fs = out.getFileSystem(job);
  FileStatus[] outlist = fs.listStatus(out);
  assertEquals(1, outlist.length);
  assertTrue(0 < outlist[0].getLen());
  FSDataInputStream in = fs.open(outlist[0].getPath());
  LineRecordReader rr = new LineRecordReader(in, 0, Integer.MAX_VALUE, job);
  LongWritable k = new LongWritable();
  Text v = new Text();
  int count = 0;
  while (rr.next(k, v)) {
    String[] vals = v.toString().split("\t");
    assertEquals(srcs + 1, vals.length);
    int[] ivals = new int[vals.length];
    for (int i = 0; i < vals.length; ++i)
      ivals[i] = Integer.parseInt(vals[i]);
    assertEquals(0, ivals[0] % (srcs * srcs));
    for (int i = 1; i < vals.length; ++i) {
      assertEquals((ivals[i] - (i - 1)) * srcs, 10 * ivals[0]);
    }
    ++count;
  }
  assertEquals(4, count);
}
 
Example 6  Project: mt-flume  File: AvroEventSerializer.java
private Schema loadFromUrl(String schemaUrl) throws IOException {
  Configuration conf = new Configuration();
  Schema.Parser parser = new Schema.Parser();
  if (schemaUrl.toLowerCase().startsWith("hdfs://")) {
    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream input = null;
    try {
      input = fs.open(new Path(schemaUrl));
      return parser.parse(input);
    } finally {
      if (input != null) {
        input.close();
      }
    }
  } else {
    InputStream is = null;
    try {
      is = new URL(schemaUrl).openStream();
      return parser.parse(is);
    } finally {
      if (is != null) {
        is.close();
      }
    }
  }
}
 
Example 7  Project: big-c  File: AppendTestUtil.java
public static void check(FileSystem fs, Path p, long length) throws IOException {
  int i = -1;
  try {
    final FileStatus status = fs.getFileStatus(p);
    FSDataInputStream in = fs.open(p);
    if (in.getWrappedStream() instanceof DFSInputStream) {
      long len = ((DFSInputStream)in.getWrappedStream()).getFileLength();
      assertEquals(length, len);
    } else {
      assertEquals(length, status.getLen());
    }
    
    for(i++; i < length; i++) {
      assertEquals((byte)i, (byte)in.read());  
    }
    i = -(int)length;
    assertEquals(-1, in.read()); //EOF  
    in.close();
  } catch(IOException ioe) {
    throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
  }
}
 
Example 8  Project: RDFS  File: TestHftpFileSystem.java
/**
 * Tests isUnderConstruction() functionality.
 */
public void testIsUnderConstruction() throws Exception {
  // Open output file stream.
  FSDataOutputStream out = hdfs.create(TEST_FILE, true);
  out.writeBytes("test");
  
  // Test file under construction.
  FSDataInputStream in1 = hftpFs.open(TEST_FILE);
  assertTrue(in1.isUnderConstruction());
  in1.close();
  
  // Close output file stream.
  out.close();
  
  // Test file not under construction.
  FSDataInputStream in2 = hftpFs.open(TEST_FILE);
  assertFalse(in2.isUnderConstruction());
  in2.close();
}
 
Example 9  Project: hadoop  File: TestWebHDFS.java
/** test seek */
static void verifySeek(FileSystem fs, Path p, long offset, long length,
    byte[] buf, byte[] expected) throws IOException { 
  long remaining = length - offset;
  long checked = 0;
  LOG.info("XXX SEEK: offset=" + offset + ", remaining=" + remaining);

  final Ticker t = new Ticker("SEEK", "offset=%d, remaining=%d",
      offset, remaining);
  final FSDataInputStream in = fs.open(p, 64 << 10);
  in.seek(offset);
  for(; remaining > 0; ) {
    t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
    final int n = (int)Math.min(remaining, buf.length);
    in.readFully(buf, 0, n);
    checkData(offset, remaining, n, buf, expected);

    offset += n;
    remaining -= n;
    checked += n;
  }
  in.close();
  t.end(checked);
}
 
Example 10  Project: hadoop  File: TestMerger.java
private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));

  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
      fs.getFileStatus(path).getLen(), null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}
 
Example 11  Project: tez  File: SplitMetaInfoReaderTez.java
public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
    FileSystem fs) throws IOException {
  FSDataInputStream in = null;
  try {
    in = getFSDataIS(conf, fs);
    final String jobSplitFile = MRJobConfig.JOB_SPLIT;
    final String basePath = conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");
    int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
    JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = new JobSplit.TaskSplitMetaInfo[numSplits];
    for (int i = 0; i < numSplits; i++) {
      JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
      splitMetaInfo.readFields(in);
      JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
          new Path(basePath, jobSplitFile)
              .toUri().toString(), splitMetaInfo.getStartOffset());
      allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
          splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
    }
    return allSplitMetaInfo;
  } finally {
    if (in != null) {
      in.close();
    }
  }
}
 
Example 12  Project: hbase  File: BlockIOUtils.java
/**
 * Read from an input stream at least <code>necessaryLen</code> and if possible,
 * <code>extraLen</code> also if available. Analogous to
 * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
 * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
 * read.
 * @param buff ByteBuff to read into.
 * @param dis the input stream to read from
 * @param position the position within the stream from which to start reading
 * @param necessaryLen the number of bytes that are absolutely necessary to read
 * @param extraLen the number of extra bytes that would be nice to read
 * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
 * @throws IOException if failed to read the necessary bytes
 */
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
    int necessaryLen, int extraLen) throws IOException {
  int remain = necessaryLen + extraLen;
  byte[] buf = new byte[remain];
  int bytesRead = 0;
  while (bytesRead < necessaryLen) {
    int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
    if (ret < 0) {
      throw new IOException("Premature EOF from inputStream (positional read returned " + ret
          + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
          + " extra bytes, successfully read " + bytesRead);
    }
    bytesRead += ret;
    remain -= ret;
  }
  // Copy the bytes from on-heap bytes[] to ByteBuffer[] now, and after resolving HDFS-3246, we
  // will read the bytes to ByteBuffer[] directly without allocating any on-heap byte[].
  // TODO I keep the bytes copy here, because I want to abstract the ByteBuffer[]
  // preadWithExtra method for the upper layer, only need to refactor this method if the
  // ByteBuffer pread is OK.
  copyToByteBuff(buf, 0, bytesRead, buff);
  return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
}
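
The primitive this helper builds on is FSDataInputStream's positional read, int read(long position, byte[] buffer, int offset, int length), which reads at an absolute file offset without moving the stream's current position, so it is safe for concurrent readers. A minimal sketch of looping that call until an exact number of bytes has been read follows; the helper name is illustrative and not part of the HBase source above. Note that FSDataInputStream also offers readFully(long, byte[], int, int), which performs this loop internally; the HBase code loops manually because it additionally tracks the optional "extra" bytes.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;

final class PreadUtil {
  /** Positionally read exactly len bytes from position into buf, or fail on premature EOF. */
  static void preadFully(FSDataInputStream in, long position, byte[] buf, int len)
      throws IOException {
    int done = 0;
    while (done < len) {
      // positional read: does not advance the stream's current offset
      int n = in.read(position + done, buf, done, len - done);
      if (n < 0) {
        throw new IOException("Premature EOF at offset " + (position + done));
      }
      done += n;
    }
  }
}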
 
Example 13  Project: incubator-gobblin  File: FSSpecStoreTest.java
@Override
protected Spec readSpecFromFile(Path path) throws IOException {
  if (path.getName().contains("fail")) {
    throw new IOException("Mean to fail in the test");
  } else if (path.getName().contains("serDeFail")) {

    // Simulate the way a SerDe exception is thrown during deserialization.
    FSDataInputStream fis = fs.open(path);
    SerializationUtils.deserialize(ByteStreams.toByteArray(fis));

    // This line should never be reached since we generate a SerDe exception on purpose.
    Assert.assertTrue(false);
    return null;
  }
  else return initFlowSpec(Files.createTempDir().getAbsolutePath());
}
 
Example 14  Project: hadoop-gpu  File: BCFile.java
public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin,
    BlockRegion region, Configuration conf) throws IOException {
  this.compressAlgo = compressionAlgo;
  this.region = region;
  this.decompressor = compressionAlgo.getDecompressor();

  try {
    this.in = compressAlgo.createDecompressionStream(
        new BoundedRangeFileInputStream(fsin, this.region.getOffset(),
            this.region.getCompressedSize()),
        decompressor, TFile.getFSInputBufferSize(conf));
  } catch (IOException e) {
    compressAlgo.returnDecompressor(decompressor);
    throw e;
  }
}
 
Example 15  Project: parquet-mr  File: TestHadoop2ByteBufferReads.java
@Test
public void testDirectReadFullyLargeBuffer() throws Exception {
  final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);

  FSDataInputStream hadoopStream = new FSDataInputStream(new MockHadoopInputStream());
  final MockBufferReader reader = new MockBufferReader(hadoopStream);

  TestUtils.assertThrows("Should throw EOFException",
      EOFException.class, () -> {
        H2SeekableInputStream.readFully(reader, readBuffer);
        return null;
      });

  // NOTE: This behavior differs from readFullyHeapBuffer because direct uses
  // several read operations that will read up to the end of the input. This
  // is a correct value because the bytes in the buffer are valid. This
  // behavior can't be implemented for the heap buffer without using the read
  // method instead of the readFully method on the underlying
  // FSDataInputStream.
  Assert.assertEquals(10, readBuffer.position());
  Assert.assertEquals(20, readBuffer.limit());
}
 
Example 16  Project: big-c  File: StreamFile.java
/**
 * Send a partial content response with the given range. If there are
 * no satisfiable ranges, or if multiple ranges are requested, which
 * is unsupported, respond with range not satisfiable.
 *
 * @param in stream to read from
 * @param out stream to write to
 * @param response http response to use
 * @param contentLength for the response header
 * @param ranges to write to respond with
 * @throws IOException on error sending the response
 */
static void sendPartialData(FSDataInputStream in,
                            OutputStream out,
                            HttpServletResponse response,
                            long contentLength,
                            List<InclusiveByteRange> ranges)
    throws IOException {
  if (ranges == null || ranges.size() != 1) {
    response.setContentLength(0);
    response.setStatus(HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE);
    response.setHeader("Content-Range",
              InclusiveByteRange.to416HeaderRangeString(contentLength));
  } else {
    InclusiveByteRange singleSatisfiableRange = ranges.get(0);
    long singleLength = singleSatisfiableRange.getSize(contentLength);
    response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
    response.setHeader("Content-Range", 
      singleSatisfiableRange.toHeaderRangeString(contentLength));
    copyFromOffset(in, out,
                   singleSatisfiableRange.getFirst(contentLength),
                   singleLength);
  }
}
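
The copyFromOffset helper called above is not part of this excerpt. Under the assumption that it simply streams a byte range from the FSDataInputStream to the servlet output, a plausible sketch looks like the following (a hypothetical stand-in, not the StreamFile implementation):

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.fs.FSDataInputStream;

final class RangeCopySketch {
  /** Copy up to count bytes, starting at offset, from in to out. */
  static void copyRange(FSDataInputStream in, OutputStream out, long offset, long count)
      throws IOException {
    in.seek(offset);
    byte[] buf = new byte[8192];
    long remaining = count;
    while (remaining > 0) {
      int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
      if (n < 0) {
        break; // EOF before the requested range was exhausted
      }
      out.write(buf, 0, n);
      remaining -= n;
    }
  }
}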
 
Example 17  Project: RDFS  File: TestFileLocalRead.java
static void checkFullFile(FileSystem fs, Path name) throws IOException {
  FileStatus stat = fs.getFileStatus(name);
  BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, 
                                                       fileSize);
  for (int idx = 0; idx < locations.length; idx++) {
    String[] hosts = locations[idx].getNames();
    for (int i = 0; i < hosts.length; i++) {
      System.out.print( hosts[i] + " ");
    }
    System.out.println(" off " + locations[idx].getOffset() +
                       " len " + locations[idx].getLength());
  }

  byte[] expected = AppendTestUtil.randomBytes(seed, fileSize);
  FSDataInputStream stm = fs.open(name);
  byte[] actual = new byte[fileSize];
  stm.readFully(0, actual);
  checkData(actual, 0, expected, "Read 2");
  stm.close();
}
 
Example 18  Project: attic-apex-malhar  File: TestTFileSeek.java
public void seekTFile() throws IOException {
  int miss = 0;
  long totalBytes = 0;
  FSDataInputStream fsdis = fs.open(path);
  Reader reader =
    new Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
  KeySampler kSampler =
      new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(),
          keyLenGen);
  Scanner scanner = reader.createScanner();
  BytesWritable key = new BytesWritable();
  BytesWritable val = new BytesWritable();
  timer.reset();
  timer.start();
  for (int i = 0; i < options.seekCount; ++i) {
    kSampler.next(key);
    scanner.lowerBound(key.get(), 0, key.getSize());
    if (!scanner.atEnd()) {
      scanner.entry().get(key, val);
      totalBytes += key.getSize();
      totalBytes += val.getSize();
    }
    else {
      ++miss;
    }
  }
  timer.stop();
  double duration = (double) timer.read() / 1000; // in us.
  System.out.printf(
      "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
      timer.toString(), NanoTimer.nanoTimeToString(timer.read()
          / options.seekCount), options.seekCount - miss, miss,
      (double) totalBytes / 1024 / (options.seekCount - miss));

}
 
Example 19  Project: big-c  File: TestCachingStrategy.java
static long readHdfsFile(FileSystem fs, Path p, long length,
    Boolean dropBehind) throws Exception {
  FSDataInputStream fis = null;
  long totalRead = 0;
  try {
    fis = fs.open(p);
    if (dropBehind != null) {
      fis.setDropBehind(dropBehind);
    }
    byte buf[] = new byte[8196];
    while (length > 0) {
      int amt = (length > buf.length) ? buf.length : (int)length;
      int ret = fis.read(buf, 0, amt);
      if (ret == -1) {
        return totalRead;
      }
      totalRead += ret;
      length -= ret;
    }
  } catch (IOException e) {
    LOG.error("ioexception", e);
  } finally {
    if (fis != null) {
      fis.close();
    }
  }
  throw new RuntimeException("unreachable");
}
 
Example 20  Project: rubix  File: NonLocalReadRequestChain.java
private long directReadRequest(int index)
    throws Exception
{
  try (FSDataInputStream inputStream = remoteFileSystem.open(new Path(filePath))) {
    directReadChain = new DirectReadRequestChain(inputStream);
    for (ReadRequest readRequest : readRequests.subList(index, readRequests.size())) {
      directReadChain.addReadRequest(readRequest);
    }
    directReadChain.lock();
    directRead = directReadChain.call();
    directReadChain = null;
  }
  return (totalRead + directRead);
}
 
Example 21  Project: hadoop  File: TestEncryptedTransfer.java
@Test
public void testEncryptedAppendRequiringBlockTransfer() throws IOException {
  MiniDFSCluster cluster = null;
  try {
    Configuration conf = new Configuration();
    setEncryptionConfigKeys(conf);
    
    // start up 4 DNs
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
    
    FileSystem fs = getFileSystem(conf);
    
    // Create a file with replication 3, so its block is on 3 / 4 DNs.
    writeTestDataToFile(fs);
    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
    
    // Shut down one of the DNs holding a block replica.
    FSDataInputStream in = fs.open(TEST_PATH);
    List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in);
    in.close();
    assertEquals(1, locatedBlocks.size());
    assertEquals(3, locatedBlocks.get(0).getLocations().length);
    DataNode dn = cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort());
    dn.shutdown();
    
    // Reopen the file for append, which will need to add another DN to the
    // pipeline and in doing so trigger a block transfer.
    writeTestDataToFile(fs);
    assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
    
    fs.close();
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
 
Example 22  Project: dremio-oss  File: DremioHadoopFileSystemWrapper.java
FSInputStream newFSDataInputStreamWrapper(Path f, final FSDataInputStream is) throws IOException {
  try {
    return (operatorStats != null) ?
      com.dremio.exec.store.hive.exec.dfs.FSDataInputStreamWithStatsWrapper.of(is, operatorStats, true, f.toString()) :
      com.dremio.exec.store.hive.exec.dfs.FSDataInputStreamWrapper.of(is);
  } catch(FSError e) {
    throw propagateFSError(e);
  }
}
 
Example 23  Project: big-c  File: TestUnbuffer.java
/**
 * Test opening many files via TCP (not short-circuit).
 *
 * This is practical when using unbuffer, because it reduces the number of
 * sockets and amount of memory that we use.
 */
@Test
public void testOpenManyFilesViaTcp() throws Exception {
  final int NUM_OPENS = 500;
  Configuration conf = new Configuration();
  conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
  MiniDFSCluster cluster = null;
  FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS];
  try {
    cluster = new MiniDFSCluster.Builder(conf).build();
    DistributedFileSystem dfs = cluster.getFileSystem();
    final Path TEST_PATH = new Path("/testFile");
    DFSTestUtil.createFile(dfs, TEST_PATH, 131072, (short)1, 1);

    for (int i = 0; i < NUM_OPENS; i++) {
      streams[i] = dfs.open(TEST_PATH);
      LOG.info("opening file " + i + "...");
      Assert.assertTrue(-1 != streams[i].read());
      streams[i].unbuffer();
    }
  } finally {
    for (FSDataInputStream stream : streams) {
      IOUtils.cleanup(null, stream);
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
 
Example 24  Project: big-c  File: HadoopArchives.java
public void map(LongWritable key, HarEntry value,
    OutputCollector<IntWritable, Text> out,
    Reporter reporter) throws IOException {
  Path relPath = new Path(value.path);
  int hash = HarFileSystem.getHarHash(relPath);
  String towrite = null;
  Path srcPath = realPath(relPath, rootPath);
  long startPos = partStream.getPos();
  FileSystem srcFs = srcPath.getFileSystem(conf);
  FileStatus srcStatus = srcFs.getFileStatus(srcPath);
  String propStr = encodeProperties(srcStatus);
  if (value.isDir()) { 
    towrite = encodeName(relPath.toString())
              + " dir " + propStr + " 0 0 ";
    StringBuffer sbuff = new StringBuffer();
    sbuff.append(towrite);
    for (String child: value.children) {
      sbuff.append(encodeName(child) + " ");
    }
    towrite = sbuff.toString();
    //reading directories is also progress
    reporter.progress();
  }
  else {
    FSDataInputStream input = srcFs.open(srcStatus.getPath());
    reporter.setStatus("Copying file " + srcStatus.getPath() + 
        " to archive.");
    copyData(srcStatus.getPath(), input, partStream, reporter);
    towrite = encodeName(relPath.toString())
              + " file " + partname + " " + startPos
              + " " + srcStatus.getLen() + " " + propStr + " ";
  }
  out.collect(new IntWritable(hash), new Text(towrite));
}
 
Example 25  Project: RDFS  File: StripeReader.java
protected InputStream getParityFileInput(int locationIndex, Path parityFile,
    FileSystem parityFs, FileStatus parityStat, long offsetInBlock)
        throws IOException {
  // Dealing with a parity file here.
  int parityBlockIdx = (int)(codec.parityLength * stripeStartIdx + locationIndex);
  long offset = parityStat.getBlockSize() * parityBlockIdx + offsetInBlock;
  assert(offset < parityStat.getLen());
  LOG.info("Opening " + parityFile + ":" + offset +
    " for location " + locationIndex);
  FSDataInputStream s = parityFs.open(
    parityFile, conf.getInt("io.file.buffer.size", 64 * 1024));
  s.seek(offset);
  return s;
}
 
Example 26  Project: spork  File: Util.java
static public String[] readOutput(FileSystem fs, String fileName) throws IOException {
    if(Util.WINDOWS){
        fileName = fileName.replace('\\','/');
    }
    Path path = new Path(fileName);
    if(!fs.exists(path)) {
        throw new IOException("Path " + fileName + " does not exist on the FileSystem");
    }
    FileStatus fileStatus = fs.getFileStatus(path);
    FileStatus[] files;
    if (fileStatus.isDir()) {
        files = fs.listStatus(path, new PathFilter() {
            @Override
            public boolean accept(Path p) {
                return !p.getName().startsWith("_");
            }
        });
    } else {
        files = new FileStatus[] { fileStatus };
    }
    List<String> result = new ArrayList<String>();
    for (FileStatus f : files) {
        FSDataInputStream stream = fs.open(f.getPath());
        BufferedReader br = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
        String line;
        while ((line = br.readLine()) != null) {
            result.add(line);
        }
        br.close();
    }
    return result.toArray(new String[result.size()]);
}
 
Example 27  Project: spork  File: Util.java
static public void copyFromClusterToLocal(MiniGenericCluster cluster,
        String fileNameOnCluster, String localFileName) throws IOException {
    if (Util.WINDOWS) {
        fileNameOnCluster = fileNameOnCluster.replace('\\', '/');
        localFileName = localFileName.replace('\\', '/');
    }
    File parent = new File(localFileName).getParentFile();
    if (!parent.exists()) {
        parent.mkdirs();
    }
    PrintWriter writer = new PrintWriter(new FileWriter(localFileName));

    FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
            cluster.getProperties()));
    if (!fs.exists(new Path(fileNameOnCluster))) {
        throw new IOException("File " + fileNameOnCluster + " does not exist on the minicluster");
    }

    String line = null;
    FileStatus fst = fs.getFileStatus(new Path(fileNameOnCluster));
    if (fst.isDir()) {
        throw new IOException("Only files from cluster can be copied locally," +
                " " + fileNameOnCluster + " is a directory");
    }
    FSDataInputStream stream = fs.open(new Path(fileNameOnCluster));
    BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
    while ((line = reader.readLine()) != null) {
        writer.println(line);
    }

    reader.close();
    writer.close();
}
 
Example 28
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testOpenNonExistingFile() throws IOException {
  final Path p = new Path("/test/testOpenNonExistingFile");
  //open it as a file, should get FileNotFoundException
  try {
    final FSDataInputStream in = fs.open(p);
    in.close();
    fail("didn't expect to get here");
  } catch (FileNotFoundException fnfe) {
    LOG.debug("Expected: " + fnfe, fnfe);
  }
}
 
Example 29  Project: hadoop  File: TestBlockReaderLocalLegacy.java
@Test
public void testBothOldAndNewShortCircuitConfigured() throws Exception {
  final short REPL_FACTOR = 1;
  final int FILE_LENGTH = 512;
  Assume.assumeTrue(null == DomainSocket.getLoadingFailureReason());
  TemporarySocketDirectory socketDir = new TemporarySocketDirectory();
  HdfsConfiguration conf = getConfiguration(socketDir);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  socketDir.close();
  FileSystem fs = cluster.getFileSystem();

  Path path = new Path("/foo");
  byte orig[] = new byte[FILE_LENGTH];
  for (int i = 0; i < orig.length; i++) {
    orig[i] = (byte)(i%10);
  }
  FSDataOutputStream fos = fs.create(path, (short)1);
  fos.write(orig);
  fos.close();
  DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
  FSDataInputStream fis = cluster.getFileSystem().open(path);
  byte buf[] = new byte[FILE_LENGTH];
  IOUtils.readFully(fis, buf, 0, FILE_LENGTH);
  fis.close();
  Assert.assertArrayEquals(orig, buf);
  Arrays.equals(orig, buf);
  cluster.shutdown();
}
 
Example 30
private String readFileFromHdfs(String filename) throws Exception {
    FileSystem hdfsFsHandle = dfsCluster.getHdfsFileSystemHandle();
    FSDataInputStream reader = hdfsFsHandle.open(new Path(filename));
    String output = reader.readUTF();
    reader.close();
    hdfsFsHandle.close();
    return output;
}
 