下面列出了org.apache.hadoop.fs.ReadOption#org.apache.hadoop.hdfs.MiniDFSCluster 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testClose() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, dataSet);
// test close
testClose(dataSet, blocks);
} finally {
cluster.shutdown();
}
}
/**
* Start a MiniDFS cluster backed SabotNode cluster
* @param testClass
* @param isImpersonationEnabled Enable impersonation in the cluster?
* @throws Exception
*/
protected static void startMiniDfsCluster(final String testClass, Configuration configuration) throws Exception {
Preconditions.checkArgument(!Strings.isNullOrEmpty(testClass), "Expected a non-null and non-empty test class name");
dfsConf = Preconditions.checkNotNull(configuration);
// Set the MiniDfs base dir to be the temp directory of the test, so that all files created within the MiniDfs
// are properly cleanup when test exits.
miniDfsStoragePath = Files.createTempDirectory(testClass).toString();
dfsConf.set("hdfs.minidfs.basedir", miniDfsStoragePath);
// HDFS-8880 and HDFS-8953 introduce metrics logging that requires log4j, but log4j is explicitly
// excluded in build. So disable logging to avoid NoClassDefFoundError for Log4JLogger.
dfsConf.set("dfs.namenode.metrics.logger.period.seconds", "0");
dfsConf.set("dfs.datanode.metrics.logger.period.seconds", "0");
// Start the MiniDfs cluster
dfsCluster = new MiniDFSCluster.Builder(dfsConf)
.numDataNodes(3)
.format(true)
.build();
fs = dfsCluster.getFileSystem();
}
@Before
public void setUp() throws Exception {
config = new HdfsConfiguration();
hdfsDir = new File(MiniDFSCluster.getBaseDirectory());
if ( hdfsDir.exists() && !FileUtil.fullyDelete(hdfsDir) ) {
throw new IOException("Could not delete hdfs directory '" + hdfsDir + "'");
}
LOG.info("--hdfsdir is " + hdfsDir.getAbsolutePath());
config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
fileAsURI(new File(hdfsDir, "name")).toString());
config.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
new File(hdfsDir, "data").getPath());
config.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
config.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
config.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
fileAsURI(new File(hdfsDir, "secondary")).toString());
config.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
WILDCARD_HTTP_HOST + "0");
FileSystem.setDefaultUri(config, "hdfs://"+NAME_NODE_HOST + "0");
}
/**
* Test, on HDFS, that the FileLink is still readable
* even when the current file gets renamed.
*/
@Test
public void testHDFSLinkReadDuringRename() throws Exception {
HBaseTestingUtility testUtil = new HBaseTestingUtility();
Configuration conf = testUtil.getConfiguration();
conf.setInt("dfs.blocksize", 1024 * 1024);
conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
testUtil.startMiniDFSCluster(1);
MiniDFSCluster cluster = testUtil.getDFSCluster();
FileSystem fs = cluster.getFileSystem();
assertEquals("hdfs", fs.getUri().getScheme());
try {
testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath());
} finally {
testUtil.shutdownMiniCluster();
}
}
@BeforeClass
public static void createHDFS() throws IOException {
Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
Configuration conf = new Configuration();
File dataDir = tempFolder.newFolder();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
hdfsCluster = builder.build();
dfs = hdfsCluster.getFileSystem();
hdfsURI = "hdfs://"
+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ "/";
}
void waitCheckpointDone(MiniDFSCluster cluster, long txid) {
long thisCheckpointTxId;
do {
try {
LOG.info("Waiting checkpoint to complete... " +
"checkpoint txid should increase above " + txid);
Thread.sleep(1000);
} catch (Exception e) {}
// The checkpoint is not done until the nn has received it from the bn
thisCheckpointTxId = cluster.getNameNode().getFSImage().getStorage()
.getMostRecentCheckpointTxId();
} while (thisCheckpointTxId < txid);
// Check that the checkpoint got uploaded to NN successfully
FSImageTestUtil.assertNNHasCheckpoints(cluster,
Collections.singletonList((int)thisCheckpointTxId));
}
@Test
public void testFsckNonExistent() throws Exception {
DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck").
setNumFiles(20).build();
MiniDFSCluster cluster = null;
FileSystem fs = null;
try {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
fs = cluster.getFileSystem();
util.createFiles(fs, "/srcdat");
util.waitReplication(fs, "/srcdat", (short)3);
String outStr = runFsck(conf, 0, true, "/non-existent");
assertEquals(-1, outStr.indexOf(NamenodeFsck.HEALTHY_STATUS));
System.out.println(outStr);
util.cleanup(fs, "/srcdat");
} finally {
if (fs != null) {try{fs.close();} catch(Exception e){}}
if (cluster != null) { cluster.shutdown(); }
}
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
HDFSPolicyProvider.class, PolicyProvider.class);
// Many of the tests expect a replication value of 1 in the output
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
dfsCluster.waitClusterUp();
namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
username = System.getProperty("user.name");
fs = dfsCluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(),
fs instanceof DistributedFileSystem);
}
@BeforeClass
public static void beforeClass() throws Exception {
LOG.info("Starting mini clusters");
try {
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.format(true).racks(null).build();
remoteFs = dfsCluster.getFileSystem();
} catch (IOException io) {
throw new RuntimeException("problem starting mini dfs cluster", io);
}
if (miniTezCluster == null) {
miniTezCluster = new MiniTezCluster(TestRecovery.class.getName(), 1, 1, 1);
Configuration miniTezconf = new Configuration(conf);
miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
miniTezconf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500);
miniTezCluster.init(miniTezconf);
miniTezCluster.start();
}
}
@Before
public void setUp() throws Exception {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
/*
* Lower the DN heartbeat, DF rate, and recheck interval to one second
* so state about failures and datanode death propagates faster.
*/
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
// Allow a single volume failure (there are two volumes)
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
dataDir = cluster.getDataDirectory();
}
@BeforeClass
public static void beforeClass() throws Exception {
GSetGenerator gSetGenerator = new GSetGenerator();
gSetGenerator.clear();
GSet<INode, INodeWithAdditionalFields> gset = gSetGenerator.getGSet((short) 3, 10, 500);
nna = new HadoopWebServerMain();
ApplicationConfiguration conf = new ApplicationConfiguration();
conf.set("ldap.enable", "false");
conf.set("authorization.enable", "false");
conf.set("nna.historical", "false");
conf.set("nna.base.dir", MiniDFSCluster.getBaseDirectory());
conf.set("nna.web.base.dir", "src/main/resources/webapps/nna");
conf.set("nna.query.engine.impl", JavaStreamQueryEngine.class.getCanonicalName());
nna.init(conf, gset);
hostPort = new HttpHost("localhost", 4567);
}
@Test
public void testNamenodeRpcBindAny() throws IOException {
Configuration conf = new HdfsConfiguration();
// The name node in MiniDFSCluster only binds to 127.0.0.1.
// We can set the bind address to 0.0.0.0 to make it listen
// to all interfaces.
conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY, "0.0.0.0");
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
assertEquals("0.0.0.0", ((NameNodeRpcServer)cluster.getNameNodeRpc())
.getClientRpcServer().getListenerAddress().getHostName());
} finally {
if (cluster != null) {
cluster.shutdown();
}
// Reset the config
conf.unset(DFS_NAMENODE_RPC_BIND_HOST_KEY);
}
}
@Override
protected void setupMiniDfsAndMrClusters() {
try {
CONF_DIR.mkdirs();
if (CONF_FILE.exists()) {
CONF_FILE.delete();
}
m_conf = new Configuration();
m_conf.set("io.sort.mb", "1");
m_conf.writeXml(new FileOutputStream(CONF_FILE));
int dataNodes = 4;
m_dfs = new MiniDFSCluster(m_conf, dataNodes, true, null);
m_fileSys = m_dfs.getFileSystem();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
public void testAppend() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, dataSet);
// test append
testAppend(bpid, dataSet, blocks);
} finally {
cluster.shutdown();
}
}
private MiniDFSCluster initMiniHACluster(int nn1port, int nn2port)
throws IOException {
Configuration confForMiniDFS = new Configuration();
Builder builder = new MiniDFSCluster.Builder(confForMiniDFS)
.nnTopology(new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(nn1port))
.addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(nn2port))
))
.numDataNodes(1);
MiniDFSCluster cluster = builder.build();
cluster.waitActive();
NameNode nnode1 = cluster.getNameNode(0);
assertTrue(nnode1.isStandbyState());
NameNode nnode2 = cluster.getNameNode(1);
assertTrue(nnode2.isStandbyState());
cluster.transitionToActive(0);
assertFalse(nnode1.isStandbyState());
return cluster;
}
/**
* Wait for the datanodes in the cluster to process any block
* deletions that have already been asynchronously queued.
*/
public static void waitForDNDeletions(final MiniDFSCluster cluster)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
for (DataNode dn : cluster.getDataNodes()) {
if (DataNodeTestUtils.getPendingAsyncDeletions(dn) > 0) {
return false;
}
}
return true;
}
}, 1000, 10000);
}
public void testWithDFS() throws IOException {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
try {
final int taskTrackers = 4;
JobConf conf = new JobConf();
conf.set(JTConfig.JT_SYSTEM_DIR, "/tmp/custom/mapred/system");
dfs = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf);
runWordCount(mr, mr.createJobConf(), conf.get("mapred.system.dir"));
} finally {
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();
}
}
}
@Test
public void testStart() throws IOException {
// Start minicluster
NfsConfiguration config = new NfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(1)
.build();
cluster.waitActive();
// Use emphral port in case tests are running in parallel
config.setInt("nfs3.mountd.port", 0);
config.setInt("nfs3.server.port", 0);
// Start nfs
Nfs3 nfs3 = new Nfs3(config);
nfs3.startServiceInternal(false);
RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd()
.getRpcProgram();
mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost"));
RpcProgramNfs3 nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
nfsd.nullProcedure();
cluster.shutdown();
}
@BeforeClass
public static void setup() throws Exception {
cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
.format(true).build();
totalFileSize = 0;
for (int i=0; i<N_FILES; ++i)
totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
}
public void setUp(boolean simulateEditLogCrash) throws IOException {
conf = new Configuration();
conf.set("dfs.secondary.http.address", "0.0.0.0:0");
conf.setBoolean("dfs.simulate.editlog.crash", simulateEditLogCrash);
cluster = new MiniDFSCluster(conf, 3, true, null);
fs = cluster.getFileSystem();
random = new Random();
}
private static MiniDFSCluster startMini(String testName) throws IOException {
File baseDir = new File("./target/hdfs/" + testName).getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
Configuration conf = new Configuration();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
MiniDFSCluster hdfsCluster = builder.clusterId(testName).build();
hdfsCluster.waitActive();
return hdfsCluster;
}
@Test(timeout=120000)
public void testTopUsersNoPeriods() throws Exception {
final Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, true);
conf.set(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY, "");
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanNameFsns = new ObjectName(
"Hadoop:service=NameNode,name=FSNamesystemState");
FileSystem fs = cluster.getFileSystem();
final Path path = new Path("/");
final int NUM_OPS = 10;
for (int i=0; i< NUM_OPS; i++) {
fs.listStatus(path);
fs.setTimes(path, 0, 1);
}
String topUsers =
(String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
assertNotNull("Expected TopUserOpCounts bean!", topUsers);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
protected void setUp() throws Exception {
super.setUp();
dfscluster = new MiniDFSCluster(new JobConf(), 2, true, null);
fs = dfscluster.getFileSystem();
mapred = new MiniMRCluster(2, fs.getUri().toString(), 1);
inputPath = new Path(fs.getHomeDirectory(), "test");
filea = new Path(inputPath,"a");
fileb = new Path(inputPath,"b");
filec = new Path(inputPath,"c");
archivePath = new Path(fs.getHomeDirectory(), "tmp");
}
private static boolean wipeAndRestartDn(MiniDFSCluster cluster, int dnIndex)
throws IOException {
// stop the DN, reformat it, then start it again with the same xfer port.
DataNodeProperties dnProps = cluster.stopDataNode(dnIndex);
cluster.formatDataNodeDirs();
return cluster.restartDataNode(dnProps, true);
}
@BeforeClass
public static void createHDFS() throws IOException {
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
File dataDir = TEMPORARY_FOLDER.newFolder();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
hdfsCluster = builder.build();
dfs = hdfsCluster.getFileSystem();
outputDir = "hdfs://"
+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort());
}
public void testNewReaderWithNameNodeHA() throws Exception {
deleteMiniClusterDir();
int nn1port = AvailablePortHelper.getRandomAvailableTCPPort();
int nn2port = AvailablePortHelper.getRandomAvailableTCPPort();
MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port);
initClientHAConf(nn1port, nn2port);
HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1");
regionfactory.setHDFSStoreName(store1.getName());
Region<Object, Object> region1 = regionfactory.create("region-1");
HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager();
HoplogOrganizer<SortedHoplogPersistedEvent> organizer = doRead(regionManager1);
organizer.close();
TestUtils.addExpectedException("java.io.EOFException");
NameNode nnode2 = cluster.getNameNode(1);
assertTrue(nnode2.isStandbyState());
cluster.shutdownNameNode(0);
cluster.transitionToActive(1);
assertFalse(nnode2.isStandbyState());
organizer = new HdfsSortedOplogOrganizer(regionManager1, 0);
byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
byte[] keyBytes3 = BlobHelper.serializeToBlob("3");
byte[] keyBytes4 = BlobHelper.serializeToBlob("4");
assertEquals("2-1", organizer.read(keyBytes1).getValue());
assertEquals("3-3", organizer.read(keyBytes3).getValue());
assertEquals("1-4", organizer.read(keyBytes4).getValue());
TestUtils.removeExpectedException("java.io.EOFException");
region1.destroyRegion();
store1.destroy();
cluster.shutdown();
FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
}
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
.build();
cluster.waitActive();
fsdir = cluster.getNamesystem().getFSDirectory();
dfs = cluster.getFileSystem();
}
/**
* NameNode should load the edits correctly if the applicable edits are
* present in the BKJM.
*/
@Test
public void testNameNodeMultipleSwitchesUsingBKJM() throws Exception {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
.createJournalURI("/correctEditLogSelection").toString());
BKJMUtil.addJournalManagerDefinition(conf);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(0)
.manageNameDfsSharedDirs(false).build();
NameNode nn1 = cluster.getNameNode(0);
NameNode nn2 = cluster.getNameNode(1);
cluster.waitActive();
cluster.transitionToActive(0);
nn1.getRpcServer().rollEditLog(); // Roll Edits from current Active.
// Transition to standby current active gracefully.
cluster.transitionToStandby(0);
// Make the other Active and Roll edits multiple times
cluster.transitionToActive(1);
nn2.getRpcServer().rollEditLog();
nn2.getRpcServer().rollEditLog();
// Now One more failover. So NN1 should be able to failover successfully.
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@SuppressWarnings("unchecked") // for any(Token.class)
@Test
public void testLazyTokenFetchForWebhdfs() throws Exception {
MiniDFSCluster cluster = null;
WebHdfsFileSystem fs = null;
try {
final Configuration clusterConf = new HdfsConfiguration(conf);
SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf);
clusterConf.setBoolean(DFSConfigKeys
.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
// trick the NN into thinking security is enabled w/o it trying
// to login from a keytab
UserGroupInformation.setConfiguration(clusterConf);
cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).build();
cluster.waitActive();
SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf);
UserGroupInformation.setConfiguration(clusterConf);
uri = DFSUtil.createUri(
"webhdfs", cluster.getNameNode().getHttpAddress());
validateLazyTokenFetch(clusterConf);
} finally {
IOUtils.cleanup(null, fs);
if (cluster != null) {
cluster.shutdown();
}
}
}
@Before
public void startUpCluster() throws IOException {
conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(REPL_FACTOR)
.storageTypes(new StorageType[] { storageType, storageType } )
.build();
fs = cluster.getFileSystem();
bpid = cluster.getNamesystem().getBlockPoolId();
}