The following examples show how the org.apache.hadoop.fs.ContentSummary API class is used in practice; they are collected from open-source projects, and the full source code of each example can be viewed on GitHub.
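Before the collected examples, here is a minimal, self-contained sketch of the typical call pattern, assuming a reachable HDFS (or local) file system; the directory path used here is hypothetical and not taken from any snippet below. The pattern is: resolve a FileSystem from a Path, call getContentSummary, and read the aggregated counters from the returned ContentSummary.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ContentSummaryDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical path; replace with a directory that exists in your cluster.
    Path dir = new Path("/tmp/example-dir");
    FileSystem fs = dir.getFileSystem(conf);
    ContentSummary cs = fs.getContentSummary(dir);
    // Aggregated counters for the whole subtree rooted at 'dir'.
    System.out.println("length (bytes)  = " + cs.getLength());
    System.out.println("file count      = " + cs.getFileCount());
    System.out.println("directory count = " + cs.getDirectoryCount());
    System.out.println("space consumed  = " + cs.getSpaceConsumed());
    System.out.println("name quota      = " + cs.getQuota());
    System.out.println("space quota     = " + cs.getSpaceQuota());
  }
}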
@Override
public ContentSummary getContentSummary(Path f) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.GETCONTENTSUMMARY.toString());
HttpURLConnection conn =
getConnection(Operation.GETCONTENTSUMMARY.getMethod(), params, f, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) ((JSONObject)
HttpFSUtils.jsonParse(conn)).get(CONTENT_SUMMARY_JSON);
return new ContentSummary.Builder().
length((Long) json.get(CONTENT_SUMMARY_LENGTH_JSON)).
fileCount((Long) json.get(CONTENT_SUMMARY_FILE_COUNT_JSON)).
directoryCount((Long) json.get(CONTENT_SUMMARY_DIRECTORY_COUNT_JSON)).
quota((Long) json.get(CONTENT_SUMMARY_QUOTA_JSON)).
spaceConsumed((Long) json.get(CONTENT_SUMMARY_SPACE_CONSUMED_JSON)).
spaceQuota((Long) json.get(CONTENT_SUMMARY_SPACE_QUOTA_JSON)).build();
}
/**
* Finds files inside directories recursively and adds them to fileStatusList
* @param job refers to JobContext that is being used to read the configurations of the job that ran
* @param minSize refers to the minimum file block size.
* @param maxSize refers to the maximum file block size.
* @param splits refers to a list of splits that are being generated.
* @param fileStatusList list of FileStatus
* @throws IOException Signals that an I/O exception has occurred.
*/
public void setData(JobContext job, long minSize, long maxSize,
List<InputSplit> splits, List<FileStatus> fileStatusList) throws IOException {
for(FileStatus file:fileStatusList) {
if (file.isDirectory()) {
Path dirPath = file.getPath();
FileStatus [] fileArray = dirPath.getFileSystem(job.getConfiguration()).listStatus(dirPath);
setData(job, minSize, maxSize, splits, Arrays.asList(fileArray));
} else {
//Checking whether file is empty or not
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
ContentSummary cs = fs.getContentSummary(path);
if (cs.getLength() > 0) {
generateSplits(job, minSize, maxSize, splits, file);
}
}
}
}
/**
* Compute {@link ContentSummary}.
*/
public final ContentSummary computeAndConvertContentSummary(
ContentSummaryComputationContext summary) {
ContentCounts counts = computeContentSummary(summary).getCounts();
final QuotaCounts q = getQuotaCounts();
return new ContentSummary.Builder().
length(counts.getLength()).
fileCount(counts.getFileCount() + counts.getSymlinkCount()).
directoryCount(counts.getDirectoryCount()).
quota(q.getNameSpace()).
spaceConsumed(counts.getStoragespace()).
spaceQuota(q.getStorageSpace()).
typeConsumed(counts.getTypeSpaces()).
typeQuota(q.getTypeSpaces().asArray()).
build();
}
public static ContentSummary convert(ContentSummaryProto cs) {
if (cs == null) return null;
ContentSummary.Builder builder = new ContentSummary.Builder();
builder.length(cs.getLength()).
fileCount(cs.getFileCount()).
directoryCount(cs.getDirectoryCount()).
quota(cs.getQuota()).
spaceConsumed(cs.getSpaceConsumed()).
spaceQuota(cs.getSpaceQuota());
if (cs.hasTypeQuotaInfos()) {
for (HdfsProtos.StorageTypeQuotaInfoProto info :
cs.getTypeQuotaInfos().getTypeQuotaInfoList()) {
StorageType type = PBHelper.convertStorageType(info.getType());
builder.typeConsumed(type, info.getConsumed());
builder.typeQuota(type, info.getQuota());
}
}
return builder.build();
}
public static ContentSummaryProto convert(ContentSummary cs) {
if (cs == null) return null;
ContentSummaryProto.Builder builder = ContentSummaryProto.newBuilder();
builder.setLength(cs.getLength()).
setFileCount(cs.getFileCount()).
setDirectoryCount(cs.getDirectoryCount()).
setQuota(cs.getQuota()).
setSpaceConsumed(cs.getSpaceConsumed()).
setSpaceQuota(cs.getSpaceQuota());
if (cs.isTypeQuotaSet() || cs.isTypeConsumedAvailable()) {
HdfsProtos.StorageTypeQuotaInfosProto.Builder isb =
HdfsProtos.StorageTypeQuotaInfosProto.newBuilder();
for (StorageType t: StorageType.getTypesSupportingQuota()) {
HdfsProtos.StorageTypeQuotaInfoProto info =
HdfsProtos.StorageTypeQuotaInfoProto.newBuilder().
setType(convertStorageType(t)).
setConsumed(cs.getTypeConsumed(t)).
setQuota(cs.getTypeQuota(t)).
build();
isb.addTypeQuotaInfo(info);
}
builder.setTypeQuotaInfos(isb);
}
return builder.build();
}
@Test
public void testTruncate() throws Exception {
final short repl = 3;
final int blockSize = 1024;
final int numOfBlocks = 2;
Path dir = getTestRootPath(fSys, "test/hadoop");
Path file = getTestRootPath(fSys, "test/hadoop/file");
final byte[] data = getFileData(numOfBlocks, blockSize);
createFile(fSys, file, data, blockSize, repl);
final int newLength = blockSize;
boolean isReady = fSys.truncate(file, newLength);
Assert.assertTrue("Recovery is not expected.", isReady);
FileStatus fileStatus = fSys.getFileStatus(file);
Assert.assertEquals(fileStatus.getLen(), newLength);
AppendTestUtil.checkFullFile(fSys, file, newLength, data, file.toString());
ContentSummary cs = fSys.getContentSummary(dir);
Assert.assertEquals("Bad disk space usage", cs.getSpaceConsumed(),
newLength * repl);
Assert.assertTrue("Deleted", fSys.delete(dir, true));
}
private int calReducerNum(Path input) {
try {
long bytesPerReducer = DEFAULT_SIZE_PER_REDUCER;
FileSystem fs = FileSystem.get(job.getConfiguration());
ContentSummary cs = fs.getContentSummary(input);
long totalInputFileSize = cs.getLength();
int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
reducers = Math.max(1, reducers);
reducers = Math.min(MAX_REDUCERS, reducers);
logger.info("BytesPerReducer={}, maxReducers={}, totalInputFileSize={}, setReducers={}", bytesPerReducer,
MAX_REDUCERS, totalInputFileSize, reducers);
return reducers;
} catch (IOException e) {
logger.error("error when calculate reducer number", e);
}
return 1;
}
@Test
public void stagePluginsForCache() throws Exception {
DistributedCacheUtilImpl ch = new DistributedCacheUtilImpl();
Configuration conf = new Configuration();
FileSystem fs = DistributedCacheTestUtil.getLocalFileSystem( conf );
Path pluginsDir = new Path( "bin/test/plugins-installation-dir" );
FileObject pluginDir = DistributedCacheTestUtil.createTestFolderWithContent();
try {
ch.stagePluginsForCache( fs, pluginsDir, "bin/test/sample-folder" );
Path pluginInstallPath = new Path( pluginsDir, "bin/test/sample-folder" );
assertTrue( fs.exists( pluginInstallPath ) );
ContentSummary summary = fs.getContentSummary( pluginInstallPath );
assertEquals( 6, summary.getFileCount() );
assertEquals( 9, summary.getDirectoryCount() );
} finally {
pluginDir.delete( new AllFileSelector() );
fs.delete( pluginsDir, true );
}
}
@Test(timeout = 60000)
public void testContentSummaryWithoutQuotaByStorageType() throws Exception {
final Path foo = new Path(dir, "foo");
Path createdFile1 = new Path(foo, "created_file1.data");
dfs.mkdirs(foo);
// set storage policy on directory "foo" to ONESSD
dfs.setStoragePolicy(foo, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
INode fnode = fsdir.getINode4Write(foo.toString());
assertTrue(fnode.isDirectory());
assertTrue(!fnode.isQuotaSet());
// Create file of size 2 * BLOCKSIZE under directory "foo"
long file1Len = BLOCKSIZE * 2;
int bufLen = BLOCKSIZE / 16;
DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE, REPLICATION, seed);
// Verify getContentSummary without any quota set
ContentSummary cs = dfs.getContentSummary(foo);
assertEquals(cs.getSpaceConsumed(), file1Len * REPLICATION);
assertEquals(cs.getTypeConsumed(StorageType.SSD), file1Len);
assertEquals(cs.getTypeConsumed(StorageType.DISK), file1Len * 2);
}
@Test(timeout = 60000)
public void testContentSummaryWithoutStoragePolicy() throws Exception {
final Path foo = new Path(dir, "foo");
Path createdFile1 = new Path(foo, "created_file1.data");
dfs.mkdirs(foo);
INode fnode = fsdir.getINode4Write(foo.toString());
assertTrue(fnode.isDirectory());
assertTrue(!fnode.isQuotaSet());
// Create file of size 2 * BLOCKSIZE under directory "foo"
long file1Len = BLOCKSIZE * 2;
int bufLen = BLOCKSIZE / 16;
DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE, REPLICATION, seed);
// Verify getContentSummary without any quota set
// Expect no type quota and usage information available
ContentSummary cs = dfs.getContentSummary(foo);
assertEquals(cs.getSpaceConsumed(), file1Len * REPLICATION);
for (StorageType t : StorageType.values()) {
assertEquals(cs.getTypeConsumed(t), 0);
assertEquals(cs.getTypeQuota(t), -1);
}
}
/** {@inheritDoc} */
@Override public ContentSummary getContentSummary(Path f) throws IOException {
A.notNull(f, "f");
enterBusy();
try {
IgfsPathSummary sum = rmtClient.contentSummary(convert(f));
return new ContentSummary(sum.totalLength(), sum.filesCount(), sum.directoriesCount(),
-1, sum.totalLength(), rmtClient.fsStatus().spaceTotal());
}
finally {
leaveBusy();
}
}
/**
* Utility to attempt to stage a file to HDFS for use with Distributed Cache.
*
* @param ch Distributed Cache Helper
* @param source File or directory to stage
* @param fs FileSystem to stage to
* @param root Root directory to clean up when this test is complete
* @param dest Destination path to stage to
* @param expectedFileCount Expected number of files to exist in the destination once staged
* @param expectedDirCount Expected number of directories to exist in the destination once staged
* @throws Exception
*/
static void stageForCacheTester( DistributedCacheUtilImpl ch, FileObject source, FileSystem fs, Path root, Path dest,
int expectedFileCount, int expectedDirCount ) throws Exception {
try {
ch.stageForCache( source, fs, dest, true );
assertTrue( fs.exists( dest ) );
ContentSummary cs = fs.getContentSummary( dest );
assertEquals( expectedFileCount, cs.getFileCount() );
assertEquals( expectedDirCount, cs.getDirectoryCount() );
assertEquals( FsPermission.createImmutable( (short) 0755 ), fs.getFileStatus( dest ).getPermission() );
} finally {
// Clean up after ourself
if ( !fs.delete( root, true ) ) {
log.logError( "error deleting FileSystem temp dir " + root );
}
}
}
private void verifyResultStats(Optional<TajoResultSetBase[]> existing, long numRows) throws Exception {
assertTrue(existing.isPresent());
// Get TableStats using TajoResultSetBase.
TajoResultSetBase[] resultSet = existing.get();
QueryId qid = resultSet[0].getQueryId();
QueryInfo queryInfo = testingCluster.getMaster().getContext().getQueryJobManager().getFinishedQuery(qid);
TableDesc desc = queryInfo.getResultDesc();
TableStats stats = desc.getStats();
// Compare specified number of rows to the number of rows in TableStats.
assertEquals(numRows, stats.getNumRows().longValue());
// Compare the on-disk data volume to the number of bytes recorded in TableStats.
FileSystem fs = FileSystem.get(conf);
Path path = new Path(desc.getUri());
assertTrue(fs.exists(path));
ContentSummary summary = fs.getContentSummary(path);
assertEquals(summary.getLength(), stats.getNumBytes().longValue());
closeResultSets(resultSet);
}
private long getFileSize(String hdfsUrl) throws IOException {
Configuration configuration = new Configuration();
Path path = new Path(hdfsUrl);
FileSystem fs = path.getFileSystem(configuration);
ContentSummary contentSummary = fs.getContentSummary(path);
long length = contentSummary.getLength();
return length;
}
private void cleanUnusedHdfsFiles() throws IOException {
UnusedHdfsFileCollector collector = new UnusedHdfsFileCollector();
collectUnusedHdfsFiles(collector);
if (collector.list.isEmpty()) {
logger.info("No HDFS files to clean up");
return;
}
long garbageBytes = 0;
List<String> garbageList = new ArrayList<>();
for (Pair<FileSystem, String> entry : collector.list) {
FileSystem fs = entry.getKey();
String path = entry.getValue();
try {
garbageList.add(path);
ContentSummary sum = fs.getContentSummary(new Path(path));
if (sum != null)
garbageBytes += sum.getLength();
if (delete) {
logger.info("Deleting HDFS path " + path);
fs.delete(new Path(path), true);
} else {
logger.info("Dry run, pending delete HDFS path " + path);
}
} catch (IOException e) {
logger.error("Error dealing unused HDFS path " + path, e);
}
}
hdfsGarbageFileBytes = garbageBytes;
hdfsGarbageFiles = garbageList;
}
public Repartitioner(int shardSize, int fileLengthThreshold, long totalRowCount, long rowCountThreshold,
ContentSummary contentSummary, List<Integer> shardByColumns) {
this.shardSize = shardSize;
this.fileLengthThreshold = fileLengthThreshold;
this.totalRowCount = totalRowCount;
this.rowCountThreshold = rowCountThreshold;
this.contentSummary = contentSummary;
if (shardByColumns != null) {
this.shardByColumns = shardByColumns;
}
}
@Override
public ContentSummary getContentSummary(final String src) throws IOException {
return (new ImmutableFSCaller<ContentSummary>() {
ContentSummary call() throws IOException {
return namenode.getContentSummary(src);
}
}).callFS();
}
@Test(timeout = 60000)
public void testQuotaByStorageTypeWithFileCreateTruncate() throws Exception {
final Path foo = new Path(dir, "foo");
Path createdFile1 = new Path(foo, "created_file1.data");
dfs.mkdirs(foo);
// set storage policy on directory "foo" to ONESSD
dfs.setStoragePolicy(foo, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
// set quota by storage type on directory "foo"
dfs.setQuotaByStorageType(foo, StorageType.SSD, BLOCKSIZE * 4);
INode fnode = fsdir.getINode4Write(foo.toString());
assertTrue(fnode.isDirectory());
assertTrue(fnode.isQuotaSet());
// Create file of size 2 * BLOCKSIZE under directory "foo"
long file1Len = BLOCKSIZE * 2;
int bufLen = BLOCKSIZE / 16;
DFSTestUtil.createFile(dfs, createdFile1, bufLen, file1Len, BLOCKSIZE, REPLICATION, seed);
// Verify SSD consumed before truncate
long ssdConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
.getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
assertEquals(file1Len, ssdConsumed);
// Truncate file to 1 * BLOCKSIZE
int newFile1Len = BLOCKSIZE * 1;
dfs.truncate(createdFile1, newFile1Len);
// Verify SSD consumed after truncate
ssdConsumed = fnode.asDirectory().getDirectoryWithQuotaFeature()
.getSpaceConsumed().getTypeSpaces().get(StorageType.SSD);
assertEquals(newFile1Len, ssdConsumed);
ContentSummary cs = dfs.getContentSummary(foo);
assertEquals(cs.getSpaceConsumed(), newFile1Len * REPLICATION);
assertEquals(cs.getTypeConsumed(StorageType.SSD), newFile1Len);
assertEquals(cs.getTypeConsumed(StorageType.DISK), newFile1Len * 2);
}
/**
* Call to perform analysis across the set of cached dirs.
*
* @param nnLoader the namenodeLoader of the NNA instance
* @param countMap map of dir -> file count to add to
* @param diskspaceMap map of dir -> diskspace to add to
*/
public void analyze(
NameNodeLoader nnLoader, Map<String, Long> countMap, Map<String, Long> diskspaceMap) {
long start = System.currentTimeMillis();
/* Make an in-mem copy of the cachedDirs so we can parallelize the stream. */
if (cachedDirs.size() > 0) {
HashSet<String> inMemCachedDirsCopy = new HashSet<>(cachedDirs);
Map<String, ContentSummary> contentSummaries =
inMemCachedDirsCopy
.parallelStream()
.collect(Collectors.toMap(Function.identity(), nnLoader::getContentSummary));
for (Entry<String, ContentSummary> entry : contentSummaries.entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
}
countMap.put(entry.getKey(), entry.getValue().getFileCount());
diskspaceMap.put(entry.getKey(), entry.getValue().getSpaceConsumed());
}
}
long end = System.currentTimeMillis();
LOG.info(
"Performed cached directory analysis using getContentSummary calls in: "
+ (end - start)
+ "ms.");
}
@Override
public long getOnDiskSize() throws IOException {
Path hdfsDirPath = _shardContext.getHdfsDirPath();
Configuration configuration = _tableContext.getConfiguration();
FileSystem fileSystem = hdfsDirPath.getFileSystem(configuration);
ContentSummary contentSummary = fileSystem.getContentSummary(hdfsDirPath);
return contentSummary.getLength();
}
ContentSummary getContentSummary(String src) throws IOException {
String srcs = normalizePath(src);
synchronized (rootDir) {
INode targetNode = rootDir.getNode(srcs);
if (targetNode == null) {
throw new FileNotFoundException("File does not exist: " + srcs);
}
else {
return targetNode.computeContentSummary();
}
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testMap() throws Exception {
TenantAndIdEmittableKey key = new TenantAndIdEmittableKey();
ValueMapper m = new MockValueMapper();
BSONObject entry = new BasicBSONObject("found", "data");
BSONWritable entity = new BSONWritable(entry);
Context context = Mockito.mock(Context.class);
PowerMockito.when(context, "write", Matchers.any(EmittableKey.class),
Matchers.any(BSONObject.class)).thenAnswer(new Answer<BSONObject>() {
@Override
public BSONObject answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
assertNotNull(args);
assertEquals(args.length, 2);
assertTrue(args[0] instanceof TenantAndIdEmittableKey);
assertTrue(args[1] instanceof ContentSummary);
TenantAndIdEmittableKey id = (TenantAndIdEmittableKey) args[0];
assertNotNull(id);
ContentSummary e = (ContentSummary) args[1];
assertEquals(e.getLength(), 1);
assertEquals(e.getFileCount(), 2);
assertEquals(e.getDirectoryCount(), 3);
return null;
}
});
m.map(key, entity, context);
}
/**
* Connect to the name node and get content summary.
* @param path The path
* @return The content summary for the path.
* @throws IOException
*/
private ContentSummary getContentSummary(String path) throws IOException {
final HttpURLConnection connection = openConnection(
"/contentSummary" + ServletUtil.encodePath(path),
"ugi=" + getEncodedUgiParameter());
InputStream in = null;
try {
in = connection.getInputStream();
final XMLReader xr = XMLReaderFactory.createXMLReader();
xr.setContentHandler(this);
xr.parse(new InputSource(in));
} catch(FileNotFoundException fnfe) {
//the server may not support getContentSummary
return null;
} catch(SAXException saxe) {
final Exception embedded = saxe.getException();
if (embedded != null && embedded instanceof IOException) {
throw (IOException)embedded;
}
throw new IOException("Invalid xml format", saxe);
} finally {
if (in != null) {
in.close();
}
connection.disconnect();
}
return contentsummary;
}
void HDFSGetContentSummary(final String dirName) throws Exception {
String subdirName = dirName + "/tmpdir";
createFile(subdirName, 1);
createFile(subdirName, 2);
UserGroupInformation ugi = UserGroupInformation.createUserForTesting("bob", new String[] {});
ugi.doAs(new PrivilegedExceptionAction<Void>() {
public Void run() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", defaultFs);
FileSystem fs = FileSystem.get(conf);
try {
// GetContentSummary on the directory dirName
ContentSummary contentSummary = fs.getContentSummary(new Path(dirName));
long directoryCount = contentSummary.getDirectoryCount();
Assert.assertTrue("Found unexpected number of directories; expected-count=3, actual-count=" + directoryCount, directoryCount == 3);
} catch (Exception e) {
Assert.fail("Failed to getContentSummary, exception=" + e);
}
fs.close();
return null;
}
});
}
@Override
public ContentSummary getContentSummary(final Path p) throws IOException {
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
return new FsPathResponseRunner<ContentSummary>(op, p) {
@Override
ContentSummary decodeResponse(Map<?,?> json) {
return JsonUtil.toContentSummary(json);
}
}.run();
}
/**
* Get {@link ContentSummary} rooted at the specified directory.
* @param src The string representation of the path
*
* @see ClientProtocol#getContentSummary(String)
*/
ContentSummary getContentSummary(String src) throws IOException {
TraceScope scope = getPathTraceScope("getContentSummary", src);
try {
return namenode.getContentSummary(src);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
} finally {
scope.close();
}
}
private List<PartitionDescProto> getPartitionsWithContentsSummary(TajoConf conf, Path outputDir,
List<PartitionDescProto> partitions) throws IOException {
List<PartitionDescProto> finalPartitions = new ArrayList<>();
FileSystem fileSystem = outputDir.getFileSystem(conf);
for (PartitionDescProto partition : partitions) {
PartitionDescProto.Builder builder = partition.toBuilder();
Path partitionPath = new Path(outputDir, partition.getPath());
ContentSummary contentSummary = fileSystem.getContentSummary(partitionPath);
builder.setNumBytes(contentSummary.getLength());
finalPartitions.add(builder.build());
}
return finalPartitions;
}
/**
* make sure we update the quota correctly after concat
*/
@Test
public void testConcatWithQuotaDecrease() throws IOException {
final short srcRepl = 3; // note this is different from REPL_FACTOR
final int srcNum = 10;
final Path foo = new Path("/foo");
final Path[] srcs = new Path[srcNum];
final Path target = new Path(foo, "target");
DFSTestUtil.createFile(dfs, target, blockSize, REPL_FACTOR, 0L);
dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
for (int i = 0; i < srcNum; i++) {
srcs[i] = new Path(foo, "src" + i);
DFSTestUtil.createFile(dfs, srcs[i], blockSize * 2, srcRepl, 0L);
}
ContentSummary summary = dfs.getContentSummary(foo);
Assert.assertEquals(11, summary.getFileCount());
Assert.assertEquals(blockSize * REPL_FACTOR +
blockSize * 2 * srcRepl * srcNum, summary.getSpaceConsumed());
dfs.concat(target, srcs);
summary = dfs.getContentSummary(foo);
Assert.assertEquals(1, summary.getFileCount());
Assert.assertEquals(
blockSize * REPL_FACTOR + blockSize * 2 * REPL_FACTOR * srcNum,
summary.getSpaceConsumed());
}