下面列出了 org.apache.hadoop.util.Timer 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a checker around the supplied timer and executor.
 *
 * The executor is wrapped with a listening decorator. A single-threaded
 * scheduled executor is created only when {@code diskCheckTimeout} is
 * positive; otherwise the scheduled executor field is left {@code null}.
 *
 * @param timer timer stored for later use by this checker.
 * @param minMsBetweenChecks value stored as the minimum gap between checks.
 * @param diskCheckTimeout timeout value; a value of zero or less means no
 *                         scheduled executor is created.
 * @param executorService executor to be wrapped and used by this checker.
 */
public ThrottledAsyncChecker(final Timer timer,
    final long minMsBetweenChecks,
    final long diskCheckTimeout,
    final ExecutorService executorService) {
  this.timer = timer;
  this.minMsBetweenChecks = minMsBetweenChecks;
  this.diskCheckTimeout = diskCheckTimeout;
  this.executorService = MoreExecutors.listeningDecorator(executorService);
  this.checksInProgress = new HashMap<>();
  this.completedChecks = new WeakHashMap<>();
  // The scheduled executor is only needed when a timeout is configured.
  this.scheduledExecutorService = diskCheckTimeout > 0
      ? MoreExecutors.getExitingScheduledExecutorService(
          new ScheduledThreadPoolExecutor(1))
      : null;
}
/**
 * Verify that bad volumes are filtered at startup.
 *
 * The volume set is built with a {@code DummyChecker} configured with
 * {@code numBadVolumes}; the test then expects that many entries in the
 * failed list and the remainder in the healthy list.
 *
 * @throws IOException if thrown while setting up the configuration or the
 *                     volume set.
 */
@Test
public void testBadDirectoryDetection() throws IOException {
  final int numVolumes = 5;
  final int numBadVolumes = 2;

  conf = getConfWithDataNodeDirs(numVolumes);
  final MutableVolumeSet volumeSet = new MutableVolumeSet(
      UUID.randomUUID().toString(), conf) {
    @Override
    HddsVolumeChecker getVolumeChecker(ConfigurationSource configuration)
        throws DiskErrorException {
      return new DummyChecker(configuration, new Timer(), numBadVolumes);
    }
  };

  try {
    assertThat(volumeSet.getFailedVolumesList().size(), is(numBadVolumes));
    assertThat(volumeSet.getVolumesList().size(),
        is(numVolumes - numBadVolumes));
  } finally {
    // Shut down even when an assertion fails so the volume set is not
    // leaked into later tests.
    volumeSet.shutdown();
  }
}
/**
 * Verify that all volumes are added to fail list if all volumes are bad.
 *
 * The volume set is built with a {@code DummyChecker} configured with as
 * many bad volumes as there are volumes in total.
 *
 * @throws IOException if thrown while setting up the configuration or the
 *                     volume set.
 */
@Test
public void testAllVolumesAreBad() throws IOException {
  final int numVolumes = 5;

  conf = getConfWithDataNodeDirs(numVolumes);
  final MutableVolumeSet volumeSet = new MutableVolumeSet(
      UUID.randomUUID().toString(), conf) {
    @Override
    HddsVolumeChecker getVolumeChecker(ConfigurationSource configuration)
        throws DiskErrorException {
      return new DummyChecker(configuration, new Timer(), numVolumes);
    }
  };

  try {
    // assertEquals takes (expected, actual); keeping that order makes the
    // failure message report the right values.
    assertEquals(numVolumes, volumeSet.getFailedVolumesList().size());
    assertEquals(0, volumeSet.getVolumesList().size());
  } finally {
    // Shut down even when an assertion fails so the volume set is not
    // leaked into later tests.
    volumeSet.shutdown();
  }
}
public Groups(Configuration conf, final Timer timer) {
impl =
ReflectionUtils.newInstance(
conf.getClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
ShellBasedUnixGroupsMapping.class,
GroupMappingServiceProvider.class),
conf);
cacheTimeout =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT) * 1000;
negativeCacheTimeout =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS_DEFAULT) * 1000;
warningDeltaMs =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT);
staticUserToGroupsMap = parseStaticMapping(conf);
this.timer = timer;
this.cache = CacheBuilder.newBuilder()
.refreshAfterWrite(cacheTimeout, TimeUnit.MILLISECONDS)
.ticker(new TimerToTickerAdapter(timer))
.expireAfterWrite(10 * cacheTimeout, TimeUnit.MILLISECONDS)
.build(new GroupCacheLoader());
if(negativeCacheTimeout > 0) {
Cache<String, Boolean> tempMap = CacheBuilder.newBuilder()
.expireAfterWrite(negativeCacheTimeout, TimeUnit.MILLISECONDS)
.ticker(new TimerToTickerAdapter(timer))
.build();
negativeCache = Collections.newSetFromMap(tempMap.asMap());
}
if(LOG.isDebugEnabled())
LOG.debug("Group mapping impl=" + impl.getClass().getName() +
"; cacheTimeout=" + cacheTimeout + "; warningDeltaMs=" +
warningDeltaMs);
}
public Groups(Configuration conf, final Timer timer) {
impl =
ReflectionUtils.newInstance(
conf.getClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
ShellBasedUnixGroupsMapping.class,
GroupMappingServiceProvider.class),
conf);
cacheTimeout =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT) * 1000;
negativeCacheTimeout =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_NEGATIVE_CACHE_SECS_DEFAULT) * 1000;
warningDeltaMs =
conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS,
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT);
parseStaticMapping(conf);
this.timer = timer;
this.cache = CacheBuilder.newBuilder()
.refreshAfterWrite(cacheTimeout, TimeUnit.MILLISECONDS)
.ticker(new TimerToTickerAdapter(timer))
.expireAfterWrite(10 * cacheTimeout, TimeUnit.MILLISECONDS)
.build(new GroupCacheLoader());
if(negativeCacheTimeout > 0) {
Cache<String, Boolean> tempMap = CacheBuilder.newBuilder()
.expireAfterWrite(negativeCacheTimeout, TimeUnit.MILLISECONDS)
.ticker(new TimerToTickerAdapter(timer))
.build();
negativeCache = Collections.newSetFromMap(tempMap.asMap());
}
if(LOG.isDebugEnabled())
LOG.debug("Group mapping impl=" + impl.getClass().getName() +
"; cacheTimeout=" + cacheTimeout + "; warningDeltaMs=" +
warningDeltaMs);
}
/**
 * Creates the volume checker, handing it a newly constructed {@link Timer}.
 *
 * @param configuration configuration passed through to the checker.
 * @return a new {@link HddsVolumeChecker}.
 * @throws DiskChecker.DiskErrorException if the checker cannot be
 *         constructed.
 */
@VisibleForTesting
HddsVolumeChecker getVolumeChecker(ConfigurationSource configuration)
throws DiskChecker.DiskErrorException {
return new HddsVolumeChecker(configuration, new Timer());
}
/**
 * Creates a volume checker configured from the datanode disk-check
 * settings.
 *
 * Reads the disk check timeout, the minimum gap between disk checks and
 * the number of tolerated volume failures, validates each of them, and
 * then creates the throttled delegate checker and the executor used for
 * handling check results. Both executors use daemon threads.
 *
 * @param conf Configuration object.
 * @param timer {@link Timer} object used for throttling checks.
 * @throws DiskErrorException if the disk check timeout is not positive,
 *         the minimum disk check gap is negative, or the tolerated volume
 *         failures value is below
 *         {@code MAX_VOLUME_FAILURE_TOLERATED_LIMIT}.
 */
public HddsVolumeChecker(ConfigurationSource conf, Timer timer)
    throws DiskErrorException {
  maxAllowedTimeForCheckMs = conf.getTimeDuration(
      DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
      DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
      TimeUnit.MILLISECONDS);

  if (maxAllowedTimeForCheckMs <= 0) {
    throw new DiskErrorException("Invalid value configured for "
        + DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
        + maxAllowedTimeForCheckMs + " (should be > 0)");
  }

  this.timer = timer;

  // Maximum number of volume failures that can be tolerated without
  // declaring a fatal error.
  int maxVolumeFailuresTolerated = conf.getInt(
      DFSConfigKeysLegacy.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
      DFSConfigKeysLegacy.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);

  minDiskCheckGapMs = conf.getTimeDuration(
      DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
      DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
      TimeUnit.MILLISECONDS);

  if (minDiskCheckGapMs < 0) {
    throw new DiskErrorException("Invalid value configured for "
        + DFSConfigKeysLegacy.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY + " - "
        + minDiskCheckGapMs + " (should be >= 0)");
  }

  // Back-date the last full check by the minimum gap.
  lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs;

  if (maxVolumeFailuresTolerated < MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
    throw new DiskErrorException("Invalid value configured for "
        + DFSConfigKeysLegacy.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY
        + " - "
        + maxVolumeFailuresTolerated + " "
        + DataNode.MAX_VOLUME_FAILURES_TOLERATED_MSG);
  }

  // The disk check timeout was already read and validated (> 0) above, so
  // it is reused here instead of reading the same key a second time.
  delegateChecker = new ThrottledAsyncChecker<>(
      timer, minDiskCheckGapMs, maxAllowedTimeForCheckMs,
      Executors.newCachedThreadPool(
          new ThreadFactoryBuilder()
              .setNameFormat("DataNode DiskChecker thread %d")
              .setDaemon(true)
              .build()));

  checkVolumeResultHandlerExecutorService = Executors.newCachedThreadPool(
      new ThreadFactoryBuilder()
          .setNameFormat("VolumeCheck ResultHandler thread %d")
          .setDaemon(true)
          .build());
}
/**
 * Creates a checker that passes {@code conf} and {@code timer} to the
 * superclass constructor and records the number of bad volumes.
 *
 * @param conf configuration forwarded to the superclass.
 * @param timer timer forwarded to the superclass.
 * @param numBadVolumes value stored in this checker's
 *                      {@code numBadVolumes} field.
 * @throws DiskErrorException if thrown by the superclass constructor.
 */
DummyChecker(ConfigurationSource conf, Timer timer, int numBadVolumes)
throws DiskErrorException {
super(conf, timer);
this.numBadVolumes = numBadVolumes;
}
/**
 * Creates a Groups instance by delegating to the
 * {@code (Configuration, Timer)} constructor with a newly constructed
 * {@link Timer}.
 *
 * @param conf configuration forwarded to the two-argument constructor.
 */
public Groups(Configuration conf) {
this(conf, new Timer());
}
/**
 * Creates an adapter around the given timer.
 *
 * @param timer timer stored in this adapter.
 */
public TimerToTickerAdapter(Timer timer) {
this.timer = timer;
}
/**
 * Creates a Groups instance by delegating to the
 * {@code (Configuration, Timer)} constructor with a newly constructed
 * {@link Timer}.
 *
 * @param conf configuration forwarded to the two-argument constructor.
 */
public Groups(Configuration conf) {
this(conf, new Timer());
}
/**
 * Creates an adapter around the given timer.
 *
 * @param timer timer stored in this adapter.
 */
public TimerToTickerAdapter(Timer timer) {
this.timer = timer;
}
/**
 * Create a block pool slice.
 *
 * Sets up the current, finalized, lazy-persist, tmp and rbw directories
 * for the block pool, reads the slice settings from configuration, and
 * registers a shutdown hook that stops the add-replica thread pool.
 *
 * @param bpid Block pool Id
 * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
 * @param bpDir directory corresponding to the BlockPool
 * @param conf configuration
 * @param timer include methods for getting time
 * @throws IOException Error making directories
 */
BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
    Configuration conf, Timer timer) throws IOException {
  this.bpid = bpid;
  this.volume = volume;
  this.timer = timer;
  this.fileIoProvider = volume.getFileIoProvider();

  this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
  this.finalizedDir =
      new File(currentDir, DataStorage.STORAGE_DIR_FINALIZED);
  this.lazypersistDir =
      new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);

  // Fail only when the finalized directory is missing and cannot be made.
  if (!this.finalizedDir.exists() && !this.finalizedDir.mkdirs()) {
    throw new IOException("Failed to mkdirs " + this.finalizedDir);
  }

  this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
  this.deleteDuplicateReplicas = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
      DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
  this.cachedDfsUsedCheckTime = conf.getLong(
      DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
      DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS);
  this.maxDataLength = conf.getInt(
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);

  // Anything left in the tmp directory from a previous run is deleted
  // here. In the future we might want some datanode-local recovery for
  // these blocks instead, for example crc validation.
  this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
  if (tmpDir.exists()) {
    fileIoProvider.fullyDelete(volume, tmpDir);
  }

  // Make sure both the rbw and tmp directories exist.
  this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
  fileIoProvider.mkdirs(volume, rbwDir);
  fileIoProvider.mkdirs(volume, tmpDir);

  // The add-replica pool is only initialized if it has not been set yet.
  if (addReplicaThreadPool == null) {
    initializeAddReplicaPool(conf);
  }

  // Stop the add-replica pool when the process shuts down.
  shutdownHook = new Runnable() {
    @Override
    public void run() {
      addReplicaThreadPool.shutdownNow();
    }
  };
  ShutdownHookManager.get().addShutdownHook(
      shutdownHook, SHUTDOWN_HOOK_PRIORITY);
}