下面列出了 com.codahale.metrics.SlidingTimeWindowArrayReservoir#org.apache.helix.PropertyType 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Validates the {@code sourceDataTypeMap} argument.
 * <p>
 * A {@link PropertyType#CUSTOMIZEDVIEW} entry must carry at least one type in its list, while
 * every other property type must be mapped to an empty list.
 *
 * @param sourceDataTypeMap map from property type to the list of types requested for it
 * @throws HelixException if any entry violates the rules above
 */
private void validateSourceDataTypeMap(Map<PropertyType, List<String>> sourceDataTypeMap) {
  for (Map.Entry<PropertyType, List<String>> entry : sourceDataTypeMap.entrySet()) {
    PropertyType propertyType = entry.getKey();
    List<String> requestedTypes = entry.getValue();

    if (propertyType.equals(PropertyType.CUSTOMIZEDVIEW)) {
      // CustomizedView is meaningless without at least one aggregation type.
      if (requestedTypes.isEmpty()) {
        logger.error("CustomizedView has been used without any aggregation type!");
        throw new HelixException("CustomizedView has been used without any aggregation type!");
      }
    } else if (!requestedTypes.isEmpty()) {
      // Only CustomizedView may be qualified with additional types.
      logger.error("Type has been used in addition to the propertyType {} !",
          propertyType.name());
      throw new HelixException(
          String.format("Type %s has been used in addition to the propertyType %s !",
              requestedTypes, propertyType.name()));
    }
  }
}
/**
 * Re-registers this object as a CustomizedView change listener for every type listed under
 * {@link PropertyType#CUSTOMIZEDVIEW} in the source data type map.
 * <p>
 * If any registration fails, {@code shutdown()} is invoked and the failure is rethrown wrapped
 * in a {@link HelixException}.
 *
 * @param customizedViewTypes not read by this implementation
 * @param changeContext not read by this implementation
 */
@Override
@PreFetch(enabled = false)
public void onCustomizedViewRootChange(List<String> customizedViewTypes,
    NotificationContext changeContext) {
  logger.info(
      "Registering the CustomizedView listeners again due to the CustomizedView root change.");

  List<String> typesToRegister =
      _sourceDataTypeMap.getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList());
  for (String stateType : typesToRegister) {
    try {
      _helixManager.addCustomizedViewChangeListener(this, stateType);
    } catch (Exception cause) {
      // Shut down first, then surface the failure together with the offending type.
      shutdown();
      String failure = String.format(
          "Failed to attach CustomizedView Listener to HelixManager for type %s!", stateType);
      throw new HelixException(failure, cause);
    }
  }
}
/**
 * The sibling-node check is expected to throw a {@link HelixException} when the data accessor
 * returns {@code null} for the external view of a resource.
 */
@Test(expectedExceptions = HelixException.class)
public void TestSiblingNodesActiveReplicaCheck_exception_whenExternalViewUnavailable() {
  final String resourceName = "resource";
  final Mock mock = new Mock();
  final PropertyKeyArgument idealStatesKey = new PropertyKeyArgument(PropertyType.IDEALSTATES);
  final PropertyKeyArgument externalViewKey = new PropertyKeyArgument(PropertyType.EXTERNALVIEW);

  // A single resource is listed under the ideal states.
  doReturn(ImmutableList.of(resourceName)).when(mock.dataAccessor)
      .getChildNames(argThat(idealStatesKey));

  // Enabled, valid MasterSlave ideal state.
  IdealState stubbedIdealState = mock(IdealState.class);
  when(stubbedIdealState.isEnabled()).thenReturn(true);
  when(stubbedIdealState.isValid()).thenReturn(true);
  when(stubbedIdealState.getStateModelDefRef()).thenReturn("MasterSlave");
  doReturn(stubbedIdealState).when(mock.dataAccessor).getProperty(argThat(idealStatesKey));

  // No external view is available.
  doReturn(null).when(mock.dataAccessor).getProperty(argThat(externalViewKey));

  InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
}
/**
 * Creates a monitor for a routing table provider.
 * <p>
 * The state propagation latency gauge is only created when the property type is
 * {@link PropertyType#CURRENTSTATES}.
 *
 * @param propertyType property type the monitored provider is based on
 * @param clusterName cluster name; {@code DEFAULT} is used when {@code null}
 */
public RoutingTableProviderMonitor(final PropertyType propertyType, String clusterName) {
  _propertyType = propertyType;
  if (clusterName == null) {
    _clusterName = DEFAULT;
  } else {
    _clusterName = clusterName;
  }

  // The instance name is deliberately left out of the sensor name; the MBean name carries it.
  _sensorName = String.format("%s.%s.%s",
      MonitorDomainNames.RoutingTableProvider.name(), _clusterName, _propertyType.name());

  _dataRefreshLatencyGauge = new HistogramDynamicMetric("DataRefreshLatencyGauge",
      new Histogram(
          new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), TimeUnit.MILLISECONDS)));
  _callbackCounter = new SimpleDynamicMetric("CallbackCounter", 0L);
  _eventQueueSizeGauge = new SimpleDynamicMetric("EventQueueSizeGauge", 0L);
  _dataRefreshCounter = new SimpleDynamicMetric("DataRefreshCounter", 0L);

  if (propertyType.equals(PropertyType.CURRENTSTATES)) {
    _statePropLatencyGauge = new HistogramDynamicMetric("StatePropagationLatencyGauge",
        new Histogram(
            new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), TimeUnit.MILLISECONDS)));
  }
}
/**
 * Looks up the stat of the node addressed by {@code key}.
 *
 * @param key property key whose path and type are used for the lookup
 * @return a {@link HelixProperty.Stat} built from the version, creation time, modification time
 *         and ephemeral owner of the underlying stat, or {@code null} when the base accessor
 *         returns no stat or reports that the node does not exist
 */
@Override
public HelixProperty.Stat getPropertyStat(PropertyKey key) {
  final int accessOptions = constructOptions(key.getType());
  final String nodePath = key.getPath();
  try {
    Stat nodeStat = _baseDataAccessor.getStat(nodePath, accessOptions);
    return nodeStat == null
        ? null
        : new HelixProperty.Stat(nodeStat.getVersion(), nodeStat.getCtime(),
            nodeStat.getMtime(), nodeStat.getEphemeralOwner());
  } catch (ZkNoNodeException ignored) {
    // A missing node is reported to the caller as "no stat" rather than as an error.
    return null;
  }
}
/**
 * Checks how many EXTERNALVIEW reads a {@code RoutingDataCache} issues across refreshes:
 * one on the first refresh, none on a second refresh, and none on a refresh that follows an
 * EXTERNAL_VIEW change notification.
 */
@Test()
public void testUpdateOnNotification() throws Exception {
  final PropertyType viewType = PropertyType.EXTERNALVIEW;
  MockZkHelixDataAccessor accessor =
      new MockZkHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
  RoutingDataCache cache =
      new RoutingDataCache("CLUSTER_" + TestHelper.getTestClassName(), viewType);

  // First refresh: exactly one read is expected.
  cache.refresh(accessor);
  Assert.assertEquals(accessor.getReadCount(viewType), 1);
  accessor.clearReadCounters();

  // Refreshing again without any notification should read nothing.
  cache.refresh(accessor);
  Assert.assertEquals(accessor.getReadCount(viewType), 0);
  accessor.clearReadCounters();

  // Even after an external view change notification, the refresh is expected to read nothing.
  cache.notifyDataChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
  cache.refresh(accessor);
  Assert.assertEquals(accessor.getReadCount(viewType), 0);
}
/**
 * {@code hasErrorPartitions} is expected to return {@code true} when the stubbed current state
 * maps partition "db0" to "ERROR".
 */
@Test
public void TestHasErrorPartitions_true() {
  final Mock mock = new Mock();
  final PropertyKeyArgument liveInstancesKey =
      new PropertyKeyArgument(PropertyType.LIVEINSTANCES);
  final PropertyKeyArgument currentStatesKey =
      new PropertyKeyArgument(PropertyType.CURRENTSTATES);

  // Live instance with a known session id.
  LiveInstance instance = new LiveInstance(TEST_INSTANCE);
  instance.setSessionId("sessionId");
  doReturn(instance).when(mock.dataAccessor).getProperty(argThat(liveInstancesKey));

  // One resource ("db") is listed under the current states.
  doReturn(ImmutableList.of("db")).when(mock.dataAccessor)
      .getChildNames(argThat(currentStatesKey));

  // Its only partition is reported in ERROR.
  CurrentState stubbedCurrentState = mock(CurrentState.class);
  when(stubbedCurrentState.getPartitionStateMap()).thenReturn(ImmutableMap.of("db0", "ERROR"));
  doReturn(stubbedCurrentState).when(mock.dataAccessor).getProperty(argThat(currentStatesKey));

  Assert.assertTrue(
      InstanceValidationUtil.hasErrorPartitions(mock.dataAccessor, TEST_CLUSTER, TEST_INSTANCE));
}
/**
 * Appends an entry for this manager (cluster name, instance name, version) to the controller
 * leader history node, creating the record when it does not exist yet. A failed update is
 * logged as an error.
 *
 * @param manager manager supplying the data accessor and the values recorded in the history
 */
private void updateHistory(HelixManager manager) {
  final HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
  final Builder keys = dataAccessor.keyBuilder();
  final String clusterName = manager.getClusterName();
  final String instanceName = manager.getInstanceName();
  final String version = manager.getVersion();

  // Record the controller leader history.
  boolean persisted = dataAccessor.getBaseDataAccessor().update(
      keys.controllerLeaderHistory().getPath(),
      existing -> {
        // Start from a fresh HISTORY record when nothing has been written yet.
        ZNRecord base =
            existing != null ? existing : new ZNRecord(PropertyType.HISTORY.toString());
        return new ControllerHistory(base).updateHistory(clusterName, instanceName, version);
      },
      AccessOption.PERSISTENT);

  if (!persisted) {
    LOG.error("Failed to persist leader history to ZK!");
  }
}
/**
 * Builds a JSON representation of the property name list found for the instance's current
 * session.
 * <p>
 * NOTE(review): despite the method name, the lookup uses {@code PropertyType.CURRENTSTATES};
 * confirm that this is intended.
 *
 * @param clusterName name of the cluster
 * @param instanceName name of the instance
 * @return a {@code StringRepresentation} with media type {@code APPLICATION_JSON}
 */
StringRepresentation getInstanceErrorsRepresentation(String clusterName, String instanceName)
    throws JsonGenerationException, JsonMappingException, IOException {
  final ZkClient zk =
      (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
  final String sessionId =
      ClusterRepresentationUtil.getInstanceSessionId(zk, clusterName, instanceName);
  final String payload = ClusterRepresentationUtil.getInstancePropertyNameListAsString(
      zk, clusterName, instanceName, PropertyType.CURRENTSTATES, sessionId,
      MediaType.APPLICATION_JSON);
  return new StringRepresentation(payload, MediaType.APPLICATION_JSON);
}
/**
 * Registers a CURRENTSTATES-based monitor and checks that the
 * "StatePropagationLatencyGauge.Max" attribute starts at 0 and follows the largest recorded
 * latency (5, then 10).
 */
public void testCurrentStateMetrics() throws JMException, InterruptedException {
  final PropertyType sourceType = PropertyType.CURRENTSTATES;
  final String maxLatencyAttribute = "StatePropagationLatencyGauge.Max";

  RoutingTableProviderMonitor monitor = new RoutingTableProviderMonitor(sourceType, TEST_CLUSTER);
  monitor.register();
  ObjectName beanName = buildObjectName(sourceType, TEST_CLUSTER);

  monitor.increaseCallbackCounters(10);
  // Nothing recorded yet.
  Assert.assertEquals((long) _beanServer.getAttribute(beanName, maxLatencyAttribute), 0);

  monitor.recordStatePropagationLatency(5);
  Assert.assertEquals((long) _beanServer.getAttribute(beanName, maxLatencyAttribute), 5);

  monitor.recordStatePropagationLatency(10);
  Assert.assertEquals((long) _beanServer.getAttribute(beanName, maxLatencyAttribute), 10);

  monitor.unregister();
}
/**
 * {@code hasResourceAssigned} is expected to return {@code true} when the stubbed current state
 * contains a partition ("db0" in state "master").
 */
@Test
public void TestHasResourceAssigned_success() {
  final Mock mock = new Mock();
  final PropertyKeyArgument liveInstancesKey =
      new PropertyKeyArgument(PropertyType.LIVEINSTANCES);
  final PropertyKeyArgument currentStatesKey =
      new PropertyKeyArgument(PropertyType.CURRENTSTATES);

  // Live instance with a known session id.
  LiveInstance instance = new LiveInstance(TEST_INSTANCE);
  instance.setSessionId("sessionId");
  doReturn(instance).when(mock.dataAccessor).getProperty(argThat(liveInstancesKey));

  // One resource ("db") is listed under the current states.
  doReturn(ImmutableList.of("db")).when(mock.dataAccessor)
      .getChildNames(argThat(currentStatesKey));

  CurrentState stubbedCurrentState = mock(CurrentState.class);
  when(stubbedCurrentState.getPartitionStateMap()).thenReturn(ImmutableMap.of("db0", "master"));
  doReturn(stubbedCurrentState).when(mock.dataAccessor).getProperty(argThat(currentStatesKey));

  Assert.assertTrue(
      InstanceValidationUtil.hasResourceAssigned(mock.dataAccessor, TEST_CLUSTER, TEST_INSTANCE));
}
/**
 * The sibling-node check is expected to pass when the external view reports a minimum active
 * replica count of -1.
 */
@Test
public void TestSiblingNodesActiveReplicaCheck_whenNoMinActiveReplica() {
  final String resourceName = "resource";
  final Mock mock = new Mock();
  final PropertyKeyArgument idealStatesKey = new PropertyKeyArgument(PropertyType.IDEALSTATES);
  final PropertyKeyArgument externalViewKey = new PropertyKeyArgument(PropertyType.EXTERNALVIEW);

  // A single resource is listed under the ideal states.
  doReturn(ImmutableList.of(resourceName)).when(mock.dataAccessor)
      .getChildNames(argThat(idealStatesKey));

  // Enabled, valid MasterSlave ideal state.
  IdealState stubbedIdealState = mock(IdealState.class);
  when(stubbedIdealState.isEnabled()).thenReturn(true);
  when(stubbedIdealState.isValid()).thenReturn(true);
  when(stubbedIdealState.getStateModelDefRef()).thenReturn("MasterSlave");
  doReturn(stubbedIdealState).when(mock.dataAccessor).getProperty(argThat(idealStatesKey));

  // External view without a minimum active replica setting; by design the sibling check for
  // this resource is skipped.
  ExternalView stubbedExternalView = mock(ExternalView.class);
  when(stubbedExternalView.getMinActiveReplicas()).thenReturn(-1);
  doReturn(stubbedExternalView).when(mock.dataAccessor).getProperty(argThat(externalViewKey));

  Assert.assertTrue(
      InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE));
}
/**
 * Connects to the cluster's property store and runs the requested validations.
 *
 * @return always {@code true} when the validations complete without throwing
 * @throws RuntimeException if neither table config nor schema validation was requested
 * @throws Exception if any of the validation steps fails
 */
@Override
public boolean execute()
    throws Exception {
  if (!_validateTableConfig && !_validateSchema) {
    throw new RuntimeException("Need to specify at least one of -schema and -tableConfig");
  }

  // Both arguments need a placeholder; previously the cluster name was silently dropped.
  LOGGER.info("Connecting to Zookeeper: {}, cluster: {}", _zkAddress, _clusterName);
  ZNRecordSerializer serializer = new ZNRecordSerializer();
  String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
  _helixPropertyStore = new ZkHelixPropertyStore<>(_zkAddress, serializer, path);

  LOGGER.info("\n\n-------------------- Starting Validation --------------------");
  if (_validateTableConfig) {
    validateTableConfig();
  }
  if (_validateSchema) {
    validateSchema();
  }
  return true;
}
/**
 * {@code hasErrorPartitions} is expected to return {@code false} when the stubbed current state
 * maps partition "db0" to "Master".
 */
@Test
public void TestHasErrorPartitions_false() {
  final Mock mock = new Mock();
  final PropertyKeyArgument liveInstancesKey =
      new PropertyKeyArgument(PropertyType.LIVEINSTANCES);
  final PropertyKeyArgument currentStatesKey =
      new PropertyKeyArgument(PropertyType.CURRENTSTATES);

  // Live instance with a known session id.
  LiveInstance instance = new LiveInstance(TEST_INSTANCE);
  instance.setSessionId("sessionId");
  doReturn(instance).when(mock.dataAccessor).getProperty(argThat(liveInstancesKey));

  // One resource ("db") is listed under the current states.
  doReturn(ImmutableList.of("db")).when(mock.dataAccessor)
      .getChildNames(argThat(currentStatesKey));

  // Its only partition is healthy.
  CurrentState stubbedCurrentState = mock(CurrentState.class);
  when(stubbedCurrentState.getPartitionStateMap()).thenReturn(ImmutableMap.of("db0", "Master"));
  doReturn(stubbedCurrentState).when(mock.dataAccessor).getProperty(argThat(currentStatesKey));

  Assert.assertFalse(
      InstanceValidationUtil.hasErrorPartitions(mock.dataAccessor, TEST_CLUSTER, TEST_INSTANCE));
}
/**
 * Builds a routing table provider for CustomizedView type "typeA", writes a customized view
 * under that type and checks that the change callback flag is raised within
 * {@code WAIT_DURATION}.
 * <p>
 * Cleanup (removing the written node and shutting the provider down) runs in {@code finally}
 * blocks so that a failed assertion does not leak the provider or the ZK node into later tests.
 */
@Test(dependsOnMethods = "testCustomizedViewWithoutType")
public void testCustomizedViewCorrectConstructor() throws Exception {
  Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
  sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA"));
  MockRoutingTableProvider routingTableProvider =
      new MockRoutingTableProvider(_spectator, sourceDataTypes);
  String customizedViewPath = PropertyPathBuilder.customizedView(CLUSTER_NAME, "typeA", TEST_DB);

  try {
    CustomizedView customizedView = new CustomizedView(TEST_DB);
    customizedView.setState("p1", "h1", "testState");

    // Clear the flag before writing to the Customized View Path
    customizedViewChangeCalled.getAndSet(false);
    _spectator.getHelixDataAccessor().getBaseDataAccessor().set(customizedViewPath,
        customizedView.getRecord(), AccessOption.PERSISTENT);

    boolean onCustomizedViewChangeCalled =
        TestHelper.verify(() -> customizedViewChangeCalled.get(), WAIT_DURATION);
    Assert.assertTrue(onCustomizedViewChangeCalled);
  } finally {
    try {
      _spectator.getHelixDataAccessor().getBaseDataAccessor().remove(customizedViewPath,
          AccessOption.PERSISTENT);
    } finally {
      // Always release the provider, even if removing the node failed.
      routingTableProvider.shutdown();
    }
  }
}
/**
 * Builds a JSON representation of the property name list found for the instance's current
 * session.
 * <p>
 * NOTE(review): despite the method name, the lookup uses {@code PropertyType.CURRENTSTATES};
 * confirm that this is intended.
 *
 * @param clusterName name of the cluster
 * @param instanceName name of the instance
 * @return a {@code StringRepresentation} with media type {@code APPLICATION_JSON}
 */
StringRepresentation getInstanceErrorsRepresentation(String clusterName, String instanceName)
    throws JsonGenerationException, JsonMappingException, IOException {
  final ZkClient zk =
      (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
  final String sessionId =
      ClusterRepresentationUtil.getInstanceSessionId(zk, clusterName, instanceName);
  final String payload = ClusterRepresentationUtil.getInstancePropertyNameListAsString(
      zk, clusterName, instanceName, PropertyType.CURRENTSTATES, sessionId,
      MediaType.APPLICATION_JSON);
  return new StringRepresentation(payload, MediaType.APPLICATION_JSON);
}
/**
 * {@code isInstanceStable} is expected to return {@code true} when the ideal state and the
 * external view both map partition "db0" on the test instance to "Master" and the cluster
 * config persists intermediate assignments.
 */
@Test
public void TestIsInstanceStable_true() {
  String resource = "db";
  Mock mock = new Mock();

  ClusterConfig clusterConfig = new ClusterConfig(TEST_CLUSTER);
  clusterConfig.setPersistIntermediateAssignment(true);
  doReturn(clusterConfig).when(mock.dataAccessor)
      .getProperty(argThat(new PropertyKeyArgument(PropertyType.CONFIGS)));
  doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
      .getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));

  // The ideal state is a mock, so its content is defined purely through stubbing. The former
  // setInstanceStateMap(...) call on the mock was a no-op and has been dropped.
  IdealState idealState = mock(IdealState.class);
  when(idealState.isEnabled()).thenReturn(Boolean.TRUE);
  when(idealState.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
  when(idealState.getInstanceStateMap("db0"))
      .thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master"));
  doReturn(idealState).when(mock.dataAccessor)
      .getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));

  // The external view is a real object whose state map matches the ideal state.
  ExternalView externalView = new ExternalView(resource);
  externalView.setStateMap("db0", ImmutableMap.of(TEST_INSTANCE, "Master"));
  doReturn(externalView).when(mock.dataAccessor)
      .getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));

  boolean result = InstanceValidationUtil.isInstanceStable(mock.dataAccessor, TEST_INSTANCE);
  Assert.assertTrue(result);
}
/**
 * Creates a property store rooted at the cluster's PROPERTYSTORE path, reusing the base data
 * accessor of the given manager.
 *
 * @param helixManager manager whose base data accessor is cast to {@code ZkBaseDataAccessor}
 * @param clusterName cluster whose property store path is used
 * @return a new {@code ZkHelixPropertyStore} for that path
 */
public static ZkHelixPropertyStore<ZNRecord> getZkPropertyStore(HelixManager helixManager,
    String clusterName) {
  final ZkBaseDataAccessor<ZNRecord> zkAccessor =
      (ZkBaseDataAccessor<ZNRecord>) helixManager.getHelixDataAccessor().getBaseDataAccessor();
  final String storeRoot = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
  // The store root is passed both as the root path and as the single entry of the path list.
  return new ZkHelixPropertyStore<ZNRecord>(zkAccessor, storeRoot, Arrays.asList(storeRoot));
}
/**
 * Creates a routing table from current states.
 * <p>
 * Delegates to the four-argument constructor with an empty external view list and
 * {@code PropertyType.CURRENTSTATES}, then populates the table by calling
 * {@code refresh(currentStateMap)}.
 *
 * @param currentStateMap nested map of current states (presumably keyed by instance, session
 *        and resource; verify against the caller)
 * @param instanceConfigs instance configs handed to the delegate constructor
 * @param liveInstances live instances handed to the delegate constructor
 */
public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
// TODO Aggregate currentState to an ExternalView in the RoutingTable, so there is no need to
// refresh according to the currentStateMap. - jjwang
this(Collections.<ExternalView> emptyList(),
instanceConfigs, liveInstances, PropertyType.CURRENTSTATES);
refresh(currentStateMap);
}
/**
 * Adds {@code count} to the read counter kept for the property type of {@code key}; a type
 * that has no counter yet starts from zero.
 *
 * @param key property key whose type selects the counter
 * @param count amount to add to that counter
 */
private void addCount(PropertyKey key, int count) {
  // A single merge replaces the former containsKey/put/get sequence.
  _readPathCounters.merge(key.getType(), count, Integer::sum);
}
/**
 * Creates a routing table from external views.
 * <p>
 * The given collections are copied into new hash sets, the resource lookup maps start empty,
 * and the table is then populated by calling {@code refresh} with the copied external views.
 *
 * @param externalViews external views to build the table from
 * @param instanceConfigs instance configs to keep
 * @param liveInstances live instances to keep
 * @param propertytype property type this routing table is based on
 */
protected RoutingTable(Collection<ExternalView> externalViews, Collection<InstanceConfig> instanceConfigs,
    Collection<LiveInstance> liveInstances, PropertyType propertytype) {
  // TODO Refactor these constructors so we don't have so many constructor.
  _propertyType = propertytype;

  // Defensive copies of the caller-supplied collections.
  _externalViews = new HashSet<>(externalViews);
  _instanceConfigs = new HashSet<>(instanceConfigs);
  _liveInstances = new HashSet<>(liveInstances);

  // Lookup structures start out empty.
  _resourceGroupInfoMap = new HashMap<>();
  _resourceInfoMap = new HashMap<>();

  refresh(_externalViews);
}
/**
 * Returns the CustomizedViews held by the underlying routing table.
 * <p>
 * When this snapshot's property type is anything other than
 * {@link PropertyType#CUSTOMIZEDVIEW} (for example CurrentStates), an empty collection is
 * returned.
 *
 * @return the CustomizedViews of the underlying {@code CustomizedViewRoutingTable}, or an
 *         empty set
 */
public Collection<CustomizedView> getCustomizeViews() {
  if (!_propertyType.equals(PropertyType.CUSTOMIZEDVIEW)) {
    return Collections.emptySet();
  }
  // Only a CUSTOMIZEDVIEW-based table is cast to the specialised type.
  return ((CustomizedViewRoutingTable) _routingTable).geCustomizedViews();
}
/**
 * The sibling-node check is expected to fail when the external view requires 3 active replicas
 * and partition "db0" has exactly three replicas, one of them on the test instance.
 */
@Test
public void TestSiblingNodesActiveReplicaCheck_fail() {
  final String resourceName = "resource";
  final Mock mock = new Mock();
  final PropertyKeyArgument idealStatesKey = new PropertyKeyArgument(PropertyType.IDEALSTATES);
  final PropertyKeyArgument externalViewKey = new PropertyKeyArgument(PropertyType.EXTERNALVIEW);
  final PropertyKeyArgument stateModelDefsKey =
      new PropertyKeyArgument(PropertyType.STATEMODELDEFS);

  // A single resource is listed under the ideal states.
  doReturn(ImmutableList.of(resourceName)).when(mock.dataAccessor)
      .getChildNames(argThat(idealStatesKey));

  // Enabled, valid MasterSlave ideal state.
  IdealState stubbedIdealState = mock(IdealState.class);
  when(stubbedIdealState.isEnabled()).thenReturn(true);
  when(stubbedIdealState.isValid()).thenReturn(true);
  when(stubbedIdealState.getStateModelDefRef()).thenReturn("MasterSlave");
  doReturn(stubbedIdealState).when(mock.dataAccessor).getProperty(argThat(idealStatesKey));

  // External view: min active replicas is 3, and db0 has one Master plus two Slaves.
  ExternalView stubbedExternalView = mock(ExternalView.class);
  when(stubbedExternalView.getMinActiveReplicas()).thenReturn(3);
  when(stubbedExternalView.getStateModelDefRef()).thenReturn("MasterSlave");
  when(stubbedExternalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
  when(stubbedExternalView.getStateMap("db0")).thenReturn(
      ImmutableMap.of(TEST_INSTANCE, "Master", "instance1", "Slave", "instance2", "Slave"));
  doReturn(stubbedExternalView).when(mock.dataAccessor).getProperty(argThat(externalViewKey));

  // State model definition whose initial state is OFFLINE.
  StateModelDefinition stubbedStateModel = mock(StateModelDefinition.class);
  when(stubbedStateModel.getInitialState()).thenReturn("OFFLINE");
  doReturn(stubbedStateModel).when(mock.dataAccessor).getProperty(argThat(stateModelDefsKey));

  Assert.assertFalse(
      InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE));
}
/**
* Initialize empty RoutingDataCache with clusterName, _propertyTypes.
* @param clusterName
* @param sourceDataTypeMap
*/
public RoutingDataCache(String clusterName, Map<PropertyType, List<String>> sourceDataTypeMap) {
super(clusterName);
_sourceDataTypeMap = sourceDataTypeMap;
_currentStateCache = new CurrentStateCache(clusterName);
_customizedViewCaches = new HashMap<>();
sourceDataTypeMap.getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())
.forEach(customizedStateType -> _customizedViewCaches.put(customizedStateType,
new CustomizedViewCache(clusterName, customizedStateType)));
_targetExternalViewCache = new TargetExternalViewCache(clusterName);
requireFullRefresh();
}
/**
 * Creates {@code numJobs} dummy jobs for a workflow. For each job this writes a COMPLETED job
 * context, stores the job's resource config and creates its user content node.
 *
 * @param cluster cluster name used in the written paths
 * @param workflowName workflow the jobs belong to
 * @param numJobs number of jobs to create
 * @return the builders of the created job configs
 */
protected Set<JobConfig.Builder> createJobs(String cluster, String workflowName, int numJobs) {
  HelixPropertyStore<ZNRecord> store =
      new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor,
          PropertyPathBuilder.propertyStore(cluster), null);
  Set<JobConfig.Builder> builders = new HashSet<>();

  for (int jobIndex = 0; jobIndex < numJobs; jobIndex++) {
    // Name under which the job's context, config and content nodes are stored.
    final String storedJobName = workflowName + "_" + JOB_PREFIX + jobIndex;

    JobConfig.Builder builder = new JobConfig.Builder()
        .setCommand("DummyCommand")
        .setTargetResource("RESOURCE")
        .setWorkflow(workflowName);
    builders.add(builder);

    // Job context marked as COMPLETED.
    JobContext context = TaskTestUtil.buildJobContext(
        System.currentTimeMillis(), System.currentTimeMillis() + 1, TaskPartitionState.COMPLETED);
    String contextPath = String.format("/%s/%s%s/%s/%s", cluster,
        PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT, storedJobName,
        TaskConstants.CONTEXT_NODE);
    _baseAccessor.set(contextPath, context.getRecord(), AccessOption.PERSISTENT);
    _configAccessor.setResourceConfig(cluster, storedJobName, builder.build());

    // add job content stores
    ZNRecord userContent = new ZNRecord(TaskUtil.USER_CONTENT_NODE);
    String taskName = TaskUtil.getNamespacedTaskName(
        TaskUtil.getNamespacedJobName(workflowName, JOB_PREFIX + jobIndex), "0");
    userContent.setMapField(taskName, Collections.<String, String>emptyMap());
    String contentPath = Joiner.on("/")
        .join(TaskConstants.REBALANCER_CONTEXT_ROOT, storedJobName, TaskUtil.USER_CONTENT_NODE);
    store.create(contentPath, userContent, AccessOption.PERSISTENT);
  }
  return builders;
}
/**
 * Handles an external view change.
 * <p>
 * A notification whose change type is not among the configured source data types is ignored
 * with a warning. When a non-empty external view list is supplied, the routing table is
 * refreshed directly from it; otherwise an ExternalViewChange or TargetExternalViewChange event
 * is queued, depending on which source data type is configured.
 *
 * @param externalViewList external views supplied by the caller; may be {@code null} or empty
 * @param changeContext notification context of the change
 */
@Override
@PreFetch(enabled = false)
public void onExternalViewChange(List<ExternalView> externalViewList,
    NotificationContext changeContext) {
  HelixConstants.ChangeType changeType = changeContext.getChangeType();
  if (changeType != null && !_sourceDataTypeMap.containsKey(changeType.getPropertyType())) {
    logger.warn(
        "onExternalViewChange called with mismatched change types. Source data types does not contain changed data type: {}",
        changeType);
    return;
  }

  if (externalViewList == null || externalViewList.isEmpty()) {
    // No views supplied: queue the event that matches the configured source data type.
    ClusterEventType eventType;
    if (_sourceDataTypeMap.containsKey(PropertyType.EXTERNALVIEW)) {
      eventType = ClusterEventType.ExternalViewChange;
    } else if (_sourceDataTypeMap.containsKey(PropertyType.TARGETEXTERNALVIEW)) {
      eventType = ClusterEventType.TargetExternalViewChange;
    } else {
      logger.warn(
          "onExternalViewChange called with mismatched change types. Source data types does not contain changed data type: {}",
          changeType);
      return;
    }
    _routerUpdater.queueEvent(changeContext, eventType, changeType);
    return;
  }

  // Kept for backward compatibility: applications may call onExternalViewChange directly with
  // the full external view list, in which case the table is refreshed from that list.
  String referenceKey = generateReferenceKey(PropertyType.EXTERNALVIEW.name(), DEFAULT_STATE_TYPE);
  HelixDataAccessor dataAccessor = changeContext.getManager().getHelixDataAccessor();
  PropertyKey.Builder keys = dataAccessor.keyBuilder();
  List<InstanceConfig> instanceConfigs = dataAccessor.getChildValues(keys.instanceConfigs(), true);
  List<LiveInstance> liveInstances = dataAccessor.getChildValues(keys.liveInstances(), true);
  refreshExternalView(externalViewList, instanceConfigs, liveInstances, referenceKey);
}
/**
 * Handles a live instance change: updates the CurrentState listeners when CURRENTSTATES is a
 * configured source data type, then queues a LiveInstanceChange event.
 *
 * @param liveInstances live instances delivered with the notification
 * @param changeContext notification context of the change
 */
@Override
@PreFetch(enabled = true)
public void onLiveInstanceChange(List<LiveInstance> liveInstances,
    NotificationContext changeContext) {
  final boolean usesCurrentStates = _sourceDataTypeMap.containsKey(PropertyType.CURRENTSTATES);
  if (usesCurrentStates) {
    // Walk the live instance list and bring the CurrentState listeners up to date.
    updateCurrentStatesListeners(liveInstances, changeContext);
  }

  _routerUpdater.queueEvent(changeContext, ClusterEventType.LiveInstanceChange,
      HelixConstants.ChangeType.LIVE_INSTANCE);
}
/**
 * Creates the Helix admin for the configured ZooKeeper address and a property store rooted at
 * the cluster's PROPERTYSTORE path.
 */
private void init() {
  LOGGER.info("Trying to connect to " + _zkAddress + " cluster " + _clusterName);
  _helixAdmin = new ZKHelixAdmin(_zkAddress);

  final String propertyStoreRoot =
      PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
  _propertyStore =
      new ZkHelixPropertyStore<>(_zkAddress, new ZNRecordSerializer(), propertyStoreRoot);
}
/**
 * Replaces every routing table in the reference map with a fresh, empty table of the same
 * property type. CUSTOMIZEDVIEW tables also keep their state type.
 */
private void reset() {
  logger.info("Resetting the routing table.");
  for (String referenceKey : _routingTableRefMap.keySet()) {
    RoutingTable current = _routingTableRefMap.get(referenceKey).get();
    PropertyType propertyType = current.getPropertyType();

    // CustomizedView tables need their state type carried over to the replacement.
    RoutingTable replacement = propertyType == PropertyType.CUSTOMIZEDVIEW
        ? new CustomizedViewRoutingTable(propertyType, current.getStateType())
        : new RoutingTable(propertyType);

    _routingTableRefMap.get(referenceKey).set(replacement);
  }
}
/**
 * Builds a new routing table from the given external views and installs it under
 * {@code referenceKey}. The new table keeps the property type of the table currently stored
 * under that key.
 *
 * @param externalViews external views to build the table from
 * @param instanceConfigs instance configs to build the table from
 * @param liveInstances live instances to build the table from
 * @param referenceKey key of the routing table reference to replace
 */
protected void refreshExternalView(Collection<ExternalView> externalViews,
    Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
    String referenceKey) {
  // Captured before any work so it can be handed on as the start time of this refresh.
  final long refreshStart = System.currentTimeMillis();
  final PropertyType currentType =
      _routingTableRefMap.get(referenceKey).get().getPropertyType();
  final RoutingTable rebuilt =
      new RoutingTable(externalViews, instanceConfigs, liveInstances, currentType);
  resetRoutingTableAndNotify(refreshStart, rebuilt, referenceKey);
}