The following are code examples for org.apache.hadoop.fs.FSDataInputStream#close(), collected from open source projects; you can also follow the links to view the full source code on GitHub.
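As a quick orientation before the project examples, here is a minimal sketch of the usual open/read/close pattern. It assumes the org.apache.hadoop.fs imports and an already-initialized FileSystem fs with an existing Path path; the method name and buffer size are placeholders, not taken from any of the examples below.
private void readAndClose(FileSystem fs, Path path) throws IOException {
  // Open the file, read up to 4096 bytes, and let try-with-resources close the stream.
  try (FSDataInputStream in = fs.open(path)) {
    byte[] buf = new byte[4096];
    int bytesRead = in.read(buf); // reads up to buf.length bytes, returns -1 at end of file
    // process the first 'bytesRead' bytes of buf here
  } // close() is invoked automatically, even if an exception is thrown
}
The examples below call close() explicitly instead, typically in a finally block or at the end of the test.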
public void testVLongByte() throws IOException {
FSDataOutputStream out = fs.create(path);
for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; ++i) {
Utils.writeVLong(out, i);
}
out.close();
Assert.assertEquals("Incorrect encoded size", (1 << Byte.SIZE) + 96, fs
.getFileStatus(
path).getLen());
FSDataInputStream in = fs.open(path);
for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; ++i) {
long n = Utils.readVLong(in);
Assert.assertEquals(n, i);
}
in.close();
fs.delete(path, false);
}
/** @throws Exception If failed. */
@Test
public void testOpenIfPathIsAlreadyOpened() throws Exception {
Path fsHome = new Path(primaryFsUri);
Path file = new Path(fsHome, "someFile");
FSDataOutputStream os = fs.create(file, EnumSet.noneOf(CreateFlag.class),
Options.CreateOpts.perms(FsPermission.getDefault()));
os.close();
FSDataInputStream is1 = fs.open(file);
FSDataInputStream is2 = fs.open(file);
is1.close();
is2.close();
}
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testRenameFile() throws Exception {
assumeRenameSupported();
final Path old = new Path("/test/alice/file");
final Path newPath = new Path("/test/bob/file");
fs.mkdirs(newPath.getParent());
final FSDataOutputStream fsDataOutputStream = fs.create(old);
final byte[] message = "Some data".getBytes();
fsDataOutputStream.write(message);
fsDataOutputStream.close();
assertTrue(fs.exists(old));
rename(old, newPath, true, false, true);
final FSDataInputStream bobStream = fs.open(newPath);
final byte[] bytes = new byte[512];
final int read = bobStream.read(bytes);
bobStream.close();
final byte[] buffer = new byte[read];
System.arraycopy(bytes, 0, buffer, 0, read);
assertEquals(new String(message), new String(buffer));
}
/** check if the files have been copied correctly. */
public boolean checkFiles(FileSystem fs, String topdir) throws IOException {
Path root = new Path(topdir);
for (int idx = 0; idx < nFiles; idx++) {
Path fPath = new Path(root, files[idx].getName());
FSDataInputStream in = fs.open(fPath);
byte[] toRead = new byte[files[idx].getSize()];
byte[] toCompare = new byte[files[idx].getSize()];
Random rb = new Random(files[idx].getSeed());
rb.nextBytes(toCompare);
in.readFully(0, toRead);
in.close();
for (int i = 0; i < toRead.length; i++) {
if (toRead[i] != toCompare[i]) {
return false;
}
}
toRead = null;
toCompare = null;
}
return true;
}
private String readString(FSDataInputStream inputStream) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(
inputStream));
final int BUFFER_SIZE = 1024;
char[] buffer = new char[BUFFER_SIZE];
int count = reader.read(buffer, 0, BUFFER_SIZE);
if (count > BUFFER_SIZE) {
throw new IOException("Exceeded buffer size");
}
inputStream.close();
return new String(buffer, 0, count);
}
public void close() throws IOException {
for (FSDataInputStream stream : streams) {
stream.close();
}
for (ColumnChunkIncPageReader reader : columns.values()) {
reader.close();
}
}
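A side note on the close() method above: because the streams are closed in a plain loop, the first IOException aborts the loop and leaves the remaining streams (and the page readers) open. A defensive variant, shown here only as a sketch simplified to the stream list and not taken from the project's actual code, closes everything and rethrows the first failure afterwards:
public void close() throws IOException {
  IOException first = null; // remember the first failure but keep closing the rest
  for (FSDataInputStream stream : streams) {
    try {
      stream.close();
    } catch (IOException e) {
      if (first == null) {
        first = e;
      }
    }
  }
  if (first != null) {
    throw first;
  }
}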
private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files,
boolean existingOnly) throws IOException {
Path root = new Path(topdir);
for (int idx = 0; idx < files.length; idx++) {
Path fPath = new Path(root, files[idx].getName());
try {
fs.getFileStatus(fPath);
FSDataInputStream in = fs.open(fPath);
byte[] toRead = new byte[files[idx].getSize()];
byte[] toCompare = new byte[files[idx].getSize()];
Random rb = new Random(files[idx].getSeed());
rb.nextBytes(toCompare);
assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
in.close();
for (int i = 0; i < toRead.length; i++) {
if (toRead[i] != toCompare[i]) {
return false;
}
}
toRead = null;
toCompare = null;
}
catch(FileNotFoundException fnfe) {
if (!existingOnly) {
throw fnfe;
}
}
}
return true;
}
/**
* Lifted from TestLocalFileSystem:
* Regression test for HADOOP-9307: BufferedFSInputStream returning
* wrong results after certain sequences of seeks and reads.
*/
@Test
public void testRandomSeeks() throws Throwable {
int limit = getContract().getLimit(TEST_RANDOM_SEEK_COUNT,
DEFAULT_RANDOM_SEEK_COUNT);
describe("Testing " + limit + " random seeks");
int filesize = 10 * 1024;
byte[] buf = dataset(filesize, 0, 255);
Path randomSeekFile = path("testrandomseeks.bin");
createFile(getFileSystem(), randomSeekFile, false, buf);
Random r = new Random();
FSDataInputStream stm = getFileSystem().open(randomSeekFile);
// Record the sequence of seeks and reads which trigger a failure.
int[] seeks = new int[10];
int[] reads = new int[10];
try {
for (int i = 0; i < limit; i++) {
int seekOff = r.nextInt(buf.length);
int toRead = r.nextInt(Math.min(buf.length - seekOff, 32000));
seeks[i % seeks.length] = seekOff;
reads[i % reads.length] = toRead;
verifyRead(stm, buf, seekOff, toRead);
}
} catch (AssertionError afe) {
StringBuilder sb = new StringBuilder();
sb.append("Sequence of actions:\n");
for (int j = 0; j < seeks.length; j++) {
sb.append("seek @ ").append(seeks[j]).append(" ")
.append("read ").append(reads[j]).append("\n");
}
LOG.error(sb.toString());
throw afe;
} finally {
stm.close();
}
}
private void checkFile(FileSystem fileSys, Path name, int repl)
throws IOException {
boolean done = false;
// wait till all full blocks are confirmed by the datanodes.
while (!done) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) { /* ignore and keep waiting */ }
done = true;
BlockLocation[] locations = fileSys.getFileBlockLocations(
fileSys.getFileStatus(name), 0, fileSize);
if (locations.length < numBlocks) {
done = false;
continue;
}
for (int idx = 0; idx < locations.length; idx++) {
if (locations[idx].getHosts().length < repl) {
done = false;
break;
}
}
}
FSDataInputStream stm = fileSys.open(name);
final byte[] expected;
if (simulatedStorage) {
expected = new byte[numBlocks * blockSize];
for (int i = 0; i < expected.length; i++) {
expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
}
} else {
expected = AppendTestUtil.randomBytes(seed, numBlocks*blockSize);
}
// do a sanity check. Read the file
byte[] actual = new byte[numBlocks * blockSize];
System.out.println("Verifying file ");
stm.readFully(0, actual);
stm.close();
checkData(actual, 0, expected, "Read 1");
}
protected int printAndCountHdfsFileDirData(String path, String filePrefix,
boolean print, boolean count) throws IOException {
int recordsCount = 0;
DistributedFileSystem fs = hadoopClusterService.getFileSystem();
RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(path),
true);
while (files.hasNext()) {
LocatedFileStatus locatedFileStatus = files.next();
System.out.println("Check:" + locatedFileStatus.getPath());
if (locatedFileStatus.isFile()) {
Path filePath = locatedFileStatus.getPath();
if (filePath.getName().startsWith(filePrefix)) {
FSDataInputStream input = fs.open(filePath);
BufferedReader reader = new BufferedReader(
new InputStreamReader(input));
String body = null;
while ((body = reader.readLine()) != null) {
if (print) {
System.out.println("file is: " + filePath.getName() + "body is:" + body);
}
if (count) {
recordsCount++;
}
}
reader.close();
input.close();
}
}
}
return recordsCount;
}
private void checkFile(FileSystem fileSys, Path name, int repl)
throws IOException {
boolean done = false;
// wait till all full blocks are confirmed by the datanodes.
while (!done) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) { /* ignore and keep waiting */ }
done = true;
BlockLocation[] locations = fileSys.getFileBlockLocations(
fileSys.getFileStatus(name), 0, fileSize);
if (locations.length < numBlocks) {
done = false;
continue;
}
for (int idx = 0; idx < locations.length; idx++) {
if (locations[idx].getHosts().length < repl) {
done = false;
break;
}
}
}
FSDataInputStream stm = fileSys.open(name);
final byte[] expected;
if (simulatedStorage) {
expected = new byte[numBlocks * blockSize];
for (int i = 0; i < expected.length; i++) {
expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
}
} else {
expected = AppendTestUtil.randomBytes(seed, numBlocks*blockSize);
}
// do a sanity check. Read the file
byte[] actual = new byte[numBlocks * blockSize];
stm.readFully(0, actual);
stm.close();
checkData(actual, 0, expected, "Read 1");
}
public ColumnDefinitionFile(String path) throws IOException
{
if (path == null)
{
throw new IOException("A column file path must be specified.");
}
Path p = new Path(path);
FSDataInputStream fdis = p.getFileSystem(HadoopUtils.createConfiguration()).open(p);
load(fdis);
fdis.close();
}
@Test
public void testHdfs() throws Exception {
FileSystem hdfsFsHandle = hdfsLocalCluster.getHdfsFileSystemHandle();
UserGroupInformation.loginUserFromKeytab(kdcLocalCluster.getKrbPrincipalWithRealm("hdfs"), kdcLocalCluster.getKeytabForPrincipal("hdfs"));
assertTrue(UserGroupInformation.isSecurityEnabled());
assertTrue(UserGroupInformation.isLoginKeytabBased());
// Write a file to HDFS containing the test string
FSDataOutputStream writer = hdfsFsHandle.create(
new Path(propertyParser.getProperty(ConfigVars.HDFS_TEST_FILE_KEY)));
writer.writeUTF(propertyParser.getProperty(ConfigVars.HDFS_TEST_STRING_KEY));
writer.close();
// Read the file and compare to test string
FSDataInputStream reader = hdfsFsHandle.open(
new Path(propertyParser.getProperty(ConfigVars.HDFS_TEST_FILE_KEY)));
assertEquals(reader.readUTF(), propertyParser.getProperty(ConfigVars.HDFS_TEST_STRING_KEY));
reader.close();
// Log out
UserGroupInformation.getLoginUser().logoutUserFromKeytab();
UserGroupInformation.reset();
try {
Configuration conf = new Configuration();
UserGroupInformation.setConfiguration(conf);
FileSystem.get(hdfsFsHandle.getUri(), conf).open(
new Path(propertyParser.getProperty(ConfigVars.HDFS_TEST_FILE_KEY)));
fail();
} catch (AccessControlException e) {
LOG.info("Not authenticated!");
}
}
private void writeFile(FileSystem fileSys, Path name) throws IOException {
// create and write a file that contains three blocks of data
DataOutputStream stm = fileSys.create(name, true, 4096, (short)1,
(long)blockSize);
// test empty file open and read
stm.close();
FSDataInputStream in = fileSys.open(name);
byte[] buffer = new byte[(int)(12*blockSize)];
in.readFully(0, buffer, 0, 0);
List<ByteBuffer> rlist = in.readFullyScatterGather(0, 0);
IOException res = null;
try { // read beyond the end of the file
in.readFully(0, buffer, 0, 1);
rlist = in.readFullyScatterGather(0, 1);
} catch (IOException e) {
// should throw an exception
res = e;
}
assertTrue("Error reading beyond file boundary.", res != null);
in.close();
if (!fileSys.delete(name, true))
assertTrue("Cannot delete file", false);
// now create the real file
stm = fileSys.create(name, true, 4096, (short)1, (long)blockSize);
Random rand = new Random(seed);
rand.nextBytes(buffer);
stm.write(buffer);
stm.close();
}
public void testMetaBlocks() throws IOException {
Path mFile = new Path(ROOT, "meta.tfile");
FSDataOutputStream fout = createFSOutput(mFile);
Writer writer = new Writer(fout, minBlockSize, "none", null, conf);
someTestingWithMetaBlock(writer, "none");
writer.close();
fout.close();
FSDataInputStream fin = fs.open(mFile);
Reader reader = new Reader(fin, fs.getFileStatus(mFile).getLen(), conf);
someReadingWithMetaBlock(reader);
fs.delete(mFile, true);
reader.close();
fin.close();
}
/**
* When mark() is used on BufferedInputStream, the request
* size on the checksum file system can be small. However, the
* checksum file system currently depends on the request size
* being >= bytesPerSum to work properly.
*/
public void testTruncatedInputBug() throws IOException {
final int ioBufSize = 512;
final int fileSize = ioBufSize*4;
int filePos = 0;
Configuration conf = new Configuration();
conf.setInt("io.file.buffer.size", ioBufSize);
FileSystem fileSys = FileSystem.getLocal(conf);
try {
// First create a test input file.
Path testFile = new Path(TEST_ROOT_DIR, "HADOOP-1489");
writeFile(fileSys, testFile, fileSize);
assertTrue(fileSys.exists(testFile));
assertTrue(fileSys.getLength(testFile) == fileSize);
// Now read the file for ioBufSize bytes
FSDataInputStream in = fileSys.open(testFile, ioBufSize);
// seek beyond data buffered by open
filePos += ioBufSize * 2 + (ioBufSize - 10);
in.seek(filePos);
// read 4 more bytes before marking
for (int i = 0; i < 4; ++i) {
if (in.read() == -1) {
break;
}
++filePos;
}
// Now set mark() to trigger the bug
// NOTE: in the fixed code, mark() does nothing (not supported) and
// hence won't trigger this bug.
in.mark(1);
System.out.println("MARKED");
// Try to read the rest
while (filePos < fileSize) {
if (in.read() == -1) {
break;
}
++filePos;
}
in.close();
System.out.println("Read " + filePos + " bytes."
+ " file size=" + fileSize);
assertTrue(filePos == fileSize);
} finally {
try {
fileSys.close();
} catch (Exception e) {
// noop
}
}
}
/**
* This tests a user-defined HDFS configuration.
* @throws Exception
*/
@Test
public void testUserDefinedConfiguration() throws Exception {
final String outPath = hdfsURI + "/string-non-rolling-with-config";
final int numElements = 20;
Map<String, String> properties = new HashMap<>();
Schema keySchema = Schema.create(Schema.Type.INT);
Schema valueSchema = Schema.create(Schema.Type.STRING);
properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true));
properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
Configuration conf = new Configuration();
conf.set("io.file.buffer.size", "40960");
BucketingSink<Tuple2<Integer, String>> sink = new BucketingSink<Tuple2<Integer, String>>(outPath)
.setFSConfig(conf)
.setWriter(new StreamWriterWithConfigCheck<Integer, String>(properties, "io.file.buffer.size", "40960"))
.setBucketer(new BasePathBucketer<Tuple2<Integer, String>>())
.setPartPrefix(PART_PREFIX)
.setPendingPrefix("")
.setPendingSuffix("");
OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness =
createTestSink(sink, 1, 0);
testHarness.setProcessingTime(0L);
testHarness.setup();
testHarness.open();
for (int i = 0; i < numElements; i++) {
testHarness.processElement(new StreamRecord<>(Tuple2.of(
i, "message #" + Integer.toString(i)
)));
}
testHarness.close();
GenericData.setStringType(valueSchema, GenericData.StringType.String);
Schema elementSchema = AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema);
FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + PART_PREFIX + "-0-0"));
SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<>(elementSchema);
DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inStream, elementReader);
for (int i = 0; i < numElements; i++) {
AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> wrappedEntry =
new AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next());
int key = wrappedEntry.getKey();
Assert.assertEquals(i, key);
String value = wrappedEntry.getValue();
Assert.assertEquals("message #" + i, value);
}
dataFileStream.close();
inStream.close();
}
@Override
void call() throws IOException {
FSDataInputStream in = fs.open(path);
in.close();
}
/**
* Generate splits.
*
* @param job the JobContext used to read the configuration of the job being run
* @param minSize the minimum file block size
* @param maxSize the maximum file block size
* @param splits the list of splits being generated
* @param file the FileStatus used to determine block size, length, and allocations
* @throws IOException Signals that an I/O exception has occurred.
*/
private void generateSplits(JobContext job, long minSize, long maxSize,
List<InputSplit> splits, FileStatus file) throws IOException {
Path path = file.getPath();
int numOfRecordsInCurrentSplit = 0;
int numOfRecordsInPreviousSplit = 0;
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
length);
FSDataInputStream fsin = null;
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// checking the occurrences of the record separator in current
// split
recordSeparator = job.getConfiguration()
.get(DataValidationConstants.RECORD_SEPARATOR)
.getBytes();
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
long start = length - bytesRemaining;
long end = start + splitSize;
try {
fsin = fs.open(path);
fsin.seek(start);
long pos = start;
int b = 0;
int bufferPos = 0;
while (true) {
b = fsin.read();
pos = fsin.getPos();
if (b == -1) {
break;
}
if (b == recordSeparator[bufferPos]) {
bufferPos++;
if (bufferPos == recordSeparator.length) {
numOfRecordsInCurrentSplit++;
bufferPos = 0;
if (pos > end) {
break;
}
}
} else {
// reset the value of buffer position to zero
bufferPos = 0;
}
}
} finally {
if (fsin != null) {
fsin.close();
}
}
splits.add(new DataValidationFileSplit(path, start,
splitSize, numOfRecordsInPreviousSplit,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
numOfRecordsInPreviousSplit = numOfRecordsInCurrentSplit;
numOfRecordsInCurrentSplit = 0;
}
addSplitIfBytesRemaining(splits, path, numOfRecordsInPreviousSplit,
length, blkLocations, bytesRemaining);
} else if (length != 0) {
splits.add(new DataValidationFileSplit(path, 0, length,
numOfRecordsInPreviousSplit, blkLocations[0].getHosts()));
} else {
splits.add(new DataValidationFileSplit(path, 0, length,
numOfRecordsInPreviousSplit, new String[0]));
}
}
private void readIndex(boolean useTags) throws IOException {
long fileSize = fs.getFileStatus(path).getLen();
LOG.info("Size of {}: {} compression={}", path, fileSize, compr.toString());
FSDataInputStream istream = fs.open(path);
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(true)
.withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(useTags)
.withCompression(compr)
.build();
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, path).build();
HFileBlock.FSReader blockReader = new HFileBlock.FSReaderImpl(context, meta,
ByteBuffAllocator.HEAP);
BlockReaderWrapper brw = new BlockReaderWrapper(blockReader);
HFileBlockIndex.BlockIndexReader indexReader =
new HFileBlockIndex.CellBasedKeyBlockIndexReader(
CellComparatorImpl.COMPARATOR, numLevels);
indexReader.readRootIndex(blockReader.blockRange(rootIndexOffset,
fileSize).nextBlockWithBlockType(BlockType.ROOT_INDEX), numRootEntries);
long prevOffset = -1;
int i = 0;
int expectedHitCount = 0;
int expectedMissCount = 0;
LOG.info("Total number of keys: " + keys.size());
for (byte[] key : keys) {
assertTrue(key != null);
assertTrue(indexReader != null);
KeyValue.KeyOnlyKeyValue keyOnlyKey = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
HFileBlock b =
indexReader.seekToDataBlock(keyOnlyKey, null, true,
true, false, null, brw);
if (PrivateCellUtil.compare(CellComparatorImpl.COMPARATOR, keyOnlyKey, firstKeyInFile, 0,
firstKeyInFile.length) < 0) {
assertTrue(b == null);
++i;
continue;
}
String keyStr = "key #" + i + ", " + Bytes.toStringBinary(key);
assertTrue("seekToDataBlock failed for " + keyStr, b != null);
if (prevOffset == b.getOffset()) {
assertEquals(++expectedHitCount, brw.hitCount);
} else {
LOG.info("First key in a new block: " + keyStr + ", block offset: "
+ b.getOffset() + ")");
assertTrue(b.getOffset() > prevOffset);
assertEquals(++expectedMissCount, brw.missCount);
prevOffset = b.getOffset();
}
++i;
}
istream.close();
}