下面列出了 org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback 类的 API 用法示例代码;您也可以点击链接到 GitHub 查看源代码。
/**
 * Calls the {@code refreshHFiles} endpoint of {@code RefreshHFilesService} through
 * {@code table.coprocessorService}, using {@link HConstants#EMPTY_START_ROW} and
 * {@link HConstants#EMPTY_END_ROW} as the row range, then logs completion at debug level.
 *
 * @param table the table whose coprocessor endpoint is invoked
 * @throws Throwable whatever {@code coprocessorService} propagates, including the
 *         {@link IOException} recorded on the RPC controller by an endpoint invocation
 */
public void refreshHFiles(final Table table) throws Throwable {
    // The default (empty) request instance is shared by every endpoint invocation.
    final RefreshHFilesProtos.RefreshHFilesRequest request =
        RefreshHFilesProtos.RefreshHFilesRequest.getDefaultInstance();

    final Batch.Call<RefreshHFilesProtos.RefreshHFilesService,
        RefreshHFilesProtos.RefreshHFilesResponse> refreshCall =
        new Batch.Call<RefreshHFilesProtos.RefreshHFilesService,
            RefreshHFilesProtos.RefreshHFilesResponse>() {
            @Override
            public RefreshHFilesProtos.RefreshHFilesResponse call(
                    RefreshHFilesProtos.RefreshHFilesService service) throws IOException {
                final ServerRpcController rpcController = new ServerRpcController();
                final BlockingRpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> done =
                    new BlockingRpcCallback<>();
                service.refreshHFiles(rpcController, request, done);
                if (!rpcController.failedOnException()) {
                    return done.get();
                }
                // Surface the exception recorded on the controller.
                throw rpcController.getFailedOn();
            }
        };

    table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, refreshCall);
    LOG.debug("Done refreshing HFiles");
}
/**
 * Invokes the {@code clearCache} endpoint of {@code MetaDataService} on the table named by
 * {@code PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES}, using
 * {@link HConstants#EMPTY_START_ROW} and {@link HConstants#EMPTY_END_ROW} as the row range.
 * <p>
 * The table handle obtained from the connection's query services is closed before returning,
 * whether or not the call succeeds.
 *
 * @param conn a JDBC connection that can be unwrapped to a {@code PhoenixConnection}
 * @throws Throwable whatever unwrapping, the coprocessor call, or closing the table propagates
 */
public static void clearMetaDataCache(Connection conn) throws Throwable {
    PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
    // try-with-resources: the original never closed this Table handle.
    try (Table htable = pconn.getQueryServices()
            .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
        htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
            HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, ClearCacheResponse>() {
                @Override
                public ClearCacheResponse call(MetaDataService instance) throws IOException {
                    ServerRpcController controller = new ServerRpcController();
                    BlockingRpcCallback<ClearCacheResponse> rpcCallback =
                        new BlockingRpcCallback<ClearCacheResponse>();
                    ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder();
                    instance.clearCache(controller, builder.build(), rpcCallback);
                    // Rethrow the exception recorded on the controller, if any.
                    if (controller.getFailedOn() != null) {
                        throw controller.getFailedOn();
                    }
                    return rpcCallback.get();
                }
            });
    }
}
/**
 * Sends {@code request} to the {@code getActiveTransactions} endpoint of
 * {@code TxnLifecycleService} via {@code coprocessorService}, using
 * {@link HConstants#EMPTY_START_ROW} and {@link HConstants#EMPTY_END_ROW} as the row range,
 * and returns the values of the resulting map.
 *
 * @param request the active-transaction request passed to every endpoint invocation
 * @return the responses collected by {@code coprocessorService}
 * @throws IOException if the coprocessor call fails (error handling is delegated to
 *         {@code dealWithError} — presumably it throws when the controller recorded a failure)
 */
@Override
public Collection<TxnMessage.ActiveTxnResponse> getActiveTxns(final TxnMessage.ActiveTxnRequest request) throws IOException{
    final Batch.Call<TxnMessage.TxnLifecycleService, TxnMessage.ActiveTxnResponse> activeTxnCall =
        new Batch.Call<TxnMessage.TxnLifecycleService, TxnMessage.ActiveTxnResponse>() {
            @Override
            public TxnMessage.ActiveTxnResponse call(TxnMessage.TxnLifecycleService service) throws IOException {
                ServerRpcController rpcController = new ServerRpcController();
                BlockingRpcCallback<TxnMessage.ActiveTxnResponse> done = new BlockingRpcCallback<>();
                service.getActiveTransactions(rpcController, request, done);
                dealWithError(rpcController);
                return done.get();
            }
        };
    Map<byte[], TxnMessage.ActiveTxnResponse> responsesByKey = coprocessorService(
        TxnMessage.TxnLifecycleService.class,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, activeTxnCall);
    return responsesByKey.values();
}
/**
 * Builds a {@code Put} on {@code indexTableKey} carrying the new index state, the disable
 * timestamp and a zero async-rebuild timestamp, sends it to the {@code updateIndexState}
 * endpoint of {@code MetaDataService} for that row, and converts the first response into a
 * {@code MetaDataMutationResult}.
 *
 * @param indexTableKey row key of the index table; used both for the Put and as the
 *        start/end row of the coprocessor call
 * @param minTimeStamp value written to the INDEX_DISABLE_TIMESTAMP column
 * @param metaTable table on which the coprocessor endpoint is invoked
 * @param newState index state whose serialized bytes are written to the INDEX_STATE column
 * @return the mutation result built from the first response returned
 * @throws Throwable whatever the coprocessor call propagates; an {@link IOException} when
 *         the call returns no results
 */
public static MetaDataMutationResult updateIndexState(byte[] indexTableKey, long minTimeStamp,
        Table metaTable, PIndexState newState) throws Throwable {
    // Mimic the Put that gets generated by the client on an update of the index state
    Put statePut = new Put(indexTableKey);
    statePut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
        PhoenixDatabaseMetaData.INDEX_STATE_BYTES, newState.getSerializedBytes());
    statePut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
        PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(minTimeStamp));
    statePut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
        PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
    final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(statePut);

    final Batch.Call<MetaDataService, MetaDataResponse> updateCall =
        new Batch.Call<MetaDataService, MetaDataResponse>() {
            @Override
            public MetaDataResponse call(MetaDataService service) throws IOException {
                ServerRpcController rpcController = new ServerRpcController();
                BlockingRpcCallback<MetaDataResponse> done = new BlockingRpcCallback<MetaDataResponse>();
                UpdateIndexStateRequest.Builder requestBuilder = UpdateIndexStateRequest.newBuilder();
                for (Mutation mutation : tableMetadata) {
                    MutationProto proto = ProtobufUtil.toProto(mutation);
                    requestBuilder.addTableMetadataMutations(proto.toByteString());
                }
                requestBuilder.setClientVersion(
                    VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                service.updateIndexState(rpcController, requestBuilder.build(), done);
                if (rpcController.getFailedOn() != null) {
                    throw rpcController.getFailedOn();
                }
                return done.get();
            }
        };

    final Map<byte[], MetaDataResponse> responses =
        metaTable.coprocessorService(MetaDataService.class, indexTableKey, indexTableKey, updateCall);
    if (responses.isEmpty()) {
        throw new IOException("Didn't get expected result size");
    }
    return MetaDataMutationResult.constructFromProto(responses.values().iterator().next());
}
/**
 * Calls {@code beginTransaction} on the lifecycle service resolved for {@code rowKey},
 * discarding the response, and hands the controller to {@code dealWithError}.
 *
 * @param rowKey key used to look up the lifecycle service
 * @param txnInfo transaction info forwarded to the service
 * @throws IOException as declared by the lookup / error-handling helpers
 */
@Override
public void beginTransaction(byte[] rowKey,TxnMessage.TxnInfo txnInfo) throws IOException{
    TxnMessage.TxnLifecycleService lifecycleService = getLifecycleService(rowKey);
    ServerRpcController rpcController = new ServerRpcController();
    // The response is not needed; the callback only satisfies the stub signature.
    lifecycleService.beginTransaction(rpcController, txnInfo, new BlockingRpcCallback<>());
    dealWithError(rpcController);
}
/**
 * Sends {@code lifecycleMessage} to the {@code lifecycleAction} endpoint of the lifecycle
 * service resolved for {@code rowKey} and returns the response captured by the callback.
 *
 * @param rowKey key used to look up the lifecycle service
 * @param lifecycleMessage message forwarded to the service
 * @return the response delivered to the blocking callback
 * @throws IOException as declared by the lookup / error-handling helpers
 */
@Override
public TxnMessage.ActionResponse lifecycleAction(byte[] rowKey,TxnMessage.TxnLifecycleMessage lifecycleMessage) throws IOException{
    TxnMessage.TxnLifecycleService lifecycleService = getLifecycleService(rowKey);
    ServerRpcController rpcController = new ServerRpcController();
    BlockingRpcCallback<TxnMessage.ActionResponse> responseHolder = new BlockingRpcCallback<>();
    lifecycleService.lifecycleAction(rpcController, lifecycleMessage, responseHolder);
    dealWithError(rpcController);
    return responseHolder.get();
}
/**
 * Calls {@code elevateTransaction} on the lifecycle service resolved for {@code rowKey},
 * discarding the response, and hands the controller to {@code dealWithError}.
 *
 * @param rowKey key used to look up the lifecycle service
 * @param elevateRequest request forwarded to the service
 * @throws IOException as declared by the lookup / error-handling helpers
 */
@Override
public void elevate(byte[] rowKey,TxnMessage.ElevateRequest elevateRequest) throws IOException{
    TxnMessage.TxnLifecycleService lifecycleService = getLifecycleService(rowKey);
    ServerRpcController rpcController = new ServerRpcController();
    // The response is not read; the callback only satisfies the stub signature.
    BlockingRpcCallback<TxnMessage.VoidResponse> ignoredResponse =
        new BlockingRpcCallback<TxnMessage.VoidResponse>();
    lifecycleService.elevateTransaction(rpcController, elevateRequest, ignoredResponse);
    dealWithError(rpcController);
}
/**
 * Sends {@code request} to the {@code getTransaction} endpoint of the lifecycle service
 * resolved for {@code rowKey}, using a controller whose priority is set to
 * {@link HConstants#HIGH_QOS}, and returns the response captured by the callback.
 *
 * @param rowKey key used to look up the lifecycle service
 * @param request transaction request forwarded to the service
 * @return the transaction delivered to the blocking callback
 * @throws IOException as declared by the lookup / error-handling helpers
 */
@Override
public TxnMessage.Txn getTxn(byte[] rowKey,TxnMessage.TxnRequest request) throws IOException{
    TxnMessage.TxnLifecycleService lifecycleService = getLifecycleService(rowKey);
    SpliceRpcController rpcController = new SpliceRpcController();
    rpcController.setPriority(HConstants.HIGH_QOS);
    BlockingRpcCallback<TxnMessage.Txn> responseHolder = new BlockingRpcCallback<>();
    lifecycleService.getTransaction(rpcController, request, responseHolder);
    dealWithError(rpcController);
    return responseHolder.get();
}
/**
 * Sends {@code request} to the {@code getTaskId} endpoint of the lifecycle service resolved
 * for {@code rowKey} and returns the response captured by the callback.
 *
 * @param rowKey key used to look up the lifecycle service
 * @param request transaction request forwarded to the service
 * @return the task id delivered to the blocking callback
 * @throws IOException as declared by the lookup / error-handling helpers
 */
@Override
public TxnMessage.TaskId getTaskId(byte[] rowKey,TxnMessage.TxnRequest request) throws IOException{
    TxnMessage.TxnLifecycleService lifecycleService = getLifecycleService(rowKey);
    ServerRpcController rpcController = new ServerRpcController();
    BlockingRpcCallback<TxnMessage.TaskId> responseHolder = new BlockingRpcCallback<>();
    lifecycleService.getTaskId(rpcController, request, responseHolder);
    dealWithError(rpcController);
    return responseHolder.get();
}
/**
 * Compresses {@code write}, sends it to the {@code bulkWrite} endpoint of
 * {@code SpliceIndexService} over a channel opened for the write's region key, and
 * decompresses the response bytes into a {@code BulkWritesResult}.
 * <p>
 * On any failure {@code clearCacheIfNeeded} is invoked exactly once for that failure before
 * the exception is translated through {@code pef}.
 *
 * @param write the bulk writes to send
 * @return the decompressed result of the bulk write
 * @throws IOException the translated failure produced by {@code pef}
 */
public BulkWritesResult invoke(BulkWrites write) throws IOException {
    TableName resolvedTable = tableInfoFactory.getTableInfo(this.tableName);
    CoprocessorRpcChannel rpcChannel = channelFactory.newChannel(resolvedTable, write.getRegionKey());
    // Set once clearCacheIfNeeded has already run, so the catch block does not run it again.
    boolean cacheAlreadyChecked = false;
    try {
        SpliceMessage.SpliceIndexService indexService =
            ProtobufUtil.newServiceStub(SpliceMessage.SpliceIndexService.class, rpcChannel);
        SpliceMessage.BulkWriteRequest.Builder requestBuilder = SpliceMessage.BulkWriteRequest.newBuilder();
        byte[] compressedRequest = compressor.compress(write);
        requestBuilder.setBytes(ZeroCopyLiteralByteString.wrap(compressedRequest));
        SpliceMessage.BulkWriteRequest bulkRequest = requestBuilder.build();

        BlockingRpcCallback<SpliceMessage.BulkWriteResponse> responseHolder = new BlockingRpcCallback<>();
        ServerRpcController rpcController = new ServerRpcController();
        indexService.bulkWrite(rpcController, bulkRequest, responseHolder);

        if (rpcController.failed()) {
            IOException failure = rpcController.getFailedOn();
            clearCacheIfNeeded(failure);
            cacheAlreadyChecked = true;
            if (failure == null) {
                // Failure was reported by message only; build the exception from the text.
                throw pef.fromErrorString(rpcController.errorText());
            }
            throw pef.processRemoteException(failure);
        }

        SpliceMessage.BulkWriteResponse bulkResponse = responseHolder.get();
        byte[] payload = bulkResponse.getBytes().toByteArray();
        if (payload == null || payload.length <= 0) {
            Logger.getLogger(BulkWriteChannelInvoker.class).error(
                "zero-length bytes returned with a null error for encodedString: "+write.getBulkWrites().iterator().next().getEncodedStringName());
        }
        return compressor.decompress(payload, BulkWritesResult.class);
    } catch (Exception e) {
        // Exceptions thrown from the failed() branch above also arrive here and are translated again.
        if (!cacheAlreadyChecked) {
            clearCacheIfNeeded(e);
        }
        throw pef.processRemoteException(e);
    }
}
/**
 * Asks the {@code computeRegionSize} endpoint of {@code SpliceDerbyCoprocessorService} for
 * the size of each region of {@code tableName} and turns every answer into an
 * {@code HPartitionLoad} keyed by the region's encoded name, with half of the reported size
 * passed as each of the two size arguments.
 * <p>
 * Any failure is logged and an empty map is returned instead of propagating the error.
 *
 * @param tableName name of the table whose partition is looked up through {@code SIDriver}
 * @return a map from encoded region name to its load, or an empty map when anything fails
 */
public static Map<String, PartitionLoad> getCostWhenNoCachedRegionLoadsFound(String tableName){
    try (Partition p = SIDriver.driver().getTableFactory().getTable(tableName)){
        Map<byte[], Pair<String, Long>> ret = ((SkeletonHBaseClientPartition)p).coprocessorExec(SpliceMessage.SpliceDerbyCoprocessorService.class,
            new Batch.Call<SpliceMessage.SpliceDerbyCoprocessorService, Pair<String, Long>>() {
                @Override
                public Pair<String, Long> call(SpliceMessage.SpliceDerbyCoprocessorService instance) throws IOException {
                    ServerRpcController controller = new ServerRpcController();
                    SpliceMessage.SpliceRegionSizeRequest message = SpliceMessage.SpliceRegionSizeRequest.newBuilder().build();
                    BlockingRpcCallback<SpliceMessage.SpliceRegionSizeResponse> rpcCallback = new BlockingRpcCallback<>();
                    instance.computeRegionSize(controller, message, rpcCallback);
                    if (controller.failed()) {
                        IOException failure = controller.getFailedOn();
                        if (failure == null) {
                            // Failure reported by message only: getRootCause(null) would throw a
                            // NullPointerException and hide the real error text.
                            throw new IOException(controller.errorText());
                        }
                        Throwable t = Throwables.getRootCause(failure);
                        if (t instanceof IOException) {
                            throw (IOException) t;
                        }
                        throw new IOException(t);
                    }
                    SpliceMessage.SpliceRegionSizeResponse response = rpcCallback.get();
                    return Pair.newPair(response.getEncodedName(), response.getSizeInBytes());
                }
            });
        Collection<Pair<String, Long>> collection = ret.values();
        Map<String, PartitionLoad> retMap = new HashMap<>();
        for(Pair<String, Long> info : collection){
            long size = info.getSecond();
            // The reported size is split evenly between the two HPartitionLoad size arguments.
            HPartitionLoad value=new HPartitionLoad(info.getFirst(),size/2,
                size/2);
            retMap.put(info.getFirst(),value);
        }
        return retMap;
    } catch (Throwable th){
        SpliceLogUtils.error(LOG,"Unable to fetch region load info",th);
    }
    /*
     * When we fail for whatever reason, we don't want to blow up the query, we just return no
     * cached information. This will screw up the planning phase (since there is nothing to work with), but
     * at least it won't explode.
     */
    return Collections.emptyMap();
}
/**
 * Remove the cached table from the region servers it was sent to.
 * <p>
 * A failure on one server is logged and does not stop the loop. Servers still left in
 * {@code remainingOnServers} after the loop are reported with a warning; no exception is
 * thrown for them. The entry for this cache id is always removed from
 * {@code cacheUsingTableMap} and the table handle is closed quietly.
 *
 * @param cache the server cache to remove
 * @param remainingOnServers region locations that still hold the cache; an entry is removed
 *        from this set after its removal call succeeds
 * @throws SQLException presumably raised by the query services while resolving the table or
 *         its regions — TODO confirm against {@code ConnectionQueryServices}
 */
private void removeServerCache(final ServerCache cache, Set<HRegionLocation> remainingOnServers) throws SQLException {
    Table iterateOverTable = null;
    final byte[] cacheId = cache.getId();
    try {
        ConnectionQueryServices services = connection.getQueryServices();
        Throwable lastThrowable = null;
        final PTable cacheUsingTable = cacheUsingTableMap.get(Bytes.mapKey(cacheId));
        byte[] tableName = cacheUsingTable.getPhysicalName().getBytes();
        iterateOverTable = services.getTable(tableName);
        List<HRegionLocation> locations = services.getAllTableRegions(tableName);
        /*
         * Allow for the possibility that the region we based where to send our cache has split and been relocated
         * to another region server *after* we sent it, but before we removed it. To accommodate this, we iterate
         * through the current metadata boundaries and remove the cache once for each server that we originally sent
         * to.
         */
        if (LOGGER.isDebugEnabled()) {
            // Render the id bytes explicitly; concatenating a byte[] only prints its identity hash.
            LOGGER.debug(addCustomAnnotations(
                "Removing Cache " + Bytes.toStringBinary(cacheId) + " from servers.", connection));
        }
        for (HRegionLocation entry : locations) {
            // Call once per server
            if (remainingOnServers.contains(entry)) {
                try {
                    byte[] key = getKeyInRegion(entry.getRegion().getStartKey());
                    iterateOverTable.coprocessorService(ServerCachingService.class, key, key,
                        new Batch.Call<ServerCachingService, RemoveServerCacheResponse>() {
                            @Override
                            public RemoveServerCacheResponse call(ServerCachingService instance)
                                    throws IOException {
                                ServerRpcController controller = new ServerRpcController();
                                BlockingRpcCallback<RemoveServerCacheResponse> rpcCallback = new BlockingRpcCallback<RemoveServerCacheResponse>();
                                RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest
                                    .newBuilder();
                                final byte[] tenantIdBytes;
                                if (cacheUsingTable.isMultiTenant()) {
                                    try {
                                        tenantIdBytes = connection.getTenantId() == null ? null
                                            : ScanUtil.getTenantIdBytes(cacheUsingTable.getRowKeySchema(),
                                                cacheUsingTable.getBucketNum() != null,
                                                connection.getTenantId(),
                                                cacheUsingTable.getViewIndexId() != null);
                                    } catch (SQLException e) {
                                        // call() may only throw IOException, so wrap.
                                        throw new IOException(e);
                                    }
                                } else {
                                    tenantIdBytes = connection.getTenantId() == null ? null
                                        : connection.getTenantId().getBytes();
                                }
                                if (tenantIdBytes != null) {
                                    builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
                                }
                                builder.setCacheId(ByteStringer.wrap(cacheId));
                                instance.removeServerCache(controller, builder.build(), rpcCallback);
                                if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
                                return rpcCallback.get();
                            }
                        });
                    remainingOnServers.remove(entry);
                } catch (Throwable t) {
                    // Remember the failure for the final warning and keep going with other servers.
                    lastThrowable = t;
                    LOGGER.error(addCustomAnnotations(
                        "Error trying to remove hash cache for " + entry,
                        connection), t);
                }
            }
        }
        if (!remainingOnServers.isEmpty()) {
            LOGGER.warn(addCustomAnnotations("Unable to remove hash cache for "
                + remainingOnServers, connection),
                lastThrowable);
        }
    } finally {
        cacheUsingTableMap.remove(Bytes.mapKey(cacheId));
        Closeables.closeQuietly(iterateOverTable);
    }
}
/**
 * Builds an {@code AddServerCacheRequest} for the given cache and sends it to the
 * {@code addServerCache} endpoint of {@code ServerCachingService} for a single row key
 * derived from {@code key} via {@code getKeyInRegion}.
 *
 * @param htable table on which the coprocessor endpoint is invoked
 * @param key key passed to {@code getKeyInRegion} to obtain the start/end row of the call
 * @param cacheUsingTable table whose multi-tenancy settings decide how the tenant id is encoded
 * @param cacheId id of the cache, written to the request
 * @param cachePtr cache contents, written to the request
 * @param cacheFactory factory whose class name is written to the request
 * @param txState transaction state bytes, written to the request
 * @param usePersistentCache flag written to the request
 * @return the {@code getReturn()} value of the response when exactly one response came back;
 *         {@code false} otherwise
 * @throws Exception an {@link IOException} wrapping a tenant-id encoding failure, or an
 *         {@code Exception} wrapping whatever the coprocessor call threw
 */
public boolean addServerCache(Table htable, byte[] key, final PTable cacheUsingTable, final byte[] cacheId,
        final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final byte[] txState, final boolean usePersistentCache)
        throws Exception {
    byte[] rowInRegion = getKeyInRegion(key);
    final Map<byte[], AddServerCacheResponse> responses;
    AddServerCacheRequest.Builder requestBuilder = AddServerCacheRequest.newBuilder();

    // Multi-tenant tables encode the tenant id through the row key schema; others use raw bytes.
    final byte[] tenantIdBytes;
    if (cacheUsingTable.isMultiTenant()) {
        try {
            if (connection.getTenantId() == null) {
                tenantIdBytes = null;
            } else {
                tenantIdBytes = ScanUtil.getTenantIdBytes(cacheUsingTable.getRowKeySchema(),
                    cacheUsingTable.getBucketNum() != null, connection.getTenantId(),
                    cacheUsingTable.getViewIndexId() != null);
            }
        } catch (SQLException e) {
            throw new IOException(e);
        }
    } else if (connection.getTenantId() == null) {
        tenantIdBytes = null;
    } else {
        tenantIdBytes = connection.getTenantId().getBytes();
    }
    if (tenantIdBytes != null) {
        requestBuilder.setTenantId(ByteStringer.wrap(tenantIdBytes));
    }

    requestBuilder.setCacheId(ByteStringer.wrap(cacheId));
    requestBuilder.setUsePersistentCache(usePersistentCache);
    requestBuilder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr));
    requestBuilder.setHasProtoBufIndexMaintainer(true);
    ServerCacheFactoryProtos.ServerCacheFactory.Builder factoryProtoBuilder =
        ServerCacheFactoryProtos.ServerCacheFactory.newBuilder();
    factoryProtoBuilder.setClassName(cacheFactory.getClass().getName());
    requestBuilder.setCacheFactory(factoryProtoBuilder.build());
    requestBuilder.setTxState(ByteStringer.wrap(txState));
    requestBuilder.setClientVersion(MetaDataProtocol.PHOENIX_VERSION);
    final AddServerCacheRequest request = requestBuilder.build();

    final Batch.Call<ServerCachingService, AddServerCacheResponse> addCall =
        new Batch.Call<ServerCachingService, AddServerCacheResponse>() {
            @Override
            public AddServerCacheResponse call(ServerCachingService service) throws IOException {
                ServerRpcController rpcController = new ServerRpcController();
                BlockingRpcCallback<AddServerCacheResponse> done = new BlockingRpcCallback<AddServerCacheResponse>();
                service.addServerCache(rpcController, request, done);
                if (rpcController.getFailedOn() != null) {
                    throw rpcController.getFailedOn();
                }
                return done.get();
            }
        };
    try {
        responses = htable.coprocessorService(ServerCachingService.class, rowInRegion, rowInRegion, addCall);
    } catch (Throwable t) {
        throw new Exception(t);
    }

    // Only a single response is treated as a usable answer.
    if (responses == null || responses.size() != 1) {
        return false;
    }
    return responses.values().iterator().next().getReturn();
}
/**
 * Creates a table with one index, moves the index to {@code PENDING_DISABLE} through
 * {@code IndexUtil.updateIndexState}, then issues a {@code getTable} coprocessor call with a
 * client version encoded using minor version 13 and checks that exactly one index is
 * returned and that its state is reported as {@code DISABLE}.
 * <p>
 * Uses JUnit assertions (not the {@code assert} keyword) and lets RPC failures propagate so
 * that they fail the test; the connection and table handles are closed when the test ends.
 */
@Test
public void testCachedConnections() throws Throwable {
    final String schemaName = generateUniqueName();
    final String tableName = generateUniqueName();
    final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
    final String indexName = generateUniqueName();
    final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
    try (Connection conn = DriverManager.getConnection(getUrl())) {
        // create table and indices
        String createTableSql =
            "CREATE TABLE " + fullTableName
                + "(org_id VARCHAR NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER, v3 INTEGER)";
        conn.createStatement().execute(createTableSql);
        conn.createStatement()
            .execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)");
        conn.commit();
        PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
        ConnectionQueryServices queryServices = phoenixConn.getQueryServices();

        long ts = EnvironmentEdgeManager.currentTimeMillis();
        try (Table metaTable =
                queryServices.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
            MutationCode code =
                IndexUtil
                    .updateIndexState(fullIndexName, ts, metaTable, PIndexState.PENDING_DISABLE)
                    .getMutationCode();
            assertEquals(MutationCode.TABLE_ALREADY_EXISTS, code);
        }

        final byte[] schemaBytes = PVarchar.INSTANCE.toBytes(schemaName);
        final byte[] tableBytes = PVarchar.INSTANCE.toBytes(tableName);
        PName tenantId = phoenixConn.getTenantId();
        final long tableTimestamp = HConstants.LATEST_TIMESTAMP;
        final long resolvedTimestamp = HConstants.LATEST_TIMESTAMP;
        final byte[] tenantIdBytes =
            tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes();
        byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
        Batch.Call<MetaDataService, MetaDataResponse> callable =
            new Batch.Call<MetaDataService, MetaDataResponse>() {
                @Override
                public MetaDataResponse call(MetaDataService instance) throws IOException {
                    ServerRpcController controller = new ServerRpcController();
                    BlockingRpcCallback<MetaDataResponse> rpcCallback =
                        new BlockingRpcCallback<MetaDataResponse>();
                    GetTableRequest.Builder builder = GetTableRequest.newBuilder();
                    builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
                    builder.setSchemaName(ByteStringer.wrap(schemaBytes));
                    builder.setTableName(ByteStringer.wrap(tableBytes));
                    builder.setTableTimestamp(tableTimestamp);
                    builder.setClientTimestamp(resolvedTimestamp);
                    // Minor version is pinned to 13 for this request.
                    builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
                        13, PHOENIX_PATCH_NUMBER));
                    instance.getTable(controller, builder.build(), rpcCallback);
                    if (controller.getFailedOn() != null) {
                        throw controller.getFailedOn();
                    }
                    return rpcCallback.get();
                }
            };
        int version = VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, 13, PHOENIX_PATCH_NUMBER);
        LOGGER.info("Client version: " + version);
        try (Table ht =
                queryServices.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
            final Map<byte[], MetaDataResponse> results =
                ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable);
            // Failures are no longer swallowed: an exception here must fail the test.
            assertEquals(1, results.size());
            MetaDataResponse result = results.values().iterator().next();
            assertEquals(1, result.getTable().getIndexesCount());
            assertEquals(PIndexState.DISABLE,
                PIndexState.valueOf(result.getTable().getIndexes(0).getIndexState()));
        }
    }
}