下面列出了 org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Commits the given statistics mutations with one {@code MultiRowMutationService} call.
 *
 * <p>The coprocessor channel is obtained from {@code statsWriterTable} using the row of the
 * first mutation. When the list is empty no request is built and no call is made.
 *
 * @param mutations mutations to send; each one is converted with {@code ProtobufUtil.toMutation}
 * @throws IOException declared for callers; see the review note in the catch block
 */
public void commitStats(List<Mutation> mutations) throws IOException {
    if (mutations.isEmpty()) {
        // Nothing to send.
        return;
    }
    // The row of the first mutation is used to open the coprocessor channel.
    final byte[] firstRow = mutations.get(0).getRow();
    final MutateRowsRequest.Builder requestBuilder = MutateRowsRequest.newBuilder();
    for (Mutation mutation : mutations) {
        requestBuilder.addMutationRequest(
            ProtobufUtil.toMutation(getMutationType(mutation), mutation));
    }
    final MutateRowsRequest request = requestBuilder.build();
    final MultiRowMutationService.BlockingInterface stub =
        MultiRowMutationService.newBlockingStub(statsWriterTable.coprocessorService(firstRow));
    try {
        stub.mutateRows(null, request);
    } catch (ServiceException serviceFailure) {
        // NOTE(review): nothing is thrown explicitly here; this relies on toIOException
        // itself throwing. Confirm against the HBase version in use.
        ProtobufUtil.toIOException(serviceFailure);
    }
}
/**
 * Runs the {@code FriendsOfFriendsProcessor} row processor against {@code ROW} and checks that
 * the returned result set equals {"d", "e", "f", "g"}.
 */
@Test
public void testDoubleScan() throws Throwable {
    prepareTestData();

    final CoprocessorRpcChannel rpcChannel = table.coprocessorService(ROW);
    final RowProcessorService.BlockingInterface stub =
        RowProcessorService.newBlockingStub(rpcChannel);
    final ProcessRequest processRequest = RowProcessorClient.getRowProcessorPB(
        new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A));

    final ProcessResponse processResponse = stub.process(null, processRequest);
    final FriendsOfFriendsProcessorResponse parsed =
        FriendsOfFriendsProcessorResponse.parseFrom(processResponse.getRowProcessorResult());

    final Set<String> actual = new HashSet<>(parsed.getResultList());
    final Set<String> expected = new HashSet<>(Arrays.asList("d", "e", "f", "g"));

    // Log the current cells of the row before asserting, to help diagnose a mismatch.
    LOG.debug("row keyvalues:" + stringifyKvs(table.get(new Get(ROW)).listCells()));
    assertEquals(expected, actual);
}
/**
 * Submits a {@code TimeoutProcessor} for {@code ROW} and asserts that the call fails with
 * some exception.
 */
@Test
public void testTimeout() throws Throwable {
    prepareTestData();

    final RowProcessorService.BlockingInterface stub =
        RowProcessorService.newBlockingStub(table.coprocessorService(ROW));
    final ProcessRequest processRequest = RowProcessorClient.getRowProcessorPB(
        new RowProcessorEndpoint.TimeoutProcessor(ROW));

    // Any exception type counts as the expected failure.
    Exception failure = null;
    try {
        stub.process(null, processRequest);
    } catch (Exception e) {
        failure = e;
    }
    assertTrue(failure != null);
}
/**
 * Calls the {@code error} endpoint of {@code TestProtobufRpcProto} and expects the call to
 * fail with a {@code ServiceException}.
 *
 * @throws Exception if the table cannot be obtained or closed
 */
@Test
public void testCoprocessorError() throws Exception {
    Configuration configuration = new Configuration(util.getConfiguration());
    // Make it not retry forever
    configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    // NOTE(review): the table below comes from util.getConnection(), not from a connection
    // built with this configuration copy. Confirm the retry limit actually takes effect.

    // try-with-resources closes the table; a close() failure is recorded as suppressed
    // instead of replacing an assertion failure raised in the body.
    try (Table table = util.getConnection().getTable(TEST_TABLE)) {
        CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
        TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
            TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
        service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
        fail("Should have thrown an exception");
    } catch (ServiceException expected) {
        // Expected: the endpoint call is supposed to fail, so there is nothing to do here.
    }
}
/**
 * Checks that a client authenticated by token is reported by {@code whoAmI} with
 * {@code USERNAME} and the TOKEN auth method, and that requesting a new authentication token
 * is rejected with an {@code AccessDeniedException}.
 *
 * @throws Exception if the connection or table cannot be created, or an RPC fails unexpectedly
 */
@Test
public void test() throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
        Table table = conn.getTable(TableName.META_TABLE_NAME)) {
        CoprocessorRpcChannel rpcChannel = table.coprocessorService(HConstants.EMPTY_START_ROW);
        AuthenticationProtos.AuthenticationService.BlockingInterface service =
            AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
        WhoAmIResponse response = service.whoAmI(null, WhoAmIRequest.getDefaultInstance());
        assertEquals(USERNAME, response.getUsername());
        assertEquals(AuthenticationMethod.TOKEN.name(), response.getAuthMethod());
        try {
            service.getAuthenticationToken(null, GetAuthenticationTokenRequest.getDefaultInstance());
            // Without this the test would pass silently if the token request succeeded,
            // because all the assertions about the rejection live in the catch block.
            throw new AssertionError(
                "Expected getAuthenticationToken to be denied for a token-authenticated client");
        } catch (ServiceException e) {
            IOException ioe = ProtobufUtil.getRemoteException(e);
            assertThat(ioe, instanceOf(AccessDeniedException.class));
            assertThat(ioe.getMessage(),
                containsString("Token generation only allowed for Kerberos authenticated clients"));
        }
    }
}
/**
 * Obtain and return an authentication token for the current user.
 *
 * <p>The request is sent through a coprocessor channel opened on the meta table at
 * {@code HConstants.EMPTY_START_ROW}. The table is closed before this method returns or throws.
 *
 * @param conn The HBase cluster connection
 * @throws IOException if a remote error or serialization problem occurs.
 * @return the authentication token instance
 */
@InterfaceAudience.Private
static Token<AuthenticationTokenIdentifier> obtainToken(
    Connection conn) throws IOException {
    try {
        // Kept inside the outer try so a ServiceException raised here is translated as well.
        injectFault();
        // try-with-resources: a failure in close() no longer masks the primary exception,
        // as it could with the previous manual finally block.
        try (Table meta = conn.getTable(TableName.META_TABLE_NAME)) {
            CoprocessorRpcChannel rpcChannel = meta.coprocessorService(
                HConstants.EMPTY_START_ROW);
            AuthenticationProtos.AuthenticationService.BlockingInterface service =
                AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
            AuthenticationProtos.GetAuthenticationTokenResponse response =
                service.getAuthenticationToken(null,
                    AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
            return toToken(response.getToken());
        }
    } catch (ServiceException se) {
        throw ProtobufUtil.handleRemoteException(se);
    }
}
/**
 * Runs the statistics commit as the login user.
 *
 * <p>Inside the privileged action, {@code commitLastStatsUpdatedTime(statsCollector)} is called
 * first. If {@code mutations} is non-empty, all mutations are then sent in one
 * {@code MultiRowMutationService.mutateRows} call over a coprocessor channel opened on the row
 * of the first mutation.
 *
 * @param mutations mutations to send; may be empty
 * @param statsCollector collector passed to {@code commitLastStatsUpdatedTime}
 * @throws IOException declared for callers
 */
public void commitStats(final List<Mutation> mutations, final StatisticsCollector statsCollector)
        throws IOException {
    User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws Exception {
            commitLastStatsUpdatedTime(statsCollector);
            if (mutations.isEmpty()) {
                // No mutations: only the last-updated time was committed.
                return null;
            }
            final byte[] firstRow = mutations.get(0).getRow();
            final MutateRowsRequest.Builder requestBuilder = MutateRowsRequest.newBuilder();
            for (Mutation mutation : mutations) {
                requestBuilder.addMutationRequest(
                    ProtobufUtil.toMutation(getMutationType(mutation), mutation));
            }
            final MutateRowsRequest request = requestBuilder.build();
            final MultiRowMutationService.BlockingInterface stub =
                MultiRowMutationService.newBlockingStub(
                    statsWriterTable.coprocessorService(firstRow));
            try {
                stub.mutateRows(null, request);
            } catch (ServiceException serviceFailure) {
                // NOTE(review): nothing is thrown explicitly here; this relies on
                // toIOException itself throwing. Confirm against the HBase version in use.
                ProtobufUtil.toIOException(serviceFailure);
            }
            return null;
        }
    });
}
/**
 * Asks the region server {@code serverName} for its WAL positions and records them in
 * {@code map} under the server's name, as a sorted map from WAL name to position.
 *
 * @return always {@code null}
 * @throws Exception if the connection, the admin handle, or the RPC fails
 */
@Override
public Void call() throws Exception{
    SConfiguration configuration = HConfiguration.getConfiguration();
    Connection conn = HBaseConnectionFactory.getInstance(configuration).getConnection();
    // Close the Admin handle once the RPC is done; previously it was never closed.
    // The connection comes from HBaseConnectionFactory and is deliberately left open here
    // (presumably shared — confirm).
    try (Admin admin = conn.getAdmin()) {
        CoprocessorRpcChannel channel = admin.coprocessorService(serverName);
        SpliceRSRpcServices.BlockingInterface service = SpliceRSRpcServices.newBlockingStub(channel);
        SpliceMessage.GetWALPositionsRequest.Builder builder = SpliceMessage.GetWALPositionsRequest.newBuilder();
        SpliceMessage.GetWALPositionsResponse response = service.getWALPositions(null, builder.build());
        List<SpliceMessage.GetWALPositionsResponse.Result> resultList = response.getResultList();
        SortedMap<String, Long> serverSnapshot = new TreeMap<>();
        for (SpliceMessage.GetWALPositionsResponse.Result result : resultList) {
            serverSnapshot.put(result.getWALName(), result.getPosition());
        }
        map.put(serverName.getServerName(), serverSnapshot);
    }
    return null;
}
/**
 * Runs an {@code IncrementCounterProcessor} on {@code ROW} through the given table's
 * coprocessor channel and returns the value carried in the processor's response.
 *
 * @param table table used to open the coprocessor channel
 * @return the {@code response} field of the parsed {@code IncCounterProcessorResponse}
 */
private int incrementCounter(Table table) throws Throwable {
    final RowProcessorService.BlockingInterface stub =
        RowProcessorService.newBlockingStub(table.coprocessorService(ROW));
    final ProcessRequest processRequest = RowProcessorClient.getRowProcessorPB(
        new RowProcessorEndpoint.IncrementCounterProcessor(ROW));
    final ProcessResponse processResponse = stub.process(null, processRequest);
    return IncCounterProcessorResponse
        .parseFrom(processResponse.getRowProcessorResult())
        .getResponse();
}
/**
 * Submits a {@code RowSwapProcessor} for {@code ROW} and {@code ROW2} through the given table's
 * coprocessor channel. The response is not inspected.
 *
 * @param table table used to open the coprocessor channel on {@code ROW}
 */
private void swapRows(Table table) throws Throwable {
    final RowProcessorService.BlockingInterface stub =
        RowProcessorService.newBlockingStub(table.coprocessorService(ROW));
    final ProcessRequest swapRequest = RowProcessorClient.getRowProcessorPB(
        new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2));
    stub.process(null, swapRequest);
}
/**
 * Writes the same cell to two different rows in one {@code MultiRowMutationService.mutateRows}
 * call and then reads both rows back to check the value.
 */
@Test
public void testMultiRowMutation() throws Exception {
    LOG.info("Starting testMultiRowMutation");
    final TableName tableName = name.getTableName();
    final byte [] ROW1 = Bytes.toBytes("testRow1");
    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
        // One PUT per row, each converted to its protobuf form.
        final Put firstPut = new Put(ROW);
        firstPut.addColumn(FAMILY, QUALIFIER, VALUE);
        final MutationProto firstMutation = ProtobufUtil.toMutation(MutationType.PUT, firstPut);

        final Put secondPut = new Put(ROW1);
        secondPut.addColumn(FAMILY, QUALIFIER, VALUE);
        final MutationProto secondMutation = ProtobufUtil.toMutation(MutationType.PUT, secondPut);

        final MutateRowsRequest request = MutateRowsRequest.newBuilder()
            .addMutationRequest(firstMutation)
            .addMutationRequest(secondMutation)
            .build();

        final MultiRowMutationService.BlockingInterface stub =
            MultiRowMutationService.newBlockingStub(t.coprocessorService(ROW));
        stub.mutateRows(null, request);

        // Both rows must now hold VALUE in FAMILY:QUALIFIER.
        final Result firstRow = t.get(new Get(ROW));
        assertEquals(0, Bytes.compareTo(VALUE, firstRow.getValue(FAMILY, QUALIFIER)));
        final Result secondRow = t.get(new Get(ROW1));
        assertEquals(0, Bytes.compareTo(VALUE, secondRow.getValue(FAMILY, QUALIFIER)));
    }
}
/**
 * Builds a blocking {@code AccessControlService} stub over a coprocessor channel opened on the
 * given table at {@code HConstants.EMPTY_START_ROW}.
 *
 * @param ht table used to open the coprocessor channel
 * @return the blocking stub bound to that channel
 * @throws IOException declared for callers
 */
private static BlockingInterface getAccessControlServiceStub(Table ht)
        throws IOException {
    final CoprocessorRpcChannel rpcChannel = ht.coprocessorService(HConstants.EMPTY_START_ROW);
    return AccessControlProtos.AccessControlService.newBlockingStub(rpcChannel);
}
/**
 * Creates a {@code TxnLifecycleService} stub over a retryable coprocessor channel for the
 * given row key of {@code table}.
 *
 * @param rowKey row key passed to {@code channelFactory.newRetryableChannel}
 * @return the service stub bound to the new channel
 * @throws IOException from channel creation, or wrapping any exception raised while creating
 *         the stub
 */
@Override
protected TxnMessage.TxnLifecycleService getLifecycleService(byte[] rowKey) throws IOException{
    final CoprocessorRpcChannel retryableChannel =
        channelFactory.newRetryableChannel(table.getName(), rowKey);
    try {
        return ProtobufUtil.newServiceStub(TxnMessage.TxnLifecycleService.class, retryableChannel);
    } catch (Exception stubFailure) {
        // Callers only see IOException; the original failure is kept as the cause.
        throw new IOException(stubFailure);
    }
}
/**
 * Asks the region server {@code serverName} for its oldest active transaction.
 *
 * @return the value of {@code getOldestActiveTransaction()} from the server's response
 * @throws Exception if the connection, the admin handle, or the RPC fails
 */
@Override
public Long call() throws Exception{
    SConfiguration configuration = HConfiguration.getConfiguration();
    Connection conn = HBaseConnectionFactory.getInstance(configuration).getConnection();
    // Close the Admin handle once the RPC is done; previously it was never closed.
    // The connection comes from HBaseConnectionFactory and is deliberately left open here
    // (presumably shared — confirm).
    try (Admin admin = conn.getAdmin()) {
        CoprocessorRpcChannel channel = admin.coprocessorService(serverName);
        SpliceRSRpcServices.BlockingInterface service = SpliceRSRpcServices.newBlockingStub(channel);
        SpliceMessage.SpliceOldestActiveTransactionRequest request = SpliceMessage.SpliceOldestActiveTransactionRequest.getDefaultInstance();
        SpliceMessage.SpliceOldestActiveTransactionResponse response = service.getOldestActiveTransaction(null, request);
        return response.getOldestActiveTransaction();
    }
}
/**
 * Sends a compressed {@code BulkWrites} payload to {@code SpliceIndexService.bulkWrite} over a
 * coprocessor channel created for the write's region key, and returns the decompressed
 * {@code BulkWritesResult}.
 *
 * <p>On failure, {@code clearCacheIfNeeded} is invoked once with the failure and the error is
 * converted through {@code pef} before being thrown.
 *
 * @param write the bulk writes to send
 * @return the result decoded from the response bytes
 * @throws IOException declared for callers; failures are routed through {@code pef}
 */
public BulkWritesResult invoke(BulkWrites write) throws IOException {
TableName tableName=tableInfoFactory.getTableInfo(this.tableName);
CoprocessorRpcChannel channel = channelFactory.newChannel(tableName,write.getRegionKey());
// Set to true once clearCacheIfNeeded has been called for a controller failure, so the
// outer catch below does not call it a second time.
boolean cacheCheck = false;
try {
SpliceMessage.SpliceIndexService service = ProtobufUtil.newServiceStub(SpliceMessage.SpliceIndexService.class, channel);
SpliceMessage.BulkWriteRequest.Builder builder = SpliceMessage.BulkWriteRequest.newBuilder();
byte[] requestBytes = compressor.compress(write);
builder.setBytes(ZeroCopyLiteralByteString.wrap(requestBytes));
SpliceMessage.BulkWriteRequest bwr = builder.build();
BlockingRpcCallback<SpliceMessage.BulkWriteResponse> doneCallback =new BlockingRpcCallback<>();
ServerRpcController controller = new ServerRpcController();
service.bulkWrite(controller, bwr, doneCallback);
if (controller.failed()){
// The controller may report a failure with or without an attached IOException;
// without one, the error text is used instead.
IOException error=controller.getFailedOn();
clearCacheIfNeeded(error);
cacheCheck=true;
// NOTE(review): both throws below happen inside the try, so they appear to be caught by
// the catch (Exception e) at the bottom and passed through pef.processRemoteException
// again — confirm that double processing is harmless.
if(error!=null)
throw pef.processRemoteException(error);
else
throw pef.fromErrorString(controller.errorText());
}
SpliceMessage.BulkWriteResponse bulkWriteResponse = doneCallback.get();
byte[] bytes = bulkWriteResponse.getBytes().toByteArray();
if(bytes==null || bytes.length<=0){
// An empty payload is only logged; execution still continues to decompress below.
Logger logger=Logger.getLogger(BulkWriteChannelInvoker.class);
logger.error("zero-length bytes returned with a null error for encodedString: "+write.getBulkWrites().iterator().next().getEncodedStringName());
}
return compressor.decompress(bytes,BulkWritesResult.class);
} catch (Exception e) {
// Skip the cache clear if the controller-failure branch already did it.
if (!cacheCheck) clearCacheIfNeeded(e);
throw pef.processRemoteException(e);
}
}
/**
 * Sends a {@code BlockingProbeRequest} with {@code doBlock = onOff} to the
 * {@code BlockingProbeEndpoint} of every server in the cluster status, using the endpoint
 * method selected by {@code type}, and waits for all callbacks.
 *
 * @param onOff the blocking state to request; each response's {@code didBlock} must equal it
 * @param type selects which probe endpoint method is invoked
 * @return {@code true} if every server answered with {@code didBlock == onOff} before the wait
 *         timed out; {@code false} otherwise
 * @throws Throwable if obtaining the admin handle, issuing a call, or waiting fails
 */
private static boolean setBlock(final boolean onOff, CallType type) throws Throwable {
    org.apache.hadoop.hbase.client.Connection hbaseConnection = HBaseConnectionFactory.getInstance(HConfiguration.getConfiguration()).getConnection();
    // Close the Admin handle when done; previously it was never closed. The wait on the
    // latch stays inside this block so the handle outlives the outstanding calls.
    try (Admin admin = hbaseConnection.getAdmin()) {
        // NOTE(review): one controller instance is shared by the calls to all servers.
        ServerRpcController controller = new ServerRpcController();
        SpliceMessage.BlockingProbeRequest message = SpliceMessage.BlockingProbeRequest.newBuilder().setDoBlock(onOff).build();
        final AtomicBoolean success = new AtomicBoolean(true);
        Collection<ServerName> servers = admin.getClusterStatus().getServers();
        // One count per server; each callback invocation releases one.
        final CountDownLatch latch = new CountDownLatch(servers.size());
        for (ServerName server : servers) {
            CoprocessorRpcChannel channel = admin.coprocessorService(server);
            SpliceMessage.BlockingProbeEndpoint.Stub service = SpliceMessage.BlockingProbeEndpoint.newStub(channel);
            RpcCallback<SpliceMessage.BlockingProbeResponse> callback = new RpcCallback<SpliceMessage.BlockingProbeResponse>() {
                @Override
                public void run(SpliceMessage.BlockingProbeResponse response) {
                    if (response.getDidBlock() != onOff) {
                        success.set(false);
                    }
                    latch.countDown();
                }
            };
            // NOTE(review): a CallType value not listed here issues no call, so its latch
            // count is never released and the wait below runs to its timeout.
            switch (type) {
                case POST_COMPACT: service.blockPostCompact(controller, message, callback); break;
                case PRE_COMPACT: service.blockPreCompact(controller, message, callback); break;
                case POST_FLUSH: service.blockPostFlush(controller, message, callback); break;
                case PRE_FLUSH: service.blockPreFlush(controller, message, callback); break;
                case POST_SPLIT: service.blockPostSplit(controller, message, callback); break;
                case PRE_SPLIT: service.blockPreSplit(controller, message, callback); break;
            }
        }
        // NOTE(review): the timeout is 10000 SECONDS (almost three hours) — confirm that
        // milliseconds were not intended.
        if (!latch.await(10000, TimeUnit.SECONDS)){
            return false;
        }
        return success.get();
    }
}
/**
 * Not implemented by this class.
 *
 * @param row unused
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final NotImplementedException notImplemented = new NotImplementedException();
    throw notImplemented;
}
/**
 * Forwards the request to {@code hTable} and returns whatever channel it provides.
 *
 * @param row row key passed through unchanged
 * @return the result of {@code hTable.coprocessorService(row)}
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final CoprocessorRpcChannel delegateChannel = hTable.coprocessorService(row);
    return delegateChannel;
}
/**
 * Forwards the request to {@code hTable} and returns whatever channel it provides.
 *
 * @param row row key passed through unchanged
 * @return the result of {@code hTable.coprocessorService(row)}
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final CoprocessorRpcChannel delegateChannel = hTable.coprocessorService(row);
    return delegateChannel;
}
/**
 * Forwards the request to {@code hTable} and returns whatever channel it provides.
 *
 * @param row row key passed through unchanged
 * @return the result of {@code hTable.coprocessorService(row)}
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final CoprocessorRpcChannel delegateChannel = hTable.coprocessorService(row);
    return delegateChannel;
}
/**
 * Forwards the request to {@code hTable} and returns whatever channel it provides.
 *
 * @param row row key passed through unchanged
 * @return the result of {@code hTable.coprocessorService(row)}
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final CoprocessorRpcChannel delegateChannel = hTable.coprocessorService(row);
    return delegateChannel;
}
/**
 * Forwards the request to {@code hTable} and returns whatever channel it provides.
 *
 * @param row row key passed through unchanged
 * @return the result of {@code hTable.coprocessorService(row)}
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final CoprocessorRpcChannel delegateChannel = hTable.coprocessorService(row);
    return delegateChannel;
}
/**
 * Forwards the request to {@code hTable} and returns whatever channel it provides.
 *
 * @param row row key passed through unchanged
 * @return the result of {@code hTable.coprocessorService(row)}
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final CoprocessorRpcChannel delegateChannel = hTable.coprocessorService(row);
    return delegateChannel;
}
/**
 * Forwards the request to {@code hTable} and returns whatever channel it provides.
 *
 * @param row row key passed through unchanged
 * @return the result of {@code hTable.coprocessorService(row)}
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final CoprocessorRpcChannel delegateChannel = hTable.coprocessorService(row);
    return delegateChannel;
}
/**
 * Forwards the request to {@code hTable} and returns whatever channel it provides.
 *
 * @param row row key passed through unchanged
 * @return the result of {@code hTable.coprocessorService(row)}
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final CoprocessorRpcChannel delegateChannel = hTable.coprocessorService(row);
    return delegateChannel;
}
/**
 * {@inheritDoc}
 *
 * <p>This implementation does not support the operation and always throws a
 * {@code RuntimeException} whose message names the runtime class.
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final String message = this.getClass() + " does NOT implement this method.";
    throw new RuntimeException(message);
}
/**
 * {@inheritDoc}
 *
 * <p>Unsupported here: every call fails with a {@code RuntimeException} that reports the
 * runtime class of this object.
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] var1) {
    final String reason = this.getClass() + " does NOT implement this method.";
    final RuntimeException unsupported = new RuntimeException(reason);
    throw unsupported;
}
/**
 * Unsupported operation.
 *
 * @param bytes unused
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] bytes) {
    final UnsupportedOperationException unsupported = new UnsupportedOperationException();
    throw unsupported;
}
/**
 * Not implemented by this class.
 *
 * @param row unused
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final NotImplementedException notImplemented = new NotImplementedException();
    throw notImplemented;
}
/**
 * Coprocessor services are not available through this table implementation.
 *
 * @param row unused
 * @return never returns normally
 * @throws NotImplementedException always, with a message stating the operation is unsupported
 */
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
    final String message = "coprocessorService not supported in ThriftTable";
    throw new NotImplementedException(message);
}