下面列出了 org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback#get() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Forwards a transaction lifecycle message to the lifecycle service resolved for
 * {@code rowKey} and returns the response collected by a blocking callback.
 *
 * @param rowKey           key handed to {@code getLifecycleService} to pick the service
 * @param lifecycleMessage the lifecycle request to send
 * @return the value produced by the blocking callback once the call has completed
 * @throws IOException if the callback cannot deliver a result;
 *                     NOTE(review): presumably also raised by {@code dealWithError}
 *                     when the controller reports a failure — confirm against that helper
 */
@Override
public TxnMessage.ActionResponse lifecycleAction(byte[] rowKey, TxnMessage.TxnLifecycleMessage lifecycleMessage) throws IOException {
    final TxnMessage.TxnLifecycleService lifecycleService = getLifecycleService(rowKey);
    final ServerRpcController rpcController = new ServerRpcController();
    final BlockingRpcCallback<TxnMessage.ActionResponse> responseCallback = new BlockingRpcCallback<>();

    lifecycleService.lifecycleAction(rpcController, lifecycleMessage, responseCallback);

    // Inspect the controller before reading the callback so a reported failure is handled first.
    dealWithError(rpcController);

    final TxnMessage.ActionResponse response = responseCallback.get();
    return response;
}
/**
 * Looks up a transaction through the lifecycle service resolved for {@code rowKey}.
 * The call is issued with a {@code SpliceRpcController} whose priority is set to
 * {@code HConstants.HIGH_QOS}.
 *
 * @param rowKey  key handed to {@code getLifecycleService} to pick the service
 * @param request the transaction request to send
 * @return the transaction message produced by the blocking callback
 * @throws IOException if the callback cannot deliver a result;
 *                     NOTE(review): presumably also raised by {@code dealWithError}
 *                     when the controller reports a failure — confirm against that helper
 */
@Override
public TxnMessage.Txn getTxn(byte[] rowKey, TxnMessage.TxnRequest request) throws IOException {
    final TxnMessage.TxnLifecycleService lifecycleService = getLifecycleService(rowKey);

    final SpliceRpcController rpcController = new SpliceRpcController();
    rpcController.setPriority(HConstants.HIGH_QOS);

    final BlockingRpcCallback<TxnMessage.Txn> responseCallback = new BlockingRpcCallback<>();
    lifecycleService.getTransaction(rpcController, request, responseCallback);

    // Inspect the controller before reading the callback so a reported failure is handled first.
    dealWithError(rpcController);

    final TxnMessage.Txn txn = responseCallback.get();
    return txn;
}
/**
 * Requests a task id from the lifecycle service resolved for {@code rowKey}.
 *
 * @param rowKey  key handed to {@code getLifecycleService} to pick the service
 * @param request the transaction request to send
 * @return the task id message produced by the blocking callback
 * @throws IOException if the callback cannot deliver a result;
 *                     NOTE(review): presumably also raised by {@code dealWithError}
 *                     when the controller reports a failure — confirm against that helper
 */
@Override
public TxnMessage.TaskId getTaskId(byte[] rowKey, TxnMessage.TxnRequest request) throws IOException {
    final TxnMessage.TxnLifecycleService lifecycleService = getLifecycleService(rowKey);
    final ServerRpcController rpcController = new ServerRpcController();
    final BlockingRpcCallback<TxnMessage.TaskId> responseCallback = new BlockingRpcCallback<>();

    lifecycleService.getTaskId(rpcController, request, responseCallback);

    // Inspect the controller before reading the callback so a reported failure is handled first.
    dealWithError(rpcController);

    final TxnMessage.TaskId taskId = responseCallback.get();
    return taskId;
}
/**
 * Sends a bulk write to the index service over a coprocessor channel and returns the
 * decoded result.
 *
 * <p>The write is compressed into a {@code BulkWriteRequest}, dispatched through a channel
 * created for this invoker's table and the write's region key, and the response bytes are
 * decompressed into a {@link BulkWritesResult}.
 *
 * <p>Every exception raised inside the exchange, including the ones thrown for a failed
 * controller, is caught and passed through {@code pef.processRemoteException} before being
 * rethrown. {@code clearCacheIfNeeded} is invoked once per failure: either with the
 * controller's exception, or with the caught exception when the controller path was not
 * taken.
 *
 * @param write the bulk write to send
 * @return the decompressed result of the bulk write
 * @throws IOException the exception produced by {@code pef} for any failure in the exchange
 */
public BulkWritesResult invoke(BulkWrites write) throws IOException {
    TableName resolvedTable = tableInfoFactory.getTableInfo(this.tableName);
    CoprocessorRpcChannel rpcChannel = channelFactory.newChannel(resolvedTable, write.getRegionKey());
    // Set once the controller-failure path has already called clearCacheIfNeeded,
    // so the catch block below does not call it a second time.
    boolean cacheAlreadyChecked = false;
    try {
        SpliceMessage.SpliceIndexService indexService =
                ProtobufUtil.newServiceStub(SpliceMessage.SpliceIndexService.class, rpcChannel);

        SpliceMessage.BulkWriteRequest request = SpliceMessage.BulkWriteRequest.newBuilder()
                .setBytes(ZeroCopyLiteralByteString.wrap(compressor.compress(write)))
                .build();

        BlockingRpcCallback<SpliceMessage.BulkWriteResponse> responseCallback = new BlockingRpcCallback<>();
        ServerRpcController rpcController = new ServerRpcController();
        indexService.bulkWrite(rpcController, request, responseCallback);

        if (rpcController.failed()) {
            IOException remoteFailure = rpcController.getFailedOn();
            clearCacheIfNeeded(remoteFailure);
            cacheAlreadyChecked = true;
            if (remoteFailure == null) {
                // The controller failed without an exception attached; only the error text is available.
                throw pef.fromErrorString(rpcController.errorText());
            }
            throw pef.processRemoteException(remoteFailure);
        }

        byte[] responseBytes = responseCallback.get().getBytes().toByteArray();
        boolean hasPayload = responseBytes != null && responseBytes.length > 0;
        if (!hasPayload) {
            // Logged only; the (empty) payload is still handed to the decompressor below.
            Logger.getLogger(BulkWriteChannelInvoker.class)
                    .error("zero-length bytes returned with a null error for encodedString: " + write.getBulkWrites().iterator().next().getEncodedStringName());
        }
        return compressor.decompress(responseBytes, BulkWritesResult.class);
    } catch (Exception e) {
        if (!cacheAlreadyChecked) {
            clearCacheIfNeeded(e);
        }
        throw pef.processRemoteException(e);
    }
}
/**
 * Creates a table with a single index, moves the index to {@code PENDING_DISABLE} via
 * {@code IndexUtil.updateIndexState}, then issues a {@code getTable} coprocessor call
 * against SYSTEM.CATALOG using a client version whose minor component is 13 and checks
 * that the returned table reports exactly one index in state {@code DISABLE}.
 *
 * <p>Failures of the coprocessor call propagate out of the test instead of being logged
 * and ignored, and the checks use JUnit assertions so they run regardless of the JVM's
 * {@code -ea} setting. The JDBC connection and both catalog table handles are released
 * on every path.
 *
 * <p>NOTE(review): the method name mentions cached connections, but the body exercises
 * index-state reporting — confirm the name is intended.
 *
 * @throws Throwable if setup, the coprocessor call, or any assertion fails
 */
@Test
public void testCachedConnections() throws Throwable {
    final String schemaName = generateUniqueName();
    final String tableName = generateUniqueName();
    final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
    final String indexName = generateUniqueName();
    final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);

    try (Connection conn = DriverManager.getConnection(getUrl())) {
        // create table and indices
        String createTableSql =
                "CREATE TABLE " + fullTableName
                        + "(org_id VARCHAR NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER, v3 INTEGER)";
        conn.createStatement().execute(createTableSql);
        conn.createStatement()
                .execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)");
        conn.commit();

        PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
        ConnectionQueryServices queryServices = phoenixConn.getQueryServices();

        // Flip the index state directly in the catalog, releasing the handle afterwards.
        Table metaTable =
                queryServices.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
        try {
            long ts = EnvironmentEdgeManager.currentTimeMillis();
            MutationCode code =
                    IndexUtil
                            .updateIndexState(fullIndexName, ts, metaTable, PIndexState.PENDING_DISABLE)
                            .getMutationCode();
            assertEquals(MutationCode.TABLE_ALREADY_EXISTS, code);
        } finally {
            Closeables.closeQuietly(metaTable);
        }

        final byte[] schemaBytes = PVarchar.INSTANCE.toBytes(schemaName);
        final byte[] tableBytes = PVarchar.INSTANCE.toBytes(tableName);
        PName tenantId = phoenixConn.getTenantId();
        final byte[] tenantIdBytes =
                tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes();
        final long tableTimestamp = HConstants.LATEST_TIMESTAMP;
        final long resolvedTimestamp = HConstants.LATEST_TIMESTAMP;
        // Encoded once and reused for both the request and the log line.
        final int clientVersion =
                VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, 13, PHOENIX_PATCH_NUMBER);
        byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);

        Batch.Call<MetaDataService, MetaDataResponse> callable =
                new Batch.Call<MetaDataService, MetaDataResponse>() {
                    @Override
                    public MetaDataResponse call(MetaDataService instance) throws IOException {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<MetaDataResponse> rpcCallback =
                                new BlockingRpcCallback<MetaDataResponse>();
                        GetTableRequest.Builder builder = GetTableRequest.newBuilder();
                        builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
                        builder.setSchemaName(ByteStringer.wrap(schemaBytes));
                        builder.setTableName(ByteStringer.wrap(tableBytes));
                        builder.setTableTimestamp(tableTimestamp);
                        builder.setClientTimestamp(resolvedTimestamp);
                        builder.setClientVersion(clientVersion);
                        instance.getTable(controller, builder.build(), rpcCallback);
                        if (controller.getFailedOn() != null) {
                            throw controller.getFailedOn();
                        }
                        return rpcCallback.get();
                    }
                };
        LOGGER.info("Client version: " + clientVersion);

        Table ht =
                queryServices.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
        try {
            final Map<byte[], MetaDataResponse> results =
                    ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable);
            assertEquals(1, results.size());
            MetaDataResponse result = results.values().iterator().next();
            assertEquals(1, result.getTable().getIndexesCount());
            // NOTE(review): valueOf expects the enum constant name; confirm that
            // getIndexState() returns that form rather than a serialized code, otherwise
            // this line throws (which previously would have been swallowed by the catch).
            assertEquals(PIndexState.DISABLE,
                    PIndexState.valueOf(result.getTable().getIndexes(0).getIndexState()));
        } finally {
            Closeables.closeQuietly(ht);
        }
    }
}