下面列出了org.testng.annotations.AfterSuite#org.apache.helix.tools.ClusterSetup 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Runs the {@code -enableInstance ... false} command through the ClusterSetup command line for
 * participant {@code PARTICIPANT_PREFIX + "_12918"}, then enables the same instance again through
 * {@link ZKHelixAdmin}. After each step the best-possible/external-view verifier must succeed.
 */
@Test()
public void testDisableNode() throws Exception {
  final String instanceName = PARTICIPANT_PREFIX + "_12918";

  // Step 1: issue the command-line request.
  String[] cliArgs =
      ("-zkSvr " + ZK_ADDR + " -enableInstance " + CLUSTER_NAME + " " + instanceName
          + " TestDB TestDB_0 false").split(" ");
  ClusterSetup.processCommandLineArgs(cliArgs);
  Assert.assertTrue(ClusterStateVerifier.verifyByPolling(
      new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)));

  // Step 2: enable the instance again via the admin API and re-verify.
  ZKHelixAdmin admin = new ZKHelixAdmin(_gZkClient);
  admin.enableInstance(CLUSTER_NAME, instanceName, true);
  Assert.assertTrue(ClusterStateVerifier.verifyByPolling(
      new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)));
}
/**
 * Tests cluster creation according to the pre-set routing mapping.
 * The Helix Java API exercised here is ClusterSetup, built through two different entry points
 * (the ZK-address constructor and the Builder).
 */
@Test
public void testCreateClusters() {
  // Note: the ZK address passed here could be anything because multiZk mode is on
  // (it will be ignored).
  ClusterSetup setupFromZkAddr = new ClusterSetup(ZK_SERVER_MAP.keySet().iterator().next());
  ClusterSetup setupFromBuilder = new ClusterSetup.Builder().build();

  // Create and verify with each ClusterSetup in turn.
  for (ClusterSetup setup : new ClusterSetup[] {setupFromZkAddr, setupFromBuilder}) {
    createClusters(setup);
    verifyClusterCreation(setup);
  }

  // Create clusters again to continue with testing
  createClusters(setupFromBuilder);
}
/**
 * Checks which clusters each of two ClusterSetup instances can add: one built with the default
 * Builder settings and one built with the supplied realm-aware connection config.
 * The expected outcome of each {@code addCluster} call is passed to {@code verifyMsdsZkRealm}.
 *
 * <p>Both ClusterSetup instances are closed even if building the second one, running the
 * verification, or closing the other instance throws.
 *
 * @param connectionConfig connection config applied to the second ClusterSetup
 */
private void verifyClusterSetupMsdsEndpoint(
    RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
  System.out.println("Start " + TestHelper.getTestMethodName());

  ClusterSetup firstClusterSetup = new ClusterSetup.Builder().build();
  try {
    ClusterSetup secondClusterSetup =
        new ClusterSetup.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
    try {
      verifyMsdsZkRealm(CLUSTER_ONE, true,
          () -> firstClusterSetup.addCluster(CLUSTER_ONE, false));
      verifyMsdsZkRealm(CLUSTER_FOUR, false,
          () -> firstClusterSetup.addCluster(CLUSTER_FOUR, false));

      verifyMsdsZkRealm(CLUSTER_FOUR, true,
          () -> secondClusterSetup.addCluster(CLUSTER_FOUR, false));
      verifyMsdsZkRealm(CLUSTER_ONE, false,
          () -> secondClusterSetup.addCluster(CLUSTER_ONE, false));
    } finally {
      secondClusterSetup.close();
    }
  } finally {
    // Runs even when the second setup could not be built or failed to close.
    firstClusterSetup.close();
  }
}
/**
 * Suite-level setup: lowers java.util.logging verbosity, starts the ZooKeeper server at
 * {@code ZK_ADDR}, creates the shared ZkClient and ClusterSetup, starts the admin thread and
 * creates the HTTP client used by the tests.
 */
@BeforeSuite
public void beforeSuite() throws Exception {
  // TODO: use logging.properties file to config java.util.logging.Logger levels
  java.util.logging.Logger.getLogger("").setLevel(Level.WARNING);

  // Bring up ZooKeeper and make sure it actually started.
  _zkServer = TestHelper.startZkServer(ZK_ADDR);
  AssertJUnit.assertTrue(_zkServer != null);
  ZKClientPool.reset();

  // Shared client and setup tool.
  ZNRecordSerializer recordSerializer = new ZNRecordSerializer();
  _gZkClient = new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
      ZkClient.DEFAULT_CONNECTION_TIMEOUT, recordSerializer);
  _gSetupTool = new ClusterSetup(_gZkClient);

  // Admin web service.
  _adminThread = new AdminThread(ZK_ADDR, ADMIN_PORT);
  _adminThread.start();

  // HTTP client for talking to the admin service.
  _gClient = new Client(Protocol.HTTP);

  // Give the web service a moment to start.
  Thread.sleep(100);
}
/**
 * Adds and rebalances resource "MyDB", drops it through the ClusterSetup command line, and
 * verifies within 30 seconds that current state and external view are empty on all five nodes.
 */
@Test()
public void testDropResource() throws Exception {
  final String resourceName = "MyDB";

  // Add the resource that will be dropped.
  _gSetupTool.addResourceToCluster(CLUSTER_NAME, resourceName, 6, STATE_MODEL);
  _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, resourceName, 3);
  Assert.assertTrue(ClusterStateVerifier.verifyByPolling(
      new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)));

  // Drop it via the command line.
  String dropCommand =
      "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + resourceName;
  ClusterSetup.processCommandLineArgs(dropCommand.split(" "));

  TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", 30 * 1000, CLUSTER_NAME,
      resourceName,
      TestHelper.<String> setOf("localhost_12918", "localhost_12919", "localhost_12920",
          "localhost_12921", "localhost_12922"),
      ZK_ADDR);
}
/**
 * Class-level setup: starts a ZooKeeper server at {@code _zkAddr}, builds a shared ZkClient and
 * ClusterSetup, creates the cluster with participants and DBs, starts the controller, and
 * asserts that the best-possible external view verifier succeeds.
 */
@BeforeClass
public void beforeClass() throws Exception {
  // ZooKeeper server and client.
  _zkServerRef.set(TestHelper.startZkServer(_zkAddr));
  HelixZkClient.ZkConnectionConfig connectionConfig =
      new HelixZkClient.ZkConnectionConfig(_zkAddr);
  _zkClient = SharedZkClientFactory.getInstance().buildZkClient(connectionConfig);
  _zkClient.setZkSerializer(new ZNRecordSerializer());

  // Cluster layout.
  _setupTool = new ClusterSetup(_zkClient);
  _participants = new MockParticipantManager[_numNodes];
  _setupTool.addCluster(CLUSTER_NAME, true);
  setupParticipants(_setupTool);
  setupDBs(_setupTool);
  createManagers(_zkAddr, CLUSTER_NAME);

  // Start controller.
  _controller = new ClusterControllerManager(_zkAddr, CLUSTER_NAME, CONTROLLER_PREFIX + "_0");
  _controller.syncStart();

  ZkHelixClusterVerifier verifier =
      new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(_zkAddr).build();
  boolean converged = verifier.verifyByPolling();
  Assert.assertTrue(converged);
}
/**
 * Class-level setup: creates the storage cluster with {@code TEST_DB}, registers
 * {@code NODE_NR} participants, rebalances with 3 replicas, and sets the SLAVE-MASTER timeout
 * resource property to 200 through the ClusterSetup command line.
 */
@Override
@BeforeClass
public void beforeClass() throws Exception {
  System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));

  // Set up the storage cluster.
  _gSetupTool.addCluster(CLUSTER_NAME, true);
  _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
  for (int nodeIdx = 0; nodeIdx < NODE_NR; nodeIdx++) {
    _gSetupTool.addInstanceToCluster(CLUSTER_NAME,
        PARTICIPANT_PREFIX + "_" + (START_PORT + nodeIdx));
  }
  _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);

  // Set the timeout values.
  String timeoutKey = "SLAVE-MASTER_" + Message.Attributes.TIMEOUT;
  IdealState testDbIdealState =
      _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
  // NOTE(review): this value is only set on the local IdealState object; nothing in this
  // method writes it back. The command below is what sets the property (to 200).
  testDbIdealState.getRecord().setSimpleField(timeoutKey, "300");

  String[] cliArgs =
      ("-zkSvr " + ZK_ADDR + " -addResourceProperty " + CLUSTER_NAME + " " + TEST_DB + " "
          + timeoutKey + " 200").split(" ");
  ClusterSetup.processCommandLineArgs(cliArgs);
}
/**
 * Starts a ZooKeeper server whose default-namespace hook recursively deletes
 * {@code "/" + clusterName}, sets up the test cluster through ClusterSetup, and then pauses for
 * one second.
 *
 * <p>If the pause is interrupted, the interrupt flag is restored so callers can still see it.
 */
private static void setup() {
  IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
    @Override
    public void createDefaultNameSpace(ZkClient client) {
      client.deleteRecursive("/" + clusterName);
    }
  };

  zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
  zkServer.start();

  ClusterSetup clusterSetup = new ClusterSetup(zkConnectString);
  clusterSetup.setupTestCluster(clusterName);

  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    // Restore the interrupt status instead of silently consuming it.
    Thread.currentThread().interrupt();
    e.printStackTrace();
  }
}
/**
 * Best-effort removal of a cluster. Does nothing when {@code "/" + clusterName} does not exist.
 * If the first delete attempt fails, waits three seconds and tries once more; a failure of the
 * second attempt is ignored.
 *
 * <p>If the wait is interrupted, the retry is skipped and the thread's interrupt flag is
 * restored rather than swallowed.
 *
 * @param clusterName name of the cluster to delete
 * @param zkClient client used to check whether the cluster path exists
 * @param setup ClusterSetup used to perform the deletion
 */
public static void dropCluster(String clusterName, RealmAwareZkClient zkClient, ClusterSetup setup) {
  String namespace = "/" + clusterName;
  if (!zkClient.exists(namespace)) {
    return;
  }
  try {
    setup.deleteCluster(clusterName);
  } catch (Exception ex) {
    // Failed to delete, give some more time for connections to drop
    try {
      Thread.sleep(3000L);
      setup.deleteCluster(clusterName);
    } catch (InterruptedException interrupted) {
      // Preserve the interrupt for the caller; skip the retry.
      Thread.currentThread().interrupt();
    } catch (Exception ignored) {
      // OK - just ignore; this cleanup is deliberately best-effort
    }
  }
}
/**
 * Starts an additional ZooKeeper server for testing and registers the server, a dedicated
 * client, a ClusterSetup and a base data accessor for it in the per-address maps.
 * @param i index added to the ZK start port to avoid conflicts
 * @throws Exception if the server or any of the helpers cannot be created
 */
private void startZooKeeper(int i)
    throws Exception {
  final String address = ZK_PREFIX + (ZK_START_PORT + i);

  ZkServer server = TestHelper.startZkServer(address);
  AssertJUnit.assertNotNull(server);

  // Dedicated client using the ZNRecord serializer.
  HelixZkClient.ZkClientConfig zkClientConfig = new HelixZkClient.ZkClientConfig();
  zkClientConfig.setZkSerializer(new ZNRecordSerializer());
  HelixZkClient.ZkConnectionConfig zkConnectionConfig =
      new HelixZkClient.ZkConnectionConfig(address);
  HelixZkClient client =
      DedicatedZkClientFactory.getInstance().buildZkClient(zkConnectionConfig, zkClientConfig);

  ClusterSetup setupTool = new ClusterSetup(client);
  BaseDataAccessor dataAccessor = new ZkBaseDataAccessor<>(client);

  // Register everything only after all helpers were created successfully.
  _zkServerMap.put(address, server);
  _helixZkClientMap.put(address, client);
  _clusterSetupMap.put(address, setupTool);
  _baseDataAccessorMap.put(address, dataAccessor);
}
/**
 * Suite-level teardown: unregisters every MBean returned by the server query (failures are
 * ignored) and then closes the data accessors, ClusterSetups and ZK clients and stops the
 * ZooKeeper servers held in the per-address maps, in that order.
 */
@AfterSuite
public void afterSuite() throws IOException {
  // Clean up all JMX objects
  for (ObjectName registeredName : _server.queryNames(null, null)) {
    try {
      _server.unregisterMBean(registeredName);
    } catch (Exception e) {
      // OK
    }
  }

  // Close all ZK resources
  _baseDataAccessorMap.values().forEach(accessor -> accessor.close());
  _clusterSetupMap.values().forEach(setup -> setup.close());
  _helixZkClientMap.values().forEach(client -> client.close());
  _zkServerMap.values().forEach(server -> TestHelper.stopZkServer(server));
}
/**
 * Stores the change-log, file-store and check-point directory locations as participant-scoped
 * config for the given instance, then deletes and recreates those three directories on disk.
 *
 * @param setup ClusterSetup whose management tool writes the config
 * @param baseDir prefix prepended directly (no separator added) to the instance name
 * @param clusterName cluster the participant belongs to
 * @param instanceName participant the config is scoped to
 * @throws IOException if deleting one of the directories fails
 */
private static void addConfiguration(ClusterSetup setup, String baseDir, String clusterName,
    String instanceName) throws IOException {
  final String instanceRoot = baseDir + instanceName;
  final String[] configKeys = {"change_log_dir", "file_store_dir", "check_point_dir"};
  final String[] directories = {
      instanceRoot + "/translog", instanceRoot + "/filestore", instanceRoot + "/checkpoint"};

  Map<String, String> properties = new HashMap<String, String>();
  for (int idx = 0; idx < configKeys.length; idx++) {
    properties.put(configKeys[idx], directories[idx]);
  }

  HelixConfigScope instanceScope = new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
      .forCluster(clusterName)
      .forParticipant(instanceName)
      .build();
  setup.getClusterManagementTool().setConfig(instanceScope, properties);

  // Wipe all three directories first, then recreate them.
  for (String directory : directories) {
    FileUtils.deleteDirectory(new File(directory));
  }
  for (String directory : directories) {
    new File(directory).mkdirs();
  }
}
/**
 * Remove a cluster
 * <p>
 * Usage: <code>curl -X DELETE http://{host:port}/clusters/{clusterName}</code>
 * <p>
 * On failure the exception is written to the response as a JSON entity; the response status is
 * set to SUCCESS_OK in both the success and the failure path.
 *
 * @return always {@code null}
 */
@Override
public Representation delete() {
  try {
    final String targetCluster =
        ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
    final ZkClient client =
        ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);

    new ClusterSetup(client).deleteCluster(targetCluster);
    getResponse().setStatus(Status.SUCCESS_OK);
  } catch (Exception e) {
    String errorJson = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
    getResponse().setEntity(errorJson, MediaType.APPLICATION_JSON);
    getResponse().setStatus(Status.SUCCESS_OK);
  }
  return null;
}
/**
 * Remove constraints
 * <p>
 * Usage:
 * <code>curl -X DELETE http://{host:port}/clusters/{cluster}/constraints/MESSAGE_CONSTRAINT/{constraintId}</code>
 * <p>
 * On failure the error is logged and written to the response as a JSON entity with status
 * SUCCESS_OK.
 *
 * @return always {@code null}
 */
@Override
public Representation delete() {
  // Request attributes are resolved outside the try block, as in the original flow.
  final String cluster =
      ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
  final String constraintType =
      ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CONSTRAINT_TYPE);
  final String id =
      ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CONSTRAINT_ID);

  try {
    ZkClient client =
        ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
    new ClusterSetup(client).removeConstraint(cluster, constraintType, id);
  } catch (Exception e) {
    LOG.error("Error in delete constraint", e);
    String errorJson = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
    getResponse().setEntity(errorJson, MediaType.APPLICATION_JSON);
    getResponse().setStatus(Status.SUCCESS_OK);
  }
  return null;
}
/**
 * Looks up the cluster's instances and the live-instance record of the requested instance, then
 * returns an empty string representation.
 *
 * <p>The looked-up values are not used in the result; the lookups are kept because they can
 * fail (for example when no live instance record is found).
 *
 * @return a StringRepresentation wrapping the empty string
 */
StringRepresentation getSchedulerTasksRepresentation() throws JsonGenerationException,
    JsonMappingException, IOException {
  String cluster = (String) getRequest().getAttributes().get("clusterName");
  String instance = (String) getRequest().getAttributes().get("instanceName");
  ZkClient client = (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);

  ClusterSetup clusterSetup = new ClusterSetup(client);
  List<String> clusterInstances =
      clusterSetup.getClusterManagementTool().getInstancesInCluster(cluster);

  HelixDataAccessor dataAccessor =
      ClusterRepresentationUtil.getClusterDataAccessor(client, cluster);
  LiveInstance live = dataAccessor.getProperty(dataAccessor.keyBuilder().liveInstance(instance));
  String ephemeralOwner = live.getEphemeralOwner();

  // Placeholder result: nothing gathered above is serialized into the response yet.
  return new StringRepresentation("");
}
/**
 * Drops the resource named in the request from the cluster named in the request.
 * On failure the exception is written to the response as a JSON entity and logged; the response
 * status is set to SUCCESS_OK in both the success and the failure path.
 *
 * @return always {@code null}
 */
@Override
public Representation delete() {
  try {
    final String cluster =
        ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
    final String resource =
        ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.RESOURCE_NAME);
    final ZkClient client =
        ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);

    new ClusterSetup(client).dropResourceFromCluster(cluster, resource);
    getResponse().setStatus(Status.SUCCESS_OK);
  } catch (Exception e) {
    String errorJson = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
    getResponse().setEntity(errorJson, MediaType.APPLICATION_JSON);
    getResponse().setStatus(Status.SUCCESS_OK);
    LOG.error("", e);
  }
  return null;
}
/**
 * Drops the instance named in the request from the cluster named in the request.
 * On failure the exception is written to the response as a JSON entity and logged; the response
 * status is set to SUCCESS_OK in both the success and the failure path.
 *
 * @return always {@code null}
 */
@Override
public Representation delete() {
  try {
    final String cluster =
        ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
    final String instance =
        ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.INSTANCE_NAME);
    final ZkClient client =
        ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);

    new ClusterSetup(client).dropInstanceFromCluster(cluster, instance);
    getResponse().setStatus(Status.SUCCESS_OK);
  } catch (Exception e) {
    String errorJson = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
    getResponse().setEntity(errorJson, MediaType.APPLICATION_JSON);
    getResponse().setStatus(Status.SUCCESS_OK);
    LOG.error("Error in delete instance", e);
  }
  return null;
}
/**
 * Builds a JSON representation listing all clusters: a ZNRecord with id "Clusters Summary"
 * whose list field "clusters" holds the names returned by the cluster management tool.
 *
 * @return the record serialized as an application/json StringRepresentation
 */
StringRepresentation getClustersRepresentation() throws JsonGenerationException,
    JsonMappingException, IOException {
  ZkClient client =
      ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
  List<String> clusterNames = new ClusterSetup(client).getClusterManagementTool().getClusters();

  ZNRecord summary = new ZNRecord("Clusters Summary");
  summary.setListField("clusters", clusterNames);

  String json = ClusterRepresentationUtil.ZNRecordToJson(summary);
  return new StringRepresentation(json, MediaType.APPLICATION_JSON);
}
/**
 * Creates the Helix cluster if it is not already listed by the admin, and enables participant
 * auto-join on it. The ZkClient obtained from the pool is closed before returning.
 *
 * @param zookeeperQuorum ZooKeeper address used to obtain the client
 * @param clusterName name of the Helix cluster to create
 */
private void setUpHelixCluster(String zookeeperQuorum, String clusterName) {
  ZkClient client = ZKClientPool.getZkClient(zookeeperQuorum);
  HelixAdmin admin = new ZKHelixAdmin(client);
  try {
    boolean alreadyExists = ImmutableSet.copyOf(admin.getClusters()).contains(clusterName);
    if (alreadyExists) {
      // Nothing to do; the finally block still closes the client.
      return;
    }
    ClusterSetup clusterSetup = new ClusterSetup(client);
    clusterSetup.addCluster(clusterName, false);
    clusterSetup.setConfig(HelixConfigScope.ConfigScopeProperty.CLUSTER, clusterName,
        "allowParticipantAutoJoin=true");
  } finally {
    client.close();
  }
}
/**
 * Creates a Helix cluster for the Gobblin Cluster application and turns on participant
 * auto-join for it.
 *
 * @param zkConnectionString the ZooKeeper connection string
 * @param clusterName the Helix cluster name
 * @param overwrite true to overwrite an existing cluster, false to reuse an existing cluster;
 *        passed straight through to {@code ClusterSetup.addCluster}
 */
public static void createGobblinHelixCluster(String zkConnectionString, String clusterName, boolean overwrite) {
  ClusterSetup helixSetup = new ClusterSetup(zkConnectionString);

  // Create the cluster and overwrite if it already exists
  helixSetup.addCluster(clusterName, overwrite);

  // Helix 0.6.x requires a configuration property to have the form key=value.
  helixSetup.setConfig(HelixConfigScope.ConfigScopeProperty.CLUSTER, clusterName,
      ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN + "=true");
}
/**
 * Creates a Helix cluster for the Gobblin Cluster application and turns on participant
 * auto-join for it.
 *
 * @param zkConnectionString the ZooKeeper connection string
 * @param clusterName the Helix cluster name
 * @param overwrite true to overwrite an existing cluster, false to reuse an existing cluster;
 *        passed straight through to {@code ClusterSetup.addCluster}
 */
public static void createGobblinHelixCluster(
    String zkConnectionString,
    String clusterName,
    boolean overwrite) {
  final ClusterSetup helixSetup = new ClusterSetup(zkConnectionString);

  // Create the cluster and overwrite if it already exists
  helixSetup.addCluster(clusterName, overwrite);

  // Helix 0.6.x requires a configuration property to have the form key=value.
  final String keyValue = ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN + "=true";
  helixSetup.setConfig(HelixConfigScope.ConfigScopeProperty.CLUSTER, clusterName, keyValue);
}
/**
 * Builds the JSON parameter map for the activate-cluster management command.
 *
 * @param grandClusterName value stored under {@code JsonParameters.GRAND_CLUSTER}
 * @param enabled flag stored, as "true"/"false", under {@code JsonParameters.ENABLED}
 * @return a new mutable map holding the three command parameters
 */
private Map<String, String> activateClusterCmd(String grandClusterName, boolean enabled) {
  Map<String, String> command = new HashMap<>();
  command.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.activateCluster);
  command.put(JsonParameters.GRAND_CLUSTER, grandClusterName);
  command.put(JsonParameters.ENABLED, String.valueOf(enabled));
  return command;
}
/**
 * Populate info on ZooKeeper server and start {@link HelixControllerManager}.
 * Creates the cluster, enables participant auto-join and persisted best-possible assignment,
 * adds and rebalances the {@code helixResource} resource built from the cluster map's
 * partitions, and finally starts the controller. The ZkClient created here is closed before
 * returning, whether or not setup succeeded.
 * @param zkConnectString zk connect string to zk server.
 * @param vcrClusterName the vcr cluster name.
 * @param clusterMap the {@link ClusterMap} to use.
 * @return the created {@link HelixControllerManager}.
 */
public static HelixControllerManager populateZkInfoAndStartController(String zkConnectString, String vcrClusterName,
    ClusterMap clusterMap) {
  HelixZkClient client = DedicatedZkClientFactory.getInstance()
      .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkConnectString), new HelixZkClient.ZkClientConfig());
  try {
    client.setZkSerializer(new ZNRecordSerializer());

    // Create (or recreate) the cluster.
    new ClusterSetup(client).addCluster(vcrClusterName, true);
    HelixAdmin helixAdmin = new ZKHelixAdmin(client);

    // set ALLOW_PARTICIPANT_AUTO_JOIN
    HelixConfigScope clusterScope =
        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
            .forCluster(vcrClusterName)
            .build();
    Map<String, String> clusterProperties = new HashMap<>();
    clusterProperties.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true));
    helixAdmin.setConfig(clusterScope, clusterProperties);

    // set PersistBestPossibleAssignment
    ConfigAccessor accessor = new ConfigAccessor(client);
    ClusterConfig config = accessor.getClusterConfig(vcrClusterName);
    config.setPersistBestPossibleAssignment(true);
    accessor.setClusterConfig(vcrClusterName, config);

    // Build the ideal state: one entry per partition of the cluster map.
    FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(helixResource);
    idealStateBuilder.setStateModel(LeaderStandbySMD.name);
    for (PartitionId partition : clusterMap.getAllPartitionIds(null)) {
      idealStateBuilder.add(partition.toPathString());
    }
    idealStateBuilder.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());

    helixAdmin.addResource(vcrClusterName, helixResource, idealStateBuilder.build());
    helixAdmin.rebalance(vcrClusterName, helixResource, 3, "", "");

    HelixControllerManager controller = new HelixControllerManager(zkConnectString, vcrClusterName);
    controller.syncStart();
    return controller;
  } finally {
    client.close();
  }
}
/**
 * Drops resource "MyDB2" while participant localhost_12920 is stopped, checks that the four
 * remaining nodes end up with empty current state and external view, then restarts the stopped
 * participant and checks all five nodes.
 */
@Test()
public void testDropResourceWhileNodeDead() throws Exception {
  final String resourceName = "MyDB2";

  // Add the resource that will be dropped.
  _gSetupTool.addResourceToCluster(CLUSTER_NAME, resourceName, 16, STATE_MODEL);
  _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, resourceName, 3);
  Assert.assertTrue(ClusterStateVerifier.verifyByPolling(
      new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)));

  // Take one participant down before dropping the resource.
  final String stoppedHost = "localhost_12920";
  _participants[2].syncStop();
  Thread.sleep(1000);

  String dropCommand =
      "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + resourceName;
  ClusterSetup.processCommandLineArgs(dropCommand.split(" "));

  // The stopped host (localhost_12920) is left out of this first check.
  TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", 30 * 1000, CLUSTER_NAME,
      resourceName,
      TestHelper.<String> setOf("localhost_12918", "localhost_12919", "localhost_12921",
          "localhost_12922"),
      ZK_ADDR);

  // Bring the participant back and check every node.
  _participants[2] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, stoppedHost);
  _participants[2].syncStart();

  TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", 30 * 1000, CLUSTER_NAME,
      resourceName,
      TestHelper.<String> setOf("localhost_12918", "localhost_12919", "localhost_12920",
          "localhost_12921", "localhost_12922"),
      ZK_ADDR);
}
/**
 * Builds the JSON parameter map for the add-instance-tag management command.
 *
 * @param tag value stored under {@code ClusterSetup.instanceGroupTag}
 * @return a new mutable map holding the command name and the tag
 */
private Map<String, String> addInstanceTagCmd(String tag) {
  Map<String, String> command = new HashMap<>();
  command.put(ClusterSetup.instanceGroupTag, tag);
  command.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstanceTag);
  return command;
}
/**
 * Builds the JSON parameter map for the swap-instance management command.
 *
 * @param oldInstance value stored under {@code JsonParameters.OLD_INSTANCE}
 * @param newInstance value stored under {@code JsonParameters.NEW_INSTANCE}
 * @return a new mutable map holding the command name and both instance names
 */
private Map<String, String> swapInstanceCmd(String oldInstance, String newInstance) {
  Map<String, String> command = new HashMap<>();
  command.put(JsonParameters.OLD_INSTANCE, oldInstance);
  command.put(JsonParameters.NEW_INSTANCE, newInstance);
  command.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.swapInstance);
  return command;
}
/**
 * Adds a resource and {@code NODE_NR} instances to a cluster and optionally rebalances it.
 * Instance names are {@code prefix + "_" + port} for consecutive ports starting at
 * {@code startPort}.
 *
 * @param setupTool ClusterSetup used for all operations
 * @param clusterName target cluster
 * @param dbName resource to add
 * @param partitionNr number of partitions of the resource
 * @param prefix instance name prefix
 * @param startPort port of the first instance
 * @param stateModel state model of the resource
 * @param replica replica count used when rebalancing
 * @param rebalance whether to rebalance the resource after adding the instances
 */
protected void setupStorageCluster(ClusterSetup setupTool, String clusterName, String dbName,
    int partitionNr, String prefix, int startPort, String stateModel, int replica,
    boolean rebalance) {
  setupTool.addResourceToCluster(clusterName, dbName, partitionNr, stateModel);

  for (int offset = 0; offset < NODE_NR; ++offset) {
    setupTool.addInstanceToCluster(clusterName, prefix + "_" + (startPort + offset));
  }

  if (!rebalance) {
    return;
  }
  setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
}
/**
 * Reacts only to transitions of partition "TestDB0_0" on instance "localhost_12919".
 * On SLAVE to OFFLINE it bumps {@code slaveToOfflineCnt} and issues an
 * {@code --enablePartition true} command for that partition (errors are logged, not thrown).
 * On a later OFFLINE to SLAVE, once at least one SLAVE to OFFLINE was seen, it bumps
 * {@code offlineToSlave}.
 */
@Override
public void doTransition(Message message, NotificationContext context) {
  HelixManager helixManager = context.getManager();
  String cluster = helixManager.getClusterName();

  String targetInstance = message.getTgtName();
  String partition = message.getPartitionName();
  String from = message.getFromState();
  String to = message.getToState();

  boolean isWatchedReplica =
      targetInstance.equals("localhost_12919") && partition.equals("TestDB0_0");
  if (!isWatchedReplica) {
    return;
  }

  if (from.equals("SLAVE") && to.equals("OFFLINE")) {
    slaveToOfflineCnt++;
    try {
      String[] cliArgs = ("--zkSvr " + ZK_ADDR + " --enablePartition true " + cluster
          + " localhost_12919 TestDB0 TestDB0_0").split("\\s+");
      ClusterSetup.processCommandLineArgs(cliArgs);
    } catch (Exception e) {
      LOG.error("Exception in cluster setup", e);
    }
    return;
  }

  if (slaveToOfflineCnt > 0 && from.equals("OFFLINE") && to.equals("SLAVE")) {
    offlineToSlave++;
  }
}
/**
 * Posts an add-cluster command for {@code clusterName} to the admin web service on
 * {@code ADMIN_PORT} and asserts that the returned record's "clusters" list contains it.
 */
void verifyAddCluster() throws IOException, InterruptedException {
  // Command parameters sent as the JSON payload.
  Map<String, String> commandParams = new HashMap<>();
  commandParams.put(JsonParameters.CLUSTER_NAME, clusterName);
  commandParams.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addCluster);

  Reference clustersRef = new Reference("http://localhost:" + ADMIN_PORT + "/clusters");
  Request postRequest = new Request(Method.POST, clustersRef);
  String payload =
      JsonParameters.JSON_PARAMETERS + "=" + ClusterRepresentationUtil.ObjectToJson(commandParams);
  postRequest.setEntity(payload, MediaType.APPLICATION_ALL);

  // Send the request and capture the response body as text.
  Response postResponse = _gClient.handle(postRequest);
  Representation body = postResponse.getEntity();
  StringWriter bodyText = new StringWriter();
  body.write(bodyText);

  ZNRecord record =
      new ObjectMapper().readValue(new StringReader(bodyText.toString()), ZNRecord.class);
  AssertJUnit.assertTrue(record.getListField("clusters").contains(clusterName));
}
/**
 * Disables partitions TestDB_0 and TestDB_9 on localhost_12919 through the ClusterSetup command
 * line and checks both are OFFLINE there; then re-enables TestDB_9 through {@link ZKHelixAdmin}
 * and checks TestDB_0 is still OFFLINE while TestDB_9 is MASTER.
 */
@Test()
public void testDisablePartition() throws Exception {
  LOG.info("START testDisablePartition() at " + new Date(System.currentTimeMillis()));

  // localhost_12919 is MASTER for TestDB_0
  String[] disableArgs = ("--zkSvr " + ZK_ADDR + " --enablePartition false " + CLUSTER_NAME
      + " localhost_12919 TestDB TestDB_0 TestDB_9").split("\\s+");
  ClusterSetup.processCommandLineArgs(disableArgs);

  Map<String, Set<String>> bothDisabled = new HashMap<>();
  bothDisabled.put("TestDB_0", TestHelper.setOf("localhost_12919"));
  bothDisabled.put("TestDB_9", TestHelper.setOf("localhost_12919"));

  Assert.assertTrue(ClusterStateVerifier.verifyByPolling(
      new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)));
  TestHelper.verifyState(CLUSTER_NAME, ZK_ADDR, bothDisabled, "OFFLINE");

  // Re-enable only TestDB_9 through the admin API.
  ZKHelixAdmin admin = new ZKHelixAdmin(_gZkClient);
  admin.enablePartition(true, CLUSTER_NAME, "localhost_12919", "TestDB",
      Collections.singletonList("TestDB_9"));

  Assert.assertTrue(ClusterStateVerifier.verifyByPolling(
      new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)));

  // TestDB_0 stays disabled ...
  Map<String, Set<String>> stillDisabled = new HashMap<>();
  stillDisabled.put("TestDB_0", TestHelper.setOf("localhost_12919"));
  TestHelper.verifyState(CLUSTER_NAME, ZK_ADDR, stillDisabled, "OFFLINE");

  // ... while TestDB_9 is expected to be MASTER again.
  Map<String, Set<String>> reEnabled = new HashMap<>();
  reEnabled.put("TestDB_9", TestHelper.setOf("localhost_12919"));
  TestHelper.verifyState(CLUSTER_NAME, ZK_ADDR, reEnabled, "MASTER");

  LOG.info("STOP testDisablePartition() at " + new Date(System.currentTimeMillis()));
}