下面列出了 org.apache.hadoop.hbase.client.Put 类 API 的示例代码及用法,也可以点击链接到 GitHub 查看源代码。
/**
 * Configures and launches the "analyser_logdata" job, whose mapper emits {@link Put}
 * values destined for the HBase event-log table. The job runs without reducers.
 *
 * @param args command-line arguments, handed to {@code processArgs} with the configuration
 * @return {@code 0} when the job completes successfully, {@code -1} otherwise
 * @throws Exception if configuring or running the job fails
 */
@Override
public int run(String[] args) throws Exception {
    Configuration configuration = this.getConf();
    this.processArgs(configuration, args);

    Job logJob = Job.getInstance(configuration, "analyser_logdata");
    logJob.setJarByClass(AnalyserLogDataRunner.class);
    logJob.setMapperClass(AnalyserLogDataMapper.class);
    logJob.setMapOutputKeyClass(NullWritable.class);
    logJob.setMapOutputValueClass(Put.class);

    // Reducer/output wiring. The last argument is addDependencyJars:
    //   1. running on the cluster from a packaged jar: true (the variant used here);
    //   2. running locally: the same call with false as the last argument.
    TableMapReduceUtil.initTableReducerJob(
        EventLogConstants.HBASE_NAME_EVENT_LOGS, null, logJob, null, null, null, null, true);

    // No reduce phase.
    logJob.setNumReduceTasks(0);
    // Register the input paths.
    this.setJobInputPaths(logJob);

    boolean succeeded = logJob.waitForCompletion(true);
    if (succeeded) {
        return 0;
    }
    return -1;
}
/**
 * Starts the mini cluster, creates the test table with one seeded cell, records the region
 * that holds the seeded row, and finally calls {@code stopMasterAndAssignMeta}.
 *
 * @throws Exception if any part of the cluster or table setup fails
 */
@BeforeClass
public static void before() throws Exception {
    HTU.startMiniCluster(NB_SERVERS);
    final TableName testTable =
        TableName.valueOf(TestRegionServerNoMaster.class.getSimpleName());

    // Create the table and write a single cell so that the row can be located below.
    table = HTU.createTable(testTable, HConstants.CATALOG_FAMILY);
    Put seed = new Put(row);
    seed.addColumn(HConstants.CATALOG_FAMILY, row, row);
    table.put(seed);

    // Look up the region that holds the seeded row.
    try (RegionLocator regionLocator = HTU.getConnection().getRegionLocator(testTable)) {
        hri = regionLocator.getRegionLocation(row, false).getRegion();
    }
    regionName = hri.getRegionName();

    stopMasterAndAssignMeta(HTU);
}
/**
 * Creates the table named by {@code TEST_TABLE_1} and loads eight fixture rows into it.
 *
 * <p>The table handle is opened in a try-with-resources block so that it is closed even
 * when the write fails; previously a failing {@code put} leaked the handle.
 *
 * @throws IOException if creating the table or writing the rows fails
 */
private static void createHBaseTable1() throws IOException {
    // create a table
    TableName tableName = TableName.valueOf(TEST_TABLE_1);
    createTable(tableName, FAMILIES, SPLIT_KEYS);

    // build the fixture rows (rows 4 and 8 carry a null string column)
    List<Put> puts = new ArrayList<>();
    puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
    puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
    puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
    puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
    puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
    puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
    puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
    puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));

    // append rows to the table; the handle is closed on both success and failure
    try (HTable table = openTable(tableName)) {
        table.put(puts);
    }
}
/**
 * Writes one item as a single cell ({@code CF}:{@code V_DATA}) to the named table and
 * flushes it immediately.
 *
 * <p>Success and failure are both counted; any {@link Throwable} raised while writing is
 * logged and reported through the return value instead of being rethrown. The table handle
 * is always passed to {@code closeTable}.
 *
 * @param item      the item providing the row key and the cell value
 * @param tableName name of the destination table
 * @return {@code true} if the put and flush succeeded, {@code false} otherwise
 */
protected boolean add(KVSerializable item, String tableName) {
    final long delta = 1L;
    Put mutation = new Put(item.getKey());
    HTableInterface target = getHTableInterface(tableName);
    //mutation.setWriteToWAL(false);
    mutation.add(CF, V_DATA, item.getValue());

    boolean written;
    try {
        target.put(mutation);
        target.flushCommits();
        succCounter.update(delta);
        hbaseSendTps.update(delta);
        written = true;
    } catch (Throwable failure) {
        logger.error("Error", failure);
        failedCounter.update(delta);
        written = false;
    } finally {
        closeTable(target);
    }
    return written;
}
/**
 * Turns one parsed log record into an HBase {@link Put} and emits it.
 *
 * <p>Records with a blank server time are counted in {@code filterRecords} and dropped.
 * Otherwise the user-agent entry is removed from {@code clientInfo}, every remaining entry
 * with a non-blank key and value becomes one column, and {@code outputRecords} is incremented.
 *
 * @param clientInfo parsed log fields; the user-agent entry is removed from this map
 * @param event      the event whose alias takes part in the row key
 * @param context    the context the {@link Put} is written to
 * @throws IOException          if writing to the context fails
 * @throws InterruptedException if writing to the context is interrupted
 */
private void handleData(Map<String, String> clientInfo, EventEnum event,
        Context context) throws IOException, InterruptedException {
    String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
    String memberId = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID);
    String serverTime = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME);

    // A server time is mandatory; anything without one is filtered out.
    if (!StringUtils.isNotBlank(serverTime)) {
        this.filterRecords++;
        return;
    }

    // The browser (user-agent) information is not stored.
    clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT);

    // Row key layout noted by the original author: timestamp + (uuid+memberid+event).crc
    String rowkey = this.generateRowKey(uuid, memberId, event.alias, serverTime);
    Put put = new Put(Bytes.toBytes(rowkey));
    for (Map.Entry<String, String> field : clientInfo.entrySet()) {
        String column = field.getKey();
        String cellValue = field.getValue();
        if (!StringUtils.isNotBlank(column) || !StringUtils.isNotBlank(cellValue)) {
            continue;
        }
        put.addColumn(family, Bytes.toBytes(column), Bytes.toBytes(cellValue));
    }

    context.write(NullWritable.get(), put);
    this.outputRecords++;
}
/**
 * Converts a single parsed log record into an HBase {@link Put} and writes it to the context.
 *
 * <p>A record whose server time is blank only increments {@code filterRecords}. For all
 * other records the user-agent entry is dropped from {@code clientInfo}, each remaining
 * non-blank key/value pair is added as a column, and {@code outputRecords} is incremented.
 *
 * @param clientInfo parsed log fields; mutated by removing the user-agent entry
 * @param event      the event whose alias is part of the generated row key
 * @param context    the context that receives the {@link Put}
 * @throws IOException          if the context write fails
 * @throws InterruptedException if the context write is interrupted
 */
private void handleData(Map<String, String> clientInfo, EventEnum event, Context context) throws IOException, InterruptedException {
    final String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
    final String memberId = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID);
    final String serverTime = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME);

    final boolean hasServerTime = StringUtils.isNotBlank(serverTime);
    if (!hasServerTime) {
        // The server time must not be blank.
        this.filterRecords++;
        return;
    }

    // Drop the browser information before the fields are turned into columns.
    clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT);

    // Row key layout noted by the original author: timestamp + (uuid+memberid+event).crc
    final byte[] rowKeyBytes =
        Bytes.toBytes(this.generateRowKey(uuid, memberId, event.alias, serverTime));
    final Put put = new Put(rowKeyBytes);
    for (Map.Entry<String, String> entry : clientInfo.entrySet()) {
        boolean usable =
            StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue());
        if (usable) {
            put.addColumn(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
        }
    }

    context.write(NullWritable.get(), put);
    this.outputRecords++;
}
/**
 * Builds a protocol buffer {@code RegionAction} holding one action per mutation of the given
 * {@link RowMutations}.
 *
 * <p>Does not propagate the absolute position of the actions and does not mark the created
 * region action as atomic; the caller should do that if wanted.
 *
 * @param regionName   name of the region the actions are addressed to
 * @param rowMutations the puts and deletes to convert
 * @return the populated {@code RegionAction.Builder}
 * @throws IOException if a mutation is neither a {@link Put} nor a {@link Delete}
 *                     ({@link DoNotRetryIOException}), or if converting a mutation fails
 */
public static RegionAction.Builder buildRegionAction(final byte [] regionName,
        final RowMutations rowMutations)
        throws IOException {
    RegionAction.Builder regionAction =
        getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
    // Both builders are shared by all iterations and cleared before each use.
    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
    MutationProto.Builder mutationBuilder = MutationProto.newBuilder();

    for (Mutation mutation : rowMutations.getMutations()) {
        final boolean isPut = mutation instanceof Put;
        if (!isPut && !(mutation instanceof Delete)) {
            throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
                mutation.getClass().getName());
        }
        final MutationType type = isPut ? MutationType.PUT : MutationType.DELETE;

        mutationBuilder.clear();
        MutationProto converted = ProtobufUtil.toMutation(type, mutation, mutationBuilder);
        actionBuilder.clear();
        actionBuilder.setMutation(converted);
        regionAction.addAction(actionBuilder.build());
    }
    return regionAction;
}
/**
 * Writes one row with a data cell and a payload cell, then checks the first event received
 * by each listener: both report the row key and two key-values, the event from
 * {@code eventListener} has no payload and the one from {@code eventListenerWithPayloads}
 * carries {@code "payload"}.
 */
@Test
public void testEvents_WithPayload() throws IOException {
    Put rowPut = new Put(Bytes.toBytes("rowkey"));
    rowPut.addColumn(DATA_COL_FAMILY, Bytes.toBytes("data"), Bytes.toBytes("value"));
    rowPut.addColumn(PAYLOAD_COL_FAMILY, PAYLOAD_COL_QUALIFIER, Bytes.toBytes("payload"));
    htable.put(rowPut);

    // Each listener must have seen exactly one event before the events are inspected.
    waitForEvents(eventListener, 1);
    waitForEvents(eventListenerWithPayloads, 1);

    SepEvent plainEvent = eventListener.getEvents().get(0);
    SepEvent payloadEvent = eventListenerWithPayloads.getEvents().get(0);

    assertEquals("rowkey", Bytes.toString(plainEvent.getRow()));
    assertEquals("rowkey", Bytes.toString(payloadEvent.getRow()));

    assertEquals(2, plainEvent.getKeyValues().size());
    assertEquals(2, payloadEvent.getKeyValues().size());

    assertNull(plainEvent.getPayload());
    assertEquals("payload", Bytes.toString(payloadEvent.getPayload()));
}
/**
 * Reports whether the table metadata mutations change a boolean attribute.
 *
 * <p>The first {@link Put} that carries at least one cell for {@code attrQualifier} in
 * {@code TABLE_FAMILY_BYTES} decides the result: its first cell is decoded as a boolean and
 * compared with the current value.
 *
 * @param currAttribute the current value of the attribute
 * @param tableMetaData the metadata mutations to inspect
 * @param attrQualifier column qualifier of the attribute
 * @return {@code true} if such a cell exists and its value differs from
 *         {@code currAttribute}; {@code false} otherwise
 */
private boolean switchAttribute(boolean currAttribute, List<Mutation> tableMetaData,
        byte[] attrQualifier) {
    for (Mutation mutation : tableMetaData) {
        if (!(mutation instanceof Put)) {
            continue;
        }
        List<Cell> attrCells = ((Put) mutation).get(TABLE_FAMILY_BYTES, attrQualifier);
        if (attrCells == null || attrCells.isEmpty()) {
            continue;
        }
        Cell first = attrCells.get(0);
        boolean newAttribute = (boolean) PBoolean.INSTANCE.toObject(
            first.getValueArray(), first.getValueOffset(), first.getValueLength());
        return currAttribute != newAttribute;
    }
    return false;
}
/**
 * Writes a single column value to a row.
 *
 * <p>Expects exactly five arguments, in this order: table name, row id, column family,
 * column qualifier, value. The table handle obtained from the connection is closed before
 * the method returns, whether or not the write succeeded (it was previously never closed).
 *
 * @param connection an open connection; it is not closed by this method
 * @param args       the five user-supplied arguments described above
 * @throws InvalidArgsException if {@code args} does not contain exactly five elements
 * @throws IOException          if obtaining the table or executing the put fails
 */
public void run(Connection connection, List<String> args) throws InvalidArgsException, IOException {
    if (args.size() != 5) {
        throw new InvalidArgsException(args);
    }

    // Get the arguments passed by the user.
    String tableName = args.get(0);
    String rowId = args.get(1);
    String columnFamily = args.get(2);
    String column = args.get(3);
    String value = args.get(4);

    // Create a new Put request.
    Put put = new Put(Bytes.toBytes(rowId));

    // Here we add only one column value to the row but
    // multiple column values can be added to the row at
    // once by calling this method multiple times.
    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));

    // Execute the put on the table and release the table handle afterwards.
    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
        table.put(put);
    }
}
/**
 * Writes {@code recordsToWrite} rows through {@code mutator}, in blocks that are flushed one
 * at a time.
 *
 * <p>Each row key is the concatenation of the record index, {@code "/" + shortTaskId} and a
 * randomly chosen visibility expression; the same expression is set as the cell visibility
 * and its counter is incremented.
 *
 * <p>The block size is one hundredth of {@code recordsToWrite}, but never less than one.
 * Without that lower bound a value below 100 produced a block size of zero, so the inner loop
 * never advanced and the outer loop never terminated.
 *
 * @param key     unused
 * @param value   unused
 * @param context used for status and progress reporting
 * @throws IOException          if a mutation or flush fails
 * @throws InterruptedException declared by the overridden method
 */
@Override
protected void map(NullWritable key, NullWritable value, Context context) throws IOException,
        InterruptedException {
    String suffix = "/" + shortTaskId;
    // Clamp to 1 so that every pass of the outer loop makes progress.
    int blockSize = (int) Math.max(1L, recordsToWrite / 100);
    for (long i = 0; i < recordsToWrite;) {
        for (long idx = 0; idx < blockSize && i < recordsToWrite; idx++, i++) {
            int expIdx = rand.nextInt(blockSize) % VISIBILITY_EXPS_COUNT;
            String exp = VISIBILITY_EXPS[expIdx];
            byte[] row = Bytes.add(Bytes.toBytes(i), Bytes.toBytes(suffix), Bytes.toBytes(exp));
            Put p = new Put(row);
            p.addColumn(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
            p.setCellVisibility(new CellVisibility(exp));
            getCounter(expIdx).increment(1);
            mutator.mutate(p);
            if (i % 100 == 0) {
                context.setStatus("Written " + i + "/" + recordsToWrite + " records");
                context.progress();
            }
        }
        // End of block, flush all of them before we start writing anything
        // pointing to these!
        mutator.flush();
    }
}
/**
 * Conditionally stores an encoded id for a row key.
 *
 * <p>The put writes the id (as its decimal string in UTF-8) to {@code encodeQualifierName}
 * and the current time to {@code tsQualifierName}. It is applied through
 * {@code table.checkAndPut}, comparing the encode column against {@code expectedValue} when
 * {@code checkPrevious} is set and against {@code null} otherwise.
 *
 * @param columnFamily  column family to write to
 * @param rowkeyStr     row key; an empty string short-circuits to {@code ID_FOR_EMPTY_STR}
 * @param expectedValue value the encode column is compared with when {@code checkPrevious}
 * @param putValue      id to store
 * @param checkPrevious whether to compare against {@code expectedValue} rather than {@code null}
 * @return {@code ID_FOR_EMPTY_STR} for an empty row key, {@code putValue} if the put was
 *         applied, {@code ID_UNKNOWN} if it was not
 * @throws IOException if the table operation fails
 */
int checkAndPut(ByteArray columnFamily, String rowkeyStr, int expectedValue, int putValue, boolean checkPrevious)
        throws IOException {
    final byte[] rowkey = rowkeyStr.getBytes(StandardCharsets.UTF_8);
    if (rowkey.length == 0) {
        return ID_FOR_EMPTY_STR;
    }

    final byte[] encoded = Integer.toString(putValue).getBytes(StandardCharsets.UTF_8);
    byte[] expected = null;
    if (checkPrevious) {
        expected = Integer.toString(expectedValue).getBytes(StandardCharsets.UTF_8);
    }

    Put put = new Put(rowkey);
    put.addColumn(columnFamily.array(), encodeQualifierName, encoded);
    put.addColumn(columnFamily.array(), tsQualifierName, Bytes.toBytes(System.currentTimeMillis()));

    boolean applied =
        table.checkAndPut(rowkey, columnFamily.array(), encodeQualifierName, expected, put);
    if (!applied) {
        return ID_UNKNOWN;
    }
    if (printValue) {
        logger.debug("Encode {} to {}", rowkeyStr, putValue);
    }
    return putValue;
}
/**
 * Applies the insertions produced by the given creators.
 *
 * <p>An insertion whose {@code IS_UNIQUE} attribute is absent or decodes to {@code true} is
 * passed straight to {@code create(table, creator, put)}. All other insertions are collected
 * and handed to {@code write} in one batch after every creator has been processed.
 *
 * @param table    the table to write to
 * @param creators the creators supplying the insertions
 */
public static void create(Table table, Creator... creators) {
    List<Mutation> deferred = new ArrayList<>();
    for (Creator creator : creators) {
        Iterator<Put> pending = creator.constructInsertions();
        while (pending.hasNext()) {
            Put insertion = pending.next();
            byte[] uniqueFlag = insertion.getAttribute(IS_UNIQUE);
            boolean explicitlyNotUnique = uniqueFlag != null && !Bytes.toBoolean(uniqueFlag);
            if (explicitlyNotUnique) {
                deferred.add(insertion);
            } else {
                create(table, creator, insertion);
            }
        }
    }
    write(table, deferred);
}
/**
 * Stores a sequence event in the events table, going through {@code retryer}.
 *
 * <p>The row key is the workflow instance key followed by {@code '#'} and the event counter
 * zero-padded to eight digits; the serialized event is written with the event's timestamp.
 *
 * @param sequenceEvent the event to store
 * @throws IOException      if the retried call ultimately failed with an {@link IOException}
 * @throws RuntimeException wrapping any other cause of the failure
 */
void writeEvent(SequenceEvent sequenceEvent) throws IOException {
    try {
        retryer.call(() -> {
            try (final Table eventsTable = connection.getTable(EVENTS_TABLE_NAME)) {
                final byte[] rowKey = Bytes.toBytes(String.format("%s#%08d",
                    sequenceEvent.event().workflowInstance().toKey(), sequenceEvent.counter()));
                final Put eventPut = new Put(rowKey, sequenceEvent.timestamp());
                eventPut.addColumn(EVENT_CF, EVENT_QUALIFIER,
                    serialize(sequenceEvent.event()).toByteArray());
                eventsTable.put(eventPut);
                return null;
            }
        });
    } catch (ExecutionException | RetryException e) {
        // Surface the underlying I/O failure unchanged; wrap everything else.
        final Throwable cause = e.getCause();
        if (cause instanceof IOException) {
            throw (IOException) cause;
        }
        throw new RuntimeException(cause);
    }
}
/**
 * Creates the profile row for a user in one batch: a put with the fixed info, the login IP
 * address and the last-login time, plus an increment that sets the login count to one and
 * touches the remaining counters with zero.
 *
 * <p>The table taken from the pool is closed in a {@code finally} block so that the handle is
 * released even when the batch fails; it was previously never released. The current time is
 * read once so that the fixed-info column and the last-login column carry the same value.
 *
 * @param userId    id of the user, used to derive the row key
 * @param pojo      supplies the user name and age
 * @param ipAddress IP address of the login
 * @throws Exception if the batch or releasing the table fails
 */
@Override
public void createProfile(long userId, ProfilePojo pojo, String ipAddress) throws Exception {
    byte[] rowKey = generateProfileRowKey(userId);
    long now = System.currentTimeMillis();

    ArrayList<Row> actions = new ArrayList<Row>();

    Put put = new Put(rowKey);
    put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.FIXED_INFO_COL, Bytes.toBytes(pojo.getUsername() + "|" + pojo.getAge() + "|" + now));
    put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_IP_ADDERSSES, Bytes.toBytes(ipAddress));
    put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LAST_LOG_IN_COL, Bytes.toBytes(now));
    actions.add(put);

    Increment increment = new Increment(rowKey);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_COUNT_COL, 1);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_SELLS_COL, 0);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_PURCHASES_COL, 0);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_VALUE_OF_PAST_PURCHASES_COL, 0);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_VALUE_OF_PAST_SELLS_COL, 0);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_SELLS_VALUE_COL, 0);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_PURCHASES_VALUE_COL, 0);
    actions.add(increment);

    // Take the table only once everything is ready to send, and always give it back.
    HTableInterface profileTable = hTablePool.getTable(DataModelConsts.PROFILE_TABLE);
    try {
        profileTable.batch(actions);
    } finally {
        profileTable.close();
    }
}
/**
 * Rebuilds a family -> qualifier -> timestamp -> value map from the cells of a {@link Put}
 * and converts it to a record through {@code convertMapToRecord}.
 *
 * @param rowKeyBytes the row key handed on to {@code convertMapToRecord}
 * @param put         the put whose cells are read
 * @param clazz       the record class to produce
 * @param <R>         row key type of the record
 * @param <T>         record type
 * @return the record returned by {@code convertMapToRecord}
 */
private <R extends Serializable & Comparable<R>, T extends HBRecord<R>> T readValueFromRowAndPut(byte[] rowKeyBytes, Put put, Class<T> clazz) {
    NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> byFamily = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], List<Cell>> familyEntry : put.getFamilyCellMap().entrySet()) {
        byte[] family = familyEntry.getKey();
        NavigableMap<byte[], NavigableMap<Long, byte[]>> byColumn = byFamily.get(family);
        if (byColumn == null) {
            byColumn = new TreeMap<>(Bytes.BYTES_COMPARATOR);
            byFamily.put(family, byColumn);
        }
        for (Cell cell : familyEntry.getValue()) {
            byte[] column = CellUtil.cloneQualifier(cell);
            NavigableMap<Long, byte[]> byTimestamp = byColumn.get(column);
            if (byTimestamp == null) {
                byTimestamp = new TreeMap<>();
                byColumn.put(column, byTimestamp);
            }
            byTimestamp.put(cell.getTimestamp(), CellUtil.cloneValue(cell));
        }
    }
    return convertMapToRecord(rowKeyBytes, byFamily, clazz);
}
/**
 * Writes one row to the test table with the counting RPC controller factory installed and
 * the test table registered as an index table, then checks that exactly one write was
 * counted at {@code DEFAULT_INDEX_MIN_PRIORITY}.
 *
 * <p>The table is opened in a try-with-resources block so that it is closed even when the
 * write or the assertion fails; previously a failure skipped the close.
 */
@Test
public void testClientWritesWithPriority() throws Exception {
    Configuration conf = new Configuration(UTIL.getConfiguration());
    // add the keys for our rpc factory
    conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
        CountingIndexClientRpcFactory.class.getName());
    // and set the index table as the current table
    conf.setStrings(IndexQosRpcControllerFactory.INDEX_TABLE_NAMES_KEY,
        TestTable.getTableNameString());

    try (HTable table = new HTable(conf, TestTable.getTableName())) {
        // do a write to the table
        Put p = new Put(row);
        p.add(family, qual, new byte[] { 1, 0, 1, 0 });
        table.put(p);
        table.flushCommits();

        // check the counts on the rpc controller
        assertEquals("Didn't get the expected number of index priority writes!", 1,
            (int) CountingIndexClientRpcController.priorityCounts
                .get(QueryServicesOptions.DEFAULT_INDEX_MIN_PRIORITY));
    }
}
/**
 * Writes ten rows ({@code row} plus a four-digit index starting at {@code start}) and then
 * polls until {@code log.isLowReplicationRollEnabled()} equals {@code expect} or the
 * timeout has elapsed.
 *
 * <p>While waiting, a throw-away row is written every 200 ms; per the original author this
 * triggers {@code FSHLog#checkLowReplication()}. An interrupt during the sleep is ignored and
 * polling simply continues.
 *
 * @param table   table to write to
 * @param log     the log whose low-replication-roll flag is observed
 * @param start   index of the first row to write
 * @param expect  the flag value to wait for
 * @param timeout maximum time to wait, in milliseconds
 * @throws IOException if a put fails
 */
void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
        throws IOException {
    for (int offset = 0; offset < 10; offset++) {
        String rowName = "row" + String.format("%1$04d", (start + offset));
        Put rowPut = new Put(Bytes.toBytes(rowName));
        rowPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
        table.put(rowPut);
    }

    Put probe = new Put(Bytes.toBytes("tmprow"));
    probe.addColumn(HConstants.CATALOG_FAMILY, null, value);

    final long begin = System.currentTimeMillis();
    long remaining = timeout;
    while (remaining > 0 && log.isLowReplicationRollEnabled() != expect) {
        // Trigger calling FSHlog#checkLowReplication()
        table.put(probe);
        try {
            Thread.sleep(200);
        } catch (InterruptedException ignored) {
            // deliberately ignored: keep polling until the flag flips or the timeout expires
        }
        remaining = timeout - (System.currentTimeMillis() - begin);
    }
}
/**
 * Writes three rows in one transaction with a checkpoint after the first and the second
 * write, aborts the transaction, and verifies in a new transaction that none of the rows is
 * visible, neither through {@code verifyRow} nor through a full scan.
 */
@Test
public void testCheckpointRollback() throws Exception {
    // One transaction, three writes, a checkpoint between consecutive writes.
    transactionContext.start();
    Put firstWrite = new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
    transactionAwareHTable.put(firstWrite);
    transactionContext.checkpoint();
    Put secondWrite = new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
    transactionAwareHTable.put(secondWrite);
    transactionContext.checkpoint();
    Put thirdWrite = new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
    transactionAwareHTable.put(thirdWrite);

    transactionContext.abort();

    // A new transaction must see none of the aborted writes.
    transactionContext.start();
    verifyRow(transactionAwareHTable, TestBytes.row, null);
    verifyRow(transactionAwareHTable, TestBytes.row2, null);
    verifyRow(transactionAwareHTable, TestBytes.row3, null);

    ResultScanner fullScan = transactionAwareHTable.getScanner(new Scan());
    assertNull(fullScan.next());
    fullScan.close();
    transactionContext.finish();
}
/**
 * Creates a table from the descriptor, writes one row and flushes the table so that an hfile
 * is created.
 *
 * <p>If writing or flushing fails with an {@link IOException}, the table is closed before
 * the exception is rethrown.
 *
 * @param tableDescriptor descriptor of the table to create
 * @return the open table; the caller is responsible for closing it
 * @throws IOException if creating, writing to or flushing the table fails
 */
private Table createTableWithOneFile(TableDescriptor tableDescriptor) throws IOException {
    Table created = TEST_UTIL.createTable(tableDescriptor, null);
    try {
        // insert data
        byte[] mobValue = generateMobValue(10);
        Put rowPut = new Put(Bytes.toBytes("row"));
        rowPut.addColumn(FAMILY, QF, EnvironmentEdgeManager.currentTime(), mobValue);
        created.put(rowPut);

        // create an hfile
        TEST_UTIL.getAdmin().flush(tableDescriptor.getTableName());
    } catch (IOException failure) {
        created.close();
        throw failure;
    }
    return created;
}
/**
 * Builds one {@link Put} per timeslot for the given agent stat data points.
 *
 * <p>The data points are grouped by {@code slotAgentStatDataPoints}; for each group a row key
 * is encoded from the agent id, stat type and base timestamp, passed through the row key
 * distributor, and the group is serialized into the put.
 *
 * @param agentId             id of the agent the data points belong to
 * @param agentStatType       type of the statistics
 * @param agentStatDataPoints the data points to store; may be {@code null} or empty
 * @param agentStatSerializer serializer that fills a put from a list of data points
 * @param <T>                 data point type
 * @return the puts, or an empty list when there are no data points
 */
public <T extends AgentStatDataPoint> List<Put> createPuts(String agentId, AgentStatType agentStatType, List<T> agentStatDataPoints, HbaseSerializer<List<T>, Put> agentStatSerializer) {
    if (CollectionUtils.isEmpty(agentStatDataPoints)) {
        return Collections.emptyList();
    }

    List<Put> result = new ArrayList<Put>();
    Map<Long, List<T>> pointsBySlot = slotAgentStatDataPoints(agentStatDataPoints);
    for (Map.Entry<Long, List<T>> slot : pointsBySlot.entrySet()) {
        long baseTimestamp = slot.getKey();
        List<T> pointsInSlot = slot.getValue();

        final AgentStatRowKeyComponent component =
            new AgentStatRowKeyComponent(agentId, agentStatType, baseTimestamp);
        byte[] encodedKey = this.rowKeyEncoder.encodeRowKey(component);
        Put slotPut = new Put(this.rowKeyDistributor.getDistributedKey(encodedKey));

        agentStatSerializer.serialize(pointsInSlot, slotPut, null);
        result.add(slotPut);
    }
    return result;
}
/**
 * Writes one row to the table and advances the row counter.
 *
 * <p>The row key is built from {@code taskNumber + rowNumber}; the single cell
 * {@code entry:entry} holds the current {@code rowNumber}. The counter is incremented before
 * the put is sent, so it advances even when the put fails.
 *
 * <p>NOTE(review): the {@code record} argument is not used by this method; confirm whether
 * the record was meant to be stored as the cell value.
 *
 * @param record the record handed to the writer (currently unused)
 * @throws IOException if the put fails
 */
@Override
public void writeRecord(String record) throws IOException {
    final byte[] rowKey = Bytes.toBytes(taskNumber + rowNumber);
    Put entryPut = new Put(rowKey);

    final byte[] familyName = Bytes.toBytes("entry");
    final byte[] qualifierName = Bytes.toBytes("entry");
    entryPut.add(familyName, qualifierName, Bytes.toBytes(rowNumber));

    rowNumber++;
    table.put(entryPut);
}
/**
 * Runs one sample against HBase: a put when {@code methedType} is {@code "put"}, a get when
 * it is {@code "get"}, and nothing for any other value.
 *
 * <p>The row key is the hash code of a random number below {@code keyNumLength}, rendered as
 * a decimal string. Any {@link Throwable} raised by the operation marks the sample as failed;
 * its description is now stored as the response message instead of being discarded. The get
 * key is encoded with {@code Bytes.toBytes} exactly like the put key, rather than with the
 * platform default charset.
 *
 * @param context the sampler context (not used by this method)
 * @return the sample result with timing and success flag set
 */
public SampleResult runTest(JavaSamplerContext context) {
    SampleResult sampleResult = new SampleResult();
    sampleResult.sampleStart();
    String key = String.valueOf(String.valueOf(new Random().nextInt(keyNumLength)).hashCode());
    try {
        if (methedType.equals("put")) {
            put = new Put(Bytes.toBytes(key));
            put.setWriteToWAL(writeToWAL);
            for (int j = 0; j < cfs.length; j++) {
                for (int n = 0; n < qualifiers.length; n++) {
                    put.add(Bytes.toBytes(cfs[j]),
                        Bytes.toBytes(qualifiers[n]),
                        Bytes.toBytes(values));
                    // NOTE(review): the growing put is sent once per added column, not once
                    // per row; left as is because it determines the generated load.
                    table.put(put);
                }
            }
        } else if (methedType.equals("get")) {
            // Encode the key the same way as the put branch so both address the same rows.
            get = new Get(Bytes.toBytes(key));
            table.get(get);
        }
        sampleResult.setSuccessful(true);
    } catch (Throwable e) {
        sampleResult.setSuccessful(false);
        // Keep the reason for the failure visible in the sample result.
        sampleResult.setResponseMessage(e.toString());
    } finally {
        sampleResult.sampleEnd();
    }
    // Report whether the sample was processed successfully.
    return sampleResult;
}
/**
 * Creates the test table as the service user and records the cluster id.
 *
 * <p>Logs in {@code hbase/localhost} from the keytab, then, as that user, creates the table
 * with family {@code f1}, waits for it to become available, writes one cell
 * ({@code r1}, {@code f1:q1} = {@code "1"}) and reads the cluster id from the cluster metrics.
 *
 * @throws Exception if the login, table creation or write fails
 */
private void createTable() throws Exception {
    tableName = name.getTableName();

    // Create a table and write a record as the service user (hbase)
    UserGroupInformation hbaseUser = UserGroupInformation
        .loginUserFromKeytabAndReturnUGI("hbase/localhost", KEYTAB_FILE.getAbsolutePath());

    PrivilegedExceptionAction<String> createAndSeed = new PrivilegedExceptionAction<String>() {
        @Override
        public String run() throws Exception {
            try (Connection connection = ConnectionFactory.createConnection(CONF);
                    Admin admin = connection.getAdmin()) {
                admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1")).build());
                UTIL.waitTableAvailable(tableName);

                try (Table seeded = connection.getTable(tableName)) {
                    Put seed = new Put(Bytes.toBytes("r1"));
                    seed.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
                    seeded.put(seed);
                }

                return admin.getClusterMetrics().getClusterId();
            }
        }
    };
    clusterId = hbaseUser.doAs(createAndSeed);
    assertNotNull(clusterId);
}
/**
 * Checks that aborted deletes leave a committed row untouched.
 *
 * <p>Commits one row, then deletes the whole row in one aborted transaction and the column
 * family in another, and finally verifies in a new transaction that the row still holds the
 * original value.
 *
 * @param conflictDetection conflict detection level used for the transactional table; it is
 *                          also part of the table name
 * @throws Exception if any table or transaction operation fails
 */
private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception {
    String tableName = String.format("%s%s", "TestColFamilyDelete", conflictDetection);
    HTable hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
    try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) {
        TransactionContext tx = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);

        // Commit the row that the aborted deletes below must not affect.
        tx.start();
        Put initial = new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
        txTable.put(initial);
        tx.finish();

        // Start a tx, delete the row and then abort the tx
        tx.start();
        Delete wholeRow = new Delete(TestBytes.row);
        txTable.delete(wholeRow);
        tx.abort();

        // Start a tx, delete a column family and then abort the tx
        tx.start();
        Delete wholeFamily = new Delete(TestBytes.row).deleteFamily(TestBytes.family);
        txTable.delete(wholeFamily);
        tx.abort();

        // Above operations should have no effect on the row, since they were aborted
        tx.start();
        Result row = txTable.get(new Get(TestBytes.row));
        assertFalse(row.isEmpty());
        assertArrayEquals(TestBytes.value, row.getValue(TestBytes.family, TestBytes.qualifier));
        tx.finish();
    }
}
/**
 * Serializes a record to a {@link Put}, reads it back {@code NUM_ITERATIONS} times from the
 * row key and the put, asserts that the result equals the original record, and prints the
 * timing reported by {@code timeTaken}.
 *
 * @param p   the record to round-trip
 * @param <R> row key type of the record
 */
private <R extends Serializable & Comparable<R>> void testPutWithRow(HBRecord<R> p) {
    Put serialized = hbMapper.writeValueAsPut(p);
    ImmutableBytesWritable rowKey = hbMapper.getRowKey(p);

    final long start = System.currentTimeMillis();
    HBRecord pFromPut = null;
    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
        pFromPut = hbMapper.readValue(rowKey, serialized, p.getClass());
    }
    final long end = System.currentTimeMillis();

    assertEquals(p, pFromPut, "Data mismatch after deserialization from Put");
    System.out.printf("Time taken for Rowkey+Put -> POJO = %.2fms%n%n", timeTaken(start, end));
}
/**
 * Configures the job for writing to HBase: sets {@code HBaseOutputFormat} with
 * {@code ImmutableBytesWritable} keys and {@link Put} values, and reads the per-table salt
 * flag into {@code useSalt} (defaulting to {@code false}).
 *
 * @param process the flow process (not used by this method)
 * @param tap     the tap being configured (not used by this method)
 * @param conf    the job configuration to update and read from
 */
@Override
public void sinkConfInit(FlowProcess<JobConf> process,
        Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
    conf.setOutputFormat(HBaseOutputFormat.class);
    conf.setOutputKeyClass(ImmutableBytesWritable.class);
    conf.setOutputValueClass(Put.class);

    // The salt flag is stored under a key derived from the output table name.
    String outputTable = conf.get(HBaseOutputFormat.OUTPUT_TABLE);
    String saltKey = String.format(HBaseConstants.USE_SALT, outputTable);
    useSalt = conf.getBoolean(saltKey, false);
}
/**
 * Regression test for HBase-3758.
 *
 * <p>Checks the {@code hadDeleted} and {@code wasScannerOpenCalled} flags of
 * {@code SimpleRegionObserver} at three points: right after table creation (both false),
 * after a put followed by a delete (deleted true, scanner still false), and after a full
 * scan (scanner true).
 */
@Test
// HBase-3758
public void testHBase3758() throws IOException {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    util.createTable(tableName, new byte[][] { A, B, C });

    // Nothing has happened yet: neither hook may have fired.
    verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
        new Boolean[] { false, false });

    Table table = util.getConnection().getTable(tableName);
    Put rowPut = new Put(ROW);
    rowPut.addColumn(A, A, A);
    table.put(rowPut);

    Delete rowDelete = new Delete(ROW);
    table.delete(rowDelete);

    // The delete hook fired, the scanner hook did not.
    verifyMethodResult(SimpleRegionObserver.class,
        new String[] { "hadDeleted", "wasScannerOpenCalled" }, tableName,
        new Boolean[] { true, false });

    ResultScanner scanner = table.getScanner(new Scan());
    try {
        // Drain the scanner; only the fact that a scan ran matters here.
        while (scanner.next() != null) {
        }
    } finally {
        scanner.close();
    }

    // now scanner hooks should be invoked.
    verifyMethodResult(SimpleRegionObserver.class, new String[] { "wasScannerOpenCalled" },
        tableName, new Boolean[] { true });

    util.deleteTable(tableName);
    table.close();
}
/**
 * Runs the HFile bulk export job for one table.
 *
 * <p>Reads the table name, user, ZooKeeper quorum, temporary directory, scan batch size,
 * output directory and mapper class from the command line, refuses to run when the output
 * directory already exists, and then runs a job that is configured through
 * {@code HFileOutputFormat2.configureIncrementalLoad}. On success the processed and total
 * counters are logged; when the job fails nothing is logged and the method returns normally.
 *
 * <p>The HBase connection is opened in a try-with-resources block so that it is closed once
 * the job has finished or setup has failed; it was previously never closed.
 *
 * @param line the parsed command line
 * @throws Exception if the output directory already exists, or if setting up or running the
 *                   job fails
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void doExporting(CommandLine line) throws Exception {
    // Configuration.
    String tabname = line.getOptionValue("tabname");
    String user = line.getOptionValue("user");
    Configuration conf = new Configuration();
    conf.set("hbase.zookeeper.quorum", line.getOptionValue("zkaddr"));
    conf.set("hbase.fs.tmp.dir", line.getOptionValue("T", DEFAULT_HBASE_MR_TMPDIR));
    conf.set(TableInputFormat.INPUT_TABLE, tabname);
    conf.set(TableInputFormat.SCAN_BATCHSIZE, line.getOptionValue("batchSize", DEFAULT_SCAN_BATCH_SIZE));

    // Check directory.
    String outputDir = line.getOptionValue("output", DEFAULT_HFILE_OUTPUT_DIR) + "/" + tabname;
    FileSystem fs = FileSystem.get(new URI(outputDir), new Configuration(), user);
    state(!fs.exists(new Path(outputDir)), format("HDFS temporary directory already has data, path: '%s'", outputDir));

    // Set scan condition.(if necessary)
    setScanIfNecessary(conf, line);

    // Job. The connection stays open until the job has finished and is then closed.
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
        TableName tab = TableName.valueOf(tabname);
        Job job = Job.getInstance(conf);
        job.setJobName(HfileBulkExporter.class.getSimpleName() + "@" + tab.getNameAsString());
        job.setJarByClass(HfileBulkExporter.class);
        job.setMapperClass((Class<Mapper>) ClassUtils.getClass(line.getOptionValue("mapperClass", DEFAULT_MAPPER_CLASS)));
        job.setInputFormatClass(TableInputFormat.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tab), conn.getRegionLocator(tab));
        FileOutputFormat.setOutputPath(job, new Path(outputDir));
        if (job.waitForCompletion(true)) {
            long total = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_TOTAL).getValue();
            long processed = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_PROCESSED).getValue();
            log.info(String.format("Exported to successfully! with processed:(%d)/total:(%d)", processed, total));
        }
    }
}
/**
 * Verifies that deletes issued in aborted transactions do not affect a committed row.
 *
 * <p>A row is committed first. A full-row delete and a column-family delete are then each
 * issued in their own transaction and aborted. A final transaction reads the row and expects
 * the originally written value.
 *
 * @param conflictDetection conflict detection level for the transactional table; also used
 *                          as the suffix of the table name
 * @throws Exception if any table or transaction operation fails
 */
private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception {
    final String tableName = String.format("%s%s", "TestColFamilyDelete", conflictDetection);
    final byte[][] families = new byte[][]{TestBytes.family};
    Table hTable = createTable(Bytes.toBytes(tableName), families);
    try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) {
        TransactionContext context =
            new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);

        // Commit the baseline row.
        context.start();
        Put baseline = new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
        txTable.put(baseline);
        context.finish();

        // Start a tx, delete the row and then abort the tx
        context.start();
        txTable.delete(new Delete(TestBytes.row));
        context.abort();

        // Start a tx, delete a column family and then abort the tx
        context.start();
        Delete familyDelete = new Delete(TestBytes.row).deleteFamily(TestBytes.family);
        txTable.delete(familyDelete);
        context.abort();

        // Above operations should have no effect on the row, since they were aborted
        context.start();
        Get readBack = new Get(TestBytes.row);
        Result survived = txTable.get(readBack);
        assertFalse(survived.isEmpty());
        assertArrayEquals(TestBytes.value, survived.getValue(TestBytes.family, TestBytes.qualifier));
        context.finish();
    }
}