下面列出了怎么用org.apache.hadoop.mapred.ShuffleHandler的API类实例代码及写法,或者点击链接到github查看源代码。
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
LOG.info("Launching " + taskAttemptID);
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
state = ContainerState.DONE;
sendContainerLaunchFailedMsg(taskAttemptID,
"Container was killed before it was launched");
return;
}
ContainerManagementProtocolProxyData proxy = null;
try {
proxy = getCMProxy(containerMgrAddress, containerID);
// Construct the actual Container
ContainerLaunchContext containerLaunchContext =
event.getContainerLaunchContext();
// Now launch the actual container
StartContainerRequest startRequest =
StartContainerRequest.newInstance(containerLaunchContext,
event.getContainerToken());
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(startRequest);
StartContainersRequest requestList = StartContainersRequest.newInstance(list);
StartContainersResponse response =
proxy.getContainerManagementProtocol().startContainers(requestList);
if (response.getFailedRequests() != null
&& response.getFailedRequests().containsKey(containerID)) {
throw response.getFailedRequests().get(containerID).deSerialize();
}
ByteBuffer portInfo =
response.getAllServicesMetaData().get(
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
int port = -1;
if(portInfo != null) {
port = ShuffleHandler.deserializeMetaData(portInfo);
}
LOG.info("Shuffle port returned by ContainerManager for "
+ taskAttemptID + " : " + port);
if(port < 0) {
this.state = ContainerState.FAILED;
throw new IllegalStateException("Invalid shuffle port number "
+ port + " returned for " + taskAttemptID);
}
// after launching, send launched event to task attempt to move
// it from ASSIGNED to RUNNING state
context.getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
this.state = ContainerState.RUNNING;
} catch (Throwable t) {
String message = "Container launch failed for " + containerID + " : "
+ StringUtils.stringifyException(t);
this.state = ContainerState.FAILED;
sendContainerLaunchFailedMsg(taskAttemptID, message);
} finally {
if (proxy != null) {
cmProxy.mayBeCloseProxy(proxy);
}
}
}
@Before
public void setup() throws IOException {
serviceResponse.clear();
serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeMetaData(80));
}
@Override
public void serviceInit(Configuration conf) throws Exception {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
"apps_staging_dir/").getAbsolutePath());
}
// By default, VMEM monitoring disabled, PMEM monitoring enabled.
if (!conf.getBoolean(
MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
}
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
try {
Path stagingPath = FileContext.getFileContext(conf).makeQualified(
new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
/*
* Re-configure the staging path on Windows if the file system is localFs.
* We need to use a absolute path that contains the drive letter. The unit
* test could run on a different drive than the AM. We can run into the
* issue that job files are localized to the drive where the test runs on,
* while the AM starts on a different drive and fails to find the job
* metafiles. Using absolute path can avoid this ambiguity.
*/
if (Path.WINDOWS) {
if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR,
new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
.getAbsolutePath());
}
}
FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
if (fc.util().exists(stagingPath)) {
LOG.info(stagingPath + " exists! deleting...");
fc.delete(stagingPath, true);
}
LOG.info("mkdir: " + stagingPath);
//mkdir the staging directory so that right permissions are set while running as proxy user
fc.mkdir(stagingPath, null, true);
//mkdir done directory as well
String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
Path doneDirPath = fc.makeQualified(new Path(doneDir));
fc.mkdir(doneDirPath, null, true);
} catch (IOException e) {
throw new YarnRuntimeException("Could not create staging directory. ", e);
}
conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
// which shuffle doesn't happen
//configure the shuffle service in NM
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
Service.class);
// Non-standard shuffle port
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class);
// TestMRJobs is for testing non-uberized operation only; see TestUberAM
// for corresponding uberized tests.
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
super.serviceInit(conf);
}
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
LOG.info("Launching " + taskAttemptID);
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
state = ContainerState.DONE;
sendContainerLaunchFailedMsg(taskAttemptID,
"Container was killed before it was launched");
return;
}
ContainerManagementProtocolProxyData proxy = null;
try {
proxy = getCMProxy(containerMgrAddress, containerID);
// Construct the actual Container
ContainerLaunchContext containerLaunchContext =
event.getContainerLaunchContext();
// Now launch the actual container
StartContainerRequest startRequest =
StartContainerRequest.newInstance(containerLaunchContext,
event.getContainerToken());
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(startRequest);
StartContainersRequest requestList = StartContainersRequest.newInstance(list);
StartContainersResponse response =
proxy.getContainerManagementProtocol().startContainers(requestList);
if (response.getFailedRequests() != null
&& response.getFailedRequests().containsKey(containerID)) {
throw response.getFailedRequests().get(containerID).deSerialize();
}
ByteBuffer portInfo =
response.getAllServicesMetaData().get(
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
int port = -1;
if(portInfo != null) {
port = ShuffleHandler.deserializeMetaData(portInfo);
}
LOG.info("Shuffle port returned by ContainerManager for "
+ taskAttemptID + " : " + port);
if(port < 0) {
this.state = ContainerState.FAILED;
throw new IllegalStateException("Invalid shuffle port number "
+ port + " returned for " + taskAttemptID);
}
// after launching, send launched event to task attempt to move
// it from ASSIGNED to RUNNING state
context.getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
this.state = ContainerState.RUNNING;
} catch (Throwable t) {
String message = "Container launch failed for " + containerID + " : "
+ StringUtils.stringifyException(t);
this.state = ContainerState.FAILED;
sendContainerLaunchFailedMsg(taskAttemptID, message);
} finally {
if (proxy != null) {
cmProxy.mayBeCloseProxy(proxy);
}
}
}
@Before
public void setup() throws IOException {
serviceResponse.clear();
serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeMetaData(80));
}
@Override
public void serviceInit(Configuration conf) throws Exception {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
"apps_staging_dir/").getAbsolutePath());
}
// By default, VMEM monitoring disabled, PMEM monitoring enabled.
if (!conf.getBoolean(
MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
}
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
try {
Path stagingPath = FileContext.getFileContext(conf).makeQualified(
new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
/*
* Re-configure the staging path on Windows if the file system is localFs.
* We need to use a absolute path that contains the drive letter. The unit
* test could run on a different drive than the AM. We can run into the
* issue that job files are localized to the drive where the test runs on,
* while the AM starts on a different drive and fails to find the job
* metafiles. Using absolute path can avoid this ambiguity.
*/
if (Path.WINDOWS) {
if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR,
new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
.getAbsolutePath());
}
}
FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
if (fc.util().exists(stagingPath)) {
LOG.info(stagingPath + " exists! deleting...");
fc.delete(stagingPath, true);
}
LOG.info("mkdir: " + stagingPath);
//mkdir the staging directory so that right permissions are set while running as proxy user
fc.mkdir(stagingPath, null, true);
//mkdir done directory as well
String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
Path doneDirPath = fc.makeQualified(new Path(doneDir));
fc.mkdir(doneDirPath, null, true);
} catch (IOException e) {
throw new YarnRuntimeException("Could not create staging directory. ", e);
}
conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
// which shuffle doesn't happen
//configure the shuffle service in NM
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
Service.class);
// Non-standard shuffle port
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class);
// TestMRJobs is for testing non-uberized operation only; see TestUberAM
// for corresponding uberized tests.
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
super.serviceInit(conf);
}
@Override
public void serviceInit(Configuration conf) throws Exception {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
// blacklisting disabled to prevent scheduling issues
conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
"apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
}
if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
// nothing defined. set quick delete value
conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
}
File appJarLocalFile = new File(MiniTezCluster.APPJAR);
if (!appJarLocalFile.exists()) {
String message = "TezAppJar " + MiniTezCluster.APPJAR
+ " not found. Exiting.";
LOG.info(message);
throw new TezUncheckedException(message);
}
FileSystem fs = FileSystem.get(conf);
Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir"));
Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar");
// Copy AppJar and make it public.
Path appMasterJar = new Path(MiniTezCluster.APPJAR);
fs.copyFromLocalFile(appMasterJar, appRemoteJar);
fs.setPermission(appRemoteJar, new FsPermission("777"));
conf.set(TezConfiguration.TEZ_LIB_URIS, appRemoteJar.toUri().toString());
LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));
// VMEM monitoring disabled, PMEM monitoring enabled.
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
try {
Path stagingPath = FileContext.getFileContext(conf).makeQualified(
new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
/*
* Re-configure the staging path on Windows if the file system is localFs.
* We need to use a absolute path that contains the drive letter. The unit
* test could run on a different drive than the AM. We can run into the
* issue that job files are localized to the drive where the test runs on,
* while the AM starts on a different drive and fails to find the job
* metafiles. Using absolute path can avoid this ambiguity.
*/
if (Path.WINDOWS) {
if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR,
new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
.getAbsolutePath());
}
}
FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
if (fc.util().exists(stagingPath)) {
LOG.info(stagingPath + " exists! deleting...");
fc.delete(stagingPath, true);
}
LOG.info("mkdir: " + stagingPath);
fc.mkdir(stagingPath, null, true);
//mkdir done directory as well
String doneDir =
JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
Path doneDirPath = fc.makeQualified(new Path(doneDir));
fc.mkdir(doneDirPath, null, true);
} catch (IOException e) {
throw new TezUncheckedException("Could not create staging directory. ", e);
}
conf.set(MRConfig.MASTER_ADDRESS, "test");
//configure the shuffle service in NM
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
Service.class);
// Non-standard shuffle port
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class);
// TestMRJobs is for testing non-uberized operation only; see TestUberAM
// for corresponding uberized tests.
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
super.serviceInit(conf);
}