org.apache.hadoop.fs.CacheFlag#org.apache.htrace.Sampler source code examples

Listed below are example usages of org.apache.hadoop.fs.CacheFlag#org.apache.htrace.Sampler collected from open-source projects; you can also follow the links to view the full source files on GitHub.
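
All of the snippets follow the same HTrace 3.x pattern: Trace.startSpan(description, sampler) returns a TraceScope that must be closed when the traced work finishes. Sampler.ALWAYS starts a new trace unconditionally, while Sampler.NEVER only attaches a span when a trace is already active, which is why the HDFS client code below wraps internal steps with Sampler.NEVER while the Accumulo and test code starts top-level traces with Sampler.ALWAYS. A minimal, self-contained sketch of that pattern (not taken from any project below; the class and span names are made up):

import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;

public class SamplerSketch {
  public static void main(String[] args) {
    // Start a top-level trace unconditionally.
    TraceScope parent = Trace.startSpan("demo-root", Sampler.ALWAYS);
    try {
      doWork();
    } finally {
      parent.close();
    }
  }

  private static void doWork() {
    // Sampler.NEVER never starts a new trace on its own, but because a trace
    // is already active here, this becomes a child span of "demo-root".
    TraceScope scope = Trace.startSpan("demo-child", Sampler.NEVER);
    try {
      // ... traced work ...
    } finally {
      scope.close();
    }
  }
}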

Example 1 (Project: hadoop, File: RemoteBlockReader2.java)
@Override
public int read(ByteBuffer buf) throws IOException {
  if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
    TraceScope scope = Trace.startSpan(
        "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
    try {
      readNextPacket();
    } finally {
      scope.close();
    }
  }
  if (curDataSlice.remaining() == 0) {
    // we're at EOF now
    return -1;
  }

  int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
  ByteBuffer writeSlice = curDataSlice.duplicate();
  writeSlice.limit(writeSlice.position() + nRead);
  buf.put(writeSlice);
  curDataSlice.position(writeSlice.position());

  return nRead;
}
 
Example 2 (Project: hadoop, File: BlockReaderLocalLegacy.java)
/**
 * Reads bytes into a buffer until EOF or the buffer's limit is reached
 */
private int fillBuffer(FileInputStream stream, ByteBuffer buf)
    throws IOException {
  TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" +
      blockId + ")", Sampler.NEVER);
  try {
    int bytesRead = stream.getChannel().read(buf);
    if (bytesRead < 0) {
      //EOF
      return bytesRead;
    }
    while (buf.remaining() > 0) {
      int n = stream.getChannel().read(buf);
      if (n < 0) {
        //EOF
        return bytesRead;
      }
      bytesRead += n;
    }
    return bytesRead;
  } finally {
    scope.close();
  }
}
 
Example 3 (Project: big-c, File: RemoteBlockReader2.java)
@Override
public int read(ByteBuffer buf) throws IOException {
  if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
    TraceScope scope = Trace.startSpan(
        "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
    try {
      readNextPacket();
    } finally {
      scope.close();
    }
  }
  if (curDataSlice.remaining() == 0) {
    // we're at EOF now
    return -1;
  }

  int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
  ByteBuffer writeSlice = curDataSlice.duplicate();
  writeSlice.limit(writeSlice.position() + nRead);
  buf.put(writeSlice);
  curDataSlice.position(writeSlice.position());

  return nRead;
}
 
Example 4 (Project: big-c, File: BlockReaderLocalLegacy.java)
/**
 * Reads bytes into a buffer until EOF or the buffer's limit is reached
 */
private int fillBuffer(FileInputStream stream, ByteBuffer buf)
    throws IOException {
  TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" +
      blockId + ")", Sampler.NEVER);
  try {
    int bytesRead = stream.getChannel().read(buf);
    if (bytesRead < 0) {
      //EOF
      return bytesRead;
    }
    while (buf.remaining() > 0) {
      int n = stream.getChannel().read(buf);
      if (n < 0) {
        //EOF
        return bytesRead;
      }
      bytesRead += n;
    }
    return bytesRead;
  } finally {
    scope.close();
  }
}
 
Example 5 (Project: accumulo-examples, File: TracingExample.java)
private void createEntries(Opts opts) throws TableNotFoundException, AccumuloException {

    // Trace the write operation. Note that unless you flush the BatchWriter, you will not
    // capture the write operation, as it occurs asynchronously. You can optionally create
    // additional Spans within a given Trace, as seen below around the flush.
    TraceScope scope = Trace.startSpan("Client Write", Sampler.ALWAYS);

    System.out.println("TraceID: " + Long.toHexString(scope.getSpan().getTraceId()));
    try (BatchWriter batchWriter = client.createBatchWriter(opts.getTableName())) {
      Mutation m = new Mutation("row");
      m.put("cf", "cq", "value");

      batchWriter.addMutation(m);
      // You can add timeline annotations to Spans which will be able to be viewed in the Monitor
      scope.getSpan().addTimelineAnnotation("Initiating Flush");
      batchWriter.flush();
    }
    scope.close();
  }
 
Example 6 (Project: accumulo-examples, File: TracingExample.java)
private void readEntries(Opts opts) throws TableNotFoundException {

    Scanner scanner = client.createScanner(opts.getTableName(), opts.auths);

    // Trace the read operation.
    TraceScope readScope = Trace.startSpan("Client Read", Sampler.ALWAYS);
    System.out.println("TraceID: " + Long.toHexString(readScope.getSpan().getTraceId()));

    int numberOfEntriesRead = 0;
    for (Entry<Key,Value> entry : scanner) {
      System.out.println(entry.getKey().toString() + " -> " + entry.getValue().toString());
      ++numberOfEntriesRead;
    }
    // You can add additional metadata (key, values) to Spans which will be able to be viewed in the
    // Monitor
    readScope.getSpan().addKVAnnotation("Number of Entries Read".getBytes(UTF_8),
        String.valueOf(numberOfEntriesRead).getBytes(UTF_8));

    readScope.close();
  }
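
Both Accumulo snippets close the TraceScope after the traced work rather than inside a finally block, so an exception thrown mid-operation would leave the span open. A short hedged sketch of the more defensive variant, using the same try/finally shape as the HDFS snippets on this page (span name reused from the example above):

TraceScope scope = Trace.startSpan("Client Read", Sampler.ALWAYS);
try {
  // ... create the Scanner, iterate the entries, add annotations ...
} finally {
  scope.close(); // runs even if the scan throws
}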
 
Example 7 (Project: hadoop, File: DFSOutputStream.java)
private void waitForAckedSeqno(long seqno) throws IOException {
  TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
  try {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("Waiting for ack for: " + seqno);
    }
    long begin = Time.monotonicNow();
    try {
      synchronized (dataQueue) {
        while (!isClosed()) {
          checkClosed();
          if (lastAckedSeqno >= seqno) {
            break;
          }
          try {
            dataQueue.wait(1000); // when we receive an ack, we notify on
            // dataQueue
          } catch (InterruptedException ie) {
            throw new InterruptedIOException(
                "Interrupted while waiting for data to be acknowledged by pipeline");
          }
        }
      }
      checkClosed();
    } catch (ClosedChannelException e) {
    }
    long duration = Time.monotonicNow() - begin;
    if (duration > dfsclientSlowLogThresholdMs) {
      DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
          + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
    }
  } finally {
    scope.close();
  }
}
 
Example 8 (Project: hadoop, File: RemoteBlockReader.java)
@Override
protected synchronized int readChunk(long pos, byte[] buf, int offset, 
                                     int len, byte[] checksumBuf) 
                                     throws IOException {
  TraceScope scope =
      Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
          Sampler.NEVER);
  try {
    return readChunkImpl(pos, buf, offset, len, checksumBuf);
  } finally {
    scope.close();
  }
}
 
Example 9 (Project: hadoop, File: RemoteBlockReader2.java)
@Override
public synchronized int read(byte[] buf, int off, int len) 
                             throws IOException {

  UUID randomId = null;
  if (LOG.isTraceEnabled()) {
    randomId = UUID.randomUUID();
    LOG.trace(String.format("Starting read #%s file %s from datanode %s",
      randomId.toString(), this.filename,
      this.datanodeID.getHostName()));
  }

  if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
    TraceScope scope = Trace.startSpan(
        "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
    try {
      readNextPacket();
    } finally {
      scope.close();
    }
  }

  if (LOG.isTraceEnabled()) {
    LOG.trace(String.format("Finishing read #" + randomId));
  }

  if (curDataSlice.remaining() == 0) {
    // we're at EOF now
    return -1;
  }
  
  int nRead = Math.min(curDataSlice.remaining(), len);
  curDataSlice.get(buf, off, nRead);
  
  return nRead;
}
 
Example 10 (Project: hadoop, File: CacheDirectiveIterator.java)
public CacheDirectiveIterator(ClientProtocol namenode,
    CacheDirectiveInfo filter, Sampler<?> traceSampler) {
  super(0L);
  this.namenode = namenode;
  this.filter = filter;
  this.traceSampler = traceSampler;
}
 
Example 11 (Project: hadoop, File: DFSInotifyEventInputStream.java)
DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode,
      long lastReadTxid) throws IOException {
  this.traceSampler = traceSampler;
  this.namenode = namenode;
  this.it = Iterators.emptyIterator();
  this.lastReadTxid = lastReadTxid;
}
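
Examples 10 and 11 only store the Sampler that is handed in; whether the iterator's paging RPCs get traced is decided by the caller. A minimal hedged sketch of that calling side (the helper method is made up; the constructor arguments mirror Example 26 further down, and the CacheDirectiveIterator package is an assumption based on the Hadoop 2.x layout):

import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.CacheDirectiveIterator; // package assumed (Hadoop 2.x layout)
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.htrace.Sampler;

public class CacheDirectiveListing {
  /** Hypothetical helper: iterate all cache directives; the paging RPCs are
   *  traced only when the caller already has an active trace (Sampler.NEVER). */
  static RemoteIterator<CacheDirectiveEntry> listAllDirectives(ClientProtocol namenode) {
    return new CacheDirectiveIterator(namenode, null /* no filter */, Sampler.NEVER);
  }
}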
 
Example 12 (Project: hadoop, File: TestTracingShortCircuitLocalRead.java)
@Test
public void testShortCircuitTraceHooks() throws IOException {
  assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
  conf = new Configuration();
  conf.set(DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX +
      SpanReceiverHost.SPAN_RECEIVERS_CONF_SUFFIX,
      TestTracing.SetSpanReceiver.class.getName());
  conf.setLong("dfs.blocksize", 100 * 1024);
  conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
  conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
  conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
      "testShortCircuitTraceHooks._PORT");
  conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
  cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(1)
      .build();
  dfs = cluster.getFileSystem();

  try {
    DFSTestUtil.createFile(dfs, TEST_PATH, TEST_LENGTH, (short)1, 5678L);

    TraceScope ts = Trace.startSpan("testShortCircuitTraceHooks", Sampler.ALWAYS);
    FSDataInputStream stream = dfs.open(TEST_PATH);
    byte buf[] = new byte[TEST_LENGTH];
    IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
    stream.close();
    ts.close();

    String[] expectedSpanNames = {
      "OpRequestShortCircuitAccessProto",
      "ShortCircuitShmRequestProto"
    };
    TestTracing.assertSpanNamesFound(expectedSpanNames);
  } finally {
    dfs.close();
    cluster.shutdown();
  }
}
 
Example 13 (Project: hadoop, File: TestTracing.java)
public void readWithTracing() throws Exception {
  String fileName = "testReadTraceHooks.dat";
  writeTestFile(fileName);
  long startTime = System.currentTimeMillis();
  TraceScope ts = Trace.startSpan("testReadTraceHooks", Sampler.ALWAYS);
  readTestFile(fileName);
  ts.close();
  long endTime = System.currentTimeMillis();

  String[] expectedSpanNames = {
    "testReadTraceHooks",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations",
    "ClientNamenodeProtocol#getBlockLocations",
    "OpReadBlockProto"
  };
  assertSpanNamesFound(expectedSpanNames);

  // The trace should last about the same amount of time as the test
  Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
  Span s = map.get("testReadTraceHooks").get(0);
  Assert.assertNotNull(s);

  long spanStart = s.getStartTimeMillis();
  long spanEnd = s.getStopTimeMillis();
  Assert.assertTrue(spanStart - startTime < 100);
  Assert.assertTrue(spanEnd - endTime < 100);

  // There should only be one trace id as it should all be homed in the
  // top trace.
  for (Span span : SetSpanReceiver.SetHolder.spans.values()) {
    Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
  }
  SetSpanReceiver.SetHolder.spans.clear();
}
 
Example 14 (Project: big-c, File: DFSOutputStream.java)
private void waitForAckedSeqno(long seqno) throws IOException {
  TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
  try {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("Waiting for ack for: " + seqno);
    }
    long begin = Time.monotonicNow();
    try {
      synchronized (dataQueue) {
        while (!isClosed()) {
          checkClosed();
          if (lastAckedSeqno >= seqno) {
            break;
          }
          try {
            dataQueue.wait(1000); // when we receive an ack, we notify on
            // dataQueue
          } catch (InterruptedException ie) {
            throw new InterruptedIOException(
                "Interrupted while waiting for data to be acknowledged by pipeline");
          }
        }
      }
      checkClosed();
    } catch (ClosedChannelException e) {
    }
    long duration = Time.monotonicNow() - begin;
    if (duration > dfsclientSlowLogThresholdMs) {
      DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
          + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
    }
  } finally {
    scope.close();
  }
}
 
Example 15 (Project: big-c, File: RemoteBlockReader.java)
@Override
protected synchronized int readChunk(long pos, byte[] buf, int offset, 
                                     int len, byte[] checksumBuf) 
                                     throws IOException {
  TraceScope scope =
      Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
          Sampler.NEVER);
  try {
    return readChunkImpl(pos, buf, offset, len, checksumBuf);
  } finally {
    scope.close();
  }
}
 
Example 16 (Project: big-c, File: RemoteBlockReader2.java)
@Override
public synchronized int read(byte[] buf, int off, int len) 
                             throws IOException {

  UUID randomId = null;
  if (LOG.isTraceEnabled()) {
    randomId = UUID.randomUUID();
    LOG.trace(String.format("Starting read #%s file %s from datanode %s",
      randomId.toString(), this.filename,
      this.datanodeID.getHostName()));
  }

  if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
    TraceScope scope = Trace.startSpan(
        "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
    try {
      readNextPacket();
    } finally {
      scope.close();
    }
  }

  if (LOG.isTraceEnabled()) {
    LOG.trace(String.format("Finishing read #" + randomId));
  }

  if (curDataSlice.remaining() == 0) {
    // we're at EOF now
    return -1;
  }
  
  int nRead = Math.min(curDataSlice.remaining(), len);
  curDataSlice.get(buf, off, nRead);
  
  return nRead;
}
 
Example 17 (Project: big-c, File: CacheDirectiveIterator.java)
public CacheDirectiveIterator(ClientProtocol namenode,
    CacheDirectiveInfo filter, Sampler<?> traceSampler) {
  super(0L);
  this.namenode = namenode;
  this.filter = filter;
  this.traceSampler = traceSampler;
}
 
Example 18 (Project: big-c, File: DFSInotifyEventInputStream.java)
DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode,
      long lastReadTxid) throws IOException {
  this.traceSampler = traceSampler;
  this.namenode = namenode;
  this.it = Iterators.emptyIterator();
  this.lastReadTxid = lastReadTxid;
}
 
Example 19 (Project: big-c, File: TestTracingShortCircuitLocalRead.java)
@Test
public void testShortCircuitTraceHooks() throws IOException {
  assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
  conf = new Configuration();
  conf.set(DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX +
      SpanReceiverHost.SPAN_RECEIVERS_CONF_SUFFIX,
      TestTracing.SetSpanReceiver.class.getName());
  conf.setLong("dfs.blocksize", 100 * 1024);
  conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
  conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
  conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
      "testShortCircuitTraceHooks._PORT");
  conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
  cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(1)
      .build();
  dfs = cluster.getFileSystem();

  try {
    DFSTestUtil.createFile(dfs, TEST_PATH, TEST_LENGTH, (short)1, 5678L);

    TraceScope ts = Trace.startSpan("testShortCircuitTraceHooks", Sampler.ALWAYS);
    FSDataInputStream stream = dfs.open(TEST_PATH);
    byte buf[] = new byte[TEST_LENGTH];
    IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
    stream.close();
    ts.close();

    String[] expectedSpanNames = {
      "OpRequestShortCircuitAccessProto",
      "ShortCircuitShmRequestProto"
    };
    TestTracing.assertSpanNamesFound(expectedSpanNames);
  } finally {
    dfs.close();
    cluster.shutdown();
  }
}
 
Example 20 (Project: big-c, File: TestTracing.java)
public void readWithTracing() throws Exception {
  String fileName = "testReadTraceHooks.dat";
  writeTestFile(fileName);
  long startTime = System.currentTimeMillis();
  TraceScope ts = Trace.startSpan("testReadTraceHooks", Sampler.ALWAYS);
  readTestFile(fileName);
  ts.close();
  long endTime = System.currentTimeMillis();

  String[] expectedSpanNames = {
    "testReadTraceHooks",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations",
    "ClientNamenodeProtocol#getBlockLocations",
    "OpReadBlockProto"
  };
  assertSpanNamesFound(expectedSpanNames);

  // The trace should last about the same amount of time as the test
  Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
  Span s = map.get("testReadTraceHooks").get(0);
  Assert.assertNotNull(s);

  long spanStart = s.getStartTimeMillis();
  long spanEnd = s.getStopTimeMillis();
  Assert.assertTrue(spanStart - startTime < 100);
  Assert.assertTrue(spanEnd - endTime < 100);

  // There should only be one trace id as it should all be homed in the
  // top trace.
  for (Span span : SetSpanReceiver.SetHolder.spans.values()) {
    Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
  }
  SetSpanReceiver.SetHolder.spans.clear();
}
 
Example 21 (Project: phoenix, File: Tracing.java)
public static Sampler<?> getConfiguredSampler(TraceStatement traceStatement) {
  double samplingRate = traceStatement.getSamplingRate();
  if (samplingRate >= 1.0) {
      return Sampler.ALWAYS;
  } else if (samplingRate < 1.0 && samplingRate > 0.0) {
      Map<String, String> items = new HashMap<String, String>();
      items.put(ProbabilitySampler.SAMPLER_FRACTION_CONF_KEY, Double.toString(samplingRate));
      return new ProbabilitySampler(HTraceConfiguration.fromMap(items));
  } else {
      return Sampler.NEVER;
  }
}
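
A short hedged sketch of how a sampler produced by this factory might then be consumed (the span name is made up; only getConfiguredSampler and the HTrace calls shown elsewhere on this page are assumed):

// Assume traceStatement is the parsed TRACE statement from the surrounding code.
Sampler<?> sampler = Tracing.getConfiguredSampler(traceStatement);
// Whether a new trace actually starts now depends on the configured sampling rate.
TraceScope scope = Trace.startSpan("phoenix-query", sampler);
try {
  // ... execute the traced statement ...
} finally {
  scope.close();
}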
 
Example 22 (Project: hadoop, File: EncryptionZoneIterator.java)
public EncryptionZoneIterator(ClientProtocol namenode,
                              Sampler<?> traceSampler) {
  super(Long.valueOf(0));
  this.namenode = namenode;
  this.traceSampler = traceSampler;
}
 
Example 23 (Project: hadoop, File: CachePoolIterator.java)
public CachePoolIterator(ClientProtocol namenode, Sampler traceSampler) {
  super("");
  this.namenode = namenode;
  this.traceSampler = traceSampler;
}
 
Example 24 (Project: hadoop, File: DFSInotifyEventInputStream.java)
DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode)
      throws IOException {
  // Only consider new transaction IDs.
  this(traceSampler, namenode, namenode.getCurrentEditLogTxid());
}
 
Example 25 (Project: hadoop, File: TestTracing.java)
public void writeWithTracing() throws Exception {
  long startTime = System.currentTimeMillis();
  TraceScope ts = Trace.startSpan("testWriteTraceHooks", Sampler.ALWAYS);
  writeTestFile("testWriteTraceHooks.dat");
  long endTime = System.currentTimeMillis();
  ts.close();

  String[] expectedSpanNames = {
    "testWriteTraceHooks",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.create",
    "ClientNamenodeProtocol#create",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
    "ClientNamenodeProtocol#fsync",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
    "ClientNamenodeProtocol#complete",
    "newStreamForCreate",
    "DFSOutputStream#writeChunk",
    "DFSOutputStream#close",
    "dataStreamer",
    "OpWriteBlockProto",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock",
    "ClientNamenodeProtocol#addBlock"
  };
  assertSpanNamesFound(expectedSpanNames);

  // The trace should last about the same amount of time as the test
  Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
  Span s = map.get("testWriteTraceHooks").get(0);
  Assert.assertNotNull(s);
  long spanStart = s.getStartTimeMillis();
  long spanEnd = s.getStopTimeMillis();

  // Spans homed in the top trace should have the same trace id.
  // Spans with multiple parents (e.g. "dataStreamer" added by HDFS-7054)
  // and their children are exceptions.
  String[] spansInTopTrace = {
    "testWriteTraceHooks",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.create",
    "ClientNamenodeProtocol#create",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
    "ClientNamenodeProtocol#fsync",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
    "ClientNamenodeProtocol#complete",
    "newStreamForCreate",
    "DFSOutputStream#writeChunk",
    "DFSOutputStream#close",
  };
  for (String desc : spansInTopTrace) {
    for (Span span : map.get(desc)) {
      Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
    }
  }
  SetSpanReceiver.SetHolder.spans.clear();
}
 
Example 26 (Project: hadoop, File: TestCacheDirectives.java)
@Test(timeout=120000)
public void testWaitForCachedReplicas() throws Exception {
  FileSystemTestHelper helper = new FileSystemTestHelper();
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      return ((namenode.getNamesystem().getCacheCapacity() ==
          (NUM_DATANODES * CACHE_CAPACITY)) &&
            (namenode.getNamesystem().getCacheUsed() == 0));
    }
  }, 500, 60000);

  // Send a cache report referring to a bogus block.  It is important that
  // the NameNode be robust against this.
  NamenodeProtocols nnRpc = namenode.getRpcServer();
  DataNode dn0 = cluster.getDataNodes().get(0);
  String bpid = cluster.getNamesystem().getBlockPoolId();
  LinkedList<Long> bogusBlockIds = new LinkedList<Long> ();
  bogusBlockIds.add(999999L);
  nnRpc.cacheReport(dn0.getDNRegistrationForBP(bpid), bpid, bogusBlockIds);

  Path rootDir = helper.getDefaultWorkingDirectory(dfs);
  // Create the pool
  final String pool = "friendlyPool";
  nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
  // Create some test files
  final int numFiles = 2;
  final int numBlocksPerFile = 2;
  final List<String> paths = new ArrayList<String>(numFiles);
  for (int i=0; i<numFiles; i++) {
    Path p = new Path(rootDir, "testCachePaths-" + i);
    FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
        (int)BLOCK_SIZE);
    paths.add(p.toUri().getPath());
  }
  // Check the initial statistics at the namenode
  waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
  // Cache and check each path in sequence
  int expected = 0;
  for (int i=0; i<numFiles; i++) {
    CacheDirectiveInfo directive =
        new CacheDirectiveInfo.Builder().
          setPath(new Path(paths.get(i))).
          setPool(pool).
          build();
    nnRpc.addCacheDirective(directive, EnumSet.noneOf(CacheFlag.class));
    expected += numBlocksPerFile;
    waitForCachedBlocks(namenode, expected, expected,
        "testWaitForCachedReplicas:1");
  }

  // Check that the datanodes have the right cache values
  DatanodeInfo[] live = dfs.getDataNodeStats(DatanodeReportType.LIVE);
  assertEquals("Unexpected number of live nodes", NUM_DATANODES, live.length);
  long totalUsed = 0;
  for (DatanodeInfo dn : live) {
    final long cacheCapacity = dn.getCacheCapacity();
    final long cacheUsed = dn.getCacheUsed();
    final long cacheRemaining = dn.getCacheRemaining();
    assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
    assertEquals("Capacity not equal to used + remaining",
        cacheCapacity, cacheUsed + cacheRemaining);
    assertEquals("Remaining not equal to capacity - used",
        cacheCapacity - cacheUsed, cacheRemaining);
    totalUsed += cacheUsed;
  }
  assertEquals(expected*BLOCK_SIZE, totalUsed);

  // Uncache and check each path in sequence
  RemoteIterator<CacheDirectiveEntry> entries =
    new CacheDirectiveIterator(nnRpc, null, Sampler.NEVER);
  for (int i=0; i<numFiles; i++) {
    CacheDirectiveEntry entry = entries.next();
    nnRpc.removeCacheDirective(entry.getInfo().getId());
    expected -= numBlocksPerFile;
    waitForCachedBlocks(namenode, expected, expected,
        "testWaitForCachedReplicas:2");
  }
}
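
The org.apache.hadoop.fs.CacheFlag from the page title appears only here, as the EnumSet.noneOf(CacheFlag.class) argument to addCacheDirective. As a small hedged aside (reusing nnRpc and directive from the test above), passing CacheFlag.FORCE instead tells the NameNode to ignore cache pool resource limits for the call:

// Same call as in the test above, but bypass the pool's resource checks.
long id = nnRpc.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));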
 
Example 27 (Project: big-c, File: EncryptionZoneIterator.java)
public EncryptionZoneIterator(ClientProtocol namenode,
                              Sampler<?> traceSampler) {
  super(Long.valueOf(0));
  this.namenode = namenode;
  this.traceSampler = traceSampler;
}
 
Example 28 (Project: big-c, File: CachePoolIterator.java)
public CachePoolIterator(ClientProtocol namenode, Sampler traceSampler) {
  super("");
  this.namenode = namenode;
  this.traceSampler = traceSampler;
}
 
Example 29 (Project: big-c, File: DFSInotifyEventInputStream.java)
DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode)
      throws IOException {
  // Only consider new transaction IDs.
  this(traceSampler, namenode, namenode.getCurrentEditLogTxid());
}
 
Example 30 (Project: big-c, File: TestTracing.java)
public void writeWithTracing() throws Exception {
  long startTime = System.currentTimeMillis();
  TraceScope ts = Trace.startSpan("testWriteTraceHooks", Sampler.ALWAYS);
  writeTestFile("testWriteTraceHooks.dat");
  long endTime = System.currentTimeMillis();
  ts.close();

  String[] expectedSpanNames = {
    "testWriteTraceHooks",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.create",
    "ClientNamenodeProtocol#create",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
    "ClientNamenodeProtocol#fsync",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
    "ClientNamenodeProtocol#complete",
    "newStreamForCreate",
    "DFSOutputStream#writeChunk",
    "DFSOutputStream#close",
    "dataStreamer",
    "OpWriteBlockProto",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock",
    "ClientNamenodeProtocol#addBlock"
  };
  assertSpanNamesFound(expectedSpanNames);

  // The trace should last about the same amount of time as the test
  Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
  Span s = map.get("testWriteTraceHooks").get(0);
  Assert.assertNotNull(s);
  long spanStart = s.getStartTimeMillis();
  long spanEnd = s.getStopTimeMillis();

  // Spans homed in the top trace should have the same trace id.
  // Spans with multiple parents (e.g. "dataStreamer" added by HDFS-7054)
  // and their children are exceptions.
  String[] spansInTopTrace = {
    "testWriteTraceHooks",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.create",
    "ClientNamenodeProtocol#create",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
    "ClientNamenodeProtocol#fsync",
    "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
    "ClientNamenodeProtocol#complete",
    "newStreamForCreate",
    "DFSOutputStream#writeChunk",
    "DFSOutputStream#close",
  };
  for (String desc : spansInTopTrace) {
    for (Span span : map.get(desc)) {
      Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
    }
  }
  SetSpanReceiver.SetHolder.spans.clear();
}