org.apache.hadoop.io.IOUtils class: source code examples

The examples below show how the org.apache.hadoop.io.IOUtils API is used in real projects; you can also follow the links to view the full source code on GitHub.

Example 1  Project: big-c   File: FsVolumeImpl.java
/**
 * Get the next subdirectory within the block pool slice.
 *
 * @return         The next subdirectory within the block pool slice, or
 *                   null if there are no more.
 */
private String getNextSubDir(String prev, File dir)
      throws IOException {
  List<String> children =
      IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
  cache = null;
  cacheMs = 0;
  if (children.size() == 0) {
    LOG.trace("getNextSubDir({}, {}): no subdirectories found in {}",
        storageID, bpid, dir.getAbsolutePath());
    return null;
  }
  Collections.sort(children);
  String nextSubDir = nextSorted(children, prev);
  if (nextSubDir == null) {
    LOG.trace("getNextSubDir({}, {}): no more subdirectories found in {}",
        storageID, bpid, dir.getAbsolutePath());
  } else {
    LOG.trace("getNextSubDir({}, {}): picking next subdirectory {} " +
        "within {}", storageID, bpid, nextSubDir, dir.getAbsolutePath());
  }
  return nextSubDir;
}
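For readers new to this call, here is a minimal standalone sketch of IOUtils.listDirectory (the directory path and filter below are made-up illustrations, and it assumes a Hadoop release that ships this method, roughly 2.7 and later): it returns the names of the entries accepted by the filter and throws IOException if the directory cannot be listed.

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.io.IOUtils;

public class ListDirectoryDemo {
  public static void main(String[] args) throws IOException {
    File dir = new File("/tmp/bp-example");   // hypothetical directory
    // Keep only entries whose names start with "subdir", like the filter above,
    // then sort them the same way FsVolumeImpl does before picking the next one.
    List<String> children =
        IOUtils.listDirectory(dir, (d, name) -> name.startsWith("subdir"));
    Collections.sort(children);
    for (String child : children) {
      System.out.println(child);
    }
  }
}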
 
Example 2  Project: jMetalSP   File: HDFSUtil.java
public boolean writeFile(String text, String path) {
  boolean result = false;
  if (text != null && path != null) {
    try {
      fs = FileSystem.get(conf);
      InputStream in = org.apache.commons.io.IOUtils.toInputStream(text, "UTF-8");
      OutputStream out = fs.create(new Path(path));
      IOUtils.copyBytes(in, out, conf);
      result = true;
    } catch (Exception ex) {
      ex.printStackTrace();
      result = false;
    }

  }
  return result;
}
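As a standalone sketch of the copyBytes overload used above (the target path is a made-up illustration, not from the project), IOUtils.copyBytes(in, out, conf) copies until EOF using the buffer size from io.file.buffer.size and then closes both streams itself, which is why the example above does not close them explicitly.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class CopyBytesToHdfsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    InputStream in =
        new ByteArrayInputStream("hello hdfs".getBytes(StandardCharsets.UTF_8));
    OutputStream out = fs.create(new Path("/tmp/demo.txt"));  // hypothetical path
    // Copies until EOF with the io.file.buffer.size buffer and closes both streams.
    IOUtils.copyBytes(in, out, conf);
  }
}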
 
Example 3  Project: kylin   File: SparkBuildDictionary.java
private void checkSnapshot(CubeManager cubeManager, CubeSegment cubeSegment) {
    List<DimensionDesc> dimensionDescs = cubeSegment.getCubeDesc().getDimensions();
    for (DimensionDesc dimensionDesc : dimensionDescs) {
        TableRef lookup = dimensionDesc.getTableRef();
        String tableIdentity = lookup.getTableIdentity();
        if (cubeSegment.getModel().isLookupTable(tableIdentity) && !cubeSegment.getCubeDesc().isExtSnapshotTable(tableIdentity)) {
            logger.info("Checking snapshot of {}", lookup);
            try {
                JoinDesc join = cubeSegment.getModel().getJoinsTree().getJoinByPKSide(lookup);
                ILookupTable table = cubeManager.getLookupTable(cubeSegment, join);
                if (table != null) {
                    IOUtils.closeStream(table);
                }
            } catch (Throwable th) {
                throw new RuntimeException(String.format(Locale.ROOT, "Checking snapshot of %s failed.", lookup), th);
            }
        }
    }
}
 
Example 4  Project: big-c   File: TestFileJournalManager.java
/**
 * Make sure that in-progress streams aren't counted if we don't ask for
 * them.
 */
@Test
public void testExcludeInProgressStreams() throws CorruptionException,
    IOException {
  File f = new File(TestEditLog.TEST_DIR + "/excludeinprogressstreams");
  
  // Don't close the edit log once the files have been set up.
  NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()), 
                                 10, false);
  StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
  
  FileJournalManager jm = new FileJournalManager(conf, sd, storage);
  
  // If we exclude the in-progress stream, we should only have 100 tx.
  assertEquals(100, getNumberOfTransactions(jm, 1, false, false));
  
  EditLogInputStream elis = getJournalInputStream(jm, 90, false);
  try {
    FSEditLogOp lastReadOp = null;
    while ((lastReadOp = elis.readOp()) != null) {
      assertTrue(lastReadOp.getTransactionId() <= 100);
    }
  } finally {
    IOUtils.cleanup(LOG, elis);
  }
}
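A minimal sketch of the cleanup idiom used in the finally block above (the streams and paths here are placeholders): IOUtils.cleanup closes every Closeable it is given, ignores nulls, and never throws, logging close failures instead when a Log is supplied. Newer Hadoop releases deprecate it in favour of cleanupWithLogger (see Example 27).

import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.io.IOUtils;

public class CleanupDemo {
  public static void read(String pathA, String pathB) throws IOException {
    FileInputStream a = null;
    FileInputStream b = null;
    try {
      a = new FileInputStream(pathA);
      b = new FileInputStream(pathB);
      // ... read from the streams ...
    } finally {
      // Closes every argument, skips nulls, and swallows any close failure;
      // pass a Log instead of null to have failures logged.
      IOUtils.cleanup(null, a, b);
    }
  }
}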
 
Example 5  Project: big-c   File: TestStickyBit.java
/**
 * Ensure that even if a file is in a directory with the sticky bit on,
 * another user can write to that file (assuming correct permissions).
 */
private void confirmCanAppend(Configuration conf, Path p) throws Exception {
  // Write a file to the new tmp directory as a regular user
  Path file = new Path(p, "foo");
  writeFile(hdfsAsUser1, file);
  hdfsAsUser1.setPermission(file, new FsPermission((short) 0777));

  // Log onto cluster as another user and attempt to append to file
  Path file2 = new Path(p, "foo");
  FSDataOutputStream h = null;
  try {
    h = hdfsAsUser2.append(file2);
    h.write("Some more data".getBytes());
    h.close();
    h = null;
  } finally {
    IOUtils.cleanup(null, h);
  }
}
 
Example 6  Project: hadoop   File: INode.java
public InputStream serialize() throws IOException {
  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  DataOutputStream out = new DataOutputStream(bytes);
  try {
    out.writeByte(fileType.ordinal());
    if (isFile()) {
      out.writeInt(blocks.length);
      for (int i = 0; i < blocks.length; i++) {
        out.writeLong(blocks[i].getId());
        out.writeLong(blocks[i].getLength());
      }
    }
    out.close();
    out = null;
  } finally {
    IOUtils.closeStream(out);
  }
  return new ByteArrayInputStream(bytes.toByteArray());
}
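The "close, set to null, closeStream in finally" pattern above is worth calling out: the stream is closed normally on the success path so that close() failures still surface, and IOUtils.closeStream in the finally block only acts when an earlier statement threw, quietly closing the still-open stream without masking the original exception. A small sketch of the same idiom, with a hypothetical file path:

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IOUtils;

public class CloseStreamDemo {
  public static void write(String path) throws IOException {
    DataOutputStream out = new DataOutputStream(new FileOutputStream(path));
    try {
      out.writeInt(42);
      out.close();      // surface close() failures on the success path
      out = null;       // nothing left for the finally block to clean up
    } finally {
      IOUtils.closeStream(out); // only acts if an earlier statement threw; never throws itself
    }
  }
}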
 
Example 7  Project: hiped2   File: ThriftStockAvgFileReader.java
public static void readFromProtoBuf(InputStream inputStream)
    throws IOException {

  ThriftBlockReader<StockAvg> reader =
      new ThriftBlockReader<StockAvg>(
          inputStream, new TypeRef<StockAvg>() {
      });

  StockAvg stock;
  while ((stock = reader.readNext()) != null) {
    System.out.println(ToStringBuilder.reflectionToString(stock));

  }

  IOUtils.closeStream(inputStream);
}
 
Example 8  Project: hadoop   File: TestAtomicFileOutputStream.java
@Test
public void testFailToRename() throws IOException {
  assumeTrue(Shell.WINDOWS);
  OutputStream fos = null;
  try {
    fos = new AtomicFileOutputStream(DST_FILE);
    fos.write(TEST_STRING.getBytes());
    FileUtil.setWritable(TEST_DIR, false);
    exception.expect(IOException.class);
    exception.expectMessage("failure in native rename");
    try {
      fos.close();
    } finally {
      fos = null;
    }
  } finally {
    IOUtils.cleanup(null, fos);
    FileUtil.setWritable(TEST_DIR, true);
  }
}
 
Example 9  Project: big-c   File: NNUpgradeUtil.java
/**
 * Perform any steps that must succeed across all storage dirs/JournalManagers
 * involved in an upgrade before proceeding onto the actual upgrade stage. If
 * a call to any JM's or local storage dir's doPreUpgrade method fails, then
 * doUpgrade will not be called for any JM. The existing current dir is
 * renamed to previous.tmp, and then a new, empty current dir is created.
 *
 * @param conf configuration for creating {@link EditLogFileOutputStream}
 * @param sd the storage directory to perform the pre-upgrade procedure.
 * @throws IOException in the event of error
 */
static void doPreUpgrade(Configuration conf, StorageDirectory sd)
    throws IOException {
  LOG.info("Starting upgrade of storage directory " + sd.getRoot());

  // rename current to tmp
  renameCurToTmp(sd);

  final File curDir = sd.getCurrentDir();
  final File tmpDir = sd.getPreviousTmp();
  List<String> fileNameList = IOUtils.listDirectory(tmpDir, new FilenameFilter() {
    @Override
    public boolean accept(File dir, String name) {
      return dir.equals(tmpDir)
          && name.startsWith(NNStorage.NameNodeFile.EDITS.getName());
    }
  });

  for (String s : fileNameList) {
    File prevFile = new File(tmpDir, s);
    File newFile = new File(curDir, prevFile.getName());
    Files.createLink(newFile.toPath(), prevFile.toPath());
  }
}
 
Example 10  Project: big-c   File: CryptoUtils.java
/**
 * Wraps a given FSDataInputStream with a CryptoInputStream. The size of the
 * data buffer required for the stream is specified by the
 * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
 * variable.
 * 
 * @param conf
 * @param in
 * @return FSDataInputStream
 * @throws IOException
 */
public static FSDataInputStream wrapIfNecessary(Configuration conf,
    FSDataInputStream in) throws IOException {
  if (isEncryptedSpillEnabled(conf)) {
    CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
    int bufferSize = getBufferSize(conf);
    // Not going to be used... but still has to be read...
    // Since the O/P stream always writes it..
    IOUtils.readFully(in, new byte[8], 0, 8);
    byte[] iv = 
        new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
    IOUtils.readFully(in, iv, 0, 
        cryptoCodec.getCipherSuite().getAlgorithmBlockSize());
    if (LOG.isDebugEnabled()) {
      LOG.debug("IV read from Stream ["
          + Base64.encodeBase64URLSafeString(iv) + "]");
    }
    return new CryptoFSDataInputStream(in, cryptoCodec, bufferSize,
        getEncryptionKey(), iv);
  } else {
    return in;
  }
}
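IOUtils.readFully(in, buf, off, len), used twice above, keeps reading until it has filled exactly len bytes and throws an IOException if the stream ends first, unlike InputStream.read, which may return fewer bytes. A minimal standalone sketch with an in-memory stream (the data is illustrative):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.io.IOUtils;

public class ReadFullyDemo {
  public static void main(String[] args) throws IOException {
    InputStream in = new ByteArrayInputStream(new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
    byte[] header = new byte[8];
    // Blocks until all 8 bytes are read; throws IOException on a premature EOF.
    IOUtils.readFully(in, header, 0, header.length);
    System.out.println(header[7]); // prints 8
  }
}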
 
Example 11  Project: dremio-oss   File: SupportResource.java
@POST
@Path("download")
@Consumes(MediaType.APPLICATION_JSON)
public Response downloadData(@PathParam("jobId") JobId jobId)
    throws IOException, UserNotFoundException, JobResourceNotFoundException {
  final DownloadDataResponse response;
  try {
    final ImmutableSupportRequest request = new ImmutableSupportRequest.Builder()
      .setUserId(context.getUserPrincipal().getName())
      .setJobId(jobId)
      .build();
    response = supportService.downloadSupportRequest(request);
  } catch (JobNotFoundException e) {
    throw JobResourceNotFoundException.fromJobNotFoundException(e);
  }
  final StreamingOutput streamingOutput = new StreamingOutput() {
    @Override
    public void write(OutputStream output) throws IOException, WebApplicationException {
      IOUtils.copyBytes(response.getInput(), output, 4096, true);
    }
  };
  return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM)
    .header("Content-Disposition", "attachment; filename=\"" + response.getFileName() + "\"").build();
}
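The four-argument overload used in the StreamingOutput above lets the caller pick the buffer size and decide whether copyBytes should close both streams when the copy finishes or fails. A small sketch under the assumption of plain local files (the paths are made up):

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IOUtils;

public class CopyBytesCloseDemo {
  public static void main(String[] args) throws IOException {
    FileInputStream in = new FileInputStream("/tmp/in.bin");     // hypothetical paths
    FileOutputStream out = new FileOutputStream("/tmp/out.bin");
    // 4096-byte buffer; the trailing "true" closes both streams after the copy,
    // even if the copy itself fails.
    IOUtils.copyBytes(in, out, 4096, true);
  }
}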
 
Example 12  Project: big-c   File: TestMapFile.java
/**
 * test throwing {@code IOException} in {@code MapFile.Writer} constructor    
 */
@Test
public void testWriteWithFailDirCreation() {
  String ERROR_MESSAGE = "Mkdirs failed to create directory";
  Path dirName = new Path(TEST_DIR, "fail.mapfile");
  MapFile.Writer writer = null;
  try {
    FileSystem fs = FileSystem.getLocal(conf);
    FileSystem spyFs = spy(fs);
    Path pathSpy = spy(dirName);
    when(pathSpy.getFileSystem(conf)).thenReturn(spyFs);
    when(spyFs.mkdirs(dirName)).thenReturn(false);

    writer = new MapFile.Writer(conf, pathSpy,
        MapFile.Writer.keyClass(IntWritable.class),
        MapFile.Writer.valueClass(Text.class));
    fail("testWriteWithFailDirCreation error !!!");
  } catch (IOException ex) {
    assertTrue("testWriteWithFailDirCreation ex error !!!", ex.getMessage()
        .startsWith(ERROR_MESSAGE));
  } finally {
    IOUtils.cleanup(null, writer);
  }
}
 
Example 13  Project: big-c   File: BlockSender.java
/**
 * Read checksum into given buffer
 * @param buf buffer to read the checksum into
 * @param checksumOffset offset at which to write the checksum into buf
 * @param checksumLen length of checksum to write
 * @throws IOException on error
 */
private void readChecksum(byte[] buf, final int checksumOffset,
    final int checksumLen) throws IOException {
  if (checksumSize <= 0 && checksumIn == null) {
    return;
  }
  try {
    checksumIn.readFully(buf, checksumOffset, checksumLen);
  } catch (IOException e) {
    LOG.warn(" Could not read or failed to veirfy checksum for data"
        + " at offset " + offset + " for block " + block, e);
    IOUtils.closeStream(checksumIn);
    checksumIn = null;
    if (corruptChecksumOk) {
      if (checksumOffset < checksumLen) {
        // Just fill the array with zeros.
        Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
      }
    } else {
      throw e;
    }
  }
}
 
Example 14  Project: rainbow   File: HdfsUtil.java
public void upFile(File localFile, String hdfsPath) throws IOException
{
    InputStream in = new BufferedInputStream(new FileInputStream(localFile));
    OutputStream out = fileSystem.create(new Path(hdfsPath));
    try
    {
        IOUtils.copyBytes(in, out, conf);
    } catch (Exception e)
    {
        e.printStackTrace();
    } finally
    {
        IOUtils.closeStream(in);
        IOUtils.closeStream(out);
    }
}
 
Example 15  Project: rainbow   File: HdfsUtil.java
public void upFile(InputStream fileInputStream, String hdfsPath)
        throws IOException
{
    InputStream in = new BufferedInputStream(fileInputStream);
    OutputStream out = fileSystem.create(new Path(hdfsPath));
    try
    {
        IOUtils.copyBytes(in, out, conf);
    } catch (Exception e)
    {
        e.printStackTrace();
    } finally
    {
        // close Stream
        IOUtils.closeStream(in);
        IOUtils.closeStream(out);
    }
}
 
Example 16  Project: big-c   File: VersionInfo.java
protected VersionInfo(String component) {
  info = new Properties();
  String versionInfoFile = component + "-version-info.properties";
  InputStream is = null;
  try {
    is = Thread.currentThread().getContextClassLoader()
      .getResourceAsStream(versionInfoFile);
    if (is == null) {
      throw new IOException("Resource not found");
    }
    info.load(is);
  } catch (IOException ex) {
    LogFactory.getLog(getClass()).warn("Could not read '" +
        versionInfoFile + "', " + ex.toString(), ex);
  } finally {
    IOUtils.closeStream(is);
  }
}
 
Example 17  Project: hbase   File: TagCompressionContext.java
/**
 * Uncompress tags from the InputStream and writes to the destination array.
 * @param src Stream where the compressed tags are available
 * @param dest Destination array where to write the uncompressed tags
 * @param offset Offset in destination where tags to be written
 * @param length Length of all tag bytes
 * @throws IOException
 */
public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
    throws IOException {
  int endOffset = offset + length;
  while (offset < endOffset) {
    byte status = (byte) src.read();
    if (status == Dictionary.NOT_IN_DICTIONARY) {
      int tagLen = StreamUtils.readRawVarint32(src);
      offset = Bytes.putAsShort(dest, offset, tagLen);
      IOUtils.readFully(src, dest, offset, tagLen);
      tagDict.addEntry(dest, offset, tagLen);
      offset += tagLen;
    } else {
      short dictIdx = StreamUtils.toShort(status, (byte) src.read());
      byte[] entry = tagDict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      offset = Bytes.putAsShort(dest, offset, entry.length);
      System.arraycopy(entry, 0, dest, offset, entry.length);
      offset += entry.length;
    }
  }
}
 
Example 18  Project: hadoop   File: TestFiHftp.java
private static byte[] createFile(FileSystem fs, Path name, long length, 
    short replication, long blocksize) throws IOException {
  final FSDataOutputStream out = fs.create(name, false, 4096,
      replication, blocksize);
  try {
    for(long n = length; n > 0; ) {
      ran.nextBytes(buffer);
      final int w = n < buffer.length? (int)n: buffer.length;
      out.write(buffer, 0, w);
      md5.update(buffer, 0, w);
      n -= w;
    }
  } finally {
    IOUtils.closeStream(out);
  }
  return md5.digest();
}
 
Example 19  Project: tajo   File: TestLineReader.java
@Test
public void testByteBufLineReaderWithoutTerminating() throws IOException {
  String path = JavaResourceUtil.getResourceURL("dataset/testLineText.txt").getFile();
  File file = new File(path);
  String data = FileUtil.readTextFile(file);

  ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
  ByteBufLineReader reader = new ByteBufLineReader(channel);

  long totalRead = 0;
  int i = 0;
  AtomicInteger bytes = new AtomicInteger();
  for(;;){
    ByteBuf buf = reader.readLineBuf(bytes);
    totalRead += bytes.get();
    if(buf == null) break;
    i++;
  }
  IOUtils.cleanup(null, reader);
  assertFalse(channel.isOpen());
  assertEquals(file.length(), totalRead);
  assertEquals(file.length(), reader.readBytes());
  assertEquals(data.split("\n").length, i);
}
 
Example 20  Project: RDFS   File: TestRumenJobTraces.java
static private <T extends DeepCompare> void jsonFileMatchesGold(
    FileSystem lfs, Path result, Path gold, Class<? extends T> clazz,
    String fileDescription) throws IOException {
  JsonObjectMapperParser<T> goldParser =
      new JsonObjectMapperParser<T>(gold, clazz, new Configuration());
  InputStream resultStream = lfs.open(result);
  JsonObjectMapperParser<T> resultParser =
      new JsonObjectMapperParser<T>(resultStream, clazz);
  try {
    while (true) {
      DeepCompare goldJob = goldParser.getNext();
      DeepCompare resultJob = resultParser.getNext();
      if ((goldJob == null) || (resultJob == null)) {
        assertTrue(goldJob == resultJob);
        break;
      }

      try {
        resultJob.deepCompare(goldJob, new TreePath(null, "<root>"));
      } catch (DeepInequalityException e) {
        String error = e.path.toString();

        assertFalse(fileDescription + " mismatches: " + error, true);
      }
    }
  } finally {
    IOUtils.cleanup(null, goldParser, resultParser);
  }
}
 
Example 21  Project: hadoop   File: FileSystemApplicationHistoryStore.java
@Override
public void serviceStop() throws Exception {
  try {
    for (Entry<ApplicationId, HistoryFileWriter> entry : outstandingWriters
      .entrySet()) {
      entry.getValue().close();
    }
    outstandingWriters.clear();
  } finally {
    IOUtils.cleanup(LOG, fs);
  }
  super.serviceStop();
}
 
Example 22  Project: hadoop   File: TestWriteConfigurationToDFS.java
@Test(timeout=60000)
public void testWriteConf() throws Exception {
  Configuration conf = new HdfsConfiguration();
  conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
  System.out.println("Setting conf in: " + System.identityHashCode(conf));
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  FileSystem fs = null;
  OutputStream os = null;
  try {
    fs = cluster.getFileSystem();
    Path filePath = new Path("/testWriteConf.xml");
    os = fs.create(filePath);
    StringBuilder longString = new StringBuilder();
    for (int i = 0; i < 100000; i++) {
      longString.append("hello");
    } // 500KB
    conf.set("foobar", longString.toString());
    conf.writeXml(os);
    os.close();
    os = null;
    fs.close();
    fs = null;
  } finally {
    IOUtils.cleanup(null, os, fs);
    cluster.shutdown();
  }
}
 
Example 23  Project: big-c   File: TestQJMWithFaults.java
/**
 * Run through the creation of a log without any faults injected,
 * and count how many RPCs are made to each node. This sets the
 * bounds for the other test cases, so they can exhaustively explore
 * the space of potential failures.
 */
private static long determineMaxIpcNumber() throws Exception {
  Configuration conf = new Configuration();
  MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
  QuorumJournalManager qjm = null;
  long ret;
  try {
    qjm = createInjectableQJM(cluster);
    qjm.format(FAKE_NSINFO);
    doWorkload(cluster, qjm);
    
    SortedSet<Integer> ipcCounts = Sets.newTreeSet();
    for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
      InvocationCountingChannel ch = (InvocationCountingChannel)l;
      ch.waitForAllPendingCalls();
      ipcCounts.add(ch.getRpcCount());
    }

    // All of the loggers should have sent the same number of RPCs, since there
    // were no failures.
    assertEquals(1, ipcCounts.size());
    
    ret = ipcCounts.first();
    LOG.info("Max IPC count = " + ret);
  } finally {
    IOUtils.closeStream(qjm);
    cluster.shutdown();
  }
  return ret;
}
 
Example 24  Project: big-c   File: ReplicaInfo.java
/**
 * Copy specified file into a temporary file. Then rename the
 * temporary file to the original name. This will cause any
 * hardlinks to the original file to be removed. The temporary
 * files are created in the same directory. The temporary files will
 * be recovered (especially on Windows) on datanode restart.
 */
private void unlinkFile(File file, Block b) throws IOException {
  File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
  try {
    FileInputStream in = new FileInputStream(file);
    try {
      FileOutputStream out = new FileOutputStream(tmpFile);
      try {
        IOUtils.copyBytes(in, out, 16*1024);
      } finally {
        out.close();
      }
    } finally {
      in.close();
    }
    if (file.length() != tmpFile.length()) {
      throw new IOException("Copy of file " + file + " size " + file.length()+
                            " into file " + tmpFile +
                            " resulted in a size of " + tmpFile.length());
    }
    FileUtil.replaceFile(tmpFile, file);
  } catch (IOException e) {
    boolean done = tmpFile.delete();
    if (!done) {
      DataNode.LOG.info("detachFile failed to delete temporary file " +
                        tmpFile);
    }
    throw e;
  }
}
 
Example 25  Project: incubator-tez   File: TezSpillRecord.java
public TezSpillRecord(Path indexFileName, Configuration job, Checksum crc,
                   String expectedIndexOwner)
    throws IOException {

  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  final FSDataInputStream in = rfs.open(indexFileName);
  try {
    final long length = rfs.getFileStatus(indexFileName).getLen();
    final int partitions = 
        (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    final int size = partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;

    buf = ByteBuffer.allocate(size);
    if (crc != null) {
      crc.reset();
      CheckedInputStream chk = new CheckedInputStream(in, crc);
      IOUtils.readFully(chk, buf.array(), 0, size);
      if (chk.getChecksum().getValue() != in.readLong()) {
        throw new ChecksumException("Checksum error reading spill index: " +
                              indexFileName, -1);
      }
    } else {
      IOUtils.readFully(in, buf.array(), 0, size);
    }
    entries = buf.asLongBuffer();
  } finally {
    in.close();
  }
}
 
Example 26  Project: hadoop-gpu   File: DataXceiver.java
/**
 * Utility function for sending a response.
 * @param s socket to write to
 * @param opStatus status message to write
 * @param timeout send timeout
 **/
private void sendResponse(Socket s, short opStatus, long timeout) 
                                                     throws IOException {
  DataOutputStream reply = 
    new DataOutputStream(NetUtils.getOutputStream(s, timeout));
  try {
    reply.writeShort(opStatus);
    reply.flush();
  } finally {
    IOUtils.closeStream(reply);
  }
}
 
Example 27  Project: hadoop-ozone   File: TestContainerSmallFile.java
@AfterClass
public static void shutdown() throws InterruptedException {
  if (cluster != null) {
    cluster.shutdown();
  }
  IOUtils.cleanupWithLogger(null, storageContainerLocationClient);
}
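cleanupWithLogger is the slf4j-based replacement for the older cleanup(Log, ...) call shown earlier: it closes each Closeable, skips nulls, and logs (rather than propagates) any failure. A minimal sketch, assuming a Hadoop release that includes this method; the logger and file path are placeholders:

import java.io.FileInputStream;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CleanupWithLoggerDemo {
  private static final Logger LOG = LoggerFactory.getLogger(CleanupWithLoggerDemo.class);

  public static void main(String[] args) throws Exception {
    FileInputStream in = null;
    try {
      in = new FileInputStream("/tmp/demo.txt"); // hypothetical file
      // ... use the stream ...
    } finally {
      // Closes the stream; any close failure is logged to LOG
      // (or silently dropped if the logger is null).
      IOUtils.cleanupWithLogger(LOG, in);
    }
  }
}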
 
Example 28  Project: liteflow   File: HadoopUtils.java
/**
 * Get the text content of an HDFS file
 * @param hdfsFilePath
 * @return
 */
public static String getFileContent(String hdfsFilePath, boolean checkTxt){
    FSDataInputStream fsDataInputStream = null;
    try {
        if(checkTxt){
            boolean isPass = false;
            for(String suffix : CommonConstants.TEXT_FILE_SUFFIX){
                if(hdfsFilePath.endsWith(suffix)){
                    isPass = true;
                }
            }
            if(!isPass){
                throw new RuntimeException("file is not txt type:" + hdfsFilePath);
            }
        }
        FileSystem fileSystem = HadoopUtils.getFileSystem();
        fsDataInputStream = fileSystem.open(new Path(hdfsFilePath));
        StringWriter stringWriter = new StringWriter();
        org.apache.commons.io.IOUtils.copy(fsDataInputStream, stringWriter);
        return stringWriter.toString();
    } catch (Throwable e) {
        LOG.error("download file error:{}", hdfsFilePath, e);
        throw new RuntimeException(e);
    } finally {
        org.apache.commons.io.IOUtils.closeQuietly(fsDataInputStream);
    }

}
 
Example 29  Project: hadoop   File: TestGlobbedCopyListing.java
private static void touchFile(String path) throws Exception {
  FileSystem fileSystem = null;
  DataOutputStream outputStream = null;
  try {
    fileSystem = cluster.getFileSystem();
    outputStream = fileSystem.create(new Path(path), true, 0);
    recordInExpectedValues(path);
  }
  finally {
    IOUtils.cleanup(null, fileSystem, outputStream);
  }
}
 
Example 30  Project: hadoop   File: FSEditLogOp.java
private void verifyTerminator() throws IOException {
  /** The end of the edit log should contain only 0x00 or 0xff bytes.
   * If it contains other bytes, the log itself may be corrupt.
   * It is important to check this; if we don't, a stray OP_INVALID byte 
   * could make us stop reading the edit log halfway through, and we'd never
   * know that we had lost data.
   */
  byte[] buf = new byte[4096];
  limiter.clearLimit();
  int numRead = -1, idx = 0;
  while (true) {
    try {
      numRead = -1;
      idx = 0;
      numRead = in.read(buf);
      if (numRead == -1) {
        return;
      }
      while (idx < numRead) {
        if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) {
          throw new IOException("Read extra bytes after " +
            "the terminator!");
        }
        idx++;
      }
    } finally {
      // After reading each group of bytes, we reposition the mark one
      // byte before the next group.  Similarly, if there is an error, we
      // want to reposition the mark one byte before the error
      if (numRead != -1) { 
        in.reset();
        IOUtils.skipFully(in, idx);
        in.mark(buf.length + 1);
        IOUtils.skipFully(in, 1);
      }
    }
  }
}
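IOUtils.skipFully(in, n), used above to reposition the mark, is the strict counterpart of InputStream.skip: it keeps skipping until exactly n bytes have been consumed and throws an IOException on a premature EOF, so the caller never has to loop. A standalone sketch with illustrative in-memory data:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.io.IOUtils;

public class SkipFullyDemo {
  public static void main(String[] args) throws IOException {
    InputStream in = new ByteArrayInputStream(new byte[]{10, 20, 30, 40, 50});
    // Skip exactly three bytes; unlike in.skip(3), this never skips fewer
    // and throws IOException if the stream is shorter than requested.
    IOUtils.skipFully(in, 3);
    System.out.println(in.read()); // prints 40
  }
}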
 