下面列出了org.apache.hadoop.hbase.util.EnvironmentEdgeManager#currentTime ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Reports the operation from WAL during replay.
* @param group Nonce group.
* @param nonce Nonce.
* @param writeTime Entry write time, used to ignore entries that are too old.
*/
public void reportOperationFromWal(long group, long nonce, long writeTime) {
if (nonce == HConstants.NO_NONCE) return;
// Give the write time some slack in case the clocks are not synchronized.
long now = EnvironmentEdgeManager.currentTime();
if (now > writeTime + (deleteNonceGracePeriod * 1.5)) return;
OperationContext newResult = new OperationContext();
newResult.setState(OperationContext.DONT_PROCEED);
NonceKey nk = new NonceKey(group, nonce);
OperationContext oldResult = nonces.putIfAbsent(nk, newResult);
if (oldResult != null) {
// Some schemes can have collisions (for example, expiring hashes), so just log it.
// We have no idea about the semantics here, so this is the least of many evils.
LOG.warn("Nonce collision during WAL recovery: " + nk
+ ", " + oldResult + " with " + newResult);
}
}
/**
* Push the current active memstore segment into the pipeline
* and create a snapshot of the tail of current compaction pipeline
* Snapshot must be cleared by call to {@link #clearSnapshot}.
* {@link #clearSnapshot(long)}.
* @return {@link MemStoreSnapshot}
*/
@Override
public MemStoreSnapshot snapshot() {
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
if (!this.snapshot.isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
LOG.debug("FLUSHING TO DISK {}, store={}",
getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
stopCompaction();
// region level lock ensures pushing active to pipeline is done in isolation
// no concurrent update operations trying to flush the active segment
pushActiveToPipeline(getActive());
snapshotId = EnvironmentEdgeManager.currentTime();
// in both cases whatever is pushed to snapshot is cleared from the pipeline
if (compositeSnapshot) {
pushPipelineToSnapshot();
} else {
pushTailToSnapshot();
}
compactor.resetStats();
}
return new MemStoreSnapshot(snapshotId, this.snapshot);
}
/**
* Remove the sentinels that are marked as finished and the completion time
* has exceeded the removal timeout.
* @param sentinels map of sentinels to clean
*/
private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
long currentTime = EnvironmentEdgeManager.currentTime();
long sentinelsCleanupTimeoutMillis =
master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TableName, SnapshotSentinel> entry = it.next();
SnapshotSentinel sentinel = entry.getValue();
if (sentinel.isFinished()
&& (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis) {
it.remove();
}
}
}
@Override
protected synchronized byte[] createPassword(AuthenticationTokenIdentifier identifier) {
long now = EnvironmentEdgeManager.currentTime();
AuthenticationKey secretKey = currentKey;
identifier.setKeyId(secretKey.getKeyId());
identifier.setIssueDate(now);
identifier.setExpirationDate(now + tokenMaxLifetime);
identifier.setSequenceNumber(tokenSeq.getAndIncrement());
return createPassword(identifier.getBytes(),
secretKey.getKey());
}
/**
* @param update new region state this node should be assigned.
* @param expected current state should be in this given list of expected states
* @return true, if current state is in expected list; otherwise false.
*/
public boolean setState(final State update, final State... expected) {
if (!isInState(expected)) {
return false;
}
this.state = update;
this.lastUpdate = EnvironmentEdgeManager.currentTime();
return true;
}
public FailedProcedure(long procId, String procName, User owner, NonceKey nonceKey,
IOException exception) {
this.procName = procName;
setProcId(procId);
setState(ProcedureState.ROLLEDBACK);
setOwner(owner);
setNonceKey(nonceKey);
long currentTime = EnvironmentEdgeManager.currentTime();
setSubmittedTime(currentTime);
setLastUpdate(currentTime);
setFailure(Objects.toString(exception.getMessage(), ""), exception);
}
/**
* Create the dead server to send. A dead server is sent NB_SEND times. We send at max
* MAX_SERVER_PER_MESSAGE at a time. if there are too many dead servers, we send the newly
* dead first.
*/
protected List<ServerName> generateDeadServersListToSend() {
// We're getting the message sent since last time, and add them to the list
long since = EnvironmentEdgeManager.currentTime() - messagePeriod * 2;
for (Pair<ServerName, Long> dead : getDeadServers(since)) {
lastSent.putIfAbsent(dead.getFirst(), 0);
}
// We're sending the new deads first.
List<Map.Entry<ServerName, Integer>> entries = new ArrayList<>(lastSent.entrySet());
Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
@Override
public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
return o1.getValue().compareTo(o2.getValue());
}
});
// With a limit of MAX_SERVER_PER_MESSAGE
int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
List<ServerName> res = new ArrayList<>(max);
for (int i = 0; i < max; i++) {
Map.Entry<ServerName, Integer> toSend = entries.get(i);
if (toSend.getValue() >= (NB_SEND - 1)) {
lastSent.remove(toSend.getKey());
} else {
lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
}
res.add(toSend.getKey());
}
return res;
}
public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
String purpose, Predicate<T> predicate) throws IOException {
long done = EnvironmentEdgeManager.currentTime() + waitTime;
if (done <= 0) {
// long overflow, usually this means we pass Long.MAX_VALUE as waitTime
done = Long.MAX_VALUE;
}
boolean logged = false;
do {
T result = predicate.evaluate();
if (result != null && !result.equals(Boolean.FALSE)) {
return result;
}
try {
Thread.sleep(waitingTimeForEvents);
} catch (InterruptedException e) {
LOG.warn("Interrupted while sleeping, waiting on " + purpose);
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
if (LOG.isTraceEnabled()) {
LOG.trace("waitFor " + purpose);
} else {
if (!logged) LOG.debug("waitFor " + purpose);
}
logged = true;
} while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());
throw new TimeoutIOException("Timed out while waiting on " + purpose);
}
@Override
public void preClean() {
readZKTimestamp = EnvironmentEdgeManager.currentTime();
try {
// The concurrently created new WALs may not be included in the return list,
// but they won't be deleted because they're not in the checking set.
wals = queueStorage.getAllWALs();
} catch (ReplicationException e) {
LOG.warn("Failed to read zookeeper, skipping checking deletable files");
wals = null;
}
}
public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
long now = EnvironmentEdgeManager.currentTime();
List<Long> boundaries = getCompactBoundariesForMajor(candidateSelection, now);
Map<Long, String> boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now);
return new DateTieredCompactionRequest(candidateSelection,
boundaries, boundariesPolicies);
}
/**
* Removes region reports over a certain age.
*/
void pruneOldRegionReports() {
final long now = EnvironmentEdgeManager.currentTime();
final long pruneTime = now - regionReportLifetimeMillis;
final int numRemoved = quotaManager.pruneEntriesOlderThan(pruneTime,this);
if (LOG.isTraceEnabled()) {
LOG.trace("Removed " + numRemoved + " old region size reports that were older than "
+ pruneTime + ".");
}
}
MockHStoreFile(HBaseTestingUtility testUtil, Path testPath,
long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
super(testUtil.getTestFileSystem(), testPath, testUtil.getConfiguration(),
new CacheConfig(testUtil.getConfiguration()), BloomType.NONE, true);
this.length = length;
this.isRef = isRef;
this.ageInDisk = ageInDisk;
this.sequenceid = sequenceid;
this.isMajor = false;
hdfsBlocksDistribution = new HDFSBlocksDistribution();
hdfsBlocksDistribution.addHostsAndBlockWeight(new String[]
{ DNS.getHostname(testUtil.getConfiguration(), DNS.ServerType.REGIONSERVER) }, 1);
modificationTime = EnvironmentEdgeManager.currentTime();
}
protected final void onError(Throwable t, Supplier<String> errMsg,
Consumer<Throwable> updateCachedLocation) {
if (future.isDone()) {
// Give up if the future is already done, this is possible if user has already canceled the
// future. And for timeline consistent read, we will also cancel some requests if we have
// already get one of the responses.
LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
return;
}
Throwable error = translateException(t);
// We use this retrying caller to open a scanner, as it is idempotent, but we may throw
// ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will
// also fetch data when opening a scanner. The intention here is that if we hit a
// ScannerResetException when scanning then we should try to open a new scanner, instead of
// retrying on the old one, so it is declared as a DoNotRetryIOException. But here we are
// exactly trying to open a new scanner, so we should retry on ScannerResetException.
if (error instanceof DoNotRetryIOException && !(error instanceof ScannerResetException)) {
future.completeExceptionally(error);
return;
}
if (tries > startLogErrorsCnt) {
LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts +
", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) +
" ms, time elapsed = " + elapsedMs() + " ms", error);
}
updateCachedLocation.accept(error);
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(error,
EnvironmentEdgeManager.currentTime(), "");
exceptions.add(qt);
if (tries >= maxAttempts) {
completeExceptionally();
return;
}
// check whether the table has been disabled, notice that the check will introduce a request to
// meta, so here we only check for disabled for some specific exception types.
if (error instanceof NotServingRegionException || error instanceof RegionOfflineException) {
Optional<TableName> tableName = getTableName();
if (tableName.isPresent()) {
FutureUtils.addListener(conn.getAdmin().isTableDisabled(tableName.get()), (disabled, e) -> {
if (e != null) {
if (e instanceof TableNotFoundException) {
future.completeExceptionally(e);
} else {
// failed to test whether the table is disabled, not a big deal, continue retrying
tryScheduleRetry(error);
}
return;
}
if (disabled) {
future.completeExceptionally(new TableNotEnabledException(tableName.get()));
} else {
tryScheduleRetry(error);
}
});
} else {
tryScheduleRetry(error);
}
} else {
tryScheduleRetry(error);
}
}
private boolean canPurge(MonitoredTask stat) {
long cts = stat.getCompletionTimestamp();
return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime);
}
@Test
public void test() throws IOException, InterruptedException {
final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName);
final HRegion region = (HRegion) rs.getRegions(tableName).get(0);
HRegion spiedRegion = spy(region);
final MutableBoolean flushed = new MutableBoolean(false);
final MutableBoolean reported = new MutableBoolean(false);
doAnswer(new Answer<FlushResult>() {
@Override
public FlushResult answer(InvocationOnMock invocation) throws Throwable {
synchronized (flushed) {
flushed.setValue(true);
flushed.notifyAll();
}
synchronized (reported) {
while (!reported.booleanValue()) {
reported.wait();
}
}
rs.getWAL(region.getRegionInfo()).abortCacheFlush(
region.getRegionInfo().getEncodedNameAsBytes());
throw new DroppedSnapshotException("testcase");
}
}).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
Matchers.<Collection<HStore>> any());
// Find region key; don't pick up key for hbase:meta by mistake.
String key = null;
for (Map.Entry<String, HRegion> entry: rs.getOnlineRegions().entrySet()) {
if (entry.getValue().getRegionInfo().getTable().equals(this.tableName)) {
key = entry.getKey();
break;
}
}
rs.getOnlineRegions().put(key, spiedRegion);
Connection conn = testUtil.getConnection();
try (Table table = conn.getTable(tableName)) {
table.put(new Put(Bytes.toBytes("row0"))
.addColumn(family, qualifier, Bytes.toBytes("val0")));
}
long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore);
assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
rs.getMemStoreFlusher().requestFlush(spiedRegion, FlushLifeCycleTracker.DUMMY);
synchronized (flushed) {
while (!flushed.booleanValue()) {
flushed.wait();
}
}
try (Table table = conn.getTable(tableName)) {
table.put(new Put(Bytes.toBytes("row1"))
.addColumn(family, qualifier, Bytes.toBytes("val1")));
}
long now = EnvironmentEdgeManager.currentTime();
rs.tryRegionServerReport(now - 500, now);
synchronized (reported) {
reported.setValue(true);
reported.notifyAll();
}
while (testUtil.getRSForFirstRegionInTable(tableName) == rs) {
Thread.sleep(100);
}
try (Table table = conn.getTable(tableName)) {
Result result = table.get(new Get(Bytes.toBytes("row0")));
assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier));
}
}
@Override
public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
// its ok if this thread misses the update to task.deleted. It will fail later
if (task.status != IN_PROGRESS) {
return false;
}
int version;
if (directive != FORCE) {
// We're going to resubmit:
// 1) immediately if the worker server is now marked as dead
// 2) after a configurable timeout if the server is not marked as dead but has still not
// finished the task. This allows to continue if the worker cannot actually handle it,
// for any reason.
final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
final boolean alive =
details.getMaster().getServerManager() != null ? details.getMaster().getServerManager()
.isServerOnline(task.cur_worker_name) : true;
if (alive && time < timeout) {
LOG.trace("Skipping the resubmit of " + task.toString() + " because the server "
+ task.cur_worker_name + " is not marked as dead, we waited for " + time
+ " while the timeout is " + timeout);
return false;
}
if (task.unforcedResubmits.get() >= resubmitThreshold) {
if (!task.resubmitThresholdReached) {
task.resubmitThresholdReached = true;
SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment();
LOG.info("Skipping resubmissions of task " + path + " because threshold "
+ resubmitThreshold + " reached");
}
return false;
}
// race with heartbeat() that might be changing last_version
version = task.last_version;
} else {
SplitLogCounters.tot_mgr_resubmit_force.increment();
version = -1;
}
LOG.info("Resubmitting task " + path);
task.incarnation.incrementAndGet();
boolean result = resubmit(path, version);
if (!result) {
task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
return false;
}
// don't count forced resubmits
if (directive != FORCE) {
task.unforcedResubmits.incrementAndGet();
}
task.setUnassigned();
rescan(Long.MAX_VALUE);
SplitLogCounters.tot_mgr_resubmit.increment();
return true;
}
private StripeCompactionRequest selectExpiredMergeCompaction(
StripeInformationProvider si, boolean canDropDeletesNoL0) {
long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (cfTtl == Long.MAX_VALUE) {
return null; // minversion might be set, cannot delete old files
}
long timestampCutoff = EnvironmentEdgeManager.currentTime() - cfTtl;
// Merge the longest sequence of stripes where all files have expired, if any.
int start = -1, bestStart = -1, length = 0, bestLength = 0;
ArrayList<ImmutableList<HStoreFile>> stripes = si.getStripes();
OUTER: for (int i = 0; i < stripes.size(); ++i) {
for (HStoreFile storeFile : stripes.get(i)) {
if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
// Found non-expired file, this stripe has to stay.
if (length > bestLength) {
bestStart = start;
bestLength = length;
}
start = -1;
length = 0;
continue OUTER;
}
if (start == -1) {
start = i;
}
++length;
}
if (length > bestLength) {
bestStart = start;
bestLength = length;
}
if (bestLength == 0) return null;
if (bestLength == 1) {
// This is currently inefficient. If only one stripe expired, we will rewrite some
// entire stripe just to delete some expired files because we rely on metadata and it
// cannot simply be updated in an old file. When we either determine stripe dynamically
// or move metadata to manifest, we can just drop the "expired stripes".
if (bestStart == (stripes.size() - 1)) return null;
++bestLength;
}
LOG.debug("Merging " + bestLength + " stripes to delete expired store files");
int endIndex = bestStart + bestLength - 1;
ConcatenatedLists<HStoreFile> sfs = new ConcatenatedLists<>();
sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
if (canDropDeletesNoL0) {
result.setMajorRangeFull();
}
return result;
}
MovedRegionInfo(ServerName serverName, long closeSeqNum) {
this.serverName = serverName;
this.seqNum = closeSeqNum;
this.moveTime = EnvironmentEdgeManager.currentTime();
}
private synchronized void init() {
startTime = EnvironmentEdgeManager.currentTime();
prevTime = startTime;
blockedRequestCount = region.getBlockedRequestsCount();
writeRequestCount = region.getWriteRequestsCount();
}
@Test
public void testMatchExplicitColumns() throws IOException {
// Moving up from the Tracker by using Gets and List<KeyValue> instead
// of just byte []
// Expected result
List<MatchCode> expected = new ArrayList<>(6);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW);
expected.add(ScanQueryMatcher.MatchCode.DONE);
long now = EnvironmentEdgeManager.currentTime();
// 2,4,5
UserScanQueryMatcher qm = UserScanQueryMatcher.create(
scan, new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE,
HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator, false),
get.getFamilyMap().get(fam2), now - ttl, now, null);
List<KeyValue> memstore = new ArrayList<>(6);
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
memstore.add(new KeyValue(row1, fam2, col2, 1, data));
memstore.add(new KeyValue(row1, fam2, col3, 1, data));
memstore.add(new KeyValue(row1, fam2, col4, 1, data));
memstore.add(new KeyValue(row1, fam2, col5, 1, data));
memstore.add(new KeyValue(row2, fam1, col1, data));
List<ScanQueryMatcher.MatchCode> actual = new ArrayList<>(memstore.size());
KeyValue k = memstore.get(0);
qm.setToNewRow(k);
for (KeyValue kv : memstore) {
actual.add(qm.match(kv));
}
assertEquals(expected.size(), actual.size());
for (int i = 0; i < expected.size(); i++) {
LOG.debug("expected " + expected.get(i) + ", actual " + actual.get(i));
assertEquals(expected.get(i), actual.get(i));
}
}