The following examples show how to use the org.apache.hadoop.hbase.regionserver.StoreFile API class; you can also follow the links to view the full source code on GitHub.
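Before the individual examples, here is a minimal, self-contained sketch of the write-then-read cycle most of these snippets build on: it writes a few KeyValues with StoreFile.WriterBuilder and reads them back through HFile.createReader. It targets the pre-2.0 HBase API (StoreFile.Writer and StoreFile.WriterBuilder were removed in later releases); the output path and the "cf" column family are placeholder values, not taken from any of the projects below.

public static void writeAndReadBack(Configuration conf) throws IOException {
  // Placeholder output path; any writable local or HDFS path works.
  FileSystem fs = FileSystem.get(conf);
  Path path = new Path("/tmp/storefile-demo");
  // Write a handful of rows with the StoreFile.Writer builder (pre-2.0 API).
  StoreFile.Writer writer =
      new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fs, 4096)
          .withFilePath(path)
          .withCompression(Compression.Algorithm.NONE)
          .build();
  for (int i = 0; i < 10; i++) {
    byte[] row = String.format("%04d", i).getBytes();
    writer.append(new KeyValue(row, Bytes.toBytes("cf"), Bytes.toBytes(""), Bytes.toBytes("v" + i)));
  }
  writer.close();
  // Read the rows back through the generic HFile reader and scanner.
  HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf));
  HFileScanner scanner = reader.getScanner(false, false, false);
  boolean hasNext = scanner.seekTo();
  while (hasNext) {
    KeyValue kv = scanner.getKeyValue();
    System.out.println(new String(kv.getRow()) + " -> " + new String(kv.getValue()));
    hasNext = scanner.next();
  }
  reader.close();
}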
public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
    TaskAttemptContext context) throws IOException {
  // Get the path of the temporary output file
  final Path outputPath = FileOutputFormat.getOutputPath(context);
  final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
  final Configuration conf = context.getConfiguration();
  final FileSystem fs = outputDir.getFileSystem(conf);
  int blockSize = conf.getInt(Constants.HFILE_BLOCKSIZE, 16384);
  // Default to snappy.
  Compression.Algorithm compressionAlgorithm = getAlgorithm(
      conf.get(Constants.HFILE_COMPRESSION));
  final StoreFile.Writer writer =
      new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fs, blockSize)
          .withFilePath(hfilePath(outputPath, context.getTaskAttemptID().getTaskID().getId()))
          .withCompression(compressionAlgorithm)
          .build();
  return new HFileRecordWriter(writer);
}
public void create(String tableName, String[] columnFamily,
    String startKey, String endKey) throws IOException {
  HTableDescriptor desc = new HTableDescriptor(Bytes.toBytes(tableName));
  for (String cf : columnFamily) {
    HColumnDescriptor cfDesc = new HColumnDescriptor(cf);
    // column family attributes
    cfDesc.setValue(HConstants.VERSIONS,
        String.valueOf(Integer.MAX_VALUE));
    cfDesc.setValue(HColumnDescriptor.BLOOMFILTER,
        StoreFile.BloomType.ROW.toString());
    desc.addFamily(cfDesc);
  }
  if (!admin.tableExists(tableName)) {
    if (null != startKey && null != endKey) {
      admin.createTable(desc, Bytes.toBytes(startKey),
          Bytes.toBytes(endKey), NUM_REGIONSERVERS);
    }
  }
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
    CompactionRequest request) throws IOException {
  // Persist the compaction state after a successful compaction
  if (compactionState != null) {
    compactionState.persist();
  }
}
@Override
public void postCompact(
    org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c,
    Store store, StoreFile resultFile,
    org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker tracker,
    CompactionRequest request) throws IOException {
  // Persist the compaction state after a successful compaction
  if (compactionState != null) {
    compactionState.persist();
  }
}
@Override
public void postCompact(
    org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c,
    Store store, StoreFile resultFile,
    org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker tracker,
    CompactionRequest request) throws IOException {
  super.postCompact(c, store, resultFile, tracker, request);
  lastMajorCompactionTime.set(System.currentTimeMillis());
}
/**
 * Generate hfiles for testing purposes
 *
 * @param sourceFileSystem source file system
 * @param conf configuration for hfile
 * @param outputFolder output folder for generated hfiles
 * @param partitionerType partitioner type
 * @param numOfPartitions number of partitions
 * @param numOfKeys number of keys
 * @return list of generated hfiles
 * @throws IOException if hfile creation goes wrong
 */
public static List<Path> generateHFiles(FileSystem sourceFileSystem, Configuration conf,
                                        File outputFolder, PartitionerType partitionerType,
                                        int numOfPartitions, int numOfKeys)
    throws IOException {
  StoreFile.Writer[] writers = new StoreFile.Writer[numOfPartitions];
  for (int i = 0; i < numOfPartitions; i++) {
    writers[i] = new StoreFile.WriterBuilder(conf, new CacheConfig(conf), sourceFileSystem, 4096)
        .withFilePath(new Path(String.format("%s/%s", outputFolder.getAbsoluteFile(),
            TerrapinUtil.formatPartitionName(i))))
        .withCompression(Compression.Algorithm.NONE)
        .build();
  }
  Partitioner partitioner = PartitionerFactory.getPartitioner(partitionerType);
  for (int i = 0; i < numOfKeys; i++) {
    byte[] key = String.format("%06d", i).getBytes();
    byte[] value;
    if (i <= 1) {
      value = "".getBytes();
    } else {
      value = ("v" + (i + 1)).getBytes();
    }
    KeyValue kv = new KeyValue(key, Bytes.toBytes("cf"), Bytes.toBytes(""), value);
    int partition = partitioner.getPartition(new BytesWritable(key), new BytesWritable(value),
        numOfPartitions);
    writers[partition].append(kv);
  }
  for (int i = 0; i < numOfPartitions; i++) {
    writers[i].close();
  }
  return Lists.transform(Lists.newArrayList(writers), new Function<StoreFile.Writer, Path>() {
    @Override
    public Path apply(StoreFile.Writer writer) {
      return writer.getPath();
    }
  });
}
@BeforeClass
public static void setUp() throws Exception {
  int randomNum = (int) (Math.random() * Integer.MAX_VALUE);
  hfilePath = "/tmp/hfile-" + randomNum;
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  keyValueMap = Maps.newHashMapWithExpectedSize(10000);
  errorKeys = Sets.newHashSetWithExpectedSize(2000);
  StoreFile.Writer writer =
      new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fs, 4096)
          .withFilePath(new Path(hfilePath))
          .withCompression(Compression.Algorithm.NONE)
          .build();
  // Add up to 10K values.
  for (int i = 0; i < 10000; i++) {
    byte[] key = String.format("%04d", i).getBytes();
    byte[] value = null;
    // Add a couple of empty values for testing and making sure we return them.
    if (i <= 1) {
      value = "".getBytes();
    } else {
      value = ("v" + (i + 1)).getBytes();
    }
    KeyValue kv = new KeyValue(key,
        Bytes.toBytes("cf"),
        Bytes.toBytes(""),
        value);
    writer.append(kv);
    keyValueMap.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
    if (i >= 4000 && i < 6000) {
      errorKeys.add(ByteBuffer.wrap(key));
    }
  }
  writer.close();
  hfileReader = new TestHFileReader(fs,
      hfilePath,
      new CacheConfig(conf),
      new ExecutorServiceFuturePool(Executors.newFixedThreadPool(1)),
      errorKeys);
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
    StoreFile resultFile) throws IOException {
  try {
    latch2.await();
  } catch (InterruptedException e1) {
    // Interrupt ignored; fall through and finish the compaction hook.
  }
  super.postCompact(e, store, resultFile);
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
    Store store, StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {
  ctPostFlush.incrementAndGet();
  if (throwOnPostFlush.get()) {
    throw new IOException("throwOnPostFlush is true in postFlush");
  }
}
@Override
public void postCompactSelection(
    ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
    CompactionRequest request) {
  if (selected != null) {
    filesCompactedCounter.increment(selected.size());
  }
}
@Override
final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates, final boolean mayUseOffPeak,
    final boolean mayBeStuck) throws IOException {
  LOG.info("Sending empty list for compaction to avoid compaction and do only deletes of files older than TTL");
  return new ArrayList<StoreFile>();
}
public static boolean isLocalIndexStoreFilesConsistent(RegionCoprocessorEnvironment environment, Store store) {
  byte[] startKey = environment.getRegion().getRegionInfo().getStartKey();
  byte[] endKey = environment.getRegion().getRegionInfo().getEndKey();
  byte[] indexKeyEmbedded = startKey.length == 0 ? new byte[endKey.length] : startKey;
  for (StoreFile file : store.getStorefiles()) {
    if (file.getFirstKey().isPresent() && file.getFirstKey().get() != null) {
      byte[] fileFirstRowKey = CellUtil.cloneRow(file.getFirstKey().get());
      if (fileFirstRowKey != null && Bytes.compareTo(fileFirstRowKey, 0,
          indexKeyEmbedded.length, indexKeyEmbedded, 0, indexKeyEmbedded.length) != 0) {
        return false;
      }
    }
  }
  return true;
}
@Override
public List<StoreFile> close(final boolean abort) throws IOException {
  prepareToClose();
  if (!commitPendingTransactions.isEmpty()) {
    LOG.warn("Closing transactional region [" + getRegionInfo().getRegionNameAsString() + "], but still have ["
        + commitPendingTransactions.size() + "] transactions that are pending commit.");
    // TODO resolve from the Global Trx Log.
  }
  return super.close(abort);
}
public void testBlockCacheConfiguration() throws Exception {
  this.c.close();
  this.c = createCache();
  try {
    HDFSStoreFactory hsf = this.c.createHDFSStoreFactory();
    // Configure a block cache to cache about 20 blocks.
    long heapSize = HeapMemoryMonitor.getTenuredPoolMaxMemory();
    int blockSize = StoreFile.DEFAULT_BLOCKSIZE_SMALL;
    int blockCacheSize = 5 * blockSize;
    int entrySize = blockSize / 2;
    float percentage = 100 * (float) blockCacheSize / (float) heapSize;
    hsf.setBlockCacheSize(percentage);
    HDFSStoreImpl store = (HDFSStoreImpl) hsf.create("myHDFSStore");
    RegionFactory rf1 = this.c.createRegionFactory(RegionShortcut.PARTITION_HDFS);
    // Create a region that evicts everything
    HDFSEventQueueAttributesFactory heqf = new HDFSEventQueueAttributesFactory();
    heqf.setBatchTimeInterval(10);
    LocalRegion r1 = (LocalRegion) rf1.setHDFSStoreName("myHDFSStore")
        .setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(1)).create("r1");
    // Populate the region with many times our block cache size worth of data.
    // We want to try to cache at least 5 blocks worth of index and metadata.
    byte[] value = new byte[entrySize];
    int numEntries = 10 * blockCacheSize / entrySize;
    for (int i = 0; i < numEntries; i++) {
      r1.put(i, value);
    }
    // Wait for the events to be written to HDFS.
    Set<String> queueIds = r1.getAsyncEventQueueIds();
    assertEquals(1, queueIds.size());
    AsyncEventQueueImpl queue = (AsyncEventQueueImpl) c.getAsyncEventQueue(queueIds.iterator().next());
    long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(120);
    while (queue.size() > 0 && System.nanoTime() < end) {
      Thread.sleep(10);
    }
    assertEquals(0, queue.size());
    Thread.sleep(10000);
    // Do some reads to cache some blocks. Note that this doesn't
    // end up caching data blocks, just index and bloom filter blocks.
    for (int i = 0; i < numEntries; i++) {
      r1.get(i);
    }
    long statSize = store.getStats().getBlockCache().getBytesCached();
    assertTrue("Block cache stats expected to be near " + blockCacheSize + " was " + statSize,
        blockCacheSize / 2 < statSize &&
        statSize <= 2 * blockCacheSize);
    long currentSize = store.getBlockCache().getCurrentSize();
    assertTrue("Block cache size expected to be near " + blockCacheSize + " was " + currentSize,
        blockCacheSize / 2 < currentSize &&
        currentSize <= 2 * blockCacheSize);
  } finally {
    this.c.close();
  }
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
    CompactionRequest request) throws IOException {
  super.postCompact(e, store, resultFile, request);
  lastMajorCompactionTime.set(System.currentTimeMillis());
}
public HFileRecordWriter(StoreFile.Writer writer) {
  this.writer = writer;
}
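The constructor above only stores the wrapped writer. As a rough sketch of what the rest of HFileRecordWriter likely looks like (inferred from the getRecordWriter and testWrite examples, not the original source), the write method converts each BytesWritable pair into a KeyValue under a placeholder "cf" family, and close releases the underlying StoreFile.Writer:

// Hypothetical sketch of the remaining HFileRecordWriter methods; the "cf" family
// and empty qualifier are assumptions, not taken from the original project.
public void write(BytesWritable key, BytesWritable value) throws IOException {
  // BytesWritable's backing array can be longer than its length, so copy it out first.
  byte[] row = Arrays.copyOf(key.getBytes(), key.getLength());
  byte[] val = Arrays.copyOf(value.getBytes(), value.getLength());
  writer.append(new KeyValue(row, Bytes.toBytes("cf"), Bytes.toBytes(""), val));
}

public void close(TaskAttemptContext context) throws IOException {
  // Closing the record writer also closes the wrapped StoreFile.Writer.
  writer.close();
}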
@Test
public void testWrite() throws Exception {
  Configuration conf = new Configuration();
  HColumnDescriptor columnDescriptor = new HColumnDescriptor();
  // Disable block cache to ensure it reads the actual file content.
  columnDescriptor.setBlockCacheEnabled(false);
  FileSystem fs = FileSystem.get(conf);
  int blockSize = conf.getInt(Constants.HFILE_BLOCKSIZE, 16384);
  final StoreFile.Writer writer =
      new StoreFile.WriterBuilder(conf, new CacheConfig(conf, columnDescriptor), fs, blockSize)
          .withFilePath(new Path(tempFile.toURI()))
          .build();
  /* Create our RecordWriter */
  RecordWriter<BytesWritable, BytesWritable> hfileWriter =
      new HFileRecordWriter(writer);
  List<String> keys = Lists.newArrayList();
  List<String> values = Lists.newArrayList();
  for (int i = 0; i < 100; ++i) {
    String key = String.format("%03d", i);
    String val = "value " + i;
    keys.add(key);
    values.add(val);
    hfileWriter.write(new BytesWritable(key.getBytes()), new BytesWritable(val.getBytes()));
  }
  /* This internally closes the StoreFile.Writer */
  hfileWriter.close(null);
  HFile.Reader reader = HFile.createReader(fs, new Path(tempFile.toURI()),
      new CacheConfig(conf, columnDescriptor));
  HFileScanner scanner = reader.getScanner(false, false, false);
  boolean valid = scanner.seekTo();
  List<String> gotKeys = Lists.newArrayListWithCapacity(keys.size());
  List<String> gotValues = Lists.newArrayListWithCapacity(values.size());
  while (valid) {
    KeyValue keyValue = scanner.getKeyValue();
    gotKeys.add(new String(keyValue.getRow()));
    gotValues.add(new String(keyValue.getValue()));
    valid = scanner.next();
  }
  assertEquals(keys, gotKeys);
  assertEquals(values, gotValues);
  reader.close();
}