下面列出了怎么用org.apache.hadoop.util.Daemon的API类实例代码及写法,或者点击链接到github查看源代码。
void interruptAndJoin() throws InterruptedException {
Daemon daemonCopy = null;
synchronized (this) {
if (isRunning()) {
daemon.interrupt();
daemonCopy = daemon;
}
}
if (daemonCopy != null) {
if(LOG.isDebugEnabled()) {
LOG.debug("Wait for lease checker to terminate");
}
daemonCopy.join();
}
}
private synchronized void startExpiryDaemon() {
// start daemon only if not already started
if (isDaemonStarted() == true) {
return;
}
daemon = new Daemon(new Runnable() {
@Override
public void run() {
try {
PeerCache.this.run();
} catch(InterruptedException e) {
//noop
} finally {
PeerCache.this.clear();
}
}
@Override
public String toString() {
return String.valueOf(PeerCache.this);
}
});
daemon.start();
}
public Daemon recoverBlocks(
final String who,
final Collection<RecoveringBlock> blocks) {
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
@Override
public void run() {
for(RecoveringBlock b : blocks) {
try {
logRecoverBlock(who, b);
recoverBlock(b);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED: " + b, e);
}
}
}
});
d.start();
return d;
}
/**
* BlockRecoveryFI_07. max replica length from all DNs is zero.
*
* @throws IOException in case of an error
*/
@Test
public void testZeroLenReplicas() throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
DataNode spyDN = spy(dn);
doReturn(new ReplicaRecoveryInfo(block.getBlockId(), 0,
block.getGenerationStamp(), ReplicaState.FINALIZED)).when(spyDN).
initReplicaRecovery(any(RecoveringBlock.class));
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
d.join();
DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
verify(dnP).commitBlockSynchronization(
block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null);
}
void interruptAndJoin() throws InterruptedException {
Daemon daemonCopy = null;
synchronized (this) {
if (isRunning()) {
daemon.interrupt();
daemonCopy = daemon;
}
}
if (daemonCopy != null) {
if(LOG.isDebugEnabled()) {
LOG.debug("Wait for lease checker to terminate");
}
daemonCopy.join();
}
}
private synchronized void startExpiryDaemon() {
// start daemon only if not already started
if (isDaemonStarted() == true) {
return;
}
daemon = new Daemon(new Runnable() {
@Override
public void run() {
try {
PeerCache.this.run();
} catch(InterruptedException e) {
//noop
} finally {
PeerCache.this.clear();
}
}
@Override
public String toString() {
return String.valueOf(PeerCache.this);
}
});
daemon.start();
}
public Daemon recoverBlocks(
final String who,
final Collection<RecoveringBlock> blocks) {
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
@Override
public void run() {
for(RecoveringBlock b : blocks) {
try {
logRecoverBlock(who, b);
recoverBlock(b);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED: " + b, e);
}
}
}
});
d.start();
return d;
}
/**
* BlockRecoveryFI_07. max replica length from all DNs is zero.
*
* @throws IOException in case of an error
*/
@Test
public void testZeroLenReplicas() throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
DataNode spyDN = spy(dn);
doReturn(new ReplicaRecoveryInfo(block.getBlockId(), 0,
block.getGenerationStamp(), ReplicaState.FINALIZED)).when(spyDN).
initReplicaRecovery(any(RecoveringBlock.class));
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
d.join();
DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
verify(dnP).commitBlockSynchronization(
block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null);
}
/**
* Triggers failover processing for safe mode and blocks until we have left
* safe mode.
*
* @throws IOException
*/
protected void triggerFailover() throws IOException {
clearDataStructures();
for (DatanodeInfo node : namesystem.datanodeReport(DatanodeReportType.LIVE)) {
liveDatanodes.add(node);
outStandingHeartbeats.add(node);
}
safeModeState = SafeModeState.FAILOVER_IN_PROGRESS;
safeModeMonitor = new Daemon(new SafeModeMonitor(namesystem, this));
safeModeMonitor.start();
try {
safeModeMonitor.join();
} catch (InterruptedException ie) {
throw new IOException("triggerSafeMode() interruped()");
}
if (safeModeState != SafeModeState.AFTER_FAILOVER) {
throw new RuntimeException("safeModeState is : " + safeModeState +
" which does not indicate a successfull exit of safemode");
}
}
/**
* main() has some simple utility methods.
* @param argv Command line parameters.
* @exception Exception if the filesystem does not exist.
*/
public static void main(String[] argv) throws Exception {
StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG);
Configuration tconf = new Configuration();
try {
argv = DFSUtil.setGenericConf(argv, tconf);
} catch (IllegalArgumentException e) {
System.err.println(e.getMessage());
printUsage("");
return;
}
if (argv.length >= 1) {
SecondaryNameNode secondary = new SecondaryNameNode(tconf);
int ret = secondary.processArgs(argv);
System.exit(ret);
}
// Create a never ending deamon
Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf));
checkpointThread.start();
}
private void initDataXceiver(Configuration conf) throws IOException {
String address =
NetUtils.getServerAddress(conf,
"dfs.datanode.bindAddress",
"dfs.datanode.port",
"dfs.datanode.address");
InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
// find free port
ServerSocket ss = (socketWriteTimeout > 0) ?
ServerSocketChannel.open().socket() : new ServerSocket();
Server.bind(ss, socAddr,
conf.getInt("dfs.datanode.xceiver.listen.queue.size", 128));
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
// adjust machine name with the actual port
int tmpPort = ss.getLocalPort();
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
tmpPort);
LOG.info("Opened info server at " + tmpPort);
this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty
}
public Daemon recoverBlocks(final int namespaceId, final Block[] blocks, final DatanodeInfo[][] targets) {
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
for(int i = 0; i < blocks.length; i++) {
try {
logRecoverBlock("NameNode", namespaceId, blocks[i], targets[i]);
recoverBlock(namespaceId, blocks[i], false, targets[i], true, 0);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
}
}
}
});
d.start();
return d;
}
public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
for(int i = 0; i < blocks.length; i++) {
try {
logRecoverBlock("NameNode", blocks[i], targets[i]);
recoverBlock(blocks[i], false, targets[i], true);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
}
}
}
});
d.start();
return d;
}
@Test
public void testConcurrentWriteLockWithDifferentResource() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
final int count = 100;
final LockManager<Integer> manager = new LockManager<>(conf);
final int sleep = 10;
final AtomicInteger done = new AtomicInteger();
for (int i = 0; i < count; i++) {
final Integer id = i;
Daemon d1 = new Daemon(() -> {
try {
manager.writeLock(id);
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
manager.writeUnlock(id);
}
done.getAndIncrement();
});
d1.setName("Locker-" + i);
d1.start();
}
GenericTestUtils.waitFor(() -> done.get() == count, 100,
10 * count * sleep);
Assert.assertEquals(count, done.get());
}
/**
* Should be called before this object is used.
*/
@Override
public synchronized void start(CertificateClient certClient)
throws IOException {
super.start(certClient);
tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
tokenRemoverThread.start();
}
private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot,
boolean isRatisEnabled, boolean isTracingEnabled,
Function<Long, Long> indexToTerm) {
this.currentBuffer = new ConcurrentLinkedQueue<>();
this.readyBuffer = new ConcurrentLinkedQueue<>();
this.isRatisEnabled = isRatisEnabled;
this.isTracingEnabled = isTracingEnabled;
if (!isRatisEnabled) {
this.currentFutureQueue = new ConcurrentLinkedQueue<>();
this.readyFutureQueue = new ConcurrentLinkedQueue<>();
} else {
this.currentFutureQueue = null;
this.readyFutureQueue = null;
}
this.omMetadataManager = omMetadataManager;
this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
this.ozoneManagerDoubleBufferMetrics =
OzoneManagerDoubleBufferMetrics.create();
this.indexToTerm = indexToTerm;
isRunning.set(true);
// Daemon thread which runs in back ground and flushes transactions to DB.
daemon = new Daemon(this::flushTransactions);
daemon.setName("OMDoubleBufferFlushThread");
daemon.start();
}
/**
* Create bucketCount number of createBucket responses for each iteration.
* All these iterations are run in parallel. Then verify OM DB has correct
* number of entries or not.
*/
private void testDoubleBuffer(int volumeCount, int bucketsPerVolume)
throws Exception {
// Reset transaction id.
trxId.set(0);
for (int i = 0; i < volumeCount; i++) {
Daemon d1 = new Daemon(() -> doTransactions(bucketsPerVolume));
d1.start();
}
// We are doing +1 for volume transaction.
// Here not checking lastAppliedIndex because transactionIndex is
// shared across threads, and lastAppliedIndex cannot be always
// expectedTransactions. So, skipping that check here.
int expectedBuckets = bucketsPerVolume * volumeCount;
long expectedTransactions = volumeCount + expectedBuckets;
GenericTestUtils.waitFor(() ->
expectedTransactions == doubleBuffer.getFlushedTransactionCount(),
100, volumeCount * 500);
GenericTestUtils.waitFor(() ->
assertRowCount(volumeCount, omMetadataManager.getVolumeTable()),
300, volumeCount * 300);
GenericTestUtils.waitFor(() ->
assertRowCount(expectedBuckets, omMetadataManager.getBucketTable()),
300, volumeCount * 300);
Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
}
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
scanPeriodMsecs = interval * 1000L; //msec
int threads =
conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
reportCompileThreadPool = Executors.newFixedThreadPool(threads,
new Daemon.DaemonFactory());
masterThread = new ScheduledThreadPoolExecutor(1,
new Daemon.DaemonFactory());
}
private void initDataXceiver(Configuration conf) throws IOException {
// find free port or use privileged port provided
TcpPeerServer tcpPeerServer;
if (secureResources != null) {
tcpPeerServer = new TcpPeerServer(secureResources);
} else {
tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
DataNode.getStreamingAddr(conf));
}
tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
streamingAddr = tcpPeerServer.getStreamingAddr();
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
xserver = new DataXceiverServer(tcpPeerServer, conf, this);
this.dataXceiverServer = new Daemon(threadGroup, xserver);
this.threadGroup.setDaemon(true); // auto destroy when empty
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
DomainPeerServer domainPeerServer =
getDomainPeerServer(conf, streamingAddr.getPort());
if (domainPeerServer != null) {
this.localDataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(domainPeerServer, conf, this));
LOG.info("Listening on UNIX domain socket: " +
domainPeerServer.getBindPath());
}
}
this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}
/**
* BlockRecoveryFI_05. One DN throws RecoveryInProgressException.
*
* @throws IOException
* in case of an error
*/
@Test
public void testRecoveryInProgressException()
throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
DataNode spyDN = spy(dn);
doThrow(new RecoveryInProgressException("Replica recovery is in progress")).
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
d.join();
verify(spyDN, never()).syncBlock(
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
}
/**
* BlockRecoveryFI_06. all datanodes throws an exception.
*
* @throws IOException
* in case of an error
*/
@Test
public void testErrorReplicas() throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
DataNode spyDN = spy(dn);
doThrow(new IOException()).
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
d.join();
verify(spyDN, never()).syncBlock(
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
}
/** should be called before this object is used */
public void startThreads() throws IOException {
Preconditions.checkState(!running);
updateCurrentKey();
synchronized (this) {
running = true;
tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
tokenRemoverThread.start();
}
}
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
this.datanode = datanode;
this.dataset = dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
scanPeriodMsecs = interval * 1000L; //msec
int threads =
conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
reportCompileThreadPool = Executors.newFixedThreadPool(threads,
new Daemon.DaemonFactory());
masterThread = new ScheduledThreadPoolExecutor(1,
new Daemon.DaemonFactory());
}
private void initDataXceiver(Configuration conf) throws IOException {
// find free port or use privileged port provided
TcpPeerServer tcpPeerServer;
if (secureResources != null) {
tcpPeerServer = new TcpPeerServer(secureResources);
} else {
tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
DataNode.getStreamingAddr(conf));
}
tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
streamingAddr = tcpPeerServer.getStreamingAddr();
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
xserver = new DataXceiverServer(tcpPeerServer, conf, this);
this.dataXceiverServer = new Daemon(threadGroup, xserver);
this.threadGroup.setDaemon(true); // auto destroy when empty
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
DomainPeerServer domainPeerServer =
getDomainPeerServer(conf, streamingAddr.getPort());
if (domainPeerServer != null) {
this.localDataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(domainPeerServer, conf, this));
LOG.info("Listening on UNIX domain socket: " +
domainPeerServer.getBindPath());
}
}
this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}
/**
* BlockRecoveryFI_05. One DN throws RecoveryInProgressException.
*
* @throws IOException
* in case of an error
*/
@Test
public void testRecoveryInProgressException()
throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
DataNode spyDN = spy(dn);
doThrow(new RecoveryInProgressException("Replica recovery is in progress")).
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
d.join();
verify(spyDN, never()).syncBlock(
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
}
/**
* BlockRecoveryFI_06. all datanodes throws an exception.
*
* @throws IOException
* in case of an error
*/
@Test
public void testErrorReplicas() throws IOException, InterruptedException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
DataNode spyDN = spy(dn);
doThrow(new IOException()).
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
d.join();
verify(spyDN, never()).syncBlock(
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
}
/** should be called before this object is used */
public void startThreads() throws IOException {
Preconditions.checkState(!running);
updateCurrentKey();
synchronized (this) {
running = true;
tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
tokenRemoverThread.start();
}
}
public DistRaidNode(Configuration conf) throws IOException {
super(conf);
pendingFiles = new HashMap<PolicyInfo, Map<String, FileStatus>>();
this.jobMonitor = new JobMonitor(conf);
this.jobMonitorThread = new Daemon(this.jobMonitor);
this.jobScheduler = new JobScheduler();
this.jobSchedulerThread = new Daemon(this.jobScheduler);
this.jobMonitorThread.start();
this.jobSchedulerThread.start();
LOG.info("created");
}
/**
* Initialize SnapshotNode
* @throws IOException
*/
private void init() throws IOException {
ssDir = conf.get("fs.snapshot.dir", "/.SNAPSHOT");
tempDir = conf.get("fs.snapshot.tempdir", "/tmp/snapshot");
fileServer = getImageServer();
dfs = FileSystem.get(conf);
Path ssPath = new Path(ssDir);
if (!dfs.exists(ssPath)) {
dfs.mkdirs(ssPath);
}
maxLeaseUpdateThreads = conf.getInt("fs.snapshot.leaseupdatethreads", 100);
// Waiting room purge thread
purgeThread = new Daemon((new WaitingRoom(conf)).getPurger());
purgeThread.start();
// Get namenode rpc connection
nameNodeAddr = NameNode.getAddress(conf);
namenode = (NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class,
NamenodeProtocol.versionID, nameNodeAddr, conf);
// Snapshot RPC Server
InetSocketAddress socAddr = SnapshotNode.getAddress(conf);
int handlerCount = conf.getInt("fs.snapshot.handler.count", 10);
server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
handlerCount, false, conf);
// The rpc-server port can be ephemeral... ensure we have the correct info
serverAddress = server.getListenerAddress();
LOG.info("SnapshotNode up at: " + serverAddress);
server.start(); // start rpc server
}
protected void initialize(Configuration conf)
throws IOException {
InetSocketAddress socAddr = UtilizationCollector.getAddress(conf);
int handlerCount = conf.getInt(
"mapred.resourceutilization.handler.count", 10);
// create rpc server
this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
handlerCount, false, conf);
// The rpc-server port can be ephemeral... ensure we have the correct info
this.serverAddress = this.server.getListenerAddress();
LOG.info("Collector up at: " + this.serverAddress);
// start RPC server
this.server.start();
// How long does the TaskTracker reports expire
timeLimit = conf.getLong("mapred.resourceutilization.timelimit",
DEFAULT_TIME_LIMIT);
// How long do we consider a job is finished after it stops
stopTimeLimit = conf.getLong("mapred.resourceutilization.stoptimelimit",
DEFAULT_STOP_TIME_LIMIT);
// How often do we aggregate the reports
aggregatePeriod = conf.getLong(
"mapred.resourceutilization.aggregateperiod",
DEFAULT_AGGREGATE_SLEEP_TIME);
// Start the daemon thread to aggregate the TaskTracker reports
this.aggregateDaemon = new Daemon(new AggregateRun());
this.aggregateDaemon.start();
}