下面列出了怎么用org.apache.hadoop.hbase.client.Row的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Transactional version of {@link Table#batch(List<? extends Row> rows)}
*
* @param transaction an instance of transaction to be used
* @param rows List of rows that must be instances of Put or Delete
* @param addShadowCell denotes whether to add the shadow cell
* @throws IOException if a remote or network exception occurs
*/
public void batch(Transaction transaction, List<? extends Row> rows, boolean addShadowCells) throws IOException {
List<Mutation> mutations = new ArrayList<>(rows.size());
for (Row row : rows) {
if (row instanceof Put) {
mutations.add(putInternal(transaction, (Put)row, addShadowCells));
} else if (row instanceof Delete) {
Put deleteP = deleteInternal(transaction, (Delete)row);
if (!deleteP.isEmpty()) {
mutations.add(deleteP);
}
} else {
throw new UnsupportedOperationException("Unsupported mutation: " + row);
}
}
addMutations(mutations);
}
private List<? extends Row> transactionalizeActions(List<? extends Row> actions)
throws IOException {
List<Row> transactionalizedActions = new ArrayList<>(actions.size());
for (Row action : actions) {
if (action instanceof Get) {
transactionalizedActions.add(transactionalizeAction((Get) action));
} else if (action instanceof Put) {
transactionalizedActions.add(transactionalizeAction((Put) action));
} else if (action instanceof Delete) {
transactionalizedActions.add(transactionalizeAction((Delete) action));
} else {
transactionalizedActions.add(action);
}
}
return transactionalizedActions;
}
public void logInProfileInHBase(long userId, String ipAddress) throws IOException, Exception {
HTableInterface profileTable = hTablePool.getTable(DataModelConsts.PROFILE_TABLE);
ArrayList<Row> actions = new ArrayList<Row>();
byte[] profileRowKey = generateProfileRowKey(userId);
Delete delete = new Delete(profileRowKey);
delete.deleteColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_PURCHASES_VALUE_COL);
delete.deleteColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_SELLS_VALUE_COL);
actions.add(delete);
Increment increment = new Increment(profileRowKey);
increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_COUNT_COL, 1);
actions.add(increment);
Put put = new Put(profileRowKey);
put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LAST_LOG_IN_COL, Bytes.toBytes(System.currentTimeMillis()));
put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_IP_ADDERSSES, Bytes.toBytes(ipAddress));
actions.add(put);
profileTable.batch(actions);
}
@Override
public void createProfile(long userId, ProfilePojo pojo, String ipAddress) throws Exception {
HTableInterface profileTable = hTablePool.getTable(DataModelConsts.PROFILE_TABLE);
ArrayList<Row> actions = new ArrayList<Row>();
byte[] rowKey = generateProfileRowKey(userId);
Put put = new Put(rowKey);
put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.FIXED_INFO_COL, Bytes.toBytes(pojo.getUsername() + "|" + pojo.getAge() + "|" + System.currentTimeMillis()));
put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_IP_ADDERSSES, Bytes.toBytes(ipAddress));
put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LAST_LOG_IN_COL, Bytes.toBytes(System.currentTimeMillis()));
actions.add(put);
Increment increment = new Increment(rowKey);
increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_COUNT_COL, 1);
increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_SELLS_COL, 0);
increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_PURCHASES_COL, 0);
increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_VALUE_OF_PAST_PURCHASES_COL, 0);
increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_VALUE_OF_PAST_SELLS_COL, 0);
increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_SELLS_VALUE_COL, 0);
increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_PURCHASES_VALUE_COL, 0);
actions.add(increment);
profileTable.batch(actions);
}
@Override
public List<Row> getActions() throws FlumeException {
List<Row> actions = new LinkedList<Row>();
if(plCol != null){
byte[] rowKey;
try {
if (keyType == KeyType.TS) {
rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
} else if(keyType == KeyType.RANDOM) {
rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
} else if(keyType == KeyType.TSNANO) {
rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
} else {
rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
}
Put put = new Put(rowKey);
put.add(cf, plCol, payload);
actions.add(put);
} catch (Exception e){
throw new FlumeException("Could not get row key!", e);
}
}
return actions;
}
public static void handleHBaseException(
RetriesExhaustedWithDetailsException rex,
Record record,
Map<String, Record> rowKeyToRecord,
ErrorRecordHandler errorRecordHandler
) throws StageException {
for (int i = 0; i < rex.getNumExceptions(); i++) {
if (rex.getCause(i) instanceof NoSuchColumnFamilyException) {
Row r = rex.getRow(i);
Record errorRecord = record != null ? record : rowKeyToRecord.get(Bytes.toString(r.getRow()));
OnRecordErrorException exception = new OnRecordErrorException(errorRecord,
Errors.HBASE_10,
getErrorDescription(rex.getCause(i), r, rex.getHostnamePort(i))
);
errorRecordHandler.onError(exception);
} else {
// If at least 1 non NoSuchColumnFamilyException exception,
// consider as stage exception
throw new StageException(Errors.HBASE_02, rex);
}
}
}
/**
* @param actions
* @deprecated
*/
@Deprecated
@Override
public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
List<Result> results = new ArrayList<Result>();
for (Row r : actions) {
if (r instanceof Delete) {
delete((Delete) r);
continue;
}
if (r instanceof Put) {
put((Put) r);
continue;
}
if (r instanceof Get) {
results.add(get((Get) r));
}
}
return results.toArray();
}
public void updateProfileCountsForSaleInHBase(Long buyerId, Long sellerId, ItemSaleEvent event) throws IOException, InterruptedException {
HTableInterface profileTable = hTablePool.getTable(DataModelConsts.PROFILE_TABLE);
ArrayList<Row> actions = new ArrayList<Row>();
Increment buyerValueIncrement = new Increment(generateProfileRowKey(buyerId));
buyerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_PURCHASES_VALUE_COL, event.getItemValue());
buyerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_VALUE_OF_PAST_SELLS_COL, event.getItemValue());
actions.add(buyerValueIncrement);
Increment sellerValueIncrement = new Increment(generateProfileRowKey(sellerId));
sellerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_SELLS_VALUE_COL, event.getItemValue());
sellerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_VALUE_OF_PAST_SELLS_COL, event.getItemValue());
actions.add(sellerValueIncrement);
profileTable.batch(actions);
}
/**
* Do the changes and handle the pool
* @param tableName table to insert into
* @param allRows list of actions
*/
private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
if (allRows.isEmpty()) {
return;
}
AsyncTable<?> table = getConnection().getTable(tableName);
List<Future<?>> futures = allRows.stream().map(table::batchAll).collect(Collectors.toList());
for (Future<?> future : futures) {
try {
FutureUtils.get(future);
} catch (RetriesExhaustedException e) {
if (e.getCause() instanceof TableNotFoundException) {
throw new TableNotFoundException("'" + tableName + "'");
}
throw e;
}
}
}
@Override
public AsyncTable<AdvancedScanResultConsumer> getTable(TableName tableName) {
return new DummyAsyncTable<AdvancedScanResultConsumer>() {
@Override
public <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
List<T> list = new ArrayList<>(actions.size());
for (Row action : actions) {
// Row is the index of the loop above where we make WALEntry and Cells.
int row = Bytes.toInt(action.getRow());
assertTrue("" + row, row > BOUNDARY);
UNFILTERED.incrementAndGet();
list.add(null);
}
return CompletableFuture.completedFuture(list);
}
};
}
private void batch(String table, Row row) {
List<Row> rows = this.batch.get(table);
if (rows == null) {
rows = new ArrayList<>();
this.batch.put(table, rows);
}
rows.add(row);
}
private int batchSize() {
int size = 0;
for (List<Row> puts : this.batch.values()) {
size += puts.size();
}
return size;
}
private void checkBatchResults(Object[] results, List<Row> rows)
throws Throwable {
assert rows.size() == results.length;
for (int i = 0; i < results.length; i++) {
Object result = results[i];
if (result instanceof Throwable) {
throw (Throwable) result;
}
if (result == null || !((Result) result).isEmpty()) {
throw new BackendException("Failed batch for row: %s",
rows.get(i));
}
}
}
@Override
public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
hTable.batch(transactionalizeActions(actions), results);
}
@Override
public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
return hTable.batch(transactionalizeActions(actions));
}
@Test
/** Ensure that when no config is specified, the a catch-all regex is used
* with default column name. */
public void testDefaultBehavior() throws Exception {
RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
Context context = new Context();
s.configure(context);
String logMsg = "The sky is falling!";
Event e = EventBuilder.withBody(Bytes.toBytes(logMsg));
s.initialize(e, "CF".getBytes());
List<Row> actions = s.getActions();
assertTrue(actions.size() == 1);
assertTrue(actions.get(0) instanceof Put);
Put put = (Put) actions.get(0);
assertTrue(put.getFamilyMap().containsKey(s.cf));
List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf);
assertTrue(kvPairs.size() == 1);
Map<String, String> resultMap = Maps.newHashMap();
for (KeyValue kv : kvPairs) {
resultMap.put(new String(kv.getQualifier()), new String(kv.getValue()));
}
assertTrue(resultMap.containsKey(
RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT));
assertEquals("The sky is falling!",
resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT));
}
@Override
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
return hTable.batchCallback(transactionalizeActions(actions), callback);
}
@Override
public List<Row> getActions() throws FlumeException {
List<Row> actions = Lists.newArrayList();
byte[] rowKey;
Matcher m = inputPattern.matcher(new String(payload, charset));
if (!m.matches()) {
return Lists.newArrayList();
}
if (m.groupCount() != colNames.size()) {
return Lists.newArrayList();
}
try {
if(rowKeyIndex < 0){
rowKey = getRowKey();
}else{
rowKey = m.group(rowKeyIndex + 1).getBytes(Charsets.UTF_8);
}
Put put = new Put(rowKey);
for (int i = 0; i < colNames.size(); i++) {
if(i != rowKeyIndex) {
put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
}
}
if (depositHeaders) {
for (Map.Entry<String, String> entry : headers.entrySet()) {
put.add(cf, entry.getKey().getBytes(charset), entry.getValue().getBytes(charset));
}
}
actions.add(put);
} catch (Exception e) {
throw new FlumeException("Could not get row key!", e);
}
return actions;
}
@Override
public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
hTable.batch(transactionalizeActions(actions), results);
}
@Override
public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
return hTable.batch(transactionalizeActions(actions));
}
@Override
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
return hTable.batchCallback(transactionalizeActions(actions), callback);
}
private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
List<Row> transactionalizedActions = new ArrayList<>(actions.size());
for (Row action : actions) {
if (action instanceof Get) {
transactionalizedActions.add(transactionalizeAction((Get) action));
} else if (action instanceof Put) {
transactionalizedActions.add(transactionalizeAction((Put) action));
} else if (action instanceof Delete) {
transactionalizedActions.add(transactionalizeAction((Delete) action));
} else {
transactionalizedActions.add(action);
}
}
return transactionalizedActions;
}
@Override
public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
return hTable.batch(transactionalizeActions(actions));
}
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws
IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
hTable.batchCallback(transactionalizeActions(actions), results, callback);
}
@Override
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
return hTable.batchCallback(transactionalizeActions(actions), callback);
}
private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
List<Row> transactionalizedActions = new ArrayList<>(actions.size());
for (Row action : actions) {
if (action instanceof Get) {
transactionalizedActions.add(transactionalizeAction((Get) action));
} else if (action instanceof Put) {
transactionalizedActions.add(transactionalizeAction((Put) action));
} else if (action instanceof Delete) {
transactionalizedActions.add(transactionalizeAction((Delete) action));
} else {
transactionalizedActions.add(action);
}
}
return transactionalizedActions;
}
@Override
public void batch(List<? extends Row> actions, Object[] results)
throws IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
hTable.batch(transactionalizeActions(actions), results);
}
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
Batch.Callback<R> callback) throws IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
hTable.batchCallback(transactionalizeActions(actions), results, callback);
}
@Override
public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
hTable.batch(transactionalizeActions(actions), results);
}
@Override
public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
if (tx == null) {
throw new IOException("Transaction not started");
}
return hTable.batch(transactionalizeActions(actions));
}