下面列出了如何使用 org.apache.hadoop.hbase.Abortable 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Sets up a committer with a fake table factory and a single-thread executor, stops it, and
 * checks that the factory and the executor were shut down while the abort/stop mocks were
 * never touched.
 */
@Test
public void testCorrectlyCleansUpResources() throws Exception {
    Abortable abortHook = Mockito.mock(Abortable.class);
    Stoppable stopHook = Mockito.mock(Stoppable.class);
    ExecutorService pool = Executors.newFixedThreadPool(1);
    FakeTableFactory tables =
        new FakeTableFactory(Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());
    ParallelWriterIndexCommitter committer =
        new ParallelWriterIndexCommitter(VersionInfo.getVersion());

    // wire up a minimal committer
    committer.setup(tables, pool, abortHook, stopHook, 1);

    // shut the committer down again
    String reason = this.test.getTableNameString() + " finished";
    committer.stop(reason);

    assertTrue("Factory didn't get shutdown after writer#stop!", tables.shutdown);
    assertTrue("ExectorService isn't terminated after writer#stop!", pool.isShutdown());
    Mockito.verifyZeroInteractions(abortHook, stopHook);
}
/**
 * Stops a freshly set-up committer and asserts that its table factory and executor are shut
 * down and that neither the abortable nor the stoppable mock saw any interaction.
 */
@Test
public void testCorrectlyCleansUpResources() throws Exception {
    ExecutorService workerPool = Executors.newFixedThreadPool(1);
    Abortable abortMock = Mockito.mock(Abortable.class);
    Stoppable stopMock = Mockito.mock(Stoppable.class);
    FakeTableFactory tableFactory =
        new FakeTableFactory(Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());
    ParallelWriterIndexCommitter indexWriter =
        new ParallelWriterIndexCommitter(VersionInfo.getVersion());

    // build a simple writer around the fakes
    indexWriter.setup(tableFactory, workerPool, abortMock, stopMock, 1);

    // and stop it straight away
    indexWriter.stop(this.test.getTableNameString() + " finished");

    boolean factoryClosed = tableFactory.shutdown;
    boolean poolClosed = workerPool.isShutdown();
    assertTrue("Factory didn't get shutdown after writer#stop!", factoryClosed);
    assertTrue("ExectorService isn't terminated after writer#stop!", poolClosed);
    Mockito.verifyZeroInteractions(abortMock, stopMock);
}
/**
 * Dispatches a call at priority 200 through a scheduler built with 200 and 250 as its two
 * priority arguments, then repeats with a scheduler built with 101 and 110 and a call at
 * priority 101. Both times the test executor installed via
 * {@code setIndexExecutorForTesting} is polled, and at the end the delegate mock is checked
 * for exactly two {@code init} calls.
 */
@Test
public void testIndexPriorityWritesToIndexHandler() throws Exception {
RpcScheduler mock = Mockito.mock(RpcScheduler.class);
PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class);
Abortable abortable = new AbortServer();
PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250,qosFunction,abortable);
BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1,qosFunction,conf,abortable);
// replace the scheduler's own index executor with the one this test can inspect
scheduler.setIndexExecutorForTesting(executor);
dispatchCallWithPriority(scheduler, 200);
List<BlockingQueue<CallRunner>> queues = executor.getQueues();
// the test executor is expected to expose exactly one queue
assertEquals(1, queues.size());
BlockingQueue<CallRunner> queue = queues.get(0);
// NOTE(review): the polled element is discarded, so a timeout (null) would not fail the test
queue.poll(20, TimeUnit.SECONDS);
// try again, this time we tweak the ranges we support
scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110,qosFunction,abortable);
scheduler.setIndexExecutorForTesting(executor);
dispatchCallWithPriority(scheduler, 101);
queue.poll(20, TimeUnit.SECONDS);
// the delegate must have been initialised exactly twice over the whole test
Mockito.verify(mock, Mockito.times(2)).init(Mockito.any(Context.class));
scheduler.stop();
executor.stop();
}
/**
 * Builds a context by storing every supplied collaborator in the field of the same name.
 * No argument is validated or copied.
 */
@InterfaceAudience.Private
public Context(final Server server, final Configuration localConf, final Configuration conf,
    final FileSystem fs, final String peerId, final UUID clusterId,
    final ReplicationPeer replicationPeer, final MetricsSource metrics,
    final TableDescriptors tableDescriptors, final Abortable abortable) {
    // hosting server and its failure hook
    this.server = server;
    this.abortable = abortable;

    // configuration and storage
    this.conf = conf;
    this.localConf = localConf;
    this.fs = fs;
    this.tableDescriptors = tableDescriptors;

    // peer identity and bookkeeping
    this.peerId = peerId;
    this.clusterId = clusterId;
    this.replicationPeer = replicationPeer;
    this.metrics = metrics;
}
/**
 * Stores the flush/compaction settings, creates the single-thread compaction executor and
 * starts the daemon flush thread that runs {@code flushLoop}.
 * <p>
 * The flush thread is started as the very last step: everything written before
 * {@code Thread.start()} is visible to the started thread, so it can never observe a
 * half-initialised instance (in particular a {@code null} {@code compactExecutor}).
 *
 * @param conf configuration kept for later use
 * @param abortable abort hook kept for later use
 * @param region region whose table name is used to name the flusher and compactor threads
 * @param flushSize logged and stored flush size setting
 * @param flushPerChanges logged and stored flush-per-changes setting
 * @param flushIntervalMs logged and stored flush interval setting
 * @param compactMin logged and stored compaction setting
 * @param globalArchivePath stored archive path
 * @param archivedHFileSuffix stored archived-file suffix
 */
MasterRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
    long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin,
    Path globalArchivePath, String archivedHFileSuffix) {
    this.conf = conf;
    this.abortable = abortable;
    this.region = region;
    this.flushSize = flushSize;
    this.flushPerChanges = flushPerChanges;
    this.flushIntervalMs = flushIntervalMs;
    this.compactMin = compactMin;
    this.globalArchivePath = globalArchivePath;
    this.archivedHFileSuffix = archivedHFileSuffix;
    // Create the compaction executor before the flush thread exists, so the thread cannot
    // run ahead of this assignment.
    compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
        .setNameFormat(region.getRegionInfo().getTable() + "-Store-Compactor").setDaemon(true)
        .build());
    flushThread = new Thread(this::flushLoop, region.getRegionInfo().getTable() + "-Flusher");
    flushThread.setDaemon(true);
    LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
        flushSize, flushPerChanges, flushIntervalMs, compactMin);
    // Publish 'this' to the background thread only once every field has been assigned.
    flushThread.start();
}
public SplitOrMergeTracker(ZKWatcher watcher, Configuration conf,
Abortable abortable) {
try {
if (ZKUtil.checkExists(watcher, watcher.getZNodePaths().switchZNode) < 0) {
ZKUtil.createAndFailSilent(watcher, watcher.getZNodePaths().switchZNode);
}
} catch (KeeperException e) {
throw new RuntimeException(e);
}
splitZnode = ZNodePaths.joinZNode(watcher.getZNodePaths().switchZNode,
conf.get("zookeeper.znode.switch.split", "split"));
mergeZnode = ZNodePaths.joinZNode(watcher.getZNodePaths().switchZNode,
conf.get("zookeeper.znode.switch.merge", "merge"));
splitStateTracker = new SwitchStateTracker(watcher, splitZnode, abortable);
mergeStateTracker = new SwitchStateTracker(watcher, mergeZnode, abortable);
}
/**
 * Creates a {@code SimpleRpcScheduler} whose four handler counts are read from the
 * configuration (each with its {@code HConstants} default) and which uses
 * {@code HConstants.QOS_THRESHOLD} as its last argument.
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
    final int generalHandlers = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
    final int highPriorityHandlers =
        conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
            HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
    final int replicationHandlers =
        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
            HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
    final int metaTransitionHandlers =
        conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
            HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT);

    return new SimpleRpcScheduler(conf, generalHandlers, highPriorityHandlers,
        replicationHandlers, metaTransitionHandlers, priority, server,
        HConstants.QOS_THRESHOLD);
}
/**
 * Starts the mini ZooKeeper cluster and initialises the shared {@code abortable},
 * {@code zkw} and {@code zkStorageUtil} fixtures. The abortable only logs the abort and
 * always reports that it is not aborted.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniZKCluster();
    Configuration zkConf = TEST_UTIL.getConfiguration();

    // log-only abort hook: aborts are recorded but never change state
    abortable = new Abortable() {
        @Override
        public boolean isAborted() {
            return false;
        }

        @Override
        public void abort(String why, Throwable e) {
            LOG.info(why, e);
        }
    };

    zkw = new ZKWatcher(zkConf, "TableCFs", abortable, true);
    zkStorageUtil = new ZKStorageUtil(zkw, zkConf);
}
/**
 * Test that abort is called when lock times out.
 * <p>
 * The mocked master first reports the lock as held and then as released on the following
 * heartbeat; the test expects exactly one {@code abort} call with a {@code null} throwable.
 */
@Test
public void testEntityLockTimeout() throws Exception {
final long workerSleepTime = 200; // in ms
Abortable abortable = Mockito.mock(Abortable.class);
EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
lock.setTestingSleepTime(workerSleepTime);
// the lock request itself succeeds and yields the test procedure id
when(master.requestLock(any(), any()))
.thenReturn(LockResponse.newBuilder().setProcId(procId).build());
// Acquires the lock, but then it times out (since we don't call unlock() on it).
// First heartbeat answers LOCKED_RESPONSE, every later one UNLOCKED_RESPONSE.
when(master.lockHeartbeat(any(), any()))
.thenReturn(LOCKED_RESPONSE, UNLOCKED_RESPONSE);
lock.requestLock();
lock.await();
assertTrue(lock.isLocked());
// Should get unlocked in next heartbeat i.e. after workerSleepTime. Wait 10x time to be sure.
assertTrue(waitLockTimeOut(lock, 10 * workerSleepTime));
// Works' run() returns, there is a small gap that the thread is still alive(os
// has not declare it is dead yet), so remove the following assertion.
// assertFalse(lock.getWorker().isAlive());
// abort must have been invoked exactly once, with no exception attached
verify(abortable, times(1)).abort(any(), eq(null));
}
/**
 * Test that abort is called when lockHeartbeat fails with IOException.
 * <p>
 * The mocked master answers the first heartbeat with {@code LOCKED_RESPONSE} and then throws
 * a {@code ServiceException}; the test expects exactly one {@code abort} call carrying an
 * {@code HBaseIOException} and a worker thread that is no longer alive.
 */
@Test
public void testHeartbeatException() throws Exception {
final long workerSleepTime = 100; // in ms
Abortable abortable = Mockito.mock(Abortable.class);
EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
lock.setTestingSleepTime(workerSleepTime);
// the lock request itself succeeds and yields the test procedure id
when(master.requestLock(any(), any()))
.thenReturn(LockResponse.newBuilder().setProcId(procId).build());
// first heartbeat succeeds, the next one fails
when(master.lockHeartbeat(any(), any()))
.thenReturn(LOCKED_RESPONSE)
.thenThrow(new ServiceException("Failed heartbeat!"));
lock.requestLock();
lock.await();
assertTrue(waitLockTimeOut(lock, 100 * workerSleepTime));
// wait (without an upper bound) for the heartbeat worker thread to finish
while (lock.getWorker().isAlive()) {
TimeUnit.MILLISECONDS.sleep(100);
}
verify(abortable, times(1)).abort(any(), isA(HBaseIOException.class));
assertFalse(lock.getWorker().isAlive());
}
/**
 * Removes everything under the base znode so the next test starts from a clean ZooKeeper.
 * <p>
 * A throw-away watcher is opened for the cleanup and is closed in a {@code finally} block,
 * so it is released even when the recursive delete fails.
 *
 * @throws KeeperException if the cleanup fails with a ZooKeeper error
 * @throws ZooKeeperConnectionException declared for opening the watcher
 * @throws IOException declared for opening the watcher
 */
@After
public void tearDown()
    throws KeeperException, ZooKeeperConnectionException, IOException {
    // Make sure zk is clean before we run the next test.
    ZKWatcher zkw = new ZKWatcher(TESTUTIL.getConfiguration(),
        "@Before", new Abortable() {
            @Override
            public void abort(String why, Throwable e) {
                // any abort during cleanup is turned into a test failure
                throw new RuntimeException(why, e);
            }

            @Override
            public boolean isAborted() {
                return false;
            }
        });
    try {
        ZKUtil.deleteNodeRecursively(zkw, zkw.getZNodePaths().baseZNode);
    } finally {
        // always release the watcher, even if the delete above threw
        zkw.close();
    }
}
/**
 * Starts an RPC server backed by a {@code SimpleRpcScheduler}, sends one echo request and,
 * if anything is thrown along the way, requires that the abortable has been aborted.
 * <p>
 * The check is an explicit {@code AssertionError} rather than the {@code assert} keyword, so
 * it is enforced even when the JVM runs without {@code -ea}, and the original failure is
 * kept as the cause. The server is always stopped in the {@code finally} block.
 */
@Test
public void testRpcScheduler() throws IOException, InterruptedException {
    PriorityFunction qosFunction = mock(PriorityFunction.class);
    Abortable abortable = new AbortServer();
    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
    RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, 0, qosFunction, abortable, 0);
    RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
        Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)),
        new InetSocketAddress("localhost", 0), CONF, scheduler);
    try (BlockingRpcClient client = new BlockingRpcClient(CONF)) {
        rpcServer.start();
        BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
        stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
    } catch (Throwable e) {
        // A failure is only acceptable if it went through the abort path.
        if (!abortable.isAborted()) {
            throw new AssertionError("RPC call failed but the abortable was not aborted", e);
        }
    } finally {
        rpcServer.stop();
    }
}
/**
 * Checks that a thread blocked in {@code blockUntilAvailable()} is released when the
 * tracker is stopped; if it were not, the final {@code join()} would never return.
 */
@Test
public void testInterruptible() throws IOException, InterruptedException {
    Abortable stubAbortable = new StubAbortable();
    ZKWatcher watcher =
        new ZKWatcher(TEST_UTIL.getConfiguration(), "testInterruptible", stubAbortable);
    final TestTracker nodeTracker = new TestTracker(watcher, "/xyz", stubAbortable);
    nodeTracker.start();

    Runnable blockOnNode = () -> {
        try {
            nodeTracker.blockUntilAvailable();
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted", e);
        }
    };
    Thread waiter = new Thread(blockOnNode);
    waiter.start();

    // spin until the waiter thread has actually started
    while (!waiter.isAlive()) {
        Threads.sleep(1);
    }

    nodeTracker.stop();
    // Only returns if stopping the tracker released the blocked thread.
    waiter.join();
}
/**
 * Initialises this instance from explicitly supplied collaborators: the pool is wrapped in
 * a {@code WaitForCompletionTaskRunner}, the factory in a {@code CachingHTableFactory} of
 * the given cache size, and the abortable in a {@code CapturingAbortable}.
 * <p>
 * Exposed for TESTING
 */
void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) {
    this.stopped = stop;
    this.abortable = new CapturingAbortable(abortable);
    this.factory = new CachingHTableFactory(factory, cacheSize);
    this.pool = new WaitForCompletionTaskRunner(pool);
}
/**
 * Builds a {@code PhoenixIndexRpcScheduler} on top of a delegate created by
 * {@code SimpleRpcSchedulerFactory}.
 * <p>
 * The index handler count and the priority bounds come from the configuration. The bounds
 * are rejected with an {@code IllegalArgumentException} (via {@code Preconditions}) unless
 * max is greater than min and the range lies either entirely below
 * {@code HConstants.REPLICATION_QOS} or starts above {@code HConstants.HIGH_QOS}.
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction, Abortable abortable) {
    // create the delegate scheduler
    final RpcScheduler delegate;
    try {
        // happens in <=0.98.4 where the scheduler factory is not visible
        delegate = new SimpleRpcSchedulerFactory().create(conf, priorityFunction, abortable);
    } catch (IllegalAccessError e) {
        LOG.fatal(VERSION_TOO_OLD_FOR_INDEX_RPC);
        throw e;
    }

    final int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB,
        QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
    final int minPriority = getMinPriority(conf);
    final int maxPriority = conf.getInt(QueryServices.MAX_INDEX_PRIOIRTY_ATTRIB,
        QueryServicesOptions.DEFAULT_INDEX_MAX_PRIORITY);

    // make sure the ranges are outside the warning ranges
    final String badOrderMessage = "Max index priority (" + maxPriority
        + ") must be larger than min priority (" + minPriority + ")";
    Preconditions.checkArgument(maxPriority > minPriority, badOrderMessage);

    final boolean belowReplication =
        minPriority < HConstants.REPLICATION_QOS && maxPriority < HConstants.REPLICATION_QOS;
    final boolean aboveHigh = minPriority > HConstants.HIGH_QOS;
    final String overlapMessage = "Index priority range (" + minPriority
        + ", " + maxPriority + ") must be outside HBase priority range ("
        + HConstants.REPLICATION_QOS + ", " + HConstants.HIGH_QOS + ")";
    Preconditions.checkArgument(belowReplication || aboveHigh, overlapMessage);

    LOG.info("Using custom Phoenix Index RPC Handling with " + indexHandlerCount
        + " handlers and priority range [" + minPriority + ", " + maxPriority + ")");

    return new PhoenixIndexRpcScheduler(indexHandlerCount, conf, delegate, minPriority,
        maxPriority);
}
/**
 * Creates an {@code EntityLock} for a table lock request, exclusive or shared depending on
 * the {@code exclusive} flag. The request carries the table name, the description and a
 * fresh nonce from the nonce generator.
 * Internally, the table namespace will also be locked in shared mode.
 */
public EntityLock tableLock(final TableName tableName, final boolean exclusive,
    final String description, final Abortable abort) {
    final LockType requestedType;
    if (exclusive) {
        requestedType = LockType.EXCLUSIVE;
    } else {
        requestedType = LockType.SHARED;
    }
    final String table = tableName.getNameAsString();
    final LockRequest request = buildLockRequest(requestedType, table, null, null, description,
        ng.getNonceGroup(), ng.newNonce());
    return new EntityLock(conf, stub, request, abort);
}
/**
* Abortable.abort() is called when the lease of the lock will expire.
* It's up to the user decide if simply abort the process or handle the loss of the lock
* by aborting the operation that was supposed to be under lock.
*/
EntityLock(Configuration conf, LockService.BlockingInterface stub,
LockRequest request, Abortable abort) {
this.stub = stub;
this.lockRequest = request;
this.abort = abort;
this.heartbeatTimeBuffer = conf.getInt(HEARTBEAT_TIME_BUFFER, 10000);
this.worker = new LockHeartbeatWorker(lockRequest.getDescription());
}
/**
 * Passes name, configuration and abortable to the superclass and records the file system,
 * the archived-WAL suffix and the two archive directories, each resolved as
 * {@code HREGION_OLDLOGDIR_NAME} under the respective WAL root.
 */
private MasterRegionWALRoller(String name, Configuration conf, Abortable abortable, FileSystem fs,
    Path walRootDir, Path globalWALRootDir, String archivedWALSuffix) {
    super(name, conf, abortable);
    this.archivedWALSuffix = archivedWALSuffix;
    this.fs = fs;
    // local and global archive locations share the same directory name
    this.globalWALArchiveDir = new Path(globalWALRootDir, HREGION_OLDLOGDIR_NAME);
    this.walArchiveDir = new Path(walRootDir, HREGION_OLDLOGDIR_NAME);
}
/**
 * Adjusts the supplied configuration in place (WAL enabled, separate old-log directory
 * disabled, roll period, block size and roll multiplier) and then returns a new roller
 * built from it.
 */
static MasterRegionWALRoller create(String name, Configuration conf, Abortable abortable,
    FileSystem fs, Path walRootDir, Path globalWALRootDir, String archivedWALSuffix,
    long rollPeriodMs, long flushSize) {
    // we can not run with wal disabled, so force set it to true.
    conf.setBoolean(WALFactory.WAL_ENABLED, true);
    // we do not need this feature, so force disable it.
    conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false);
    conf.setLong(WAL_ROLL_PERIOD_KEY, rollPeriodMs);

    // make the roll size the same with the flush size, as we only have one region here:
    // block size is twice the flush size and the roll multiplier is one half
    final long walBlockSize = flushSize * 2;
    conf.setLong(WALUtil.WAL_BLOCK_SIZE, walBlockSize);
    conf.setFloat(AbstractFSWAL.WAL_ROLL_MULTIPLIER, 0.5f);

    MasterRegionWALRoller roller = new MasterRegionWALRoller(name, conf, abortable, fs,
        walRootDir, globalWALRootDir, archivedWALSuffix);
    return roller;
}
/**
 * Creates the pool: stores name and abortable, builds a bounded cached thread pool whose
 * size and keep-alive time (in milliseconds) come from the configuration, and wraps it in
 * an {@code ExecutorCompletionService}.
 */
FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
    this.name = name;
    this.abortable = abortable;

    // configure the executor service
    final long keepAliveMs = conf.getLong(
        RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
        RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
    final int maxThreads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
    final String threadPrefix = "rs(" + name + ")-flush-proc";

    executor = Threads.getBoundedCachedThreadPool(maxThreads, keepAliveMs,
        TimeUnit.MILLISECONDS, threadPrefix);
    taskPool = new ExecutorCompletionService<>(executor);
}
/**
 * Opens a watcher identified as "hbase Fsck". Its abort hook logs the error and terminates
 * the JVM with exit status 1; it never reports itself as aborted.
 */
private ZKWatcher createZooKeeperWatcher() throws IOException {
    final Abortable exitOnAbort = new Abortable() {
        @Override
        public boolean isAborted() {
            return false;
        }

        @Override
        public void abort(String why, Throwable e) {
            LOG.error(why, e);
            System.exit(1);
        }
    };
    return new ZKWatcher(getConf(), "hbase Fsck", exitOnAbort);
}
/**
 * Delegates to a freshly created {@code LockServiceClient}, built from the configuration,
 * the lock stub and the connection's nonce generator.
 */
@Override
public EntityLock regionLock(final List<RegionInfo> regionInfo, final String description,
    final Abortable abort) {
    return new LockServiceClient(conf, lockStub, asyncClusterConnection.getNonceGenerator())
        .regionLock(regionInfo, description, abort);
}
/**
 * Creates the pool: stores name and abortable, builds a bounded cached thread pool whose
 * size and keep-alive time (in milliseconds) come from the configuration, and wraps it in
 * an {@code ExecutorCompletionService}.
 */
SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
    this.name = name;
    this.abortable = abortable;

    // configure the executor service
    final long keepAliveMs = conf.getLong(
        RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
        RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
    final int maxThreads =
        conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
    final String threadPrefix = "rs(" + name + ")-snapshot";

    executor = Threads.getBoundedCachedThreadPool(maxThreads, keepAliveMs,
        TimeUnit.MILLISECONDS, threadPrefix);
    taskPool = new ExecutorCompletionService<>(executor);
}
/**
 * Creates a {@code MasterFifoRpcScheduler}. The report handler count comes from the
 * configuration (defaulting to half of the total handler count) and the call handlers get
 * the remainder; both counts are at least 1. The {@code priority} and {@code server}
 * arguments are not used.
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
    final int handlerBudget = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);

    final int configuredReportHandlers = conf.getInt(
        MasterFifoRpcScheduler.MASTER_SERVER_REPORT_HANDLER_COUNT, handlerBudget / 2);
    final int reportHandlers = Math.max(1, configuredReportHandlers);

    final int remaining = handlerBudget - reportHandlers;
    final int callHandlers = Math.max(1, remaining);

    return new MasterFifoRpcScheduler(conf, callHandlers, reportHandlers);
}
/**
 * Forwards every argument unchanged to the superclass constructor, then creates the
 * balancer and initialises the call queues, both sized by {@code numCallQueues}.
 */
public BalancedQueueRpcExecutor(final String name, final int handlerCount,
final String callQueueType, final int maxQueueLength, final PriorityFunction priority,
final Configuration conf, final Abortable abortable) {
super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
// numCallQueues is not assigned here — presumably set by the superclass constructor; verify
this.balancer = getBalancer(this.numCallQueues);
initializeQueues(this.numCallQueues);
}
/**
 * Creates a {@code CQTBERpcScheduler} whose four handler counts are read from the
 * configuration (each with its {@code HConstants} default) and which uses
 * {@code HConstants.QOS_THRESHOLD} as its last argument.
 */
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
    final int generalHandlers = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
    final int highPriorityHandlers =
        conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
            HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
    final int replicationHandlers =
        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
            HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT);
    final int metaTransitionHandlers =
        conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
            HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT);

    return new CQTBERpcScheduler(conf, generalHandlers, highPriorityHandlers,
        replicationHandlers, metaTransitionHandlers, priority, server,
        HConstants.QOS_THRESHOLD);
}
/**
 * Creates the {@code flusher} with the given flush settings, a compactMin of 4, "/tmp" as
 * the global archive path, an empty archived-file suffix and an abort hook that does
 * nothing.
 */
private void initFlusher(long flushSize, long flushPerChanges, long flushIntervalMs) {
    // inert abort hook: never aborted, abort requests are ignored
    final Abortable noOpAbortable = new Abortable() {
        @Override
        public void abort(String why, Throwable e) {
        }

        @Override
        public boolean isAborted() {
            return false;
        }
    };
    final int compactMin = 4;
    final Path archivePath = new Path("/tmp");
    flusher = new MasterRegionFlusherAndCompactor(conf, noOpAbortable, region, flushSize,
        flushPerChanges, flushIntervalMs, compactMin, archivePath, "");
}
/**
 * Opens a watcher identified as "testing utility" whose abort hook fails loudly by throwing
 * a {@code RuntimeException} that carries the abort reason and cause.
 */
private static ZKWatcher newZooKeeperWatcher() throws IOException {
    final Abortable failOnAbort = new Abortable() {
        @Override
        public boolean isAborted() {
            return false;
        }

        @Override
        public void abort(String why, Throwable e) {
            throw new RuntimeException(
                "Unexpected abort in distributed three phase commit test:" + why, e);
        }
    };
    return new ZKWatcher(UTIL.getConfiguration(), "testing utility", failOnAbort);
}
/**
 * Starts the mini ZooKeeper cluster and opens the shared watcher {@code zkw}, identified as
 * "TestZKMulti" and backed by a {@code ZKMultiAbortable}.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniZKCluster();
    Configuration zkConf = TEST_UTIL.getConfiguration();
    zkw = new ZKWatcher(zkConf, "TestZKMulti", new ZKMultiAbortable(), true);
}
/**
 * Stores the delegate and the two priorities, then creates one
 * {@code BalancedQueueRpcExecutor} for index calls ("Index") and one for metadata calls
 * ("Metadata"). Handler counts and queue lengths are read from the configuration; both
 * queue lengths use the same configuration key and default to the handler count times
 * {@code DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER}.
 */
public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, PriorityFunction priorityFunction, Abortable abortable) {
    this.delegate = delegate;
    this.indexPriority = indexPriority;
    this.metadataPriority = metadataPriority;

    // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
    final int indexHandlers = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB,
        QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
    // NOTE(review): the metadata handler count falls back to the INDEX default constant —
    // confirm this is intended.
    final int metadataHandlers = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB,
        QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
    final int indexQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY,
        indexHandlers * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
    final int metadataQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY,
        metadataHandlers * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);

    this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlers,
        indexQueueLength, priorityFunction, conf, abortable);
    this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlers,
        metadataQueueLength, priorityFunction, conf, abortable);
}