下面列出了 org.apache.hadoop.util.Time API 类的示例代码及用法,也可以点击链接到 GitHub 查看源代码。
/**
 * Randomly expire the ZK sessions of the two ZKFCs. This differs
 * from the above test in that it is not a controlled failover -
 * we just do random expirations and expect neither one to ever
 * generate fatal exceptions.
 *
 * <p>The loop runs for {@code STRESS_RUNTIME_SECS} seconds. Elapsed time is
 * measured with the monotonic clock so that a wall-clock adjustment during
 * the run cannot shorten or lengthen the stress period.
 *
 * @throws Exception on any failure raised by the cluster, the test context
 *         check, or the sleep between iterations
 */
@Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
public void testRandomExpirations() throws Exception {
  cluster.start();
  long st = Time.monotonicNow();
  long runFor = STRESS_RUNTIME_SECS * 1000;

  Random r = new Random();
  while (Time.monotonicNow() - st < runFor) {
    cluster.getTestContext().checkException();
    // Pick one of the two electors at random.
    int targetIdx = r.nextInt(2);
    ActiveStandbyElector target = cluster.getElector(targetIdx);
    long sessId = target.getZKSessionIdForTests();
    // Skip the expiry when the elector reports -1 as its session id.
    if (sessId != -1) {
      LOG.info(String.format("Expiring session %x for svc %d",
          sessId, targetIdx));
      getServer(serverFactory).closeSession(sessId);
    }
    // Random pause of up to 300ms between expirations.
    Thread.sleep(r.nextInt(300));
  }
}
/**
 * Checks that {@code waitFor} reports a failing test thread promptly.
 *
 * <p>A single thread that immediately calls {@code fail(FAIL_MSG)} is
 * started; {@code waitFor(30000)} is then expected to throw a
 * {@link RuntimeException} whose cause carries {@code FAIL_MSG}, and to do
 * so in under five seconds. The elapsed time is taken from the monotonic
 * clock so a wall-clock adjustment cannot distort the measurement.
 */
@Test
public void testThreadFails() throws Exception {
  TestContext ctx = new TestContext();
  ctx.addThread(new TestingThread(ctx) {
    @Override
    public void doWork() throws Exception {
      fail(FAIL_MSG);
    }
  });
  ctx.startThreads();
  long st = Time.monotonicNow();
  try {
    ctx.waitFor(30000);
    fail("waitFor did not throw");
  } catch (RuntimeException rte) {
    // expected
    assertEquals(FAIL_MSG, rte.getCause().getMessage());
  }
  long et = Time.monotonicNow();
  // Test shouldn't have waited the full 30 seconds, since
  // the thread throws faster than that
  assertTrue("Test took " + (et - st) + "ms",
      et - st < 5000);
}
/**
 * Polls until the given NameNode has the expected checkpoints.
 *
 * <p>The check is retried every 300ms. Once more than 10 seconds have
 * elapsed (measured on the monotonic clock, so wall-clock changes cannot
 * shorten or extend the deadline) the most recent {@link AssertionError}
 * is rethrown.
 *
 * @param cluster the cluster to inspect
 * @param nnIdx index of the NameNode within the cluster
 * @param txids transaction ids passed to the checkpoint assertion
 * @throws InterruptedException if interrupted while sleeping between tries
 */
public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx,
    List<Integer> txids) throws InterruptedException {
  long start = Time.monotonicNow();
  while (true) {
    try {
      FSImageTestUtil.assertNNHasCheckpoints(cluster, nnIdx, txids);
      return;
    } catch (AssertionError err) {
      if (Time.monotonicNow() - start > 10000) {
        throw err;
      } else {
        Thread.sleep(300);
      }
    }
  }
}
/**
 * Walks an {@code INodeFile} through the under-construction life cycle:
 * not under construction when created, under construction (with the given
 * client name and machine recorded) after {@code toUnderConstruction}, and
 * no longer under construction after {@code toCompleteFile}.
 */
@Test
public void testFileUnderConstruction() {
  replication = 3;
  final String holder = "client";
  final String host = "machine";

  final INodeFile inode = new INodeFile(INodeId.GRANDFATHER_INODE_ID, null,
      perm, 0L, 0L, null, replication, 1024L, (byte)0);
  assertFalse(inode.isUnderConstruction());

  inode.toUnderConstruction(holder, host);
  assertTrue(inode.isUnderConstruction());

  final FileUnderConstructionFeature feature =
      inode.getFileUnderConstructionFeature();
  assertEquals(holder, feature.getClientName());
  assertEquals(host, feature.getClientMachine());

  inode.toCompleteFile(Time.now());
  assertFalse(inode.isUnderConstruction());
}
/**
 * Prints the header followed by the contents of every INODE section.
 *
 * <p>For each section named INODE the input channel is positioned at the
 * section offset and the section bytes are read through a length-limited,
 * buffered, codec-aware stream. The total time spent is logged at debug
 * level.
 *
 * @param conf configuration handed to the compression wrapper
 * @param summary file summary supplying the codec
 * @param fin stream over the image file; repositioned per section
 * @param sections the sections to scan
 * @throws IOException if positioning or reading fails
 */
private void output(Configuration conf, FileSummary summary,
    FileInputStream fin, ArrayList<FileSummary.Section> sections)
    throws IOException {
  final long begin = Time.monotonicNow();
  out.println(getHeader());
  for (FileSummary.Section section : sections) {
    if (SectionName.fromString(section.getName()) != SectionName.INODE) {
      continue;
    }
    fin.getChannel().position(section.getOffset());
    // Bound the reads to this section's length before decompressing.
    InputStream bounded = new BufferedInputStream(
        new LimitInputStream(fin, section.getLength()));
    InputStream inodeStream = FSImageUtil.wrapInputStreamForCompression(
        conf, summary.getCodec(), bounded);
    outputINodes(inodeStream);
  }
  final long elapsedMs = Time.monotonicNow() - begin;
  LOG.debug("Time to output inodes: {}ms", elapsedMs);
}
/**
 * Builds a dummy create-bucket response for the given volume.
 *
 * <p>The bucket gets a random UUID as its name and the current time as its
 * creation time; the wrapped OM response has type CreateBucket and status
 * OK.
 *
 * @param volumeName volume the dummy bucket belongs to
 * @return the dummy response
 */
private OMDummyCreateBucketResponse createDummyBucketResponse(
    String volumeName) {
  final String bucketName = UUID.randomUUID().toString();
  final OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
      .setVolumeName(volumeName)
      .setBucketName(bucketName)
      .setCreationTime(Time.now())
      .build();
  final OMResponse omResponse = OMResponse.newBuilder()
      .setCmdType(OzoneManagerProtocolProtos.Type.CreateBucket)
      .setStatus(OzoneManagerProtocolProtos.Status.OK)
      .setCreateBucketResponse(CreateBucketResponse.newBuilder().build())
      .build();
  return new OMDummyCreateBucketResponse(bucketInfo, omResponse);
}
/**
 * Takes a usable peer for the given datanode out of the cache.
 *
 * <p>Entries for the key are examined in iteration order and each examined
 * entry is removed. An entry whose age has reached {@code expiryPeriod} is
 * closed and skipped; an entry that is already closed is skipped; the first
 * remaining entry is returned.
 *
 * @param dnId datanode the peer must be connected to
 * @param isDomain whether a domain-socket peer is wanted
 * @return a cached peer that is neither expired nor closed, or
 *         {@code null} if there is none
 */
private synchronized Peer getInternal(DatanodeID dnId, boolean isDomain) {
  List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
  if (sockStreamList == null) {
    return null;
  }
  Iterator<Value> iter = sockStreamList.iterator();
  while (iter.hasNext()) {
    Value candidate = iter.next();
    // The candidate leaves the list whether it is returned or discarded.
    iter.remove();
    long ageMs = Time.monotonicNow() - candidate.getTime();
    Peer peer = candidate.getPeer();
    if (ageMs >= expiryPeriod) {
      try {
        peer.close();
      } catch (IOException e) {
        // Best effort: keep scanning, but record why the close failed.
        LOG.warn("got IOException closing stale peer " + peer +
            ", which is " + ageMs + " ms old", e);
      }
    } else if (!peer.isClosed()) {
      return peer;
    }
  }
  return null;
}
/**
 * Initialize block keys. Does nothing unless this instance is the master.
 */
private synchronized void generateKeys() {
  if (isMaster) {
    /*
     * Both currentKey and nextKey carry an estimated expiry date so that
     * DNs can still expire them if the NN crashes. The NN stops using a
     * freshly generated currentKey after the first keyUpdateInterval, but
     * DNs and the Balancer may keep generating tokens with it until they
     * next sync their keys with the NN. keyUpdateInterval is required to be
     * long enough for every live DN and the Balancer to sync at least once
     * per interval, hence the estimate for currentKey is
     * now() + 2 * keyUpdateInterval + tokenLifetime, and the estimate for
     * nextKey is one keyUpdateInterval later.
     */
    setSerialNo(serialNo + 1);
    final long currentKeyExpiry =
        Time.now() + 2 * keyUpdateInterval + tokenLifetime;
    currentKey = new BlockKey(serialNo, currentKeyExpiry, generateSecret());

    setSerialNo(serialNo + 1);
    final long nextKeyExpiry =
        Time.now() + 3 * keyUpdateInterval + tokenLifetime;
    nextKey = new BlockKey(serialNo, nextKeyExpiry, generateSecret());

    allKeys.put(currentKey.getKeyId(), currentKey);
    allKeys.put(nextKey.getKeyId(), nextKey);
  }
}
/**
 * Rolls the block keys forward by one generation; master mode only.
 *
 * <p>Expired keys are removed, the current key is re-registered with its
 * final expiry date, the next key is promoted to current with a refreshed
 * estimated expiry, and a brand-new next key is generated.
 *
 * @return {@code false} if this instance is not the master, otherwise
 *         {@code true}
 */
synchronized boolean updateKeys() throws IOException {
  if (!isMaster) {
    return false;
  }
  LOG.info("Updating block keys");
  removeExpiredKeys();

  // The retiring currentKey gets its final expiry date.
  final BlockKey retiringKey = new BlockKey(currentKey.getKeyId(),
      Time.now() + keyUpdateInterval + tokenLifetime,
      currentKey.getKey());
  allKeys.put(currentKey.getKeyId(), retiringKey);

  // Promote nextKey to currentKey with an updated estimated expiry date.
  final long promotedExpiry =
      Time.now() + 2 * keyUpdateInterval + tokenLifetime;
  currentKey = new BlockKey(nextKey.getKeyId(), promotedExpiry,
      nextKey.getKey());
  allKeys.put(currentKey.getKeyId(), currentKey);

  // Create the replacement nextKey under a fresh serial number.
  setSerialNo(serialNo + 1);
  final long freshExpiry =
      Time.now() + 3 * keyUpdateInterval + tokenLifetime;
  nextKey = new BlockKey(serialNo, freshExpiry, generateSecret());
  allKeys.put(nextKey.getKeyId(), nextKey);
  return true;
}
/**
 * Trim the eviction lists.
 *
 * <p>After demoting old mmapped entries, replicas are purged one at a time
 * until the combined size of both eviction maps is at most
 * {@code maxTotalSize}. The first entry of {@code evictable} is purged
 * while that map is non-empty; otherwise the first entry of
 * {@code evictableMmapped} is purged.
 */
private void trimEvictionMaps() {
  demoteOldEvictableMmaped(Time.monotonicNow());

  for (;;) {
    final long plainCount = evictable.size();
    final long mmappedCount = evictableMmapped.size();
    if (plainCount + mmappedCount <= maxTotalSize) {
      break;
    }
    final ShortCircuitReplica victim = (plainCount == 0)
        ? evictableMmapped.firstEntry().getValue()
        : evictable.firstEntry().getValue();
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": trimEvictionMaps is purging " + victim +
          StringUtils.getStackTrace(Thread.currentThread()));
    }
    purge(victim);
  }
}
/**
 * Renders a log event that carries an exception through
 * {@code Log4Json.toJson} and prints the resulting JSON. The message and
 * thread name contain a quote, a newline and braces.
 */
@Test
public void testException() throws Throwable {
  final Exception failure =
      new NoRouteToHostException("that box caught fire 3 years ago");
  final ThrowableInformation throwableInfo =
      new ThrowableInformation(failure);
  final Log4Json layout = new Log4Json();
  final long eventTime = Time.now();
  final StringWriter sink = new StringWriter();
  final String rendered = layout.toJson(sink,
      "testException",
      eventTime,
      "INFO",
      "quoted\"",
      "new line\n and {}",
      throwableInfo)
      .toString();
  println("testException", rendered);
}
/**
 * Looks up the cached {@code TokenInfo} for an identifier and validates it.
 *
 * <p>Validation fails when the identifier is not in the cache, when the
 * token's renew date is earlier than the current time, or when signature
 * verification against the stored password fails.
 *
 * @param identifier the token identifier to validate
 * @return the cached token information
 * @throws InvalidToken if any of the checks above fails
 */
private TokenInfo validateToken(OzoneTokenIdentifier identifier)
    throws InvalidToken {
  final TokenInfo cached = currentTokens.get(identifier);
  if (cached == null) {
    throw new InvalidToken("token " + formatTokenId(identifier)
        + " can't be found in cache");
  }

  final long currentTime = Time.now();
  final long renewDate = cached.getRenewDate();
  if (currentTime > renewDate) {
    throw new InvalidToken("token " + formatTokenId(identifier)
        + " is expired, current time: " + Time.formatTime(currentTime)
        + " expected renewal time: " + Time.formatTime(renewDate));
  }

  final boolean signatureOk =
      verifySignature(identifier, cached.getPassword());
  if (!signatureOk) {
    throw new InvalidToken("Tampered/Invalid token.");
  }
  return cached;
}
/**
 * Round-trips an {@code HdfsFileStatus} through its JSON form and checks
 * that the {@code FileStatus} derived from the parsed copy equals the one
 * derived from the original.
 */
@Test
public void testHdfsFileStatus() throws IOException {
  final String parent = "/dir";
  final long mtime = Time.now();
  final long atime = mtime + 10;

  final HdfsFileStatus original = new HdfsFileStatus(1001L, false, 3,
      1L << 26, mtime, atime, new FsPermission((short) 0644), "user",
      "group", DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
      INodeId.GRANDFATHER_INODE_ID, 0, null, (byte) 0);
  final FileStatus expected = toFileStatus(original, parent);
  System.out.println("status = " + original);
  System.out.println("fstatus = " + expected);

  // Serialize, then parse the JSON back into a generic map.
  final String json = JsonUtil.toJsonString(original, true);
  System.out.println("json = " + json.replace(",", ",\n "));
  final ObjectReader mapReader = new ObjectMapper().reader(Map.class);
  final Map<?, ?> parsed = (Map<?, ?>) mapReader.readValue(json);

  final HdfsFileStatus restored = JsonUtil.toFileStatus(parsed, true);
  final FileStatus actual = toFileStatus(restored, parent);
  System.out.println("s2 = " + restored);
  System.out.println("fs2 = " + actual);
  Assert.assertEquals(expected, actual);
}
/**
 * Repeatedly evaluates a condition until it holds or a timeout expires.
 *
 * <p>The condition is always evaluated at least once. Between evaluations
 * the calling thread sleeps for {@code checkEveryMillis}. The timeout is
 * measured on the monotonic clock so that wall-clock adjustments cannot
 * trigger a premature timeout or an over-long wait.
 *
 * @param check the condition to evaluate
 * @param checkEveryMillis pause between evaluations, in milliseconds
 * @param waitForMillis total time to keep trying, in milliseconds
 * @throws TimeoutException if the condition did not hold in time; the
 *         message includes thread diagnostics
 * @throws InterruptedException if interrupted while sleeping
 */
public static void waitFor(Supplier<Boolean> check,
    int checkEveryMillis, int waitForMillis)
    throws TimeoutException, InterruptedException {
  long st = Time.monotonicNow();
  do {
    boolean result = check.get();
    if (result) {
      return;
    }

    Thread.sleep(checkEveryMillis);
  } while (Time.monotonicNow() - st < waitForMillis);

  throw new TimeoutException("Timed out waiting for condition. " +
      "Thread diagnostics:\n" +
      TimedOutTestsListener.buildThreadDiagnosticString());
}
/**
 * Creates a directory under {@code STAGING_AREA} named after the current
 * monotonic clock reading.
 *
 * @return the newly created directory
 * @throws IOException if {@code mkdirs()} reports that the directory was
 *         not created
 */
private File initializeJobParentDir() throws IOException {
  final String dirName = String.valueOf(Time.monotonicNow());
  final File jobParentDir = new File(STAGING_AREA, dirName);
  if (jobParentDir.mkdirs()) {
    return jobParentDir;
  }
  throw new IOException(
      String.format(FAILED_TO_CREATE_DIRS_FORMAT_STRING,
          jobParentDir.getAbsolutePath()));
}
/**
 * Appends a human-readable report of this volume's scanner statistics.
 *
 * <p>Every line, including the header and the trailing blank line, is
 * produced through {@link String#format} so that {@code %n} is expanded to
 * the platform line separator rather than appended literally.
 *
 * @param p the builder to append the report to
 */
public void printStats(StringBuilder p) {
  p.append(String.format(
      "Block scanner information for volume %s with base path %s%n",
      volume.getStorageID(), volume.getBasePath()));
  // Hold the stats lock so the report reflects one consistent snapshot.
  synchronized (stats) {
    p.append(String.format("Bytes verified in last hour : %57d%n",
        stats.bytesScannedInPastHour));
    p.append(String.format("Blocks scanned in current period : %57d%n",
        stats.blocksScannedInCurrentPeriod));
    p.append(String.format("Blocks scanned since restart : %57d%n",
        stats.blocksScannedSinceRestart));
    p.append(String.format("Block pool scans since restart : %57d%n",
        stats.scansSinceRestart));
    p.append(String.format("Block scan errors since restart : %57d%n",
        stats.scanErrorsSinceRestart));
    // The two remaining-time lines are only printed when the
    // corresponding timestamp is positive.
    if (stats.nextBlockPoolScanStartMs > 0) {
      p.append(String.format("Hours until next block pool scan : %57.3f%n",
          positiveMsToHours(stats.nextBlockPoolScanStartMs -
              Time.monotonicNow())));
    }
    if (stats.blockPoolPeriodEndsMs > 0) {
      p.append(String.format("Hours until possible pool rescan : %57.3f%n",
          positiveMsToHours(stats.blockPoolPeriodEndsMs -
              Time.now())));
    }
    p.append(String.format("Last block scanned : %57s%n",
        ((stats.lastBlockScanned == null) ? "none" :
            stats.lastBlockScanned.toString())));
    p.append(String.format("More blocks to scan in period : %57s%n",
        !stats.eof));
    p.append(String.format("%n"));
  }
}
/**
 * Decides whether the latest checkpoint is stale enough to warrant saving
 * the namespace at startup.
 *
 * @param imageFile the image file that was loaded
 * @param numEditsLoaded the number of edits loaded from edits logs
 * @return true if the image file's age exceeds the configured checkpoint
 *         period, or if more edits were loaded than the configured
 *         checkpoint transaction count
 */
private boolean needsResaveBasedOnStaleCheckpoint(
    File imageFile, long numEditsLoaded) {
  final long periodSecs = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
  final long txnLimit = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);

  // Age is in milliseconds; the configured period is scaled to match.
  final long ageMs = Time.now() - imageFile.lastModified();
  if (ageMs > periodSecs * 1000) {
    return true;
  }
  return numEditsLoaded > txnLimit;
}
/**
 * Ends the current lap, if one is running, adding its duration to
 * {@code own} and clearing {@code lapStart}.
 *
 * @return this cron
 * @throws IllegalStateException if {@code total} is already non-zero
 */
@Override
public Cron stop() {
  if (total != 0) {
    throw new IllegalStateException("Cron already used");
  }
  final boolean lapRunning = lapStart > 0;
  if (lapRunning) {
    final long lapElapsed = Time.now() - lapStart;
    own += lapElapsed;
    lapStart = 0;
  }
  return this;
}
/**
 * Supplies the data encryption key to use, fetching a new one from the
 * NameNode when none is cached or the cached key's expiry date has passed.
 *
 * @return the encryption key, or {@code null} when data should not be
 *         encrypted
 * @throws IOException if fetching a key from the NameNode fails
 */
@Override
public DataEncryptionKey newDataEncryptionKey() throws IOException {
  if (!shouldEncryptData()) {
    return null;
  }
  synchronized (this) {
    final boolean noKeyYet = encryptionKey == null;
    if (noKeyYet || Time.now() > encryptionKey.expiryDate) {
      LOG.debug("Getting new encryption token from NN");
      encryptionKey = namenode.getDataEncryptionKey();
    }
    return encryptionKey;
  }
}
/**
 * Writes a snapshot at the last applied term/index.
 *
 * <p>Fails if the state machine is unhealthy. When a valid last applied
 * index exists, the container set is persisted to the matching snapshot
 * file, flushed and synced to disk.
 *
 * @return the index the snapshot was taken at, or {@code -1} when there is
 *         no valid last applied index
 * @throws IOException if the state machine is unhealthy or the snapshot
 *         file cannot be written
 */
@Override
public long takeSnapshot() throws IOException {
  final TermIndex ti = getLastAppliedTermIndex();
  final long beganAt = Time.monotonicNow();

  if (!isStateMachineHealthy()) {
    final String msg =
        "Failed to take snapshot " + " for " + gid + " as the stateMachine"
            + " is unhealthy. The last applied index is at " + ti;
    final StateMachineException unhealthy = new StateMachineException(msg);
    LOG.error(msg);
    throw unhealthy;
  }

  // Nothing to snapshot without a valid applied index.
  if (ti == null || ti.getIndex() == RaftLog.INVALID_LOG_INDEX) {
    return -1;
  }

  final File snapshotFile =
      storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
  LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
  try (FileOutputStream snapshotOut = new FileOutputStream(snapshotFile)) {
    persistContainerSet(snapshotOut);
    snapshotOut.flush();
    // force the snapshot contents down to the device
    snapshotOut.getFD().sync();
  } catch (IOException ioe) {
    LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti,
        snapshotFile);
    throw ioe;
  }
  LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}", gid, ti,
      snapshotFile, (Time.monotonicNow() - beganAt));
  return ti.getIndex();
}
/**
 * Adds a peer to the cache, stamped with the current monotonic time.
 *
 * <p>The expiry daemon is started first, and the oldest entry is evicted
 * when the cache size equals its capacity.
 *
 * @param dnId datanode the peer is connected to
 * @param peer the peer to cache; keyed as a domain peer when it has a
 *        domain socket
 */
private synchronized void putInternal(DatanodeID dnId, Peer peer) {
  startExpiryDaemon();
  if (multimap.size() == capacity) {
    evictOldest();
  }
  final boolean isDomain = peer.getDomainSocket() != null;
  final Key cacheKey = new Key(dnId, isDomain);
  final Value cacheEntry = new Value(peer, Time.monotonicNow());
  multimap.put(cacheKey, cacheEntry);
}
/**
 * Add a snapshot.
 *
 * <p>The snapshot is rejected when it would exceed the snapshot quota or
 * when a snapshot with the same name already exists. Otherwise a diff is
 * recorded for it, it is inserted into the name-ordered snapshot list, and
 * the modification times of both the snapshottable directory and the
 * snapshot root are set to the current time.
 *
 * @param snapshotRoot the directory being snapshotted
 * @param id id of the new snapshot
 * @param name name of the new snapshot
 * @return the newly added snapshot
 * @throws SnapshotException if the quota is exhausted or the name is taken
 * @throws QuotaExceededException declared by the operations invoked here
 */
public Snapshot addSnapshot(INodeDirectory snapshotRoot, int id, String name)
    throws SnapshotException, QuotaExceededException {
  // enforce the snapshot quota first
  final int existing = getNumSnapshots();
  if (existing + 1 > snapshotQuota) {
    throw new SnapshotException("Failed to add snapshot: there are already "
        + existing + " snapshot(s) and the snapshot quota is "
        + snapshotQuota);
  }

  final Snapshot created = new Snapshot(id, name, snapshotRoot);
  final byte[] localName = created.getRoot().getLocalNameBytes();
  final int position = searchSnapshot(localName);
  // a non-negative result means the name is already present
  if (position >= 0) {
    throw new SnapshotException("Failed to add snapshot: there is already a "
        + "snapshot with the same name \""
        + Snapshot.getSnapshotName(created) + "\".");
  }

  final DirectoryDiff diff = getDiffs().addDiff(id, snapshotRoot);
  diff.setSnapshotRoot(created.getRoot());
  // a negative result encodes the insertion point as (-point - 1)
  final int insertAt = -position - 1;
  snapshotsByNames.add(insertAt, created);

  // stamp both inodes with the same modification time
  final long timestamp = Time.now();
  snapshotRoot.updateModificationTime(timestamp, Snapshot.CURRENT_STATE_ID);
  created.getRoot().setModificationTime(timestamp,
      Snapshot.CURRENT_STATE_ID);
  return created;
}
/**
 * Registers two renewable file systems with the renewer, removes them
 * again, and checks the queue length after each step as well as that both
 * tokens were cancelled. The whole test must finish within four seconds.
 */
@Test(timeout=4000)
public void testMultipleTokensDoNotDeadlock() throws IOException,
    InterruptedException {
  final Configuration conf = mock(Configuration.class);
  final FileSystem fs = mock(FileSystem.class);
  doReturn(conf).when(fs).getConf();

  final long renewUntil = Time.now() + 3600 * 1000; // 1h

  // First token and the file system that owns it.
  final Token<?> firstToken = mock(Token.class);
  doReturn(new Text("myservice1")).when(firstToken).getService();
  doReturn(renewUntil).when(firstToken).renew(eq(conf));
  final RenewableFileSystem firstFs = mock(RenewableFileSystem.class);
  doReturn(conf).when(firstFs).getConf();
  doReturn(firstToken).when(firstFs).getRenewToken();

  // Second token and the file system that owns it.
  final Token<?> secondToken = mock(Token.class);
  doReturn(new Text("myservice2")).when(secondToken).getService();
  doReturn(renewUntil).when(secondToken).renew(eq(conf));
  final RenewableFileSystem secondFs = mock(RenewableFileSystem.class);
  doReturn(conf).when(secondFs).getConf();
  doReturn(secondToken).when(secondFs).getRenewToken();

  renewer.addRenewAction(firstFs);
  renewer.addRenewAction(secondFs);
  assertEquals(2, renewer.getRenewQueueLength());

  renewer.removeRenewAction(firstFs);
  assertEquals(1, renewer.getRenewQueueLength());

  renewer.removeRenewAction(secondFs);
  assertEquals(0, renewer.getRenewQueueLength());

  verify(firstToken).cancel(eq(conf));
  verify(secondToken).cancel(eq(conf));
}
/**
 * Returns the data encryption key, refreshing it from the NameNode when
 * no key is cached or the cached key's expiry date is in the past.
 *
 * @return the key to use, or {@code null} if data is not to be encrypted
 * @throws IOException if the NameNode request fails
 */
@Override
public DataEncryptionKey newDataEncryptionKey() throws IOException {
  if (!shouldEncryptData()) {
    return null;
  }
  synchronized (this) {
    boolean refresh = (encryptionKey == null);
    if (!refresh) {
      refresh = encryptionKey.expiryDate < Time.now();
    }
    if (refresh) {
      LOG.debug("Getting new encryption token from NN");
      encryptionKey = namenode.getDataEncryptionKey();
    }
    return encryptionKey;
  }
}
/**
 * Applies every changed property to the parent and records the outcome.
 *
 * <p>A freshly loaded {@link Configuration} is compared with the parent's
 * current one. Each changed property is either rejected as not
 * reconfigurable or passed to {@code reconfigurePropertyImpl}. The result
 * map holds an absent value for a successful change and the error message
 * otherwise. Finally, under {@code reconfigLock}, the end time and the
 * unmodifiable result map are published and the thread reference cleared.
 */
public void run() {
  LOG.info("Starting reconfiguration task.");
  Configuration oldConf = this.parent.getConf();
  Configuration newConf = new Configuration();
  Collection<PropertyChange> changes =
      this.parent.getChangedProperties(newConf, oldConf);
  Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
  for (PropertyChange change : changes) {
    String errorMessage = null;
    if (!this.parent.isPropertyReconfigurable(change.prop)) {
      errorMessage = "Property " + change.prop +
          " is not reconfigurable";
      LOG.info(errorMessage);
      results.put(change, Optional.of(errorMessage));
      continue;
    }
    LOG.info("Change property: " + change.prop + " from \""
        + ((change.oldVal == null) ? "<default>" : change.oldVal)
        + "\" to \"" + ((change.newVal == null) ? "<default>" : change.newVal)
        + "\".");
    try {
      this.parent.reconfigurePropertyImpl(change.prop, change.newVal);
    } catch (ReconfigurationException e) {
      // The exception may have been raised without a cause; fall back to
      // its own message instead of dereferencing a null cause.
      Throwable cause = e.getCause();
      errorMessage = (cause == null) ? e.getMessage() : cause.getMessage();
    }
    results.put(change, Optional.fromNullable(errorMessage));
  }

  synchronized (this.parent.reconfigLock) {
    this.parent.endTime = Time.now();
    this.parent.status = Collections.unmodifiableMap(results);
    this.parent.reconfigThread = null;
  }
}
/**
 * Measures how long it takes to obtain an RPC proxy for the server.
 *
 * <p>The duration is taken from the monotonic clock. The proxy is stopped
 * before returning; when proxy creation itself failed there is nothing to
 * stop, so the original exception propagates untouched.
 *
 * @param conf configuration used to create the proxy
 * @param addr address of the server
 * @return time spent in {@code RPC.getProxy}, in milliseconds
 * @throws IOException if the proxy cannot be created
 */
long connectToServer(Configuration conf, InetSocketAddress addr)
    throws IOException {
  MiniProtocol client = null;
  try {
    long start = Time.monotonicNow();
    client = RPC.getProxy(MiniProtocol.class,
        MiniProtocol.versionID, addr, conf);
    long end = Time.monotonicNow();
    return end - start;
  } finally {
    // Guard against a failed getProxy: never hand a null proxy to
    // stopProxy, which could replace the real failure with its own.
    if (client != null) {
      RPC.stopProxy(client);
    }
  }
}
/**
 * With a wait-for ratio of 1, {@code sleep(100)} should take about 100ms
 * (within a 50ms tolerance). The duration is measured on the monotonic
 * clock, and the expected value is passed first so that a failure message
 * reports expected and actual the right way round.
 */
@Test
public void sleepRatio1() {
  setWaitForRatio(1);
  long start = Time.monotonicNow();
  sleep(100);
  long end = Time.monotonicNow();
  assertEquals(100, end - start, 50);
}
/**
 * Log a user in from a keytab file. Loads a user identity from a keytab
 * file and logs them in. They become the currently logged-in user.
 * Returns without doing anything when security is not enabled.
 *
 * @param user the principal name to load from the keytab
 * @param path the path to the keytab file
 * @throws IOException if the login fails; the underlying
 *         {@code LoginException} is attached as the cause
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public synchronized
static void loginUserFromKeytab(String user,
                                String path
                                ) throws IOException {
  if (!isSecurityEnabled()) {
    return;
  }

  keytabFile = path;
  keytabPrincipal = user;
  final Subject loginSubject = new Subject();
  // Stays 0 until the login attempt itself begins, so a failure while
  // building the context is not counted as a failed login.
  long loginStartedAt = 0;
  try {
    final LoginContext context = newLoginContext(
        HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME,
        loginSubject, new HadoopConfiguration());
    loginStartedAt = Time.now();
    context.login();
    metrics.loginSuccess.add(Time.now() - loginStartedAt);
    loginUser = new UserGroupInformation(loginSubject);
    loginUser.setLogin(context);
    loginUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
  } catch (LoginException le) {
    if (loginStartedAt > 0) {
      metrics.loginFailure.add(Time.now() - loginStartedAt);
    }
    throw new IOException("Login failure for " + user + " from keytab " +
                          path+ ": " + le, le);
  }
  LOG.info("Login successful for user " + keytabPrincipal
      + " using keytab file " + keytabFile);
}
/**
 * Records the successful copy of one map output.
 *
 * <p>Failure bookkeeping for the map and its host is always cleared. If
 * the map had not been marked finished yet, the output is committed, the
 * counters and progress status are updated, and waiters are notified once
 * no maps remain.
 *
 * @param mapId the map attempt whose output was copied
 * @param host the host the output was fetched from
 * @param bytes number of bytes copied
 * @param startMillis time the copy started, in milliseconds
 * @param endMillis time the copy ended, in milliseconds
 * @param output the fetched map output to commit
 * @throws IOException if committing the output fails
 */
public synchronized void copySucceeded(TaskAttemptID mapId,
                                       MapHost host,
                                       long bytes,
                                       long startMillis,
                                       long endMillis,
                                       MapOutput<K,V> output
                                       ) throws IOException {
  failureCounts.remove(mapId);
  hostFailures.remove(host.getHostName());

  final int mapIndex = mapId.getTaskID().getId();
  if (finishedMaps[mapIndex]) {
    return;
  }

  output.commit();
  finishedMaps[mapIndex] = true;
  shuffledMapsCounter.increment(1);
  remainingMaps--;
  if (remainingMaps == 0) {
    notifyAll();
  }

  // status of this single copy; a zero duration is treated as 1ms so the
  // rate computation never divides by zero
  final long elapsed = endMillis - startMillis;
  final long copyMillis = (elapsed == 0) ? 1 : elapsed;
  final float bytesPerMillis = (float) bytes / copyMillis;
  final float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
  final String individualProgress = "copy task(" + mapId + " succeeded"
      + " at " + mbpsFormat.format(transferRate) + " MB/s)";

  // aggregated status
  copyTimeTracker.add(startMillis, endMillis);
  totalBytesShuffledTillNow += bytes;
  updateStatus(individualProgress);
  reduceShuffleBytes.increment(bytes);
  lastProgressTime = Time.monotonicNow();
  LOG.debug("map " + mapId + " done " + status.getStateString());
}
/**
 * Checks that a register call against a server delayed by 1500ms gives up
 * within the RPC timeout plus a tolerance.
 *
 * <p>The injected response delay is reset in a {@code finally} block so
 * that a failure inside the register call cannot leave the shared server
 * slowed down.
 */
@Test
public void testRegisterRpcTimeout() throws Exception {
  final long rpcTimeout = 1000;
  final long tolerance = 200;
  final long elapsed;
  scmServerImpl.setRpcResponseDelay(1500);
  try {
    long start = Time.monotonicNow();
    // NOTE(review): the literal 1000 presumably mirrors rpcTimeout; confirm
    // against registerTaskHelper's parameter.
    registerTaskHelper(serverAddress, 1000, false).close();
    elapsed = Time.monotonicNow() - start;
  } finally {
    scmServerImpl.setRpcResponseDelay(0);
  }
  Assert.assertThat(elapsed, lessThanOrEqualTo(rpcTimeout + tolerance));
}