org.apache.hadoop.fs.FSDataInputStream#available() code examples

The examples below show how org.apache.hadoop.fs.FSDataInputStream#available() is used in open-source projects; follow each project link to view the full source on GitHub. Keep in mind that available() returns only an estimate of how many bytes can be read without blocking; the java.io.InputStream contract does not promise that it equals the remaining file length, so the size and end-of-stream checks in several examples below deserve care.

Example 1

private String retrieveLineSeparator(FSDataInputStream fis)
    throws IOException {
    char current;
    String lineSeparator = "";
    while (fis.available() > 0) {
        current = (char) fis.read();
        if ((current == '\n') || (current == '\r')) {
            lineSeparator += current;
            if (fis.available() > 0) {
                char next = (char) fis.read();
                if ((next == '\r') || (next == '\n')) {
                    lineSeparator += next;
                }
            }
            return lineSeparator;
        }
    }
    return null;
}
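
Example 1 scans a file byte by byte for the first '\n' or '\r' and returns the separator it finds, including two-character separators such as "\r\n". It relies on available() > 0 to detect the end of the stream, which works against HDFS but is not guaranteed by the java.io.InputStream contract. A minimal sketch of the same idea (the detectLineSeparator name is hypothetical, not from the original project) that detects end of stream through read() instead:

private static String detectLineSeparator(FSDataInputStream fis)
    throws IOException {
    int b;
    while ((b = fis.read()) != -1) {          // -1 reliably signals end of stream
        char current = (char) b;
        if (current == '\n' || current == '\r') {
            int next = fis.read();            // look for a two-character separator
            if (next == '\n' || next == '\r') {
                return "" + current + (char) next;
            }
            return String.valueOf(current);
        }
    }
    return null;                              // no separator found in the file
}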
 
Example 2, project: flink-perf, file: KMeansDriver.java
public static void createCentersSequenceFile (Configuration conf, FileSystem fs, String centroidsPath, String sequenceFilePath) throws Exception {
	Path seqFile = new Path (sequenceFilePath);
	if (fs.exists(seqFile)) {
		fs.delete(seqFile, true);
	}
	FSDataInputStream inputStream = fs.open(new Path(centroidsPath));
	SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, seqFile, Centroid.class, IntWritable.class);
	IntWritable value = new IntWritable(0);
	while (inputStream.available() > 0) {
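		// readLine() below is the deprecated DataInputStream.readLine(), and the
		// available() > 0 condition is only an estimate; this works against HDFS
		// but is not guaranteed by the InputStream contract.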
		String line = inputStream.readLine();
		StringTokenizer tokenizer = new StringTokenizer(line, " ");
		int dim = tokenizer.countTokens() - 1;
		int clusterId = Integer.valueOf(tokenizer.nextToken());
		double [] coords = new double [dim];
		for (int i = 0; i < dim; i++) {
			coords[i] = Double.valueOf(tokenizer.nextToken());
		}
		Centroid cluster = new Centroid(clusterId, new Point(coords));
		writer.append(cluster, value);
	}
	IOUtils.closeStream(writer);
	inputStream.close();
}
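
Example 2 turns a text file of centroids into a SequenceFile; its read loop combines available() > 0 with the deprecated DataInputStream.readLine(). A minimal sketch of the same loop, assuming one centroid per line and UTF-8 content, that needs neither:

// Sketch: line-oriented reading without available() or the deprecated readLine().
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(fs.open(new Path(centroidsPath)), StandardCharsets.UTF_8))) {
    String line;
    while ((line = reader.readLine()) != null) {  // null reliably signals end of file
        // ... tokenize and append to the SequenceFile writer as above ...
    }
}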
 
Example 3, project: hadoop-sstable, file: SSTableIndexIndex.java
/**
 * Read an existing index. Reads and returns the index index, which is a list of chunks defined by the Cassandra
 * Index.db file along with the configured split size.
 *
 * @param fileSystem Hadoop file system.
 * @param sstablePath SSTable Index.db.
 * @return Index of chunks.
 * @throws IOException
 */
public static SSTableIndexIndex readIndex(final FileSystem fileSystem, final Path sstablePath) throws IOException {
    final Closer closer = Closer.create();
    final Path indexPath = sstablePath.suffix(SSTABLE_INDEX_SUFFIX);

    // Detonate if we don't have an index.
    final FSDataInputStream inputStream = closer.register(fileSystem.open(indexPath));

    final SSTableIndexIndex indexIndex = new SSTableIndexIndex();
    try {
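        // available() != 0 as a loop condition relies on the stream reporting the
        // exact remaining byte count; a conservative estimate here would silently
        // drop index entries.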
        while (inputStream.available() != 0) {
            indexIndex.add(inputStream.readLong(), inputStream.readLong());
        }
    } finally {
        closer.close();
    }

    return indexIndex;
}
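
Example 3 reads pairs of longs until available() reports zero. An alternative sketch that bounds the loop by the file's length, taken from its FileStatus, instead:

// Sketch: bound the read loop by the file length rather than available().
final long length = fileSystem.getFileStatus(indexPath).getLen();
try (FSDataInputStream in = fileSystem.open(indexPath)) {
    while (in.getPos() < length) {
        indexIndex.add(in.readLong(), in.readLong());
    }
}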
 
Example 4, project: tajo, file: FSDataInputChannel.java
public FSDataInputChannel(FSDataInputStream inputStream) throws IOException {
  if (inputStream.getWrappedStream() instanceof ByteBufferReadable) {
    this.isDirectRead = true;
  } else {
    /* LocalFileSystem, S3 does not support ByteBufferReadable */
    this.channel = Channels.newChannel(inputStream);
  }
  this.inputStream = inputStream;
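  // Caveat: available() returns an int estimate, so this size can be wrong for
  // very large files or for filesystems whose streams report conservatively.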
  this.size = inputStream.getPos() + inputStream.available();
}
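
The size computed above is only as good as available()'s estimate. If the caller also knows the file's Path, a sketch (the fs and path variables are hypothetical, not part of this constructor) that reads the exact size from metadata:

// Sketch: exact size from file metadata instead of getPos() + available().
long size = fs.getFileStatus(path).getLen();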
 
Example 5, project: neo4j-mazerunner, file: FileUtil.java
/**
 * Read the contents of a file and return the results as a string.
 * @param path The path to the HDFS file to be read.
 * @return Returns the full contents of an HDFS file.
 * @throws IOException
 * @throws URISyntaxException
 */
public static String readHdfsFile(String path) throws IOException, URISyntaxException {
    FileSystem fs = getHadoopFileSystem();
    Path filePath = new Path(path);
    FSDataInputStream inputStream = fs.open(filePath);

    Charset encoding = Charset.defaultCharset();

    byte[] buffer = new byte[inputStream.available()];
    inputStream.readFully(buffer);
    inputStream.close();
    String contents = new String(buffer, encoding);

    return contents;
}
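
Sizing the buffer with available() assumes the estimate equals the remaining file length; that holds for HDFS but not for every FileSystem, and it cannot work for files of 2 GB or more. A sketch of the same whole-file read sized from metadata, still assuming the file fits in one array:

// Sketch: size the buffer from FileStatus rather than available().
long length = fs.getFileStatus(filePath).getLen();
byte[] buffer = new byte[(int) length];       // assumes length < 2 GB
try (FSDataInputStream in = fs.open(filePath)) {
    in.readFully(buffer);                     // throws EOFException if the file is shorter
}
String contents = new String(buffer, Charset.defaultCharset());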
 
Example 6, project: hbase, file: RegionSplitter.java
private static byte [] readFile(final FileSystem fs, final Path path) throws IOException {
  FSDataInputStream tmpIn = fs.open(path);
  try {
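    // Caveat: rawData is sized with available(); see the note under Example 5.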
    byte [] rawData = new byte[tmpIn.available()];
    tmpIn.readFully(rawData);
    return rawData;
  } finally {
    tmpIn.close();
  }
}
 
Example 7, project: hadoop, file: TestHadoopArchives.java
private static byte[] readAllWithBuffer(FSDataInputStream fsdis, boolean close)
    throws IOException {
  try {
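    // Here available() is used only as a sizing hint; end of stream is detected
    // by read() returning -1, which is the reliable contract.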
    final int available = fsdis.available();
    final byte[] buffer;
    final ByteArrayOutputStream baos;
    if (available < 0) {
      buffer = new byte[1024];
      baos = new ByteArrayOutputStream(buffer.length * 2);
    } else {
      buffer = new byte[available];
      baos = new ByteArrayOutputStream(available);
    }
    int readIntoBuffer = 0;
    int read; 
    while (true) {
      read = fsdis.read(buffer, readIntoBuffer, buffer.length - readIntoBuffer);
      if (read < 0) {
        // end of stream:
        if (readIntoBuffer > 0) {
          baos.write(buffer, 0, readIntoBuffer);
        }
        return baos.toByteArray();
      } else {
        readIntoBuffer += read;
        if (readIntoBuffer == buffer.length) {
          // buffer is full, need to clean the buffer.
          // drop the buffered data to baos:
          baos.write(buffer);
          // reset the counter to start reading to the buffer beginning:
          readIntoBuffer = 0;
        } else if (readIntoBuffer > buffer.length) {
          throw new IOException("Read more than the buffer length: "
              + readIntoBuffer + ", buffer length = " + buffer.length);
        }
      }
    }
  } finally {
    if (close) {
      fsdis.close();
    }
  }
}
 
Example 8, project: big-c, file: TestHadoopArchives.java
The code is identical, byte for byte, to Example 7 above.