类org.apache.hadoop.hbase.Abortable源码实例Demo

下面列出了 org.apache.hadoop.hbase.Abortable 这个 API 类的用法实例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: phoenix   文件: TestParalleWriterIndexCommitter.java
/**
 * Checks that {@code stop} on the writer shuts down the table factory and the executor
 * service handed to {@code setup}, and that neither mock collaborator is touched.
 */
@Test
public void testCorrectlyCleansUpResources() throws Exception{
  // collaborators that must stay untouched for the whole test
  Abortable abortable = Mockito.mock(Abortable.class);
  Stoppable stoppable = Mockito.mock(Stoppable.class);

  // resources owned by the writer once setup() has been called
  ExecutorService pool = Executors.newFixedThreadPool(1);
  FakeTableFactory tableFactory =
      new FakeTableFactory(Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());

  ParallelWriterIndexCommitter committer =
      new ParallelWriterIndexCommitter(VersionInfo.getVersion());
  committer.setup(tableFactory, pool, abortable, stoppable, 1);

  // stopping the writer is expected to release everything it was given
  String reason = this.test.getTableNameString() + " finished";
  committer.stop(reason);

  assertTrue("Factory didn't get shutdown after writer#stop!", tableFactory.shutdown);
  assertTrue("ExectorService isn't terminated after writer#stop!", pool.isShutdown());
  Mockito.verifyZeroInteractions(abortable, stoppable);
}
 
源代码2 项目: phoenix   文件: TestParalleIndexWriter.java
/**
 * Sets up a writer with a single-thread pool and an empty table factory, stops it, and
 * asserts that both resources were shut down while the abort/stop mocks saw no calls.
 */
@Test
public void testCorrectlyCleansUpResources() throws Exception{
  Abortable abortable = Mockito.mock(Abortable.class);
  Stoppable stoppable = Mockito.mock(Stoppable.class);
  ExecutorService pool = Executors.newFixedThreadPool(1);
  FakeTableFactory tableFactory =
      new FakeTableFactory(Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());

  // wire up a minimal writer
  ParallelWriterIndexCommitter committer =
      new ParallelWriterIndexCommitter(VersionInfo.getVersion());
  committer.setup(tableFactory, pool, abortable, stoppable, 1);

  // shut it down again
  String reason = this.test.getTableNameString() + " finished";
  committer.stop(reason);

  assertTrue("Factory didn't get shutdown after writer#stop!", tableFactory.shutdown);
  assertTrue("ExectorService isn't terminated after writer#stop!", pool.isShutdown());
  Mockito.verifyZeroInteractions(abortable, stoppable);
}
 
源代码3 项目: phoenix   文件: PhoenixIndexRpcSchedulerTest.java
/**
 * Dispatches a call at the scheduler's index priority and drains the single queue of the
 * test executor, first for the priority pair (200, 250) and then for (101, 110). Finally
 * verifies that {@code init} was invoked twice on the mocked delegate scheduler.
 */
@Test
public void testIndexPriorityWritesToIndexHandler() throws Exception {
    PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
    RpcScheduler delegateScheduler = Mockito.mock(RpcScheduler.class);
    Abortable abortable = new AbortServer();

    PhoenixRpcScheduler scheduler =
            new PhoenixRpcScheduler(conf, delegateScheduler, 200, 250, qosFunction, abortable);
    BalancedQueueRpcExecutor executor =
            new BalancedQueueRpcExecutor("test-queue", 1, 1, qosFunction, conf, abortable);
    scheduler.setIndexExecutorForTesting(executor);
    dispatchCallWithPriority(scheduler, 200);

    // the executor was built with exactly one queue
    List<BlockingQueue<CallRunner>> callQueues = executor.getQueues();
    assertEquals(1, callQueues.size());
    BlockingQueue<CallRunner> onlyQueue = callQueues.get(0);
    onlyQueue.poll(20, TimeUnit.SECONDS);

    // second round: same executor, different supported priority range
    scheduler = new PhoenixRpcScheduler(conf, delegateScheduler, 101, 110, qosFunction, abortable);
    scheduler.setIndexExecutorForTesting(executor);
    dispatchCallWithPriority(scheduler, 101);
    onlyQueue.poll(20, TimeUnit.SECONDS);

    Mockito.verify(delegateScheduler, Mockito.times(2)).init(Mockito.any(Context.class));
    scheduler.stop();
    executor.stop();
}
 
源代码4 项目: hbase   文件: ReplicationEndpoint.java
/**
 * Creates a context that simply holds the supplied collaborators; every argument is stored
 * in the field of the same name without copying or validation.
 *
 * @param server the hosting server
 * @param localConf the local configuration
 * @param conf the configuration
 * @param fs the file system
 * @param peerId identifier of the replication peer
 * @param clusterId identifier of the cluster
 * @param replicationPeer the replication peer
 * @param metrics metrics sink for the replication source
 * @param tableDescriptors table descriptors
 * @param abortable the abortable kept in this context
 */
@InterfaceAudience.Private
public Context(final Server server, final Configuration localConf, final Configuration conf,
    final FileSystem fs, final String peerId, final UUID clusterId,
    final ReplicationPeer replicationPeer, final MetricsSource metrics,
    final TableDescriptors tableDescriptors, final Abortable abortable) {
  // assignments follow the parameter order
  this.server = server;
  this.localConf = localConf;
  this.conf = conf;
  this.fs = fs;
  this.peerId = peerId;
  this.clusterId = clusterId;
  this.replicationPeer = replicationPeer;
  this.metrics = metrics;
  this.tableDescriptors = tableDescriptors;
  this.abortable = abortable;
}
 
源代码5 项目: hbase   文件: MasterRegionFlusherAndCompactor.java
MasterRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
  long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin,
  Path globalArchivePath, String archivedHFileSuffix) {
  this.conf = conf;
  this.abortable = abortable;
  this.region = region;
  this.flushSize = flushSize;
  this.flushPerChanges = flushPerChanges;
  this.flushIntervalMs = flushIntervalMs;
  this.compactMin = compactMin;
  this.globalArchivePath = globalArchivePath;
  this.archivedHFileSuffix = archivedHFileSuffix;
  flushThread = new Thread(this::flushLoop, region.getRegionInfo().getTable() + "-Flusher");
  flushThread.setDaemon(true);
  flushThread.start();
  compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
    .setNameFormat(region.getRegionInfo().getTable() + "-Store-Compactor").setDaemon(true)
    .build());
  LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
    flushSize, flushPerChanges, flushIntervalMs, compactMin);
}
 
源代码6 项目: hbase   文件: SplitOrMergeTracker.java
public SplitOrMergeTracker(ZKWatcher watcher, Configuration conf,
                           Abortable abortable) {
  try {
    if (ZKUtil.checkExists(watcher, watcher.getZNodePaths().switchZNode) < 0) {
      ZKUtil.createAndFailSilent(watcher, watcher.getZNodePaths().switchZNode);
    }
  } catch (KeeperException e) {
    throw new RuntimeException(e);
  }
  splitZnode = ZNodePaths.joinZNode(watcher.getZNodePaths().switchZNode,
    conf.get("zookeeper.znode.switch.split", "split"));
  mergeZnode = ZNodePaths.joinZNode(watcher.getZNodePaths().switchZNode,
    conf.get("zookeeper.znode.switch.merge", "merge"));
  splitStateTracker = new SwitchStateTracker(watcher, splitZnode, abortable);
  mergeStateTracker = new SwitchStateTracker(watcher, mergeZnode, abortable);
}
 
源代码7 项目: hbase   文件: SimpleRpcSchedulerFactory.java
/**
 * Builds a {@code SimpleRpcScheduler} whose handler counts are read from the configuration,
 * falling back to the matching {@code HConstants} defaults.
 *
 * @param conf configuration supplying the handler counts
 * @param priority priority function passed through to the scheduler
 * @param server abortable passed through to the scheduler
 * @return the newly created scheduler, using {@code HConstants.QOS_THRESHOLD}
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
  final int handlerCount = conf.getInt(
    HConstants.REGION_SERVER_HANDLER_COUNT,
    HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
  final int highPriorityHandlers = conf.getInt(
    HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
    HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
  final int replicationHandlers = conf.getInt(
    HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
    HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
  final int metaTransitionHandlers = conf.getInt(
    HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
    HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT);

  return new SimpleRpcScheduler(conf, handlerCount, highPriorityHandlers, replicationHandlers,
    metaTransitionHandlers, priority, server, HConstants.QOS_THRESHOLD);
}
 
源代码8 项目: hbase   文件: TestTableCFsUpdater.java
/**
 * Starts the mini ZooKeeper cluster and initializes the shared {@code abortable},
 * {@code zkw} and {@code zkStorageUtil} fixtures used by the tests.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  TEST_UTIL.startMiniZKCluster();
  Configuration conf = TEST_UTIL.getConfiguration();

  // an abortable that only logs the abort request and never reports itself as aborted
  abortable = new Abortable() {
    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void abort(String why, Throwable e) {
      LOG.info(why, e);
    }
  };

  zkw = new ZKWatcher(conf, "TableCFs", abortable, true);
  zkStorageUtil = new ZKStorageUtil(zkw, conf);
}
 
源代码9 项目: hbase   文件: TestEntityLocks.java
/**
 * Test that abort is called when lock times out.
 * <p>
 * The heartbeat stub answers {@code LOCKED_RESPONSE} first and {@code UNLOCKED_RESPONSE}
 * afterwards; the test then expects exactly one {@code abort} call with a null cause.
 */
@Test
public void testEntityLockTimeout() throws Exception {
  final long sleepMillis = 200;  // worker sleep time, in ms
  Abortable abortable = Mockito.mock(Abortable.class);
  EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
  lock.setTestingSleepTime(sleepMillis);

  when(master.requestLock(any(), any()))
      .thenReturn(LockResponse.newBuilder().setProcId(procId).build());
  // The lock is acquired and then times out, because unlock() is never called on it.
  when(master.lockHeartbeat(any(), any()))
      .thenReturn(LOCKED_RESPONSE, UNLOCKED_RESPONSE);

  lock.requestLock();
  lock.await();
  assertTrue(lock.isLocked());

  // The unlock should show up with the next heartbeat, i.e. after one sleep period;
  // allow ten periods to be safe.
  final long timeoutMillis = 10 * sleepMillis;
  assertTrue(waitLockTimeOut(lock, timeoutMillis));

  // After the worker's run() returns there is a short window in which the thread may still
  // be reported alive, so the worker's liveness is deliberately not asserted here.
  verify(abortable, times(1)).abort(any(), eq(null));
}
 
源代码10 项目: hbase   文件: TestEntityLocks.java
/**
 * Test that abort is called when lockHeartbeat fails with IOException.
 * <p>
 * The heartbeat stub answers {@code LOCKED_RESPONSE} once and then throws a
 * {@code ServiceException}; the test expects exactly one {@code abort} call carrying an
 * {@code HBaseIOException} and a worker thread that is no longer alive.
 */
@Test
public void testHeartbeatException() throws Exception {
  final long sleepMillis = 100;  // worker sleep time, in ms
  Abortable abortable = Mockito.mock(Abortable.class);
  EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
  lock.setTestingSleepTime(sleepMillis);

  when(master.requestLock(any(), any()))
      .thenReturn(LockResponse.newBuilder().setProcId(procId).build());
  when(master.lockHeartbeat(any(), any()))
      .thenReturn(LOCKED_RESPONSE)
      .thenThrow(new ServiceException("Failed heartbeat!"));

  lock.requestLock();
  lock.await();

  final long timeoutMillis = 100 * sleepMillis;
  assertTrue(waitLockTimeOut(lock, timeoutMillis));

  // poll until the heartbeat worker has actually finished
  for (; lock.getWorker().isAlive(); ) {
    TimeUnit.MILLISECONDS.sleep(100);
  }

  verify(abortable, times(1)).abort(any(), isA(HBaseIOException.class));
  assertFalse(lock.getWorker().isAlive());
}
 
源代码11 项目: hbase   文件: TestMasterNoCluster.java
/**
 * Removes everything below the base znode so the next test starts from a clean ZooKeeper.
 * <p>
 * The temporary watcher is closed in a {@code finally} block so that it is released even
 * when the recursive delete fails.
 *
 * @throws KeeperException if the recursive delete fails
 * @throws ZooKeeperConnectionException declared by the original signature
 * @throws IOException declared by the original signature
 */
@After
public void tearDown()
throws KeeperException, ZooKeeperConnectionException, IOException {
  // Make sure zk is clean before we run the next test.
  ZKWatcher zkw = new ZKWatcher(TESTUTIL.getConfiguration(),
      "@Before", new Abortable() {
    @Override
    public void abort(String why, Throwable e) {
      // any abort request during cleanup is surfaced as a test failure
      throw new RuntimeException(why, e);
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  });
  try {
    ZKUtil.deleteNodeRecursively(zkw, zkw.getZNodePaths().baseZNode);
  } finally {
    // always release the watcher, even if the delete above threw
    zkw.close();
  }
}
 
源代码12 项目: hbase   文件: TestRpcHandlerException.java
/**
 * Starts an RPC server backed by a {@code SimpleRpcScheduler} and issues one echo call.
 * If anything is thrown along the way, the abortable is asserted (via the Java
 * {@code assert} keyword, so only with assertions enabled) to report itself as aborted.
 * The server is always stopped at the end.
 */
@Test
public void testRpcScheduler() throws IOException, InterruptedException {
  PriorityFunction qosFunction = mock(PriorityFunction.class);
  Abortable abortable = new AbortServer();
  CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);

  RpcScheduler rpcScheduler =
      new SimpleRpcScheduler(CONF, 2, 0, 0, 0, qosFunction, abortable, 0);
  RpcServer server = RpcServerFactory.createRpcServer(
      null,
      "testRpcServer",
      Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)),
      new InetSocketAddress("localhost", 0),
      CONF,
      rpcScheduler);

  try (BlockingRpcClient client = new BlockingRpcClient(CONF)) {
    server.start();
    BlockingInterface stub = newBlockingStub(client, server.getListenerAddress());
    EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();
    stub.echo(null, request);
  } catch (Throwable e) {
    assert abortable.isAborted();
  } finally {
    server.stop();
  }
}
 
源代码13 项目: hbase   文件: TestZKNodeTracker.java
/**
 * Test that we can interrupt a node that is blocked on a wait.
 * <p>
 * A helper thread blocks in {@code blockUntilAvailable()}; stopping the tracker has to
 * release it, otherwise {@code join()} would never return. The watcher created for the test
 * is closed in a {@code finally} block so it is not left open afterwards.
 */
@Test
public void testInterruptible() throws IOException, InterruptedException {
  Abortable abortable = new StubAbortable();
  ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(), "testInterruptible", abortable);
  try {
    final TestTracker tracker = new TestTracker(zk, "/xyz", abortable);
    tracker.start();
    Thread t = new Thread(() -> {
      try {
        tracker.blockUntilAvailable();
      } catch (InterruptedException e) {
        throw new RuntimeException("Interrupted", e);
      }
    });
    t.start();
    // wait until the helper thread is actually running before stopping the tracker
    while (!t.isAlive()) {
      Threads.sleep(1);
    }
    tracker.stop();
    t.join();
    // If it wasn't interruptible, we'd never get to here.
  } finally {
    // release the watcher regardless of the test outcome
    zk.close();
  }
}
 
/**
 * Wires up {@code this} with its collaborators, wrapping three of them:
 * the pool in a {@code WaitForCompletionTaskRunner}, the factory in a
 * {@code CachingHTableFactory} of the given cache size, and the abortable in a
 * {@code CapturingAbortable}. The stoppable is stored unchanged.
 * <p>
 * Exposed for TESTING
 */
void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) {
    this.stopped = stop;
    this.pool = new WaitForCompletionTaskRunner(pool);
    this.factory = new CachingHTableFactory(factory, cacheSize);
    this.abortable = new CapturingAbortable(abortable);
}
 
源代码15 项目: phoenix   文件: PhoenixIndexRpcSchedulerFactory.java
/**
 * Creates a {@code PhoenixIndexRpcScheduler} that wraps the scheduler produced by
 * {@code SimpleRpcSchedulerFactory}.
 * <p>
 * The index priority range is read from the configuration and validated: the maximum must
 * exceed the minimum, and the range must lie either entirely below
 * {@code HConstants.REPLICATION_QOS} or start above {@code HConstants.HIGH_QOS}.
 *
 * @param conf configuration supplying the handler count and the priority range
 * @param priorityFunction passed to the delegate factory
 * @param abortable passed to the delegate factory
 * @return the index-aware scheduler
 * @throws IllegalAccessError rethrown, after logging, when the delegate factory is not
 *   accessible
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
    // build the scheduler that the Phoenix scheduler delegates to
    RpcScheduler delegate;
    try {
        // happens in <=0.98.4 where the scheduler factory is not visible
        delegate = new SimpleRpcSchedulerFactory().create(conf, priorityFunction, abortable);
    } catch (IllegalAccessError e) {
        LOG.fatal(VERSION_TOO_OLD_FOR_INDEX_RPC);
        throw e;
    }

    int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB,
            QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
    int minPriority = getMinPriority(conf);
    int maxPriority = conf.getInt(QueryServices.MAX_INDEX_PRIOIRTY_ATTRIB,
            QueryServicesOptions.DEFAULT_INDEX_MAX_PRIORITY);

    // the range must be well-formed ...
    Preconditions.checkArgument(maxPriority > minPriority, "Max index priority (" + maxPriority
            + ") must be larger than min priority (" + minPriority + ")");

    // ... and must not overlap the priorities HBase uses itself
    boolean belowHBaseRange = minPriority < HConstants.REPLICATION_QOS
            && maxPriority < HConstants.REPLICATION_QOS;
    boolean aboveHBaseRange = minPriority > HConstants.HIGH_QOS;
    Preconditions.checkArgument(belowHBaseRange || aboveHBaseRange,
            "Index priority range (" + minPriority
            + ",  " + maxPriority + ") must be outside HBase priority range ("
            + HConstants.REPLICATION_QOS + ", " + HConstants.HIGH_QOS + ")");

    LOG.info("Using custom Phoenix Index RPC Handling with " + indexHandlerCount
            + " handlers and priority range [" + minPriority + ", " + maxPriority + ")");

    return new PhoenixIndexRpcScheduler(indexHandlerCount, conf, delegate, minPriority,
            maxPriority);
}
 
源代码16 项目: hbase   文件: LockServiceClient.java
/**
 * Create a new EntityLock object to acquire an exclusive or shared lock on a table.
 * Internally, the table namespace will also be locked in shared mode.
 *
 * @param tableName table to lock
 * @param exclusive {@code true} for an exclusive lock, {@code false} for a shared one
 * @param description description carried by the lock request
 * @param abort abortable handed to the created {@code EntityLock}
 * @return a new, not yet requested, entity lock
 */
public EntityLock tableLock(final TableName tableName, final boolean exclusive,
    final String description, final Abortable abort) {
  final LockType lockType;
  if (exclusive) {
    lockType = LockType.EXCLUSIVE;
  } else {
    lockType = LockType.SHARED;
  }
  final LockRequest request = buildLockRequest(lockType, tableName.getNameAsString(),
      null, null, description, ng.getNonceGroup(), ng.newNonce());
  return new EntityLock(conf, stub, request, abort);
}
 
源代码17 项目: hbase   文件: EntityLock.java
/**
 * Abortable.abort() is called when the lease of the lock will expire.
 * It's up to the user decide if simply abort the process or handle the loss of the lock
 * by aborting the operation that was supposed to be under lock.
 */
EntityLock(Configuration conf, LockService.BlockingInterface stub,
    LockRequest request, Abortable abort) {
  this.stub = stub;
  this.lockRequest = request;
  this.abort = abort;

  this.heartbeatTimeBuffer = conf.getInt(HEARTBEAT_TIME_BUFFER, 10000);
  this.worker = new LockHeartbeatWorker(lockRequest.getDescription());
}
 
源代码18 项目: hbase   文件: MasterRegionWALRoller.java
/**
 * Passes name, configuration and abortable to the superclass and records where WAL files
 * are archived: the {@code HREGION_OLDLOGDIR_NAME} directory below both the WAL root and the
 * global WAL root.
 */
private MasterRegionWALRoller(String name, Configuration conf, Abortable abortable, FileSystem fs,
  Path walRootDir, Path globalWALRootDir, String archivedWALSuffix) {
  super(name, conf, abortable);
  this.fs = fs;
  this.archivedWALSuffix = archivedWALSuffix;
  // local and global archive directories share the same directory name
  this.walArchiveDir = new Path(walRootDir, HREGION_OLDLOGDIR_NAME);
  this.globalWALArchiveDir = new Path(globalWALRootDir, HREGION_OLDLOGDIR_NAME);
}
 
源代码19 项目: hbase   文件: MasterRegionWALRoller.java
/**
 * Factory for {@code MasterRegionWALRoller}. Before constructing the roller it overrides
 * several WAL settings on the given configuration, so the caller's {@code conf} object is
 * modified.
 *
 * @param rollPeriodMs value written to {@code WAL_ROLL_PERIOD_KEY}
 * @param flushSize flush size; the WAL block size is set to twice this value
 * @return the new roller
 */
static MasterRegionWALRoller create(String name, Configuration conf, Abortable abortable,
  FileSystem fs, Path walRootDir, Path globalWALRootDir, String archivedWALSuffix,
  long rollPeriodMs, long flushSize) {
  // running with the WAL disabled is not possible here, so it is forced on
  conf.setBoolean(WALFactory.WAL_ENABLED, true);
  // the separate old-log-dir feature is not needed, so it is forced off
  conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false);
  conf.setLong(WAL_ROLL_PERIOD_KEY, rollPeriodMs);

  // make the roll size the same with the flush size, as we only have one region here
  // (presumably roll size = block size x multiplier, i.e. 2 * flushSize * 0.5 - TODO confirm)
  final long walBlockSize = flushSize * 2;
  final float rollMultiplier = 0.5f;
  conf.setLong(WALUtil.WAL_BLOCK_SIZE, walBlockSize);
  conf.setFloat(AbstractFSWAL.WAL_ROLL_MULTIPLIER, rollMultiplier);

  return new MasterRegionWALRoller(name, conf, abortable, fs, walRootDir, globalWALRootDir,
    archivedWALSuffix);
}
 
/**
 * Creates the pool: a bounded cached thread pool sized by {@code CONCURENT_FLUSH_TASKS_KEY}
 * whose keep-alive comes from the flush timeout setting, wrapped in an
 * {@code ExecutorCompletionService}.
 *
 * @param name used in the thread-name prefix {@code rs(<name>)-flush-proc}
 * @param conf supplies the keep-alive time and the thread count
 * @param abortable stored for later use by the pool
 */
FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
  this.name = name;
  this.abortable = abortable;

  // executor sizing comes from the configuration
  final long keepAliveMillis = conf.getLong(
    RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
    RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
  final int maxThreads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);

  final String threadPrefix = "rs(" + name + ")-flush-proc";
  executor = Threads.getBoundedCachedThreadPool(maxThreads, keepAliveMillis,
      TimeUnit.MILLISECONDS, threadPrefix);
  taskPool = new ExecutorCompletionService<>(executor);
}
 
源代码21 项目: hbase   文件: HBaseFsck.java
/**
 * Creates a {@code ZKWatcher} identified as "hbase Fsck" whose abortable logs the abort
 * reason and then terminates the JVM with exit status 1.
 *
 * @return the new watcher
 * @throws IOException declared by the signature
 */
private ZKWatcher createZooKeeperWatcher() throws IOException {
  final Abortable exitOnAbort = new Abortable() {
    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void abort(String why, Throwable e) {
      // log first, then terminate the process
      LOG.error(why, e);
      System.exit(1);
    }
  };
  return new ZKWatcher(getConf(), "hbase Fsck", exitOnAbort);
}
 
源代码22 项目: hbase   文件: HRegionServer.java
/**
 * Creates an {@code EntityLock} for the given regions by delegating to a freshly built
 * {@code LockServiceClient}.
 *
 * @param regionInfo regions to lock
 * @param description description for the lock
 * @param abort abortable passed to the lock
 * @return the lock returned by {@code LockServiceClient#regionLock}
 */
@Override
public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
    final Abortable abort) {
  return new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator())
      .regionLock(regionInfo, description, abort);
}
 
源代码23 项目: hbase   文件: RegionServerSnapshotManager.java
/**
 * Creates the pool: a bounded cached thread pool sized by
 * {@code CONCURENT_SNAPSHOT_TASKS_KEY} whose keep-alive comes from the snapshot timeout
 * setting, wrapped in an {@code ExecutorCompletionService}.
 *
 * @param name used in the thread-name prefix {@code rs(<name>)-snapshot}
 * @param conf supplies the keep-alive time and the thread count
 * @param abortable stored for later use by the pool
 */
SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
  this.name = name;
  this.abortable = abortable;

  // executor sizing comes from the configuration
  final long keepAliveMillis = conf.getLong(
    RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
    RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
  final int maxThreads =
    conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);

  final String threadPrefix = "rs(" + name + ")-snapshot";
  executor = Threads.getBoundedCachedThreadPool(maxThreads, keepAliveMillis,
      TimeUnit.MILLISECONDS, threadPrefix);
  taskPool = new ExecutorCompletionService<>(executor);
}
 
源代码24 项目: hbase   文件: MasterFifoRpcSchedulerFactory.java
/**
 * Builds a {@code MasterFifoRpcScheduler}, splitting the configured handler count between
 * region-server-report handlers and ordinary call handlers. Each group gets at least one
 * handler; the report group defaults to half of the total.
 * <p>
 * The {@code priority} and {@code server} arguments are not used by this factory.
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
  final int totalHandlers = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
    HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);

  final int configuredReportHandlers = conf.getInt(
    MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, totalHandlers / 2);
  final int reportHandlers = Math.max(1, configuredReportHandlers);

  // whatever is left goes to regular calls, but never fewer than one handler
  final int callHandlers = Math.max(1, totalHandlers - reportHandlers);

  return new MasterFifoRpcScheduler(conf, callHandlers, reportHandlers);
}
 
源代码25 项目: hbase   文件: BalancedQueueRpcExecutor.java
/**
 * Forwards all arguments to the superclass constructor, then selects a balancer for
 * {@code numCallQueues} queues and initializes that many queues.
 */
public BalancedQueueRpcExecutor(
    final String name,
    final int handlerCount,
    final String callQueueType,
    final int maxQueueLength,
    final PriorityFunction priority,
    final Configuration conf,
    final Abortable abortable) {
  super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
  // numCallQueues is read after the superclass constructor has run
  balancer = getBalancer(numCallQueues);
  initializeQueues(numCallQueues);
}
 
/**
 * Builds a {@code CQTBERpcScheduler} whose handler counts are read from the configuration,
 * falling back to the matching {@code HConstants} defaults.
 *
 * @param conf configuration supplying the handler counts
 * @param priority priority function passed through to the scheduler
 * @param server abortable passed through to the scheduler
 * @return the newly created scheduler, using {@code HConstants.QOS_THRESHOLD}
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
  final int handlerCount = conf.getInt(
    HConstants.REGION_SERVER_HANDLER_COUNT,
    HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
  final int highPriorityHandlers = conf.getInt(
    HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
    HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
  final int replicationHandlers = conf.getInt(
    HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
    HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
  final int metaTransitionHandlers = conf.getInt(
    HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
    HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT);

  return new CQTBERpcScheduler(conf, handlerCount, highPriorityHandlers, replicationHandlers,
    metaTransitionHandlers, priority, server, HConstants.QOS_THRESHOLD);
}
 
源代码27 项目: hbase   文件: TestMasterRegionFlush.java
/**
 * Assigns a new {@code MasterRegionFlusherAndCompactor} to {@code flusher}, using the given
 * flush thresholds, a compactMin of 4, {@code /tmp} as the global archive path, an empty
 * archived-HFile suffix, and an abortable that ignores abort requests.
 */
private void initFlusher(long flushSize, long flushPerChanges, long flushIntervalMs) {
  // abort requests are silently ignored and the abortable never reports itself as aborted
  final Abortable noopAbortable = new Abortable() {

    @Override
    public void abort(String why, Throwable e) {
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  };
  flusher = new MasterRegionFlusherAndCompactor(conf, noopAbortable, region, flushSize,
    flushPerChanges, flushIntervalMs, 4, new Path("/tmp"), "");
}
 
源代码28 项目: hbase   文件: TestZKProcedure.java
/**
 * Creates a {@code ZKWatcher} identified as "testing utility" whose abortable turns any
 * abort request into a {@code RuntimeException}.
 *
 * @return the new watcher
 * @throws IOException declared by the signature
 */
private static ZKWatcher newZooKeeperWatcher() throws IOException {
  final Abortable failOnAbort = new Abortable() {
    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void abort(String why, Throwable e) {
      String message = "Unexpected abort in distributed three phase commit test:" + why;
      throw new RuntimeException(message, e);
    }
  };
  return new ZKWatcher(UTIL.getConfiguration(), "testing utility", failOnAbort);
}
 
源代码29 项目: hbase   文件: TestZKMulti.java
/**
 * Starts the mini ZooKeeper cluster and creates the shared {@code zkw} watcher, backed by a
 * {@code ZKMultiAbortable}.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  TEST_UTIL.startMiniZKCluster();
  Configuration conf = TEST_UTIL.getConfiguration();
  zkw = new ZKWatcher(conf, "TestZKMulti", new ZKMultiAbortable(), true);
}
 
源代码30 项目: phoenix   文件: PhoenixRpcScheduler.java
/**
 * Creates a scheduler with two dedicated {@code BalancedQueueRpcExecutor}s, named "Index"
 * and "Metadata", and stores the delegate scheduler and the two priorities.
 * <p>
 * Handler counts come from the configuration. Each queue length is read from
 * {@code CALLQUEUE_LENGTH_CONF_KEY} and defaults to the handler count multiplied by
 * {@code DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER}.
 *
 * @param conf configuration supplying handler counts and queue lengths
 * @param delegate scheduler stored in {@code delegate}
 * @param indexPriority priority stored in {@code indexPriority}
 * @param metadataPriority priority stored in {@code metadataPriority}
 * @param priorityFunction passed to both executors
 * @param abortable passed to both executors
 */
public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, PriorityFunction priorityFunction, Abortable abortable) {
    // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
    int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB,
            QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
    // NOTE(review): the metadata handler count falls back to DEFAULT_INDEX_HANDLER_COUNT;
    // confirm that sharing the index default is intended.
    int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB,
            QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);

    int defaultIndexQueueLength = indexHandlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
    int maxIndexQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, defaultIndexQueueLength);
    int defaultMetadataQueueLength =
            metadataHandlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
    int maxMetadataQueueLength =
            conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, defaultMetadataQueueLength);

    this.indexPriority = indexPriority;
    this.metadataPriority = metadataPriority;
    this.delegate = delegate;
    this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount,
            maxIndexQueueLength, priorityFunction, conf, abortable);
    this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount,
            maxMetadataQueueLength, priorityFunction, conf, abortable);
}
 
 类所在包
 类方法
 同包方法