类org.apache.hadoop.io.file.tfile.TFile源码实例Demo

下面列出了 org.apache.hadoop.io.file.tfile.TFile 类的 API 实例代码及用法,也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: hadoop   文件: AggregatedLogFormat.java
/**
 * Advances to the next key and returns its value stream, transparently
 * skipping reserved META keys.
 *
 * @param key the LogKey to populate with the next key's fields
 * @return the value stream for the next non-META key, or null when the
 *         scanner is exhausted
 * @throws IOException if reading from the underlying TFile fails
 */
public DataInputStream next(LogKey key) throws IOException {
  // The very first call reads the entry the scanner already points at;
  // every later call must advance first.
  if (this.atBeginning) {
    this.atBeginning = false;
  } else {
    this.scanner.advance();
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Reserved META keys are internal bookkeeping; recurse past them.
  return RESERVED_KEYS.containsKey(key.toString())
      ? next(key)
      : entry.getValueStream();
}
 
源代码2 项目: big-c   文件: AggregatedLogFormat.java
/**
 * Reads the next key and returns its value stream, looping past any
 * reserved META keys instead of surfacing them to the caller.
 *
 * @param key the LogKey to populate with the next key's fields
 * @return the value stream if there is a further non-META key, or null
 *         once the scanner reaches the end
 * @throws IOException if the underlying TFile cannot be read
 */
public DataInputStream next(LogKey key) throws IOException {
  while (true) {
    // First call consumes the current entry; subsequent calls advance.
    if (!this.atBeginning) {
      this.scanner.advance();
    } else {
      this.atBeginning = false;
    }
    if (this.scanner.atEnd()) {
      return null;
    }
    TFile.Reader.Scanner.Entry entry = this.scanner.entry();
    key.readFields(entry.getKeyStream());
    // Skip reserved META keys and keep scanning.
    if (!RESERVED_KEYS.containsKey(key.toString())) {
      return entry.getValueStream();
    }
  }
}
 
源代码3 项目: reef   文件: TFileParser.java
/**
 * Opens a TFile reader for the given path and returns a scanner positioned
 * past the three leading meta entries (VERSION, APPLICATION_ACL, and
 * APPLICATION_OWNER).
 *
 * @param path the TFile to open
 * @return a scanner positioned at the first log entry
 * @throws IOException if the file cannot be opened or scanned
 */
private TFile.Reader.Scanner getScanner(final Path path) throws IOException {
  LOG.log(Level.FINE, "Creating Scanner for path {0}", path);
  final TFile.Reader reader = new TFile.Reader(this.fileSystem.open(path),
      this.fileSystem.getFileStatus(path).getLen(),
      this.configuration);
  try {
    final TFile.Reader.Scanner scanner = reader.createScanner();
    for (int counter = 0;
         counter < 3 && !scanner.atEnd();
         counter += 1) {
      //skip VERSION, APPLICATION_ACL, and APPLICATION_OWNER
      scanner.advance();
    }
    LOG.log(Level.FINE, "Created Scanner for path {0}", path);
    return scanner;
  } catch (final IOException e) {
    // Don't leak the reader (and its underlying input stream) when scanner
    // creation or the initial advances fail.
    try {
      reader.close();
    } catch (final IOException closeError) {
      e.addSuppressed(closeError);
    }
    throw e;
  }
}
 
源代码4 项目: tez   文件: TFileRecordReader.java
/**
 * Populates the current key/value pair from the given TFile entry.
 *
 * The key is prefixed with the split file name (which contains the machine
 * name): splitPath + ":" + realKey.
 *
 * @param entry the TFile entry to read the key and value from
 * @throws IOException if reading the key or value fails
 */
private void populateKV(TFile.Reader.Scanner.Entry entry) throws IOException {
  entry.getKey(keyBytesWritable);
  //splitpath contains the machine name. Create the key as splitPath + realKey
  // BytesWritable.getBytes() returns the whole backing array, which may be
  // longer than the logical key; honor getLength() to avoid trailing garbage,
  // and decode with an explicit charset rather than the platform default.
  String keyStr = new StringBuilder()
      .append(splitPath.getName()).append(":")
      .append(new String(keyBytesWritable.getBytes(), 0,
          keyBytesWritable.getLength(),
          java.nio.charset.StandardCharsets.UTF_8))
      .toString();

  /**
   * In certain cases, values can be huge (files > 2 GB). Stream is
   * better to handle such scenarios.
   */
  currentValueReader = new BufferedReader(
      new InputStreamReader(entry.getValueStream(),
          java.nio.charset.StandardCharsets.UTF_8));
  key.set(keyStr);
  String line = currentValueReader.readLine();
  value.set((line == null) ? "" : line);
}
 
源代码5 项目: hadoop   文件: AggregatedLogFormat.java
/**
 * Creates the aggregated log file at {@code remoteAppLogFile} as the given
 * user and opens a TFile writer on it.
 *
 * @param conf configuration used for the FileContext and compression type
 * @param remoteAppLogFile destination path for the aggregated log file
 * @param userUgi user to impersonate when creating the remote file
 * @throws IOException if file creation fails or the doAs call is interrupted
 */
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  try {
    this.fsDataOStream =
        userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
          @Override
          public FSDataOutputStream run() throws Exception {
            // Resolve the FileContext against the target URI so the file is
            // created on the remote (aggregation) filesystem.
            fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
            fc.setUMask(APP_LOG_FILE_UMASK);
            return fc.create(
                remoteAppLogFile,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                new Options.CreateOpts[] {});
          }
        });
  } catch (InterruptedException e) {
    // Surface interruption as an I/O failure per this constructor's contract.
    throw new IOException(e);
  }

  // Keys are not sorted: null arg
  // 256KB minBlockSize : Expected log size for each container too
  this.writer =
      new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
          YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
  //Write the version string
  writeVersion();
}
 
源代码6 项目: hadoop   文件: AggregatedLogFormat.java
/**
 * Opens the aggregated log file for reading and creates a scanner over it.
 *
 * @param conf configuration used to resolve the FileContext and the reader
 * @param remoteAppLogFile the aggregated log file to read
 * @throws IOException if the file cannot be opened
 */
public LogReader(Configuration conf, Path remoteAppLogFile)
    throws IOException {
  // Resolve against the file's URI so reads go to the remote filesystem.
  FileContext fileContext =
      FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
  this.fsDataIStream = fileContext.open(remoteAppLogFile);
  reader =
      new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
          remoteAppLogFile).getLen(), conf);
  this.scanner = reader.createScanner();
}
 
源代码7 项目: hadoop   文件: AggregatedLogFormat.java
/**
 * Returns the owner of the application.
 *
 * @return the application owner, or null if no owner entry is present.
 * @throws IOException
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = reader.createScanner();
  try {
    LogKey key = new LogKey();
    while (!ownerScanner.atEnd()) {
      TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
      key.readFields(entry.getKeyStream());
      if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
        DataInputStream valueStream = entry.getValueStream();
        return valueStream.readUTF();
      }
      ownerScanner.advance();
    }
    return null;
  } finally {
    // Scanners hold block-reader resources; release them on every path
    // instead of leaking this per-call scanner.
    ownerScanner.close();
  }
}
 
/**
 * Opens the given application history TFile and positions the reader at the
 * beginning via reset().
 *
 * @param historyFile the history file to read
 * @throws IOException if the file cannot be opened
 */
public HistoryFileReader(Path historyFile) throws IOException {
  fsdis = fs.open(historyFile);
  reader =
      new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
        getConfig());
  reset();
}
 
/**
 * Reads the key/value pair at the scanner's current position, advances the
 * scanner, and returns the pair as an Entry.
 *
 * @return the next history entry
 * @throws IOException if reading the key or value fails, including
 *         EOFException when the value stream is shorter than declared
 */
public Entry next() throws IOException {
  TFile.Reader.Scanner.Entry entry = scanner.entry();
  DataInputStream dis = entry.getKeyStream();
  HistoryDataKey key = new HistoryDataKey();
  key.readFields(dis);
  dis = entry.getValueStream();
  byte[] value = new byte[entry.getValueLength()];
  // read() may return fewer bytes than requested; readFully guarantees the
  // whole value is consumed (or throws EOFException).
  dis.readFully(value);
  scanner.advance();
  return new Entry(key, value);
}
 
源代码10 项目: hadoop   文件: FileSystemApplicationHistoryStore.java
/**
 * Opens the history file for writing — appending when it already exists,
 * creating it otherwise — sets its permission to HISTORY_FILE_UMASK, and
 * wraps it in a TFile writer using the configured compression type.
 *
 * @param historyFile the application history file to write
 * @throws IOException if the file cannot be opened or created
 */
public HistoryFileWriter(Path historyFile) throws IOException {
  if (fs.exists(historyFile)) {
    fsdos = fs.append(historyFile);
  } else {
    fsdos = fs.create(historyFile);
  }
  fs.setPermission(historyFile, HISTORY_FILE_UMASK);
  writer =
      new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
        YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
        YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
        getConfig());
}
 
源代码11 项目: big-c   文件: AggregatedLogFormat.java
/**
 * Creates the aggregated log file at {@code remoteAppLogFile} as the given
 * user and opens a TFile writer on it.
 *
 * @param conf configuration used for the FileContext and compression type
 * @param remoteAppLogFile destination path for the aggregated log file
 * @param userUgi user to impersonate when creating the remote file
 * @throws IOException if file creation fails or the doAs call is interrupted
 */
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  try {
    this.fsDataOStream =
        userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
          @Override
          public FSDataOutputStream run() throws Exception {
            // Uses the default filesystem from conf (the path's own URI is
            // not consulted here).
            fc = FileContext.getFileContext(conf);
            fc.setUMask(APP_LOG_FILE_UMASK);
            return fc.create(
                remoteAppLogFile,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                new Options.CreateOpts[] {});
          }
        });
  } catch (InterruptedException e) {
    // Surface interruption as an I/O failure per this constructor's contract.
    throw new IOException(e);
  }

  // Keys are not sorted: null arg
  // 256KB minBlockSize : Expected log size for each container too
  this.writer =
      new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
          YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
  //Write the version string
  writeVersion();
}
 
源代码12 项目: big-c   文件: AggregatedLogFormat.java
/**
 * Opens the aggregated log file for reading and creates a scanner over it.
 *
 * @param conf configuration used to resolve the FileContext and the reader
 * @param remoteAppLogFile the aggregated log file to read
 * @throws IOException if the file cannot be opened
 */
public LogReader(Configuration conf, Path remoteAppLogFile)
    throws IOException {
  // Uses the default filesystem from conf (not the path's own URI).
  FileContext fileContext = FileContext.getFileContext(conf);
  this.fsDataIStream = fileContext.open(remoteAppLogFile);
  reader =
      new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
          remoteAppLogFile).getLen(), conf);
  this.scanner = reader.createScanner();
}
 
源代码13 项目: big-c   文件: AggregatedLogFormat.java
/**
 * Returns the owner of the application.
 *
 * @return the application owner, or null if no owner entry is present.
 * @throws IOException
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = reader.createScanner();
  try {
    LogKey key = new LogKey();
    while (!ownerScanner.atEnd()) {
      TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
      key.readFields(entry.getKeyStream());
      if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
        DataInputStream valueStream = entry.getValueStream();
        return valueStream.readUTF();
      }
      ownerScanner.advance();
    }
    return null;
  } finally {
    // Scanners hold block-reader resources; release them on every path
    // instead of leaking this per-call scanner.
    ownerScanner.close();
  }
}
 
源代码14 项目: big-c   文件: FileSystemApplicationHistoryStore.java
/**
 * Opens the given application history TFile and positions the reader at the
 * beginning via reset().
 *
 * @param historyFile the history file to read
 * @throws IOException if the file cannot be opened
 */
public HistoryFileReader(Path historyFile) throws IOException {
  fsdis = fs.open(historyFile);
  reader =
      new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
        getConfig());
  reset();
}
 
源代码15 项目: big-c   文件: FileSystemApplicationHistoryStore.java
/**
 * Reads the key/value pair at the scanner's current position, advances the
 * scanner, and returns the pair as an Entry.
 *
 * @return the next history entry
 * @throws IOException if reading the key or value fails, including
 *         EOFException when the value stream is shorter than declared
 */
public Entry next() throws IOException {
  TFile.Reader.Scanner.Entry entry = scanner.entry();
  DataInputStream dis = entry.getKeyStream();
  HistoryDataKey key = new HistoryDataKey();
  key.readFields(dis);
  dis = entry.getValueStream();
  byte[] value = new byte[entry.getValueLength()];
  // read() may return fewer bytes than requested; readFully guarantees the
  // whole value is consumed (or throws EOFException).
  dis.readFully(value);
  scanner.advance();
  return new Entry(key, value);
}
 
源代码16 项目: big-c   文件: FileSystemApplicationHistoryStore.java
/**
 * Opens the history file for writing — appending when it already exists,
 * creating it otherwise — sets its permission to HISTORY_FILE_UMASK, and
 * wraps it in a TFile writer using the configured compression type.
 *
 * @param historyFile the application history file to write
 * @throws IOException if the file cannot be opened or created
 */
public HistoryFileWriter(Path historyFile) throws IOException {
  if (fs.exists(historyFile)) {
    fsdos = fs.append(historyFile);
  } else {
    fsdos = fs.create(historyFile);
  }
  fs.setPermission(historyFile, HISTORY_FILE_UMASK);
  writer =
      new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
        YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
        YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
        getConfig());
}
 
源代码17 项目: reef   文件: TFileParser.java
/**
 * Streams every log entry in the given TFile into the supplied writer.
 *
 * @param inputPath the TFile to parse
 * @param outputWriter destination for the formatted log entries
 * @throws IOException if reading or writing fails
 */
void parseOneFile(final Path inputPath, final Writer outputWriter) throws IOException {
  // try-with-resources guarantees the scanner is closed even on failure.
  try (TFile.Reader.Scanner scanner = this.getScanner(inputPath)) {
    for (; !scanner.atEnd(); scanner.advance()) {
      final LogFileEntry logEntry = new LogFileEntry(scanner.entry());
      logEntry.write(outputWriter);
    }
  }
}
 
源代码18 项目: reef   文件: TFileParser.java
/**
 * Parses the given file and stores the logs for each container in a file named after the container in the given.
 * outputFolder
 *
 * @param inputPath
 * @param outputFolder
 * @throws IOException
 */
/**
 * Writes each log entry of the given TFile into the output folder, one file
 * per container (named by the LogFileEntry itself).
 *
 * @param inputPath the TFile to parse
 * @param outputFolder folder that receives one file per container
 * @throws IOException if reading or writing fails
 */
void parseOneFile(final Path inputPath, final File outputFolder) throws IOException {
  // try-with-resources guarantees the scanner is closed even on failure.
  try (TFile.Reader.Scanner scanner = this.getScanner(inputPath)) {
    for (; !scanner.atEnd(); scanner.advance()) {
      final LogFileEntry logEntry = new LogFileEntry(scanner.entry());
      logEntry.write(outputFolder);
    }
  }
}
 
源代码19 项目: tez   文件: TFileRecordReader.java
/**
 * Opens the TFile backing the given split and creates a scanner restricted
 * to the split's byte range.
 *
 * @param split the input split to read; must be a FileSplit
 * @param context task context providing the job configuration
 * @throws IOException if the file cannot be opened
 * @throws InterruptedException declared by the RecordReader contract
 */
@Override public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  FileSplit fileSplit = (FileSplit) split;
  LOG.info("Initializing TFileRecordReader : " + fileSplit.getPath().toString());
  start = fileSplit.getStart();
  end = start + fileSplit.getLength();

  FileSystem fs = fileSplit.getPath().getFileSystem(context.getConfiguration());
  splitPath = fileSplit.getPath();
  fin = fs.open(splitPath);
  reader = new TFile.Reader(fin, fs.getFileStatus(splitPath).getLen(),
      context.getConfiguration());
  // Bound the scanner to this split's bytes so each reader only handles
  // its own range of the file.
  scanner = reader.createScannerByByteRange(start, fileSplit.getLength());
}
 
源代码20 项目: hadoop   文件: AggregatedLogFormat.java
/**
 * Exposes the underlying TFile writer; intended for tests only.
 *
 * @return the TFile writer backing this log writer
 */
@VisibleForTesting
public TFile.Writer getWriter() {
  return this.writer;
}
 
源代码21 项目: big-c   文件: AggregatedLogFormat.java
/**
 * Exposes the underlying TFile writer; intended for tests only.
 *
 * @return the TFile writer backing this log writer
 */
@VisibleForTesting
public TFile.Writer getWriter() {
  return this.writer;
}
 
源代码22 项目: reef   文件: LogFileEntry.java
/**
 * Wraps a TFile scanner entry for later formatting/writing.
 *
 * @param entry the scanner entry this log-file entry reads from
 */
LogFileEntry(final TFile.Reader.Scanner.Entry entry) {
  this.entry = entry;
}
 
 类所在包
 同包方法