Listed below are example usages of org.apache.hadoop.fs.Path#suffix(), drawn from several open source projects.
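Before the project examples, here is a minimal, self-contained sketch of what Path#suffix does (the paths are made up for illustration): it appends the given string to the last component of the path, with no separator inserted.

import org.apache.hadoop.fs.Path;

public class PathSuffixDemo {
  public static void main(String[] args) {
    Path part = new Path("/warehouse/events/part-00000");
    // suffix() appends to the final path component:
    // /warehouse/events/part-00000.gz
    System.out.println(part.suffix(".gz"));
    // No separator is added, so callers that want a child-like path pass the
    // slash themselves, as some of the examples below do:
    // /warehouse/events/part-00000/2024/batch-1
    System.out.println(part.suffix("/2024/batch-1"));
  }
}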
public static MapOutput createDiskMapOutput(InputAttemptIdentifier attemptIdentifier,
FetchedInputAllocatorOrderedGrouped callback, long size, Configuration conf,
int fetcher, boolean primaryMapOutput,
TezTaskOutputFiles mapOutputFile) throws
IOException {
FileSystem fs = FileSystem.getLocal(conf).getRaw();
Path outputPath = mapOutputFile.getInputFileForWrite(
attemptIdentifier.getInputIdentifier(), attemptIdentifier.getSpillEventId(), size);
// Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
// otherwise fetches for the same task but from different attempts would clobber each other.
Path tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
long offset = 0;
DiskMapOutput mapOutput = new DiskMapOutput(attemptIdentifier, callback, size, outputPath, offset,
primaryMapOutput, tmpOutputPath);
mapOutput.disk = fs.create(tmpOutputPath);
return mapOutput;
}
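The comment in the example above describes the convention at work: the fetcher id is appended with suffix() so concurrent fetches of the same task output land in distinct temp files. Below is a minimal sketch of that write-to-temp-then-rename pattern, with hypothetical names rather than the actual Tez API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TmpSuffixCommit {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf).getRaw();
    int fetcherId = 3; // hypothetical fetcher number

    Path outputPath = new Path("/tmp/demo/map_0.out");
    // Each fetcher writes to its own temp file, e.g. /tmp/demo/map_0.out3
    Path tmpOutputPath = outputPath.suffix(String.valueOf(fetcherId));
    try (FSDataOutputStream out = fs.create(tmpOutputPath)) {
      out.writeBytes("shuffled bytes");
    }
    // Only the successful fetch is promoted to the final name.
    fs.rename(tmpOutputPath, outputPath);
  }
}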
/**
* Returns an {@link OutputStream} for a file that might need
* compression.
*/
static OutputStream getPossiblyCompressedOutputStream(Path file,
Configuration conf)
throws IOException {
FileSystem fs = file.getFileSystem(conf);
JobConf jConf = new JobConf(conf);
if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jConf)) {
// get the codec class
Class<? extends CompressionCodec> codecClass =
org.apache.hadoop.mapred.FileOutputFormat
.getOutputCompressorClass(jConf,
GzipCodec.class);
// get the codec implementation
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
// add the appropriate extension
file = file.suffix(codec.getDefaultExtension());
if (isCompressionEmulationEnabled(conf)) {
FSDataOutputStream fileOut = fs.create(file, false);
return new DataOutputStream(codec.createOutputStream(fileOut));
}
}
return fs.create(file, false);
}
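The same suffix-plus-codec idea in a compact form: a hypothetical helper, not part of Gridmix, that appends the codec's default extension with Path#suffix and wraps the raw stream, which is essentially what the compression branch above does.

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CompressedCreate {
  // Hypothetical helper: create the file under its codec extension and return a wrapped stream.
  static OutputStream createCompressed(FileSystem fs, Path file, Configuration conf)
      throws IOException {
    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
    Path target = file.suffix(codec.getDefaultExtension()); // e.g. record -> record.gz
    return codec.createOutputStream(fs.create(target, false));
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    try (OutputStream out = createCompressed(fs, new Path("/tmp/demo/record"), conf)) {
      out.write("hello".getBytes());
    }
  }
}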
/**
* Read an existing index. Reads and returns the index of the SSTable index: a list of chunks derived from the
* Cassandra Index.db file together with the configured split size.
*
* @param fileSystem Hadoop file system.
* @param sstablePath Path to the SSTable Index.db file.
* @return Index of chunks.
* @throws IOException
*/
public static SSTableIndexIndex readIndex(final FileSystem fileSystem, final Path sstablePath) throws IOException {
final Closer closer = Closer.create();
final Path indexPath = sstablePath.suffix(SSTABLE_INDEX_SUFFIX);
// Detonate if we don't have an index.
final FSDataInputStream inputStream = closer.register(fileSystem.open(indexPath));
final SSTableIndexIndex indexIndex = new SSTableIndexIndex();
try {
while (inputStream.available() != 0) {
indexIndex.add(inputStream.readLong(), inputStream.readLong());
}
} finally {
closer.close();
}
return indexIndex;
}
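readIndex() above consumes the companion index file (the Index.db path plus SSTABLE_INDEX_SUFFIX) as consecutive pairs of longs, one pair per chunk. The following is a hedged sketch of a matching writer, purely to illustrate the on-disk layout that loop expects; the suffix value and offsets below are assumptions, not the project's constants.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteChunkIndex {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path sstablePath = new Path("/tmp/demo/ks-cf-Index.db");
    Path indexPath = sstablePath.suffix(".index"); // assumed stand-in for SSTABLE_INDEX_SUFFIX

    long[][] chunks = { {0L, 4096L}, {4096L, 9120L} }; // hypothetical chunk boundaries
    try (FSDataOutputStream out = fs.create(indexPath, true)) {
      for (long[] chunk : chunks) {
        out.writeLong(chunk[0]); // first long consumed by readIndex()
        out.writeLong(chunk[1]); // second long consumed by readIndex()
      }
    }
  }
}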
private boolean containMetaWals(ServerName serverName) throws IOException {
Path logDir = new Path(master.getWALRootDir(),
AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
Path checkDir = master.getFileSystem().exists(splitDir) ? splitDir : logDir;
try {
return master.getFileSystem().listStatus(checkDir, META_FILTER).length > 0;
} catch (FileNotFoundException fnfe) {
// If no files, then we don't contain metas; was failing schedule of
// SCP because this was FNFE'ing when no server dirs ('Unknown Server').
LOG.warn("No dir for WALs for {}; continuing", serverName.toString());
return false;
}
}
private static byte[] read(FileSystem fileSystem, Path checkpointFilePath) throws IOException {
Path backupCheckpointPath = checkpointFilePath.suffix(".bak");
FSDataInputStream is = null;
if (fileSystem.exists(checkpointFilePath)) {
is = fileSystem.open(checkpointFilePath);
} else if (fileSystem.exists(backupCheckpointPath)) {
is = fileSystem.open(backupCheckpointPath);
}
return is != null ? IOUtils.toByteArray(is) : null;
}
private void walkPath(Path path, PathFilter pathFilter, List<Path> accumulator) {
try {
FileSystem fs = path.getFileSystem(getConf());
FileStatus fileStatus = fs.getFileStatus(path);
if (fileStatus.isDir()) {
FileStatus[] children = fs.listStatus(path, pathFilter);
for (FileStatus childStatus : children) {
walkPath(childStatus.getPath(), pathFilter, accumulator);
}
} else if (path.toString().endsWith(INDEX_EXTENSION)) {
Path sstableIndexPath = path.suffix(SSTableIndexIndex.SSTABLE_INDEX_SUFFIX);
if (fs.exists(sstableIndexPath)) {
// If the index exists and is of nonzero size, we're already done.
// We re-index a file with a zero-length index, because every file has at least one block.
if (fs.getFileStatus(sstableIndexPath).getLen() > 0) {
LOG.info("[SKIP] SSTABLE index file already exists for " + path);
return;
} else {
LOG.info("Adding SSTABLE file " + path + " to indexing list (index file exists but is zero length)");
accumulator.add(path);
}
} else {
// If no index exists, we need to index the file.
LOG.info("Adding SSTABLE file " + path + " to indexing list (no index currently exists)");
accumulator.add(path);
}
}
} catch (IOException ioe) {
LOG.warn("Error walking path: " + path, ioe);
}
}
@Override
public void perform() throws Exception {
getLogger().info("Start corrupting data files");
FileSystem fs = CommonFSUtils.getRootDirFileSystem(getConf());
Path rootDir = CommonFSUtils.getRootDir(getConf());
Path defaultDir = rootDir.suffix("/data/default");
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(defaultDir, true);
while (iterator.hasNext()){
LocatedFileStatus status = iterator.next();
if(!HFile.isHFileFormat(fs, status.getPath())){
continue;
}
if(RandomUtils.nextFloat(0, 100) > chance){
continue;
}
FSDataOutputStream out = fs.create(status.getPath(), true);
try {
out.write(0);
} finally {
out.close();
}
getLogger().info("Corrupting {}", status.getPath());
}
getLogger().info("Done corrupting data files");
}
protected void writeRenameReadCompare(Path path, long len)
throws IOException, NoSuchAlgorithmException {
// If len > fs.s3n.multipart.uploads.block.size,
// we'll use a multipart upload copy
MessageDigest digest = MessageDigest.getInstance("MD5");
OutputStream out = new BufferedOutputStream(
new DigestOutputStream(fs.create(path, false), digest));
for (long i = 0; i < len; i++) {
out.write('Q');
}
out.flush();
out.close();
assertTrue("Exists", fs.exists(path));
// Depending on if this file is over 5 GB or not,
// rename will cause a multipart upload copy
Path copyPath = path.suffix(".copy");
fs.rename(path, copyPath);
assertTrue("Copy exists", fs.exists(copyPath));
// Download file from S3 and compare the digest against the original
MessageDigest digest2 = MessageDigest.getInstance("MD5");
InputStream in = new BufferedInputStream(
new DigestInputStream(fs.open(copyPath), digest2));
long copyLen = 0;
while (in.read() != -1) {copyLen++;}
in.close();
assertEquals("Copy length matches original", len, copyLen);
assertArrayEquals("Digests match", digest.digest(), digest2.digest());
}
private static void write(FileSystem fileSystem, Path checkpointFilePath, byte[] value)
throws IOException {
Path tmpPath = checkpointFilePath.suffix(TEMP_FILE_SUFFIX);
Path backupPath = checkpointFilePath.suffix(BACKUP_FILE_SUFFIX);
if (fileSystem.exists(checkpointFilePath)) {
if (fileSystem.exists(backupPath)) {
fileSystem.delete(backupPath, false);
}
fileSystem.rename(checkpointFilePath, backupPath);
}
FSDataOutputStream os = fileSystem.create(tmpPath, true);
os.write(value);
os.close();
fileSystem.rename(tmpPath, checkpointFilePath);
}
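Putting the two checkpoint helpers together: write() stages the bytes under the temp suffix, rotates any existing file to the backup suffix, and renames into place, while read() above falls back to the backup copy. A short usage sketch follows, assuming it sits in the same class as read()/write() and that TEMP_FILE_SUFFIX and BACKUP_FILE_SUFFIX are along the lines of ".tmp" and ".bak".

// Usage sketch; assumes the read()/write() helpers above are in scope.
static void checkpointRoundTrip(FileSystem fs) throws IOException {
  Path checkpoint = new Path("/tmp/demo/checkpoint");

  write(fs, checkpoint, "state-v1".getBytes(java.nio.charset.StandardCharsets.UTF_8));
  // The second write rotates the previous file to checkpoint.bak before renaming into place.
  write(fs, checkpoint, "state-v2".getBytes(java.nio.charset.StandardCharsets.UTF_8));

  byte[] latest = read(fs, checkpoint);      // "state-v2" from the primary file
  fs.delete(checkpoint, false);
  byte[] recovered = read(fs, checkpoint);   // "state-v1", served from the .bak copy
}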
@Override
public Path getBucketPath(Clock clock, Path basePath, Tuple4<Integer, Long, Integer, String> element) {
return basePath.suffix(String.valueOf(element.f0));
}
/**
* Retrieve the map output of a single map task
* and send it to the merger.
*/
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
// Figure out where the map task stored its output.
Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
Path indexFileName = mapOutputFileName.suffix(".index");
// Read its index to determine the location of our split
// and its size.
SpillRecord sr = new SpillRecord(indexFileName, job);
IndexRecord ir = sr.getIndex(reduce);
long compressedLength = ir.partLength;
long decompressedLength = ir.rawLength;
compressedLength -= CryptoUtils.cryptoPadding(job);
decompressedLength -= CryptoUtils.cryptoPadding(job);
// Get the location for the map output - either in-memory or on-disk
MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
id);
// Check if we can shuffle *now* ...
if (mapOutput == null) {
LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
return false;
}
// Go!
LOG.info("localfetcher#" + id + " about to shuffle output of map " +
mapOutput.getMapId() + " decomp: " +
decompressedLength + " len: " + compressedLength + " to " +
mapOutput.getDescription());
// now read the file, seek to the appropriate section, and send it.
FileSystem localFs = FileSystem.getLocal(job).getRaw();
FSDataInputStream inStream = localFs.open(mapOutputFileName);
inStream = CryptoUtils.wrapIfNecessary(job, inStream);
try {
inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
} finally {
try {
inStream.close();
} catch (IOException ioe) {
LOG.warn("IOException closing inputstream from map output: "
+ ioe.toString());
}
}
scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
mapOutput);
return true; // successful fetch.
}
private void storageCompressionTest(String dataFormat, Class<? extends CompressionCodec> codec) throws IOException {
Schema schema = SchemaBuilder.builder()
.add("id", Type.INT4)
.add("age", Type.FLOAT4)
.add("name", Type.TEXT)
.build();
TableMeta meta = CatalogUtil.newTableMeta(dataFormat, conf);
meta.putProperty("compression.codec", codec.getCanonicalName());
meta.putProperty("compression.type", SequenceFile.CompressionType.BLOCK.name());
meta.putProperty("rcfile.serde", TextSerializerDeserializer.class.getName());
meta.putProperty("sequencefile.serde", TextSerializerDeserializer.class.getName());
if (codec.equals(SnappyCodec.class)) {
meta.putProperty(OrcConf.COMPRESS.getAttribute(), "SNAPPY");
} else if (codec.equals(Lz4Codec.class)) {
meta.putProperty(OrcConf.COMPRESS.getAttribute(), "ZLIB");
} else {
meta.putProperty(OrcConf.COMPRESS.getAttribute(), "NONE");
}
String fileName = "Compression_" + codec.getSimpleName();
Path tablePath = new Path(testDir, fileName);
Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
String extension = "";
if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) {
extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension();
}
int tupleNum = 1000;
VTuple vTuple;
for (int i = 0; i < tupleNum; i++) {
vTuple = new VTuple(3);
vTuple.put(0, DatumFactory.createInt4(i + 1));
vTuple.put(1, DatumFactory.createFloat4((float) i));
vTuple.put(2, DatumFactory.createText(String.valueOf(i)));
appender.addTuple(vTuple);
}
appender.close();
TableStats stat = appender.getStats();
assertEquals(tupleNum, stat.getNumRows().longValue());
tablePath = tablePath.suffix(extension);
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
FileFragment[] tablets = new FileFragment[1];
tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema);
scanner.init();
if (dataFormat.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) {
assertTrue(scanner instanceof SequenceFileScanner);
Writable key = ((SequenceFileScanner) scanner).getKey();
assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
}
int tupleCnt = 0;
while ((scanner.next()) != null) {
tupleCnt++;
}
scanner.close();
assertEquals(tupleNum, tupleCnt);
assertNotSame(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
}
/**
* Test compressible {@link GridmixRecord}.
*/
@Test
public void testCompressibleGridmixRecord() throws IOException {
JobConf conf = new JobConf();
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
FileSystem lfs = FileSystem.getLocal(conf);
int dataSize = 1024 * 1024 * 10; // 10 MB
float ratio = 0.357F;
// define the test's root temp directory
Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
lfs.getUri(), lfs.getWorkingDirectory());
Path tempDir = new Path(rootTempDir,
"TestPossiblyCompressibleGridmixRecord");
lfs.delete(tempDir, true);
// define a compressible GridmixRecord
GridmixRecord record = new GridmixRecord(dataSize, 0);
record.setCompressibility(true, ratio); // enable compression
conf.setClass(FileOutputFormat.COMPRESS_CODEC, GzipCodec.class,
CompressionCodec.class);
org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);
// write the record to a file
Path recordFile = new Path(tempDir, "record");
OutputStream outStream = CompressionEmulationUtil
.getPossiblyCompressedOutputStream(recordFile,
conf);
DataOutputStream out = new DataOutputStream(outStream);
record.write(out);
out.close();
outStream.close();
// open the compressed stream for reading
Path actualRecordFile = recordFile.suffix(".gz");
InputStream in =
CompressionEmulationUtil
.getPossiblyDecompressedInputStream(actualRecordFile, conf, 0);
// get the compressed file size
long compressedFileSize = lfs.listStatus(actualRecordFile)[0].getLen();
GridmixRecord recordRead = new GridmixRecord();
recordRead.readFields(new DataInputStream(in));
assertEquals("Record size mismatch in a compressible GridmixRecord",
dataSize, recordRead.getSize());
assertTrue("Failed to generate a compressible GridmixRecord",
recordRead.getSize() > compressedFileSize);
// check if the record can generate data with the desired compression ratio
float seenRatio = ((float)compressedFileSize)/dataSize;
assertEquals(CompressionEmulationUtil.standardizeCompressionRatio(ratio),
CompressionEmulationUtil.standardizeCompressionRatio(seenRatio), 1.0D);
}
@Test
public void testSplitCompressionData() throws IOException {
if(StoreType.CSV != storeType) return;
Schema schema = new Schema();
schema.addColumn("id", Type.INT4);
schema.addColumn("age", Type.INT8);
TableMeta meta = CatalogUtil.newTableMeta(storeType);
meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
Path tablePath = new Path(testDir, "SplitCompression");
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
String extension = "";
if (appender instanceof CSVFile.CSVAppender) {
extension = ((CSVFile.CSVAppender) appender).getExtension();
}
int tupleNum = 100000;
VTuple vTuple;
for (int i = 0; i < tupleNum; i++) {
vTuple = new VTuple(2);
vTuple.put(0, DatumFactory.createInt4(i + 1));
vTuple.put(1, DatumFactory.createInt8(25L));
appender.addTuple(vTuple);
}
appender.close();
TableStats stat = appender.getStats();
assertEquals(tupleNum, stat.getNumRows().longValue());
tablePath = tablePath.suffix(extension);
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
long randomNum = (long) (Math.random() * fileLen) + 1;
FileFragment[] tablets = new FileFragment[2];
tablets[0] = new FileFragment("SplitCompression", tablePath, 0, randomNum);
tablets[1] = new FileFragment("SplitCompression", tablePath, randomNum, (fileLen - randomNum));
Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
assertTrue(scanner.isSplittable());
scanner.init();
int tupleCnt = 0;
Tuple tuple;
while ((tuple = scanner.next()) != null) {
tupleCnt++;
}
scanner.close();
scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
assertTrue(scanner.isSplittable());
scanner.init();
while ((tuple = scanner.next()) != null) {
tupleCnt++;
}
scanner.close();
assertEquals(tupleNum, tupleCnt);
}
private synchronized void resetFile() {
// this will keep it thread safe, so we don't create too many files
if (this.fileLineCounter == 0 && this.currentWriter != null) {
return;
}
// Create the path for where the file is going to live.
Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime());
if ( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP)) {
filePath = filePath.suffix(".gz");
} else {
filePath = filePath.suffix(".tsv");
}
try {
// if there is a current writer, we must close it first.
if (this.currentWriter != null) {
flush();
close();
}
this.fileLineCounter = 0;
// Check whether a file of the same name already exists; if it does, we cannot proceed.
if (client.exists(filePath)) {
throw new RuntimeException("Unable to create file: " + filePath);
}
if ( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP)) {
this.currentWriter = new OutputStreamWriter(new GZIPOutputStream(client.create(filePath)));
} else {
this.currentWriter = new OutputStreamWriter(client.create(filePath));
}
// Add another file to the list of written files.
writtenFiles.add(filePath);
LOGGER.info("File Created: {}", filePath);
} catch (Exception ex) {
LOGGER.error("COULD NOT CreateFile: {}", filePath);
LOGGER.error(ex.getMessage());
throw new RuntimeException(ex);
}
}
private void storageCompressionTest(CatalogProtos.StoreType storeType, Class<? extends CompressionCodec> codec)
throws IOException {
Schema schema = new Schema();
schema.addColumn("id", TajoDataTypes.Type.INT4);
schema.addColumn("age", TajoDataTypes.Type.INT8);
TableMeta meta = CatalogUtil.newTableMeta(storeType);
meta.putOption("compression.codec", codec.getCanonicalName());
String fileName = "Compression_" + codec.getSimpleName();
Path tablePath = new Path(testDir, fileName);
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
String extension = "";
if (appender instanceof CSVFile.CSVAppender) {
extension = ((CSVFile.CSVAppender) appender).getExtension();
}
int tupleNum = 10000;
VTuple vTuple;
for (int i = 0; i < tupleNum; i++) {
vTuple = new VTuple(2);
vTuple.put(0, DatumFactory.createInt4(i + 1));
vTuple.put(1, DatumFactory.createInt8(25L));
appender.addTuple(vTuple);
}
appender.close();
TableStats stat = appender.getStats();
assertEquals(tupleNum, stat.getNumRows().longValue());
tablePath = tablePath.suffix(extension);
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
FileFragment[] tablets = new FileFragment[1];
tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
scanner.init();
int tupleCnt = 0;
while (scanner.next() != null) {
tupleCnt++;
}
scanner.close();
assertEquals(tupleCnt, tupleNum);
}
private Path getWALSplitDir(ServerName serverName) {
Path logDir =
new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
}
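For context, the split directory built here is simply the WAL directory name with the splitting extension appended. A tiny sketch follows, with the extension value assumed to be "-splitting"; the authoritative value is AbstractFSWALProvider.SPLITTING_EXT.

import org.apache.hadoop.fs.Path;

public class SplitDirDemo {
  public static void main(String[] args) {
    // "-splitting" is an assumed value standing in for AbstractFSWALProvider.SPLITTING_EXT.
    Path logDir = new Path("/hbase/WALs/host1.example.com,16020,1700000000000");
    Path splitDir = logDir.suffix("-splitting");
    System.out.println(splitDir); // .../host1.example.com,16020,1700000000000-splitting
  }
}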
private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException {
Schema schema = new Schema();
schema.addColumn("id", Type.INT4);
schema.addColumn("age", Type.FLOAT4);
schema.addColumn("name", Type.TEXT);
TableMeta meta = CatalogUtil.newTableMeta(storeType);
meta.putOption("compression.codec", codec.getCanonicalName());
meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
String fileName = "Compression_" + codec.getSimpleName();
Path tablePath = new Path(testDir, fileName);
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
String extension = "";
if (appender instanceof CSVFile.CSVAppender) {
extension = ((CSVFile.CSVAppender) appender).getExtension();
}
int tupleNum = 100000;
VTuple vTuple;
for (int i = 0; i < tupleNum; i++) {
vTuple = new VTuple(3);
vTuple.put(0, DatumFactory.createInt4(i + 1));
vTuple.put(1, DatumFactory.createFloat4((float) i));
vTuple.put(2, DatumFactory.createText(String.valueOf(i)));
appender.addTuple(vTuple);
}
appender.close();
TableStats stat = appender.getStats();
assertEquals(tupleNum, stat.getNumRows().longValue());
tablePath = tablePath.suffix(extension);
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
FileFragment[] tablets = new FileFragment[1];
tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
if (StoreType.CSV == storeType) {
if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
assertTrue(scanner.isSplittable());
} else {
assertFalse(scanner.isSplittable());
}
}
scanner.init();
int tupleCnt = 0;
Tuple tuple;
while ((tuple = scanner.next()) != null) {
tupleCnt++;
}
scanner.close();
assertEquals(tupleNum, tupleCnt);
assertNotSame(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
}