下面列出了如何使用 org.apache.hadoop.hbase.regionserver.wal.WALCellCodec 的 API 类实例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Reports whether the indexed WAL edit codec can be loaded and is the codec configured under
 * {@code WALCellCodec.WAL_CELL_CODEC_CLASS_KEY}.
 *
 * @param conf the configuration to inspect
 * @return {@code true} only when the codec class loads and the configured codec class name
 *         equals it; {@code false} otherwise, including when loading the class fails for any
 *         reason
 */
public static boolean isWALEditCodecSet(Configuration conf) {
  try {
    // Load the IndexedWALEditCodec reflectively, since it may not load with an older version
    // of HBase
    Class.forName(INDEX_WAL_EDIT_CODEC_CLASS_NAME);
  } catch (Throwable t) {
    // any load failure is treated as "codec not available"
    return false;
  }
  // when it is the configured codec, it can handle compression and non-compression cases
  String configuredCodec = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, null);
  return INDEX_WAL_EDIT_CODEC_CLASS_NAME.equals(configuredCodec);
}
/**
 * Builds a {@code WALKeyImpl} with extended attributes, serializes it to its protobuf form with
 * the none compressor, reads it back with the none uncompressor, and checks that the key, its
 * extended attributes and its replication scopes match the original.
 */
@Test
public void testWALKeySerialization() throws Exception {
  Map<String, byte[]> extendedAttrs = new HashMap<>();
  extendedAttrs.put("foo", Bytes.toBytes("foo-value"));
  extendedAttrs.put("bar", Bytes.toBytes("bar-value"));

  WALKeyImpl original = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
      System.currentTimeMillis(), new ArrayList<UUID>(), 0L, 0L, mvcc, scopes, extendedAttrs);
  Assert.assertEquals(extendedAttrs, original.getExtendedAttributes());

  // serialize to protobuf, then read the message back into a fresh key
  WALProtos.WALKey serialized = original.getBuilder(WALCellCodec.getNoneCompressor()).build();
  WALKeyImpl restored = new WALKeyImpl();
  restored.readFieldsFromPb(serialized, WALCellCodec.getNoneUncompressor());

  //equals() only checks region name, sequence id and write time
  Assert.assertEquals(original, restored);

  //can't use Map.equals() because byte arrays use reference equality
  Assert.assertEquals(original.getExtendedAttributes().keySet(),
      restored.getExtendedAttributes().keySet());
  for (Map.Entry<String, byte[]> restoredAttr : restored.getExtendedAttributes().entrySet()) {
    byte[] expectedValue = original.getExtendedAttribute(restoredAttr.getKey());
    Assert.assertArrayEquals(expectedValue, restoredAttr.getValue());
  }

  Assert.assertEquals(original.getReplicationScopes(), restored.getReplicationScopes());
}
/**
 * Checks that the indexed WAL edit codec class is loadable and that the configuration names it
 * under {@code WALCellCodec.WAL_CELL_CODEC_CLASS_KEY}.
 *
 * @param conf the configuration to inspect
 * @return {@code false} if the codec class cannot be loaded (for any reason) or a different
 *         codec (or none) is configured; {@code true} otherwise
 */
public static boolean isWALEditCodecSet(Configuration conf) {
  final String codecClassName = INDEX_WAL_EDIT_CODEC_CLASS_NAME;
  try {
    // Use reflection to load the IndexedWALEditCodec, since it may not load with an older
    // version of HBase
    Class.forName(codecClassName);
  } catch (Throwable t) {
    // not loadable, so it cannot be considered installed
    return false;
  }
  // if configured, it is installed and can handle compression and non-compression cases
  return codecClassName.equals(conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, null));
}
/**
 * Validates that the configuration is usable for mutable indexing.
 * <p>
 * The check passes immediately when {@code isWALEditCodecSet(conf)} is true. Otherwise the
 * indexed HLog reader class must be loadable, the value under {@code HLOG_READER_IMPL_KEY} must
 * equal that reader's class name (an unset key counts as a match, because the reader name is used
 * as the default), and WAL compression must not be enabled.
 *
 * @param conf the configuration to validate
 * @throws IllegalStateException if the reader class cannot be found (the
 *         {@link ClassNotFoundException} is attached as the cause), if a different reader is
 *         configured, or if WAL compression is enabled without the WAL edit codec
 */
public static void ensureMutableIndexingCorrectlyConfigured(Configuration conf) throws IllegalStateException {
  // check to see if the WALEditCodec is installed
  if (isWALEditCodecSet(conf)) {
    return;
  }
  // otherwise, we have to install the indexedhlogreader, but it cannot have compression
  String codecClass = INDEX_WAL_EDIT_CODEC_CLASS_NAME;
  String indexLogReaderName = INDEX_HLOG_READER_CLASS_NAME;
  // shared by both "reader not usable" failures below
  String readerNotInstalled = codecClass + " is not installed, but "
      + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY;
  try {
    // Use reflection to load the IndexedHLogReader, since it may not load with an older version
    // of HBase
    Class.forName(indexLogReaderName);
  } catch (ClassNotFoundException e) {
    // keep the original exception as the cause so the missing class is visible in the trace
    throw new IllegalStateException(readerNotInstalled, e);
  }
  if (!indexLogReaderName.equals(conf.get(HLOG_READER_IMPL_KEY, indexLogReaderName))) {
    throw new IllegalStateException(readerNotInstalled);
  }
  if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) {
    throw new IllegalStateException(
        "WAL Compression is only supported with " + codecClass
            + ". You can install in hbase-site.xml, under " + WALCellCodec.WAL_CELL_CODEC_CLASS_KEY);
  }
}
/**
 * Configures the test utility's configuration (indexed WAL edit codec installed, WAL compression
 * enabled, version checking disabled) and then starts the mini-cluster.
 */
@BeforeClass
public static void setupCluster() throws Exception {
  final Configuration clusterConf = UTIL.getConfiguration();
  setUpConfigForMiniCluster(clusterConf);
  IndexTestingUtils.setupConfig(clusterConf);

  // disable version checking, so we can test against whatever version of HBase happens to be
  // installed (right now, its generally going to be SNAPSHOT versions).
  clusterConf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);

  // add our codec and enable WAL compression
  final String codecClassName = IndexedWALEditCodec.class.getName();
  clusterConf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, codecClassName);
  clusterConf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);

  // start the mini-cluster
  UTIL.startMiniCluster();
}
/**
 * With the WALEditCodec configured, a configuration that enables WAL compression must be
 * accepted by the mutable-indexing configuration check (no exception is expected).
 * @throws Exception
 */
@Test
public void testCompressedWALWithCodec() throws Exception {
  final Configuration compressedConf = new Configuration(false);
  // works with WALEditcodec
  final String codecClassName = IndexedWALEditCodec.class.getName();
  compressedConf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, codecClassName);
  compressedConf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
  IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(compressedConf);
}
/**
 * Populates this key from a protobuf {@code WALKey} message.
 * <p>
 * The encoded region name, table name, replication-scope family names and extended-attribute
 * values are passed through {@code uncompressor}; the remaining fields are copied directly.
 * Cluster ids and the replication scope are always reset first. Nonce group, nonce, original
 * sequence number and extended attributes are only overwritten when the message carries them.
 *
 * @param walKey the protobuf message to read from
 * @param uncompressor decodes the byte strings stored in the message
 * @throws IOException declared by the signature; presumably raised by {@code uncompressor}
 */
public void readFieldsFromPb(WALProtos.WALKey walKey,
    WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
  // NOTE(review): the uncompress calls are kept in their original order in case the
  // uncompressor is stateful - confirm before reordering
  this.encodedRegionName = uncompressor.uncompress(walKey.getEncodedRegionName(),
      CompressionContext.DictionaryIndex.REGION);
  byte[] rawTableName =
      uncompressor.uncompress(walKey.getTableName(), CompressionContext.DictionaryIndex.TABLE);
  this.tablename = TableName.valueOf(rawTableName);

  // replace any previously held cluster ids with the ones from the message
  clusterIds.clear();
  for (HBaseProtos.UUID pbClusterId : walKey.getClusterIdsList()) {
    UUID clusterUuid = new UUID(pbClusterId.getMostSigBits(), pbClusterId.getLeastSigBits());
    clusterIds.add(clusterUuid);
  }

  // nonces are optional in the message; leave the current values when absent
  if (walKey.hasNonceGroup()) {
    this.nonceGroup = walKey.getNonceGroup();
  }
  if (walKey.hasNonce()) {
    this.nonce = walKey.getNonce();
  }

  // the scope stays null when the message has no scopes
  this.replicationScope = null;
  if (walKey.getScopesCount() > 0) {
    this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (FamilyScope familyScope : walKey.getScopesList()) {
      byte[] familyName = uncompressor.uncompress(familyScope.getFamily(),
          CompressionContext.DictionaryIndex.FAMILY);
      this.replicationScope.put(familyName, familyScope.getScopeType().getNumber());
    }
  }

  setSequenceId(walKey.getLogSequenceNumber());
  this.writeTime = walKey.getWriteTime();
  if (walKey.hasOrigSequenceNumber()) {
    this.origLogSeqNum = walKey.getOrigSequenceNumber();
  }

  // extended attribute values are decoded with the TABLE dictionary index
  final int attributeCount = walKey.getExtendedAttributesCount();
  if (attributeCount > 0) {
    this.extendedAttributes = new HashMap<>(attributeCount);
    for (WALProtos.Attribute attribute : walKey.getExtendedAttributesList()) {
      byte[] attributeValue = uncompressor.uncompress(attribute.getValue(),
          CompressionContext.DictionaryIndex.TABLE);
      extendedAttributes.put(attribute.getKey(), attributeValue);
    }
  }
}
/**
 * Creates a splitter, wiring up its configuration, file-system locations, collaborators, entry
 * buffers and output sink.
 * <p>
 * The sink is chosen from two configuration flags: {@code WAL_SPLIT_TO_HFILE} takes precedence
 * and selects the bounded recovered-HFiles sink, then {@code SPLIT_WRITER_CREATION_BOUNDED}
 * selects the bounded recovered-edits sink, otherwise the plain recovered-edits sink is used.
 * Both bounded sinks use {@code BoundedEntryBuffers}.
 */
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
    Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
    SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
  // work on our own configuration object and point its RPC codec at the WAL cell codec class
  this.conf = HBaseConfiguration.create(conf);
  String walCodecClassName =
      conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  this.conf.set(HConstants.RPC_CODEC_CONF_KEY, walCodecClassName);

  this.walDir = walDir;
  this.walFS = walFS;
  this.rootDir = rootDir;
  this.rootFS = rootFS;
  this.sequenceIdChecker = idChecker;
  this.splitLogWorkerCoordination = splitLogWorkerCoordination;
  this.rsServices = rsServices;
  this.walFactory = factory;

  PipelineController pipelineController = new PipelineController();
  this.tmpDirName =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);

  // if we limit the number of writers opened for sinking recovered edits
  boolean boundedWriterCreation = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
  boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
  long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
  int writerThreadCount = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);

  // the buffers must be assigned before the sink is built, because the sink receives them
  if (splitToHFile || boundedWriterCreation) {
    entryBuffers = new BoundedEntryBuffers(pipelineController, bufferSize);
  } else {
    entryBuffers = new EntryBuffers(pipelineController, bufferSize);
  }

  if (splitToHFile) {
    outputSink = new BoundedRecoveredHFilesOutputSink(this, pipelineController, entryBuffers,
        writerThreadCount);
  } else if (boundedWriterCreation) {
    outputSink = new BoundedRecoveredEditsOutputSink(this, pipelineController, entryBuffers,
        writerThreadCount);
  } else {
    outputSink = new RecoveredEditsOutputSink(this, pipelineController, entryBuffers,
        writerThreadCount);
  }
}
/**
 * Validates that the configuration is usable for mutable indexing.
 * <p>
 * Nothing further is checked when {@code isWALEditCodecSet(conf)} is true. Otherwise the indexed
 * HLog reader class must be loadable, the value under {@code HLOG_READER_IMPL_KEY} must equal
 * that reader's class name (an unset key counts as a match, because the reader name is used as
 * the default), and WAL compression must not be enabled.
 *
 * @param conf the configuration to validate
 * @throws IllegalStateException if the reader class cannot be found (the
 *         {@link ClassNotFoundException} is attached as the cause), if a different reader is
 *         configured, or if WAL compression is enabled without the WAL edit codec
 */
public static void ensureMutableIndexingCorrectlyConfigured(Configuration conf) throws IllegalStateException {
  // check to see if the WALEditCodec is installed
  if (isWALEditCodecSet(conf)) {
    return;
  }
  // otherwise, we have to install the indexedhlogreader, but it cannot have compression
  String codecClass = INDEX_WAL_EDIT_CODEC_CLASS_NAME;
  String indexLogReaderName = INDEX_HLOG_READER_CLASS_NAME;
  // shared by both "reader not usable" failures below
  String readerNotInstalled = codecClass + " is not installed, but "
      + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY;
  try {
    // Use reflection to load the IndexedHLogReader, since it may not load with an older version
    // of HBase
    Class.forName(indexLogReaderName);
  } catch (ClassNotFoundException e) {
    // keep the original exception as the cause so the missing class is visible in the trace
    throw new IllegalStateException(readerNotInstalled, e);
  }
  if (!indexLogReaderName.equals(conf.get(HLOG_READER_IMPL_KEY, indexLogReaderName))) {
    throw new IllegalStateException(readerNotInstalled);
  }
  if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) {
    throw new IllegalStateException(
        "WAL Compression is only supported with " + codecClass
            + ". You can install in hbase-site.xml, under " + WALCellCodec.WAL_CELL_CODEC_CLASS_KEY);
  }
}
/**
 * A configuration that enables WAL compression is accepted by the mutable-indexing
 * configuration check as long as the WALEditCodec is configured (no exception is expected).
 * @throws Exception
 */
@Test
public void testCompressedWALWithCodec() throws Exception {
  final Configuration compressedConf = new Configuration(false);
  // works with WALEditcodec
  final String codecClassName = IndexedWALEditCodec.class.getName();
  compressedConf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, codecClassName);
  compressedConf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
  IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(compressedConf);
}
/**
 * Stores the given WAL edit codec class name under {@code WALCellCodec.WAL_CELL_CODEC_CLASS_KEY}.
 *
 * @param walEditCodec the value to store for the WAL cell codec key
 * @return whatever {@code set} returns for this key/value pair
 */
public QueryServicesOptions setWALEditCodec(String walEditCodec) {
  final String codecKey = WALCellCodec.WAL_CELL_CODEC_CLASS_KEY;
  return set(codecKey, walEditCodec);
}
/**
 * Prepares a configuration for index tests: installs the indexed WAL edit codec and sets the
 * master and region-server info port keys to -1.
 *
 * @param conf the configuration to modify in place
 */
public static void setupConfig(Configuration conf) {
  // setup our codec, so we get proper replay/write
  final String codecClassName = IndexedWALEditCodec.class.getName();
  conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, codecClassName);

  conf.setInt(MASTER_INFO_PORT_KEY, -1);
  conf.setInt(RS_INFO_PORT_KEY, -1);
}
/**
 * Builds a ReplicateWALEntryRequest out of the given WAL entries.
 * <p>
 * Each entry contributes its key (serialized with the none compressor) and the count of its
 * cells to the request. The cells themselves are gathered separately and exposed through the
 * returned CellScanner.
 * @param entries the WAL entries to be replicated
 * @param encodedRegionName alternative region name to use if not null
 * @param replicationClusterId Id which will uniquely identify source cluster FS client
 *          configurations in the replication configuration directory; skipped when null
 * @param sourceBaseNamespaceDir Path to source cluster base namespace directory; skipped when
 *          null
 * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory; skipped when
 *          null
 * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
 */
public static Pair<ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest(
    final Entry[] entries, byte[] encodedRegionName, String replicationClusterId,
    Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
  // One cell list per entry, in entry order.
  List<List<? extends Cell>> cellsPerEntry = new ArrayList<>(entries.length);
  // Running estimate of the serialized size of every cell; used later serializing out the kvs.
  int estimatedCellBytes = 0;
  ReplicateWALEntryRequest.Builder requestBuilder = ReplicateWALEntryRequest.newBuilder();
  // A single entry builder is reused (and cleared) for every WAL entry.
  WALEntry.Builder walEntryBuilder = WALEntry.newBuilder();

  for (Entry walEntry : entries) {
    walEntryBuilder.clear();

    WALProtos.WALKey.Builder walKeyBuilder;
    try {
      walKeyBuilder = walEntry.getKey().getBuilder(WALCellCodec.getNoneCompressor());
    } catch (IOException e) {
      throw new AssertionError(
        "There should not throw exception since NoneCompressor do not throw any exceptions", e);
    }
    // An explicit region name overrides the one carried by the key.
    if (encodedRegionName != null) {
      walKeyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(encodedRegionName));
    }
    walEntryBuilder.setKey(walKeyBuilder.build());

    List<Cell> entryCells = walEntry.getEdit().getCells();
    for (Cell entryCell : entryCells) {
      estimatedCellBytes += PrivateCellUtil.estimatedSerializedSizeOf(entryCell);
    }
    cellsPerEntry.add(entryCells);

    // Record how many cells belong to this entry.
    walEntryBuilder.setAssociatedCellCount(entryCells.size());
    requestBuilder.addEntry(walEntryBuilder.build());
  }

  // Optional request fields are only set when a value was supplied.
  if (replicationClusterId != null) {
    requestBuilder.setReplicationClusterId(replicationClusterId);
  }
  if (sourceBaseNamespaceDir != null) {
    requestBuilder.setSourceBaseNamespaceDirPath(sourceBaseNamespaceDir.toString());
  }
  if (sourceHFileArchiveDir != null) {
    requestBuilder.setSourceHFileArchiveDirPath(sourceHFileArchiveDir.toString());
  }

  ReplicateWALEntryRequest request = requestBuilder.build();
  return new Pair<>(request, getCellScanner(cellsPerEntry, estimatedCellBytes));
}
/**
 * Writes ten single-cell edits for a fresh region of the given table to a WAL while
 * {@code SecureWALCellCodec} is configured as the WAL cell codec, then syncs and shuts the WAL
 * down.
 * <p>
 * The previously configured codec class name is put back in a {@code finally} block.
 *
 * @param wals factory used to obtain the WAL for the region
 * @param tblName name of the table the edits are written for
 * @param offheap when true, each cell is copied into a direct buffer and added as a
 *          {@code ByteBufferKeyValue}; otherwise the on-heap {@code KeyValue} is added
 * @return the WAL file name reported by {@code AbstractFSWALProvider.getCurrentFileName},
 *         captured before the WAL is shut down
 * @throws IOException declared by the signature; presumably from the WAL operations
 */
private Path writeWAL(final WALFactory wals, final String tblName, boolean offheap)
    throws IOException {
  Configuration conf = TEST_UTIL.getConfiguration();
  // remember the configured codec so it can be restored afterwards
  String previousCodecClass =
      conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
    WALCellCodec.class);
  try {
    final int editCount = 10;
    final byte[] row = Bytes.toBytes("row");
    final byte[] family = Bytes.toBytes("family");
    TableName tableName = TableName.valueOf(tblName);
    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(tableName.getName(), 0);
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);

    // Write the WAL
    WAL wal = wals.getWAL(regionInfo);
    for (int qualifier = 0; qualifier < editCount; qualifier++) {
      WALEdit edit = new WALEdit();
      KeyValue onheapKV = new KeyValue(row, family, Bytes.toBytes(qualifier), value);
      if (!offheap) {
        edit.add(onheapKV);
      } else {
        // copy the cell's backing bytes into a direct buffer and wrap them
        ByteBuffer directBuffer = ByteBuffer.allocateDirect(onheapKV.getBuffer().length);
        directBuffer.put(onheapKV.getBuffer());
        edit.add(new ByteBufferKeyValue(directBuffer, 0, onheapKV.getLength()));
      }
      WALKeyImpl walKey = new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
        System.currentTimeMillis(), mvcc, scopes);
      wal.appendData(regionInfo, walKey, edit);
    }
    wal.sync();
    // grab the file name before shutting the WAL down
    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
    wal.shutdown();
    return walPath;
  } finally {
    // restore the cell codec class
    conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, previousCodecClass);
  }
}
/**
 * Sets up the actual job.
 * <p>
 * {@code args[0]} holds the input directories, {@code args[1]} a comma-separated list of tables
 * and the optional {@code args[2]} a comma-separated list of target tables of the same length.
 * When {@code BULK_OUTPUT_CONF_KEY} is set the job is configured for HFile output, otherwise it
 * is configured as a map-only job with {@code MultiTableOutputFormat}.
 * @param args The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 */
public Job createSubmittableJob(String[] args) throws IOException {
  Configuration conf = getConf();
  setupTime(conf, WALInputFormat.START_TIME_KEY);
  setupTime(conf, WALInputFormat.END_TIME_KEY);

  String inputDirs = args[0];
  String[] sourceTables = args[1].split(",");
  // if no mapping is specified map each table to itself
  String[] targetTables = args.length > 2 ? args[2].split(",") : sourceTables;
  if (targetTables.length != sourceTables.length) {
    throw new IOException("The same number of tables and mapping must be provided.");
  }
  conf.setStrings(TABLES_KEY, sourceTables);
  conf.setStrings(TABLE_MAP_KEY, targetTables);
  conf.set(FileInputFormat.INPUT_DIR, inputDirs);

  String defaultJobName = NAME + "_" + System.currentTimeMillis();
  Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, defaultJobName));
  job.setJarByClass(WALPlayer.class);
  job.setInputFormatClass(WALInputFormat.class);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);

  String bulkOutputPath = conf.get(BULK_OUTPUT_CONF_KEY);
  if (bulkOutputPath == null) {
    // output to live cluster
    job.setMapperClass(WALMapper.class);
    job.setOutputFormatClass(MultiTableOutputFormat.class);
    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    // No reducers.
    job.setNumReduceTasks(0);
  } else {
    LOG.debug("add incremental job :" + bulkOutputPath + " from " + inputDirs);
    // the bulk HFile case
    List<TableName> tableNames = getTableNameList(sourceTables);
    job.setMapperClass(WALKeyValueMapper.class);
    job.setReducerClass(CellSortReducer.class);
    FileOutputFormat.setOutputPath(job, new Path(bulkOutputPath));
    job.setMapOutputValueClass(MapReduceExtendedCell.class);
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      List<TableInfo> tableInfos = new ArrayList<>();
      for (TableName name : tableNames) {
        Table table = conn.getTable(name);
        RegionLocator locator = conn.getRegionLocator(name);
        tableInfos.add(new TableInfo(table.getDescriptor(), locator));
      }
      // must run while the connection is still open
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfos);
    }
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
      org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
  }

  // add the jar of the configured WAL cell codec to the job's dependencies
  String codecCls = WALCellCodec.getWALCellCodecClass(conf).getName();
  try {
    Class<?> codecClass = Class.forName(codecCls);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), codecClass);
  } catch (Exception e) {
    throw new IOException("Cannot determine wal codec class " + codecCls, e);
  }
  return job;
}
/**
 * Records the WAL edit codec class name in these options, keyed by
 * {@code WALCellCodec.WAL_CELL_CODEC_CLASS_KEY}.
 *
 * @param walEditCodec the value to store for the WAL cell codec key
 * @return the result of the underlying {@code set} call
 */
public QueryServicesOptions setWALEditCodec(String walEditCodec) {
  final String codecKey = WALCellCodec.WAL_CELL_CODEC_CLASS_KEY;
  return set(codecKey, walEditCodec);
}
/**
 * Applies the index-test settings to a configuration: the indexed WAL edit codec is installed
 * and the master and region-server info port keys are set to -1.
 *
 * @param conf the configuration to modify in place
 */
public static void setupConfig(Configuration conf) {
  // setup our codec, so we get proper replay/write
  final String codecClassName = IndexedWALEditCodec.class.getName();
  conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, codecClassName);

  conf.setInt(MASTER_INFO_PORT_KEY, -1);
  conf.setInt(RS_INFO_PORT_KEY, -1);
}