org.testng.annotations.AfterSuite#org.apache.helix.BaseDataAccessor源码实例Demo

下面列出了 org.testng.annotations.AfterSuite#org.apache.helix.BaseDataAccessor 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: helix   文件: ZKPathDataDumpTask.java
/**
 * Collects the paths of all leaf nodes below {@code ancestorPath}. The ancestor itself is never
 * reported, even when it has no children.
 * @param accessor accessor used to list the child names of each visited path
 * @param ancestorPath root of the subtree to scan
 * @return the leaf paths found, in the order the breadth-first traversal reaches them
 */
static List<String> scanPath(BaseDataAccessor<ZNRecord> accessor, String ancestorPath) {
  List<String> leaves = Lists.newArrayList();

  // Breadth-first traversal: "pending" holds the paths that still have to be expanded.
  List<String> pending = Lists.newLinkedList();
  pending.add(ancestorPath);
  while (!pending.isEmpty()) {
    String current = pending.remove(0);
    List<String> children = accessor.getChildNames(current, 0);
    if (children == null) {
      // a null child list means the path doesn't exist
      continue;
    }
    if (!children.isEmpty()) {
      for (String child : children) {
        pending.add(current + "/" + child);
      }
    } else if (!current.equals(ancestorPath)) {
      // childless node other than the ancestor: a leaf
      leaves.add(current);
    }
  }
  return leaves;
}
 
源代码2 项目: helix   文件: ZKHelixAdmin.java
@Override
public void enableInstance(String clusterName, List<String> instances, boolean enabled) {
  // TODO: Reenable this after storage node bug fixed.
  // The always-true guard (instead of a bare throw) keeps the statements below compilable.
  if (true) {
    throw new HelixException("Current batch enable/disable instances are temporarily disabled!");
  }
  String action = enabled ? "enable" : "disable";
  logger.info("Batch {} instances {} in cluster {}.", action,
      HelixUtil.serializeByComma(instances), clusterName);
  BaseDataAccessor<ZNRecord> zkAccessor = new ZkBaseDataAccessor<>(_zkClient);
  if (enabled) {
    // when enabling, each instance is additionally updated one by one
    for (String instanceName : instances) {
      enableSingleInstance(clusterName, instanceName, true, zkAccessor);
    }
  }
  enableBatchInstances(clusterName, instances, enabled, zkAccessor);
}
 
源代码3 项目: helix   文件: ZKHelixAdmin.java
@Override
public void setConstraint(String clusterName, final ConstraintType constraintType,
    final String constraintId, final ConstraintItem constraintItem) {
  logger.info("Set constraint type {} with constraint id {} for cluster {}.", constraintType,
      constraintId, clusterName);
  BaseDataAccessor<ZNRecord> zkAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);

  String constraintPath =
      new PropertyKey.Builder(clusterName).constraint(constraintType.toString()).getPath();

  // Adds the item to the stored constraints, starting a fresh set when nothing is stored yet.
  DataUpdater<ZNRecord> addItem = new DataUpdater<ZNRecord>() {
    @Override
    public ZNRecord update(ZNRecord currentData) {
      ClusterConstraints updated;
      if (currentData != null) {
        updated = new ClusterConstraints(currentData);
      } else {
        updated = new ClusterConstraints(constraintType);
      }
      updated.addConstraintItem(constraintId, constraintItem);
      return updated.getRecord();
    }
  };
  zkAccessor.update(constraintPath, addItem, AccessOption.PERSISTENT);
}
 
源代码4 项目: helix   文件: ZKHelixAdmin.java
@Override
public void removeConstraint(String clusterName, final ConstraintType constraintType,
    final String constraintId) {
  logger.info("Remove constraint type {} with constraint id {} for cluster {}.", constraintType,
      constraintId, clusterName);
  BaseDataAccessor<ZNRecord> zkAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);

  String constraintPath =
      new PropertyKey.Builder(clusterName).constraint(constraintType.toString()).getPath();

  // Drops the item from the stored constraints; returns null when nothing is stored.
  DataUpdater<ZNRecord> dropItem = new DataUpdater<ZNRecord>() {
    @Override
    public ZNRecord update(ZNRecord currentData) {
      if (currentData == null) {
        return null;
      }
      ClusterConstraints remaining = new ClusterConstraints(currentData);
      remaining.removeConstraintItem(constraintId);
      return remaining.getRecord();
    }
  };
  zkAccessor.update(constraintPath, dropItem, AccessOption.PERSISTENT);
}
 
源代码5 项目: helix   文件: ZKHelixAdmin.java
/**
 * Sets the enabled flag in the instance config of a single instance.
 * Throws a HelixException when the instance config path does not exist, or when the updater
 * is handed null data for it.
 */
private void enableSingleInstance(final String clusterName, final String instanceName,
    final boolean enabled, BaseDataAccessor<ZNRecord> baseAccessor) {
  String configPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);

  boolean configExists = baseAccessor.exists(configPath, 0);
  if (!configExists) {
    throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
        + ", instance config does not exist");
  }

  // Rewrites the stored config with the requested enabled flag.
  DataUpdater<ZNRecord> setEnabledFlag = new DataUpdater<ZNRecord>() {
    @Override
    public ZNRecord update(ZNRecord currentData) {
      if (currentData != null) {
        InstanceConfig instanceConfig = new InstanceConfig(currentData);
        instanceConfig.setInstanceEnabled(enabled);
        return instanceConfig.getRecord();
      }
      throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
          + ", participant config is null");
    }
  };
  baseAccessor.update(configPath, setEnabledFlag, AccessOption.PERSISTENT);
}
 
源代码6 项目: helix   文件: ZkCallbackCache.java
/**
 * Creates the cache and pre-populates it by calling {@code updateRecursive} on every given path.
 * A null or empty {@code paths} list leaves the cache unpopulated.
 */
public ZkCallbackCache(BaseDataAccessor<T> accessor, String chrootPath, List<String> paths,
    ZkCacheEventThread eventThread) {
  super();
  _accessor = accessor;
  _chrootPath = chrootPath;
  _eventThread = eventThread;
  _listener = new ConcurrentHashMap<>();

  // init cache
  if (paths == null) {
    return;
  }
  for (String initialPath : paths) {
    updateRecursive(initialPath);
  }
}
 
源代码7 项目: helix   文件: TestZkBaseDataAccessor.java
@Test
public void testSyncSet() {
  // The test name is derived from the test class and method names.
  String testName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
  System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));

  final String recordId = "msg_0";
  String path = String.format("/%s/%s", _rootPath, recordId);
  ZNRecord toWrite = new ZNRecord(recordId);
  BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);

  // Write through the accessor, then read back directly through the ZK client.
  Assert.assertTrue(accessor.set(path, toWrite, AccessOption.PERSISTENT));
  ZNRecord stored = _gZkClient.readData(path);
  Assert.assertNotNull(stored);
  Assert.assertEquals(stored.getId(), recordId);

  System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}
 
源代码8 项目: helix   文件: TestZkHelixAdmin.java
@Test
public void testDisableResource() {
  // The cluster name is derived from the test class and method names.
  String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
  System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));

  final String stateModel = "MasterSlave";
  final String resourceName = "TestDB";

  HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
  admin.addCluster(clusterName, true);
  Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
  admin.addStateModelDef(clusterName, stateModel,
      new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()));
  admin.addResource(clusterName, resourceName, 4, stateModel);

  // Disable the resource and check that the ideal state reflects it.
  admin.enableResource(clusterName, resourceName, false);
  BaseDataAccessor<ZNRecord> zkAccessor = new ZkBaseDataAccessor<>(_gZkClient);
  HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, zkAccessor);
  PropertyKey.Builder keyBuilder = accessor.keyBuilder();
  IdealState afterDisable = accessor.getProperty(keyBuilder.idealStates(resourceName));
  Assert.assertFalse(afterDisable.isEnabled());

  // Re-enable the resource and check again.
  admin.enableResource(clusterName, resourceName, true);
  IdealState afterEnable = accessor.getProperty(keyBuilder.idealStates(resourceName));
  Assert.assertTrue(afterEnable.isEnabled());

  admin.dropCluster(clusterName);
  System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
 
源代码9 项目: helix   文件: ZkTestBase.java
/**
 * Starts an additional in-memory ZooKeeper for testing, then creates a ZK client, a
 * ClusterSetup tool and a base data accessor for it. All four objects are stored in the
 * per-address maps under the server's address.
 * @param i index to be added to the ZK port to avoid conflicts
 * @throws Exception
 */
private void startZooKeeper(int i)
    throws Exception {
  String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
  ZkServer zkServer = TestHelper.startZkServer(zkAddress);
  AssertJUnit.assertNotNull(zkServer);
  HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
  clientConfig.setZkSerializer(new ZNRecordSerializer());
  HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
      .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
  ClusterSetup gSetupTool = new ClusterSetup(zkClient);
  // Parameterized rather than raw: the client above is configured with a ZNRecordSerializer.
  BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);

  _zkServerMap.put(zkAddress, zkServer);
  _helixZkClientMap.put(zkAddress, zkClient);
  _clusterSetupMap.put(zkAddress, gSetupTool);
  _baseDataAccessorMap.put(zkAddress, baseDataAccessor);
}
 
源代码10 项目: helix   文件: ZkTestBase.java
@AfterSuite
public void afterSuite() throws IOException {
  // Unregister every MBean known to the server. Failures are ignored on purpose:
  // this is best-effort cleanup at the end of the suite.
  for (ObjectName registeredBean : _server.queryNames(null, null)) {
    try {
      _server.unregisterMBean(registeredBean);
    } catch (Exception ignored) {
      // OK
    }
  }

  // Release the ZK resources: accessors, then setup tools, then clients, and finally the
  // servers themselves.
  _baseDataAccessorMap.values().forEach(BaseDataAccessor::close);
  _clusterSetupMap.values().forEach(ClusterSetup::close);
  _helixZkClientMap.values().forEach(HelixZkClient::close);
  _zkServerMap.values().forEach(TestHelper::stopZkServer);
}
 
源代码11 项目: helix   文件: PropertyStoreAccessor.java
/**
 * Deletes the PropertyStore record at the given path of a cluster. Responds with a bad request
 * for an invalid path, with a server error when the accessor reports that the removal failed,
 * and with OK() otherwise.
 * NOTE(review): the previous documentation stated that the deletion is recursive and that a
 * non-existent node also yields OK(); both depend on the accessor's remove semantics.
 * @param clusterId cluster whose property store is modified
 * @param path path below the property store root, without the leading slash
 * @return the HTTP response describing the outcome
 */
@DELETE
@Path("{path: .+}")
public Response deletePropertyByPath(@PathParam("clusterId") String clusterId,
    @PathParam("path") String path) {
  path = "/" + path;
  boolean validPath = ZkValidationUtil.isPathValid(path);
  if (!validPath) {
    LOG.info("The propertyStore path {} is invalid for cluster {}", path, clusterId);
    return badRequest(
        "Invalid path string. Valid path strings use slash as the directory separator and names the location of ZNode");
  }

  final String recordPath = PropertyPathBuilder.propertyStore(clusterId) + path;
  BaseDataAccessor<byte[]> storeAccessor = getByteArrayDataAccessor();
  boolean removed = storeAccessor.remove(recordPath, AccessOption.PERSISTENT);
  if (removed) {
    return OK();
  }
  return serverError("Failed to delete PropertyStore record in path: " + path);
}
 
源代码12 项目: helix   文件: ResourceUtil.java
/**
 * Reads the payload of every child znode below the path of {@code propertyKey} and returns
 * the payloads as strings keyed by child name.
 * @param zkclient client used to build the byte-array accessor
 * @param propertyKey key whose path is the parent of the children to read
 * @return map from child name to its payload decoded as UTF-8; children whose payload comes
 *         back null are left out. Returns null when the child listing itself is null.
 */
public static Map<String, String> readZkChildrenAsBytesMap(ZkClient zkclient, PropertyKey propertyKey) {
  BaseDataAccessor<byte[]> baseAccessor = new ZkBaseDataAccessor<byte[]>(zkclient);
  String parentPath = propertyKey.getPath();
  List<String> childNames = baseAccessor.getChildNames(parentPath, 0);
  if (childNames == null) {
    return null;
  }
  List<String> paths = new ArrayList<String>(childNames.size());
  for (String childName : childNames) {
    paths.add(parentPath + "/" + childName);
  }
  List<byte[]> values = baseAccessor.get(paths, null, 0);
  Map<String, String> ret = new HashMap<String, String>();
  for (int i = 0; i < childNames.size(); i++) {
    byte[] value = values.get(i);
    if (value == null) {
      // The batch read had no data for this child (for example it was removed after the
      // listing); skip it instead of failing with a NullPointerException.
      continue;
    }
    // Decode with an explicit charset so the result does not depend on the platform default.
    ret.put(childNames.get(i), new String(value, java.nio.charset.StandardCharsets.UTF_8));
  }
  return ret;
}
 
源代码13 项目: helix   文件: ZKPathDataDumpTask.java
@Override
public void run() {
  // For each record in status update and error node
  // TODO: for now the status updates are dumped to cluster manager log4j log.
  // We need to think if we should create per-instance log files that contains
  // per-instance statusUpdates
  // and errors
  final String clusterName = _manager.getClusterName();
  LOG.info("Scan statusUpdates and errors for cluster: " + clusterName
      + ", by controller: " + _manager);
  HelixDataAccessor helixAccessor = _manager.getHelixDataAccessor();
  Builder keyBuilder = helixAccessor.keyBuilder();
  BaseDataAccessor<ZNRecord> zkAccessor = helixAccessor.getBaseDataAccessor();

  // Participants: status updates first, then errors, instance by instance.
  for (String instance : helixAccessor.getChildNames(keyBuilder.instanceConfigs())) {
    dump(zkAccessor, PropertyPathBuilder.instanceStatusUpdate(clusterName, instance),
        _thresholdNoChangeMsForStatusUpdates, _maxLeafCount);
    dump(zkAccessor, PropertyPathBuilder.instanceError(clusterName, instance),
        _thresholdNoChangeMsForErrors, _maxLeafCount);
  }

  // Controller: status updates, then errors.
  dump(zkAccessor, PropertyPathBuilder.controllerStatusUpdate(clusterName),
      _thresholdNoChangeMsForStatusUpdates, _maxLeafCount);
  dump(zkAccessor, PropertyPathBuilder.controllerError(clusterName),
      _thresholdNoChangeMsForErrors, _maxLeafCount);
}
 
源代码14 项目: helix   文件: ZKPathDataDumpTask.java
/**
 * Logs and then removes leaf records below {@code ancestorPath}. A leaf is selected when its
 * modification time is more than {@code threshold} ms in the past, or unconditionally when
 * the number of leaves exceeds {@code maxLeafCount}.
 * @param accessor accessor used to scan, stat, read and remove the records
 * @param ancestorPath root of the subtree whose leaves are examined
 * @param threshold age in milliseconds beyond which a leaf is dumped
 * @param maxLeafCount leaf count above which every leaf is dumped regardless of age
 */
void dump(BaseDataAccessor<ZNRecord> accessor, String ancestorPath, long threshold,
    int maxLeafCount) {
  List<String> leafPaths = scanPath(accessor, ancestorPath);
  if (leafPaths.isEmpty()) {
    return;
  }

  Stat[] stats = accessor.getStats(leafPaths, 0);
  boolean overLimit = stats.length > maxLeafCount;
  List<String> dumpPaths = Lists.newArrayList();
  long now = System.currentTimeMillis();
  for (int i = 0; i < stats.length; i++) {
    Stat stat = stats[i];
    if (stat == null) {
      // No stat for this path (for example the node vanished after the scan); there is
      // nothing to dump or remove, and dereferencing it would throw.
      continue;
    }
    if (overLimit || (now - stat.getMtime()) > threshold) {
      dumpPaths.add(leafPaths.get(i));
    }
  }

  if (!dumpPaths.isEmpty()) {
    LOG.info("Dump statusUpdates and errors records for paths: " + dumpPaths);
    // No need to fail the batch read operation even if the result is partial, because it is
    // only used for cleaning up.
    List<ZNRecord> dumpRecords = accessor.get(dumpPaths, null, 0, false);
    for (ZNRecord record : dumpRecords) {
      if (record != null) {
        // Decode with an explicit charset so the log output is platform independent.
        LOG.info(new String(_jsonSerializer.serialize(record),
            java.nio.charset.StandardCharsets.UTF_8));
      }
    }

    // clean up
    accessor.remove(dumpPaths, 0);
    LOG.info("Remove statusUpdates and errors records for paths: " + dumpPaths);
  }
}
 
源代码15 项目: helix   文件: WriteThroughCache.java
/**
 * Creates the cache on top of the given accessor and pre-populates it by calling
 * {@code updateRecursive} on every given path. A null or empty list populates nothing.
 */
public WriteThroughCache(BaseDataAccessor<T> accessor, List<String> paths) {
  super();
  _accessor = accessor;

  // init cache
  if (paths == null) {
    return;
  }
  for (String initialPath : paths) {
    updateRecursive(initialPath);
  }
}
 
源代码16 项目: helix   文件: ZKHelixAdmin.java
@Override
public void enableInstance(final String clusterName, final String instanceName,
    final boolean enabled) {
  final String action = enabled ? "Enable" : "Disable";
  logger.info("{} instance {} in cluster {}.", action, instanceName, clusterName);

  BaseDataAccessor<ZNRecord> zkAccessor = new ZkBaseDataAccessor<>(_zkClient);
  enableSingleInstance(clusterName, instanceName, enabled, zkAccessor);
  // TODO: Reenable this after storage node bug fixed.
  // enableBatchInstances(clusterName, Collections.singletonList(instanceName), enabled, baseAccessor);
}
 
源代码17 项目: helix   文件: ZKHelixDataAccessor.java
/**
 * Creates a data accessor for the given cluster on top of a base data accessor, and prepares
 * a property key builder for that cluster.
 */
public ZKHelixDataAccessor(String clusterName, InstanceType instanceType,
    BaseDataAccessor<ZNRecord> baseDataAccessor) {
  this._baseDataAccessor = baseDataAccessor;
  this._instanceType = instanceType;
  this._clusterName = clusterName;
  this._propertyKeyBuilder = new PropertyKey.Builder(clusterName);
}
 
源代码18 项目: helix   文件: TestDisableResource.java
/**
 * Asserts, through {@code TestHelper.verify}, that the external view of "TestDB0" has
 * PARTITION_NUM partitions and that every replica state in it is OFFLINE.
 * The second argument to verify (10 * 1000) is presumably a timeout in milliseconds.
 * @param clusterName cluster whose external view is checked
 * @throws Exception
 */
private void checkExternalView(String clusterName) throws Exception {
  final HelixDataAccessor accessor =
      new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));

  boolean allOffline = TestHelper.verify(() -> {
    ExternalView view = accessor.getProperty(accessor.keyBuilder().externalView("TestDB0"));
    if (view == null) {
      // external view not available
      return false;
    }
    Set<String> partitions = view.getPartitionSet();
    if (partitions == null || partitions.size() != PARTITION_NUM) {
      return false;
    }
    // any replica in a state other than OFFLINE fails the check
    for (String partition : partitions) {
      for (String replicaState : view.getStateMap(partition).values()) {
        if (!"OFFLINE".equals(replicaState)) {
          return false;
        }
      }
    }
    return true;
  }, 10 * 1000);
  Assert.assertTrue(allOffline);
}
 
源代码19 项目: helix   文件: TestClusterSetup.java
@Test
public void testDisableResource() throws Exception {
  String className = TestHelper.getTestClassName();
  String methodName = TestHelper.getTestMethodName();
  String clusterName = className + "_" + methodName;
  System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
  TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
      "localhost", // participant name prefix
      "TestDB", // resource name prefix
      1, // resources
      10, // partitions per resource
      5, // number of nodes
      3, // replicas
      "MasterSlave", true); // do rebalance
  // disable "TestDB0" resource
  ClusterSetup.processCommandLineArgs(new String[] {
      "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "false"
  });
  // This accessor is built from the ZK address rather than from the shared test client, so
  // the test owns it and must close it, including when an assertion fails.
  BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(ZK_ADDR);
  try {
    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
    Assert.assertFalse(idealState.isEnabled());
    // enable "TestDB0" resource
    ClusterSetup.processCommandLineArgs(new String[] {
        "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "true"
    });
    idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
    Assert.assertTrue(idealState.isEnabled());
  } finally {
    baseAccessor.close();
  }

  TestHelper.dropCluster(clusterName, _gZkClient);
  System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
 
源代码20 项目: helix   文件: ZkTestBase.java
/**
 * Checks that the resource has neither external-view state nor current state left.
 * @return true when every partition state map in the external view and every current-state
 *         partition map for the resource is absent or empty; false otherwise
 */
@Override
public boolean verify() {
  BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
  HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
  PropertyKey.Builder keyBuilder = accessor.keyBuilder();
  ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));

  // verify external view empty
  if (externalView != null) {
    for (String partition : externalView.getPartitionSet()) {
      Map<String, String> stateMap = externalView.getStateMap(partition);
      if (stateMap != null && !stateMap.isEmpty()) {
        LOG.error("External view not empty for " + partition);
        return false;
      }
    }
  }

  // verify current state empty
  List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances());
  for (String participant : liveParticipants) {
    List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant));
    for (String sessionId : sessionIds) {
      CurrentState currentState =
          accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName));
      if (currentState == null) {
        // getProperty may return null (the external view above is null-checked for the same
        // reason); a missing current state counts as empty instead of throwing.
        continue;
      }
      Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
      if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
        LOG.error("Current state not empty for " + participant);
        return false;
      }
    }
  }
  return true;
}
 
源代码21 项目: helix   文件: ZooKeeperAccessor.java
/**
 * Builds a JSON response holding the raw bytes stored at {@code path} together with the
 * ZNode's Stat rendered as a map.
 * @param zkBaseDataAccessor accessor used to read the ZNode
 * @param path path of the ZNode to read
 * @return JSON response with the binary data and the Stat map
 */
private Response getBinaryData(BaseDataAccessor<byte[]> zkBaseDataAccessor, String path) {
  Stat nodeStat = new Stat();
  byte[] payload = readBinaryDataFromZK(zkBaseDataAccessor, path, nodeStat);

  String dataKey = ZooKeeperCommand.getBinaryData.name();
  String statKey = ZooKeeperCommand.getStat.name();
  Map<String, Object> body = ImmutableMap.of(dataKey, payload, statKey,
      ZKUtil.fromStatToMap(nodeStat));
  // Note: this serialization (using ObjectMapper) will convert this byte[] into
  // a Base64 String! The REST client (user) must convert the resulting String back into
  // a byte[] using Base64.
  return JSONRepresentation(body);
}
 
源代码22 项目: helix   文件: ZooKeeperAccessor.java
/**
 * Returns a response containing the ZNode data as a string, together with its Stat.
 * The bytes are decoded as UTF-8 so that the result does not depend on the JVM's default
 * charset.
 * @param zkBaseDataAccessor accessor used to read the ZNode
 * @param path path of the ZNode to read
 * @return JSON response with the string data and the Stat map
 */
private Response getStringData(BaseDataAccessor<byte[]> zkBaseDataAccessor, String path) {
  Stat stat = new Stat();
  byte[] bytes = readBinaryDataFromZK(zkBaseDataAccessor, path, stat);
  String data = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
  Map<String, Object> stringResult = ImmutableMap
      .of(ZooKeeperCommand.getStringData.name(), data,
          ZooKeeperCommand.getStat.name(), ZKUtil.fromStatToMap(stat));
  return JSONRepresentation(stringResult);
}
 
源代码23 项目: helix   文件: ZooKeeperAccessor.java
/**
 * Reads the byte[] stored at {@code path}, answering with NOT_FOUND when the accessor
 * reports that the path does not exist.
 * @param zkBaseDataAccessor accessor used for the existence check and the read
 * @param path path of the ZNode to read
 * @param stat Stat object handed to the accessor's get call (presumably filled in by it)
 * @return the data returned by the accessor for the path
 */
private byte[] readBinaryDataFromZK(BaseDataAccessor<byte[]> zkBaseDataAccessor, String path,
    Stat stat) {
  boolean nodeExists = zkBaseDataAccessor.exists(path, AccessOption.PERSISTENT);
  if (!nodeExists) {
    throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
        .entity(String.format("The ZNode at path %s does not exist!", path)).build());
  }
  return zkBaseDataAccessor.get(path, stat, AccessOption.PERSISTENT);
}
 
源代码24 项目: helix   文件: ZooKeeperAccessor.java
/**
 * Builds a JSON response listing the child ZNode names of the given parent path, answering
 * with NOT_FOUND when the accessor reports that the parent does not exist.
 * @param zkBaseDataAccessor accessor used for the existence check and the listing
 * @param path path of the parent ZNode
 * @return JSON response holding the list of child ZNode names
 */
private Response getChildren(BaseDataAccessor<byte[]> zkBaseDataAccessor, String path) {
  boolean parentExists = zkBaseDataAccessor.exists(path, AccessOption.PERSISTENT);
  if (!parentExists) {
    throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
        .entity(String.format("The ZNode at path %s does not exist", path)).build());
  }
  List<String> childNames = zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT);
  Map<String, List<String>> body =
      ImmutableMap.of(ZooKeeperCommand.getChildren.name(), childNames);
  return JSONRepresentation(body);
}
 
源代码25 项目: helix   文件: ZooKeeperAccessor.java
/**
 * Builds a JSON response with the ZNode's Stat rendered as a map, plus a "path" entry.
 * Answers with NOT_FOUND when the accessor returns no Stat for the path.
 * @param zkBaseDataAccessor accessor used to fetch the Stat
 * @param path path of the ZNode
 * @return JSON response holding the Stat map
 */
private Response getStat(BaseDataAccessor<byte[]> zkBaseDataAccessor, String path) {
  Stat nodeStat = zkBaseDataAccessor.getStat(path, AccessOption.PERSISTENT);
  if (nodeStat != null) {
    Map<String, String> statMap = ZKUtil.fromStatToMap(nodeStat);
    statMap.put("path", path);
    return JSONRepresentation(statMap);
  }
  throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
      .entity(String.format("The ZNode at path %s does not exist!", path)).build());
}
 
源代码26 项目: helix   文件: PropertyStoreAccessor.java
/**
 * Sample HTTP URLs:
 *  http://<HOST>/clusters/{clusterId}/propertyStore/<PATH>
 * It refers to the /PROPERTYSTORE/<PATH> in Helix metadata store
 * @param clusterId The cluster Id
 * @param path path parameter is like "abc/abc/abc" in the URL
 * @return If the payload is ZNRecord format, return ZnRecord json response;
 *         Otherwise, return json object {<PATH>: raw string}, with the raw bytes decoded
 *         as UTF-8
 */
@GET
@Path("{path: .+}")
public Response getPropertyByPath(@PathParam("clusterId") String clusterId,
    @PathParam("path") String path) {
  path = "/" + path;
  if (!ZkValidationUtil.isPathValid(path)) {
    LOG.info("The propertyStore path {} is invalid for cluster {}", path, clusterId);
    return badRequest(
        "Invalid path string. Valid path strings use slash as the directory separator and names the location of ZNode");
  }
  final String recordPath = PropertyPathBuilder.propertyStore(clusterId) + path;
  BaseDataAccessor<byte[]> propertyStoreDataAccessor = getByteArrayDataAccessor();
  if (propertyStoreDataAccessor.exists(recordPath, AccessOption.PERSISTENT)) {
    byte[] bytes = propertyStoreDataAccessor.get(recordPath, null, AccessOption.PERSISTENT);
    ZNRecord znRecord = (ZNRecord) ZN_RECORD_SERIALIZER.deserialize(bytes);
    // The ZNRecordSerializer returns null when exception occurs in deserialization method
    if (znRecord == null) {
      ObjectNode jsonNode = OBJECT_MAPPER.createObjectNode();
      // Decode with an explicit charset so the response does not depend on the JVM default.
      jsonNode.put(CONTENT_KEY, new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
      return JSONRepresentation(jsonNode);
    }
    return JSONRepresentation(znRecord);
  } else {
    throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
        .entity(String.format("The property store path %s doesn't exist", recordPath)).build());
  }
}
 
源代码27 项目: helix   文件: ServerContext.java
/**
 * Returns a lazily-instantiated ZkBaseDataAccessor for the byte array type.
 * On first use the accessor is built from {@code _zkAddr} with a {@code ByteArraySerializer}
 * and cached in {@code _byteArrayZkBaseDataAccessor}; later calls return the cached field.
 * NOTE(review): the unsynchronized first null check is only safe if the field is declared
 * volatile -- confirm at the field declaration, which is not visible here.
 * @return the cached byte-array data accessor
 */
public BaseDataAccessor<byte[]> getByteArrayZkBaseDataAccessor() {
  // Fast path: skip the lock once the accessor has been created.
  if (_byteArrayZkBaseDataAccessor == null) {
    synchronized (this) {
      // Re-check under the lock: another thread may have created it in the meantime.
      if (_byteArrayZkBaseDataAccessor == null) {
        _byteArrayZkBaseDataAccessor =
            new ZkBaseDataAccessor<>(_zkAddr, new ByteArraySerializer());
      }
    }
  }
  return _byteArrayZkBaseDataAccessor;
}
 
源代码28 项目: helix   文件: ZKHelixManager.java
/** Creates a new ZNRecord base data accessor on top of this manager's {@code _zkclient}. */
BaseDataAccessor<ZNRecord> createBaseDataAccessor() {
  return new ZkBaseDataAccessor<ZNRecord>(_zkclient);
}
 
源代码29 项目: helix   文件: ZKHelixDataAccessor.java
/**
 * Creates a data accessor for the given cluster without an instance type; delegates to the
 * three-argument constructor with a null instance type.
 */
public ZKHelixDataAccessor(String clusterName, BaseDataAccessor<ZNRecord> baseDataAccessor) {
  this(clusterName, (InstanceType) null, baseDataAccessor);
}
 
源代码30 项目: helix   文件: ZKHelixDataAccessor.java
/** Returns the base data accessor this accessor was constructed with. */
@Override
public BaseDataAccessor<ZNRecord> getBaseDataAccessor() {
  return this._baseDataAccessor;
}