下面列出了org.apache.hadoop.fs.FSDataOutputStream#writeInt ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/** @throws Exception If failed. */
@Test
public void testRenameIfSrcPathIsAlreadyBeingOpenedToRead() throws Exception {
Path fsHome = new Path(primaryFsUri);
Path srcFile = new Path(fsHome, "srcFile");
Path dstFile = new Path(fsHome, "dstFile");
FSDataOutputStream os = fs.create(srcFile, EnumSet.noneOf(CreateFlag.class),
Options.CreateOpts.perms(FsPermission.getDefault()));
int cnt = 1024;
for (int i = 0; i < cnt; i++)
os.writeInt(i);
os.close();
FSDataInputStream is = fs.open(srcFile);
for (int i = 0; i < cnt; i++) {
if (i == 100)
// Rename file during the read process.
fs.rename(srcFile, dstFile);
assertEquals(i, is.readInt());
}
assertPathDoesNotExist(fs, srcFile);
assertPathExists(fs, dstFile);
os.close();
is.close();
}
/** @throws Exception If failed. */
@Test
public void testRenameIfSrcPathIsAlreadyBeingOpenedToRead() throws Exception {
Path fsHome = new Path(primaryFsUri);
Path srcFile = new Path(fsHome, "srcFile");
Path dstFile = new Path(fsHome, "dstFile");
FSDataOutputStream os = fs.create(srcFile);
int cnt = 1024;
for (int i = 0; i < cnt; i++)
os.writeInt(i);
os.close();
FSDataInputStream is = fs.open(srcFile);
for (int i = 0; i < cnt; i++) {
if (i == 100)
// Rename file during the read process.
assertTrue(fs.rename(srcFile, dstFile));
assertEquals(i, is.readInt());
}
assertPathDoesNotExist(fs, srcFile);
assertPathExists(fs, dstFile);
os.close();
is.close();
}
private void writeWALEdit(WALCellCodec codec, List<Cell> kvs, FSDataOutputStream out) throws IOException {
out.writeInt(kvs.size());
Codec.Encoder cellEncoder = codec.getEncoder(out);
// We interleave the two lists for code simplicity
for (Cell kv : kvs) {
cellEncoder.write(kv);
}
}
public static FSDataInputStream getStream(IntBuffer buf) throws IOException {
File tmpDir = Files.createTempDir();
Path filePath = new Path(tmpDir.getAbsolutePath() + "/testOut");
FileSystem fs = FileSystem.get(filePath.toUri(), new Configuration());
FSDataOutputStream fOut = fs.create(filePath);
buf.rewind();
while (buf.hasRemaining()) {
fOut.writeInt(buf.get());
}
fOut.close();
buf.rewind();
return fs.open(filePath);
}
private void writeFileCache(FSDataOutputStream outputStream) throws IOException {
Set<Entry<String, FStat>> entrySet = _cache.entrySet();
outputStream.writeInt(_cache.size());
for (Entry<String, FStat> e : entrySet) {
String name = e.getKey();
FStat fstat = e.getValue();
writeString(outputStream, name);
outputStream.writeLong(fstat._lastMod);
outputStream.writeLong(fstat._length);
}
}
private void writeWALEdit(WALCellCodec codec, List<Cell> kvs, FSDataOutputStream out) throws IOException {
out.writeInt(kvs.size());
Codec.Encoder cellEncoder = codec.getEncoder(out);
// We interleave the two lists for code simplicity
for (Cell kv : kvs) {
cellEncoder.write(kv);
}
}
/**
* Persists a job in DFS.
*
* @param job the job about to be 'retired'
*/
public void store(JobInProgress job) {
if (active && retainTime > 0) {
JobID jobId = job.getStatus().getJobID();
Path jobStatusFile = getInfoFilePath(jobId);
try {
FSDataOutputStream dataOut = fs.create(jobStatusFile);
job.getStatus().write(dataOut);
job.getProfile().write(dataOut);
job.getCounters().write(dataOut);
TaskCompletionEvent[] events =
job.getTaskCompletionEvents(0, Integer.MAX_VALUE);
dataOut.writeInt(events.length);
for (TaskCompletionEvent event : events) {
event.write(dataOut);
}
dataOut.close();
} catch (IOException ex) {
LOG.warn("Could not store [" + jobId + "] job info : " +
ex.getMessage(), ex);
try {
fs.delete(jobStatusFile, true);
}
catch (IOException ex1) {
//ignore
}
}
}
}
private void writeToken(FileSystem fs, Path path) throws IOException{
if (token != null && token.length > 0) {
FSDataOutputStream out = null;
try {
out = fs.create(new Path(path, "_token"));
out.writeInt(token.length);
out.write(token);
out.close();
}finally {
if (out != null) {
out.close();
}
}
}
}
/**
* Persists a job in DFS.
*
* @param job the job about to be 'retired'
*/
public void store(JobInProgress job) {
if (active && retainTime > 0) {
JobID jobId = job.getStatus().getJobID();
Path jobStatusFile = getInfoFilePath(jobId);
try {
FSDataOutputStream dataOut = fs.create(jobStatusFile);
job.getStatus().write(dataOut);
job.getProfile().write(dataOut);
job.getCounters().write(dataOut);
TaskCompletionEvent[] events =
job.getTaskCompletionEvents(0, Integer.MAX_VALUE);
dataOut.writeInt(events.length);
for (TaskCompletionEvent event : events) {
event.write(dataOut);
}
dataOut.close();
} catch (IOException ex) {
LOG.warn("Could not store [" + jobId + "] job info : " +
ex.getMessage(), ex);
try {
fs.delete(jobStatusFile, true);
}
catch (IOException ex1) {
//ignore
}
}
}
}
private static void writeSplitHeader(FSDataOutputStream out)
throws IOException {
out.write(SPLIT_FILE_HEADER);
out.writeInt(splitVersion);
}
private static void writeSplitHeader(FSDataOutputStream out)
throws IOException {
out.write(SPLIT_FILE_HEADER);
out.writeInt(splitVersion);
}
@Test
public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieDataBlock dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.close();
// Append some arbit byte[] to thee end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
// Write out a length that does not confirm with the content
outputStream.writeLong(474);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
// Write out a length that does not confirm with the content
outputStream.writeLong(400);
// Write out incomplete content
outputStream.write("something-random".getBytes());
outputStream.flush();
outputStream.close();
// Append a proper block that is of the missing length of the corrupted block
writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
records = SchemaTestUtil.generateTestRecords(0, 10);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.close();
// First round of reads - we should be able to read the first block and then EOF
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should have corrupted block next");
HoodieLogBlock block = reader.next();
assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
assertTrue(reader.hasNext(), "Third block should be available");
reader.next();
assertFalse(reader.hasNext(), "There should be no more block left");
reader.close();
// Simulate another failure back to back
outputStream = fs.append(writer.getLogFile().getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
// Write out a length that does not confirm with the content
outputStream.writeLong(1000);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
// Write out a length that does not confirm with the content
outputStream.writeLong(500);
// Write out some bytes
outputStream.write("something-else-random".getBytes());
outputStream.flush();
outputStream.close();
// Should be able to append a new block
writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
records = SchemaTestUtil.generateTestRecords(0, 100);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.close();
// Second round of reads - we should be able to read the first and last block
reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
assertTrue(reader.hasNext(), "First block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
reader.next();
assertTrue(reader.hasNext(), "Third block should be available");
reader.next();
assertTrue(reader.hasNext(), "We should get the 2nd corrupted block next");
block = reader.next();
assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
assertTrue(reader.hasNext(), "We should get the last block next");
reader.next();
assertFalse(reader.hasNext(), "We should have no more blocks left");
reader.close();
}
@Test
public void testAvroLogRecordReaderWithRollbackPartialBlock()
throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
writer.close();
// Write 2
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
// Append some arbit byte[] to thee end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
// Write out a length that does not confirm with the content
outputStream.writeLong(1000);
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
// Write out some header
outputStream.write(HoodieLogBlock.getLogMetadataBytes(header));
outputStream.writeLong("something-random".getBytes().length);
outputStream.write("something-random".getBytes());
outputStream.flush();
outputStream.close();
// Rollback the last write
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
writer = writer.appendBlock(commandBlock);
// Write 3
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords3 = records3.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
dataBlock = getDataBlock(records3, header);
writer = writer.appendBlock(dataBlock);
writer.close();
List<String> allLogFiles =
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString()).collect(Collectors.toList());
HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "103",
10240L, true, false, bufferSize, BASE_OUTPUT_PATH);
assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records");
Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals(200, readKeys.size(), "Stream collect should return all 200 records");
copyOfRecords1.addAll(copyOfRecords3);
Set<String> originalKeys =
copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(Collectors.toSet());
assertEquals(originalKeys, readKeys, "CompositeAvroLogReader should return 200 records from 2 versions");
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException {
// Write a 3 Data blocs with same InstantTime (written in same batch)
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieDataBlock dataBlock = getDataBlock(records1, header);
writer = writer.appendBlock(dataBlock);
writer = writer.appendBlock(dataBlock);
writer = writer.appendBlock(dataBlock);
writer.close();
// Append some arbit byte[] to the end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
outputStream.writeLong(1000);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
// Write out a length that does not confirm with the content
outputStream.writeLong(100);
outputStream.flush();
outputStream.close();
// Append some arbit byte[] to the end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
outputStream = fs.append(writer.getLogFile().getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
outputStream.writeLong(1000);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
// Write out a length that does not confirm with the content
outputStream.writeLong(100);
outputStream.flush();
outputStream.close();
writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
writer = writer.appendBlock(dataBlock);
writer.close();
// Append some arbit byte[] to the end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
outputStream = fs.append(writer.getLogFile().getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
outputStream.writeLong(1000);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
// Write out a length that does not confirm with the content
outputStream.writeLong(100);
outputStream.flush();
outputStream.close();
writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1 rollback block for the last commit instant
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
header.put(HeaderMetadataType.TARGET_INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
writer = writer.appendBlock(commandBlock);
writer.close();
List<String> allLogFiles =
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString()).collect(Collectors.toList());
HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101",
10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException {
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
Schema schema = getSimpleSchema();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieDataBlock dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.close();
// Append some arbit byte[] to thee end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
// Write out a length that does not confirm with the content
outputStream.writeInt(1000);
// Write out footer length
outputStream.writeInt(1);
// Write out some metadata
// TODO : test for failure to write metadata - NA ?
outputStream.write(HoodieLogBlock.getLogMetadataBytes(header));
outputStream.write("something-random".getBytes());
outputStream.flush();
outputStream.close();
// Should be able to append a new block
writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
records = SchemaTestUtil.generateTestRecords(0, 100);
dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
writer.close();
// First round of reads - we should be able to read the first block and then EOF
HoodieLogFileReader reader =
new HoodieLogFileReader(fs, writer.getLogFile(), schema, bufferSize, readBlocksLazily, true);
assertTrue(reader.hasPrev(), "Last block should be available");
HoodieLogBlock block = reader.prev();
assertTrue(block instanceof HoodieDataBlock, "Last block should be datablock");
assertTrue(reader.hasPrev(), "Last block should be available");
assertThrows(CorruptedLogFileException.class, () -> {
reader.prev();
});
reader.close();
}
@Test
public void testRemoveCompactedFilesWithException() throws Exception {
byte[] fam = Bytes.toBytes("f");
byte[] col = Bytes.toBytes("c");
byte[] val = Bytes.toBytes("val");
TableName tableName = TableName.valueOf(name.getMethodName());
TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
HRegion region = initHRegion(htd, info);
RegionServerServices rss = mock(RegionServerServices.class);
List<HRegion> regions = new ArrayList<>();
regions.add(region);
Mockito.doReturn(regions).when(rss).getRegions();
// Create the cleaner object
final CompactedHFilesDischarger cleaner =
new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
// Add some data to the region and do some flushes
int batchSize = 10;
int fileCount = 10;
for (int f = 0; f < fileCount; f++) {
int start = f * batchSize;
for (int i = start; i < start + batchSize; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, col, val);
region.put(p);
}
// flush them
region.flush(true);
}
HStore store = region.getStore(fam);
assertEquals(fileCount, store.getStorefilesCount());
Collection<HStoreFile> storefiles = store.getStorefiles();
// None of the files should be in compacted state.
for (HStoreFile file : storefiles) {
assertFalse(file.isCompactedAway());
}
StoreFileManager fileManager = store.getStoreEngine().getStoreFileManager();
Collection<HStoreFile> initialCompactedFiles = fileManager.getCompactedfiles();
assertTrue(initialCompactedFiles == null || initialCompactedFiles.isEmpty());
// Do compaction
region.compact(true);
// all prior store files should now be compacted
Collection<HStoreFile> compactedFilesPreClean = fileManager.getCompactedfiles();
assertNotNull(compactedFilesPreClean);
assertTrue(compactedFilesPreClean.size() > 0);
// add the dummy file to the store directory
HRegionFileSystem regionFS = region.getRegionFileSystem();
Path errFile = regionFS.getStoreFilePath(Bytes.toString(fam), ERROR_FILE);
FSDataOutputStream out = regionFS.getFileSystem().create(errFile);
out.writeInt(1);
out.close();
HStoreFile errStoreFile = new MockHStoreFile(testUtil, errFile, 1, 0, false, 1);
fileManager.addCompactionResults(
ImmutableList.of(errStoreFile), ImmutableList.of());
// cleanup compacted files
cleaner.chore();
// make sure the compacted files are cleared
Collection<HStoreFile> compactedFilesPostClean = fileManager.getCompactedfiles();
assertEquals(1, compactedFilesPostClean.size());
for (HStoreFile origFile : compactedFilesPreClean) {
assertFalse(compactedFilesPostClean.contains(origFile));
}
// close the region
try {
region.close();
} catch (FailedArchiveException e) {
// expected due to errorfile
assertEquals(1, e.getFailedFiles().size());
assertEquals(ERROR_FILE, e.getFailedFiles().iterator().next().getName());
}
}
private void writeString(FSDataOutputStream outputStream, String s) throws IOException {
byte[] bs = s.getBytes(UTF_8);
outputStream.writeInt(bs.length);
outputStream.write(bs);
}
/**
* Checks consistency of create --> open --> append --> open operations with different buffer sizes.
*
* @param createBufSize Buffer size used for file creation.
* @param writeCntsInCreate Count of times to write in file creation.
* @param openAfterCreateBufSize Buffer size used for file opening after creation.
* @param appendBufSize Buffer size used for file appending.
* @param writeCntsInAppend Count of times to write in file appending.
* @param openAfterAppendBufSize Buffer size used for file opening after appending.
* @throws Exception If failed.
*/
private void checkConsistency(int createBufSize, int writeCntsInCreate, int openAfterCreateBufSize,
int appendBufSize, int writeCntsInAppend, int openAfterAppendBufSize) throws Exception {
final Path igfsHome = new Path(PRIMARY_URI);
Path file = new Path(igfsHome, "/someDir/someInnerDir/someFile");
FSDataOutputStream os = fs.create(file, true, createBufSize);
for (int i = 0; i < writeCntsInCreate; i++)
os.writeInt(i);
os.close();
FSDataInputStream is = fs.open(file, openAfterCreateBufSize);
for (int i = 0; i < writeCntsInCreate; i++)
assertEquals(i, is.readInt());
is.close();
os = fs.append(file, appendBufSize);
for (int i = writeCntsInCreate; i < writeCntsInCreate + writeCntsInAppend; i++)
os.writeInt(i);
os.close();
is = fs.open(file, openAfterAppendBufSize);
for (int i = 0; i < writeCntsInCreate + writeCntsInAppend; i++)
assertEquals(i, is.readInt());
is.close();
}
/**
* Checks consistency of create --> open --> append --> open operations with different buffer sizes.
*
* @param createBufSize Buffer size used for file creation.
* @param writeCntsInCreate Count of times to write in file creation.
* @param openAfterCreateBufSize Buffer size used for file opening after creation.
* @param appendBufSize Buffer size used for file appending.
* @param writeCntsInAppend Count of times to write in file appending.
* @param openAfterAppendBufSize Buffer size used for file opening after appending.
* @throws Exception If failed.
*/
private void checkConsistency(int createBufSize, int writeCntsInCreate, int openAfterCreateBufSize,
int appendBufSize, int writeCntsInAppend, int openAfterAppendBufSize) throws Exception {
final Path igfsHome = new Path(primaryFsUri);
Path file = new Path(igfsHome, "/someDir/someInnerDir/someFile");
if (createBufSize == -1)
createBufSize = fs.getServerDefaults().getFileBufferSize();
if (appendBufSize == -1)
appendBufSize = fs.getServerDefaults().getFileBufferSize();
FSDataOutputStream os = fs.create(file, EnumSet.of(CreateFlag.OVERWRITE),
Options.CreateOpts.perms(FsPermission.getDefault()), Options.CreateOpts.bufferSize(createBufSize));
for (int i = 0; i < writeCntsInCreate; i++)
os.writeInt(i);
os.close();
FSDataInputStream is = fs.open(file, openAfterCreateBufSize);
for (int i = 0; i < writeCntsInCreate; i++)
assertEquals(i, is.readInt());
is.close();
os = fs.create(file, EnumSet.of(CreateFlag.APPEND),
Options.CreateOpts.perms(FsPermission.getDefault()), Options.CreateOpts.bufferSize(appendBufSize));
for (int i = writeCntsInCreate; i < writeCntsInCreate + writeCntsInAppend; i++)
os.writeInt(i);
os.close();
is = fs.open(file, openAfterAppendBufSize);
for (int i = 0; i < writeCntsInCreate + writeCntsInAppend; i++)
assertEquals(i, is.readInt());
is.close();
}