下面列出了 org.apache.hadoop.hbase.wal.WALEdit#isMetaEditFamily() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Emits one output record per non-meta cell of a WAL edit, but only for edits whose table
 * name is present in {@code tableSet}.
 *
 * <p>The output key is the cell's row; when {@code multiTableSupport} is enabled the row is
 * prefixed with the table name and {@code tableSeparator}.
 *
 * @param key     WAL key; supplies the table the edit belongs to
 * @param value   WAL edit whose cells are emitted
 * @param context sink the (key, cell) pairs are written to
 * @throws IOException if writing to the context fails
 */
@Override
public void map(WALKey key, WALEdit value, Context context) throws IOException {
  try {
    TableName table = key.getTableName();
    if (!tableSet.contains(table.getNameAsString())) {
      // Edit belongs to a table we were not asked to process.
      return;
    }
    for (Cell cell : value.getCells()) {
      if (!WALEdit.isMetaEditFamily(cell)) {
        byte[] outKey;
        if (multiTableSupport) {
          outKey =
            Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell));
        } else {
          outKey = CellUtil.cloneRow(cell);
        }
        ImmutableBytesWritable writableKey = new ImmutableBytesWritable(outKey);
        context.write(writableKey, new MapReduceExtendedCell(cell));
      }
    }
  } catch (InterruptedException e) {
    LOG.error("Interrupted while emitting Cell", e);
    // Restore the interrupt flag so callers further up can observe it.
    Thread.currentThread().interrupt();
  }
}
/**
 * Collects the distinct column families of the given cells, skipping any cell that
 * {@link WALEdit#isMetaEditFamily} reports as belonging to the meta-edit family.
 *
 * @param cells cells to inspect
 * @return {@link Collections#emptySet()} when {@code CollectionUtils.isEmpty(cells)} holds;
 *         otherwise a new set ordered by {@code Bytes.BYTES_COMPARATOR} holding a copy of
 *         each family
 */
@VisibleForTesting
static Set<byte[]> collectFamilies(List<Cell> cells) {
  if (CollectionUtils.isEmpty(cells)) {
    return Collections.emptySet();
  }
  // byte[] has identity equality, so the set needs a content-based comparator.
  Set<byte[]> families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
  for (Cell cell : cells) {
    if (WALEdit.isMetaEditFamily(cell)) {
      continue;
    }
    families.add(CellUtil.cloneFamily(cell));
  }
  return families;
}
/**
 * Counts the entries in a WAL file whose first cell is not in the meta-edit family.
 *
 * @param log  path of the WAL file to read
 * @param fs   file system the WAL lives on
 * @param conf configuration passed to the reader factory
 * @return number of entries whose first cell is not a meta-edit cell
 * @throws IOException if the WAL cannot be opened or read
 */
private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
  int nonMetaEntries = 0;
  try (WAL.Reader reader = WALFactory.createReader(fs, log, conf)) {
    for (WAL.Entry entry = reader.next(); entry != null; entry = reader.next()) {
      // Only the first cell of each entry is examined to classify it.
      boolean metaEntry = WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0));
      if (!metaEntry) {
        nonMetaEntries++;
      }
    }
  }
  return nonMetaEntries;
}
/**
 * Replays one WAL entry into {@code region} as a single {@link Put} that bypasses the WAL.
 *
 * <p>Entries whose first cell is in the meta-edit family are not replayed here.
 *
 * @param region region to replay the edit into
 * @param entry  WAL entry to replay
 * @return {@code 0} for a meta-edit entry; otherwise the put's row key parsed as a decimal
 *         integer
 * @throws IOException if the replay fails
 */
static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
  WALEdit edit = entry.getEdit();
  Cell firstCell = edit.getCells().get(0);
  if (WALEdit.isMetaEditFamily(firstCell)) {
    return 0; // handled elsewhere
  }
  // The row of the first cell is used as the row of the whole put.
  Put put = new Put(CellUtil.cloneRow(firstCell));
  for (Cell cell : edit.getCells()) {
    put.add(cell);
  }
  put.setDurability(Durability.SKIP_WAL);
  MutationReplay[] batch = {new MutationReplay(MutationType.PUT, put, 0, 0)};
  region.batchReplay(batch, entry.getKey().getSequenceId());
  return Integer.parseInt(Bytes.toString(put.getRow()));
}
/**
 * Runs one {@code PutThread} and one {@code IncThread} against the region, rolls the WAL,
 * then reads the rolled log back and checks that the sequence ids of all non-meta entries
 * are strictly increasing.
 */
@Test
public void testWALMonotonicallyIncreasingSeqId() throws Exception {
  List<Thread> putThreads = new ArrayList<>();
  for (int i = 0; i < 1; i++) {
    putThreads.add(new PutThread(region));
  }
  IncThread incThread = new IncThread(region);
  for (Thread putThread : putThreads) {
    putThread.start();
  }
  incThread.start();
  // NOTE(review): only the increment thread is joined; the put threads are not awaited here.
  incThread.join();

  // Remember the active log before rolling so the rolled file can be read back.
  Path logPath = ((AbstractFSWAL<?>) region.getWAL()).getCurrentFileName();
  region.getWAL().rollWriter();
  Thread.sleep(10);

  Path hbaseDir = new Path(walConf.get(HConstants.HBASE_DIR));
  Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
  try (WAL.Reader reader = createReader(logPath, oldWalsDir)) {
    long currentMaxSeqid = 0;
    WAL.Entry entry;
    while ((entry = reader.next()) != null) {
      if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
        continue;
      }
      long currentSeqid = entry.getKey().getSequenceId();
      if (currentSeqid <= currentMaxSeqid) {
        fail("Current max Seqid is " + currentMaxSeqid +
          ", but the next seqid in wal is smaller:" + currentSeqid);
      } else {
        currentMaxSeqid = currentSeqid;
      }
    }
  }
}
/**
 * Verifies that the cells recorded in a recovered-edits file are present in the region.
 *
 * <p>For every entry that belongs to {@code region}, the non-meta cells are collected,
 * keeping only the first cell of each run of consecutive cells that share a row. The
 * collected cells are sorted and matched against a raw scan of the region, ignoring MVCC.
 * The check fails unless every collected cell has a match.
 *
 * @param fs     file system the edits file lives on
 * @param conf   configuration passed to the WAL reader factory
 * @param edits  path of the recovered-edits file to read
 * @param region region the edits are expected to have been applied to
 * @return how many entries were read from {@code edits}, including entries for other regions
 * @throws IOException if the edits file or the region cannot be read
 */
private int verifyAllEditsMadeItIn(final FileSystem fs, final Configuration conf,
    final Path edits, final HRegion region) throws IOException {
  int count = 0;
  // Read all cells from recover edits
  List<Cell> walCells = new ArrayList<>();
  try (WAL.Reader reader = WALFactory.createReader(fs, edits, conf)) {
    WAL.Entry entry;
    while ((entry = reader.next()) != null) {
      WALKey key = entry.getKey();
      WALEdit val = entry.getEdit();
      // Counted before the region check, so entries for other regions are included.
      count++;
      // Check this edit is for this region.
      if (!Bytes.equals(key.getEncodedRegionName(),
        region.getRegionInfo().getEncodedNameAsBytes())) {
        continue;
      }
      Cell previous = null;
      for (Cell cell : val.getCells()) {
        if (WALEdit.isMetaEditFamily(cell)) {
          continue;
        }
        // Keep only the first cell of consecutive cells that share a row.
        if (previous != null && CellComparatorImpl.COMPARATOR.compareRows(previous, cell) == 0) {
          continue;
        }
        previous = cell;
        walCells.add(cell);
      }
    }
  }
  // Read all cells from region
  List<Cell> regionCells = new ArrayList<>();
  try (RegionScanner scanner = region.getScanner(new Scan())) {
    List<Cell> tmpCells;
    do {
      tmpCells = new ArrayList<>();
      scanner.nextRaw(tmpCells);
      regionCells.addAll(tmpCells);
    } while (!tmpCells.isEmpty());
  }
  walCells.sort(CellComparatorImpl.COMPARATOR);
  // Two-pointer merge over both lists; presumably the scanner returns cells in the same
  // comparator order as the sorted WAL cells - TODO confirm.
  int found = 0;
  for (int i = 0, j = 0; i < walCells.size() && j < regionCells.size(); ) {
    int compareResult = PrivateCellUtil
      .compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, walCells.get(i),
        regionCells.get(j));
    if (compareResult == 0) {
      i++;
      j++;
      found++;
    } else if (compareResult > 0) {
      // Region cell sorts before the WAL cell: skip the region cell.
      j++;
    } else {
      // WAL cell has no counterpart in the region: move on, leaving found short.
      i++;
    }
  }
  // assertEquals takes (message, expected, actual): the expected value is the WAL cell count.
  assertEquals("Only found " + found + " cells in region, but there are " + walCells.size() +
    " cells in recover edits", walCells.size(), found);
  return count;
}