下面列出了怎么用org.apache.hadoop.hbase.util.EnvironmentEdgeManager的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Archive the store file
* @param fs the filesystem where the store files live
* @param regionInfo region hosting the store files
* @param conf {@link Configuration} to examine to determine the archive directory
* @param tableDir {@link Path} to where the table is being stored (for building the archive path)
* @param family the family hosting the store files
* @param storeFile file to be archived
* @throws IOException if the files could not be correctly disposed.
*/
public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInfo regionInfo,
Path tableDir, byte[] family, Path storeFile) throws IOException {
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
// make sure we don't archive if we can't and that the archive dir exists
if (!fs.mkdirs(storeArchiveDir)) {
throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
+ Bytes.toString(family) + ", deleting compacted files instead.");
}
// do the actual archive
long start = EnvironmentEdgeManager.currentTime();
File file = new FileablePath(fs, storeFile);
if (!resolveAndArchiveFile(storeArchiveDir, file, Long.toString(start))) {
throw new IOException("Failed to archive/delete the file for region:"
+ regionInfo.getRegionNameAsString() + ", family:" + Bytes.toString(family)
+ " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
}
}
private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
String output) throws IOException {
WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
.setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
.setRegionName(ByteString.copyFrom(hri.getRegionName()))
.setFamilyName(ByteString.copyFrom(FAMILY))
.setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
.addAllCompactionInput(Arrays.asList(inputs))
.addCompactionOutput(output);
WALEdit edit = WALEdit.createCompaction(hri, desc.build());
WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
w.append(new Entry(key, edit));
w.sync(false);
}
/**
* Deletes some replica columns corresponding to replicas for the passed rows
* @param metaRows rows in hbase:meta
* @param replicaIndexToDeleteFrom the replica ID we would start deleting from
* @param numReplicasToRemove how many replicas to remove
* @param connection connection we're using to access meta table
*/
public static void removeRegionReplicasFromMeta(Set<byte[]> metaRows,
int replicaIndexToDeleteFrom, int numReplicasToRemove, Connection connection)
throws IOException {
int absoluteIndex = replicaIndexToDeleteFrom + numReplicasToRemove;
for (byte[] row : metaRows) {
long now = EnvironmentEdgeManager.currentTime();
Delete deleteReplicaLocations = new Delete(row);
for (int i = replicaIndexToDeleteFrom; i < absoluteIndex; i++) {
deleteReplicaLocations.addColumns(HConstants.CATALOG_FAMILY,
CatalogFamilyFormat.getServerColumn(i), now);
deleteReplicaLocations.addColumns(HConstants.CATALOG_FAMILY,
CatalogFamilyFormat.getSeqNumColumn(i), now);
deleteReplicaLocations.addColumns(HConstants.CATALOG_FAMILY,
CatalogFamilyFormat.getStartCodeColumn(i), now);
deleteReplicaLocations.addColumns(HConstants.CATALOG_FAMILY,
CatalogFamilyFormat.getServerNameColumn(i), now);
deleteReplicaLocations.addColumns(HConstants.CATALOG_FAMILY,
CatalogFamilyFormat.getRegionStateColumn(i), now);
}
deleteFromMetaTable(connection, deleteReplicaLocations);
}
}
/**
* Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any
* {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
* the time the method is called.
* @param kvs {@link KeyValue}s to break into batches
* @param batches to update with the given kvs
*/
protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs,
Map<Long, Batch> batches) {
long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] nowBytes = Bytes.toBytes(now);
// batch kvs by timestamp
for (KeyValue kv : kvs) {
long ts = kv.getTimestamp();
// override the timestamp to the current time, so the index and primary tables match
// all the keys with LATEST_TIMESTAMP will then be put into the same batch
if (kv.updateLatestStamp(nowBytes)) {
ts = now;
}
Batch batch = batches.get(ts);
if (batch == null) {
batch = new Batch(ts);
batches.put(ts, batch);
}
batch.add(kv);
}
}
/**
* Overwrites the specified regions from hbase:meta. Deletes old rows for the given regions and
* adds new ones. Regions added back have state CLOSED.
* @param connection connection we're using
* @param regionInfos list of regions to be added to META
*/
public static void overwriteRegions(Connection connection, List<RegionInfo> regionInfos,
int regionReplication) throws IOException {
// use master time for delete marker and the Put
long now = EnvironmentEdgeManager.currentTime();
deleteRegionInfos(connection, regionInfos, now);
// Why sleep? This is the easiest way to ensure that the previous deletes does not
// eclipse the following puts, that might happen in the same ts from the server.
// See HBASE-9906, and HBASE-9879. Once either HBASE-9879, HBASE-8770 is fixed,
// or HBASE-9905 is fixed and meta uses seqIds, we do not need the sleep.
//
// HBASE-13875 uses master timestamp for the mutations. The 20ms sleep is not needed
addRegionsToMeta(connection, regionInfos, regionReplication, now + 1);
LOG.info("Overwritten " + regionInfos.size() + " regions to Meta");
LOG.debug("Overwritten regions: {} ", regionInfos);
}
@Override
public void batch(List<? extends Row> actions, Object[] results) throws IOException {
if (ArrayUtils.isEmpty(results)) {
FutureUtils.get(table.batchAll(actions));
return;
}
List<ThrowableWithExtraContext> errors = new ArrayList<>();
List<CompletableFuture<Object>> futures = table.batch(actions);
for (int i = 0, n = results.length; i < n; i++) {
try {
results[i] = FutureUtils.get(futures.get(i));
} catch (IOException e) {
results[i] = e;
errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
"Error when processing " + actions.get(i)));
}
}
if (!errors.isEmpty()) {
throw new RetriesExhaustedException(errors.size(), errors);
}
}
synchronized void removeExpiredKeys() {
if (!leaderElector.isMaster()) {
LOG.info("Skipping removeExpiredKeys() because not running as master.");
return;
}
long now = EnvironmentEdgeManager.currentTime();
Iterator<AuthenticationKey> iter = allKeys.values().iterator();
while (iter.hasNext()) {
AuthenticationKey key = iter.next();
if (key.getExpiration() < now) {
LOG.debug("Removing expired key {}", key);
iter.remove();
zkWatcher.removeKeyFromZK(key);
}
}
}
protected ArrayList<HStoreFile> sfCreate(long[] minTimestamps, long[] maxTimestamps, long[] sizes)
throws IOException {
ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(timeMachine);
// Has to be > 0 and < now.
timeMachine.setValue(1);
ArrayList<Long> ageInDisk = new ArrayList<>();
for (int i = 0; i < sizes.length; i++) {
ageInDisk.add(0L);
}
ArrayList<HStoreFile> ret = Lists.newArrayList();
for (int i = 0; i < sizes.length; i++) {
MockHStoreFile msf =
new MockHStoreFile(TEST_UTIL, TEST_FILE, sizes[i], ageInDisk.get(i), false, i);
msf.setTimeRangeTracker(TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, minTimestamps[i], maxTimestamps[i]));
ret.add(msf);
}
return ret;
}
private void startCompaction(Admin admin, TableName table, RegionInfo region, byte[] cf)
throws IOException, InterruptedException {
LOG.info("Started major compaction: table={} cf={} region={}", table,
Bytes.toString(cf), region.getRegionNameAsString());
admin.majorCompactRegion(region.getRegionName(), cf);
// Wait until it really starts
// but with finite timeout
long waitTime = 300000; // 5 min
long startTime = EnvironmentEdgeManager.currentTime();
while (admin.getCompactionStateForRegion(region.getRegionName()) == CompactionState.NONE) {
// Is 1 second too aggressive?
Thread.sleep(1000);
if (EnvironmentEdgeManager.currentTime() - startTime > waitTime) {
LOG.warn("Waited for {} ms to start major MOB compaction on table={} cf={} region={}."+
" Stopped waiting for request confirmation. This is not an ERROR, continue next region."
, waitTime, table.getNameAsString(), Bytes.toString(cf),region.getRegionNameAsString());
break;
}
}
}
private void cleanupIdleConnections() {
long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
synchronized (connections) {
for (T conn : connections.values()) {
// Remove connection if it has not been chosen by anyone for more than maxIdleTime, and the
// connection itself has already shutdown. The latter check is because we may still
// have some pending calls on connection so we should not shutdown the connection outside.
// The connection itself will disconnect if there is no pending call for maxIdleTime.
if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Cleanup idle connection to {}", conn.remoteId().address);
}
connections.removeValue(conn.remoteId(), conn);
conn.cleanupConnection();
}
}
}
}
@Override
public MetaDataMutationResult getTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long tableTimeStamp, long clientTimeStamp) throws IOException {
try {
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
// get the co-processor environment
RegionCoprocessorEnvironment env = getEnvironment();
// TODO: check that key is within region.getStartKey() and region.getEndKey()
// and return special code to force client to lookup region from meta.
HRegion region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
return result;
}
long currentTime = EnvironmentEdgeManager.currentTimeMillis();
PTable table = doGetTable(key, clientTimeStamp);
if (table == null) {
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, currentTime, null);
}
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table.getTimeStamp() != tableTimeStamp ? table : null);
} catch (Throwable t) {
ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
return null; // impossible
}
}
private Collection<HStoreFile> getExpiredStores(Collection<HStoreFile> files,
Collection<HStoreFile> filesCompacting) {
long currentTime = EnvironmentEdgeManager.currentTime();
Collection<HStoreFile> expiredStores = new ArrayList<>();
for (HStoreFile sf : files) {
if (isEmptyStoreFile(sf) && !filesCompacting.contains(sf)) {
expiredStores.add(sf);
continue;
}
// Check MIN_VERSIONS is in HStore removeUnneededFiles
long maxTs = sf.getReader().getMaxTimestamp();
long maxTtl = storeConfigInfo.getStoreFileTtl();
if (maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
continue;
} else if (filesCompacting == null || !filesCompacting.contains(sf)) {
expiredStores.add(sf);
}
}
return expiredStores;
}
@Test
public void testCanExecuteOfFixedIntervalRateLimiter() throws InterruptedException {
RateLimiter limiter = new FixedIntervalRateLimiter();
// when set limit is 100 per sec, this FixedIntervalRateLimiter will support at max 100 per sec
limiter.set(100, TimeUnit.SECONDS);
limiter.setNextRefillTime(EnvironmentEdgeManager.currentTime());
assertEquals(50, testCanExecuteByRate(limiter, 50));
// refill the avail to limit
limiter.set(100, TimeUnit.SECONDS);
limiter.setNextRefillTime(EnvironmentEdgeManager.currentTime());
assertEquals(100, testCanExecuteByRate(limiter, 100));
// refill the avail to limit
limiter.set(100, TimeUnit.SECONDS);
limiter.setNextRefillTime(EnvironmentEdgeManager.currentTime());
assertEquals(100, testCanExecuteByRate(limiter, 200));
}
/**
* Removes from the specified region the store files of the specified column family,
* either by archiving them or outright deletion
* @param fs the filesystem where the store files live
* @param conf {@link Configuration} to examine to determine the archive directory
* @param parent Parent region hosting the store files
* @param familyDir {@link Path} to where the family is being stored
* @param family the family hosting the store files
* @throws IOException if the files could not be correctly disposed.
*/
public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf,
RegionInfo parent, Path familyDir, byte[] family) throws IOException {
FileStatus[] storeFiles = CommonFSUtils.listStatus(fs, familyDir);
if (storeFiles == null) {
LOG.debug("No files to dispose of in {}, family={}", parent.getRegionNameAsString(),
Bytes.toString(family));
return;
}
FileStatusConverter getAsFile = new FileStatusConverter(fs);
Collection<File> toArchive = Stream.of(storeFiles).map(getAsFile).collect(Collectors.toList());
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family);
// do the actual archive
List<File> failedArchive = resolveAndArchive(fs, storeArchiveDir, toArchive,
EnvironmentEdgeManager.currentTime());
if (!failedArchive.isEmpty()){
throw new FailedArchiveException("Failed to archive/delete all the files for region:"
+ Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family)
+ " into " + storeArchiveDir + ". Something is probably awry on the filesystem.",
failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList()));
}
}
@Test
public void testSortExtract(){
ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(mee);
mee.setValue(1);
DeadServer d = new DeadServer();
d.putIfAbsent(hostname123);
mee.incValue(1);
d.putIfAbsent(hostname1234);
mee.incValue(1);
d.putIfAbsent(hostname12345);
List<Pair<ServerName, Long>> copy = d.copyDeadServersSince(2L);
Assert.assertEquals(2, copy.size());
Assert.assertEquals(hostname1234, copy.get(0).getFirst());
Assert.assertEquals(new Long(2L), copy.get(0).getSecond());
Assert.assertEquals(hostname12345, copy.get(1).getFirst());
Assert.assertEquals(new Long(3L), copy.get(1).getSecond());
EnvironmentEdgeManager.reset();
}
/**
* Updates state in META
* @param conn connection to use
* @param tableName table to look for
*/
public static void updateTableState(Connection conn, TableName tableName,
TableState.State actual) throws IOException {
Put put = makePutFromTableState(new TableState(tableName, actual),
EnvironmentEdgeManager.currentTime());
conn.getTable(TableName.META_TABLE_NAME).put(put);
}
public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit)
throws IOException {
FileSystem fs = FileSystem.get(conf);
String tmp = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
fs.getHomeDirectory() + "/hbase-staging");
Path path =
new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+ EnvironmentEdgeManager.currentTime());
if (deleteOnExit) {
fs.deleteOnExit(path);
}
return path;
}
/**
* Tests region sever reportForDuty with manual environment edge
*/
@Test
public void testReportForDutyWithEnvironmentEdge() throws Exception {
// Start a master and wait for it to become the active/primary master.
// Use a random unique port
cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
// Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);
// master has a rs. defaultMinToStart = 2
boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
tablesOnMaster ? 2 : 1);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
tablesOnMaster ? 2 : 1);
// Inject manual environment edge for clock skew computation between RS and master
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);
master = cluster.addMaster();
rs = cluster.addRegionServer();
LOG.debug("Starting master: " + master.getMaster().getServerName());
master.start();
rs.start();
waitForClusterOnline(master);
}
public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
String backupId) throws IOException {
try (Table table = connection.getTable(bulkLoadTableName)) {
long ts = EnvironmentEdgeManager.currentTime();
int cnt = 0;
List<Put> puts = new ArrayList<>();
for (int idx = 0; idx < maps.length; idx++) {
Map<byte[], List<Path>> map = maps[idx];
TableName tn = sTableList.get(idx);
if (map == null) {
continue;
}
for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
byte[] fam = entry.getKey();
List<Path> paths = entry.getValue();
for (Path p : paths) {
Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId,
ts, cnt++);
puts.add(put);
}
}
}
if (!puts.isEmpty()) {
table.put(puts);
}
}
}
void addDelete(final Delete delete) {
long now = EnvironmentEdgeManager.currentTimeMillis();
updateLatestTimestamp(delete.getFamilyMap().values(), now);
if (delete.getTimeStamp() == HConstants.LATEST_TIMESTAMP) {
delete.setTimestamp(now);
}
deletes.add(delete);
writeOrdering.add(new WriteAction(delete));
}
@Test
public void testUnconfiguredLimiters() throws InterruptedException {
ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(testEdge);
long limit = Long.MAX_VALUE;
// For unconfigured limiters, it is supposed to use as much as possible
RateLimiter avgLimiter = new AverageIntervalRateLimiter();
RateLimiter fixLimiter = new FixedIntervalRateLimiter();
assertEquals(limit, avgLimiter.getAvailable());
assertEquals(limit, fixLimiter.getAvailable());
assertTrue(avgLimiter.canExecute(limit));
avgLimiter.consume(limit);
assertTrue(fixLimiter.canExecute(limit));
fixLimiter.consume(limit);
// Make sure that available is Long.MAX_VALUE
assertTrue(limit == avgLimiter.getAvailable());
assertTrue(limit == fixLimiter.getAvailable());
// after 100 millseconds, it should be able to execute limit as well
testEdge.incValue(100);
assertTrue(avgLimiter.canExecute(limit));
avgLimiter.consume(limit);
assertTrue(fixLimiter.canExecute(limit));
fixLimiter.consume(limit);
// Make sure that available is Long.MAX_VALUE
assertTrue(limit == avgLimiter.getAvailable());
assertTrue(limit == fixLimiter.getAvailable());
EnvironmentEdgeManager.reset();
}
private SnapshotBuilder createSnapshot(final String snapshotName, final String tableName,
final int numRegions, final int version, final long ttl) throws IOException {
TableDescriptor htd = createHtd(tableName);
RegionData[] regions = createTable(htd, numRegions);
SnapshotProtos.SnapshotDescription desc = SnapshotProtos.SnapshotDescription.newBuilder()
.setTable(htd.getTableName().getNameAsString())
.setName(snapshotName)
.setVersion(version)
.setCreationTime(EnvironmentEdgeManager.currentTime())
.setTtl(ttl)
.build();
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir, conf);
SnapshotDescriptionUtils.writeSnapshotInfo(desc, workingDir, fs);
return new SnapshotBuilder(conf, fs, rootDir, htd, desc, regions);
}
/**
* Check whether we tolerate IO error this time. If the duration of IOEngine
* throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the
* cache
*/
private void checkIOErrorIsTolerated() {
long now = EnvironmentEdgeManager.currentTime();
// Do a single read to a local variable to avoid timing issue - HBASE-24454
long ioErrorStartTimeTmp = this.ioErrorStartTime;
if (ioErrorStartTimeTmp > 0) {
if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
"ms, disabling cache, please check your IOEngine");
disableCache();
}
} else {
this.ioErrorStartTime = now;
}
}
/**
* Splits the region into two in an atomic operation. Offlines the parent region with the
* information that it is split into two, and also adds the daughter regions. Does not add the
* location information to the daughter regions since they are not open yet.
* @param connection connection we're using
* @param parent the parent region which is split
* @param parentOpenSeqNum the next open sequence id for parent region, used by serial
* replication. -1 if not necessary.
* @param splitA Split daughter region A
* @param splitB Split daughter region B
* @param sn the location of the region
*/
public static void splitRegion(Connection connection, RegionInfo parent, long parentOpenSeqNum,
RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication) throws IOException {
try (Table meta = getMetaHTable(connection)) {
long time = EnvironmentEdgeManager.currentTime();
// Put for parent
Put putParent = makePutFromRegionInfo(
RegionInfoBuilder.newBuilder(parent).setOffline(true).setSplit(true).build(), time);
addDaughtersToPut(putParent, splitA, splitB);
// Puts for daughters
Put putA = makePutFromRegionInfo(splitA, time);
Put putB = makePutFromRegionInfo(splitB, time);
if (parentOpenSeqNum > 0) {
addReplicationBarrier(putParent, parentOpenSeqNum);
addReplicationParent(putA, Collections.singletonList(parent));
addReplicationParent(putB, Collections.singletonList(parent));
}
// Set initial state to CLOSED
// NOTE: If initial state is not set to CLOSED then daughter regions get added with the
// default OFFLINE state. If Master gets restarted after this step, start up sequence of
// master tries to assign these offline regions. This is followed by re-assignments of the
// daughter regions from resumed {@link SplitTableRegionProcedure}
addRegionStateToPut(putA, RegionState.State.CLOSED);
addRegionStateToPut(putB, RegionState.State.CLOSED);
addSequenceNum(putA, 1, splitA.getReplicaId()); // new regions, openSeqNum = 1 is fine.
addSequenceNum(putB, 1, splitB.getReplicaId());
// Add empty locations for region replicas of daughters so that number of replicas can be
// cached whenever the primary region is looked up from meta
for (int i = 1; i < regionReplication; i++) {
addEmptyLocation(putA, i);
addEmptyLocation(putB, i);
}
byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
multiMutate(meta, tableRow, putParent, putA, putB);
}
}
/**
* Remove state for table from meta
* @param connection to use for deletion
* @param table to delete state for
*/
public static void deleteTableState(Connection connection, TableName table) throws IOException {
long time = EnvironmentEdgeManager.currentTime();
Delete delete = new Delete(table.getName());
delete.addColumns(HConstants.TABLE_FAMILY, HConstants.TABLE_STATE_QUALIFIER, time);
deleteFromMetaTable(connection, delete);
LOG.info("Deleted table " + table + " state from META");
}
/**
* Here is the unit test for UserScanQueryMatcher#mergeFilterResponse: the match code may be
* changed to SEEK_NEXT_COL or INCLUDE_AND_SEEK_NEXT_COL after merging with filterResponse, even
* if the passed match code is neither SEEK_NEXT_COL nor INCLUDE_AND_SEEK_NEXT_COL. In that case,
* we need to make sure that the ColumnTracker has been switched to the next column. <br/>
* An effective test way is: we only need to check the cell from getKeyForNextColumn(). because
* that as long as the UserScanQueryMatcher returns SEEK_NEXT_COL or INCLUDE_AND_SEEK_NEXT_COL,
* UserScanQueryMatcher#getKeyForNextColumn should return an cell whose column is larger than the
* current cell's.
*/
@Test
public void testMergeFilterResponseCase2() throws Exception {
List<MatchCode> expected = new ArrayList<>();
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL);
Scan scanWithFilter = new Scan(scan).setFilter(new AlwaysIncludeFilter()).readVersions(3);
long now = EnvironmentEdgeManager.currentTime();
// scan with column 2,4,5, the family with maxVersion = 5
UserScanQueryMatcher qm = UserScanQueryMatcher.create(
scanWithFilter, new ScanInfo(this.conf, fam2, 0, 5, ttl, KeepDeletedCells.FALSE,
HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator, false),
get.getFamilyMap().get(fam2), now - ttl, now, null);
List<KeyValue> memstore = new ArrayList<>();
memstore.add(new KeyValue(row1, fam1, col2, 1, data)); // match code will be INCLUDE
memstore.add(new KeyValue(row1, fam1, col2, 2, data)); // match code will be INCLUDE
memstore.add(new KeyValue(row1, fam1, col2, 3, data)); // match code will be INCLUDE
memstore.add(new KeyValue(row1, fam1, col2, 4, data)); // match code will be SEEK_NEXT_COL
KeyValue k = memstore.get(0);
qm.setToNewRow(k);
for (int i = 0; i < memstore.size(); i++) {
assertEquals(expected.get(i), qm.match(memstore.get(i)));
}
// For last cell, the query matcher will return SEEK_NEXT_COL, and the
// ColumnTracker will skip to the next column, which is col4.
Cell lastCell = memstore.get(memstore.size() - 1);
Cell nextCell = qm.getKeyForNextColumn(lastCell);
assertArrayEquals(nextCell.getQualifierArray(), col4);
}
private void cleanUpOldNonces() {
long cutoff = EnvironmentEdgeManager.currentTime() - deleteNonceGracePeriod;
for (Map.Entry<NonceKey, OperationContext> entry : nonces.entrySet()) {
OperationContext oc = entry.getValue();
if (!oc.isExpired(cutoff)) continue;
synchronized (oc) {
if (oc.getState() == OperationContext.WAIT || !oc.isExpired(cutoff)) continue;
nonces.remove(entry.getKey());
}
}
}
@Override
protected void mutate(Put put, AtomicLong endTime, CountDownLatch latch) throws IOException {
FutureUtils.addListener(mutator.mutate(put), (r, e) -> {
endTime.set(EnvironmentEdgeManager.currentTime());
latch.countDown();
});
mutator.flush();
}
private RegionServerStartupResponse reportForDuty() throws IOException {
if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
ServerName masterServerName = createRegionServerStatusStub(true);
RegionServerStatusService.BlockingInterface rss = rssStub;
if (masterServerName == null || rss == null) return null;
RegionServerStartupResponse result = null;
try {
rpcServices.requestCount.reset();
rpcServices.rpcGetRequestCount.reset();
rpcServices.rpcScanRequestCount.reset();
rpcServices.rpcMultiRequestCount.reset();
rpcServices.rpcMutateRequestCount.reset();
LOG.info("reportForDuty to master=" + masterServerName + " with port="
+ rpcServices.isa.getPort() + ", startcode=" + this.startcode);
long now = EnvironmentEdgeManager.currentTime();
int port = rpcServices.isa.getPort();
RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
if (!StringUtils.isBlank(useThisHostnameInstead)) {
request.setUseThisHostnameInstead(useThisHostnameInstead);
}
request.setPort(port);
request.setServerStartCode(this.startcode);
request.setServerCurrentTime(now);
result = rss.regionServerStartup(null, request.build());
} catch (ServiceException se) {
IOException ioe = ProtobufUtil.getRemoteException(se);
if (ioe instanceof ClockOutOfSyncException) {
LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync",
ioe);
// Re-throw IOE will cause RS to abort
throw ioe;
} else if (ioe instanceof ServerNotRunningYetException) {
LOG.debug("Master is not running yet");
} else {
LOG.warn("error telling master we are up", se);
}
rssStub = null;
}
return result;
}
/**
* Tests that the timeOfOldestEdit is updated correctly for the
* various edit operations in memstore.
* @throws Exception
*/
@Test
public void testUpdateToTimeOfOldestEdit() throws Exception {
try {
EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
EnvironmentEdgeManager.injectEdge(edge);
DefaultMemStore memstore = new DefaultMemStore();
long t = memstore.timeOfOldestEdit();
assertEquals(Long.MAX_VALUE, t);
// test the case that the timeOfOldestEdit is updated after a KV add
memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
t = memstore.timeOfOldestEdit();
assertTrue(t == 1234);
// snapshot() will reset timeOfOldestEdit. The method will also assert the
// value is reset to Long.MAX_VALUE
t = runSnapshot(memstore);
// test the case that the timeOfOldestEdit is updated after a KV delete
memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
t = memstore.timeOfOldestEdit();
assertTrue(t == 1234);
t = runSnapshot(memstore);
// test the case that the timeOfOldestEdit is updated after a KV upsert
List<Cell> l = new ArrayList<>();
KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
kv1.setSequenceId(100);
l.add(kv1);
memstore.upsert(l, 1000, null);
t = memstore.timeOfOldestEdit();
assertTrue(t == 1234);
} finally {
EnvironmentEdgeManager.reset();
}
}