下面列出了org.apache.hadoop.hbase.mapreduce.TableOutputFormat#org.apache.hadoop.hbase.client.Mutation 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Converts the tuple into HBase mutations, writes them as one batch, and acks the tuple
 * on success or fails it when the write throws.
 *
 * <p>Durability is {@code SYNC_WAL} when {@code writeToWAL} is set and {@code SKIP_WAL}
 * otherwise.
 *
 * @param tuple the tuple whose row key and columns are obtained from the mapper
 */
@Override
public void execute(Tuple tuple) {
    byte[] rowKey = this.mapper.rowKey(tuple);
    ColumnList cols = this.mapper.columns(tuple);
    List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
    try {
        this.hBaseClient.batchMutate(mutations);
    } catch(Exception e){
        // A byte[] concatenated into a String prints only its identity (e.g. "[B@1a2b3c");
        // render the contents so the failing row can actually be identified from the log.
        LOG.warn("Failing tuple. Error writing rowKey " + java.util.Arrays.toString(rowKey), e);
        this.collector.fail(tuple);
        return;
    }
    this.collector.ack(tuple);
}
/**
 * When the first operation of the mini-batch is a {@link Delete}, adds two further
 * Deletes (for {@code row1} and {@code row2}) to the batch at index 0, using the timestamp
 * of the first cell found under the {@code test} family of that Delete.
 * Does nothing for any other kind of first operation.
 */
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    Mutation firstOp = miniBatchOp.getOperation(0);
    if (!(firstOp instanceof Delete)) {
        return;
    }
    List<Cell> familyCells = firstOp.getFamilyCellMap().get(test);
    // delete only 2 rows
    Delete[] extraDeletes = {
        new Delete(row1, familyCells.get(0).getTimestamp()),
        new Delete(row2, familyCells.get(0).getTimestamp())
    };
    LOG.info("Deleting:" + Arrays.toString(extraDeletes));
    miniBatchOp.addOperationsFromCP(0, extraDeletes);
}
/**
 * Builds the shadow-cell Put for {@code cell} (value = the transaction's commit timestamp)
 * and queues it in the per-table list inside {@code mutations}.
 *
 * <p>A table seen for the first time gets a new list holding just this Put. For a table
 * that already has a list, the Put is appended and, once the list holds more than
 * {@code MAX_BATCH_SIZE} entries, the list is flushed and its map entry removed.
 *
 * @param cell           the cell for which a shadow cell is written
 * @param tx             transaction supplying the commit timestamp
 * @param updateSCFuture not used by this method
 * @param mutations      pending mutations keyed by table; modified in place
 */
private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture,
        Map<TableName,List<Mutation>> mutations) throws IOException, InterruptedException {
    Put shadowPut = new Put(cell.getRow());
    shadowPut.addColumn(
            cell.getFamily(),
            CellUtils.addShadowCellSuffixPrefix(cell.getQualifier(), 0, cell.getQualifier().length),
            cell.getTimestamp(),
            Bytes.toBytes(tx.getCommitTimestamp()));

    TableName targetTable = cell.getTable().getHTable().getName();
    List<Mutation> pending = mutations.get(targetTable);
    if (pending != null) {
        pending.add(shadowPut);
        if (pending.size() > MAX_BATCH_SIZE) {
            flushMutations(targetTable, pending);
            mutations.remove(targetTable);
        }
        return;
    }

    // First mutation for this table: start a fresh batch.
    ArrayList<Mutation> firstBatch = new ArrayList<>();
    firstBatch.add(shadowPut);
    mutations.put(targetTable, firstBatch);
}
/**
 * Ensures the physical table exists (when applicable) and then invokes
 * {@code createTable} on the metadata coprocessor.
 *
 * <p>Tenant id, schema name and table name are decoded from the row key of the first
 * mutation in {@code tableMetaData}. The physical table step is skipped only for a
 * {@code VIEW} with no explicit {@code tableName}; when {@code tableName} is null it is
 * derived from the decoded schema and table names.
 *
 * @return the result returned by the coprocessor call
 */
@Override
public MetaDataMutationResult createTable(final List<Mutation> tableMetaData, byte[] tableName, PTableType tableType,
        Map<String,Object> tableProps, final List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException {
    byte[][] rowKeyParts = new byte[3][];
    Mutation headerRow = tableMetaData.get(0);
    SchemaUtil.getVarChars(headerRow.getRow(), rowKeyParts);
    byte[] tenantIdBytes = rowKeyParts[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
    byte[] schemaBytes = rowKeyParts[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
    byte[] tableBytes = rowKeyParts[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];

    boolean needsPhysicalTable = tableName != null || tableType != PTableType.VIEW;
    if (needsPhysicalTable) {
        if (tableName == null) {
            tableName = SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
        }
        ensureTableCreated(tableName, tableType, tableProps, families, splits);
    }

    byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
    return metaDataCoprocessorExec(tableKey,
            new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() {
                @Override
                public MetaDataMutationResult call(MetaDataProtocol instance) throws IOException {
                    return instance.createTable(tableMetaData);
                }
            });
}
/**
 * Converts an HBase {@link RowMutations} into its Thrift counterpart.
 *
 * @param in the row mutations to convert; only Put and Delete entries are accepted
 * @return a {@code TRowMutations} carrying the same row and one {@code TMutation} per entry
 * @throws IllegalArgumentException if an entry is neither a Put nor a Delete
 */
public static TRowMutations rowMutationsFromHBase(RowMutations in) {
    TRowMutations converted = new TRowMutations();
    converted.setRow(in.getRow());
    for (Mutation source : in.getMutations()) {
        boolean isPut = source instanceof Put;
        if (!isPut && !(source instanceof Delete)) {
            throw new IllegalArgumentException(
                    "Only Put and Delete is supported in mutateRow, but muation=" + source);
        }
        TMutation target = new TMutation();
        if (isPut) {
            target.setPut(ThriftUtilities.putFromHBase((Put) source));
        } else {
            target.setDeleteSingle(ThriftUtilities.deleteFromHBase((Delete) source));
        }
        converted.addToMutations(target);
    }
    return converted;
}
/**
 * Splits the mutation into timestamp-based batches and processes each batch in turn.
 * Key-values inside one mutation need not all share a timestamp, so the work is grouped
 * by timestamp rather than done for the mutation as a whole.
 * <p>
 * A single {@code LocalTableState} is created for the mutation and handed, together with
 * {@code manager}, to {@code addMutationsForBatch} for every batch.
 * @param manager passed through to {@code addMutationsForBatch} for each batch
 * @param m mutation to batch
 * @throws IOException declared by the signature; presumably raised by the helpers called here
 */
private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException {
    // one batch per distinct timestamp found in the mutation
    Collection<Batch> timestampBatches = createTimestampBatchesFromMutation(m);
    // row state shared by all batches of this mutation
    LocalTableState rowState = new LocalTableState(env, localTable, m);
    // stays true until a batch reports (by returning true) that cleanup is no longer wanted
    boolean cleanupStillNeeded = true;
    for (Batch current : timestampBatches) {
        /*
         * Cleanup and update are handled per group: once the current batch has been appended
         * to the row state, the next group sees it as the current state, which could otherwise
         * produce both a delete and a put for that next group.
         */
        boolean batchHandled = addMutationsForBatch(manager, current, rowState, cleanupStillNeeded);
        if (batchHandled) {
            cleanupStillNeeded = false;
        }
    }
}
/**
 * Sends all given mutations in a single {@code MutateRowsRequest} to the
 * {@code MultiRowMutationService} coprocessor of the {@code RSGROUP_TABLE_NAME} table,
 * addressed at {@code ROW_KEY}, and waits for the call to finish.
 *
 * @param mutations Puts and Deletes to send together
 * @throws DoNotRetryIOException if a mutation is neither a Put nor a Delete
 * @throws IOException declared by the signature for the conversion and the remote call
 */
private void multiMutate(List<Mutation> mutations) throws IOException {
    MutateRowsRequest.Builder requestBuilder = MutateRowsRequest.newBuilder();
    for (Mutation m : mutations) {
        final MutationProto.MutationType protoType;
        if (m instanceof Put) {
            protoType = MutationProto.MutationType.PUT;
        } else if (m instanceof Delete) {
            protoType = MutationProto.MutationType.DELETE;
        } else {
            throw new DoNotRetryIOException(
                    "multiMutate doesn't support " + m.getClass().getName());
        }
        requestBuilder.addMutationRequest(ProtobufUtil.toMutation(protoType, m));
    }
    final MutateRowsRequest rowsRequest = requestBuilder.build();
    AsyncTable<?> rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
    FutureUtils.get(rsGroupTable.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
            MultiRowMutationService::newStub,
            (stub, controller, done) -> stub.mutateRows(controller, rowsRequest, done),
            ROW_KEY));
}
/**
 * Builds a protocol buffer {@code RegionAction} for a set of row mutations.
 * One action is added per mutation. No action index is set and the atomic flag is not
 * set on the returned builder; callers that need either must set it themselves.
 * @param regionName region the action is addressed to
 * @param rowMutations the Puts and Deletes to convert
 * @return the populated {@code RegionAction.Builder}
 * @throws DoNotRetryIOException if a mutation is neither a Put nor a Delete
 * @throws IOException declared by the signature for the mutation conversion
 */
public static RegionAction.Builder buildRegionAction(final byte [] regionName,
        final RowMutations rowMutations)
        throws IOException {
    RegionAction.Builder regionAction =
            getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
    // Both builders are reused across iterations and cleared before each use.
    ClientProtos.Action.Builder reusableAction = ClientProtos.Action.newBuilder();
    MutationProto.Builder reusableMutation = MutationProto.newBuilder();
    for (Mutation m : rowMutations.getMutations()) {
        final MutationType protoType;
        if (m instanceof Put) {
            protoType = MutationType.PUT;
        } else if (m instanceof Delete) {
            protoType = MutationType.DELETE;
        } else {
            throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
                    m.getClass().getName());
        }
        reusableMutation.clear();
        MutationProto converted = ProtobufUtil.toMutation(protoType, m, reusableMutation);
        reusableAction.clear();
        reusableAction.setMutation(converted);
        regionAction.addAction(reusableAction.build());
    }
    return regionAction;
}
/**
 * Logs, at debug level, how many mutations are about to be sent to a table together with
 * the number of key values they contain and the total size of those key values in bytes.
 *
 * @param htable    the table the mutations are destined for; only its name is used
 * @param mutations the mutations to measure
 */
private static void logMutationSize(HTableInterface htable, List<Mutation> mutations) {
    long byteSize = 0;
    int keyValueCount = 0;
    for (Mutation mutation : mutations) {
        if (mutation.getFamilyMap() != null) { // Not a Delete of the row
            for (Entry<byte[], List<KeyValue>> entry : mutation.getFamilyMap().entrySet()) {
                if (entry.getValue() != null) {
                    for (KeyValue kv : entry.getValue()) {
                        // getBuffer() is the whole backing array, which may be shared with other
                        // KeyValues or be larger than this one; getLength() is this KeyValue's
                        // own length, so the total is not overstated.
                        byteSize += kv.getLength();
                        keyValueCount++;
                    }
                }
            }
        }
    }
    logger.debug("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes");
}
/**
 * WAL-restore hook. When this observer is disabled the call is simply forwarded to the
 * superclass; otherwise the index updates contained in the edit are extracted and handed
 * to the recovery writer.
 */
@Override
public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
        HLogKey logKey, WALEdit logEdit) throws IOException {
    if (!this.disabled) {
        // TODO check the regions in transition. If the server on which the region lives is this
        // one, then we should retry that write later in postOpen.
        // we might be able to get even smarter here and pre-split the edits that are server-local
        // into their own recovered.edits file. This then lets us do a straightforward recovery of
        // each region (and more efficiently as we aren't writing quite as hectically from this
        // one place).

        /*
         * Basically, we let the index regions recover for a little while longer before retrying,
         * in the hope that they come up before the primary table finishes.
         */
        Collection<Pair<Mutation, byte[]>> recoveredUpdates = extractIndexUpdate(logEdit);
        recoveryWriter.write(recoveredUpdates);
        return;
    }
    super.preWALRestore(env, info, logKey, logEdit);
}
/**
 * For every column family present in {@code guidePostsMap}, optionally deletes the
 * existing statistics for the region and then adds the new statistics, passing
 * {@code mutations} to the stats table for both steps.
 *
 * @param region      region whose statistics are written
 * @param delete      when true, existing stats for each family are deleted before adding
 * @param mutations   list handed to {@code deleteStats} / {@code addStats}
 * @param currentTime not used by this method
 * @throws IOException rethrown after being logged when the stats table update fails
 */
private void writeStatsToStatsTable(final HRegion region,
        boolean delete, List<Mutation> mutations, long currentTime) throws IOException {
    try {
        // update the statistics table
        for (ImmutableBytesPtr family : guidePostsMap.keySet()) {
            if (delete) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Deleting the stats for the region "+region.getRegionInfo());
                }
                statsTable.deleteStats(
                        region.getRegionInfo().getRegionName(), this, family, mutations);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Adding new stats for the region "+region.getRegionInfo());
            }
            statsTable.addStats(
                    (region.getRegionInfo().getRegionName()), this, family, mutations);
        }
    } catch (IOException failure) {
        logger.error("Failed to update statistics table!", failure);
        throw failure;
    }
}
/**
 * Returns an iterator over the uncommitted data of a connection, converted to KeyValues.
 * Each element pairs the first component of the underlying mutation entry with all cells
 * of that entry's mutations, sorted with the connection's KeyValue comparator.
 * The returned iterator does not support {@code remove()}.
 * @param conn an open JDBC connection that unwraps to a {@code PhoenixConnection}
 * @param includeMutableIndexes forwarded unchanged to {@code toMutations}
 * @return iterator of (key, sorted KeyValues) pairs for the uncommitted data
 * @throws SQLException declared by the signature for the unwrap and mutation lookup
 */
public static Iterator<Pair<byte[],List<KeyValue>>> getUncommittedDataIterator(Connection conn, boolean includeMutableIndexes) throws SQLException {
    final PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
    final Iterator<Pair<byte[],List<Mutation>>> source =
            phoenixConn.getMutationState().toMutations(includeMutableIndexes);
    return new Iterator<Pair<byte[],List<KeyValue>>>() {
        @Override
        public boolean hasNext() {
            return source.hasNext();
        }

        @Override
        public Pair<byte[], List<KeyValue>> next() {
            Pair<byte[],List<Mutation>> entry = source.next();
            List<Mutation> rowMutations = entry.getSecond();
            // Guess-timate 5 key values per row
            List<KeyValue> flattened = Lists.newArrayListWithExpectedSize(rowMutations.size() * 5);
            for (Mutation rowMutation : rowMutations) {
                for (List<Cell> familyCells : rowMutation.getFamilyCellMap().values()) {
                    for (Cell cell : familyCells) {
                        flattened.add(org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cell));
                    }
                }
            }
            Collections.sort(flattened, phoenixConn.getKeyValueBuilder().getKeyValueComparator());
            return new Pair<byte[], List<KeyValue>>(entry.getFirst(), flattened);
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    };
}
/**
 * Failure-policy hook that records the failure in fields declared outside this method
 * and then delegates to the superclass implementation.
 *
 * <p>In order: logs the failure, increments {@code handleFailureCalledCount}, stores the
 * attempted writes in {@code tableReferenceToMutation}, sets {@code failIndexTableWrite}
 * to false, calls {@code super.handleFailure}, and finally counts down
 * {@code handleFailureCountDownLatch} if one is set.
 *
 * @param attempted the index writes that were attempted
 * @param cause     the exception that made the write fail
 * @throws IOException declared by the signature; presumably raised by the superclass call
 */
@Override
public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException
{
LOGGER.info("Found index update failure!");
// Bookkeeping: how often this policy was invoked and with which writes.
handleFailureCalledCount++;
tableReferenceToMutation=attempted;
LOGGER.info("failed index update on WAL recovery - allowing index table can be write.");
// Cleared before delegating to the superclass.
// NOTE(review): presumably this lets later index writes succeed - confirm against the code that reads this flag.
failIndexTableWrite=false;
super.handleFailure(attempted, cause);
// Count down only after the superclass handling has returned normally; if it throws, the latch is not touched.
if(handleFailureCountDownLatch!=null) {
handleFailureCountDownLatch.countDown();
}
}
/**
 * Removes from the mutation the first cell of {@code family} whose qualifier equals
 * {@code qualifier}. Does nothing when the mutation has no cells for that family or no
 * cell matches.
 *
 * @param m         mutation whose family cell list is modified in place
 * @param family    column family to search
 * @param qualifier qualifier bytes to match
 */
private void removeIfPresent(Mutation m, byte[] family, byte[] qualifier) {
    List<Cell> familyCells = m.getFamilyCellMap().get(family);
    if (familyCells == null) {
        return;
    }
    for (Iterator<Cell> it = familyCells.iterator(); it.hasNext();) {
        Cell candidate = it.next();
        boolean sameQualifier = Bytes.compareTo(
                candidate.getQualifierArray(), candidate.getQualifierOffset(),
                candidate.getQualifierLength(),
                qualifier, 0, qualifier.length) == 0;
        if (sameQualifier) {
            // only the first match is removed
            it.remove();
            return;
        }
    }
}
/**
 * Attempts the write and, if anything is thrown, hands the attempted writes and the
 * exception to the configured failure policy instead of propagating it directly.
 * @param toWrite index updates to write, grouped by target table
 * @param allowLocalUpdates forwarded unchanged to {@code write}
 * @param clientVersion forwarded unchanged to {@code write}
 * @throws IOException declared by the signature; presumably raised by the failure policy
 * @see #writeAndHandleFailure(Collection)
 */
public void writeAndHandleFailure(Multimap<HTableInterfaceReference, Mutation> toWrite,
        boolean allowLocalUpdates, int clientVersion) throws IOException {
    try {
        write(toWrite, allowLocalUpdates, clientVersion);
        boolean traceOn = LOGGER.isTraceEnabled();
        if (traceOn) {
            LOGGER.trace("Done writing all index updates!\n\t" + toWrite);
        }
    } catch (Exception writeFailure) {
        this.failurePolicy.handleFailure(toWrite, writeFailure);
    }
}
/**
 * Generates index mutations for a list of data mutations by calling the single-mutation
 * {@code generateIndexData} overload for each one and concatenating the results in order.
 *
 * @param index         the index table
 * @param table         the data table
 * @param dataMutations mutations on the data table
 * @param ptr           forwarded to the per-mutation overload
 * @param builder       forwarded to the per-mutation overload
 * @return all generated index mutations
 * @throws SQLException declared by the signature for the per-mutation generation
 */
public static List<Mutation> generateIndexData(PTable index, PTable table,
        List<Mutation> dataMutations, ImmutableBytesWritable ptr, KeyValueBuilder builder)
        throws SQLException {
    final int expected = dataMutations.size();
    List<Mutation> collected = Lists.newArrayListWithExpectedSize(expected);
    for (Mutation source : dataMutations) {
        collected.addAll(generateIndexData(index, table, source, ptr, builder));
    }
    return collected;
}
/**
 * Performs the "pre" index writes for a batch and records timing metrics.
 *
 * <p>Returns immediately when {@code ignoreIndexRebuildForTesting} is set and the context
 * is a rebuild. On success the elapsed time is reported and the method returns. On any
 * {@link Throwable} the failure metrics are updated, pending rows are removed, the
 * context's row locks are cleared, and the failure is rethrown: wrapped in an
 * {@link IOException} for a rebuild, or through {@code rethrowIndexingException} otherwise.
 *
 * @param c           not used by this method
 * @param context     state of the batch being processed
 * @param miniBatchOp not used by this method
 * @throws IOException when the index write fails
 */
private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    if (ignoreIndexRebuildForTesting && context.rebuild) {
        return;
    }
    final long startedAt = EnvironmentEdgeManager.currentTimeMillis();
    try {
        if (failPreIndexUpdatesForTesting) {
            throw new DoNotRetryIOException("Simulating the first (i.e., pre) index table write failure");
        }
        doIndexWritesWithExceptions(context, false);
        long elapsed = EnvironmentEdgeManager.currentTimeMillis() - startedAt;
        metricSource.updatePreIndexUpdateTime(elapsed);
        return;
    } catch (Throwable failure) {
        long elapsedUntilFailure = EnvironmentEdgeManager.currentTimeMillis() - startedAt;
        metricSource.updatePreIndexUpdateFailureTime(elapsedUntilFailure);
        metricSource.incrementPreIndexUpdateFailures();
        // Remove all locks as they are already unlocked. There is no need to unlock them again
        // later when postBatchMutateIndispensably() is called
        removePendingRows(context);
        context.rowLocks.clear();
        if (!context.rebuild) {
            rethrowIndexingException(failure);
        } else {
            throw new IOException(String.format("%s for rebuild", failure.getMessage()), failure);
        }
    }
    // Only reached if rethrowIndexingException returned without throwing.
    throw new RuntimeException(
            "Somehow didn't complete the index update, but didn't return succesfully either!");
}
/**
 * Add all the {@link KeyValue}s in the {@link Mutation}, for the passed family, to the given
 * {@link WALEdit}.
 *
 * @param edit   the edit that receives the key values
 * @param m      the mutation to read key values from
 * @param family the column family whose key values are copied; if the mutation holds
 *               nothing for this family the edit is left unchanged
 */
private void addMutation(WALEdit edit, Mutation m, byte[] family) {
    // Honour the caller's family argument: the lookup previously used the FAMILY constant,
    // silently ignoring the parameter.
    List<KeyValue> kvs = m.getFamilyMap().get(family);
    if (kvs == null) {
        // Nothing stored for this family in the mutation.
        return;
    }
    for (KeyValue kv : kvs) {
        edit.add(kv);
    }
}
/**
 * Creates a fresh pair of mutations for {@code key} and stores them in {@code setValues}
 * and {@code unsetValues}. The first is an {@code Increment} when {@code hasOnDupKey} is
 * set and a {@code Put} otherwise; the second is always a {@code Delete}. Both skip the
 * WAL when {@code isWALDisabled()} returns true.
 */
private void newMutations() {
    Mutation upsert;
    if (this.hasOnDupKey) {
        upsert = new Increment(this.key);
    } else {
        upsert = new Put(this.key);
    }
    Delete removal = new Delete(this.key);
    if (isWALDisabled()) {
        upsert.setDurability(Durability.SKIP_WAL);
        removal.setDurability(Durability.SKIP_WAL);
    }
    this.setValues = upsert;
    this.unsetValues = removal;
}
/**
 * Forwards the call, with all arguments unchanged, to the delegate returned by
 * {@code getDelegate()} and returns its result.
 */
@Override
public MetaDataMutationResult addColumn(List<Mutation> tableMetaData,
        PTable table,
        PTable parentTable,
        Map<String, List<Pair<String, Object>>> properties,
        Set<String> colFamiliesForPColumnsToBeAdded,
        List<PColumn> columns) throws SQLException {
    MetaDataMutationResult delegated = getDelegate().addColumn(
            tableMetaData, table, parentTable, properties, colFamiliesForPColumnsToBeAdded, columns);
    return delegated;
}
public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation,
long nonceGroup, long nonce) {
this.type = type;
this.mutation = mutation;
if (this.mutation.getDurability() != Durability.SKIP_WAL) {
// using ASYNC_WAL for relay
this.mutation.setDurability(Durability.ASYNC_WAL);
}
this.nonceGroup = nonceGroup;
this.nonce = nonce;
}
/**
 * Hands a mutation to the underlying mutator.
 *
 * @param key   not used by this method
 * @param value the mutation to write; must be a Put or a Delete
 * @throws IOException if {@code value} is neither a Put nor a Delete, or when the
 *         mutator fails
 * @see RecordWriter#write(Object, Object)
 */
@Override
public void write(KEY key, Mutation value)
        throws IOException {
    boolean supported = value instanceof Put || value instanceof Delete;
    if (!supported) {
        throw new IOException("Pass a Delete or a Put");
    }
    mutator.mutate(value);
}
/**
 * Builds point-lookup scan ranges from the row keys of the given mutations, one key
 * range per mutation, in list order.
 *
 * @param mutations mutations whose row keys are looked up
 * @return the result of {@code ScanRanges.createPointLookup} for those keys
 * @throws SQLException declared by the signature
 */
public static ScanRanges newScanRanges(List<? extends Mutation> mutations) throws SQLException {
    List<KeyRange> pointKeys = Lists.newArrayListWithExpectedSize(mutations.size());
    for (Mutation mutation : mutations) {
        byte[] rowKey = mutation.getRow();
        pointKeys.add(PVarbinary.INSTANCE.getKeyRange(rowKey));
    }
    return ScanRanges.createPointLookup(pointKeys);
}
/**
 * Reads the table type from the put-only table header row of the given metadata.
 *
 * @param tableMetaData metadata mutations containing the table header row
 * @param builder       forwarded to {@code getMutationValue}
 * @param value         pointer that {@code getMutationValue} fills; its first byte is decoded
 * @return the decoded table type, or {@code null} when {@code getMutationValue} reports
 *         that no value was found
 */
public static PTableType getTableType(List<Mutation> tableMetaData, KeyValueBuilder builder,
        ImmutableBytesPtr value) {
    boolean found = getMutationValue(getPutOnlyTableHeaderRow(tableMetaData),
            PhoenixDatabaseMetaData.TABLE_TYPE_BYTES, builder, value);
    if (!found) {
        return null;
    }
    byte serializedType = value.get()[value.getOffset()];
    return PTableType.fromSerializedValue(serializedType);
}
/**
 * Invoked before the merge-regions operation writes the new region to hbase:meta.
 * Calls {@code preMergeRegionsCommitAction} on each master observer; when no coprocessor
 * environments are registered, {@code execOperation} is invoked with {@code null}.
 * @param regionsToMerge the regions to merge
 * @param metaEntries the meta entries handed to the observers
 * @param user the user on whose behalf the operation runs
 * @throws IOException declared by the signature for the observer call
 */
public void preMergeRegionsCommit(
        final RegionInfo[] regionsToMerge,
        final @MetaMutationAnnotation List<Mutation> metaEntries,
        final User user) throws IOException {
    MasterObserverOperation operation = null;
    if (!coprocEnvironments.isEmpty()) {
        operation = new MasterObserverOperation(user) {
            @Override
            public void call(MasterObserver observer) throws IOException {
                observer.preMergeRegionsCommitAction(this, regionsToMerge, metaEntries);
            }
        };
    }
    execOperation(operation);
}
/**
 * WAL-restore hook. Does nothing when this observer is disabled. Otherwise extracts the
 * index updates from the edit, writes them through the recovery writer (which applies its
 * failure handling), and always records how long the call took; calls at or above
 * {@code slowPreWALRestoreThreshold} are additionally counted as slow.
 */
@Override
public void preWALRestore(
        org.apache.hadoop.hbase.coprocessor.ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
        org.apache.hadoop.hbase.client.RegionInfo info, org.apache.hadoop.hbase.wal.WALKey logKey, WALEdit logEdit)
        throws IOException {
    if (this.disabled) {
        return;
    }

    // TODO check the regions in transition. If the server on which the region lives is this one,
    // then we should retry that write later in postOpen.
    // we might be able to get even smarter here and pre-split the edits that are server-local
    // into their own recovered.edits file. This then lets us do a straightforward recovery of each
    // region (and more efficiently as we aren't writing quite as hectically from this one place).

    final long startedAt = EnvironmentEdgeManager.currentTimeMillis();
    try {
        /*
         * Basically, we let the index regions recover for a little while longer before retrying,
         * in the hope that they come up before the primary table finishes.
         */
        Collection<Pair<Mutation, byte[]>> recoveredUpdates = extractIndexUpdate(logEdit);
        recoveryWriter.writeAndHandleFailure(recoveredUpdates, true, ScanUtil.UNKNOWN_CLIENT_VERSION);
    } finally {
        final long elapsed = EnvironmentEdgeManager.currentTimeMillis() - startedAt;
        final boolean slow = elapsed >= slowPreWALRestoreThreshold;
        if (slow) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(getCallTooSlowMessage("preWALRestore",
                        elapsed, slowPreWALRestoreThreshold));
            }
            metricSource.incrementNumSlowPreWALRestoreCalls();
        }
        metricSource.updatePreWALRestoreTime(elapsed);
    }
}
/**
 * Runs the {@code postAppendBeforeWAL} hook of the region observers, threading the cell
 * pairs through them as the operation result.
 *
 * @param mutation  the mutation passed to each observer
 * @param cellPairs the initial cell pairs
 * @return {@code cellPairs} unchanged when no coprocessor environments are registered,
 *         otherwise the result of the observer operation
 * @throws IOException declared by the signature for the observer call
 */
public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
        final List<Pair<Cell, Cell>> cellPairs) throws IOException {
    if (!this.coprocEnvironments.isEmpty()) {
        return execOperationWithResult(
                new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
                        regionObserverGetter, cellPairs) {
                    @Override
                    public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
                        return observer.postAppendBeforeWAL(this, mutation, getResult());
                    }
                });
    }
    return cellPairs;
}
/**
 * Converts a Put or Delete into its protobuf representation.
 *
 * @param mutation the mutation to convert
 * @return the protobuf form produced by {@code ProtobufUtil.toMutation}
 * @throws IllegalArgumentException if the mutation is neither a Put nor a Delete
 * @throws IOException declared by the signature for the conversion
 */
public static MutationProto toProto(Mutation mutation) throws IOException {
    if (mutation instanceof Put) {
        return org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(MutationType.PUT, mutation);
    }
    if (mutation instanceof Delete) {
        return org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(MutationType.DELETE, mutation);
    }
    throw new IllegalArgumentException("Only Put and Delete are supported");
}
/**
 * Asserts that {@code indexUpdates} contains an update whose mutation has timestamp
 * {@code mutationTs} and whose first cell in family {@code fam} has the given type,
 * family, qualifier and cell timestamp.
 *
 * <p>Every update examined is also asserted to target {@code TEST_TABLE_INDEX_STRING}.
 * The {@code row} argument is not checked by this method.
 */
private void assertContains(Collection<Pair<Mutation, byte[]>> indexUpdates,
        final long mutationTs, final byte[] row, final Type cellType, final byte[] fam,
        final byte[] qual, final long cellTs) {
    Predicate<Pair<Mutation, byte[]>> matchesExpectedCell =
            new Predicate<Pair<Mutation, byte[]>>() {
                @Override
                public boolean apply(Pair<Mutation, byte[]> candidate) {
                    assertEquals(TEST_TABLE_INDEX_STRING, Bytes.toString(candidate.getSecond()));
                    Mutation update = candidate.getFirst();
                    if (mutationTs != update.getTimeStamp()) {
                        return false;
                    }
                    // only the first cell of the family is inspected
                    Cell firstCell = update.getFamilyCellMap().get(fam).get(0);
                    if (cellType != KeyValue.Type.codeToType(firstCell.getTypeByte())) {
                        return false;
                    }
                    if (Bytes.compareTo(fam, CellUtil.cloneFamily(firstCell)) != 0) {
                        return false;
                    }
                    if (Bytes.compareTo(qual, CellUtil.cloneQualifier(firstCell)) != 0) {
                        return false;
                    }
                    return cellTs == firstCell.getTimestamp();
                }
            };
    assertTrue(Iterables.tryFind(indexUpdates, matchesExpectedCell).isPresent());
}
/**
 * Returns a cell equal to {@code newCell} but tagged with the visibility expression of
 * the mutation.
 *
 * <p>When the mutation carries no cell visibility, {@code newCell} is returned as is.
 * Otherwise the new visibility tags come first, followed by every existing tag of
 * {@code newCell} that is not itself a visibility tag or a visibility serialization
 * format tag.
 *
 * @param mutation the mutation whose cell visibility is applied
 * @param newCell  the cell to re-tag
 * @return {@code newCell} itself, or a new cell carrying the merged tag list
 * @throws IOException if the mutation's cell visibility cannot be deserialized, or as
 *         declared for the helper calls
 */
private Cell createNewCellWithTags(Mutation mutation, Cell newCell) throws IOException {
    final CellVisibility visibility;
    try {
        visibility = mutation.getCellVisibility();
    } catch (DeserializationException e) {
        throw new IOException(e);
    }
    if (visibility == null) {
        return newCell;
    }

    // Don't check user auths for labels with Mutations when the user is super user
    boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser());

    // New visibility tags go first in the merged list
    List<Tag> mergedTags = Lists.newArrayList();
    mergedTags.addAll(this.visibilityLabelService.createVisibilityExpTags(
            visibility.getExpression(), true, authCheck));

    // Carry forward all other tags
    for (Iterator<Tag> existing = PrivateCellUtil.tagsIterator(newCell); existing.hasNext();) {
        Tag tag = existing.next();
        if (tag.getType() == TagType.VISIBILITY_TAG_TYPE
                || tag.getType() == TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE) {
            continue;
        }
        mergedTags.add(tag);
    }
    return PrivateCellUtil.createCell(newCell, mergedTags);
}