下面列出了 org.apache.hadoop.hbase.HConstants#QOS_THRESHOLD 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Builds a {@link SimpleRpcScheduler} whose handler counts are read from {@code conf}.
 *
 * @param conf configuration that supplies the handler counts (defaults come from HConstants)
 * @param priority priority function handed to the scheduler
 * @param server abortable handed to the scheduler
 * @return the newly constructed scheduler
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
  final int regularHandlers = conf.getInt(
      HConstants.REGION_SERVER_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
  final int highPriorityHandlers = conf.getInt(
      HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
  final int replicationHandlers = conf.getInt(
      HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
  final int metaTransitionHandlers = conf.getInt(
      HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
      HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT);

  return new SimpleRpcScheduler(conf, regularHandlers, highPriorityHandlers,
      replicationHandlers, metaTransitionHandlers, priority, server, HConstants.QOS_THRESHOLD);
}
/**
 * Builds a {@link CQTBERpcScheduler} whose handler counts are read from {@code conf}.
 *
 * @param conf configuration that supplies the handler counts (defaults come from HConstants)
 * @param priority priority function handed to the scheduler
 * @param server abortable handed to the scheduler
 * @return the newly constructed scheduler
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
  final int regularHandlers = conf.getInt(
      HConstants.REGION_SERVER_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
  final int highPriorityHandlers = conf.getInt(
      HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
  final int replicationHandlers = conf.getInt(
      HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
  final int metaTransitionHandlers = conf.getInt(
      HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
      HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT);

  return new CQTBERpcScheduler(conf, regularHandlers, highPriorityHandlers,
      replicationHandlers, metaTransitionHandlers, priority, server, HConstants.QOS_THRESHOLD);
}
/**
 * Constructs a {@link SimpleRpcScheduler} with the scan share configured as zero and checks
 * that a scheduler instance is obtained.
 */
@Test
public void testScanQueueWithZeroScanRatio() throws Exception {
  final Configuration conf = HBaseConfiguration.create();
  conf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
  conf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
  // The zero scan share is the configuration this test is about.
  conf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0f);

  // Every call is reported as normal priority.
  final PriorityFunction qosFunction = mock(PriorityFunction.class);
  when(qosFunction.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);

  final RpcScheduler zeroScanScheduler =
      new SimpleRpcScheduler(conf, 2, 1, 1, qosFunction, HConstants.QOS_THRESHOLD);
  assertNotEquals(null, zeroScanScheduler);
}
/**
 * Checks that dispatch acceptance follows the configured maximum call-queue length as that
 * limit is changed at runtime through {@code onConfigurationChange}.
 */
@Test
public void testSoftAndHardQueueLimits() throws Exception {
  final String maxQueueLengthKey = "hbase.ipc.server.max.callqueue.length";

  Configuration conf = HBaseConfiguration.create();
  conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
  conf.setInt(maxQueueLengthKey, 5);
  conf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
      RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);

  // Every call is reported as normal priority.
  PriorityFunction qosFunction = mock(PriorityFunction.class);
  when(qosFunction.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);

  SimpleRpcScheduler limitedScheduler =
      new SimpleRpcScheduler(conf, 0, 0, 0, qosFunction, HConstants.QOS_THRESHOLD);
  try {
    limitedScheduler.start();

    // A mocked "mutate" call wrapped in a mocked runner.
    CallRunner mutateTask = mock(CallRunner.class);
    ServerCall mutateCall = mock(ServerCall.class);
    mutateCall.param = RequestConverter.buildMutateRequest(
        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
    RequestHeader mutateHeader = RequestHeader.newBuilder().setMethodName("mutate").build();
    when(mutateTask.getRpcCall()).thenReturn(mutateCall);
    when(mutateCall.getHeader()).thenReturn(mutateHeader);

    // Limit of 5: the call is expected to be accepted.
    assertTrue(limitedScheduler.dispatch(mutateTask));

    // Limit lowered to 0: the next dispatch is expected to be refused.
    conf.setInt(maxQueueLengthKey, 0);
    limitedScheduler.onConfigurationChange(conf);
    assertFalse(limitedScheduler.dispatch(mutateTask));

    // Once the queue has drained and the limit is 1, dispatch is expected to succeed again.
    waitUntilQueueEmpty(limitedScheduler);
    conf.setInt(maxQueueLengthKey, 1);
    limitedScheduler.onConfigurationChange(conf);
    assertTrue(limitedScheduler.dispatch(mutateTask));
  } finally {
    limitedScheduler.stop();
  }
}
/**
 * Builds a {@link SimpleRpcScheduler} sized from {@code conf}, with the supplied priority
 * function wrapped in a {@code SplicePriorityFunction}.
 *
 * @param conf configuration that supplies the handler counts (defaults come from HConstants)
 * @param priority priority function to wrap before handing it to the scheduler
 * @param server abortable handed to the scheduler
 * @return the newly constructed scheduler
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
  final int regularHandlers = conf.getInt(
      HConstants.REGION_SERVER_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
  final int highPriorityHandlers = conf.getInt(
      HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
  final int replicationHandlers = conf.getInt(
      HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
  final SplicePriorityFunction wrappedPriority = new SplicePriorityFunction(priority);

  return new SimpleRpcScheduler(conf, regularHandlers, highPriorityHandlers,
      replicationHandlers, wrappedPriority, server, HConstants.QOS_THRESHOLD);
}
/**
 * Builds a {@link SimpleRpcScheduler} sized from {@code conf}, with the supplied priority
 * function wrapped in a {@code SplicePriorityFunction}.
 *
 * @param conf configuration that supplies the handler counts (defaults come from HConstants)
 * @param priority priority function to wrap before handing it to the scheduler
 * @param server abortable handed to the scheduler
 * @return the newly constructed scheduler
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
  final int regularHandlers = conf.getInt(
      HConstants.REGION_SERVER_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
  final int highPriorityHandlers = conf.getInt(
      HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
  final int replicationHandlers = conf.getInt(
      HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
  final SplicePriorityFunction wrappedPriority = new SplicePriorityFunction(priority);

  return new SimpleRpcScheduler(conf, regularHandlers, highPriorityHandlers,
      replicationHandlers, wrappedPriority, server, HConstants.QOS_THRESHOLD);
}
/**
 * Builds a {@link SimpleRpcScheduler} sized from {@code conf}, with the supplied priority
 * function wrapped in a {@code SplicePriorityFunction}.
 *
 * @param conf configuration that supplies the handler counts (defaults come from HConstants)
 * @param priority priority function to wrap before handing it to the scheduler
 * @param server abortable handed to the scheduler
 * @return the newly constructed scheduler
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
  final int regularHandlers = conf.getInt(
      HConstants.REGION_SERVER_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
  final int highPriorityHandlers = conf.getInt(
      HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
  final int replicationHandlers = conf.getInt(
      HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
  final SplicePriorityFunction wrappedPriority = new SplicePriorityFunction(priority);

  return new SimpleRpcScheduler(conf, regularHandlers, highPriorityHandlers,
      replicationHandlers, wrappedPriority, server, HConstants.QOS_THRESHOLD);
}
/**
 * Builds a {@link SimpleRpcScheduler} sized from {@code conf}, with the supplied priority
 * function wrapped in a {@code SplicePriorityFunction}.
 *
 * @param conf configuration that supplies the handler counts (defaults come from HConstants)
 * @param priority priority function to wrap before handing it to the scheduler
 * @param server abortable handed to the scheduler
 * @return the newly constructed scheduler
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
  final int regularHandlers = conf.getInt(
      HConstants.REGION_SERVER_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
  final int highPriorityHandlers = conf.getInt(
      HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
  final int replicationHandlers = conf.getInt(
      HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
  final SplicePriorityFunction wrappedPriority = new SplicePriorityFunction(priority);

  return new SimpleRpcScheduler(conf, regularHandlers, highPriorityHandlers,
      replicationHandlers, wrappedPriority, server, HConstants.QOS_THRESHOLD);
}
/**
 * Builds a {@link SimpleRpcScheduler} sized from {@code conf}, with the supplied priority
 * function wrapped in a {@code SplicePriorityFunction}.
 *
 * @param conf configuration that supplies the handler counts (defaults come from HConstants)
 * @param priority priority function to wrap before handing it to the scheduler
 * @param server abortable handed to the scheduler
 * @return the newly constructed scheduler
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
  final int regularHandlers = conf.getInt(
      HConstants.REGION_SERVER_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
  final int highPriorityHandlers = conf.getInt(
      HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
  final int replicationHandlers = conf.getInt(
      HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
  final SplicePriorityFunction wrappedPriority = new SplicePriorityFunction(priority);

  return new SimpleRpcScheduler(conf, regularHandlers, highPriorityHandlers,
      replicationHandlers, wrappedPriority, server, HConstants.QOS_THRESHOLD);
}
/**
 * Builds a {@link SimpleRpcScheduler} sized from {@code conf}, with the supplied priority
 * function wrapped in a {@code SplicePriorityFunction}.
 *
 * @param conf configuration that supplies the handler counts (defaults come from HConstants)
 * @param priority priority function to wrap before handing it to the scheduler
 * @param server abortable handed to the scheduler
 * @return the newly constructed scheduler
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
  final int regularHandlers = conf.getInt(
      HConstants.REGION_SERVER_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
  final int highPriorityHandlers = conf.getInt(
      HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
  final int replicationHandlers = conf.getInt(
      HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
  final SplicePriorityFunction wrappedPriority = new SplicePriorityFunction(priority);

  return new SimpleRpcScheduler(conf, regularHandlers, highPriorityHandlers,
      replicationHandlers, wrappedPriority, server, HConstants.QOS_THRESHOLD);
}
/**
 * Dispatches a fixed mix of small, large and huge calls to a scheduler configured with the
 * given call-queue type and checks the cumulative completion figure expected for that type.
 *
 * @param queueType value stored under {@code RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY}; only the
 *        deadline and FIFO values lead to an assertion
 * @throws Exception if scheduling, dispatching or waiting fails
 */
private void testRpcScheduler(final String queueType) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  conf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);

  // Every call is reported as normal priority.
  PriorityFunction qosFunction = mock(PriorityFunction.class);
  when(qosFunction.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);

  RpcScheduler scheduler =
      new SimpleRpcScheduler(conf, 1, 1, 1, qosFunction, HConstants.QOS_THRESHOLD);
  try {
    scheduler.start();

    // Three mocked calls, distinguished by call id: 1 (small), 50 (large), 100 (huge).
    CallRunner smallTask = mock(CallRunner.class);
    ServerCall smallRpc = mock(ServerCall.class);
    RequestHeader smallHeader = RequestHeader.newBuilder().setCallId(1).build();
    when(smallTask.getRpcCall()).thenReturn(smallRpc);
    when(smallRpc.getHeader()).thenReturn(smallHeader);

    CallRunner largeTask = mock(CallRunner.class);
    ServerCall largeRpc = mock(ServerCall.class);
    RequestHeader largeHeader = RequestHeader.newBuilder().setCallId(50).build();
    when(largeTask.getRpcCall()).thenReturn(largeRpc);
    when(largeRpc.getHeader()).thenReturn(largeHeader);

    CallRunner hugeTask = mock(CallRunner.class);
    ServerCall hugeRpc = mock(ServerCall.class);
    RequestHeader hugeHeader = RequestHeader.newBuilder().setCallId(100).build();
    when(hugeTask.getRpcCall()).thenReturn(hugeRpc);
    when(hugeRpc.getHeader()).thenReturn(hugeHeader);

    // Deadlines grow with the call size.
    when(qosFunction.getDeadline(eq(smallHeader), any())).thenReturn(0L);
    when(qosFunction.getDeadline(eq(largeHeader), any())).thenReturn(50L);
    when(qosFunction.getDeadline(eq(hugeHeader), any())).thenReturn(100L);

    // NOTE(review): doAnswerTaskExecution is defined elsewhere; presumably each executed task
    // records its value (10 / 50 / 100) in the list -- confirm against the helper.
    final ArrayList<Integer> executed = new ArrayList<>();
    doAnswerTaskExecution(smallTask, executed, 10, 250);
    doAnswerTaskExecution(largeTask, executed, 50, 250);
    doAnswerTaskExecution(hugeTask, executed, 100, 250);

    // Submission order: small small small huge small large small small.
    final CallRunner[] submissionOrder = {
      smallTask, smallTask, smallTask, hugeTask, smallTask, largeTask, smallTask, smallTask
    };
    for (CallRunner task : submissionOrder) {
      scheduler.dispatch(task);
    }

    // Poll until all eight calls have been recorded.
    while (executed.size() < 8) {
      Thread.sleep(100);
    }

    // totalTime is the sum of the running prefix sums of the recorded values.
    int prefixSum = 0;
    int totalTime = 0;
    int index = 0;
    while (index < executed.size()) {
      LOG.debug("Request i=" + index + " value=" + executed.get(index));
      prefixSum += executed.get(index);
      totalTime += prefixSum;
      ++index;
    }
    LOG.debug("Total Time: " + totalTime);

    // Submitted:     [small small small huge small large small small]
    // FIFO order     [10 10 10 100 10 50 10 10] -> 930
    // Deadline order [10 10 10 10 10 10 50 100] -> 530
    if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
      assertEquals(530, totalTime);
    } else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
      assertEquals(930, totalTime);
    }
  } finally {
    scheduler.stop();
  }
}
/**
 * Dispatches three puts, three gets and three scans to a scheduler configured with read and
 * scan shares, then checks that each consecutive group of three recorded values is pairwise
 * distinct.
 */
@Test
public void testScanQueues() throws Exception {
  Configuration conf = HBaseConfiguration.create();
  conf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
  conf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
  conf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);

  // Every call is reported as normal priority.
  PriorityFunction qosFunction = mock(PriorityFunction.class);
  when(qosFunction.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);

  RpcScheduler scheduler =
      new SimpleRpcScheduler(conf, 3, 1, 1, qosFunction, HConstants.QOS_THRESHOLD);
  try {
    scheduler.start();

    // Mocked "mutate" call carrying a real mutate request.
    CallRunner mutateTask = mock(CallRunner.class);
    ServerCall mutateRpc = mock(ServerCall.class);
    mutateRpc.param = RequestConverter.buildMutateRequest(
        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
    RequestHeader mutateHeader = RequestHeader.newBuilder().setMethodName("mutate").build();
    when(mutateTask.getRpcCall()).thenReturn(mutateRpc);
    when(mutateRpc.getHeader()).thenReturn(mutateHeader);
    when(mutateRpc.getParam()).thenReturn(mutateRpc.param);

    // Mocked "get" call; only its header is stubbed.
    CallRunner readTask = mock(CallRunner.class);
    ServerCall readRpc = mock(ServerCall.class);
    RequestHeader readHeader = RequestHeader.newBuilder().setMethodName("get").build();
    when(readTask.getRpcCall()).thenReturn(readRpc);
    when(readRpc.getHeader()).thenReturn(readHeader);

    // Mocked "scan" call carrying an empty scan request.
    CallRunner scanTask = mock(CallRunner.class);
    ServerCall scanRpc = mock(ServerCall.class);
    scanRpc.param = ScanRequest.newBuilder().build();
    RequestHeader scanHeader = RequestHeader.newBuilder().setMethodName("scan").build();
    when(scanTask.getRpcCall()).thenReturn(scanRpc);
    when(scanRpc.getHeader()).thenReturn(scanHeader);
    when(scanRpc.getParam()).thenReturn(scanRpc.param);

    // NOTE(review): doAnswerTaskExecution is defined elsewhere; presumably each executed task
    // records its value (1 = put, 2 = get, 3 = scan) in the list -- confirm against the helper.
    ArrayList<Integer> executed = new ArrayList<>();
    doAnswerTaskExecution(mutateTask, executed, 1, 1000);
    doAnswerTaskExecution(readTask, executed, 2, 1000);
    doAnswerTaskExecution(scanTask, executed, 3, 1000);

    // There are 3 queues: [puts], [gets], [scans], so the calls will be interleaved.
    // Submit three of each kind, kind by kind.
    final CallRunner[] kinds = {mutateTask, readTask, scanTask};
    for (CallRunner task : kinds) {
      for (int repeat = 0; repeat < 3; repeat++) {
        scheduler.dispatch(task);
      }
    }

    // Poll until at least six calls have been recorded.
    while (executed.size() < 6) {
      Thread.sleep(100);
    }

    // Within each complete group of three, no two recorded values may be equal.
    for (int base = 0; base < executed.size() - 2; base += 3) {
      Integer first = executed.get(base);
      Integer second = executed.get(base + 1);
      Integer third = executed.get(base + 2);
      assertNotEquals(first, second);
      assertNotEquals(first, third);
      assertNotEquals(second, third);
    }
  } finally {
    scheduler.stop();
  }
}
/**
 * Same scenario as the read/write/scan queue check, but with the meta read and scan shares
 * configured and every call reported as high priority: three puts, three gets and three scans
 * are dispatched and each consecutive group of three recorded values must be pairwise
 * distinct.
 */
@Test
public void testMetaRWScanQueues() throws Exception {
  Configuration conf = HBaseConfiguration.create();
  conf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
  conf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
  conf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);

  // Every call is reported as high priority.
  PriorityFunction qosFunction = mock(PriorityFunction.class);
  when(qosFunction.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS);

  RpcScheduler scheduler =
      new SimpleRpcScheduler(conf, 3, 3, 1, qosFunction, HConstants.QOS_THRESHOLD);
  try {
    scheduler.start();

    // Mocked "mutate" call carrying a real mutate request.
    CallRunner mutateTask = mock(CallRunner.class);
    ServerCall mutateRpc = mock(ServerCall.class);
    mutateRpc.param = RequestConverter.buildMutateRequest(
        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
    RequestHeader mutateHeader = RequestHeader.newBuilder().setMethodName("mutate").build();
    when(mutateTask.getRpcCall()).thenReturn(mutateRpc);
    when(mutateRpc.getHeader()).thenReturn(mutateHeader);
    when(mutateRpc.getParam()).thenReturn(mutateRpc.param);

    // Mocked "get" call; only its header is stubbed.
    CallRunner readTask = mock(CallRunner.class);
    ServerCall readRpc = mock(ServerCall.class);
    RequestHeader readHeader = RequestHeader.newBuilder().setMethodName("get").build();
    when(readTask.getRpcCall()).thenReturn(readRpc);
    when(readRpc.getHeader()).thenReturn(readHeader);

    // Mocked "scan" call carrying an empty scan request.
    CallRunner scanTask = mock(CallRunner.class);
    ServerCall scanRpc = mock(ServerCall.class);
    scanRpc.param = ScanRequest.newBuilder().build();
    RequestHeader scanHeader = RequestHeader.newBuilder().setMethodName("scan").build();
    when(scanTask.getRpcCall()).thenReturn(scanRpc);
    when(scanRpc.getHeader()).thenReturn(scanHeader);
    when(scanRpc.getParam()).thenReturn(scanRpc.param);

    // NOTE(review): doAnswerTaskExecution is defined elsewhere; presumably each executed task
    // records its value (1 = put, 2 = get, 3 = scan) in the list -- confirm against the helper.
    ArrayList<Integer> executed = new ArrayList<>();
    doAnswerTaskExecution(mutateTask, executed, 1, 1000);
    doAnswerTaskExecution(readTask, executed, 2, 1000);
    doAnswerTaskExecution(scanTask, executed, 3, 1000);

    // There are 3 queues: [puts], [gets], [scans], so the calls will be interleaved.
    // Submit three of each kind, kind by kind.
    final CallRunner[] kinds = {mutateTask, readTask, scanTask};
    for (CallRunner task : kinds) {
      for (int repeat = 0; repeat < 3; repeat++) {
        scheduler.dispatch(task);
      }
    }

    // Poll until at least six calls have been recorded.
    while (executed.size() < 6) {
      Thread.sleep(100);
    }

    // Within each complete group of three, no two recorded values may be equal.
    for (int base = 0; base < executed.size() - 2; base += 3) {
      Integer first = executed.get(base);
      Integer second = executed.get(base + 1);
      Integer third = executed.get(base + 2);
      assertNotEquals(first, second);
      assertNotEquals(first, third);
      assertNotEquals(second, third);
    }
  } finally {
    scheduler.stop();
  }
}