下面列出了 org.apache.hadoop.fs.FSDataInputStream#available() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Scans the stream for the first line terminator and returns it.
 *
 * <p>Bytes are consumed one at a time until a {@code '\n'} or {@code '\r'} is found. The byte
 * that follows is then inspected: if it is the <em>other</em> terminator character, the
 * two-character sequence ({@code "\r\n"} or {@code "\n\r"}) is returned; otherwise only the
 * first terminator is returned. Two identical terminators in a row (for example an empty second
 * line, {@code "\n\n"}) are therefore reported as a single-character separator.
 *
 * <p>The stream is not rewound: it is left positioned after the last byte consumed here.
 *
 * @param fis stream to scan; each byte is interpreted as one character
 * @return the detected line separator, or {@code null} if the end of the stream is reached
 *         without encountering a terminator
 * @throws IOException if reading from the stream fails
 */
private String retrieveLineSeparator(FSDataInputStream fis)
        throws IOException {
    int current;
    // read() returning -1 is the only reliable end-of-stream signal; available() is an estimate.
    while ((current = fis.read()) != -1) {
        if (current == '\n' || current == '\r') {
            String lineSeparator = String.valueOf((char) current);
            int next = fis.read();
            // Only a *different* terminator forms a pair; a repeated one starts the next line.
            boolean isTerminator = (next == '\n') || (next == '\r');
            if (isTerminator && next != current) {
                lineSeparator += (char) next;
            }
            return lineSeparator;
        }
    }
    return null;
}
/**
 * Converts a text file of centroids into a sequence file of {@code Centroid} keys.
 *
 * <p>Each non-blank input line is split on single spaces: the first token is parsed as the
 * integer cluster id and every remaining token as a {@code double} coordinate. One record is
 * appended per line, always with an {@code IntWritable} value of 0. Any existing file at
 * {@code sequenceFilePath} is deleted first.
 *
 * @param conf             Hadoop configuration passed to the sequence file writer
 * @param fs               file system used for both the input and the output file
 * @param centroidsPath    path of the text file to read
 * @param sequenceFilePath path of the sequence file to (re)create
 * @throws Exception if the files cannot be accessed or a token is not a valid number
 */
public static void createCentersSequenceFile (Configuration conf, FileSystem fs, String centroidsPath, String sequenceFilePath) throws Exception {
    Path seqFile = new Path (sequenceFilePath);
    if (fs.exists(seqFile)) {
        fs.delete(seqFile, true);
    }
    FSDataInputStream inputStream = fs.open(new Path(centroidsPath));
    try {
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, seqFile, Centroid.class, IntWritable.class);
        try {
            IntWritable value = new IntWritable(0);
            String line;
            // readLine() returns null at end of stream, which is a reliable terminator,
            // unlike available(), which is only an estimate.
            while ((line = inputStream.readLine()) != null) {
                StringTokenizer tokenizer = new StringTokenizer(line, " ");
                if (!tokenizer.hasMoreTokens()) {
                    // Blank line (e.g. a trailing newline): nothing to convert.
                    continue;
                }
                int dim = tokenizer.countTokens() - 1;
                int clusterId = Integer.parseInt(tokenizer.nextToken());
                double [] coords = new double [dim];
                for (int i = 0; i < dim; i++) {
                    coords[i] = Double.parseDouble(tokenizer.nextToken());
                }
                Centroid cluster = new Centroid(clusterId, new Point(coords));
                writer.append(cluster, value);
            }
        } finally {
            // Release the writer even when parsing or appending fails.
            IOUtils.closeStream(writer);
        }
    } finally {
        inputStream.close();
    }
}
/**
 * Read an existing index. Reads and returns the index index, which is a list of chunks defined by the Cassandra
 * Index.db file along with the configured split size.
 *
 * <p>The index file is located by appending {@code SSTABLE_INDEX_SUFFIX} to {@code sstablePath}. It is
 * consumed as consecutive pairs of {@code long} values, each pair being passed to
 * {@link SSTableIndexIndex#add}, until the stream reports no more available bytes.
 *
 * @param fileSystem Hadoop file system.
 * @param sstablePath SSTable Index.db.
 * @return Index of chunks.
 * @throws IOException if the index file is missing or cannot be read.
 */
public static SSTableIndexIndex readIndex(final FileSystem fileSystem, final Path sstablePath) throws IOException {
    final Closer closer = Closer.create();
    final Path indexPath = sstablePath.suffix(SSTABLE_INDEX_SUFFIX);
    final SSTableIndexIndex indexIndex = new SSTableIndexIndex();
    try {
        // Detonate if we don't have an index.
        final FSDataInputStream inputStream = closer.register(fileSystem.open(indexPath));
        while (inputStream.available() != 0) {
            // Java evaluates arguments left to right, so the first long read is the first argument.
            indexIndex.add(inputStream.readLong(), inputStream.readLong());
        }
    } catch (Throwable t) {
        // Tell the Closer about the primary failure so an exception from close() cannot mask it.
        throw closer.rethrow(t);
    } finally {
        closer.close();
    }
    return indexIndex;
}
public FSDataInputChannel(FSDataInputStream inputStream) throws IOException {
if (inputStream.getWrappedStream() instanceof ByteBufferReadable) {
this.isDirectRead = true;
} else {
/* LocalFileSystem, S3 does not support ByteBufferReadable */
this.channel = Channels.newChannel(inputStream);
}
this.inputStream = inputStream;
this.size = inputStream.getPos() + inputStream.available();
}
/**
 * Read the contents of a file and return the results as a string.
 *
 * <p>The file is read until end of stream and decoded with the platform default charset.
 * The stream is always closed, even when reading fails.
 *
 * @param path The path to the HDFS file to be read.
 * @return Returns the full contents of an HDFS file.
 * @throws IOException if the file cannot be opened or read.
 * @throws URISyntaxException declared for obtaining the Hadoop file system.
 */
public static String readHdfsFile(String path) throws IOException, URISyntaxException {
    FileSystem fs = getHadoopFileSystem();
    Path filePath = new Path(path);
    FSDataInputStream inputStream = fs.open(filePath);
    try {
        // available() is only a capacity hint here; the loop below reads to the real end of stream.
        java.io.ByteArrayOutputStream collected =
                new java.io.ByteArrayOutputStream(Math.max(inputStream.available(), 32));
        byte[] chunk = new byte[8192];
        int count;
        while ((count = inputStream.read(chunk)) != -1) {
            collected.write(chunk, 0, count);
        }
        return new String(collected.toByteArray(), Charset.defaultCharset());
    } finally {
        inputStream.close();
    }
}
/**
 * Reads a whole file into memory.
 *
 * <p>The buffer is sized from the length reported by the file's status rather than from
 * {@code available()}, which is only an estimate and may be smaller than the file.
 *
 * @param fs   file system that holds the file
 * @param path file to read
 * @return the complete contents of the file
 * @throws IOException if the file is missing, is too large for a single byte array, or cannot be
 *                     read completely
 */
private static byte [] readFile(final FileSystem fs, final Path path) throws IOException {
    final long length = fs.getFileStatus(path).getLen();
    if (length > Integer.MAX_VALUE) {
        throw new IOException("File is too large to read into a byte array: " + path
                + " (" + length + " bytes)");
    }
    final FSDataInputStream tmpIn = fs.open(path);
    try {
        final byte [] rawData = new byte[(int) length];
        // readFully throws EOFException if the stream ends before the buffer is filled.
        tmpIn.readFully(rawData);
        return rawData;
    } finally {
        tmpIn.close();
    }
}
/**
 * Reads everything that remains in the stream and returns it as a byte array.
 *
 * <p>The read buffer is sized from {@code available()} when that value is positive, and falls
 * back to 1024 bytes otherwise. The stream is read until it signals end of stream, so the result
 * is complete even if {@code available()} under-reports.
 *
 * @param fsdis stream to drain
 * @param close whether to close the stream before returning (also on failure)
 * @return all bytes read from the current position to the end of the stream
 * @throws IOException if reading or closing the stream fails
 */
private static byte[] readAllWithBuffer(FSDataInputStream fsdis, boolean close)
        throws IOException {
    try {
        final int available = fsdis.available();
        // A zero-length buffer makes read() return 0 forever, so never allocate one.
        final byte[] buffer = new byte[available > 0 ? available : 1024];
        final ByteArrayOutputStream baos = new ByteArrayOutputStream(buffer.length);
        int read;
        while ((read = fsdis.read(buffer, 0, buffer.length)) >= 0) {
            baos.write(buffer, 0, read);
        }
        return baos.toByteArray();
    } finally {
        if (close) {
            fsdis.close();
        }
    }
}
/**
 * Reads everything that remains in the stream and returns it as a byte array.
 *
 * <p>The read buffer is sized from {@code available()} when that value is positive, and falls
 * back to 1024 bytes otherwise. The stream is read until it signals end of stream, so the result
 * is complete even if {@code available()} under-reports.
 *
 * @param fsdis stream to drain
 * @param close whether to close the stream before returning (also on failure)
 * @return all bytes read from the current position to the end of the stream
 * @throws IOException if reading or closing the stream fails
 */
private static byte[] readAllWithBuffer(FSDataInputStream fsdis, boolean close)
        throws IOException {
    try {
        final int available = fsdis.available();
        // A zero-length buffer makes read() return 0 forever, so never allocate one.
        final byte[] buffer = new byte[available > 0 ? available : 1024];
        final ByteArrayOutputStream baos = new ByteArrayOutputStream(buffer.length);
        int read;
        while ((read = fsdis.read(buffer, 0, buffer.length)) >= 0) {
            baos.write(buffer, 0, read);
        }
        return baos.toByteArray();
    } finally {
        if (close) {
            fsdis.close();
        }
    }
}