下面列出了 org.apache.hadoop.io.IOUtils 类的 API 使用示例代码及写法,您也可以点击链接到 GitHub 查看源代码。
/**
 * Get the next subdirectory within the block pool slice.
 *
 * <p>The children of {@code dir} accepted by {@code SubdirFilter.INSTANCE}
 * are listed and sorted, and {@code nextSorted} picks the entry that follows
 * {@code prev}. The {@code cache} and {@code cacheMs} fields are reset as
 * soon as the listing has been obtained.
 *
 * @param prev the previously returned subdirectory name, passed on to
 *             {@code nextSorted}
 * @param dir  the directory whose subdirectories are listed
 * @return The next subdirectory within the block pool slice, or
 *         null if there are no more.
 * @throws IOException propagated from the underlying calls (e.g. the
 *                     directory listing)
 */
private String getNextSubDir(String prev, File dir)
    throws IOException {
  final List<String> subDirs =
      IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
  // Reset the cached state before deciding where to go next.
  cache = null;
  cacheMs = 0;
  if (subDirs.isEmpty()) {
    LOG.trace("getNextSubDir({}, {}): no subdirectories found in {}",
        storageID, bpid, dir.getAbsolutePath());
    return null;
  }
  Collections.sort(subDirs);
  final String next = nextSorted(subDirs, prev);
  if (next != null) {
    LOG.trace("getNextSubDir({}, {}): picking next subdirectory {} " +
        "within {}", storageID, bpid, next, dir.getAbsolutePath());
  } else {
    LOG.trace("getNextSubDir({}, {}): no more subdirectories found in {}",
        storageID, bpid, dir.getAbsolutePath());
  }
  return next;
}
/**
 * Writes {@code text}, encoded as UTF-8, to {@code path} on the file system
 * obtained from {@code conf}. The {@code fs} field is (re)assigned as a side
 * effect.
 *
 * @param text the content to write; nothing is written when null
 * @param path the destination path; nothing is written when null
 * @return true when the copy completed; false when an argument was null or
 *         any exception was raised (the exception's stack trace is printed)
 */
public boolean writeFile(String text, String path) {
  if (text == null || path == null) {
    return false;
  }
  try {
    fs = FileSystem.get(conf);
    InputStream source = org.apache.commons.io.IOUtils.toInputStream(text, "UTF-8");
    OutputStream sink = fs.create(new Path(path));
    IOUtils.copyBytes(source, sink, conf);
    return true;
  } catch (Exception ex) {
    ex.printStackTrace();
    return false;
  }
}
/**
 * Checks the snapshot of every dimension table of the segment that is a
 * lookup table in the model and is not an ext snapshot table of the cube, by
 * obtaining its lookup table from the cube manager and closing it again.
 *
 * @param cubeManager manager used to obtain the lookup tables
 * @param cubeSegment segment whose dimension snapshots are checked
 * @throws RuntimeException wrapping whatever was thrown while obtaining or
 *                          closing a lookup table
 */
private void checkSnapshot(CubeManager cubeManager, CubeSegment cubeSegment) {
  for (DimensionDesc dim : cubeSegment.getCubeDesc().getDimensions()) {
    TableRef lookup = dim.getTableRef();
    String tableIdentity = lookup.getTableIdentity();
    boolean isLookup = cubeSegment.getModel().isLookupTable(tableIdentity);
    if (!isLookup || cubeSegment.getCubeDesc().isExtSnapshotTable(tableIdentity)) {
      // Only lookup tables that are not ext snapshot tables are checked.
      continue;
    }
    logger.info("Checking snapshot of {}", lookup);
    try {
      JoinDesc join = cubeSegment.getModel().getJoinsTree().getJoinByPKSide(lookup);
      ILookupTable snapshot = cubeManager.getLookupTable(cubeSegment, join);
      if (snapshot != null) {
        IOUtils.closeStream(snapshot);
      }
    } catch (Throwable th) {
      String message = String.format(Locale.ROOT, "Checking snapshot of %s failed.", lookup);
      throw new RuntimeException(message, th);
    }
  }
}
/**
 * Make sure that in-progress streams aren't counted if we don't ask for
 * them.
 */
@Test
public void testExcludeInProgressStreams() throws CorruptionException,
    IOException {
  File editsDir = new File(TestEditLog.TEST_DIR + "/excludeinprogressstreams");
  // Leave the edit log open once the files have been set up.
  NNStorage storage = setupEdits(
      Collections.<URI>singletonList(editsDir.toURI()), 10, false);
  StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
  FileJournalManager jm = new FileJournalManager(conf, sd, storage);

  // With the in-progress stream excluded, only 100 tx should be counted.
  assertEquals(100, getNumberOfTransactions(jm, 1, false, false));

  EditLogInputStream elis = getJournalInputStream(jm, 90, false);
  try {
    // Every op readable from txid 90 onwards must stay within the first 100.
    for (FSEditLogOp op = elis.readOp(); op != null; op = elis.readOp()) {
      assertTrue(op.getTransactionId() <= 100);
    }
  } finally {
    IOUtils.cleanup(LOG, elis);
  }
}
/**
 * Ensure that even if a file is in a directory with the sticky bit on,
 * another user can write to that file (assuming correct permissions).
 *
 * @param conf configuration (not read by this method)
 * @param p    directory in which the file {@code foo} is created
 * @throws Exception if writing, changing permissions or appending fails
 */
private void confirmCanAppend(Configuration conf, Path p) throws Exception {
  // First user creates the file and opens its permissions to everyone.
  final Path created = new Path(p, "foo");
  writeFile(hdfsAsUser1, created);
  hdfsAsUser1.setPermission(created, new FsPermission((short) 0777));

  // Second user appends to the very same path.
  final Path sameFile = new Path(p, "foo");
  FSDataOutputStream appender = null;
  try {
    appender = hdfsAsUser2.append(sameFile);
    appender.write("Some more data".getBytes());
    appender.close();
    // Closed cleanly: nothing left for the finally block to clean up.
    appender = null;
  } finally {
    IOUtils.cleanup(null, appender);
  }
}
/**
 * Serializes this object into an in-memory byte stream.
 *
 * <p>Layout: one byte holding {@code fileType.ordinal()}; then, only when
 * {@code isFile()} is true, an int block count followed by the id and length
 * (both longs) of every block.
 *
 * @return a stream over the serialized bytes
 * @throws IOException if writing to the buffer fails
 */
public InputStream serialize() throws IOException {
  final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  DataOutputStream data = new DataOutputStream(buffer);
  try {
    data.writeByte(fileType.ordinal());
    if (isFile()) {
      final int blockCount = blocks.length;
      data.writeInt(blockCount);
      for (int idx = 0; idx < blockCount; idx++) {
        data.writeLong(blocks[idx].getId());
        data.writeLong(blocks[idx].getLength());
      }
    }
    data.close();
    // Closed cleanly: make the finally block a no-op.
    data = null;
  } finally {
    IOUtils.closeStream(data);
  }
  return new ByteArrayInputStream(buffer.toByteArray());
}
/**
 * Reads every {@code StockAvg} record from the stream with a
 * {@code ThriftBlockReader} and prints each one to standard output.
 *
 * <p>The stream is always closed before this method returns, including when
 * creating the reader or reading a record fails.
 *
 * @param inputStream the stream to read records from; closed by this method
 * @throws IOException if reading a record fails
 */
public static void readFromProtoBuf(InputStream inputStream)
    throws IOException {
  try {
    ThriftBlockReader<StockAvg> reader =
        new ThriftBlockReader<StockAvg>(
            inputStream, new TypeRef<StockAvg>() {
        });
    StockAvg stock;
    while ((stock = reader.readNext()) != null) {
      System.out.println(ToStringBuilder.reflectionToString(stock));
    }
  } finally {
    // Close on every path so a failed read does not leak the stream.
    IOUtils.closeStream(inputStream);
  }
}
/**
 * Expects closing an {@code AtomicFileOutputStream} to fail with an
 * {@code IOException} mentioning "failure in native rename" once the test
 * directory has been made non-writable. Only runs when
 * {@code Shell.WINDOWS} is true.
 */
@Test
public void testFailToRename() throws IOException {
  assumeTrue(Shell.WINDOWS);
  OutputStream stream = null;
  try {
    stream = new AtomicFileOutputStream(DST_FILE);
    stream.write(TEST_STRING.getBytes());
    // Revoke write access to the directory before closing the stream.
    FileUtil.setWritable(TEST_DIR, false);
    exception.expect(IOException.class);
    exception.expectMessage("failure in native rename");
    try {
      stream.close();
    } finally {
      // close() was attempted; do not let the outer cleanup close it again.
      stream = null;
    }
  } finally {
    IOUtils.cleanup(null, stream);
    // Restore write access.
    FileUtil.setWritable(TEST_DIR, true);
  }
}
/**
 * Perform any steps that must succeed across all storage dirs/JournalManagers
 * involved in an upgrade before proceeding onto the actual upgrade stage. If
 * a call to any JM's or local storage dir's doPreUpgrade method fails, then
 * doUpgrade will not be called for any JM. The existing current dir is
 * renamed to previous.tmp, and then a new, empty current dir is created.
 *
 * <p>After the rename, every file directly inside previous.tmp whose name
 * starts with the edits file prefix is hard-linked into the current dir
 * under the same name.
 *
 * @param conf configuration for creating {@link EditLogFileOutputStream}
 * @param sd the storage directory to perform the pre-upgrade procedure.
 * @throws IOException in the event of error
 */
static void doPreUpgrade(Configuration conf, StorageDirectory sd)
    throws IOException {
  LOG.info("Starting upgrade of storage directory " + sd.getRoot());

  // rename current to tmp
  renameCurToTmp(sd);

  final File curDir = sd.getCurrentDir();
  final File tmpDir = sd.getPreviousTmp();

  // Accepts only edits files that sit directly in previous.tmp.
  final FilenameFilter editsInTmpDir = new FilenameFilter() {
    @Override
    public boolean accept(File dir, String name) {
      if (!dir.equals(tmpDir)) {
        return false;
      }
      return name.startsWith(NNStorage.NameNodeFile.EDITS.getName());
    }
  };

  for (String editsName : IOUtils.listDirectory(tmpDir, editsInTmpDir)) {
    File prevFile = new File(tmpDir, editsName);
    File newFile = new File(curDir, prevFile.getName());
    Files.createLink(newFile.toPath(), prevFile.toPath());
  }
}
/**
 * Wraps a given FSDataInputStream with a CryptoInputStream. The size of the
 * data buffer required for the stream is specified by the
 * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
 * variable.
 *
 * <p>When encrypted spill is enabled, 8 bytes are read and discarded from
 * the stream first, followed by an IV whose length is the cipher suite's
 * algorithm block size.
 *
 * @param conf configuration consulted for the encrypted-spill flag, the
 *             crypto codec and the buffer size
 * @param in   the stream to wrap
 * @return a decrypting stream over {@code in} when encrypted spill is
 *         enabled, otherwise {@code in} itself
 * @throws IOException if the leading bytes or the IV cannot be read
 */
public static FSDataInputStream wrapIfNecessary(Configuration conf,
    FSDataInputStream in) throws IOException {
  if (!isEncryptedSpillEnabled(conf)) {
    return in;
  }
  final CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
  final int bufferSize = getBufferSize(conf);
  // Not going to be used... but still has to be read...
  // Since the O/P stream always writes it..
  IOUtils.readFully(in, new byte[8], 0, 8);
  final int ivLength = cryptoCodec.getCipherSuite().getAlgorithmBlockSize();
  final byte[] iv = new byte[ivLength];
  IOUtils.readFully(in, iv, 0, ivLength);
  if (LOG.isDebugEnabled()) {
    LOG.debug("IV read from Stream ["
        + Base64.encodeBase64URLSafeString(iv) + "]");
  }
  return new CryptoFSDataInputStream(in, cryptoCodec, bufferSize,
      getEncryptionKey(), iv);
}
/**
 * Streams the support download for a job back to the caller as an
 * octet-stream attachment.
 *
 * @param jobId the job whose data is downloaded
 * @return a response whose entity copies the download's input to the client
 *         and whose Content-Disposition header carries the file name
 * @throws JobResourceNotFoundException if the support service reports that
 *                                      the job was not found
 */
@POST
@Path("download")
@Consumes(MediaType.APPLICATION_JSON)
public Response downloadData(@PathParam("jobId") JobId jobId)
    throws IOException, UserNotFoundException, JobResourceNotFoundException {
  final DownloadDataResponse response;
  try {
    final ImmutableSupportRequest request = new ImmutableSupportRequest.Builder()
        .setUserId(context.getUserPrincipal().getName())
        .setJobId(jobId)
        .build();
    response = supportService.downloadSupportRequest(request);
  } catch (JobNotFoundException e) {
    throw JobResourceNotFoundException.fromJobNotFoundException(e);
  }

  // The copy runs only when the entity is written out; the trailing 'true'
  // asks copyBytes to close the streams when it is done.
  final StreamingOutput body = new StreamingOutput() {
    @Override
    public void write(OutputStream output) throws IOException, WebApplicationException {
      IOUtils.copyBytes(response.getInput(), output, 4096, true);
    }
  };

  final String contentDisposition =
      "attachment; filename=\"" + response.getFileName() + "\"";
  return Response.ok(body, MediaType.APPLICATION_OCTET_STREAM)
      .header("Content-Disposition", contentDisposition)
      .build();
}
/**
 * test throwing {@code IOException} in {@code MapFile.Writer} constructor
 * when {@code mkdirs} on the (spied) file system reports failure.
 */
@Test
public void testWriteWithFailDirCreation() {
  final String expectedPrefix = "Mkdirs failed to create directory";
  final Path dirName = new Path(TEST_DIR, "fail.mapfile");
  MapFile.Writer writer = null;
  try {
    // Spy on the local file system so that mkdirs can be forced to fail.
    FileSystem localFs = FileSystem.getLocal(conf);
    FileSystem spyFs = spy(localFs);
    Path pathSpy = spy(dirName);
    when(pathSpy.getFileSystem(conf)).thenReturn(spyFs);
    when(spyFs.mkdirs(dirName)).thenReturn(false);

    writer = new MapFile.Writer(conf, pathSpy,
        MapFile.Writer.keyClass(IntWritable.class),
        MapFile.Writer.valueClass(Text.class));
    fail("testWriteWithFailDirCreation error !!!");
  } catch (IOException ex) {
    assertTrue("testWriteWithFailDirCreation ex error !!!",
        ex.getMessage().startsWith(expectedPrefix));
  } finally {
    IOUtils.cleanup(null, writer);
  }
}
/**
 * Read checksum into given buffer.
 *
 * <p>If the read fails, the checksum stream is closed and set to null. When
 * {@code corruptChecksumOk} is set, the checksum region of {@code buf}
 * ({@code checksumLen} bytes starting at {@code checksumOffset}) is then
 * zero-filled instead of failing; otherwise the exception is rethrown.
 *
 * @param buf buffer to read the checksum into
 * @param checksumOffset offset at which to write the checksum into buf
 * @param checksumLen length of checksum to write
 * @throws IOException on error
 */
private void readChecksum(byte[] buf, final int checksumOffset,
    final int checksumLen) throws IOException {
  if (checksumSize <= 0 && checksumIn == null) {
    return;
  }
  try {
    checksumIn.readFully(buf, checksumOffset, checksumLen);
  } catch (IOException e) {
    LOG.warn(" Could not read or failed to veirfy checksum for data"
        + " at offset " + offset + " for block " + block, e);
    IOUtils.closeStream(checksumIn);
    checksumIn = null;
    if (!corruptChecksumOk) {
      throw e;
    }
    if (checksumLen > 0) {
      // Just fill the checksum region with zeros. Arrays.fill takes an
      // exclusive END INDEX, not a length, so the end is offset + length.
      Arrays.fill(buf, checksumOffset, checksumOffset + checksumLen,
          (byte) 0);
    }
  }
}
/**
 * Uploads a local file to the given HDFS path.
 *
 * <p>Failures while copying are printed and not propagated (best effort);
 * failures while opening the local file or creating the HDFS file are
 * thrown. Both streams are closed on every path, including when creating the
 * HDFS file fails.
 *
 * @param localFile the local file to upload
 * @param hdfsPath  destination path on the file system
 * @throws IOException if the local file cannot be opened or the destination
 *                     cannot be created
 */
public void upFile(File localFile, String hdfsPath) throws IOException
{
  InputStream in = new BufferedInputStream(new FileInputStream(localFile));
  try
  {
    // Created inside the outer try so that a failure here still closes 'in'.
    OutputStream out = fileSystem.create(new Path(hdfsPath));
    try
    {
      IOUtils.copyBytes(in, out, conf);
    } catch (Exception e)
    {
      e.printStackTrace();
    } finally
    {
      IOUtils.closeStream(out);
    }
  } finally
  {
    IOUtils.closeStream(in);
  }
}
/**
 * Uploads the content of a stream to the given HDFS path.
 *
 * <p>Failures while copying are printed and not propagated (best effort);
 * a failure to create the HDFS file is thrown. The supplied stream is closed
 * on every path, including when creating the HDFS file fails.
 *
 * @param fileInputStream source of the data; closed by this method
 * @param hdfsPath        destination path on the file system
 * @throws IOException if the destination cannot be created
 */
public void upFile(InputStream fileInputStream, String hdfsPath)
    throws IOException
{
  InputStream in = new BufferedInputStream(fileInputStream);
  try
  {
    // Created inside the outer try so that a failure here still closes 'in'.
    OutputStream out = fileSystem.create(new Path(hdfsPath));
    try
    {
      IOUtils.copyBytes(in, out, conf);
    } catch (Exception e)
    {
      e.printStackTrace();
    } finally
    {
      IOUtils.closeStream(out);
    }
  } finally
  {
    IOUtils.closeStream(in);
  }
}
/**
 * Loads {@code <component>-version-info.properties} from the context class
 * loader of the current thread into {@code info}. If the resource is missing
 * or cannot be read, a warning is logged and {@code info} is left as it was
 * when the failure occurred.
 *
 * @param component prefix of the properties file name
 */
protected VersionInfo(String component) {
  info = new Properties();
  final String versionInfoFile = component + "-version-info.properties";
  InputStream resource = null;
  try {
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    resource = loader.getResourceAsStream(versionInfoFile);
    if (resource == null) {
      throw new IOException("Resource not found");
    }
    info.load(resource);
  } catch (IOException ex) {
    LogFactory.getLog(getClass()).warn("Could not read '" +
        versionInfoFile + "', " + ex.toString(), ex);
  } finally {
    IOUtils.closeStream(resource);
  }
}
/**
 * Uncompress tags from the InputStream and writes to the destination array.
 *
 * <p>Each tag starts with a status byte. When it equals
 * {@code Dictionary.NOT_IN_DICTIONARY}, a varint length and the raw tag bytes
 * follow, and the tag is added to the dictionary. Otherwise the status byte
 * and the next byte are combined into a dictionary index and the tag is
 * copied from the dictionary. In both cases the tag is written to
 * {@code dest} preceded by its length via {@code Bytes.putAsShort}.
 *
 * @param src Stream where the compressed tags are available
 * @param dest Destination array where to write the uncompressed tags
 * @param offset Offset in destination where tags to be written
 * @param length Length of all tag bytes
 * @throws IOException if reading fails or a dictionary index has no entry
 */
public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
    throws IOException {
  final int endOffset = offset + length;
  int pos = offset;
  while (pos < endOffset) {
    final byte status = (byte) src.read();
    if (status != Dictionary.NOT_IN_DICTIONARY) {
      // Dictionary hit: status is the first byte of the index.
      short dictIdx = StreamUtils.toShort(status, (byte) src.read());
      byte[] entry = tagDict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      pos = Bytes.putAsShort(dest, pos, entry.length);
      System.arraycopy(entry, 0, dest, pos, entry.length);
      pos += entry.length;
      continue;
    }
    // Raw tag: read it straight into dest, then add it to the dictionary.
    int tagLen = StreamUtils.readRawVarint32(src);
    pos = Bytes.putAsShort(dest, pos, tagLen);
    IOUtils.readFully(src, dest, pos, tagLen);
    tagDict.addEntry(dest, pos, tagLen);
    pos += tagLen;
  }
}
/**
 * Creates a new file filled with {@code length} random bytes and returns the
 * digest of what was written.
 *
 * <p>Data is produced in chunks by refilling the shared {@code buffer} from
 * {@code ran}; every chunk written is also fed to the shared {@code md5}.
 *
 * @param fs          file system to create the file on
 * @param name        path of the file; must not already exist (created with
 *                    overwrite = false)
 * @param length      number of bytes to write
 * @param replication replication factor passed to {@code create}
 * @param blocksize   block size passed to {@code create}
 * @return the result of {@code md5.digest()} after all chunks were written
 * @throws IOException if creating or writing the file fails
 */
private static byte[] createFile(FileSystem fs, Path name, long length,
    short replication, long blocksize) throws IOException {
  final FSDataOutputStream out = fs.create(name, false, 4096,
      replication, blocksize);
  try {
    long remaining = length;
    while (remaining > 0) {
      ran.nextBytes(buffer);
      // The last chunk may be shorter than the buffer.
      final int chunk = (int) Math.min(remaining, buffer.length);
      out.write(buffer, 0, chunk);
      md5.update(buffer, 0, chunk);
      remaining -= chunk;
    }
  } finally {
    IOUtils.closeStream(out);
  }
  return md5.digest();
}
/**
 * Reads the test resource line by line until the reader returns null, then
 * checks that cleanup closed the channel, that the byte counts equal the
 * file length, and that the line count matches the text split on newlines.
 */
@Test
public void testByteBufLineReaderWithoutTerminating() throws IOException {
  String path = JavaResourceUtil.getResourceURL("dataset/testLineText.txt").getFile();
  File file = new File(path);
  String data = FileUtil.readTextFile(file);

  ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
  ByteBufLineReader reader = new ByteBufLineReader(channel);

  long totalRead = 0;
  int lineCount = 0;
  AtomicInteger bytes = new AtomicInteger();
  while (true) {
    ByteBuf line = reader.readLineBuf(bytes);
    // Counted before the end-of-input check, exactly once per call.
    totalRead += bytes.get();
    if (line == null) {
      break;
    }
    lineCount++;
  }

  IOUtils.cleanup(null, reader);
  assertFalse(channel.isOpen());
  assertEquals(file.length(), totalRead);
  assertEquals(file.length(), reader.readBytes());
  assertEquals(data.split("\n").length, lineCount);
}
/**
 * Asserts that two JSON files contain the same sequence of objects.
 *
 * <p>Objects are read pairwise from both files and deep-compared; both files
 * must also run out of objects at the same time.
 *
 * @param lfs             file system used to open {@code result}
 * @param result          path of the file under test
 * @param gold            path of the reference file
 * @param clazz           class the JSON objects are parsed into
 * @param fileDescription label used in the mismatch assertion message
 * @throws IOException if opening or parsing either file fails
 */
private static <T extends DeepCompare> void jsonFileMatchesGold(
    FileSystem lfs, Path result, Path gold, Class<? extends T> clazz,
    String fileDescription) throws IOException {
  JsonObjectMapperParser<T> goldParser =
      new JsonObjectMapperParser<T>(gold, clazz, new Configuration());
  InputStream resultStream = lfs.open(result);
  JsonObjectMapperParser<T> resultParser =
      new JsonObjectMapperParser<T>(resultStream, clazz);
  try {
    while (true) {
      DeepCompare expected = goldParser.getNext();
      DeepCompare actual = resultParser.getNext();
      if (expected == null || actual == null) {
        // Passes only when both are null, i.e. both files ended together.
        assertTrue(expected == actual);
        break;
      }
      try {
        actual.deepCompare(expected, new TreePath(null, "<root>"));
      } catch (DeepInequalityException e) {
        String error = e.path.toString();
        assertFalse(fileDescription + " mismatches: " + error, true);
      }
    }
  } finally {
    IOUtils.cleanup(null, goldParser, resultParser);
  }
}
/**
 * Closes every outstanding history file writer, clears the writer map,
 * cleans up {@code fs} (even when closing a writer fails) and finally
 * delegates to the superclass.
 *
 * @throws Exception if closing a writer or the superclass stop fails
 */
@Override
public void serviceStop() throws Exception {
  try {
    for (HistoryFileWriter writer : outstandingWriters.values()) {
      writer.close();
    }
    outstandingWriters.clear();
  } finally {
    IOUtils.cleanup(LOG, fs);
  }
  super.serviceStop();
}
/**
 * Writes a configuration holding a roughly 500KB value as XML to a file on a
 * single-datanode mini cluster configured with a block size of 4096.
 */
@Test(timeout=60000)
public void testWriteConf() throws Exception {
  Configuration conf = new HdfsConfiguration();
  conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
  System.out.println("Setting conf in: " + System.identityHashCode(conf));
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  FileSystem fs = null;
  OutputStream os = null;
  try {
    fs = cluster.getFileSystem();
    os = fs.create(new Path("/testWriteConf.xml"));

    // 100000 x "hello" = 500KB
    StringBuilder bigValue = new StringBuilder();
    int remaining = 100000;
    while (remaining > 0) {
      bigValue.append("hello");
      remaining--;
    }
    conf.set("foobar", bigValue.toString());
    conf.writeXml(os);

    // Close explicitly so failures surface; null out so that the cleanup
    // below only touches what is still open.
    os.close();
    os = null;
    fs.close();
    fs = null;
  } finally {
    IOUtils.cleanup(null, os, fs);
    cluster.shutdown();
  }
}
/**
 * Run through the creation of a log without any faults injected,
 * and count how many RPCs are made to each node. This sets the
 * bounds for the other test cases, so they can exhaustively explore
 * the space of potential failures.
 *
 * @return the RPC count, which must be the same for every logger
 * @throws Exception if the cluster, the journal manager or the workload fails
 */
private static long determineMaxIpcNumber() throws Exception {
  Configuration conf = new Configuration();
  MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
  QuorumJournalManager qjm = null;
  long maxIpcCount;
  try {
    qjm = createInjectableQJM(cluster);
    qjm.format(FAKE_NSINFO);
    doWorkload(cluster, qjm);

    // Collect the distinct per-logger RPC counts.
    SortedSet<Integer> distinctCounts = Sets.newTreeSet();
    for (AsyncLogger logger : qjm.getLoggerSetForTests().getLoggersForTests()) {
      InvocationCountingChannel channel = (InvocationCountingChannel) logger;
      channel.waitForAllPendingCalls();
      distinctCounts.add(channel.getRpcCount());
    }

    // All of the loggers should have sent the same number of RPCs, since there
    // were no failures.
    assertEquals(1, distinctCounts.size());

    maxIpcCount = distinctCounts.first();
    LOG.info("Max IPC count = " + maxIpcCount);
  } finally {
    IOUtils.closeStream(qjm);
    cluster.shutdown();
  }
  return maxIpcCount;
}
/**
 * Copy specified file into a temporary file. Then rename the
 * temporary file to the original name. This will cause any
 * hardlinks to the original file to be removed. The temporary
 * files are created in the same directory. The temporary files will
 * be recovered (especially on Windows) on datanode restart.
 *
 * <p>If copying, the size check or the replacement fails with an
 * {@code IOException}, the temporary file is deleted (a failed delete is only
 * logged) and the exception is rethrown.
 *
 * @param file the file to unlink
 * @param b    the block the temporary file is created for
 * @throws IOException if any step fails
 */
private void unlinkFile(File file, Block b) throws IOException {
  final File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
  try {
    copyContentsForUnlink(file, tmpFile);
    if (file.length() != tmpFile.length()) {
      throw new IOException("Copy of file " + file + " size " + file.length()+
                            " into file " + tmpFile +
                            " resulted in a size of " + tmpFile.length());
    }
    FileUtil.replaceFile(tmpFile, file);
  } catch (IOException e) {
    if (!tmpFile.delete()) {
      DataNode.LOG.info("detachFile failed to delete temporary file " +
                        tmpFile);
    }
    throw e;
  }
}

/** Copies all bytes of {@code src} into {@code dst}, closing both streams. */
private void copyContentsForUnlink(File src, File dst) throws IOException {
  FileInputStream in = new FileInputStream(src);
  try {
    FileOutputStream out = new FileOutputStream(dst);
    try {
      IOUtils.copyBytes(in, out, 16*1024);
    } finally {
      out.close();
    }
  } finally {
    in.close();
  }
}
/**
 * Loads a spill index file from the raw local file system into memory.
 *
 * <p>The number of whole index records is derived from the file length. That
 * many bytes are read into {@code buf} and exposed through {@code entries}.
 * When {@code crc} is given, the bytes are read through a checksumming stream
 * and compared with the long stored right after the records.
 *
 * @param indexFileName      path of the index file
 * @param job                configuration used to obtain the local file system
 * @param crc                checksum used to verify the index, or null to
 *                           skip verification
 * @param expectedIndexOwner not read by this constructor
 * @throws IOException if the file cannot be read; a
 *                     {@code ChecksumException} is thrown when the computed
 *                     and stored checksums differ
 */
public TezSpillRecord(Path indexFileName, Configuration job, Checksum crc,
    String expectedIndexOwner)
    throws IOException {
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  final FSDataInputStream in = rfs.open(indexFileName);
  try {
    final long length = rfs.getFileStatus(indexFileName).getLen();
    // Note: the cast binds tighter than the division, i.e. ((int) length) / N.
    final int partitions =
        (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    final int size = partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    buf = ByteBuffer.allocate(size);
    if (crc == null) {
      IOUtils.readFully(in, buf.array(), 0, size);
    } else {
      crc.reset();
      CheckedInputStream chk = new CheckedInputStream(in, crc);
      IOUtils.readFully(chk, buf.array(), 0, size);
      // The stored checksum is read from 'in' directly, so it is not folded
      // into the computed value.
      final long computed = chk.getChecksum().getValue();
      final long stored = in.readLong();
      if (computed != stored) {
        throw new ChecksumException("Checksum error reading spill index: " +
            indexFileName, -1);
      }
    }
    entries = buf.asLongBuffer();
  } finally {
    in.close();
  }
}
/**
 * Utility function for sending a response.
 *
 * <p>The status is written as a short and flushed; the stream obtained for
 * the socket is closed afterwards, whether or not the write succeeded.
 *
 * @param s socket to write to
 * @param opStatus status message to write
 * @param timeout send timeout
 * @throws IOException if obtaining the stream, writing or flushing fails
 **/
private void sendResponse(Socket s, short opStatus, long timeout)
    throws IOException {
  final DataOutputStream out =
      new DataOutputStream(NetUtils.getOutputStream(s, timeout));
  try {
    out.writeShort(opStatus);
    out.flush();
  } finally {
    IOUtils.closeStream(out);
  }
}
/**
 * Tears down the shared fixtures after all tests ran: shuts the cluster down
 * when one was created, then cleans up the storage container location client
 * (no logger is passed to the cleanup call).
 */
@AfterClass
public static void shutdown() throws InterruptedException {
  if (null != cluster) {
    cluster.shutdown();
  }
  IOUtils.cleanupWithLogger(null, storageContainerLocationClient);
}
/**
 * Reads the whole content of an HDFS file as text.
 *
 * <p>The bytes are decoded explicitly as UTF-8 rather than with the
 * platform default charset, so the result does not depend on the JVM's
 * locale settings.
 *
 * @param hdfsFilePath path of the file to read
 * @param checkTxt     when true, the path must end with one of
 *                     {@code CommonConstants.TEXT_FILE_SUFFIX}
 * @return the file content
 * @throws RuntimeException wrapping any failure, including the suffix check
 *                          rejecting the path
 */
public static String getFileContent(String hdfsFilePath, boolean checkTxt){
  FSDataInputStream fsDataInputStream = null;
  try {
    if(checkTxt){
      boolean isPass = false;
      for(String suffix : CommonConstants.TEXT_FILE_SUFFIX){
        if(hdfsFilePath.endsWith(suffix)){
          isPass = true;
          // One matching suffix is enough.
          break;
        }
      }
      if(!isPass){
        throw new RuntimeException("file is not txt type:" + hdfsFilePath);
      }
    }
    FileSystem fileSystem = HadoopUtils.getFileSystem();
    fsDataInputStream = fileSystem.open(new Path(hdfsFilePath));
    StringWriter stringWriter = new StringWriter();
    // Explicit encoding: the two-argument copy() uses the platform default.
    org.apache.commons.io.IOUtils.copy(fsDataInputStream, stringWriter, "UTF-8");
    return stringWriter.toString();
  } catch (Throwable e) {
    LOG.error("download file error:{}", hdfsFilePath, e);
    throw new RuntimeException(e);
  } finally {
    org.apache.commons.io.IOUtils.closeQuietly(fsDataInputStream);
  }
}
/**
 * Creates (or overwrites) a file at {@code path} on the cluster's file
 * system and records the path via {@code recordInExpectedValues}.
 *
 * <p>On exit the output stream is closed before the file system it was
 * opened on, i.e. in reverse order of acquisition.
 *
 * @param path path of the file to create
 * @throws Exception if obtaining the file system, creating the file or
 *                   recording the path fails
 */
private static void touchFile(String path) throws Exception {
  FileSystem fileSystem = null;
  DataOutputStream outputStream = null;
  try {
    fileSystem = cluster.getFileSystem();
    outputStream = fileSystem.create(new Path(path), true, 0);
    recordInExpectedValues(path);
  }
  finally {
    // Stream first, then the file system that owns it.
    IOUtils.cleanup(null, outputStream, fileSystem);
  }
}
/**
* Verifies that everything left in the stream consists only of 0x00 or 0xff
* bytes.
* <p>
* The remaining input is read in chunks of up to 4096 bytes. The method
* returns once {@code in.read} reports end of stream (-1) and throws as soon
* as any other byte value is seen. After each chunk, and also when the check
* fails, the stream is repositioned via reset/skip/mark (see the finally
* block).
*
* @throws IOException if a byte other than 0x00 or 0xff is found, or if
* reading, resetting or skipping the stream fails
*/
private void verifyTerminator() throws IOException {
/** The end of the edit log should contain only 0x00 or 0xff bytes.
* If it contains other bytes, the log itself may be corrupt.
* It is important to check this; if we don't, a stray OP_INVALID byte
* could make us stop reading the edit log halfway through, and we'd never
* know that we had lost data.
*/
byte[] buf = new byte[4096];
// NOTE(review): presumably lifts the read limit so the whole tail can be
// scanned - confirm against the limiter's implementation.
limiter.clearLimit();
int numRead = -1, idx = 0;
while (true) {
try {
// Reset the bookkeeping first so that the finally block does nothing if
// in.read itself throws.
numRead = -1;
idx = 0;
numRead = in.read(buf);
if (numRead == -1) {
// End of stream reached without seeing an unexpected byte.
return;
}
// Scan the chunk; idx ends up at numRead when the chunk is clean, or at
// the index of the first offending byte when the exception is thrown.
while (idx < numRead) {
if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) {
throw new IOException("Read extra bytes after " +
"the terminator!");
}
idx++;
}
} finally {
// After reading each group of bytes, we reposition the mark one
// byte before the next group. Similarly, if there is an error, we
// want to reposition the mark one byte before the error
// NOTE(review): the first reset() relies on a mark that is not set in
// this method - confirm that the caller establishes one.
if (numRead != -1) {
in.reset();
IOUtils.skipFully(in, idx);
in.mark(buf.length + 1);
IOUtils.skipFully(in, 1);
}
}
}
}