下面列出了 org.apache.hadoop.hbase.client.Delete#setTimestamp() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Asks the codec for the index deletes implied by the given state (see
 * {@link IndexCodec#getIndexDeletes(TableState)}) and registers every valid one in the update map.
 * <p>
 * The {@link LocalTableState} is expected to be fully prepared already (correct timestamp,
 * updates applied, etc).
 * @param updateMap collector that receives the index deletes
 * @param state table state handed to the codec
 * @param ts timestamp stamped onto each delete before it is added to the map
 * @throws IOException declared by this method; presumably raised by the codec lookup
 */
protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts)
    throws IOException {
  Iterable<IndexUpdate> indexDeletes = codec.getIndexDeletes(state);
  if (indexDeletes == null) {
    // the codec produced nothing to clean up
    return;
  }
  for (IndexUpdate indexDelete : indexDeletes) {
    if (indexDelete.isValid()) {
      // re-stamp the delete so that it lines up with the batch being processed
      Delete stamped = (Delete) indexDelete.getUpdate();
      stamped.setTimestamp(ts);
      updateMap.addIndexUpdate(indexDelete.getTableName(), stamped);
    }
  }
}
/**
 * Asks the codec for the index deletes implied by the given state (see
 * {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData, byte[], byte[])}) and registers
 * them in the update map.
 * <p>
 * The {@link LocalTableState} is expected to be fully prepared already (correct timestamp,
 * updates applied, etc).
 * @param updateMap collector that receives the index deletes
 * @param state table state handed to the codec
 * @param ts timestamp stamped onto each delete before it is added to the map
 * @param indexMetaData passed through unchanged to the codec
 * @throws IOException declared by this method; presumably raised by the codec lookup
 */
protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts,
    IndexMetaData indexMetaData) throws IOException {
  // the codec is given the start and end keys of the region this instance runs in
  byte[] regionStartKey = env.getRegionInfo().getStartKey();
  byte[] regionEndKey = env.getRegionInfo().getEndKey();
  Iterable<IndexUpdate> indexDeletes =
      codec.getIndexDeletes(state, indexMetaData, regionStartKey, regionEndKey);
  if (indexDeletes == null) {
    return;
  }
  for (IndexUpdate indexDelete : indexDeletes) {
    // FIXME: PHOENIX-4057 do not attempt to issue index updates
    // for out-of-order mutations since it corrupts the index.
    // Invalid updates are skipped as well; the tracker is only consulted for valid ones.
    if (!indexDelete.isValid() || indexDelete.getIndexedColumns().hasNewerTimestamps()) {
      continue;
    }
    // re-stamp the delete so that it lines up with the batch being processed
    Delete stamped = (Delete) indexDelete.getUpdate();
    stamped.setTimestamp(ts);
    updateMap.addIndexUpdate(indexDelete.getTableName(), stamped);
  }
}
/**
 * Get all the deletes necessary for a group of columns - logically, the cleanup of the index
 * table for a given index.
 *
 * @param group
 *            index information; supplies the covered columns and the index table name
 * @param state
 *            current table state; must be a {@link LocalTableState} (it is cast unconditionally)
 * @param indexMetaData
 *            passed through to the table-state lookup
 * @return the {@link IndexUpdate} obtained from the table state: returned untouched when the
 *         first element of the pair from <tt>getNextEntries</tt> is 0, otherwise populated with
 *         a {@link Delete} stamped with the state's current timestamp and the group's table name
 * @throws RuntimeException
 *             wrapping any {@link IOException} raised while reading the state
 */
private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state, IndexMetaData indexMetaData) {
  List<CoveredColumn> refs = group.getColumns();
  try {
    Pair<CoveredDeleteScanner, IndexUpdate> kvs =
        ((LocalTableState) state).getIndexedColumnsTableState(refs, false, false, indexMetaData);
    Pair<Integer, List<ColumnEntry>> columns;
    try {
      columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
    } finally {
      // make sure we close the scanner reference, even when reading the entries fails
      kvs.getFirst().close();
    }
    // no change, just return the passed update
    if (columns.getFirst() == 0) {
      return kvs.getSecond();
    }
    // have all the column entries, so just turn it into a Delete for the row
    // convert the entries to the needed values
    byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
    Delete d = new Delete(rowKey);
    d.setTimestamp(state.getCurrentTimestamp());
    IndexUpdate update = kvs.getSecond();
    update.setUpdate(d);
    update.setTable(Bytes.toBytes(group.getTable()));
    return update;
  } catch (IOException e) {
    // chain the original failure so its message and stack trace are not lost
    throw new RuntimeException("Unexpected exception when getting state for columns: " + refs, e);
  }
}
/**
 * Asks the codec for the index deletes implied by the given state (see
 * {@link IndexCodec#getIndexDeletes(TableState)}) and registers every valid one in the update map.
 * <p>
 * The {@link LocalTableState} is expected to be fully prepared already (correct timestamp,
 * updates applied, etc).
 * @param updateMap collector that receives the index deletes
 * @param state table state handed to the codec
 * @param ts timestamp stamped onto each delete before it is added to the map
 * @throws IOException declared by this method; presumably raised by the codec lookup
 */
protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts)
    throws IOException {
  Iterable<IndexUpdate> indexDeletes = codec.getIndexDeletes(state);
  if (indexDeletes == null) {
    // the codec produced nothing to clean up
    return;
  }
  for (IndexUpdate indexDelete : indexDeletes) {
    if (indexDelete.isValid()) {
      // re-stamp the delete so that it lines up with the batch being processed
      Delete stamped = (Delete) indexDelete.getUpdate();
      stamped.setTimestamp(ts);
      updateMap.addIndexUpdate(indexDelete.getTableName(), stamped);
    }
  }
}
/**
 * Get all the deletes necessary for a group of columns - logically, the cleanup of the index
 * table for a given index.
 * @param group index information; supplies the covered columns and the index table name
 * @param state current table state, used to look up the indexed columns and the current row
 * @return the {@link IndexUpdate} obtained from the table state: returned untouched when the
 *         first element of the pair from <tt>getNextEntries</tt> is 0, otherwise populated with
 *         a {@link Delete} stamped with the state's current timestamp and the group's table name
 * @throws RuntimeException wrapping any {@link IOException} raised while reading the state
 */
private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
  List<CoveredColumn> refs = group.getColumns();
  try {
    Pair<Scanner, IndexUpdate> kvs = state.getIndexedColumnsTableState(refs);
    Pair<Integer, List<ColumnEntry>> columns;
    try {
      columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
    } finally {
      // make sure we close the scanner reference, even when reading the entries fails
      kvs.getFirst().close();
    }
    // no change, just return the passed update
    if (columns.getFirst() == 0) {
      return kvs.getSecond();
    }
    // have all the column entries, so just turn it into a Delete for the row
    // convert the entries to the needed values
    byte[] rowKey =
        composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
    Delete d = new Delete(rowKey);
    d.setTimestamp(state.getCurrentTimestamp());
    IndexUpdate update = kvs.getSecond();
    update.setUpdate(d);
    update.setTable(Bytes.toBytes(group.getTable()));
    return update;
  } catch (IOException e) {
    // chain the original failure so its message and stack trace are not lost
    throw new RuntimeException("Unexpected exception when getting state for columns: " + refs, e);
  }
}
/**
 * Get all the deletes necessary for a group of columns - logically, the cleanup of the index
 * table for a given index.
 * @param group index information; supplies the covered columns and the index table name
 * @param state current table state, used to look up the indexed columns and the current row
 * @return the {@link IndexUpdate} obtained from the table state: returned untouched when the
 *         first element of the pair from <tt>getNextEntries</tt> is 0, otherwise populated with
 *         a {@link Delete} stamped with the state's current timestamp and the group's table name
 * @throws RuntimeException wrapping any {@link IOException} raised while reading the state
 */
private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
  List<CoveredColumn> refs = group.getColumns();
  try {
    Pair<Scanner, IndexUpdate> kvs = state.getIndexedColumnsTableState(refs);
    Pair<Integer, List<ColumnEntry>> columns;
    try {
      columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
    } finally {
      // make sure we close the scanner reference, even when reading the entries fails
      kvs.getFirst().close();
    }
    // no change, just return the passed update
    if (columns.getFirst() == 0) {
      return kvs.getSecond();
    }
    // have all the column entries, so just turn it into a Delete for the row
    // convert the entries to the needed values
    byte[] rowKey =
        composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
    Delete d = new Delete(rowKey);
    d.setTimestamp(state.getCurrentTimestamp());
    IndexUpdate update = kvs.getSecond();
    update.setUpdate(d);
    update.setTable(Bytes.toBytes(group.getTable()));
    return update;
  } catch (IOException e) {
    // chain the original failure so its message and stack trace are not lost
    throw new RuntimeException("Unexpected exception when getting state for columns: " + refs, e);
  }
}
/**
 * Registers a delete: passes the values of its family map to {@code updateLatestTimestamp}
 * together with the current time, replaces a {@link HConstants#LATEST_TIMESTAMP} delete
 * timestamp with that same time, then appends the delete to {@code deletes} and records a
 * matching {@code WriteAction} in {@code writeOrdering}.
 *
 * @param delete the delete to register; its timestamp may be modified in place
 */
void addDelete(final Delete delete) {
  final long currentTime = EnvironmentEdgeManager.currentTimeMillis();
  updateLatestTimestamp(delete.getFamilyMap().values(), currentTime);

  // a delete still carrying the LATEST_TIMESTAMP placeholder gets the time captured above
  final boolean hasPlaceholderTimestamp = delete.getTimeStamp() == HConstants.LATEST_TIMESTAMP;
  if (hasPlaceholderTimestamp) {
    delete.setTimestamp(currentTime);
  }

  deletes.add(delete);
  writeOrdering.add(new WriteAction(delete));
}
/**
 * Stamps the given delete by delegating to {@link Delete#setTimestamp(long)}.
 *
 * @param d the delete to stamp
 * @param timestamp the timestamp to apply
 */
@Override
public void setTimestamp(final Delete d, final long timestamp) {
  d.setTimestamp(timestamp);
}
/**
 * Stamps the given delete by delegating to {@link Delete#setTimestamp(long)}.
 *
 * @param d the delete to stamp
 * @param timestamp the timestamp to apply
 */
@Override
public void setTimestamp(final Delete d, final long timestamp) {
  d.setTimestamp(timestamp);
}
/**
 * Stamps the given delete by delegating to {@link Delete#setTimestamp(long)}.
 *
 * @param d the delete to stamp
 * @param timestamp the timestamp to apply
 */
@Override
public void setTimestamp(final Delete d, final long timestamp) {
  d.setTimestamp(timestamp);
}
/**
 * Stamps the given delete by delegating to {@link Delete#setTimestamp(long)}.
 *
 * @param d the delete to stamp
 * @param timestamp the timestamp to apply
 */
@Override
public void setTimestamp(final Delete d, final long timestamp) {
  d.setTimestamp(timestamp);
}
/**
 * Stamps the given delete by delegating to {@link Delete#setTimestamp(long)}.
 *
 * @param d the delete to stamp
 * @param timestamp the timestamp to apply
 */
@Override
public void setTimestamp(final Delete d, final long timestamp) {
  d.setTimestamp(timestamp);
}
/**
 * Stamps the given delete by delegating to {@link Delete#setTimestamp(long)}.
 *
 * @param d the delete to stamp
 * @param timestamp the timestamp to apply
 */
@Override
public void setTimestamp(final Delete d, final long timestamp) {
  d.setTimestamp(timestamp);
}
/**
 * Add the index mutations for the pending batch on the local state: every valid upsert reported
 * by the codec is added to the update map and, when the tracked columns have newer timestamps
 * than the batch, a matching {@link Delete} is added as well.
 * @param updateMap to update with index mutations
 * @param state current state of the table; its tracked columns are reset after the codec lookup
 * @return the minimum timestamp across all index columns requested. If
 *         {@link ColumnTracker#isNewestTime(long)} returns <tt>true</tt> on the returned
 *         timestamp, we know that this <i>was not a back-in-time update</i>.
 * @throws IOException declared by this method; presumably raised by the codec lookup
 */
private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state)
    throws IOException {
  // get the index updates for this current batch
  Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
  state.resetTrackedColumns();

  /*
   * Go through all the pending updates. Regardless of whether or not they are the latest
   * timestamp, the entries here are going to be correct for the current batch timestamp, so we
   * add them to the updates. Entries whose columns have newer timestamps additionally get a
   * delete so that the index entry written here is cleaned up again.
   */
  // timestamp of the next update we need to track
  long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
  for (IndexUpdate update : upserts) {
    // this is the one bit where we check the timestamps
    final ColumnTracker tracker = update.getIndexedColumns();
    long trackerTs = tracker.getTS();
    // update the next min TS we need to track
    if (trackerTs < minTs) {
      minTs = trackerTs;
    }

    // this update needs to be cleaned up at the next timestamp when it is not the latest
    boolean needsCleanup = tracker.hasNewerTimestamps();

    // only make the put if the index update has been setup
    if (update.isValid()) {
      byte[] table = update.getTableName();
      Mutation mutation = update.getUpdate();
      updateMap.addIndexUpdate(table, mutation);

      // only make the cleanup if we made a put and need cleanup
      if (needsCleanup) {
        // there is a TS for the interested columns that is greater than the columns in the
        // put. Therefore, we need to issue a delete at the same timestamp
        Delete d = new Delete(mutation.getRow());
        d.setTimestamp(tracker.getTS());
        updateMap.addIndexUpdate(table, d);
      }
    }
  }
  return minTs;
}
/**
 * Computes the index updates caused by a {@link Delete}.
 * <p>
 * A delete with an empty family map is treated as a row delete: only index deletes are collected
 * (at the delete's timestamp, or at the current time if it still carries
 * {@link HConstants#LATEST_TIMESTAMP}). Any other delete is handed to
 * {@code batchMutationAndAddUpdates}.
 *
 * @param d the delete being applied; its timestamp may be set in place
 * @return the collected index updates as produced by {@code IndexUpdateManager#toMap()}
 * @throws IOException declared by this method; presumably raised while collecting the updates
 */
@Override
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete d) throws IOException {
  // accumulates everything we are going to return
  IndexUpdateManager updateMap = new IndexUpdateManager();

  // The kind of delete decides what has to happen: a general (row) delete is handled differently
  // from a delete of just a single column or family.
  Map<byte[], List<Cell>> families = d.getFamilyCellMap();
  if (!families.isEmpty()) {
    // Option 2: it is actually a bunch of single updates, which can have different timestamps.
    // Therefore, we need to do something similar to the put case and batch by timestamp.
    batchMutationAndAddUpdates(updateMap, d);
  } else {
    /*
     * Option 1: it is a row delete marker, so we just need to delete the most recent state for
     * each group, as of the specified timestamp in the delete. This can happen if we have a
     * single row update and it is part of a batch mutation (prepare doesn't happen until later...
     * maybe a bug?). In a single delete, this delete gets all the column families appended, so
     * the family map won't be empty by the time it gets here.
     */
    LocalTableState tableState = new LocalTableState(env, localTable, d);

    // settle on a single notion of 'now' for the delete and the index
    long deleteTs = d.getTimeStamp();
    if (deleteTs == HConstants.LATEST_TIMESTAMP) {
      deleteTs = EnvironmentEdgeManager.currentTimeMillis();
      // update the delete's idea of 'now' to be consistent with the index
      d.setTimestamp(deleteTs);
    }

    // Only deletes are fetched from the codec; no puts are added because this delete covers all
    // columns.
    addDeleteUpdatesToMap(updateMap, tableState, deleteTs);

    /*
     * Update the current state for all the kvs in the delete. Generally, we would just iterate
     * the family map, but since we got here, the family map is empty! Therefore, we need to fake
     * a bunch of family deletes (just like how HRegion#prepareDelete works). This is just needed
     * for the current version of HBase, which has an issue where the batch update doesn't update
     * the deletes before calling the hook.
     */
    byte[] row = d.getRow();
    for (byte[] family : this.env.getRegion().getTableDesc().getFamiliesKeys()) {
      KeyValue familyDelete = new KeyValue(row, family, null, deleteTs, KeyValue.Type.DeleteFamily);
      tableState.addPendingUpdates(familyDelete);
    }
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Found index updates for Delete: " + d + "\n" + updateMap);
  }
  return updateMap.toMap();
}
/**
 * Add the index mutations for the pending batch on the local state: every valid upsert reported
 * by the codec is added to the update map and, when the tracked columns have newer timestamps
 * than the batch, a matching {@link Delete} is added as well.
 * @param updateMap to update with index mutations
 * @param state current state of the table; its tracked columns are reset after the codec lookup
 * @return the minimum timestamp across all index columns requested. If
 *         {@link ColumnTracker#isNewestTime(long)} returns <tt>true</tt> on the returned
 *         timestamp, we know that this <i>was not a back-in-time update</i>.
 * @throws IOException declared by this method; presumably raised by the codec lookup
 */
private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state)
    throws IOException {
  // get the index updates for this current batch
  Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
  state.resetTrackedColumns();

  /*
   * Go through all the pending updates. Regardless of whether or not they are the latest
   * timestamp, the entries here are going to be correct for the current batch timestamp, so we
   * add them to the updates. Entries whose columns have newer timestamps additionally get a
   * delete so that the index entry written here is cleaned up again.
   */
  // timestamp of the next update we need to track
  long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
  for (IndexUpdate update : upserts) {
    // this is the one bit where we check the timestamps
    final ColumnTracker tracker = update.getIndexedColumns();
    long trackerTs = tracker.getTS();
    // update the next min TS we need to track
    if (trackerTs < minTs) {
      minTs = trackerTs;
    }

    // this update needs to be cleaned up at the next timestamp when it is not the latest
    boolean needsCleanup = tracker.hasNewerTimestamps();

    // only make the put if the index update has been setup
    if (update.isValid()) {
      byte[] table = update.getTableName();
      Mutation mutation = update.getUpdate();
      updateMap.addIndexUpdate(table, mutation);

      // only make the cleanup if we made a put and need cleanup
      if (needsCleanup) {
        // there is a TS for the interested columns that is greater than the columns in the
        // put. Therefore, we need to issue a delete at the same timestamp
        Delete d = new Delete(mutation.getRow());
        d.setTimestamp(tracker.getTS());
        updateMap.addIndexUpdate(table, d);
      }
    }
  }
  return minTs;
}
/**
 * Computes the index updates caused by a {@link Delete}.
 * <p>
 * A delete with an empty family map is treated as a row delete: only index deletes are collected
 * (at the delete's timestamp, or at the current time if it still carries
 * {@link HConstants#LATEST_TIMESTAMP}). Any other delete is handed to
 * {@code batchMutationAndAddUpdates}.
 *
 * @param d the delete being applied; its timestamp may be set in place
 * @return the collected index updates as produced by {@code IndexUpdateManager#toMap()}
 * @throws IOException declared by this method; presumably raised while collecting the updates
 */
@Override
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete d) throws IOException {
  // accumulates everything we are going to return
  IndexUpdateManager updateMap = new IndexUpdateManager();

  // The kind of delete decides what has to happen: a general (row) delete is handled differently
  // from a delete of just a single column or family.
  Map<byte[], List<KeyValue>> families = d.getFamilyMap();
  if (!families.isEmpty()) {
    // Option 2: it is actually a bunch of single updates, which can have different timestamps.
    // Therefore, we need to do something similar to the put case and batch by timestamp.
    batchMutationAndAddUpdates(updateMap, d);
  } else {
    /*
     * Option 1: it is a row delete marker, so we just need to delete the most recent state for
     * each group, as of the specified timestamp in the delete. This can happen if we have a
     * single row update and it is part of a batch mutation (prepare doesn't happen until later...
     * maybe a bug?). In a single delete, this delete gets all the column families appended, so
     * the family map won't be empty by the time it gets here.
     */
    LocalTableState tableState = new LocalTableState(env, localTable, d);

    // settle on a single notion of 'now' for the delete and the index
    long deleteTs = d.getTimeStamp();
    if (deleteTs == HConstants.LATEST_TIMESTAMP) {
      deleteTs = EnvironmentEdgeManager.currentTimeMillis();
      // update the delete's idea of 'now' to be consistent with the index
      d.setTimestamp(deleteTs);
    }

    // Only deletes are fetched from the codec; no puts are added because this delete covers all
    // columns.
    addDeleteUpdatesToMap(updateMap, tableState, deleteTs);

    /*
     * Update the current state for all the kvs in the delete. Generally, we would just iterate
     * the family map, but since we got here, the family map is empty! Therefore, we need to fake
     * a bunch of family deletes (just like how HRegion#prepareDelete works). This is just needed
     * for the current version of HBase, which has an issue where the batch update doesn't update
     * the deletes before calling the hook.
     */
    byte[] row = d.getRow();
    for (byte[] family : this.env.getRegion().getTableDesc().getFamiliesKeys()) {
      KeyValue familyDelete = new KeyValue(row, family, null, deleteTs, KeyValue.Type.DeleteFamily);
      tableState.addPendingUpdates(familyDelete);
    }
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Found index updates for Delete: " + d + "\n" + updateMap);
  }
  return updateMap.toMap();
}