下面列出了org.apache.hadoop.mapred.FileAlreadyExistsException#org.apache.tez.dag.api.TezConfiguration 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Builds a speculator bound to one vertex and loads its tuning values from
 * the supplied configuration.
 *
 * @param conf configuration the timeout, retry and proportion settings are read from
 * @param estimator task runtime estimator kept for later use
 * @param clock clock kept for later use
 * @param vertex vertex this speculator is associated with
 */
public LegacySpeculator(
    Configuration conf, TaskRuntimeEstimator estimator, Clock clock, Vertex vertex) {
  super(LegacySpeculator.class.getName());

  this.clock = clock;
  this.estimator = estimator;
  this.vertex = vertex;

  // Every lookup below falls back to the matching TezConfiguration *_DEFAULT
  // constant when the key is not present in conf.

  // Integer / floating point limits.
  this.minimumAllowedSpeculativeTasks = conf.getInt(
      TezConfiguration.TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS,
      TezConfiguration.TEZ_AM_MINIMUM_ALLOWED_SPECULATIVE_TASKS_DEFAULT);
  this.proportionTotalTasksSpeculatable = conf.getDouble(
      TezConfiguration.TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE,
      TezConfiguration.TEZ_AM_PROPORTION_TOTAL_TASKS_SPECULATABLE_DEFAULT);
  this.proportionRunningTasksSpeculatable = conf.getDouble(
      TezConfiguration.TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE,
      TezConfiguration.TEZ_AM_PROPORTION_RUNNING_TASKS_SPECULATABLE_DEFAULT);

  // Long-valued timing settings.
  this.soonestRetryAfterSpeculate = conf.getLong(
      TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE,
      TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_SPECULATE_DEFAULT);
  this.soonestRetryAfterNoSpeculate = conf.getLong(
      TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE,
      TezConfiguration.TEZ_AM_SOONEST_RETRY_AFTER_NO_SPECULATE_DEFAULT);
  this.taskTimeout = conf.getLong(
      TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT,
      TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT_DEFAULT);
}
/**
 * Derives an AM configuration from {@code conf}, creates and starts a
 * {@link TezClient} with it, and waits until the client reports ready.
 *
 * @param conf configuration the AM configuration and the job name are taken from
 * @param requestedAMResources local resources passed to the client and recorded in the result
 * @param creds credentials passed to the client
 * @param tezJobConf job settings applied to the AM configuration via {@code adjustAMConfig}
 * @return a {@code SessionInfo} wrapping the started client and the requested resources
 * @throws RuntimeException if the AM status is already {@code SHUTDOWN} right after start
 */
private static SessionInfo createSession(Configuration conf,
    Map<String, LocalResource> requestedAMResources, Credentials creds,
    TezJobConfig tezJobConf) throws TezException, IOException,
    InterruptedException {
  final TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
  TezScriptState.get().addDAGSettingsToConf(amConf);
  adjustAMConfig(amConf, tezJobConf);

  // Job name defaults to "pig" when PigContext.JOB_NAME is not set.
  final TezClient client = TezClient.create(
      conf.get(PigContext.JOB_NAME, "pig"), amConf, true, requestedAMResources, creds);
  client.start();

  if (client.getAppMasterStatus().equals(TezAppMasterStatus.SHUTDOWN)) {
    throw new RuntimeException("TezSession has already shutdown");
  }

  client.waitTillReady();
  return new SessionInfo(client, requestedAMResources);
}
/**
 * Creates a mocked {@link OutputContext} whose getters are stubbed with fixed
 * test values.
 *
 * @return the stubbed mock
 * @throws IOException declared because building the user payload from a configuration may fail
 */
static OutputContext createOutputContext() throws IOException {
  // Values the stubbed getters will hand back.
  final Configuration payloadConf = new TezConfiguration();
  final UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
  final String[] workDirs = new String[]{"workDir1"};
  final TezCounters tezCounters = new TezCounters();
  final OutputStatisticsReporter reporter = mock(OutputStatisticsReporter.class);
  final long availableMemory = 200L * 1024 * 1024; // 200 MiB

  final OutputContext context = mock(OutputContext.class);
  doReturn("destinationVertex").when(context).getDestinationVertexName();
  doReturn(userPayload).when(context).getUserPayload();
  doReturn(workDirs).when(context).getWorkDirs();
  doReturn(availableMemory).when(context).getTotalMemoryAvailableToTask();
  doReturn(tezCounters).when(context).getCounters();
  doReturn(reporter).when(context).getStatisticsReporter();
  doReturn(new Configuration(false)).when(context).getContainerConfiguration();
  return context;
}
/**
 * Parses a view ACL that lists only groups together with a YARN admin ACL
 * that lists users and groups, then checks the parsed users and groups.
 */
@Test(timeout = 5000)
public void testGroupsOnly() {
  final String viewAclSpec = " grp3,grp4,grp5";
  final String adminAclSpec = "admin1,admin4, admgrp3,admgrp4,admgrp5 ";

  final Configuration conf = new Configuration(false);
  conf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAclSpec);
  conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminAclSpec);

  final ACLConfigurationParser parser = new ACLConfigurationParser(conf);

  // Users: only the admin ACL names any.
  Assert.assertFalse(parser.getAllowedUsers().isEmpty());
  for (String user : new String[] {"admin1", "admin4"}) {
    Assert.assertTrue(parser.getAllowedUsers().get(ACLType.YARN_ADMIN_ACL).contains(user));
  }

  // Groups from the view ACL; grp6 was never configured.
  Assert.assertFalse(parser.getAllowedGroups().isEmpty());
  Assert.assertTrue(parser.getAllowedGroups().get(ACLType.AM_VIEW_ACL).contains("grp3"));
  Assert.assertFalse(parser.getAllowedGroups().get(ACLType.AM_VIEW_ACL).contains("grp6"));
  Assert.assertTrue(parser.getAllowedGroups().get(ACLType.AM_VIEW_ACL).contains("grp4"));
  Assert.assertTrue(parser.getAllowedGroups().get(ACLType.AM_VIEW_ACL).contains("grp5"));

  // Groups from the admin ACL.
  for (String group : new String[] {"admgrp3", "admgrp4", "admgrp5"}) {
    Assert.assertTrue(parser.getAllowedGroups().get(ACLType.YARN_ADMIN_ACL).contains(group));
  }
}
/**
 * Runs {@link MRCombiner} with {@code NewReducer} configured as the combine
 * class and checks the combine input/output record counters and the written
 * keys and values.
 */
@Test
public void testRunNewCombiner() throws IOException, InterruptedException {
  final TezConfiguration tezConf = new TezConfiguration();
  setKeyAndValueClassTypes(tezConf);
  tezConf.setBoolean("mapred.mapper.new-api", true);
  tezConf.setClass(MRJobConfig.COMBINE_CLASS_ATTR, NewReducer.class, Object.class);

  final TaskContext context = getTaskContext(tezConf);
  final Writer mockWriter = Mockito.mock(Writer.class);
  new MRCombiner(context).combine(new TezRawKeyValueIteratorTest(), mockWriter);

  final long combinedIn =
      context.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
  final long combinedOut =
      context.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
  assertEquals(6, combinedIn);
  assertEquals(3, combinedOut);

  // verify combiner output keys and values
  verifyKeyAndValues(mockWriter);
}
/**
 * Routes an input-read-error event through an edge whose custom edge manager
 * is configured with -1 as its last payload value, and expects an
 * {@link AMUserCodeException} whose cause complains about a negative source
 * task index.
 */
@Test(timeout = 5000)
public void testInvalidSourceTaskIndex() throws Exception {
  EventHandler mockEventHandler = mock(EventHandler.class);

  EdgeProperty edgeProperty = EdgeProperty.create(
      EdgeManagerPluginDescriptor.create(CustomEdgeManagerWithInvalidReturnValue.class.getName())
          .setUserPayload(
              new CustomEdgeManagerWithInvalidReturnValue.EdgeManagerConfig(1,1,1,-1)
                  .toUserPayload()),
      DataSourceType.PERSISTED,
      SchedulingType.SEQUENTIAL,
      OutputDescriptor.create(""),
      InputDescriptor.create(""));
  Edge edge = new Edge(edgeProperty, mockEventHandler, new TezConfiguration());

  TezVertexID sourceVertexId = createVertexID(1);
  TezVertexID destVertexId = createVertexID(2);
  edge.setSourceVertex(mockVertex("v1", sourceVertexId, new LinkedHashMap<TezTaskID, Task>()));
  edge.setDestinationVertex(mockVertex("v2", destVertexId, new LinkedHashMap<TezTaskID, Task>()));
  edge.initialize();

  try {
    TezEvent readErrorEvent = new TezEvent(
        InputReadErrorEvent.create("diag", 0, 1),
        new EventMetaData(EventProducerConsumerType.INPUT, "v2", "v1",
            TezTaskAttemptID.getInstance(TezTaskID.getInstance(destVertexId, 1), 1)));
    edge.sendTezEventToSourceTasks(readErrorEvent);
    // Reaching this line means the invalid index went unnoticed.
    Assert.fail();
  } catch (AMUserCodeException expected) {
    expected.printStackTrace();
    assertTrue(expected.getCause().getMessage()
        .contains("SourceTaskIndex should not be negative"));
  }
}
/**
 * Resolves the base staging directory for Tez.
 *
 * <p>
 * The location is read from <code>TezConfiguration.TEZ_AM_STAGING_DIR</code>,
 * falling back to <code>TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT</code>
 * when the property is absent. If the directory does not exist on its file
 * system it is created through <code>mkDirForAM</code> and the creation is
 * logged. The path is then resolved against the file system.
 * </p>
 *
 * @param conf
 *          TEZ configuration
 * @return the staging directory as returned by <code>FileSystem.resolvePath</code>
 * @throws TezUncheckedException
 *           wrapping any {@link IOException} raised while accessing the file
 *           system
 */
public static Path getTezBaseStagingPath(Configuration conf) {
  final Path stagingDir = new Path(conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
      TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT));
  try {
    final FileSystem stagingFs = stagingDir.getFileSystem(conf);
    final boolean missing = !stagingFs.exists(stagingDir);
    if (missing) {
      mkDirForAM(stagingFs, stagingDir);
      LOG.info("Stage directory " + stagingDir + " doesn't exist and is created");
    }
    return stagingFs.resolvePath(stagingDir);
  } catch (IOException ioe) {
    throw new TezUncheckedException(ioe);
  }
}
/**
 * Starts a session client configured with the service plugin descriptor and
 * runs the non-fatal-error verification once each for the launcher, task
 * communicator and scheduler execution contexts. The client is always stopped.
 */
@Test (timeout = 150000)
public void testNonFatalErrors() throws IOException, TezException, InterruptedException {
  final String methodName = "testNonFatalErrors";
  final String sessionName =
      TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session";

  final TezConfiguration clientConf =
      new TezConfiguration(extServiceTestHelper.getConfForJobs());
  final TezClient client = TezClient.newBuilder(sessionName, clientConf)
      .setIsSession(true)
      .setServicePluginDescriptor(servicePluginsDescriptor)
      .build();

  try {
    client.start();
    LOG.info("TezSessionStarted for " + methodName);
    client.waitTillReady();
    LOG.info("TezSession ready for submission for " + methodName);

    runAndVerifyForNonFatalErrors(
        client, SUFFIX_LAUNCHER, EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL);
    runAndVerifyForNonFatalErrors(
        client, SUFFIX_TASKCOMM, EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL);
    runAndVerifyForNonFatalErrors(
        client, SUFFIX_SCHEDULER, EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL);
  } finally {
    client.stop();
  }
}
/**
 * Builds and initialises a {@link HistoryEventHandler} backed by mocked
 * {@code AppContext} and {@code DAG} objects, with recovery disabled and the
 * in-memory logging service selected.
 *
 * @param logLevel history log level to configure, or {@code null} to leave the setting untouched
 * @return the initialised handler
 */
private HistoryEventHandler createHandler(HistoryLogLevel logLevel) {
  final Configuration handlerConf = new Configuration(baseConfig);
  handlerConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
      InMemoryHistoryLoggingService.class.getName());
  handlerConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
  if (logLevel != null) {
    handlerConf.setEnum(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, logLevel);
  }

  // The mocked DAG and the mocked context both expose the same configuration.
  final DAG currentDag = mock(DAG.class);
  when(currentDag.getConf()).thenReturn(handlerConf);

  final AppContext context = mock(AppContext.class);
  when(context.getApplicationID()).thenReturn(appId);
  when(context.getHadoopShim()).thenReturn(new HadoopShim() {});
  when(context.getAMConf()).thenReturn(handlerConf);
  when(context.getCurrentDAG()).thenReturn(currentDag);

  final HistoryEventHandler eventHandler = new HistoryEventHandler(context);
  eventHandler.init(handlerConf);
  return eventHandler;
}
/**
 * Scenario
 *    - reducer has not progressed enough
 *    - reducer becomes unhealthy after some failures
 *    - no of attempts failing exceeds maxFailedUniqueFetches (5)
 * Expected result
 *    - fail the reducer
 *
 * The scenario is run twice on the same configuration object: first as
 * created, then again after setting the minimum failures per host to 4000.
 */
@Test(timeout = 60000)
public void testReducerHealth_1() throws IOException {
  final Configuration healthConf = new TezConfiguration();

  // First pass: configuration as created.
  _testReducerHealth_1(healthConf);

  // Second pass: same scenario with a raised per-host failure minimum.
  healthConf.setInt(
      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
      4000);
  _testReducerHealth_1(healthConf);
}
/**
 * Submits a simple DAG in non-session mode, expects it to succeed, and then
 * keeps the test alive for 1.5 times
 * {@code TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT} before shutting down.
 *
 * <p>The DAG client and the Tez client are released in {@code finally}
 * blocks so that a failed assertion or an exception while waiting does not
 * leave them open for the tests that follow.
 */
@Test(timeout = 20000)
public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
    IOException {
  TezConfiguration tezConf1 = createConf();
  // Run in non-session mode so that the AM terminates
  TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
  tezClient1.start();
  try {
    DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
    DAGClient dagClient1 = tezClient1.submitDAG(dag1);
    try {
      dagClient1.waitForCompletion();
      assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
      // Sleep for more time than is required for the DAG to complete.
      Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
    } finally {
      // Same order as before: DAG client first, then the Tez client.
      dagClient1.close();
    }
  } finally {
    tezClient1.stop();
  }
}
/**
 * Sets {@code TEZ_LIB_URIS} to the URLs of the current JVM class path while
 * {@code TEZ_IGNORE_LIB_URIS} is {@code true}, and expects
 * {@code setupTezJarsLocalResources} to return an empty map.
 *
 * <p>The class path is read from the {@code java.class.path} system property
 * rather than by casting the system class loader to {@code URLClassLoader}:
 * from Java 9 onwards the system class loader is no longer a
 * {@code URLClassLoader}, so the cast fails with a
 * {@code ClassCastException}.
 *
 * @throws Exception if a class path entry cannot be converted to a URL or the call under test fails
 */
@Test (timeout=5000)
public void validateSetTezJarLocalResourcesDefinedExistingDirectoryIgnored() throws Exception {
  String classPath = System.getProperty("java.class.path", "");
  StringBuilder buffer = new StringBuilder();
  for (String entry : classPath.split(
      java.util.regex.Pattern.quote(java.io.File.pathSeparator))) {
    if (entry.isEmpty()) {
      // An empty class path element carries no location worth listing.
      continue;
    }
    buffer.append(new java.io.File(entry).toURI().toURL().toExternalForm());
    buffer.append(",");
  }
  TezConfiguration conf = new TezConfiguration();
  conf.set(TezConfiguration.TEZ_LIB_URIS, buffer.toString());
  conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
  Credentials credentials = new Credentials();
  Map<String, LocalResource> localizedMap = TezClientUtils.setupTezJarsLocalResources(conf, credentials);
  assertTrue(localizedMap.isEmpty());
}
/**
 * Starts a three-datanode mini DFS cluster and, unless one already exists, a
 * mini Tez cluster that uses that DFS as its default file system.
 *
 * @throws RuntimeException wrapping the {@link IOException} if the DFS cluster cannot be started
 */
@BeforeClass
public static void beforeClass() throws Exception {
  LOG.info("Starting mini clusters");

  conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
  try {
    dfsCluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(3)
        .format(true)
        .racks(null)
        .build();
    remoteFs = dfsCluster.getFileSystem();
  } catch (IOException startFailure) {
    throw new RuntimeException("problem starting mini dfs cluster", startFailure);
  }

  if (miniTezCluster != null) {
    // Already created; leave the existing cluster untouched.
    return;
  }

  miniTezCluster = new MiniTezCluster(TestDAGRecovery2.class.getName(), 1, 1, 1);
  Configuration clusterConf = new Configuration(conf);
  clusterConf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
  clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
  clusterConf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500);
  miniTezCluster.init(clusterConf);
  miniTezCluster.start();
}
/**
 * Copies {@code this.tezConf} and, for every MR-to-DAG key mapping supplied
 * by {@code DeprecatedKeys}, moves a value present under the MR key to the
 * mapped Tez key, removing the MR key afterwards.
 *
 * @return the translated copy; {@code this.tezConf} itself is not modified by this method
 */
private TezConfiguration getDAGAMConfFromMRConf() {
  final TezConfiguration translated = new TezConfiguration(this.tezConf);

  for (Entry<String, String> mapping : DeprecatedKeys.getMRToDAGParamMap().entrySet()) {
    final String mrKey = mapping.getKey();
    final String tezKey = mapping.getValue();

    final String mrValue = translated.get(mrKey);
    if (mrValue == null) {
      // Nothing configured under the MR key, so there is nothing to move.
      continue;
    }

    translated.set(tezKey, mrValue);
    translated.unset(mrKey);
    if (LOG.isDebugEnabled()) {
      LOG.debug("MR->DAG Translating MR key: " + mrKey
          + " to Tez key: " + tezKey + " with value "
          + translated.get(tezKey));
    }
  }

  return translated;
}
/**
 * Reads the list of tasks to profile from
 * {@code TezConfiguration.TEZ_PROFILE_TASK_LIST}. Example formats:
 *
 * <pre>
 * v[0,1,2]                  - profile a subset of tasks in a vertex
 * v[1,2,3];v2[5,6,7]        - profile multiple vertices
 * v[1:5,20,30];v2[2:5,60,7] - ranges of tasks in vertices; partial
 *                             ranges are not supported (e.g v[:5],v2[2:])
 * v[]                       - profile all tasks in a vertex
 * </pre>
 *
 * @param conf configuration holding the task list; a missing value is treated as empty
 * @return map from vertex name to the parsed task set; empty when the value
 *         is empty or rejected by {@code isValid}
 */
private Map<String, BitSet> getTasksToProfile(Configuration conf) {
  final Map<String, BitSet> tasksByVertex = new HashMap<String, BitSet>();

  final String spec = conf.getTrimmed(TezConfiguration.TEZ_PROFILE_TASK_LIST, "");
  final boolean usable = !spec.isEmpty() && isValid(spec);
  if (usable) {
    // Group 1 is the vertex name, group 2 the task list handed to the parser.
    final Matcher vertexMatcher = TASKS_TO_PROFILE_REGEX.matcher(spec);
    while (vertexMatcher.find()) {
      tasksByVertex.put(
          vertexMatcher.group(1).trim(),
          parseTasksToProfile(vertexMatcher.group(2).trim()));
    }
    LOG.info("Tasks to profile info=" + tasksByVertex);
  }

  return tasksByVertex;
}
/**
 * Brings up a mini DFS cluster with three datanodes and, if none has been
 * created yet, a mini Tez cluster whose default file system is that DFS.
 *
 * @throws RuntimeException wrapping the {@link IOException} if the DFS cluster fails to start
 */
@BeforeClass
public static void beforeClass() throws Exception {
  LOG.info("Starting mini clusters");

  conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
  try {
    dfsCluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(3)
        .format(true)
        .racks(null)
        .build();
    remoteFs = dfsCluster.getFileSystem();
  } catch (IOException startFailure) {
    throw new RuntimeException("problem starting mini dfs cluster", startFailure);
  }

  if (miniTezCluster != null) {
    // A Tez cluster is already present; nothing more to set up.
    return;
  }

  miniTezCluster = new MiniTezCluster(TestDAGRecovery.class.getName(), 1, 1, 1);
  Configuration clusterConf = new Configuration(conf);
  clusterConf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
  clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
  clusterConf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500);
  miniTezCluster.init(clusterConf);
  miniTezCluster.start();
}
/**
 * With commit-on-DAG-success disabled, completes both vertices of the group
 * DAG, expects exactly one commit, then signals a re-run of the first vertex
 * and expects the DAG to fail with {@code VERTEX_RERUN_AFTER_COMMIT}.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testGroupDAGWithVertexReRunningAfterCommit() {
  groupDag.getConf().setBoolean(
      TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
  initDAG(groupDag);
  startDAG(groupDag);
  dispatcher.await();

  final Vertex firstVertex = groupDag.getVertex("vertex1");
  final Vertex secondVertex = groupDag.getVertex("vertex2");
  dispatcher.getEventHandler().handle(
      new DAGEventVertexCompleted(firstVertex.getVertexId(), VertexState.SUCCEEDED));
  dispatcher.getEventHandler().handle(
      new DAGEventVertexCompleted(secondVertex.getVertexId(), VertexState.SUCCEEDED));
  dispatcher.await();

  // vertex group commit happens
  Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);

  // dag failed when vertex re-run happens after vertex group commit is done.
  dispatcher.getEventHandler().handle(
      new DAGEventVertexReRunning(firstVertex.getVertexId()));
  dispatcher.await();
  Assert.assertEquals(DAGState.FAILED, groupDag.getState());
  Assert.assertEquals(
      DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT, groupDag.getTerminationCause());
}
/**
 * Reports the same host/port four times under two distinct unique
 * identifiers and expects the tracker to count two nodes for source 0.
 */
@Test(timeout=10000)
public void testMultipleAMNodeIDs() {
  final Configuration trackerConf = new Configuration(false);
  trackerConf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);

  final AppContext context = mock(AppContext.class);
  final TestEventHandler eventHandler = new TestEventHandler();
  final AMNodeTracker tracker = new AMNodeTracker(eventHandler, context);
  doReturn(tracker).when(context).getNodeTracker();

  final AMContainerMap containerMap = mock(AMContainerMap.class);
  final TaskSchedulerManager schedulerManager = mock(TaskSchedulerManager.class);
  dispatcher.register(AMNodeEventType.class, tracker);
  dispatcher.register(AMContainerEventType.class, containerMap);
  dispatcher.register(AMSchedulerEventType.class, schedulerManager);

  tracker.init(trackerConf);
  tracker.start();
  try {
    // Each identifier is reported twice; duplicates must not add nodes.
    for (String uniqueId : new String[] {"uuid1", "uuid1", "uuid2", "uuid2"}) {
      tracker.nodeSeen(new ExtendedNodeId(NodeId.newInstance("host", 2222), uniqueId), 0);
    }
    assertEquals(2, tracker.getNumNodes(0));
  } finally {
    tracker.stop();
  }
}
/**
 * Loads the node blacklisting settings from the configuration, logs them,
 * and rejects an ignore threshold outside the range -1..100.
 *
 * @param conf configuration the three settings are read from; defaults come
 *          from the matching {@code TezConfiguration} *_DEFAULT constants
 * @throws TezUncheckedException if the configured threshold is below -1 or above 100
 */
@Override
public synchronized void serviceInit(Configuration conf) {
  this.blacklistDisablePercent = conf.getInt(
      TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD,
      TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT);
  this.nodeBlacklistingEnabled = conf.getBoolean(
      TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED,
      TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED_DEFAULT);
  this.maxTaskFailuresPerNode = conf.getInt(
      TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE,
      TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT);

  // Logged before validation, so an invalid value still shows up in the log.
  LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
      ", blacklistingEnabled: " + nodeBlacklistingEnabled +
      ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);

  final boolean thresholdInRange =
      blacklistDisablePercent >= -1 && blacklistDisablePercent <= 100;
  if (!thresholdInRange) {
    throw new TezUncheckedException("Invalid blacklistDisablePercent: "
        + blacklistDisablePercent
        + ". Should be an integer between 0 and 100 or -1 to disabled");
  }
}
/**
 * Drives a single vertex with two committers into {@code COMMITTING}, kills
 * the DAG, and checks that both vertex and DAG end up {@code KILLED}, that
 * the expected history events were recorded, and that each committer was
 * initialised, set up and aborted exactly once.
 */
@Test(timeout = 5000)
public void testVertexKilledWhileCommitting() throws Exception {
  conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
  setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
  initDAG(dag);
  startDAG(dag);

  final VertexImpl vertex = (VertexImpl) dag.getVertex("vertex1");
  vertex.handle(
      new VertexEventTaskCompleted(vertex.getTask(0).getTaskId(), TaskState.SUCCEEDED));
  Assert.assertEquals(VertexState.COMMITTING, vertex.getState());

  // kill dag which will trigger the vertex killed event
  dag.handle(new DAGEventTerminateDag(dag.getID(), DAGTerminationCause.DAG_KILL, null));
  dispatcher.await();

  // Vertex side.
  Assert.assertEquals(VertexState.KILLED, vertex.getState());
  Assert.assertTrue(vertex.commitFutures.isEmpty());
  Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, vertex.getTerminationCause());

  // DAG side.
  Assert.assertEquals(DAGState.KILLED, dag.getState());
  Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());

  historyEventHandler.verifyVertexCommitStartedEvent(vertex.getVertexId(), 1);
  historyEventHandler.verifyVertexFinishedEvent(vertex.getVertexId(), 1);

  for (String outputName : new String[] {"v1Out_1", "v1Out_2"}) {
    final CountingOutputCommitter committer =
        (CountingOutputCommitter) vertex.getOutputCommitter(outputName);
    Assert.assertEquals(1, committer.initCounter);
    Assert.assertEquals(1, committer.setupCounter);
    // commit may not have started, so can't verify commitCounter
    Assert.assertEquals(1, committer.abortCounter);
  }
}
/**
 * Determines the Timeline domain to use for a DAG and returns it as a
 * single-entry configuration map.
 *
 * <ul>
 * <li>ACLs disabled: returns {@code null}, or fails if a DAG domain id was
 * configured anyway.</li>
 * <li>A DAG domain id is configured: that id is used as is.</li>
 * <li>No id configured and auto-create disabled: fails.</li>
 * <li>No id configured, auto-create enabled and no DAG access controls
 * given: returns {@code null}.</li>
 * <li>Otherwise a domain named from the application id and DAG name is
 * created and used.</li>
 * </ul>
 *
 * @param tezConf configuration the ACL and domain settings are read from
 * @param applicationId application the DAG belongs to; part of a generated domain id
 * @param dagName DAG name; part of a generated domain id
 * @param dagAccessControls DAG-specific access controls, may be {@code null}
 * @return a map from {@code YARN_ATS_ACL_DAG_DOMAIN_ID} to the domain id, or
 *         {@code null} when no DAG-specific domain applies
 * @throws TezUncheckedException on the inconsistent configurations listed above
 */
private Map<String, String> createDAGDomain(Configuration tezConf,
    ApplicationId applicationId, String dagName, DAGAccessControls dagAccessControls)
    throws IOException, HistoryACLPolicyException {
  String domainId =
      tezConf.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
  if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
      TezConfiguration.TEZ_AM_ACLS_ENABLED_DEFAULT)) {
    if (domainId != null) {
      // This branch is only reached with ACLs disabled, so report false.
      throw new TezUncheckedException("ACLs disabled but domainId for DAG is specified"
          + ", aclsEnabled=false, domainId=" + domainId);
    }
    return null;
  }
  boolean autoCreateDomain = tezConf.getBoolean(TezConfiguration.YARN_ATS_ACL_DOMAINS_AUTO_CREATE,
      TezConfiguration.YARN_ATS_ACL_DOMAINS_AUTO_CREATE_DEFAULT);
  if (domainId != null) {
    // do nothing
    LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
  } else {
    if (!autoCreateDomain) {
      // Error - Cannot fallback to default as that leaves ACLs open
      throw new TezUncheckedException("Timeline DomainId is not specified and auto-create"
          + " Domains is disabled");
    }
    // Create a domain only if dagAccessControls has been specified.
    if (dagAccessControls == null) {
      return null;
    }
    domainId = DOMAIN_ID_PREFIX + applicationId.toString() + "_" + dagName;
    createTimelineDomain(domainId, tezConf, dagAccessControls);
    LOG.info("Created Timeline Domain for DAG-specific History ACLs, domainId=" + domainId);
  }
  return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
}
/**
 * Creates the container map service and captures its collaborators.
 *
 * @param chh container heartbeat handler kept for later use
 * @param tal task communicator manager kept for later use
 * @param containerSignatureMatcher signature matcher kept for later use
 * @param context application context; its AM configuration supplies the
 *          shuffle auxiliary service id (with the configured default as fallback)
 */
public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal,
    ContainerSignatureMatcher containerSignatureMatcher, AppContext context) {
  super("AMContainerMaps");

  this.context = context;
  this.chh = chh;
  this.tal = tal;
  this.containerSignatureMatcher = containerSignatureMatcher;

  this.containerMap = new ConcurrentHashMap<ContainerId, AMContainer>();
  this.auxiliaryService = context.getAMConf().get(
      TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
      TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
}
/**
 * Checks that the recovery path for attempt 2 is laid out as
 * {@code <resolved stage dir>/<system sub dir>/<app id>/<recovery dir name>/2}.
 */
@Test
public void testTezAttemptRecoveryStagingPath() throws Exception {
  String strAppId = "testAppId";
  Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
  Path recoveryPath = TezCommonUtils.getRecoveryPath(stageDir, conf);
  Path recoveryStageDir = TezCommonUtils.getAttemptRecoveryPath(recoveryPath, 2);
  String expectedDir = RESOLVED_STAGE_DIR + File.separatorChar
      + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId + File.separator
      + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME + File.separator + "2";
  // Expected value goes first so a failure message labels the two sides correctly.
  Assert.assertEquals(expectedDir, recoveryStageDir.toString());
}
/**
 * Generates spill events with final merge enabled and verifies that exactly
 * two events are produced (a {@code VertexManagerEvent} followed by a
 * {@code CompositeDataMovementEvent}), that the payload carries no spill
 * details, and that five partitions are flagged as empty.
 *
 * <p>Equality checks use {@code assertEquals} so that a failure reports the
 * actual value rather than just {@code false}.
 */
@Test
public void testGenerateOnSpillEvent_With_FinalMerge() throws Exception {
  List<Event> events = Lists.newLinkedList();
  Path indexFile = createIndexFile(10, false);

  boolean finalMergeEnabled = true;
  boolean isLastEvent = true;
  int spillId = 0;
  int physicalOutputs = 10;
  String pathComponent = "/attempt_x_y_0/file.out";
  String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
      TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);

  //normal code path where we do final merge all the time
  ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
      outputContext, spillId, new TezSpillRecord(indexFile, conf),
      physicalOutputs, true, pathComponent, null, false, auxiliaryService, TezCommonUtils.newBestCompressionDeflater());

  // One vertex manager event followed by one composite data movement event.
  Assert.assertEquals(2, events.size());
  Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
  Assert.assertTrue(events.get(1) instanceof CompositeDataMovementEvent);

  CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(1);
  Assert.assertEquals(physicalOutputs, cdme.getCount());
  Assert.assertEquals(0, cdme.getSourceIndexStart());

  ShuffleUserPayloads.DataMovementEventPayloadProto dmeProto =
      ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom( cdme.getUserPayload()));

  //With final merge, spill details should not be present
  Assert.assertFalse(dmeProto.hasSpillId());
  Assert.assertFalse(dmeProto.hasLastEvent() || dmeProto.getLastEvent());

  byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(dmeProto
      .getEmptyPartitions());
  BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
  Assert.assertEquals("emptyPartitionBitSet cardinality", 5,
      emptyPartitionsBitSet.cardinality());
}
/**
 * Parses the three numeric job arguments, builds the DAG from them and runs it.
 *
 * @param args {@code args[0]} number of source tasks, {@code args[1]} total
 *          source data size in bytes, {@code args[2]} number of fetcher tasks
 * @param tezConf not used by this method
 * @param tezClient not used by this method
 * @return the result of {@code runDag}
 * @throws NumberFormatException if one of the three arguments is not an integer
 */
@Override
protected final int runJob(String[] args, TezConfiguration tezConf, TezClient tezClient) throws
    TezException, InterruptedException, IOException {
  // Separate the class name from the arguments so the log line stays readable.
  LOG.info("Running: " + this.getClass().getSimpleName() + " " + StringUtils.join(args, " "));
  int numSourceTasks = Integer.parseInt(args[0]);
  int totalSourceData = Integer.parseInt(args[1]);
  int numFetcherTasks = Integer.parseInt(args[2]);
  LOG.info("Parameters: numSourceTasks=" + numSourceTasks + ", totalSourceDataSize(bytes)=" + totalSourceData +
      ", numFetcherTasks=" + numFetcherTasks);
  DAG dag = createDAG(numSourceTasks, totalSourceData, numFetcherTasks);
  return runDag(dag, false, LOG);
}
/**
 * Builds a test DAG client with the given history logging settings and checks
 * whether it reports ATS as enabled.
 *
 * @param appId application id passed to the client
 * @param dagIdStr DAG id string passed to the client
 * @param expected expected value of {@code getIsATSEnabled()}
 * @param loggingClass value for the history logging service class setting
 * @param amHistoryLoggingEnabled value for the AM history logging flag
 * @param dagHistoryLoggingEnabled value for the DAG history logging flag
 */
private static void testAtsEnabled(ApplicationId appId, String dagIdStr, boolean expected,
    String loggingClass, boolean amHistoryLoggingEnabled,
    boolean dagHistoryLoggingEnabled) throws IOException {
  final TezConfiguration clientConf = new TezConfiguration();
  // The YARN configuration is copied from clientConf before the history
  // settings below are applied to clientConf.
  final YarnConfiguration yarnClientConf = new YarnConfiguration(clientConf);

  clientConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, loggingClass);
  clientConf.setBoolean(
      TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, amHistoryLoggingEnabled);
  clientConf.setBoolean(
      TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, dagHistoryLoggingEnabled);

  final DAGClientImplForTest client =
      new DAGClientImplForTest(appId, dagIdStr, clientConf, yarnClientConf, null);
  assertEquals(expected, client.getIsATSEnabled());
}
MockTezClient(String name, TezConfiguration tezConf, boolean isSession,
Map<String, LocalResource> localResources, Credentials credentials,
Clock clock, AtomicBoolean mockAppLauncherGoFlag,
boolean initFailFlag, boolean startFailFlag, int concurrency, int containers) {
super(name, tezConf, isSession, localResources, credentials);
this.client = new MockLocalClient(mockAppLauncherGoFlag, clock, initFailFlag, startFailFlag,
concurrency, containers);
}
/**
 * Runs {@link MRCombiner} with {@code Top2OldReducer} set under
 * {@code mapred.combiner.class} and checks the combine input/output record
 * counters.
 */
@Test
public void testTop2RunOldCombiner() throws IOException, InterruptedException {
  final TezConfiguration tezConf = new TezConfiguration();
  setKeyAndValueClassTypes(tezConf);
  tezConf.setClass("mapred.combiner.class", Top2OldReducer.class, Object.class);

  final TaskContext context = getTaskContext(tezConf);
  final Writer mockWriter = Mockito.mock(Writer.class);
  new MRCombiner(context).combine(new TezRawKeyValueIteratorTest(), mockWriter);

  final long combinedIn =
      context.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
  final long combinedOut =
      context.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
  assertEquals(6, combinedIn);
  assertEquals(5, combinedOut);
}
public RecoveryParser(DAGAppMaster dagAppMaster,
FileSystem recoveryFS,
Path recoveryDataDir,
int currentAttemptId) throws IOException {
this.dagAppMaster = dagAppMaster;
this.recoveryFS = recoveryFS;
this.recoveryDataDir = recoveryDataDir;
this.currentAttemptId = currentAttemptId;
this.currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
currentAttemptId);
recoveryBufferSize = dagAppMaster.getConfig().getInt(
TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
this.recoveryFS.mkdirs(currentAttemptRecoveryDataDir);
}
/**
 * Derives a session configuration from the mini Tez cluster configuration.
 *
 * @param remoteStagingDir directory stored as the AM staging directory
 * @return a new configuration with the staging directory, AM resource and
 *         launch options, recovery, blacklisting and session settings applied
 */
private TezConfiguration createSessionConfig(Path remoteStagingDir) {
  final TezConfiguration sessionConf = new TezConfiguration(miniTezCluster.getConfig());

  // Where the AM stages its files and how it is launched.
  sessionConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
  sessionConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
  sessionConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
  sessionConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);

  // Attempt and recovery settings.
  sessionConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
  sessionConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 10);

  // Behaviour flags.
  sessionConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
  sessionConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);

  return sessionConf;
}