下面列出了 org.apache.hadoop.fs.FileUtil#chmod() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Writes a small shell script named {@code cache.sh} into the work space and
 * returns it.
 *
 * <p>The script runs {@code ls} when {@code clazz} is {@code null}; otherwise
 * it runs {@code java -cp <classpath> <clazz>} using the classpath of the
 * current JVM.
 *
 * @param clazz class name handed to the {@code java} launcher, or {@code null}
 *        to generate a plain {@code ls} script
 * @return the generated script file, chmod-ed to 700 and registered for
 *         deletion on JVM exit
 * @throws Exception if the script cannot be created, written or chmod-ed
 */
private File getFileCommand(String clazz) throws Exception {
  String classpath = System.getProperty("java.class.path");
  File fCommand = new File(workSpace + File.separator + "cache.sh");
  fCommand.deleteOnExit();
  if (!fCommand.getParentFile().exists()) {
    fCommand.getParentFile().mkdirs();
  }
  fCommand.createNewFile();
  // try-with-resources closes the stream even when one of the writes fails,
  // so a failed write no longer leaks the file descriptor.
  try (OutputStream os = new FileOutputStream(fCommand)) {
    os.write("#!/bin/sh \n".getBytes());
    if (clazz == null) {
      os.write(("ls ").getBytes());
    } else {
      os.write(("java -cp " + classpath + " " + clazz).getBytes());
    }
    os.flush();
  }
  // Owner-only read/write/execute so that the script can be launched.
  FileUtil.chmod(fCommand.getAbsolutePath(), "700");
  return fCommand;
}
/**
 * Creates a child directory containing one file inside the task work
 * directory and then recursively applies the mode {@code a=rx} to that
 * directory. The key, value, collector and reporter arguments are not used.
 *
 * @throws IOException if creating the file fails, or wrapping any exception
 *         raised by the permission change
 */
public void map(WritableComparable key, Writable value,
    OutputCollector<WritableComparable, Writable> out, Reporter reporter)
    throws IOException {
  final File childDir = new File(taskWorkDir, MY_DIR);
  LOG.info("Child folder : " + childDir);
  childDir.mkdirs();

  final File childFile = new File(childDir, MY_FILE);
  LOG.info("Child file : " + childFile);
  childFile.createNewFile();

  // Leave the directory and the file in it read/execute-only (mode 555).
  try {
    FileUtil.chmod(childDir.getAbsolutePath(), "a=rx", true);
  } catch (Exception e) {
    throw new IOException(e);
  }
}
/**
 * Verifies that a volume which is already failed when the datanode starts up
 * is reported to the NN as a failed volume.
 */
@Test
public void testFailedVolumeOnStartupIsCounted() throws Exception {
  final boolean onWindows = System.getProperty("os.name").startsWith("Windows");
  assumeTrue(!onWindows);

  final DatanodeManager dm =
      cluster.getNamesystem().getBlockManager().getDatanodeManager();
  final long capacityBefore = DFSTestUtil.getLiveDatanodeCapacity(dm);
  final File currentDir =
      new File(cluster.getInstanceStorageDir(0, 0), "current");
  try {
    prepareDirToFail(currentDir);
    restartDatanodes(1, false);

    // The datanode is still serving its block pool ...
    assertEquals(true, cluster.getDataNodes().get(0)
        .isBPServiceAlive(cluster.getNamesystem().getBlockPoolId()));

    // ... yet a single volume failure has been registered.
    final long expectedCapacity = capacityBefore / 2;
    DFSTestUtil.waitForDatanodeStatus(dm, 1, 0, 1,
        expectedCapacity, WAIT_FOR_HEARTBEATS);
  } finally {
    // Reset the directory mode whatever the outcome of the test.
    FileUtil.chmod(currentDir.toString(), "755");
  }
}
/**
 * Fails a number of volumes, restarts the datanodes with the given tolerance
 * and checks the resulting state of the block pool service.
 *
 * @param volumesTolerated number of failed volumes the datanode may tolerate
 * @param volumesFailed number of volumes to fail before the restart
 * @param expectedBPServiceState whether the block pool service is expected to
 *        be alive after the restart
 * @param manageDfsDirs forwarded to {@code restartDatanodes}
 */
private void testVolumeConfig(int volumesTolerated, int volumesFailed,
    boolean expectedBPServiceState, boolean manageDfsDirs)
    throws IOException, InterruptedException {
  assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
  final int dnIndex = 0;

  // The "current" sub-directories are failed rather than the storage
  // directories themselves, because invalid storage directory permissions
  // get fixed up automatically on datanode startup.
  final File[] currentDirs = new File[] {
      new File(cluster.getInstanceStorageDir(dnIndex, 0), "current"),
      new File(cluster.getInstanceStorageDir(dnIndex, 1), "current") };
  try {
    int failed = 0;
    while (failed < volumesFailed) {
      prepareDirToFail(currentDirs[failed]);
      failed++;
    }
    restartDatanodes(volumesTolerated, manageDfsDirs);
    assertEquals(expectedBPServiceState, cluster.getDataNodes().get(0)
        .isBPServiceAlive(cluster.getNamesystem().getBlockPoolId()));
  } finally {
    // Reset the mode of every candidate directory, failed or not.
    for (int i = 0; i < currentDirs.length; i++) {
      FileUtil.chmod(currentDirs[i].toString(), "755");
    }
  }
}
/**
 * Finalizing a log segment in a storage directory that has been made
 * read-only is expected to throw {@link IllegalStateException}, and the
 * directory must afterwards be listed among the removed storage directories.
 */
@Test(expected=IllegalStateException.class)
public void testFinalizeErrorReportedToNNStorage() throws IOException, InterruptedException {
  final File journalDir = new File(TestEditLog.TEST_DIR + "/filejournaltestError");
  // abort after 10th roll
  final NNStorage storage = setupEdits(
      Collections.<URI>singletonList(journalDir.toURI()),
      10, new AbortSpec(10, 0));
  final StorageDirectory editsDir =
      storage.dirIterator(NameNodeDirType.EDITS).next();

  final FileJournalManager journal =
      new FileJournalManager(conf, editsDir, storage);
  final String rootPath = editsDir.getRoot().getAbsolutePath();

  // Remove write permission recursively so that finalization fails.
  FileUtil.chmod(rootPath, "-w", true);
  try {
    journal.finalizeLogSegment(0, 1);
  } finally {
    // Give write permission back before checking the storage state.
    FileUtil.chmod(rootPath, "+w", true);
    assertTrue(storage.getRemovedStorageDirs().contains(editsDir));
  }
}
/**
 * Removes a leftover job token password file and its checksum file from the
 * current directory, if they exist.
 *
 * @return a two-element array holding the password file followed by its
 *         {@code .crc} companion
 */
private File[] cleanTokenPasswordFile() throws Exception {
  final File passwordFile = new File("./jobTokenPassword");
  if (passwordFile.exists()) {
    FileUtil.chmod(passwordFile.getAbsolutePath(), "700");
    // Deleting the password file itself must succeed.
    assertTrue(passwordFile.delete());
  }

  final File checksumFile = new File("./.jobTokenPassword.crc");
  if (checksumFile.exists()) {
    FileUtil.chmod(checksumFile.getAbsolutePath(), "700");
    // The outcome of deleting the checksum file is not checked.
    checksumFile.delete();
  }

  return new File[] { passwordFile, checksumFile };
}
/**
 * Constructs an {@code MRAsyncDiskService} over two volumes after the first
 * one has been created and switched to mode 400; the construction is expected
 * to complete without throwing.
 */
@Test
public void testToleratesSomeUnwritableVolumes() throws Throwable {
  final FileSystem localFs = FileSystem.getLocal(new Configuration());
  final String readOnlyVol = TEST_ROOT_DIR + "/0";
  final String otherVol = TEST_ROOT_DIR + "/1";

  assertTrue(new File(readOnlyVol).mkdirs());
  assertEquals(0, FileUtil.chmod(readOnlyVol, "400")); // read only
  try {
    new MRAsyncDiskService(localFs, new String[] { readOnlyVol, otherVol });
  } finally {
    FileUtil.chmod(readOnlyVol, "755"); // make writable again
  }
}
/**
 * Prepares the local work space and, when the {@code container-executor.path}
 * system property is set, a {@code LinuxContainerExecutor} configured with
 * local and log directories inside that work space.
 *
 * <p>The submitting user is read from the {@code application.submitter}
 * system property and defaults to {@code nobody}. The same user is configured
 * as the non-secure mode local user, instead of a hard-coded account name
 * that only exists on one developer's machine.
 *
 * @throws Exception if a directory cannot be created or the directory handler
 *         cannot be initialised
 */
@Before
public void setup() throws Exception {
  files = FileContext.getLocalFSFileContext();
  Path workSpacePath = new Path(workSpace.getAbsolutePath());
  files.mkdir(workSpacePath, null, true);
  FileUtil.chmod(workSpace.getAbsolutePath(), "777");
  File localDir = new File(workSpace.getAbsoluteFile(), "localDir");
  files.mkdir(new Path(localDir.getAbsolutePath()),
      new FsPermission("777"), false);
  File logDir = new File(workSpace.getAbsoluteFile(), "logDir");
  files.mkdir(new Path(logDir.getAbsolutePath()),
      new FsPermission("777"), false);

  // Resolve the submitter first: the executor configuration below needs it.
  appSubmitter = System.getProperty("application.submitter");
  if (appSubmitter == null || appSubmitter.isEmpty()) {
    appSubmitter = "nobody";
  }

  String exec_path = System.getProperty("container-executor.path");
  if (exec_path != null && !exec_path.isEmpty()) {
    conf = new Configuration(false);
    conf.setClass("fs.AbstractFileSystem.file.impl",
        org.apache.hadoop.fs.local.LocalFs.class,
        org.apache.hadoop.fs.AbstractFileSystem.class);
    // Run containers as the configured submitter rather than a fixed user.
    conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, appSubmitter);
    LOG.info("Setting " + YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
        + "=" + exec_path);
    conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
    exec = new LinuxContainerExecutor();
    exec.setConf(conf);
    conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
    dirsHandler = new LocalDirsHandlerService();
    dirsHandler.init(conf);
  }
}
/**
 * Deletes the job token password file and its checksum file from the current
 * directory when they are present.
 *
 * @return a two-element array: the password file first, then its
 *         {@code .crc} checksum file
 */
private File[] cleanTokenPasswordFile() throws Exception {
  final File tokenFile = new File("./jobTokenPassword");
  if (tokenFile.exists()) {
    FileUtil.chmod(tokenFile.getAbsolutePath(), "700");
    // The password file has to be removed successfully.
    assertTrue(tokenFile.delete());
  }

  final File crcFile = new File("./.jobTokenPassword.crc");
  if (crcFile.exists()) {
    FileUtil.chmod(crcFile.getAbsolutePath(), "700");
    // The result of removing the checksum file is deliberately not asserted.
    crcFile.delete();
  }

  return new File[] { tokenFile, crcFile };
}
/**
 * Creates the first of two volumes, switches it to mode 400 and then builds
 * an {@code MRAsyncDiskService} over both volumes; building the service is
 * expected not to throw.
 */
@Test
public void testToleratesSomeUnwritableVolumes() throws Throwable {
  final FileSystem localFs = FileSystem.getLocal(new Configuration());
  final String lockedVolume = TEST_ROOT_DIR + "/0";
  final String secondVolume = TEST_ROOT_DIR + "/1";

  assertTrue(new File(lockedVolume).mkdirs());
  assertEquals(0, FileUtil.chmod(lockedVolume, "400")); // read only
  try {
    new MRAsyncDiskService(localFs, new String[] { lockedVolume, secondVolume });
  } finally {
    FileUtil.chmod(lockedVolume, "755"); // make writable again
  }
}
/**
 * Enables the task for cleanup by recursively granting read, write and
 * execute permission to everyone on the context's path in the local
 * filesystem.
 *
 * <p>This is best effort: a failure to change the permissions is logged and
 * not propagated. If the thread is interrupted while the permissions are
 * being changed, the interrupt status is restored so that callers can still
 * observe it.
 *
 * @param context deletion context whose {@code fullPath} is made accessible
 * @throws IOException declared for interface compatibility; I/O failures of
 *         the permission change itself are logged rather than thrown
 */
@Override
void enableTaskForCleanup(PathDeletionContext context)
    throws IOException {
  try {
    FileUtil.chmod(context.fullPath, "a+rwx", true);
  } catch(InterruptedException e) {
    LOG.warn("Interrupted while setting permissions for " + context.fullPath +
        " for deletion.");
    // Do not swallow the interruption: re-assert the flag for the caller.
    Thread.currentThread().interrupt();
  } catch(IOException ioe) {
    // Keep the cause in the log so the failure can be diagnosed.
    LOG.warn("Unable to change permissions of " + context.fullPath, ioe);
  }
}
/**
 * Changes the permissions of a directory and logs a warning, rather than
 * failing, when the change throws or returns a non-zero result.
 *
 * @param dir path of the directory
 * @param mode permission string handed to {@code FileUtil.chmod}
 * @param isRecursive whether the mode is applied recursively
 */
private void changeDirectoryPermissions(String dir, String mode,
    boolean isRecursive) {
  int exitCode;
  try {
    exitCode = FileUtil.chmod(dir, mode, isRecursive);
  } catch (Exception e) {
    LOG.warn("Exception in changing permissions for directory " + dir +
        ". Exception: " + e.getMessage());
    // An exception is reported once; there is no result code to inspect.
    return;
  }

  if (exitCode == 0) {
    return;
  }
  LOG.warn("Could not change permissions for directory " + dir);
}
/**
 * Prepares the local work space. When the {@code container-executor.path}
 * system property is set, it also creates a {@code LinuxContainerExecutor}
 * running as the configured application submitter, and the per-local-dir
 * user cache and file cache directories.
 */
@Before
public void setup() throws Exception {
  files = FileContext.getLocalFSFileContext();
  files.mkdir(new Path(workSpace.getAbsolutePath()), null, true);
  FileUtil.chmod(workSpace.getAbsolutePath(), "777");

  final File nmLocalDir = new File(workSpace.getAbsoluteFile(), "localDir");
  files.mkdir(new Path(nmLocalDir.getAbsolutePath()),
      new FsPermission("777"), false);

  final File nmLogDir = new File(workSpace.getAbsoluteFile(), "logDir");
  files.mkdir(new Path(nmLogDir.getAbsolutePath()),
      new FsPermission("777"), false);

  final String executorPath = System.getProperty("container-executor.path");
  if (executorPath == null || executorPath.isEmpty()) {
    // No executor binary configured: nothing further to set up.
    return;
  }

  conf = new Configuration(false);
  conf.setClass("fs.AbstractFileSystem.file.impl",
      org.apache.hadoop.fs.local.LocalFs.class,
      org.apache.hadoop.fs.AbstractFileSystem.class);

  // The submitter falls back to "nobody" when the property is absent.
  appSubmitter = System.getProperty("application.submitter");
  if (appSubmitter == null || appSubmitter.isEmpty()) {
    appSubmitter = "nobody";
  }
  conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, appSubmitter);

  LOG.info("Setting " + YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
      + "=" + executorPath);
  conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, executorPath);
  exec = new LinuxContainerExecutor();
  exec.setConf(conf);

  conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
  conf.set(YarnConfiguration.NM_LOG_DIRS, nmLogDir.getAbsolutePath());
  dirsHandler = new LocalDirsHandlerService();
  dirsHandler.init(conf);

  for (String localRoot : dirsHandler.getLocalDirs()) {
    // $local/usercache
    files.mkdir(new Path(localRoot, ContainerLocalizer.USERCACHE),
        new FsPermission("777"), false);
    // $local/filecache
    files.mkdir(new Path(localRoot, ContainerLocalizer.FILECACHE),
        new FsPermission("777"), false);
  }
}
/**
* Start the child process to handle the task for us.
*
* The constructor opens a server socket, writes the job token password to a
* local file, launches the child with the socket port and the password file
* location in its environment, accepts the child's connection and then runs
* the digest-based authentication before starting the downlink.
*
* @param conf the task's configuration
* @param recordReader the fake record reader to update progress with
* @param output the collector to send output to
* @param reporter the reporter for the task
* @param outputKeyClass the class of the output keys
* @param outputValueClass the class of the output values
* @throws IOException if an I/O error occurs while setting up the child
* @throws InterruptedException if the calling thread is interrupted during setup
*/
Application(JobConf conf,
RecordReader<FloatWritable, NullWritable> recordReader,
OutputCollector<K2,V2> output, Reporter reporter,
Class<? extends K2> outputKeyClass,
Class<? extends V2> outputValueClass
) throws IOException, InterruptedException {
// Port 0 lets the system pick a free port; the child is told which one below.
serverSocket = new ServerSocket(0);
Map<String, String> env = new HashMap<String,String>();
// add TMPDIR environment variable with the value of java.io.tmpdir
env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
env.put(Submitter.PORT,
Integer.toString(serverSocket.getLocalPort()));
//Add token to the environment if security is enabled
Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf
.getCredentials());
// This password is used as shared secret key between this application and
// child pipes process
byte[] password = jobToken.getPassword();
// The password file is placed in the current directory ("." + separator + name).
String localPasswordFile = new File(".") + Path.SEPARATOR
+ "jobTokenPassword";
writePasswordToLocalFile(localPasswordFile, password, conf);
// The child is given only the location of the secret, not the secret itself.
env.put("hadoop.pipes.shared.secret.location", localPasswordFile);
// Build the command line: optional interpreter followed by the executable.
List<String> cmd = new ArrayList<String>();
String interpretor = conf.get(Submitter.INTERPRETOR);
if (interpretor != null) {
cmd.add(interpretor);
}
// The executable is the first file of the local distributed cache.
String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
if (!FileUtil.canExecute(new File(executable))) {
// LinuxTaskController sets +x permissions on all distcache files already.
// In case of DefaultTaskController, set permissions here.
FileUtil.chmod(executable, "u+x");
}
cmd.add(executable);
// wrap the command in a stdout/stderr capture
// we are starting map/reduce task of the pipes job. this is not a cleanup
// attempt.
TaskAttemptID taskid =
TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
long logLength = TaskLog.getTaskLogLength(conf);
cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
false);
// Launch the child, then block until it connects back on the server socket.
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
// digestToSend is derived from the password and the challenge;
// digestExpected is derived from the password and digestToSend and is
// handed to the output handler.
String challenge = getSecurityChallenge();
String digestToSend = createDigest(password, challenge);
String digestExpected = createDigest(password, digestToSend);
handler = new OutputHandler<K2, V2>(output, reporter, recordReader,
digestExpected);
K2 outputKey = (K2)
ReflectionUtils.newInstance(outputKeyClass, conf);
V2 outputValue = (V2)
ReflectionUtils.newInstance(outputValueClass, conf);
downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler,
outputKey, outputValue, conf);
// Authentication is completed before the downlink is started.
downlink.authenticate(digestToSend, challenge);
waitForAuthentication();
LOG.debug("Authentication succeeded");
downlink.start();
downlink.setJobConf(conf);
}
/**
 * Reads the streaming settings from the job and, when piping is enabled,
 * launches the streaming program as a child process and connects this
 * object's client streams to it.
 *
 * @param job the job configuration
 * @throws RuntimeException wrapping any exception raised while configuring
 *         or launching the child process
 */
public void configure(JobConf job) {
  try {
    String pipeCommand = getPipeCommand(job);

    joinDelay_ = job.getLong("stream.joindelay.milli", 0);
    job_ = job;
    fs_ = FileSystem.get(job_);

    // Only the first character of each configured separator is used.
    String mapSeparator = job_.get("stream.map.output.field.separator", "\t");
    String reduceSeparator = job_.get("stream.reduce.output.field.separator", "\t");
    this.mapOutputFieldSeparator = mapSeparator.charAt(0);
    this.reduceOutFieldSeparator = reduceSeparator.charAt(0);
    this.numOfMapOutputKeyFields = job_.getInt("stream.num.map.output.key.fields", 1);
    this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);

    maxErrorBytes = job.getLong("stream.error.maxbytes", 100000);

    doPipe_ = getDoPipe();
    if (!doPipe_) {
      return;
    }

    setStreamJobDetails(job);

    String[] command = splitArgs(pipeCommand);
    String program = command[0];
    File workingDir = new File(".").getAbsoluteFile();
    File jobCacheDir = new File(workingDir.getParentFile().getParent(), "work");

    // An absolute program path is not ours to modify and is assumed to be
    // executable. A relative one is made executable inside the job cache
    // directory and then resolved to an absolute path through a finder
    // built on the PATH variable with the job cache directory added; if it
    // cannot be resolved, the relative name is kept.
    if (!new File(program).isAbsolute()) {
      FileUtil.chmod(new File(jobCacheDir, program).toString(), "a+x");

      PathFinder finder = new PathFinder("PATH");
      finder.prependPathComponent(jobCacheDir.toString());
      File resolved = finder.getAbsolutePath(command[0]);
      if (resolved != null) {
        command[0] = resolved.getAbsolutePath();
      }
    }

    // An optional wrapper lets admins control the streaming job environment;
    // its arguments are placed in front of the program's own.
    String wrapper = job.get("stream.wrapper");
    if (wrapper != null) {
      String[] wrapperArgs = splitArgs(wrapper);
      String[] wrapped = new String[wrapperArgs.length + command.length];
      System.arraycopy(wrapperArgs, 0, wrapped, 0, wrapperArgs.length);
      System.arraycopy(command, 0, wrapped, wrapperArgs.length, command.length);
      command = wrapped;
    }

    logprintln("PipeMapRed exec " + Arrays.asList(command));

    Environment childEnv = (Environment) StreamUtil.env().clone();
    addJobConfToEnvironment(job_, childEnv);
    addEnvironment(childEnv, job_.get("stream.addenvironment"));
    // Runtime.exec is used here; a ProcessBuilder-based launch would need JDK 1.5.
    sim = Runtime.getRuntime().exec(command, childEnv.toArray());

    clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
    clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
    clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
    startTime_ = System.currentTimeMillis();
  } catch (Exception e) {
    logStackTrace(e);
    LOG.error("configuration exception", e);
    throw new RuntimeException("configuration exception", e);
  }
}
/**
 * Configures this instance from the job and, when piping is enabled, starts
 * the streaming program as a child process, connects the client streams to
 * it and starts the {@code MRErrorThread}.
 *
 * @param job the job configuration
 * @throws RuntimeException wrapping any exception raised while configuring
 *         or launching the child process
 */
public void configure(JobConf job) {
  try {
    final String pipeCommand = getPipeCommand(job);

    joinDelay_ = job.getLong("stream.joindelay.milli", 0);
    job_ = job;
    fs_ = FileSystem.get(job_);
    nonZeroExitIsFailure_ = job_.getBoolean("stream.non.zero.exit.is.failure", true);

    doPipe_ = getDoPipe();
    if (!doPipe_) {
      return;
    }

    setStreamJobDetails(job);

    final String[] command = splitArgs(pipeCommand);
    final String program = command[0];
    final File workingDir = new File(".").getAbsoluteFile();

    // An absolute program path is not ours to modify and is assumed to be
    // executable. A relative one is made executable inside the current
    // working directory and then resolved to an absolute path through a
    // finder built on the PATH variable with the working directory added;
    // if it cannot be resolved, the relative name is kept.
    if (!new File(program).isAbsolute()) {
      FileUtil.chmod(new File(workingDir, program).toString(), "a+x");

      PathFinder finder = new PathFinder("PATH");
      finder.prependPathComponent(workingDir.toString());
      File resolved = finder.getAbsolutePath(command[0]);
      if (resolved != null) {
        command[0] = resolved.getAbsolutePath();
      }
    }

    logprintln("PipeMapRed exec " + Arrays.asList(command));

    Environment childEnv = (Environment) StreamUtil.env().clone();
    addJobConfToEnvironment(job_, childEnv);
    addEnvironment(childEnv, job_.get("stream.addenvironment"));
    // add TMPDIR environment variable with the value of java.io.tmpdir
    envPut(childEnv, "TMPDIR", System.getProperty("java.io.tmpdir"));

    // Start the process
    ProcessBuilder launcher = new ProcessBuilder(command);
    launcher.environment().putAll(childEnv.toMap());
    sim = launcher.start();

    clientOut_ = new DataOutputStream(
        new BufferedOutputStream(sim.getOutputStream(), BUFFER_SIZE));
    clientIn_ = new DataInputStream(
        new BufferedInputStream(sim.getInputStream(), BUFFER_SIZE));
    clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
    startTime_ = System.currentTimeMillis();

    errThread_ = new MRErrorThread();
    errThread_.start();
  } catch (Exception e) {
    logStackTrace(e);
    LOG.error("configuration exception", e);
    throw new RuntimeException("configuration exception", e);
  }
}
/**
* Test case where two directories are configured as NAME_AND_EDITS
* and one of them fails to save storage. Since the edits and image
* failure states are decoupled, the failure of image saving should
* not prevent the purging of logs from that dir.
*
* The test saves the namespace four times: twice with both directories
* healthy, once while the first directory has mode 000, and once more after
* its mode has been restored to 755.
*/
@Test
public void testPurgingWithNameEditsDirAfterFailure()
throws Exception {
MiniDFSCluster cluster = null;
Configuration conf = new HdfsConfiguration();
// Retain no extra edits so that purging is visible after every save.
conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 0);
// sd0/sd1 are the two name directories; cd0/cd1 their "current" sub-directories.
File sd0 = new File(TEST_ROOT_DIR, "nn0");
File sd1 = new File(TEST_ROOT_DIR, "nn1");
File cd0 = new File(sd0, "current");
File cd1 = new File(sd1, "current");
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
Joiner.on(",").join(sd0, sd1));
try {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.manageNameDfsDirs(false)
.format(true).build();
NameNode nn = cluster.getNameNode();
// First save: both directories healthy.
doSaveNamespace(nn);
LOG.info("After first save, images 0 and 2 should exist in both dirs");
assertGlobEquals(cd0, "fsimage_\\d*",
getImageFileName(0), getImageFileName(2));
assertGlobEquals(cd1, "fsimage_\\d*",
getImageFileName(0), getImageFileName(2));
assertGlobEquals(cd0, "edits_.*",
getFinalizedEditsFileName(1, 2),
getInProgressEditsFileName(3));
assertGlobEquals(cd1, "edits_.*",
getFinalizedEditsFileName(1, 2),
getInProgressEditsFileName(3));
// Second save: both directories still healthy.
doSaveNamespace(nn);
LOG.info("After second save, image 0 should be purged, " +
"and image 4 should exist in both.");
assertGlobEquals(cd0, "fsimage_\\d*",
getImageFileName(2), getImageFileName(4));
assertGlobEquals(cd1, "fsimage_\\d*",
getImageFileName(2), getImageFileName(4));
assertGlobEquals(cd0, "edits_.*",
getFinalizedEditsFileName(3, 4),
getInProgressEditsFileName(5));
assertGlobEquals(cd1, "edits_.*",
getFinalizedEditsFileName(3, 4),
getInProgressEditsFileName(5));
// Third save: performed while the first directory has mode 000.
LOG.info("Failing first storage dir by chmodding it");
assertEquals(0, FileUtil.chmod(cd0.getAbsolutePath(), "000"));
doSaveNamespace(nn);
LOG.info("Restoring accessibility of first storage dir");
assertEquals(0, FileUtil.chmod(cd0.getAbsolutePath(), "755"));
LOG.info("nothing should have been purged in first storage dir");
assertGlobEquals(cd0, "fsimage_\\d*",
getImageFileName(2), getImageFileName(4));
assertGlobEquals(cd0, "edits_.*",
getFinalizedEditsFileName(3, 4),
getInProgressEditsFileName(5));
LOG.info("fsimage_2 should be purged in second storage dir");
assertGlobEquals(cd1, "fsimage_\\d*",
getImageFileName(4), getImageFileName(6));
assertGlobEquals(cd1, "edits_.*",
getFinalizedEditsFileName(5, 6),
getInProgressEditsFileName(7));
// Fourth save: the first directory is accessible again.
LOG.info("On next save, we should purge logs from the failed dir," +
" but not images, since the image directory is in failed state.");
doSaveNamespace(nn);
assertGlobEquals(cd1, "fsimage_\\d*",
getImageFileName(6), getImageFileName(8));
assertGlobEquals(cd1, "edits_.*",
getFinalizedEditsFileName(7, 8),
getInProgressEditsFileName(9));
// In the first directory the images are unchanged, while only the
// in-progress edits segment remains.
assertGlobEquals(cd0, "fsimage_\\d*",
getImageFileName(2), getImageFileName(4));
assertGlobEquals(cd0, "edits_.*",
getInProgressEditsFileName(9));
} finally {
// Reset the mode even if the test failed while the directory was at 000.
FileUtil.chmod(cd0.getAbsolutePath(), "755");
LOG.info("Shutting down...");
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
 * Verifies that overwriting a destination whose permissions have been removed
 * fails with a {@code TalendRuntimeException} carrying the
 * {@code OUTPUT_NOT_AUTHORIZED} error code.
 *
 * <p>The destination is recursively set to mode 000 before the component
 * runs and is set back to 755 afterwards, whatever the outcome, so that the
 * test does not leave an inaccessible directory behind.
 */
@Test
public void testUnauthorizedOverwrite() throws IOException, URISyntaxException {
  Path parent = new Path(mini.newFolder().toString());
  Path dst = new Path(parent, "output");
  String fileSpec = mini.getLocalFs().getUri().resolve(dst.toUri()).toString();

  // Write something to the file before trying to run.
  try (OutputStream out = mini.getLocalFs().create(new Path(dst, "part-00000"))) {
    out.write(0);
  }

  // Ensure that the destination is unwritable.
  FileUtil.chmod(dst.toUri().toString(), "000", true);

  // Trying to overwrite an unmodifiable destination throws an exception.
  thrown.expect(TalendRuntimeException.class);
  thrown.expect(hasProperty("code", is(SimpleFileIOErrorCode.OUTPUT_NOT_AUTHORIZED)));
  thrown.expectMessage("Can not write to " + fileSpec
      + ". Please check user permissions or existence of base directory.");

  // Now try using the component.
  try {
    // Configure the component.
    SimpleFileIOOutputProperties props = SimpleFileIOOutputRuntimeTest.createOutputComponentProperties();
    props.getDatasetProperties().path.setValue(fileSpec);
    props.overwrite.setValue(true);

    // Create the runtime.
    SimpleFileIOOutputRuntime runtime = new SimpleFileIOOutputRuntime();
    runtime.initialize(null, props);

    // Use the runtime in a direct pipeline to test.
    final Pipeline p = beam.createPipeline();
    PCollection<IndexedRecord> input = p.apply( //
        Create.of(ConvertToIndexedRecord.convertToAvro(new String[] { "1", "one" }), //
            ConvertToIndexedRecord.convertToAvro(new String[] { "2", "two" }))); //
    input.apply(runtime);

    // And run the test.
    runtime.runAtDriver(null);
    p.run().waitUntilFinish();
  } catch (Pipeline.PipelineExecutionException e) {
    // Surface the component's own exception so the expectations above apply.
    if (e.getCause() instanceof TalendRuntimeException) {
      throw (TalendRuntimeException) e.getCause();
    }
    throw e;
  } finally {
    // Give the permissions back so the folder can be accessed and removed.
    FileUtil.chmod(dst.toUri().toString(), "755", true);
  }
}
/**
* Start the child process to handle the task for us.
*
* The constructor opens a server socket, launches the child with the socket
* port in its environment, accepts the child's connection and starts the
* downlink over that connection.
*
* @param conf the task's configuration
* @param recordReader the fake record reader to update progress with
* @param output the collector to send output to
* @param reporter the reporter for the task
* @param outputKeyClass the class of the output keys
* @param outputValueClass the class of the output values
* @throws IOException if an I/O error occurs while setting up the child
* @throws InterruptedException if the calling thread is interrupted during setup
*/
Application(JobConf conf,
RecordReader<FloatWritable, NullWritable> recordReader,
OutputCollector<K2,V2> output, Reporter reporter,
Class<? extends K2> outputKeyClass,
Class<? extends V2> outputValueClass
) throws IOException, InterruptedException {
// Port 0 lets the system pick a free port; the child is told which one below.
serverSocket = new ServerSocket(0);
Map<String, String> env = new HashMap<String,String>();
// add TMPDIR environment variable with the value of java.io.tmpdir
env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
env.put("hadoop.pipes.command.port",
Integer.toString(serverSocket.getLocalPort()));
// Build the command line: optional interpreter followed by the executable.
List<String> cmd = new ArrayList<String>();
String interpretor = conf.get("hadoop.pipes.executable.interpretor");
if (interpretor != null) {
cmd.add(interpretor);
}
// The executable is the first file of the local distributed cache; execute
// permission is granted to everyone unconditionally.
String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
FileUtil.chmod(executable, "a+x");
cmd.add(executable);
// wrap the command in a stdout/stderr capture
TaskAttemptID taskid = TaskAttemptID.forName(conf.get("mapred.task.id"));
File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
long logLength = TaskLog.getTaskLogLength(conf);
cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
false);
// Launch the child, then block until it connects back on the server socket.
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
handler = new OutputHandler<K2, V2>(output, reporter, recordReader);
K2 outputKey = (K2)
ReflectionUtils.newInstance(outputKeyClass, conf);
V2 outputValue = (V2)
ReflectionUtils.newInstance(outputValueClass, conf);
downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler,
outputKey, outputValue, conf);
downlink.start();
downlink.setJobConf(conf);
}
/**
 * Applies the job's streaming settings and, when piping is enabled, launches
 * the streaming program as a child process, attaches the client streams to
 * it and starts the {@code MRErrorThread}.
 *
 * @param job the job configuration
 * @throws RuntimeException wrapping any exception raised while configuring
 *         or launching the child process
 */
public void configure(JobConf job) {
  try {
    final String commandLine = getPipeCommand(job);

    joinDelay_ = job.getLong("stream.joindelay.milli", 0);
    job_ = job;
    fs_ = FileSystem.get(job_);
    nonZeroExitIsFailure_ = job_.getBoolean("stream.non.zero.exit.is.failure", true);

    doPipe_ = getDoPipe();
    if (!doPipe_) {
      return;
    }

    setStreamJobDetails(job);

    final String[] args = splitArgs(commandLine);
    final String executable = args[0];
    final File cwd = new File(".").getAbsoluteFile();

    // A program given by absolute path is left untouched and assumed to be
    // executable. A relative one is first made executable inside the current
    // working directory, then looked up through a finder built on the PATH
    // variable with the working directory added; when the lookup finds
    // nothing the relative name stays as it is.
    if (!new File(executable).isAbsolute()) {
      FileUtil.chmod(new File(cwd, executable).toString(), "a+x");

      PathFinder pathFinder = new PathFinder("PATH");
      pathFinder.prependPathComponent(cwd.toString());
      File located = pathFinder.getAbsolutePath(args[0]);
      if (located != null) {
        args[0] = located.getAbsolutePath();
      }
    }

    logprintln("PipeMapRed exec " + Arrays.asList(args));

    Environment childEnv = (Environment) StreamUtil.env().clone();
    addJobConfToEnvironment(job_, childEnv);
    addEnvironment(childEnv, job_.get("stream.addenvironment"));
    // add TMPDIR environment variable with the value of java.io.tmpdir
    envPut(childEnv, "TMPDIR", System.getProperty("java.io.tmpdir"));

    // Start the process
    ProcessBuilder processBuilder = new ProcessBuilder(args);
    processBuilder.environment().putAll(childEnv.toMap());
    sim = processBuilder.start();

    clientOut_ = new DataOutputStream(
        new BufferedOutputStream(sim.getOutputStream(), BUFFER_SIZE));
    clientIn_ = new DataInputStream(
        new BufferedInputStream(sim.getInputStream(), BUFFER_SIZE));
    clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
    startTime_ = System.currentTimeMillis();

    errThread_ = new MRErrorThread();
    errThread_.start();
  } catch (Exception e) {
    logStackTrace(e);
    LOG.error("configuration exception", e);
    throw new RuntimeException("configuration exception", e);
  }
}