Listed below are code examples showing how org.apache.tez.dag.api.DAG is used; they are collected from open source projects.
/**
* v1 --> v2 <br>
* v1 has a customized VertexManager that controls whether to schedule only the second task
* in the partially-finished test cases.
* v2 has a customized VertexManager that controls when to kill the AM.
*
* @param dagName name of the DAG
* @param vertexManagerClass VertexManagerPlugin class to use for v2
* @param dmType data movement type of the v1 --> v2 edge
* @param failOnParitialCompleted value of the FAIL_ON_PARTIAL_FINISHED flag passed to the vertex managers
* @return the constructed DAG
* @throws IOException
*/
private DAG createDAG(String dagName, Class vertexManagerClass, DataMovementType dmType,
boolean failOnParitialCompleted) throws IOException {
tezConf.setBoolean(FAIL_ON_PARTIAL_FINISHED, failOnParitialCompleted);
DAG dag = DAG.create(dagName);
UserPayload payload = UserPayload.create(null);
Vertex v1 = Vertex.create("v1", MyProcessor.getProcDesc(), 2);
v1.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
ScheduleControlledVertexManager.class.getName()).setUserPayload(
TezUtils.createUserPayloadFromConf(tezConf)));
Vertex v2 = Vertex.create("v2", DoNothingProcessor.getProcDesc(), 2);
v2.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
vertexManagerClass.getName()).setUserPayload(
TezUtils.createUserPayloadFromConf(tezConf)));
dag.addVertex(v1).addVertex(v2);
dag.addEdge(Edge.create(v1, v2, EdgeProperty.create(dmType,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
TestOutput.getOutputDesc(payload), TestInput.getInputDesc(payload))));
return dag;
}
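The two vertex managers above receive tezConf through their user payloads. The following helper is not part of the original source; it is a minimal sketch, assuming org.apache.tez.common.TezUtils and a Hadoop Configuration, of the serialize/deserialize round trip that lets a VertexManagerPlugin read back flags such as FAIL_ON_PARTIAL_FINISHED:
// Minimal sketch (not from the original test) of the Configuration <-> UserPayload
// round trip used by createDAG above.
private static Configuration roundTripConf(Configuration vmConf) throws IOException {
  // serialize the Configuration into the payload set on the VertexManagerPluginDescriptor
  UserPayload vmPayload = TezUtils.createUserPayloadFromConf(vmConf);
  // inside the plugin, the same Configuration is restored from the context's user payload
  return TezUtils.createConfFromUserPayload(vmPayload);
}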
@Test (timeout=60000)
public void testBasicInputFailureWithoutExitDeadline() throws Exception {
Configuration testConf = new Configuration(false);
testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 3); // 1 error < 0.4 fail fraction
testConf.setBoolean(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "2");
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExitDeadline", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
private DAG createDAG(String dagName, boolean uv12CommitFail, boolean v3CommitFail) {
DAG dag = DAG.create(dagName);
Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Proc"), 1);
Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Proc"), 1);
Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Proc"), 1);
VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2);
DataSinkDescriptor uv12DataSink = DataSinkDescriptor.create(
OutputDescriptor.create("dummy output"), createOutputCommitterDesc(uv12CommitFail), null);
uv12.addDataSink("uv12Out", uv12DataSink);
DataSinkDescriptor v3DataSink = DataSinkDescriptor.create(
OutputDescriptor.create("dummy output"), createOutputCommitterDesc(v3CommitFail), null);
v3.addDataSink("v3Out", v3DataSink);
GroupInputEdge e1 = GroupInputEdge.create(uv12, v3, EdgeProperty.create(
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
OutputDescriptor.create("dummy output class"),
InputDescriptor.create("dummy input class")), InputDescriptor
.create("merge.class"));
dag.addVertex(v1)
.addVertex(v2)
.addVertex(v3)
.addEdge(e1);
return dag;
}
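The data sinks above use a createOutputCommitterDesc helper that is not included in this listing. The sketch below is only a hypothetical illustration of such a helper; the committer class name and the one-byte payload encoding are assumptions, not the original test's implementation.
// Hypothetical sketch of a createOutputCommitterDesc helper; the class name and
// payload layout are illustrative assumptions.
private OutputCommitterDescriptor createOutputCommitterDesc(boolean commitFail) {
  // encode the fail-on-commit flag as a one-byte user payload for the test committer
  ByteBuffer flag = ByteBuffer.allocate(1).put(0, (byte) (commitFail ? 1 : 0));
  return OutputCommitterDescriptor.create("test.FailingOutputCommitter") // assumed class name
      .setUserPayload(UserPayload.create(flag));
}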
@Override
public void checkJobControlCompilerErrResult(PhysicalPlan pp, PigContext pc) throws Exception {
TezOperPlan tezPlan = buildTezPlan(pp, pc);
LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc);
loaderStorer.visit();
ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
parallelismSetter.visit();
DAG tezDag = getTezDAG(tezPlan, pc);
TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
try {
dagBuilder.visit();
} catch (VisitorException jce) {
assertTrue(((JobCreationException)jce.getCause()).getErrorCode() == 1068);
}
}
@Override
public void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
TezOperPlan tezPlan = buildTezPlan(pp, pc);
LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc);
loaderStorer.visit();
ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
parallelismSetter.visit();
DAG tezDag = getTezDAG(tezPlan, pc);
TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, null);
dagBuilder.visit();
for (Vertex v : tezDag.getVertices()) {
if (!v.getInputVertices().isEmpty()) {
Configuration conf = TezUtils.createConfFromUserPayload(v.getProcessorDescriptor().getUserPayload());
int parallel = v.getParallelism();
assertEquals(parallel, 100);
Util.assertConfLong(conf, "pig.info.reducers.default.parallel", 100);
Util.assertConfLong(conf, "pig.info.reducers.requested.parallel", -1);
Util.assertConfLong(conf, "pig.info.reducers.estimated.parallel", -1);
}
}
}
@Test (timeout=60000)
public void testBasicInputFailureWithoutExit() throws Exception {
Configuration testConf = new Configuration(false);
testConf.setBoolean(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "1");
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
testConf.set(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0,1");
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 4);
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 3);
DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExit", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testBasicTaskFailure() throws Exception {
Configuration testConf = new Configuration(false);
testConf.setBoolean(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
testConf.set(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
//verify value at v2 task1
testConf.set(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "1");
//value of v2 task1 is 4:
//v1 task1 attempt0 has value of 1 (attempt index + 1).
//v1 task0 attempt1 has value of 2 (attempt index + 1).
//v2 task1 attempt0 verifies value of 1 + 2 (values from the input vertex)
// + 1 (attempt index + 1) = 4
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 4);
DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testTaskMultipleFailures() throws Exception {
Configuration testConf = new Configuration(false);
testConf.setBoolean(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
testConf.set(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0,1");
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 1);
//v1 task0,1 attempt 2 succeed. Input sum = 6. Plus one (v2 attempt0).
//ending sum is 7.
testConf.set(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0");
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 7);
DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailures", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testMultipleInputFailureWithoutExit() throws Exception {
Configuration testConf = new Configuration(false);
testConf.setBoolean(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0,1");
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1");
//v2 task0 attempt0 input0,1 fails. wait.
//v1 task0 attempt1 reruns. v1 task1 attempt1 reruns.
//2 + 2 + 1 = 5
//same number for v2 task1
testConf.set(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0,1");
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 5);
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 5);
DAG dag = SimpleTestDAG.createDAG("testMultipleInputFailureWithoutExit", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
private List<StepCheck[]> testBasicTaskFailure() throws Exception {
Configuration testConf = new Configuration(false);
testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
testConf.setBoolean(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
testConf.set(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
StepCheck[] check = {
createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
createStep("v1 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY),
createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
};
DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf);
runDAG(dag, DAGStatus.State.SUCCEEDED);
return Collections.singletonList(check);
}
private void testMemory(DAG dag, boolean sendDMEvents) throws Exception {
StopWatch stopwatch = new StopWatch();
stopwatch.start();
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
null, false, false, numThreads, 1000);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
mockApp.eventsDelegate = new TestMockDAGAppMaster.TestEventsDelegate();
mockApp.doSleep = false;
DAGClient dagClient = tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
mockLauncher.startScheduling(true);
DAGStatus status = dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
checkMemory(dag.getName(), mockApp);
stopwatch.stop();
System.out.println("Time taken(ms): " + stopwatch.now(TimeUnit.MILLISECONDS));
tezClient.stop();
}
/**
* Test cascading input failure without exit. Expecting success.
* v1 -- v2 -- v3
* v3 all-tasks attempt0 input0 fails. Wait. Triggering v2 rerun.
* v2 task0 attempt1 input0 fails. Wait. Triggering v1 rerun.
* v1 attempt1 reruns and succeeds. v2 accepts v1 attempt1 output. v2 attempt1 succeeds.
* v3 attempt0 accepts v2 attempt1 output.
*
* AM vertex succeeded order is v1, v2, v1, v2, v3.
* @throws Exception
*/
@Test (timeout=60000)
public void testCascadingInputFailureWithoutExitSuccess() throws Exception {
Configuration testConf = new Configuration(false);
setCascadingInputFailureConfig(testConf, false);
//v2 task0 attempt1 value = v1 task0 attempt1 (2) + v1 task1 attempt0 (1) + 2 = 5
//v3 all-tasks attempt0 takes v2 task0 attempt1 value (5) + v2 task1 attempt0 (3) + 1 = 9
testConf.set(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), "0,1");
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 0), 9);
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 1), 9);
DAG dag = SimpleTestDAG3Vertices.createDAG(
"testCascadingInputFailureWithoutExitSuccess", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
/**
* Test cascading input failure with exit. Expecting success.
* v1 -- v2 -- v3
* v3 all-tasks attempt0 input0 fails. v3 attempt0 exits. Triggering v2 rerun.
* v2 task0 attempt1 input0 fails. v2 attempt1 exits. Triggering v1 rerun.
* v1 attempt1 reruns and succeeds. v2 accepts v1 attempt1 output. v2 attempt2 succeeds.
* v3 attempt1 accepts v2 attempt2 output.
*
* AM vertex succeeded order is v1, v2, v3, v1, v2, v3.
* @throws Exception
*/
@Test (timeout=60000)
public void testCascadingInputFailureWithExitSuccess() throws Exception {
Configuration testConf = new Configuration(false);
setCascadingInputFailureConfig(testConf, true);
//v2 task0 attempt2 value = v1 task0 attempt1 (2) + v1 task1 attempt0 (1) + 3 = 6
//v3 all-tasks attempt1 takes v2 task0 attempt2 value (6) + v2 task1 attempt0 (3) + 2 = 11
testConf.set(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), "0,1");
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 0), 11);
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 1), 11);
DAG dag = SimpleTestDAG3Vertices.createDAG(
"testCascadingInputFailureWithExitSuccess", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
private DAG createDAG(int numGenTasks, int totalSourceDataSize, int numFetcherTasks) {
int bytesPerSource = totalSourceDataSize / numGenTasks;
LOG.info("DataPerSourceTask(bytes)=" + bytesPerSource);
ByteBuffer payload = ByteBuffer.allocate(4);
payload.putInt(0, bytesPerSource);
Vertex broadcastVertex = Vertex.create("DataGen",
ProcessorDescriptor.create(InputGenProcessor.class.getName())
.setUserPayload(UserPayload.create(payload)), numGenTasks);
Vertex fetchVertex = Vertex.create("FetchVertex",
ProcessorDescriptor.create(InputFetchProcessor.class.getName()), numFetcherTasks);
UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig.newBuilder(NullWritable.class
.getName(), IntWritable.class.getName()).setCompression(false, null, null).build();
DAG dag = DAG.create("BroadcastLoadGen");
dag.addVertex(broadcastVertex).addVertex(fetchVertex).addEdge(
Edge.create(broadcastVertex, fetchVertex, edgeConf.createDefaultBroadcastEdgeProperty()));
return dag;
}
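A driver for this generator/fetcher DAG is not part of the snippet. Assuming the standard TezClient API and that createDAG above is available on the enclosing class, a minimal submission sketch could look like this:
// Hypothetical driver sketch for the BroadcastLoadGen DAG built above.
public void runBroadcastLoadGen() throws Exception {
  TezConfiguration tezConf = new TezConfiguration();
  TezClient tezClient = TezClient.create("BroadcastLoadGen", tezConf);
  tezClient.start();
  // 10 generator tasks producing ~100MB in total, broadcast to 5 fetcher tasks
  DAG dag = createDAG(10, 100 * 1024 * 1024, 5);
  DAGClient dagClient = tezClient.submitDAG(dag);
  DAGStatus status = dagClient.waitForCompletion();
  System.out.println("DAG finished with state: " + status.getState());
  tezClient.stop();
}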
@Test(timeout=120000)
public void testSessionDisableMultiAttempts() throws Exception {
tezSession.stop();
Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
.valueOf(new Random().nextInt(100000))));
TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
TezConfiguration tezConf = createSessionConfig(remoteStagingDir);
tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
TezClient session = TezClient.create("TestDAGRecovery2SingleAttemptOnly", tezConf);
session.start();
// DAG should fail as it never completes on the first attempt
DAG dag = MultiAttemptDAG.createDAG("TestSingleAttemptDAG", null);
runDAGAndVerify(dag, State.FAILED, session);
session.stop();
}
public static DAG createDAG(String name,
Configuration conf) throws Exception {
byte[] payload = null;
int taskCount = TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT;
if (conf != null) {
taskCount = conf.getInt(TEZ_SIMPLE_DAG_NUM_TASKS, TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT);
payload = TezUtils.createUserPayloadFromConf(conf);
}
DAG dag = new DAG(name);
Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2,
new EdgeProperty(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
TestOutput.getOutputDesc(payload),
TestInput.getInputDesc(payload))));
dag.addVertex(v3).addEdge(new Edge(v2, v3,
new EdgeProperty(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
TestOutput.getOutputDesc(payload),
TestInput.getInputDesc(payload))));
return dag;
}
/**
* Fine-grained task-level recovery: in a vertex (v1), task 0 is done and task 1
* is not started when a history flush happens and the AM dies. Once the AM is
* recovered, task 0 is not re-run and task 1 is re-run. (SCATTER_GATHER)
*
* @throws Exception
*/
@Test(timeout = 120000)
public void testVertexPartiallyFinished_ScatterGather() throws Exception {
DAG dag =
createDAG("VertexPartiallyFinished_ScatterGather", ControlledShuffleVertexManager.class,
DataMovementType.SCATTER_GATHER, true);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
printHistoryEvents(historyEvents1, 1);
printHistoryEvents(historyEvents2, 2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
assertEquals(0, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
// task_0 of v1 is finished in attempt 1 and not rerun, task_1 of v1 is
// finished in attempt 2
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
}
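The readRecoveryLog, printHistoryEvents and findTaskAttemptFinishedEvent helpers used by the recovery tests are referenced but not shown here. The following is only a hypothetical sketch of what findTaskAttemptFinishedEvent might look like, filtering TaskAttemptFinishedEvents by vertex id and task index; the event accessors are assumptions about the test helper rather than a verified listing:
// Hypothetical sketch of the findTaskAttemptFinishedEvent helper referenced above.
private List<TaskAttemptFinishedEvent> findTaskAttemptFinishedEvent(
    List<HistoryEvent> historyEvents, int vertexId, int taskId) {
  List<TaskAttemptFinishedEvent> result = new ArrayList<TaskAttemptFinishedEvent>();
  for (HistoryEvent event : historyEvents) {
    if (event.getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) {
      TaskAttemptFinishedEvent finished = (TaskAttemptFinishedEvent) event;
      TezTaskID taskID = finished.getTaskAttemptID().getTaskID();
      // keep only finished events belonging to the requested vertex and task
      if (taskID.getVertexID().getId() == vertexId && taskID.getId() == taskId) {
        result.add(finished);
      }
    }
  }
  return result;
}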
@Test(timeout = 5000)
public void testClientResubmit() throws Exception {
TezClientForTest client = configureAndCreateTezClient(null, true, null);
client.start();
Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1",
LocalResource.newInstance(
URL.newInstance("file", "localhost", 0, "/test1"),
LocalResourceType.FILE,
LocalResourceVisibility.PUBLIC, 1, 1));
Vertex vertex1 = Vertex.create("Vertex1", ProcessorDescriptor.create("P1"), 1,
Resource.newInstance(1, 1));
vertex1.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC");
Vertex vertex2 = Vertex.create("Vertex2", ProcessorDescriptor.create("P2"), 1,
Resource.newInstance(1, 1));
vertex2.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC");
DAG dag = DAG.create("DAG").addVertex(vertex1).addVertex(vertex2).addTaskLocalFiles(lrDAG);
for (int i = 0; i < 3; ++i) {
try {
client.submitDAG(dag);
Assert.fail("Expected TezUncheckedException here.");
} catch(TezUncheckedException ex) {
Assert.assertTrue(ex.getMessage().contains("Invalid/conflicting GC options found"));
}
}
client.stop();
}
@Test(timeout=120000)
public void testSessionDisableMultiAttempts() throws Exception {
tezSession.stop();
Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
.valueOf(new Random().nextInt(100000))));
TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
TezConfiguration tezConf = createSessionConfig(remoteStagingDir);
tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
TezClient session = TezClient.create("TestDAGRecovery2SingleAttemptOnly", tezConf);
session.start();
// DAG should fail as it never completes on the first attempt
DAG dag = MultiAttemptDAG.createDAG("TestSingleAttemptDAG", null);
runDAGAndVerify(dag, State.FAILED, session);
session.stop();
}
/**
* Fine-grained task-level recovery: in a vertex (v1), task 0 and task 1 are both
* done when a history flush happens and the AM dies. Once the AM is recovered,
* neither task 0 nor task 1 is re-run. (BROADCAST)
*
* @throws Exception
*/
@Test(timeout = 120000)
public void testVertexCompletelyFinished_Broadcast() throws Exception {
DAG dag =
createDAG("VertexCompletelyFinished_Broadcast", ControlledImmediateStartVertexManager.class,
DataMovementType.BROADCAST, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
printHistoryEvents(historyEvents1, 1);
printHistoryEvents(historyEvents2, 2);
// both task_0 and task_1 of v1 are finished in attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
// neither task of v1 is re-run; the attempt 2 recovery log still records one
// finished event for each of them
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
}
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
Map<String, LocalResource> localResources, Path stagingDir,
String inputPath, String outputPath) throws IOException {
Configuration inputConf = new Configuration(tezConf);
inputConf.set(FileInputFormat.INPUT_DIR, inputPath);
InputDescriptor id = new InputDescriptor(MRInput.class.getName())
.setUserPayload(MRInput.createUserPayload(inputConf,
TextInputFormat.class.getName(), true, true));
Configuration outputConf = new Configuration(tezConf);
outputConf.set(FileOutputFormat.OUTDIR, outputPath);
OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
.setUserPayload(MROutput.createUserPayload(
outputConf, TextOutputFormat.class.getName(), true));
Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
tokenizerVertex.addInput("MRInput", id, MRInputAMSplitGenerator.class);
Vertex summerVertex = new Vertex("summer",
new ProcessorDescriptor(
SumProcessor.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
summerVertex.addOutput("MROutput", od, MROutputCommitter.class);
OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
.newBuilder(Text.class.getName(), IntWritable.class.getName(),
HashPartitioner.class.getName(), null).build();
DAG dag = new DAG("WordCount");
dag.addVertex(tokenizerVertex)
.addVertex(summerVertex)
.addEdge(
new Edge(tokenizerVertex, summerVertex, edgeConf.createDefaultEdgeProperty()));
return dag;
}
public TezJob(TezConfiguration conf, DAG dag,
Map<String, LocalResource> requestAMResources,
int estimatedTotalParallelism) throws IOException {
this.conf = conf;
this.dag = dag;
this.requestAMResources = requestAMResources;
this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true);
this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
tezJobConf = new TezJobConfig(estimatedTotalParallelism);
}
/**
* API to help pre-allocate containers in session mode. In non-session mode
* this is ignored. The pre-allocated containers may be re-used by subsequent
* job DAGs to improve performance.
* The pre-warm vertex should be configured and set up exactly like the other
* vertices in the job DAGs so that the pre-allocated containers can be re-used
* by the subsequent DAGs.
* The processor for the preWarmVertex may be used to pre-warm the containers
* by pre-loading classes etc. It should be short-running so that pre-warming
* does not block real execution. Users can specify their custom processors or
* use the PreWarmProcessor from the runtime library.
* The parallelism of the preWarmVertex determines the number of pre-warmed
* containers.
* Pre-warming is best-effort and, among other factors, is limited by the free
* resources on the cluster. If the session does not become ready within the
* specified timeout, no pre-warm DAG is submitted and an exception is thrown.
* @param preWarmVertex the vertex used to pre-warm containers
* @param timeout how long to wait for the session to become ready
* @param unit time unit of the timeout
* @throws TezException
* @throws IOException
*/
@Unstable
public synchronized void preWarm(PreWarmVertex preWarmVertex,
long timeout, TimeUnit unit)
throws TezException, IOException {
if (!isSession) {
// do nothing for non session mode. This is there to let the code
// work correctly in both modes
LOG.warn("preWarm is not supported in non-session mode," +
"please use session-mode of TezClient");
return;
}
verifySessionStateForSubmission();
DAG dag = org.apache.tez.dag.api.DAG.create(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX + "_"
+ preWarmDAGCounter++);
dag.addVertex(preWarmVertex);
boolean isReady;
try {
isReady = waitTillReady(timeout, unit);
} catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for AM to become " +
"available", e);
}
if(isReady) {
prewarmDagClient = submitDAG(dag);
} else {
throw new SessionNotReady("Tez AM not ready, could not submit DAG");
}
}
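For reference, a client-side usage of this pre-warm API in session mode might look like the sketch below; the vertex name, parallelism, resource sizes, and the processor class are illustrative assumptions, not values from the original source.
// Hypothetical client-side sketch of pre-warming a session before submitting real DAGs.
public void warmSession() throws Exception {
  TezConfiguration tezConf = new TezConfiguration();
  tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
  TezClient tezClient = TezClient.create("prewarm-example", tezConf);
  tezClient.start();
  // configure the pre-warm vertex like the vertices of the DAGs that will follow
  PreWarmVertex preWarmVertex = PreWarmVertex.create("prewarm",
      ProcessorDescriptor.create("example.ShortRunningPreWarmProcessor"), // assumed processor
      4, Resource.newInstance(1024, 1));
  tezClient.preWarm(preWarmVertex, 30, TimeUnit.SECONDS);
  // ... submit the real DAGs, which can reuse the warmed containers ...
  tezClient.stop();
}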
@Test (timeout=50000)
public void testDisabledACls() throws Exception {
TezClient tezSession = null;
try {
SleepProcessorConfig spConf = new SleepProcessorConfig(1);
DAG dag = DAG.create("TezSleepProcessor");
Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
Resource.newInstance(256, 1));
dag.addVertex(vertex);
TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
ATSHistoryLoggingService.class.getName());
Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
.nextInt(100000))));
remoteFs.mkdirs(remoteStagingDir);
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
tezSession.start();
DAGClient dagClient = tezSession.submitDAG(dag);
DAGStatus dagStatus = dagClient.getDAGStatus(null);
while (!dagStatus.isCompleted()) {
LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+ dagStatus.getState());
Thread.sleep(500l);
dagStatus = dagClient.getDAGStatus(null);
}
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
} finally {
if (tezSession != null) {
tezSession.stop();
}
}
}
@Test (timeout = 10000)
public void testMultipleSubmissions() throws Exception {
Map<String, LocalResource> lrDAG = Maps.newHashMap();
String lrName1 = "LR1";
lrDAG.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
Map<String, LocalResource> lrVertex = Maps.newHashMap();
String lrName2 = "LR2";
lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG);
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex);
dag.addVertex(vA);
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
DAGClient dagClient = tezClient.submitDAG(dag);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
tezClient.stop();
// submit the same DAG again to verify it can be done.
tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
dagClient = tezClient.submitDAG(dag);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
tezClient.stop();
}
static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor,
JavaOptsChecker javaOptsChecker) throws IOException {
Credentials dagCredentials = setupDAGCredentials(dag, credentials,
amConfig.getTezConfiguration());
TezCommonUtils.logCredentials(LOG, dagCredentials, "dagPlan");
return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources,
amConfig.getBinaryConfLR(), tezLrsAsArchive, servicePluginsDescriptor, javaOptsChecker);
}
@Test
public void testTezParallelismEstimatorOrderBy() throws Exception{
pc.getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
String query = "a = load '2' using " + ArbitarySplitsLoader.class.getName()
+ "() as (name:chararray, age:int, gpa:double);"
+ "b = group a by name parallel 3;"
+ "c = foreach b generate group as name, AVG(a.age) as age;"
+ "d = order c by age;"
+ "store d into 'output';";
Pair<TezOperPlan, DAG> compiledPlan = compile(query);
TezOperator sortOper = compiledPlan.first.getLeaves().get(0);
Vertex sortVertex = compiledPlan.second.getVertex(sortOper.getOperatorKey().toString());
assertEquals(sortVertex.getParallelism(), -1);
}
@Test
public void testTezParallelismEstimatorSplitBranch() throws Exception{
pc.getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
String query = "a = load '10' using " + ArbitarySplitsLoader.class.getName()
+ "() as (name:chararray, age:int, gpa:double);"
+ "b = filter a by age>20;"
+ "c = filter a by age>50;"
+ "store b into 'o1';"
+ "d = group c by name;"
+ "store d into 'o2';";
Pair<TezOperPlan, DAG> compiledPlan = compile(query);
TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
assertEquals(leafVertex.getParallelism(), 7);
}
/**
* Fine-grained task-level recovery: in a vertex (v1), task 0 and task 1 are both
* done when a history flush happens and the AM dies. Once the AM is recovered,
* neither task 0 nor task 1 is re-run. (SCATTER_GATHER)
*
* @throws Exception
*/
@Test(timeout = 120000)
public void testVertexCompletelyFinished_ScatterGather() throws Exception {
DAG dag =
createDAG("VertexCompletelyFinished_ScatterGather", ControlledShuffleVertexManager.class,
DataMovementType.SCATTER_GATHER, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
TezCounter outputCounter = counters.findCounter(TestOutput.COUNTER_NAME, TestOutput.COUNTER_NAME);
TezCounter inputCounter = counters.findCounter(TestInput.COUNTER_NAME, TestInput.COUNTER_NAME);
// verify that processor, input and output counters are all being collected
Assert.assertTrue(outputCounter.getValue() > 0);
Assert.assertTrue(inputCounter.getValue() > 0);
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
printHistoryEvents(historyEvents1, 1);
printHistoryEvents(historyEvents2, 2);
// both task_0 and task_1 of v1 are finished in attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
// neither task of v1 is re-run; the attempt 2 recovery log still records one
// finished event for each of them
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
}
@Test(timeout = 10000)
public void testMultipleClientsWithoutSession() throws TezException, InterruptedException,
IOException {
TezConfiguration tezConf1 = createConf();
TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
tezClient1.start();
DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
dagClient1.close();
tezClient1.stop();
TezConfiguration tezConf2 = createConf();
DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
TezClient tezClient2 = TezClient.create("commonName", tezConf2, false);
tezClient2.start();
DAGClient dagClient2 = tezClient2.submitDAG(dag2);
dagClient2.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState());
assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext()));
dagClient2.close();
tezClient2.stop();
}
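The createConf and createSimpleDAG helpers used in this test are not included in the listing. As a hypothetical illustration only, a one-vertex createSimpleDAG could be sketched as follows (the vertex name and resource values are assumptions):
// Hypothetical sketch of a createSimpleDAG helper like the one used above;
// a processor-specific user payload could be attached via setUserPayload if needed.
private DAG createSimpleDAG(String dagName, String processorClassName) {
  Vertex v = Vertex.create("v1",
      ProcessorDescriptor.create(processorClassName), 1,
      Resource.newInstance(256, 1));
  return DAG.create(dagName).addVertex(v);
}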