下面列出了怎么用org.apache.hadoop.hbase.client.Durability的API类实例代码及写法,或者点击链接到github查看源代码。
private boolean setDurabilityForTable(String tableName, TableDescriptorBuilder tableDescriptorBuilder, TableDescriptor tableDescriptor) {
String tableDurability = metricsConf.get("timeline.metrics." + tableName + ".durability", "");
if (StringUtils.isEmpty(tableDurability) || tableDescriptor.getDurability().toString().equals(tableDurability)) {
return false;
}
if (StringUtils.isNotEmpty(tableDurability)) {
LOG.info("Setting WAL option " + tableDurability + " for table : " + tableName);
boolean validDurability = true;
if ("SKIP_WAL".equals(tableDurability)) {
tableDescriptorBuilder.setDurability(Durability.SKIP_WAL);
} else if ("SYNC_WAL".equals(tableDurability)) {
tableDescriptorBuilder.setDurability(Durability.SYNC_WAL);
} else if ("ASYNC_WAL".equals(tableDurability)) {
tableDescriptorBuilder.setDurability(Durability.ASYNC_WAL);
} else if ("FSYNC_WAL".equals(tableDurability)) {
tableDescriptorBuilder.setDurability(Durability.FSYNC_WAL);
} else {
LOG.info("Unknown value for durability : " + tableDurability);
validDurability = false;
}
return validDurability;
}
return false;
}
/**
* Test when returnResults set to false in increment it should not return the result instead it
* resturn null.
*/
@Test
public void testIncrementWithReturnResultsSetToFalse() throws Exception {
byte[] row1 = Bytes.toBytes("row1");
byte[] col1 = Bytes.toBytes("col1");
// Setting up region
WALFactory wals = new WALFactory(CONF,
ServerName
.valueOf("testIncrementWithReturnResultsSetToFalse", 16010, System.currentTimeMillis())
.toString());
HRegion region = createHRegion(wals, Durability.USE_DEFAULT);
Increment inc1 = new Increment(row1);
inc1.setReturnResults(false);
inc1.addColumn(FAMILY, col1, 1);
Result res = region.increment(inc1);
assertTrue(res.isEmpty());
}
@Test
public void testPb() throws DeserializationException, IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
final int v = 123;
htd.setMaxFileSize(v);
htd.setDurability(Durability.ASYNC_WAL);
htd.setReadOnly(true);
htd.setRegionReplication(2);
byte [] bytes = htd.toByteArray();
HTableDescriptor deserializedHtd = HTableDescriptor.parseFrom(bytes);
assertEquals(htd, deserializedHtd);
assertEquals(v, deserializedHtd.getMaxFileSize());
assertTrue(deserializedHtd.isReadOnly());
assertEquals(Durability.ASYNC_WAL, deserializedHtd.getDurability());
assertEquals(2, deserializedHtd.getRegionReplication());
}
public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
byte[] stopKey, boolean isReadOnly, Durability durability, WAL wal, boolean[] compactedMemStore,
byte[]... families) throws IOException {
TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
tableDescriptor.setReadOnly(isReadOnly);
int i = 0;
for (byte[] family : families) {
ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family);
if (compactedMemStore != null && i < compactedMemStore.length) {
familyDescriptor.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
} else {
familyDescriptor.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
}
i++;
// Set default to be three versions.
familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
tableDescriptor.setColumnFamily(familyDescriptor);
}
tableDescriptor.setDurability(durability);
RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
.setStartKey(startKey).setEndKey(stopKey).build();
return createLocalHRegion(info, tableDescriptor, wal);
}
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put m, WALEdit edit,
Durability durability) throws IOException {
byte[] attribute = m.getAttribute(NON_VISIBILITY);
byte[] cf = null;
List<Cell> updatedCells = new ArrayList<>();
if (attribute != null) {
for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
for (Cell cell : edits) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (cf == null) {
cf = CellUtil.cloneFamily(kv);
}
Tag tag = new ArrayBackedTag((byte) NON_VIS_TAG_TYPE, attribute);
List<Tag> tagList = new ArrayList<>(PrivateCellUtil.getTags(cell).size() + 1);
tagList.add(tag);
tagList.addAll(PrivateCellUtil.getTags(cell));
Cell newcell = PrivateCellUtil.createCell(kv, tagList);
((List<Cell>) updatedCells).add(newcell);
}
}
m.getFamilyCellMap().remove(cf);
// Update the family map
m.getFamilyCellMap().put(cf, updatedCells);
}
}
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
String rowkey = Bytes.toString(put.getRow());//得到rowkey
SolrInputDocument doc =new SolrInputDocument();//实例化索引Doc
doc.addField(config.getString("solr_hbase_rowkey_name"),rowkey);//添加主键
for(String cf:config.getString("hbase_column_family").split(",")) {//遍历所有的列簇
List<Cell> cells = put.getFamilyCellMap().get(Bytes.toBytes(cf));
if(cells==null||cells.isEmpty()) continue; // 跳过取值为空或null的数据
for (Cell kv : cells ) {
String name=Bytes.toString(CellUtil.cloneQualifier(kv));//获取列名
String value=Bytes.toString(kv.getValueArray());//获取列值 or CellUtil.cloneValue(kv)
doc.addField(name,value);//添加到索引doc里面
}
}
//发送数据到本地缓存
SolrIndexTools.addDoc(doc);
}
/**
* Creates a pre-split table for load testing. If the table already exists,
* logs a warning and continues.
* @return the number of regions the table was split into
*/
public static int createPreSplitLoadTestTable(Configuration conf,
TableName tableName, byte[][] columnFamilies, Algorithm compression,
DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
Durability durability)
throws IOException {
TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
tableDescriptor.setDurability(durability);
tableDescriptor.setRegionReplication(regionReplication);
ColumnFamilyDescriptor[] hcds = new ColumnFamilyDescriptor[columnFamilies.length];
for (int i = 0; i < columnFamilies.length; i++) {
ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(columnFamilies[i]);
familyDescriptor.setDataBlockEncoding(dataBlockEncoding);
familyDescriptor.setCompressionType(compression);
hcds[i] = familyDescriptor;
}
return createPreSplitLoadTestTable(conf, tableDescriptor, hcds, numRegionsPerServer);
}
/**
* Add a Mutation such as a Put or Increment to the batch. The Mutation is only queued for
* later execution.
*
* @param rowKey The row key of the Mutation.
* @param cols The columns affected by the Mutation.
* @param durability The durability of the mutation.
*/
public void addMutation(byte[] rowKey, ColumnList cols, Durability durability) {
if (cols.hasColumns()) {
Put put = createPut(rowKey, cols, durability);
mutations.add(put);
}
if (cols.hasCounters()) {
Increment inc = createIncrement(rowKey, cols, durability);
mutations.add(inc);
}
if (mutations.isEmpty()) {
mutations.add(new Put(rowKey));
}
}
@Override
public void map(ImmutableBytesWritable key, Result result,
Context context)
throws IOException {
List<Long> tsList = new ArrayList<>();
for (Cell kv : result.listCells()) {
tsList.add(kv.getTimestamp());
}
List<Put> puts = new ArrayList<>();
for (Long ts : tsList) {
Put put = new Put(key.get());
put.setDurability(Durability.SKIP_WAL);
put.addColumn(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true));
puts.add(put);
}
table.put(puts);
}
@Test
public void testTimeRangeMapRed()
throws IOException, InterruptedException, ClassNotFoundException {
final TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
new TableDescriptorBuilder.ModifyableTableDescriptor(TABLE_NAME);
final ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(FAMILY_NAME);
familyDescriptor.setMaxVersions(Integer.MAX_VALUE);
tableDescriptor.setColumnFamily(familyDescriptor);
admin.createTable(tableDescriptor);
List<Put> puts = new ArrayList<>();
for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
Put put = new Put(KEY);
put.setDurability(Durability.SKIP_WAL);
put.addColumn(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
puts.add(put);
}
Table table = UTIL.getConnection().getTable(tableDescriptor.getTableName());
table.put(puts);
runTestOnTable();
verify(table);
table.close();
}
/**
* Creates a pre-split table for load testing. If the table already exists,
* logs a warning and continues.
* @return the number of regions the table was split into
*/
public static int createPreSplitLoadTestTable(Configuration conf,
TableName tableName, byte[] columnFamily, Algorithm compression,
DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
Durability durability)
throws IOException {
TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
tableDescriptor.setDurability(durability);
tableDescriptor.setRegionReplication(regionReplication);
ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(columnFamily);
familyDescriptor.setDataBlockEncoding(dataBlockEncoding);
familyDescriptor.setCompressionType(compression);
return createPreSplitLoadTestTable(conf, tableDescriptor, familyDescriptor,
numRegionsPerServer);
}
@Override
public void execute(Tuple tuple) {
byte[] rowKey = this.mapper.rowKey(tuple);
ColumnList cols = this.mapper.columns(tuple);
List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
try {
this.hBaseClient.batchMutate(mutations);
} catch(Exception e){
LOG.warn("Failing tuple. Error writing rowKey " + rowKey, e);
this.collector.fail(tuple);
return;
}
this.collector.ack(tuple);
}
private static Put getPut(List<HbaseColumnCell> hbaseColumnCells, Record record, boolean writeToWAL) {
byte[] cf;
byte[] qualifier;
HbaseColumnCell cell;
Put put = new Put(getRowKey(hbaseColumnCells, record));
if (!writeToWAL) {
put.setDurability(Durability.SKIP_WAL);
}
int size = hbaseColumnCells.size();
for (int i = 0; i < size;) {
cell = hbaseColumnCells.get(i);
if (HbaseUtil.isRowkeyColumn(cell.getColumnName())) {
i++;
continue;
} else {
cf = cell.getCf();
qualifier = cell.getQualifier();
if (cell.isConstant()) {
put.add(cf, qualifier, cell.getColumnValue().getBytes());
} else {
put.add(cf, qualifier, record.getColumn(i).asBytes());
i++;// 只有非常量的情况才需要++操作
}
}
}
return put;
}
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
Durability durability) throws IOException {
// Translate deletes into our own delete tombstones
// Since HBase deletes cannot be undone, we need to translate deletes into special puts, which allows
// us to rollback the changes (by a real delete) if the transaction fails
// Deletes that are part of a transaction rollback do not need special handling.
// They will never be rolled back, so are performed as normal HBase deletes.
if (isRollbackOperation(delete)) {
return;
}
Transaction tx = getFromOperation(delete);
ensureValidTxLifetime(e.getEnvironment(), delete, tx);
// Other deletes are client-initiated and need to be translated into our own tombstones
// TODO: this should delegate to the DeleteStrategy implementation.
Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
for (byte[] family : delete.getFamilyCellMap().keySet()) {
List<Cell> familyCells = delete.getFamilyCellMap().get(family);
if (isFamilyDelete(familyCells)) {
deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
HConstants.EMPTY_BYTE_ARRAY);
} else {
for (Cell cell : familyCells) {
deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
HConstants.EMPTY_BYTE_ARRAY);
}
}
}
for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
}
e.getEnvironment().getRegion().put(deleteMarkers);
// skip normal delete handling
e.bypass();
}
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
Durability durability) throws IOException {
if (allowNonTransactional) {
return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Put put, final WALEdit edit,
final Durability durability) throws IOException {
if (put.getAttribute(TEST_ATTRIBUTE) == null) {
throw new DoNotRetryIOException("Put should preserve attributes");
}
if (put.getDurability() != Durability.USE_DEFAULT) {
throw new DoNotRetryIOException("Durability is not propagated correctly");
}
}
private void loadTable(Table table, int numRows) throws IOException {
for (int i = 0; i < numRows; ++i) {
byte[] row = Bytes.toBytes(i);
Put put = new Put(row);
put.setDurability(Durability.SKIP_WAL);
put.addColumn(FAMILY_NAME, null, row);
table.put(put);
}
}
/**
* @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
* when done.
*/
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
throws IOException {
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.USE_DEFAULT,
wal, COLUMN_FAMILY_BYTES);
}
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
throws IOException {
if (allowNonTransactional) {
return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
} else {
throw new UnsupportedOperationException("Operation is not supported transactionally");
}
}
@Override
public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Delete delete, final WALEdit edit,
final Durability durability) throws IOException {
Map<byte[], List<Cell>> familyMap = delete.getFamilyCellMap();
RegionCoprocessorEnvironment e = c.getEnvironment();
assertNotNull(e);
assertNotNull(e.getRegion());
assertNotNull(familyMap);
if (ctBeforeDelete.get() > 0) {
ctPreDeleted.incrementAndGet();
}
}
/**
* Convert a line of TSV text into an HBase table row after transforming the
* values by multiplying them by 3.
*/
@Override
public void map(LongWritable offset, Text value, Context context)
throws IOException {
byte[] family = Bytes.toBytes("FAM");
final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") };
// do some basic line parsing
byte[] lineBytes = value.getBytes();
String[] valueTokens = new String(lineBytes, StandardCharsets.UTF_8).split("\u001b");
// create the rowKey and Put
ImmutableBytesWritable rowKey =
new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0]));
Put put = new Put(rowKey.copyBytes());
put.setDurability(Durability.SKIP_WAL);
//The value should look like this: VALUE1 or VALUE2. Let's multiply
//the integer by 3
for(int i = 1; i < valueTokens.length; i++) {
String prefix = valueTokens[i].substring(0, "VALUE".length());
String suffix = valueTokens[i].substring("VALUE".length());
String newValue = prefix + Integer.parseInt(suffix) * 3;
KeyValue kv = new KeyValue(rowKey.copyBytes(), family,
qualifiers[i-1], Bytes.toBytes(newValue));
put.add(kv);
}
try {
context.write(rowKey, put);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void putDataByReplay(HRegion region,
int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
for (int i = startRow; i < startRow + numRows; i++) {
Put put = new Put(Bytes.toBytes("" + i));
put.setDurability(Durability.SKIP_WAL);
for (byte[] family : families) {
put.addColumn(family, qf, EnvironmentEdgeManager.currentTime(), null);
}
replay(region, put, i+1);
}
}
/**
* @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
* when done.
*/
private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
throws IOException {
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
wal, COLUMN_FAMILY_BYTES);
}
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Put put, final WALEdit edit,
final Durability durability) throws IOException {
if (put.getAttribute(TEST_ATTRIBUTE) == null) {
throw new DoNotRetryIOException("Put should preserve attributes");
}
if (put.getDurability() != Durability.USE_DEFAULT) {
throw new DoNotRetryIOException("Durability is not propagated correctly");
}
}
private void generateRows(int numberOfRows, Table ht, byte[] family, byte[] qf, byte[] value)
throws IOException {
for (int i = 0; i < numberOfRows; i++) {
byte[] row = Bytes.toBytes(i);
Put p = new Put(row);
p.addColumn(family, qf, value);
p.setDurability(Durability.SKIP_WAL);
ht.put(p);
}
TEST_UTIL.flush();
}
@Override
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Put put, final WALEdit edit,
final Durability durability)
throws IOException {
id = System.currentTimeMillis();
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
}
}
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
WALEdit edit, Durability durability) throws IOException {
// Translate deletes into our own delete tombstones
// Since HBase deletes cannot be undone, we need to translate deletes into special puts,
// which allows
// us to rollback the changes (by a real delete) if the transaction fails
// Deletes that are part of a transaction rollback do not need special handling.
// They will never be rolled back, so are performed as normal HBase deletes.
if (isRollbackOperation(delete)) {
return;
}
Transaction tx = getFromOperation(delete);
ensureValidTxLifetime(e.getEnvironment(), delete, tx);
// Other deletes are client-initiated and need to be translated into our own tombstones
// TODO: this should delegate to the DeleteStrategy implementation.
Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
for (byte[] family : delete.getFamilyCellMap().keySet()) {
List<Cell> familyCells = delete.getFamilyCellMap().get(family);
if (isFamilyDelete(familyCells)) {
deleteMarkers.addColumn(family, TxConstants.FAMILY_DELETE_QUALIFIER,
familyCells.get(0).getTimestamp(), HConstants.EMPTY_BYTE_ARRAY);
} else {
for (Cell cell : familyCells) {
deleteMarkers.addColumn(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
HConstants.EMPTY_BYTE_ARRAY);
}
}
}
for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
}
e.getEnvironment().getRegion().put(deleteMarkers);
// skip normal delete handling
e.bypass();
}
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
final WALEdit edit, final Durability durability) throws IOException {
if (put.getAttribute(TEST_ATTRIBUTE) == null) {
throw new DoNotRetryIOException("Put should preserve attributes");
}
if (put.getDurability() != Durability.USE_DEFAULT) {
throw new DoNotRetryIOException("Durability is not propagated correctly");
}
}
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Put put, final WALEdit edit,
final Durability durability) throws IOException {
if (put.getAttribute(TEST_ATTRIBUTE) == null) {
throw new DoNotRetryIOException("Put should preserve attributes");
}
if (put.getDurability() != Durability.USE_DEFAULT) {
throw new DoNotRetryIOException("Durability is not propagated correctly");
}
}
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
Durability durability) throws IOException {
// Translate deletes into our own delete tombstones
// Since HBase deletes cannot be undone, we need to translate deletes into special puts, which allows
// us to rollback the changes (by a real delete) if the transaction fails
// Deletes that are part of a transaction rollback do not need special handling.
// They will never be rolled back, so are performed as normal HBase deletes.
if (isRollbackOperation(delete)) {
return;
}
Transaction tx = getFromOperation(delete);
ensureValidTxLifetime(e.getEnvironment(), delete, tx);
// Other deletes are client-initiated and need to be translated into our own tombstones
// TODO: this should delegate to the DeleteStrategy implementation.
Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
for (byte[] family : delete.getFamilyCellMap().keySet()) {
List<Cell> familyCells = delete.getFamilyCellMap().get(family);
if (isFamilyDelete(familyCells)) {
deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
HConstants.EMPTY_BYTE_ARRAY);
} else {
for (Cell cell : familyCells) {
deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
HConstants.EMPTY_BYTE_ARRAY);
}
}
}
for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
}
e.getEnvironment().getRegion().put(deleteMarkers);
// skip normal delete handling
e.bypass();
}