下面列出了 org.apache.hadoop.hbase.wal.WALEdit 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Refuses direct deletes when {@code tableEnvMatch} is set.
 *
 * <p>After {@code checkAccess()}, a delete that does not carry the
 * {@code SIConstants.SUPPRESS_INDEXING_ATTRIBUTE_NAME} attribute is rejected. Anything thrown
 * while evaluating the delete is converted through {@code CoprocessorUtils.getIOException}.
 *
 * @throws IOException the converted form of any failure raised inside the check
 */
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit,
    Durability durability) throws IOException {
  checkAccess();
  try {
    boolean directDelete = tableEnvMatch
        && delete.getAttribute(SIConstants.SUPPRESS_INDEXING_ATTRIBUTE_NAME) == null;
    if (!directDelete) {
      return;
    }
    TableName table = c.getEnvironment().getRegion().getTableDescriptor().getTableName();
    throw new RuntimeException("Direct deletes are not supported under snapshot isolation. "
        + "Instead a Put is expected that will set a record level tombstone. tableName=" + table);
  } catch (Throwable failure) {
    throw CoprocessorUtils.getIOException(failure);
  }
}
/**
 * Forwards {@code preDelete} to {@code implRegionObserver}, bracketing the call with
 * {@code activatePluginClassLoader()} and {@code deactivatePluginClassLoader()}.
 *
 * @throws IOException propagated from the delegate call
 */
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit, Durability durability) throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerAuthorizationCoprocessor.preDelete()");
}
try {
activatePluginClassLoader();
implRegionObserver.preDelete(c, delete, edit, durability);
} finally {
// Runs whether the activation/delegate call returned normally or threw.
deactivatePluginClassLoader();
}
// The exit trace is only reached when the delegate returned without throwing.
if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerAuthorizationCoprocessor.preDelete()");
}
}
/**
 * Forwards {@code postDelete} to {@code implRegionObserver}, bracketing the call with
 * {@code activatePluginClassLoader()} and {@code deactivatePluginClassLoader()}.
 *
 * @throws IOException propagated from the delegate call
 */
@Override
public void postDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit, Durability durability) throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerAuthorizationCoprocessor.postDelete()");
}
try {
activatePluginClassLoader();
implRegionObserver.postDelete(c, delete, edit, durability);
} finally {
// Runs whether the activation/delegate call returned normally or threw.
deactivatePluginClassLoader();
}
// The exit trace is only reached when the delegate returned without throwing.
if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerAuthorizationCoprocessor.postDelete()");
}
}
/**
 * Checks that the first cell of the given edit is a bulk load marker carrying the expected
 * values.
 *
 * <p>Mismatches are reported through assertion failures rather than by returning {@code false};
 * {@code false} is returned only when the bulk load descriptor cannot be read from the cell.
 * The table name and store checks are skipped when the corresponding expectation
 * ({@code tableName}, {@code storeFileNames}) is {@code null}.
 *
 * @param item the WAL edit handed to the matcher
 * @return {@code true} when every enabled check passed, {@code false} if the descriptor could
 *         not be parsed
 */
@Override
protected boolean matchesSafely(WALEdit item) {
  assertTrue(Arrays.equals(CellUtil.cloneQualifier(item.getCells().get(0)), typeBytes));
  BulkLoadDescriptor desc;
  try {
    desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
  } catch (IOException e) {
    // An unreadable descriptor is treated as "no match" instead of an error.
    return false;
  }
  assertNotNull(desc);
  if (tableName != null) {
    assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(),
      tableName));
  }
  if (storeFileNames != null) {
    // Only the first store of the descriptor is inspected.
    StoreDescriptor store = desc.getStores(0);
    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
    assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
    assertEquals(storeFileNames.size(), store.getStoreFileCount());
  }
  return true;
}
/**
 * Passes an entry through only if it still needs to go to the peer and has content.
 *
 * <p>An entry is dropped when the endpoint cannot replicate to the same cluster and the key
 * already lists {@code peerClusterId}, or when its edit is {@code null} or empty. A surviving
 * entry gets {@code clusterId} added to its key before being returned.
 *
 * @param entry the WAL entry to examine
 * @return the same entry, or {@code null} if it is filtered out
 */
@Override
public Entry filter(Entry entry) {
  // don't replicate if the log entries have already been consumed by the cluster
  boolean alreadyConsumed = !replicationEndpoint.canReplicateToSameCluster()
      && entry.getKey().getClusterIds().contains(peerClusterId);
  if (alreadyConsumed) {
    return null;
  }
  WALEdit walEdit = entry.getEdit();
  WALKeyImpl key = (WALKeyImpl) entry.getKey();
  if (walEdit == null || walEdit.isEmpty()) {
    return null;
  }
  // Mark that the current cluster has the change
  key.addClusterId(clusterId);
  return entry;
}
/**
 * Releases the HFile references recorded by every bulk load marker cell in the edit.
 *
 * <p>For each store listed in a bulk load descriptor, the store's files are handed to the source
 * manager's {@code cleanUpHFileRefs} and the HFile-refs queue size metric is decreased by the
 * number of files.
 *
 * @param edit the WAL edit whose cells are scanned for bulk load markers
 * @throws IOException if a bulk load descriptor cannot be read from a cell
 */
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
  String peerId = source.getPeerId();
  if (peerId.contains("-")) {
    // peerClusterZnode will be in the form peerId + "-" + rsZNode.
    // A peerId will not have "-" in its name, see HBASE-11394
    peerId = peerId.split("-")[0];
  }
  for (Cell cell : edit.getCells()) {
    if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
      continue;
    }
    BulkLoadDescriptor descriptor = WALEdit.getBulkLoadDescriptor(cell);
    for (StoreDescriptor store : descriptor.getStoresList()) {
      List<String> storeFiles = store.getStoreFileList();
      source.getSourceManager().cleanUpHFileRefs(peerId, storeFiles);
      source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFiles.size());
    }
  }
}
/**
 * Verifies that {@code filterNotExistColumnFamilyEdits} drops entries whose cell uses
 * {@code NON_EXISTING_FAMILY} and keeps the entry whose cell uses {@code FAMILY}.
 */
@Test
public void testFilterNotExistColumnFamilyEdits() {
  List<List<Entry>> batches = new ArrayList<>();
  // should be filtered
  Cell unknownFamilyCell = new KeyValue(ROW, NON_EXISTING_FAMILY, QUALIFIER,
      System.currentTimeMillis(), Type.Put, VALUE);
  Entry droppedEntry = new Entry(
      new WALKeyImpl(new byte[32], TABLE1, System.currentTimeMillis()),
      new WALEdit().add(unknownFamilyCell));
  batches.add(Lists.newArrayList(droppedEntry));
  // should be kept
  Cell knownFamilyCell =
      new KeyValue(ROW, FAMILY, QUALIFIER, System.currentTimeMillis(), Type.Put, VALUE);
  Entry keptEntry = new Entry(
      new WALKeyImpl(new byte[32], TABLE1, System.currentTimeMillis()),
      new WALEdit().add(knownFamilyCell));
  batches.add(Lists.newArrayList(keptEntry, droppedEntry));

  List<List<Entry>> filtered = endpoint.filterNotExistColumnFamilyEdits(batches);

  assertEquals(1, filtered.size());
  List<Cell> survivingCells = filtered.get(0).get(0).getEdit().getCells();
  assertEquals(1, survivingCells.size());
  assertTrue(CellUtil.matchingFamily(survivingCells.get(0), FAMILY));
}
/**
 * Bulk loads HFiles for two families and verifies that the WAL is synced.
 *
 * <p>The mocked {@code appendMarker} starts an MVCC write entry on the supplied key (when the
 * key has an MVCC) and reports transaction id 1.
 */
@Test
public void shouldBulkLoadManyFamilyHLog() throws IOException {
  when(log.appendMarker(any(),
    any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer<Long>() {
      @Override
      public Long answer(InvocationOnMock invocation) {
        WALKeyImpl walKey = invocation.getArgument(1);
        MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
        if (mvcc != null) {
          MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
          walKey.setWriteEntry(we);
        }
        // Plain decimal literal; the previous "01L" was an octal literal with the same value.
        return 1L;
      }
    });
  testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
    false, null);
  verify(log).sync(anyLong());
}
/**
 * Verifies that {@code filterNotExistTableEdits} drops the entry keyed by {@code TABLE2} and
 * keeps the entry keyed by {@code TABLE1}.
 */
@Test
public void testFilterNotExistTableEdits() {
  List<List<Entry>> batches = new ArrayList<>();
  // should be filtered
  Cell droppedCell =
      new KeyValue(ROW, FAMILY, QUALIFIER, System.currentTimeMillis(), Type.Put, VALUE);
  Entry droppedEntry = new Entry(
      new WALKeyImpl(new byte[32], TABLE2, System.currentTimeMillis()),
      new WALEdit().add(droppedCell));
  batches.add(Lists.newArrayList(droppedEntry));
  // should be kept
  Cell keptCell =
      new KeyValue(ROW, FAMILY, QUALIFIER, System.currentTimeMillis(), Type.Put, VALUE);
  Entry keptEntry = new Entry(
      new WALKeyImpl(new byte[32], TABLE1, System.currentTimeMillis()),
      new WALEdit().add(keptCell));
  batches.add(Lists.newArrayList(keptEntry));

  List<List<Entry>> filtered = endpoint.filterNotExistTableEdits(batches);

  assertEquals(1, filtered.size());
  Entry survivor = filtered.get(0).get(0);
  assertEquals(1, survivor.getEdit().getCells().size());
  assertEquals(TABLE1, survivor.getKey().getTableName());
}
/**
 * Creates an entry for the given transaction id, key and edit.
 *
 * <p>For memstore edits the family names are taken from the edit when it already provides them,
 * otherwise they are collected from its cells; for non-memstore edits the set is empty. A
 * non-null {@code rpcCall} is retained via {@code retainByWAL()}.
 *
 * @param inMemstore If true, then this is a data edit, one that came from client. If false, it
 *   is a meta edit made by the hbase system itself and is for the WAL only.
 */
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
    final boolean inMemstore, ServerCall<?> rpcCall) {
  super(key, edit);
  this.txid = txid;
  this.regionInfo = regionInfo;
  this.inMemstore = inMemstore;
  this.closeRegion = !inMemstore && edit.isRegionCloseMarker();
  if (!inMemstore) {
    this.familyNames = Collections.emptySet();
  } else {
    // construct familyNames here to reduce the work of log sinker.
    Set<byte[]> precomputed = edit.getFamilies();
    if (precomputed == null) {
      this.familyNames = collectFamilies(edit.getCells());
    } else {
      this.familyNames = precomputed;
    }
  }
  this.rpcCall = rpcCall;
  if (rpcCall != null) {
    rpcCall.retainByWAL();
  }
}
/**
 * Wraps the key and edits in an {@code FSWALEntry}, stamps it with the MVCC write entry, loads it
 * into the ring buffer slot claimed for it and publishes that slot.
 *
 * @param hri region the edits belong to
 * @param key WAL key; its MVCC is used to begin the write entry
 * @param edits edits to append
 * @param inMemstore passed through to the {@code FSWALEntry} constructor
 * @param ringBuffer ring buffer the entry is published to
 * @return the ring buffer sequence (txid) claimed for this append
 * @throws IOException if this log is already closed
 */
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
if (this.closed) {
throw new IOException(
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
}
// The lambda cannot assign a local directly, so the txid is handed out through a holder.
MutableLong txidHolder = new MutableLong();
// The ring buffer slot is claimed inside the callback given to begin().
// NOTE(review): presumably so the write entry and the txid are assigned together — confirm
// against MultiVersionConcurrencyControl.begin(Runnable).
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
txidHolder.setValue(ringBuffer.next());
});
long txid = txidHolder.longValue();
// Keep the current RPC call only if it is a ServerCall with a cell scanner; otherwise null.
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
// The claimed slot is published even when building or loading the entry throws.
ringBuffer.publish(txid);
}
return txid;
}
/**
 * Emits one record per non-meta cell of WAL entries that belong to a table in {@code tableSet}.
 *
 * <p>The output key is the cell's row; with {@code multiTableSupport} it is prefixed by the table
 * name and {@code tableSeparator}. An interrupt while writing is logged and the thread's
 * interrupt flag is restored.
 *
 * @throws IOException propagated from {@code context.write}
 */
@Override
public void map(WALKey key, WALEdit value, Context context) throws IOException {
  try {
    TableName table = key.getTableName();
    // skip all other tables
    if (!tableSet.contains(table.getNameAsString())) {
      return;
    }
    for (Cell cell : value.getCells()) {
      if (WALEdit.isMetaEditFamily(cell)) {
        continue;
      }
      byte[] outKey;
      if (multiTableSupport) {
        outKey =
            Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell));
      } else {
        outKey = CellUtil.cloneRow(cell);
      }
      context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell));
    }
  } catch (InterruptedException interrupted) {
    LOG.error("Interrupted while emitting Cell", interrupted);
    Thread.currentThread().interrupt();
  }
}
/**
 * Records append metrics and warns about slow appends.
 *
 * <p>Count, time, size and written bytes are always updated. When {@code time} exceeds 1000 the
 * slow-append counter is bumped and a warning is logged.
 *
 * @param size value added to the append-size and written-bytes metrics
 * @param time value added to the append-time metric (logged as milliseconds)
 */
@Override
public void postAppend(final long size, final long time, final WALKey logkey,
    final WALEdit logEdit) throws IOException {
  source.incrementAppendCount();
  source.incrementAppendTime(time);
  source.incrementAppendSize(size);
  source.incrementWrittenBytes(size);
  if (time <= 1000) {
    return;
  }
  source.incrementSlowAppendCount();
  String threadName = Thread.currentThread().getName();
  LOG.warn(String.format("%s took %d ms appending an edit to wal; len~=%s",
      threadName, time, StringUtils.humanReadableInt(size)));
}
/**
 * Same scenario as the plain two-family bulk load, but on a region whose table name carries an
 * explicit namespace ({@code test:test}); verifies that the WAL is synced.
 *
 * <p>The mocked {@code appendMarker} starts an MVCC write entry on the supplied key (when the
 * key has an MVCC) and reports transaction id 1.
 */
@Test
public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
  when(log.appendMarker(any(),
    any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer<Long>() {
      @Override
      public Long answer(InvocationOnMock invocation) {
        WALKeyImpl walKey = invocation.getArgument(1);
        MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
        if (mvcc != null) {
          MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
          walKey.setWriteEntry(we);
        }
        // Plain decimal literal; the previous "01L" was an octal literal with the same value.
        return 1L;
      }
    });
  TableName tableName = TableName.valueOf("test", "test");
  testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
    .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
  verify(log).sync(anyLong());
}
/**
 * Fails the put unless it still carries {@code TEST_ATTRIBUTE} and its durability is
 * {@code Durability.USE_DEFAULT}.
 *
 * @throws IOException a {@code DoNotRetryIOException} naming the violated expectation
 */
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
    final WALEdit edit, final Durability durability) throws IOException {
  boolean attributeLost = put.getAttribute(TEST_ATTRIBUTE) == null;
  if (attributeLost) {
    throw new DoNotRetryIOException("Put should preserve attributes");
  }
  boolean durabilityChanged = put.getDurability() != Durability.USE_DEFAULT;
  if (durabilityChanged) {
    throw new DoNotRetryIOException("Durability is not propagated correctly");
  }
}
/**
 * Sleeps for {@code sleepTime}, then fails only the very first invocation counted by {@code ct}.
 *
 * @throws IOException on the first call
 */
@Override
public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
    final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
  Threads.sleep(sleepTime.get());
  boolean firstCall = ct.incrementAndGet() == 1;
  if (firstCall) {
    throw new IOException("first call I fail");
  }
}
/**
 * Sanity-checks the incoming put and counts the invocation in {@code ctPrePut}.
 *
 * <p>For the {@code TEST_TABLE} region, each of the families A, B and C must be present in the
 * put's family map and the first cell's qualifier must equal the family's own bytes.
 */
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
    final Put put, final WALEdit edit,
    final Durability durability) throws IOException {
  Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
  RegionCoprocessorEnvironment e = c.getEnvironment();
  assertNotNull(e);
  assertNotNull(e.getRegion());
  assertNotNull(familyMap);
  if (e.getRegion().getTableDescriptor().getTableName().equals(
      TestRegionObserverInterface.TEST_TABLE)) {
    byte[][] expectedNames = {
      TestRegionObserverInterface.A,
      TestRegionObserverInterface.B,
      TestRegionObserverInterface.C
    };
    for (byte[] expected : expectedNames) {
      List<Cell> cells = familyMap.get(expected);
      assertNotNull(cells);
      assertNotNull(cells.get(0));
      Cell first = cells.get(0);
      assertTrue(Bytes.equals(first.getQualifierArray(), first.getQualifierOffset(),
          first.getQualifierLength(), expected, 0, expected.length));
    }
  }
  ctPrePut.incrementAndGet();
}
/**
 * Coprocessors shouldn't get notice of empty waledits: appending and syncing an empty
 * {@code WALEdit} must leave both the pre- and post-WAL-write flags unset.
 */
@Test
public void testEmptyWALEditAreNotSeen() throws Exception {
  String tableNameText = Bytes.toString(TEST_TABLE);
  RegionInfo regionInfo = createBasicHRegionInfo(tableNameText);
  TableDescriptor descriptor = createBasic3FamilyHTD(tableNameText);
  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
  NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
  for (byte[] family : descriptor.getColumnFamilyNames()) {
    scopes.put(family, 0);
  }
  WAL wal = wals.getWAL(null);
  try {
    SampleRegionWALCoprocessor observer = getCoprocessor(wal, SampleRegionWALCoprocessor.class);
    observer.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
    assertFalse(observer.isPreWALWriteCalled());
    assertFalse(observer.isPostWALWriteCalled());

    final long now = EnvironmentEdgeManager.currentTime();
    WALKeyImpl walKey = new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), regionInfo.getTable(),
        now, mvcc, scopes);
    long txid = wal.appendData(regionInfo, walKey, new WALEdit());
    wal.sync(txid);

    assertFalse("Empty WALEdit should skip coprocessor evaluation.",
        observer.isPreWALWriteCalled());
    assertFalse("Empty WALEdit should skip coprocessor evaluation.",
        observer.isPostWALWriteCalled());
  } finally {
    wal.close();
  }
}
/**
 * Deliberately fails with a {@link NullPointerException} for puts on the table named
 * {@code observed_table}; puts on any other table pass through untouched.
 */
@SuppressWarnings("null")
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
    final Put put, final WALEdit edit,
    final Durability durability) {
  String observed =
      c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
  if (!observed.equals("observed_table")) {
    return;
  }
  // Trigger a NPE to fail the coprocessor
  Integer missing = null;
  missing = missing + 1;
}
/**
 * Checks that the environment, its region and the delete's family map are present, then resets
 * {@code ctBeforeDelete} to zero and increments {@code ctPostDeleted}.
 */
@Override
public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
    final Delete delete, final WALEdit edit,
    final Durability durability) throws IOException {
  Map<byte[], List<Cell>> cellsByFamily = delete.getFamilyCellMap();
  RegionCoprocessorEnvironment env = c.getEnvironment();
  assertNotNull(env);
  assertNotNull(env.getRegion());
  assertNotNull(cellsByFamily);

  ctBeforeDelete.set(0);
  ctPostDeleted.incrementAndGet();
}
/**
 * Blocks for 100 seconds so that the surrounding operation times out.
 *
 * <p>If the sleep is interrupted, the thread's interrupt status is restored before the
 * interruption is reported, so code further up the stack can still observe it.
 *
 * @throws IOException wrapping the {@link InterruptedException} when the sleep is interrupted
 */
@Override
public void process(long now, HRegion region,
    List<Mutation> mutations, WALEdit walEdit) throws IOException {
  try {
    // Sleep for a long time so it timeout
    Thread.sleep(100 * 1000L);
  } catch (InterruptedException e) {
    // Thread.sleep clears the flag when it throws; put it back before converting.
    Thread.currentThread().interrupt();
    throw new IOException(e);
  }
}
/**
 * Runs the put through every stored constraint, in iteration order.
 *
 * <p>Returning normally means no constraint rejected the put; whatever a constraint's
 * {@code check} throws propagates to the caller.
 */
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
    WALEdit edit, Durability durability) throws IOException {
  // check the put against the stored constraints
  for (Constraint constraint : constraints) {
    constraint.check(put);
  }
  // if we made it here, then the Put is valid
}
/**
 * Verifies that {@code SystemCatalogWALEntryFilter} filters out the non-tenant view entry,
 * lets the tenant view entry through unchanged in size, and reduces a combined entry to the
 * size of the tenant entry.
 */
@Test
public void testSystemCatalogWALEntryFilter() throws Exception {
  // now create WAL.Entry objects that refer to cells in those view rows in System.Catalog
  Get tenantViewGet = getGet(catalogTable, TENANT_BYTES, TENANT_VIEW_NAME);
  Get globalViewGet = getGet(catalogTable, DEFAULT_TENANT_BYTES, NONTENANT_VIEW_NAME);

  WAL.Entry nonTenantEntry = getEntry(systemCatalogTableName, globalViewGet);
  WAL.Entry tenantEntry = getEntry(systemCatalogTableName, tenantViewGet);

  // verify that the tenant view WAL.Entry passes the filter and the non-tenant view does not
  SystemCatalogWALEntryFilter entryFilter = new SystemCatalogWALEntryFilter();
  Assert.assertNull(entryFilter.filter(nonTenantEntry));
  WAL.Entry filteredTenantEntry = entryFilter.filter(tenantEntry);
  Assert.assertNotNull("Tenant view was filtered when it shouldn't be!", filteredTenantEntry);
  Assert.assertEquals(tenantEntry.getEdit().size(),
      entryFilter.filter(tenantEntry).getEdit().size());

  // now check that a WAL.Entry with cells from both a tenant and a non-tenant
  // catalog row only allow the tenant cells through
  WALEdit mixedEdit = new WALEdit();
  mixedEdit.getCells().addAll(nonTenantEntry.getEdit().getCells());
  mixedEdit.getCells().addAll(tenantEntry.getEdit().getCells());
  WAL.Entry mixedEntry = new WAL.Entry(walKey, mixedEdit);

  Assert.assertEquals(tenantEntry.getEdit().size() + nonTenantEntry.getEdit().size(),
      mixedEntry.getEdit().size());
  Assert.assertEquals(tenantEntry.getEdit().size(),
      entryFilter.filter(mixedEntry).getEdit().size());
}
/**
 * Adds one entry's contribution to the batch: its heap size, its distinct row key count and
 * its HFile count.
 *
 * @param batch batch whose counters are incremented
 * @param entry entry whose edit is counted
 * @param entrySize value added to the batch heap size
 */
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
  WALEdit walEdit = entry.getEdit();
  batch.incrementHeapSize(entrySize);
  Pair<Integer, Integer> rowsAndHFiles = countDistinctRowKeysAndHFiles(walEdit);
  batch.incrementNbRowKeys(rowsAndHFiles.getFirst());
  batch.incrementNbHFiles(rowsAndHFiles.getSecond());
}
/**
 * Count the number of different row keys in the given edit because of mini-batching. We assume
 * that there's at least one Cell in the WALEdit.
 *
 * <p>A row key is counted each time a cell's row differs from the row of the cell before it.
 * HFiles are counted from the store file lists of every bulk load marker cell; a marker whose
 * descriptor cannot be deserialized is logged (with its cause) and contributes nothing.
 *
 * @param edit edit to count row keys from
 * @return number of different row keys and HFiles
 */
private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
  List<Cell> cells = edit.getCells();
  int distinctRowKeys = 1;
  int totalHFileEntries = 0;
  Cell lastCell = cells.get(0);

  int totalCells = edit.size();
  for (int i = 0; i < totalCells; i++) {
    Cell cell = cells.get(i);
    // Count HFiles to be replicated
    if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
      try {
        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
        List<StoreDescriptor> stores = bld.getStoresList();
        int totalStores = stores.size();
        for (int j = 0; j < totalStores; j++) {
          totalHFileEntries += stores.get(j).getStoreFileList().size();
        }
      } catch (IOException e) {
        // Keep the cause in the log so the failure can be diagnosed.
        LOG.error("Failed to deserialize bulk load entry from wal edit. "
            + "Then its hfiles count will not be added into metric.", e);
      }
    }

    if (!CellUtil.matchingRows(cell, lastCell)) {
      distinctRowKeys++;
    }
    lastCell = cell;
  }

  return new Pair<>(distinctRowKeys, totalHFileEntries);
}
/**
 * Injects faults keyed on the table name of the entry about to be written.
 *
 * <p>A table named "sleep" (case-insensitive) delays the write by one second; a table named
 * "DamagedWALException" (case-insensitive) makes the write fail. If the delay is interrupted,
 * the interrupt status is restored so the caller can still observe it.
 *
 * @throws IOException a {@code DamagedWALException} for the "DamagedWALException" table
 */
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit)
    throws IOException {
  String tableName = logKey.getTableName().getNameAsString();
  if (tableName.equalsIgnoreCase("sleep")) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // Thread.sleep clears the flag when it throws; put it back instead of swallowing it.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    }
  }
  if (tableName.equalsIgnoreCase("DamagedWALException")) {
    throw new DamagedWALException("Failed appending");
  }
}
/**
 * Utility method that clears the replication scope of a log key when it is not needed.
 *
 * <p>Nothing is changed when bulk load replication is enabled in {@code conf}. Otherwise the
 * scope is cleared if the edit is a replay or if every one of its cells belongs to the meta edit
 * family.
 *
 * @param logKey Key whose replication scope may be cleared; must be a {@code WALKeyImpl} when
 *   clearing applies
 * @param logEdit Edits inspected to decide whether the scope is needed
 * @param conf Configuration consulted for the bulk load replication setting
 */
@VisibleForTesting
static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) {
  // For bulk load replication we need meta family to know the file we want to replicate.
  if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
    return;
  }
  // For replay, or if all the cells are markers, do not need to store replication scope.
  boolean scopeNotNeeded = logEdit.isReplay()
      || logEdit.getCells().stream().allMatch(WALEdit::isMetaEditFamily);
  if (scopeNotNeeded) {
    ((WALKeyImpl) logKey).clearReplicationScope();
  }
}
/**
 * Collects the index updates carried by the {@code IndexedKeyValue} cells of a WAL edit.
 *
 * @param edit edit whose cells are searched; cells of any other type are ignored
 * @return one (mutation, index table) pair per {@code IndexedKeyValue}, in cell order
 */
private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
  // Presize to the number of cells, capped at 64, to limit internal array resizing.
  int capacity = Math.min(edit.size(), 64);
  Collection<Pair<Mutation, byte[]>> updates = new ArrayList<>(capacity);
  for (Cell cell : edit.getCells()) {
    if (!(cell instanceof IndexedKeyValue)) {
      continue;
    }
    IndexedKeyValue indexed = (IndexedKeyValue) cell;
    updates.add(new Pair<Mutation, byte[]>(indexed.getMutation(), indexed.getIndexTable()));
  }
  return updates;
}
/**
 * Replays one WAL entry into the region as a single WAL-skipping put.
 *
 * @param region region that receives the replayed put
 * @param entry entry to replay; its first cell decides whether it is a meta edit
 * @return 0 for meta-family edits (nothing is replayed), otherwise the put's row parsed as an
 *         integer
 * @throws IOException propagated from building the put or from {@code batchReplay}
 */
static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
  Cell firstCell = entry.getEdit().getCells().get(0);
  if (WALEdit.isMetaEditFamily(firstCell)) {
    return 0; // handled elsewhere
  }
  Put put = new Put(CellUtil.cloneRow(firstCell));
  for (Cell cell : entry.getEdit().getCells()) {
    put.add(cell);
  }
  put.setDurability(Durability.SKIP_WAL);
  MutationReplay[] batch = { new MutationReplay(MutationType.PUT, put, 0, 0) };
  region.batchReplay(batch, entry.getKey().getSequenceId());
  String rowText = Bytes.toString(put.getRow());
  return Integer.parseInt(rowText);
}
/**
 * Requires puts on user tables to carry a TTL.
 *
 * <p>Puts on the meta region or on system tables are not checked. For every other region a put
 * whose TTL is still {@code Long.MAX_VALUE} is rejected.
 *
 * @throws IOException if a checked put has no TTL set
 */
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
    Durability durability) throws IOException {
  Region region = e.getEnvironment().getRegion();
  boolean exempt = region.getRegionInfo().isMetaRegion()
      || region.getRegionInfo().getTable().isSystemTable();
  if (exempt) {
    return;
  }
  // The put carries the TTL attribute unless it still has the Long.MAX_VALUE default
  if (put.getTTL() == Long.MAX_VALUE) {
    throw new IOException("Operation does not have TTL set");
  }
}