The following code examples show how to use the org.apache.hadoop.hbase.io.hfile.Compression API class, or you can follow the links to view the source code on GitHub.
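Before the full examples, a minimal sketch of the class itself may help: Compression exposes an Algorithm enum plus a name-based lookup. The method names below follow the 0.94-era HBase API and should be verified against your version.

import org.apache.hadoop.hbase.io.hfile.Compression;

public class CompressionQuickstart {
  public static void main(String[] args) {
    // Enum lookup by exact constant name; throws IllegalArgumentException on a bad name.
    Compression.Algorithm byEnum = Compression.Algorithm.valueOf("SNAPPY");
    // Lookup by the lower-case name HBase uses in configuration, e.g. "gz", "snappy", "none".
    Compression.Algorithm byName = Compression.getCompressionAlgorithmByName("gz");
    System.out.println(byEnum.getName() + " / " + byName.getName());
  }
}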
public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
TaskAttemptContext context) throws IOException {
// Get the path of the temporary output file
final Path outputPath = FileOutputFormat.getOutputPath(context);
final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
final Configuration conf = context.getConfiguration();
final FileSystem fs = outputDir.getFileSystem(conf);
int blockSize = conf.getInt(Constants.HFILE_BLOCKSIZE, 16384);
// Default to snappy.
Compression.Algorithm compressionAlgorithm = getAlgorithm(
conf.get(Constants.HFILE_COMPRESSION));
final StoreFile.Writer writer =
new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fs, blockSize)
.withFilePath(hfilePath(outputPath, context.getTaskAttemptID().getTaskID().getId()))
.withCompression(compressionAlgorithm)
.build();
return new HFileRecordWriter(writer);
}
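For context, here is a hedged sketch of how the two configuration keys read above might be set when the job is configured. The job variable is a hypothetical org.apache.hadoop.mapreduce.Job and the values are illustrative; Constants.HFILE_BLOCKSIZE and Constants.HFILE_COMPRESSION come from the surrounding project.

Configuration conf = job.getConfiguration();     // job is hypothetical
conf.setInt(Constants.HFILE_BLOCKSIZE, 16384);   // HFile block size in bytes; 16384 is the fallback above
conf.set(Constants.HFILE_COMPRESSION, "SNAPPY"); // one of SNAPPY, GZ, LZO, LZ4, NONE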
/**
 * Returns the compression algorithm for the given string. Defaults to SNAPPY
 * compression when the string is null or not a recognized algorithm name.
 *
 * @param compressionString One of SNAPPY, GZ, LZO, LZ4 or NONE.
 * @return The corresponding Compression.Algorithm enum type.
 */
public static Compression.Algorithm getAlgorithm(String compressionString) {
  if (compressionString == null) {
    return Compression.Algorithm.SNAPPY;
  }
  try {
    return Compression.Algorithm.valueOf(compressionString);
  } catch (IllegalArgumentException e) {
    // Unrecognized algorithm name; fall back to the default.
    return Compression.Algorithm.SNAPPY;
  }
}
@Test
public void testGetCompression() {
assertEquals(Compression.Algorithm.SNAPPY, HFileOutputFormat.getAlgorithm(null));
assertEquals(Compression.Algorithm.SNAPPY, HFileOutputFormat.getAlgorithm(""));
assertEquals(Compression.Algorithm.SNAPPY, HFileOutputFormat.getAlgorithm("WRONG_ALGO"));
assertEquals(Compression.Algorithm.SNAPPY, HFileOutputFormat.getAlgorithm("SNAPPY"));
assertEquals(Compression.Algorithm.NONE, HFileOutputFormat.getAlgorithm("NONE"));
}
/**
 * Generate hfiles for testing purposes
*
* @param sourceFileSystem source file system
* @param conf configuration for hfile
* @param outputFolder output folder for generated hfiles
* @param partitionerType partitioner type
* @param numOfPartitions number of partitions
* @param numOfKeys number of keys
* @return list of generated hfiles
* @throws IOException if hfile creation goes wrong
*/
public static List<Path> generateHFiles(FileSystem sourceFileSystem, Configuration conf,
File outputFolder, PartitionerType partitionerType,
int numOfPartitions, int numOfKeys)
throws IOException {
StoreFile.Writer[] writers = new StoreFile.Writer[numOfPartitions];
for (int i = 0; i < numOfPartitions; i++) {
writers[i] = new StoreFile.WriterBuilder(conf, new CacheConfig(conf), sourceFileSystem, 4096)
.withFilePath(new Path(String.format("%s/%s", outputFolder.getAbsoluteFile(),
TerrapinUtil.formatPartitionName(i))))
.withCompression(Compression.Algorithm.NONE)
.build();
}
Partitioner partitioner = PartitionerFactory.getPartitioner(partitionerType);
for (int i = 0; i < numOfKeys; i++) {
byte[] key = String.format("%06d", i).getBytes();
byte[] value;
if (i <= 1) {
value = "".getBytes();
} else {
value = ("v" + (i + 1)).getBytes();
}
KeyValue kv = new KeyValue(key, Bytes.toBytes("cf"), Bytes.toBytes(""), value);
int partition = partitioner.getPartition(new BytesWritable(key), new BytesWritable(value),
numOfPartitions);
writers[partition].append(kv);
}
for (int i = 0; i < numOfPartitions; i++) {
writers[i].close();
}
return Lists.transform(Lists.newArrayList(writers), new Function<StoreFile.Writer, Path>() {
@Override
public Path apply(StoreFile.Writer writer) {
return writer.getPath();
}
});
}
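A hypothetical invocation of the helper above; the PartitionerType constant, paths, and counts are illustrative rather than taken from the source.

Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
List<Path> hfiles = generateHFiles(fs, conf, new File("/tmp/hfile-test"),
    PartitionerType.MODULUS /* assumed enum constant */, 4, 1000);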
@BeforeClass
public static void setUp() throws Exception {
int randomNum = (int) (Math.random() * Integer.MAX_VALUE);
hfilePath = "/tmp/hfile-" + randomNum;
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
keyValueMap = Maps.newHashMapWithExpectedSize(10000);
errorKeys = Sets.newHashSetWithExpectedSize(2000);
StoreFile.Writer writer =
    new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fs, 4096)
        .withFilePath(new Path(hfilePath))
        .withCompression(Compression.Algorithm.NONE)
        .build();
// Add 10K values.
for (int i = 0; i < 10000; i++) {
byte[] key = String.format("%04d", i).getBytes();
byte[] value = null;
// Add a couple of empty values for testing and making sure we return them.
if (i <= 1) {
value = "".getBytes();
} else {
value = ("v" + (i + 1)).getBytes();
}
KeyValue kv = new KeyValue(key,
Bytes.toBytes("cf"),
Bytes.toBytes(""),
value);
writer.append(kv);
keyValueMap.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
if (i >= 4000 && i < 6000) {
errorKeys.add(ByteBuffer.wrap(key));
}
}
writer.close();
hfileReader = new TestHFileReader(fs,
hfilePath,
new CacheConfig(conf),
new ExecutorServiceFuturePool(Executors.newFixedThreadPool(1)),
errorKeys);
}
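To sanity-check a file written this way, one could scan it back with the plain HFile reader. A hedged sketch, assuming the 0.94-era reader API:

HFile.Reader reader = HFile.createReader(fs, new Path(hfilePath), new CacheConfig(conf));
HFileScanner scanner = reader.getScanner(false, false); // no block caching, no pread
if (scanner.seekTo()) { // position at the first key; false means the file is empty
  do {
    KeyValue kv = scanner.getKeyValue();
    // kv.getRow() / kv.getValue() should match the entries recorded in keyValueMap
  } while (scanner.next());
}
reader.close();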
private static boolean createTable(byte[] tableName, byte[] columnFamilyName,
                                   short regionCount, long regionMaxSize, HBaseAdmin admin)
    throws IOException {
  if (admin.tableExists(tableName)) {
    return false;
  }
  HTableDescriptor tableDescriptor = new HTableDescriptor();
  tableDescriptor.setName(tableName);
  HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamilyName);
  columnDescriptor.setCompressionType(Compression.Algorithm.SNAPPY);
  columnDescriptor.setBlocksize(64 * 1024);
  columnDescriptor.setBloomFilterType(BloomType.ROW);
  columnDescriptor.setMaxVersions(10);
  tableDescriptor.addFamily(columnDescriptor);
  tableDescriptor.setMaxFileSize(regionMaxSize);
  tableDescriptor.setValue(HTableDescriptor.SPLIT_POLICY,
      ConstantSizeRegionSplitPolicy.class.getName());
  tableDescriptor.setDeferredLogFlush(true);
  regionCount = (short) Math.abs(regionCount);
  int regionRange = Short.MAX_VALUE / regionCount;
  int counter = 0;
  byte[][] splitKeys = new byte[regionCount][];
  // Use an indexed loop: assigning to a for-each variable would leave splitKeys unpopulated.
  for (int i = 0; i < splitKeys.length; i++) {
    counter = counter + regionRange;
    String key = StringUtils.leftPad(Integer.toString(counter), 5, '0');
    splitKeys[i] = Bytes.toBytes(key);
    System.out.println(" - Split: " + i + " '" + key + "'");
  }
  admin.createTable(tableDescriptor, splitKeys);
  return true;
}
private static void createTable(String tableName, String columnFamilyName,
short regionCount, long regionMaxSize, HBaseAdmin admin)
throws IOException {
System.out.println("Creating Table: " + tableName);
HTableDescriptor tableDescriptor = new HTableDescriptor();
tableDescriptor.setName(Bytes.toBytes(tableName));
HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamilyName);
columnDescriptor.setCompressionType(Compression.Algorithm.SNAPPY);
columnDescriptor.setBlocksize(64 * 1024);
columnDescriptor.setBloomFilterType(BloomType.ROW);
tableDescriptor.addFamily(columnDescriptor);
tableDescriptor.setMaxFileSize(regionMaxSize);
tableDescriptor.setValue(HTableDescriptor.SPLIT_POLICY,
    ConstantSizeRegionSplitPolicy.class.getName());
tableDescriptor.setDeferredLogFlush(true);
regionCount = (short) Math.abs(regionCount);
int regionRange = Short.MAX_VALUE / regionCount;
int counter = 0;
byte[][] splitKeys = new byte[regionCount][];
for (int i = 0 ; i < splitKeys.length; i++) {
counter = counter + regionRange;
String key = StringUtils.leftPad(Integer.toString(counter), 5, '0');
splitKeys[i] = Bytes.toBytes(key);
System.out.println(" - Split: " + i + " '" + key + "'");
}
admin.createTable(tableDescriptor, splitKeys);
}
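Finally, a hedged driver for the method above, assuming an HBaseAdmin built from the local configuration; the table name, column family, and sizing values are illustrative.

Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
try {
  // 16 pre-split regions, 1 GB max region size before splitting (illustrative values).
  createTable("example_table", "cf", (short) 16, 1024L * 1024 * 1024, admin);
} finally {
  admin.close();
}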