The following are example usages of org.apache.hadoop.fs.FileStatus#getLen(), taken from open-source projects; the full source for each snippet can be viewed on GitHub.
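Before the project examples, here is a minimal, self-contained sketch of the typical call pattern (the path below is hypothetical): getFileStatus() resolves the path and getLen() returns the file's length in bytes.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileLenExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical path; replace with any file visible to the configured FileSystem.
    Path file = new Path("/tmp/example.txt");
    FileSystem fs = file.getFileSystem(conf);
    // getFileStatus() throws FileNotFoundException if the path does not exist.
    FileStatus status = fs.getFileStatus(file);
    System.out.println(file + " is " + status.getLen() + " bytes");
    fs.close();
  }
}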
/**
* Return an array containing hostnames, offset and size of
* portions of the given file. For WASB we'll just lie and give
* fake hosts to make sure we get many splits in MR jobs.
*/
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file,
long start, long len) throws IOException {
if (file == null) {
return null;
}
if ((start < 0) || (len < 0)) {
throw new IllegalArgumentException("Invalid start or len parameter");
}
if (file.getLen() < start) {
return new BlockLocation[0];
}
final String blobLocationHost = getConf().get(
AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
AZURE_BLOCK_LOCATION_HOST_DEFAULT);
final String[] name = { blobLocationHost };
final String[] host = { blobLocationHost };
long blockSize = file.getBlockSize();
if (blockSize <= 0) {
throw new IllegalArgumentException(
"The block size for the given file is not a positive number: "
+ blockSize);
}
int numberOfLocations = (int) (len / blockSize)
+ ((len % blockSize == 0) ? 0 : 1);
BlockLocation[] locations = new BlockLocation[numberOfLocations];
for (int i = 0; i < locations.length; i++) {
long currentOffset = start + (i * blockSize);
long currentLength = Math.min(blockSize, start + len - currentOffset);
locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
}
return locations;
}
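A quick sanity check of the split-count arithmetic above, using hypothetical numbers: for a requested len of 300 MB against a 128 MB blockSize, len / blockSize is 2 and len % blockSize is non-zero, so numberOfLocations is 3; the loop then produces two 128 MB locations followed by a 44 MB tail, because Math.min(blockSize, start + len - currentOffset) clamps the final length.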
/**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file name to open
* @param bufferSize the size of the buffer to be used.
*/
public FSDataInputStream open(Path f, int bufferSize)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Opening '{}' for reading.", f);
}
final FileStatus fileStatus = getFileStatus(f);
if (fileStatus.isDirectory()) {
throw new FileNotFoundException("Can't open " + f + " because it is a directory");
}
return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
fileStatus.getLen(), s3, statistics));
}
/**
* Look up the size of each referenced MOB file and cache it in mobLengthMap.
* @param mobRefs multimap of original table name -> mob hfile
*/
private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throws IOException {
FileSystem fs = store.getFileSystem();
HashMap<String, Long> map = mobLengthMap.get();
map.clear();
for (Entry<TableName, String> reference : mobRefs.entries()) {
final TableName table = reference.getKey();
final String mobfile = reference.getValue();
if (MobFileName.isOldMobFileName(mobfile)) {
disableIO.set(Boolean.TRUE);
}
List<Path> locations = mobStore.getLocations(table);
for (Path p : locations) {
try {
FileStatus st = fs.getFileStatus(new Path(p, mobfile));
long size = st.getLen();
LOG.debug("Referenced MOB file={} size={}", mobfile, size);
map.put(mobfile, size);
break;
} catch (FileNotFoundException exception) {
LOG.debug("Mob file {} was not in location {}. May have other locations to try.", mobfile,
p);
}
}
if (!map.containsKey(mobfile)) {
throw new FileNotFoundException("Could not find mob file " + mobfile + " in the list of " +
"expected locations: " + locations);
}
}
}
@Test
public void testBulkLoading() throws Exception {
TableName tn = helper.createTableWithRegions(1);
// Set a quota
QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
admin.setQuota(settings);
Map<byte[], List<Path>> family2Files = helper.generateFileToLoad(tn, 3, 550);
// Make sure the files are about as long as we expect
FileSystem fs = TEST_UTIL.getTestFileSystem();
FileStatus[] files = fs.listStatus(
new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
long totalSize = 0;
for (FileStatus file : files) {
assertTrue(
"Expected the file, " + file.getPath() + ", length to be larger than 25KB, but was "
+ file.getLen(),
file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
totalSize += file.getLen();
}
assertFalse("The bulk load failed",
BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tn, family2Files).isEmpty());
final long finalTotalSize = totalSize;
TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
@Override
boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
return snapshot.getUsage() >= finalTotalSize;
}
});
}
@Override
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
//read the params from AccumuloInputFormat
Configuration conf = jobContext.getConfiguration();
Instance instance = MRUtils.AccumuloProps.getInstance(jobContext);
String user = MRUtils.AccumuloProps.getUsername(jobContext);
AuthenticationToken password = MRUtils.AccumuloProps.getPassword(jobContext);
String table = MRUtils.AccumuloProps.getTablename(jobContext);
ArgumentChecker.notNull(instance);
ArgumentChecker.notNull(table);
//find the files necessary
try {
Connector connector = instance.getConnector(user, password);
TableOperations tos = connector.tableOperations();
String tableId = tos.tableIdMap().get(table);
Scanner scanner = connector.createScanner("accumulo.metadata", Authorizations.EMPTY); //TODO: auths?
scanner.setRange(new Range(new Text(tableId + "\u0000"), new Text(tableId + "\uFFFD")));
scanner.fetchColumnFamily(new Text("file"));
List<String> files = new ArrayList<String>();
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
for (Map.Entry<Key, Value> entry : scanner) {
String file = entry.getKey().getColumnQualifier().toString();
Path path = new Path(file);
FileSystem fs = path.getFileSystem(conf);
FileStatus fileStatus = fs.getFileStatus(path);
long len = fileStatus.getLen();
BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, len);
files.add(file);
fileSplits.add(new FileSplit(path, 0, len, fileBlockLocations[0].getHosts()));
}
System.out.println(files);
return fileSplits;
} catch (Exception e) {
throw new IOException(e);
}
}
public void map(LongWritable key, HarEntry value,
OutputCollector<IntWritable, Text> out,
Reporter reporter) throws IOException {
Path relPath = new Path(value.path);
int hash = HarFileSystem.getHarHash(relPath);
String towrite = null;
Path srcPath = realPath(relPath, rootPath);
long startPos = partStream.getPos();
FileSystem srcFs = srcPath.getFileSystem(conf);
FileStatus srcStatus = srcFs.getFileStatus(srcPath);
String propStr = URLEncoder.encode(
srcStatus.getModificationTime() + " "
+ srcStatus.getAccessTime() + " "
+ srcStatus.getPermission().toShort() + " "
+ URLEncoder.encode(srcStatus.getOwner(), "UTF-8") + " "
+ URLEncoder.encode(srcStatus.getGroup(), "UTF-8"),
"UTF-8");
if (value.isDir()) {
towrite = URLEncoder.encode(relPath.toString(),"UTF-8")
+ " dir " + propStr + " 0 0 ";
StringBuffer sbuff = new StringBuffer();
sbuff.append(towrite);
for (String child: value.children) {
sbuff.append(URLEncoder.encode(child,"UTF-8") + " ");
}
towrite = sbuff.toString();
//reading directories is also progress
reporter.progress();
}
else {
FSDataInputStream input = srcFs.open(srcStatus.getPath());
reporter.setStatus("Copying file " + srcStatus.getPath() +
" to archive.");
copyData(srcStatus.getPath(), input, partStream, reporter);
towrite = URLEncoder.encode(relPath.toString(),"UTF-8")
+ " file " + partname + " " + startPos
+ " " + srcStatus.getLen() + " " + propStr + " ";
}
out.collect(new IntWritable(hash), new Text(towrite));
}
public static StripeReader getStripeReader(Codec codec, Configuration conf,
long blockSize, FileSystem fs, long stripeIdx, FileStatus srcStat)
throws IOException {
if (codec.isDirRaid) {
Path srcDir = srcStat.isDir()? srcStat.getPath():
srcStat.getPath().getParent();
return new DirectoryStripeReader(conf, codec, fs, stripeIdx,
srcDir,
RaidNode.listDirectoryRaidFileStatus(conf, fs, srcDir));
} else {
return new FileStripeReader(conf, blockSize,
codec, fs, stripeIdx, srcStat.getPath(), srcStat.getLen());
}
}
/**
* @return <expected, gotten, backup>, where each is sorted
*/
private static List<List<String>> getFileLists(FileStatus[] previous, FileStatus[] archived) {
List<List<String>> files = new ArrayList<>(3);
// copy over the original files
List<String> originalFileNames = convertToString(previous);
files.add(originalFileNames);
List<String> currentFiles = new ArrayList<>(previous.length);
List<FileStatus> backedupFiles = new ArrayList<>(previous.length);
for (FileStatus f : archived) {
String name = f.getPath().getName();
// if the file has been backed up
if (name.contains(".")) {
Path parent = f.getPath().getParent();
String shortName = name.split("[.]")[0];
Path modPath = new Path(parent, shortName);
FileStatus file = new FileStatus(f.getLen(), f.isDirectory(), f.getReplication(),
f.getBlockSize(), f.getModificationTime(), modPath);
backedupFiles.add(file);
} else {
// otherwise, add it to the list to compare to the original store files
currentFiles.add(name);
}
}
files.add(currentFiles);
files.add(convertToString(backedupFiles));
return files;
}
/**
* Get the total logical size of the files in the directory
* @param lfs the files under the directory
* @return the sum of the file lengths in bytes, or 0 if lfs is null
*/
public static long getDirLogicalSize(List<FileStatus> lfs) {
long totalSize = 0L;
if (null == lfs) {
return totalSize;
}
for (FileStatus fsStat : lfs) {
totalSize += fsStat.getLen();
}
return totalSize;
}
@Test
public void testVariousType() throws IOException {
TajoConf conf = new TajoConf();
TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.JSON, conf);
Path tablePath = new Path(getResourcePath("dataset", "TestJsonSerDe"), "testVariousType.json");
FileSystem fs = FileSystem.getLocal(conf);
FileStatus status = fs.getFileStatus(tablePath);
FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
scanner.init();
Tuple tuple = scanner.next();
assertNotNull(tuple);
assertNull(scanner.next());
scanner.close();
Tuple baseTuple = new VTuple(new Datum[] {
DatumFactory.createBool(true), // 0
DatumFactory.createChar("hyunsik"), // 1
DatumFactory.createInt2((short) 17), // 2
DatumFactory.createInt4(59), // 3
DatumFactory.createInt8(23L), // 4
DatumFactory.createFloat4(77.9f), // 5
DatumFactory.createFloat8(271.9d), // 6
DatumFactory.createText("hyunsik"), // 7
DatumFactory.createBlob("hyunsik".getBytes()), // 8
NullDatum.get(), // 9
});
assertEquals(baseTuple, tuple);
}
/**
* Create a {@link LocalResource} record with all the given parameters.
*/
private static LocalResource createLocalResource(FileSystem fc, Path file,
LocalResourceType type, LocalResourceVisibility visibility)
throws IOException {
FileStatus fstat = fc.getFileStatus(file);
URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
.getPath()));
long resourceSize = fstat.getLen();
long resourceModificationTime = fstat.getModificationTime();
return LocalResource.newInstance(resourceURL, type, visibility,
resourceSize, resourceModificationTime);
}
public void readRandomHeap() throws Exception {
System.out.println("reading random file in heap mode " + path);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FileStatus status = fs.getFileStatus(path);
FSDataInputStream instream = fs.open(path);
byte[] buf = new byte[size];
double sumbytes = 0;
double ops = 0;
long _range = status.getLen() - ((long) buf.length);
double range = (double) _range;
Random random = new Random();
System.out.println("file capacity " + status.getLen());
System.out.println("read size " + size);
System.out.println("operations " + loop);
long start = System.currentTimeMillis();
while (ops < loop) {
double _offset = range*random.nextDouble();
long offset = (long) _offset;
instream.seek(offset);
double ret = (double) this.read(instream, buf);
if (ret > 0) {
sumbytes = sumbytes + ret;
ops = ops + 1.0;
} else {
break;
}
}
long end = System.currentTimeMillis();
double executionTime = ((double) (end - start)) / 1000.0;
double throughput = 0.0;
double latency = 0.0;
double sumbits = sumbytes * 8.0;
if (executionTime > 0) {
throughput = sumbits / executionTime / 1024.0 / 1024.0;
latency = 1000000.0 * executionTime / ops;
}
System.out.println("execution time " + executionTime);
System.out.println("ops " + ops);
System.out.println("sumbytes " + sumbytes);
System.out.println("throughput " + throughput);
System.out.println("latency " + latency);
System.out.println("closing stream");
instream.close();
fs.close();
}
@Test
public void testFindValueInSingleCSV() throws IOException {
meta = CatalogUtil.newTableMeta(StoreType.CSV);
Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
fs.mkdirs(tablePath.getParent());
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
for (int i = 0; i < TUPLE_NUM; i++) {
tuple = new VTuple(5);
tuple.put(0, DatumFactory.createInt4(i));
tuple.put(1, DatumFactory.createInt8(i));
tuple.put(2, DatumFactory.createFloat8(i));
tuple.put(3, DatumFactory.createFloat4(i));
tuple.put(4, DatumFactory.createText("field_" + i));
appender.addTuple(tuple);
}
appender.close();
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
Schema keySchema = new Schema();
keySchema.addColumn(new Column("long", Type.INT8));
keySchema.addColumn(new Column("double", Type.FLOAT8));
TupleComparator comp = new TupleComparator(keySchema, sortKeys);
BSTIndex bst = new BSTIndex(conf);
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir,
"FindValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
creater.setLoadNum(LOAD_NUM);
creater.open();
SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet);
fileScanner.init();
Tuple keyTuple;
long offset;
while (true) {
keyTuple = new VTuple(2);
offset = fileScanner.getNextOffset();
tuple = fileScanner.next();
if (tuple == null)
break;
keyTuple.put(0, tuple.get(1));
keyTuple.put(1, tuple.get(2));
creater.write(keyTuple, offset);
}
creater.flush();
creater.close();
fileScanner.close();
tuple = new VTuple(keySchema.size());
BSTIndexReader reader = bst.getIndexReader(new Path(testDir,
"FindValueInCSV.idx"), keySchema, comp);
reader.open();
fileScanner = new CSVScanner(conf, schema, meta, tablet);
fileScanner.init();
for (int i = 0; i < TUPLE_NUM - 1; i++) {
tuple.put(0, DatumFactory.createInt8(i));
tuple.put(1, DatumFactory.createFloat8(i));
long offsets = reader.find(tuple);
fileScanner.seek(offsets);
tuple = fileScanner.next();
assertEquals(i, (tuple.get(1).asInt8()));
assertEquals(i, (tuple.get(2).asFloat8()) , 0.01);
offsets = reader.next();
if (offsets == -1) {
continue;
}
fileScanner.seek(offsets);
tuple = fileScanner.next();
assertTrue("[seek check " + (i + 1) + " ]",
(i + 1) == (tuple.get(0).asInt4()));
assertTrue("[seek check " + (i + 1) + " ]",
(i + 1) == (tuple.get(1).asInt8()));
}
}
private ConnectorPageSource createDataPageSource(
ConnectorSession session,
HdfsContext hdfsContext,
Path path,
long start,
long length,
FileFormat fileFormat,
List<IcebergColumnHandle> dataColumns,
TupleDomain<IcebergColumnHandle> predicate)
{
switch (fileFormat) {
case ORC:
FileSystem fileSystem = null;
FileStatus fileStatus = null;
try {
fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
fileStatus = fileSystem.getFileStatus(path);
}
catch (IOException e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, e);
}
long fileSize = fileStatus.getLen();
return createOrcPageSource(
hdfsEnvironment,
session.getUser(),
hdfsEnvironment.getConfiguration(hdfsContext, path),
path,
start,
length,
fileSize,
dataColumns,
predicate,
orcReaderOptions
.withMaxMergeDistance(getOrcMaxMergeDistance(session))
.withMaxBufferSize(getOrcMaxBufferSize(session))
.withStreamBufferSize(getOrcStreamBufferSize(session))
.withTinyStripeThreshold(getOrcTinyStripeThreshold(session))
.withMaxReadBlockSize(getOrcMaxReadBlockSize(session))
.withLazyReadSmallRanges(getOrcLazyReadSmallRanges(session))
.withNestedLazy(isOrcNestedLazy(session))
.withBloomFiltersEnabled(isOrcBloomFiltersEnabled(session)),
fileFormatDataSourceStats);
case PARQUET:
return createParquetPageSource(
hdfsEnvironment,
session.getUser(),
hdfsEnvironment.getConfiguration(hdfsContext, path),
path,
start,
length,
dataColumns,
parquetReaderOptions
.withFailOnCorruptedStatistics(isFailOnCorruptedParquetStatistics(session))
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
predicate,
fileFormatDataSourceStats);
}
throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
}
public InputSplit[] getSplits(JobConf jconf, int numSplits)
throws IOException {
String srcfilelist = jconf.get(SRC_LIST_LABEL, "");
if ("".equals(srcfilelist)) {
throw new IOException("Unable to get the " +
"src file for archive generation.");
}
long totalSize = jconf.getLong(TOTAL_SIZE_LABEL, -1);
if (totalSize == -1) {
throw new IOException("Invalid size of files to archive");
}
//we should be safe since this is set by our own code
Path src = new Path(srcfilelist);
FileSystem fs = src.getFileSystem(jconf);
FileStatus fstatus = fs.getFileStatus(src);
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
LongWritable key = new LongWritable();
final HarEntry value = new HarEntry();
// the remaining bytes in the file split
long remaining = fstatus.getLen();
// the count of sizes calculated till now
long currentCount = 0L;
// the endposition of the split
long lastPos = 0L;
// the start position of the split
long startPos = 0L;
long targetSize = totalSize/numSplits;
// create splits of size targetSize so that all the maps
// have equal-sized data to read and write.
try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, src, jconf)) {
while(reader.next(key, value)) {
if (currentCount + key.get() > targetSize && currentCount != 0){
long size = lastPos - startPos;
splits.add(new FileSplit(src, startPos, size, (String[]) null));
remaining = remaining - size;
startPos = lastPos;
currentCount = 0L;
}
currentCount += key.get();
lastPos = reader.getPosition();
}
// whatever remains after the loop becomes the final split.
if (remaining != 0) {
splits.add(new FileSplit(src, startPos, remaining, (String[])null));
}
}
return splits.toArray(new FileSplit[splits.size()]);
}
/**
* Calculate the total size of all objects in the indicated bucket
*
* @param path the path to measure
* @return the calculated size in bytes
* @throws IOException if the file status or object listing cannot be retrieved
*/
@Override
public long calculateSize(Path path) throws IOException {
long totalBucketSize = 0L;
if (s3Enabled) {
String key = pathToKey(path);
final FileStatus fileStatus = fs.getFileStatus(path);
if (fileStatus.isDirectory()) {
if (!key.isEmpty()) {
key = key + "/";
}
ListObjectsRequest request = new ListObjectsRequest();
request.setBucketName(uri.getHost());
request.setPrefix(key);
request.setMaxKeys(maxKeys);
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: doing listObjects for directory " + key);
}
ObjectListing objects = s3.listObjects(request);
while (true) {
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, fs.getWorkingDirectory());
// Skip over keys that are ourselves and old S3N _$folder$ files
if (keyPath.equals(path) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring: " + keyPath);
}
continue;
}
if (!objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
totalBucketSize += summary.getSize();
}
}
if (objects.isTruncated()) {
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: list truncated - getting next batch");
}
objects = s3.listNextBatchOfObjects(objects);
} else {
break;
}
}
} else {
return fileStatus.getLen();
}
} else {
totalBucketSize = fs.getContentSummary(path).getLength();
}
return totalBucketSize;
}
OneFileInfo(FileStatus stat, Configuration conf,
boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
HashMap<String, Set<String>> rackToNodes,
long maxSize)
throws IOException {
this.fileSize = 0;
// get block locations from file system
BlockLocation[] locations;
if (stat instanceof LocatedFileStatus) {
locations = ((LocatedFileStatus) stat).getBlockLocations();
} else {
FileSystem fs = stat.getPath().getFileSystem(conf);
locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
}
// create a list of all block and their locations
if (locations == null) {
blocks = new OneBlockInfo[0];
} else {
if(locations.length == 0 && !stat.isDirectory()) {
locations = new BlockLocation[] { new BlockLocation() };
}
if (!isSplitable) {
// if the file is not splitable, just create the one block with
// full file length
blocks = new OneBlockInfo[1];
fileSize = stat.getLen();
blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
locations[0].getHosts(), locations[0].getTopologyPaths());
} else {
ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
locations.length);
for (int i = 0; i < locations.length; i++) {
fileSize += locations[i].getLength();
// each split can be a maximum of maxSize
long left = locations[i].getLength();
long myOffset = locations[i].getOffset();
long myLength = 0;
do {
if (maxSize == 0) {
myLength = left;
} else {
if (left > maxSize && left < 2 * maxSize) {
// if remainder is between max and 2*max - then
// instead of creating splits of size max, left-max we
// create splits of size left/2 and left/2. This is
// a heuristic to avoid creating really really small
// splits.
myLength = left / 2;
} else {
myLength = Math.min(maxSize, left);
}
}
OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
myOffset, myLength, locations[i].getHosts(),
locations[i].getTopologyPaths());
left -= myLength;
myOffset += myLength;
blocksList.add(oneblock);
} while (left > 0);
}
blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
}
populateBlockInfo(blocks, rackToBlocks, blockToNodes,
nodeToBlocks, rackToNodes);
}
}
@Override
public long fileLength(String name) throws IOException {
FileStatus fileStatus = fileSystem.getFileStatus(new Path(hdfsDirPath, name));
return fileStatus.getLen();
}
@Test
public void testFindValueASCOrder() throws IOException {
meta = CatalogUtil.newTableMeta(dataFormat, conf);
Path tablePath = new Path(testDir, "testFindValue_" + dataFormat);
Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
.getAppender(meta, schema, tablePath);
appender.init();
Tuple tuple;
// order by asc
for (int i = 0; i < TUPLE_NUM; i++) {
tuple = new VTuple(5);
tuple.put(0, DatumFactory.createInt4(i));
tuple.put(1, DatumFactory.createInt8(i));
tuple.put(2, DatumFactory.createFloat8(i));
tuple.put(3, DatumFactory.createFloat4(i));
tuple.put(4, DatumFactory.createText("field_" + i));
appender.addTuple(tuple);
}
appender.close();
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);
SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);
Schema keySchema = SchemaBuilder.builder()
.add(new Column("long", Type.INT8))
.add(new Column("double", Type.FLOAT8))
.build();
BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);
BSTIndex bst = new BSTIndex(conf);
BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + dataFormat + ".idx"),
BSTIndex.TWO_LEVEL_INDEX,
keySchema, comp, true);
creater.setLoadNum(LOAD_NUM);
creater.init();
SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
Tuple keyTuple;
long offset;
while (true) {
keyTuple = new VTuple(2);
offset = scanner.getNextOffset();
tuple = scanner.next();
if (tuple == null) break;
keyTuple.put(0, tuple.asDatum(1));
keyTuple.put(1, tuple.asDatum(2));
creater.write(keyTuple, offset);
}
creater.flush();
creater.close();
scanner.close();
tuple = new VTuple(keySchema.size());
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp);
reader.init();
scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
getSeekableScanner(meta, schema, tablet, schema);
scanner.init();
for (int i = 0; i < TUPLE_NUM - 1; i++) {
tuple.put(0, DatumFactory.createInt8(i));
tuple.put(1, DatumFactory.createFloat8(i));
long offsets = reader.find(tuple);
scanner.seek(offsets);
tuple = scanner.next();
assertTrue("seek check [" + (i) + " ," + (tuple.getInt8(1)) + "]", (i) == (tuple.getInt8(1)));
assertTrue("seek check [" + (i) + " ," + (tuple.getFloat8(2)) + "]", (i) == (tuple.getFloat8(2)));
offsets = reader.next();
if (offsets == -1) {
continue;
}
scanner.seek(offsets);
tuple = scanner.next();
assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.getInt4(0)));
assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.getInt8(1)));
}
reader.close();
scanner.close();
}
public static DataFile fromStat(FileStatus stat, PartitionData partition, long rowCount) {
String location = stat.getPath().toString();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(
location, format, partition, rowCount, stat.getLen());
}