下面列出了 org.apache.hadoop.hbase.regionserver.wal.WALActionsListener 的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Starts tracking {@code wal} in {@code walNeedsRoll} (initially flagged {@code FALSE}) and
 * installs a listener on it that, when a log roll is requested, flips the flag to {@code TRUE}
 * and wakes up every thread waiting on this roller's monitor.
 * <p>
 * A WAL that is already tracked is left untouched.
 *
 * @param wal the WAL to track
 */
public void addWAL(WAL wal) {
  // Cheap lookup without the lock: nothing to do for a WAL we already track.
  if (walNeedsRoll.containsKey(wal)) {
    return;
  }
  // Holding the monitor avoids a race between addWAL and requestRollAll.
  synchronized (this) {
    if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) != null) {
      // An entry already existed by the time we held the lock.
      return;
    }
    final WALActionsListener rollRequestListener = new WALActionsListener() {
      @Override
      public void logRollRequested(WALActionsListener.RollRequestReason reason) {
        // TODO logs will contend with each other here, replace with e.g. DelayedQueue
        synchronized (AbstractWALRoller.this) {
          walNeedsRoll.put(wal, Boolean.TRUE);
          AbstractWALRoller.this.notifyAll();
        }
      }
    };
    wal.registerWALActionsListener(rollRequestListener);
  }
}
/**
 * Appends {@code listener} to this object's {@code listeners} collection; nothing else is done.
 *
 * @param listener the listener to remember
 */
@Override
public void addWALActionsListener(WALActionsListener listener) {
// This relies on the assumption that the method is called before getWAL, so that no sub
// WALProvider exists yet. That is why the listener is only added to our own listeners list
// and addWALActionListener is not called on each WALProvider. An extra loop that forwards
// the listener to each WALProvider would do no harm, but if that extra code ever actually
// did something, we would have other, bigger problems. So leave it as is.
listeners.add(listener);
}
/**
 * Creates the WAL: builds its {@link WALCoprocessorHost} from {@code conf}, stores
 * {@code path}, and registers every supplied listener.
 *
 * @param path      path stored on this instance
 * @param conf      configuration handed to the coprocessor host
 * @param listeners listeners to register; may be {@code null}, in which case none are registered
 */
public DisabledWAL(final Path path, final Configuration conf,
    final List<WALActionsListener> listeners) {
  this.coprocessorHost = new WALCoprocessorHost(this, conf);
  this.path = path;
  if (listeners == null) {
    // No listeners were supplied, so there is nothing left to wire up.
    return;
  }
  for (final WALActionsListener each : listeners) {
    registerWALActionsListener(each);
  }
}
/**
 * Flips {@code closed} from {@code false} to {@code true} and, only on the call that performs
 * that transition, invokes {@code logCloseRequested()} on every registered listener.
 * Later calls do nothing.
 */
@Override
public void shutdown() {
  if (!closed.compareAndSet(false, true)) {
    // Already closed by an earlier call.
    return;
  }
  for (final WALActionsListener registered : this.listeners) {
    registered.logCloseRequested();
  }
}
/**
 * Notifies every registered listener through {@code postSync(0L, 0)}; this method does nothing
 * beyond those notifications.
 */
@Override
public void sync() {
  for (final WALActionsListener registered : this.listeners) {
    registered.postSync(0L, 0);
  }
}
/**
 * Creates the instance by passing every argument, unchanged and in the same order, to the
 * superclass constructor. No additional setup is performed here.
 *
 * @throws FailedLogCloseException declared by this constructor; the only call made here is the
 *           superclass constructor
 * @throws IOException declared by this constructor; the only call made here is the superclass
 *           constructor
 */
public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir,
String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
super(fs, remoteFs, rootDir, remoteWALDir, logDir, archiveDir, conf, listeners, failIfWALExists,
prefix, suffix, eventLoopGroup, channelClass);
}
/**
 * Asks the server hosting the region that contains {@code row} to roll its WAL writer and
 * blocks until a {@code postLogRoll} notification is received from that region's WAL.
 * <p>
 * The temporary listener is removed in a {@code finally} block, so it does not stay registered
 * on the WAL when the roll request throws.
 *
 * @param utility testing utility that supplies the admin and the mini cluster
 * @param table   table whose regions are searched
 * @param row     row used to pick the region; the method asserts that a matching region exists
 * @throws IOException if one of the admin or cluster calls fails
 */
private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
    final byte[] row) throws IOException {
  final Admin admin = utility.getAdmin();
  final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();

  // find the region that corresponds to the given row.
  HRegion region = null;
  for (HRegion candidate : cluster.getRegions(table)) {
    if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
      region = candidate;
      break;
    }
  }
  assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);

  final CountDownLatch latch = new CountDownLatch(1);

  // listen for successful log rolls
  final WALActionsListener listener = new WALActionsListener() {
    @Override
    public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
      latch.countDown();
    }
  };
  region.getWAL().registerWALActionsListener(listener);
  try {
    // request a roll
    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
      region.getRegionInfo().getRegionName()));

    // wait
    try {
      latch.await();
    } catch (InterruptedException exception) {
      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
          "replication tests fail, it's probably because we should still be waiting.");
      // Restore the interrupt flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
    }
  } finally {
    // Always detach the listener, including when the roll request above throws.
    region.getWAL().unregisterWALActionsListener(listener);
  }
}
/**
 * Locates the region of {@code table} whose key range contains {@code row}, requests a WAL
 * writer roll on the server holding that region, and waits until the region's WAL reports a
 * completed roll through {@code postLogRoll}.
 * <p>
 * The listener used for waiting is unregistered in a {@code finally} block, so a failing roll
 * request cannot leave it attached to the WAL.
 *
 * @param utility testing utility that supplies the admin and the mini cluster
 * @param table   table whose regions are searched
 * @param row     row used to pick the region; the method asserts that a matching region exists
 * @throws IOException if one of the admin or cluster calls fails
 */
private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
    final byte[] row) throws IOException {
  final Admin admin = utility.getAdmin();
  final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();

  // find the region that corresponds to the given row.
  HRegion region = null;
  for (HRegion candidate : cluster.getRegions(table)) {
    if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
      region = candidate;
      break;
    }
  }
  assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);

  final CountDownLatch latch = new CountDownLatch(1);

  // listen for successful log rolls
  final WALActionsListener listener = new WALActionsListener() {
    @Override
    public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
      latch.countDown();
    }
  };
  region.getWAL().registerWALActionsListener(listener);
  try {
    // request a roll
    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDescriptor().getTableName(),
      region.getRegionInfo().getRegionName()));

    // wait
    try {
      latch.await();
    } catch (InterruptedException exception) {
      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
          "replication tests fail, it's probably because we should still be waiting.");
      // Re-assert the interrupt status for code further up the stack.
      Thread.currentThread().interrupt();
    }
  } finally {
    // Runs on both the normal and the exceptional path so the listener is never left behind.
    region.getWAL().unregisterWALActionsListener(listener);
  }
}
/**
 * Creates the instance by forwarding all arguments, unchanged and in order, to the superclass
 * constructor. This constructor adds no behavior of its own.
 *
 * @throws FailedLogCloseException declared by this constructor; the only call made here is the
 *           superclass constructor
 * @throws IOException declared by this constructor; the only call made here is the superclass
 *           constructor
 */
public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
eventLoopGroup, channelClass);
}
/**
 * Adds {@code listener} to the {@code listeners} collection held by this object.
 *
 * @param listener the listener to add
 */
@Override
public void addWALActionsListener(WALActionsListener listener) {
listeners.add(listener);
}
/**
 * Registers {@code listener} by adding it to the {@code listeners} collection.
 *
 * @param listener the listener to register
 */
@Override
public void registerWALActionsListener(final WALActionsListener listener) {
listeners.add(listener);
}
/**
 * Removes {@code listener} from the {@code listeners} collection.
 *
 * @param listener the listener to remove
 * @return the result of {@code listeners.remove(listener)}
 */
@Override
public boolean unregisterWALActionsListener(final WALActionsListener listener) {
return listeners.remove(listener);
}
/**
 * Hands {@code listener} to the {@code disabled} instance by calling its
 * {@code registerWALActionsListener} method.
 *
 * @param listener the listener to register
 */
@Override
public void addWALActionsListener(WALActionsListener listener) {
disabled.registerWALActionsListener(listener);
}
/**
 * Adds {@code listener} to the local {@code listeners} collection and then passes it on to
 * {@code provider.addWALActionsListener}.
 *
 * @param listener the listener to add locally and forward to the wrapped provider
 */
@Override
public void addWALActionsListener(WALActionsListener listener) {
listeners.add(listener);
provider.addWALActionsListener(listener);
}
/**
 * Creates the instance by forwarding all arguments, unchanged and in order, to the superclass
 * constructor. This constructor adds no behavior of its own.
 *
 * @throws IOException declared by this constructor; the only call made here is the superclass
 *           constructor
 */
public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
}
/**
 * Currently a no-op: the body is empty, so {@code listener} is ignored.
 *
 * @param listener the listener passed by the caller; not used
 */
@Override
public void addWALActionsListener(WALActionsListener listener) {
// TODO Implement WALProvider.addWALActionsListener
}
/**
 * Creates the log by delegating to the superclass constructor. The five arguments are forwarded
 * unchanged, followed by the fixed values {@code false} and {@code null} for the superclass's
 * two trailing parameters (their meaning is defined by the superclass, which is not shown here).
 *
 * @throws IOException declared by this constructor; the only call made here is the superclass
 *           constructor
 */
public THLog(final FileSystem fs, final Path dir, final Path oldLogDir,
final Configuration conf, final List<WALActionsListener> listeners)
throws IOException {
super(fs, dir, oldLogDir, conf, listeners, false, null);
}
/**
 * Creates an edit log in the given directories and decides, from the configuration, whether
 * file rolls are enabled for this instance.
 * <p>
 * Loading an existing log should never be necessary: any log present at startup is expected to
 * have been processed and deleted before the WAL object is brought up.
 * <p>
 * File rolls are enabled when the {@code ALLOWED_OPERATIONS} setting is empty, or when it
 * contains {@code AllowedOperations.all} or {@code AllowedOperations.fileroll}.
 *
 * @param fs filesystem handle
 * @param rootDir path under which logs and old logs live
 * @param logDir directory where WALs are stored
 * @param archiveDir directory where WALs are archived
 * @param conf configuration to use
 * @param listeners listeners on WAL events; per the original contract they are registered
 *          before anything else happens, e.g. before {@link #rollWriter()}
 * @param failIfWALExists if true, an IOException is thrown when files related to this WAL
 *          already exist
 * @param prefix should always be hostname and port in a distributed environment; it is URL
 *          encoded before use. If null, "wal" is used
 * @param suffix will be URL encoded; null is treated as empty, and a non-empty value must start
 *          with {@link AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
 * @throws IOException if construction of the underlying WAL fails
 */
public IOTestWAL(final FileSystem fs, final Path rootDir, final String logDir,
    final String archiveDir, final Configuration conf,
    final List<WALActionsListener> listeners,
    final boolean failIfWALExists, final String prefix, final String suffix)
    throws IOException {
  super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);

  final Collection<String> allowedOps = conf.getStringCollection(ALLOWED_OPERATIONS);
  // An empty setting places no restriction; otherwise "all" or "fileroll" must be listed.
  final boolean rollsPermitted = allowedOps.isEmpty()
      || allowedOps.contains(AllowedOperations.all.name())
      || allowedOps.contains(AllowedOperations.fileroll.name());
  doFileRolls = rollsPermitted;
  initialized = true;

  final String rollState = rollsPermitted ? "enabled" : "disabled";
  LOG.info("Initialized with file rolling " + rollState);
}
/**
 * Registers a {@link WALActionsListener}.
 *
 * @param listener the listener to register
 */
void registerWALActionsListener(final WALActionsListener listener);
/**
 * Unregisters a {@link WALActionsListener}.
 *
 * @param listener the listener to unregister
 * @return presumably {@code true} if the listener was registered and has been removed; the
 *         exact meaning is up to the implementation (TODO confirm against implementers)
 */
boolean unregisterWALActionsListener(final WALActionsListener listener);
/**
 * Adds a {@link WALActionsListener}.
 * <p>
 * Note that this method must be called before {@link #getWAL(RegionInfo)}, because it does not
 * affect a {@link WAL} that has already been created. Since it is only meant to be called
 * during initialization, it is not thread safe.
 *
 * @param listener the listener to add
 */
void addWALActionsListener(WALActionsListener listener);