Listed below are example usages of org.apache.hadoop.fs.FSDataInputStream#seek(), taken from real projects.
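Before the project examples, here is a minimal, self-contained sketch of the pattern they all build on: open a file, seek() to an absolute byte offset, read from there, and check getPos(). The class name, path, and offsets are placeholders invented for illustration, not taken from any of the examples below.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeekSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);          // default file system from the configuration
    Path path = new Path("/tmp/seek-example.dat"); // placeholder path; assumed to exist and be long enough
    FSDataInputStream in = fs.open(path);
    try {
      in.seek(128L);                               // move to an absolute byte offset
      byte[] buf = new byte[64];
      in.readFully(buf, 0, buf.length);            // read 64 bytes starting at offset 128
      System.out.println("position after read: " + in.getPos()); // prints 192
    } finally {
      in.close();
    }
  }
}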
private void readTestFile(String testFileName) throws Exception {
Path filePath = new Path(testFileName);
FSDataInputStream istream = dfs.open(filePath, 10240);
ByteBuffer buf = ByteBuffer.allocate(10240);
int count = 0;
try {
while (istream.read(buf) > 0) {
count += 1;
buf.clear();
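// Skip five bytes past the current position before the next read.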
istream.seek(istream.getPos() + 5);
}
} catch (IOException ioe) {
// Ignore this; it's probably a seek past EOF.
} finally {
istream.close();
}
}
private void seekReadFile(FileSystem fileSys, Path name) throws IOException {
FSDataInputStream stm = fileSys.open(name, 4096);
byte[] expected = new byte[ONEMB];
Random rand = new Random(seed);
rand.nextBytes(expected);
// First read 128 bytes to set count in BufferedInputStream
byte[] actual = new byte[128];
stm.read(actual, 0, actual.length);
// Now read a byte array that is bigger than the internal buffer
actual = new byte[100000];
IOUtils.readFully(stm, actual, 0, actual.length);
checkAndEraseData(actual, 128, expected, "First Read Test");
// now do a small seek, within the range that is already read
stm.seek(96036); // 4 byte seek
actual = new byte[128];
IOUtils.readFully(stm, actual, 0, actual.length);
checkAndEraseData(actual, 96036, expected, "Seek Bug");
// all done
stm.close();
}
/**
* Scenario: Read an under construction file using hftp.
*
* Expected: Hftp should be able to read the latest byte after the file
* has been hdfsSynced (but not yet closed).
*
* @throws IOException
*/
public void testConcurrentRead() throws IOException {
// Write a test file.
FSDataOutputStream out = hdfs.create(TEST_FILE, true);
out.writeBytes("123");
out.sync(); // sync but not close
// Try read using hftp.
FSDataInputStream in = hftpFs.open(TEST_FILE);
assertEquals('1', in.read());
assertEquals('2', in.read());
assertEquals('3', in.read());
in.close();
// Try seek and read.
in = hftpFs.open(TEST_FILE);
in.seek(2);
assertEquals('3', in.read());
in.close();
out.close();
}
/**
* Test (expected to throw IOE) for <code>FSDataInputStream#seek</code>
* when the position argument is larger than the file size.
*/
@Test (expected=IOException.class)
public void testSeekPastFileSize() throws IOException {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem();
try {
Path seekFile = new Path("seekboundaries.dat");
DFSTestUtil.createFile(
fs,
seekFile,
ONEMB,
fs.getDefaultReplication(seekFile),
seed);
FSDataInputStream stream = fs.open(seekFile);
// Perform "safe seek" (expected to pass)
stream.seek(65536);
assertEquals(65536, stream.getPos());
// expect IOE for this call
stream.seek(ONEMB + ONEMB + ONEMB);
} finally {
fs.close();
cluster.shutdown();
}
}
/**
* Constructor
*
* @param fin
* FS input stream.
* @param fileLength
* Length of the corresponding file
* @throws IOException
*/
public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
throws IOException {
this.in = fin;
this.conf = conf;
// move the cursor to the beginning of the tail, containing: offset to the
// meta block index, version and magic
fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE
/ Byte.SIZE);
long offsetIndexMeta = fin.readLong();
version = new Version(fin);
Magic.readAndVerify(fin);
if (!version.compatibleWith(BCFile.API_VERSION)) {
throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
}
// read meta index
fin.seek(offsetIndexMeta);
metaIndex = new MetaIndex(fin);
// read data:BCFile.index, the data block index
BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
try {
dataIndex = new DataIndex(blockR);
} finally {
blockR.close();
}
}
public LineRecordReader(Configuration job,
FileSplit split) throws IOException {
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
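// The split starts mid-file: back up one byte and skip the first (possibly partial) line below, since that line is read by the split that precedes this one.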
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
private static void expectSeekIOE(FSDataInputStream fsdis, long seekPos, String message) {
try {
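// The seek below is expected to throw an IOException; if control reaches the assert, the test fails with the given message.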
fsdis.seek(seekPos);
assertTrue(message + " (Position = " + fsdis.getPos() + ")", false);
} catch (IOException ioe) {
// okay
}
}
public FSInputGeneralColumnDataReader(FSDataInputStream fsInputStream, int dataStartOffset, int dataLength)
throws IOException {
this.fsInputStream = fsInputStream;
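// The last 4 bytes of this column's data region hold the number of values; read it, then seek back to the start of the data.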
fsInputStream.seek(dataStartOffset + dataLength - 4L);
this.numOfVals = fsInputStream.readInt();
fsInputStream.seek(dataStartOffset);
}
@Test(timeout=120000)
public void testSeekAfterSetDropBehind() throws Exception {
// start a cluster
LOG.info("testSeekAfterSetDropBehind");
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, false);
// verify that we can seek after setDropBehind
FSDataInputStream fis = fs.open(new Path(TEST_PATH));
try {
Assert.assertTrue(fis.read() != -1); // create BlockReader
fis.setDropBehind(false); // clear BlockReader
fis.seek(2); // seek
} finally {
fis.close();
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@SuppressWarnings("unchecked")
private <T> T getSplitDetails(Path file, long offset)
throws IOException {
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream inFile = fs.open(file);
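// Seek to the offset where this task's serialized split begins, then read the split class name and deserialize the split.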
inFile.seek(offset);
String className = StringInterner.weakIntern(Text.readString(inFile));
Class<T> cls;
try {
cls = (Class<T>) conf.getClassByName(className);
} catch (ClassNotFoundException ce) {
IOException wrap = new IOException("Split class " + className +
" not found");
wrap.initCause(ce);
throw wrap;
}
SerializationFactory factory = new SerializationFactory(conf);
Deserializer<T> deserializer =
(Deserializer<T>) factory.getDeserializer(cls);
deserializer.open(inFile);
T split = deserializer.deserialize(null);
long pos = inFile.getPos();
getCounters().findCounter(
TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
inFile.close();
return split;
}
/**
* Constructor
*
* @param fin
* FS input stream.
* @param fileLength
* Length of the corresponding file
* @throws IOException
*/
public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
throws IOException {
this.in = fin;
this.conf = conf;
// A reader buffer to read the block
baos = new ByteArrayOutputStream(DTFile.getFSInputBufferSize(conf) * 2);
this.cacheKeys = new ArrayList<String>();
// move the cursor to the beginning of the tail, containing: offset to the
// meta block index, version and magic
fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE
/ Byte.SIZE);
long offsetIndexMeta = fin.readLong();
version = new Version(fin);
Magic.readAndVerify(fin);
if (!version.compatibleWith(DTBCFile.API_VERSION)) {
throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
}
// read meta index
fin.seek(offsetIndexMeta);
metaIndex = new MetaIndex(fin);
// read data:BCFile.index, the data block index
BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
try {
dataIndex = new DataIndex(blockR);
} finally {
blockR.close();
}
}
@Override
public InputStream[] getNextStripeInputs() throws IOException {
InputStream[] blocks = new InputStream[codec.stripeLength];
try {
for (int i = 0; i < codec.stripeLength; i++) {
long seekOffset = stripeStartOffset + i * blockSize;
if (seekOffset < srcSize) {
FSDataInputStream in = fs.open(srcFile, bufferSize);
in.seek(seekOffset);
LOG.info("Opening stream at " + srcFile + ":" + seekOffset);
blocks[i] = in;
} else {
LOG.info("Using zeros at offset " + seekOffset);
// We have no src data at this offset.
blocks[i] = new RaidUtils.ZeroInputStream(
seekOffset + blockSize);
}
}
stripeStartOffset += blockSize * codec.stripeLength;
return blocks;
} catch (IOException e) {
// If there is an error during opening a stream, close the previously
// opened streams and re-throw.
RaidUtils.closeStreams(blocks);
throw e;
}
}
private void readAndCompare(FSDataInputStream in, int position, int len)
throws IOException {
byte[] b = new byte[len];
in.seek(position);
IOUtils.readFully(in, b, 0, b.length);
for (int i = 0; i < b.length; i++) {
assertEquals(expected[position + i], b[i]);
}
}
public static void read(FSDataInputStream file, long fileOffset, byte[] array, int arrayOffset,
int length) throws IOException {
file.seek(fileOffset);
file.readFully(array, arrayOffset, length);
}
public void readSequentialDirect() throws Exception {
System.out.println("reading sequential file in direct mode " + path);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FileStatus status = fs.getFileStatus(path);
FSDataInputStream instream = fs.open(path);
ByteBuffer buf = ByteBuffer.allocateDirect(size);
buf.clear();
double sumbytes = 0;
double ops = 0;
System.out.println("file capacity " + status.getLen());
System.out.println("read size " + size);
System.out.println("operations " + loop);
long start = System.currentTimeMillis();
while (ops < loop) {
buf.clear();
double ret = (double) instream.read(buf);
if (ret > 0) {
sumbytes = sumbytes + ret;
ops = ops + 1.0;
} else {
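// read() returned no data: count the attempt, then either stop (empty file) or seek back to offset 0 and keep streaming.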
ops = ops + 1.0;
if (instream.getPos() == 0){
break;
} else {
instream.seek(0);
}
}
}
long end = System.currentTimeMillis();
double executionTime = ((double) (end - start)) / 1000.0;
double throughput = 0.0;
double latency = 0.0;
double sumbits = sumbytes * 8.0;
if (executionTime > 0) {
throughput = sumbits / executionTime / 1024.0 / 1024.0;
latency = 1000000.0 * executionTime / ops;
}
System.out.println("execution time " + executionTime);
System.out.println("ops " + ops);
System.out.println("sumbytes " + sumbytes);
System.out.println("throughput " + throughput);
System.out.println("latency " + latency);
System.out.println("closing stream");
instream.close();
fs.close();
}
public void readSequentialHeap() throws Exception {
System.out.println("reading sequential file in heap mode " + path);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FileStatus status = fs.getFileStatus(path);
FSDataInputStream instream = fs.open(path);
byte[] buf = new byte[size];
double sumbytes = 0;
double ops = 0;
System.out.println("file capacity " + status.getLen());
System.out.println("read size " + size);
System.out.println("operations " + loop);
long start = System.currentTimeMillis();
while (ops < loop) {
double ret = (double) this.read(instream, buf);
if (ret > 0) {
sumbytes = sumbytes + ret;
ops = ops + 1.0;
} else {
ops = ops + 1.0;
if (instream.getPos() == 0){
break;
} else {
instream.seek(0);
}
}
}
long end = System.currentTimeMillis();
double executionTime = ((double) (end - start)) / 1000.0;
double throughput = 0.0;
double latency = 0.0;
double sumbits = sumbytes * 8.0;
if (executionTime > 0) {
throughput = sumbits / executionTime / 1024.0 / 1024.0;
latency = 1000000.0 * executionTime / ops;
}
System.out.println("execution time " + executionTime);
System.out.println("ops " + ops);
System.out.println("sumbytes " + sumbytes);
System.out.println("throughput " + throughput);
System.out.println("latency " + latency);
System.out.println("closing stream");
instream.close();
fs.close();
}
public void initialize(InputSplit split, Configuration conf) throws IOException,
InterruptedException
{
@SuppressWarnings("unchecked")
RubixInputSplit<K, V> rsplit = (RubixInputSplit<K, V>) split;
SerializationFactory serializationFactory = new SerializationFactory(conf);
switch (rsplit.getBlockSerializationType())
{
case DEFAULT:
valueDeserializer =
serializationFactory.getDeserializer(rsplit.getValueClass());
break;
case COMPACT:
BlockSchema schema = rsplit.getSchema();
valueDeserializer = new CompactDeserializer<V>(schema);
break;
}
key = rsplit.getKey();
// store the blockid and partition key in the conf
conf.setLong("MY_BLOCK_ID", rsplit.getBlockId());
conf.setLong("MY_NUM_RECORDS", rsplit.getNumRecords());
ByteArrayOutputStream tmpOut = new ByteArrayOutputStream();
((Tuple) key).write(new DataOutputStream(tmpOut));
String keySerialized = SerializerUtils.serializeToString(tmpOut.toByteArray());
conf.set("MY_PARTITION_KEY", keySerialized);
Path path = rsplit.getFilename();
offset = rsplit.getOffset();
length = rsplit.getLength();
FileSystem fs = path.getFileSystem(conf);
FSDataInputStream fsin = fs.open(path);
fsin.seek(offset);
blockInputStream = new BlockInputStream(fsin, length);
in = blockInputStream;
CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
if (codec != null)
{
print.f("codec is not null and it is %s", codec.getClass().toString());
in = codec.createInputStream(in);
}
else
{
print.f("codec is null");
}
valueDeserializer.open(in);
}
/**
* Generate splits.
*
* @param job the JobContext used to read the configuration of the job
* @param minSize the minimum split size
* @param maxSize the maximum split size
* @param splits the list of splits being generated
* @param file the FileStatus used to determine block size, length, and block locations
* @throws IOException Signals that an I/O exception has occurred.
*/
private void generateSplits(JobContext job, long minSize, long maxSize,
List<InputSplit> splits, FileStatus file) throws IOException {
Path path = file.getPath();
int numOfRecordsInCurrentSplit = 0;
int numOfRecordsInPreviousSplit = 0;
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
length);
FSDataInputStream fsin = null;
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// checking the occurrences of the record separator in current
// split
recordSeparator = job.getConfiguration()
.get(DataValidationConstants.RECORD_SEPARATOR)
.getBytes();
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length
- bytesRemaining);
long start = length - bytesRemaining;
long end = start + splitSize;
try {
fsin = fs.open(path);
fsin.seek(start);
long pos = start;
int b = 0;
int bufferPos = 0;
while (true) {
b = fsin.read();
pos = fsin.getPos();
if (b == -1) {
break;
}
if (b == recordSeparator[bufferPos]) {
bufferPos++;
if (bufferPos == recordSeparator.length) {
numOfRecordsInCurrentSplit++;
bufferPos = 0;
if (pos > end) {
break;
}
}
} else {
// reset the value of buffer position to zero
bufferPos = 0;
}
}
} finally {
if (fsin != null) {
fsin.close();
}
}
splits.add(new DataValidationFileSplit(path, start,
splitSize, numOfRecordsInPreviousSplit,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
numOfRecordsInPreviousSplit = numOfRecordsInCurrentSplit;
numOfRecordsInCurrentSplit = 0;
}
addSplitIfBytesRemaining(splits, path, numOfRecordsInPreviousSplit,
length, blkLocations, bytesRemaining);
} else if (length != 0) {
splits.add(new DataValidationFileSplit(path, 0, length,
numOfRecordsInPreviousSplit, blkLocations[0].getHosts()));
} else {
splits.add(new DataValidationFileSplit(path, 0, length,
numOfRecordsInPreviousSplit, new String[0]));
}
}
@SuppressWarnings("unchecked")
public List<KeyData<K>> getKeyData() throws IOException,
ClassNotFoundException
{
if (keyData != null)
return keyData;
final FileSystem fs = FileSystem.get(conf);
keyData = new ArrayList<KeyData<K>>();
final long filesize = fs.getFileStatus(path).getLen();
FSDataInputStream in = fs.open(path);
/* The last long in the file is the start position of the trailer section */
in.seek(filesize - 8);
long metaDataStartPos = in.readLong();
in.seek(metaDataStartPos);
ObjectMapper mapper = new ObjectMapper();
metadataJson = mapper.readValue(in.readUTF(), JsonNode.class);
int keySectionSize = in.readInt();
// load the key section
byte[] keySection = new byte[keySectionSize];
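// The key section occupies the keySectionSize bytes immediately before the trailing 8-byte trailer offset.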
in.seek(filesize - keySectionSize - 8);
in.read(keySection, 0, keySectionSize);
in.close();
ByteArrayInputStream bis = new ByteArrayInputStream(keySection);
DataInput dataInput = new DataInputStream(bis);
int numberOfBlocks = metadataJson.get("numberOfBlocks").getIntValue();
// load the key section
keyClass = (Class<K>) ClassCache.forName(JsonUtils.getText(metadataJson, "keyClass"));
valueClass =
(Class<V>) ClassCache.forName(JsonUtils.getText(metadataJson, "valueClass"));
SerializationFactory serializationFactory = new SerializationFactory(conf);
Deserializer<K> deserializer = serializationFactory.getDeserializer(keyClass);
deserializer.open(bis);
while (bis.available() > 0 && numberOfBlocks > 0)
{
K key = deserializer.deserialize(null);
long offset = dataInput.readLong();
long blockId = dataInput.readLong();
long numRecords = dataInput.readLong();
keyData.add(new KeyData<K>(key, offset, 0, numRecords, blockId));
numberOfBlocks--;
}
// Assign length to each keydata entry
int numEntries = keyData.size();
for (int i = 1; i < numEntries; i++)
{
KeyData<K> prev = keyData.get(i - 1);
KeyData<K> current = keyData.get(i);
prev.setLength(current.getOffset() - prev.getOffset());
}
if (numEntries > 0)
{
KeyData<K> last = keyData.get(numEntries - 1);
last.setLength(metaDataStartPos - last.offset);
}
return keyData;
}