下面列出了org.testng.annotations.AfterSuite#org.apache.helix.BaseDataAccessor 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Find paths of all leaf nodes under an ancestor path (exclusive)
* @param accessor
* @param ancestorPath
* @return a list of paths
*/
static List<String> scanPath(BaseDataAccessor<ZNRecord> accessor, String ancestorPath) {
List<String> queue = Lists.newLinkedList();
queue.add(ancestorPath);
// BFS
List<String> leafPaths = Lists.newArrayList();
while (!queue.isEmpty()) {
String path = queue.remove(0);
List<String> childNames = accessor.getChildNames(path, 0);
if (childNames == null) {
// path doesn't exist
continue;
}
if (childNames.isEmpty() && !path.equals(ancestorPath)) {
// leaf node, excluding ancestorPath
leafPaths.add(path);
}
for (String childName : childNames) {
String subPath = String.format("%s/%s", path, childName);
queue.add(subPath);
}
}
return leafPaths;
}
@Override
public void enableInstance(String clusterName, List<String> instances, boolean enabled) {
// TODO: Reenable this after storage node bug fixed.
if (true) {
throw new HelixException("Current batch enable/disable instances are temporarily disabled!");
}
logger.info("Batch {} instances {} in cluster {}.", enabled ? "enable" : "disable",
HelixUtil.serializeByComma(instances), clusterName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
if (enabled) {
for (String instance : instances) {
enableSingleInstance(clusterName, instance, enabled, baseAccessor);
}
}
enableBatchInstances(clusterName, instances, enabled, baseAccessor);
}
@Override
public void setConstraint(String clusterName, final ConstraintType constraintType,
final String constraintId, final ConstraintItem constraintItem) {
logger.info("Set constraint type {} with constraint id {} for cluster {}.", constraintType,
constraintId, clusterName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
ClusterConstraints constraints =
currentData == null ? new ClusterConstraints(constraintType)
: new ClusterConstraints(currentData);
constraints.addConstraintItem(constraintId, constraintItem);
return constraints.getRecord();
}
}, AccessOption.PERSISTENT);
}
@Override
public void removeConstraint(String clusterName, final ConstraintType constraintType,
final String constraintId) {
logger.info("Remove constraint type {} with constraint id {} for cluster {}.", constraintType,
constraintId, clusterName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
if (currentData != null) {
ClusterConstraints constraints = new ClusterConstraints(currentData);
constraints.removeConstraintItem(constraintId);
return constraints.getRecord();
}
return null;
}
}, AccessOption.PERSISTENT);
}
private void enableSingleInstance(final String clusterName, final String instanceName,
final boolean enabled, BaseDataAccessor<ZNRecord> baseAccessor) {
String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
if (!baseAccessor.exists(path, 0)) {
throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
+ ", instance config does not exist");
}
baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
if (currentData == null) {
throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
+ ", participant config is null");
}
InstanceConfig config = new InstanceConfig(currentData);
config.setInstanceEnabled(enabled);
return config.getRecord();
}
}, AccessOption.PERSISTENT);
}
public ZkCallbackCache(BaseDataAccessor<T> accessor, String chrootPath, List<String> paths,
ZkCacheEventThread eventThread) {
super();
_accessor = accessor;
_chrootPath = chrootPath;
_listener = new ConcurrentHashMap<>();
_eventThread = eventThread;
// init cache
// System.out.println("init cache: " + paths);
if (paths != null && !paths.isEmpty()) {
for (String path : paths) {
updateRecursive(path);
}
}
}
@Test
public void testSyncSet() {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;
System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
String path = String.format("/%s/%s", _rootPath, "msg_0");
ZNRecord record = new ZNRecord("msg_0");
BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
boolean success = accessor.set(path, record, AccessOption.PERSISTENT);
Assert.assertTrue(success);
ZNRecord getRecord = _gZkClient.readData(path);
Assert.assertNotNull(getRecord);
Assert.assertEquals(getRecord.getId(), "msg_0");
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}
@Test
public void testDisableResource() {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
admin.addCluster(clusterName, true);
Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
String resourceName = "TestDB";
admin.addStateModelDef(clusterName, "MasterSlave",
new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()));
admin.addResource(clusterName, resourceName, 4, "MasterSlave");
admin.enableResource(clusterName, resourceName, false);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
Assert.assertFalse(idealState.isEnabled());
admin.enableResource(clusterName, resourceName, true);
idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
Assert.assertTrue(idealState.isEnabled());
admin.dropCluster(clusterName);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
/**
* Starts an additional in-memory ZooKeeper for testing.
* @param i index to be added to the ZK port to avoid conflicts
* @throws Exception
*/
private void startZooKeeper(int i)
throws Exception {
String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
ZkServer zkServer = TestHelper.startZkServer(zkAddress);
AssertJUnit.assertNotNull(zkServer);
HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
clientConfig.setZkSerializer(new ZNRecordSerializer());
HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
ClusterSetup gSetupTool = new ClusterSetup(zkClient);
BaseDataAccessor baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
_zkServerMap.put(zkAddress, zkServer);
_helixZkClientMap.put(zkAddress, zkClient);
_clusterSetupMap.put(zkAddress, gSetupTool);
_baseDataAccessorMap.put(zkAddress, baseDataAccessor);
}
@AfterSuite
public void afterSuite() throws IOException {
// Clean up all JMX objects
for (ObjectName mbean : _server.queryNames(null, null)) {
try {
_server.unregisterMBean(mbean);
} catch (Exception e) {
// OK
}
}
// Close all ZK resources
_baseDataAccessorMap.values().forEach(BaseDataAccessor::close);
_clusterSetupMap.values().forEach(ClusterSetup::close);
_helixZkClientMap.values().forEach(HelixZkClient::close);
_zkServerMap.values().forEach(TestHelper::stopZkServer);
}
/**
* Recursively deletes the PropertyStore path. If the node does not exist, it returns OK().
* @param clusterId
* @param path
* @return
*/
@DELETE
@Path("{path: .+}")
public Response deletePropertyByPath(@PathParam("clusterId") String clusterId,
@PathParam("path") String path) {
path = "/" + path;
if (!ZkValidationUtil.isPathValid(path)) {
LOG.info("The propertyStore path {} is invalid for cluster {}", path, clusterId);
return badRequest(
"Invalid path string. Valid path strings use slash as the directory separator and names the location of ZNode");
}
final String recordPath = PropertyPathBuilder.propertyStore(clusterId) + path;
BaseDataAccessor<byte[]> propertyStoreDataAccessor = getByteArrayDataAccessor();
if (!propertyStoreDataAccessor.remove(recordPath, AccessOption.PERSISTENT)) {
return serverError("Failed to delete PropertyStore record in path: " + path);
}
return OK();
}
public static Map<String, String> readZkChildrenAsBytesMap(ZkClient zkclient, PropertyKey propertyKey) {
BaseDataAccessor<byte[]> baseAccessor = new ZkBaseDataAccessor<byte[]>(zkclient);
String parentPath = propertyKey.getPath();
List<String> childNames = baseAccessor.getChildNames(parentPath, 0);
if (childNames == null) {
return null;
}
List<String> paths = new ArrayList<String>();
for (String childName : childNames) {
paths.add(parentPath + "/" + childName);
}
List<byte[]> values = baseAccessor.get(paths, null, 0);
Map<String, String> ret = new HashMap<String, String>();
for (int i = 0; i < childNames.size(); i++) {
ret.put(childNames.get(i), new String(values.get(i)));
}
return ret;
}
@Override
public void run() {
// For each record in status update and error node
// TODO: for now the status updates are dumped to cluster manager log4j log.
// We need to think if we should create per-instance log files that contains
// per-instance statusUpdates
// and errors
LOG.info("Scan statusUpdates and errors for cluster: " + _manager.getClusterName()
+ ", by controller: " + _manager);
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();
List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
for (String instance : instances) {
// dump participant status updates
String statusUpdatePath = PropertyPathBuilder.instanceStatusUpdate(_manager.getClusterName(), instance);
dump(baseAccessor, statusUpdatePath, _thresholdNoChangeMsForStatusUpdates, _maxLeafCount);
// dump participant errors
String errorPath = PropertyPathBuilder.instanceError(_manager.getClusterName(), instance);
dump(baseAccessor, errorPath, _thresholdNoChangeMsForErrors, _maxLeafCount);
}
// dump controller status updates
String controllerStatusUpdatePath = PropertyPathBuilder.controllerStatusUpdate(_manager.getClusterName());
dump(baseAccessor, controllerStatusUpdatePath, _thresholdNoChangeMsForStatusUpdates, _maxLeafCount);
// dump controller errors
String controllerErrorPath = PropertyPathBuilder.controllerError(_manager.getClusterName());
dump(baseAccessor, controllerErrorPath, _thresholdNoChangeMsForErrors, _maxLeafCount);
}
void dump(BaseDataAccessor<ZNRecord> accessor, String ancestorPath, long threshold,
int maxLeafCount) {
List<String> leafPaths = scanPath(accessor, ancestorPath);
if (leafPaths.isEmpty()) {
return;
}
Stat[] stats = accessor.getStats(leafPaths, 0);
List<String> dumpPaths = Lists.newArrayList();
long now = System.currentTimeMillis();
for (int i = 0; i < stats.length; i++) {
Stat stat = stats[i];
if ((stats.length > maxLeafCount) || ((now - stat.getMtime()) > threshold)) {
dumpPaths.add(leafPaths.get(i));
}
}
if (!dumpPaths.isEmpty()) {
LOG.info("Dump statusUpdates and errors records for paths: " + dumpPaths);
// No need to fail the batch read operation even it is partial result becuase it is for cleaning up.
List<ZNRecord> dumpRecords = accessor.get(dumpPaths, null, 0, false);
for (ZNRecord record : dumpRecords) {
if (record != null) {
LOG.info(new String(_jsonSerializer.serialize(record)));
}
}
// clean up
accessor.remove(dumpPaths, 0);
LOG.info("Remove statusUpdates and errors records for paths: " + dumpPaths);
}
}
public WriteThroughCache(BaseDataAccessor<T> accessor, List<String> paths) {
super();
_accessor = accessor;
// init cache
if (paths != null && !paths.isEmpty()) {
for (String path : paths) {
updateRecursive(path);
}
}
}
@Override
public void enableInstance(final String clusterName, final String instanceName,
final boolean enabled) {
logger.info("{} instance {} in cluster {}.", enabled ? "Enable" : "Disable", instanceName,
clusterName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
enableSingleInstance(clusterName, instanceName, enabled, baseAccessor);
// TODO: Reenable this after storage node bug fixed.
// enableBatchInstances(clusterName, Collections.singletonList(instanceName), enabled, baseAccessor);
}
public ZKHelixDataAccessor(String clusterName, InstanceType instanceType,
BaseDataAccessor<ZNRecord> baseDataAccessor) {
_clusterName = clusterName;
_instanceType = instanceType;
_baseDataAccessor = baseDataAccessor;
_propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
}
/**
* Check all partitions are in OFFLINE state
* @param clusterName
* @throws Exception
*/
private void checkExternalView(String clusterName) throws Exception {
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
// verify that states of TestDB0 are all OFFLINE
boolean result = TestHelper.verify(() -> {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
ExternalView extView = accessor.getProperty(keyBuilder.externalView("TestDB0"));
if (extView == null) {
return false;
}
Set<String> partitionSet = extView.getPartitionSet();
if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
return false;
}
for (String partition : partitionSet) {
Map<String, String> instanceStates = extView.getStateMap(partition);
for (String state : instanceStates.values()) {
if (!"OFFLINE".equals(state)) {
return false;
}
}
}
return true;
}, 10 * 1000);
Assert.assertTrue(result);
}
@Test
public void testDisableResource() throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"TestDB", // resource name prefix
1, // resources
10, // partitions per resource
5, // number of nodes
3, // replicas
"MasterSlave", true); // do rebalance
// disable "TestDB0" resource
ClusterSetup.processCommandLineArgs(new String[] {
"--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "false"
});
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(ZK_ADDR);
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
Assert.assertFalse(idealState.isEnabled());
// enable "TestDB0" resource
ClusterSetup.processCommandLineArgs(new String[] {
"--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "true"
});
idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
Assert.assertTrue(idealState.isEnabled());
TestHelper.dropCluster(clusterName, _gZkClient);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@Override
public boolean verify() {
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
// verify external view empty
if (externalView != null) {
for (String partition : externalView.getPartitionSet()) {
Map<String, String> stateMap = externalView.getStateMap(partition);
if (stateMap != null && !stateMap.isEmpty()) {
LOG.error("External view not empty for " + partition);
return false;
}
}
}
// verify current state empty
List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances());
for (String participant : liveParticipants) {
List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant));
for (String sessionId : sessionIds) {
CurrentState currentState =
accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName));
Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
LOG.error("Current state not empty for " + participant);
return false;
}
}
}
return true;
}
/**
* Returns a response containing the binary data and Stat.
* @param zkBaseDataAccessor
* @param path
* @return
*/
private Response getBinaryData(BaseDataAccessor<byte[]> zkBaseDataAccessor, String path) {
Stat stat = new Stat();
byte[] bytes = readBinaryDataFromZK(zkBaseDataAccessor, path, stat);
Map<String, Object> binaryResult = ImmutableMap
.of(ZooKeeperCommand.getBinaryData.name(), bytes, ZooKeeperCommand.getStat.name(),
ZKUtil.fromStatToMap(stat));
// Note: this serialization (using ObjectMapper) will convert this byte[] into
// a Base64 String! The REST client (user) must convert the resulting String back into
// a byte[] using Base64.
return JSONRepresentation(binaryResult);
}
/**
* Returns a response containing the string data and Stat.
* @param zkBaseDataAccessor
* @param path
* @return
*/
private Response getStringData(BaseDataAccessor<byte[]> zkBaseDataAccessor, String path) {
Stat stat = new Stat();
byte[] bytes = readBinaryDataFromZK(zkBaseDataAccessor, path, stat);
Map<String, Object> stringResult = ImmutableMap
.of(ZooKeeperCommand.getStringData.name(), new String(bytes),
ZooKeeperCommand.getStat.name(), ZKUtil.fromStatToMap(stat));
return JSONRepresentation(stringResult);
}
/**
* Returns byte[] from ZooKeeper.
* @param zkBaseDataAccessor
* @param path
* @return
*/
private byte[] readBinaryDataFromZK(BaseDataAccessor<byte[]> zkBaseDataAccessor, String path,
Stat stat) {
if (zkBaseDataAccessor.exists(path, AccessOption.PERSISTENT)) {
return zkBaseDataAccessor.get(path, stat, AccessOption.PERSISTENT);
} else {
throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
.entity(String.format("The ZNode at path %s does not exist!", path)).build());
}
}
/**
* Returns a list of children ZNode names given the path for the parent ZNode.
* @param zkBaseDataAccessor
* @param path
* @return list of child ZNodes
*/
private Response getChildren(BaseDataAccessor<byte[]> zkBaseDataAccessor, String path) {
if (zkBaseDataAccessor.exists(path, AccessOption.PERSISTENT)) {
Map<String, List<String>> result = ImmutableMap.of(ZooKeeperCommand.getChildren.name(),
zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT));
return JSONRepresentation(result);
} else {
throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
.entity(String.format("The ZNode at path %s does not exist", path)).build());
}
}
/**
* Returns the ZNode Stat object given the path.
* @param zkBaseDataAccessor
* @param path
* @return
*/
private Response getStat(BaseDataAccessor<byte[]> zkBaseDataAccessor, String path) {
Stat stat = zkBaseDataAccessor.getStat(path, AccessOption.PERSISTENT);
if (stat == null) {
throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
.entity(String.format("The ZNode at path %s does not exist!", path)).build());
}
Map<String, String> result = ZKUtil.fromStatToMap(stat);
result.put("path", path);
return JSONRepresentation(result);
}
/**
* Sample HTTP URLs:
* http://<HOST>/clusters/{clusterId}/propertyStore/<PATH>
* It refers to the /PROPERTYSTORE/<PATH> in Helix metadata store
* @param clusterId The cluster Id
* @param path path parameter is like "abc/abc/abc" in the URL
* @return If the payload is ZNRecord format, return ZnRecord json response;
* Otherwise, return json object {<PATH>: raw string}
*/
@GET
@Path("{path: .+}")
public Response getPropertyByPath(@PathParam("clusterId") String clusterId,
@PathParam("path") String path) {
path = "/" + path;
if (!ZkValidationUtil.isPathValid(path)) {
LOG.info("The propertyStore path {} is invalid for cluster {}", path, clusterId);
return badRequest(
"Invalid path string. Valid path strings use slash as the directory separator and names the location of ZNode");
}
final String recordPath = PropertyPathBuilder.propertyStore(clusterId) + path;
BaseDataAccessor<byte[]> propertyStoreDataAccessor = getByteArrayDataAccessor();
if (propertyStoreDataAccessor.exists(recordPath, AccessOption.PERSISTENT)) {
byte[] bytes = propertyStoreDataAccessor.get(recordPath, null, AccessOption.PERSISTENT);
ZNRecord znRecord = (ZNRecord) ZN_RECORD_SERIALIZER.deserialize(bytes);
// The ZNRecordSerializer returns null when exception occurs in deserialization method
if (znRecord == null) {
ObjectNode jsonNode = OBJECT_MAPPER.createObjectNode();
jsonNode.put(CONTENT_KEY, new String(bytes));
return JSONRepresentation(jsonNode);
}
return JSONRepresentation(znRecord);
} else {
throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
.entity(String.format("The property store path %s doesn't exist", recordPath)).build());
}
}
/**
* Returns a lazily-instantiated ZkBaseDataAccessor for the byte array type.
* @return
*/
public BaseDataAccessor<byte[]> getByteArrayZkBaseDataAccessor() {
if (_byteArrayZkBaseDataAccessor == null) {
synchronized (this) {
if (_byteArrayZkBaseDataAccessor == null) {
_byteArrayZkBaseDataAccessor =
new ZkBaseDataAccessor<>(_zkAddr, new ByteArraySerializer());
}
}
}
return _byteArrayZkBaseDataAccessor;
}
BaseDataAccessor<ZNRecord> createBaseDataAccessor() {
ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(_zkclient);
return baseDataAccessor;
}
public ZKHelixDataAccessor(String clusterName, BaseDataAccessor<ZNRecord> baseDataAccessor) {
this(clusterName, null, baseDataAccessor);
}
@Override
public BaseDataAccessor<ZNRecord> getBaseDataAccessor() {
return _baseDataAccessor;
}