下面列出了org.apache.hadoop.fs.ParentNotDirectoryException#org.apache.hadoop.hdfs.DFSConfigKeys 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * One-time fixture: sets the HTTP policy to HTTP_AND_HTTPS, binds the NFS
 * HTTP/HTTPS addresses and the NFS server/mountd ports to port 0, recreates
 * BASEDIR, generates the SSL test configuration and starts a MiniDFSCluster
 * with one DataNode.
 *
 * @throws Exception if any part of the fixture cannot be set up
 */
@BeforeClass
public static void setUp() throws Exception {
  conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY,
      HttpConfig.Policy.HTTP_AND_HTTPS.name());
  conf.set(NfsConfigKeys.NFS_HTTP_ADDRESS_KEY, "localhost:0");
  conf.set(NfsConfigKeys.NFS_HTTPS_ADDRESS_KEY, "localhost:0");

  // Use ephemeral ports in case tests are running in parallel.
  conf.setInt(NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY, 0);
  conf.setInt(NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY, 0);

  // Start from an empty base directory; it also holds the key stores.
  final File baseDir = new File(BASEDIR);
  FileUtil.fullyDelete(baseDir);
  baseDir.mkdirs();
  keystoresDir = baseDir.getAbsolutePath();

  sslConfDir = KeyStoreTestUtil.getClasspathDir(TestNfs3HttpServer.class);
  KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);

  final MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf);
  cluster = clusterBuilder.numDataNodes(1).build();
  cluster.waitActive();
}
/**
 * Per-test fixture: builds a configuration with short checkpoint and
 * edit-tailing periods, a journal URI under "/bootstrapStandby" for shared
 * edits and image compression through {@code SlowCodec}, then starts a
 * cluster with one nameservice ("ns1") holding two NameNodes and one DataNode.
 *
 * @throws Exception if the cluster cannot be started
 */
@Before
public void setUp() throws Exception {
  Configuration clusterConf = new Configuration();
  clusterConf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
  clusterConf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
  clusterConf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);

  // Shared edits go through the journal manager registered below.
  String sharedEditsUri =
      BKJMUtil.createJournalURI("/bootstrapStandby").toString();
  clusterConf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
      sharedEditsUri);
  BKJMUtil.addJournalManagerDefinition(clusterConf);

  // Compress the image with SlowCodec and register that codec class.
  clusterConf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
  clusterConf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
      SlowCodec.class.getCanonicalName());
  CompressionCodecFactory.setCodecClasses(clusterConf,
      ImmutableList.<Class> of(SlowCodec.class));

  // Two NameNodes with fixed HTTP ports in a single nameservice.
  MiniDFSNNTopology.NNConf firstNn =
      new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001);
  MiniDFSNNTopology.NNConf secondNn =
      new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002);
  MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(
      new MiniDFSNNTopology.NSConf("ns1").addNN(firstNn).addNN(secondNn));

  cluster = new MiniDFSCluster.Builder(clusterConf)
      .nnTopology(topology)
      .numDataNodes(1)
      .manageNameDfsSharedDirs(false)
      .build();
  cluster.waitActive();
}
/**
 * Verifies that configuring a JournalManager implementation without a
 * (Configuration, URI) constructor makes cluster start-up fail with an
 * IllegalArgumentException whose message contains
 * "Unable to construct journal".
 */
@Test
public void testBadConstructor() throws Exception {
  Configuration journalConf = new Configuration();
  journalConf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + ".dummy",
      BadConstructorJournalManager.class.getName());
  journalConf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, "dummy://test");

  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(journalConf).numDataNodes(0).build();
    cluster.waitActive();
    fail("Should have failed before this point");
  } catch (IllegalArgumentException iae) {
    // Any other IllegalArgumentException is not the failure under test.
    final boolean constructionFailure =
        iae.getMessage().contains("Unable to construct journal");
    if (!constructionFailure) {
      fail("Should have failed with unable to construct exception");
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Waits for heartbeats, runs the balancer against every namenode URI found in
 * {@code conf} and checks its exit code. When the configured maximum number of
 * concurrent moves is 0 the balancer is expected to report NO_MOVE_PROGRESS
 * and the method stops there; otherwise SUCCESS is expected and the method
 * goes on to wait for the cluster to become balanced.
 *
 * @param conf configuration used to locate namenodes and read the move limit
 * @param totalUsedSpace used space passed to the wait helpers
 * @param totalCapacity capacity passed to the wait helpers
 * @param p balancer parameters
 * @param excludedNodes number of excluded nodes passed to waitForBalancer
 * @throws Exception if any step fails
 */
private void runBalancer(Configuration conf,
    long totalUsedSpace, long totalCapacity, Balancer.Parameters p,
    int excludedNodes) throws Exception {
  waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);

  // start rebalancing
  final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
  final int exitCode = runBalancer(namenodes, p, conf);

  final int maxConcurrentMoves = conf.getInt(
      DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
      DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
  if (maxConcurrentMoves == 0) {
    // With no moves allowed, the balancer cannot make progress.
    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), exitCode);
    return;
  }

  assertEquals(ExitStatus.SUCCESS.getExitCode(), exitCode);
  waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
  LOG.info(" .");
  waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
      excludedNodes);
}
/**
 * Reconfigures a DataNode with a new list of volumes. The absolute paths of
 * the volumes are joined with commas and applied as the new value of the
 * data-dir property. A ReconfigurationException is logged and not rethrown.
 *
 * @param dn DataNode to reconfigure
 * @param newVols new volumes to configure
 * @throws Exception if there is any failure other than a
 *     ReconfigurationException
 */
private static void reconfigureDataNode(DataNode dn, File... newVols)
    throws Exception {
  final StringBuilder dataDirs = new StringBuilder();
  for (int i = 0; i < newVols.length; i++) {
    if (dataDirs.length() > 0) {
      dataDirs.append(',');
    }
    dataDirs.append(newVols[i].getAbsolutePath());
  }
  final String newDataDirValue = dataDirs.toString();

  try {
    dn.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
        newDataDirValue);
  } catch (ReconfigurationException e) {
    // Reconfiguration may try to use a failed volume and throw. Some tests
    // deliberately exercise that case, so the exception is only logged.
    LOG.warn("Could not reconfigure DataNode.", e);
  }
}
/**
 * Builds a DFSClient whose ClientProtocol is a dynamic proxy backed by a
 * DummyRetryInvocationHandler. The handler wraps a failover proxy provider
 * for the URI of {@code dfs} and a failover-on-network-exception retry policy.
 *
 * @return the new client
 * @throws IOException if the failover proxy provider or client cannot be
 *     created
 */
private DFSClient genClientWithDummyHandler() throws IOException {
  final URI namenodeUri = dfs.getUri();
  final FailoverProxyProvider<ClientProtocol> provider =
      NameNodeProxies.createFailoverProxyProvider(conf,
          namenodeUri, ClientProtocol.class, true, null);

  final InvocationHandler handler = new DummyRetryInvocationHandler(
      provider,
      RetryPolicies.failoverOnNetworkException(
          RetryPolicies.TRY_ONCE_THEN_FAIL,
          Integer.MAX_VALUE,
          DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT,
          DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT));

  final ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
      provider.getInterface().getClassLoader(),
      new Class[] { ClientProtocol.class }, handler);

  return new DFSClient(null, proxy, conf, null);
}
/**
 * secnn-6: the checkpoint image and checkpoint edits share one directory,
 * while the name and edits directories are distinct.
 *
 * @throws IOException if a directory URI cannot be built or a helper fails
 */
@Test
public void testChkpointStartup2() throws IOException{
  LOG.info("--starting checkpointStartup2 - same directory for checkpoint");

  // different name dirs
  final String nameDirUri = fileAsURI(new File(hdfsDir, "name")).toString();
  config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nameDirUri);
  final String editsDirUri = fileAsURI(new File(hdfsDir, "edits")).toString();
  config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, editsDirUri);

  // same checkpoint dirs: both keys point at "chkpt"
  final String checkpointDirUri =
      fileAsURI(new File(hdfsDir, "chkpt")).toString();
  config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
      checkpointDirUri);
  config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
      checkpointDirUri);

  createCheckPoint(1);
  corruptNameNodeFiles();
  checkNameNodeFiles();
}
/**
 * Creates an HdfsConfiguration with one-second heartbeat, replication,
 * pending-replication and block-report intervals, plus a topology script
 * file name.
 *
 * @return the new configuration
 */
private Configuration getConf() {
  final Configuration fastConf = new HdfsConfiguration();

  // Short heartbeats: the NN quickly learns of dead or decommissioned DNs and
  // quickly issues replication and invalidation commands in heartbeat replies.
  fastConf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);

  // The NN ReplicationMonitor computes the replication and invalidation
  // commands to send to DNs every second.
  fastConf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);

  // The NN checks for pending replications every second, so additional
  // replicas are scheduled as soon as they are identified.
  fastConf.setInt(
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);

  // The DNs report blocks every second.
  fastConf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);

  // Indicates we have multiple racks.
  fastConf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");

  return fastConf;
}
/**
 * Verifies that an IllegalArgumentException is thrown when the edits
 * directory uses the "dummy" scheme but no journal class has been configured
 * for it.
 */
@Test(expected=IllegalArgumentException.class)
public void testNotConfigured() throws Exception {
  Configuration journalConf = new Configuration();
  journalConf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, "dummy://test");

  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(journalConf).numDataNodes(0).build();
    cluster.waitActive();
  } finally {
    // Only reached with a live cluster if start-up unexpectedly succeeded.
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Per-test fixture: starts a simple-HA-topology cluster with no DataNodes
 * using one-second log-roll and edit-tailing periods and standby reads
 * allowed, waits for it to become active, then calls
 * shutdownClusterAndRemoveSharedEditsDir().
 *
 * @throws IOException if the cluster cannot be started or shut down
 */
@Before
public void setupCluster() throws IOException {
  conf = new Configuration();
  conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
  HAUtil.setAllowStandbyReads(conf, true);

  final MiniDFSNNTopology haTopology = MiniDFSNNTopology.simpleHATopology();
  final MiniDFSCluster.Builder clusterBuilder =
      new MiniDFSCluster.Builder(conf);
  cluster = clusterBuilder.nnTopology(haTopology).numDataNodes(0).build();
  cluster.waitActive();

  shutdownClusterAndRemoveSharedEditsDir();
}
/**
 * Starts a federated cluster with WebHDFS enabled and fills {@code webhdfs}
 * with one WebHdfsFileSystem per NameNode, each addressed through that
 * NameNode's HTTP address.
 *
 * @param nNameNodes number of NameNodes in the federated topology
 * @param nDataNodes number of DataNodes to start
 * @throws Exception if the cluster or a file system cannot be created
 */
private static void setupCluster(final int nNameNodes, final int nDataNodes)
    throws Exception {
  LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);

  conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
  cluster = new MiniDFSCluster.Builder(conf)
      .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
      .numDataNodes(nDataNodes)
      .build();
  cluster.waitActive();

  webhdfs = new WebHdfsFileSystem[nNameNodes];
  for (int nnIndex = 0; nnIndex < webhdfs.length; nnIndex++) {
    final InetSocketAddress httpAddr =
        cluster.getNameNode(nnIndex).getHttpAddress();
    final URI nnUri = new URI(WebHdfsFileSystem.SCHEME + "://"
        + httpAddr.getHostName() + ":" + httpAddr.getPort() + "/");
    webhdfs[nnIndex] = (WebHdfsFileSystem) FileSystem.get(nnUri, conf);
  }
}
/**
 * Returns a WebHdfsFileSystem for the given scheme. The URI is built from the
 * scheme and the NameNode HTTP address (webhdfs) or HTTPS address (swebhdfs)
 * read from {@code conf}.
 *
 * @param conf configuration holding the NameNode HTTP(S) address
 * @param scheme either WebHdfsFileSystem.SCHEME or SWebHdfsFileSystem.SCHEME
 * @return the file system obtained from FileSystem.get for that URI
 * @throws IOException if the file system cannot be obtained
 * @throws URISyntaxException if the assembled URI is malformed
 * @throws IllegalArgumentException if {@code scheme} is neither of the two
 *     supported schemes
 */
public static WebHdfsFileSystem getWebHdfsFileSystem(
    final Configuration conf, String scheme) throws IOException,
    URISyntaxException {
  final String uriScheme;
  final String addressKey;
  if (WebHdfsFileSystem.SCHEME.equals(scheme)) {
    uriScheme = WebHdfsFileSystem.SCHEME;
    addressKey = DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
  } else if (SWebHdfsFileSystem.SCHEME.equals(scheme)) {
    uriScheme = SWebHdfsFileSystem.SCHEME;
    addressKey = DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
  } else {
    throw new IllegalArgumentException("unknown scheme:" + scheme);
  }

  final String uri = uriScheme + "://" + conf.get(addressKey);
  return (WebHdfsFileSystem) FileSystem.get(new URI(uri), conf);
}
/**
 * Sets the configurations required for performing failover between two
 * NameNodes ("nn1" and "nn2") of one logical nameservice: the RPC address of
 * each NameNode, the nameservice name, the NameNode id list, the failover
 * proxy provider and the default file system.
 *
 * @param conf configuration to modify
 * @param logicalName logical name of the nameservice
 * @param nnAddr1 address of the first NameNode
 * @param nnAddr2 address of the second NameNode
 */
public static void setFailoverConfigurations(Configuration conf,
    String logicalName, InetSocketAddress nnAddr1,
    InetSocketAddress nnAddr2) {
  final String firstNnId = "nn1";
  final String secondNnId = "nn2";
  final String firstNnUri =
      "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
  final String secondNnUri =
      "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();

  // One RPC-address entry per NameNode, suffixed with nameservice and id.
  final String firstRpcKey = DFSUtil.addKeySuffixes(
      DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, firstNnId);
  conf.set(firstRpcKey, firstNnUri);
  final String secondRpcKey = DFSUtil.addKeySuffixes(
      DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, secondNnId);
  conf.set(secondRpcKey, secondNnUri);

  conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);

  final String nnListKey =
      DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName);
  conf.set(nnListKey, firstNnId + "," + secondNnId);

  conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
      ConfiguredFailoverProxyProvider.class.getName());
  conf.set("fs.defaultFS", "hdfs://" + logicalName);
}
/**
 * Test for the case when the SBN is configured to checkpoint based
 * on a time period, but no transactions are happening on the
 * active. Thus, it would want to save a second checkpoint at the
 * same txid, which is a no-op. This test makes sure this doesn't
 * cause any problem.
 *
 * The checks are timing based: the test sleeps for fixed periods and then
 * verifies how often saveNamespace was invoked on a spied FSImage of the
 * NameNode at index 1.
 */
@Test(timeout = 300000)
public void testCheckpointWhenNoNewTransactionsHappened()
throws Exception {
// Checkpoint as fast as we can, in a tight loop.
// The checkpoint period of the NameNode at index 1 is set to 0 and that
// NameNode is restarted before the FSImage spy is installed.
cluster.getConfiguration(1).setInt(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
cluster.restartNameNode(1);
nn1 = cluster.getNameNode(1);
FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
// We shouldn't save any checkpoints at txid=0
// NOTE(review): this never() check targets the one-argument saveNamespace
// overload, while the times(1) check below targets the three-argument
// overload - confirm that both are the intended methods.
Thread.sleep(1000);
Mockito.verify(spyImage1, Mockito.never())
.saveNamespace((FSNamesystem) Mockito.anyObject());
// Roll the primary and wait for the standby to catch up
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
// Fixed pause before verifying; presumably long enough for the standby to
// checkpoint - timing assumption, not enforced by the code.
Thread.sleep(2000);
// We should make exactly one checkpoint at this new txid.
Mockito.verify(spyImage1, Mockito.times(1)).saveNamespace(
(FSNamesystem) Mockito.anyObject(), Mockito.eq(NameNodeFile.IMAGE),
(Canceler) Mockito.anyObject());
}
/**
 * Runs fsck on a path that does not exist ("/non-existent") in a four-DataNode
 * cluster holding 20 test files, and asserts that the output does not contain
 * the HEALTHY status string.
 */
@Test
public void testFsckNonExistent() throws Exception {
  final DFSTestUtil util = new DFSTestUtil.Builder()
      .setName("TestFsck")
      .setNumFiles(20)
      .build();
  MiniDFSCluster cluster = null;
  FileSystem fs = null;
  try {
    Configuration conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
    fs = cluster.getFileSystem();

    util.createFiles(fs, "/srcdat");
    util.waitReplication(fs, "/srcdat", (short) 3);

    final String fsckOutput = runFsck(conf, 0, true, "/non-existent");
    assertEquals(-1, fsckOutput.indexOf(NamenodeFsck.HEALTHY_STATUS));
    System.out.println(fsckOutput);

    util.cleanup(fs, "/srcdat");
  } finally {
    if (fs != null) {
      try {
        fs.close();
      } catch (Exception ignored) {
        // Best-effort close during teardown; failures are deliberately ignored.
      }
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
* Construct the reader
* @param in The stream to read from.
* @param logVersion The version of the data coming from the stream.
*/
public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
this.logVersion = logVersion;
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.EDITS_CHESKUM, logVersion)) {
this.checksum = DataChecksum.newCrc32();
} else {
this.checksum = null;
}
// It is possible that the logVersion is actually a future layoutversion
// during the rolling upgrade (e.g., the NN gets upgraded first). We
// assume future layout will also support length of editlog op.
this.supportEditLogLength = NameNodeLayoutVersion.supports(
NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)
|| logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION;
if (this.checksum != null) {
this.in = new DataInputStream(
new CheckedInputStream(in, this.checksum));
} else {
this.in = in;
}
this.limiter = limiter;
this.cache = new OpInstanceCache();
this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
}
/**
 * Auxiliary - creates a token for owner and real user "user1" with the given
 * renewer, backed by a freshly started MyDelegationTokenSecretManager, and
 * sets its service to "localhost:0".
 *
 * @param renewer renewer recorded in the token identifier
 * @return the newly created token
 * @throws IOException if the secret manager or token cannot be set up
 */
static MyToken createTokens(Text renewer)
    throws IOException {
  final Text owner = new Text("user1");

  // The first three intervals use the NameNode defaults; 3600000 is the
  // fourth constructor argument (meaning not visible here - TODO confirm).
  final MyDelegationTokenSecretManager secretManager =
      new MyDelegationTokenSecretManager(
          DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT,
          3600000, null);
  secretManager.startThreads();

  final DelegationTokenIdentifier identifier =
      new DelegationTokenIdentifier(owner, renewer, owner);
  final MyToken token = new MyToken(identifier, secretManager);
  token.setService(new Text("localhost:0"));
  return token;
}
/**
 * Checks that every volume base path of every DataNode in the cluster has the
 * permission configured under the DataNode data-dir permission key.
 */
@Test
public void testLocalDirs() throws Exception {
  Configuration conf = new Configuration();
  final FsPermission expected = new FsPermission(
      conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY));

  // Check permissions on directories in 'dfs.datanode.data.dir'
  final FileSystem localFS = FileSystem.getLocal(conf);
  for (DataNode dn : cluster.getDataNodes()) {
    for (FsVolumeSpi volume : dn.getFSDataset().getVolumes()) {
      final Path dataDir = new Path(volume.getBasePath());
      final FsPermission actual =
          localFS.getFileStatus(dataDir).getPermission();
      final String failureMessage = "Permission for dir: " + dataDir
          + ", is " + actual + ", while expected is " + expected;
      assertEquals(failureMessage, expected, actual);
    }
  }
}
/**
 * The corrupt block has to be removed when the number of valid replicas
 * matches replication factor for the file. In this test, the above
 * condition is achieved by increasing the number of good replicas by
 * replicating on a new Datanode.
 * The test strategy :
 *   Bring up a cluster with 4 DataNodes and stop the fourth, leaving 3 running
 *   Create a file of replication factor 3
 *   Corrupt one replica of a block of the file
 *   Verify that there are still 2 good replicas and 1 corrupt replica
 *     (corrupt replica should not be removed since number of good replicas
 *     (2) is less than replication factor (3))
 *   Restart the stopped data node
 *   Verify that a new replica is created and the corrupt replica is
 *   removed.
 *
 * All work after the cluster is built happens inside the try block so that
 * the cluster is shut down even if the early set-up calls fail.
 */
@Test
public void testByAddingAnExtraDataNode() throws Exception {
  Configuration conf = new HdfsConfiguration();
  conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
  conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
  try {
    FileSystem fs = cluster.getFileSystem();
    final FSNamesystem namesystem = cluster.getNamesystem();
    // Keep the fourth DataNode aside; it is restarted later as the "extra" node.
    DataNodeProperties dnPropsFourth = cluster.stopDataNode(3);

    final Path fileName = new Path("/foo1");
    DFSTestUtil.createFile(fs, fileName, 2, (short) 3, 0L);
    DFSTestUtil.waitReplication(fs, fileName, (short) 3);

    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
    corruptBlock(cluster, fs, fileName, 0, block);

    // With only three DataNodes running, the corrupt replica must stay.
    DFSTestUtil.waitReplication(fs, fileName, (short) 2);
    assertEquals(2, countReplicas(namesystem, block).liveReplicas());
    assertEquals(1, countReplicas(namesystem, block).corruptReplicas());

    // Bringing the fourth DataNode back allows a third good replica.
    cluster.restartDataNode(dnPropsFourth);
    DFSTestUtil.waitReplication(fs, fileName, (short) 3);
    assertEquals(3, countReplicas(namesystem, block).liveReplicas());
    assertEquals(0, countReplicas(namesystem, block).corruptReplicas());
  } finally {
    cluster.shutdown();
  }
}
/**
 * Test simple HA failover usecase with BK: a directory created while the
 * NameNode at index 0 is active must still exist after that NameNode is shut
 * down and the NameNode at index 1 becomes active.
 */
@Test
public void testFailoverWithBK() throws Exception {
  MiniDFSCluster cluster = null;
  try {
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
        BKJMUtil.createJournalURI("/hotfailover").toString());
    BKJMUtil.addJournalManagerDefinition(conf);

    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology())
        .numDataNodes(0)
        .manageNameDfsSharedDirs(false)
        .build();

    cluster.waitActive();
    cluster.transitionToActive(0);

    Path p = new Path("/testBKJMfailover");
    FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
    fs.mkdirs(p);

    // Fail over to the second NameNode and check the directory survived.
    cluster.shutdownNameNode(0);
    cluster.transitionToActive(1);
    assertTrue(fs.exists(p));
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Reads the stale-datanode interval from the configuration and clamps it to a
 * minimum derived from the heartbeat interval.
 *
 * The minimum is the configured number of heartbeat intervals multiplied by
 * the heartbeat interval (read in seconds, converted to milliseconds). A
 * configured value below that minimum is replaced by it, with a warning. A
 * value above {@code heartbeatExpireInterval} only produces a warning.
 *
 * @param conf configuration to read from
 * @param heartbeatExpireInterval heartbeat expire interval used for the
 *     upper-bound warning
 * @return the stale interval to use
 * @throws IllegalArgumentException if the configured stale interval is not
 *     positive
 */
private static long getStaleIntervalFromConf(Configuration conf,
    long heartbeatExpireInterval) {
  long staleInterval = conf.getLong(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
  Preconditions.checkArgument(staleInterval > 0,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY +
      " = '" + staleInterval + "' is invalid. " +
      "It should be a positive non-zero value.");

  final long heartbeatIntervalSeconds = conf.getLong(
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
      DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
  // The stale interval cannot be smaller than the configured number of
  // heartbeat intervals. The multiplier is kept in a local so the warning
  // below reports the value actually used, not the compiled-in default.
  final int minStaleHeartbeats = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT);
  final long minStaleInterval =
      minStaleHeartbeats * heartbeatIntervalSeconds * 1000;
  if (staleInterval < minStaleInterval) {
    LOG.warn("The given interval for marking stale datanode = "
        + staleInterval + ", which is less than "
        + minStaleHeartbeats
        + " heartbeat intervals. This may cause too frequent changes of "
        + "stale states of DataNodes since a heartbeat msg may be missing "
        + "due to temporary short-term failures. Reset stale interval to "
        + minStaleInterval + ".");
    staleInterval = minStaleInterval;
  }
  if (staleInterval > heartbeatExpireInterval) {
    LOG.warn("The given interval for marking stale datanode = "
        + staleInterval + ", which is larger than heartbeat expire interval "
        + heartbeatExpireInterval + ".");
  }
  return staleInterval;
}
/**
 * Writes and hsyncs a one-byte file, then checks the first DataNode's flush
 * and fsync operation counters and the per-interval quantile gauges for
 * FlushNanos and FsyncNanos.
 */
@Test
public void testReceivePacketMetrics() throws Exception {
  final int interval = 1;
  Configuration conf = new HdfsConfiguration();
  conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  try {
    cluster.waitActive();
    DistributedFileSystem fs = cluster.getFileSystem();

    FSDataOutputStream out = fs.create(new Path("/testFlushNanosMetric.txt"));
    out.write(new byte[1]);
    out.hsync();
    out.close();

    DataNode firstDatanode = cluster.getDataNodes().get(0);
    MetricsRecordBuilder dnMetrics =
        getMetrics(firstDatanode.getMetrics().name());

    // Expect two flushes: one for the flush that occurs after writing and
    // one that occurs on closing the data and metadata files.
    assertCounter("FlushNanosNumOps", 2L, dnMetrics);
    // Expect two syncs: one from the hsync and one on close.
    assertCounter("FsyncNanosNumOps", 2L, dnMetrics);

    // Wait for at least 1 rollover
    Thread.sleep((interval + 1) * 1000);

    // Check the receivePacket percentiles that should be non-zero
    final String intervalSuffix = interval + "s";
    assertQuantileGauges("FlushNanos" + intervalSuffix, dnMetrics);
    assertQuantileGauges("FsyncNanos" + intervalSuffix, dnMetrics);
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Refresh the user-to-groups mappings on the {@link NameNode}. When the file
 * system URI is a logical (HA) URI the refresh is sent to every NameNode of
 * the nameservice; otherwise it is sent to the default file system's
 * NameNode.
 *
 * @return exitcode 0 on success, non-zero on failure
 * @throws IOException if a proxy cannot be created or the refresh call fails
 */
public int refreshUserToGroupsMappings() throws IOException {
  // Get the current configuration
  Configuration conf = getConf();

  // For security authorization, the server principal for this call should be
  // the NameNode's one.
  conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
      conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));

  DistributedFileSystem dfs = getDFS();
  URI dfsUri = dfs.getUri();

  if (!HAUtil.isLogicalUri(conf, dfsUri)) {
    // Non-HA: create a single client and refresh through it.
    RefreshUserMappingsProtocol refreshProtocol =
        NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
            RefreshUserMappingsProtocol.class).getProxy();
    refreshProtocol.refreshUserToGroupsMappings();
    System.out.println("Refresh user to groups mapping successful");
    return 0;
  }

  // HA: run refreshUserToGroupsMappings for all NNs of the nameservice.
  String nsId = dfsUri.getHost();
  List<ProxyAndInfo<RefreshUserMappingsProtocol>> proxies =
      HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId,
          RefreshUserMappingsProtocol.class);
  for (ProxyAndInfo<RefreshUserMappingsProtocol> nnProxy : proxies) {
    nnProxy.getProxy().refreshUserToGroupsMappings();
    System.out.println("Refresh user to groups mapping successful for "
        + nnProxy.getAddress());
  }
  return 0;
}
/**
 * Obtains a delegation token through a spied WebHdfsFileSystem while the
 * NameNode at index 0 is active, fails over to the NameNode at index 1, and
 * verifies that renewing and cancelling the token reach the spied file
 * system.
 */
@Test
public void testSecureHAToken() throws IOException, InterruptedException {
  Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
  conf.setBoolean(
      DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);

  MiniDFSCluster cluster = null;
  WebHdfsFileSystem fs = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(topo)
        .numDataNodes(0)
        .build();
    HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
    cluster.waitActive();

    // Register the spy so later lookups for WEBHDFS_URI return it.
    final WebHdfsFileSystem realFs =
        (WebHdfsFileSystem) FileSystem.get(WEBHDFS_URI, conf);
    fs = spy(realFs);
    FileSystemTestHelper.addFileSystemForTesting(WEBHDFS_URI, conf, fs);

    cluster.transitionToActive(0);
    Token<?> delegationToken = fs.getDelegationToken(null);

    cluster.shutdownNameNode(0);
    cluster.transitionToActive(1);

    delegationToken.renew(conf);
    delegationToken.cancel(conf);
    verify(fs).renewDelegationToken(delegationToken);
    verify(fs).cancelDelegationToken(delegationToken);
  } finally {
    IOUtils.cleanup(null, fs);
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Per-test fixture: starts an HA cluster with no DataNodes, either directly
 * from a default topology (SHARED_DIR_HA) or through a MiniQJMHACluster (any
 * other cluster type). It then makes the NameNode at index 0 active and
 * creates a failover file system.
 *
 * @throws Exception if the cluster cannot be started
 */
@Before
public void setUpCluster() throws Exception {
  conf = new Configuration();
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 10);
  conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
  HAUtil.setAllowStandbyReads(conf, true);

  if (clusterType != TestType.SHARED_DIR_HA) {
    // Journal-based HA: the DFS cluster is owned by the MiniQJMHACluster.
    Builder qjmBuilder = new MiniQJMHACluster.Builder(conf);
    qjmBuilder.getDfsBuilder().numDataNodes(0).checkExitOnShutdown(false);
    miniQjmHaCluster = qjmBuilder.build();
    cluster = miniQjmHaCluster.getDfsCluster();
  } else {
    MiniDFSNNTopology defaultTopology =
        MiniQJMHACluster.createDefaultTopology(10000);
    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(defaultTopology)
        .numDataNodes(0)
        .checkExitOnShutdown(false)
        .build();
  }

  cluster.waitActive();
  nn0 = cluster.getNameNode(0);
  nn1 = cluster.getNameNode(1);
  cluster.transitionToActive(0);
  fs = HATestUtil.configureFailoverFs(cluster, conf);
}
/**
 * With the mocked protocol reporting STANDBY_READY_RESULT and a fence method
 * configured, "-failover nn1 nn2" is expected to exit with code 0.
 */
@Test
public void testFailoverWithFencerConfigured() throws Exception {
  Mockito.doReturn(STANDBY_READY_RESULT).when(mockProtocol).getServiceStatus();

  HdfsConfiguration haConf = getHAConf();
  haConf.set(DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY, getFencerTrueCommand());
  tool.setConf(haConf);

  final int exitCode = runTool("-failover", "nn1", "nn2");
  assertEquals(0, exitCode);
}
/**
 * Test the DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY configuration option.
 * With the tolerated number of failed volumes set to 0, a single failed
 * volume on a DataNode is expected to make that DataNode die, and restoring
 * the volume afterwards does not bring it back.
 */
@Test
public void testConfigureMinValidVolumes() throws Exception {
  // Skipped on Windows.
  assumeTrue(!System.getProperty("os.name").startsWith("Windows"));

  // Bring up two additional datanodes that need both of their volumes
  // functioning in order to stay up.
  conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
  cluster.startDataNodes(conf, 2, true, null, null);
  cluster.waitActive();

  final DatanodeManager dm =
      cluster.getNamesystem().getBlockManager().getDatanodeManager();
  final long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
  final long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);

  // Fail a volume on the 2nd DN
  final File failedVolume = new File(dataDir, "data" + (2 * 1 + 1));
  DataNodeTestUtils.injectDataDirFailure(failedVolume);

  // Should only get two replicas (the first DN and the 3rd)
  final Path firstFile = new Path("/test1");
  DFSTestUtil.createFile(fs, firstFile, 1024, (short) 3, 1L);
  DFSTestUtil.waitReplication(fs, firstFile, (short) 2);

  // Check that this single failure caused a DN to die.
  DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 0,
      origCapacity - (1 * dnCapacity), WAIT_FOR_HEARTBEATS);

  // If we restore the volume we should still only be able to get
  // two replicas since the DN is still considered dead.
  DataNodeTestUtils.restoreDataDirFromFailure(failedVolume);
  final Path secondFile = new Path("/test2");
  DFSTestUtil.createFile(fs, secondFile, 1024, (short) 3, 1L);
  DFSTestUtil.waitReplication(fs, secondFile, (short) 2);
}
/**
 * Creates a journal manager for the given URI and namespace, building its
 * logger set from the supplied factory and reading every per-operation
 * timeout from the configuration (falling back to the matching
 * DFSConfigKeys default).
 *
 * @param conf configuration; must not be null
 * @param uri journal URI stored on this instance
 * @param nsInfo namespace info stored on this instance
 * @param loggerFactory factory passed to createLoggers to build the loggers
 * @throws IOException declared by the constructor; presumably raised while
 *     creating the loggers - confirm against createLoggers
 * @throws IllegalArgumentException if {@code conf} is null
 */
QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo,
AsyncLogger.Factory loggerFactory) throws IOException {
Preconditions.checkArgument(conf != null, "must be configured");
this.conf = conf;
this.uri = uri;
this.nsInfo = nsInfo;
// NOTE(review): createLoggers is an instance call made mid-construction; it
// may depend on the fields assigned above, so keep this ordering - confirm.
this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
this.connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
// Configure timeouts.
// Each timeout is read as an int from its own key with its own default.
this.startSegmentTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT);
this.prepareRecoveryTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT);
this.acceptRecoveryTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT);
this.finalizeSegmentTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT);
this.selectInputStreamsTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT);
this.getJournalStateTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT);
this.newEpochTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
this.writeTxnsTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
}
/**
 * Per-test fixture: starts a two-NameNode HA cluster with one DataNode on a
 * randomly chosen pair of HTTP ports, retrying with a new pair whenever
 * start-up fails with a BindException. Retention of checkpoints and extra
 * edits is dialed down to help catch regressions of HDFS-4238.
 *
 * @throws Exception if the cluster cannot be started for a reason other than
 *     a port conflict
 */
@SuppressWarnings("rawtypes")
@Before
public void setupCluster() throws Exception {
  Configuration conf = setupCommonConfig();

  // Dial down the retention of extra edits and checkpoints. This is to
  // help catch regressions of HDFS-4238 (SBN should not purge shared edits)
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 0);

  int retryCount = 0;
  while (true) {
    try {
      // Pick an even base port so nn1/nn2 get an adjacent pair.
      int basePort = 10060 + random.nextInt(100) * 2;
      MiniDFSNNTopology topology = new MiniDFSNNTopology()
          .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
              .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(basePort))
              .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(basePort + 1)));

      cluster = new MiniDFSCluster.Builder(conf)
          .nnTopology(topology)
          .numDataNodes(1)
          .build();
      cluster.waitActive();

      nn0 = cluster.getNameNode(0);
      nn1 = cluster.getNameNode(1);
      fs = HATestUtil.configureFailoverFs(cluster, conf);

      cluster.transitionToActive(0);
      break;
    } catch (BindException e) {
      // Count the failed attempt here so the log reports the real retry
      // number; previously the counter only moved on the success path and
      // this message always printed 0.
      ++retryCount;
      LOG.info("Set up MiniDFSCluster failed due to port conflicts, retry "
          + retryCount + " times");
    }
  }
}
/**
 * One-time fixture: creates an HdfsConfiguration with NameNode ACLs and
 * permission checking enabled, then calls {@code initCluster(true)}.
 *
 * @throws Exception if the cluster cannot be initialised
 */
@BeforeClass
public static void init() throws Exception {
  conf = new HdfsConfiguration();
  conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
  conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);

  initCluster(true);
}