The following are example usages of org.apache.hadoop.fs.FSDataOutputStream#writeBytes(), collected from open-source projects.
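Note that writeBytes(String) is inherited from java.io.DataOutputStream: it writes only the low-order eight bits of each character, so it is safe for ASCII content only. A minimal sketch (the path is hypothetical and not taken from the examples below):

// Minimal writeBytes() sketch. writeBytes() drops the high byte of every
// char; use an explicit encoding for non-ASCII text.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf); // any FileSystem works the same way
Path p = new Path("/tmp/writeBytes-demo.txt"); // hypothetical path
FSDataOutputStream out = fs.create(p, true);
try {
out.writeBytes("ascii only\n"); // low byte of each char
out.write("any text\n".getBytes(StandardCharsets.UTF_8)); // explicit encoding
} finally {
out.close();
}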
/**
* Test that an over-replicated block gets invalidated when the replication
* factor is decreased for a partial (under-construction) block.
*/
@Test
public void testInvalidateOverReplicatedBlock() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.build();
try {
final FSNamesystem namesystem = cluster.getNamesystem();
final BlockManager bm = namesystem.getBlockManager();
FileSystem fs = cluster.getFileSystem();
Path p = new Path(MiniDFSCluster.getBaseDirectory(), "/foo1");
FSDataOutputStream out = fs.create(p, (short) 2);
out.writeBytes("HDFS-3119: " + p);
out.hsync();
fs.setReplication(p, (short) 1);
out.close();
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, p);
assertEquals("Expected only one live replica for the block", 1, bm
.countNodes(block.getLocalBlock()).liveReplicas());
} finally {
cluster.shutdown();
}
}
@BeforeClass
public static void createCSVFiles() throws IOException {
localfs = LocalFileSystem.getInstance();
csvFile = new Path("target/temp.csv");
reorderedFile = new Path("target/reordered.csv");
tsvFile = new Path("target/temp.tsv");
validatorFile = new Path("target/validator.csv");
FSDataOutputStream out = localfs.create(csvFile, true);
out.writeBytes(CSV_CONTENT);
out.close();
out = localfs.create(reorderedFile, true);
out.writeBytes(REORDERED_CSV_CONTENT);
out.close();
out = localfs.create(validatorFile, true);
out.writeBytes(VALIDATOR_CSV_CONTENT);
out.close();
out = localfs.create(tsvFile, true);
out.writeBytes(TSV_CONTENT);
out.close();
}
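The repeated create/writeBytes/close sequence above could be collapsed into a small helper; a sketch assuming the same static localfs field (the helper name is hypothetical):

// Hypothetical helper for the repeated pattern above; closes the stream
// even if writeBytes() throws.
private static void writeStringFile(Path path, String content) throws IOException {
FSDataOutputStream out = localfs.create(path, true);
try {
out.writeBytes(content);
} finally {
out.close();
}
}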
/**
* Tests isUnderConstruction() functionality.
*/
public void testIsUnderConstruction() throws Exception {
// Open output file stream.
FSDataOutputStream out = hdfs.create(TEST_FILE, true);
out.writeBytes("test");
// Test file under construction.
FSDataInputStream in1 = hftpFs.open(TEST_FILE);
assertTrue(in1.isUnderConstruction());
in1.close();
// Close output file stream.
out.close();
// Test file not under construction.
FSDataInputStream in2 = hftpFs.open(TEST_FILE);
assertFalse(in2.isUnderConstruction());
in2.close();
}
/**
* The idea for ensuring that there is no more than one instance
* running in an HDFS cluster is to create a file in HDFS, write the hostname
* of the machine on which the instance is running to the file, and not
* close the file until the instance exits.
*
* This prevents a second instance from running because it cannot
* create the file while the first one is running.
*
* This method checks whether there is any running instance. If not, it marks
* this instance as the running one. Note that this is an atomic operation.
*
* @return null if there is a running instance;
* otherwise, the output stream to the newly created file.
*/
private OutputStream checkAndMarkRunning() throws IOException {
try {
if (fs.exists(idPath)) {
// try appending to it so that it will fail fast if another balancer is
// running.
IOUtils.closeStream(fs.append(idPath));
fs.delete(idPath, true);
}
final FSDataOutputStream fsout = fs.create(idPath, false);
// mark balancer idPath to be deleted during filesystem closure
fs.deleteOnExit(idPath);
if (write2IdFile) {
fsout.writeBytes(InetAddress.getLocalHost().getHostName());
fsout.hflush();
}
return fsout;
} catch(RemoteException e) {
if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
return null;
} else {
throw e;
}
}
}
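A caller would typically treat the returned stream as the lock handle and keep it open for the lifetime of the instance; a sketch (the surrounding control flow is an assumption, not the actual caller):

// Hypothetical caller: the open stream is the single-instance lock.
OutputStream lock = checkAndMarkRunning();
if (lock == null) {
// another instance already holds the id file open
throw new IOException("Another balancer is running. Exiting..");
}
try {
// ... do the actual work while holding the lock ...
} finally {
IOUtils.closeStream(lock); // releases the lock (the id file is deleted on exit)
}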
private void writeConfigFile(Path name, List<String> nodes)
throws IOException {
// delete if it already exists
if (localFileSys.exists(name)) {
localFileSys.delete(name, true);
}
FSDataOutputStream stm = localFileSys.create(name);
if (nodes != null) {
for (String node : nodes) {
stm.writeBytes(node);
stm.writeBytes("\n");
}
}
stm.close();
}
/**
* Scenario: Read an under-construction file using hftp.
*
* Expected: Hftp should be able to read the latest bytes after the file
* has been synced to HDFS (but not yet closed).
*
* @throws IOException
*/
public void testConcurrentRead() throws IOException {
// Write a test file.
FSDataOutputStream out = hdfs.create(TEST_FILE, true);
out.writeBytes("123");
out.sync(); // sync but not close
// Try read using hftp.
FSDataInputStream in = hftpFs.open(TEST_FILE);
assertEquals('1', in.read());
assertEquals('2', in.read());
assertEquals('3', in.read());
in.close();
// Try seek and read.
in = hftpFs.open(TEST_FILE);
in.seek(2);
assertEquals('3', in.read());
in.close();
out.close();
}
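The sync() call above is the legacy API; on current Hadoop versions it is deprecated in favor of hflush() and hsync(). A sketch of the modern equivalent, using the same fields as the test above:

// Modern replacement for the deprecated sync() call:
FSDataOutputStream out = hdfs.create(TEST_FILE, true);
out.writeBytes("123");
out.hflush(); // pushed to the datanode pipeline; visible to new readers
// out.hsync(); // additionally forces the data to disk on each datanode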
@Test(timeout=60000)
public void testFileCloseStatus() throws IOException {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
DistributedFileSystem fs = cluster.getFileSystem();
try {
// create a new file.
Path file = new Path("/simpleFlush.dat");
FSDataOutputStream output = fs.create(file);
// write to file
output.writeBytes("Some test data");
output.flush();
assertFalse("File status should be open", fs.isFileClosed(file));
output.close();
assertTrue("File status should be closed", fs.isFileClosed(file));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private static void printErrorMessages(FSDataOutputStream out, Map<String, List<String>> errors) throws IOException {
for(Map.Entry<String, List<String>> entry : errors.entrySet()) {
String index = entry.getKey();
List<String> messages = entry.getValue();
out.writeBytes(index + ":\n");
for (String message : messages) {
out.writeBytes("\t" + message + "\n");
}
}
}
/**
* This method creates a file that contains a single line with an MD5 sum.
*
* @param fs
* The FileSystem on which to create the file.
* @param md5sum
* The string containing the MD5 sum.
* @param remoteMd5Path
* The path at which to save the file.
* @throws IOException
*/
private void createMd5SumFile(FileSystem fs, String md5sum, Path remoteMd5Path) throws IOException {
FSDataOutputStream os = null;
try {
os = fs.create(remoteMd5Path, true);
os.writeBytes(md5sum);
os.flush();
} catch (Exception e) {
LOG.error("{}", e);
} finally {
if (os != null) {
os.close();
}
}
}
public void testCorruptHfileBucketFail() throws Exception {
int port = AvailablePortHelper.getRandomAvailableTCPPort();
MiniDFSCluster cluster = initMiniCluster(port, 1);
hsf.setHomeDir("Store-1");
hsf.setNameNodeURL("hdfs://127.0.0.1:" + port);
HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1");
// create corrupt files
FileSystem fs = store1.getFileSystem();
for (int i = 0; i < 113; i++) {
FSDataOutputStream opStream = fs.create(new Path("Store-1/region-1/" + i + "/1-1-1.hop"));
opStream.writeBytes("Some random corrupt file");
opStream.close();
}
// create region with store
regionfactory.setHDFSStoreName(store1.getName());
Region<Object, Object> region1 = regionfactory.create("region-1");
ExpectedException ex = TestUtils.addExpectedException("CorruptHFileException");
try {
region1.get("key");
fail("get should have failed with corrupt file error");
} catch (HDFSIOException e) {
// expected
} finally {
ex.remove();
}
region1.destroyRegion();
store1.destroy();
cluster.shutdown();
FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
}
private Path createTempFile(String filename, String contents)
throws IOException {
Path path = new Path(TEST_ROOT_DIR, filename);
FSDataOutputStream os = localFs.create(path);
os.writeBytes(contents);
os.close();
localFs.setPermission(path, new FsPermission("700"));
return path;
}
/**
* Creates and closes a file of a certain length.
* Calls append to allow the next write() operation to add to the end of it.
* After the write() invocation, calls hflush() to make sure that the data has
* sunk through the pipeline, then checks the state of the last block's replica.
* It is supposed to be in the RBW state.
*
* @throws IOException in case of an error
*/
@Test
public void pipeline_01() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + METHOD_NAME);
}
Path filePath = new Path("/" + METHOD_NAME + ".dat");
DFSTestUtil.createFile(fs, filePath, FILE_SIZE, REPL_FACTOR, rand.nextLong());
if(LOG.isDebugEnabled()) {
LOG.debug("Invoking append but doing nothing otherwise...");
}
FSDataOutputStream ofs = fs.append(filePath);
ofs.writeBytes("Some more stuff to write");
((DFSOutputStream) ofs.getWrappedStream()).hflush();
List<LocatedBlock> lb = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks();
String bpid = cluster.getNamesystem().getBlockPoolId();
for (DataNode dn : cluster.getDataNodes()) {
Replica r = DataNodeTestUtils.fetchReplicaInfo(dn, bpid, lb.get(0)
.getBlock().getBlockId());
assertTrue("Replica on DN " + dn + " shouldn't be null", r != null);
assertEquals("Should be RBW replica on " + dn
+ " after sequence of calls append()/write()/hflush()",
HdfsServerConstants.ReplicaState.RBW, r.getState());
}
ofs.close();
}
/**
* This method constructs the JobConf to be used to run the map reduce job to
* download the files from S3. This is a potentially expensive method since it
* makes multiple calls to S3 to get a listing of all the input data. Clients
* are encouraged to cache the returned JobConf reference and not call this
* method multiple times unless necessary.
*
* @return the JobConf to be used to run the map reduce job to download the
* files from S3.
*/
public JobConf getJobConf() throws IOException, ParseException {
JobConf conf = new JobConf(CopyFromS3.class);
conf.setJobName("CopyFromS3");
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(S3CopyMapper.class);
// We configure a reducer, even though we don't use it right now.
// The idea is that, in the future we may.
conf.setReducerClass(HDFSWriterReducer.class);
conf.setNumReduceTasks(0);
FileInputFormat.setInputPaths(conf, new Path(tempFile));
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
conf.setOutputFormat(TextOutputFormat.class);
conf.setCompressMapOutput(true);
JobClient jobClient = new JobClient(conf);
FileSystem inputFS = FileSystem.get(URI.create(inputPathPrefix), conf);
DatePathFilter datePathFilter = new DatePathFilter(startDate, endDate);
List<Path> filePaths = getFilePaths(inputFS, new Path(inputPathPrefix), datePathFilter, jobClient.getDefaultMaps());
// Write the file names to a temporary index file to be used
// as input to the map tasks.
FileSystem outputFS = FileSystem.get(URI.create(tempFile), conf);
FSDataOutputStream outputStream = outputFS.create(new Path(tempFile), true);
try {
for (Path path : filePaths) {
outputStream.writeBytes(path.toString() + "\n");
}
}
finally {
outputStream.close();
}
conf.setNumMapTasks(Math.min(filePaths.size(), jobClient.getDefaultMaps()));
return conf;
}
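Since the Javadoc encourages caching the returned JobConf, a typical caller might look like the following sketch (the CopyFromS3 constructor arguments are assumptions):

// Hypothetical usage: build the JobConf once, then submit the job.
CopyFromS3 copy = new CopyFromS3(/* inputPathPrefix, startDate, endDate, tempFile, outputPath */);
JobConf jobConf = copy.getJobConf(); // expensive: lists all S3 input data
JobClient.runJob(jobConf); // run the copy, reusing jobConf rather than rebuilding it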
/**
* Creates users file with the content as the String usersFileContent.
* @param usersFilePath the path to the file that is to be created
* @param usersFileContent Content of users file
* @throws IOException
*/
private static void writeUserList(Path usersFilePath, String usersFileContent)
throws IOException {
FSDataOutputStream out = null;
try {
out = fs.create(usersFilePath, true);
out.writeBytes(usersFileContent);
} finally {
if (out != null) {
out.close();
}
}
}
public static void writeLine(String path, String data) {
FSDataOutputStream dos = null;
try {
Path dst = new Path(path);
hdfs.createNewFile(dst);
dos = hdfs.append(dst);
dos.writeBytes(data + "\r\n");
log.info("write hdfs " + path + " succeeded.");
} catch (Exception e) {
log.error("write hdfs " + path + " failed.", e);
} finally {
// close even if writeBytes() throws (org.apache.hadoop.io.IOUtils)
IOUtils.closeStream(dos);
}
}
/** @throws Exception If failed. */
@Test
public void testRenameIfSrcPathIsAlreadyBeingOpenedToWrite() throws Exception {
Path fsHome = new Path(primaryFsUri);
Path srcFile = new Path(fsHome, "srcFile");
Path dstFile = new Path(fsHome, "dstFile");
FSDataOutputStream os = fs.create(srcFile, EnumSet.noneOf(CreateFlag.class),
Options.CreateOpts.perms(FsPermission.getDefault()));
os.close();
os = fs.create(srcFile, EnumSet.of(CreateFlag.APPEND),
Options.CreateOpts.perms(FsPermission.getDefault()));
fs.rename(srcFile, dstFile);
assertPathExists(fs, dstFile);
String testStr = "Test";
try {
os.writeBytes(testStr);
}
finally {
os.close();
}
try (FSDataInputStream is = fs.open(dstFile)) {
byte[] buf = new byte[testStr.getBytes().length];
is.readFully(buf);
assertEquals(testStr, new String(buf));
}
}
private void testPersistHelper(Configuration conf) throws IOException {
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
FSNamesystem fsn = cluster.getNamesystem();
DistributedFileSystem fs = cluster.getFileSystem();
final Path dir = new Path("/abc/def");
final Path file1 = new Path(dir, "f1");
final Path file2 = new Path(dir, "f2");
// create an empty file f1
fs.create(file1).close();
// create an under-construction file f2
FSDataOutputStream out = fs.create(file2);
out.writeBytes("hello");
((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.UPDATE_LENGTH));
// checkpoint
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.restartNameNode();
cluster.waitActive();
fs = cluster.getFileSystem();
assertTrue(fs.isDirectory(dir));
assertTrue(fs.exists(file1));
assertTrue(fs.exists(file2));
// check internals of file2
INodeFile file2Node = fsn.dir.getINode4Write(file2.toString()).asFile();
assertEquals("hello".length(), file2Node.computeFileSize());
assertTrue(file2Node.isUnderConstruction());
BlockInfoContiguous[] blks = file2Node.getBlocks();
assertEquals(1, blks.length);
assertEquals(BlockUCState.UNDER_CONSTRUCTION, blks[0].getBlockUCState());
// check lease manager
Lease lease = fsn.leaseManager.getLeaseByPath(file2.toString());
Assert.assertNotNull(lease);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Move an open file into archival storage
*/
@Test
public void testMigrateOpenFileToArchival() throws Exception {
LOG.info("testMigrateOpenFileToArchival");
final Path fooDir = new Path("/foo");
Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
policyMap.put(fooDir, COLD);
NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(fooDir), null,
BLOCK_SIZE, null, policyMap);
ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
test.setupCluster();
// create an open file
banner("writing to file /foo/bar");
final Path barFile = new Path(fooDir, "bar");
DFSTestUtil.createFile(test.dfs, barFile, BLOCK_SIZE, (short) 1, 0L);
FSDataOutputStream out = test.dfs.append(barFile);
out.writeBytes("hello, ");
((DFSOutputStream) out.getWrappedStream()).hsync();
try {
banner("start data migration");
test.setStoragePolicy(); // set /foo to COLD
test.migrate();
// make sure the under construction block has not been migrated
LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks(
barFile.toString(), BLOCK_SIZE);
LOG.info("Locations: " + lbs);
List<LocatedBlock> blks = lbs.getLocatedBlocks();
Assert.assertEquals(1, blks.size());
Assert.assertEquals(1, blks.get(0).getLocations().length);
banner("finish the migration, continue writing");
// make sure the writing can continue
out.writeBytes("world!");
((DFSOutputStream) out.getWrappedStream()).hsync();
IOUtils.cleanup(LOG, out);
lbs = test.dfs.getClient().getLocatedBlocks(
barFile.toString(), BLOCK_SIZE);
LOG.info("Locations: " + lbs);
blks = lbs.getLocatedBlocks();
Assert.assertEquals(1, blks.size());
Assert.assertEquals(1, blks.get(0).getLocations().length);
banner("finish writing, starting reading");
// check the content of /foo/bar
FSDataInputStream in = test.dfs.open(barFile);
byte[] buf = new byte[13];
// read from offset 1024
in.readFully(BLOCK_SIZE, buf, 0, buf.length);
IOUtils.cleanup(LOG, in);
Assert.assertEquals("hello, world!", new String(buf));
} finally {
test.shutdownCluster();
}
}
/**
* Creates a job history file of a given specific version. This method should
* change if format/content of future versions of job history file changes.
*/
private void writeHistoryFile(FSDataOutputStream out, long version)
throws IOException {
String delim = "\n"; // '\n' for version 0
String counters = COUNTERS;
String jobConf = "job.xml";
if (version > 0) { // line delimiter should be '.' for later versions
// Change the delimiter
delim = DELIM + delim;
// Write the version line
out.writeBytes(RecordTypes.Meta.name() + " VERSION=\""
+ JobHistory.VERSION + "\" " + delim);
jobConf = JobHistory.escapeString(jobConf);
counters = JobHistory.escapeString(counters);
}
// Write the job-start line
out.writeBytes("Job JOBID=\"" + JOB + "\" JOBNAME=\"" + JOBNAME + "\""
+ " USER=\"" + USER + "\" SUBMIT_TIME=\"" + TIME + "\""
+ " JOBCONF=\"" + jobConf + "\" " + delim);
// Write the job-launch line
out.writeBytes("Job JOBID=\"" + JOB + "\" LAUNCH_TIME=\"" + TIME + "\""
+ " TOTAL_MAPS=\"1\" TOTAL_REDUCES=\"0\" " + delim);
// Write the task start line
out.writeBytes("Task TASKID=\"" + TASK_ID + "\" TASK_TYPE=\"MAP\""
+ " START_TIME=\"" + TIME + "\" SPLITS=\"\""
+ " TOTAL_MAPS=\"1\" TOTAL_REDUCES=\"0\" " + delim);
// Write the attempt start line
out.writeBytes("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"" + TASK_ID + "\""
+ " TASK_ATTEMPT_ID=\"" + TASK_ATTEMPT_ID + "\""
+ " START_TIME=\"" + TIME + "\""
+ " HOSTNAME=\"" + HOSTNAME + "\" " + delim);
// Write the attempt finish line
out.writeBytes("MapAttempt TASK_TYPE=\"MAP\" TASKID=\"" + TASK_ID + "\""
+ " TASK_ATTEMPT_ID=\"" + TASK_ATTEMPT_ID + "\""
+ " FINISH_TIME=\"" + TIME + "\""
+ " TASK_STATUS=\"SUCCESS\""
+ " HOSTNAME=\"" + HOSTNAME + "\" " + delim);
// Write the task finish line
out.writeBytes("Task TASKID=\"" + TASK_ID + "\" TASK_TYPE=\"MAP\""
+ " TASK_STATUS=\"SUCCESS\""
+ " FINISH_TIME=\"" + TIME + "\""
+ " COUNTERS=\"" + counters + "\" " + delim);
// Write the job-finish line
out.writeBytes("Job JOBID=\"" + JOB + "\" FINISH_TIME=\"" + TIME + "\""
+ " TOTAL_MAPS=\"1\" TOTAL_REDUCES=\"0\""
+ " JOB_STATUS=\"SUCCESS\" FINISHED_MAPS=\"1\""
+ " FINISHED_REDUCES=\"0\" FAILED_MAPS=\"0\""
+ " FAILED_REDUCES=\"0\""
+ " COUNTERS=\"" + counters + "\" " + delim);
}
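For reference, with version > 0 the job-start line emitted above would look roughly like this (all values hypothetical; the trailing " ." is the post-version-0 delimiter noted in the comment above):

Job JOBID="job_200901010000_0001" JOBNAME="myjob" USER="hadoop" SUBMIT_TIME="1240335290" JOBCONF="job.xml" .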
/**
* Test running many balancers simultaneously.
*
* Case-1: First balancer is running. Now, running a second one should get
* the "Another balancer is running. Exiting.." IOException and fail immediately.
*
* Case-2: When the second balancer runs, the 'balancer.id' file exists but the
* lease does not. Now, the second balancer should run successfully.
*/
@Test(timeout = 100000)
public void testManyBalancerSimultaneously() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
// add an empty node with half of the existing capacity (capacities are
// 4 * CAPACITY) and on the same rack
long[] capacities = new long[] { 4 * CAPACITY };
String[] racks = new String[] { RACK0 };
long newCapacity = 2 * CAPACITY;
String newRack = RACK0;
LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity);
LOG.info("newRack = " + newRack);
LOG.info("useTool = " + false);
assertEquals(capacities.length, racks.length);
int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.racks(racks).simulatedCapacities(capacities).build();
try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
long totalCapacity = sum(capacities);
// fill up the cluster to be 30% full
final long totalUsedSpace = totalCapacity * 3 / 10;
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
(short) numOfDatanodes, 0);
// start up an empty node with half of the existing capacity, on the same rack
cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
new long[] { newCapacity });
// Case1: Simulate first balancer by creating 'balancer.id' file. It
// will keep this file until the balancing operation is completed.
FileSystem fs = cluster.getFileSystem(0);
final FSDataOutputStream out = fs
.create(Balancer.BALANCER_ID_PATH, false);
out.writeBytes(InetAddress.getLocalHost().getHostName());
out.hflush();
assertTrue("'balancer.id' file doesn't exist!",
fs.exists(Balancer.BALANCER_ID_PATH));
// start second balancer
final String[] args = { "-policy", "datanode" };
final Tool tool = new Cli();
tool.setConf(conf);
int exitCode = tool.run(args); // start balancing
assertEquals("Exit status code mismatches",
ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
// Case2: Release lease so that another balancer would be able to
// perform balancing.
out.close();
assertTrue("'balancer.id' file doesn't exist!",
fs.exists(Balancer.BALANCER_ID_PATH));
exitCode = tool.run(args); // start balancing
assertEquals("Exit status code mismatches",
ExitStatus.SUCCESS.getExitCode(), exitCode);
} finally {
cluster.shutdown();
}
}