下面列出了 org.apache.hadoop.fs.LocalDirAllocator 类的 API 用法示例代码,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Builds the monitoring task from the disk thresholds and directory lists in
 * {@code conf}.
 *
 * <p>Reads the per-disk utilization percentage limit and the per-disk minimum
 * free space (MB), wraps the validated local and log directory lists in
 * {@link DirectoryCollection}s that share those two limits, and creates one
 * {@link LocalDirAllocator} per directory kind, keyed by the configuration
 * property name.
 *
 * @param conf configuration supplying the thresholds and the directory lists
 * @throws YarnRuntimeException declared by the signature; not thrown directly
 *         by this body (presumably raised by validatePaths - confirm there)
 */
public MonitoringTimerTask(Configuration conf) throws YarnRuntimeException {
  final float diskUtilizationLimitPercent = conf.getFloat(
      YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
      YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE);
  final long diskFreeSpaceFloorMb = conf.getLong(
      YarnConfiguration.NM_MIN_PER_DISK_FREE_SPACE_MB,
      YarnConfiguration.DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB);

  // Local directories first, then log directories; both share the same limits.
  final String[] configuredLocalDirs =
      conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS);
  localDirs = new DirectoryCollection(
      validatePaths(configuredLocalDirs),
      diskUtilizationLimitPercent, diskFreeSpaceFloorMb);

  final String[] configuredLogDirs =
      conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS);
  logDirs = new DirectoryCollection(
      validatePaths(configuredLogDirs),
      diskUtilizationLimitPercent, diskFreeSpaceFloorMb);

  // The allocators are keyed by property name, not by the resolved directory list.
  localDirsAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
  logDirsAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
}
/**
 * Installs the configuration and rebuilds the helpers derived from it.
 *
 * <p>The configuration is stored as a {@link JobConf} (wrapped in a new one
 * when it is not already a {@code JobConf}). The map-output file helper is
 * instantiated reflectively from {@code MRConfig.TASK_LOCAL_OUTPUT_CLASS},
 * the local-directory allocator is keyed by {@code MRConfig.LOCAL_DIR}, and
 * every {@code host=resolvedHost} entry under
 * {@code MRConfig.STATIC_RESOLUTIONS} is registered with {@link NetUtils}.
 *
 * @param conf configuration to install
 * @throws IllegalArgumentException if a static-resolution entry contains no
 *         {@code '='} separator
 */
public void setConf(Configuration conf) {
  if (conf instanceof JobConf) {
    this.conf = (JobConf) conf;
  } else {
    this.conf = new JobConf(conf);
  }
  this.mapOutputFile = ReflectionUtils.newInstance(
      conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
          MROutputFiles.class, MapOutputFile.class), conf);
  this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
  // add the static resolutions (this is required for the junit to
  // work on testcases that simulate multiple nodes on a single physical
  // node.
  String[] hostToResolved = conf.getStrings(MRConfig.STATIC_RESOLUTIONS);
  if (hostToResolved != null) {
    for (String str : hostToResolved) {
      int separator = str.indexOf('=');
      if (separator < 0) {
        // Without this check substring(0, -1) fails with an index error that
        // names neither the offending entry nor the property it came from.
        throw new IllegalArgumentException("Malformed static resolution '" + str
            + "' in " + MRConfig.STATIC_RESOLUTIONS
            + ": expected host=resolvedHost");
      }
      String name = str.substring(0, separator);
      String resolvedName = str.substring(separator + 1);
      NetUtils.addStaticResolution(name, resolvedName);
    }
  }
}
/**
 * Creates the monitoring task, configuring both directory collections and
 * their allocators from {@code conf}.
 *
 * <p>Two limits are read once and applied to both collections: the maximum
 * per-disk utilization percentage and the minimum per-disk free space in MB.
 * The directory lists are passed through {@code validatePaths} before being
 * wrapped.
 *
 * @param conf source of the thresholds and of the local/log directory lists
 * @throws YarnRuntimeException declared by the signature; this body has no
 *         explicit throw (presumably comes from validatePaths - confirm there)
 */
public MonitoringTimerTask(Configuration conf) throws YarnRuntimeException {
  final float utilizationCeiling = conf.getFloat(
      YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
      YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE);
  final long freeSpaceFloorMb = conf.getLong(
      YarnConfiguration.NM_MIN_PER_DISK_FREE_SPACE_MB,
      YarnConfiguration.DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB);

  this.localDirs = new DirectoryCollection(
      validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS)),
      utilizationCeiling,
      freeSpaceFloorMb);
  this.logDirs = new DirectoryCollection(
      validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)),
      utilizationCeiling,
      freeSpaceFloorMb);

  // Each allocator is constructed from the property name itself.
  this.localDirsAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
  this.logDirsAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
}
/**
 * Stores the configuration as a {@link JobConf} and re-creates the objects
 * that are derived from it.
 *
 * <p>After storing the configuration this method instantiates the map-output
 * file helper named by {@code MRConfig.TASK_LOCAL_OUTPUT_CLASS}, creates a
 * {@link LocalDirAllocator} keyed by {@code MRConfig.LOCAL_DIR}, and registers
 * each {@code host=resolvedHost} pair listed under
 * {@code MRConfig.STATIC_RESOLUTIONS} with {@link NetUtils}.
 *
 * @param conf configuration to install
 * @throws IllegalArgumentException if a static-resolution entry contains no
 *         {@code '='} separator
 */
public void setConf(Configuration conf) {
  if (conf instanceof JobConf) {
    this.conf = (JobConf) conf;
  } else {
    this.conf = new JobConf(conf);
  }
  this.mapOutputFile = ReflectionUtils.newInstance(
      conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
          MROutputFiles.class, MapOutputFile.class), conf);
  this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
  // add the static resolutions (this is required for the junit to
  // work on testcases that simulate multiple nodes on a single physical
  // node.
  String[] hostToResolved = conf.getStrings(MRConfig.STATIC_RESOLUTIONS);
  if (hostToResolved != null) {
    for (String str : hostToResolved) {
      int separator = str.indexOf('=');
      if (separator < 0) {
        // indexOf returned -1: fail with a message that names the bad entry
        // instead of letting substring(0, -1) raise an anonymous index error.
        throw new IllegalArgumentException("Malformed static resolution '" + str
            + "' in " + MRConfig.STATIC_RESOLUTIONS
            + ": expected host=resolvedHost");
      }
      String name = str.substring(0, separator);
      String resolvedName = str.substring(separator + 1);
      NetUtils.addStaticResolution(name, resolvedName);
    }
  }
}
/**
 * Creates an output stream backed by a local temporary file.
 *
 * <p>The constructor records the target URL, content type, account,
 * connection manager and metadata, asks {@code fsT} for a temporary file with
 * the prefix {@code "output-"} and an unknown size, and opens a buffered
 * (32 KiB) file stream on it.
 *
 * @param account JOSS account object
 * @param url URL connection
 * @param contentTypeT content type
 * @param metadataT input metadata
 * @param connectionManager SwiftConnectionManager
 * @param fsT SwiftAPIClient used to create the temporary file
 * @throws IOException if the temporary file cannot be created or opened
 */
public SwiftNoStreamingOutputStream(JossAccount account, URL url, final String contentTypeT,
    Map<String, String> metadataT, SwiftConnectionManager connectionManager,
    SwiftAPIClient fsT)
    throws IOException {
  LOG.debug("SwiftNoStreamingOutputStream constructor entry for {}", url.toString());
  mUrl = url;
  contentType = contentTypeT;
  mAccount = account;
  scm = connectionManager;
  metadata = metadataT;
  try {
    mBackupFile = fsT.createTmpFileForWrite("output-",
        LocalDirAllocator.SIZE_UNKNOWN);
    LOG.debug("OutputStream for key '{}' writing to tempfile: {}", mUrl.toString(), mBackupFile);
    mBackupOutputStream = new BufferedOutputStream(new FileOutputStream(mBackupFile), 32768);
  } catch (IOException e) {
    // Log the exception itself, not just its message, so the stack trace
    // showing which step failed is kept in the log before rethrowing.
    LOG.error(e.getMessage(), e);
    throw e;
  }
}
private ExternalSortExec(final TaskAttemptContext context, final SortNode plan)
throws PhysicalPlanningException {
super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys());
this.plan = plan;
this.defaultFanout = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT);
if (defaultFanout < 2) {
throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2");
}
// TODO - sort buffer and core num should be changed to use the allocated container resource.
this.sortBufferBytesNum = context.getQueryContext().getInt(SessionVars.EXTSORT_BUFFER_SIZE) * StorageUnit.MB;
this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM);
this.localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
this.localFS = new RawLocalFileSystem();
this.intermediateMeta = CatalogUtil.newTableMeta(BuiltinStorages.DRAW, context.getConf());
this.inputStats = new TableStats();
this.sortAlgorithm = getSortAlgorithm(context.getQueryContext(), sortSpecs);
LOG.info(sortAlgorithm.name() + " sort is selected");
}
/**
 * Hands every entry matched by {@code <dir>/*} under each worker temporal
 * directory to the deletion service.
 *
 * <p>Does nothing when no deletion service is set. Directories with no
 * matching entries are skipped. An {@link IOException} raised while listing
 * is logged and not propagated.
 */
public void cleanupTemporalDirectories() {
  if (deletionService == null) {
    return;
  }

  final LocalDirAllocator tempDirAllocator =
      new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
  try {
    final Iterable<Path> tempRoots = tempDirAllocator.getAllLocalPathsToRead(".", systemConf);
    final FileSystem localFileSystem = FileSystem.getLocal(systemConf);

    for (Path root : tempRoots) {
      final String childGlob = localFileSystem.makeQualified(new Path(root, "*")).toString();
      final PathData[] matches = PathData.expandAsGlob(childGlob, systemConf);
      if (matches.length == 0) {
        continue;
      }

      // Copy the matched paths into an array for the deletion call.
      final Path[] targets = new Path[matches.length];
      for (int i = 0; i < matches.length; i++) {
        targets[i] = matches[i].path;
      }
      deletionService.delete(null, targets);
    }
  } catch (IOException e) {
    LOG.error(e.getMessage(), e);
  }
}
/**
 * Creates the manager and prepares one single-thread executor per local
 * temporal directory.
 *
 * <p>Resolves the default file system from the Tajo root directory, obtains
 * the local file system, and computes the page size from
 * {@code SHUFFLE_HASH_APPENDER_PAGE_VOLUME} scaled by {@code StorageUnit.MB}.
 * Each directory returned by the allocator is recorded as a qualified path
 * string and paired with an executor stored under
 * {@code temporalPaths.size() - 1}.
 *
 * @param systemConf system configuration
 * @throws IOException if a file system or the local directories cannot be resolved
 */
public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
  this.systemConf = systemConf;

  // initialize LocalDirAllocator
  this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);

  // initialize DFS and LocalFileSystems
  this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
  this.localFS = FileSystem.getLocal(systemConf);
  this.pageSize =
      systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * StorageUnit.MB;

  // add async hash shuffle writer: one executor per temporal directory
  for (Path tempDir : this.lDirAllocator.getAllLocalPathsToRead(".", systemConf)) {
    final String qualifiedDir = this.localFS.makeQualified(tempDir).toString();
    temporalPaths.add(qualifiedDir);
    executors.put(temporalPaths.size() - 1, Executors.newSingleThreadExecutor());
  }
}
private ExternalSortExec(final TaskAttemptContext context, final AbstractStorageManager sm, final SortNode plan)
throws PhysicalPlanningException {
super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys());
this.plan = plan;
this.meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
this.defaultFanout = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT);
if (defaultFanout < 2) {
throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2");
}
// TODO - sort buffer and core num should be changed to use the allocated container resource.
this.sortBufferBytesNum = context.getConf().getLongVar(ConfVars.EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE) * 1048576L;
this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM);
this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum);
this.inMemoryTable = new ArrayList<Tuple>(100000);
this.sortTmpDir = getExecutorTmpDir();
localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
localFS = new RawLocalFileSystem();
}
/**
 * Submits the contents of every worker temporal directory to the deletion
 * service.
 *
 * <p>Returns immediately when there is no deletion service. For each
 * directory reported by the allocator, the entries matching {@code <dir>/*}
 * are collected and deleted in one call; directories without matches are
 * skipped. An {@link IOException} is logged and swallowed.
 */
protected void cleanupTemporalDirectories() {
  if (deletionService == null) {
    return;
  }

  final LocalDirAllocator allocator =
      new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
  try {
    final Iterable<Path> temporalDirs = allocator.getAllLocalPathsToRead(".", systemConf);
    final FileSystem localFileSystem = FileSystem.getLocal(systemConf);

    for (Path temporalDir : temporalDirs) {
      final Path wildcard = localFileSystem.makeQualified(new Path(temporalDir, "*"));
      final PathData[] expanded = PathData.expandAsGlob(wildcard.toString(), systemConf);

      final ArrayList<Path> doomed = new ArrayList<Path>();
      for (PathData entry : expanded) {
        doomed.add(entry.path);
      }

      if (!doomed.isEmpty()) {
        deletionService.delete(null, doomed.toArray(new Path[doomed.size()]));
      }
    }
  } catch (IOException e) {
    LOG.error(e.getMessage(), e);
  }
}
/**
 * Creates the runner, prepares the task's working directory, and localizes
 * the task configuration into it.
 *
 * <p>The working directory is the task's local directory plus
 * {@code MRConstants.WORKDIR}, resolved for writing through a
 * {@link LocalDirAllocator} keyed by {@code "mapred.local.dir"}.
 *
 * @param tip task-in-progress handed to the superclass
 * @param task task to run
 * @param tracker owning task tracker
 * @param ttConf task tracker job configuration
 * @param info Corona session information kept by this runner
 * @throws IOException if the working directory cannot be created and does
 *         not already exist as a directory, or if path resolution fails
 */
@SuppressWarnings("deprecation")
public CoronaJobTrackerRunner(
    TaskTracker.TaskInProgress tip, Task task, TaskTracker tracker,
    JobConf ttConf, CoronaSessionInfo info) throws IOException {
  super(tip, task, tracker, ttConf);
  this.coronaSessionInfo = info;

  final LocalDirAllocator allocator = new LocalDirAllocator("mapred.local.dir");
  final String relativeWorkDir =
      TaskTracker.getLocalTaskDir(
          task.getJobID().toString(),
          task.getTaskID().toString(),
          task.isTaskCleanupTask())
      + Path.SEPARATOR + MRConstants.WORKDIR;
  workDir = new File(allocator.getLocalPathForWrite(relativeWorkDir, conf).toString());

  // mkdirs() also returns false when the directory already exists, so only
  // treat it as a failure when no directory is there afterwards.
  final boolean created = workDir.mkdirs();
  if (!created && !workDir.isDirectory()) {
    throw new IOException("Mkdirs failed to create " + workDir.toString());
  }

  localizeTaskConfiguration(tracker, ttConf, workDir.toString(), task, task.getJobID());
}
MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, long size,
Configuration conf, LocalDirAllocator localDirAllocator,
int fetcher, boolean primaryMapOutput,
TezTaskOutputFiles mapOutputFile)
throws IOException {
this.id = ID.incrementAndGet();
this.attemptIdentifier = attemptIdentifier;
this.merger = merger;
type = Type.DISK;
memory = null;
byteStream = null;
this.size = size;
this.localFS = FileSystem.getLocal(conf);
outputPath =
mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getInputIdentifier().getInputIndex(), size);
tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
disk = localFS.create(tmpOutputPath);
this.primaryMapOutput = primaryMapOutput;
}
public DiskFetchedInput(long compressedSize,
InputAttemptIdentifier inputAttemptIdentifier,
FetchedInputCallback callbackHandler, Configuration conf,
LocalDirAllocator localDirAllocator, TezTaskOutputFiles filenameAllocator)
throws IOException {
super(inputAttemptIdentifier, callbackHandler);
this.size = compressedSize;
this.localFS = FileSystem.getLocal(conf).getRaw();
this.outputPath = filenameAllocator.getInputFileForWrite(
this.getInputAttemptIdentifier().getInputIdentifier(), this
.getInputAttemptIdentifier().getSpillEventId(), this.size);
// Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
// otherwise fetches for the same task but from different attempts would clobber each other.
this.tmpOutputPath = outputPath.suffix(String.valueOf(getId()));
}
/**
 * Prepares a fresh configuration, the local file system, a qualified working
 * directory under {@code test.build.data} (default {@code /tmp}), and a
 * {@link LocalDirAllocator} keyed by the runtime local-dirs property.
 *
 * @throws IOException if the local file system cannot be obtained
 */
@Before
public void setup() throws IOException {
  conf = new Configuration();

  // File-system related settings.
  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.LEGACY.name()); // DefaultSorter
  conf.set("fs.defaultFS", "file:///");
  localFs = FileSystem.getLocal(conf);

  // Working directory: <test.build.data>/<test class name>, fully qualified.
  final Path buildDataRoot = new Path(System.getProperty("test.build.data", "/tmp"));
  final Path unqualifiedWorkDir = new Path(buildDataRoot, TestDefaultSorter.class.getName());
  workingDir = unqualifiedWorkDir.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());

  // Key/value/partitioner classes used by the sorter under test.
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
      HashPartitioner.class.getName());

  conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
  dirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
}
/**
 * Installs the configuration and refreshes the helpers that depend on it.
 *
 * <p>The configuration is stored as a {@link JobConf} (wrapped in a new one
 * when it is not already a {@code JobConf}) and pushed into the existing
 * map-output file helper. A new {@link LocalDirAllocator} keyed by
 * {@code "mapred.local.dir"} is created, and every {@code host=resolvedHost}
 * entry under {@code "hadoop.net.static.resolutions"} is registered with
 * {@link NetUtils}.
 *
 * @param conf configuration to install
 * @throws IllegalArgumentException if a static-resolution entry contains no
 *         {@code '='} separator
 */
public void setConf(Configuration conf) {
  if (conf instanceof JobConf) {
    this.conf = (JobConf) conf;
  } else {
    this.conf = new JobConf(conf);
  }
  this.mapOutputFile.setConf(this.conf);
  this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  // add the static resolutions (this is required for the junit to
  // work on testcases that simulate multiple nodes on a single physical
  // node.
  String[] hostToResolved = conf.getStrings("hadoop.net.static.resolutions");
  if (hostToResolved != null) {
    for (String str : hostToResolved) {
      int separator = str.indexOf('=');
      if (separator < 0) {
        // Without this check substring(0, -1) fails with an index error that
        // does not say which entry or which property was malformed.
        throw new IllegalArgumentException("Malformed static resolution '" + str
            + "' in hadoop.net.static.resolutions: expected host=resolvedHost");
      }
      String name = str.substring(0, separator);
      String resolvedName = str.substring(separator + 1);
      NetUtils.addStaticResolution(name, resolvedName);
    }
  }
}
/**
 * Captures every collaborator passed in; the constructor performs no work
 * other than storing each argument in the field of the same name.
 */
public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
    JobConf jobConf, FileSystem localFS,
    TaskUmbilicalProtocol umbilical,
    LocalDirAllocator localDirAllocator,
    Reporter reporter, CompressionCodec codec,
    Class<? extends Reducer> combinerClass,
    CombineOutputCollector<K,V> combineCollector,
    Counters.Counter spilledRecordsCounter,
    Counters.Counter reduceCombineInputCounter,
    Counters.Counter shuffledMapsCounter,
    Counters.Counter reduceShuffleBytes,
    Counters.Counter failedShuffleCounter,
    Counters.Counter mergedMapOutputsCounter,
    TaskStatus status, Progress copyPhase, Progress mergePhase,
    Task reduceTask, MapOutputFile mapOutputFile,
    Map<TaskAttemptID, MapOutputFile> localMapFiles) {
  // Task identity and environment.
  this.reduceId = reduceId;
  this.reduceTask = reduceTask;
  this.jobConf = jobConf;
  this.umbilical = umbilical;
  this.reporter = reporter;
  this.status = status;

  // Local storage.
  this.localFS = localFS;
  this.localDirAllocator = localDirAllocator;
  this.mapOutputFile = mapOutputFile;
  this.localMapFiles = localMapFiles;

  // Compression and combining.
  this.codec = codec;
  this.combinerClass = combinerClass;
  this.combineCollector = combineCollector;

  // Counters.
  this.spilledRecordsCounter = spilledRecordsCounter;
  this.reduceCombineInputCounter = reduceCombineInputCounter;
  this.shuffledMapsCounter = shuffledMapsCounter;
  this.reduceShuffleBytes = reduceShuffleBytes;
  this.failedShuffleCounter = failedShuffleCounter;
  this.mergedMapOutputsCounter = mergedMapOutputsCounter;

  // Progress tracking.
  this.copyPhase = copyPhase;
  this.mergePhase = mergePhase;
}
/**
 * A testing method verifying availability and accessibility of API needed for
 * AuxiliaryService(s) which are "Shuffle-Providers" (ShuffleHandler and 3rd party plugins).
 *
 * <p>The call is made on mocks; the test fails only if the call throws.
 */
@Test
public void testProviderApi() {
  final LocalDirAllocator allocatorMock = mock(LocalDirAllocator.class);
  final JobConf jobConfMock = mock(JobConf.class);
  try {
    allocatorMock.getLocalPathToRead("", jobConfMock);
  } catch (Exception e) {
    final String failure = "Threw exception:" + e;
    assertTrue(failure, false);
  }
}
/**
 * Stores each constructor argument in the field of the same name; no other
 * work is performed.
 */
public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
    JobConf jobConf, FileSystem localFS,
    TaskUmbilicalProtocol umbilical,
    LocalDirAllocator localDirAllocator,
    Reporter reporter, CompressionCodec codec,
    Class<? extends Reducer> combinerClass,
    CombineOutputCollector<K,V> combineCollector,
    Counters.Counter spilledRecordsCounter,
    Counters.Counter reduceCombineInputCounter,
    Counters.Counter shuffledMapsCounter,
    Counters.Counter reduceShuffleBytes,
    Counters.Counter failedShuffleCounter,
    Counters.Counter mergedMapOutputsCounter,
    TaskStatus status, Progress copyPhase, Progress mergePhase,
    Task reduceTask, MapOutputFile mapOutputFile,
    Map<TaskAttemptID, MapOutputFile> localMapFiles) {
  // Identity, configuration and reporting channels.
  this.reduceId = reduceId;
  this.jobConf = jobConf;
  this.reduceTask = reduceTask;
  this.status = status;
  this.umbilical = umbilical;
  this.reporter = reporter;

  // Local files and directories.
  this.localFS = localFS;
  this.localDirAllocator = localDirAllocator;
  this.mapOutputFile = mapOutputFile;
  this.localMapFiles = localMapFiles;

  // Codec and combiner.
  this.codec = codec;
  this.combinerClass = combinerClass;
  this.combineCollector = combineCollector;

  // Counters.
  this.spilledRecordsCounter = spilledRecordsCounter;
  this.reduceCombineInputCounter = reduceCombineInputCounter;
  this.shuffledMapsCounter = shuffledMapsCounter;
  this.reduceShuffleBytes = reduceShuffleBytes;
  this.failedShuffleCounter = failedShuffleCounter;
  this.mergedMapOutputsCounter = mergedMapOutputsCounter;

  // Phase progress.
  this.copyPhase = copyPhase;
  this.mergePhase = mergePhase;
}
/**
 * A testing method verifying availability and accessibility of API needed for
 * AuxiliaryService(s) which are "Shuffle-Providers" (ShuffleHandler and 3rd party plugins).
 *
 * <p>Invokes the read-path lookup on mocked objects and fails if it throws.
 */
@Test
public void testProviderApi() {
  final LocalDirAllocator dirAllocatorMock = mock(LocalDirAllocator.class);
  final JobConf confMock = mock(JobConf.class);
  try {
    dirAllocatorMock.getLocalPathToRead("", confMock);
  } catch (Exception e) {
    final String failureMessage = "Threw exception:" + e;
    assertTrue(failureMessage, false);
  }
}
/**
 * Creates a fetcher for the given URI without a pull server service.
 *
 * <p>A missing scheme is treated as {@code http} and a missing host as
 * {@code localhost}. When the URI carries no port, 80 is used for
 * {@code http} and 443 for {@code https}; for any other scheme the port
 * stays at -1. The Netty bootstrap is configured with the shared fetcher
 * event loop group, the connect timeout (configured value times 1000), a
 * 1 MiB receive buffer, and TCP_NODELAY.
 *
 * @param conf system configuration
 * @param uri URI to fetch from
 * @param tableName table name stored as-is
 * @throws IOException declared by the signature
 */
@VisibleForTesting
public LocalFetcher(TajoConf conf, URI uri, String tableName) throws IOException {
  super(conf, uri);
  this.maxUrlLength = conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
  this.tableName = tableName;
  this.localFileSystem = new LocalFileSystem();
  this.localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
  this.pullServerService = null;

  final String rawScheme = uri.getScheme();
  final String scheme = (rawScheme == null) ? "http" : rawScheme;
  final String rawHost = uri.getHost();
  this.host = (rawHost == null) ? "localhost" : rawHost;

  // Fall back to the scheme's well-known port only when the URI has none.
  int resolvedPort = uri.getPort();
  if (resolvedPort == -1) {
    if (scheme.equalsIgnoreCase("http")) {
      resolvedPort = 80;
    } else if (scheme.equalsIgnoreCase("https")) {
      resolvedPort = 443;
    }
  }
  this.port = resolvedPort;

  bootstrap = new Bootstrap()
      .group(
          NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
              conf.getIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
      .channel(NioSocketChannel.class)
      .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
          conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
      .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
      .option(ChannelOption.TCP_NODELAY, true);
}
/**
 * Submits every local copy of {@code strPath} found under the worker
 * temporal directories to the deletion service.
 *
 * <p>Does nothing when no deletion service is set. An {@link IOException}
 * raised while resolving the paths is logged and not propagated.
 *
 * @param strPath path, relative to the temporal directories, to delete
 */
public void cleanup(String strPath) {
  if (deletionService != null) {
    final LocalDirAllocator tempDirAllocator =
        new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
    try {
      final Iterable<Path> candidates =
          tempDirAllocator.getAllLocalPathsToRead(strPath, systemConf);
      final FileSystem localFileSystem = FileSystem.getLocal(systemConf);
      for (Path candidate : candidates) {
        final Path qualified = localFileSystem.makeQualified(candidate);
        deletionService.delete(qualified);
      }
    } catch (IOException e) {
      LOG.error(e.getMessage(), e);
    }
  }
}
/**
 * Retrieve meta information of file chunks which correspond to the requested URI.
 * Only meta information for the file chunks which has non-zero length are retrieved.
 *
 * <p>For each task id in {@code params.taskAttemptIds()}, the task's
 * {@code output} path under the query base directory is looked up through
 * {@code lDirAlloc}. Missing paths are logged and skipped. Chunk metadata that
 * is non-null and has a positive length is serialized to JSON.
 *
 * @param conf configuration passed to the local directory lookups
 * @param lDirAlloc allocator used to test for and resolve each output path
 * @param localFS file system used to qualify the resolved path
 * @param params request parameters (task attempt ids, query id, execution
 *        block id, start/end keys, and the {@code last} flag)
 * @param gson serializer used to convert each {@code FileChunkMeta} to JSON
 * @param indexReaderCache cache forwarded to the chunk metadata search
 * @param lowCacheHitCheckThreshold threshold forwarded to the chunk metadata search
 * @return JSON strings of the retained chunk metadata, in task id order;
 *         possibly empty
 * @throws IOException declared by the signature (path lookups may raise it)
 * @throws ExecutionException declared by the signature (presumably from the
 *         index reader cache - confirm against searchFileChunkMeta)
 */
public static List<String> getJsonMeta(final TajoConf conf,
    final LocalDirAllocator lDirAlloc,
    final FileSystem localFS,
    final PullServerParams params,
    final Gson gson,
    final LoadingCache<IndexCacheKey, BSTIndexReader> indexReaderCache,
    final int lowCacheHitCheckThreshold)
    throws IOException, ExecutionException {
  final List<String> taskIds = PullServerUtil.splitMaps(params.taskAttemptIds());
  final Path queryBaseDir = PullServerUtil.getBaseOutputDir(params.queryId(), params.ebId());
  final List<String> jsonMetas = new ArrayList<>();

  for (String taskId : taskIds) {
    final Path relativeOutput = StorageUtil.concatPath(queryBaseDir, taskId, "output");
    final boolean outputPresent = lDirAlloc.ifExists(relativeOutput.toString(), conf);
    if (!outputPresent) {
      LOG.warn("Range shuffle - file not exist. " + relativeOutput);
      continue;
    }

    final Path qualifiedOutput =
        localFS.makeQualified(lDirAlloc.getLocalPathToRead(relativeOutput.toString(), conf));
    final FileChunkMeta chunkMeta = PullServerUtil.searchFileChunkMeta(
        params.queryId(), params.ebId(), taskId, qualifiedOutput,
        params.startKey(), params.endKey(), params.last(),
        indexReaderCache, lowCacheHitCheckThreshold);

    // Keep only chunks that exist and actually contain data.
    if (chunkMeta == null || chunkMeta.getLength() <= 0) {
      continue;
    }
    jsonMetas.add(gson.toJson(chunkMeta, FileChunkMeta.class));
  }
  return jsonMetas;
}
/**
 * Initializes file systems, the output base directory, and the query engine,
 * then delegates to the superclass.
 *
 * <p>Any failure during this setup is logged and does not stop
 * {@code super.init(conf)} from running, as before.
 *
 * @param conf configuration; cast to {@code TajoConf}, so it must be one
 */
@Override
public void init(Configuration conf) {
  this.systemConf = (TajoConf)conf;
  try {
    // initialize DFS and LocalFileSystems
    defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(conf);
    localFS = FileSystem.getLocal(conf);
    // the base dir for an output dir
    baseDir = queryId.toString() + "/output" + "/" + executionBlockId.getId();
    // initialize LocalDirAllocator
    lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
    baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf));
    LOG.info("TaskRunner basedir is created (" + baseDir +")");
    // Setup QueryEngine according to the query plan
    // Here, we can setup row-based query engine or columnar query engine.
    this.queryEngine = new TajoQueryEngine(systemConf);
  } catch (Throwable t) {
    // Route the full stack trace through the logger instead of stderr:
    // printStackTrace() bypassed the log, and LOG.error(t) alone recorded
    // only t.toString().
    LOG.error("Failed to initialize local directories or query engine", t);
  }
  super.init(conf);
}
/**
 * Asks the deletion service to remove every local copy of {@code strPath}
 * found under the worker temporal directories.
 *
 * <p>Returns immediately when there is no deletion service. An
 * {@link IOException} is logged and swallowed.
 *
 * @param strPath path, relative to the temporal directories, to delete
 */
protected void cleanup(String strPath) {
  if (deletionService == null) {
    return;
  }

  final LocalDirAllocator allocator =
      new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
  try {
    final Iterable<Path> matches = allocator.getAllLocalPathsToRead(strPath, systemConf);
    final FileSystem localFileSystem = FileSystem.getLocal(systemConf);
    for (Path match : matches) {
      deletionService.delete(localFileSystem.makeQualified(match));
    }
  } catch (IOException e) {
    LOG.error(e.getMessage(), e);
  }
}
/**
 * Bundles the objects passed in; each argument is stored in the field of the
 * same name and nothing else is done.
 *
 * @param jobConf job configuration
 * @param taskTracker owning task tracker
 * @param localFS local file system
 * @param localDirAllocator allocator for local directories
 * @param shuffleServerMetrics shuffle server metrics
 */
public NettyMapOutputAttributes(
    JobConf jobConf, TaskTracker taskTracker, FileSystem localFS,
    LocalDirAllocator localDirAllocator,
    ShuffleServerMetrics shuffleServerMetrics) {
  // Assigned in parameter order.
  this.jobConf = jobConf;
  this.taskTracker = taskTracker;
  this.localFS = localFS;
  this.localDirAllocator = localDirAllocator;
  this.shuffleServerMetrics = shuffleServerMetrics;
}
/**
 * Installs the configuration and refreshes the helpers that depend on it.
 *
 * <p>The configuration is kept as a {@link JobConf} (wrapped in a new one
 * when needed) and pushed into the map-output file helper. A new
 * {@link LocalDirAllocator} keyed by {@code "mapred.local.dir"} is created,
 * and static resolutions are loaded from the configuration that was passed in.
 *
 * @param conf configuration to install
 */
public void setConf(Configuration conf) {
  // Reuse the instance when it is already a JobConf; otherwise wrap it.
  this.conf = (conf instanceof JobConf) ? (JobConf) conf : new JobConf(conf);

  this.mapOutputFile.setConf(this.conf);
  this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");

  loadStaticResolutions(conf);
}
/**
 * Creates, initializes and starts the Netty map-output HTTP server.
 *
 * <p>The port is read from {@code NETTY_MAPOUTPUT_HTTP_PORT} (default 0). The
 * server's worker thread pool is registered with the shuffle metrics, and the
 * value returned by {@code start} is stored in {@code nettyMapOutputHttpPort}.
 *
 * @param conf job configuration
 * @throws IOException if the local file system cannot be obtained or the
 *         server fails to start
 */
protected void initNettyMapOutputHttpServer(JobConf conf) throws IOException {
  final int configuredPort = conf.getInt(NETTY_MAPOUTPUT_HTTP_PORT, 0);

  // Shared state handed to the pipeline factory.
  final FileSystem localFileSystem = FileSystem.getLocal(conf);
  final LocalDirAllocator allocator = new LocalDirAllocator("mapred.local.dir");
  final NettyMapOutputAttributes attributes = new NettyMapOutputAttributes(
      conf, this, localFileSystem, allocator, shuffleServerMetrics);

  nettyMapOutputServer = new NettyMapOutputHttpServer(configuredPort);
  nettyMapOutputServer.init(conf);
  shuffleServerMetrics.setNettyWorkerThreadPool(
      nettyMapOutputServer.getWorkerThreadPool());

  final HttpMapOutputPipelineFactory pipelineFactory =
      new HttpMapOutputPipelineFactory(attributes, configuredPort);
  this.nettyMapOutputHttpPort = nettyMapOutputServer.start(pipelineFactory);
}
/**
 * Creates, configures and starts the task tracker's HTTP server.
 *
 * <p>The bind address and port are resolved from the configuration, the
 * worker thread count comes from {@code "tasktracker.http.threads"}
 * (default 40), shared objects are published as server attributes, and the
 * reconfiguration, map-output and task-log servlets are registered. After
 * start-up the server's actual port is stored in {@code httpPort} and
 * {@code checkJettyPort()} is called.
 *
 * @param conf job configuration
 * @param useNettyMapOutputs not referenced by this method body
 * @throws IOException if the local file system cannot be obtained or the
 *         server fails to start
 */
protected void initHttpServer(JobConf conf,
    boolean useNettyMapOutputs) throws IOException {
  final String infoAddr = NetUtils.getServerAddress(
      conf,
      "tasktracker.http.bindAddress",
      "tasktracker.http.port",
      "mapred.task.tracker.http.address");
  final InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
  final String bindHost = infoSocAddr.getHostName();
  final int requestedPort = infoSocAddr.getPort();
  final boolean requestedPortIsZero = requestedPort == 0;

  server = new HttpServer("task", bindHost, requestedPort, requestedPortIsZero, conf);
  workerThreads = conf.getInt("tasktracker.http.threads", 40);
  server.setThreads(1, workerThreads);

  // let the jsp pages get to the task tracker, config, and other relevant
  // objects
  final FileSystem localFileSystem = FileSystem.getLocal(conf);
  this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
  server.setAttribute("task.tracker", this);
  server.setAttribute("local.file.system", localFileSystem);
  server.setAttribute("conf", conf);
  server.setAttribute("log", LOG);
  server.setAttribute("localDirAllocator", localDirAllocator);
  server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
  final String reconfigurableAttribute =
      ReconfigurationServlet.CONF_SERVLET_RECONFIGURABLE_PREFIX + "/ttconfchange";
  server.setAttribute(reconfigurableAttribute, TaskTracker.this);
  server.setAttribute("nettyMapOutputHttpPort", nettyMapOutputHttpPort);

  // Servlets.
  server.addInternalServlet("reconfiguration", "/ttconfchange",
      ReconfigurationServlet.class);
  server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
  server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);

  server.start();
  this.httpPort = server.getPort();
  checkJettyPort();
}
public DiskFetchedInput(long actualSize, long compressedSize,
InputAttemptIdentifier inputAttemptIdentifier,
FetchedInputCallback callbackHandler, Configuration conf,
LocalDirAllocator localDirAllocator, TezTaskOutputFiles filenameAllocator)
throws IOException {
super(Type.DISK, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler);
this.localFS = FileSystem.getLocal(conf);
this.outputPath = filenameAllocator.getInputFileForWrite(
this.inputAttemptIdentifier.getInputIdentifier().getInputIndex(), actualSize);
// Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
// otherwise fetches for the same task but from different attempts would clobber each other.
this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
}
/**
 * Resolves the readable local path of a shuffle input file.
 *
 * <p>The relative name is
 * {@code <task output dir>/<pathComponent>/<output filename><suffix>}. It is
 * mapped through {@code getPathForLocalDir} and then looked up with a
 * {@link LocalDirAllocator} keyed by the runtime local-dirs property.
 *
 * @param pathComponent directory component placed under the task output dir
 * @param suffix file name suffix; {@code null} is treated as empty
 * @return the local path resolved by the allocator
 * @throws IOException if the path cannot be resolved
 */
@VisibleForTesting
//TODO: Refactor following to make use of methods from TezTaskOutputFiles to be consistent.
protected Path getShuffleInputFileName(String pathComponent, String suffix)
    throws IOException {
  final LocalDirAllocator allocator =
      new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);

  final String effectiveSuffix = (suffix == null) ? "" : suffix;
  final String relativeOutput = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR
      + pathComponent + Path.SEPARATOR
      + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + effectiveSuffix;

  final String pathFromLocalDir = getPathForLocalDir(relativeOutput);
  // toString() on a String is a no-op; kept so a null result still fails here.
  return allocator.getLocalPathToRead(pathFromLocalDir.toString(), conf);
}