下面列出了org.apache.hadoop.fs.FSDataOutputStream#writeUTF ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testConstraintsGetReadyTimestamp() throws IOException {
SignalManager manager = new SignalManager(fileSystem, testDirectory);
Constraints constraints = new Constraints(DatasetTestUtilities.USER_SCHEMA).
with("email", "[email protected]");
Path signalFilePath = new Path(this.testDirectory,
"email=testConstraintsReady%40domain.com");
// drop a file at the signal path
FSDataOutputStream stream = this.fileSystem.create(signalFilePath, true);
stream.writeUTF(String.valueOf(System.currentTimeMillis()));
stream.close();
Assert.assertTrue(manager.getReadyTimestamp(constraints) != -1);
}
@Test
public void testRewritingClusterIdToPB() throws Exception {
TEST_UTIL.startMiniZKCluster();
TEST_UTIL.startMiniDFSCluster(1);
TEST_UTIL.createRootDir();
Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
FileSystem fs = rootDir.getFileSystem(TEST_UTIL.getConfiguration());
Path filePath = new Path(rootDir, HConstants.CLUSTER_ID_FILE_NAME);
FSDataOutputStream s = null;
try {
s = fs.create(filePath);
s.writeUTF(HBaseCommonTestingUtility.getRandomUUID().toString());
} finally {
if (s != null) {
s.close();
}
}
TEST_UTIL.startMiniHBaseCluster();
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
int expected = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration())? 2: 1;
assertEquals(expected, master.getServerManager().getOnlineServersList().size());
}
private void addToLocalResources(FileSystem fs, String fileSrcPath,
String fileDstPath, String appId, Map<String, LocalResource> localResources,
String resources) throws IOException {
String suffix = jstormClientContext.appName + JOYConstants.BACKLASH + appId + JOYConstants.BACKLASH + fileDstPath;
Path dst =
new Path(fs.getHomeDirectory(), suffix);
if (fileSrcPath == null) {
FSDataOutputStream ostream = null;
try {
ostream = FileSystem
.create(fs, dst, new FsPermission(JOYConstants.FS_PERMISSION));
ostream.writeUTF(resources);
} finally {
IOUtils.closeQuietly(ostream);
}
} else {
fs.copyFromLocalFile(new Path(fileSrcPath), dst);
}
FileStatus scFileStatus = fs.getFileStatus(dst);
LocalResource scRsrc =
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromURI(dst.toUri()),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
scFileStatus.getLen(), scFileStatus.getModificationTime());
localResources.put(fileDstPath, scRsrc);
}
@org.junit.Test
public void testTextWriteAndRead() throws Exception{
String testString="Is there anyone out there?";
String readChars=null;
FSDataOutputStream dfsOut=null;
dfsOut=fs.create(new Path("test1.txt"));
dfsOut.writeUTF(testString);
dfsOut.close();
FSDataInputStream dfsin=null;
dfsin=fs.open(new Path("test1.txt"));
readChars=dfsin.readUTF();
dfsin.close();
assertEquals(testString, readChars);
fs.delete(new Path("test1.txt"), true);
assertFalse(fs.exists(new Path("test1")));
}
private void addToLocalResources(FileSystem fs, String fileSrcPath,
String fileDstPath, int appId, Map<String, LocalResource> localResources,
String resources) throws IOException {
String suffix = appName + "/" + appId + "/" + fileDstPath;
Path dst =
new Path(fs.getHomeDirectory(), suffix);
if (fileSrcPath == null) {
FSDataOutputStream ostream = null;
try {
ostream = FileSystem
.create(fs, dst, new FsPermission((short) 0710));
ostream.writeUTF(resources);
} finally {
IOUtils.closeQuietly(ostream);
}
} else {
fs.copyFromLocalFile(new Path(fileSrcPath), dst);
}
FileStatus scFileStatus = fs.getFileStatus(dst);
LocalResource scRsrc =
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromURI(dst.toUri()),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
scFileStatus.getLen(), scFileStatus.getModificationTime());
localResources.put(fileDstPath, scRsrc);
}
public void append(HdfsConfig config, HdfsRule rule, Event event) {
try {
Configuration hadoopConfig = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create(this.address), hadoopConfig);
Path hdfsPath = new Path(fileName);
FSDataOutputStream fileOutputStream = null;
try {
if (fileSystem.exists(hdfsPath)) {
fileOutputStream = fileSystem.append(hdfsPath);
} else {
fileOutputStream = fileSystem.create(hdfsPath);
}
fileOutputStream.writeUTF(JSON.toJSONString(event));
} finally {
if (fileSystem != null) {
fileSystem.close();
}
if (fileOutputStream != null) {
fileOutputStream.close();
}
}
} catch (IOException e) {
}
}
public void append(HdfsConfig config, HdfsRule rule, Event event) {
try {
Configuration hadoopConfig = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create(this.address), hadoopConfig);
Path hdfsPath = new Path(fileName);
FSDataOutputStream fileOutputStream = null;
try {
if (fileSystem.exists(hdfsPath)) {
fileOutputStream = fileSystem.append(hdfsPath);
} else {
fileOutputStream = fileSystem.create(hdfsPath);
}
fileOutputStream.writeUTF(JSON.toJSONString(event));
} finally {
if (fileSystem != null) {
fileSystem.close();
}
if (fileOutputStream != null) {
fileOutputStream.close();
}
}
} catch (IOException e) {
}
}
/**
* To ensure there are not multiple instances of the SCM running on a given
* cluster, a global pid file is used. This file contains the hostname of the
* machine that owns the pid file.
*
* @return true if the pid file was written, false otherwise
* @throws YarnException
*/
private boolean writeGlobalCleanerPidFile() throws YarnException {
String root =
conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
try {
FileSystem fs = FileSystem.get(this.conf);
if (fs.exists(pidPath)) {
return false;
}
FSDataOutputStream os = fs.create(pidPath, false);
// write the hostname and the process id in the global cleaner pid file
final String ID = ManagementFactory.getRuntimeMXBean().getName();
os.writeUTF(ID);
os.close();
// add it to the delete-on-exit to ensure it gets deleted when the JVM
// exits
fs.deleteOnExit(pidPath);
} catch (IOException e) {
throw new YarnException(e);
}
LOG.info("Created the global cleaner pid file at " + pidPath.toString());
return true;
}
private void addToLocalResources(FileSystem fs, String fileSrcPath,
String fileDstPath, String appId, Map<String, LocalResource> localResources,
String resources) throws IOException {
String suffix =
appName + "/" + appId + "/" + fileDstPath;
Path dst =
new Path(fs.getHomeDirectory(), suffix);
if (fileSrcPath == null) {
FSDataOutputStream ostream = null;
try {
ostream = FileSystem
.create(fs, dst, new FsPermission((short) 0710));
ostream.writeUTF(resources);
} finally {
IOUtils.closeQuietly(ostream);
}
} else {
fs.copyFromLocalFile(new Path(fileSrcPath), dst);
}
FileStatus scFileStatus = fs.getFileStatus(dst);
LocalResource scRsrc =
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromURI(dst.toUri()),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
scFileStatus.getLen(), scFileStatus.getModificationTime());
localResources.put(fileDstPath, scRsrc);
}
/**
* To ensure there are not multiple instances of the SCM running on a given
* cluster, a global pid file is used. This file contains the hostname of the
* machine that owns the pid file.
*
* @return true if the pid file was written, false otherwise
* @throws YarnException
*/
private boolean writeGlobalCleanerPidFile() throws YarnException {
String root =
conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
try {
FileSystem fs = FileSystem.get(this.conf);
if (fs.exists(pidPath)) {
return false;
}
FSDataOutputStream os = fs.create(pidPath, false);
// write the hostname and the process id in the global cleaner pid file
final String ID = ManagementFactory.getRuntimeMXBean().getName();
os.writeUTF(ID);
os.close();
// add it to the delete-on-exit to ensure it gets deleted when the JVM
// exits
fs.deleteOnExit(pidPath);
} catch (IOException e) {
throw new YarnException(e);
}
LOG.info("Created the global cleaner pid file at " + pidPath.toString());
return true;
}
@Test
public void testSchemaEvolution() throws Exception {
final Configuration conf = new Configuration();
final Path inputPath = new Path("target/test/thrift/schema_evolution/in");
final Path parquetPath = new Path("target/test/thrift/schema_evolution/parquet");
final Path outputPath = new Path("target/test/thrift/schema_evolution/out");
final FileSystem fileSystem = parquetPath.getFileSystem(conf);
fileSystem.delete(inputPath, true);
final FSDataOutputStream in = fileSystem.create(inputPath);
in.writeUTF("Alice\nBob\nCharles\n");
in.close();
fileSystem.delete(parquetPath, true);
fileSystem.delete(outputPath, true);
{
write(conf, inputPath, new Path(parquetPath, "V1"), TestInputOutputFormat.SchemaEvolutionMapper1.class, StructV1.class);
write(conf, inputPath, new Path(parquetPath, "V2"), TestInputOutputFormat.SchemaEvolutionMapper2.class, StructV2.class);
write(conf, inputPath, new Path(parquetPath, "V3"), TestInputOutputFormat.SchemaEvolutionMapper3.class, StructV3.class);
}
{
final Job job = new Job(conf, "read");
job.setInputFormatClass(ParquetThriftInputFormat.class);
ParquetThriftInputFormat.setInputPaths(job, new Path(parquetPath, "*"));
ParquetThriftInputFormat.setThriftClass(job.getConfiguration(), StructV3.class);
job.setMapperClass(TestInputOutputFormat.SchemaEvolutionReadMapper.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, outputPath);
waitForJob(job);
}
read(outputPath + "/part-m-00000", 3);
read(outputPath + "/part-m-00001", 3);
read(outputPath + "/part-m-00002", 3);
}
@Test
public void hdfsFileLoggerSinkAndTest() throws FileNotFoundException,
IOException {
List<Event> searchEvents = generateSearchAnalyticsDataService
.getSearchEvents(11);
DistributedFileSystem fs = hadoopClusterService.getFileSystem();
// /Write to file
Path outFile = new Path("/searchevents/event" + UUID.randomUUID());
FSDataOutputStream out = fs.create(outFile, false);
for (Event event : searchEvents) {
String eventString = new String(event.getBody(), "UTF-8");
System.out.println("Writing event string: " + eventString);
out.writeUTF(eventString + System.lineSeparator());
}
out.flush();
out.close();
// check the data is there...with standard file
FSDataInputStream input = fs.open(outFile);
try (BufferedReader br = new BufferedReader(new InputStreamReader(
input, "UTF-8"))) {
String line = null;
while ((line = br.readLine()) != null) {
System.out.println("HDFS file line is:" + line);
}
}
input.close();
fs.delete(outFile, true);
}
private static Path localizeDagPlanAsText(DAGPlan dagPB, FileSystem fs, AMConfiguration amConfig,
String strAppId, Path tezSysStagingPath) throws IOException {
Path textPath =
TezCommonUtils.getTezTextPlanStagingPath(tezSysStagingPath, strAppId, dagPB.getName());
FSDataOutputStream dagPBOutTextStream = null;
try {
dagPBOutTextStream = TezCommonUtils.createFileForAM(fs, textPath);
String dagPBStr = dagPB.toString();
int dagPBStrLen = dagPBStr.length();
if (dagPBStrLen <= UTF8_CHUNK_SIZE) {
dagPBOutTextStream.writeUTF(dagPBStr);
} else {
int startIndex = 0;
while (startIndex < dagPBStrLen) {
int endIndex = startIndex + UTF8_CHUNK_SIZE;
if (endIndex > dagPBStrLen) {
endIndex = dagPBStrLen;
}
dagPBOutTextStream.writeUTF(dagPBStr.substring(startIndex, endIndex));
startIndex += UTF8_CHUNK_SIZE;
}
}
} finally {
if (dagPBOutTextStream != null) {
dagPBOutTextStream.close();
}
}
return textPath;
}
private static Path localizeDagPlanAsText(DAGPlan dagPB, FileSystem fs, AMConfiguration amConfig,
String strAppId, Path tezSysStagingPath) throws IOException {
Path textPath = TezCommonUtils.getTezTextPlanStagingPath(tezSysStagingPath);
FSDataOutputStream dagPBOutTextStream = null;
try {
dagPBOutTextStream = TezCommonUtils.createFileForAM(fs, textPath);
String dagPBStr = dagPB.toString();
int dagPBStrLen = dagPBStr.length();
if (dagPBStrLen <= UTF8_CHUNK_SIZE) {
dagPBOutTextStream.writeUTF(dagPBStr);
} else {
int startIndex = 0;
while (startIndex < dagPBStrLen) {
int endIndex = startIndex + UTF8_CHUNK_SIZE;
if (endIndex > dagPBStrLen) {
endIndex = dagPBStrLen;
}
dagPBOutTextStream.writeUTF(dagPBStr.substring(startIndex, endIndex));
startIndex += UTF8_CHUNK_SIZE;
}
}
} finally {
if (dagPBOutTextStream != null) {
dagPBOutTextStream.close();
}
}
return textPath;
}
public void addHDFSContent(String content, Path path) {
try {
FSDataOutputStream fsDataOutputStream = fs.create(path);
fsDataOutputStream.writeUTF(content);
fsDataOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
protected static void addContentToFS(Path input, String... content) throws IOException {
FSDataOutputStream fsDataOutputStream = fs.create(input);
for (int i = 0; i < content.length; i++) {
fsDataOutputStream.writeUTF(content[i]);
}
IOUtils.closeQuietly(fsDataOutputStream);
}
@Test
public void testDfsClusterStart() throws Exception {
// Write a file to HDFS containing the test string
FileSystem hdfsFsHandle = dfsCluster.getHdfsFileSystemHandle();
FSDataOutputStream writer = hdfsFsHandle.create(
new Path(propertyParser.getProperty(ConfigVars.HDFS_TEST_FILE_KEY)));
writer.writeUTF(propertyParser.getProperty(ConfigVars.HDFS_TEST_STRING_KEY));
writer.close();
// Read the file and compare to test string
FSDataInputStream reader = hdfsFsHandle.open(
new Path(propertyParser.getProperty(ConfigVars.HDFS_TEST_FILE_KEY)));
assertEquals(reader.readUTF(), propertyParser.getProperty(ConfigVars.HDFS_TEST_STRING_KEY));
reader.close();
hdfsFsHandle.close();
URL url = new URL(
String.format( "http://localhost:%s/webhdfs/v1?op=GETHOMEDIRECTORY&user.name=guest",
propertyParser.getProperty( ConfigVars.HDFS_NAMENODE_HTTP_PORT_KEY ) ) );
URLConnection connection = url.openConnection();
connection.setRequestProperty( "Accept-Charset", "UTF-8" );
BufferedReader response = new BufferedReader( new InputStreamReader( connection.getInputStream() ) );
String line = response.readLine();
response.close();
assertEquals( "{\"Path\":\"/user/guest\"}", line );
}
@Test
public void testHdfs() throws Exception {
FileSystem hdfsFsHandle = hdfsLocalCluster.getHdfsFileSystemHandle();
UserGroupInformation.loginUserFromKeytab(kdcLocalCluster.getKrbPrincipalWithRealm("hdfs"), kdcLocalCluster.getKeytabForPrincipal("hdfs"));
assertTrue(UserGroupInformation.isSecurityEnabled());
assertTrue(UserGroupInformation.isLoginKeytabBased());
// Write a file to HDFS containing the test string
FSDataOutputStream writer = hdfsFsHandle.create(
new Path(propertyParser.getProperty(ConfigVars.HDFS_TEST_FILE_KEY)));
writer.writeUTF(propertyParser.getProperty(ConfigVars.HDFS_TEST_STRING_KEY));
writer.close();
// Read the file and compare to test string
FSDataInputStream reader = hdfsFsHandle.open(
new Path(propertyParser.getProperty(ConfigVars.HDFS_TEST_FILE_KEY)));
assertEquals(reader.readUTF(), propertyParser.getProperty(ConfigVars.HDFS_TEST_STRING_KEY));
reader.close();
// Log out
UserGroupInformation.getLoginUser().logoutUserFromKeytab();
UserGroupInformation.reset();
try {
Configuration conf = new Configuration();
UserGroupInformation.setConfiguration(conf);
FileSystem.get(hdfsFsHandle.getUri(), conf).open(
new Path(propertyParser.getProperty(ConfigVars.HDFS_TEST_FILE_KEY)));
fail();
} catch (AccessControlException e) {
LOG.info("Not authenticated!");
}
}
private void writeFileToHdfs(String fileName, String contents) throws Exception {
// Write a file to HDFS containing the test string
FileSystem hdfsFsHandle = dfsCluster.getHdfsFileSystemHandle();
FSDataOutputStream writer = hdfsFsHandle.create(new Path(fileName));
writer.writeUTF(contents);
writer.close();
hdfsFsHandle.close();
}
private void writeRandomBytes(FSDataOutputStream writer, int count) throws IOException {
for (int i = 0; i < count; i++) {
writer.writeUTF(UUID.randomUUID().toString());
}
}