下面列出了 org.apache.hadoop.hbase.ipc.BlockingRpcCallback 类的 API 用法示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Ships the given metadata mutations to the MetaData coprocessor endpoint as an
 * UpdateIndexState request.
 *
 * <p>The target row is the table key built from the schema and table name parts of the first
 * mutation's row key, with an empty tenant component.
 *
 * @param tableMetaData mutations to send; the first one's row key selects the target row
 * @param parentTableName not read by this implementation
 * @return whatever {@code metaDataCoprocessorExec} produces for the call
 * @throws SQLException declared by the signature; presumably raised by {@code metaDataCoprocessorExec}
 */
@Override
public MetaDataMutationResult updateIndexState(final List<Mutation> tableMetaData, String parentTableName) throws SQLException {
    final byte[][] keyParts = new byte[3][];
    SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), keyParts);
    final byte[] schemaPart = keyParts[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
    final byte[] tablePart = keyParts[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
    final byte[] tableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, schemaPart, tablePart);

    final Batch.Call<MetaDataService, MetaDataResponse> endpointCall =
            new Batch.Call<MetaDataService, MetaDataResponse>() {
                @Override
                public MetaDataResponse call(MetaDataService instance) throws IOException {
                    final ServerRpcController rpcController = new ServerRpcController();
                    final BlockingRpcCallback<MetaDataResponse> done =
                            new BlockingRpcCallback<MetaDataResponse>();
                    final UpdateIndexStateRequest.Builder request = UpdateIndexStateRequest.newBuilder();
                    for (Mutation mutation : tableMetaData) {
                        request.addTableMetadataMutations(ProtobufUtil.toProto(mutation).toByteString());
                    }
                    instance.updateIndexState(rpcController, request.build(), done);
                    // Surface a failure recorded on the controller before handing back the response.
                    if (rpcController.getFailedOn() != null) {
                        throw rpcController.getFailedOn();
                    }
                    return done.get();
                }
            };
    return metaDataCoprocessorExec(tableKey, endpointCall);
}
/**
 * Invokes {@code clearCache} on the MetaData coprocessor endpoint over the full key range
 * (empty start row to empty end row) of the table identified by
 * {@code PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES}.
 *
 * <p>The table handle obtained from the connection's query services is always closed before
 * returning, including when the coprocessor call fails.
 *
 * @param conn a JDBC connection that can be unwrapped to a {@code PhoenixConnection}
 * @throws Throwable anything raised while unwrapping, invoking the endpoint, or closing the table
 */
public static void clearMetaDataCache(Connection conn) throws Throwable {
    PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
    HTableInterface htable = pconn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
    try {
        htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
                HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, ClearCacheResponse>() {
                    @Override
                    public ClearCacheResponse call(MetaDataService instance) throws IOException {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<ClearCacheResponse> rpcCallback =
                                new BlockingRpcCallback<ClearCacheResponse>();
                        ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder();
                        instance.clearCache(controller, builder.build(), rpcCallback);
                        // Propagate a failure recorded on the controller instead of returning a response.
                        if (controller.getFailedOn() != null) {
                            throw controller.getFailedOn();
                        }
                        return rpcCallback.get();
                    }
                });
    } finally {
        // The original never released the table handle; close it on every path.
        htable.close();
    }
}
/**
 * Blocking variant of {@code getAuthenticationToken}: delegates to the callback-based overload
 * and waits for its response.
 *
 * <p>A locally created {@code ServerRpcController} is handed to the delegate so that a failure
 * it records there is rethrown by {@code checkFailed()}. Previously {@code null} was passed,
 * which left the local controller untouched and made the failure check a no-op.
 *
 * @param controller ignored; per the inline note it is always null
 * @param request the token request to forward
 * @return the response delivered to the blocking callback
 * @throws ServiceException wrapping any {@code IOException} reported by the controller or callback
 */
@Override
public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
        RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
        throws ServiceException {
    LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null));
    // Ignore above passed in controller -- it is always null
    ServerRpcController serverController = new ServerRpcController();
    final BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse>
            callback = new BlockingRpcCallback<>();
    // Pass our own controller so the delegate has somewhere to record a failure.
    getAuthenticationToken(serverController, request, callback);
    try {
        serverController.checkFailed();
        return callback.get();
    } catch (IOException ioe) {
        throw new ServiceException(ioe);
    }
}
/**
 * Blocking variant of {@code whoAmI}: delegates to the callback-based overload and waits for
 * its response.
 *
 * <p>A locally created {@code ServerRpcController} is handed to the delegate so that a failure
 * it records there is rethrown by {@code checkFailed()}. Previously {@code null} was passed,
 * which left the local controller untouched and made the failure check a no-op.
 *
 * @param controller ignored; per the inline note it is always null
 * @param request the whoAmI request to forward
 * @return the response delivered to the blocking callback
 * @throws ServiceException wrapping any {@code IOException} reported by the controller or callback
 */
@Override
public AuthenticationProtos.WhoAmIResponse whoAmI(
        RpcController controller, AuthenticationProtos.WhoAmIRequest request)
        throws ServiceException {
    LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null));
    // Ignore above passed in controller -- it is always null
    ServerRpcController serverController = new ServerRpcController();
    BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
            new BlockingRpcCallback<>();
    // Pass our own controller so the delegate has somewhere to record a failure.
    whoAmI(serverController, request, callback);
    try {
        serverController.checkFailed();
        return callback.get();
    } catch (IOException ioe) {
        throw new ServiceException(ioe);
    }
}
/**
 * Calls {@code getRows} on the {@code IIProtos.RowsService} endpoint with null start and end
 * keys and returns an iterator over the row lists keyed in the resulting map.
 *
 * @param request the request forwarded unchanged to every endpoint invocation
 * @param table the table whose coprocessor service is invoked
 * @return iterator over the row lists returned by the endpoint invocations
 * @throws Throwable anything raised by {@code coprocessorService}
 */
private Iterator<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable {
    final Batch.Call<IIProtos.RowsService, List<IIProtos.IIResponse.IIRow>> fetchRows =
            new Batch.Call<IIProtos.RowsService, List<IIProtos.IIResponse.IIRow>>() {
                @Override
                public List<IIProtos.IIResponse.IIRow> call(IIProtos.RowsService rowsService) throws IOException {
                    final ServerRpcController rpcController = new ServerRpcController();
                    final BlockingRpcCallback<IIProtos.IIResponse> done = new BlockingRpcCallback<>();
                    rowsService.getRows(rpcController, request, done);
                    final IIProtos.IIResponse reply = done.get();
                    // The failure check comes after the blocking get but before the reply is read.
                    if (rpcController.failedOnException()) {
                        throw rpcController.getFailedOn();
                    }
                    return reply.getRowsList();
                }
            };
    final Map<byte[], List<IIProtos.IIResponse.IIRow>> rowsByKey =
            table.coprocessorService(IIProtos.RowsService.class, null, null, fetchRows);
    return rowsByKey.values().iterator();
}
/**
 * Sends a GetTable request to the MetaData coprocessor endpoint for the row identified by
 * tenant, schema and table name.
 *
 * @param tenantId tenant owning the table; {@code null} is treated as an empty tenant id
 * @param schemaBytes schema name bytes
 * @param tableBytes table name bytes
 * @param tableTimestamp value placed in the request's table timestamp field
 * @param clientTimestamp value placed in the request's client timestamp field
 * @return whatever {@code metaDataCoprocessorExec} produces for the call
 * @throws SQLException declared by the signature; presumably raised by {@code metaDataCoprocessorExec}
 */
@Override
public MetaDataMutationResult getTable(final PName tenantId, final byte[] schemaBytes, final byte[] tableBytes,
        final long tableTimestamp, final long clientTimestamp) throws SQLException {
    final byte[] tenantIdBytes;
    if (tenantId == null) {
        tenantIdBytes = ByteUtil.EMPTY_BYTE_ARRAY;
    } else {
        tenantIdBytes = tenantId.getBytes();
    }
    final byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);

    final Batch.Call<MetaDataService, MetaDataResponse> endpointCall =
            new Batch.Call<MetaDataService, MetaDataResponse>() {
                @Override
                public MetaDataResponse call(MetaDataService instance) throws IOException {
                    final ServerRpcController rpcController = new ServerRpcController();
                    final BlockingRpcCallback<MetaDataResponse> done =
                            new BlockingRpcCallback<MetaDataResponse>();
                    final GetTableRequest.Builder request = GetTableRequest.newBuilder();
                    request.setTenantId(HBaseZeroCopyByteString.wrap(tenantIdBytes));
                    request.setSchemaName(HBaseZeroCopyByteString.wrap(schemaBytes));
                    request.setTableName(HBaseZeroCopyByteString.wrap(tableBytes));
                    request.setTableTimestamp(tableTimestamp);
                    request.setClientTimestamp(clientTimestamp);
                    instance.getTable(rpcController, request.build(), done);
                    // Surface a failure recorded on the controller before handing back the response.
                    if (rpcController.getFailedOn() != null) {
                        throw rpcController.getFailedOn();
                    }
                    return done.get();
                }
            };
    return metaDataCoprocessorExec(tableKey, endpointCall);
}
/**
 * Queries rows between {@code startRow} and {@code stopRow} through the
 * {@code DataProtos.QueryDataService} coprocessor endpoint and merges the per-invocation row
 * lists into one list.
 *
 * <p>The {@code HTable} opened for the call is closed on every path; the original leaked it.
 *
 * @param tableName name of the table to query
 * @param startRow value placed in the request's start row field
 * @param stopRow value placed in the request's end row field
 * @param isIncludeEnd value placed in the request's include-end flag
 * @param isSalting value placed in the request's salting flag
 * @return all rows returned by the endpoint invocations; null per-invocation lists are skipped
 * @throws RuntimeException wrapping any failure while opening the table, invoking the
 *         endpoint, or closing the table
 */
public static List<DataProtos.DataQueryResponse.Row> queryByStartRowAndStopRow(String tableName, String startRow, String stopRow, boolean isIncludeEnd, boolean isSalting) {
    final DataProtos.DataQueryRequest.Builder requestBuilder = DataProtos.DataQueryRequest.newBuilder();
    requestBuilder.setTableName(tableName);
    requestBuilder.setStartRow(startRow);
    requestBuilder.setEndRow(stopRow);
    requestBuilder.setIncluedEnd(isIncludeEnd);
    requestBuilder.setIsSalting(isSalting);
    // try-with-resources guarantees the table handle is released even when the call fails.
    try (HTable table = new HTable(HBaseConfiguration.create(conf), tableName)) {
        Map<byte[], List<DataProtos.DataQueryResponse.Row>> result = table.coprocessorService(DataProtos.QueryDataService.class, null, null, new Batch.Call<DataProtos.QueryDataService, List<DataProtos.DataQueryResponse.Row>>() {
            @Override
            public List<DataProtos.DataQueryResponse.Row> call(DataProtos.QueryDataService counter) throws IOException {
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback<DataProtos.DataQueryResponse> rpcCallback = new BlockingRpcCallback<>();
                counter.queryByStartRowAndEndRow(controller, requestBuilder.build(), rpcCallback);
                DataProtos.DataQueryResponse response = rpcCallback.get();
                // The failure check comes after the blocking get but before the response is read.
                if (controller.failedOnException()) {
                    throw controller.getFailedOn();
                }
                return response.getRowListList();
            }
        });
        List<DataProtos.DataQueryResponse.Row> results = new LinkedList<>();
        for (List<DataProtos.DataQueryResponse.Row> rows : result.values()) {
            if (rows != null) {
                results.addAll(rows);
            }
        }
        return results;
    } catch (Throwable e) {
        throw new RuntimeException(e);
    }
}
/**
 * Runs the aggregation coprocessor endpoint over the row range of {@code scan} and returns the
 * result accumulated by the {@code callback} field.
 *
 * <p>When {@code timeSeries} is true a time-series request (including {@code startTime},
 * {@code endTime} and {@code intervalMin}) is sent; otherwise a plain aggregate request is sent.
 * This protobuf-based {@code coprocessorService} call replaces an older
 * {@code table.coprocessorExec(AggregateProtocol.class, ...)} invocation that used to sit here
 * as commented-out code.
 *
 * @param table table whose coprocessor service is invoked
 * @param entityDefinition entity definition; must not be null
 * @param scan scan whose start and stop rows bound the invocation and which is sent in the request
 * @param groupbyFields group-by field names sent in the request
 * @param aggregateFuncTypes aggregate function types requested by the caller
 * @param aggregatedFields aggregated field names sent in the request
 * @param timeSeries whether to issue the time-series variant
 * @param startTime start time, used only for the time-series variant
 * @param endTime end time, used only for the time-series variant
 * @param intervalMin interval, used only for the time-series variant
 * @return the value of {@code callback.result()} after the invocation completes
 * @throws IOException wrapping any {@code Throwable} raised during the invocation
 */
@Override
public AggregateResult aggregate(final HTableInterface table,
                                 final EntityDefinition entityDefinition,
                                 final Scan scan,
                                 final List<String> groupbyFields,
                                 final List<AggregateFunctionType> aggregateFuncTypes,
                                 final List<String> aggregatedFields,
                                 final boolean timeSeries,
                                 final long startTime,
                                 final long endTime,
                                 final long intervalMin) throws IOException {
    checkNotNull(entityDefinition,"entityDefinition");
    final List<AggregateFunctionType> coprocessorFuncTypes = convertToCoprocessorAggregateFunc(aggregateFuncTypes);
    final List<byte[]> funcTypeBytes = AggregateFunctionType.toBytesList(coprocessorFuncTypes);
    // if(timeSeries) TimeSeriesAggregator.validateTimeRange(startTime,endTime,intervalMin);
    callback = new AggregateResultCallbackImpl(aggregateFuncTypes);
    try {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Going to exec coprocessor: "+AggregateProtocol.class.getName());
        } else {
            LOG.info("Going to exec coprocessor: "+AggregateProtocol.class.getSimpleName());
        }
        table.coprocessorService(AggregateProtos.AggregateProtocol.class, scan.getStartRow(), scan.getStopRow(),
                new Batch.Call<AggregateProtos.AggregateProtocol, AggregateProtos.AggregateResult>() {
                    @Override
                    public AggregateProtos.AggregateResult call(AggregateProtos.AggregateProtocol instance) throws IOException {
                        final BlockingRpcCallback<AggregateProtos.AggregateResult> done =
                                new BlockingRpcCallback<AggregateProtos.AggregateResult>();
                        if (timeSeries) {
                            final AggregateProtos.TimeSeriesAggregateRequest tsRequest =
                                    ProtoBufConverter.toPBTimeSeriesRequest(entityDefinition, scan, groupbyFields,
                                            funcTypeBytes, aggregatedFields, startTime, endTime, intervalMin);
                            instance.timeseriesAggregate(null, tsRequest, done);
                        } else {
                            final AggregateProtos.AggregateRequest plainRequest =
                                    ProtoBufConverter.toPBRequest(entityDefinition, scan, groupbyFields,
                                            funcTypeBytes, aggregatedFields);
                            instance.aggregate(null, plainRequest, done);
                        }
                        // Both variants finish by waiting on the same blocking callback.
                        return done.get();
                    }
                }, callback);
    } catch (Throwable t) {
        LOG.error(t.getMessage(),t);
        throw new IOException(t);
    }
    return callback.result();
}
/**
 * Executes the aggregation coprocessor endpoint between the start and stop rows of
 * {@code scan}, collecting per-invocation results through the {@code callback} field.
 *
 * <p>A time-series request is issued when {@code timeSeries} is set, otherwise a plain
 * aggregate request. The legacy {@code table.coprocessorExec(AggregateProtocol.class, ...)}
 * call that was kept here as commented-out code has been superseded by the
 * {@code coprocessorService} call below.
 *
 * @param table table whose coprocessor service is invoked
 * @param entityDefinition entity definition; must not be null
 * @param scan scan providing the row range and sent along in the request
 * @param groupbyFields group-by field names sent in the request
 * @param aggregateFuncTypes aggregate function types requested by the caller
 * @param aggregatedFields aggregated field names sent in the request
 * @param timeSeries selects the time-series variant when true
 * @param startTime start time, used only for the time-series variant
 * @param endTime end time, used only for the time-series variant
 * @param intervalMin interval, used only for the time-series variant
 * @return the value of {@code callback.result()} after the invocation completes
 * @throws IOException wrapping any {@code Throwable} raised during the invocation
 */
@Override
public AggregateResult aggregate(final HTableInterface table,
                                 final EntityDefinition entityDefinition,
                                 final Scan scan,
                                 final List<String> groupbyFields,
                                 final List<AggregateFunctionType> aggregateFuncTypes,
                                 final List<String> aggregatedFields,
                                 final boolean timeSeries,
                                 final long startTime,
                                 final long endTime,
                                 final long intervalMin) throws IOException {
    checkNotNull(entityDefinition, "entityDefinition");
    final List<AggregateFunctionType> serverSideFuncTypes = convertToCoprocessorAggregateFunc(aggregateFuncTypes);
    final List<byte[]> serializedFuncTypes = AggregateFunctionType.toBytesList(serverSideFuncTypes);
    // if(timeSeries) TimeSeriesAggregator.validateTimeRange(startTime,endTime,intervalMin);
    callback = new AggregateResultCallbackImpl(aggregateFuncTypes);
    try {
        // Full class name at debug level, simple name otherwise.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Going to exec coprocessor: " + AggregateProtocol.class.getName());
        } else {
            LOG.info("Going to exec coprocessor: " + AggregateProtocol.class.getSimpleName());
        }
        final Batch.Call<AggregateProtos.AggregateProtocol, AggregateProtos.AggregateResult> endpointCall =
                new Batch.Call<AggregateProtos.AggregateProtocol, AggregateProtos.AggregateResult>() {
                    @Override
                    public AggregateProtos.AggregateResult call(AggregateProtos.AggregateProtocol instance) throws IOException {
                        final BlockingRpcCallback<AggregateProtos.AggregateResult> pending = new BlockingRpcCallback<>();
                        if (!timeSeries) {
                            final AggregateProtos.AggregateRequest plainRequest = ProtoBufConverter.toPBRequest(
                                    entityDefinition, scan, groupbyFields, serializedFuncTypes, aggregatedFields);
                            instance.aggregate(null, plainRequest, pending);
                            return pending.get();
                        }
                        final AggregateProtos.TimeSeriesAggregateRequest seriesRequest =
                                ProtoBufConverter.toPBTimeSeriesRequest(
                                        entityDefinition, scan, groupbyFields, serializedFuncTypes,
                                        aggregatedFields, startTime, endTime, intervalMin);
                        instance.timeseriesAggregate(null, seriesRequest, pending);
                        return pending.get();
                    }
                };
        table.coprocessorService(AggregateProtos.AggregateProtocol.class,
                scan.getStartRow(), scan.getStopRow(), endpointCall, callback);
    } catch (Throwable t) {
        LOG.error(t.getMessage(), t);
        throw new IOException(t);
    }
    return callback.result();
}
/**
 * Asks the MetaData coprocessor endpoint for its version (null start and end keys) and checks
 * each returned version against this client.
 *
 * <p>Side effects visible here: {@code hasInvalidIndexConfiguration} is OR-ed with the result
 * of {@code isInvalidMutableIndexConfig} for every returned version, and
 * {@code lowestClusterHBaseVersion} is set to the smallest decoded HBase version seen.
 *
 * <p>The table handle used for the call is now closed on every path (it was previously
 * leaked), and the unused {@code regionKeys} list has been dropped.
 *
 * @throws SQLException with code {@code INCOMPATIBLE_CLIENT_SERVER_JAR} when the endpoint call
 *         fails with a non-SQL throwable, or {@code OUTDATED_JARS} when any returned version
 *         is not compatible
 */
private void checkClientServerCompatibility() throws SQLException {
    StringBuilder buf = new StringBuilder("The following servers require an updated " + QueryConstants.DEFAULT_COPROCESS_PATH + " to be put in the classpath of HBase: ");
    boolean isIncompatible = false;
    int minHBaseVersion = Integer.MAX_VALUE;
    try {
        List<HRegionLocation> locations = this.getAllTableRegions(SYSTEM_CATALOG_NAME_BYTES);
        Set<HRegionLocation> serverMap = Sets.newHashSetWithExpectedSize(locations.size());
        TreeMap<byte[], HRegionLocation> regionMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        for (HRegionLocation entry : locations) {
            // Record each distinct location once, keyed by its region name.
            if (!serverMap.contains(entry)) {
                regionMap.put(entry.getRegionInfo().getRegionName(), entry);
                serverMap.add(entry);
            }
        }
        HTableInterface ht = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
        final Map<byte[], Long> results;
        try {
            results = ht.coprocessorService(MetaDataService.class, null, null, new Batch.Call<MetaDataService,Long>() {
                @Override
                public Long call(MetaDataService instance) throws IOException {
                    ServerRpcController controller = new ServerRpcController();
                    BlockingRpcCallback<GetVersionResponse> rpcCallback =
                            new BlockingRpcCallback<GetVersionResponse>();
                    GetVersionRequest.Builder builder = GetVersionRequest.newBuilder();
                    instance.getVersion(controller, builder.build(), rpcCallback);
                    if(controller.getFailedOn() != null) {
                        throw controller.getFailedOn();
                    }
                    return rpcCallback.get().getVersion();
                }
            });
        } finally {
            try {
                ht.close();
            } catch (IOException ignored) {
                // Best-effort close: a close failure must not be misreported by the outer
                // catch as a client/server jar incompatibility.
            }
        }
        for (Map.Entry<byte[],Long> result : results.entrySet()) {
            // This is the "phoenix.jar" is in-place, but server is out-of-sync with client case.
            if (!isCompatible(result.getValue())) {
                isIncompatible = true;
                HRegionLocation name = regionMap.get(result.getKey());
                buf.append(name);
                buf.append(';');
            }
            hasInvalidIndexConfiguration |= isInvalidMutableIndexConfig(result.getValue());
            if (minHBaseVersion > MetaDataUtil.decodeHBaseVersion(result.getValue())) {
                minHBaseVersion = MetaDataUtil.decodeHBaseVersion(result.getValue());
            }
        }
        lowestClusterHBaseVersion = minHBaseVersion;
    } catch (SQLException e) {
        throw e;
    } catch (Throwable t) {
        // This is the case if the "phoenix.jar" is not on the classpath of HBase on the region server
        throw new SQLExceptionInfo.Builder(SQLExceptionCode.INCOMPATIBLE_CLIENT_SERVER_JAR).setRootCause(t)
                .setMessage("Ensure that " + QueryConstants.DEFAULT_COPROCESS_PATH + " is put on the classpath of HBase in every region server: " + t.getMessage())
                .build().buildException();
    }
    if (isIncompatible) {
        // Drop the trailing ';' appended after the last incompatible server.
        buf.setLength(buf.length()-1);
        throw new SQLExceptionInfo.Builder(SQLExceptionCode.OUTDATED_JARS).setMessage(buf.toString()).build().buildException();
    }
}
/**
 * Sends a DropTable request to the MetaData coprocessor endpoint and, when the returned
 * mutation code is {@code TABLE_ALREADY_EXISTS}, cleans up client-side state for the tables
 * named in the result.
 *
 * @param tableMetaData mutations to send; the first one's row key selects the target row
 * @param tableType type of the object being dropped, sent in the request
 * @param cascade cascade flag, sent in the request
 * @return the result produced by {@code metaDataCoprocessorExec}
 * @throws SQLException declared by the signature; presumably raised by the helpers called here
 */
@Override
public MetaDataMutationResult dropTable(final List<Mutation> tableMetaData, final PTableType tableType, final boolean cascade) throws SQLException {
    final byte[][] keyParts = new byte[3][];
    SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), keyParts);
    final byte[] tenantIdBytes = keyParts[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
    final byte[] schemaBytes = keyParts[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
    final byte[] tableBytes = keyParts[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
    // A missing tenant id is represented by an empty byte array in the table key.
    final byte[] tenantForKey = (tenantIdBytes != null) ? tenantIdBytes : ByteUtil.EMPTY_BYTE_ARRAY;
    final byte[] tableKey = SchemaUtil.getTableKey(tenantForKey, schemaBytes, tableBytes);

    final Batch.Call<MetaDataService, MetaDataResponse> endpointCall =
            new Batch.Call<MetaDataService, MetaDataResponse>() {
                @Override
                public MetaDataResponse call(MetaDataService instance) throws IOException {
                    final ServerRpcController rpcController = new ServerRpcController();
                    final BlockingRpcCallback<MetaDataResponse> done =
                            new BlockingRpcCallback<MetaDataResponse>();
                    final DropTableRequest.Builder request = DropTableRequest.newBuilder();
                    for (Mutation mutation : tableMetaData) {
                        request.addTableMetadataMutations(ProtobufUtil.toProto(mutation).toByteString());
                    }
                    request.setTableType(tableType.getSerializedValue());
                    request.setCascade(cascade);
                    instance.dropTable(rpcController, request.build(), done);
                    if (rpcController.getFailedOn() != null) {
                        throw rpcController.getFailedOn();
                    }
                    return done.get();
                }
            };
    final MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, endpointCall);

    final MutationCode code = result.getMutationCode();
    switch (code) {
    case TABLE_ALREADY_EXISTS:
        // Physically drop the tables only when the drop-metadata property is enabled.
        if (this.getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA)) {
            dropTables(result.getTableNamesToDelete());
        }
        invalidateTables(result.getTableNamesToDelete());
        if (tableType == PTableType.TABLE) {
            final byte[] physicalName = SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
            final long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
            ensureViewIndexTableDropped(physicalName, timestamp);
            ensureLocalIndexTableDropped(physicalName, timestamp);
            tableStatsCache.invalidate(new ImmutableBytesPtr(physicalName));
        }
        break;
    default:
        break;
    }
    return result;
}
/**
 * Sends a DropColumn request to the MetaData coprocessor endpoint and, when the returned
 * mutation code is {@code TABLE_ALREADY_EXISTS}, drops (if the drop-metadata property is
 * enabled) and invalidates the tables named in the result.
 *
 * @param tableMetaData mutations to send; the first one's row key selects the target row
 * @param tableType not read by this implementation
 * @return the result produced by {@code metaDataCoprocessorExec}
 * @throws SQLException declared by the signature; presumably raised by the helpers called here
 */
@Override
public MetaDataMutationResult dropColumn(final List<Mutation> tableMetaData, PTableType tableType) throws SQLException {
    final byte[][] keyParts = new byte[3][];
    SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), keyParts);
    final byte[] tableKey = SchemaUtil.getTableKey(
            keyParts[PhoenixDatabaseMetaData.TENANT_ID_INDEX],
            keyParts[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX],
            keyParts[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]);

    final Batch.Call<MetaDataService, MetaDataResponse> endpointCall =
            new Batch.Call<MetaDataService, MetaDataResponse>() {
                @Override
                public MetaDataResponse call(MetaDataService instance) throws IOException {
                    final ServerRpcController rpcController = new ServerRpcController();
                    final BlockingRpcCallback<MetaDataResponse> done =
                            new BlockingRpcCallback<MetaDataResponse>();
                    final DropColumnRequest.Builder request = DropColumnRequest.newBuilder();
                    for (Mutation mutation : tableMetaData) {
                        request.addTableMetadataMutations(ProtobufUtil.toProto(mutation).toByteString());
                    }
                    instance.dropColumn(rpcController, request.build(), done);
                    if (rpcController.getFailedOn() != null) {
                        throw rpcController.getFailedOn();
                    }
                    return done.get();
                }
            };
    final MetaDataMutationResult result = metaDataCoprocessorExec(tableKey, endpointCall);

    final MutationCode code = result.getMutationCode();
    switch (code) {
    case TABLE_ALREADY_EXISTS:
        // Physically drop the tables only when the drop-metadata property is enabled.
        if (this.getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA)) {
            dropTables(result.getTableNamesToDelete());
        }
        invalidateTables(result.getTableNamesToDelete());
        break;
    default:
        break;
    }
    return result;
}
/**
 * Remove the cached table from all region servers.
 *
 * <p>Removal is best-effort: a failure on an individual server is logged at error level, and
 * if any servers remain afterwards a warning is logged together with the last failure. No
 * exception is raised for those per-server failures.
 *
 * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable, Scan, Set)})
 * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable, Scan, Set)})
 * @throws SQLException declared by the signature; presumably raised when the table or its
 *         regions cannot be obtained from the query services
 */
private void removeServerCache(final byte[] cacheId, Set<HRegionLocation> servers) throws SQLException {
    ConnectionQueryServices services = connection.getQueryServices();
    Throwable lastThrowable = null;
    TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
    byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes();
    HTableInterface iterateOverTable = services.getTable(tableName);
    try {
        List<HRegionLocation> locations = services.getAllTableRegions(tableName);
        Set<HRegionLocation> remainingOnServers = new HashSet<HRegionLocation>(servers);
        /*
         * Allow for the possibility that the region we based where to send our cache has split and been
         * relocated to another region server *after* we sent it, but before we removed it. To accommodate
         * this, we iterate through the current metadata boundaries and remove the cache once for each
         * server that we originally sent to.
         */
        if (LOG.isDebugEnabled()) {
            // Render the id bytes explicitly; concatenating a byte[] only prints its identity hash.
            LOG.debug(addCustomAnnotations("Removing Cache " + Bytes.toStringBinary(cacheId) + " from servers.", connection));
        }
        for (HRegionLocation entry : locations) {
            if (remainingOnServers.contains(entry)) { // Call once per server
                try {
                    byte[] key = entry.getRegionInfo().getStartKey();
                    iterateOverTable.coprocessorService(ServerCachingService.class, key, key,
                            new Batch.Call<ServerCachingService, RemoveServerCacheResponse>() {
                                @Override
                                public RemoveServerCacheResponse call(ServerCachingService instance) throws IOException {
                                    ServerRpcController controller = new ServerRpcController();
                                    BlockingRpcCallback<RemoveServerCacheResponse> rpcCallback =
                                            new BlockingRpcCallback<RemoveServerCacheResponse>();
                                    RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest.newBuilder();
                                    if(connection.getTenantId() != null){
                                        builder.setTenantId(HBaseZeroCopyByteString.wrap(connection.getTenantId().getBytes()));
                                    }
                                    builder.setCacheId(HBaseZeroCopyByteString.wrap(cacheId));
                                    instance.removeServerCache(controller, builder.build(), rpcCallback);
                                    if(controller.getFailedOn() != null) {
                                        throw controller.getFailedOn();
                                    }
                                    return rpcCallback.get();
                                }
                            });
                    remainingOnServers.remove(entry);
                } catch (Throwable t) {
                    // Remember the failure and keep going so the other servers are still cleaned up.
                    lastThrowable = t;
                    LOG.error(addCustomAnnotations("Error trying to remove hash cache for " + entry, connection), t);
                }
            }
        }
        if (!remainingOnServers.isEmpty()) {
            LOG.warn(addCustomAnnotations("Unable to remove hash cache for " + remainingOnServers, connection), lastThrowable);
        }
    } finally {
        Closeables.closeQuietly(iterateOverTable);
    }
}
/**
 * Issues {@code req} through {@code STUB} using a freshly created
 * {@code HBaseRpcControllerImpl} and returns what the blocking callback yields.
 *
 * @param req the scan request to send
 * @return the response obtained from the blocking callback
 * @throws IOException if obtaining the response from the callback fails
 */
private ScanResponse scan(ScanRequest req) throws IOException {
    final BlockingRpcCallback<ScanResponse> done = new BlockingRpcCallback<>();
    final HBaseRpcControllerImpl rpcController = new HBaseRpcControllerImpl();
    STUB.scan(rpcController, req, done);
    final ScanResponse response = done.get();
    return response;
}
/**
 * Issues {@code req} through {@code STUB} with the caller-supplied controller and returns what
 * the blocking callback yields.
 *
 * @param hrc controller passed through to the stub
 * @param req the scan request to send
 * @return the response obtained from the blocking callback
 * @throws IOException if obtaining the response from the callback fails
 */
private ScanResponse scan(HBaseRpcController hrc, ScanRequest req) throws IOException {
    final BlockingRpcCallback<ScanResponse> done = new BlockingRpcCallback<>();
    STUB.scan(hrc, req, done);
    final ScanResponse response = done.get();
    return response;
}
/**
 * Retrieves the server-side version information through the version coprocessor endpoint on
 * the index metadata table.
 *
 * <p>Returns {@code null} when the server-side library is disabled, when the endpoint returns
 * nothing, or when any step fails (the failure is logged as a warning). If the endpoint
 * invocations report differing version lists, an error is logged and the first list is used.
 *
 * <p>The table handle is managed with try-with-resources so it is closed even when the
 * coprocessor call throws; previously it was only closed on the success path.
 *
 * @return the version info as a line-delimited string, or {@code null} if unavailable
 */
@Override
public String getVersion() {
    String version = null;
    if ((options == null) || !options.isServerSideLibraryEnabled()) {
        LOGGER.warn("Serverside library not enabled, serverside version is irrelevant");
        return null;
    }
    try {
        // use Index as the type to check for version (for hbase type
        // doesn't matter anyways)
        final MetadataType type = MetadataType.INDEX;
        final String tableName = getMetadataTableName(type);
        if (!indexExists(tableName)) {
            createTable(
                    new byte[0][],
                    HBaseOperations.METADATA_CFS_VERSIONING,
                    StringColumnFamilyFactory.getSingletonInstance(),
                    getTableName(getQualifiedTableName(tableName)));
        }
        // Verify the version endpoint coprocessor when verification is enabled
        if (options.isVerifyCoprocessors()) {
            verifyCoprocessor(
                    tableName,
                    "org.locationtech.geowave.datastore.hbase.coprocessors.VersionEndpoint",
                    options.getCoprocessorJar());
        }
        final Map<byte[], List<String>> versionInfoResponse;
        try (final Table table = getTable(tableName)) {
            versionInfoResponse =
                    table.coprocessorService(
                            VersionProtosClient.VersionService.class,
                            null,
                            null,
                            new Batch.Call<VersionProtosClient.VersionService, List<String>>() {
                                @Override
                                public List<String> call(final VersionProtosClient.VersionService versionService)
                                        throws IOException {
                                    final BlockingRpcCallback<VersionProtosClient.VersionResponse> rpcCallback =
                                            new BlockingRpcCallback<>();
                                    versionService.version(null, VersionRequest.getDefaultInstance(), rpcCallback);
                                    final VersionProtosClient.VersionResponse response = rpcCallback.get();
                                    return response.getVersionInfoList();
                                }
                            });
        }
        if ((versionInfoResponse == null) || versionInfoResponse.isEmpty()) {
            LOGGER.error("No response from version coprocessor");
        } else {
            // Compare every returned list against the first one and report any mismatch.
            final Iterator<List<String>> values = versionInfoResponse.values().iterator();
            final List<String> value = values.next();
            while (values.hasNext()) {
                final List<String> newValue = values.next();
                if (!value.equals(newValue)) {
                    LOGGER.error(
                            "Version Info '"
                                    + Arrays.toString(value.toArray())
                                    + "' and '"
                                    + Arrays.toString(newValue.toArray())
                                    + "' differ. This may mean that different regions are using different versions of GeoWave.");
                }
            }
            version = VersionUtils.asLineDelimitedString(value);
        }
    } catch (final Throwable e) {
        LOGGER.warn("Unable to check metadata table for version", e);
    }
    return version;
}