下面列出了org.apache.hadoop.hbase.client.Admin#addReplicationPeer ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
final void setupReplication() throws Exception {
Admin admin1 = UTIL1.getAdmin();
admin1.createTable(t1SyncupSource);
admin1.createTable(t2SyncupSource);
Admin admin2 = UTIL2.getAdmin();
admin2.createTable(t1SyncupTarget);
admin2.createTable(t2SyncupTarget);
// Get HTable from Master
Connection conn1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
ht1Source = conn1.getTable(TN1);
ht2Source = conn1.getTable(TN2);
// Get HTable from Peer1
Connection conn2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
ht1TargetAtPeer1 = conn2.getTable(TN1);
ht2TargetAtPeer1 = conn2.getTable(TN2);
/**
* set M-S : Master: utility1 Slave1: utility2
*/
ReplicationPeerConfig rpc =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build();
admin1.addReplicationPeer("1", rpc);
}
private void doTest() throws IOException {
Admin admin = UTIL.getAdmin();
String peerId = "1";
ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/hbase2").build();
admin.addReplicationPeer(peerId, peerConfig, true);
assertEquals(peerConfig.getClusterKey(),
admin.getReplicationPeerConfig(peerId).getClusterKey());
ReplicationPeerConfig newPeerConfig =
ReplicationPeerConfig.newBuilder(peerConfig).setBandwidth(123456).build();
admin.updateReplicationPeerConfig(peerId, newPeerConfig);
assertEquals(newPeerConfig.getBandwidth(),
admin.getReplicationPeerConfig(peerId).getBandwidth());
admin.disableReplicationPeer(peerId);
assertFalse(admin.listReplicationPeers().get(0).isEnabled());
admin.enableReplicationPeer(peerId);
assertTrue(admin.listReplicationPeers().get(0).isEnabled());
admin.removeReplicationPeer(peerId);
assertTrue(admin.listReplicationPeers().isEmpty());
// make sure that we have run into the mocked method
MockHMaster master = (MockHMaster) UTIL.getHBaseCluster().getMaster();
assertTrue(master.addPeerCalled);
assertTrue(master.removePeerCalled);
assertTrue(master.updatePeerConfigCalled);
assertTrue(master.enablePeerCalled);
assertTrue(master.disablePeerCalled);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
conf1.setLong("replication.source.sleepforretries", 100);
// Each WAL is about 120 bytes
conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, REPLICATION_SOURCE_QUOTA);
conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
new ZKWatcher(conf1, "cluster1", null, true);
conf2 = new Configuration(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
new ZKWatcher(conf2, "cluster2", null, true);
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
utility1.startMiniCluster();
utility2.startMiniCluster();
Admin admin1 = ConnectionFactory.createConnection(conf1).getAdmin();
admin1.addReplicationPeer("peer1", rpc);
admin1.addReplicationPeer("peer2", rpc);
admin1.addReplicationPeer("peer3", rpc);
numOfPeer = admin1.listReplicationPeers().size();
}
@Test
public void testMultiSlaveReplication() throws Exception {
LOG.info("testCyclicReplication");
MiniHBaseCluster master = utility1.startMiniCluster();
utility2.startMiniCluster();
utility3.startMiniCluster();
Admin admin1 = ConnectionFactory.createConnection(conf1).getAdmin();
utility1.getAdmin().createTable(table);
utility2.getAdmin().createTable(table);
utility3.getAdmin().createTable(table);
Table htable1 = utility1.getConnection().getTable(tableName);
Table htable2 = utility2.getConnection().getTable(tableName);
Table htable3 = utility3.getConnection().getTable(tableName);
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
admin1.addReplicationPeer("1", rpc);
// put "row" and wait 'til it got around, then delete
putAndWait(row, famName, htable1, htable2);
deleteAndWait(row, htable1, htable2);
// check it wasn't replication to cluster 3
checkRow(row,0,htable3);
putAndWait(row2, famName, htable1, htable2);
// now roll the region server's logs
rollWALAndWait(utility1, htable1.getName(), row2);
// after the log was rolled put a new row
putAndWait(row3, famName, htable1, htable2);
rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility3.getClusterKey());
admin1.addReplicationPeer("2", rpc);
// put a row, check it was replicated to all clusters
putAndWait(row1, famName, htable1, htable2, htable3);
// delete and verify
deleteAndWait(row1, htable1, htable2, htable3);
// make sure row2 did not get replicated after
// cluster 3 was added
checkRow(row2,0,htable3);
// row3 will get replicated, because it was in the
// latest log
checkRow(row3,1,htable3);
Put p = new Put(row);
p.addColumn(famName, row, row);
htable1.put(p);
// now roll the logs again
rollWALAndWait(utility1, htable1.getName(), row);
// cleanup "row2", also conveniently use this to wait replication
// to finish
deleteAndWait(row2, htable1, htable2, htable3);
// Even if the log was rolled in the middle of the replication
// "row" is still replication.
checkRow(row, 1, htable2);
// Replication thread of cluster 2 may be sleeping, and since row2 is not there in it,
// we should wait before checking.
checkWithWait(row, 1, htable3);
// cleanup the rest
deleteAndWait(row, htable1, htable2, htable3);
deleteAndWait(row3, htable1, htable2, htable3);
utility3.shutdownMiniCluster();
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
}
/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
*/
@Test
public void testServerShutdownRecoveredQueue() throws Exception {
try {
// Ensure single-threaded WAL
conf.set("hbase.wal.provider", "defaultProvider");
conf.setInt("replication.sleep.before.failover", 2000);
// Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
TEST_UTIL_PEER.startMiniCluster(1);
HRegionServer serverA = cluster.getRegionServer(0);
final ReplicationSourceManager managerA =
((Replication) serverA.getReplicationSourceService()).getReplicationManager();
HRegionServer serverB = cluster.getRegionServer(1);
final ReplicationSourceManager managerB =
((Replication) serverB.getReplicationSourceService()).getReplicationManager();
final Admin admin = TEST_UTIL.getAdmin();
final String peerId = "TestPeer";
admin.addReplicationPeer(peerId,
ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
// Wait for replication sources to come up
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
}
});
// Disabling peer makes sure there is at least one log to claim when the server dies
// The recovered queue will also stay there until the peer is disabled even if the
// WALs it contains have no data.
admin.disableReplicationPeer(peerId);
// Stopping serverA
// It's queues should be claimed by the only other alive server i.e. serverB
cluster.stopRegionServer(serverA.getServerName());
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return managerB.getOldSources().size() == 1;
}
});
final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
serverC.waitForServerOnline();
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return serverC.getReplicationSourceService() != null;
}
});
final ReplicationSourceManager managerC =
((Replication) serverC.getReplicationSourceService()).getReplicationManager();
// Sanity check
assertEquals(0, managerC.getOldSources().size());
// Stopping serverB
// Now serverC should have two recovered queues:
// 1. The serverB's normal queue
// 2. serverA's recovered queue on serverB
cluster.stopRegionServer(serverB.getServerName());
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return managerC.getOldSources().size() == 2;
}
});
admin.enableReplicationPeer(peerId);
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return managerC.getOldSources().size() == 0;
}
});
} finally {
conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
}
}
@Before
@Override
public void setUpBase() throws Exception {
/** "super.setUpBase()" already sets peer1 from 1 <-> 2 <-> 3
* and this test add the fourth cluster.
* So we have following topology:
* 1
* / \
* 2 4
* /
* 3
*
* The 1 -> 4 has two peers,
* ns_peer1: ns1 -> ns1 (validate this peer hfile-refs)
* ns_peer1 configuration is NAMESPACES => ["ns1"]
*
* ns_peer2: ns2:t2_syncup -> ns2:t2_syncup, this peers is
* ns_peer2 configuration is NAMESPACES => ["ns2"],
* TABLE_CFS => { "ns2:t2_syncup" => []}
*
* The 1 -> 2 has one peer, this peer configuration is
* add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
*
*/
super.setUpBase();
// Create tables
TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(famName)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(famName)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
Admin admin1 = UTIL1.getAdmin();
admin1.createNamespace(NamespaceDescriptor.create(NS1).build());
admin1.createNamespace(NamespaceDescriptor.create(NS2).build());
admin1.createTable(table1);
admin1.createTable(table2);
Admin admin2 = UTIL2.getAdmin();
admin2.createNamespace(NamespaceDescriptor.create(NS1).build());
admin2.createNamespace(NamespaceDescriptor.create(NS2).build());
admin2.createTable(table1);
admin2.createTable(table2);
Admin admin3 = UTIL3.getAdmin();
admin3.createNamespace(NamespaceDescriptor.create(NS1).build());
admin3.createNamespace(NamespaceDescriptor.create(NS2).build());
admin3.createTable(table1);
admin3.createTable(table2);
Admin admin4 = UTIL4.getAdmin();
admin4.createNamespace(NamespaceDescriptor.create(NS1).build());
admin4.createNamespace(NamespaceDescriptor.create(NS2).build());
admin4.createTable(table1);
admin4.createTable(table2);
/**
* Set ns_peer1 1: ns1 -> 2: ns1
*
* add_peer 'ns_peer1', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
* NAMESPACES => ["ns1"]
*/
Set<String> namespaces = new HashSet<>();
namespaces.add(NS1);
ReplicationPeerConfig rpc4_ns =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
.setReplicateAllUserTables(false).setNamespaces(namespaces).build();
admin1.addReplicationPeer(PEER4_NS, rpc4_ns);
/**
* Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup
*
* add_peer 'ns_peer2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
* NAMESPACES => ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => [] }
*/
Map<TableName, List<String>> tableCFsMap = new HashMap<>();
tableCFsMap.put(NS2_TABLE, null);
ReplicationPeerConfig rpc4_ns_table =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
.setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap).build();
admin1.addReplicationPeer(PEER4_NS_TABLE, rpc4_ns_table);
}