下面列出了org.apache.hadoop.hbase.util.EnvironmentEdgeManager#currentTimeMillis ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any
* {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
* the time the method is called.
* @param kvs {@link KeyValue}s to break into batches
* @param batches to update with the given kvs
*/
protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs,
Map<Long, Batch> batches) {
long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] nowBytes = Bytes.toBytes(now);
// batch kvs by timestamp
for (KeyValue kv : kvs) {
long ts = kv.getTimestamp();
// override the timestamp to the current time, so the index and primary tables match
// all the keys with LATEST_TIMESTAMP will then be put into the same batch
if (kv.updateLatestStamp(nowBytes)) {
ts = now;
}
Batch batch = batches.get(ts);
if (batch == null) {
batch = new Batch(ts);
batches.put(ts, batch);
}
batch.add(kv);
}
}
@Override
public MetaDataMutationResult getTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long tableTimeStamp, long clientTimeStamp) throws IOException {
try {
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
// get the co-processor environment
RegionCoprocessorEnvironment env = getEnvironment();
// TODO: check that key is within region.getStartKey() and region.getEndKey()
// and return special code to force client to lookup region from meta.
HRegion region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
return result;
}
long currentTime = EnvironmentEdgeManager.currentTimeMillis();
PTable table = doGetTable(key, clientTimeStamp);
if (table == null) {
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, currentTime, null);
}
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table.getTimeStamp() != tableTimeStamp ? table : null);
} catch (Throwable t) {
ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
return null; // impossible
}
}
/**
* Try and commit a transaction. This does both phases of the 2-phase protocol: prepare and commit.
*
* @param transactionState
* @throws IOException
* @throws CommitUnsuccessfulException
*/
public void tryCommit(final TransactionState transactionState) throws CommitUnsuccessfulException, IOException {
long startTime = EnvironmentEdgeManager.currentTimeMillis();
LOG.trace("atempting to commit trasaction: " + transactionState.toString());
int status = prepareCommit(transactionState);
if (status == TransactionalRegionInterface.COMMIT_OK) {
doCommit(transactionState);
} else if (status == TransactionalRegionInterface.COMMIT_OK_READ_ONLY) {
transactionLogger.forgetTransaction(transactionState.getTransactionId());
} else if (status == TransactionalRegionInterface.COMMIT_UNSUCESSFUL) {
// We have already aborted at this point
throw new CommitUnsuccessfulException();
}
LOG.trace("Committed transaction [" + transactionState.getTransactionId() + "] in ["
+ ((EnvironmentEdgeManager.currentTimeMillis() - startTime)) + "]ms");
}
private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, HRegion region) {
byte[] startKey = region.getStartKey();
byte[] endKey = region.getEndKey();
if (Bytes.compareTo(startKey, key) <= 0 && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key, endKey) < 0)) {
return null; // normal case;
}
return new MetaDataMutationResult(MutationCode.TABLE_NOT_IN_REGION, EnvironmentEdgeManager.currentTimeMillis(), null);
}
void addDelete(final Delete delete) {
long now = EnvironmentEdgeManager.currentTimeMillis();
updateLatestTimestamp(delete.getFamilyMap().values(), now);
if (delete.getTimeStamp() == HConstants.LATEST_TIMESTAMP) {
delete.setTimestamp(now);
}
deletes.add(delete);
writeOrdering.add(new WriteAction(delete));
}
TransactionScanner(final Scan scan) {
super(new KeyValue.KVComparator() {
@Override
public int compare(final KeyValue left, final KeyValue right) {
int result = super.compare(left, right);
if (result != 0) {
return result;
}
if (left == right) {
return 0;
}
int put1Number = getTransactionSequenceIndex(left);
int put2Number = getTransactionSequenceIndex(right);
return put2Number - put1Number;
}
}, getAllKVs(scan));
// We want transaction scanner to always take priority over store
// scanners.
setSequenceID(Long.MAX_VALUE);
// matcher = new ScanQueryMatcher(scan, null, null,
// HConstants.FOREVER, KeyValue.KEY_COMPARATOR,
// scan.getMaxVersions());
matcher = new ScanQueryMatcher(scan, null, null,
null, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
EnvironmentEdgeManager.currentTimeMillis());
}
@Override
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete d) throws IOException {
// stores all the return values
IndexUpdateManager updateMap = new IndexUpdateManager();
// We have to figure out which kind of delete it is, since we need to do different things if its
// a general (row) delete, versus a delete of just a single column or family
Map<byte[], List<KeyValue>> families = d.getFamilyMap();
/*
* Option 1: its a row delete marker, so we just need to delete the most recent state for each
* group, as of the specified timestamp in the delete. This can happen if we have a single row
* update and it is part of a batch mutation (prepare doesn't happen until later... maybe a
* bug?). In a single delete, this delete gets all the column families appended, so the family
* map won't be empty by the time it gets here.
*/
if (families.size() == 0) {
LocalTableState state = new LocalTableState(env, localTable, d);
// get a consistent view of name
long now = d.getTimeStamp();
if (now == HConstants.LATEST_TIMESTAMP) {
now = EnvironmentEdgeManager.currentTimeMillis();
// update the delete's idea of 'now' to be consistent with the index
d.setTimestamp(now);
}
// get deletes from the codec
// we only need to get deletes and not add puts because this delete covers all columns
addDeleteUpdatesToMap(updateMap, state, now);
/*
* Update the current state for all the kvs in the delete. Generally, we would just iterate
* the family map, but since we go here, the family map is empty! Therefore, we need to fake a
* bunch of family deletes (just like hos HRegion#prepareDelete works). This is just needed
* for current version of HBase that has an issue where the batch update doesn't update the
* deletes before calling the hook.
*/
byte[] deleteRow = d.getRow();
for (byte[] family : this.env.getRegion().getTableDesc().getFamiliesKeys()) {
state.addPendingUpdates(new KeyValue(deleteRow, family, null, now,
KeyValue.Type.DeleteFamily));
}
} else {
// Option 2: Its actually a bunch single updates, which can have different timestamps.
// Therefore, we need to do something similar to the put case and batch by timestamp
batchMutationAndAddUpdates(updateMap, d);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Found index updates for Delete: " + d + "\n" + updateMap);
}
return updateMap.toMap();
}
@Override
public MetaDataMutationResult createTable(List<Mutation> tableMetadata) throws IOException {
byte[][] rowKeyMetaData = new byte[3][];
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata,rowKeyMetaData);
byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
try {
byte[] parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
byte[] lockTableName = parentTableName == null ? tableName : parentTableName;
byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, lockTableName);
byte[] key = parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
byte[] parentKey = parentTableName == null ? null : lockKey;
RegionCoprocessorEnvironment env = getEnvironment();
HRegion region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(lockKey, region);
if (result != null) {
return result;
}
List<Integer> lids = Lists.newArrayList(5);
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
try {
acquireLock(region, lockKey, lids);
if (key != lockKey) {
acquireLock(region, key, lids);
}
// Load parent table first
PTable parentTable = null;
ImmutableBytesPtr parentCacheKey = null;
if (parentKey != null) {
parentCacheKey = new ImmutableBytesPtr(parentKey);
parentTable = loadTable(env, parentKey, parentCacheKey, clientTimeStamp, clientTimeStamp);
if (parentTable == null || isTableDeleted(parentTable)) {
return new MetaDataMutationResult(MutationCode.PARENT_TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), parentTable);
}
// If parent table isn't at the expected sequence number, then return
if (parentTable.getSequenceNumber() != MetaDataUtil.getParentSequenceNumber(tableMetadata)) {
return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), parentTable);
}
}
// Load child table next
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
// Get as of latest timestamp so we can detect if we have a newer table that already exists
// without making an additional query
PTable table = loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
if (table != null) {
if (table.getTimeStamp() < clientTimeStamp) {
// If the table is older than the client time stamp and it's deleted, continue
if (!isTableDeleted(table)) {
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), table);
}
} else {
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table);
}
} else {
// If we're attempting to create the SYSTEM.TABLE,
// then is the first cluster connection to Phoenix v 3.0, in which case we
// need to upgrade from 2.x to 3.0. Since our updates are additive, we do
// not need to delete any rows, but can just allow the mutation to complete.
if (SchemaUtil.isMetaTable(schemaName, tableName)) {
SchemaUtil.upgradeTo3(region, tableMetadata);
}
}
// TODO: Switch this to HRegion#batchMutate when we want to support indexes on the system
// table. Basically, we get all the locks that we don't already hold for all the
// tableMetadata rows. This ensures we don't have deadlock situations (ensuring primary and
// then index table locks are held, in that order). For now, we just don't support indexing
// on the system table. This is an issue because of the way we manage batch mutation in the
// Indexer.
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
// Invalidate the cache - the next getTable call will add it
// TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
if (parentCacheKey != null) {
metaDataCache.remove(parentCacheKey);
}
metaDataCache.remove(cacheKey);
// Get timeStamp from mutations - the above method sets it if it's unset
long currentTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, currentTimeStamp, null);
} finally {
releaseLocks(region, lids);
}
} catch (Throwable t) {
ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
return null; // impossible
}
}
@Override
public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, String tableType) throws IOException {
byte[][] rowKeyMetaData = new byte[3][];
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata,rowKeyMetaData);
byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
// Disallow deletion of a system table
if (tableType.equals(PTableType.SYSTEM.getSerializedValue())) {
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
}
List<byte[]> tableNamesToDelete = Lists.newArrayList();
try {
byte[] parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
byte[] lockTableName = parentTableName == null ? tableName : parentTableName;
byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, lockTableName);
byte[] key = parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
RegionCoprocessorEnvironment env = getEnvironment();
HRegion region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
return result;
}
List<Integer> lids = Lists.newArrayList(5);
try {
acquireLock(region, lockKey, lids);
if (key != lockKey) {
acquireLock(region, key, lids);
}
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
result = doDropTable(key, tenantIdBytes, schemaName, tableName, PTableType.fromSerializedValue(tableType), tableMetadata, invalidateList, lids, tableNamesToDelete);
if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS || result.getTable() == null) {
return result;
}
Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
// Commit the list of deletion.
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
for (ImmutableBytesPtr ckey: invalidateList) {
metaDataCache.put(ckey, newDeletedTableMarker(currentTime));
}
if (parentTableName != null) {
ImmutableBytesPtr parentCacheKey = new ImmutableBytesPtr(lockKey);
metaDataCache.remove(parentCacheKey);
}
return result;
} finally {
releaseLocks(region, lids);
}
} catch (Throwable t) {
ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
return null; // impossible
}
}
private MetaDataMutationResult mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException {
byte[][] rowKeyMetaData = new byte[5][];
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata,rowKeyMetaData);
byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
try {
RegionCoprocessorEnvironment env = getEnvironment();
byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
HRegion region = env.getRegion();
MetaDataMutationResult result = checkTableKeyInRegion(key, region);
if (result != null) {
return result;
}
List<Integer> lids = Lists.newArrayList(5);
try {
acquireLock(region, key, lids);
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
invalidateList.add(cacheKey);
Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
PTable table = metaDataCache.get(cacheKey);
if (logger.isDebugEnabled()) {
if (table == null) {
logger.debug("Table " + Bytes.toStringBinary(key) + " not found in cache. Will build through scan");
} else {
logger.debug("Table " + Bytes.toStringBinary(key) + " found in cache with timestamp " + table.getTimeStamp() + " seqNum " + table.getSequenceNumber());
}
}
// Get client timeStamp from mutations
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
if (table == null && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
// if not found then call newerTableExists and add delete marker for timestamp found
if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
}
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
}
if (table.getTimeStamp() >= clientTimeStamp) {
return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table);
} else if (isTableDeleted(table)) {
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
}
long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup TABLE_SEQ_NUM in tableMetaData
if (logger.isDebugEnabled()) {
logger.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum " + expectedSeqNum + " and found seqNum " + table.getSequenceNumber() + " with " + table.getColumns().size() + " columns: " + table.getColumns());
}
if (expectedSeqNum != table.getSequenceNumber()) {
if (logger.isDebugEnabled()) {
logger.debug("For table " + Bytes.toStringBinary(key) + " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
}
return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table);
}
PTableType type = table.getType();
if (type == PTableType.INDEX) {
// Disallow mutation of an index table
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
} else {
PTableType expectedType = MetaDataUtil.getTableType(tableMetadata);
// We said to drop a table, but found a view or visa versa
if (type != expectedType) {
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
}
if (hasViews(region, tenantId, table)) {
// Disallow any column mutations for parents of tenant tables
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
}
}
result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region, invalidateList, lids);
if (result != null) {
return result;
}
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
// Invalidate from cache
for (ImmutableBytesPtr invalidateKey : invalidateList) {
PTable invalidatedTable = metaDataCache.remove(invalidateKey);
if (logger.isDebugEnabled()) {
if (invalidatedTable == null) {
logger.debug("Attempted to invalidated table key " + Bytes.toStringBinary(cacheKey.get(),cacheKey.getOffset(),cacheKey.getLength()) + " but found no cached table");
} else {
logger.debug("Invalidated table key " + Bytes.toStringBinary(cacheKey.get(),cacheKey.getOffset(),cacheKey.getLength()) + " with timestamp " + invalidatedTable.getTimeStamp() + " and seqNum " + invalidatedTable.getSequenceNumber());
}
}
}
// Get client timeStamp from mutations, since it may get updated by the mutateRowsWithLocks call
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null);
} finally {
releaseLocks(region,lids);
}
} catch (Throwable t) {
ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
return null; // impossible
}
}