org.apache.hadoop.fs.FSDataOutputStream#getWrappedStream() Source Code Examples

Listed below are example usages of org.apache.hadoop.fs.FSDataOutputStream#getWrappedStream(), collected from several open-source projects; the original source files can be found in each project's repository on GitHub.
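
All of the examples share the same basic pattern: getWrappedStream() returns the raw OutputStream created by the concrete FileSystem (for example DFSOutputStream on HDFS, or SwiftNativeOutputStream on the Swift connector), and callers check its runtime type before casting to reach filesystem-specific methods. A minimal sketch of that pattern is shown below; the helper name asDfsStream is illustrative and is not taken from any of the listed projects.

import java.io.OutputStream;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.DFSOutputStream;

// Illustrative helper: unwrap an FSDataOutputStream and return the HDFS client
// stream if that is what it wraps, or null for any other FileSystem implementation.
static DFSOutputStream asDfsStream(FSDataOutputStream out) {
  OutputStream wrapped = out.getWrappedStream();
  return (wrapped instanceof DFSOutputStream) ? (DFSOutputStream) wrapped : null;
}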

Example 1  Project: hadoop   File: TestDatanodeDeath.java
@Override
public void run() {
  System.out.println("Workload starting ");
  for (int i = 0; i < numberOfFiles; i++) {
    Path filename = new Path(id + "." + i);
    try {
      System.out.println("Workload processing file " + filename);
      FSDataOutputStream stm = createFile(fs, filename, replication);
      DFSOutputStream dfstream = (DFSOutputStream)
                                             (stm.getWrappedStream());
      dfstream.setArtificialSlowdown(1000);
      writeFile(stm, myseed);
      stm.close();
      checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
    } catch (Throwable e) {
      System.out.println("Workload exception " + e);
      assertTrue(e.toString(), false);
    }

    // increment the stamp to indicate that another file is done.
    synchronized (this) {
      stamp++;
    }
  }
}
 
Example 2  Project: big-c   File: TestDatanodeDeath.java
@Override
public void run() {
  System.out.println("Workload starting ");
  for (int i = 0; i < numberOfFiles; i++) {
    Path filename = new Path(id + "." + i);
    try {
      System.out.println("Workload processing file " + filename);
      FSDataOutputStream stm = createFile(fs, filename, replication);
      DFSOutputStream dfstream = (DFSOutputStream)
                                             (stm.getWrappedStream());
      dfstream.setArtificialSlowdown(1000);
      writeFile(stm, myseed);
      stm.close();
      checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
    } catch (Throwable e) {
      System.out.println("Workload exception " + e);
      assertTrue(e.toString(), false);
    }

    // increment the stamp to indicate that another file is done.
    synchronized (this) {
      stamp++;
    }
  }
}
 
Example 3  Project: RDFS   File: TestDatanodeDeath.java
public void run() {
  System.out.println("Workload starting ");
  for (int i = 0; i < numberOfFiles; i++) {
    Path filename = new Path(id + "." + i);
    try {
      System.out.println("Workload processing file " + filename);
      FSDataOutputStream stm = createFile(fs, filename, replication);
      DFSOutputStream dfstream = (DFSOutputStream)(stm.getWrappedStream());
      dfstream.setArtificialSlowdown(1000);
      writeFile(stm, myseed);
      stm.close();
      checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
    } catch (Throwable e) {
      System.out.println("Workload exception " + e);
      assertTrue(e.toString(), false);
    }

    // increment the stamp to indicate that another file is done.
    synchronized (this) {
      stamp++;
    }
  }
}
 
Example 4  Project: hadoop-gpu   File: TestDatanodeDeath.java
public void run() {
  System.out.println("Workload starting ");
  for (int i = 0; i < numberOfFiles; i++) {
    Path filename = new Path(id + "." + i);
    try {
      System.out.println("Workload processing file " + filename);
      FSDataOutputStream stm = createFile(fs, filename, replication);
      DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream)
                                             (stm.getWrappedStream());
      dfstream.setArtificialSlowdown(1000);
      writeFile(stm, myseed);
      stm.close();
      checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
    } catch (Throwable e) {
      System.out.println("Workload exception " + e);
      assertTrue(e.toString(), false);
    }

    // increment the stamp to indicate that another file is done.
    synchronized (this) {
      stamp++;
    }
  }
}
 
Example 5  Project: DataLink   File: BaseRecordHandler.java
private void hsync(FSDataOutputStream fsOut) throws Exception {
    // When calling hsync, SyncFlag.UPDATE_LENGTH must be set; otherwise RDD or MR jobs cannot read the data that has been written
    // See:
    // https://issues.cloudera.org/browse/DISTRO-696;
    // http://www.hypertable.com/documentation/administrator_guide/hdfs_and_durability
    // https://blog.csdn.net/leen0304/article/details/77854052?locationNum=10&fps=1
    // https://issues.apache.org/jira/browse/HDFS-11915
    if (fsOut instanceof HdfsDataOutputStream) {
        ((HdfsDataOutputStream) fsOut).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
    } else if (fsOut.getWrappedStream() instanceof DFSOutputStream) {
        ((DFSOutputStream) fsOut.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
    } else {
        fsOut.hsync();
    }
}
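
For context, a caller would typically invoke this helper after writing a batch of records and before the stream is closed, so that concurrently running readers (for example an MR or Spark job) already see both the bytes and the updated file length. The method below is a hypothetical caller, not part of BaseRecordHandler; the path and payload are illustrative assumptions.

// Hypothetical caller: write a batch, then make it visible to concurrent readers.
private void writeBatch(FileSystem fs, byte[] recordBytes) throws Exception {
    FSDataOutputStream fsOut = fs.create(new Path("/tmp/datalink-demo.log")); // path is illustrative
    try {
        fsOut.write(recordBytes);
        hsync(fsOut); // readers now observe the data and the new length
    } finally {
        fsOut.close();
    }
}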
 
Example 6  Project: hadoop   File: SwiftFileSystemBaseTest.java
/**
 * Assert that the no. of partitions written matches expectations
 * @param action operation (for use in the assertions)
 * @param out output stream
 * @param expected expected no. of partitions
 */
protected void assertPartitionsWritten(String action, FSDataOutputStream out,
                                       long expected) {
  OutputStream nativeStream = out.getWrappedStream();
  int written = getPartitionsWritten(out);
  if (written != expected) {
    Assert.fail(action + ": " +
                TestSwiftFileSystemPartitionedUploads.WRONG_PARTITION_COUNT
                + " + expected: " + expected + " actual: " + written
                + " -- " + nativeStream);
  }
}
 
Example 7  Project: big-c   File: SwiftFileSystemBaseTest.java
/**
 * Assert that the no. of partitions written matches expectations
 * @param action operation (for use in the assertions)
 * @param out output stream
 * @param expected expected no. of partitions
 */
protected void assertPartitionsWritten(String action, FSDataOutputStream out,
                                       long expected) {
  OutputStream nativeStream = out.getWrappedStream();
  int written = getPartitionsWritten(out);
  if (written != expected) {
    Assert.fail(action + ": " +
                TestSwiftFileSystemPartitionedUploads.WRONG_PARTITION_COUNT
                + " + expected: " + expected + " actual: " + written
                + " -- " + nativeStream);
  }
}
 
Example 8  Project: RDFS   File: DFSClientAdapter.java
public static void abortForTest(FSDataOutputStream out) throws IOException {
  OutputStream stream = out.getWrappedStream();

  if (stream instanceof DFSOutputStream) {
    DFSOutputStream dfsOutputStream =
      (DFSOutputStream) stream;
    dfsOutputStream.abortForTests();
  }
  //no-op otherwise
}
 
Example 9  Project: sahara-extra   File: SwiftFileSystemBaseTest.java
/**
 * Assert that the no. of partitions written matches expectations
 * @param action operation (for use in the assertions)
 * @param out output stream
 * @param expected expected no. of partitions
 */
protected void assertPartitionsWritten(String action, FSDataOutputStream out,
                                       long expected) {
  OutputStream nativeStream = out.getWrappedStream();
  int written = getPartitionsWritten(out);
  if (written != expected) {
    Assert.fail(action + ": " +
                TestSwiftFileSystemPartitionedUploads.WRONG_PARTITION_COUNT
                + " + expected: " + expected + " actual: " + written
                + " -- " + nativeStream);
  }
}
 
Example 10  Project: hadoop   File: TestDataNodeMetrics.java
/**
 * Tests that round-trip acks in a datanode write pipeline are correctly 
 * measured. 
 */
@Test
public void testRoundTripAckMetric() throws Exception {
  final int datanodeCount = 2;
  final int interval = 1;
  Configuration conf = new HdfsConfiguration();
  conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
      datanodeCount).build();
  try {
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    // Open a file and get the head of the pipeline
    Path testFile = new Path("/testRoundTripAckMetric.txt");
    FSDataOutputStream fsout = fs.create(testFile, (short) datanodeCount);
    DFSOutputStream dout = (DFSOutputStream) fsout.getWrappedStream();
    // Slow down the writes to catch the write pipeline
    dout.setChunksPerPacket(5);
    dout.setArtificialSlowdown(3000);
    fsout.write(new byte[10000]);
    DatanodeInfo[] pipeline = null;
    int count = 0;
    while (pipeline == null && count < 5) {
      pipeline = dout.getPipeline();
      System.out.println("Waiting for pipeline to be created.");
      Thread.sleep(1000);
      count++;
    }
    // Get the head node that should be receiving downstream acks
    DatanodeInfo headInfo = pipeline[0];
    DataNode headNode = null;
    for (DataNode datanode : cluster.getDataNodes()) {
      if (datanode.getDatanodeId().equals(headInfo)) {
        headNode = datanode;
        break;
      }
    }
    assertNotNull("Could not find the head of the datanode write pipeline", 
        headNode);
    // Close the file and wait for the metrics to rollover
    Thread.sleep((interval + 1) * 1000);
    // Check the ack was received
    MetricsRecordBuilder dnMetrics = getMetrics(headNode.getMetrics()
        .name());
    assertTrue("Expected non-zero number of acks", 
        getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0);
    assertQuantileGauges("PacketAckRoundTripTimeNanos" + interval
        + "s", dnMetrics);
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
 
Example 11  Project: hadoop   File: SwiftNativeFileSystem.java
private static SwiftNativeOutputStream getSwiftNativeOutputStream(
  FSDataOutputStream outputStream) {
  OutputStream wrappedStream = outputStream.getWrappedStream();
  return (SwiftNativeOutputStream) wrappedStream;
}
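
Within the Swift filesystem this private helper typically backs small static accessors that the tests in examples 6, 7, and 9 call via getPartitionsWritten(out). The sketch below shows what such an accessor looks like; the method name getPartitionsWritten() on SwiftNativeOutputStream is assumed here from the way those tests use it.

// Sketch of a static accessor built on the unwrap helper above (assumed shape):
public static int getPartitionsWritten(FSDataOutputStream outputStream) {
  SwiftNativeOutputStream snos = getSwiftNativeOutputStream(outputStream);
  return snos.getPartitionsWritten();
}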
 
Example 12  Project: big-c   File: TestDataNodeMetrics.java
/**
 * Tests that round-trip acks in a datanode write pipeline are correctly 
 * measured. 
 */
@Test
public void testRoundTripAckMetric() throws Exception {
  final int datanodeCount = 2;
  final int interval = 1;
  Configuration conf = new HdfsConfiguration();
  conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
      datanodeCount).build();
  try {
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    // Open a file and get the head of the pipeline
    Path testFile = new Path("/testRoundTripAckMetric.txt");
    FSDataOutputStream fsout = fs.create(testFile, (short) datanodeCount);
    DFSOutputStream dout = (DFSOutputStream) fsout.getWrappedStream();
    // Slow down the writes to catch the write pipeline
    dout.setChunksPerPacket(5);
    dout.setArtificialSlowdown(3000);
    fsout.write(new byte[10000]);
    DatanodeInfo[] pipeline = null;
    int count = 0;
    while (pipeline == null && count < 5) {
      pipeline = dout.getPipeline();
      System.out.println("Waiting for pipeline to be created.");
      Thread.sleep(1000);
      count++;
    }
    // Get the head node that should be receiving downstream acks
    DatanodeInfo headInfo = pipeline[0];
    DataNode headNode = null;
    for (DataNode datanode : cluster.getDataNodes()) {
      if (datanode.getDatanodeId().equals(headInfo)) {
        headNode = datanode;
        break;
      }
    }
    assertNotNull("Could not find the head of the datanode write pipeline", 
        headNode);
    // Close the file and wait for the metrics to rollover
    Thread.sleep((interval + 1) * 1000);
    // Check the ack was received
    MetricsRecordBuilder dnMetrics = getMetrics(headNode.getMetrics()
        .name());
    assertTrue("Expected non-zero number of acks", 
        getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0);
    assertQuantileGauges("PacketAckRoundTripTimeNanos" + interval
        + "s", dnMetrics);
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
 
Example 13  Project: big-c   File: SwiftNativeFileSystem.java
private static SwiftNativeOutputStream getSwiftNativeOutputStream(
  FSDataOutputStream outputStream) {
  OutputStream wrappedStream = outputStream.getWrappedStream();
  return (SwiftNativeOutputStream) wrappedStream;
}
 
Example 14  Project: sahara-extra   File: SwiftNativeFileSystem.java
private static SwiftNativeOutputStream getSwiftNativeOutputStream(
  FSDataOutputStream outputStream) {
  OutputStream wrappedStream = outputStream.getWrappedStream();
  return (SwiftNativeOutputStream) wrappedStream;
}
 
Example 15  Project: hbase   File: FSHLog.java
/**
 * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate the
 * default behavior (such as setting the maxRecoveryErrorCount value). This is
 * done using reflection on the underlying HDFS OutputStream. NOTE: This could be removed once Hadoop1 support is
 * removed.
 * @return null if underlying stream is not ready.
 */
@VisibleForTesting
OutputStream getOutputStream() {
  FSDataOutputStream fsdos = this.hdfs_out;
  return fsdos != null ? fsdos.getWrappedStream() : null;
}
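
A test in the same package could consume this getter roughly as sketched below. The Javadoc above only says that tests manipulate the maxRecoveryErrorCount value through reflection on the underlying HDFS stream; treating it as a plain instance field, as done here, is an assumption, and the exact member and its location vary across Hadoop versions.

// Hypothetical test usage; the field name and reflection approach are assumptions.
OutputStream wrapped = wal.getOutputStream();          // 'wal' is the FSHLog under test
assertNotNull("writer not initialized yet", wrapped);  // returns null until the writer exists
java.lang.reflect.Field f =
    wrapped.getClass().getDeclaredField("maxRecoveryErrorCount"); // assumed field
f.setAccessible(true);
f.setInt(wrapped, 30);                                  // tolerate more pipeline recovery errors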