下面列出了怎么用org.apache.hadoop.hbase.Server的API类实例代码及写法,或者点击链接到github查看源代码。
@InterfaceAudience.Private
public Context(final Server server, final Configuration localConf, final Configuration conf,
final FileSystem fs, final String peerId, final UUID clusterId,
final ReplicationPeer replicationPeer, final MetricsSource metrics,
final TableDescriptors tableDescriptors, final Abortable abortable) {
this.server = server;
this.localConf = localConf;
this.conf = conf;
this.fs = fs;
this.clusterId = clusterId;
this.peerId = peerId;
this.replicationPeer = replicationPeer;
this.metrics = metrics;
this.tableDescriptors = tableDescriptors;
this.abortable = abortable;
}
public static MasterRegion create(Server server) throws IOException {
MasterRegionParams params = new MasterRegionParams().server(server)
.regionDirName(MASTER_STORE_DIR).tableDescriptor(TABLE_DESC);
Configuration conf = server.getConfiguration();
long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
int compactMin = conf.getInt(COMPACT_MIN_KEY, DEFAULT_COMPACT_MIN);
params.flushSize(flushSize).flushPerChanges(flushPerChanges).flushIntervalMs(flushIntervalMs)
.compactMin(compactMin);
int maxWals = conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS);
params.maxWals(maxWals);
if (conf.get(USE_HSYNC_KEY) != null) {
params.useHsync(conf.getBoolean(USE_HSYNC_KEY, false));
}
params.ringBufferSlotCount(conf.getInt(RING_BUFFER_SLOT_COUNT, DEFAULT_RING_BUFFER_SLOT_COUNT));
long rollPeriodMs = conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS);
params.rollPeriodMs(rollPeriodMs).archivedWalSuffix(ARCHIVED_WAL_SUFFIX)
.archivedHFileSuffix(ARCHIVED_HFILE_SUFFIX);
return MasterRegion.create(params);
}
@Override
protected RpcServerInterface createRpcServer(final Server server,
final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
final String name) throws IOException {
final Configuration conf = regionServer.getConfiguration();
// RpcServer at HM by default enable ByteBufferPool iff HM having user table region in it
boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY,
LoadBalancer.isMasterCanHostUserRegions(conf));
try {
return RpcServerFactory.createRpcServer(server, name, getServices(),
bindAddress, // use final bindAddress for this server.
conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
} catch (BindException be) {
throw new IOException(be.getMessage() + ". To switch ports use the '"
+ HConstants.MASTER_PORT + "' configuration property.",
be.getCause() != null ? be.getCause() : be);
}
}
protected RpcServerInterface createRpcServer(
final Server server,
final RpcSchedulerFactory rpcSchedulerFactory,
final InetSocketAddress bindAddress,
final String name
) throws IOException {
final Configuration conf = server.getConfiguration();
boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
try {
return RpcServerFactory.createRpcServer(server, name, getServices(),
bindAddress, // use final bindAddress for this server.
conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
} catch (BindException be) {
throw new IOException(be.getMessage() + ". To switch ports use the '"
+ HConstants.REGIONSERVER_PORT + "' configuration property.",
be.getCause() != null ? be.getCause() : be);
}
}
public static RpcServer createRpcServer(final Server server, final String name,
final List<BlockingServiceAndInterface> services,
final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
String rpcServerClass = conf.get(CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
NettyRpcServer.class.getName());
StringBuilder servicesList = new StringBuilder();
for (BlockingServiceAndInterface s: services) {
ServiceDescriptor sd = s.getBlockingService().getDescriptorForType();
if (sd == null) continue; // Can be null for certain tests like TestTokenAuthentication
if (servicesList.length() > 0) servicesList.append(", ");
servicesList.append(sd.getFullName());
}
LOG.info("Creating " + rpcServerClass + " hosting " + servicesList);
return ReflectionUtils.instantiateWithCustomCtor(rpcServerClass,
new Class[] { Server.class, String.class, List.class,
InetSocketAddress.class, Configuration.class, RpcScheduler.class, boolean.class },
new Object[] { server, name, services, bindAddress, conf, scheduler, reservoirEnabled });
}
/**
* Constructs a server listening on the named port and address.
* @param server hosting instance of {@link Server}. We will do authentications if an
* instance else pass null for no authentication check.
* @param name Used keying this rpc servers' metrics and for naming the Listener thread.
* @param services A list of services.
* @param bindAddress Where to listen
* @param conf
* @param scheduler
* @param reservoirEnabled Enable ByteBufferPool or not.
*/
public SimpleRpcServer(final Server server, final String name,
final List<BlockingServiceAndInterface> services,
final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
this.socketSendBufferSize = 0;
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
// Start the listener here and let it bind to the port
listener = new Listener(name);
this.port = listener.getAddress().getPort();
// Create the responder here
responder = new SimpleRpcServerResponder(this);
connectionManager = new ConnectionManager();
initReconfigurable(conf);
this.scheduler.init(new RpcSchedulerContext(this));
}
@Before
public void setUp() throws IOException {
ReplicationSource source = mock(ReplicationSource.class);
when(source.getPeerId()).thenReturn(PEER_ID);
when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
conn = mock(Connection.class);
when(conn.isClosed()).thenReturn(false);
doAnswer(new Answer<Table>() {
@Override
public Table answer(InvocationOnMock invocation) throws Throwable {
return UTIL.getConnection().getTable((TableName) invocation.getArgument(0));
}
}).when(conn).getTable(any(TableName.class));
Server server = mock(Server.class);
when(server.getConnection()).thenReturn(conn);
when(source.getServer()).thenReturn(server);
checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
tableName = TableName.valueOf(name.getMethodName());
}
@Test
public void testCleanupUnknownPeerZNode() throws Exception {
Server server = new DummyServer("hostname2.example.org");
ReplicationQueueStorage rq = ReplicationStorageFactory
.getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
// populate some znodes in the peer znode
// add log to an unknown peer
String group = "testgroup";
rq.addWAL(server.getServerName(), "2", group + ".log1");
rq.addWAL(server.getServerName(), "2", group + ".log2");
NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
w1.run();
// The log of the unknown peer should be removed from zk
for (String peer : manager.getAllQueues()) {
assertTrue(peer.startsWith("1"));
}
}
@Test
public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
String walGroupId = "fake-wal-group-id";
ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
queue.put(new Path("/www/html/test"));
RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
Server server = Mockito.mock(Server.class);
Mockito.when(server.getServerName()).thenReturn(serverName);
Mockito.when(source.getServer()).thenReturn(server);
Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
.thenReturn(1001L);
Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
.thenReturn(-1L);
conf.setInt("replication.source.maxretriesmultiplier", -1);
RecoveredReplicationSourceShipper shipper =
new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
Assert.assertEquals(1001L, shipper.getStartPosition());
conf.unset("replication.source.maxretriesmultiplier");
}
@Test
public void testThreadCleanup() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
Server server = new DummyServer();
Path archivedHfileDir =
new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
// setup the cleaner
FileSystem fs = UTIL.getDFSCluster().getFileSystem();
HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL);
// clean up archive directory
fs.delete(archivedHfileDir, true);
fs.mkdirs(archivedHfileDir);
// create some file to delete
fs.createNewFile(new Path(archivedHfileDir, "dfd-dfd"));
// launch the chore
cleaner.chore();
// call cleanup
cleaner.cleanup();
// wait awhile for thread to die
Thread.sleep(100);
for (Thread thread : cleaner.getCleanerThreads()) {
Assert.assertFalse(thread.isAlive());
}
}
@Test
public void testLargeSmallIsolation() throws Exception {
Configuration conf = UTIL.getConfiguration();
// no cleaner policies = delete all files
conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, 512 * 1024);
Server server = new DummyServer();
Path archivedHfileDir =
new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
// setup the cleaner
FileSystem fs = UTIL.getDFSCluster().getFileSystem();
HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL);
// clean up archive directory
fs.delete(archivedHfileDir, true);
fs.mkdirs(archivedHfileDir);
// necessary set up
final int LARGE_FILE_NUM = 5;
final int SMALL_FILE_NUM = 20;
createFilesForTesting(LARGE_FILE_NUM, SMALL_FILE_NUM, fs, archivedHfileDir);
// call cleanup
cleaner.chore();
Assert.assertEquals(LARGE_FILE_NUM, cleaner.getNumOfDeletedLargeFiles());
Assert.assertEquals(SMALL_FILE_NUM, cleaner.getNumOfDeletedSmallFiles());
}
public PairOfSameType<HRegion> stepsAfterPONR(final Server server,
final RegionServerServices services, PairOfSameType<HRegion> regions)
throws IOException {
openDaughters(server, services, regions.getFirst(), regions.getSecond());
transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
return regions;
}
/**
* Open daughter regions, add them to online list and update meta.
* @param server
* @param daughter
* @throws IOException
* @throws KeeperException
*/
void openDaughterRegion(final Server server, final HRegion daughter)
throws IOException, KeeperException {
HRegionInfo hri = daughter.getRegionInfo();
LoggingProgressable reporter = server == null ? null
: new LoggingProgressable(hri, server.getConfiguration().getLong(
"hbase.regionserver.split.daughter.open.log.interval", 10000));
daughter.openHRegion(reporter);
}
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException {
super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
clusterId, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
/**
* Instantiation method used by region servers
* @param conf configuration to use
* @param fs file system to use
* @param manager replication manager to ping to
* @param server the server for this region server
* @param queueId the id of our replication queue
* @param clusterId unique UUID for the cluster
* @param metrics metrics for replication source
*/
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
MetricsSource metrics) throws IOException {
this.server = server;
this.conf = HBaseConfiguration.create(conf);
this.waitOnEndpointSeconds =
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
decorateConf();
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.queueStorage = queueStorage;
this.replicationPeer = replicationPeer;
this.manager = manager;
this.fs = fs;
this.metrics = metrics;
this.clusterId = clusterId;
this.queueId = queueId;
this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;
LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
replicationPeer.getId(), this.currentBandwidth);
}
/**
* @param watcher ZK watcher
* @param sn ServerName
* @param master In an instance of a Master.
*/
ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master) {
super(watcher);
watcher.registerListener(this);
this.sn = sn;
this.master = master;
}
@VisibleForTesting
HeapMemoryManager(BlockCache blockCache, FlushRequester memStoreFlusher,
Server server, RegionServerAccounting regionServerAccounting) {
Configuration conf = server.getConfiguration();
this.blockCache = toResizableBlockCache(blockCache);
this.memStoreFlusher = memStoreFlusher;
this.server = server;
this.regionServerAccounting = regionServerAccounting;
this.tunerOn = doInit(conf);
this.defaultChorePeriod = conf.getInt(HBASE_RS_HEAP_MEMORY_TUNER_PERIOD,
HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD);
this.heapOccupancyLowWatermark = conf.getFloat(HConstants.HEAP_OCCUPANCY_LOW_WATERMARK_KEY,
HConstants.DEFAULT_HEAP_OCCUPANCY_LOW_WATERMARK);
metricsHeapMemoryManager = new MetricsHeapMemoryManager();
}
/**
* This method used internally by the RegionServer to close out regions.
* @param abort If the regionserver is aborting.
*/
public CloseRegionHandler(final Server server,
final RegionServerServices rsServices,
final RegionInfo regionInfo, final boolean abort,
ServerName destination) {
this(server, rsServices, regionInfo, abort,
EventType.M_RS_CLOSE_REGION, destination);
}
protected CloseRegionHandler(final Server server,
final RegionServerServices rsServices, RegionInfo regionInfo,
boolean abort, EventType eventType, ServerName destination) {
super(server, eventType);
this.server = server;
this.rsServices = rsServices;
this.regionInfo = regionInfo;
this.abort = abort;
this.destination = destination;
}
protected OpenRegionHandler(final Server server,
final RegionServerServices rsServices, final RegionInfo regionInfo,
final TableDescriptor htd, long masterSystemTime, EventType eventType) {
super(server, eventType);
this.rsServices = rsServices;
this.regionInfo = regionInfo;
this.htd = htd;
this.masterSystemTime = masterSystemTime;
}
PostOpenDeployTasksThread(final HRegion region, final Server server,
final RegionServerServices services, final AtomicBoolean signaller, long masterSystemTime) {
super("PostOpenDeployTasks:" + region.getRegionInfo().getEncodedName());
this.setDaemon(true);
this.server = server;
this.services = services;
this.region = region;
this.signaller = signaller;
this.masterSystemTime = masterSystemTime;
}
public WALSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
super(server, EventType.RS_LOG_REPLAY);
this.splitTaskDetails = splitDetails;
this.coordination = coordination;
this.reporter = reporter;
this.inProgressTasks = inProgressTasks;
this.inProgressTasks.incrementAndGet();
this.serverName = server.getServerName();
this.splitTaskExecutor = splitTaskExecutor;
}
@Override
public void chore() {
// Noop if rss is null. This will never happen in a normal condition except for cases
// when the test case is not spinning up a cluster
if (regionServerServices == null) return;
List<HRegion> onlineRegions = (List<HRegion>) regionServerServices.getRegions();
if (onlineRegions == null) return;
for (HRegion region : onlineRegions) {
if (LOG.isTraceEnabled()) {
LOG.trace("Started compacted hfiles cleaner on " + region.getRegionInfo());
}
for (HStore store : region.getStores()) {
try {
if (useExecutor && regionServerServices != null) {
CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
(Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER, store);
regionServerServices.getExecutorService().submit(handler);
} else {
// call synchronously if the RegionServerServices are not
// available
store.closeAndArchiveCompactedFiles();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Completed archiving the compacted files for the region "
+ region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
}
} catch (Exception e) {
LOG.error("Exception while trying to close and archive the compacted store "
+ "files of the store " + store.getColumnFamilyName() + " in the" + " region "
+ region.getRegionInfo(), e);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace(
"Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
}
}
}
/**
* Default base class constructor.
*/
public EventHandler(Server server, EventType eventType) {
this.parent = Tracer.getCurrentSpan();
this.server = server;
this.eventType = eventType;
seqid = seqids.incrementAndGet();
if (server != null) {
this.waitingTimeForEvents = server.getConfiguration().
getInt("hbase.master.event.waiting.time", 1000);
}
}
/**
* Constructs a server listening on the named port and address.
* @param server hosting instance of {@link Server}. We will do authentications if an
* instance else pass null for no authentication check.
* @param name Used keying this rpc servers' metrics and for naming the Listener thread.
* @param services A list of services.
* @param bindAddress Where to listen
* @param conf
* @param scheduler
* @param reservoirEnabled Enable ByteBufferPool or not.
*/
public RpcServer(final Server server, final String name,
final List<BlockingServiceAndInterface> services,
final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
this.server = server;
this.services = services;
this.bindAddress = bindAddress;
this.conf = conf;
// See declaration above for documentation on what this size is.
this.maxQueueSizeInBytes =
this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);
this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
this.cellBlockBuilder = new CellBlockBuilder(conf);
this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
this.userProvider = UserProvider.instantiate(conf);
this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
if (isSecurityEnabled) {
saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
} else {
saslProps = Collections.emptyMap();
}
this.scheduler = scheduler;
}
@Test
public void testCleanupFailoverQueues() throws Exception {
Server server = new DummyServer("hostname1.example.org");
ReplicationQueueStorage rq = ReplicationStorageFactory
.getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
// populate some znodes in the peer znode
SortedSet<String> files = new TreeSet<>();
String group = "testgroup";
String file1 = group + "." + EnvironmentEdgeManager.currentTime() + ".log1";
String file2 = group + "." + EnvironmentEdgeManager.currentTime() + ".log2";
files.add(file1);
files.add(file2);
for (String file : files) {
rq.addWAL(server.getServerName(), "1", file);
}
Server s1 = new DummyServer("dummyserver1.example.org");
ReplicationPeers rp1 =
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
rp1.init();
NodeFailoverWorker w1 =
manager.new NodeFailoverWorker(server.getServerName());
w1.run();
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
when(source.getQueueId()).thenReturn(id);
when(source.isRecovered()).thenReturn(true);
when(source.isSyncReplication()).thenReturn(false);
manager.cleanOldLogs(file2, false, source);
// log1 should be deleted
assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
}
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
throws IOException {
throw new IOException("Failing deliberately");
}
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
return source;
}
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
this.replicationPeer = rp;
}
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
Configuration conf = htu.getConfiguration();
conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
// Runs on local filesystem. Test does not need sync. Turn off checks.
conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
Path testDir = htu.getDataTestDir();
CommonFSUtils.setRootDir(htu.getConfiguration(), testDir);
Server server = RegionProcedureStoreTestHelper.mockServer(conf);
region = MasterRegionFactory.create(server);
store = RegionProcedureStoreTestHelper.createStore(server, region, new LoadCounter());
}