下面列出了 org.apache.hadoop.mapred.FileAlreadyExistsException#org.apache.tez.client.TezClient 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates and starts a new Tez client for the given job configuration and blocks until the
 * client reports that it is ready.
 *
 * @param conf job configuration; the AM configuration is derived from it and the client name
 *     is read from {@code PigContext.JOB_NAME} (falling back to {@code "pig"})
 * @param requestedAMResources local resources passed to {@code TezClient.create} and recorded
 *     in the returned {@code SessionInfo}
 * @param creds credentials passed to {@code TezClient.create}
 * @param tezJobConf job-level settings applied to the AM configuration by
 *     {@code adjustAMConfig}
 * @return a {@code SessionInfo} wrapping the started client
 * @throws RuntimeException if the client reports {@code SHUTDOWN} right after being started
 */
private static SessionInfo createSession(Configuration conf,
    Map<String, LocalResource> requestedAMResources, Credentials creds,
    TezJobConfig tezJobConf) throws TezException, IOException,
    InterruptedException {
  TezConfiguration dagAmConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
  TezScriptState.get().addDAGSettingsToConf(dagAmConf);
  adjustAMConfig(dagAmConf, tezJobConf);

  TezClient client = TezClient.create(
      conf.get(PigContext.JOB_NAME, "pig"), dagAmConf, true, requestedAMResources, creds);
  client.start();

  // Fail fast instead of waiting on a client that has already shut down.
  if (client.getAppMasterStatus().equals(TezAppMasterStatus.SHUTDOWN)) {
    throw new RuntimeException("TezSession has already shutdown");
  }
  client.waitTillReady();
  return new SessionInfo(client, requestedAMResources);
}
/**
 * Submits the DAG on the given session, polls its status until it completes, and asserts that
 * it finished in the expected state.
 *
 * @param dag the DAG to submit
 * @param finalState the state the DAG is expected to end in
 * @param session the client used for submission; this method waits for it to become ready
 * @throws Exception if waiting, submission or status polling fails
 */
void runDAGAndVerify(DAG dag, DAGStatus.State finalState,
    TezClient session) throws Exception {
  // Single source of truth for the poll interval, so the log text cannot drift from the
  // actual sleep (the message used to claim 500ms while the loop slept 100ms).
  final long pollIntervalMs = 100;

  session.waitTillReady();
  DAGClient dagClient = session.submitDAG(dag);
  DAGStatus dagStatus = dagClient.getDAGStatus(null);
  while (!dagStatus.isCompleted()) {
    LOG.info("Waiting for dag to complete. Sleeping for " + pollIntervalMs + "ms."
        + " DAG name: " + dag.getName()
        + " DAG appId: " + dagClient.getApplicationId()
        + " Current state: " + dagStatus.getState());
    Thread.sleep(pollIntervalMs);
    dagStatus = dagClient.getDAGStatus(null);
  }
  Assert.assertEquals(finalState, dagStatus.getState());
}
/**
 * Runs a multi-attempt DAG in a session that has DAG recovery disabled and expects the DAG
 * to end in the FAILED state.
 */
@Test(timeout=120000)
public void testSessionDisableMultiAttempts() throws Exception {
  // Replace the shared session with one that uses a dedicated configuration.
  tezSession.stop();
  Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
      .valueOf(new Random().nextInt(100000))));
  TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
  TezConfiguration tezConf = createSessionConfig(remoteStagingDir);
  tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
  tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
  TezClient session = new TezClient("TestDAGRecovery2SingleAttemptOnly", tezConf);
  session.start();
  try {
    // DAG should fail as it never completes on the first attempt
    DAG dag = MultiAttemptDAG.createDAG("TestSingleAttemptDAG", null);
    runDAGAndVerify(dag, State.FAILED, session);
  } finally {
    // Stop the session even when the verification throws, so a failing run does not
    // leave the started session behind.
    session.stop();
  }
}
/**
 * Submits a single-vertex DAG whose processor is configured for a fatal failure and checks
 * that the DAG ends FAILED with exactly one failed task attempt on that vertex.
 */
@Test(timeout = 20000)
public void testFatalErrorReported() throws IOException, TezException, InterruptedException {
  TezClient client = getTezClient("testFatalErrorReported");
  DAGClient submitted = null;
  try {
    FailingProcessor.configureForFatalFail();

    DAG emptyDag = DAG.create("testFatalErrorReportedDag");
    ProcessorDescriptor failingProcessor =
        ProcessorDescriptor.create(FailingProcessor.class.getName());
    Vertex onlyVertex = Vertex.create(VERTEX_NAME, failingProcessor, 1);
    DAG dag = emptyDag.addVertex(onlyVertex);

    submitted = client.submitDAG(dag);
    submitted.waitForCompletion();

    DAGStatus.State endState = submitted.getDAGStatus(null).getState();
    assertEquals(DAGStatus.State.FAILED, endState);
    assertEquals(1,
        submitted.getVertexStatus(VERTEX_NAME, null).getProgress().getFailedTaskAttemptCount());
  } finally {
    // submitted stays null when submission itself failed.
    if (submitted != null) {
      submitted.close();
    }
    client.stop();
  }
}
/**
 * Returns the status of the given job, built from the DAG client's application report and
 * DAG status. The DAG client is created on first use and then reused.
 *
 * @param jobID the job to look up
 * @return a {@code DAGJobStatus} for the job
 * @throws IOException if the Tez client reports a {@code TezException} (wrapped as the cause)
 */
@Override
public JobStatus getJobStatus(JobID jobID) throws IOException,
    InterruptedException {
  String shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
  String jobFilePath = MRApps.getJobFile(conf, shortUserName, jobID);
  try {
    if (dagClient == null) {
      // Lazily bind to the application that backs this job.
      dagClient = TezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId(), tezConf);
    }
    DAGStatus currentStatus = dagClient.getDAGStatus(null);
    return new DAGJobStatus(dagClient.getApplicationReport(), currentStatus, jobFilePath);
  } catch (TezException e) {
    throw new IOException(e);
  }
}
/**
 * Starts a session client configured with the service plugin descriptor and runs the
 * non-fatal error scenario once each for the launcher, task communicator and scheduler
 * execution contexts.
 */
@Test (timeout = 150000)
public void testNonFatalErrors() throws IOException, TezException, InterruptedException {
  String methodName = "testNonFatalErrors";
  TezConfiguration clientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
  String sessionName =
      TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session";
  TezClient client = TezClient.newBuilder(sessionName, clientConf)
      .setIsSession(true)
      .setServicePluginDescriptor(servicePluginsDescriptor)
      .build();
  try {
    client.start();
    LOG.info("TezSessionStarted for " + methodName);
    client.waitTillReady();
    LOG.info("TezSession ready for submission for " + methodName);

    runAndVerifyForNonFatalErrors(
        client, SUFFIX_LAUNCHER, EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL);
    runAndVerifyForNonFatalErrors(
        client, SUFFIX_TASKCOMM, EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL);
    runAndVerifyForNonFatalErrors(
        client, SUFFIX_SCHEDULER, EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL);
  } finally {
    client.stop();
  }
}
/**
 * Builds a fresh local-mode, session-mode Tez configuration, stores it in {@code tezConf},
 * and creates and starts the shared {@code tezSession} from it.
 */
private void startSessionClient() throws Exception {
  LOG.info("Starting session");
  tezConf = new TezConfiguration();

  // for local mode
  tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
  tezConf.set("fs.defaultFS", "file:///");
  tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);

  // Application master settings.
  tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
  tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
  tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
  tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
  tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
  tezConf.setBoolean(TezConfiguration.TEZ_AM_ONE_TO_ONE_ROUTING_USE_ON_DEMAND_ROUTING, true);

  // Recovery settings.
  tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);

  tezSession = TezClient.create("TestExceptionPropagation", tezConf);
  tezSession.start();
}
/**
 * Utility method to run the example from within code or a test.
 *
 * @param conf the Tez configuration instance which will be used to create the DAG and
 *             possibly the Tez client.
 * @param args arguments to the example
 * @param tezClient an existing running {@link org.apache.tez.client.TezClient} instance if one
 *                  exists. If no TezClient is specified (null), one will be created based on
 *                  the provided configuration. If a TezClient is specified, the local mode
 *                  option must not be passed in the arguments; doing so is rejected with a
 *                  {@link RuntimeException}.
 * @return Zero indicates success, non-zero indicates failure
 * @throws Exception if option parsing or the execution of the example fails
 */
public int run(TezConfiguration conf, String[] args, @Nullable TezClient tezClient) throws
    Exception {
  setConf(conf);
  hadoopShim = new HadoopShimsLoader(conf).getHadoopShim();
  GenericOptionsParser parsedOptions = new GenericOptionsParser(conf, getExtraOptions(), args);

  if (parsedOptions.getCommandLine().hasOption(LOCAL_MODE)) {
    isLocalMode = true;
    // Local mode only affects a client created by this tool, never a supplied one.
    if (tezClient != null) {
      throw new RuntimeException("can't specify local mode when TezClient is created, it takes no effect");
    }
  }

  // Each remaining flag only ever switches its option on.
  if (parsedOptions.getCommandLine().hasOption(DISABLE_SPLIT_GROUPING)) {
    disableSplitGrouping = true;
  }
  if (parsedOptions.getCommandLine().hasOption(COUNTER_LOG)) {
    isCountersLog = true;
  }
  if (parsedOptions.getCommandLine().hasOption(GENERATE_SPLIT_IN_CLIENT)) {
    generateSplitInClient = true;
  }

  return _execute(parsedOptions.getRemainingArgs(), conf, tezClient);
}
/**
 * Runs a multi-attempt DAG in a session that has DAG recovery disabled and expects the DAG
 * to end in the FAILED state.
 */
@Test(timeout=120000)
public void testSessionDisableMultiAttempts() throws Exception {
  // Replace the shared session with one that uses a dedicated configuration.
  tezSession.stop();
  Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
      .valueOf(new Random().nextInt(100000))));
  TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
  TezConfiguration tezConf = createSessionConfig(remoteStagingDir);
  tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
  tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
  TezClient session = TezClient.create("TestDAGRecovery2SingleAttemptOnly", tezConf);
  session.start();
  try {
    // DAG should fail as it never completes on the first attempt
    DAG dag = MultiAttemptDAG.createDAG("TestSingleAttemptDAG", null);
    runDAGAndVerify(dag, State.FAILED, session);
  } finally {
    // Stop the session even when the verification throws, so a failing run does not
    // leave the started session behind.
    session.stop();
  }
}
/**
 * Runs a failing DAG with a non-session client, checks that it ends FAILED, and then keeps
 * the test alive for 1.5 times {@code TEZ_DAG_SLEEP_TIME_BEFORE_EXIT} before shutting the
 * clients down.
 */
@Test(timeout = 20000)
public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException,
    IOException {
  TezConfiguration tezConf1 = createConf();
  // Run in non-session mode so that the AM terminates
  TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
  tezClient1.start();
  try {
    DAG dag1 = createSimpleDAG("dag1", FailingProcessor.class.getName());
    DAGClient dagClient1 = tezClient1.submitDAG(dag1);
    try {
      dagClient1.waitForCompletion();
      assertEquals(DAGStatus.State.FAILED, dagClient1.getDAGStatus(null).getState());
      // Sleep for more time than is required for the DAG to complete.
      Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
    } finally {
      // Release the DAG client even when the assertion above fails.
      dagClient1.close();
    }
  } finally {
    // Always stop the client so a failing run does not leave it started.
    tezClient1.stop();
  }
}
/**
 * Runs a succeeding DAG with a non-session client, checks that it ends SUCCEEDED, and then
 * keeps the test alive for 1.5 times {@code TEZ_DAG_SLEEP_TIME_BEFORE_EXIT} before shutting
 * the clients down.
 */
@Test(timeout = 20000)
public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
    IOException {
  TezConfiguration tezConf1 = createConf();
  // Run in non-session mode so that the AM terminates
  TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
  tezClient1.start();
  try {
    DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
    DAGClient dagClient1 = tezClient1.submitDAG(dag1);
    try {
      dagClient1.waitForCompletion();
      assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
      // Sleep for more time than is required for the DAG to complete.
      Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
    } finally {
      // Release the DAG client even when the assertion above fails.
      dagClient1.close();
    }
  } finally {
    // Always stop the client so a failing run does not leave it started.
    tezClient1.stop();
  }
}
/**
 * Creates, starts and waits for a non-session "WordCount" client on the mini cluster.
 *
 * @param withTimeline when true, enables the YARN timeline service and selects
 *     {@code ATSHistoryLoggingService} for history logging; otherwise
 *     {@code SimpleHistoryLoggingService} is selected
 * @return the started client, after {@code waitTillReady()} has returned
 */
TezClient getTezClient(boolean withTimeline) throws Exception {
  TezConfiguration clientConf = new TezConfiguration(miniTezCluster.getConfig());
  if (withTimeline) {
    clientConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
  }
  String historyLoggerClass = withTimeline
      ? ATSHistoryLoggingService.class.getName()
      : SimpleHistoryLoggingService.class.getName();
  clientConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, historyLoggerClass);
  clientConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);

  TezClient client = TezClient.create("WordCount", clientConf, false);
  client.start();
  client.waitTillReady();
  return client;
}
/**
 * Per-test setup: creates a randomly named staging directory, builds the session
 * configuration on top of the mini cluster's configuration, and starts {@code tezSession}.
 */
@Before
public void setup() throws Exception {
  LOG.info("Starting session");
  String stagingDirName = String.valueOf(new Random().nextInt(100000));
  Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, stagingDirName));
  TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);

  tezConf = new TezConfiguration(miniTezCluster.getConfig());

  // Application master settings.
  tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO");
  tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
  tezConf.set(TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, "false");
  tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
  tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
  tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
  tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
  tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);

  // Recovery settings.
  tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);

  // IPC client: no connect retries and a 1000 timeout value.
  tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
  tezConf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0);
  tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY, 1000);

  tezSession = TezClient.create("TestDAGRecovery", tezConf);
  tezSession.start();
}
/**
 * Runs the tool with the supplied configuration and client.
 *
 * @param conf configuration installed via {@code setConf} and used to parse generic options
 * @param args raw arguments; generic options are stripped before validation
 * @param tezClient client passed through to {@code execute}
 * @return the non-zero result of {@code validateArgs} when validation fails, otherwise the
 *     result of {@code execute}
 * @throws Exception if option parsing or execution fails
 */
public int run(Configuration conf, String[] args, TezClient tezClient) throws Exception {
  setConf(conf);
  GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args);
  String[] remainingArgs = optionsParser.getRemainingArgs();
  int validation = validateArgs(remainingArgs);
  return validation != 0 ? validation : execute(remainingArgs, tezClient);
}
/**
 * Creates a Tez client from the current configuration, runs the job with it, and stops the
 * client afterwards regardless of the outcome.
 *
 * @param args already-validated tool arguments
 * @return the result of the three-argument {@code execute}
 */
private int execute(String[] args) throws TezException, IOException, InterruptedException {
  TezConfiguration clientConf = new TezConfiguration(getConf());
  // If creation throws there is nothing to stop, so the client is created outside the try.
  TezClient client = createTezClient(clientConf);
  try {
    return execute(args, clientConf, client);
  } finally {
    if (client != null) {
      client.stop();
    }
  }
}
/**
 * Per-test setup: creates a randomly named staging directory, builds the session
 * configuration on top of the mini cluster's configuration, and starts {@code tezSession}.
 */
@Before
public void setup() throws Exception {
  LOG.info("Starting session");
  String stagingDirName = String.valueOf(new Random().nextInt(100000));
  Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, stagingDirName));
  TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);

  tezConf = new TezConfiguration(miniTezCluster.getConfig());

  // Application master settings.
  tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO");
  tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
  tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
  tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
  tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, MAX_AM_ATTEMPT);
  tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
  tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
  tezConf.setBoolean(TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);

  // Recovery settings.
  tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
  tezConf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);

  // IPC client: no connect retries and a 1000 timeout value.
  tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
  tezConf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0);
  tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY, 1000);

  tezSession = TezClient.create("TestDAGRecovery", tezConf);
  tezSession.start();
}
/**
 * Marks the pool entry that wraps the given client as no longer in use. Does nothing when no
 * entry in the pool wraps this exact client instance.
 *
 * @param session the client to release; matched by reference identity
 */
static void freeSession(TezClient session) {
  sessionPoolLock.readLock().lock();
  try {
    for (SessionInfo candidate : sessionPool) {
      synchronized (candidate) {
        if (candidate.session != session) {
          continue;
        }
        candidate.inUse = false;
        break;
      }
    }
  } finally {
    sessionPoolLock.readLock().unlock();
  }
}
/**
 * Stops the given client and removes its entry from the session pool. Does nothing when no
 * entry in the pool wraps this exact client instance.
 *
 * @param session the client to stop; matched by reference identity
 * @throws TezException if stopping the client fails
 * @throws IOException if stopping the client fails
 */
static void stopSession(TezClient session) throws TezException, IOException {
  SessionInfo sessionToRemove = null;
  sessionPoolLock.readLock().lock();
  try {
    // The iterator is obtained only while the read lock is held. It used to be created
    // before locking, so a writer could change the pool in between and (for a fail-fast
    // collection) make the iteration throw ConcurrentModificationException.
    for (SessionInfo sessionInfo : sessionPool) {
      synchronized (sessionInfo) {
        if (sessionInfo.session == session) {
          log.info("Stopping Tez session " + session);
          session.stop();
          sessionToRemove = sessionInfo;
          break;
        }
      }
    }
  } finally {
    sessionPoolLock.readLock().unlock();
  }
  // Removal mutates the pool, so it runs under the write lock after the read lock has
  // been released.
  if (sessionToRemove != null) {
    sessionPoolLock.writeLock().lock();
    try {
      sessionPool.remove(sessionToRemove);
    } finally {
      sessionPoolLock.writeLock().unlock();
    }
  }
}
/**
 * Returns the Tez version read from the {@code POM_PROPERTIES} classpath resource next to
 * {@code TezClient}. The value is cached in {@code tezVersion} after the first successful
 * load.
 *
 * @return the {@code version} property, or an empty string when the resource is missing,
 *     cannot be read, or does not define the property
 */
public static String getTezVersion() {
  if (tezVersion == null) {
    java.io.InputStream pomStream = null;
    try {
      pomStream = TezClient.class.getResourceAsStream(POM_PROPERTIES);
      if (pomStream == null) {
        // Resource not on the classpath: report it directly instead of relying on a
        // NullPointerException from Properties.load.
        LOG.info("Failed to load Tez pom.properties");
      } else {
        Properties prop = new Properties();
        prop.load(pomStream);
        tezVersion = prop.getProperty("version", "");
      }
    } catch (Exception e) {
      // Best effort: the version is informational, so failures are logged (with the cause)
      // and the empty-string fallback below applies.
      LOG.info("Failed to load Tez pom.properties", e);
    } finally {
      if (pomStream != null) {
        try {
          pomStream.close();
        } catch (java.io.IOException ignored) {
          // Nothing useful can be done if closing a read-only resource stream fails.
        }
      }
    }
  }
  return tezVersion == null ? "" : tezVersion;
}
/**
 * Runs the MRR sleep job once without additional resources and expects success, then runs
 * it again in the same session with an extra "TezAppJar.jar" resource and expects the
 * submission to throw.
 */
@Test(timeout = 120000)
public void testAMRelocalizationConflict() throws Exception {
  Path relocPath = new Path("/tmp/relocalizationfilefound");
  if (remoteFs.exists(relocPath)) {
    remoteFs.delete(relocPath, true);
  }

  // Run a DAG w/o a file.
  TezClient session = createTezSession();
  State firstRunState = testMRRSleepJobDagSubmitCore(true, false, false,
      session, true, MRInputAMSplitGeneratorRelocalizationTest.class, null);
  Assert.assertEquals(DAGStatus.State.SUCCEEDED, firstRunState);
  Assert.assertFalse(remoteFs.exists(relocPath));

  // Create a bogus TezAppJar directly to HDFS
  LOG.info("Creating jar for relocalization test");
  Path localAppJar = new Path(MiniTezCluster.APPJAR);
  Path remoteAppJar = remoteFs.makeQualified(new Path("/tmp/" + localAppJar.getName()));
  OutputStream jarStream = remoteFs.create(remoteAppJar, true);
  createTestJar(jarStream, RELOCALIZATION_TEST_CLASS_NAME);

  Map<String, LocalResource> extraResources = new HashMap<String, LocalResource>();
  extraResources.put("TezAppJar.jar", createLrObjFromPath(remoteAppJar));

  try {
    testMRRSleepJobDagSubmitCore(true, false, false,
        session, true, MRInputAMSplitGeneratorRelocalizationTest.class, extraResources);
    Assert.fail("should have failed");
  } catch (Exception ex) {
    // expected
  }

  stopAndVerifyYarnApp(session);
}
/**
 * Creates a randomly named staging directory under {@code /tmp}, starts a
 * "testrelocalizationsession" client that uses it, and asserts that the client reports
 * {@code INITIALIZING} right after start.
 *
 * @return the started client
 */
private TezClient createTezSession() throws IOException, TezException {
  String stagingDirName = String.valueOf(new Random().nextInt(100000));
  Path stagingDir = remoteFs.makeQualified(new Path("/tmp", stagingDirName));
  remoteFs.mkdirs(stagingDir);

  TezConfiguration sessionConf = new TezConfiguration(mrrTezCluster.getConfig());
  sessionConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());

  TezClient session = new TezClient("testrelocalizationsession", sessionConf, true);
  session.start();
  Assert.assertEquals(TezAppMasterStatus.INITIALIZING, session.getAppMasterStatus());
  return session;
}
/**
 * Runs the MRR sleep job twice in one session; after each run the DAG must have SUCCEEDED
 * and the client must report READY.
 */
@Test(timeout = 120000)
public void testMultipleMRRSleepJobViaSession() throws IOException,
    InterruptedException, TezException, ClassNotFoundException, YarnException {
  String stagingDirName = String.valueOf(new Random().nextInt(100000));
  Path stagingDir = remoteFs.makeQualified(new Path("/tmp", stagingDirName));
  remoteFs.mkdirs(stagingDir);

  TezConfiguration sessionConf = new TezConfiguration(mrrTezCluster.getConfig());
  sessionConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());

  TezClient session = new TezClient("testsession", sessionConf, true);
  session.start();
  Assert.assertEquals(TezAppMasterStatus.INITIALIZING, session.getAppMasterStatus());

  // Two back-to-back submissions on the same session.
  for (int run = 0; run < 2; run++) {
    State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
        session, false, null, null);
    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
    Assert.assertEquals(TezAppMasterStatus.READY, session.getAppMasterStatus());
  }

  stopAndVerifyYarnApp(session);
}
/**
 * Class-level setup: starts a one-datanode mini DFS cluster and, if no mini Tez cluster
 * exists yet, starts one on top of that file system together with the shared
 * {@code tezSession}.
 */
@BeforeClass
public static void setup() throws Exception {
  LOG.info("Starting mini clusters");
  FileSystem remoteFs;
  try {
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
        .format(true).racks(null).build();
    remoteFs = dfsCluster.getFileSystem();
  } catch (IOException io) {
    throw new RuntimeException("problem starting mini dfs cluster", io);
  }

  // The Tez cluster and session are created only once.
  if (miniTezCluster != null) {
    return;
  }

  miniTezCluster = new MiniTezCluster(TestFaultTolerance.class.getName(), 4, 1, 1);
  Configuration miniTezconf = new Configuration(conf);
  miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
  miniTezCluster.init(miniTezconf);
  miniTezCluster.start();

  String stagingDirName = String.valueOf(new Random().nextInt(100000));
  Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, stagingDirName));
  TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);

  TezConfiguration sessionConf = new TezConfiguration(miniTezCluster.getConfig());
  sessionConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
  sessionConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);

  tezSession = new TezClient("TestFaultTolerance", sessionConf, true);
  tezSession.start();
}
/**
 * Runs a single-vertex sleep DAG with a non-session client and checks that it succeeds and
 * reports DAG counters, including the file system counter group and the GC time counter.
 */
@Test(timeout = 60000)
public void testSleepJob() throws TezException, IOException, InterruptedException {
  SleepProcessorConfig spConf = new SleepProcessorConfig(1);

  DAG dag = new DAG("TezSleepProcessor");
  Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
      SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
      Resource.newInstance(1024, 1));
  dag.addVertex(vertex);

  TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
  Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
      .nextInt(100000))));
  remoteFs.mkdirs(remoteStagingDir);
  tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());

  TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
  tezSession.start();
  try {
    DAGClient dagClient = tezSession.submitDAG(dag);

    DAGStatus dagStatus = dagClient.getDAGStatus(null);
    while (!dagStatus.isCompleted()) {
      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
          + dagStatus.getState());
      Thread.sleep(500L);
      dagStatus = dagClient.getDAGStatus(null);
    }
    // Fetch the final status again, this time asking for counters.
    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));

    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
    assertNotNull(dagStatus.getDAGCounters());
    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
  } finally {
    // Stop the client even when an assertion fails, so a failing run does not leave it
    // started.
    tezSession.stop();
  }
}
/**
 * Runs a single-vertex sleep DAG whose staging directory is a qualified path on the local
 * file system, and checks that it succeeds and reports DAG counters.
 */
@Test
public void testNonDefaultFSStagingDir() throws Exception {
  SleepProcessorConfig spConf = new SleepProcessorConfig(1);

  DAG dag = new DAG("TezSleepProcessor");
  Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
      SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
      Resource.newInstance(1024, 1));
  dag.addVertex(vertex);

  TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
  Path stagingDir = new Path(TEST_ROOT_DIR, "testNonDefaultFSStagingDir"
      + String.valueOf(random.nextInt(100000)));
  FileSystem localFs = FileSystem.getLocal(tezConf);
  stagingDir = localFs.makeQualified(stagingDir);
  localFs.mkdirs(stagingDir);
  tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());

  TezClient tezSession = new TezClient("TezSleepProcessor", tezConf, false);
  tezSession.start();
  try {
    DAGClient dagClient = tezSession.submitDAG(dag);

    DAGStatus dagStatus = dagClient.getDAGStatus(null);
    while (!dagStatus.isCompleted()) {
      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
          + dagStatus.getState());
      Thread.sleep(500L);
      dagStatus = dagClient.getDAGStatus(null);
    }
    // Fetch the final status again, this time asking for counters.
    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));

    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
    assertNotNull(dagStatus.getDAGCounters());
    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
  } finally {
    // Stop the client even when an assertion fails, so a failing run does not leave it
    // started.
    tezSession.stop();
  }
}
/**
 * Per-test setup: creates a randomly named staging directory and starts {@code tezSession}
 * with the configuration returned by {@code createSessionConfig} for that directory.
 */
@Before
public void setup() throws Exception {
  String stagingDirName = String.valueOf(new Random().nextInt(100000));
  Path stagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, stagingDirName));
  TezClientUtils.ensureStagingDirExists(conf, stagingDir);

  TezConfiguration sessionConf = createSessionConfig(stagingDir);
  tezSession = new TezClient("TestDAGRecovery2", sessionConf);
  tezSession.start();
}
/**
 * Runs the tool with the supplied configuration and session.
 *
 * @param conf configuration installed via {@code setConf} and used to parse generic options
 * @param args raw arguments; generic options are stripped before validation
 * @param tezSession client passed through to {@code execute}
 * @return the non-zero result of {@code validateArgs} when validation fails, otherwise the
 *     result of {@code execute}
 * @throws Exception if option parsing or execution fails
 */
public int run(Configuration conf, String[] args, TezClient tezSession) throws Exception {
  setConf(conf);
  GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args);
  String[] remainingArgs = optionsParser.getRemainingArgs();
  int validation = validateArgs(remainingArgs);
  return validation == 0 ? execute(remainingArgs, tezSession) : validation;
}
/**
 * Creates a Tez session from the current configuration, runs the job with it, and stops the
 * session afterwards regardless of the outcome.
 *
 * @param args already-validated tool arguments
 * @return the result of the three-argument {@code execute}
 */
private int execute(String[] args) throws TezException, IOException, InterruptedException {
  TezConfiguration sessionConf = new TezConfiguration(getConf());
  // If creation throws there is nothing to stop, so the session is created outside the try.
  TezClient session = createTezSession(sessionConf);
  try {
    return execute(args, sessionConf, session);
  } finally {
    if (session != null) {
      session.stop();
    }
  }
}
/**
 * Runs a single-vertex sleep DAG in a session configured with
 * {@code ATSHistoryLoggingService} and with disabled timeline domains allowed, and expects
 * the DAG to succeed.
 */
@Test (timeout=50000)
public void testDisabledACls() throws Exception {
  TezClient session = null;
  try {
    SleepProcessorConfig sleepConf = new SleepProcessorConfig(1);

    DAG dag = DAG.create("TezSleepProcessor");
    Vertex sleepVertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
        SleepProcessor.class.getName()).setUserPayload(sleepConf.toUserPayload()), 1,
        Resource.newInstance(256, 1));
    dag.addVertex(sleepVertex);

    TezConfiguration sessionConf = new TezConfiguration(mrrTezCluster.getConfig());
    sessionConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
    sessionConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
        ATSHistoryLoggingService.class.getName());

    String stagingDirName = String.valueOf(random.nextInt(100000));
    Path stagingDir = remoteFs.makeQualified(new Path("/tmp", stagingDirName));
    remoteFs.mkdirs(stagingDir);
    sessionConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());

    session = TezClient.create("TezSleepProcessor", sessionConf, true);
    session.start();

    DAGClient dagClient = session.submitDAG(dag);
    DAGStatus status;
    // Poll every 500ms until the DAG reports completion.
    for (status = dagClient.getDAGStatus(null);
        !status.isCompleted();
        status = dagClient.getDAGStatus(null)) {
      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
          + status.getState());
      Thread.sleep(500l);
    }
    Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
  } finally {
    // session stays null when setup failed before the client was created.
    if (session != null) {
      session.stop();
    }
  }
}
/**
 * Runs the tool with the supplied configuration and session.
 *
 * @param conf configuration installed via {@code setConf} and used to parse generic options
 * @param args raw arguments; generic options are stripped before validation
 * @param tezSession client passed through to {@code execute}
 * @return the non-zero result of {@code validateArgs} when validation fails, otherwise the
 *     result of {@code execute}
 * @throws Exception if option parsing or execution fails
 */
public int run(Configuration conf, String[] args, TezClient tezSession) throws Exception {
  setConf(conf);
  String[] toolArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  int status = validateArgs(toolArgs);
  if (status == 0) {
    return execute(toolArgs, tezSession);
  }
  return status;
}