下面列出了如何使用 org.apache.hadoop.hbase.wal.WAL 的 API 示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Creates an {@link HRegion} for a single-family table with the {@code TransactionProcessor}
 * coprocessor attached.
 * <p>
 * The table directory is created under the configured root directory and the region is backed by
 * a WAL obtained from a new {@link WALFactory} whose id is {@code tableName + ".hlog"}.
 *
 * @param tableName name of the table the region belongs to
 * @param family the only column family of the table (max versions set to 10)
 * @param ttl stored on the family under {@code TxConstants.PROPERTY_TTL} only when positive
 * @return the newly constructed region
 * @throws IOException if a file system, WAL or host name lookup operation fails
 */
private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
  HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
  HColumnDescriptor columnFamily = new HColumnDescriptor(family);
  boolean hasTtl = ttl > 0;
  if (hasTtl) {
    columnFamily.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
  }
  columnFamily.setMaxVersions(10);
  tableDescriptor.addFamily(columnFamily);
  tableDescriptor.addCoprocessor(TransactionProcessor.class.getName());

  // The table directory has to exist before the region is laid out on the file system.
  Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
  FileSystem fileSystem = FileSystem.get(conf);
  assertTrue(fileSystem.mkdirs(tableDir));

  WAL wal = new WALFactory(conf, null, tableName + ".hlog").getWAL(new byte[]{1}, null);
  HRegionInfo info = new HRegionInfo(TableName.valueOf(tableName));
  HRegionFileSystem regionFileSystem =
      HRegionFileSystem.createRegionOnFileSystem(conf, fileSystem, tableDir, info);

  String localHost = InetAddress.getLocalHost().getHostName();
  ServerName serverName = ServerName.valueOf(localHost, 0, System.currentTimeMillis());
  return new HRegion(regionFileSystem, wal, conf, tableDescriptor,
      new LocalRegionServerServices(conf, serverName));
}
/**
 * Builds a test {@link HRegion} for {@code tableName} with one column family and the
 * {@code TransactionProcessor} coprocessor registered on the table descriptor.
 *
 * @param tableName table name; also used as the prefix of the WAL factory id ({@code ".hlog"})
 * @param family column family to add, configured to keep 10 versions
 * @param ttl when greater than zero, written to the family as {@code TxConstants.PROPERTY_TTL}
 * @return a region created on the file system under the configured root directory
 * @throws IOException on file system, WAL or local host resolution errors
 */
private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
  // --- table schema ---
  HTableDescriptor schema = new HTableDescriptor(TableName.valueOf(tableName));
  HColumnDescriptor familyDescriptor = new HColumnDescriptor(family);
  if (ttl > 0) {
    String ttlValue = String.valueOf(ttl);
    familyDescriptor.setValue(TxConstants.PROPERTY_TTL, ttlValue);
  }
  familyDescriptor.setMaxVersions(10);
  schema.addFamily(familyDescriptor);
  schema.addCoprocessor(TransactionProcessor.class.getName());

  // --- on-disk layout ---
  Path dir = FSUtils.getTableDir(FSUtils.getRootDir(conf), schema.getTableName());
  FileSystem hdfs = FileSystem.get(conf);
  boolean dirCreated = hdfs.mkdirs(dir);
  assertTrue(dirCreated);

  // --- WAL and region ---
  WALFactory factory = new WALFactory(conf, null, tableName + ".hlog");
  WAL regionWal = factory.getWAL(new byte[]{1}, null);
  HRegionInfo hri = new HRegionInfo(TableName.valueOf(tableName));
  HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, hdfs, dir, hri);
  ServerName fakeServer = ServerName.valueOf(
      InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis());
  LocalRegionServerServices services = new LocalRegionServerServices(conf, fakeServer);
  return new HRegion(regionFs, regionWal, conf, schema, services);
}
/**
 * Registers a roll listener on {@code log}, waits, and asserts that at least {@code minRolls}
 * log rolls were observed.
 * <p>
 * The method first sleeps {@code (minRolls + 1) * LOG_ROLL_PERIOD}, then polls in steps of
 * {@code LOG_ROLL_PERIOD / 4} for a bounded number of retries to tolerate slow machines.
 *
 * @param log the WAL whose rolls are counted
 * @param minRolls the minimum number of rolls that must be observed
 * @throws Exception if the wait is interrupted
 */
private void checkMinLogRolls(final WAL log, final int minRolls)
    throws Exception {
  // This thread only sleeps, so the listener is necessarily invoked from another thread; use a
  // synchronized list so additions are safely visible when the size is read here.
  final List<Path> paths = java.util.Collections.synchronizedList(new ArrayList<>());
  log.registerWALActionsListener(new WALActionsListener() {
    @Override
    public void postLogRoll(Path oldFile, Path newFile) {
      LOG.debug("postLogRoll: oldFile="+oldFile+" newFile="+newFile);
      paths.add(newFile);
    }
  });

  // Sleep until we should get at least min-LogRoll events
  long wtime = System.currentTimeMillis();
  Thread.sleep((minRolls + 1) * LOG_ROLL_PERIOD);
  // Do some extra sleep in case the machine is slow,
  // and the log-roll is not triggered exactly on LOG_ROLL_PERIOD.
  final int NUM_RETRIES = 1 + 8 * (minRolls - paths.size());
  for (int retry = 0; paths.size() < minRolls && retry < NUM_RETRIES; ++retry) {
    Thread.sleep(LOG_ROLL_PERIOD / 4);
  }
  wtime = System.currentTimeMillis() - wtime;

  // Take one snapshot so the log line and the assertion agree, and avoid dividing by zero when
  // no roll happened at all (that would throw ArithmeticException and mask the real failure).
  final int rolls = paths.size();
  final long msPerRoll = rolls > 0 ? wtime / rolls : 0L;
  LOG.info(String.format("got %d rolls after %dms (%dms each) - expected at least %d rolls",
      rolls, wtime, msPerRoll, minRolls));
  assertFalse(rolls < minRolls);
}
/**
 * Creates an {@link HRegion} for a single-family table with the {@code TransactionProcessor}
 * coprocessor attached.
 * <p>
 * The table directory is created under the configured root directory and the region is backed by
 * a WAL obtained from a new {@link WALFactory} whose id is {@code tableName + ".hlog"}.
 *
 * @param tableName name of the table the region belongs to
 * @param family the only column family of the table (max versions set to 10)
 * @param ttl stored on the family under {@code TxConstants.PROPERTY_TTL} only when positive
 * @return the newly constructed region
 * @throws IOException if a file system, WAL or host name lookup operation fails
 */
private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
  HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
  HColumnDescriptor columnFamily = new HColumnDescriptor(family);
  boolean hasTtl = ttl > 0;
  if (hasTtl) {
    columnFamily.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
  }
  columnFamily.setMaxVersions(10);
  tableDescriptor.addFamily(columnFamily);
  tableDescriptor.addCoprocessor(TransactionProcessor.class.getName());

  // The table directory has to exist before the region is laid out on the file system.
  Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
  FileSystem fileSystem = FileSystem.get(conf);
  assertTrue(fileSystem.mkdirs(tableDir));

  WAL wal = new WALFactory(conf, null, tableName + ".hlog").getWAL(new byte[]{1});
  HRegionInfo info = new HRegionInfo(TableName.valueOf(tableName));
  HRegionFileSystem regionFileSystem =
      HRegionFileSystem.createRegionOnFileSystem(conf, fileSystem, tableDir, info);

  String localHost = InetAddress.getLocalHost().getHostName();
  ServerName serverName = ServerName.valueOf(localHost, 0, System.currentTimeMillis());
  return new HRegion(regionFileSystem, wal, conf, tableDescriptor,
      new LocalRegionServerServices(conf, serverName));
}
/**
 * Builds a test {@link HRegion} for {@code tableName} with one column family and the
 * {@code TransactionProcessor} coprocessor registered on the table descriptor.
 *
 * @param tableName table name; also used as the prefix of the WAL factory id ({@code ".hlog"})
 * @param family column family to add, configured to keep 10 versions
 * @param ttl when greater than zero, written to the family as {@code TxConstants.PROPERTY_TTL}
 * @return a region created on the file system under the configured root directory
 * @throws IOException on file system, WAL or local host resolution errors
 */
private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
  // --- table schema ---
  HTableDescriptor schema = new HTableDescriptor(TableName.valueOf(tableName));
  HColumnDescriptor familyDescriptor = new HColumnDescriptor(family);
  if (ttl > 0) {
    String ttlValue = String.valueOf(ttl);
    familyDescriptor.setValue(TxConstants.PROPERTY_TTL, ttlValue);
  }
  familyDescriptor.setMaxVersions(10);
  schema.addFamily(familyDescriptor);
  schema.addCoprocessor(TransactionProcessor.class.getName());

  // --- on-disk layout ---
  Path dir = FSUtils.getTableDir(FSUtils.getRootDir(conf), schema.getTableName());
  FileSystem hdfs = FileSystem.get(conf);
  boolean dirCreated = hdfs.mkdirs(dir);
  assertTrue(dirCreated);

  // --- WAL and region ---
  WALFactory factory = new WALFactory(conf, null, tableName + ".hlog");
  WAL regionWal = factory.getWAL(new byte[]{1});
  HRegionInfo hri = new HRegionInfo(TableName.valueOf(tableName));
  HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, hdfs, dir, hri);
  ServerName fakeServer = ServerName.valueOf(
      InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis());
  LocalRegionServerServices services = new LocalRegionServerServices(conf, fakeServer);
  return new HRegion(regionFs, regionWal, conf, schema, services);
}
/**
 * Test helper that assembles an {@link HRegion}: a table descriptor with one family and the
 * {@code TransactionProcessor} coprocessor, a table directory under the configured root, and a
 * WAL from a new {@link WALFactory}.
 *
 * @param tableName name of the table to create the region for
 * @param family column family name; the family keeps up to 10 versions
 * @param ttl value recorded under {@code TxConstants.PROPERTY_TTL} if it is positive
 * @return the constructed region
 * @throws IOException if creating directories, the WAL or the region file system fails
 */
private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException {
  final HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
  final HColumnDescriptor cf = new HColumnDescriptor(family);
  if (ttl > 0) {
    cf.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl));
  }
  cf.setMaxVersions(10);
  descriptor.addFamily(cf);
  final String coprocessorClass = TransactionProcessor.class.getName();
  descriptor.addCoprocessor(coprocessorClass);

  final Path tableLocation =
      FSUtils.getTableDir(FSUtils.getRootDir(conf), descriptor.getTableName());
  final FileSystem fileSystem = FileSystem.get(conf);
  assertTrue(fileSystem.mkdirs(tableLocation));

  final String walFactoryId = tableName + ".hlog";
  final WAL wal = new WALFactory(conf, null, walFactoryId).getWAL(new byte[]{1});
  final HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf(tableName));
  final HRegionFileSystem regionFileSystem =
      HRegionFileSystem.createRegionOnFileSystem(conf, fileSystem, tableLocation, regionInfo);

  final ServerName server = ServerName.valueOf(
      InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis());
  return new HRegion(regionFileSystem, wal, conf, descriptor,
      new LocalRegionServerServices(conf, server));
}
/**
 * Test that the blocksize change from HBASE-20520 takes effect on both the asyncfs and the old
 * WAL provider. Hard to verify more than this given the blocksize is passed down to HDFS on
 * create -- not kept local to the streams themselves.
 * <p>
 * The WAL handed out by the provider must be an {@code AbstractFSWAL} whose {@code blocksize}
 * equals what {@code WALUtil.getWALBlockSize} computes for the test data directory.
 */
@Test
public void testBlocksizeDefaultsToTwiceHDFSBlockSize() throws IOException {
  TableName tableName = TableName.valueOf("test");
  final WALFactory factory = new WALFactory(TEST_UTIL.getConfiguration(), this.walProvider);
  Configuration conf = TEST_UTIL.getConfiguration();
  WALProvider provider = factory.getWALProvider();

  // Get a WAL instance from the provider; anything that is not an AbstractFSWAL is unexpected.
  WAL wal = provider.getWAL(null);
  if (!(wal instanceof AbstractFSWAL)) {
    fail("Unknown provider " + provider);
    return;
  }

  long expected =
      WALUtil.getWALBlockSize(conf, FileSystem.get(conf), TEST_UTIL.getDataTestDir());
  long actual = ((AbstractFSWAL)wal).blocksize;
  assertEquals(expected, actual);
  LOG.info("Found blocksize of {} on {}", actual, wal);
}
/**
 * Creates the region for {@code td} from scratch and opens it with a newly created WAL.
 * <p>
 * The region is first created in a sibling "-tmp" table directory and then renamed into its
 * final location; a leftover "-tmp" directory from an earlier attempt is deleted first.
 *
 * @return the opened region
 * @throws IOException if the stale temporary directory cannot be deleted, the rename fails, or
 *           any underlying file system / region operation fails
 */
private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
    Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
    MasterRegionWALRoller walRoller, String serverName) throws IOException {
  TableName tableName = td.getTableName();
  RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(REGION_ID).build();

  // Stage the new region under "<qualifier>-tmp" so a partially created region never appears
  // in the final table directory.
  TableName stagingName = TableName.valueOf(tableName.getNamespaceAsString(),
      tableName.getQualifierAsString() + "-tmp");
  Path stagingDir = CommonFSUtils.getTableDir(rootDir, stagingName);
  if (fs.exists(stagingDir)) {
    if (!fs.delete(stagingDir, true)) {
      throw new IOException("Can not delete partial created proc region " + stagingDir);
    }
  }
  HRegion staged = HRegion.createHRegion(conf, region, fs, stagingDir, td);
  staged.close();

  Path finalDir = CommonFSUtils.getTableDir(rootDir, tableName);
  boolean renamed = fs.rename(stagingDir, finalDir);
  if (!renamed) {
    throw new IOException("Can not rename " + stagingDir + " to " + finalDir);
  }

  WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, region);
  return HRegion.openHRegionFromTableDir(conf, fs, finalDir, region, td, wal, null, null);
}
/**
 * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an
 * optional sync, and then a call to complete the mvcc transaction. This method does it all. Good
 * for case of adding a single edit or marker to the WAL.
 * <p/>
 * This write is for internal use only. Not for external client consumption.
 * @param wal the WAL the marker is appended to
 * @param replicationScope passed through to the {@code WALKeyImpl} created for this marker
 * @param hri region whose encoded name and table are recorded in the key
 * @param edit the marker edit to append
 * @param mvcc passed to the key and used to complete the key's write entry
 * @param extendedAttributes passed through to the {@code WALKeyImpl}
 * @param sync when true, {@code wal.sync} is called with the id returned by the append
 * @return WALKeyImpl that was added to the WAL.
 * @throws IOException rethrown from the append or sync, after completing the key's write entry
 *           if it has one
 */
private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes, final boolean sync) throws IOException {
// TODO: Pass in current time to use?
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
long trx = MultiVersionConcurrencyControl.NONE;
try {
// Append first; the returned id is what an optional sync waits on.
trx = wal.appendMarker(hri, walKey, edit);
if (sync) {
wal.sync(trx);
}
// Call complete only here because these are markers only. They are not for clients to read.
mvcc.complete(walKey.getWriteEntry());
} catch (IOException ioe) {
// On failure, still complete the write entry if the key holds one, then propagate the error.
if (walKey.getWriteEntry() != null) {
mvcc.complete(walKey.getWriteEntry());
}
throw ioe;
}
return walKey;
}
/**
 * Waits until there is only one log (the one currently being written) in the replication queue.
 * <p>
 * Concretely: for each of the first {@code numRs} region servers, every replication source must
 * report the WAL's current file as its current path. The wait is bounded by the timeout value
 * 10000 passed to {@code Waiter.waitFor}.
 *
 * @param numRs number of regionservers
 */
private void waitForLogAdvance(int numRs) throws Exception {
  Waiter.Predicate<Exception> allSourcesOnCurrentLog = new Waiter.Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      int rsIndex = 0;
      while (rsIndex < numRs) {
        HRegionServer regionServer = UTIL1.getHBaseCluster().getRegionServer(rsIndex);
        RegionInfo firstRegion =
            UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
        AbstractFSWAL<?> fsWal = (AbstractFSWAL<?>) regionServer.getWAL(firstRegion);
        Path activeLog = fsWal.getCurrentFileName();

        Replication replication = (Replication) UTIL1.getHBaseCluster()
            .getRegionServer(rsIndex).getReplicationSourceService();
        for (ReplicationSourceInterface candidate
            : replication.getReplicationManager().getSources()) {
          Path sourcePath = ((ReplicationSource) candidate).getCurrentPath();
          boolean onActiveLog = activeLog.equals(sourcePath);
          if (!onActiveLog) {
            // This source is still reading an older log.
            return false;
          }
        }
        rsIndex++;
      }
      return true;
    }
  };
  Waiter.waitFor(CONF1, 10000, allSourcesOnCurrentLog);
}
/**
 * Test the case where the secondary region replica is not in reads enabled state because it is
 * waiting for a flush or region open marker from primary region. Replaying CANNOT_FLUSH
 * flush marker entry should restore the reads enabled status in the region and allow the reads
 * to continue.
 */
@Test
public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
  disableReads(secondaryRegion);

  // Test case 1: replay the flush markers produced by a triggered flush on the primary; the
  // CANNOT_FLUSH request marker among them is expected to restore readsEnabled.
  primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
  reader = createWALReaderForPrimary();
  for (WAL.Entry entry = reader.next(); entry != null; entry = reader.next()) {
    FlushDescriptor flushDesc =
        WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
    if (flushDesc == null) {
      // Not a flush marker; nothing to replay.
      continue;
    }
    secondaryRegion.replayWALFlushMarker(flushDesc, entry.getKey().getSequenceId());
  }

  // Reads should be enabled again, so this get is expected not to throw.
  secondaryRegion.get(new Get(Bytes.toBytes(0)));
}
/**
 * Reads the WAL at {@code logPath} and asserts that sequence ids never decrease from one entry
 * to the next and that exactly {@code expectedEntries} entries are present.
 *
 * @param expectedEntries number of entries the log is expected to contain
 * @throws IOException if the log cannot be opened or read
 */
protected final void checkOrder(int expectedEntries) throws IOException {
  try (WAL.Reader reader =
      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
    long previousSeqId = -1L;
    int seen = 0;
    Entry entry;
    while ((entry = reader.next()) != null) {
      long currentSeqId = entry.getKey().getSequenceId();
      assertTrue(
          "Sequence id go backwards from " + previousSeqId + " to " + currentSeqId,
          currentSeqId >= previousSeqId);
      previousSeqId = currentSeqId;
      seen++;
    }
    assertEquals(expectedEntries, seen);
  }
}
/**
 * Scans the current WAL file of {@code rs} and returns the largest sequence id among the entries
 * whose encoded region name matches {@code region}.
 *
 * @param rs region server whose WAL (looked up with a {@code null} region) is read
 * @param region region whose entries are considered
 * @return the highest matching sequence id, or {@code -1} if no entry matches
 * @throws IOException if the WAL file cannot be opened or read
 */
private long getMaxSeqId(HRegionServer rs, RegionInfo region) throws IOException {
  AbstractFSWAL<?> fsWal = (AbstractFSWAL<?>) rs.getWAL(null);
  Path currentWalFile = fsWal.getCurrentFileName();
  long highest = -1L;
  try (WAL.Reader reader = WALFactory.createReader(UTIL.getTestFileSystem(), currentWalFile,
      UTIL.getConfiguration())) {
    WAL.Entry entry;
    while ((entry = reader.next()) != null) {
      boolean sameRegion =
          Bytes.equals(region.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
      if (!sameRegion) {
        continue;
      }
      long seqId = entry.getKey().getSequenceId();
      if (seqId > highest) {
        highest = seqId;
      }
    }
  }
  return highest;
}
/**
 * Creates a region for {@code tableName} with the given column families under
 * {@code DIR + callingMethod} and attaches a manually created coprocessor host to it.
 *
 * @param tableName name of the table
 * @param callingMethod appended to {@code DIR} to form the region path
 * @param conf configuration used for the WAL, the region and the coprocessor host
 * @param families column families to add to the table descriptor
 * @return the created region with its coprocessor host set
 * @throws IOException if the WAL or the region cannot be created
 */
HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf,
    byte[]... families) throws IOException {
  TableDescriptorBuilder.ModifyableTableDescriptor descriptor =
      new TableDescriptorBuilder.ModifyableTableDescriptor(TableName.valueOf(tableName));
  for (int i = 0; i < families.length; i++) {
    descriptor.setColumnFamily(
        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(families[i]));
  }

  ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);

  RegionInfo regionInfo = RegionInfoBuilder.newBuilder(descriptor.getTableName()).build();
  Path regionPath = new Path(DIR + callingMethod);
  WAL wal = HBaseTestingUtility.createWal(conf, regionPath, regionInfo);
  HRegion region = HRegion.createHRegion(regionInfo, regionPath, conf, descriptor, wal);

  // Hack: a coprocessor host is normally loaded at OpenRegionHandler. No region server is
  // started here, so create the host by hand and set it on the region.
  region.setCoprocessorHost(new RegionCoprocessorHost(region, null, conf));
  return region;
}
/**
 * Appends {@code count} single-cell edits for {@code rowName} to {@code wal} and syncs once on
 * the last returned transaction id.
 * <p>
 * Edit {@code j} uses qualifier {@code "j"} and value {@code "<family>:j"}; timestamps come from
 * {@code ee}. Nothing is synced when {@code count} is not positive.
 *
 * @throws IOException if an append or the final sync fails
 */
private void addWALEdits(final TableName tableName, final RegionInfo hri, final byte[] rowName,
    final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
    final NavigableMap<byte[], Integer> scopes, final MultiVersionConcurrencyControl mvcc)
    throws IOException {
  final String familyName = Bytes.toString(family);
  long lastTxid = -1;
  for (int index = 0; index < count; index++) {
    String qualifier = Integer.toString(index);
    byte[] qualifierBytes = Bytes.toBytes(qualifier);
    byte[] valueBytes = Bytes.toBytes(familyName + ":" + qualifier);

    WALEdit edit = new WALEdit();
    KeyValue cell = new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), valueBytes);
    edit.add(cell);

    // uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't
    // care about legacy coprocessors
    WALKeyImpl key =
        new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc);
    lastTxid = wal.appendData(hri, key, edit);
  }
  boolean appendedSomething = lastTxid != -1;
  if (appendedSomething) {
    wal.sync(lastTxid);
  }
}
/**
 * Replicates one WAL entry whose edit carries cells for two different rows and verifies that the
 * listener receives one event per row, in row order A then B.
 */
@Test
public void testReplicateLogEntries_SingleWALEditForMultipleRows() throws IOException {
  final byte[] firstRow = Bytes.toBytes("A");
  final byte[] secondRow = Bytes.toBytes("B");
  final byte[] payload = Bytes.toBytes("data");

  Cell firstCell = new KeyValue(firstRow, DATA_COLFAM, PAYLOAD_QUALIFIER, payload);
  Cell secondCell = new KeyValue(secondRow, DATA_COLFAM, PAYLOAD_QUALIFIER, payload);

  // A single entry holding both cells.
  WAL.Entry[] entries = new WAL.Entry[]{createHlogEntry(TABLE_NAME, firstCell, secondCell)};
  replicateWALEntry(entries);

  SepEvent expectedFirst = SepEvent.create(TABLE_NAME, firstRow,
      Lists.newArrayList(firstCell), Bytes.toBytes("data"));
  SepEvent expectedSecond = SepEvent.create(TABLE_NAME, secondRow,
      Lists.newArrayList(secondCell), Bytes.toBytes("data"));

  verify(eventListener).processEvents(Lists.newArrayList(expectedFirst, expectedSecond));
}
/**
 * Checks that {@code SystemCatalogWALEntryFilter} drops entries for non-tenant view rows, keeps
 * entries for tenant view rows intact, and for an entry mixing both keeps only as many cells as
 * the tenant entry has.
 */
@Test
public void testSystemCatalogWALEntryFilter() throws Exception {
  // Build WAL.Entry objects that refer to cells in the view rows in System.Catalog.
  Get tenantGet = getGet(catalogTable, TENANT_BYTES, TENANT_VIEW_NAME);
  Get nonTenantGet = getGet(catalogTable, DEFAULT_TENANT_BYTES, NONTENANT_VIEW_NAME);
  WAL.Entry nonTenantEntry = getEntry(systemCatalogTableName, nonTenantGet);
  WAL.Entry tenantEntry = getEntry(systemCatalogTableName, tenantGet);

  // The tenant view entry must pass the filter; the non-tenant view entry must not.
  SystemCatalogWALEntryFilter entryFilter = new SystemCatalogWALEntryFilter();
  Assert.assertNull(entryFilter.filter(nonTenantEntry));
  WAL.Entry filteredTenantEntry = entryFilter.filter(tenantEntry);
  Assert.assertNotNull("Tenant view was filtered when it shouldn't be!", filteredTenantEntry);
  int tenantCellsBefore = tenantEntry.getEdit().size();
  int tenantCellsAfter = entryFilter.filter(tenantEntry).getEdit().size();
  Assert.assertEquals(tenantCellsBefore, tenantCellsAfter);

  // An entry with cells from both a tenant and a non-tenant catalog row must only let the
  // tenant cells through.
  WALEdit mixedEdit = new WALEdit();
  mixedEdit.getCells().addAll(nonTenantEntry.getEdit().getCells());
  mixedEdit.getCells().addAll(tenantEntry.getEdit().getCells());
  WAL.Entry mixedEntry = new WAL.Entry(walKey, mixedEdit);

  int combinedCells = tenantEntry.getEdit().size() + nonTenantEntry.getEdit().size();
  Assert.assertEquals(combinedCells, mixedEntry.getEdit().size());

  int expectedSurvivors = tenantEntry.getEdit().size();
  Assert.assertEquals(expectedSurvivors, entryFilter.filter(mixedEntry).getEdit().size());
}
/**
 * Obtains a WAL from {@code walFactory} (using an empty identifier) and limits the recovery
 * error count of its output stream to 1.
 *
 * @param c not used by this method
 * @param walFactory factory the WAL is requested from; the result must be an {@code FSHLog}
 * @return the WAL
 * @throws IOException if the WAL cannot be obtained
 */
private WAL createWAL(final Configuration c, WALFactory walFactory) throws IOException {
  FSHLog fsHLog = (FSHLog) walFactory.getWAL(new byte[]{});
  // Keep the maximum recovery error count low so the dfsclient doesn't linger retrying
  // something long gone.
  HBaseTestingUtility.setMaxRecoveryErrorCount(fsHLog.getOutputStream(), 1);
  return fsHLog;
}
/**
 * Prepares the two regions {@code r1} and {@code r2} used by the tests.
 * <p>
 * Both regions share the same base directory, WAL, file system, region info and table
 * descriptor; they are told apart only by their overridden {@code hashCode()} (1 and 2) and
 * {@code toString()} ("testRegion1" and "testRegion2").
 *
 * @throws Exception if the file system or the WAL cannot be created
 */
@SuppressWarnings("deprecation")
@Before
public void setUp() throws Exception {
  Path hbaseRootDir = TEST_UTIL.getDataTestDir();
  TEST_UTIL.getConfiguration().set("hbase.rootdir", hbaseRootDir.toString());

  FileSystem newFS = FileSystem.newInstance(TEST_UTIL.getConfiguration());
  HRegionInfo hri = new HRegionInfo(tableName, null, null, false);
  Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
  WALFactory walFactory =
      new WALFactory(TEST_UTIL.getConfiguration(), null, "TestPerRegionIndexWriteCache");
  WAL wal = walFactory.getWAL(Bytes.toBytes("logs"));
  HTableDescriptor htd = new HTableDescriptor(tableName);
  HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
  htd.addFamily(a);

  r1 = new HRegion(basedir, wal, newFS, TEST_UTIL.getConfiguration(), hri, htd, null) {
    @Override
    public int hashCode() {
      return 1;
    }

    @Override
    public String toString() {
      return "testRegion1";
    }
  };

  r2 = new HRegion(basedir, wal, newFS, TEST_UTIL.getConfiguration(), hri, htd, null) {
    @Override
    public int hashCode() {
      return 2;
    }

    @Override
    public String toString() {
      // Previously returned "testRegion1" (copy-paste slip), which made r1 and r2
      // indistinguishable in logs and assertion failure messages.
      return "testRegion2";
    }
  };
}
/**
 * Test the case where the secondary region replica is not in reads enabled state because it is
 * waiting for a flush or region open marker from primary region. Replaying region open event
 * entry from primary should restore the reads enabled status in the region and allow the reads
 * to continue.
 */
@Test
public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
  // Test case 3: replaying region open event markers restores readsEnabled.
  disableReads(secondaryRegion);

  // Close and reopen the primary before its WAL is read back for region event markers.
  primaryRegion.close();
  primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

  reader = createWALReaderForPrimary();
  for (WAL.Entry entry = reader.next(); entry != null; entry = reader.next()) {
    RegionEventDescriptor eventDesc =
        WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
    if (eventDesc == null) {
      // Not a region event marker; skip it.
      continue;
    }
    secondaryRegion.replayWALRegionEventMarker(eventDesc);
  }

  // Reads should be enabled again, so this get is expected not to throw.
  secondaryRegion.get(new Get(Bytes.toBytes(0)));
}
/**
 * Hook run after a WAL roll: moves the files under {@code walArchiveDir} to
 * {@code globalWALArchiveDir}, passing {@code archivedWALSuffix} to the move helper. A failure
 * to move is logged as a warning and not propagated.
 *
 * @param wal the WAL that was rolled; not used by this implementation
 */
@Override
protected void afterRoll(WAL wal) {
  try {
    // Relocate the archived WAL files to the global archive path.
    MasterRegionUtils.moveFilesUnderDir(
        fs, walArchiveDir, globalWALArchiveDir, archivedWALSuffix);
  } catch (IOException moveFailure) {
    LOG.warn("Failed to move archived wals from {} to global dir {}", walArchiveDir,
        globalWALArchiveDir, moveFailure);
  }
}
/**
 * Add a bunch of dummy data and roll the logs every second insert. The listener registered up
 * front is expected to see 11 pre/post roll callbacks (the 10 explicit rolls plus one more) and
 * one close; a listener added while the WAL is running (at insert 10) is expected to see only
 * the 5 rolls that happen from then on.
 */
@Test
public void testActionListener() throws Exception {
  DummyWALActionsListener earlyObserver = new DummyWALActionsListener();
  final WALFactory factory = new WALFactory(conf, "testActionListener");
  factory.getWALProvider().addWALActionsListener(earlyObserver);
  DummyWALActionsListener lateObserver = new DummyWALActionsListener();

  RegionInfo regionInfo = RegionInfoBuilder.newBuilder(TableName.valueOf(SOME_BYTES))
      .setStartKey(SOME_BYTES).setEndKey(SOME_BYTES).build();
  final WAL wal = factory.getWAL(regionInfo);
  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

  for (int insert = 0; insert < 20; insert++) {
    byte[] bytes = Bytes.toBytes(insert + "");
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(bytes, bytes, bytes));
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    scopes.put(bytes, 0);

    WALKeyImpl key = new WALKeyImpl(regionInfo.getEncodedNameAsBytes(),
        TableName.valueOf(bytes), 0, mvcc, scopes);
    long txid = wal.appendData(regionInfo, key, edit);
    wal.sync(txid);

    // Register the second listener mid-run, before the roll for this insert.
    if (insert == 10) {
      wal.registerWALActionsListener(lateObserver);
    }
    boolean evenInsert = insert % 2 == 0;
    if (evenInsert) {
      wal.rollWriter();
    }
  }
  wal.close();

  assertEquals(11, earlyObserver.preLogRollCounter);
  assertEquals(11, earlyObserver.postLogRollCounter);
  assertEquals(5, lateObserver.preLogRollCounter);
  assertEquals(5, lateObserver.postLogRollCounter);
  assertEquals(1, earlyObserver.closedCount);
}
/**
 * Pushes the pipeline's minimum sequence id to the WAL for this store's region and family.
 * <p>
 * Nothing happens when the pipeline reports {@code Long.MAX_VALUE} (presumably meaning there is
 * no sequence id to report -- confirm against the pipeline) or when the region has no WAL.
 *
 * @param onlyIfGreater forwarded unchanged to {@code WAL.updateStore}
 */
@Override
public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) {
  final long lowestSeqId = pipeline.getMinSequenceId();
  if (lowestSeqId == Long.MAX_VALUE) {
    return;
  }
  byte[] regionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
  byte[] family = getFamilyNameInBytes();
  WAL wal = getRegionServices().getWAL();
  if (wal == null) {
    return;
  }
  wal.updateStore(regionName, family, lowestSeqId, onlyIfGreater);
}
/**
 * Creates and initializes an {@code FSHLog} named {@code logName} under {@code hbaseRootDir},
 * with the recovery error count of its output stream limited to 1.
 *
 * @param c configuration used for the file system lookup and the WAL
 * @param hbaseRootDir root directory handed to the WAL
 * @param logName name handed to the WAL
 * @return the initialized WAL
 * @throws IOException if the file system or the WAL cannot be set up
 */
@Override
protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
  FileSystem fileSystem = FileSystem.get(c);
  FSHLog fsHLog = new FSHLog(fileSystem, hbaseRootDir, logName, c);
  fsHLog.init();
  // Keep the maximum recovery error count low so the dfsclient doesn't linger retrying
  // something long gone.
  HBaseTestingUtility.setMaxRecoveryErrorCount(fsHLog.getOutputStream(), 1);
  return fsHLog;
}
/**
 * Write the marker that a compaction has succeeded and is about to be committed. This provides
 * info to the HMaster to allow it to recover the compaction if this regionserver dies in the
 * middle. It also prevents the compaction from finishing if this regionserver has already lost
 * its lease on the log.
 * <p/>
 * This write is for internal use only. Not for external client consumption.
 * @param wal the WAL to write the marker into
 * @param replicationScope forwarded to the marker write
 * @param hri the region the compaction belongs to
 * @param c the compaction descriptor wrapped into the marker edit
 * @param mvcc Used by WAL to get sequence Id for the waledit.
 * @return the key of the appended marker
 * @throws IOException if writing the marker fails
 */
public static WALKeyImpl writeCompactionMarker(WAL wal,
    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
    MultiVersionConcurrencyControl mvcc) throws IOException {
  WALEdit compactionEdit = WALEdit.createCompaction(hri, c);
  WALKeyImpl markerKey = writeMarker(wal, replicationScope, hri, compactionEdit, mvcc, null);
  if (LOG.isTraceEnabled()) {
    LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
  }
  return markerKey;
}
/**
 * Write a flush marker indicating a start / abort or a complete of a region flush
 * <p/>
 * This write is for internal use only. Not for external client consumption.
 * @param wal the WAL to write the marker into
 * @param replicationScope forwarded to the marker append
 * @param hri the region being flushed
 * @param f the flush descriptor wrapped into the marker edit
 * @param sync whether the WAL is synced after the append
 * @param mvcc forwarded to the marker append
 * @return the key of the appended marker
 * @throws IOException if appending or syncing the marker fails
 */
public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
    throws IOException {
  WALEdit flushEdit = WALEdit.createFlushWALEdit(hri, f);
  WALKeyImpl markerKey =
      doFullMarkerAppendTransaction(wal, replicationScope, hri, flushEdit, mvcc, null, sync);
  if (LOG.isTraceEnabled()) {
    LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
  }
  return markerKey;
}
/**
 * Write a region open marker indicating that the region is opened. This write is for internal use
 * only. Not for external client consumption.
 * @param wal the WAL to write the marker into
 * @param replicationScope forwarded to the marker write
 * @param hri the region the event refers to
 * @param r the region event descriptor wrapped into the marker edit
 * @param mvcc forwarded to the marker write
 * @return the key of the appended marker
 * @throws IOException if writing the marker fails
 */
public static WALKeyImpl writeRegionEventMarker(WAL wal,
    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
    MultiVersionConcurrencyControl mvcc) throws IOException {
  WALEdit regionEventEdit = WALEdit.createRegionEventWALEdit(hri, r);
  WALKeyImpl markerKey = writeMarker(wal, replicationScope, hri, regionEventEdit, mvcc, null);
  if (LOG.isTraceEnabled()) {
    LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
  }
  return markerKey;
}
/**
 * Write a log marker that a bulk load has succeeded and is about to be committed.
 * This write is for internal use only. Not for external client consumption.
 * @param wal The log to write into.
 * @param replicationScope The replication scope of the families in the HRegion
 * @param hri A description of the region in the table that we are bulk loading into.
 * @param desc A protocol buffers based description of the client's bulk loading request
 * @param mvcc forwarded to the marker write
 * @return walKey with sequenceid filled out for this bulk load marker
 * @throws IOException We will throw an IOException if we can not append to the HLog.
 */
public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
    final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
    throws IOException {
  final WALEdit bulkLoadEdit = WALEdit.createBulkLoadEvent(hri, desc);
  final WALKeyImpl markerKey =
      writeMarker(wal, replicationScope, hri, bulkLoadEdit, mvcc, null);
  if (LOG.isTraceEnabled()) {
    LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
  }
  return markerKey;
}
/**
 * Appends a marker edit to the WAL and always syncs it, by delegating to
 * {@code doFullMarkerAppendTransaction} with {@code sync} set to {@code true}.
 *
 * @return the key of the appended marker
 * @throws IOException if the append or the sync fails
 */
private static WALKeyImpl writeMarker(final WAL wal,
    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes)
    throws IOException {
  // Markers written through this helper are always synced.
  final boolean sync = true;
  return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc,
      extendedAttributes, sync);
}
/**
 * Creates a coprocessor host for the given WAL and loads the system coprocessors listed in
 * {@code conf} under {@code WAL_COPROCESSOR_CONF_KEY}.
 * <p>
 * {@code null} is passed to the superclass constructor; see the comment in the body for why.
 * @param log the write ahead log
 * @param conf the configuration
 */
public WALCoprocessorHost(final WAL log, final Configuration conf) {
// We don't want to require an Abortable passed down through (FS)HLog, so
// this means that a failure to load of a WAL coprocessor won't abort the
// server. This isn't ideal, and means that security components that
// utilize a WALObserver will have to check the observer initialization
// state manually. However, WALObservers will eventually go away so it
// should be an acceptable state of affairs.
super(null);
this.wal = log;
// load system default cp's from configuration.
loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);
}