Source code examples for the class org.apache.hadoop.util.Time

The examples below show how to use the org.apache.hadoop.util.Time API in real projects; each snippet is taken verbatim from the project and file named in its header, and you can follow the links to view the full source on GitHub.
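
Before diving into the examples, here is a minimal sketch of the three Time methods that recur throughout them. The class name and printed values are illustrative assumptions, not part of any project below:

import org.apache.hadoop.util.Time;

public class TimeApiSketch {
  public static void main(String[] args) throws InterruptedException {
    // Wall-clock time in milliseconds since the epoch; may jump if the system clock is adjusted.
    long wallStart = Time.now();

    // Monotonic clock in milliseconds; not anchored to the epoch, but safe for measuring durations.
    long monoStart = Time.monotonicNow();

    Thread.sleep(250);

    // Elapsed time should be measured with the monotonic clock, as most examples below do.
    System.out.println("elapsed ~ " + (Time.monotonicNow() - monoStart) + " ms");

    // Human-readable rendering of an epoch timestamp (see Example 12).
    System.out.println("started at " + Time.formatTime(wallStart));
  }
}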

Example 1  Project: hadoop   File: TestZKFailoverControllerStress.java
/**
 * Randomly expire the ZK sessions of the two ZKFCs. This differs
 * from the above test in that it is not a controlled failover -
 * we just do random expirations and expect neither one to ever
 * generate fatal exceptions.
 */
@Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
public void testRandomExpirations() throws Exception {
  cluster.start();
  long st = Time.now();
  long runFor = STRESS_RUNTIME_SECS * 1000;

  Random r = new Random();
  while (Time.now() - st < runFor) {
    cluster.getTestContext().checkException();
    int targetIdx = r.nextInt(2);
    ActiveStandbyElector target = cluster.getElector(targetIdx);
    long sessId = target.getZKSessionIdForTests();
    if (sessId != -1) {
      LOG.info(String.format("Expiring session %x for svc %d",
          sessId, targetIdx));
      getServer(serverFactory).closeSession(sessId);
    }
    Thread.sleep(r.nextInt(300));
  }
}
 
Example 2  Project: hadoop   File: TestMultithreadedTestUtil.java
@Test
public void testThreadFails() throws Exception {
  TestContext ctx = new TestContext();
  ctx.addThread(new TestingThread(ctx) {
    @Override
    public void doWork() throws Exception {
      fail(FAIL_MSG);
    }
  });
  ctx.startThreads();
  long st = Time.now();
  try {
    ctx.waitFor(30000);
    fail("waitFor did not throw");
  } catch (RuntimeException rte) {
    // expected
    assertEquals(FAIL_MSG, rte.getCause().getMessage());
  }
  long et = Time.now();
  // Test shouldn't have waited the full 30 seconds, since
  // the thread throws faster than that
  assertTrue("Test took " + (et - st) + "ms",
      et - st < 5000);
}
 
Example 3  Project: hadoop   File: HATestUtil.java
public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx,
    List<Integer> txids) throws InterruptedException {
  long start = Time.now();
  while (true) {
    try {
      FSImageTestUtil.assertNNHasCheckpoints(cluster, nnIdx, txids);
      return;
    } catch (AssertionError err) {
      if (Time.now() - start > 10000) {
        throw err;
      } else {
        Thread.sleep(300);
      }
    }
  }
}
 
Example 4  Project: big-c   File: TestINodeFile.java
@Test
public void testFileUnderConstruction() {
  replication = 3;
  final INodeFile file = new INodeFile(INodeId.GRANDFATHER_INODE_ID, null,
      perm, 0L, 0L, null, replication, 1024L, (byte)0);
  assertFalse(file.isUnderConstruction());

  final String clientName = "client";
  final String clientMachine = "machine";
  file.toUnderConstruction(clientName, clientMachine);
  assertTrue(file.isUnderConstruction());
  FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
  assertEquals(clientName, uc.getClientName());
  assertEquals(clientMachine, uc.getClientMachine());

  file.toCompleteFile(Time.now());
  assertFalse(file.isUnderConstruction());
}
 
Example 5  Project: hadoop   File: PBImageTextWriter.java
private void output(Configuration conf, FileSummary summary,
    FileInputStream fin, ArrayList<FileSummary.Section> sections)
    throws IOException {
  InputStream is;
  long startTime = Time.monotonicNow();
  out.println(getHeader());
  for (FileSummary.Section section : sections) {
    if (SectionName.fromString(section.getName()) == SectionName.INODE) {
      fin.getChannel().position(section.getOffset());
      is = FSImageUtil.wrapInputStreamForCompression(conf,
          summary.getCodec(), new BufferedInputStream(new LimitInputStream(
              fin, section.getLength())));
      outputINodes(is);
    }
  }
  long timeTaken = Time.monotonicNow() - startTime;
  LOG.debug("Time to output inodes: {}ms", timeTaken);
}
 
Example 6  Project: hadoop-ozone
/**
 * Create DummyBucketCreate response.
 */
private OMDummyCreateBucketResponse createDummyBucketResponse(
    String volumeName) {
  OmBucketInfo omBucketInfo =
      OmBucketInfo.newBuilder()
          .setVolumeName(volumeName)
          .setBucketName(UUID.randomUUID().toString())
          .setCreationTime(Time.now())
          .build();
  return new OMDummyCreateBucketResponse(omBucketInfo,
      OMResponse.newBuilder()
          .setCmdType(OzoneManagerProtocolProtos.Type.CreateBucket)
          .setStatus(OzoneManagerProtocolProtos.Status.OK)
          .setCreateBucketResponse(CreateBucketResponse.newBuilder().build())
          .build());
}
 
Example 7  Project: big-c   File: PeerCache.java
private synchronized Peer getInternal(DatanodeID dnId, boolean isDomain) {
  List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
  if (sockStreamList == null) {
    return null;
  }

  Iterator<Value> iter = sockStreamList.iterator();
  while (iter.hasNext()) {
    Value candidate = iter.next();
    iter.remove();
    long ageMs = Time.monotonicNow() - candidate.getTime();
    Peer peer = candidate.getPeer();
    if (ageMs >= expiryPeriod) {
      try {
        peer.close();
      } catch (IOException e) {
        LOG.warn("got IOException closing stale peer " + peer +
              ", which is " + ageMs + " ms old");
      }
    } else if (!peer.isClosed()) {
      return peer;
    }
  }
  return null;
}
 
Example 8  Project: big-c   File: BlockTokenSecretManager.java
/** Initialize block keys */
private synchronized void generateKeys() {
  if (!isMaster)
    return;
  /*
   * Need to set estimated expiry dates for currentKey and nextKey so that if
   * NN crashes, DN can still expire those keys. NN will stop using the newly
   * generated currentKey after the first keyUpdateInterval, however it may
   * still be used by DN and Balancer to generate new tokens before they get a
   * chance to sync their keys with NN. Since we require keyUpdInterval to be
   * long enough so that all live DN's and Balancer will sync their keys with
   * NN at least once during the period, the estimated expiry date for
   * currentKey is set to now() + 2 * keyUpdateInterval + tokenLifetime.
   * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
   * more.
   */
  setSerialNo(serialNo + 1);
  currentKey = new BlockKey(serialNo, Time.now() + 2
      * keyUpdateInterval + tokenLifetime, generateSecret());
  setSerialNo(serialNo + 1);
  nextKey = new BlockKey(serialNo, Time.now() + 3
      * keyUpdateInterval + tokenLifetime, generateSecret());
  allKeys.put(currentKey.getKeyId(), currentKey);
  allKeys.put(nextKey.getKeyId(), nextKey);
}
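
To make the expiry arithmetic in the comment above concrete, here is a small sketch; the two interval values are assumptions for illustration, not Hadoop defaults:

// Assumed configuration: 10-minute key update interval and token lifetime.
long keyUpdateInterval = 10L * 60 * 1000;
long tokenLifetime = 10L * 60 * 1000;
long now = Time.now();
// currentKey must outlive tokens minted from it before all DNs resync their keys:
long currentKeyExpiry = now + 2 * keyUpdateInterval + tokenLifetime; // now + 30 min
// nextKey becomes currentKey one interval later, so it lives one interval longer:
long nextKeyExpiry = now + 3 * keyUpdateInterval + tokenLifetime;    // now + 40 min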
 
Example 9  Project: hadoop   File: BlockTokenSecretManager.java
/**
 * Update block keys, only to be used in master mode
 */
synchronized boolean updateKeys() throws IOException {
  if (!isMaster)
    return false;

  LOG.info("Updating block keys");
  removeExpiredKeys();
  // set final expiry date of retiring currentKey
  allKeys.put(currentKey.getKeyId(), new BlockKey(currentKey.getKeyId(),
      Time.now() + keyUpdateInterval + tokenLifetime,
      currentKey.getKey()));
  // update the estimated expiry date of new currentKey
  currentKey = new BlockKey(nextKey.getKeyId(), Time.now()
      + 2 * keyUpdateInterval + tokenLifetime, nextKey.getKey());
  allKeys.put(currentKey.getKeyId(), currentKey);
  // generate a new nextKey
  setSerialNo(serialNo + 1);
  nextKey = new BlockKey(serialNo, Time.now() + 3
      * keyUpdateInterval + tokenLifetime, generateSecret());
  allKeys.put(nextKey.getKeyId(), nextKey);
  return true;
}
 
Example 10  Project: big-c   File: ShortCircuitCache.java
/**
 * Trim the eviction lists.
 */
private void trimEvictionMaps() {
  long now = Time.monotonicNow();
  demoteOldEvictableMmaped(now);

  while (true) {
    long evictableSize = evictable.size();
    long evictableMmappedSize = evictableMmapped.size();
    if (evictableSize + evictableMmappedSize <= maxTotalSize) {
      return;
    }
    ShortCircuitReplica replica;
    if (evictableSize == 0) {
      replica = evictableMmapped.firstEntry().getValue();
    } else {
      replica = evictable.firstEntry().getValue();
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": trimEvictionMaps is purging " + replica +
        StringUtils.getStackTrace(Thread.currentThread()));
    }
    purge(replica);
  }
}
 
Example 11  Project: big-c   File: TestLog4Json.java
@Test
public void testException() throws Throwable {
  Exception e =
      new NoRouteToHostException("that box caught fire 3 years ago");
  ThrowableInformation ti = new ThrowableInformation(e);
  Log4Json l4j = new Log4Json();
  long timeStamp = Time.now();
  String outcome = l4j.toJson(new StringWriter(),
      "testException",
      timeStamp,
      "INFO",
      "quoted\"",
      "new line\n and {}",
      ti)
      .toString();
  println("testException", outcome);
}
 
Example 12  Project: hadoop-ozone
/**
 * Checks if TokenInfo for the given identifier exists in database and if the
 * token is expired.
 */
private TokenInfo validateToken(OzoneTokenIdentifier identifier)
    throws InvalidToken {
  TokenInfo info = currentTokens.get(identifier);
  if (info == null) {
    throw new InvalidToken("token " + formatTokenId(identifier)
        + " can't be found in cache");
  }
  long now = Time.now();
  if (info.getRenewDate() < now) {
    throw new InvalidToken("token " + formatTokenId(identifier) + " is " +
        "expired, current time: " + Time.formatTime(now) +
        " expected renewal time: " + Time.formatTime(info.getRenewDate()));
  }
  if (!verifySignature(identifier, info.getPassword())) {
    throw new InvalidToken("Tampered/Invalid token.");
  }
  return info;
}
 
Example 13  Project: big-c   File: TestJsonUtil.java
@Test
public void testHdfsFileStatus() throws IOException {
  final long now = Time.now();
  final String parent = "/dir";
  final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26,
      now, now + 10, new FsPermission((short) 0644), "user", "group",
      DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
      INodeId.GRANDFATHER_INODE_ID, 0, null, (byte) 0);
  final FileStatus fstatus = toFileStatus(status, parent);
  System.out.println("status  = " + status);
  System.out.println("fstatus = " + fstatus);
  final String json = JsonUtil.toJsonString(status, true);
  System.out.println("json    = " + json.replace(",", ",\n  "));
  ObjectReader reader = new ObjectMapper().reader(Map.class);
  final HdfsFileStatus s2 =
      JsonUtil.toFileStatus((Map<?, ?>) reader.readValue(json), true);
  final FileStatus fs2 = toFileStatus(s2, parent);
  System.out.println("s2      = " + s2);
  System.out.println("fs2     = " + fs2);
  Assert.assertEquals(fstatus, fs2);
}
 
Example 14  Project: hadoop   File: GenericTestUtils.java
public static void waitFor(Supplier<Boolean> check,
    int checkEveryMillis, int waitForMillis)
    throws TimeoutException, InterruptedException
{
  long st = Time.now();
  do {
    boolean result = check.get();
    if (result) {
      return;
    }
    
    Thread.sleep(checkEveryMillis);
  } while (Time.now() - st < waitForMillis);
  
  throw new TimeoutException("Timed out waiting for condition. " +
      "Thread diagnostics:\n" +
      TimedOutTestsListener.buildThreadDiagnosticString());
}
 
Example 15  Project: submarine   File: MockRemoteDirectoryManager.java
private File initializeJobParentDir() throws IOException {
  File dir = new File(STAGING_AREA, String.valueOf(Time.monotonicNow()));
  if (!dir.mkdirs()) {
    throw new IOException(
        String.format(FAILED_TO_CREATE_DIRS_FORMAT_STRING,
            dir.getAbsolutePath()));
  }
  return dir;
}
 
Example 16  Project: big-c   File: VolumeScanner.java
public void printStats(StringBuilder p) {
  p.append("Block scanner information for volume " +
      volume.getStorageID() + " with base path " + volume.getBasePath() +
      "%n");
  synchronized (stats) {
    p.append(String.format("Bytes verified in last hour       : %57d%n",
        stats.bytesScannedInPastHour));
    p.append(String.format("Blocks scanned in current period  : %57d%n",
        stats.blocksScannedInCurrentPeriod));
    p.append(String.format("Blocks scanned since restart      : %57d%n",
        stats.blocksScannedSinceRestart));
    p.append(String.format("Block pool scans since restart    : %57d%n",
        stats.scansSinceRestart));
    p.append(String.format("Block scan errors since restart   : %57d%n",
        stats.scanErrorsSinceRestart));
    if (stats.nextBlockPoolScanStartMs > 0) {
      p.append(String.format("Hours until next block pool scan  : %57.3f%n",
          positiveMsToHours(stats.nextBlockPoolScanStartMs -
              Time.monotonicNow())));
    }
    if (stats.blockPoolPeriodEndsMs > 0) {
      p.append(String.format("Hours until possible pool rescan  : %57.3f%n",
          positiveMsToHours(stats.blockPoolPeriodEndsMs -
              Time.now())));
    }
    p.append(String.format("Last block scanned                : %57s%n",
        ((stats.lastBlockScanned == null) ? "none" :
        stats.lastBlockScanned.toString())));
    p.append(String.format("More blocks to scan in period     : %57s%n",
        !stats.eof));
    p.append("%n");
  }
}
 
Example 17  Project: big-c   File: FSImage.java
/**
 * @param imageFile the image file that was loaded
 * @param numEditsLoaded the number of edits loaded from edits logs
 * @return true if the NameNode should automatically save the namespace
 * when it is started, due to the latest checkpoint being too old.
 */
private boolean needsResaveBasedOnStaleCheckpoint(
    File imageFile, long numEditsLoaded) {
  final long checkpointPeriod = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
  final long checkpointTxnCount = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
  long checkpointAge = Time.now() - imageFile.lastModified();

  return (checkpointAge > checkpointPeriod * 1000) ||
         (numEditsLoaded > checkpointTxnCount);
}
 
Example 18  Project: big-c   File: InstrumentationService.java
@Override
public Cron stop() {
  if (total != 0) {
    throw new IllegalStateException("Cron already used");
  }
  if (lapStart > 0) {
    own += Time.now() - lapStart;
    lapStart = 0;
  }
  return this;
}
 
Example 19  Project: hadoop   File: DFSClient.java
@Override
public DataEncryptionKey newDataEncryptionKey() throws IOException {
  if (shouldEncryptData()) {
    synchronized (this) {
      if (encryptionKey == null ||
          encryptionKey.expiryDate < Time.now()) {
        LOG.debug("Getting new encryption token from NN");
        encryptionKey = namenode.getDataEncryptionKey();
      }
      return encryptionKey;
    }
  } else {
    return null;
  }
}
 
Example 20  Project: hadoop-ozone   File: ContainerStateMachine.java
@Override
public long takeSnapshot() throws IOException {
  TermIndex ti = getLastAppliedTermIndex();
  long startTime = Time.monotonicNow();
  if (!isStateMachineHealthy()) {
    String msg =
        "Failed to take snapshot " + " for " + gid + " as the stateMachine"
            + " is unhealthy. The last applied index is at " + ti;
    StateMachineException sme = new StateMachineException(msg);
    LOG.error(msg);
    throw sme;
  }
  if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
    final File snapshotFile =
        storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
    LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
    try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
      persistContainerSet(fos);
      fos.flush();
      // make sure the snapshot file is synced
      fos.getFD().sync();
    } catch (IOException ioe) {
      LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti,
          snapshotFile);
      throw ioe;
    }
    LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}", gid, ti,
        snapshotFile, (Time.monotonicNow() - startTime));
    return ti.getIndex();
  }
  return -1;
}
 
Example 21  Project: big-c   File: PeerCache.java
private synchronized void putInternal(DatanodeID dnId, Peer peer) {
  startExpiryDaemon();

  if (capacity == multimap.size()) {
    evictOldest();
  }
  multimap.put(new Key(dnId, peer.getDomainSocket() != null),
      new Value(peer, Time.monotonicNow()));
}
 
Example 22  Project: hadoop   File: DirectorySnapshottableFeature.java
/** Add a snapshot. */
public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name)
    throws SnapshotException, QuotaExceededException {
  //check snapshot quota
  final int n = getNumSnapshots();
  if (n + 1 > snapshotQuota) {
    throw new SnapshotException("Failed to add snapshot: there are already "
        + n + " snapshot(s) and the snapshot quota is "
        + snapshotQuota);
  }
  final Snapshot s = new Snapshot(id, name, snapshotRoot);
  final byte[] nameBytes = s.getRoot().getLocalNameBytes();
  final int i = searchSnapshot(nameBytes);
  if (i >= 0) {
    throw new SnapshotException("Failed to add snapshot: there is already a "
        + "snapshot with the same name \"" + Snapshot.getSnapshotName(s) + "\".");
  }

  final DirectoryDiff d = getDiffs().addDiff(id, snapshotRoot);
  d.setSnapshotRoot(s.getRoot());
  snapshotsByNames.add(-i - 1, s);

  // set modification time
  final long now = Time.now();
  snapshotRoot.updateModificationTime(now, Snapshot.CURRENT_STATE_ID);
  s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID);
  return s;
}
 
Example 23  Project: big-c   File: TestDelegationTokenRenewer.java
@Test(timeout=4000)
public void testMultipleTokensDoNotDeadlock() throws IOException,
    InterruptedException {
  Configuration conf = mock(Configuration.class);
  FileSystem fs = mock(FileSystem.class);
  doReturn(conf).when(fs).getConf();
  
  long distantFuture = Time.now() + 3600 * 1000; // 1h
  Token<?> token1 = mock(Token.class);
  doReturn(new Text("myservice1")).when(token1).getService();
  doReturn(distantFuture).when(token1).renew(eq(conf));
  
  Token<?> token2 = mock(Token.class);
  doReturn(new Text("myservice2")).when(token2).getService();
  doReturn(distantFuture).when(token2).renew(eq(conf));

  RenewableFileSystem fs1 = mock(RenewableFileSystem.class);
  doReturn(conf).when(fs1).getConf();
  doReturn(token1).when(fs1).getRenewToken();

  RenewableFileSystem fs2 = mock(RenewableFileSystem.class);
  doReturn(conf).when(fs2).getConf();
  doReturn(token2).when(fs2).getRenewToken();

  renewer.addRenewAction(fs1);
  renewer.addRenewAction(fs2);
  assertEquals(2, renewer.getRenewQueueLength());
  
  renewer.removeRenewAction(fs1);
  assertEquals(1, renewer.getRenewQueueLength());
  renewer.removeRenewAction(fs2);
  assertEquals(0, renewer.getRenewQueueLength());
  
  verify(token1).cancel(eq(conf));
  verify(token2).cancel(eq(conf));
}
 
Example 24  Project: big-c   File: DFSClient.java
@Override
public DataEncryptionKey newDataEncryptionKey() throws IOException {
  if (shouldEncryptData()) {
    synchronized (this) {
      if (encryptionKey == null ||
          encryptionKey.expiryDate < Time.now()) {
        LOG.debug("Getting new encryption token from NN");
        encryptionKey = namenode.getDataEncryptionKey();
      }
      return encryptionKey;
    }
  } else {
    return null;
  }
}
 
Example 25  Project: hadoop   File: ReconfigurableBase.java
public void run() {
  LOG.info("Starting reconfiguration task.");
  Configuration oldConf = this.parent.getConf();
  Configuration newConf = new Configuration();
  Collection<PropertyChange> changes =
      this.parent.getChangedProperties(newConf, oldConf);
  Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
  for (PropertyChange change : changes) {
    String errorMessage = null;
    if (!this.parent.isPropertyReconfigurable(change.prop)) {
      errorMessage = "Property " + change.prop +
          " is not reconfigurable";
      LOG.info(errorMessage);
      results.put(change, Optional.of(errorMessage));
      continue;
    }
    LOG.info("Change property: " + change.prop + " from \""
        + ((change.oldVal == null) ? "<default>" : change.oldVal)
        + "\" to \"" + ((change.newVal == null) ? "<default>" : change.newVal)
        + "\".");
    try {
      this.parent.reconfigurePropertyImpl(change.prop, change.newVal);
    } catch (ReconfigurationException e) {
      errorMessage = e.getCause().getMessage();
    }
    results.put(change, Optional.fromNullable(errorMessage));
  }

  synchronized (this.parent.reconfigLock) {
    this.parent.endTime = Time.now();
    this.parent.status = Collections.unmodifiableMap(results);
    this.parent.reconfigThread = null;
  }
}
 
Example 26  Project: big-c   File: MiniRPCBenchmark.java
long connectToServer(Configuration conf, InetSocketAddress addr)
throws IOException {
  MiniProtocol client = null;
  try {
    long start = Time.now();
    client = RPC.getProxy(MiniProtocol.class,
        MiniProtocol.versionID, addr, conf);
    long end = Time.now();
    return end - start;
  } finally {
    RPC.stopProxy(client);
  }
}
 
Example 27  Project: big-c   File: TestHFSTestCase.java
@Test
public void sleepRatio1() {
  setWaitForRatio(1);
  long start = Time.now();
  sleep(100);
  long end = Time.now();
  assertEquals(end - start, 100, 50);
}
 
Example 28  Project: hadoop   File: UserGroupInformation.java
/**
 * Log a user in from a keytab file. Loads a user identity from a keytab
 * file and logs them in. They become the currently logged-in user.
 * @param user the principal name to load from the keytab
 * @param path the path to the keytab file
 * @throws IOException if the keytab file can't be read
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public synchronized
static void loginUserFromKeytab(String user,
                                String path
                                ) throws IOException {
  if (!isSecurityEnabled())
    return;

  keytabFile = path;
  keytabPrincipal = user;
  Subject subject = new Subject();
  LoginContext login; 
  long start = 0;
  try {
    login = newLoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME,
          subject, new HadoopConfiguration());
    start = Time.now();
    login.login();
    metrics.loginSuccess.add(Time.now() - start);
    loginUser = new UserGroupInformation(subject);
    loginUser.setLogin(login);
    loginUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
  } catch (LoginException le) {
    if (start > 0) {
      metrics.loginFailure.add(Time.now() - start);
    }
    throw new IOException("Login failure for " + user + " from keytab " + 
                          path+ ": " + le, le);
  }
  LOG.info("Login successful for user " + keytabPrincipal
      + " using keytab file " + keytabFile);
}
 
Example 29  Project: big-c   File: ShuffleSchedulerImpl.java
public synchronized void copySucceeded(TaskAttemptID mapId,
                                       MapHost host,
                                       long bytes,
                                       long startMillis,
                                       long endMillis,
                                       MapOutput<K,V> output
                                       ) throws IOException {
  failureCounts.remove(mapId);
  hostFailures.remove(host.getHostName());
  int mapIndex = mapId.getTaskID().getId();

  if (!finishedMaps[mapIndex]) {
    output.commit();
    finishedMaps[mapIndex] = true;
    shuffledMapsCounter.increment(1);
    if (--remainingMaps == 0) {
      notifyAll();
    }

    // update single copy task status
    long copyMillis = (endMillis - startMillis);
    if (copyMillis == 0) copyMillis = 1;
    float bytesPerMillis = (float) bytes / copyMillis;
    float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
    String individualProgress = "copy task(" + mapId + " succeeded"
        + " at " + mbpsFormat.format(transferRate) + " MB/s)";
    // update the aggregated status
    copyTimeTracker.add(startMillis, endMillis);

    totalBytesShuffledTillNow += bytes;
    updateStatus(individualProgress);
    reduceShuffleBytes.increment(bytes);
    lastProgressTime = Time.monotonicNow();
    LOG.debug("map " + mapId + " done " + status.getStateString());
  }
}
 
Example 30  Project: hadoop-ozone   File: TestEndPoint.java
@Test
public void testRegisterRpcTimeout() throws Exception {
  final long rpcTimeout = 1000;
  final long tolerance = 200;
  scmServerImpl.setRpcResponseDelay(1500);
  long start = Time.monotonicNow();
  registerTaskHelper(serverAddress, 1000, false).close();
  long end = Time.monotonicNow();
  scmServerImpl.setRpcResponseDelay(0);
  Assert.assertThat(end - start, lessThanOrEqualTo(rpcTimeout + tolerance));
}
 