Listed below are example usages of org.apache.hadoop.fs.FSDataInputStream#readLong(), collected from open-source projects.
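Before the project examples, a minimal sketch of the API itself: FSDataInputStream implements java.io.DataInput, so readLong() returns the next eight bytes of the stream as a big-endian long, the mirror of DataOutput#writeLong. The path and class name below are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadLongDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path p = new Path("/tmp/readlong-demo");
    try (FSDataOutputStream out = fs.create(p, true)) {
      out.writeLong(42L);                 // write one big-endian long
    }
    try (FSDataInputStream in = fs.open(p)) {
      System.out.println(in.readLong());  // reads it back; prints 42
    }
  }
}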
public SpillRecord(Path indexFileName, JobConf job, Checksum crc,
                   String expectedIndexOwner)
    throws IOException {
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  final FSDataInputStream in =
      SecureIOUtils.openFSDataInputStream(new File(indexFileName.toUri()
          .getRawPath()), expectedIndexOwner, null);
  try {
    final long length = rfs.getFileStatus(indexFileName).getLen();
    final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
    final int size = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    buf = ByteBuffer.allocate(size);
    if (crc != null) {
      crc.reset();
      CheckedInputStream chk = new CheckedInputStream(in, crc);
      IOUtils.readFully(chk, buf.array(), 0, size);
      if (chk.getChecksum().getValue() != in.readLong()) {
        throw new ChecksumException("Checksum error reading spill index: " +
            indexFileName, -1);
      }
    } else {
      IOUtils.readFully(in, buf.array(), 0, size);
    }
    entries = buf.asLongBuffer();
  } finally {
    in.close();
  }
}
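The layout this constructor expects is partitions fixed-size records followed by an eight-byte CRC. The records are read through a CheckedInputStream so the checksum accumulates, while the final in.readLong() goes to the unwrapped stream, so the stored checksum itself is not folded into the CRC. A minimal sketch of the same trailing-checksum pattern on plain java.io streams (file, sizes, and method name hypothetical):

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

static byte[] readWithTrailingCrc(File f, int payloadSize) throws IOException {
  try (DataInputStream raw = new DataInputStream(new FileInputStream(f))) {
    CheckedInputStream chk = new CheckedInputStream(raw, new CRC32());
    byte[] payload = new byte[payloadSize];
    new DataInputStream(chk).readFully(payload);  // CRC accumulates here
    long stored = raw.readLong();                 // trailing checksum, bypasses the CRC
    if (chk.getChecksum().getValue() != stored) {
      throw new IOException("checksum mismatch");
    }
    return payload;
  }
}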
/**
 * Constructor
 *
 * @param fin
 *          FS input stream.
 * @param fileLength
 *          Length of the corresponding file
 * @throws IOException
 */
public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
    throws IOException {
  this.in = fin;
  this.conf = conf;
  // move the cursor to the beginning of the tail, containing: offset to the
  // meta block index, version and magic
  fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE
      / Byte.SIZE);
  long offsetIndexMeta = fin.readLong();
  version = new Version(fin);
  Magic.readAndVerify(fin);
  if (!version.compatibleWith(BCFile.API_VERSION)) {
    throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
  }
  // read meta index
  fin.seek(offsetIndexMeta);
  metaIndex = new MetaIndex(fin);
  // read data:BCFile.index, the data block index
  BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
  try {
    dataIndex = new DataIndex(blockR);
  } finally {
    blockR.close();
  }
}
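The tail of a BCFile is laid out as [offsetIndexMeta: long][version][magic], so the seek distance from the end of the file is the magic size plus the version size plus eight bytes for the long. A sketch of just that arithmetic, with hypothetical sizes (the real Magic and Version types report their own size()):

long magicSize = 16;   // hypothetical: a 16-byte magic constant
long versionSize = 4;  // hypothetical: two shorts, major and minor
long tailStart = fileLength - magicSize - versionSize
    - Long.BYTES;      // Long.BYTES equals Long.SIZE / Byte.SIZE, i.e. 8
fin.seek(tailStart);
long offsetIndexMeta = fin.readLong();  // pointer back into the file at the meta index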
public static long readCleanUpIntervalMillis(FileSystem fs, Path cleanUpIntervalPath) throws IOException {
  if (fs.exists(cleanUpIntervalPath)) {
    // try-with-resources so the stream is closed even if readLong() throws;
    // fs.open() already returns an FSDataInputStream, no wrapping needed
    try (FSDataInputStream input = fs.open(cleanUpIntervalPath)) {
      return input.readLong();
    }
  } else {
    return -1L;
  }
}
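A hypothetical writer counterpart, so the readLong() above has something to recover; the method name is an assumption, not part of the original API:

public static void writeCleanUpIntervalMillis(FileSystem fs, Path cleanUpIntervalPath,
    long intervalDurationMillis) throws IOException {
  try (FSDataOutputStream output = fs.create(cleanUpIntervalPath, true)) {
    output.writeLong(intervalDurationMillis);  // eight bytes, big-endian
  }
}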
/**
 * Constructor
 *
 * @param fin
 *          FS input stream.
 * @param fileLength
 *          Length of the corresponding file
 * @throws IOException
 */
public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
    throws IOException {
  this.in = fin;
  this.conf = conf;
  // A reader buffer to read the block
  baos = new ByteArrayOutputStream(DTFile.getFSInputBufferSize(conf) * 2);
  this.cacheKeys = new ArrayList<String>();
  // move the cursor to the beginning of the tail, containing: offset to the
  // meta block index, version and magic
  fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE
      / Byte.SIZE);
  long offsetIndexMeta = fin.readLong();
  version = new Version(fin);
  Magic.readAndVerify(fin);
  if (!version.compatibleWith(DTBCFile.API_VERSION)) {
    throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
  }
  // read meta index
  fin.seek(offsetIndexMeta);
  metaIndex = new MetaIndex(fin);
  // read data:BCFile.index, the data block index
  BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
  try {
    dataIndex = new DataIndex(blockR);
  } finally {
    blockR.close();
  }
}
private void fillRootIndex(int entryNum, FSDataInputStream in)
    throws IOException {
  this.dataIndex = new Tuple[entryNum];
  this.offsetIndex = new long[entryNum];
  Tuple keyTuple;
  byte[] buf;
  for (int i = 0; i < entryNum; i++) {
    buf = new byte[in.readInt()];
    Bytes.readFully(in, buf, 0, buf.length);
    keyTuple = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf);
    dataIndex[i] = keyTuple;
    this.offsetIndex[i] = in.readLong();
  }
}
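Each root-index entry here is an int-prefixed encoded key followed by a long offset. A hypothetical writer producing that layout (method and parameter names are assumptions):

private void writeRootIndex(FSDataOutputStream out, byte[][] encodedKeys, long[] offsets)
    throws IOException {
  for (int i = 0; i < encodedKeys.length; i++) {
    out.writeInt(encodedKeys[i].length);  // key length prefix
    out.write(encodedKeys[i]);            // encoded key tuple
    out.writeLong(offsets[i]);            // offset of the corresponding block
  }
}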
private void loadCacheFromManifest(Path manifest) throws IOException {
  // try-with-resources so the stream is closed even if a read fails midway
  try (FSDataInputStream inputStream = _fileSystem.open(manifest)) {
    int count = inputStream.readInt();
    for (int i = 0; i < count; i++) {
      String name = readString(inputStream);
      long lastMod = inputStream.readLong();
      long length = inputStream.readLong();
      FStat fstat = new FStat(lastMod, length);
      _cache.put(name, fstat);
    }
  }
}
public SpillRecord(Path indexFileName, JobConf job, Checksum crc)
    throws IOException {
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  final FSDataInputStream in = rfs.open(indexFileName);
  try {
    final long length = rfs.getFileStatus(indexFileName).getLen();
    final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
    final int size = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    buf = ByteBuffer.allocate(size);
    if (crc != null) {
      crc.reset();
      CheckedInputStream chk = new CheckedInputStream(in, crc);
      IOUtils.readFully(chk, buf.array(), 0, size);
      if (chk.getChecksum().getValue() != in.readLong()) {
        throw new ChecksumException("Checksum error reading spill index: " +
            indexFileName, -1);
      }
    } else {
      IOUtils.readFully(in, buf.array(), 0, size);
    }
    entries = buf.asLongBuffer();
  } finally {
    in.close();
  }
}
public TezSpillRecord(Path indexFileName, Configuration job, Checksum crc,
                      String expectedIndexOwner)
    throws IOException {
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  final FSDataInputStream in = rfs.open(indexFileName);
  try {
    final long length = rfs.getFileStatus(indexFileName).getLen();
    final int partitions =
        (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    final int size = partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    buf = ByteBuffer.allocate(size);
    if (crc != null) {
      crc.reset();
      CheckedInputStream chk = new CheckedInputStream(in, crc);
      IOUtils.readFully(chk, buf.array(), 0, size);
      if (chk.getChecksum().getValue() != in.readLong()) {
        throw new ChecksumException("Checksum error reading spill index: " +
            indexFileName, -1);
      }
    } else {
      IOUtils.readFully(in, buf.array(), 0, size);
    }
    entries = buf.asLongBuffer();
  } finally {
    in.close();
  }
}
public TezSpillRecord(Path indexFileName, FileSystem rfs, Checksum crc,
                      String expectedIndexOwner)
    throws IOException {
  final FSDataInputStream in = rfs.open(indexFileName);
  try {
    final long length = rfs.getFileStatus(indexFileName).getLen();
    final int partitions =
        (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    final int size = partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    buf = ByteBuffer.allocate(size);
    if (crc != null) {
      crc.reset();
      CheckedInputStream chk = new CheckedInputStream(in, crc);
      IOUtils.readFully(chk, buf.array(), 0, size);
      if (chk.getChecksum().getValue() != in.readLong()) {
        throw new ChecksumException("Checksum error reading spill index: " +
            indexFileName, -1);
      }
    } else {
      IOUtils.readFully(in, buf.array(), 0, size);
    }
    entries = buf.asLongBuffer();
  } finally {
    in.close();
  }
}
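A hedged usage sketch for the overload above, assuming a Configuration named conf is in scope; the path is hypothetical, and passing null for expectedIndexOwner is an assumption. PureJavaCrc32 is Hadoop's pure-Java CRC-32; supplying a non-null Checksum enables the trailing-checksum verification shown in the constructor body.

Path indexFile = new Path("/tmp/attempt_0/file.out.index");  // hypothetical
FileSystem localFs = FileSystem.getLocal(conf).getRaw();
TezSpillRecord record = new TezSpillRecord(indexFile, localFs,
    new org.apache.hadoop.util.PureJavaCrc32(), null);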
@SuppressWarnings("unchecked")
public List<KeyData<K>> getKeyData() throws IOException,
    ClassNotFoundException
{
  if (keyData != null)
    return keyData;
  final FileSystem fs = FileSystem.get(conf);
  keyData = new ArrayList<KeyData<K>>();
  final long filesize = fs.getFileStatus(path).getLen();
  FSDataInputStream in = fs.open(path);
  /* The last long in the file is the start position of the trailer section */
  in.seek(filesize - 8);
  long metaDataStartPos = in.readLong();
  in.seek(metaDataStartPos);
  ObjectMapper mapper = new ObjectMapper();
  metadataJson = mapper.readValue(in.readUTF(), JsonNode.class);
  int keySectionSize = in.readInt();
  // load the key section
  byte[] keySection = new byte[keySectionSize];
  in.seek(filesize - keySectionSize - 8);
  // readFully rather than read: a single read() may return fewer bytes
  in.readFully(keySection, 0, keySectionSize);
  in.close();
  ByteArrayInputStream bis = new ByteArrayInputStream(keySection);
  DataInput dataInput = new DataInputStream(bis);
  int numberOfBlocks = metadataJson.get("numberOfBlocks").getIntValue();
  // resolve key/value classes from the JSON metadata
  keyClass = (Class<K>) ClassCache.forName(JsonUtils.getText(metadataJson, "keyClass"));
  valueClass =
      (Class<V>) ClassCache.forName(JsonUtils.getText(metadataJson, "valueClass"));
  SerializationFactory serializationFactory = new SerializationFactory(conf);
  Deserializer<K> deserializer = serializationFactory.getDeserializer(keyClass);
  deserializer.open(bis);
  while (bis.available() > 0 && numberOfBlocks > 0)
  {
    K key = deserializer.deserialize(null);
    long offset = dataInput.readLong();
    long blockId = dataInput.readLong();
    long numRecords = dataInput.readLong();
    keyData.add(new KeyData<K>(key, offset, 0, numRecords, blockId));
    numberOfBlocks--;
  }
  // Assign length to each keydata entry
  int numEntries = keyData.size();
  for (int i = 1; i < numEntries; i++)
  {
    KeyData<K> prev = keyData.get(i - 1);
    KeyData<K> current = keyData.get(i);
    prev.setLength(current.getOffset() - prev.getOffset());
  }
  if (numEntries > 0)
  {
    KeyData<K> last = keyData.get(numEntries - 1);
    last.setLength(metaDataStartPos - last.offset);
  }
  return keyData;
}
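The length back-fill at the end of that method in isolation: every entry spans up to the next entry's offset, and the last one spans up to the trailer. With hypothetical numbers:

long[] offsets = {0L, 120L, 300L};  // block start offsets, in file order
long metaDataStartPos = 450L;       // where the trailer section begins
long[] lengths = new long[offsets.length];
for (int i = 0; i + 1 < offsets.length; i++) {
  lengths[i] = offsets[i + 1] - offsets[i];  // 120, 180
}
lengths[offsets.length - 1] =
    metaDataStartPos - offsets[offsets.length - 1];  // 150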