下面列出了怎么用org.apache.hadoop.hbase.ipc.RpcScheduler的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new SimpleRpcScheduler(
conf,
handlerCount,
conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
priority,
server,
HConstants.QOS_THRESHOLD);
}
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
// create the delegate scheduler
RpcScheduler delegate;
try {
// happens in <=0.98.4 where the scheduler factory is not visible
delegate = new SimpleRpcSchedulerFactory().create(conf, priorityFunction, abortable);
} catch (IllegalAccessError e) {
LOG.fatal(VERSION_TOO_OLD_FOR_INDEX_RPC);
throw e;
}
int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
int minPriority = getMinPriority(conf);
int maxPriority = conf.getInt(QueryServices.MAX_INDEX_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MAX_PRIORITY);
// make sure the ranges are outside the warning ranges
Preconditions.checkArgument(maxPriority > minPriority, "Max index priority (" + maxPriority
+ ") must be larger than min priority (" + minPriority + ")");
boolean allSmaller =
minPriority < HConstants.REPLICATION_QOS
&& maxPriority < HConstants.REPLICATION_QOS;
boolean allLarger = minPriority > HConstants.HIGH_QOS;
Preconditions.checkArgument(allSmaller || allLarger, "Index priority range (" + minPriority
+ ", " + maxPriority + ") must be outside HBase priority range ("
+ HConstants.REPLICATION_QOS + ", " + HConstants.HIGH_QOS + ")");
LOG.info("Using custom Phoenix Index RPC Handling with " + indexHandlerCount
+ " handlers and priority range [" + minPriority + ", " + maxPriority + ")");
PhoenixIndexRpcScheduler scheduler =
new PhoenixIndexRpcScheduler(indexHandlerCount, conf, delegate, minPriority,
maxPriority);
return scheduler;
}
@BeforeClass
public static void setupBeforeClass() throws Exception {
Configuration conf = util.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
RowProcessorEndpoint.class.getName());
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
conf.setLong(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 2048);
util.startMiniCluster();
}
public void start(final RpcScheduler rpcScheduler) throws IOException {
if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
LOG.info("Quota support disabled");
return;
}
LOG.info("Initializing RPC quota support");
// Initialize quota cache
quotaCache = new QuotaCache(rsServices);
quotaCache.start();
rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled);
}
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int totalHandlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
int rsReportHandlerCount = Math.max(1, conf
.getInt(MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, totalHandlerCount / 2));
int callHandlerCount = Math.max(1, totalHandlerCount - rsReportHandlerCount);
return new MasterFifoRpcScheduler(conf, callHandlerCount, rsReportHandlerCount);
}
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new CQTBERpcScheduler(conf, handlerCount,
conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
priority, server, HConstants.QOS_THRESHOLD);
}
@Test
public void testRWQ() {
// Set some configs just to see how it changes the scheduler. Can't assert the settings had
// an effect. Just eyeball the log.
this.conf.setDouble(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5);
this.conf.setDouble(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5);
this.conf.setDouble(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5);
RpcSchedulerFactory factory = new SimpleRpcSchedulerFactory();
RpcScheduler rpcScheduler = factory.create(this.conf, null, null);
assertTrue(rpcScheduler.getClass().equals(SimpleRpcScheduler.class));
}
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
PhoenixRpcScheduler phoenixIndexRpcScheduler = (PhoenixRpcScheduler)super.create(conf, priorityFunction, abortable);
phoenixIndexRpcScheduler.setIndexExecutorForTesting(indexRpcExecutor);
phoenixIndexRpcScheduler.setMetadataExecutorForTesting(metadataRpcExecutor);
return phoenixIndexRpcScheduler;
}
@Override
public RpcScheduler create(Configuration configuration, PriorityFunction priorityFunction) {
return create(configuration, priorityFunction, null);
}
/**
* @deprecated since 1.0.0.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-12028">HBASE-12028</a>
*/
@Override
@Deprecated
public RpcScheduler create(Configuration conf, PriorityFunction priority) {
return create(conf, priority, null);
}
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new FifoRpcScheduler(conf, handlerCount);
}
@Deprecated
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority) {
return create(conf, priority, null);
}
@VisibleForTesting
public RpcScheduler getRpcScheduler() {
return rpcServer.getScheduler();
}
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
final RpcScheduler delegate = super.create(conf, priority, server);
return new SpyingRpcScheduler(delegate);
}
public SpyingRpcScheduler(RpcScheduler delegate) {
super(delegate);
}
@Test
public void testFifo() {
RpcSchedulerFactory factory = new FifoRpcSchedulerFactory();
RpcScheduler rpcScheduler = factory.create(this.conf, null, null);
assertTrue(rpcScheduler.getClass().equals(FifoRpcScheduler.class));
}
@Override
public RpcScheduler create(Configuration configuration, PriorityFunction priorityFunction) {
return create(configuration, priorityFunction, null);
}
/**
* Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}.
*/
RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server);
/**
* @deprecated since 1.0.0.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-12028">HBASE-12028</a>
*/
@Deprecated
RpcScheduler create(Configuration conf, PriorityFunction priority);