Source code examples for the class org.apache.hadoop.fs.FSDataOutputStream

The examples below show how the org.apache.hadoop.fs.FSDataOutputStream API is used in real projects; follow the project links to view the original source on GitHub.
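
For orientation, here is a minimal, self-contained sketch of the typical write pattern: obtain a FileSystem, create the stream, write, optionally hsync(), then close. The path, the file contents, and the use of a default Configuration are illustrative assumptions and are not taken from any of the projects below.

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsDataOutputStreamDemo {
  public static void main(String[] args) throws Exception {
    // Assumption: the default Configuration picks up core-site.xml / hdfs-site.xml from the classpath.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/tmp/fsdos-demo.txt"); // hypothetical demo path

    // try-with-resources closes the stream even if a write fails
    try (FSDataOutputStream out = fs.create(file, true /* overwrite */)) {
      out.write("hello hdfs".getBytes(StandardCharsets.UTF_8));
      out.hsync(); // flush and persist to the DataNodes before the file is closed
      System.out.println("bytes written so far: " + out.getPos());
    }
  }
}

Several of the older examples below call sync() on the wrapped DFSOutputStream; on Hadoop 2.x and later that call roughly corresponds to hflush()/hsync().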

Example 1  Project: RDFS   File: TestFileConcurrentReader.java
/**
 * test case: if the BlockSender decides there is only one packet to send,
 * the previous computation of the pktSize based on transferToAllowed
 * would result in too small a buffer to do the buffer-copy needed
 * for partial chunks.  
 */
public void testUnfinishedBlockPacketBufferOverrun() throws IOException {
  // check that / exists
  Path path = new Path("/");
  System.out.println("Path : \"" + path.toString() + "\"");
  System.out.println(fileSystem.getFileStatus(path).isDir());
  assertTrue("/ should be a directory",
    fileSystem.getFileStatus(path).isDir());

  // create a new file in the root, write data, do not close
  Path file1 = new Path("/unfinished-block");
  final FSDataOutputStream stm =
    TestFileCreation.createFile(fileSystem, file1, 1);

  // write partial block and sync
  final int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
  final int partialBlockSize = bytesPerChecksum - 1;

  writeFileAndSync(stm, partialBlockSize);

  // Make sure a client can read it before it is closed
  checkCanRead(fileSystem, file1, partialBlockSize);

  stm.close();
}
 
Example 2  Project: big-c   File: TestDatanodeDeath.java
@Override
public void run() {
  System.out.println("Workload starting ");
  for (int i = 0; i < numberOfFiles; i++) {
    Path filename = new Path(id + "." + i);
    try {
      System.out.println("Workload processing file " + filename);
      FSDataOutputStream stm = createFile(fs, filename, replication);
      DFSOutputStream dfstream = (DFSOutputStream)
                                             (stm.getWrappedStream());
      dfstream.setArtificialSlowdown(1000);
      writeFile(stm, myseed);
      stm.close();
      checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
    } catch (Throwable e) {
      System.out.println("Workload exception " + e);
      assertTrue(e.toString(), false);
    }

    // increment the stamp to indicate that another file is done.
    synchronized (this) {
      stamp++;
    }
  }
}
 
Example 3  Project: incubator-heron   File: HDFSStorage.java
@Override
public void storeCheckpoint(CheckpointInfo info, Checkpoint checkpoint)
    throws StatefulStorageException {
  Path path = new Path(getCheckpointPath(info.getCheckpointId(),
                                         info.getComponent(),
                                         info.getInstanceId()));

  // We need to ensure the directory structure exists, since FileSystem.create(..)
  // is not guaranteed to create parent directories (a minimal sketch of createDir
  // follows this example).
  String checkpointDir = getCheckpointDir(info.getCheckpointId(),
                                          info.getComponent());
  createDir(checkpointDir);

  FSDataOutputStream out = null;
  try {
    out = fileSystem.create(path);
    checkpoint.getCheckpoint().writeTo(out);
  } catch (IOException e) {
    throw new StatefulStorageException("Failed to persist", e);
  } finally {
    SysUtils.closeIgnoringExceptions(out);
  }
}
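
The createDir(...) helper used above is not shown on this page. A minimal sketch, assuming it only needs to delegate to FileSystem.mkdirs and wrap failures in StatefulStorageException (not the actual Heron implementation), could look like this:

protected void createDir(String dir) throws StatefulStorageException {
  try {
    // mkdirs creates the whole directory tree and is a no-op if it already exists
    if (!fileSystem.mkdirs(new Path(dir))) {
      throw new IOException("mkdirs returned false for " + dir);
    }
  } catch (IOException e) {
    throw new StatefulStorageException("Failed to create dir " + dir, e);
  }
}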
 
Example 4  Project: Knowage-Server   File: PersistedHDFSManagerTest.java
@Test
public void testPersistDataStore() {
	IDataStore dataStore = Mockito.mock(IDataStore.class);
	IMetaData metaData = Mockito.mock(IMetaData.class);
	IRecord record = Mockito.mock(IRecord.class);
	IField fieldInt = Mockito.mock(IField.class);
	IField fieldStr = Mockito.mock(IField.class);

	Mockito.when(dataStore.getMetaData()).thenReturn(metaData);
	Mockito.when(dataStore.getRecordAt(Mockito.anyInt())).thenReturn(record);
	Mockito.when(dataStore.getRecordsCount()).thenReturn(10L);
	Mockito.when(metaData.getFieldCount()).thenReturn(2);
	Mockito.when(metaData.getFieldName(1)).thenReturn("column_Int");
	Mockito.when(metaData.getFieldName(2)).thenReturn("column_Str");
	Mockito.when(metaData.getFieldType(1)).thenReturn(Integer.class);
	Mockito.when(metaData.getFieldType(2)).thenReturn(String.class);
	Mockito.when(record.getFieldAt(1)).thenReturn(fieldInt);
	Mockito.when(record.getFieldAt(2)).thenReturn(fieldStr);
	Mockito.when(fieldInt.getValue()).thenReturn(new Integer(1));
	Mockito.when(fieldStr.getValue()).thenReturn(new String("test"));
	FSDataOutputStream fsOS = (FSDataOutputStream) hdfsManager.persistDataStore(dataStore, "test_table", "signature_xyz");
	assertNotNull(fsOS);
	assertEquals(fsOS.size(), 232);
}
 
Example 5  Project: hadoop-gpu   File: TestTFileSplit.java
void createFile(int count, String compress) throws IOException {
  conf = new Configuration();
  path = new Path(ROOT, outputFile + "." + compress);
  fs = path.getFileSystem(conf);
  FSDataOutputStream out = fs.create(path);
  Writer writer = new Writer(out, BLOCK_SIZE, compress, comparator, conf);

  int nx;
  for (nx = 0; nx < count; nx++) {
    byte[] key = composeSortedKey(KEY, count, nx).getBytes();
    byte[] value = (VALUE + nx).getBytes();
    writer.append(key, value);
  }
  writer.close();
  out.close();
}
 
private void processBatchIfRequired(List<List<Writable>> list, boolean finalRecord) throws Exception {
    if (list.isEmpty())
        return;
    if (list.size() < batchSize && !finalRecord)
        return;

    RecordReader rr = new CollectionRecordReader(list);
    RecordReaderDataSetIterator iter = new RecordReaderDataSetIterator(rr, null, batchSize, labelIndex, labelIndex, numPossibleLabels, -1, regression);

    DataSet ds = iter.next();

    String filename = "dataset_" + uid + "_" + (outputCount++) + ".bin";

    URI uri = new URI(outputDir.getPath() + "/" + filename);
    Configuration c = conf == null ? DefaultHadoopConfig.get() : conf.getValue().getConfiguration();
    FileSystem file = FileSystem.get(uri, c);
    try (FSDataOutputStream out = file.create(new Path(uri))) {
        ds.save(out);
    }

    list.clear();
}
 
/** @throws Exception If failed. */
@Test
public void testSetWorkingDirectory() throws Exception {
    Path dir = new Path("/tmp/nested/dir");
    Path file = new Path("file");

    fs.mkdirs(dir);

    fs.setWorkingDirectory(dir);

    FSDataOutputStream os = fs.create(file);
    os.close();

    String filePath = fs.getFileStatus(new Path(dir, file)).getPath().toString();

    assertTrue(filePath.contains("/tmp/nested/dir/file"));
}
 
@Test
public void outOfBandFolder_siblingCreate() throws Exception {

  // NOTE: manual use of CloudBlockBlob targets working directory explicitly.
  // WASB driver methods prepend working directory implicitly.
  String workingDir = "user/"
      + UserGroupInformation.getCurrentUser().getShortUserName() + "/";
  CloudBlockBlob blob = testAccount.getBlobReference(workingDir
      + "testFolder3/a/input/file");
  BlobOutputStream s = blob.openOutputStream();
  s.close();
  assertTrue(fs.exists(new Path("testFolder3/a/input/file")));

  Path targetFile = new Path("testFolder3/a/input/file2");
  FSDataOutputStream s2 = fs.create(targetFile);
  s2.close();
}
 
Example 9  Project: tez   File: TestMergeManager.java
private SrcFileInfo createFile(Configuration conf, FileSystem fs, Path path, int numPartitions,
                               int numKeysPerPartition, int startKey) throws IOException {
  FSDataOutputStream outStream = fs.create(path);
  int currentKey = startKey;
  SrcFileInfo srcFileInfo = new SrcFileInfo();
  srcFileInfo.indexedRecords = new TezIndexRecord[numPartitions];
  srcFileInfo.path = path;
  for (int i = 0; i < numPartitions; i++) {
    long pos = outStream.getPos();
    IFile.Writer writer =
        new IFile.Writer(conf, outStream, IntWritable.class, IntWritable.class, null, null, null);
    for (int j = 0; j < numKeysPerPartition; j++) {
      writer.append(new IntWritable(currentKey), new IntWritable(currentKey));
      currentKey++;
    }
    writer.close();
    srcFileInfo.indexedRecords[i] =
        new TezIndexRecord(pos, writer.getRawLength(), writer.getCompressedLength());
  }
  outStream.close();
  return srcFileInfo;
}
 
Example 10  Project: big-c   File: TestStickyBit.java
/**
 * Ensure that even if a file is in a directory with the sticky bit on,
 * another user can write to that file (assuming correct permissions).
 */
private void confirmCanAppend(Configuration conf, Path p) throws Exception {
  // Write a file to the new tmp directory as a regular user
  Path file = new Path(p, "foo");
  writeFile(hdfsAsUser1, file);
  hdfsAsUser1.setPermission(file, new FsPermission((short) 0777));

  // Log onto cluster as another user and attempt to append to file
  Path file2 = new Path(p, "foo");
  FSDataOutputStream h = null;
  try {
    h = hdfsAsUser2.append(file2);
    h.write("Some more data".getBytes());
    h.close();
    h = null;
  } finally {
    IOUtils.cleanup(null, h);
  }
}
 
Example 11  Project: RDFS   File: TestBlocksScheduledCounter.java
public void testBlocksScheduledCounter() throws IOException {
  
  MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, 
                                              true, null);
  cluster.waitActive();
  FileSystem fs = cluster.getFileSystem();
  
  // open a file and write a few bytes:
  FSDataOutputStream out = fs.create(new Path("/testBlockScheduledCounter"));
  for (int i=0; i<1024; i++) {
    out.write(i);
  }
  // flush to make sure a block is allocated.
  ((DFSOutputStream)(out.getWrappedStream())).sync();
  
  ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
  cluster.getNameNode().namesystem.DFSNodesStatus(dnList, dnList);
  DatanodeDescriptor dn = dnList.get(0);
  
  assertEquals(1, dn.getBlocksScheduled());
 
  // close the file and the counter should go to zero.
  out.close();   
  assertEquals(0, dn.getBlocksScheduled());
}
 
Example 12  Project: pxf   File: Hdfs.java
private void writeTableToStream(FSDataOutputStream stream, Table dataTable, String delimiter, Charset encoding) throws Exception {
    BufferedWriter bufferedWriter = new BufferedWriter(
            new OutputStreamWriter(stream, encoding));
    List<List<String>> data = dataTable.getData();

    for (int i = 0, flushThreshold = 0; i < data.size(); i++, flushThreshold++) {
        List<String> row = data.get(i);
        StringBuilder sBuilder = new StringBuilder();
        for (int j = 0; j < row.size(); j++) {
            sBuilder.append(row.get(j));
            if (j != row.size() - 1) {
                sBuilder.append(delimiter);
            }
        }
        if (i != data.size() - 1) {
            sBuilder.append("\n");
        }
        bufferedWriter.append(sBuilder.toString());
        if (flushThreshold > ROW_BUFFER) {
            bufferedWriter.flush();
        }
    }
    bufferedWriter.close();
}
 
Example 13  Project: spork   File: Util.java
static public void createInputFile(FileSystem fs, String fileName,
        String[] inputData) throws IOException {
    if(Util.WINDOWS){
        fileName = fileName.replace('\\','/');
    }
    if(fs.exists(new Path(fileName))) {
        throw new IOException("File " + fileName + " already exists on the FileSystem");
    }
    FSDataOutputStream stream = fs.create(new Path(fileName));
    PrintWriter pw = new PrintWriter(new OutputStreamWriter(stream, "UTF-8"));
    for (int i=0; i<inputData.length; i++){
        pw.print(inputData[i]);
        pw.print("\n");
    }
    pw.close();

}
 
Example 14  Project: big-c   File: TestBlockUnderConstruction.java
void writeFile(Path file, FSDataOutputStream stm, int size)
throws IOException {
  long blocksBefore = stm.getPos() / BLOCK_SIZE;
  
  TestFileCreation.writeFile(stm, BLOCK_SIZE);
  // need to make sure the full block is completely flushed to the DataNodes
  // (see FSOutputSummer#flush)
  stm.flush();
  int blocksAfter = 0;
  // wait until the block is allocated by DataStreamer
  BlockLocation[] locatedBlocks;
  while(blocksAfter <= blocksBefore) {
    locatedBlocks = DFSClientAdapter.getDFSClient(hdfs).getBlockLocations(
        file.toString(), 0L, BLOCK_SIZE*NUM_BLOCKS);
    blocksAfter = locatedBlocks == null ? 0 : locatedBlocks.length;
  }
}
 
Example 15  Project: big-c   File: TestEvents.java
private byte[] getEvents() throws Exception {
  ByteArrayOutputStream output = new ByteArrayOutputStream();
  FSDataOutputStream fsOutput = new FSDataOutputStream(output,
      new FileSystem.Statistics("scheme"));
  EventWriter writer = new EventWriter(fsOutput);
  writer.write(getJobPriorityChangedEvent());
  writer.write(getJobStatusChangedEvent());
  writer.write(getTaskUpdatedEvent());
  writer.write(getReduceAttemptKilledEvent());
  writer.write(getJobKilledEvent());
  writer.write(getSetupAttemptStartedEvent());
  writer.write(getTaskAttemptFinishedEvent());
  writer.write(getSetupAttemptFieledEvent());
  writer.write(getSetupAttemptKilledEvent());
  writer.write(getCleanupAttemptStartedEvent());
  writer.write(getCleanupAttemptFinishedEvent());
  writer.write(getCleanupAttemptFiledEvent());
  writer.write(getCleanupAttemptKilledEvent());

  writer.flush();
  writer.close();

  return output.toByteArray();
}
 
Example 16  Project: sahara-extra   File: TestSwiftFileSystemRename.java
@Test(timeout = SWIFT_TEST_TIMEOUT)
public void testRenameFile() throws Exception {
  assumeRenameSupported();

  final Path old = new Path("/test/alice/file");
  final Path newPath = new Path("/test/bob/file");
  fs.mkdirs(newPath.getParent());
  final FSDataOutputStream fsDataOutputStream = fs.create(old);
  final byte[] message = "Some data".getBytes();
  fsDataOutputStream.write(message);
  fsDataOutputStream.close();

  assertTrue(fs.exists(old));
  rename(old, newPath, true, false, true);

  final FSDataInputStream bobStream = fs.open(newPath);
  final byte[] bytes = new byte[512];
  final int read = bobStream.read(bytes);
  bobStream.close();
  final byte[] buffer = new byte[read];
  System.arraycopy(bytes, 0, buffer, 0, read);
  assertEquals(new String(message), new String(buffer));
}
 
Example 17  Project: hbase   File: TestOverwriteFileUnderConstruction.java
@Test
public void testOverwrite() throws IOException {
  Path file = new Path("/" + name.getMethodName());
  FSDataOutputStream out1 = FS.create(file);
  FSDataOutputStream out2 = FS.create(file, true);
  out1.write(2);
  out2.write(1);
  try {
    out1.close();
    // a successful close is also OK for us so no assertion here, we just need to confirm that the
    // data in the file are correct.
  } catch (FileNotFoundException fnfe) {
    // hadoop3 throws one of these.
  } catch (RemoteException e) {
    // expected
    assertThat(e.unwrapRemoteException(), instanceOf(LeaseExpiredException.class));
  }
  out2.close();
  try (FSDataInputStream in = FS.open(file)) {
    assertEquals(1, in.read());
    assertEquals(-1, in.read());
  }
}
 
Example 18  Project: kylin   File: CreateHTableJob.java
private void exportHBaseConfiguration(String hbaseTableName) throws IOException {

    Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
    HadoopUtil.healSickConfig(hbaseConf);
    Job job = Job.getInstance(hbaseConf, hbaseTableName);
    HTable table = new HTable(hbaseConf, hbaseTableName);
    HFileOutputFormat3.configureIncrementalLoadMap(job, table);

    logger.info("Saving HBase configuration to {}", hbaseConfPath);
    FileSystem fs = HadoopUtil.getWorkingFileSystem();
    FSDataOutputStream out = null;
    try {
        out = fs.create(new Path(hbaseConfPath));
        job.getConfiguration().writeXml(out);
    } finally {
        IOUtils.closeQuietly(out);
    }
}
 
Example 19  Project: presto   File: PrestoS3FileSystem.java
@Override
public FSDataOutputStream create(Path path, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)
        throws IOException
{
    // Ignore the overwrite flag, since Presto always writes to unique file names.
    // Checking for file existence can break read-after-write consistency.

    if (!stagingDirectory.exists()) {
        createDirectories(stagingDirectory.toPath());
    }
    if (!stagingDirectory.isDirectory()) {
        throw new IOException("Configured staging path is not a directory: " + stagingDirectory);
    }
    File tempFile = createTempFile(stagingDirectory.toPath(), "presto-s3-", ".tmp").toFile();

    String key = keyFromPath(qualifiedPath(path));
    return new FSDataOutputStream(
            new PrestoS3OutputStream(s3, getBucketName(uri), key, tempFile, sseEnabled, sseType, sseKmsKeyId, multiPartUploadMinFileSize, multiPartUploadMinPartSize, s3AclType, requesterPaysEnabled, s3StorageClass),
            statistics);
}
 
/** @throws Exception If failed. */
@Test
public void testSetPermissionCheckNonRecursiveness() throws Exception {
    Path fsHome = new Path(primaryFsUri);
    Path file = new Path(fsHome, "/tmp/my");

    FSDataOutputStream os = fs.create(file, EnumSet.noneOf(CreateFlag.class),
        Options.CreateOpts.perms(FsPermission.getDefault()));

    os.close();

    Path tmpDir = new Path(fsHome, "/tmp");

    FsPermission perm = new FsPermission((short)123);

    fs.setPermission(tmpDir, perm);

    assertEquals(perm, fs.getFileStatus(tmpDir).getPermission());
    assertEquals(FsPermission.getDefault(), fs.getFileStatus(file).getPermission());
}
 
Example 21  Project: hudi   File: HoodieTestUtils.java
public static void createCleanFiles(HoodieTableMetaClient metaClient, String basePath,
    String instantTime, Configuration configuration)
    throws IOException {
  createPendingCleanFiles(metaClient, instantTime);
  Path commitFile = new Path(
      basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(instantTime));
  FileSystem fs = FSUtils.getFs(basePath, configuration);
  try (FSDataOutputStream os = fs.create(commitFile, true)) {
    HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
        DEFAULT_PARTITION_PATHS[rand.nextInt(DEFAULT_PARTITION_PATHS.length)], new ArrayList<>(), new ArrayList<>(),
        new ArrayList<>(), instantTime);
    // Create the clean metadata

    HoodieCleanMetadata cleanMetadata =
        CleanerUtils.convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
    // Write empty clean metadata
    os.write(TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata).get());
  }
}
 
Example 22  Project: hadoop   File: TestFiHftp.java
private static byte[] createFile(FileSystem fs, Path name, long length, 
    short replication, long blocksize) throws IOException {
  final FSDataOutputStream out = fs.create(name, false, 4096,
      replication, blocksize);
  try {
    for(long n = length; n > 0; ) {
      ran.nextBytes(buffer);
      final int w = n < buffer.length? (int)n: buffer.length;
      out.write(buffer, 0, w);
      md5.update(buffer, 0, w);
      n -= w;
    }
  } finally {
    IOUtils.closeStream(out);
  }
  return md5.digest();
}
 
/** @throws Exception If failed. */
@Test
public void testSetOwnerCheckNonRecursiveness() throws Exception {
    Path fsHome = new Path(primaryFsUri);
    Path file = new Path(fsHome, "/tmp/my");

    FSDataOutputStream os = fs.create(file, EnumSet.noneOf(CreateFlag.class),
        Options.CreateOpts.perms(FsPermission.getDefault()));

    os.close();

    Path tmpDir = new Path(fsHome, "/tmp");

    fs.setOwner(file, "fUser", "fGroup");
    fs.setOwner(tmpDir, "dUser", "dGroup");

    assertEquals("dUser", fs.getFileStatus(tmpDir).getOwner());
    assertEquals("dGroup", fs.getFileStatus(tmpDir).getGroup());

    assertEquals("fUser", fs.getFileStatus(file).getOwner());
    assertEquals("fGroup", fs.getFileStatus(file).getGroup());
}
 
Example 24  Project: RDFS   File: TestVLong.java
private long writeAndVerify(int shift) throws IOException {
  FSDataOutputStream out = fs.create(path);
  for (int i = Short.MIN_VALUE; i <= Short.MAX_VALUE; ++i) {
    Utils.writeVLong(out, ((long) i) << shift);
  }
  out.close();
  FSDataInputStream in = fs.open(path);
  for (int i = Short.MIN_VALUE; i <= Short.MAX_VALUE; ++i) {
    long n = Utils.readVLong(in);
    Assert.assertEquals(n, ((long) i) << shift);
  }
  in.close();
  long ret = fs.getFileStatus(path).getLen();
  fs.delete(path, false);
  return ret;
}
 
Example 25  Project: hadoop   File: TestSetTimes.java
private FSDataOutputStream writeFile(FileSystem fileSys, Path name, int repl)
  throws IOException {
  FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
      .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
      (short) repl, blockSize);
  byte[] buffer = new byte[fileSize];
  Random rand = new Random(seed);
  rand.nextBytes(buffer);
  stm.write(buffer);
  return stm;
}
 
Example 26  Project: hadoop   File: TestDeleteRace.java
private void testDeleteAddBlockRace(boolean hasSnapshot) throws Exception {
  try {
    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        SlowBlockPlacementPolicy.class, BlockPlacementPolicy.class);
    cluster = new MiniDFSCluster.Builder(conf).build();
    FileSystem fs = cluster.getFileSystem();
    final String fileName = "/testDeleteAddBlockRace";
    Path filePath = new Path(fileName);

    FSDataOutputStream out = null;
    out = fs.create(filePath);
    if (hasSnapshot) {
      SnapshotTestHelper.createSnapshot((DistributedFileSystem) fs, new Path(
          "/"), "s1");
    }

    Thread deleteThread = new DeleteThread(fs, filePath);
    deleteThread.start();

    try {
      // write data and sync to make sure a block is allocated.
      out.write(new byte[32], 0, 32);
      out.hsync();
      Assert.fail("Should have failed.");
    } catch (FileNotFoundException e) {
      GenericTestUtils.assertExceptionContains(filePath.getName(), e);
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
 
Example 27  Project: hadoop   File: TestCheckpoint.java
static void writeFile(FileSystem fileSys, Path name, int repl)
  throws IOException {
  FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
      .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
      (short) repl, blockSize);
  byte[] buffer = new byte[TestCheckpoint.fileSize];
  Random rand = new Random(TestCheckpoint.seed);
  rand.nextBytes(buffer);
  stm.write(buffer);
  stm.close();
}
 
Example 28  Project: incubator-pinot   File: TopKPhaseJob.java
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {

  for (String dimension : dimensionNames) {

    LOGGER.info("{} records passed metric threshold for dimension {}", thresholdPassCount.get(dimension), dimension);

    // Get top k
    TopKDimensionToMetricsSpec topkSpec = topKDimensionToMetricsSpecMap.get(dimension);
    if (topkSpec != null && topkSpec.getDimensionName() != null && topkSpec.getTopk() != null) {

      // Get top k for each metric specified
      Map<String, Integer> topkMetricsMap = topkSpec.getTopk();
      for (Entry<String, Integer> topKEntry : topkMetricsMap.entrySet()) {

        String metric = topKEntry.getKey();
        int k = topKEntry.getValue();
        MinMaxPriorityQueue<DimensionValueMetricPair> topKQueue = MinMaxPriorityQueue.maximumSize(k).create();

        Map<Object, Number[]> dimensionToMetricsMap = dimensionNameToValuesMap.get(dimension);
        for (Entry<Object, Number[]> entry : dimensionToMetricsMap.entrySet()) {
          topKQueue.add(new DimensionValueMetricPair(entry.getKey(), entry.getValue()[metricToIndexMapping.get(metric)]));
        }
        LOGGER.info("Picking Top {} values for {} based on Metric {} : {}", k, dimension, metric, topKQueue);
        for (DimensionValueMetricPair pair : topKQueue) {
          topkDimensionValues.addValue(dimension, String.valueOf(pair.getDimensionValue()));
        }
      }
    }
  }

  if (topkDimensionValues.getTopKDimensions().size() > 0) {
    String topkValuesPath = configuration.get(TOPK_PHASE_OUTPUT_PATH.toString());
    LOGGER.info("Writing top k values to {}",topkValuesPath);
    FSDataOutputStream topKDimensionValuesOutputStream = fileSystem.create(
        new Path(topkValuesPath + File.separator + ThirdEyeConstants.TOPK_VALUES_FILE));
    OBJECT_MAPPER.writeValue((DataOutput) topKDimensionValuesOutputStream, topkDimensionValues);
    topKDimensionValuesOutputStream.close();
  }
}
 
Example 29  Project: kite   File: TestSchemaCommandCluster.java
@Test
public void testHDFSCSVSchemaToFile() throws Exception {
  String csvSample = "hdfs:/tmp/sample/users.csv";
  FSDataOutputStream out = getDFS()
      .create(new Path(csvSample), true /* overwrite */);
  OutputStreamWriter writer = new OutputStreamWriter(out, "utf8");
  writer.append("id, username, email\n");
  writer.append("1, test, [email protected]\n");
  writer.close();

  Schema schema = SchemaBuilder.record("User").fields()
      .optionalLong("id")
      .optionalString("username")
      .optionalString("email")
      .endRecord();

  CSVSchemaCommand command = new CSVSchemaCommand(console);
  command.setConf(getConfiguration());
  command.samplePaths = Lists.newArrayList(csvSample);
  command.outputPath = "target/csv2.avsc";
  command.recordName = "User";

  int rc = command.run();
  Assert.assertEquals("Should return success code", 0, rc);
  String fileContent = Files.toString(
      new File("target/csv2.avsc"), BaseCommand.UTF8);
  Assert.assertTrue("File should contain pretty printed schema",
      TestUtil.matchesSchema(schema).matches(fileContent));
  verifyNoMoreInteractions(console);
}
 
Example 30  Project: hadoop   File: DataGenerator.java
/** Create a file with the name <code>file</code> and 
 * a length of <code>fileSize</code>. The file is filled with character 'a'.
 */
private void genFile(Path file, long fileSize) throws IOException {
  FSDataOutputStream out = fc.create(file,
      EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
      CreateOpts.createParent(), CreateOpts.bufferSize(4096),
      CreateOpts.repFac((short) 3));
  for(long i=0; i<fileSize; i++) {
    out.writeByte('a');
  }
  out.close();
}
 