下面列出了 org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#org.apache.hadoop.hbase.client.coprocessor.Batch 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Calls the RefreshHFiles coprocessor endpoint over the table's full key range
 * ({@code EMPTY_START_ROW} to {@code EMPTY_END_ROW}) using a default-instance request.
 *
 * @param table table whose coprocessor service is invoked
 * @throws Throwable anything raised by {@code coprocessorService} or by the endpoint call
 */
public void refreshHFiles(final Table table) throws Throwable {
  final RefreshHFilesProtos.RefreshHFilesRequest request =
      RefreshHFilesProtos.RefreshHFilesRequest.getDefaultInstance();

  final Batch.Call<RefreshHFilesProtos.RefreshHFilesService,
      RefreshHFilesProtos.RefreshHFilesResponse> refreshCall =
      new Batch.Call<RefreshHFilesProtos.RefreshHFilesService,
          RefreshHFilesProtos.RefreshHFilesResponse>() {
        @Override
        public RefreshHFilesProtos.RefreshHFilesResponse call(
            RefreshHFilesProtos.RefreshHFilesService refreshHFilesService)
            throws IOException {
          final ServerRpcController rpcController = new ServerRpcController();
          final BlockingRpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> done =
              new BlockingRpcCallback<>();
          refreshHFilesService.refreshHFiles(rpcController, request, done);
          // Only read the response when the controller recorded no failure.
          if (!rpcController.failedOnException()) {
            return done.get();
          }
          throw rpcController.getFailedOn();
        }
      };

  table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class,
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, refreshCall);
  LOG.debug("Done refreshing HFiles");
}
/**
 * Sends the given metadata mutations to the {@code updateIndexState} endpoint of
 * {@code MetaDataService}, routed by a table key built from the schema and table name
 * parsed out of the first mutation's row key.
 *
 * @param tableMetaData mutations to ship; the first one's row supplies the routing key
 * @param parentTableName not read by this method
 * @return the result produced by {@code metaDataCoprocessorExec}
 * @throws SQLException if {@code metaDataCoprocessorExec} fails
 */
@Override
public MetaDataMutationResult updateIndexState(final List<Mutation> tableMetaData, String parentTableName) throws SQLException {
  final byte[][] keyParts = new byte[3][];
  SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), keyParts);
  final byte[] schemaName = keyParts[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
  final byte[] tableName = keyParts[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
  final byte[] tableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, schemaName, tableName);

  final Batch.Call<MetaDataService, MetaDataResponse> updateCall =
      new Batch.Call<MetaDataService, MetaDataResponse>() {
        @Override
        public MetaDataResponse call(MetaDataService instance) throws IOException {
          final ServerRpcController rpcController = new ServerRpcController();
          final BlockingRpcCallback<MetaDataResponse> done =
              new BlockingRpcCallback<MetaDataResponse>();
          final UpdateIndexStateRequest.Builder request = UpdateIndexStateRequest.newBuilder();
          // Each mutation travels as its serialized protobuf bytes.
          for (Mutation mutation : tableMetaData) {
            request.addTableMetadataMutations(ProtobufUtil.toProto(mutation).toByteString());
          }
          instance.updateIndexState(rpcController, request.build(), done);
          final IOException failure = rpcController.getFailedOn();
          if (failure != null) {
            throw failure;
          }
          return done.get();
        }
      };
  return metaDataCoprocessorExec(tableKey, updateCall);
}
/**
 * Invokes the {@code clearCache} endpoint of {@code MetaDataService} over the full key range
 * of the table named by {@code PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES}.
 *
 * <p>The table handle obtained from the connection's query services is closed before
 * returning, whether or not the coprocessor call succeeds.
 *
 * @param conn JDBC connection that can be unwrapped to a {@code PhoenixConnection}
 * @throws Throwable anything raised while unwrapping, calling the coprocessor, or closing the
 *         table
 */
public static void clearMetaDataCache(Connection conn) throws Throwable {
  PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
  HTableInterface htable = pconn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
  try {
    htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
        HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, ClearCacheResponse>() {
          @Override
          public ClearCacheResponse call(MetaDataService instance) throws IOException {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<ClearCacheResponse> rpcCallback =
                new BlockingRpcCallback<ClearCacheResponse>();
            ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder();
            instance.clearCache(controller, builder.build(), rpcCallback);
            if (controller.getFailedOn() != null) {
              throw controller.getFailedOn();
            }
            return rpcCallback.get();
          }
        });
  } finally {
    // Release the table handle; the original never closed it.
    htable.close();
  }
}
/**
 * Calls the {@code sum} endpoint of {@code ColumnAggregationService} on the regions between
 * {@code start} and {@code end} and returns the map produced by {@code coprocessorService}.
 *
 * @param table table to invoke the coprocessor on
 * @param family column family placed in the request
 * @param qualifier column qualifier; only set on the request when non-null and non-empty
 * @param start start row passed to {@code coprocessorService}
 * @param end end row passed to {@code coprocessorService}
 * @return the map returned by {@code coprocessorService}, holding the {@code getSum()} value of
 *         each call
 */
private static Map<byte [], Long> sum(final Table table, final byte [] family,
    final byte [] qualifier, final byte [] start, final byte [] end)
    throws ServiceException, Throwable {
  final boolean hasQualifier = qualifier != null && qualifier.length > 0;
  final Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long> sumCall =
      new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
        @Override
        public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
            throws IOException {
          final ColumnAggregationProtos.SumRequest.Builder request =
              ColumnAggregationProtos.SumRequest.newBuilder();
          request.setFamily(ByteString.copyFrom(family));
          if (hasQualifier) {
            request.setQualifier(ByteString.copyFrom(qualifier));
          }
          final CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> done =
              new CoprocessorRpcUtils.BlockingRpcCallback<>();
          // No RPC controller is supplied for this call.
          instance.sum(null, request.build(), done);
          final ColumnAggregationProtos.SumResponse response = done.get();
          return response.getSum();
        }
      };
  return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
      start, end, sumCall);
}
/**
 * Calls the {@code hello} endpoint of {@code PingService} on the regions between
 * {@code start} and {@code end}.
 *
 * @param table table to invoke the coprocessor on
 * @param send name placed in the request; left unset when {@code null}
 * @param start start row passed to {@code coprocessorService}
 * @param end end row passed to {@code coprocessorService}
 * @return the map returned by {@code coprocessorService}; a call yields {@code null} when the
 *         response is {@code null} or carries no response field
 */
private Map<byte [], String> hello(final Table table, final String send, final byte [] start,
    final byte [] end) throws ServiceException, Throwable {
  final Batch.Call<PingService, String> helloCall = new Batch.Call<PingService, String>() {
    @Override
    public String call(PingService instance) throws IOException {
      final HelloRequest.Builder request = HelloRequest.newBuilder();
      if (send != null) {
        request.setName(send);
      }
      final CoprocessorRpcUtils.BlockingRpcCallback<HelloResponse> done =
          new CoprocessorRpcUtils.BlockingRpcCallback<>();
      instance.hello(null, request.build(), done);
      final HelloResponse reply = done.get();
      if (reply == null || !reply.hasResponse()) {
        return null;
      }
      return reply.getResponse();
    }
  };
  return table.coprocessorService(PingService.class, start, end, helloCall);
}
/**
 * Calls the {@code noop} endpoint of {@code PingService} on the regions between
 * {@code start} and {@code end}.
 *
 * @param table table to invoke the coprocessor on
 * @param start start row passed to {@code coprocessorService}
 * @param end end row passed to {@code coprocessorService}
 * @return the map returned by {@code coprocessorService}; every call contributes {@code null}
 */
private Map<byte [], String> noop(final Table table, final byte [] start, final byte [] end)
    throws ServiceException, Throwable {
  final Batch.Call<PingService, String> noopCall = new Batch.Call<PingService, String>() {
    @Override
    public String call(PingService instance) throws IOException {
      final CoprocessorRpcUtils.BlockingRpcCallback<NoopResponse> done =
          new CoprocessorRpcUtils.BlockingRpcCallback<>();
      instance.noop(null, NoopRequest.newBuilder().build(), done);
      // Wait for the callback, then discard its value.
      done.get();
      // The callers' tests expect null for this void-style endpoint.
      return null;
    }
  };
  return table.coprocessorService(PingService.class, start, end, noopCall);
}
/**
 * Passes the actions through {@code transactionalizeActions} and forwards them, together with
 * the results array and callback, to the wrapped {@code hTable}.
 *
 * @throws IOException if {@code tx} is {@code null}, i.e. no transaction has been started
 */
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
    Batch.Callback<R> callback) throws IOException, InterruptedException {
  if (null == tx) {
    throw new IOException("Transaction not started");
  }
  final List<? extends Row> txActions = transactionalizeActions(actions);
  hTable.batchCallback(txActions, results, callback);
}
/**
 * Passes the actions through {@code transactionalizeActions} and forwards them, together with
 * the results array and callback, to the wrapped {@code hTable}.
 *
 * @throws IOException if {@code tx} is {@code null}, i.e. no transaction has been started
 */
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
    Batch.Callback<R> callback) throws IOException, InterruptedException {
  if (null == tx) {
    throw new IOException("Transaction not started");
  }
  final List<? extends Row> txActions = transactionalizeActions(actions);
  hTable.batchCallback(txActions, results, callback);
}
/**
 * Passes the actions through {@code transactionalizeActions} and forwards them with the
 * callback to the wrapped {@code hTable}.
 *
 * @return whatever the wrapped table's {@code batchCallback} returns
 * @throws IOException if {@code tx} is {@code null}, i.e. no transaction has been started
 */
@Override
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  if (null == tx) {
    throw new IOException("Transaction not started");
  }
  final List<? extends Row> txActions = transactionalizeActions(actions);
  return hTable.batchCallback(txActions, callback);
}
/**
 * Passes the actions through {@code transactionalizeActions} and forwards them, together with
 * the results array and callback, to the wrapped {@code hTable}.
 *
 * @throws IOException if {@code tx} is {@code null}, i.e. no transaction has been started
 */
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
    Batch.Callback<R> callback) throws IOException, InterruptedException {
  if (null == tx) {
    throw new IOException("Transaction not started");
  }
  final List<? extends Row> txActions = transactionalizeActions(actions);
  hTable.batchCallback(txActions, results, callback);
}
/**
 * Passes the actions through {@code transactionalizeActions} and forwards them with the
 * callback to the wrapped {@code hTable}.
 *
 * @return whatever the wrapped table's {@code batchCallback} returns
 * @throws IOException if {@code tx} is {@code null}, i.e. no transaction has been started
 */
@Override
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  if (null == tx) {
    throw new IOException("Transaction not started");
  }
  final List<? extends Row> txActions = transactionalizeActions(actions);
  return hTable.batchCallback(txActions, callback);
}
/**
 * Passes the actions through {@code transactionalizeActions} and forwards them, together with
 * the results array and callback, to the wrapped {@code hTable}.
 *
 * @throws IOException if {@code tx} is {@code null}, i.e. no transaction has been started
 */
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
    Batch.Callback<R> callback) throws IOException, InterruptedException {
  if (null == tx) {
    throw new IOException("Transaction not started");
  }
  final List<? extends Row> txActions = transactionalizeActions(actions);
  hTable.batchCallback(txActions, results, callback);
}
/**
 * Passes the actions through {@code transactionalizeActions} and forwards them, together with
 * the results array and callback, to the wrapped {@code hTable}.
 *
 * @throws IOException if {@code tx} is {@code null}, i.e. no transaction has been started
 */
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
    Batch.Callback<R> callback) throws IOException, InterruptedException {
  if (null == tx) {
    throw new IOException("Transaction not started");
  }
  final List<? extends Row> txActions = transactionalizeActions(actions);
  hTable.batchCallback(txActions, results, callback);
}
/**
 * Passes the actions through {@code transactionalizeActions} and forwards them with the
 * callback to the wrapped {@code hTable}.
 *
 * @return whatever the wrapped table's {@code batchCallback} returns
 * @throws IOException if {@code tx} is {@code null}, i.e. no transaction has been started
 */
@Override
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  if (null == tx) {
    throw new IOException("Transaction not started");
  }
  final List<? extends Row> txActions = transactionalizeActions(actions);
  return hTable.batchCallback(txActions, callback);
}
/**
 * Passes the actions through {@code transactionalizeActions} and forwards them, together with
 * the results array and callback, to the wrapped {@code hTable}.
 *
 * @throws IOException if {@code tx} is {@code null}, i.e. no transaction has been started
 */
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
    Batch.Callback<R> callback) throws IOException, InterruptedException {
  if (null == tx) {
    throw new IOException("Transaction not started");
  }
  final List<? extends Row> txActions = transactionalizeActions(actions);
  hTable.batchCallback(txActions, results, callback);
}
/**
 * Passes the actions through {@code transactionalizeActions} and forwards them with the
 * callback to the wrapped {@code hTable}.
 *
 * @return whatever the wrapped table's {@code batchCallback} returns
 * @throws IOException if {@code tx} is {@code null}, i.e. no transaction has been started
 */
@Override
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  if (null == tx) {
    throw new IOException("Transaction not started");
  }
  final List<? extends Row> txActions = transactionalizeActions(actions);
  return hTable.batchCallback(txActions, callback);
}
/**
 * Passes the actions through {@code transactionalizeActions} and forwards them with the
 * callback to the wrapped {@code hTable}.
 *
 * @return whatever the wrapped table's {@code batchCallback} returns
 * @throws IOException if {@code tx} is {@code null}, i.e. no transaction has been started
 */
@Override
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  if (null == tx) {
    throw new IOException("Transaction not started");
  }
  final List<? extends Row> txActions = transactionalizeActions(actions);
  return hTable.batchCallback(txActions, callback);
}
/**
 * {@inheritDoc}
 *
 * <p>Not implemented here: every call throws a {@link RuntimeException} naming this class.
 */
@Override
public <R> void batchCallback(
    final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  final String reason = this.getClass() + " does NOT implement this method.";
  throw new RuntimeException(reason);
}
/**
 * {@inheritDoc}
 *
 * <p>Not implemented here: every call throws a {@link RuntimeException} naming this class.
 */
@Override
public <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service,
    byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable)
    throws ServiceException {
  final String reason = this.getClass() + " does NOT implement this method.";
  throw new RuntimeException(reason);
}
/**
 * {@inheritDoc}
 *
 * <p>Not implemented here: every call throws a {@link RuntimeException} naming this class.
 */
@Override
public <T extends Service, R> void coprocessorService(final Class<T> service,
    byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable,
    final Batch.Callback<R> callback) throws ServiceException {
  final String reason = this.getClass() + " does NOT implement this method.";
  throw new RuntimeException(reason);
}
/**
 * {@inheritDoc}
 *
 * <p>Not implemented here: every call throws a {@link RuntimeException} naming this class.
 */
@Override
public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
    Message request, byte[] startKey, byte[] endKey, R responsePrototype,
    Batch.Callback<R> callback) throws ServiceException {
  final String reason = this.getClass() + " does NOT implement this method.";
  throw new RuntimeException(reason);
}
/**
 * Runs a {@code MetaDataService} coprocessor call against the catalog table for the single row
 * {@code tableKey}, retrying once if the response says the table is not in the region (which
 * can happen after a table split).
 *
 * <p>Before the retry the region for {@code tableKey} is relocated through the connection.
 * The second response is returned whatever its return code.
 *
 * @param tableKey row key used as both start and end key of the coprocessor call
 * @param callable the endpoint invocation to execute
 * @return the mutation result built from the coprocessor response
 * @throws SQLException the parsed server exception for an {@code IOException}, or a wrapper
 *         around any other {@code Throwable}
 */
private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey,
    Batch.Call<MetaDataService, MetaDataResponse> callable) throws SQLException {
  try {
    boolean alreadyRetried = false;
    for (;;) {
      if (alreadyRetried) {
        connection.relocateRegion(
            TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES),
            tableKey);
      }
      HTableInterface catalogTable = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
      try {
        final Map<byte[], MetaDataResponse> responses =
            catalogTable.coprocessorService(MetaDataService.class, tableKey, tableKey, callable);
        assert(responses.size() == 1);
        MetaDataResponse response = responses.values().iterator().next();
        boolean notInRegion =
            response.getReturnCode() == MetaDataProtos.MutationCode.TABLE_NOT_IN_REGION;
        // Return unless this is the first attempt and the region lookup was stale.
        if (!notInRegion || alreadyRetried) {
          return MetaDataMutationResult.constructFromProto(response);
        }
        alreadyRetried = true;
      } finally {
        Closeables.closeQuietly(catalogTable);
      }
    }
  } catch (IOException e) {
    throw ServerUtil.parseServerException(e);
  } catch (Throwable t) {
    throw new SQLException(t);
  }
}
/**
 * Calls the {@code getTable} endpoint of {@code MetaDataService}, routed by the table key
 * built from tenant id, schema name and table name.
 *
 * @param tenantId tenant identifier; {@code null} is sent as an empty byte array
 * @param schemaBytes schema name bytes
 * @param tableBytes table name bytes
 * @param tableTimestamp value set as the request's table timestamp
 * @param clientTimestamp value set as the request's client timestamp
 * @return the result produced by {@code metaDataCoprocessorExec}
 * @throws SQLException if {@code metaDataCoprocessorExec} fails
 */
@Override
public MetaDataMutationResult getTable(final PName tenantId, final byte[] schemaBytes, final byte[] tableBytes,
    final long tableTimestamp, final long clientTimestamp) throws SQLException {
  final byte[] tenantIdBytes;
  if (tenantId == null) {
    tenantIdBytes = ByteUtil.EMPTY_BYTE_ARRAY;
  } else {
    tenantIdBytes = tenantId.getBytes();
  }
  final byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);

  final Batch.Call<MetaDataService, MetaDataResponse> getTableCall =
      new Batch.Call<MetaDataService, MetaDataResponse>() {
        @Override
        public MetaDataResponse call(MetaDataService instance) throws IOException {
          final ServerRpcController rpcController = new ServerRpcController();
          final BlockingRpcCallback<MetaDataResponse> done =
              new BlockingRpcCallback<MetaDataResponse>();
          final GetTableRequest.Builder request = GetTableRequest.newBuilder();
          request.setTenantId(HBaseZeroCopyByteString.wrap(tenantIdBytes));
          request.setSchemaName(HBaseZeroCopyByteString.wrap(schemaBytes));
          request.setTableName(HBaseZeroCopyByteString.wrap(tableBytes));
          request.setTableTimestamp(tableTimestamp);
          request.setClientTimestamp(clientTimestamp);
          instance.getTable(rpcController, request.build(), done);
          final IOException failure = rpcController.getFailedOn();
          if (failure != null) {
            throw failure;
          }
          return done.get();
        }
      };
  return metaDataCoprocessorExec(tableKey, getTableCall);
}
/**
 * Unsupported operation: always throws {@code NotImplementedException}, ignoring every
 * argument.
 */
@Override
public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
    Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback)
    throws ServiceException, Throwable {
  final NotImplementedException unsupported = new NotImplementedException();
  throw unsupported;
}
/**
 * Queries rows through the {@code QueryDataService} coprocessor endpoint on every region of
 * the table and concatenates the rows returned by each region.
 *
 * <p>The {@code HTable} opened for the call is closed before this method returns, including
 * when the call fails.
 *
 * @param tableName name of the table to query; also set on the request
 * @param startRow start row set on the request
 * @param stopRow end row set on the request
 * @param isIncludeEnd value of the request's include-end flag
 * @param isSalting value of the request's salting flag
 * @return all rows from the non-null per-region results, in the iteration order of the map
 *         returned by {@code coprocessorService}
 * @throws RuntimeException wrapping any {@code Throwable} raised while opening the table,
 *         calling the coprocessor, or closing the table
 */
public static List<DataProtos.DataQueryResponse.Row> queryByStartRowAndStopRow(String tableName, String startRow, String stopRow, boolean isIncludeEnd, boolean isSalting) {
  final DataProtos.DataQueryRequest.Builder requestBuilder = DataProtos.DataQueryRequest.newBuilder();
  requestBuilder.setTableName(tableName);
  requestBuilder.setStartRow(startRow);
  requestBuilder.setEndRow(stopRow);
  requestBuilder.setIncluedEnd(isIncludeEnd);
  requestBuilder.setIsSalting(isSalting);
  // try-with-resources releases the table handle; the original never closed it.
  try (HTable table = new HTable(HBaseConfiguration.create(conf), tableName)) {
    // null start/end keys are passed so the call is not restricted to a key range.
    Map<byte[], List<DataProtos.DataQueryResponse.Row>> result = table.coprocessorService(DataProtos.QueryDataService.class, null, null, new Batch.Call<DataProtos.QueryDataService, List<DataProtos.DataQueryResponse.Row>>() {
      @Override
      public List<DataProtos.DataQueryResponse.Row> call(DataProtos.QueryDataService counter) throws IOException {
        ServerRpcController controller = new ServerRpcController();
        BlockingRpcCallback<DataProtos.DataQueryResponse> rpcCallback = new BlockingRpcCallback<>();
        counter.queryByStartRowAndEndRow(controller, requestBuilder.build(), rpcCallback);
        DataProtos.DataQueryResponse response = rpcCallback.get();
        // Surface a server-side failure before touching the response.
        if (controller.failedOnException()) {
          throw controller.getFailedOn();
        }
        return response.getRowListList();
      }
    });
    List<DataProtos.DataQueryResponse.Row> results = new LinkedList<>();
    for (List<DataProtos.DataQueryResponse.Row> regionRows : result.values()) {
      if (regionRows != null) {
        results.addAll(regionRows);
      }
    }
    return results;
  } catch (Throwable e) {
    throw new RuntimeException(e);
  }
}
/**
 * Calls the {@code ping} endpoint and then {@code hello} with the key range
 * {@code (null, ROW_A)} and checks that each returns exactly one region result with the
 * expected value.
 */
@Test
public void testSingleMethod() throws Throwable {
  try (Table table = util.getConnection().getTable(TEST_TABLE);
      RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
    final Batch.Call<PingService, String> pingCall = new Batch.Call<PingService, String>() {
      @Override
      public String call(PingService instance) throws IOException {
        final CoprocessorRpcUtils.BlockingRpcCallback<PingResponse> done =
            new CoprocessorRpcUtils.BlockingRpcCallback<>();
        instance.ping(null, PingRequest.newBuilder().build(), done);
        final PingResponse pong = done.get();
        return pong.getPong();
      }
    };
    final Map<byte [], String> pingResults =
        table.coprocessorService(PingService.class, null, ROW_A, pingCall);
    // The requested rows fall in one of the three regions, so one result is expected.
    assertEquals(1, pingResults.size());
    verifyRegionResults(locator, pingResults, ROW_A);

    final String name = "NAME";
    final Map<byte [], String> helloResults = hello(table, name, null, ROW_A);
    // Same single-region range as above, so again one result is expected.
    assertEquals(1, helloResults.size());
    verifyRegionResults(locator, helloResults, "Hello, NAME", ROW_A);
  }
}
/**
 * Runs {@code doPing} against {@code PingService} on the regions between {@code start} and
 * {@code end}.
 *
 * @param table table to invoke the coprocessor on
 * @param start start row passed to {@code coprocessorService}
 * @param end end row passed to {@code coprocessorService}
 * @return the map returned by {@code coprocessorService}, holding each call's {@code doPing}
 *         value
 */
private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end)
    throws ServiceException, Throwable {
  final Batch.Call<PingService, String> pingCall = new Batch.Call<PingService, String>() {
    @Override
    public String call(PingService instance) throws IOException {
      final String pong = doPing(instance);
      return pong;
    }
  };
  return table.coprocessorService(PingService.class, start, end, pingCall);
}
/**
 * Builds a {@code Put} carrying the new index state, the disable timestamp and a zero async
 * rebuild timestamp, then ships it to the {@code updateIndexState} endpoint of
 * {@code MetaDataService} for the row {@code indexTableKey}.
 *
 * @param indexTableKey row key of the index; used for the Put and as start/end key of the call
 * @param minTimeStamp value written to the index-disable-timestamp column
 * @param metaTable table on which the coprocessor is invoked
 * @param newState index state whose serialized bytes are written
 * @return the mutation result built from the first coprocessor response
 * @throws Throwable an {@code IOException} if no response came back, or anything raised by the
 *         coprocessor call
 */
public static MetaDataMutationResult updateIndexState(byte[] indexTableKey, long minTimeStamp,
    Table metaTable, PIndexState newState) throws Throwable {
  // Same shape as the Put the client generates when it updates the index state.
  final Put statePut = new Put(indexTableKey);
  statePut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
      newState.getSerializedBytes());
  statePut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
      PLong.INSTANCE.toBytes(minTimeStamp));
  statePut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES,
      PLong.INSTANCE.toBytes(0));
  final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(statePut);

  final Batch.Call<MetaDataService, MetaDataResponse> updateCall =
      new Batch.Call<MetaDataService, MetaDataResponse>() {
        @Override
        public MetaDataResponse call(MetaDataService instance) throws IOException {
          final ServerRpcController rpcController = new ServerRpcController();
          final BlockingRpcCallback<MetaDataResponse> done = new BlockingRpcCallback<MetaDataResponse>();
          final UpdateIndexStateRequest.Builder request = UpdateIndexStateRequest.newBuilder();
          for (Mutation mutation : tableMetadata) {
            request.addTableMetadataMutations(ProtobufUtil.toProto(mutation).toByteString());
          }
          request.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
          instance.updateIndexState(rpcController, request.build(), done);
          final IOException failure = rpcController.getFailedOn();
          if (failure != null) {
            throw failure;
          }
          return done.get();
        }
      };
  final Map<byte[], MetaDataResponse> responses =
      metaTable.coprocessorService(MetaDataService.class, indexTableKey, indexTableKey, updateCall);
  if (responses.isEmpty()) {
    throw new IOException("Didn't get expected result size");
  }
  final MetaDataResponse firstResponse = responses.values().iterator().next();
  return MetaDataMutationResult.constructFromProto(firstResponse);
}
/**
 * Fetches a user's authorizations by calling the {@code getAuths} endpoint of
 * {@code VisibilityLabelsService} on the labels table.
 *
 * @param connection the Connection instance to use.
 * @param user the user name sent in the request
 * @return labels, the given user is globally authorized for.
 * @throws Throwable anything raised by the coprocessor call or while closing the table
 */
public static GetAuthsResponse getAuths(Connection connection, final String user)
    throws Throwable {
  try (Table labelsTable = connection.getTable(LABELS_TABLE_NAME)) {
    final Batch.Call<VisibilityLabelsService, GetAuthsResponse> getAuthsCall =
        new Batch.Call<VisibilityLabelsService, GetAuthsResponse>() {
          // Created once with the callable, not once per call.
          ServerRpcController rpcController = new ServerRpcController();
          CoprocessorRpcUtils.BlockingRpcCallback<GetAuthsResponse> done =
              new CoprocessorRpcUtils.BlockingRpcCallback<>();

          @Override
          public GetAuthsResponse call(VisibilityLabelsService service) throws IOException {
            final GetAuthsRequest.Builder request = GetAuthsRequest.newBuilder();
            request.setUser(UnsafeByteOperations.unsafeWrap(Bytes.toBytes(user)));
            service.getAuths(rpcController, request.build(), done);
            final GetAuthsResponse auths = done.get();
            if (rpcController.failedOnException()) {
              throw rpcController.getFailedOn();
            }
            return auths;
          }
        };
    final Map<byte[], GetAuthsResponse> perRegion =
        labelsTable.coprocessorService(VisibilityLabelsService.class, HConstants.EMPTY_BYTE_ARRAY,
            HConstants.EMPTY_BYTE_ARRAY, getAuthsCall);
    // The labels table has exactly one region, so the map holds a single entry.
    return perRegion.values().iterator().next();
  }
}
/**
 * Lists the visibility labels defined in the system by calling the {@code listLabels}
 * endpoint of {@code VisibilityLabelsService} on the labels table.
 *
 * @param connection The Connection instance to use.
 * @param regex The regular expression to filter which labels are returned; when {@code null}
 *        no regex is set on the request.
 * @return labels The list of visibility labels defined in the system.
 * @throws Throwable anything raised by the coprocessor call or while closing the table
 */
public static ListLabelsResponse listLabels(Connection connection, final String regex)
    throws Throwable {
  try (Table labelsTable = connection.getTable(LABELS_TABLE_NAME)) {
    final Batch.Call<VisibilityLabelsService, ListLabelsResponse> listLabelsCall =
        new Batch.Call<VisibilityLabelsService, ListLabelsResponse>() {
          // Created once with the callable, not once per call.
          ServerRpcController rpcController = new ServerRpcController();
          CoprocessorRpcUtils.BlockingRpcCallback<ListLabelsResponse> done =
              new CoprocessorRpcUtils.BlockingRpcCallback<>();

          @Override
          public ListLabelsResponse call(VisibilityLabelsService service) throws IOException {
            final ListLabelsRequest.Builder request = ListLabelsRequest.newBuilder();
            if (regex != null) {
              // Compiling on the client surfaces an invalid regex before the RPC is sent.
              final Pattern compiled = Pattern.compile(regex);
              request.setRegex(compiled.toString());
            }
            service.listLabels(rpcController, request.build(), done);
            final ListLabelsResponse labels = done.get();
            if (rpcController.failedOnException()) {
              throw rpcController.getFailedOn();
            }
            return labels;
          }
        };
    final Map<byte[], ListLabelsResponse> perRegion =
        labelsTable.coprocessorService(VisibilityLabelsService.class, HConstants.EMPTY_BYTE_ARRAY,
            HConstants.EMPTY_BYTE_ARRAY, listLabelsCall);
    // The labels table has exactly one region, so the map holds a single entry.
    return perRegion.values().iterator().next();
  }
}