下面列出了怎么用org.apache.hadoop.hbase.CellBuilderFactory的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Generates and returns a Put containing the region into for the catalog table
*/
public static Put makePutFromRegionInfo(RegionInfo region, long ts) throws IOException {
Put put = new Put(region.getRegionName(), ts);
//copied from MetaTableAccessor
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(put.getRow())
.setFamily(HConstants.CATALOG_FAMILY)
.setQualifier(HConstants.REGIONINFO_QUALIFIER)
.setTimestamp(put.getTimestamp())
.setType(Cell.Type.Put)
// Serialize the Default Replica HRI otherwise scan of hbase:meta
// shows an info:regioninfo value with encoded name and region
// name that differs from that of the hbase;meta row.
.setValue(RegionInfo.toByteArray(RegionReplicaUtil.getRegionInfoForDefaultReplica(region)))
.build());
return put;
}
/**
* Generates and returns a Put containing the region info for the catalog table and the servers
* @return Put object
*/
private static Put makePutFromRegionInfo(RegionInfo regionInfo, List<ServerName> favoredNodeList)
throws IOException {
Put put = null;
if (favoredNodeList != null) {
long time = EnvironmentEdgeManager.currentTime();
put = MetaTableAccessor.makePutFromRegionInfo(regionInfo, time);
byte[] favoredNodes = getFavoredNodes(favoredNodeList);
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(put.getRow())
.setFamily(HConstants.CATALOG_FAMILY)
.setQualifier(FAVOREDNODES_QUALIFIER)
.setTimestamp(time)
.setType(Type.Put)
.setValue(favoredNodes)
.build());
LOG.debug("Create the region {} with favored nodes {}", regionInfo.getRegionNameAsString(),
favoredNodeList);
}
return put;
}
protected void addSystemLabel(Region region, Map<String, Integer> labels,
Map<String, List<Integer>> userAuths) throws IOException {
if (!labels.containsKey(SYSTEM_LABEL)) {
byte[] row = Bytes.toBytes(SYSTEM_LABEL_ORDINAL);
Put p = new Put(row);
p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(row)
.setFamily(LABELS_TABLE_FAMILY)
.setQualifier(LABEL_QUALIFIER)
.setTimestamp(p.getTimestamp())
.setType(Type.Put)
.setValue(Bytes.toBytes(SYSTEM_LABEL))
.build());
region.put(p);
labels.put(SYSTEM_LABEL, SYSTEM_LABEL_ORDINAL);
}
}
@Test
public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
long seqId = 100;
long timestamp = System.currentTimeMillis();
Cell cell0 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
.setQualifier(qf1).setTimestamp(timestamp).setType(Cell.Type.Put)
.setValue(qf1).build();
PrivateCellUtil.setSequenceId(cell0, seqId);
testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.emptyList());
Cell cell1 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
.setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
.setValue(qf1).build();
PrivateCellUtil.setSequenceId(cell1, seqId);
testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
seqId = 101;
timestamp = System.currentTimeMillis();
Cell cell2 = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row2).setFamily(family)
.setQualifier(qf2).setTimestamp(timestamp).setType(Cell.Type.Put)
.setValue(qf1).build();
PrivateCellUtil.setSequenceId(cell2, seqId);
testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
}
private Cell checkAndResetTimestamp(Cell sourceCell){
if (ignoreTimestamp) {
sourceCell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setType(sourceCell.getType())
.setRow(sourceCell.getRowArray(),
sourceCell.getRowOffset(), sourceCell.getRowLength())
.setFamily(sourceCell.getFamilyArray(),
sourceCell.getFamilyOffset(), sourceCell.getFamilyLength())
.setQualifier(sourceCell.getQualifierArray(),
sourceCell.getQualifierOffset(), sourceCell.getQualifierLength())
.setTimestamp(System.currentTimeMillis())
.setValue(sourceCell.getValueArray(),
sourceCell.getValueOffset(), sourceCell.getValueLength()).build();
}
return sourceCell;
}
@Test
public void testAppendCopyConstructor() throws IOException {
Append origin = new Append(Bytes.toBytes("ROW-01"));
origin.setPriority(100);
byte[] family = Bytes.toBytes("CF-01");
origin.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(origin.getRow())
.setFamily(family)
.setQualifier(Bytes.toBytes("q"))
.setType(Type.Put)
.setValue(Bytes.toBytes(100))
.build());
origin.addColumn(family, Bytes.toBytes("q0"), Bytes.toBytes("value"));
origin.setTimeRange(100, 1000);
Append clone = new Append(origin);
assertEquals(origin, clone);
origin.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("value"));
//They should have different cell lists
assertNotEquals(origin.getCellList(family), clone.getCellList(family));
}
@Test
public void testIncrementCopyConstructor() throws IOException {
Increment origin = new Increment(Bytes.toBytes("ROW-01"));
origin.setPriority(100);
byte[] family = Bytes.toBytes("CF-01");
origin.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(origin.getRow())
.setFamily(family)
.setQualifier(Bytes.toBytes("q"))
.setType(Cell.Type.Put)
.setValue(Bytes.toBytes(100))
.build());
origin.addColumn(family, Bytes.toBytes("q0"), 4);
origin.setTimeRange(100, 1000);
Increment clone = new Increment(origin);
assertEquals(origin, clone);
origin.addColumn(family, Bytes.toBytes("q1"), 3);
//They should have different cell lists
assertNotEquals(origin.getCellList(family), clone.getCellList(family));
}
@Test
public void testDeleteCopyConstructor() throws IOException {
Delete origin = new Delete(Bytes.toBytes("ROW-01"));
origin.setPriority(100);
byte[] family = Bytes.toBytes("CF-01");
origin.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(origin.getRow())
.setFamily(family)
.setQualifier(Bytes.toBytes("q"))
.setType(Type.Delete)
.build());
origin.addColumn(family, Bytes.toBytes("q0"));
origin.addColumns(family, Bytes.toBytes("q1"));
origin.addFamily(family);
origin.addColumns(family, Bytes.toBytes("q2"), 100);
origin.addFamilyVersion(family, 1000);
Delete clone = new Delete(origin);
assertEquals(origin, clone);
origin.addColumn(family, Bytes.toBytes("q3"));
//They should have different cell lists
assertNotEquals(origin.getCellList(family), clone.getCellList(family));
}
@Test
public void testPutCopyConstructor() throws IOException {
Put origin = new Put(Bytes.toBytes("ROW-01"));
origin.setPriority(100);
byte[] family = Bytes.toBytes("CF-01");
origin.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(origin.getRow())
.setFamily(family)
.setQualifier(Bytes.toBytes("q"))
.setType(Cell.Type.Put)
.setValue(Bytes.toBytes("value"))
.build());
origin.addColumn(family, Bytes.toBytes("q0"), Bytes.toBytes("V-01"));
origin.addColumn(family, Bytes.toBytes("q1"), 100, Bytes.toBytes("V-01"));
Put clone = new Put(origin);
assertEquals(origin, clone);
origin.addColumn(family, Bytes.toBytes("q2"), Bytes.toBytes("V-02"));
//They should have different cell lists
assertNotEquals(origin.getCellList(family), clone.getCellList(family));
}
@Override
public Boolean call() throws Exception {
try (Table t = connection.getTable(tableName)) {
byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
Put p = new Put(rk);
p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(rk)
.setFamily(FAMILY)
.setQualifier(QUAL)
.setTimestamp(p.getTimestamp())
.setType(Type.Put)
.setValue(value)
.build());
t.put(p);
}
return true;
}
@Override
public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
throws IOException {
byte[] row = increment.getRow();
Put put = new Put(row);
long ts = getUniqueTimestamp(row);
for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
for (Cell cell : entry.getValue()) {
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
.setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
.setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength())
.setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
.setType(Cell.Type.Put).setTimestamp(ts).build());
}
}
c.getEnvironment().getRegion().put(put);
c.bypass();
return Result.EMPTY_RESULT;
}
private static Cell adaptFirstCellFromMutation(Mutation m) {
if (m != null && m.getFamilyCellMap() != null &&
m.getFamilyCellMap().firstEntry() != null &&
m.getFamilyCellMap().firstEntry().getValue() != null
&& m.getFamilyCellMap().firstEntry().getValue().get(0) != null) {
//have to replace the column family with WALEdit.METAFAMILY to make sure
//that IndexedKeyValues don't get replicated. The superclass KeyValue fields
//like row, qualifier and value are placeholders to prevent NPEs
// when using the KeyValue APIs. See PHOENIX-5188 / 5455
Cell mutationCell = m.getFamilyCellMap().firstEntry().getValue().get(0);
CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
return builder.setFamily(WALEdit.METAFAMILY).
setQualifier(mutationCell.getQualifierArray()).
setRow(m.getRow()).
setTimestamp(mutationCell.getTimestamp()).
setValue(mutationCell.getValueArray()).setType(Cell.Type.Put).build();
} else {
throw new IllegalArgumentException("Tried to create an IndexedKeyValue with a " +
"Mutation with no Cells!");
}
}
public static Put addLocation(Put p, ServerName sn, long openSeqNum, int replicaId)
throws IOException {
CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
return p.add(builder.clear()
.setRow(p.getRow())
.setFamily(CATALOG_FAMILY)
.setQualifier(getServerColumn(replicaId))
.setTimestamp(p.getTimestamp())
.setType(Cell.Type.Put)
.setValue(Bytes.toBytes(sn.getAddress().toString()))
.build())
.add(builder.clear()
.setRow(p.getRow())
.setFamily(CATALOG_FAMILY)
.setQualifier(getStartCodeColumn(replicaId))
.setTimestamp(p.getTimestamp())
.setType(Cell.Type.Put)
.setValue(Bytes.toBytes(sn.getStartcode()))
.build())
.add(builder.clear()
.setRow(p.getRow())
.setFamily(CATALOG_FAMILY)
.setQualifier(getSeqNumColumn(replicaId))
.setTimestamp(p.getTimestamp())
.setType(Cell.Type.Put)
.setValue(Bytes.toBytes(openSeqNum))
.build());
}
private static void addRegionStateToPut(Put put, RegionState.State state) throws IOException {
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(put.getRow())
.setFamily(HConstants.CATALOG_FAMILY)
.setQualifier(HConstants.STATE_QUALIFIER)
.setTimestamp(put.getTimestamp())
.setType(Cell.Type.Put)
.setValue(Bytes.toBytes(state.name()))
.build());
}
private Cell createCellForRegionInfo(RegionInfo info){
byte[] regionInfoValue = ProtobufUtil.prependPBMagic(ProtobufUtil.toRegionInfo(info)
.toByteArray());
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(info.getRegionName())
.setFamily(Bytes.toBytes("info"))
.setQualifier(Bytes.toBytes("regioninfo"))
.setType(Cell.Type.Put)
.setValue(regionInfoValue)
.build();
return cell;
}
private Cell createCellForTableState(TableName tableName){
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(tableName.getName())
.setFamily(Bytes.toBytes("table"))
.setQualifier(Bytes.toBytes("state"))
.setType(Cell.Type.Put)
.setValue(HBaseProtos.TableState.newBuilder()
.setState(TableState.State.ENABLED.convert()).build().toByteArray())
.build();
return cell;
}
private List<Cell> makeCells(byte [] row, int columns, int versions) {
List<Cell> cells = new ArrayList<Cell>(columns);
for (int j = 0; j < columns; j++) {
for (int k = versions; k > 0; k--) {
Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).
setRow(row).setFamily(CF).
setQualifier(Bytes.toBytes(j)).
setType(Cell.Type.Put).
setTimestamp(k).
setValue(row).build();
cells.add(cell);
}
}
return cells;
}
private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value)
throws IOException {
Cell c = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(row).setFamily(family)
.setQualifier(qualifier).setTimestamp(ts).setType(Cell.Type.Put)
.setValue(value).build();
PrivateCellUtil.setSequenceId(c, sequenceId);
return c;
}
@Test
public void test() throws IOException, InterruptedException {
try (Table table = UTIL.getConnection().getTable(NAME)) {
for (int i = 0; i < 100; i++) {
byte[] row = Bytes.toBytes(i);
table.put(new Put(row, true)
.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(row)
.setFamily(CF)
.setQualifier(QUALIFIER)
.setTimestamp(HConstants.LATEST_TIMESTAMP)
.setType(Type.Put)
.setValue(Bytes.toBytes(i))
.build()));
}
}
Tracker tracker = new Tracker();
TRACKER = tracker;
region.requestFlush(tracker);
tracker.await();
assertNull(tracker.reason);
assertTrue(tracker.beforeExecutionCalled);
assertTrue(tracker.afterExecutionCalled);
// request flush on a region with empty memstore should still success
tracker = new Tracker();
TRACKER = tracker;
region.requestFlush(tracker);
tracker.await();
assertNull(tracker.reason);
assertTrue(tracker.beforeExecutionCalled);
assertTrue(tracker.afterExecutionCalled);
}
@Test
public void testNotExecuted() throws IOException, InterruptedException {
try (Table table = UTIL.getConnection().getTable(NAME)) {
for (int i = 0; i < 100; i++) {
byte[] row = Bytes.toBytes(i);
table.put(new Put(row, true)
.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(row)
.setFamily(CF)
.setQualifier(QUALIFIER)
.setTimestamp(HConstants.LATEST_TIMESTAMP)
.setType(Type.Put)
.setValue(Bytes.toBytes(i))
.build()));
}
}
// here we may have overlap when calling the CP hooks so we do not assert on TRACKER
Tracker tracker1 = new Tracker();
ARRIVE = new CountDownLatch(1);
BLOCK = new CountDownLatch(1);
region.requestFlush(tracker1);
ARRIVE.await();
Tracker tracker2 = new Tracker();
region.requestFlush(tracker2);
tracker2.await();
assertNotNull(tracker2.reason);
assertFalse(tracker2.beforeExecutionCalled);
assertFalse(tracker2.afterExecutionCalled);
BLOCK.countDown();
tracker1.await();
assertNull(tracker1.reason);
assertTrue(tracker1.beforeExecutionCalled);
assertTrue(tracker1.afterExecutionCalled);
}
private String createHFileForFamilies(byte[] row, byte[] value,
Configuration clusterConfig) throws IOException {
CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
cellBuilder.setRow(row)
.setFamily(TestReplicationBase.famName)
.setQualifier(Bytes.toBytes("1"))
.setValue(value)
.setType(Cell.Type.Put);
HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
// TODO We need a way to do this without creating files
File hFileLocation = testFolder.newFile();
FSDataOutputStream out =
new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
try {
hFileFactory.withOutputStream(out);
hFileFactory.withFileContext(new HFileContextBuilder().build());
HFile.Writer writer = hFileFactory.create();
try {
writer.append(new KeyValue(cellBuilder.build()));
} finally {
writer.close();
}
} finally {
out.close();
}
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}
@Override
public Boolean call() throws Exception {
// Table implements Closable so we use the try with resource structure here.
// https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
try (Table t = connection.getTable(tableName)) {
byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
int rows = 30;
// Array to put the batch
ArrayList<Put> puts = new ArrayList<>(rows);
for (int i = 0; i < 30; i++) {
byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
Put p = new Put(rk);
p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(rk)
.setFamily(FAMILY)
.setQualifier(QUAL)
.setTimestamp(p.getTimestamp())
.setType(Cell.Type.Put)
.setValue(value)
.build());
puts.add(p);
}
// now that we've assembled the batch it's time to push it to hbase.
t.put(puts);
}
return true;
}
@Override
public void start(
@SuppressWarnings("rawtypes") CoprocessorEnvironment env) throws IOException {
RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
sourceValue = Bytes.toBytes(renv.getConfiguration().get(ORIGINAL_VALUE_KEY));
replacedValue = Bytes.toBytes(renv.getConfiguration().get(REPLACED_VALUE_KEY));
comparator = new Bytes.ByteArrayComparator();
cellBuilder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
}
public static Cell newKeyValue(byte[] key, int keyOffset, int keyLength, byte[] cf,
int cfOffset, int cfLength, byte[] cq, int cqOffset, int cqLength, long ts, byte[] value,
int valueOffset, int valueLength,Type type) {
return CellBuilderFactory.create(CellBuilderType.DEEP_COPY)
.setRow(key, keyOffset, keyLength).setFamily(cf, cfOffset, cfLength)
.setQualifier(cq, cqOffset, cqLength).setTimestamp(ts)
.setValue(value, valueOffset, valueLength).setType(type).build();
}
/**
* Stores a new user permission grant in the access control lists table.
* @param conf the configuration
* @param userPerm the details of the permission to be granted
* @param t acl table instance. It is closed upon method return.
* @throws IOException in the case of an error accessing the metadata table
*/
public static void addUserPermission(Configuration conf, UserPermission userPerm, Table t,
boolean mergeExistingPermissions) throws IOException {
Permission permission = userPerm.getPermission();
Permission.Action[] actions = permission.getActions();
byte[] rowKey = userPermissionRowKey(permission);
Put p = new Put(rowKey);
byte[] key = userPermissionKey(userPerm);
if ((actions == null) || (actions.length == 0)) {
String msg = "No actions associated with user '" + userPerm.getUser() + "'";
LOG.warn(msg);
throw new IOException(msg);
}
Set<Permission.Action> actionSet = new TreeSet<Permission.Action>();
if(mergeExistingPermissions){
List<UserPermission> perms = getUserPermissions(conf, rowKey, null, null, null, false);
UserPermission currentPerm = null;
for (UserPermission perm : perms) {
if (userPerm.equalsExceptActions(perm)) {
currentPerm = perm;
break;
}
}
if (currentPerm != null && currentPerm.getPermission().getActions() != null){
actionSet.addAll(Arrays.asList(currentPerm.getPermission().getActions()));
}
}
// merge current action with new action.
actionSet.addAll(Arrays.asList(actions));
// serialize to byte array.
byte[] value = new byte[actionSet.size()];
int index = 0;
for (Permission.Action action : actionSet) {
value[index++] = action.code();
}
p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(p.getRow())
.setFamily(ACL_LIST_FAMILY)
.setQualifier(key)
.setTimestamp(p.getTimestamp())
.setType(Type.Put)
.setValue(value)
.build());
if (LOG.isDebugEnabled()) {
LOG.debug("Writing permission with rowKey " + Bytes.toString(rowKey) + " "
+ Bytes.toString(key) + ": " + Bytes.toStringBinary(value));
}
try {
t.put(p);
} finally {
t.close();
}
}
private void updateUserRegionLocation(RegionInfo regionInfo, State state,
ServerName regionLocation, long openSeqNum,
long pid) throws IOException {
long time = EnvironmentEdgeManager.currentTime();
final int replicaId = regionInfo.getReplicaId();
final Put put = new Put(CatalogFamilyFormat.getMetaKeyForRegion(regionInfo), time);
MetaTableAccessor.addRegionInfo(put, regionInfo);
final StringBuilder info =
new StringBuilder("pid=").append(pid).append(" updating hbase:meta row=")
.append(regionInfo.getEncodedName()).append(", regionState=").append(state);
if (openSeqNum >= 0) {
Preconditions.checkArgument(state == State.OPEN && regionLocation != null,
"Open region should be on a server");
MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId);
// only update replication barrier for default replica
if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID &&
hasGlobalReplicationScope(regionInfo.getTable())) {
MetaTableAccessor.addReplicationBarrier(put, openSeqNum);
info.append(", repBarrier=").append(openSeqNum);
}
info.append(", openSeqNum=").append(openSeqNum);
info.append(", regionLocation=").append(regionLocation);
} else if (regionLocation != null) {
// Ideally, if no regionLocation, write null to the hbase:meta but this will confuse clients
// currently; they want a server to hit. TODO: Make clients wait if no location.
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(put.getRow())
.setFamily(HConstants.CATALOG_FAMILY)
.setQualifier(CatalogFamilyFormat.getServerNameColumn(replicaId))
.setTimestamp(put.getTimestamp())
.setType(Cell.Type.Put)
.setValue(Bytes.toBytes(regionLocation.getServerName()))
.build());
info.append(", regionLocation=").append(regionLocation);
}
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(put.getRow())
.setFamily(HConstants.CATALOG_FAMILY)
.setQualifier(getStateColumn(replicaId))
.setTimestamp(put.getTimestamp())
.setType(Cell.Type.Put)
.setValue(Bytes.toBytes(state.name()))
.build());
LOG.info(info.toString());
updateRegionLocation(regionInfo, state, put);
}
/**
* Test filter. Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many
* items we filter out and we count how many cells make it through for distribution way down below
* in the Table#batch implementation. Puts in place a custom DevNullConnection so we can insert
* our counting Table.
* @throws IOException
*/
@Test
public void testWALEntryFilter() throws IOException {
Configuration conf = HBaseConfiguration.create();
// Make it so our filter is instantiated on construction of ReplicationSink.
conf.setClass(DummyConnectionRegistry.REGISTRY_IMPL_CONF_KEY, DevNullConnectionRegistry.class,
DummyConnectionRegistry.class);
conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL,
DevNullAsyncClusterConnection.class, AsyncClusterConnection.class);
ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
// Create some dumb walentries.
List<AdminProtos.WALEntry> entries = new ArrayList<>();
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
// Need a tablename.
ByteString tableName =
ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString());
// Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos
// describing the edit with all Cells from all edits aggregated in a single CellScanner.
final List<Cell> cells = new ArrayList<>();
int count = BOUNDARY * 2;
for (int i = 0; i < count; i++) {
byte[] bytes = Bytes.toBytes(i);
// Create a wal entry. Everything is set to the current index as bytes or int/long.
entryBuilder.clear();
entryBuilder.setKey(entryBuilder.getKeyBuilder().setLogSequenceNumber(i)
.setEncodedRegionName(ByteString.copyFrom(bytes)).setWriteTime(i).setTableName(tableName)
.build());
// Lets have one Cell associated with each WALEdit.
entryBuilder.setAssociatedCellCount(1);
entries.add(entryBuilder.build());
// We need to add a Cell per WALEdit to the cells array.
CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
// Make cells whose row, family, cell, value, and ts are == 'i'.
Cell cell = cellBuilder.setRow(bytes).setFamily(bytes).setQualifier(bytes)
.setType(Cell.Type.Put).setTimestamp(i).setValue(bytes).build();
cells.add(cell);
}
// Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has
// all Cells from all the WALEntries made above.
CellScanner cellScanner = new CellScanner() {
// Set to -1 because advance gets called before current.
int index = -1;
@Override
public Cell current() {
return cells.get(index);
}
@Override
public boolean advance() throws IOException {
index++;
return index < cells.size();
}
};
// Call our sink.
sink.replicateEntries(entries, cellScanner, null, null, null);
// Check what made it through and what was filtered.
assertTrue(FILTERED.get() > 0);
assertTrue(UNFILTERED.get() > 0);
assertEquals(count, FILTERED.get() + UNFILTERED.get());
}
private Cell createCell(RegionInfo region) {
return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey())
.setType(Type.Put).build();
}
private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
throws Exception {
// tests having edits from a disabled or dropped table is handled correctly by skipping those
// entries and further edits after the edits from dropped/disabled table can be replicated
// without problems.
final TableName tableName = TableName.valueOf(
name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
HTableDescriptor htd = HTU.createTableDescriptor(tableName);
int regionReplication = 3;
htd.setRegionReplication(regionReplication);
HTU.deleteTableIfAny(tableName);
HTU.getAdmin().createTable(htd);
TableName toBeDisabledTable = TableName.valueOf(
dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
HTU.deleteTableIfAny(toBeDisabledTable);
htd = HTU.createTableDescriptor(TableName.valueOf(toBeDisabledTable.toString()),
HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
HColumnDescriptor.DEFAULT_KEEP_DELETED);
htd.setRegionReplication(regionReplication);
HTU.getAdmin().createTable(htd);
// both tables are created, now pause replication
HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
// now that the replication is disabled, write to the table to be dropped, then drop the table.
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(tableName);
Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();
Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
.setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
Entry entry = new Entry(
new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
new WALEdit()
.add(cell));
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
if (dropTable) {
HTU.getAdmin().deleteTable(toBeDisabledTable);
} else if (disableReplication) {
htd.setRegionReplication(regionReplication - 2);
HTU.getAdmin().modifyTable(htd);
HTU.getAdmin().enableTable(toBeDisabledTable);
}
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
MetricsSource metrics = mock(MetricsSource.class);
ReplicationEndpoint.Context ctx =
new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
HTU.getTestFileSystem(), ServerRegionReplicaUtil.getReplicationPeerId(),
UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
.getPeer(ServerRegionReplicaUtil.getReplicationPeerId()),
metrics, rs.getTableDescriptors(), rs);
RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
rrpe.init(ctx);
rrpe.start();
ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
repCtx.setEntries(Lists.newArrayList(entry, entry));
assertTrue(rrpe.replicate(repCtx));
verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
rrpe.stop();
if (disableReplication) {
// enable replication again so that we can verify replication
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
htd.setRegionReplication(regionReplication);
HTU.getAdmin().modifyTable(htd);
HTU.getAdmin().enableTable(toBeDisabledTable);
}
try {
// load some data to the to-be-dropped table
// load the data to the table
HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
// now enable the replication
HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
verifyReplication(tableName, regionReplication, 0, 1000);
} finally {
table.close();
rl.close();
tableToBeDisabled.close();
HTU.deleteTableIfAny(toBeDisabledTable);
connection.close();
}
}
@Test
public void testBlockMultiLimits() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
TEST_UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(FAMILY).setDataBlockEncoding(DataBlockEncoding.FAST_DIFF).build()).build());
Table t = TEST_UTIL.getConnection().getTable(tableName);
final HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
RpcServerInterface rpcServer = regionServer.getRpcServer();
BaseSource s = rpcServer.getMetrics().getMetricsSource();
long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s);
long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s);
byte[] row = Bytes.toBytes("TEST");
byte[][] cols = new byte[][]{
Bytes.toBytes("0"), // Get this
Bytes.toBytes("1"), // Buffer
Bytes.toBytes("2"), // Buffer
Bytes.toBytes("3"), // Get This
Bytes.toBytes("4"), // Buffer
Bytes.toBytes("5"), // Buffer
};
// Set the value size so that one result will be less than the MAX_SIE
// however the block being reference will be larger than MAX_SIZE.
// This should cause the regionserver to try and send a result immediately.
byte[] value = new byte[MAX_SIZE - 100];
ThreadLocalRandom.current().nextBytes(value);
for (byte[] col:cols) {
Put p = new Put(row);
p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(row)
.setFamily(FAMILY)
.setQualifier(col)
.setTimestamp(p.getTimestamp())
.setType(Cell.Type.Put)
.setValue(value)
.build());
t.put(p);
}
// Make sure that a flush happens
try (final Admin admin = TEST_UTIL.getAdmin()) {
admin.flush(tableName);
TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return regionServer.getRegions(tableName).get(0).getMaxFlushedSeqId() > 3;
}
});
}
List<Get> gets = new ArrayList<>(2);
Get g0 = new Get(row);
g0.addColumn(FAMILY, cols[0]);
gets.add(g0);
Get g2 = new Get(row);
g2.addColumn(FAMILY, cols[3]);
gets.add(g2);
Result[] results = t.get(gets);
assertEquals(2, results.length);
METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions, s);
METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
startingMultiExceptions, s);
}