下面列出了怎么用org.apache.hadoop.fs.FSDataInputStream的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Verifies the block count and the full contents of a test file.
 *
 * <p>The expected contents are either {@code SimulatedFSDataset.DEFAULT_DATABYTE}
 * repeated (when {@code simulatedStorage} is set) or the pseudo-random bytes
 * generated from {@code seed}. The file is read with one positional read and
 * compared against them.
 *
 * @param fileSys file system that holds the file
 * @param name path of the file to verify
 * @throws IOException if the file cannot be located, opened or read
 */
private void checkFile(FileSystem fileSys, Path name) throws IOException {
  BlockLocation[] locations = fileSys.getFileBlockLocations(
      fileSys.getFileStatus(name), 0, fileSize);
  // NOTE(review): expecting fileSize blocks presumably relies on a one-byte
  // block size in this test - confirm against the test setup.
  assertEquals("Number of blocks", fileSize, locations.length);
  // try-with-resources: the stream is released even when the read or the
  // comparison below throws.
  try (FSDataInputStream stm = fileSys.open(name)) {
    byte[] expected = new byte[fileSize];
    if (simulatedStorage) {
      for (int i = 0; i < expected.length; ++i) {
        expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
      }
    } else {
      Random rand = new Random(seed);
      rand.nextBytes(expected);
    }
    // Sanity check: read the whole file from offset 0 and compare.
    byte[] actual = new byte[fileSize];
    stm.readFully(0, actual);
    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
  }
}
/**
 * Creates a line reader over one split of a bzip2 file.
 *
 * <p>The file is opened, positioned at the split start and wrapped in a
 * {@code CBZip2InputStream}. When the split does not begin at offset 0 the
 * first line is skipped and {@code start} is moved to the position after it.
 *
 * @param job configuration used to resolve the file system
 * @param split the split to read
 * @throws IOException if the file cannot be opened, positioned or read
 */
public BZip2LineRecordReader(Configuration job, FileSplit split)
    throws IOException {
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();
  // Open the file and seek to the start of the split.
  FileSystem fs = file.getFileSystem(job);
  FSDataInputStream fileIn = fs.open(file);
  boolean initialized = false;
  try {
    fileIn.seek(start);
    in = new CBZip2InputStream(fileIn, 9, end);
    if (start != 0) {
      // Skip the first line and re-establish "start".
      readLine(this.in, null);
      start = in.getPos();
    }
    pos = in.getPos();
    initialized = true;
  } finally {
    // A half-built reader is unreachable to the caller, so nothing else could
    // ever close the underlying stream if construction fails.
    if (!initialized) {
      fileIn.close();
    }
  }
}
/**
 * Checks the single output file produced by a job.
 *
 * <p>The output directory must contain exactly one non-empty file with four
 * lines. Every line must hold {@code srcs + 1} tab-separated integers where
 * the first is a multiple of {@code srcs * srcs} and each following value
 * {@code v[i]} satisfies {@code (v[i] - (i - 1)) * srcs == 10 * v[0]}.
 *
 * @param out job output directory
 * @param job job configuration, used to resolve the file system and reader
 * @param srcs number of sources that fed the job
 * @throws IOException if the output cannot be listed or read
 */
private static void confirmOutput(Path out, JobConf job, int srcs)
    throws IOException {
  FileSystem fs = out.getFileSystem(job);
  FileStatus[] outlist = fs.listStatus(out);
  assertEquals(1, outlist.length);
  assertTrue(0 < outlist[0].getLen());
  int count = 0;
  // try-with-resources: the stream is closed even when an assertion or a
  // parse error aborts the loop.
  try (FSDataInputStream in = fs.open(outlist[0].getPath())) {
    LineRecordReader rr = new LineRecordReader(in, 0, Integer.MAX_VALUE, job);
    LongWritable k = new LongWritable();
    Text v = new Text();
    while (rr.next(k, v)) {
      String[] vals = v.toString().split("\t");
      assertEquals(srcs + 1, vals.length);
      int[] ivals = new int[vals.length];
      for (int i = 0; i < vals.length; ++i) {
        ivals[i] = Integer.parseInt(vals[i]);
      }
      assertEquals(0, ivals[0] % (srcs * srcs));
      for (int i = 1; i < vals.length; ++i) {
        assertEquals((ivals[i] - (i - 1)) * srcs, 10 * ivals[0]);
      }
      ++count;
    }
  }
  assertEquals(4, count);
}
/**
 * Opens {@code f}, returning an {@code ExtFSDataInputStream} when the wrapped
 * file system is a {@code DistributedFileSystem} and the file is complete and
 * non-empty; in every other case the call is delegated to the wrapped file
 * system.
 *
 * @param f file to open
 * @param bufferSize buffer size passed on to the stream
 * @return the opened stream
 * @throws IOException if block lookup or the open itself fails
 */
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  // RAID logic is only wanted on top of a DFS instance.
  if (!(fs instanceof DistributedFileSystem)) {
    return fs.open(f, bufferSize);
  }
  final LocatedBlocks blocks =
      ((DistributedFileSystem) fs).getLocatedBlocks(f, 0L, Long.MAX_VALUE);
  // No block information, or the file is still under construction:
  // use the underlying file system.
  if (blocks == null || blocks.isUnderConstruction()) {
    return fs.open(f, bufferSize);
  }
  // A zero-length file is also served by the underlying file system.
  final long length = getFileSize(blocks);
  if (length <= 0) {
    return fs.open(f, bufferSize);
  }
  return new ExtFSDataInputStream(conf, this, f,
      length, getBlockSize(blocks), bufferSize);
}
/**
 * Checks the single output file produced by a job.
 *
 * <p>The output directory must contain exactly one non-empty file with four
 * lines. Every line must hold {@code srcs + 1} tab-separated integers where
 * the first is a multiple of {@code srcs * srcs} and each following value
 * {@code v[i]} satisfies {@code (v[i] - (i - 1)) * srcs == 10 * v[0]}.
 *
 * @param out job output directory
 * @param job job configuration, used to resolve the file system and reader
 * @param srcs number of sources that fed the job
 * @throws IOException if the output cannot be listed or read
 */
private static void confirmOutput(Path out, JobConf job, int srcs)
    throws IOException {
  FileSystem fs = out.getFileSystem(job);
  FileStatus[] outlist = fs.listStatus(out);
  assertEquals(1, outlist.length);
  assertTrue(0 < outlist[0].getLen());
  int count = 0;
  // try-with-resources: the stream is closed even when an assertion or a
  // parse error aborts the loop.
  try (FSDataInputStream in = fs.open(outlist[0].getPath())) {
    LineRecordReader rr = new LineRecordReader(in, 0, Integer.MAX_VALUE, job);
    LongWritable k = new LongWritable();
    Text v = new Text();
    while (rr.next(k, v)) {
      String[] vals = v.toString().split("\t");
      assertEquals(srcs + 1, vals.length);
      int[] ivals = new int[vals.length];
      for (int i = 0; i < vals.length; ++i) {
        ivals[i] = Integer.parseInt(vals[i]);
      }
      assertEquals(0, ivals[0] % (srcs * srcs));
      for (int i = 1; i < vals.length; ++i) {
        assertEquals((ivals[i] - (i - 1)) * srcs, 10 * ivals[0]);
      }
      ++count;
    }
  }
  assertEquals(4, count);
}
/**
 * Parses a schema from the given location.
 *
 * <p>Locations whose lower-cased form starts with {@code hdfs://} are opened
 * through the Hadoop file system; everything else is opened as a
 * {@link URL}.
 *
 * @param schemaUrl location of the schema definition
 * @return the parsed schema
 * @throws IOException if the location cannot be opened or read
 */
private Schema loadFromUrl(String schemaUrl) throws IOException {
  final Configuration hadoopConf = new Configuration();
  final Schema.Parser schemaParser = new Schema.Parser();
  final boolean onHdfs = schemaUrl.toLowerCase().startsWith("hdfs://");

  if (!onHdfs) {
    InputStream urlStream = null;
    try {
      urlStream = new URL(schemaUrl).openStream();
      return schemaParser.parse(urlStream);
    } finally {
      if (urlStream != null) {
        urlStream.close();
      }
    }
  }

  // NOTE(review): FileSystem.get(conf) presumably yields the configured default
  // file system rather than the one named in schemaUrl - confirm this is intended.
  final FileSystem hdfs = FileSystem.get(hadoopConf);
  FSDataInputStream hdfsStream = null;
  try {
    hdfsStream = hdfs.open(new Path(schemaUrl));
    return schemaParser.parse(hdfsStream);
  } finally {
    if (hdfsStream != null) {
      hdfsStream.close();
    }
  }
}
/**
 * Verifies that a file has the given length and that byte {@code k} of the
 * file equals {@code (byte) k}, followed by end-of-file.
 *
 * <p>For a {@code DFSInputStream} the length is taken from the stream itself,
 * otherwise from the file status.
 *
 * @param fs file system that holds the file
 * @param p file to verify
 * @param length expected file length in bytes
 * @throws IOException wrapping any I/O failure, with the path, the expected
 *     length and the progress marker {@code i} in the message
 */
public static void check(FileSystem fs, Path p, long length) throws IOException {
  // Progress marker for the error message: -1 before any byte is read, the
  // byte index while reading, -length once all bytes have been compared.
  int i = -1;
  try {
    final FileStatus status = fs.getFileStatus(p);
    // try-with-resources: the stream is closed even when an assertion fails.
    try (FSDataInputStream in = fs.open(p)) {
      if (in.getWrappedStream() instanceof DFSInputStream) {
        long len = ((DFSInputStream) in.getWrappedStream()).getFileLength();
        assertEquals(length, len);
      } else {
        assertEquals(length, status.getLen());
      }
      for (i++; i < length; i++) {
        assertEquals((byte) i, (byte) in.read());
      }
      i = -(int) length;
      assertEquals(-1, in.read()); // EOF
    }
  } catch (IOException ioe) {
    throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
  }
}
/**
 * Tests isUnderConstruction() functionality: a stream opened while the file
 * is still being written must report it as under construction, and a stream
 * opened after the writer is closed must not.
 */
public void testIsUnderConstruction() throws Exception {
  // Open output file stream.
  FSDataOutputStream out = hdfs.create(TEST_FILE, true);
  try {
    out.writeBytes("test");
    // Test file under construction.
    try (FSDataInputStream in1 = hftpFs.open(TEST_FILE)) {
      assertTrue(in1.isUnderConstruction());
    }
  } finally {
    // Close output file stream; in finally so a failed assertion above does
    // not leave the writer open.
    out.close();
  }
  // Test file not under construction.
  try (FSDataInputStream in2 = hftpFs.open(TEST_FILE)) {
    assertFalse(in2.isUnderConstruction());
  }
}
/**
 * Seeks to {@code offset} and verifies the rest of the file up to
 * {@code length}, reading in chunks of at most {@code buf.length} bytes.
 *
 * @param fs file system that holds the file
 * @param p file to read
 * @param offset position to seek to before reading
 * @param length position at which verification stops
 * @param buf scratch buffer that receives each chunk
 * @param expected reference data handed to {@code checkData}
 * @throws IOException if the file cannot be opened, positioned or read
 */
static void verifySeek(FileSystem fs, Path p, long offset, long length,
    byte[] buf, byte[] expected) throws IOException {
  long remaining = length - offset;
  long checked = 0;
  LOG.info("XXX SEEK: offset=" + offset + ", remaining=" + remaining);
  final Ticker t = new Ticker("SEEK", "offset=%d, remaining=%d",
      offset, remaining);
  // try-with-resources: the stream is closed even when a read or a data
  // check fails part-way through.
  try (FSDataInputStream in = fs.open(p, 64 << 10)) {
    in.seek(offset);
    while (remaining > 0) {
      t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
      final int n = (int) Math.min(remaining, buf.length);
      in.readFully(buf, 0, n);
      checkData(offset, remaining, n, buf, expected);
      offset += n;
      remaining -= n;
      checked += n;
    }
  }
  t.end(checked);
}
/**
 * Reads every key/value pair of an on-disk map output file and appends their
 * string forms to the given lists.
 *
 * @param conf configuration used for stream wrapping and the reader
 * @param fs file system that holds the file
 * @param path map output file to read
 * @param keys receives the string form of every key, in file order
 * @param values receives the string form of every value, in file order
 * @throws IOException if the file cannot be opened or read
 */
private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  // try-with-resources: the original never closed this stream at all.
  try (FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path))) {
    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
        fs.getFileStatus(path).getLen(), null, null);
    DataInputBuffer keyBuff = new DataInputBuffer();
    DataInputBuffer valueBuff = new DataInputBuffer();
    Text key = new Text();
    Text value = new Text();
    while (reader.nextRawKey(keyBuff)) {
      key.readFields(keyBuff);
      keys.add(key.toString());
      reader.nextRawValue(valueBuff);
      value.readFields(valueBuff);
      values.add(value.toString());
    }
  }
}
/**
 * Reads the split meta-info stream and builds one {@code TaskSplitMetaInfo}
 * per recorded split.
 *
 * <p>Each entry points at the job split file inside the directory configured
 * under {@code MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR} (default
 * {@code "."}), at the start offset recorded for that split.
 *
 * @param conf configuration used to locate the stream and the local directory
 * @param fs file system passed on to {@code getFSDataIS}
 * @return the meta information of all splits, in stream order
 * @throws IOException if the stream cannot be opened or read
 */
public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
    FileSystem fs) throws IOException {
  FSDataInputStream metaIn = null;
  try {
    metaIn = getFSDataIS(conf, fs);
    final String splitFileName = MRJobConfig.JOB_SPLIT;
    final String localDir = conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");
    final int splitCount = WritableUtils.readVInt(metaIn); // TODO: check for insane values
    final JobSplit.TaskSplitMetaInfo[] result = new JobSplit.TaskSplitMetaInfo[splitCount];
    int slot = 0;
    while (slot < splitCount) {
      final JobSplit.SplitMetaInfo meta = new JobSplit.SplitMetaInfo();
      meta.readFields(metaIn);
      final String splitLocation = new Path(localDir, splitFileName).toUri().toString();
      final JobSplit.TaskSplitIndex index =
          new JobSplit.TaskSplitIndex(splitLocation, meta.getStartOffset());
      result[slot] = new JobSplit.TaskSplitMetaInfo(index,
          meta.getLocations(), meta.getInputDataLength());
      slot++;
    }
    return result;
  } finally {
    if (metaIn != null) {
      metaIn.close();
    }
  }
}
/**
 * Positional read of at least <code>necessaryLen</code> bytes, plus up to
 * <code>extraLen</code> further bytes when the stream happens to deliver them.
 * Analogous to {@link IOUtils#readFully(InputStream, byte[], int, int)}, but
 * uses positional reads and treats the "extra" bytes as desirable rather than
 * required.
 * @param buff ByteBuff that receives the bytes read
 * @param dis the input stream to read from
 * @param position the position within the stream from which to start reading
 * @param necessaryLen the number of bytes that must be read
 * @param extraLen the number of additional bytes that would be nice to read
 * @return true if and only if extraLen is > 0 and all extra bytes were read
 * @throws IOException if the stream ends before the necessary bytes were read
 */
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
    int necessaryLen, int extraLen) throws IOException {
  final int wanted = necessaryLen + extraLen;
  final byte[] scratch = new byte[wanted];
  int filled = 0;
  // Keep asking for everything still missing (necessary + extra) until at
  // least the necessary part has arrived.
  while (filled < necessaryLen) {
    final int ret = dis.read(position + filled, scratch, filled, wanted - filled);
    if (ret < 0) {
      throw new IOException("Premature EOF from inputStream (positional read returned " + ret
          + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
          + " extra bytes, successfully read " + filled);
    }
    filled += ret;
  }
  // The bytes are staged in an on-heap byte[] and copied into the ByteBuff.
  // TODO: once HDFS-3246 is resolved, read into the ByteBuffer[] directly; the
  // copy is kept here so that only this method needs refactoring then.
  copyToByteBuff(scratch, 0, filled, buff);
  return (extraLen > 0) && (filled == wanted);
}
/**
 * Test override that injects failures based on the file name.
 *
 * <p>A name containing {@code "fail"} raises an {@link IOException}; a name
 * containing {@code "serDeFail"} deserializes the raw file bytes, which is
 * expected to throw; any other name yields a spec from {@code initFlowSpec}
 * on a fresh temporary directory.
 *
 * @param path spec file being loaded
 * @return a flow spec for paths that do not trigger a simulated failure
 * @throws IOException for paths whose name contains {@code "fail"}, or if the
 *     file cannot be read
 */
@Override
protected Spec readSpecFromFile(Path path) throws IOException {
  final String fileName = path.getName();
  if (fileName.contains("fail")) {
    throw new IOException("Mean to fail in the test");
  }
  if (fileName.contains("serDeFail")) {
    // Simulate a serDe exception. try-with-resources closes the stream on the
    // (intended) exceptional exit, which the original leaked.
    try (FSDataInputStream fis = fs.open(path)) {
      SerializationUtils.deserialize(ByteStreams.toByteArray(fis));
    }
    // This line should never be reached since we generate SerDe Exception on purpose.
    Assert.assertTrue(false);
    return null;
  }
  return initFlowSpec(Files.createTempDir().getAbsolutePath());
}
public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin,
BlockRegion region, Configuration conf) throws IOException {
this.compressAlgo = compressionAlgo;
this.region = region;
this.decompressor = compressionAlgo.getDecompressor();
try {
this.in =
compressAlgo
.createDecompressionStream(new BoundedRangeFileInputStream(
fsin, this.region.getOffset(), this.region
.getCompressedSize()), decompressor, TFile
.getFSInputBufferSize(conf));
} catch (IOException e) {
compressAlgo.returnDecompressor(decompressor);
throw e;
}
}
/**
 * Reading fully into a 20-byte direct buffer must fail with
 * {@link EOFException}, leaving the buffer at position 10 with limit 20.
 */
@Test
public void testDirectReadFullyLargeBuffer() throws Exception {
  final ByteBuffer target = ByteBuffer.allocateDirect(20);

  final FSDataInputStream mockStream = new FSDataInputStream(new MockHadoopInputStream());
  final MockBufferReader bufferReader = new MockBufferReader(mockStream);

  TestUtils.assertThrows("Should throw EOFException", EOFException.class, () -> {
    H2SeekableInputStream.readFully(bufferReader, target);
    return null;
  });

  // NOTE: This differs from readFullyHeapBuffer. The direct path issues several
  // read operations that consume input up to its end, so the bytes already in
  // the buffer are valid and the position reflects them. The heap path cannot
  // behave this way without using read instead of readFully on the underlying
  // FSDataInputStream.
  Assert.assertEquals(10, target.position());
  Assert.assertEquals(20, target.limit());
}
/**
 * Answers a range request. Exactly one range produces a partial-content
 * response carrying that range; a missing range list, an empty one, or more
 * than one range (unsupported) produces "requested range not satisfiable".
 *
 * @param in stream to read from
 * @param out stream to write to
 * @param response http response to use
 * @param contentLength for the response header
 * @param ranges to write to respond with
 * @throws IOException on error sending the response
 */
static void sendPartialData(FSDataInputStream in,
    OutputStream out,
    HttpServletResponse response,
    long contentLength,
    List<InclusiveByteRange> ranges)
    throws IOException {
  final boolean exactlyOneRange = ranges != null && ranges.size() == 1;
  if (exactlyOneRange) {
    final InclusiveByteRange range = ranges.get(0);
    final long bytesToSend = range.getSize(contentLength);
    response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
    response.setHeader("Content-Range", range.toHeaderRangeString(contentLength));
    copyFromOffset(in, out, range.getFirst(contentLength), bytesToSend);
    return;
  }
  // Nothing satisfiable: reply 416 with an empty body.
  response.setContentLength(0);
  response.setStatus(HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE);
  response.setHeader("Content-Range",
      InclusiveByteRange.to416HeaderRangeString(contentLength));
}
/**
 * Prints the block locations of a file and verifies its full contents against
 * the bytes produced by {@code AppendTestUtil.randomBytes(seed, fileSize)}.
 *
 * @param fs file system that holds the file
 * @param name file to verify
 * @throws IOException if the file cannot be located, opened or read
 */
static void checkFullFile(FileSystem fs, Path name) throws IOException {
  FileStatus stat = fs.getFileStatus(name);
  BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
      fileSize);
  for (BlockLocation location : locations) {
    for (String host : location.getNames()) {
      System.out.print( host + " ");
    }
    System.out.println(" off " + location.getOffset() +
        " len " + location.getLength());
  }

  byte[] expected = AppendTestUtil.randomBytes(seed, fileSize);
  // try-with-resources: the stream is closed even when the read or the
  // comparison throws.
  try (FSDataInputStream stm = fs.open(name)) {
    byte[] actual = new byte[fileSize];
    stm.readFully(0, actual);
    checkData(actual, 0, expected, "Read 2");
  }
}
/**
 * Benchmarks random seeks: performs {@code options.seekCount} lower-bound
 * lookups with sampled keys, then prints the elapsed time, the average seek
 * time, hit and miss counts and the average number of KB read per hit.
 *
 * @throws IOException if the file cannot be opened or read
 */
public void seekTFile() throws IOException {
  int miss = 0;
  long totalBytes = 0;
  // The original never closed the stream, the reader or the scanner;
  // try-with-resources releases all three, also on failure.
  try (FSDataInputStream fsdis = fs.open(path);
      Reader reader = new Reader(fsdis, fs.getFileStatus(path).getLen(), conf)) {
    KeySampler kSampler =
        new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(),
            keyLenGen);
    try (Scanner scanner = reader.createScanner()) {
      BytesWritable key = new BytesWritable();
      BytesWritable val = new BytesWritable();
      timer.reset();
      timer.start();
      for (int i = 0; i < options.seekCount; ++i) {
        kSampler.next(key);
        scanner.lowerBound(key.get(), 0, key.getSize());
        if (!scanner.atEnd()) {
          scanner.entry().get(key, val);
          totalBytes += key.getSize();
          totalBytes += val.getSize();
        } else {
          // Sampled key lies beyond the last entry.
          ++miss;
        }
      }
      timer.stop();
    }
  }
  System.out.printf(
      "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
      timer.toString(), NanoTimer.nanoTimeToString(timer.read()
          / options.seekCount), options.seekCount - miss, miss,
      (double) totalBytes / 1024 / (options.seekCount - miss));
}
/**
 * Reads up to {@code length} bytes from a file and reports how many were read.
 *
 * <p>Reading stops at end-of-file or once {@code length} bytes have been
 * consumed, whichever comes first. The original only returned on end-of-file
 * and threw {@code RuntimeException("unreachable")} when exactly
 * {@code length} bytes were available.
 *
 * @param fs file system that holds the file
 * @param p file to read
 * @param length maximum number of bytes to read
 * @param dropBehind drop-behind setting to apply to the stream, or
 *     {@code null} to leave the stream's setting untouched
 * @return the number of bytes actually read
 * @throws RuntimeException wrapping the {@link IOException} if opening,
 *     reading or closing fails (the failure is also logged)
 */
static long readHdfsFile(FileSystem fs, Path p, long length,
    Boolean dropBehind) throws Exception {
  long totalRead = 0;
  try (FSDataInputStream fis = fs.open(p)) {
    if (dropBehind != null) {
      fis.setDropBehind(dropBehind);
    }
    byte[] buf = new byte[8196];
    while (length > 0) {
      int amt = (int) Math.min(length, buf.length);
      int ret = fis.read(buf, 0, amt);
      if (ret == -1) {
        break;
      }
      totalRead += ret;
      length -= ret;
    }
    return totalRead;
  } catch (IOException e) {
    LOG.error("ioexception", e);
    // Same exception type callers saw before, but now carrying the cause
    // instead of a misleading "unreachable" message.
    throw new RuntimeException("ioexception while reading " + p, e);
  }
}
/**
 * Serves the read requests from position {@code index} to the end of
 * {@code readRequests} through a single {@code DirectReadRequestChain} backed
 * by a freshly opened stream on {@code filePath}.
 *
 * <p>The chain's result is stored in {@code directRead}; the stream is closed
 * when the try block exits.
 *
 * @param index position in {@code readRequests} of the first request to serve
 * @return {@code totalRead + directRead}
 * @throws Exception if opening the file or running the chain fails
 */
private long directReadRequest(int index)
throws Exception
{
try (FSDataInputStream inputStream = remoteFileSystem.open(new Path(filePath))) {
// The chain is published in a field before it runs.
// NOTE(review): presumably so other code can reach the in-flight chain - confirm.
directReadChain = new DirectReadRequestChain(inputStream);
for (ReadRequest readRequest : readRequests.subList(index, readRequests.size())) {
directReadChain.addReadRequest(readRequest);
}
// NOTE(review): lock() presumably freezes the chain before execution - confirm.
directReadChain.lock();
directRead = directReadChain.call();
// Cleared only on success; if call() throws, the field keeps the failed chain.
directReadChain = null;
}
return (totalRead + directRead);
}
/**
 * Writes a file with three replicas on a four-node cluster, shuts down one
 * DataNode that holds a replica, writes to the file again and checks that the
 * content is the plain text twice.
 */
@Test
public void testEncryptedAppendRequiringBlockTransfer() throws IOException {
  MiniDFSCluster cluster = null;
  try {
    final Configuration conf = new Configuration();
    setEncryptionConfigKeys(conf);

    // Bring up 4 DNs.
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
    final FileSystem fs = getFileSystem(conf);

    // The file has replication 3, so its block lives on 3 of the 4 DNs.
    writeTestDataToFile(fs);
    assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));

    // Find the block's replicas and stop one of the DNs that holds it.
    final FSDataInputStream blockProbe = fs.open(TEST_PATH);
    final List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(blockProbe);
    blockProbe.close();
    assertEquals(1, blocks.size());
    assertEquals(3, blocks.get(0).getLocations().length);
    final DataNode victim =
        cluster.getDataNode(blocks.get(0).getLocations()[0].getIpcPort());
    victim.shutdown();

    // Reopening for append has to add another DN to the pipeline, which
    // triggers a block transfer.
    writeTestDataToFile(fs);
    assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH));
    fs.close();
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Wraps a Hadoop input stream, using the stats-collecting wrapper when
 * {@code operatorStats} is available and the plain wrapper otherwise.
 *
 * @param f path the stream was opened for; its string form is passed to the
 *     stats wrapper
 * @param is stream to wrap
 * @return the wrapping stream
 * @throws IOException the result of {@code propagateFSError} when wrapping
 *     raises an {@code FSError}
 */
FSInputStream newFSDataInputStreamWrapper(Path f, final FSDataInputStream is) throws IOException {
  try {
    if (operatorStats == null) {
      return com.dremio.exec.store.hive.exec.dfs.FSDataInputStreamWrapper.of(is);
    }
    return com.dremio.exec.store.hive.exec.dfs.FSDataInputStreamWithStatsWrapper.of(
        is, operatorStats, true, f.toString());
  } catch (FSError e) {
    throw propagateFSError(e);
  }
}
/**
 * Test opening many files via TCP (not short-circuit).
 *
 * This is practical when using unbuffer, because it reduces the number of
 * sockets and amount of memory that we use.
 */
@Test
public void testOpenManyFilesViaTcp() throws Exception {
  final int openCount = 500;
  final Configuration conf = new Configuration();
  conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
  MiniDFSCluster cluster = null;
  final FSDataInputStream[] streams = new FSDataInputStream[openCount];
  try {
    cluster = new MiniDFSCluster.Builder(conf).build();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final Path testPath = new Path("/testFile");
    DFSTestUtil.createFile(dfs, testPath, 131072, (short)1, 1);

    int i = 0;
    while (i < openCount) {
      // Store the stream first so the cleanup below also covers a stream
      // whose read or unbuffer fails.
      streams[i] = dfs.open(testPath);
      LOG.info("opening file " + i + "...");
      Assert.assertTrue(-1 != streams[i].read());
      streams[i].unbuffer();
      i++;
    }
  } finally {
    for (FSDataInputStream opened : streams) {
      IOUtils.cleanup(null, opened);
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Processes one archive entry and emits its index line keyed by the har hash
 * of the entry's path.
 *
 * <p>For a directory the line lists the encoded names of its children. For a
 * file the contents are copied into the current part stream and the line
 * records the part name, the start position within it and the file length.
 *
 * @param key unused input key
 * @param value the archive entry to process
 * @param out collector that receives the (hash, index line) pair
 * @param reporter used for progress and status reporting
 * @throws IOException if the source cannot be inspected or copied
 */
public void map(LongWritable key, HarEntry value,
    OutputCollector<IntWritable, Text> out,
    Reporter reporter) throws IOException {
  final Path relPath = new Path(value.path);
  final int hash = HarFileSystem.getHarHash(relPath);
  final Path srcPath = realPath(relPath, rootPath);
  // Position in the part file before anything is copied for this entry.
  final long startPos = partStream.getPos();

  final FileSystem srcFs = srcPath.getFileSystem(conf);
  final FileStatus srcStatus = srcFs.getFileStatus(srcPath);
  final String propStr = encodeProperties(srcStatus);

  final String towrite;
  if (!value.isDir()) {
    final FSDataInputStream input = srcFs.open(srcStatus.getPath());
    reporter.setStatus("Copying file " + srcStatus.getPath() +
        " to archive.");
    copyData(srcStatus.getPath(), input, partStream, reporter);
    towrite = encodeName(relPath.toString())
        + " file " + partname + " " + startPos
        + " " + srcStatus.getLen() + " " + propStr + " ";
  } else {
    final StringBuilder line = new StringBuilder();
    line.append(encodeName(relPath.toString()))
        .append(" dir ").append(propStr).append(" 0 0 ");
    for (String child : value.children) {
      line.append(encodeName(child)).append(" ");
    }
    towrite = line.toString();
    // Reading directories is also progress.
    reporter.progress();
  }
  out.collect(new IntWritable(hash), new Text(towrite));
}
/**
 * Opens the parity file and positions the stream at the requested location.
 *
 * <p>The parity block index is {@code codec.parityLength * stripeStartIdx +
 * locationIndex}; the stream is positioned {@code offsetInBlock} bytes into
 * that block.
 *
 * @param locationIndex index of the parity location within the stripe
 * @param parityFile parity file to open
 * @param parityFs file system that holds the parity file
 * @param parityStat status of the parity file, used for block size and length
 * @param offsetInBlock offset within the parity block
 * @return a stream positioned at the computed offset; the caller must close it
 * @throws IOException if the file cannot be opened or positioned
 */
protected InputStream getParityFileInput(int locationIndex, Path parityFile,
    FileSystem parityFs, FileStatus parityStat, long offsetInBlock)
    throws IOException {
  // Dealing with a parity file here.
  int parityBlockIdx = (int)(codec.parityLength * stripeStartIdx + locationIndex);
  long offset = parityStat.getBlockSize() * parityBlockIdx + offsetInBlock;
  assert(offset < parityStat.getLen());
  LOG.info("Opening " + parityFile + ":" + offset +
      " for location " + locationIndex);
  FSDataInputStream s = parityFs.open(
      parityFile, conf.getInt("io.file.buffer.size", 64 * 1024));
  try {
    s.seek(offset);
  } catch (IOException e) {
    // The caller never receives the stream when seek fails, so close it here.
    try {
      s.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
  return s;
}
/**
 * Reads all lines of a file, or of every file in a directory whose name does
 * not start with {@code "_"}, decoding them as UTF-8.
 *
 * @param fs file system to read from
 * @param fileName file or directory to read; on Windows, backslashes are
 *     converted to forward slashes first
 * @return the lines read, in the order the files were listed
 * @throws IOException if the path does not exist or cannot be read
 */
static public String[] readOutput(FileSystem fs, String fileName) throws IOException {
  if(Util.WINDOWS){
    fileName = fileName.replace('\\','/');
  }
  Path path = new Path(fileName);
  if(!fs.exists(path)) {
    throw new IOException("Path " + fileName + " does not exist on the FileSystem");
  }
  FileStatus fileStatus = fs.getFileStatus(path);
  FileStatus[] files;
  if (fileStatus.isDir()) {
    files = fs.listStatus(path, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        return !p.getName().startsWith("_");
      }
    });
  } else {
    files = new FileStatus[] { fileStatus };
  }
  List<String> result = new ArrayList<String>();
  for (FileStatus f : files) {
    // try-with-resources: stream and reader are closed even when a read fails.
    try (FSDataInputStream stream = fs.open(f.getPath());
        BufferedReader br = new BufferedReader(new InputStreamReader(stream, "UTF-8"))) {
      String line;
      while ((line = br.readLine()) != null) {
        result.add(line);
      }
    }
  }
  return result.toArray(new String[result.size()]);
}
/**
 * Copies a text file from the mini cluster to the local file system, line by
 * line.
 *
 * <p>Missing parent directories of the local file are created. As before, the
 * local file is created before the cluster file is validated. Both sides use
 * the platform default charset.
 *
 * @param cluster cluster whose properties select the file system
 * @param fileNameOnCluster file to copy; must exist and must not be a directory
 * @param localFileName destination on the local file system
 * @throws IOException if the cluster file is missing or is a directory, or if
 *     reading or creating a file fails
 */
static public void copyFromClusterToLocal(MiniGenericCluster cluster,
    String fileNameOnCluster, String localFileName) throws IOException {
  if(Util.WINDOWS){
    fileNameOnCluster = fileNameOnCluster.replace('\\','/');
    localFileName = localFileName.replace('\\','/');
  }
  File parent = new File(localFileName).getParentFile();
  // getParentFile() is null for a bare file name; nothing to create then.
  if (parent != null && !parent.exists()) {
    parent.mkdirs();
  }
  // try-with-resources: the writer is closed on every exit, including the
  // validation failures below that previously leaked it.
  try (PrintWriter writer = new PrintWriter(new FileWriter(localFileName))) {
    FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
        cluster.getProperties()));
    Path clusterPath = new Path(fileNameOnCluster);
    if(!fs.exists(clusterPath)) {
      throw new IOException("File " + fileNameOnCluster + " does not exists on the minicluster");
    }
    FileStatus fst = fs.getFileStatus(clusterPath);
    if(fst.isDir()) {
      throw new IOException("Only files from cluster can be copied locally," +
          " " + fileNameOnCluster + " is a directory");
    }
    try (FSDataInputStream stream = fs.open(clusterPath);
        BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
      String line;
      while( (line = reader.readLine()) != null) {
        writer.println(line);
      }
    }
  }
}
/**
 * Opening a path that does not exist must raise
 * {@link FileNotFoundException}.
 */
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testOpenNonExistingFile() throws IOException {
  final Path missing = new Path("/test/testOpenNonExistingFile");
  try {
    // Open it as a file; the open itself is expected to throw.
    fs.open(missing).close();
    fail("didn't expect to get here");
  } catch (FileNotFoundException expected) {
    LOG.debug("Expected: " + expected, expected);
  }
}
/**
 * Writes a 512-byte file to a one-node cluster built from
 * {@code getConfiguration(socketDir)} and checks that reading it back yields
 * the same bytes. Skipped when {@code DomainSocket} failed to load.
 */
@Test
public void testBothOldAndNewShortCircuitConfigured() throws Exception {
  final short REPL_FACTOR = 1;
  final int FILE_LENGTH = 512;
  Assume.assumeTrue(null == DomainSocket.getLoadingFailureReason());
  TemporarySocketDirectory socketDir = new TemporarySocketDirectory();
  HdfsConfiguration conf = getConfiguration(socketDir);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    cluster.waitActive();
    socketDir.close();
    FileSystem fs = cluster.getFileSystem();

    Path path = new Path("/foo");
    byte[] orig = new byte[FILE_LENGTH];
    for (int i = 0; i < orig.length; i++) {
      orig[i] = (byte)(i%10);
    }
    try (FSDataOutputStream fos = fs.create(path, REPL_FACTOR)) {
      fos.write(orig);
    }
    DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);

    byte[] buf = new byte[FILE_LENGTH];
    try (FSDataInputStream fis = cluster.getFileSystem().open(path)) {
      IOUtils.readFully(fis, buf, 0, FILE_LENGTH);
    }
    // The original followed this with a bare Arrays.equals(orig, buf) whose
    // result was discarded; the assertion already covers the comparison.
    Assert.assertArrayEquals(orig, buf);
  } finally {
    // Shut the cluster down even when an assertion or an I/O call fails.
    cluster.shutdown();
  }
}
/**
 * Reads one string in {@code readUTF} format from the start of an HDFS file.
 *
 * <p>The file-system handle obtained from {@code dfsCluster} is closed before
 * returning, as in the original, and now also when the read fails.
 *
 * @param filename path of the file to read
 * @return the decoded string
 * @throws Exception if the file cannot be opened or decoded
 */
private String readFileFromHdfs(String filename) throws Exception {
  // Resources close in reverse order: the stream first, then the handle.
  try (FileSystem hdfsFsHandle = dfsCluster.getHdfsFileSystemHandle();
      FSDataInputStream reader = hdfsFsHandle.open(new Path(filename))) {
    return reader.readUTF();
  }
}