下面列出了 org.apache.hadoop.hbase.wal.WALEdit#getCells() 的实例代码；也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Calculates the total size, in bytes, of all store files referenced by the bulk load markers
 * contained in the given edit.
 *
 * <p>Every cell whose qualifier is {@link WALEdit#BULK_LOAD} is decoded into a
 * {@link BulkLoadDescriptor}, and the {@code storeFileSizeBytes} of each store it lists is added
 * to the total. A cell that cannot be decoded is logged and skipped, so its HFiles do not count
 * toward the result.
 *
 * <p>The sum is accumulated as a {@code long} and saturates at {@link Integer#MAX_VALUE} instead
 * of wrapping around. Bulk loads larger than 2 GiB would otherwise produce a wrapped (possibly
 * negative) size for the replication request size calculation.
 *
 * @param edit edit whose bulk load markers are inspected
 * @return the total size of the referenced store files, capped at {@link Integer#MAX_VALUE}
 */
private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
  long totalStoreFilesSize = 0L;
  for (Cell cell : edit.getCells()) {
    if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
      continue;
    }
    try {
      BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
      for (StoreDescriptor store : bld.getStoresList()) {
        totalStoreFilesSize += store.getStoreFileSizeBytes();
      }
    } catch (IOException e) {
      LOG.error("Failed to deserialize bulk load entry from wal edit. "
          + "Size of HFiles part of cell will not be considered in replication "
          + "request size calculation.",
        e);
    }
  }
  // Saturate rather than wrap: the previous per-step (int) cast could go negative.
  return (int) Math.min(totalStoreFilesSize, Integer.MAX_VALUE);
}
/**
 * Releases the HFile references held for every bulk load marker in the given edit.
 *
 * <p>The peer id is taken from the replication source. When it contains {@code '-'}, only the
 * portion before the first {@code '-'} is kept. For each store described by a bulk load marker,
 * the store's file list is handed to the source manager for cleanup, and the HFile-refs queue
 * metric is decremented by the number of those files.
 *
 * @param edit WAL edit whose bulk load markers are processed
 * @throws IOException if an underlying call, such as decoding a bulk load marker, fails
 */
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
  String sourcePeerId = source.getPeerId();
  // peerClusterZnode will be in the form peerId + "-" + rsZNode.
  // A peerId will not have "-" in its name, see HBASE-11394
  String peerId = sourcePeerId.contains("-") ? sourcePeerId.split("-")[0] : sourcePeerId;
  for (Cell cell : edit.getCells()) {
    if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
      continue;
    }
    BulkLoadDescriptor descriptor = WALEdit.getBulkLoadDescriptor(cell);
    for (StoreDescriptor store : descriptor.getStoresList()) {
      List<String> storeFiles = store.getStoreFileList();
      source.getSourceManager().cleanUpHFileRefs(peerId, storeFiles);
      source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFiles.size());
    }
  }
}
/**
 * Maps every cell of {@code edit} in place and drops the cells for which the mapper returns
 * {@code null}.
 *
 * <p>A non-null mapper result replaces the original cell, and the surviving cells keep their
 * relative order. If the number of surviving cells falls below {@code originalSize / 2} (integer
 * division), the backing list is trimmed to release unused capacity.
 *
 * @param edit the edit whose cell list is filtered in place
 * @param mapper transformation applied to each cell; returning {@code null} removes the cell
 */
public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {
  ArrayList<Cell> cells = edit.getCells();
  int originalSize = cells.size();
  int kept = 0;
  for (int read = 0; read < originalSize; read++) {
    Cell mapped = mapper.apply(cells.get(read));
    if (mapped == null) {
      continue;
    }
    cells.set(kept++, mapped);
  }
  // Discard the now-stale tail in a single range removal.
  cells.subList(kept, originalSize).clear();
  if (kept < originalSize / 2) {
    cells.trimToSize();
  }
}
/**
 * Emits every non-meta cell of a WAL entry that belongs to one of the configured tables.
 *
 * <p>The output key is the cell's row. When {@code multiTableSupport} is enabled, the row is
 * prefixed with the table name and {@code tableSeparator}. Entries for tables outside
 * {@code tableSet} are ignored. If writing is interrupted, the interruption is logged and the
 * thread's interrupt flag is restored.
 */
@Override
public void map(WALKey key, WALEdit value, Context context) throws IOException {
  try {
    TableName table = key.getTableName();
    // skip all other tables
    if (!tableSet.contains(table.getNameAsString())) {
      return;
    }
    for (Cell cell : value.getCells()) {
      if (WALEdit.isMetaEditFamily(cell)) {
        continue;
      }
      byte[] row = CellUtil.cloneRow(cell);
      byte[] outKey;
      if (multiTableSupport) {
        outKey = Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), row);
      } else {
        outKey = row;
      }
      context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell));
    }
  } catch (InterruptedException e) {
    LOG.error("Interrupted while emitting Cell", e);
    Thread.currentThread().interrupt();
  }
}
/**
 * Counts the distinct row keys in the given edit and the HFiles referenced by its bulk load
 * markers. An edit can carry several rows because of mini-batching.
 *
 * <p>A new row is counted whenever a cell's row differs from the row of the cell immediately
 * before it. A row that reappears after a different row is therefore counted again. An empty
 * edit yields {@code (0, 0)} instead of failing.
 *
 * @param edit edit to count row keys and HFiles from
 * @return a pair of (number of distinct row keys, number of HFiles to be replicated)
 */
private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
  List<Cell> cells = edit.getCells();
  if (cells.isEmpty()) {
    return new Pair<>(0, 0);
  }
  int distinctRowKeys = 1;
  int totalHFileEntries = 0;
  Cell lastCell = cells.get(0);
  for (Cell cell : cells) {
    // Count HFiles to be replicated
    if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
      try {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        for (StoreDescriptor store : bld.getStoresList()) {
          totalHFileEntries += store.getStoreFileList().size();
        }
      } catch (IOException e) {
        LOG.error("Failed to deserialize bulk load entry from wal edit. "
            + "Then its hfiles count will not be added into metric.",
          e);
      }
    }
    // On the first iteration the cell is compared with itself, so it never counts as a new row.
    if (!CellUtil.matchingRows(cell, lastCell)) {
      distinctRowKeys++;
    }
    lastCell = cell;
  }
  return new Pair<>(distinctRowKeys, totalHFileEntries);
}
/**
 * WAL coprocessor hook that rewrites edits of the watched table before they are written.
 *
 * <p>Edits whose region belongs to a table other than {@code tableName} are left untouched.
 * Otherwise the hook sets {@code preWALWriteCalled} and then:
 * <ul>
 *   <li>increments, in place, the first value byte of every cell matching
 *       {@code changedFamily:changedQualifier};</li>
 *   <li>appends a new {@link KeyValue} for {@code row}/{@code addedFamily}/{@code addedQualifier}
 *       when {@code row} is non-null;</li>
 *   <li>finally removes the cell matching {@code ignoredFamily:ignoredQualifier}. If several cells
 *       match, only the last one is remembered and removed.</li>
 * </ul>
 */
@Override
public void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env,
    RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
  // check table name matches or not.
  boolean sameTable = Bytes.equals(info.getTable().toBytes(), this.tableName);
  if (!sameTable) {
    return;
  }
  preWALWriteCalled = true;
  // here we're going to remove one keyvalue from the WALEdit, and add
  // another one to it.
  List<Cell> editCells = logEdit.getCells();
  Cell cellToDrop = null;
  for (Cell current : editCells) {
    byte[] fam = CellUtil.cloneFamily(current);
    byte[] qualifier = CellUtil.cloneQualifier(current);
    // assume only one kv from the WALEdit matches.
    boolean shouldIgnore = Arrays.equals(fam, ignoredFamily)
        && Arrays.equals(qualifier, ignoredQualifier);
    if (shouldIgnore) {
      LOG.debug("Found the KeyValue from WALEdit which should be ignored.");
      cellToDrop = current;
    }
    boolean shouldChange = Arrays.equals(fam, changedFamily)
        && Arrays.equals(qualifier, changedQualifier);
    if (shouldChange) {
      LOG.debug("Found the KeyValue from WALEdit which should be changed.");
      byte[] valueBytes = current.getValueArray();
      int valuePos = current.getValueOffset();
      valueBytes[valuePos] = (byte) (valueBytes[valuePos] + 1);
    }
  }
  if (row != null) {
    editCells.add(new KeyValue(row, addedFamily, addedQualifier));
  }
  if (cellToDrop == null) {
    return;
  }
  LOG.debug("About to delete a KeyValue from WALEdit.");
  editCells.remove(cellToDrop);
}
/**
 * Sets {@code found} to {@code true} when any cell of the edit contains the bulk load marker.
 *
 * <p>A cell contains the marker when one of the values in its
 * {@link KeyValue#toStringMap()} rendering equals the string form of {@link WALEdit#BULK_LOAD}.
 * Scanning stops at the first match.
 */
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
  // Loop-invariant: compute the marker's string form once.
  String bulkLoadMarker = Bytes.toString(WALEdit.BULK_LOAD);
  for (Cell cell : logEdit.getCells()) {
    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
    // Only the map values matter. Iterating values() avoids the raw Map.Entry type, and
    // comparing from the constant side tolerates null values.
    for (Object value : kv.toStringMap().values()) {
      if (bulkLoadMarker.equals(value)) {
        found = true;
        return;
      }
    }
  }
}
/**
 * Collects the index mutations carried by the given WAL edit.
 *
 * <p>Only cells that are {@link IndexedKeyValue} instances contribute. Each one yields a pair of
 * its mutation and its index table; all other cells are ignored.
 *
 * @param edit WAL edit to scan for index updates
 * @return the (mutation, index table) pairs, in the order their cells appear in the edit
 */
private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
  // Pre-size to the cell count, capped at 64, to avoid repeated internal array resizing.
  Collection<Pair<Mutation, byte[]>> updates = new ArrayList<>(Math.min(edit.size(), 64));
  for (Cell cell : edit.getCells()) {
    if (!(cell instanceof IndexedKeyValue)) {
      continue;
    }
    IndexedKeyValue indexed = (IndexedKeyValue) cell;
    updates.add(new Pair<Mutation, byte[]>(indexed.getMutation(), indexed.getIndexTable()));
  }
  return updates;
}
/**
 * Creates a new ReplicateWALEntryRequest from a list of WAL entries.
 *
 * <p>Each entry contributes one {@code WALEntry}. That entry carries the entry's key, with the
 * encoded region name replaced when {@code encodedRegionName} is non-null, and the number of cells
 * in the entry's edit. This method does not add the cells to the request. Instead they are
 * gathered, in entry order, into the returned {@link CellScanner}, which also receives the sum of
 * the cells' estimated serialized sizes.
 *
 * @param entries the WAL entries to be replicated
 * @param encodedRegionName alternative region name to use if not null
 * @param replicationClusterId Id which will uniquely identify source cluster FS client
 *          configurations in the replication configuration directory
 * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
 * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
 * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
 */
public static Pair<ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest(
    final Entry[] entries, byte[] encodedRegionName, String replicationClusterId,
    Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
  // Each entry's cell list, in entry order; wrapped by the returned CellScanner.
  List<List<? extends Cell>> cellsPerEntry = new ArrayList<>(entries.length);
  int estimatedSize = 0;
  WALEntry.Builder walEntry = WALEntry.newBuilder();
  ReplicateWALEntryRequest.Builder request = ReplicateWALEntryRequest.newBuilder();
  for (Entry entry : entries) {
    // The entry builder is reused across iterations, so reset it first.
    walEntry.clear();
    WALProtos.WALKey.Builder walKey;
    try {
      walKey = entry.getKey().getBuilder(WALCellCodec.getNoneCompressor());
    } catch (IOException e) {
      throw new AssertionError(
        "There should not throw exception since NoneCompressor do not throw any exceptions", e);
    }
    if (encodedRegionName != null) {
      walKey.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(encodedRegionName));
    }
    walEntry.setKey(walKey.build());
    List<Cell> editCells = entry.getEdit().getCells();
    // Running size estimate; it is used later when serializing out the cells.
    for (Cell cell : editCells) {
      estimatedSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
    }
    cellsPerEntry.add(editCells);
    // Record how many cells are associated with this entry.
    walEntry.setAssociatedCellCount(editCells.size());
    request.addEntry(walEntry.build());
  }
  if (replicationClusterId != null) {
    request.setReplicationClusterId(replicationClusterId);
  }
  if (sourceBaseNamespaceDir != null) {
    request.setSourceBaseNamespaceDirPath(sourceBaseNamespaceDir.toString());
  }
  if (sourceHFileArchiveDir != null) {
    request.setSourceHFileArchiveDirPath(sourceHFileArchiveDir.toString());
  }
  ReplicateWALEntryRequest built = request.build();
  return new Pair<>(built, getCellScanner(cellsPerEntry, estimatedSize));
}
/**
 * Prints every entry of the procedure WAL file {@code file} to {@code out}.
 *
 * <p>Each entry first prints a key line, which is {@code KEY_TMPL} formatted with the sequence
 * id and the formatted write time. Its cells are then printed as follows:
 * <ul>
 *   <li>cells outside {@code PROC_FAMILY} are printed via {@link WALPrettyPrinter#printCell};</li>
 *   <li>procedure cells print their procedure id, type and column. A Put then also prints either
 *       the decoded procedure details (non-empty value) or "mark deleted" (empty value), and
 *       every procedure cell ends with a line showing its heap size.</li>
 * </ul>
 * After the cells, the edit's heap size and the reader position are printed.
 *
 * @return 0 once the end of the file is reached
 */
@Override
protected int doWork() throws Exception {
  Path walPath = new Path(file);
  FileSystem fs = walPath.getFileSystem(conf);
  try (WAL.Reader reader = WALFactory.createReader(fs, walPath, conf)) {
    WAL.Entry entry;
    while ((entry = reader.next()) != null) {
      WALKey walKey = entry.getKey();
      WALEdit walEdit = entry.getEdit();
      long seqId = walKey.getSequenceId();
      String writeTime = FORMATTER.format(Instant.ofEpochMilli(walKey.getWriteTime()));
      out.println(String.format(KEY_TMPL, seqId, writeTime));
      for (Cell cell : walEdit.getCells()) {
        Map<String, Object> cellInfo = WALPrettyPrinter.toStringMap(cell);
        boolean isProcedureCell = Bytes.equals(PROC_FAMILY, 0, PROC_FAMILY.length,
          cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
        if (!isProcedureCell) {
          // We could have cells other than procedure edits, for example, a flush marker
          WALPrettyPrinter.printCell(out, cellInfo, false);
          continue;
        }
        long procId = Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
        out.println("pid=" + procId + ", type=" + cellInfo.get("type") + ", column="
            + cellInfo.get("family") + ":" + cellInfo.get("qualifier"));
        boolean isPut = cell.getType() == Cell.Type.Put;
        if (isPut && cell.getValueLength() > 0) {
          // should be a normal put
          Procedure<?> proc = ProcedureUtil.convertToProcedure(
            ProcedureProtos.Procedure.parser()
              .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
          out.println("\t" + proc.toStringDetails());
        } else if (isPut) {
          // should be a 'delete' put
          out.println("\tmark deleted");
        }
        out.println("cell total size sum: " + cell.heapSize());
      }
      out.println("edit heap size: " + walEdit.heapSize());
      out.println("position: " + reader.getPosition());
    }
  }
  return 0;
}
/**
 * Verifies that the cells recorded for {@code region} in a recovered edits file are present in
 * the region.
 *
 * <p>Cells are collected only from WAL entries whose encoded region name matches the region.
 * Meta-family cells are skipped. Within one entry, a cell whose row equals that of the previously
 * collected cell is also skipped. The collected cells are sorted and merged against a raw scan of
 * the region, comparing keys while ignoring MVCC. The assertion fails unless every collected cell
 * is found.
 *
 * @param fs file system holding the recovered edits file
 * @param conf configuration used to open the WAL reader
 * @param edits path of the recovered edits file to read
 * @param region region whose contents are checked
 * @return how many WAL entries were read from {@code edits}, including entries for other regions
 * @throws IOException if reading the edits file or scanning the region fails
 */
private int verifyAllEditsMadeItIn(final FileSystem fs, final Configuration conf,
    final Path edits, final HRegion region) throws IOException {
  int count = 0;
  // Read all cells from recover edits
  List<Cell> walCells = new ArrayList<>();
  try (WAL.Reader reader = WALFactory.createReader(fs, edits, conf)) {
    WAL.Entry entry;
    while ((entry = reader.next()) != null) {
      WALKey key = entry.getKey();
      WALEdit val = entry.getEdit();
      count++;
      // Check this edit is for this region.
      if (!Bytes.equals(key.getEncodedRegionName(),
        region.getRegionInfo().getEncodedNameAsBytes())) {
        continue;
      }
      Cell previous = null;
      for (Cell cell : val.getCells()) {
        if (WALEdit.isMetaEditFamily(cell)) {
          continue;
        }
        if (previous != null && CellComparatorImpl.COMPARATOR.compareRows(previous, cell) == 0) {
          continue;
        }
        previous = cell;
        walCells.add(cell);
      }
    }
  }
  // Read all cells from region
  List<Cell> regionCells = new ArrayList<>();
  try (RegionScanner scanner = region.getScanner(new Scan())) {
    List<Cell> tmpCells;
    do {
      tmpCells = new ArrayList<>();
      scanner.nextRaw(tmpCells);
      regionCells.addAll(tmpCells);
    } while (!tmpCells.isEmpty());
  }
  Collections.sort(walCells, CellComparatorImpl.COMPARATOR);
  // Merge-walk both lists, advancing whichever side holds the smaller key.
  int found = 0;
  for (int i = 0, j = 0; i < walCells.size() && j < regionCells.size(); ) {
    int compareResult = PrivateCellUtil
      .compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, walCells.get(i),
        regionCells.get(j));
    if (compareResult == 0) {
      i++;
      j++;
      found++;
    } else if (compareResult > 0) {
      j++;
    } else {
      i++;
    }
  }
  // Expected value first, so a failure reports "expected:<wal cells> but was:<found>".
  assertEquals("Only found " + found + " cells in region, but there are " + walCells.size() +
    " cells in recover edits", walCells.size(), found);
  return count;
}