下面列出了怎么用org.apache.hadoop.hbase.ipc.HBaseRpcController的API类实例代码及写法,或者点击链接到github查看源代码。
private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
CompletableFuture<RESP> future = new CompletableFuture<>();
HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
try {
rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
@Override
public void run(RESP resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp);
}
}
});
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}
/**
* Replicate WAL entries on the region server.
* @param controller the RPC controller
* @param request the request
*/
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
try {
checkOpen();
if (regionServer.getReplicationSinkService() != null) {
requestCount.increment();
List<WALEntry> entries = request.getEntryList();
checkShouldRejectReplicationRequest(entries);
CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
request.getSourceHFileArchiveDirPath());
regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries();
return ReplicateWALEntryResponse.newBuilder().build();
} else {
throw new ServiceException("Replication services are not initialized yet");
}
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
// Set the time limit to be half of the more restrictive timeout value (one of the
// timeout values must be positive). In the event that both values are positive, the
// more restrictive of the two is used to calculate the limit.
if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
long timeLimitDelta;
if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
} else {
timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
}
if (controller != null && controller.getCallTimeout() > 0) {
timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
}
// Use half of whichever timeout value was more restrictive... But don't allow
// the time limit to be less than the allowable minimum (could cause an
// immediatate timeout before scanning any data).
timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
// XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
// ManualEnvironmentEdge. Consider using System.nanoTime instead.
return System.currentTimeMillis() + timeLimitDelta;
}
// Default value of timeLimit is negative to indicate no timeLimit should be
// enforced.
return -1L;
}
/**
* Verifies the cluster ID from all running masters.
*/
@Test public void TestClusterID() throws Exception {
HBaseRpcController rpcController = getRpcController();
String clusterID = TEST_UTIL.getMiniHBaseCluster().getMaster().getClusterId();
int rpcCount = 0;
for (JVMClusterUtil.MasterThread masterThread:
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
ClientMetaService.BlockingInterface stub =
getMasterStub(masterThread.getMaster().getServerName());
GetClusterIdResponse resp =
stub.getClusterId(rpcController, GetClusterIdRequest.getDefaultInstance());
assertEquals(clusterID, resp.getClusterId());
rpcCount++;
}
assertEquals(MASTER_COUNT, rpcCount);
}
/**
* Verifies the active master ServerName as seen by all masters.
*/
@Test public void TestActiveMaster() throws Exception {
HBaseRpcController rpcController = getRpcController();
ServerName activeMaster = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName();
int rpcCount = 0;
for (JVMClusterUtil.MasterThread masterThread:
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
ClientMetaService.BlockingInterface stub =
getMasterStub(masterThread.getMaster().getServerName());
GetActiveMasterResponse resp =
stub.getActiveMaster(rpcController, GetActiveMasterRequest.getDefaultInstance());
assertEquals(activeMaster, ProtobufUtil.toServerName(resp.getServerName()));
rpcCount++;
}
assertEquals(MASTER_COUNT, rpcCount);
}
/**
* Verifies that the meta region locations RPC returns consistent results across all masters.
*/
@Test public void TestMetaLocations() throws Exception {
HBaseRpcController rpcController = getRpcController();
List<HRegionLocation> metaLocations = TEST_UTIL.getMiniHBaseCluster().getMaster()
.getMetaRegionLocationCache().getMetaRegionLocations().get();
Collections.sort(metaLocations);
int rpcCount = 0;
for (JVMClusterUtil.MasterThread masterThread:
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
ClientMetaService.BlockingInterface stub =
getMasterStub(masterThread.getMaster().getServerName());
GetMetaRegionLocationsResponse resp = stub.getMetaRegionLocations(
rpcController, GetMetaRegionLocationsRequest.getDefaultInstance());
List<HRegionLocation> result = new ArrayList<>();
resp.getMetaLocationsList().forEach(
location -> result.add(ProtobufUtil.toRegionLocation(location)));
Collections.sort(result);
assertEquals(metaLocations, result);
rpcCount++;
}
assertEquals(MASTER_COUNT, rpcCount);
}
private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
scan.getCaching(), false);
stub.scan(controller, request, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
return;
}
future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
});
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}
private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
Message responsePrototype, HBaseRpcController controller, ClientService.Interface stub) {
CompletableFuture<Message> future = new CompletableFuture<>();
CoprocessorServiceRequest csr =
CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
stub.execRegionServerService(
controller,
csr,
new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
@Override
public void run(CoprocessorServiceResponse resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
try {
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
});
return future;
}
private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
Message responsePrototype, HBaseRpcController controller, MasterService.Interface stub) {
CompletableFuture<Message> future = new CompletableFuture<>();
CoprocessorServiceRequest csr =
CoprocessorRpcUtils.getCoprocessorServiceRequest(method, request);
stub.execMasterService(
controller,
csr,
new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
@Override
public void run(CoprocessorServiceResponse resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
try {
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
});
return future;
}
private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
Converter<RESP, PRESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>();
rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
@Override
public void run(PRESP resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
try {
future.complete(respConverter.convert(resp));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
});
return future;
}
private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
Converter<RESP, PRESP> respConverter) {
CompletableFuture<RESP> future = new CompletableFuture<>();
rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
@Override
public void run(PRESP resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
try {
future.complete(respConverter.convert(resp));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
});
return future;
}
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
// simulate the asynchronous behavior otherwise all logic will perform in the same thread...
EXECUTOR.execute(() -> {
int index = CALLED.getAndIncrement();
if (index == BAD_RESP_INDEX) {
done.run(GetClusterIdResponse.getDefaultInstance());
} else if (GOOD_RESP_INDEXS.contains(index)) {
done.run(RESP);
} else {
((HBaseRpcController) controller).setFailed("inject error");
done.run(null);
}
});
}
private void addResult(final MutateResponse.Builder builder, final Result result,
final HBaseRpcController rpcc, boolean clientCellBlockSupported) {
if (result == null) return;
if (clientCellBlockSupported) {
builder.setResult(ProtobufUtil.toResultNoData(result));
rpcc.setCellScanner(result.cellScanner());
} else {
ClientProtos.Result pbr = ProtobufUtil.toResult(result);
builder.setResult(pbr);
}
}
@Override
public void callMethod(MethodDescriptor md, RpcController controller, Message param,
Message returnType, RpcCallback<Message> done) {
invokations.getAndIncrement();
if (ThreadLocalRandom.current().nextFloat() < CHANCE_OF_TIMEOUT) {
// throw a ServiceException, because that is the only exception type that
// {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
// "actual" type, this may not properly mimic the underlying RpcEngine.
((HBaseRpcController) controller).setFailed(new SocketTimeoutException("fake timeout"));
done.run(null);
return;
}
super.callMethod(md, controller, param, returnType, done);
}
@Override
public ScanResponse scan(RpcController controller, ScanRequest request)
throws ServiceException {
ScanResponse.Builder builder = ScanResponse.newBuilder();
try {
if (request.hasScan()) {
byte[] regionName = request.getRegion().getValue().toByteArray();
builder.setScannerId(openScanner(regionName, null));
builder.setMoreResults(true);
}
else {
long scannerId = request.getScannerId();
Result result = next(scannerId);
if (result != null) {
builder.addCellsPerResult(result.size());
List<CellScannable> results = new ArrayList<>(1);
results.add(result);
((HBaseRpcController) controller).setCellScanner(CellUtil
.createCellScanner(results));
builder.setMoreResults(true);
}
else {
builder.setMoreResults(false);
close(scannerId);
}
}
} catch (IOException ie) {
throw new ServiceException(ie);
}
return builder.build();
}
/**
* Sends a multi request with a certain amount of rows, will populate Multi command with either
* "rows" number of RegionActions with one Action each or one RegionAction with "rows" number of
* Actions
*/
private void sendMultiRequest(int rows, ActionType actionType)
throws ServiceException, IOException {
RpcController rpcc = Mockito.mock(HBaseRpcController.class);
MultiRequest.Builder builder = MultiRequest.newBuilder();
int numRAs = 1;
int numAs = 1;
switch (actionType) {
case REGION_ACTIONS:
numRAs = rows;
break;
case ACTIONS:
numAs = rows;
break;
}
for (int i = 0; i < numRAs; i++) {
RegionAction.Builder rab = RegionAction.newBuilder();
rab.setRegion(RequestConverter.buildRegionSpecifier(
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
Bytes.toBytes("someStuff" + i)));
for (int j = 0; j < numAs; j++) {
Action.Builder ab = Action.newBuilder();
rab.addAction(ab.build());
}
builder.addRegionAction(rab.build());
}
LD = Mockito.mock(RSRpcServices.LogDelegate.class);
SERVICES = new RSRpcServices(RS, LD);
SERVICES.multi(rpcc, builder.build());
}
private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub, REQ req,
Converter<MutateRequest, byte[], REQ> reqConvert,
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
return ConnectionUtils.call(controller, loc, stub, req, reqConvert,
(s, c, r, done) -> s.mutate(c, r, done), respConverter);
}
private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub, REQ req,
Converter<MutateRequest, byte[], REQ> reqConvert) {
return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
return null;
});
}
private static Result toResult(HBaseRpcController controller, MutateResponse resp)
throws IOException {
if (!resp.hasResult()) {
return null;
}
return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
}
private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
return mutate(controller, loc, stub, req,
(info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
}
public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, Interface stub,
HBaseRpcController controller, ScanResponse resp) {
this.loc = loc;
this.isRegionServerRemote = isRegionServerRemote;
this.stub = stub;
this.controller = controller;
this.resp = resp;
}
private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
ClientService.Interface stub) {
CompletableFuture<Message> future = new CompletableFuture<>();
if (region != null && !Bytes.equals(loc.getRegion().getRegionName(), region.getRegionName())) {
future.completeExceptionally(new DoNotRetryIOException("Region name is changed, expected " +
region.getRegionNameAsString() + ", actual " + loc.getRegion().getRegionNameAsString()));
return future;
}
CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
request, row, loc.getRegion().getRegionName());
stub.execService(controller, csr,
new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
@Override
public void run(CoprocessorServiceResponse resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
lastRegion = resp.getRegion().getValue().toByteArray();
try {
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
});
return future;
}
private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
Callable<T> callable) {
HBaseRpcController controller = rpcControllerFactory.newController();
CompletableFuture<T> future = new CompletableFuture<>();
callable.call(controller, stub, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp);
}
});
return future;
}
static void resetController(HBaseRpcController controller, long timeoutNs, int priority) {
controller.reset();
if (timeoutNs >= 0) {
controller.setCallTimeout(
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs)));
}
controller.setPriority(priority);
}
private HBaseRpcController assertPriority(int priority) {
return argThat(new ArgumentMatcher<HBaseRpcController>() {
@Override
public boolean matches(HBaseRpcController controller) {
return controller.getPriority() == priority;
}
});
}
private HBaseRpcController assertPriority(int priority) {
return argThat(new ArgumentMatcher<HBaseRpcController>() {
@Override
public boolean matches(HBaseRpcController controller) {
return controller.getPriority() == priority;
}
});
}
/**
* This function checks if all regions of a table is online
* @param table
* @return true when all regions of a table are online
* @throws IOException
* @throws
*/
public static boolean tableRegionsOnline(Configuration conf, PTable table) {
try (ClusterConnection hcon =
(ClusterConnection) ConnectionFactory.createConnection(conf)) {
List<HRegionLocation> locations = hcon.locateRegions(
org.apache.hadoop.hbase.TableName.valueOf(table.getPhysicalName().getBytes()));
for (HRegionLocation loc : locations) {
try {
ServerName sn = loc.getServerName();
if (sn == null) continue;
AdminService.BlockingInterface admin = hcon.getAdmin(sn);
HBaseRpcController controller = hcon.getRpcControllerFactory().newController();
org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.getRegionInfo(controller,
admin, loc.getRegion().getRegionName());
} catch (RemoteException e) {
LOGGER.debug("Cannot get region " + loc.getRegion().getEncodedName() + " info due to error:" + e);
return false;
}
}
} catch (IOException ex) {
LOGGER.warn("tableRegionsOnline failed due to:", ex);
return false;
}
return true;
}
private void openRegionAndWait(ClusterConnection connection, ServerName server, RegionInfo region, long timeout) throws IOException, InterruptedException {
AdminProtos.AdminService.BlockingInterface rs = connection.getAdmin(server);
HBaseRpcController controller = connection.getRpcControllerFactory().newController();
try {
org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.openRegion(controller, rs, server, region);
} catch (IOException var10) {
LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), var10);
}
for(long expiration = timeout + System.currentTimeMillis(); System.currentTimeMillis() < expiration; Thread.sleep(1000L)) {
controller.reset();
try {
RegionInfo regionInfo = org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
if(regionInfo != null) {
return;
}
} catch (IOException e) {
if(e instanceof NotServingRegionException ||
e instanceof RegionOpeningException) {
if (LOG.isDebugEnabled()) {
SpliceLogUtils.debug(LOG, "waiting for region %s to be opened", region.getRegionNameAsString());
}
}
LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(), e);
}
}
throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
}
public InterRegionServerRpcController(HBaseRpcController delegate, Configuration conf) {
super(delegate);
// Set priority higher that normal, but lower than high
this.priority = (HConstants.HIGH_QOS + HConstants.NORMAL_QOS) / 2;
}
@Override
public HBaseRpcController newController() {
HBaseRpcController delegate = super.newController();
return getController(delegate);
}