类org.apache.hadoop.util.Daemon源码实例Demo

下面列出了 org.apache.hadoop.util.Daemon 类的 API 用法示例代码，您也可以点击链接到 GitHub 查看源代码。

源代码1 项目: hadoop   文件: LeaseRenewer.java
/**
 * Interrupts the lease-checker daemon, if it is running, and then waits for
 * that thread to terminate.
 *
 * The interrupt is issued while holding this object's monitor; the join
 * happens only after the monitor has been released.
 *
 * @throws InterruptedException if the calling thread is interrupted while
 *         waiting in {@code join()}
 */
void interruptAndJoin() throws InterruptedException {
  final Daemon target;
  synchronized (this) {
    if (!isRunning()) {
      // Nothing was interrupted, so there is nothing to wait for.
      return;
    }
    target = daemon;
    target.interrupt();
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Wait for lease checker to terminate");
  }
  target.join();
}
 
源代码2 项目: hadoop   文件: PeerCache.java
/**
 * Launches the expiry daemon unless {@code isDaemonStarted()} reports that
 * one has already been started.
 *
 * The daemon's task invokes {@code PeerCache.this.run()} and always calls
 * {@code PeerCache.this.clear()} afterwards, whether the run returned
 * normally or threw.
 */
private synchronized void startExpiryDaemon() {
  if (isDaemonStarted()) {
    // A daemon already exists; never start a second one.
    return;
  }

  final Runnable expiryTask = new Runnable() {
    @Override
    public void run() {
      try {
        PeerCache.this.run();
      } catch (InterruptedException ignored) {
        // Interruption simply ends the task; cleanup happens in finally.
      } finally {
        PeerCache.this.clear();
      }
    }

    @Override
    public String toString() {
      return String.valueOf(PeerCache.this);
    }
  };
  daemon = new Daemon(expiryTask);
  daemon.start();
}
 
源代码3 项目: hadoop   文件: DataNode.java
/**
 * Starts a daemon thread that recovers the given blocks one after another.
 *
 * An {@link IOException} raised for one block is logged as a warning and the
 * loop moves on to the next block.
 *
 * @param who passed through to {@code logRecoverBlock} for every block
 * @param blocks the blocks to recover
 * @return the daemon, already started, that performs the recovery
 */
public Daemon recoverBlocks(
    final String who,
    final Collection<RecoveringBlock> blocks) {

  final Runnable recoveryTask = new Runnable() {
    /** Recover a list of blocks. It is run by the primary datanode. */
    @Override
    public void run() {
      for (final RecoveringBlock rBlock : blocks) {
        try {
          logRecoverBlock(who, rBlock);
          recoverBlock(rBlock);
        } catch (IOException ioe) {
          LOG.warn("recoverBlocks FAILED: " + rBlock, ioe);
        }
      }
    }
  };

  final Daemon recoveryDaemon = new Daemon(threadGroup, recoveryTask);
  recoveryDaemon.start();
  return recoveryDaemon;
}
 
源代码4 项目: hadoop   文件: TestBlockRecovery.java
/**
 * BlockRecoveryFI_07. max replica length from all DNs is zero.
 *
 * The spied datanode answers every {@code initReplicaRecovery} call with a
 * FINALIZED replica whose second constructor argument is 0; once the recovery
 * daemon has finished, the test verifies that
 * {@code commitBlockSynchronization} was invoked with a length of 0.
 *
 * @throws IOException in case of an error
 * @throws InterruptedException if interrupted while joining the recovery
 *         daemon
 */
@Test
public void testZeroLenReplicas() throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final DataNode spyDN = spy(dn);
  final ReplicaRecoveryInfo stubbedReplica = new ReplicaRecoveryInfo(
      block.getBlockId(), 0, block.getGenerationStamp(),
      ReplicaState.FINALIZED);
  doReturn(stubbedReplica).when(spyDN)
      .initReplicaRecovery(any(RecoveringBlock.class));

  final Daemon recoveryDaemon =
      spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
  recoveryDaemon.join();

  final DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
  verify(namenode).commitBlockSynchronization(
      block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null);
}
 
源代码5 项目: big-c   文件: LeaseRenewer.java
/**
 * Interrupts the lease-checker daemon, if it is running, and then waits for
 * that thread to terminate.
 *
 * The interrupt is issued while holding this object's monitor; the join
 * happens only after the monitor has been released.
 *
 * @throws InterruptedException if the calling thread is interrupted while
 *         waiting in {@code join()}
 */
void interruptAndJoin() throws InterruptedException {
  final Daemon target;
  synchronized (this) {
    if (!isRunning()) {
      // Nothing was interrupted, so there is nothing to wait for.
      return;
    }
    target = daemon;
    target.interrupt();
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Wait for lease checker to terminate");
  }
  target.join();
}
 
源代码6 项目: big-c   文件: PeerCache.java
/**
 * Launches the expiry daemon unless {@code isDaemonStarted()} reports that
 * one has already been started.
 *
 * The daemon's task invokes {@code PeerCache.this.run()} and always calls
 * {@code PeerCache.this.clear()} afterwards, whether the run returned
 * normally or threw.
 */
private synchronized void startExpiryDaemon() {
  if (isDaemonStarted()) {
    // A daemon already exists; never start a second one.
    return;
  }

  final Runnable expiryTask = new Runnable() {
    @Override
    public void run() {
      try {
        PeerCache.this.run();
      } catch (InterruptedException ignored) {
        // Interruption simply ends the task; cleanup happens in finally.
      } finally {
        PeerCache.this.clear();
      }
    }

    @Override
    public String toString() {
      return String.valueOf(PeerCache.this);
    }
  };
  daemon = new Daemon(expiryTask);
  daemon.start();
}
 
源代码7 项目: big-c   文件: DataNode.java
/**
 * Starts a daemon thread that recovers the given blocks one after another.
 *
 * An {@link IOException} raised for one block is logged as a warning and the
 * loop moves on to the next block.
 *
 * @param who passed through to {@code logRecoverBlock} for every block
 * @param blocks the blocks to recover
 * @return the daemon, already started, that performs the recovery
 */
public Daemon recoverBlocks(
    final String who,
    final Collection<RecoveringBlock> blocks) {

  final Runnable recoveryTask = new Runnable() {
    /** Recover a list of blocks. It is run by the primary datanode. */
    @Override
    public void run() {
      for (final RecoveringBlock rBlock : blocks) {
        try {
          logRecoverBlock(who, rBlock);
          recoverBlock(rBlock);
        } catch (IOException ioe) {
          LOG.warn("recoverBlocks FAILED: " + rBlock, ioe);
        }
      }
    }
  };

  final Daemon recoveryDaemon = new Daemon(threadGroup, recoveryTask);
  recoveryDaemon.start();
  return recoveryDaemon;
}
 
源代码8 项目: big-c   文件: TestBlockRecovery.java
/**
 * BlockRecoveryFI_07. max replica length from all DNs is zero.
 *
 * The spied datanode answers every {@code initReplicaRecovery} call with a
 * FINALIZED replica whose second constructor argument is 0; once the recovery
 * daemon has finished, the test verifies that
 * {@code commitBlockSynchronization} was invoked with a length of 0.
 *
 * @throws IOException in case of an error
 * @throws InterruptedException if interrupted while joining the recovery
 *         daemon
 */
@Test
public void testZeroLenReplicas() throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final DataNode spyDN = spy(dn);
  final ReplicaRecoveryInfo stubbedReplica = new ReplicaRecoveryInfo(
      block.getBlockId(), 0, block.getGenerationStamp(),
      ReplicaState.FINALIZED);
  doReturn(stubbedReplica).when(spyDN)
      .initReplicaRecovery(any(RecoveringBlock.class));

  final Daemon recoveryDaemon =
      spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
  recoveryDaemon.join();

  final DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
  verify(namenode).commitBlockSynchronization(
      block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null);
}
 
源代码9 项目: RDFS   文件: StandbySafeMode.java
/**
 * Triggers failover processing for safe mode and blocks until we have left
 * safe mode.
 *
 * Clears the tracking data structures, records every datanode from the LIVE
 * report in both {@code liveDatanodes} and {@code outStandingHeartbeats},
 * moves to {@code FAILOVER_IN_PROGRESS}, and then runs a
 * {@code SafeModeMonitor} daemon to completion.
 *
 * @throws IOException if the calling thread is interrupted while waiting for
 *         the monitor (the interrupt status is restored and the
 *         {@link InterruptedException} is attached as the cause), or if an
 *         earlier call in this method fails with one
 * @throws RuntimeException if the monitor finished but the state is not
 *         {@code AFTER_FAILOVER}
 */
protected void triggerFailover() throws IOException {
  clearDataStructures();
  for (DatanodeInfo node : namesystem.datanodeReport(DatanodeReportType.LIVE)) {
    liveDatanodes.add(node);
    outStandingHeartbeats.add(node);
  }
  safeModeState = SafeModeState.FAILOVER_IN_PROGRESS;
  safeModeMonitor = new Daemon(new SafeModeMonitor(namesystem, this));
  safeModeMonitor.start();
  try {
    safeModeMonitor.join();
  } catch (InterruptedException ie) {
    // Restore the interrupt status so callers further up can still see it,
    // and keep the original exception as the cause for diagnosis.
    Thread.currentThread().interrupt();
    throw new IOException("triggerFailover() interrupted", ie);
  }
  if (safeModeState != SafeModeState.AFTER_FAILOVER) {
    throw new RuntimeException("safeModeState is : " + safeModeState +
        " which does not indicate a successfull exit of safemode");
  }
}
 
源代码10 项目: RDFS   文件: SecondaryNameNode.java
/**
 * Entry point. When at least one argument remains after generic-option
 * handling, the arguments are handed to {@code processArgs} and the JVM exits
 * with the returned code; otherwise a checkpointing daemon thread is started.
 *
 * If generic-option handling rejects the arguments, the error message and the
 * usage text are printed and the method returns.
 *
 * @param argv Command line parameters.
 * @exception Exception propagated from constructing the
 *            {@code SecondaryNameNode} or from processing the arguments.
 */
public static void main(String[] argv) throws Exception {
  StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG);
  final Configuration tconf = new Configuration();
  try {
    argv = DFSUtil.setGenericConf(argv, tconf);
  } catch (IllegalArgumentException badArgs) {
    System.err.println(badArgs.getMessage());
    printUsage("");
    return;
  }

  final boolean hasCommand = argv.length >= 1;
  if (hasCommand) {
    final int exitCode = new SecondaryNameNode(tconf).processArgs(argv);
    System.exit(exitCode);
  }

  // No command given: run the checkpointer as a never-ending daemon.
  final SecondaryNameNode checkpointer = new SecondaryNameNode(tconf);
  new Daemon(checkpointer).start();
}
 
源代码11 项目: RDFS   文件: DataNode.java
/**
 * Binds the server socket for the data xceiver and prepares, without starting
 * it, the daemon that wraps a {@code DataXceiverServer} on that socket.
 *
 * The socket comes from a {@code ServerSocketChannel} when
 * {@code socketWriteTimeout} is positive and is a plain {@code ServerSocket}
 * otherwise. After binding, {@code selfAddr} is set from the socket's host
 * address and its actual local port.
 *
 * @param conf configuration supplying the bind address and listen queue size
 * @throws IOException if opening, binding or configuring the socket fails
 */
private void initDataXceiver(Configuration conf) throws IOException {
  final String configuredAddr = NetUtils.getServerAddress(
      conf,
      "dfs.datanode.bindAddress",
      "dfs.datanode.port",
      "dfs.datanode.address");
  final InetSocketAddress socAddr = NetUtils.createSocketAddr(configuredAddr);

  // find free port
  final ServerSocket ss;
  if (socketWriteTimeout > 0) {
    ss = ServerSocketChannel.open().socket();
  } else {
    ss = new ServerSocket();
  }
  final int listenQueueSize =
      conf.getInt("dfs.datanode.xceiver.listen.queue.size", 128);
  Server.bind(ss, socAddr, listenQueueSize);
  ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);

  // adjust machine name with the actual port
  final int boundPort = ss.getLocalPort();
  selfAddr = new InetSocketAddress(
      ss.getInetAddress().getHostAddress(), boundPort);
  LOG.info("Opened info server at " + boundPort);

  this.threadGroup = new ThreadGroup("dataXceiverServer");
  this.dataXceiverServer =
      new Daemon(threadGroup, new DataXceiverServer(ss, conf, this));
  this.threadGroup.setDaemon(true); // auto destroy when empty
}
 
源代码12 项目: RDFS   文件: DataNode.java
/**
 * Starts a daemon thread that recovers {@code blocks[i]} against
 * {@code targets[i]} for every index of {@code blocks}, in order.
 *
 * An {@link IOException} raised for one block is logged as a warning and the
 * loop continues with the next index.
 *
 * @param namespaceId passed through to the log and recovery calls
 * @param blocks the blocks to recover
 * @param targets per-block targets, indexed in step with {@code blocks}
 * @return the daemon, already started, that performs the recovery
 */
public Daemon recoverBlocks(final int namespaceId, final Block[] blocks, final DatanodeInfo[][] targets) {
  final Runnable recoveryTask = new Runnable() {
    /** Recover a list of blocks. It is run by the primary datanode. */
    public void run() {
      for (int idx = 0; idx < blocks.length; idx++) {
        final Block current = blocks[idx];
        try {
          logRecoverBlock("NameNode", namespaceId, current, targets[idx]);
          recoverBlock(namespaceId, current, false, targets[idx], true, 0);
        } catch (IOException ioe) {
          LOG.warn("recoverBlocks FAILED, blocks[" + idx + "]=" + current, ioe);
        }
      }
    }
  };

  final Daemon recoveryDaemon = new Daemon(threadGroup, recoveryTask);
  recoveryDaemon.start();
  return recoveryDaemon;
}
 
源代码13 项目: hadoop-gpu   文件: DataNode.java
/**
 * Starts a daemon thread that recovers {@code blocks[i]} against
 * {@code targets[i]} for every index of {@code blocks}, in order.
 *
 * An {@link IOException} raised for one block is logged as a warning and the
 * loop continues with the next index.
 *
 * @param blocks the blocks to recover
 * @param targets per-block targets, indexed in step with {@code blocks}
 * @return the daemon, already started, that performs the recovery
 */
public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
  final Runnable recoveryTask = new Runnable() {
    /** Recover a list of blocks. It is run by the primary datanode. */
    public void run() {
      for (int idx = 0; idx < blocks.length; idx++) {
        final Block current = blocks[idx];
        try {
          logRecoverBlock("NameNode", current, targets[idx]);
          recoverBlock(current, false, targets[idx], true);
        } catch (IOException ioe) {
          LOG.warn("recoverBlocks FAILED, blocks[" + idx + "]=" + current, ioe);
        }
      }
    }
  };

  final Daemon recoveryDaemon = new Daemon(threadGroup, recoveryTask);
  recoveryDaemon.start();
  return recoveryDaemon;
}
 
源代码14 项目: hadoop-ozone   文件: TestLockManager.java
/**
 * Starts 100 daemon threads, each of which takes the write lock on its own
 * distinct resource id, sleeps briefly, releases the lock and bumps a shared
 * counter. The test then waits for the counter to reach the thread count.
 *
 * @throws Exception propagated from {@code GenericTestUtils.waitFor}
 */
@Test
public void testConcurrentWriteLockWithDifferentResource() throws Exception {
  final int count = 100;
  final int sleep = 10;
  final OzoneConfiguration conf = new OzoneConfiguration();
  final LockManager<Integer> manager = new LockManager<>(conf);
  final AtomicInteger done = new AtomicInteger();

  for (int i = 0; i < count; i++) {
    final Integer resource = i;
    final Runnable lockSleepUnlock = () -> {
      try {
        manager.writeLock(resource);
        Thread.sleep(sleep);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        manager.writeUnlock(resource);
      }
      done.getAndIncrement();
    };
    final Daemon locker = new Daemon(lockSleepUnlock);
    locker.setName("Locker-" + i);
    locker.start();
  }

  final int timeoutMillis = 10 * count * sleep;
  GenericTestUtils.waitFor(() -> done.get() == count, 100, timeoutMillis);
  Assert.assertEquals(count, done.get());
}
 
/**
 * Starts this object; should be called before it is used.
 *
 * Delegates to the superclass {@code start} and then launches a daemon
 * thread running a new {@code ExpiredTokenRemover}.
 *
 * @param certClient passed through to the superclass {@code start}
 * @throws IOException propagated from the calls made here, presumably the
 *         superclass {@code start}
 */
@Override
public synchronized void start(CertificateClient certClient)
    throws IOException {
  super.start(certClient);
  final Daemon remover = new Daemon(new ExpiredTokenRemover());
  tokenRemoverThread = remover;
  remover.start();
}
 
源代码16 项目: hadoop-ozone   文件: OzoneManagerDoubleBuffer.java
/**
 * Wires up the double buffer and starts its flush daemon.
 *
 * The current and ready buffers are always created. The two future queues
 * are created only when Ratis is disabled and are {@code null} otherwise.
 * The running flag is set before the daemon named
 * "OMDoubleBufferFlushThread" is started on {@code flushTransactions}.
 *
 * @param omMetadataManager stored for later use
 * @param ozoneManagerRatisSnapShot stored for later use
 * @param isRatisEnabled decides whether the future queues are created
 * @param isTracingEnabled stored for later use
 * @param indexToTerm stored for later use
 */
private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
    OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot,
    boolean isRatisEnabled, boolean isTracingEnabled,
    Function<Long, Long> indexToTerm) {
  this.omMetadataManager = omMetadataManager;
  this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
  this.indexToTerm = indexToTerm;
  this.isRatisEnabled = isRatisEnabled;
  this.isTracingEnabled = isTracingEnabled;

  this.currentBuffer = new ConcurrentLinkedQueue<>();
  this.readyBuffer = new ConcurrentLinkedQueue<>();
  if (isRatisEnabled) {
    this.currentFutureQueue = null;
    this.readyFutureQueue = null;
  } else {
    this.currentFutureQueue = new ConcurrentLinkedQueue<>();
    this.readyFutureQueue = new ConcurrentLinkedQueue<>();
  }

  this.ozoneManagerDoubleBufferMetrics =
      OzoneManagerDoubleBufferMetrics.create();

  // Mark as running before the flush thread gets going.
  isRunning.set(true);
  // Daemon thread which runs in back ground and flushes transactions to DB.
  daemon = new Daemon(this::flushTransactions);
  daemon.setName("OMDoubleBufferFlushThread");
  daemon.start();
}
 
/**
 * Starts {@code volumeCount} daemon threads, each running
 * {@code doTransactions(bucketsPerVolume)}, then waits until the double
 * buffer reports the expected flushed-transaction count and the volume and
 * bucket tables hold the expected number of rows.
 *
 * @param volumeCount number of parallel daemon threads to start
 * @param bucketsPerVolume argument handed to each {@code doTransactions} call
 * @throws Exception propagated from {@code GenericTestUtils.waitFor}
 */
private void testDoubleBuffer(int volumeCount, int bucketsPerVolume)
    throws Exception {
  // Reset transaction id.
  trxId.set(0);
  for (int started = 0; started < volumeCount; started++) {
    new Daemon(() -> doTransactions(bucketsPerVolume)).start();
  }

  // We are doing +1 for volume transaction.
  // Here not checking lastAppliedIndex because transactionIndex is
  // shared across threads, and lastAppliedIndex cannot be always
  // expectedTransactions. So, skipping that check here.
  final int expectedBuckets = bucketsPerVolume * volumeCount;
  final long expectedTransactions = volumeCount + expectedBuckets;
  final int flushTimeoutMillis = volumeCount * 500;
  final int tableTimeoutMillis = volumeCount * 300;

  GenericTestUtils.waitFor(
      () -> expectedTransactions == doubleBuffer.getFlushedTransactionCount(),
      100, flushTimeoutMillis);

  GenericTestUtils.waitFor(
      () -> assertRowCount(volumeCount, omMetadataManager.getVolumeTable()),
      300, tableTimeoutMillis);

  GenericTestUtils.waitFor(
      () -> assertRowCount(expectedBuckets, omMetadataManager.getBucketTable()),
      300, tableTimeoutMillis);

  Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
}
 
源代码18 项目: hadoop   文件: DirectoryScanner.java
/**
 * Creates a scanner for the given datanode and dataset.
 *
 * The configured scan interval is multiplied by 1000 to obtain
 * {@code scanPeriodMsecs}. Two executors are built on
 * {@code Daemon.DaemonFactory} instances: a fixed pool sized by the
 * configured thread count, and a single-threaded scheduled executor.
 *
 * @param datanode stored for later use
 * @param dataset stored for later use
 * @param conf source of the scan interval and thread count
 */
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
  this.datanode = datanode;
  this.dataset = dataset;

  final int scanInterval = conf.getInt(
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
  scanPeriodMsecs = scanInterval * 1000L; //msec

  final int compileThreads = conf.getInt(
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
  reportCompileThreadPool =
      Executors.newFixedThreadPool(compileThreads, new Daemon.DaemonFactory());
  masterThread =
      new ScheduledThreadPoolExecutor(1, new Daemon.DaemonFactory());
}
 
源代码19 项目: hadoop   文件: DataNode.java
/**
 * Creates the TCP peer server and prepares, without starting them, the
 * daemons that wrap the data xceiver servers.
 *
 * The TCP peer server is built from {@code secureResources} when those are
 * present, otherwise from the socket write timeout and the configured
 * streaming address. A second daemon on a domain peer server is prepared
 * only when one of the two checked configuration flags is set and
 * {@code getDomainPeerServer} returns a non-null server.
 *
 * @param conf configuration read for addresses and feature flags
 * @throws IOException if creating or configuring a peer server fails
 */
private void initDataXceiver(Configuration conf) throws IOException {
  // find free port or use privileged port provided
  final TcpPeerServer tcpPeerServer = (secureResources != null)
      ? new TcpPeerServer(secureResources)
      : new TcpPeerServer(dnConf.socketWriteTimeout,
          DataNode.getStreamingAddr(conf));
  tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
  streamingAddr = tcpPeerServer.getStreamingAddr();
  LOG.info("Opened streaming server at " + streamingAddr);

  this.threadGroup = new ThreadGroup("dataXceiverServer");
  xserver = new DataXceiverServer(tcpPeerServer, conf, this);
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty

  final boolean domainSocketRequested =
      conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)
      || conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
          DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
  if (domainSocketRequested) {
    final DomainPeerServer domainPeerServer =
        getDomainPeerServer(conf, streamingAddr.getPort());
    if (domainPeerServer != null) {
      final DataXceiverServer localXserver =
          new DataXceiverServer(domainPeerServer, conf, this);
      this.localDataXceiverServer = new Daemon(threadGroup, localXserver);
      LOG.info("Listening on UNIX domain socket: " +
          domainPeerServer.getBindPath());
    }
  }
  this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}
 
源代码20 项目: hadoop   文件: TestBlockRecovery.java
/**
 * BlockRecoveryFI_05. One DN throws RecoveryInProgressException.
 *
 * The spied datanode throws from every {@code initReplicaRecovery} call; once
 * the recovery daemon has finished, the test verifies that {@code syncBlock}
 * was never invoked.
 *
 * @throws IOException
 *           in case of an error
 * @throws InterruptedException
 *           if interrupted while joining the recovery daemon
 */
@Test
public void testRecoveryInProgressException()
  throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final DataNode spyDN = spy(dn);
  final RecoveryInProgressException inProgress =
      new RecoveryInProgressException("Replica recovery is in progress");
  doThrow(inProgress).when(spyDN)
      .initReplicaRecovery(any(RecoveringBlock.class));

  final Daemon recoveryDaemon =
      spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
  recoveryDaemon.join();

  verify(spyDN, never()).syncBlock(
      any(RecoveringBlock.class), anyListOf(BlockRecord.class));
}
 
源代码21 项目: hadoop   文件: TestBlockRecovery.java
/**
 * BlockRecoveryFI_06. all datanodes throws an exception.
 *
 * The spied datanode throws an {@link IOException} from every
 * {@code initReplicaRecovery} call; once the recovery daemon has finished,
 * the test verifies that {@code syncBlock} was never invoked.
 *
 * @throws IOException
 *           in case of an error
 * @throws InterruptedException
 *           if interrupted while joining the recovery daemon
 */
@Test
public void testErrorReplicas() throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final DataNode spyDN = spy(dn);
  final IOException failure = new IOException();
  doThrow(failure).when(spyDN)
      .initReplicaRecovery(any(RecoveringBlock.class));

  final Daemon recoveryDaemon =
      spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
  recoveryDaemon.join();

  verify(spyDN, never()).syncBlock(
      any(RecoveringBlock.class), anyListOf(BlockRecord.class));
}
 
/**
 * Should be called before this object is used.
 *
 * Requires that {@code running} is currently false, updates the current key,
 * and then, under this object's monitor, marks the object as running and
 * starts a daemon thread for a new {@code ExpiredTokenRemover}.
 *
 * @throws IOException propagated from the calls made here, presumably
 *         {@code updateCurrentKey()}
 */
public void startThreads() throws IOException {
  Preconditions.checkState(!running);
  updateCurrentKey();
  synchronized (this) {
    running = true;
    final Daemon remover = new Daemon(new ExpiredTokenRemover());
    tokenRemoverThread = remover;
    remover.start();
  }
}
 
源代码23 项目: big-c   文件: DirectoryScanner.java
/**
 * Creates a scanner for the given datanode and dataset.
 *
 * The configured scan interval is multiplied by 1000 to obtain
 * {@code scanPeriodMsecs}. Two executors are built on
 * {@code Daemon.DaemonFactory} instances: a fixed pool sized by the
 * configured thread count, and a single-threaded scheduled executor.
 *
 * @param datanode stored for later use
 * @param dataset stored for later use
 * @param conf source of the scan interval and thread count
 */
DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
  this.datanode = datanode;
  this.dataset = dataset;

  final int scanInterval = conf.getInt(
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
  scanPeriodMsecs = scanInterval * 1000L; //msec

  final int compileThreads = conf.getInt(
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
      DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
  reportCompileThreadPool =
      Executors.newFixedThreadPool(compileThreads, new Daemon.DaemonFactory());
  masterThread =
      new ScheduledThreadPoolExecutor(1, new Daemon.DaemonFactory());
}
 
源代码24 项目: big-c   文件: DataNode.java
/**
 * Creates the TCP peer server and prepares, without starting them, the
 * daemons that wrap the data xceiver servers.
 *
 * The TCP peer server is built from {@code secureResources} when those are
 * present, otherwise from the socket write timeout and the configured
 * streaming address. A second daemon on a domain peer server is prepared
 * only when one of the two checked configuration flags is set and
 * {@code getDomainPeerServer} returns a non-null server.
 *
 * @param conf configuration read for addresses and feature flags
 * @throws IOException if creating or configuring a peer server fails
 */
private void initDataXceiver(Configuration conf) throws IOException {
  // find free port or use privileged port provided
  final TcpPeerServer tcpPeerServer = (secureResources != null)
      ? new TcpPeerServer(secureResources)
      : new TcpPeerServer(dnConf.socketWriteTimeout,
          DataNode.getStreamingAddr(conf));
  tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
  streamingAddr = tcpPeerServer.getStreamingAddr();
  LOG.info("Opened streaming server at " + streamingAddr);

  this.threadGroup = new ThreadGroup("dataXceiverServer");
  xserver = new DataXceiverServer(tcpPeerServer, conf, this);
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty

  final boolean domainSocketRequested =
      conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)
      || conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
          DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
  if (domainSocketRequested) {
    final DomainPeerServer domainPeerServer =
        getDomainPeerServer(conf, streamingAddr.getPort());
    if (domainPeerServer != null) {
      final DataXceiverServer localXserver =
          new DataXceiverServer(domainPeerServer, conf, this);
      this.localDataXceiverServer = new Daemon(threadGroup, localXserver);
      LOG.info("Listening on UNIX domain socket: " +
          domainPeerServer.getBindPath());
    }
  }
  this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}
 
源代码25 项目: big-c   文件: TestBlockRecovery.java
/**
 * BlockRecoveryFI_05. One DN throws RecoveryInProgressException.
 *
 * The spied datanode throws from every {@code initReplicaRecovery} call; once
 * the recovery daemon has finished, the test verifies that {@code syncBlock}
 * was never invoked.
 *
 * @throws IOException
 *           in case of an error
 * @throws InterruptedException
 *           if interrupted while joining the recovery daemon
 */
@Test
public void testRecoveryInProgressException()
  throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final DataNode spyDN = spy(dn);
  final RecoveryInProgressException inProgress =
      new RecoveryInProgressException("Replica recovery is in progress");
  doThrow(inProgress).when(spyDN)
      .initReplicaRecovery(any(RecoveringBlock.class));

  final Daemon recoveryDaemon =
      spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
  recoveryDaemon.join();

  verify(spyDN, never()).syncBlock(
      any(RecoveringBlock.class), anyListOf(BlockRecord.class));
}
 
源代码26 项目: big-c   文件: TestBlockRecovery.java
/**
 * BlockRecoveryFI_06. all datanodes throws an exception.
 *
 * The spied datanode throws an {@link IOException} from every
 * {@code initReplicaRecovery} call; once the recovery daemon has finished,
 * the test verifies that {@code syncBlock} was never invoked.
 *
 * @throws IOException
 *           in case of an error
 * @throws InterruptedException
 *           if interrupted while joining the recovery daemon
 */
@Test
public void testErrorReplicas() throws IOException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + GenericTestUtils.getMethodName());
  }

  final DataNode spyDN = spy(dn);
  final IOException failure = new IOException();
  doThrow(failure).when(spyDN)
      .initReplicaRecovery(any(RecoveringBlock.class));

  final Daemon recoveryDaemon =
      spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
  recoveryDaemon.join();

  verify(spyDN, never()).syncBlock(
      any(RecoveringBlock.class), anyListOf(BlockRecord.class));
}
 
/**
 * Should be called before this object is used.
 *
 * Requires that {@code running} is currently false, updates the current key,
 * and then, under this object's monitor, marks the object as running and
 * starts a daemon thread for a new {@code ExpiredTokenRemover}.
 *
 * @throws IOException propagated from the calls made here, presumably
 *         {@code updateCurrentKey()}
 */
public void startThreads() throws IOException {
  Preconditions.checkState(!running);
  updateCurrentKey();
  synchronized (this) {
    running = true;
    final Daemon remover = new Daemon(new ExpiredTokenRemover());
    tokenRemoverThread = remover;
    remover.start();
  }
}
 
源代码28 项目: RDFS   文件: DistRaidNode.java
/**
 * Builds the node on top of the superclass, creates the pending-files map,
 * and creates and starts one daemon thread for a {@code JobMonitor} and one
 * for a {@code JobScheduler}.
 *
 * @param conf handed to the superclass constructor and to the job monitor
 * @throws IOException propagated from the superclass constructor or the
 *         objects created here
 */
public DistRaidNode(Configuration conf) throws IOException {
	super(conf);
	pendingFiles = new HashMap<PolicyInfo, Map<String, FileStatus>>();

	// Build both workers and their threads before starting either thread.
	jobMonitor = new JobMonitor(conf);
	jobMonitorThread = new Daemon(jobMonitor);
	jobScheduler = new JobScheduler();
	jobSchedulerThread = new Daemon(jobScheduler);

	jobMonitorThread.start();
	jobSchedulerThread.start();
	LOG.info("created");
}
 
源代码29 项目: RDFS   文件: SnapshotNode.java
/**
 * Initialize SnapshotNode.
 *
 * Reads the snapshot and temp directory settings, obtains the image server
 * and file system, creates the snapshot directory when it does not exist,
 * starts the waiting-room purge daemon, connects to the namenode over RPC,
 * and finally creates and starts this node's own RPC server.
 *
 * @throws IOException if a file system or RPC operation fails
 */
private void init() throws IOException {
  ssDir = conf.get("fs.snapshot.dir", "/.SNAPSHOT");
  tempDir = conf.get("fs.snapshot.tempdir", "/tmp/snapshot");

  fileServer = getImageServer();
  dfs = FileSystem.get(conf);

  // Create the snapshot directory on first use.
  final Path snapshotRoot = new Path(ssDir);
  final boolean rootMissing = !dfs.exists(snapshotRoot);
  if (rootMissing) {
    dfs.mkdirs(snapshotRoot);
  }

  maxLeaseUpdateThreads = conf.getInt("fs.snapshot.leaseupdatethreads", 100);

  // Waiting room purge thread
  final WaitingRoom waitingRoom = new WaitingRoom(conf);
  purgeThread = new Daemon(waitingRoom.getPurger());
  purgeThread.start();

  // Get namenode rpc connection
  nameNodeAddr = NameNode.getAddress(conf);
  namenode = (NamenodeProtocol) RPC.waitForProxy(
      NamenodeProtocol.class, NamenodeProtocol.versionID, nameNodeAddr, conf);

  // Snapshot RPC Server
  final InetSocketAddress rpcAddr = SnapshotNode.getAddress(conf);
  final int handlerCount = conf.getInt("fs.snapshot.handler.count", 10);
  server = RPC.getServer(
      this, rpcAddr.getHostName(), rpcAddr.getPort(),
      handlerCount, false, conf);
  // The rpc-server port can be ephemeral... ensure we have the correct info
  serverAddress = server.getListenerAddress();
  LOG.info("SnapshotNode up at: " + serverAddress);

  server.start(); // start rpc server
}
 
源代码30 项目: RDFS   文件: UtilizationCollector.java
/**
 * Creates and starts the collector's RPC server, loads the three timing
 * settings from the configuration, and starts the aggregation daemon.
 *
 * @param conf configuration supplying the server address, handler count and
 *        timing settings
 * @throws IOException if the RPC server cannot be created or started
 */
protected void initialize(Configuration conf)
  throws IOException {
  final InetSocketAddress rpcAddr = UtilizationCollector.getAddress(conf);
  final int handlerCount =
      conf.getInt("mapred.resourceutilization.handler.count", 10);

  // create rpc server
  this.server = RPC.getServer(
      this, rpcAddr.getHostName(), rpcAddr.getPort(),
      handlerCount, false, conf);

  // The rpc-server port can be ephemeral... ensure we have the correct info
  this.serverAddress = this.server.getListenerAddress();
  LOG.info("Collector up at: " + this.serverAddress);

  // start RPC server
  this.server.start();

  // How long does the TaskTracker reports expire
  timeLimit = conf.getLong(
      "mapred.resourceutilization.timelimit", DEFAULT_TIME_LIMIT);

  // How long do we consider a job is finished after it stops
  stopTimeLimit = conf.getLong(
      "mapred.resourceutilization.stoptimelimit", DEFAULT_STOP_TIME_LIMIT);

  // How often do we aggregate the reports
  aggregatePeriod = conf.getLong(
      "mapred.resourceutilization.aggregateperiod",
      DEFAULT_AGGREGATE_SLEEP_TIME);

  // Start the daemon thread to aggregate the TaskTracker reports
  final Daemon aggregator = new Daemon(new AggregateRun());
  this.aggregateDaemon = aggregator;
  aggregator.start();
}
 
 类所在包
 类方法
 同包方法