org.apache.hadoop.hbase.wal.WALEdit#getCells() 源码实例 Demo

下面列出了 org.apache.hadoop.hbase.wal.WALEdit#getCells() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: hbase   文件: ReplicationSourceWALReader.java
/**
 * Sums the sizes of all store files referenced by the bulk load cells of the given edit.
 * <p>
 * Only cells whose qualifier matches {@link WALEdit#BULK_LOAD} are inspected. If the bulk load
 * descriptor of such a cell cannot be deserialized, the failure is logged and that cell does not
 * contribute to the total.
 * @param edit edit whose bulk load cells are inspected
 * @return the total size of the referenced store files, capped at {@link Integer#MAX_VALUE}
 */
private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
  List<Cell> cells = edit.getCells();
  // Accumulate in a long. Narrowing the running total to int on every addition would silently
  // wrap around (possibly to a negative size) once the store files add up to more than 2 GB.
  long totalStoreFilesSize = 0;

  int totalCells = edit.size();
  for (int i = 0; i < totalCells; i++) {
    Cell cell = cells.get(i);
    if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
      continue;
    }
    try {
      BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
      for (StoreDescriptor store : bld.getStoresList()) {
        totalStoreFilesSize += store.getStoreFileSizeBytes();
      }
    } catch (IOException e) {
      LOG.error("Failed to deserialize bulk load entry from wal edit. "
          + "Size of HFiles part of cell will not be considered in replication "
          + "request size calculation.",
        e);
    }
  }
  // The return type is int, so saturate instead of overflowing.
  return (int) Math.min(totalStoreFilesSize, (long) Integer.MAX_VALUE);
}
 
源代码2 项目: hbase   文件: ReplicationSourceShipper.java
/**
 * Removes the HFile references of every bulk load cell in the given edit from the source
 * manager and decrements the HFile-refs queue size metric by the number of files removed.
 * @param edit edit whose bulk load cells are processed
 * @throws IOException if a bulk load descriptor cannot be read or the cleanup fails
 */
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
  String peerId = source.getPeerId();
  if (peerId.contains("-")) {
    // peerClusterZnode will be in the form peerId + "-" + rsZNode.
    // A peerId will not have "-" in its name, see HBASE-11394
    peerId = peerId.split("-")[0];
  }
  for (Cell cell : edit.getCells()) {
    if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
      // Only bulk load marker cells carry store file references.
      continue;
    }
    BulkLoadDescriptor descriptor = WALEdit.getBulkLoadDescriptor(cell);
    for (StoreDescriptor store : descriptor.getStoresList()) {
      List<String> storeFiles = store.getStoreFileList();
      source.getSourceManager().cleanUpHFileRefs(peerId, storeFiles);
      source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFiles.size());
    }
  }
}
 
源代码3 项目: hbase   文件: WALUtil.java
/**
 * Applies {@code mapper} to every cell of the edit in place. Cells for which the mapper returns
 * {@code null} are dropped; all others are replaced by the mapper's result, keeping their
 * relative order.
 * @param edit edit whose cell list is rewritten in place
 * @param mapper function returning the replacement cell, or {@code null} to drop the cell
 */
public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {
  ArrayList<Cell> cells = edit.getCells();
  final int originalCount = cells.size();
  int kept = 0;
  // Compact the surviving cells towards the front of the list.
  for (int read = 0; read < originalCount; read++) {
    Cell mapped = mapper.apply(cells.get(read));
    if (mapped == null) {
      continue;
    }
    cells.set(kept, mapped);
    kept++;
  }
  // Drop the now-stale tail, removing from the end so no elements have to be shifted.
  while (cells.size() > kept) {
    cells.remove(cells.size() - 1);
  }
  // Release the backing array's spare capacity once fewer than half of the cells survived.
  if (kept < originalCount / 2) {
    cells.trimToSize();
  }
}
 
源代码4 项目: hbase   文件: WALPlayer.java
/**
 * Emits one record per non-meta cell of the edit, provided the edit belongs to one of the
 * tables in {@code tableSet}. The output key is the cell's row, prefixed with the table name and
 * the table separator when {@code multiTableSupport} is enabled.
 */
@Override
public void map(WALKey key, WALEdit value, Context context) throws IOException {
  try {
    TableName tableName = key.getTableName();
    if (!tableSet.contains(tableName.getNameAsString())) {
      // skip all other tables
      return;
    }
    for (Cell cell : value.getCells()) {
      if (!WALEdit.isMetaEditFamily(cell)) {
        byte[] outKey;
        if (multiTableSupport) {
          outKey = Bytes.add(tableName.getName(), Bytes.toBytes(tableSeparator),
            CellUtil.cloneRow(cell));
        } else {
          outKey = CellUtil.cloneRow(cell);
        }
        context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell));
      }
    }
  } catch (InterruptedException e) {
    LOG.error("Interrupted while emitting Cell", e);
    // Restore the interrupt flag for whoever is running this mapper.
    Thread.currentThread().interrupt();
  }
}
 
源代码5 项目: hbase   文件: ReplicationSourceWALReader.java
/**
 * Count the number of different row keys in the given edit because of mini-batching, and the
 * number of HFiles referenced by its bulk load cells. We assume that there's at least one Cell
 * in the WALEdit.
 * <p>
 * A row key is counted as different whenever a cell's row does not match the row of the cell
 * immediately before it. If a bulk load descriptor cannot be deserialized, the failure is logged
 * and its HFiles are not counted.
 * @param edit edit to count row keys from
 * @return number of different row keys and HFiles
 */
private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
  List<Cell> cells = edit.getCells();
  int distinctRowKeys = 1;
  int totalHFileEntries = 0;
  Cell lastCell = cells.get(0);

  int totalCells = edit.size();
  for (int i = 0; i < totalCells; i++) {
    Cell cell = cells.get(i);
    // Count HFiles to be replicated
    if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
      try {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        for (StoreDescriptor store : bld.getStoresList()) {
          totalHFileEntries += store.getStoreFileList().size();
        }
      } catch (IOException e) {
        // Pass the exception along so the cause and stack trace end up in the log.
        LOG.error("Failed to deserialize bulk load entry from wal edit. "
            + "Then its hfiles count will not be added into metric.",
          e);
      }
    }

    if (!CellUtil.matchingRows(cell, lastCell)) {
      distinctRowKeys++;
    }
    lastCell = cell;
  }

  return new Pair<>(distinctRowKeys, totalHFileEntries);
}
 
源代码6 项目: hbase   文件: SampleRegionWALCoprocessor.java
/**
 * Sample hook that rewrites the WALEdit of the configured table before it is written: the cell
 * matching the ignored family/qualifier is removed, the first value byte of the cell matching
 * the changed family/qualifier is incremented, and, when {@code row} is set, a new KeyValue is
 * appended. Edits for other tables are left untouched.
 */
@Override
public void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env,
    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
  // check table name matches or not.
  boolean sameTable = Bytes.equals(info.getTable().toBytes(), this.tableName);
  if (!sameTable) {
    return;
  }
  preWALWriteCalled = true;

  // here we're going to remove one keyvalue from the WALEdit, and add
  // another one to it.
  List<Cell> cells = logEdit.getCells();
  Cell cellToDelete = null;
  for (Cell cell : cells) {
    // assume only one kv from the WALEdit matches.
    byte[] family = CellUtil.cloneFamily(cell);
    byte[] qualifier = CellUtil.cloneQualifier(cell);

    boolean shouldIgnore =
        Arrays.equals(family, ignoredFamily) && Arrays.equals(qualifier, ignoredQualifier);
    if (shouldIgnore) {
      LOG.debug("Found the KeyValue from WALEdit which should be ignored.");
      cellToDelete = cell;
    }

    boolean shouldChange =
        Arrays.equals(family, changedFamily) && Arrays.equals(qualifier, changedQualifier);
    if (shouldChange) {
      LOG.debug("Found the KeyValue from WALEdit which should be changed.");
      // Bump the first byte of the value in the cell's backing array.
      byte bumped = (byte) (cell.getValueArray()[cell.getValueOffset()] + 1);
      cell.getValueArray()[cell.getValueOffset()] = bumped;
    }
  }

  if (row != null) {
    cells.add(new KeyValue(row, addedFamily, addedQualifier));
  }
  if (cellToDelete != null) {
    LOG.debug("About to delete a KeyValue from WALEdit.");
    cells.remove(cellToDelete);
  }
}
 
源代码7 项目: hbase   文件: TestHRegionServerBulkLoad.java
/**
 * Sets {@code found} when any cell of the edit, rendered through its string map, has a value
 * equal to the bulk load marker.
 */
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
  final String bulkLoadMarker = Bytes.toString(WALEdit.BULK_LOAD);
  for (Cell cell : logEdit.getCells()) {
    KeyValue keyValue = KeyValueUtil.ensureKeyValue(cell);
    // Only the values of the string map matter here; the keys are never looked at.
    for (Object mapValue : keyValue.toStringMap().values()) {
      if (mapValue.equals(bulkLoadMarker)) {
        found = true;
      }
    }
  }
}
 
源代码8 项目: phoenix   文件: Indexer.java
/**
 * Collects the index mutations carried by the {@link IndexedKeyValue} cells of a WAL edit.
 * Cells of any other type are skipped.
 * @param edit edit to search for index updates
 * @return the mutations to apply, each paired with the index table it targets
 */
private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
  // Presize to avoid repeated internal array growth: at most 64 slots, fewer when the edit
  // holds fewer cells than that.
  int capacity = Math.min(edit.size(), 64);
  Collection<Pair<Mutation, byte[]>> updates =
      new ArrayList<Pair<Mutation, byte[]>>(capacity);
  for (Cell cell : edit.getCells()) {
    if (!(cell instanceof IndexedKeyValue)) {
      continue;
    }
    IndexedKeyValue indexed = (IndexedKeyValue) cell;
    Pair<Mutation, byte[]> update =
        new Pair<Mutation, byte[]>(indexed.getMutation(), indexed.getIndexTable());
    updates.add(update);
  }
  return updates;
}
 
源代码9 项目: hbase   文件: ReplicationProtbufUtil.java
/**
 * Builds a ReplicateWALEntryRequest from the given WAL entries, together with a CellScanner over
 * the cells of all their edits. Each entry in the request carries only its key and the number of
 * cells that belong to it.
 * @param entries the WAL entries to be replicated
 * @param encodedRegionName alternative region name to use if not null
 * @param replicationClusterId Id which will uniquely identify source cluster FS client
 *          configurations in the replication configuration directory
 * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
 * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
 * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
 */
public static Pair<ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest(
    final Entry[] entries, byte[] encodedRegionName, String replicationClusterId,
    Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
  ReplicateWALEntryRequest.Builder requestBuilder = ReplicateWALEntryRequest.newBuilder();
  // A single entry builder is reused (and cleared) for every entry.
  WALEntry.Builder walEntryBuilder = WALEntry.newBuilder();
  // One cell list per entry, in entry order.
  List<List<? extends Cell>> cellsPerEntry = new ArrayList<>(entries.length);
  // Running estimate of the serialized size of all cells; used later serializing out the kvs.
  int estimatedSize = 0;

  for (Entry walEntry : entries) {
    walEntryBuilder.clear();

    WALProtos.WALKey.Builder walKeyBuilder;
    try {
      walKeyBuilder = walEntry.getKey().getBuilder(WALCellCodec.getNoneCompressor());
    } catch (IOException e) {
      throw new AssertionError(
        "There should not throw exception since NoneCompressor do not throw any exceptions", e);
    }
    if (encodedRegionName != null) {
      walKeyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(encodedRegionName));
    }
    walEntryBuilder.setKey(walKeyBuilder.build());

    List<Cell> entryCells = walEntry.getEdit().getCells();
    for (Cell cell : entryCells) {
      estimatedSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
    }
    cellsPerEntry.add(entryCells);
    // Write out how many cells associated with this entry.
    walEntryBuilder.setAssociatedCellCount(entryCells.size());
    requestBuilder.addEntry(walEntryBuilder.build());
  }

  // The remaining request fields are optional and only set when supplied.
  if (replicationClusterId != null) {
    requestBuilder.setReplicationClusterId(replicationClusterId);
  }
  if (sourceBaseNamespaceDir != null) {
    requestBuilder.setSourceBaseNamespaceDirPath(sourceBaseNamespaceDir.toString());
  }
  if (sourceHFileArchiveDir != null) {
    requestBuilder.setSourceHFileArchiveDirPath(sourceHFileArchiveDir.toString());
  }

  ReplicateWALEntryRequest request = requestBuilder.build();
  return new Pair<>(request, getCellScanner(cellsPerEntry, estimatedSize));
}
 
源代码10 项目: hbase   文件: WALProcedurePrettyPrinter.java
/**
 * Reads the WAL file at {@code file} entry by entry and prints each entry to {@code out}: a
 * header line built from the sequence id and write time, then one section per cell, then the
 * edit heap size and the reader position. Cells outside the procedure family are printed with
 * the generic WAL cell printer.
 * @return 0 once the reader has no more entries
 */
@Override
protected int doWork() throws Exception {
  Path walPath = new Path(file);
  FileSystem fileSystem = walPath.getFileSystem(conf);
  try (WAL.Reader reader = WALFactory.createReader(fileSystem, walPath, conf)) {
    WAL.Entry entry;
    while ((entry = reader.next()) != null) {
      WALKey key = entry.getKey();
      WALEdit edit = entry.getEdit();
      long sequenceId = key.getSequenceId();
      String writeTime = FORMATTER.format(Instant.ofEpochMilli(key.getWriteTime()));
      out.println(String.format(KEY_TMPL, sequenceId, writeTime));

      for (Cell cell : edit.getCells()) {
        Map<String, Object> op = WALPrettyPrinter.toStringMap(cell);
        boolean isProcedureCell = Bytes.equals(PROC_FAMILY, 0, PROC_FAMILY.length,
          cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
        if (!isProcedureCell) {
          // We could have cells other than procedure edits, for example, a flush marker
          WALPrettyPrinter.printCell(out, op, false);
          continue;
        }

        long procId = Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
        out.println("pid=" + procId + ", type=" + op.get("type") + ", column=" +
          op.get("family") + ":" + op.get("qualifier"));

        if (cell.getType() == Cell.Type.Put) {
          if (cell.getValueLength() <= 0) {
            // should be a 'delete' put
            out.println("\tmark deleted");
          } else {
            // should be a normal put
            ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser()
              .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            Procedure<?> proc = ProcedureUtil.convertToProcedure(proto);
            out.println("\t" + proc.toStringDetails());
          }
        }
        out.println("cell total size sum: " + cell.heapSize());
      }

      out.println("edit heap size: " + edit.heapSize());
      out.println("position: " + reader.getPosition());
    }
    return 0;
  }
}
 
源代码11 项目: hbase   文件: TestRecoveredEdits.java
/**
 * Asserts that the cells collected from a recovered edits file can all be found in the region.
 * <p>
 * Only entries whose encoded region name matches {@code region} are considered. Meta-edit cells
 * are skipped, and of a run of consecutive cells with the same row only the first is collected.
 * The collected cells are sorted and then merge-compared, ignoring MVCC, against a raw scan of
 * the region.
 * @param fs filesystem the recovered edits file is read from
 * @param conf configuration used to create the WAL reader
 * @param edits path of the recovered edits file
 * @param region region whose contents are compared against the edits
 * @return Return how many edits seen, including entries that belong to other regions.
 * @throws IOException if reading the edits file or scanning the region fails
 */
private int verifyAllEditsMadeItIn(final FileSystem fs, final Configuration conf,
    final Path edits, final HRegion region) throws IOException {
  int count = 0;
  // Read all cells from recover edits
  List<Cell> walCells = new ArrayList<>();
  try (WAL.Reader reader = WALFactory.createReader(fs, edits, conf)) {
    WAL.Entry entry;
    while ((entry = reader.next()) != null) {
      WALKey key = entry.getKey();
      WALEdit val = entry.getEdit();
      count++;
      // Check this edit is for this region.
      if (!Bytes.equals(key.getEncodedRegionName(),
          region.getRegionInfo().getEncodedNameAsBytes())) {
        continue;
      }
      Cell previous = null;
      for (Cell cell : val.getCells()) {
        if (WALEdit.isMetaEditFamily(cell)) {
          continue;
        }
        // Keep only the first cell of a run of cells that share the same row.
        if (previous != null && CellComparatorImpl.COMPARATOR.compareRows(previous, cell) == 0) {
          continue;
        }
        previous = cell;
        walCells.add(cell);
      }
    }
  }

  // Read all cells from region
  List<Cell> regionCells = new ArrayList<>();
  try (RegionScanner scanner = region.getScanner(new Scan())) {
    List<Cell> tmpCells;
    do {
      tmpCells = new ArrayList<>();
      scanner.nextRaw(tmpCells);
      regionCells.addAll(tmpCells);
    } while (!tmpCells.isEmpty());
  }

  Collections.sort(walCells, CellComparatorImpl.COMPARATOR);
  int found = 0;
  // Merge-style walk: advance whichever side currently holds the smaller key, and count a match
  // when both sides agree.
  for (int i = 0, j = 0; i < walCells.size() && j < regionCells.size(); ) {
    int compareResult = PrivateCellUtil
        .compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, walCells.get(i),
            regionCells.get(j));
    if (compareResult == 0) {
      i++;
      j++;
      found++;
    } else if (compareResult > 0) {
      j++;
    } else {
      i++;
    }
  }
  // Expected value first, actual second, so a failure reports them the right way round.
  assertEquals("Only found " + found + " cells in region, but there are " + walCells.size() +
      " cells in recover edits", walCells.size(), found);
  return count;
}