org.apache.hadoop.hbase.mapreduce.TableOutputFormat#org.apache.hadoop.hbase.client.Mutation源码实例Demo

下面列出了org.apache.hadoop.hbase.mapreduce.TableOutputFormat#org.apache.hadoop.hbase.client.Mutation 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: storm-hbase   文件: HBaseBolt.java
/**
 * Maps the tuple to a row key and column list, builds the corresponding mutations and
 * writes them as one batch. The tuple is acked when the batch write succeeds and failed
 * when it throws.
 *
 * @param tuple the tuple to persist
 */
@Override
public void execute(Tuple tuple) {
    byte[] rowKey = this.mapper.rowKey(tuple);
    ColumnList cols = this.mapper.columns(tuple);
    List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);

    try {
        this.hBaseClient.batchMutate(mutations);
    } catch(Exception e){
        // Concatenating a byte[] directly only prints its identity (e.g. "[B@1a2b3c"),
        // so render the actual bytes to make the failing row identifiable in the log.
        LOG.warn("Failing tuple. Error writing rowKey " + java.util.Arrays.toString(rowKey), e);
        this.collector.fail(tuple);
        return;
    }

    this.collector.ack(tuple);
}
 
/**
 * When the first operation of the mini-batch is a {@link Delete}, attaches two additional
 * deletes (for {@code row1} and {@code row2}) to it. Both reuse the timestamp of the first
 * cell the original delete carries for the {@code test} family.
 */
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
    MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
  Mutation first = miniBatchOp.getOperation(0);
  if (!(first instanceof Delete)) {
    return;
  }

  long ts = first.getFamilyCellMap().get(test).get(0).getTimestamp();
  // delete only 2 rows
  Delete[] extraDeletes = new Delete[] { new Delete(row1, ts), new Delete(row2, ts) };
  LOG.info("Deleting:" + Arrays.toString(extraDeletes));
  miniBatchOp.addOperationsFromCP(0, extraDeletes);
}
 
源代码3 项目: phoenix-omid   文件: HBaseSyncPostCommitter.java
/**
 * Builds the shadow-cell {@link Put} for the given cell (value = the transaction's commit
 * timestamp) and queues it in the per-table mutation map. Once a table's existing queue
 * grows beyond {@code MAX_BATCH_SIZE}, it is flushed and removed from the map.
 *
 * @param cell           the cell to add a shadow cell for
 * @param tx             the transaction supplying the commit timestamp
 * @param updateSCFuture not used by this method
 * @param mutations      pending mutations keyed by table; modified in place
 */
private void addShadowCell(HBaseCellId cell, HBaseTransaction tx, SettableFuture<Void> updateSCFuture,
                           Map<TableName,List<Mutation>> mutations) throws IOException, InterruptedException {
    byte[] qualifier = cell.getQualifier();
    byte[] shadowQualifier = CellUtils.addShadowCellSuffixPrefix(qualifier, 0, qualifier.length);

    Put shadowCellPut = new Put(cell.getRow());
    shadowCellPut.addColumn(cell.getFamily(), shadowQualifier, cell.getTimestamp(),
            Bytes.toBytes(tx.getCommitTimestamp()));

    TableName table = cell.getTable().getHTable().getName();
    List<Mutation> pending = mutations.get(table);
    if (pending != null) {
        pending.add(shadowCellPut);
        if (pending.size() > MAX_BATCH_SIZE) {
            flushMutations(table, pending);
            mutations.remove(table);
        }
        return;
    }

    // First mutation seen for this table: start a new queue.
    ArrayList<Mutation> fresh = new ArrayList<>();
    fresh.add(shadowCellPut);
    mutations.put(table, fresh);
}
 
源代码4 项目: phoenix   文件: ConnectionQueryServicesImpl.java
/**
 * Decodes tenant id, schema name and table name from the row key of the first metadata
 * mutation, ensures the physical table exists (skipped for a VIEW with no explicit table
 * name), and then invokes {@code createTable} on the metadata coprocessor.
 */
@Override
public MetaDataMutationResult createTable(final List<Mutation> tableMetaData, byte[] tableName, PTableType tableType,
        Map<String,Object> tableProps, final List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException {
    byte[][] rowKeyMetadata = new byte[3][];
    SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata);
    byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
    byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
    byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];

    boolean viewWithoutPhysicalName = tableType == PTableType.VIEW && tableName == null;
    if (!viewWithoutPhysicalName) {
        if (tableName == null) {
            tableName = SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
        }
        ensureTableCreated(tableName, tableType, tableProps, families, splits);
    }

    byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
    return metaDataCoprocessorExec(tableKey,
        new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() {
            @Override
            public MetaDataMutationResult call(MetaDataProtocol instance) throws IOException {
              return instance.createTable(tableMetaData);
            }
        });
}
 
源代码5 项目: hbase   文件: ThriftUtilities.java
/**
 * Converts an HBase {@link RowMutations} into its Thrift counterpart. Only {@link Put} and
 * {@link Delete} mutations can be converted.
 *
 * @param in the row mutations to convert
 * @return the equivalent Thrift object
 * @throws IllegalArgumentException if a mutation is neither a Put nor a Delete
 */
public static TRowMutations rowMutationsFromHBase(RowMutations in) {
  TRowMutations converted = new TRowMutations();
  converted.setRow(in.getRow());
  for (Mutation mutation : in.getMutations()) {
    boolean isPut = mutation instanceof Put;
    if (!isPut && !(mutation instanceof Delete)) {
      throw new IllegalArgumentException(
          "Only Put and Delete is supported in mutateRow, but muation=" + mutation);
    }
    TMutation tMutation = new TMutation();
    if (isPut) {
      tMutation.setPut(ThriftUtilities.putFromHBase((Put) mutation));
    } else {
      tMutation.setDeleteSingle(ThriftUtilities.deleteFromHBase((Delete) mutation));
    }
    converted.addToMutations(tMutation);
  }
  return converted;
}
 
源代码6 项目: phoenix   文件: CoveredColumnsIndexBuilder.java
/**
 * Splits the mutation into timestamp-based batches and processes each batch in turn.
 * The key-values of a single mutation are not guaranteed to share one timestamp, so the
 * work is done per timestamp batch.
 * <p>
 * A single {@link LocalTableState} is created for the mutation and shared by all batches.
 * @param manager index update manager handed to every batch
 * @param m mutation to batch
 * @throws IOException declared for the batch-processing calls made here
 */
 private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException {
   // one state object tracks the row across all of this mutation's batches
   Collection<Batch> timestampBatches = createTimestampBatchesFromMutation(m);
   LocalTableState rowState = new LocalTableState(env, localTable, m);

   /*
    * Cleanup and update are handled separately per batch: once a batch has been appended to
    * the row state, the following batch sees it as the current state, which could otherwise
    * produce both a delete and a put for that following batch. The cleanup flag therefore
    * stays on only until the first batch reports true.
    */
   boolean cleanupCurrentState = true;
   for (Batch current : timestampBatches) {
     boolean added = addMutationsForBatch(manager, current, rowState, cleanupCurrentState);
     cleanupCurrentState = cleanupCurrentState && !added;
   }
 }
 
源代码7 项目: hbase   文件: RSGroupInfoManagerImpl.java
/**
 * Packs the given puts and deletes into one {@code MutateRowsRequest} and sends it through
 * the {@code MultiRowMutationService} coprocessor endpoint of the rsgroup table, waiting
 * for the call to finish.
 *
 * @param mutations the mutations to apply; each must be a {@link Put} or a {@link Delete}
 * @throws IOException a {@link DoNotRetryIOException} for any other mutation type, or
 *         whatever the coprocessor call raises
 */
private void multiMutate(List<Mutation> mutations) throws IOException {
  MutateRowsRequest.Builder requestBuilder = MutateRowsRequest.newBuilder();
  for (Mutation mutation : mutations) {
    final MutationProto.MutationType type;
    if (mutation instanceof Put) {
      type = MutationProto.MutationType.PUT;
    } else if (mutation instanceof Delete) {
      type = MutationProto.MutationType.DELETE;
    } else {
      throw new DoNotRetryIOException(
          "multiMutate doesn't support " + mutation.getClass().getName());
    }
    requestBuilder.addMutationRequest(ProtobufUtil.toMutation(type, mutation));
  }
  final MutateRowsRequest request = requestBuilder.build();
  AsyncTable<?> rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
  FutureUtils.get(rsGroupTable.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
    MultiRowMutationService::newStub,
    (stub, controller, done) -> stub.mutateRows(controller, request, done), ROW_KEY));
}
 
源代码8 项目: hbase   文件: RequestConverter.java
/**
 * Builds a protocol buffer {@code RegionAction} holding one action per mutation of the
 * given row mutations.
 * The absolute position of each action is not propagated and the atomic flag is not set
 * on the result; callers that need either must set it themselves.
 * @param regionName name of the region the action targets
 * @param rowMutations the puts and deletes to convert
 * @return a populated RegionAction.Builder
 * @throws IOException a {@link DoNotRetryIOException} if a mutation is neither a Put nor
 *         a Delete
 */
public static RegionAction.Builder buildRegionAction(final byte [] regionName,
    final RowMutations rowMutations)
throws IOException {
  RegionAction.Builder regionAction =
    getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
  // Both builders are reused across iterations and cleared before each use.
  ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
  MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
  for (Mutation mutation: rowMutations.getMutations()) {
    boolean isPut = mutation instanceof Put;
    if (!isPut && !(mutation instanceof Delete)) {
      throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
        mutation.getClass().getName());
    }
    MutationType mutateType = isPut ? MutationType.PUT : MutationType.DELETE;
    mutationBuilder.clear();
    MutationProto proto = ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder);
    actionBuilder.clear();
    actionBuilder.setMutation(proto);
    regionAction.addAction(actionBuilder.build());
  }
  return regionAction;
}
 
源代码9 项目: phoenix   文件: MutationState.java
/**
 * Logs, at debug level, how many mutations are about to be sent to the table together with
 * the number of key values they contain and their total size in bytes.
 *
 * @param htable    the table the mutations are destined for
 * @param mutations the mutations to measure
 */
private static void logMutationSize(HTableInterface htable, List<Mutation> mutations) {
    // Nothing is emitted unless debug is on, so skip walking every key value in that case.
    if (!logger.isDebugEnabled()) {
        return;
    }
    long byteSize = 0;
    int keyValueCount = 0;
    for (Mutation mutation : mutations) {
        if (mutation.getFamilyMap() != null) { // Not a Delete of the row
            for (Entry<byte[], List<KeyValue>> entry : mutation.getFamilyMap().entrySet()) {
                if (entry.getValue() != null) {
                    for (KeyValue kv : entry.getValue()) {
                        // getBuffer() is the whole backing array, which may be shared with
                        // other key values; getLength() is the size of this key value only.
                        byteSize += kv.getLength();
                        keyValueCount++;
                    }
                }
            }
        }
    }
    logger.debug("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes");
}
 
源代码10 项目: phoenix   文件: Indexer.java
/**
 * Extracts the index updates contained in a WAL edit being restored and hands them to the
 * recovery writer. When this coprocessor is disabled, the call is simply delegated to the
 * superclass.
 */
@Override
public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
    HLogKey logKey, WALEdit logEdit) throws IOException {
  if (!this.disabled) {
    // TODO check the regions in transition. If the server on which the region lives is this
    // one, then we should retry that write later in postOpen.
    // We might be able to get even smarter here and pre-split the edits that are server-local
    // into their own recovered.edits file. That would allow a straightforward recovery of
    // each region (and be more efficient, since we would not write so hectically from here).

    /*
     * Basically, we let the index regions recover for a little while longer before retrying,
     * in the hope that they come up before the primary table finishes.
     */
    Collection<Pair<Mutation, byte[]>> recoveredUpdates = extractIndexUpdate(logEdit);
    recoveryWriter.write(recoveredUpdates);
    return;
  }
  super.preWALRestore(env, info, logKey, logEdit);
}
 
源代码11 项目: phoenix   文件: StatisticsCollector.java
/**
 * For every column family in the guide post map, optionally queues a delete of the region's
 * existing statistics and then queues the new statistics, appending to {@code mutations}.
 *
 * @param region      the region the statistics belong to
 * @param delete      whether existing statistics are deleted before the new ones are added
 * @param mutations   list handed to the stats table calls
 * @param currentTime not used by this method
 * @throws IOException rethrown after being logged
 */
private void writeStatsToStatsTable(final HRegion region,
        boolean delete, List<Mutation> mutations, long currentTime) throws IOException {
    try {
        // update the statistics table
        for (ImmutableBytesPtr family : guidePostsMap.keySet()) {
            if (delete) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Deleting the stats for the region "+region.getRegionInfo());
                }
                byte[] regionToClear = region.getRegionInfo().getRegionName();
                statsTable.deleteStats(regionToClear, this, family, mutations);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Adding new stats for the region "+region.getRegionInfo());
            }
            byte[] regionToUpdate = region.getRegionInfo().getRegionName();
            statsTable.addStats(regionToUpdate, this, family, mutations);
        }
    } catch (IOException ioe) {
        logger.error("Failed to update statistics table!", ioe);
        throw ioe;
    }
}
 
源代码12 项目: phoenix   文件: PhoenixRuntime.java
/**
 * Returns an iterator over the uncommitted data of the connection, with each entry's
 * mutations flattened into a sorted list of KeyValues. Currently used to write a
 * Phoenix-compliant HFile from a map/reduce job.
 * @param conn an open JDBC connection
 * @param includeMutableIndexes passed through to the mutation state when listing mutations
 * @return read-only iterator of (first element of the underlying pair, sorted KeyValues)
 * @throws SQLException if the connection cannot be unwrapped or mutations cannot be listed
 */
public static Iterator<Pair<byte[],List<KeyValue>>> getUncommittedDataIterator(Connection conn, boolean includeMutableIndexes) throws SQLException {
    final PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
    final Iterator<Pair<byte[],List<Mutation>>> pending =
            phoenixConn.getMutationState().toMutations(includeMutableIndexes);
    return new Iterator<Pair<byte[],List<KeyValue>>>() {

        @Override
        public boolean hasNext() {
            return pending.hasNext();
        }

        @Override
        public Pair<byte[], List<KeyValue>> next() {
            Pair<byte[],List<Mutation>> entry = pending.next();
            List<Mutation> entryMutations = entry.getSecond();
            // Guess-timate 5 key values per row
            List<KeyValue> flattened = Lists.newArrayListWithExpectedSize(entryMutations.size() * 5);
            for (Mutation mutation : entryMutations) {
                for (List<Cell> familyCells : mutation.getFamilyCellMap().values()) {
                    for (Cell cell : familyCells) {
                        flattened.add(org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cell));
                    }
                }
            }
            Collections.sort(flattened, phoenixConn.getKeyValueBuilder().getKeyValueComparator());
            return new Pair<byte[], List<KeyValue>>(entry.getFirst(), flattened);
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

    };
}
 
源代码13 项目: phoenix   文件: WALRecoveryRegionPostOpenIT.java
/**
 * Records the failure (call counter and attempted mutations), turns off the injected index
 * write failure, delegates to the superclass handler and finally counts down the latch if
 * one has been set.
 */
@Override
public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException
{
    LOGGER.info("Found index update failure!");
    handleFailureCalledCount++;
    tableReferenceToMutation = attempted;

    LOGGER.info("failed index update on WAL recovery - allowing index table can be write.");
    failIndexTableWrite = false;
    super.handleFailure(attempted, cause);

    if (handleFailureCountDownLatch == null) {
        return;
    }
    handleFailureCountDownLatch.countDown();
 }
 
源代码14 项目: phoenix   文件: PTableImpl.java
/**
 * Removes the first cell of the given family whose qualifier equals {@code qualifier} from
 * the mutation. Does nothing when the mutation has no cells for that family.
 */
private void removeIfPresent(Mutation m, byte[] family, byte[] qualifier) {
    List<Cell> familyCells = m.getFamilyCellMap().get(family);
    if (familyCells == null) {
        return;
    }
    for (Iterator<Cell> it = familyCells.iterator(); it.hasNext(); ) {
        Cell cell = it.next();
        boolean sameQualifier = Bytes.compareTo(
                cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                qualifier, 0, qualifier.length) == 0;
        if (sameQualifier) {
            it.remove();
            return;
        }
    }
}
 
源代码15 项目: phoenix   文件: IndexWriter.java
/**
 * Writes the given index updates; any exception raised by the write is passed to the
 * configured failure policy instead of being propagated directly.
 * @param toWrite index updates grouped by target table
 * @param allowLocalUpdates forwarded to the write call
 * @param clientVersion forwarded to the write call
 * @throws IOException if the failure policy throws
 */
  public void writeAndHandleFailure(Multimap<HTableInterfaceReference, Mutation> toWrite,
                                    boolean allowLocalUpdates, int clientVersion) throws IOException {
  try {
    write(toWrite, allowLocalUpdates, clientVersion);
    if (LOGGER.isTraceEnabled()) {
      String summary = "Done writing all index updates!\n\t" + toWrite;
      LOGGER.trace(summary);
    }
  } catch (Exception writeFailure) {
    failurePolicy.handleFailure(toWrite, writeFailure);
  }
}
 
源代码16 项目: phoenix   文件: IndexTestUtil.java
/**
 * Generates the index mutations for a list of data mutations by invoking the
 * single-mutation overload on each one and concatenating the results in order.
 *
 * @return the combined index mutations
 * @throws SQLException if generating index data for any mutation fails
 */
public static List<Mutation> generateIndexData(PTable index, PTable table,
        List<Mutation> dataMutations, ImmutableBytesWritable ptr, KeyValueBuilder builder)
        throws SQLException {
    // Presized on the assumption of roughly one index mutation per data mutation.
    int expected = dataMutations.size();
    List<Mutation> generated = Lists.newArrayListWithExpectedSize(expected);
    for (Mutation source : dataMutations) {
        List<Mutation> perMutation = generateIndexData(index, table, source, ptr, builder);
        generated.addAll(perMutation);
    }
    return generated;
}
 
源代码17 项目: phoenix   文件: IndexRegionObserver.java
/**
 * Performs the "pre" index writes for a batch and records timing metrics.
 * <p>
 * Returns without doing anything when {@code ignoreIndexRebuildForTesting} is set and the
 * context is a rebuild. On failure, the failure metrics are updated, pending rows are
 * removed, the context's row locks are cleared, and the error is rethrown: wrapped in an
 * {@link IOException} for a rebuild, otherwise through {@code rethrowIndexingException}.
 *
 * @param c observer context (not referenced in this method body)
 * @param context per-batch state used for the index writes and cleanup
 * @param miniBatchOp the mini-batch in progress (not referenced in this method body)
 * @throws IOException when the index write fails
 */
private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
                   MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    if (ignoreIndexRebuildForTesting && context.rebuild) {
        return;
    }
    long start = EnvironmentEdgeManager.currentTimeMillis();
    try {
        // Test hook: simulate a failure of the pre index write.
        if (failPreIndexUpdatesForTesting) {
            throw new DoNotRetryIOException("Simulating the first (i.e., pre) index table write failure");
        }
        doIndexWritesWithExceptions(context, false);
        metricSource.updatePreIndexUpdateTime(EnvironmentEdgeManager.currentTimeMillis() - start);
        return;
    } catch (Throwable e) {
        metricSource.updatePreIndexUpdateFailureTime(EnvironmentEdgeManager.currentTimeMillis() - start);
        metricSource.incrementPreIndexUpdateFailures();
        // Remove all locks as they are already unlocked. There is no need to unlock them again later when
        // postBatchMutateIndispensably() is called
        removePendingRows(context);
        context.rowLocks.clear();
        if (context.rebuild) {
            throw new IOException(String.format("%s for rebuild", e.getMessage()), e);
        } else {
            rethrowIndexingException(e);
        }
    }
    // Reached only if rethrowIndexingException returned without throwing.
    throw new RuntimeException(
            "Somehow didn't complete the index update, but didn't return succesfully either!");
}
 
源代码18 项目: phoenix   文件: TestReadWriteKeyValuesWithCodec.java
/**
 * Add all the {@link KeyValue}s in the {@link Mutation}, for the passed family, to the given
 * {@link WALEdit}.
 * @param edit the edit to append to
 * @param m the mutation whose key values are copied
 * @param family the column family whose key values are added
 */
private void addMutation(WALEdit edit, Mutation m, byte[] family) {
  // Look up the family that was passed in; previously the FAMILY constant was used and
  // the parameter was silently ignored.
  List<KeyValue> kvs = m.getFamilyMap().get(family);
  for (KeyValue kv : kvs) {
    edit.add(kv);
  }
}
 
源代码19 项目: phoenix   文件: PTableImpl.java
/**
 * Creates fresh "set" and "unset" mutations for the current row key: an {@link Increment}
 * or a {@link Put} (depending on {@code hasOnDupKey}) for the values to set and a
 * {@link Delete} for the values to unset. Both skip the WAL when the WAL is disabled.
 */
private void newMutations() {
     final Mutation setMutation;
     if (this.hasOnDupKey) {
         setMutation = new Increment(this.key);
     } else {
         setMutation = new Put(this.key);
     }
     final Delete unsetMutation = new Delete(this.key);

     if (isWALDisabled()) {
         setMutation.setDurability(Durability.SKIP_WAL);
         unsetMutation.setDurability(Durability.SKIP_WAL);
     }

     this.setValues = setMutation;
     this.unsetValues = unsetMutation;
}
 
源代码20 项目: phoenix   文件: DelegateConnectionQueryServices.java
/**
 * Forwards the add-column request unchanged to the delegate and returns its result.
 */
@Override
public MetaDataMutationResult addColumn(List<Mutation> tableMetaData,
                                        PTable table,
                                        PTable parentTable,
                                        Map<String, List<Pair<String, Object>>> properties,
                                        Set<String> colFamiliesForPColumnsToBeAdded,
                                        List<PColumn> columns) throws SQLException {
    MetaDataMutationResult delegateResult = getDelegate().addColumn(
            tableMetaData,
            table,
            parentTable,
            properties,
            colFamiliesForPColumnsToBeAdded,
            columns);
    return delegateResult;
}
 
源代码21 项目: hbase   文件: WALSplitUtil.java
public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation,
    long nonceGroup, long nonce) {
  this.type = type;
  this.mutation = mutation;
  if (this.mutation.getDurability() != Durability.SKIP_WAL) {
    // using ASYNC_WAL for relay
    this.mutation.setDurability(Durability.ASYNC_WAL);
  }
  this.nonceGroup = nonceGroup;
  this.nonce = nonce;
}
 
源代码22 项目: hbase   文件: TableOutputFormat.java
/**
 * Hands a mutation to the underlying mutator. Only {@link Put} and {@link Delete}
 * instances are accepted.
 *
 * @param key  The key; not used by this method.
 * @param value  The mutation to apply.
 * @throws IOException When the value is neither a Put nor a Delete, or writing fails.
 * @see RecordWriter#write(Object, Object)
 */
@Override
public void write(KEY key, Mutation value)
throws IOException {
  boolean supported = value instanceof Put || value instanceof Delete;
  if (!supported) {
    throw new IOException("Pass a Delete or a Put");
  }
  mutator.mutate(value);
}
 
源代码23 项目: phoenix   文件: ScanUtil.java
/**
 * Builds point-lookup scan ranges with one key range per mutation, derived from each
 * mutation's row key.
 *
 * @param mutations the mutations whose rows should be covered
 * @return scan ranges over exactly those rows
 */
public static ScanRanges newScanRanges(List<? extends Mutation> mutations) throws SQLException {
    List<KeyRange> rowRanges = Lists.newArrayListWithExpectedSize(mutations.size());
    for (Mutation mutation : mutations) {
        byte[] row = mutation.getRow();
        rowRanges.add(PVarbinary.INSTANCE.getKeyRange(row));
    }
    return ScanRanges.createPointLookup(rowRanges);
}
 
源代码24 项目: phoenix   文件: MetaDataUtil.java
/**
 * Reads the table type from the table header row of the metadata mutations.
 *
 * @param value pointer that receives the serialized table type when it is present
 * @return the table type, or {@code null} when the header row carries no table type value
 */
public static PTableType getTableType(List<Mutation> tableMetaData, KeyValueBuilder builder,
  ImmutableBytesPtr value) {
    boolean hasTableType = getMutationValue(getPutOnlyTableHeaderRow(tableMetaData),
        PhoenixDatabaseMetaData.TABLE_TYPE_BYTES, builder, value);
    if (!hasTableType) {
        return null;
    }
    // The type is serialized as a single byte at the pointer's offset.
    return PTableType.fromSerializedValue(value.get()[value.getOffset()]);
}
 
源代码25 项目: hbase   文件: MasterCoprocessorHost.java
/**
 * Called before a merge-regions operation writes the new region to hbase:meta; runs
 * {@code preMergeRegionsCommitAction} on the registered master observers.
 * @param regionsToMerge the regions to merge
 * @param metaEntries the meta mutations passed on to the observers
 * @param user the user the operation runs as
 * @throws IOException if an observer fails
 */
public void preMergeRegionsCommit(
    final RegionInfo[] regionsToMerge,
    final @MetaMutationAnnotation List<Mutation> metaEntries,
    final User user) throws IOException {
  // A null operation is passed when no coprocessor environments are loaded.
  MasterObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new MasterObserverOperation(user) {
      @Override
      public void call(MasterObserver observer) throws IOException {
        observer.preMergeRegionsCommitAction(this, regionsToMerge, metaEntries);
      }
    };
  }
  execOperation(operation);
}
 
源代码26 项目: phoenix   文件: Indexer.java
/**
 * Extracts the index updates from a WAL edit being restored and writes them through the
 * recovery writer. The elapsed time is always recorded, and calls at or above the slow
 * threshold are counted. Does nothing when this coprocessor is disabled.
 */
@Override
  public void preWALRestore(
          org.apache.hadoop.hbase.coprocessor.ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
          org.apache.hadoop.hbase.client.RegionInfo info, org.apache.hadoop.hbase.wal.WALKey logKey, WALEdit logEdit)
          throws IOException {

    if (this.disabled) {
        return;
    }

    // TODO check the regions in transition. If the server on which the region lives is this
    // one, then we should retry that write later in postOpen.
    // We might be able to get even smarter here and pre-split the edits that are server-local
    // into their own recovered.edits file. That would allow a straightforward recovery of
    // each region (and be more efficient, since we would not write so hectically from here).

    final long startMs = EnvironmentEdgeManager.currentTimeMillis();
    try {
        /*
         * Basically, we let the index regions recover for a little while longer before
         * retrying, in the hope that they come up before the primary table finishes.
         */
        Collection<Pair<Mutation, byte[]>> recoveredUpdates = extractIndexUpdate(logEdit);
        recoveryWriter.writeAndHandleFailure(recoveredUpdates, true, ScanUtil.UNKNOWN_CLIENT_VERSION);
    } finally {
        final long elapsedMs = EnvironmentEdgeManager.currentTimeMillis() - startMs;
        boolean slow = elapsedMs >= slowPreWALRestoreThreshold;
        if (slow) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(getCallTooSlowMessage("preWALRestore",
                        elapsedMs, slowPreWALRestoreThreshold));
            }
            metricSource.incrementNumSlowPreWALRestoreCalls();
        }
        metricSource.updatePreWALRestoreTime(elapsedMs);
    }
}
 
源代码27 项目: hbase   文件: RegionCoprocessorHost.java
/**
 * Runs the {@code postAppendBeforeWAL} hook of the registered region observers, seeding
 * the operation with {@code cellPairs} as its initial result.
 *
 * @param mutation  the mutation handed to each observer
 * @param cellPairs the initial cell pairs
 * @return the observers' result, or {@code cellPairs} itself when no coprocessor
 *         environment is loaded
 */
public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
  if (!this.coprocEnvironments.isEmpty()) {
    return execOperationWithResult(
        new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
            regionObserverGetter, cellPairs) {
          @Override
          public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
            return observer.postAppendBeforeWAL(this, mutation, getResult());
          }
        });
  }
  return cellPairs;
}
 
源代码28 项目: phoenix   文件: ProtobufUtil.java
/**
 * Converts a {@link Put} or {@link Delete} into its protobuf representation.
 *
 * @param mutation the mutation to convert
 * @return the protobuf form of the mutation
 * @throws IllegalArgumentException if the mutation is neither a Put nor a Delete
 */
public static MutationProto toProto(Mutation mutation) throws IOException {
    boolean isPut = mutation instanceof Put;
    if (!isPut && !(mutation instanceof Delete)) {
        throw new IllegalArgumentException("Only Put and Delete are supported");
    }
    MutationType type = isPut ? MutationType.PUT : MutationType.DELETE;
    return org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(type, mutation);
}
 
源代码29 项目: phoenix   文件: NonTxIndexBuilderTest.java
/**
 * Asserts that the index updates contain a mutation with timestamp {@code mutationTs}
 * whose first cell in family {@code fam} has the given type, family, qualifier and cell
 * timestamp. Every update inspected is also asserted to target the test index table.
 */
private void assertContains(Collection<Pair<Mutation, byte[]>> indexUpdates,
        final long mutationTs, final byte[] row, final Type cellType, final byte[] fam,
        final byte[] qual, final long cellTs) {
    Predicate<Pair<Mutation, byte[]>> matchesExpectedCell =
            new Predicate<Pair<Mutation, byte[]>>() {
                @Override
                public boolean apply(Pair<Mutation, byte[]> input) {
                    assertEquals(TEST_TABLE_INDEX_STRING, Bytes.toString(input.getSecond()));
                    Mutation candidate = input.getFirst();
                    if (mutationTs != candidate.getTimeStamp()) {
                        return false;
                    }
                    NavigableMap<byte[], List<Cell>> cellsByFamily =
                            candidate.getFamilyCellMap();
                    Cell firstCell = cellsByFamily.get(fam).get(0);
                    return cellType == KeyValue.Type.codeToType(firstCell.getTypeByte())
                            && Bytes.compareTo(fam, CellUtil.cloneFamily(firstCell)) == 0
                            && Bytes.compareTo(qual, CellUtil.cloneQualifier(firstCell)) == 0
                            && cellTs == firstCell.getTimestamp();
                }
            };
    Optional<Pair<Mutation, byte[]>> match =
            Iterables.tryFind(indexUpdates, matchesExpectedCell);
    assertTrue(match.isPresent());
}
 
源代码30 项目: hbase   文件: VisibilityController.java
/**
 * Returns a copy of {@code newCell} tagged with the visibility expression of the mutation.
 * Existing tags on the cell are kept, except for visibility tags and visibility
 * serialization-format tags, which are replaced by the newly created ones.
 *
 * @param mutation the mutation supplying the cell visibility
 * @param newCell  the cell to re-tag
 * @return {@code newCell} itself when the mutation has no cell visibility, otherwise a new
 *         cell carrying the rebuilt tag list
 * @throws IOException if the cell visibility cannot be deserialized or tag creation fails
 */
private Cell createNewCellWithTags(Mutation mutation, Cell newCell) throws IOException {
  final CellVisibility visibility;
  try {
    visibility = mutation.getCellVisibility();
  } catch (DeserializationException e) {
    throw new IOException(e);
  }
  if (visibility == null) {
    return newCell;
  }

  // New visibility tags go first in the rebuilt tag list.
  // Don't check user auths for labels with Mutations when the user is super user
  boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser());
  List<Tag> rebuiltTags = Lists.newArrayList();
  rebuiltTags.addAll(this.visibilityLabelService.createVisibilityExpTags(
      visibility.getExpression(), true, authCheck));

  // Carry forward all other tags
  for (Iterator<Tag> it = PrivateCellUtil.tagsIterator(newCell); it.hasNext(); ) {
    Tag existing = it.next();
    boolean isVisibilityRelated = existing.getType() == TagType.VISIBILITY_TAG_TYPE
        || existing.getType() == TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE;
    if (!isVisibilityRelated) {
      rebuiltTags.add(existing);
    }
  }

  return PrivateCellUtil.createCell(newCell, rebuiltTags);
}