下面列出了怎么用org.apache.hadoop.hbase.ipc.FifoRpcScheduler的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
* the stub, this function will throw root cause of that exception.
*/
private void callRpcService(User clientUser) throws Exception {
SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
Mockito.when(securityInfoMock.getServerPrincipal())
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
InetSocketAddress isa = new InetSocketAddress(HOST, 0);
RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)), isa,
serverConf, new FifoRpcScheduler(serverConf, 1));
rpcServer.start();
try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
HConstants.DEFAULT_CLUSTER_ID.toString())) {
BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress(),
clientUser);
TestThread th1 = new TestThread(stub);
final Throwable exception[] = new Throwable[1];
Collections.synchronizedList(new ArrayList<Throwable>());
Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread th, Throwable ex) {
exception[0] = ex;
}
};
th1.setUncaughtExceptionHandler(exceptionHandler);
th1.start();
th1.join();
if (exception[0] != null) {
// throw root cause.
while (exception[0].getCause() != null) {
exception[0] = exception[0].getCause();
}
throw (Exception) exception[0];
}
} finally {
rpcServer.stop();
}
}
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new FifoRpcScheduler(conf, handlerCount);
}
@Test
public void testFifo() {
RpcSchedulerFactory factory = new FifoRpcSchedulerFactory();
RpcScheduler rpcScheduler = factory.create(this.conf, null, null);
assertTrue(rpcScheduler.getClass().equals(FifoRpcScheduler.class));
}
/**
* @param subscriptionTimestamp timestamp of when the index subscription became active (or more accurately, not
* inactive)
* @param listener listeners that will process the events
* @param threadCnt number of worker threads that will handle incoming SEP events
* @param hostName hostname to bind to
* @param payloadExtractor extracts payloads to include in SepEvents
*/
public SepConsumer(String subscriptionId, long subscriptionTimestamp, EventListener listener, int threadCnt,
String hostName, ZooKeeperItf zk, Configuration hbaseConf, PayloadExtractor payloadExtractor) throws IOException, InterruptedException {
Preconditions.checkArgument(threadCnt > 0, "Thread count must be > 0");
this.subscriptionId = SepModelImpl.toInternalSubscriptionName(subscriptionId);
this.subscriptionTimestamp = subscriptionTimestamp;
this.listener = listener;
this.zk = zk;
this.hbaseConf = hbaseConf;
this.sepMetrics = new SepMetrics(subscriptionId);
this.payloadExtractor = payloadExtractor;
this.executors = Lists.newArrayListWithCapacity(threadCnt);
InetSocketAddress initialIsa = new InetSocketAddress(hostName, 0);
if (initialIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
}
String name = "regionserver/" + initialIsa.toString();
this.rpcServer = new RpcServer(this, name, getServices(),
/*HBaseRPCErrorHandler.class, OnlineRegions.class},*/
initialIsa, // BindAddress is IP we got for this server.
//hbaseConf.getInt("hbase.regionserver.handler.count", 10),
//hbaseConf.getInt("hbase.regionserver.metahandler.count", 10),
hbaseConf,
new FifoRpcScheduler(hbaseConf, hbaseConf.getInt("hbase.regionserver.handler.count", 10)));
/*
new SimpleRpcScheduler(
hbaseConf,
hbaseConf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT),
hbaseConf.getInt("hbase.regionserver.metahandler.count", 10),
hbaseConf.getInt("hbase.regionserver.handler.count", 10),
this,
HConstants.QOS_THRESHOLD)
);
*/
this.serverName = ServerName.valueOf(hostName, rpcServer.getListenerAddress().getPort(), System.currentTimeMillis());
this.zkWatcher = new ZooKeeperWatcher(hbaseConf, this.serverName.toString(), null);
// login the zookeeper client principal (if using security)
ZKUtil.loginClient(hbaseConf, "hbase.zookeeper.client.keytab.file",
"hbase.zookeeper.client.kerberos.principal", hostName);
// login the server principal (if using secure Hadoop)
User.login(hbaseConf, "hbase.regionserver.keytab.file",
"hbase.regionserver.kerberos.principal", hostName);
for (int i = 0; i < threadCnt; i++) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100));
executor.setRejectedExecutionHandler(new WaitPolicy());
executors.add(executor);
}
}