下面列出了 com.codahale.metrics.SlidingTimeWindowArrayReservoir#org.apache.helix.InstanceType 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds a MockHelixManager, registers it with the mock admin obtained from the given cluster and seeds its
 * property store with the supplied records.
 * @param instanceName the name of the instance associated with this manager.
 * @param instanceType the {@link InstanceType} of the requester.
 * @param zkAddr the address identifying the zk service to which this request is to be made.
 * @param helixCluster the {@link MockHelixCluster} associated with this manager.
 * @param znRecordMap a map from ZNode path to the {@link ZNRecord} to store at that path in the
 *                    HelixPropertyStore of this manager. May be {@code null}, in which case nothing is stored.
 * @param beBadException the {@link Exception} that this manager will throw on listener registrations.
 */
MockHelixManager(String instanceName, InstanceType instanceType, String zkAddr, MockHelixCluster helixCluster,
    Map<String, ZNRecord> znRecordMap, Exception beBadException) {
  this.instanceName = instanceName;
  this.instanceType = instanceType;
  mockAdmin = helixCluster.getHelixAdminFactory().getHelixAdmin(zkAddr);
  mockAdmin.addHelixManager(this);
  clusterName = helixCluster.getClusterName();
  dataAccessor = new MockHelixDataAccessor(clusterName, mockAdmin);
  this.beBadException = beBadException;
  this.znRecordMap = znRecordMap;

  // The property store is rooted at /<clusterName>/<PROPERTYSTORE_STR>.
  String storeRootPath = "/" + clusterName + "/" + ClusterMapUtils.PROPERTYSTORE_STR;
  Properties props = new Properties();
  props.setProperty("helix.property.store.root.path", storeRootPath);
  HelixPropertyStoreConfig storeConfig = new HelixPropertyStoreConfig(new VerifiableProperties(props));
  helixPropertyStore = (ZkHelixPropertyStore<ZNRecord>) CommonUtils.createHelixPropertyStore(zkAddr, storeConfig,
      Collections.singletonList(storeConfig.rootPath));

  if (znRecordMap == null) {
    return;
  }
  for (Map.Entry<String, ZNRecord> entry : znRecordMap.entrySet()) {
    helixPropertyStore.set(entry.getKey(), entry.getValue(), AccessOption.PERSISTENT);
  }
}
/**
 * Verifies that constructing a {@link DistributedLeaderElection} with a manager whose instance type is
 * {@link InstanceType#PARTICIPANT} throws a {@link HelixException}.
 *
 * @throws Exception if cluster setup or teardown fails
 */
@Test()
public void testParticipant() throws Exception {
  String className = getShortClassName();
  LOG.info("RUN " + className + " at " + new Date(System.currentTimeMillis()));
  final String clusterName = CLUSTER_PREFIX + "_" + className + "_" + "testParticipant";
  TestHelper.setupEmptyCluster(_gZkClient, clusterName);
  try {
    final String controllerName = "participant_0";
    HelixManager manager =
        new MockZKHelixManager(clusterName, controllerName, InstanceType.PARTICIPANT, _gZkClient);
    GenericHelixController participant0 = new GenericHelixController();
    List<HelixTimerTask> timerTasks = Collections.emptyList();
    try {
      new DistributedLeaderElection(manager, participant0, timerTasks);
      Assert.fail(
          "Should not be able construct DistributedLeaderElection object using participant manager.");
    } catch (HelixException expected) {
      // expected: a participant manager must be rejected
    }
  } finally {
    // Drop the cluster even when the assertion above fails, so a failed run leaves nothing behind.
    TestHelper.dropCluster(clusterName, _gZkClient);
  }
}
/**
 * Starts a fake server that only implements the Helix part: it joins the cluster as a participant using the
 * fake segment state model, adds a tag to its own instance and creates the controller leader locator.
 *
 * @throws Exception
 */
private void startFakeServer()
    throws Exception {
  _serverInstance = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE + NetUtil.getHostAddress() + "_"
      + CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
  final String clusterName = getHelixClusterName();

  // The state model factory is registered before connecting, as in the original flow.
  _serverHelixManager = HelixManagerFactory
      .getZKHelixManager(clusterName, _serverInstance, InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
  _serverHelixManager.getStateMachineEngine().registerStateModelFactory(
      SegmentOnlineOfflineStateModelFactory.getStateModelName(), new FakeServerSegmentStateModelFactory());
  _serverHelixManager.connect();

  // Tag the server instance with the realtime tag derived from the default tenant name.
  String realtimeTag = TableNameBuilder.REALTIME.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME);
  _serverHelixManager.getClusterManagmentTool().addInstanceTag(clusterName, _serverInstance, realtimeTag);

  ControllerLeaderLocator.create(_serverHelixManager);
}
/**
 * Entry point: reads the ZooKeeper address, cluster, instance and state model names from the command line,
 * joins the cluster as a {@link InstanceType#PARTICIPANT} and blocks until the thread is interrupted or the
 * connection attempt fails.
 *
 * @param args command line arguments, parsed by {@code processCommandLineArgs}
 * @throws Exception if argument processing or manager setup fails
 */
public static void main(String[] args) throws Exception {
  CommandLine cmd = processCommandLineArgs(args);
  String zkAddress = cmd.getOptionValue(zkAddr);
  String clusterName = cmd.getOptionValue(cluster);
  String instance = cmd.getOptionValue(instanceName);
  String stateModelName = cmd.getOptionValue(stateModel);

  HelixManager manager =
      new ZKHelixManager(clusterName, instance, InstanceType.PARTICIPANT, zkAddress);
  StateMachineEngine stateMach = manager.getStateMachineEngine();
  stateMach.registerStateModelFactory(stateModelName, new AgentStateModelFactory());
  Runtime.getRuntime().addShutdownHook(new HelixAgentShutdownHook(manager));

  try {
    manager.connect();
    // Joining the current thread blocks forever (until interrupted), keeping the agent alive.
    Thread.currentThread().join();
  } catch (Exception e) {
    // Pass the exception as the cause so the stack trace is logged, not only its toString().
    LOG.error(e.toString(), e);
  } finally {
    // manager was assigned from a constructor call above, so it cannot be null here.
    if (manager.isConnected()) {
      manager.disconnect();
    }
  }
}
/**
 * Sends a {@link SegmentReloadMessage} for the given table through the Helix messaging service.
 *
 * @param tableNameWithType table name with type suffix; used as the resource of the recipient criteria
 * @return the number of messages reported as sent by the messaging service
 */
public int reloadAllSegments(String tableNameWithType) {
  LOGGER.info("Sending reload message for table: {}", tableNameWithType);

  Criteria criteria = new Criteria();
  criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
  criteria.setInstanceName("%");
  criteria.setResource(tableNameWithType);
  criteria.setSessionSpecific(true);

  SegmentReloadMessage reloadMessage = new SegmentReloadMessage(tableNameWithType, null);
  // No callback; -1 means infinite timeout on the recipient.
  int sentCount = _helixZkManager.getMessagingService().send(criteria, reloadMessage, null, -1);

  if (sentCount <= 0) {
    LOGGER.warn("No reload message sent for table: {}", tableNameWithType);
  } else {
    LOGGER.info("Sent {} reload messages for table: {}", sentCount, tableNameWithType);
  }
  return sentCount;
}
/**
 * Creates a ZooKeeper-backed participant manager, registers the mock OnlineOffline state model factory and
 * connects it.
 *
 * @return the connected {@link HelixManager}
 * @throws IllegalArgumentException if the configured cluster manager type is not {@code "zk"}
 *         (case-insensitive), including when it is {@code null}
 * @throws Exception if connecting the manager fails
 */
public HelixManager start() throws Exception {
  // Constant-first comparison: a null type is reported as unsupported instead of causing an NPE.
  if (!"zk".equalsIgnoreCase(_clusterMangerType)) {
    throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType);
  }
  HelixManager manager =
      HelixManagerFactory.getZKHelixManager(_clusterName, _instanceName,
          InstanceType.PARTICIPANT, _zkConnectString);

  MockOnlineOfflineStateModelFactory stateModelFactory2 =
      new MockOnlineOfflineStateModelFactory(_transDelayInMs, _resourceName, _resourceTag,
          _instanceName);
  StateMachineEngine stateMach = manager.getStateMachineEngine();
  stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);

  manager.connect();
  return manager;
}
/**
 * Sets up the cluster with a single started participant and connects an administrator manager.
 *
 * @throws Exception if any setup step fails
 */
@BeforeClass
public void beforeClass() throws Exception {
  System.out.println("START " + CLASS_NAME + " at " + new Date());

  // Storage cluster with one participant node.
  _gSetupTool.addCluster(CLUSTER_NAME, true);
  final String participantName = PARTICIPANT_PREFIX + "_" + START_PORT;
  _gSetupTool.addInstanceToCluster(CLUSTER_NAME, participantName);

  _participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
  _participant.syncStart();

  // Administrator manager used by the tests.
  _manager =
      HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
  _manager.connect();
}
/**
 * Sends a token-file-updated message first to the controller and then to a participant, and checks after
 * each send that the corresponding MESSAGES znode exists at version 0 and that one message is queued.
 *
 * @throws Exception if a ZooKeeper lookup fails
 */
@Test
public void testSendTokenFileUpdatedMessage() throws Exception {
  Logger log = LoggerFactory.getLogger("testSendTokenFileUpdatedMessage");
  String helixClusterName = YarnSecurityManagerTest.class.getSimpleName();

  // Controller message
  this._yarnAppYarnAppSecurityManagerWithKeytabs.sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
  Assert.assertEquals(this.curatorFramework.checkExists().forPath(
      String.format("/%s/CONTROLLER/MESSAGES", helixClusterName)).getVersion(), 0);
  AssertWithBackoff.create().logger(log).timeoutMs(20000)
      .assertEquals(new GetHelixMessageNumFunc(helixClusterName, InstanceType.CONTROLLER, "",
          this.curatorFramework), 1, "1 controller message queued");

  // Participant message
  this._yarnAppYarnAppSecurityManagerWithKeytabs.sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT,
      HELIX_TEST_INSTANCE_PARTICIPANT);
  Assert.assertEquals(this.curatorFramework.checkExists().forPath(
      String.format("/%s/INSTANCES/%s/MESSAGES", helixClusterName, HELIX_TEST_INSTANCE_PARTICIPANT))
      .getVersion(), 0);
  // The failure message names the participant (it previously repeated the controller wording).
  AssertWithBackoff.create().logger(log).timeoutMs(20000)
      .assertEquals(new GetHelixMessageNumFunc(helixClusterName, InstanceType.PARTICIPANT,
          HELIX_TEST_INSTANCE_PARTICIPANT, this.curatorFramework), 1, "1 participant message queued");
}
/**
 * Connects a fake broker participant to the Helix cluster, tags it and remembers its manager in
 * {@code _fakeInstanceHelixManagers}.
 *
 * @param instanceId instance name the fake broker connects with
 * @param isSingleTenant when {@code true} the instance is tagged with
 *        {@code TagNameUtils.getBrokerTagForTenant(null)}; otherwise with {@code UNTAGGED_BROKER_INSTANCE}
 * @throws Exception if connecting to Helix fails
 */
protected void addFakeBrokerInstanceToAutoJoinHelixCluster(String instanceId, boolean isSingleTenant)
    throws Exception {
  final String clusterName = getHelixClusterName();
  HelixManager fakeBroker = HelixManagerFactory
      .getZKHelixManager(clusterName, instanceId, InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
  fakeBroker.getStateMachineEngine().registerStateModelFactory(
      FakeBrokerResourceOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
      FakeBrokerResourceOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
  fakeBroker.connect();

  String brokerTag = isSingleTenant ? TagNameUtils.getBrokerTagForTenant(null) : UNTAGGED_BROKER_INSTANCE;
  fakeBroker.getClusterManagmentTool().addInstanceTag(clusterName, instanceId, brokerTag);

  _fakeInstanceHelixManagers.add(fakeBroker);
}
/**
 * Creates a partial instance structure for the corrupt Helix instance, then checks that a plain connect
 * fails with a {@link HelixException} and that the connect-with-retry call completes.
 */
@Test
public void testConnectHelixManagerWithRetry() {
  final String zkConnectString = testingZKServer.getConnectString();
  HelixManager instanceManager = HelixManagerFactory.getZKHelixManager(
      clusterName, corruptHelixInstance, InstanceType.PARTICIPANT, zkConnectString);
  ClusterIntegrationTestUtils.createPartialInstanceStructure(instanceManager, zkConnectString);

  // Connecting without retry must throw a HelixException.
  Exception thrown = null;
  try {
    corruptGobblinTaskRunner.connectHelixManager();
  } catch (Exception e) {
    thrown = e;
  }
  if (thrown == null) {
    Assert.fail("Unexpected success in connecting to HelixManager");
  }
  Assert.assertTrue(HelixException.class.equals(thrown.getClass()));

  // Connecting with retry must complete without throwing.
  corruptGobblinTaskRunner.connectHelixManagerWithRetry();
  Assert.assertTrue(true);
}
public HelixCallbackMonitor(InstanceType type, String clusterName, String instanceName,
HelixConstants.ChangeType changeType) throws JMException {
_changeType = changeType;
_type = type;
_clusterName = clusterName;
_instanceName = instanceName;
// Don't put instanceName into sensor name. This detail information is in the MBean name already.
_sensorName = String
.format("%s.%s.%s.%s", MonitorDomainNames.HelixCallback.name(), type.name(), clusterName,
changeType.name());
_latencyGauge = new HistogramDynamicMetric("LatencyGauge", new Histogram(
new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), TimeUnit.MILLISECONDS)));
_totalLatencyCounter = new SimpleDynamicMetric("LatencyCounter", 0l);
_unbatchedCounter = new SimpleDynamicMetric("UnbatchedCounter", 0l);
_counter = new SimpleDynamicMetric("Counter", 0l);
}
/**
 * Creates the participant manager, registers the bootstrap state model and the message handler factories,
 * then connects to the cluster.
 *
 * @throws Exception if connecting fails
 */
public void start() throws Exception {
  stateModelFactory = new BootstrapHandler();
  manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
      InstanceType.PARTICIPANT, zkConnectString);

  StateMachineEngine engine = manager.getStateMachineEngine();
  engine.registerStateModelFactory("MasterSlave", stateModelFactory);

  // The state machine engine itself is registered as the handler factory for state transition messages.
  manager.getMessagingService()
      .registerMessageHandlerFactory(MessageType.STATE_TRANSITION.name(), engine);
  manager.getMessagingService()
      .registerMessageHandlerFactory(MessageType.USER_DEFINE_MSG.name(), new CustomMessageHandlerFactory());

  manager.connect();
}
/**
 * Checks the controller leader history record: it starts with one entry and, after the controller has been
 * restarted 13 times under new names, contains exactly 10 entries.
 *
 * @throws Exception if connecting or restarting a controller fails
 */
@Test()
public void testControllerLeaderHistory() throws Exception {
  HelixManager manager = HelixManagerFactory
      .getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
  manager.connect();
  try {
    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(CLUSTER_NAME);
    PropertyKey propertyKey = keyBuilder.controllerLeaderHistory();

    ControllerHistory controllerHistory = manager.getHelixDataAccessor().getProperty(propertyKey);
    Assert.assertNotNull(controllerHistory);
    List<String> list = controllerHistory.getRecord().getListField("HISTORY");
    Assert.assertEquals(list.size(), 1);

    // Replace the controller 13 times (i = 0..12) so that more leaders are recorded than the 10 expected below.
    for (int i = 0; i <= 12; i++) {
      _controller.syncStop();
      _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "Controller-" + i);
      _controller.syncStart();
    }

    controllerHistory = manager.getHelixDataAccessor().getProperty(propertyKey);
    Assert.assertNotNull(controllerHistory);
    list = controllerHistory.getRecord().getListField("HISTORY");
    Assert.assertEquals(list.size(), 10);
  } finally {
    // Disconnect even when an assertion fails, so the admin connection is not leaked.
    manager.disconnect();
  }
}
/**
* Instantiate a HelixParticipant.
* @param clusterMapConfig the {@link ClusterMapConfig} associated with this participant.
* @param helixFactory the {@link HelixFactory} to use to get the {@link HelixManager}.
* @param metricRegistry the {@link MetricRegistry} to instantiate {@link HelixParticipantMetrics}.
* @param zkConnectStr the address identifying the zk service which this participant interacts with.
* @param isSoleParticipant whether this is the sole participant on current node.
*/
public HelixParticipant(ClusterMapConfig clusterMapConfig, HelixFactory helixFactory, MetricRegistry metricRegistry,
String zkConnectStr, boolean isSoleParticipant) {
this.clusterMapConfig = clusterMapConfig;
this.zkConnectStr = zkConnectStr;
participantMetrics =
new HelixParticipantMetrics(metricRegistry, isSoleParticipant ? null : zkConnectStr, localPartitionAndState);
clusterName = clusterMapConfig.clusterMapClusterName;
instanceName = getInstanceName(clusterMapConfig.clusterMapHostName, clusterMapConfig.clusterMapPort);
if (clusterName.isEmpty()) {
throw new IllegalStateException("Cluster name is empty in clusterMapConfig");
}
// HelixAdmin is initialized in constructor allowing caller to do any administrative operations in Helix
// before participating.
helixAdmin = helixFactory.getHelixAdmin(this.zkConnectStr);
manager = helixFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, this.zkConnectStr);
replicaSyncUpManager = new AmbryReplicaSyncUpManager(clusterMapConfig);
partitionStateChangeListeners = new HashMap<>();
}
/**
 * Connects a fake server participant to the Helix cluster, tags it, stores its admin port in the
 * participant-scoped config and remembers its manager in {@code _fakeInstanceHelixManagers}.
 *
 * @param instanceId instance name the fake server connects with
 * @param isSingleTenant when {@code true} the instance gets both the offline and the realtime tag returned by
 *        {@code TagNameUtils} for a {@code null} tenant; otherwise it gets {@code UNTAGGED_SERVER_INSTANCE}
 * @param adminPort value written under {@code ADMIN_PORT_KEY} in the instance config
 * @throws Exception if connecting to Helix fails
 */
protected void addFakeServerInstanceToAutoJoinHelixCluster(String instanceId, boolean isSingleTenant, int adminPort)
    throws Exception {
  final String clusterName = getHelixClusterName();
  HelixManager fakeServer = HelixManagerFactory
      .getZKHelixManager(clusterName, instanceId, InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
  fakeServer.getStateMachineEngine().registerStateModelFactory(
      FakeSegmentOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
      FakeSegmentOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
  fakeServer.connect();

  HelixAdmin admin = fakeServer.getClusterManagmentTool();
  if (!isSingleTenant) {
    admin.addInstanceTag(clusterName, instanceId, UNTAGGED_SERVER_INSTANCE);
  } else {
    admin.addInstanceTag(clusterName, instanceId, TagNameUtils.getOfflineTagForTenant(null));
    admin.addInstanceTag(clusterName, instanceId, TagNameUtils.getRealtimeTagForTenant(null));
  }

  HelixConfigScope participantScope =
      new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, clusterName)
          .forParticipant(instanceId)
          .build();
  admin.setConfig(participantScope, Collections.singletonMap(ADMIN_PORT_KEY, String.valueOf(adminPort)));

  _fakeInstanceHelixManagers.add(fakeServer);
}
/**
 * Sends a {@link TableConfigRefreshMessage} for the given table to the brokers and logs how many messages
 * were sent.
 *
 * @param tableNameWithType table name with type suffix; used as the partition of the recipient criteria
 */
private void sendTableConfigRefreshMessage(String tableNameWithType) {
  // Recipients: participants of the broker resource, partition = table name.
  Criteria brokerCriteria = new Criteria();
  brokerCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
  brokerCriteria.setInstanceName("%");
  brokerCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
  brokerCriteria.setSessionSpecific(true);
  brokerCriteria.setPartition(tableNameWithType);

  TableConfigRefreshMessage refreshMessage = new TableConfigRefreshMessage(tableNameWithType);

  // Send message with no callback and infinite timeout on the recipient
  int sentCount = _helixZkManager.getMessagingService().send(brokerCriteria, refreshMessage, null, -1);
  if (sentCount <= 0) {
    LOGGER.warn("No table config refresh message sent to brokers for table: {}", tableNameWithType);
    return;
  }
  // TODO: Would be nice if we can get the name of the instances to which messages were sent
  LOGGER.info("Sent {} table config refresh messages to brokers for table: {}", sentCount, tableNameWithType);
}
/**
 * Removes a tag from the instance config of the given instance and writes the config back.
 *
 * @param clusterName cluster the instance belongs to
 * @param instanceName instance whose config is updated
 * @param tag tag to remove
 * @throws HelixException if the cluster or the participant instance is not set up
 */
@Override
public void removeInstanceTag(String clusterName, String instanceName, String tag) {
  logger.info("Remove instance tag {} for instance {} in cluster {}.", tag, instanceName,
      clusterName);

  boolean clusterReady = ZKUtil.isClusterSetup(clusterName, _zkClient);
  if (!clusterReady) {
    throw new HelixException("cluster " + clusterName + " is not setup yet");
  }
  boolean instanceReady =
      ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT);
  if (!instanceReady) {
    throw new HelixException(
        "cluster " + clusterName + " instance " + instanceName + " is not setup yet");
  }

  // Read-modify-write of the instance config.
  ZKHelixDataAccessor dataAccessor =
      new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
  PropertyKey instanceConfigKey = dataAccessor.keyBuilder().instanceConfig(instanceName);
  InstanceConfig instanceConfig = dataAccessor.getProperty(instanceConfigKey);
  instanceConfig.removeTag(tag);
  dataAccessor.setProperty(instanceConfigKey, instanceConfig);
}
/**
 * Handles the STANDBY to LEADER transition: if no controller manager exists yet, creates one for the
 * cluster named by the message's partition, connects it and starts its timer tasks. If one already exists,
 * only an error is logged.
 *
 * @param message transition message; its partition name is used as the cluster name and its target name as
 *        the controller name
 * @param context notification context (not used by this method)
 * @throws Exception if connecting the controller manager fails
 */
@Override
public void onBecomeLeaderFromStandby(Message message, NotificationContext context)
    throws Exception {
  final String clusterName = message.getPartitionName();
  final String controllerName = message.getTgtName();
  logger.info(controllerName + " becoming leader from standby for " + clusterName);

  if (_controller != null) {
    logger.error("controller already exists:" + _controller.getInstanceName() + " for "
        + clusterName);
    return;
  }

  _controller = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
      InstanceType.CONTROLLER, _zkAddr);
  _controller.setEnabledControlPipelineTypes(_enabledPipelineTypes);
  _controller.connect();
  _controller.startTimerTasks();
  logStateTransition("STANDBY", "LEADER", clusterName, controllerName);
}
/**
 * Builds a {@code TASK_REPLY} message answering an incoming message.
 * The reply carries the correlation id of the source message and is addressed to the controller when the
 * source message came from a controller, otherwise to the source message's sender.
 * @param srcMessage the incoming message
 * @param instanceName the instance that is the source of the reply
 * @param taskResultMap the result of executing the incoming message
 * @return the reply Message
 * @throws HelixException if the incoming message has no correlation id
 */
public static Message createReplyMessage(Message srcMessage, String instanceName,
    Map<String, String> taskResultMap) {
  String correlationId = srcMessage.getCorrelationId();
  if (correlationId == null) {
    throw new HelixException(
        "Message " + srcMessage.getMsgId() + " does not contain correlation id");
  }

  Message reply = new Message(MessageType.TASK_REPLY, UUID.randomUUID().toString());
  reply.setCorrelationId(correlationId);
  reply.setResultMap(taskResultMap);
  reply.setTgtSessionId("*");
  reply.setMsgState(MessageState.NEW);
  reply.setSrcName(instanceName);

  boolean sentByController = srcMessage.getSrcInstanceType() == InstanceType.CONTROLLER;
  reply.setTgtName(sentByController ? InstanceType.CONTROLLER.name() : srcMessage.getMsgSrc());
  return reply;
}
/**
 * Connects a spectator manager with this test as external view listener, adds and rebalances the resource
 * "NextDB", and checks that external view changes were recorded for both "NextDB" and {@code TEST_DB}.
 *
 * @throws Exception if connecting or cluster setup fails
 */
@Test
public void TestSpectator() throws Exception {
  HelixManager relayHelixManager =
      HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, null, InstanceType.SPECTATOR, ZK_ADDR);
  relayHelixManager.connect();
  try {
    relayHelixManager.addExternalViewChangeListener(this);

    _gSetupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);

    boolean result =
        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
            ZK_ADDR, CLUSTER_NAME));
    Assert.assertTrue(result);

    Assert.assertTrue(_externalViewChanges.containsKey("NextDB"));
    Assert.assertTrue(_externalViewChanges.containsKey(TEST_DB));
  } finally {
    // Release the spectator's connection whether or not the assertions pass.
    relayHelixManager.disconnect();
  }
}
/**
 * Creates the participant manager, registers the test OnlineOffline state model factory and connects.
 *
 * @throws Exception if connecting fails
 */
public void start() throws Exception {
  _helixZkManager = HelixManagerFactory
      .getZKHelixManager(_helixClusterName, _instanceId, InstanceType.PARTICIPANT, _zkServer);

  // The factory hands out one state model object per partition.
  StateMachineEngine engine = _helixZkManager.getStateMachineEngine();
  engine.registerStateModelFactory("OnlineOffline", new TestOnlineOfflineStateModelFactory(_instanceId, 0));

  _helixZkManager.connect();
}
/**
 * Creates the handler for federated mode and prepares (without connecting) the participant Helix manager
 * for the manager-worker cluster.
 *
 * @param workerConf worker configuration; must have federated mode enabled and a cluster config file set
 * @param helixProps helix properties providing the ZK server, the instance id (defaults to
 *        {@code "uReplicator-" + current time}) and the federated deployment name
 * @param workerInstance the worker instance stored by this handler
 * @throws RuntimeException if federated mode is not enabled
 * @throws IllegalArgumentException if the federated deployment name or the cluster config file is missing
 */
public ManagerWorkerHelixHandler(WorkerConf workerConf, Properties helixProps,
    WorkerInstance workerInstance) {
  if (!workerConf.getFederatedEnabled()) {
    throw new RuntimeException("ManagerWorkerHelixHandler only support federated mode");
  }
  this.workerConf = workerConf;
  this.helixProps = helixProps;
  this.helixZkURL = helixProps.getProperty(Constants.HELIX_ZK_SERVER,
      Constants.ZK_SERVER);
  this.instanceId = helixProps.getProperty(
      Constants.HELIX_INSTANCE_ID,
      "uReplicator-" + new Date().getTime());
  this.federatedDeploymentName = helixProps.getProperty(Constants.FEDERATED_DEPLOYMENT_NAME,
      null);
  this.clusterName = WorkerUtils.getManagerWorkerHelixClusterName(federatedDeploymentName);

  // Federated mode is already guaranteed by the guard at the top, so the checks below are unconditional.
  if (StringUtils.isEmpty(federatedDeploymentName)) {
    LOGGER.error("{} is missing in helix properties for federated mode",
        Constants.FEDERATED_DEPLOYMENT_NAME);
    // Leading space added so the property name and the rest of the message no longer run together.
    throw new IllegalArgumentException(Constants.FEDERATED_DEPLOYMENT_NAME
        + " is required on helix property for federated mode");
  }
  if (StringUtils.isEmpty(workerConf.getClusterConfigFile())) {
    LOGGER.error("cluster config file required for federated mode");
    throw new IllegalArgumentException("cluster config file required for federated mode");
  }

  this.workerInstance = workerInstance;
  this.mangerWorkerHelixManager = HelixManagerFactory
      .getZKHelixManager(clusterName,
          instanceId,
          InstanceType.PARTICIPANT,
          helixZkURL);
}
/**
 * Starts the worker instance, registers the failure meters and, unless a Helix manager is already
 * connected, creates and connects a participant manager and registers the OnlineOffline state model
 * factory on it.
 *
 * @throws Exception any exception raised by the steps above, rethrown after being logged
 */
public void start() throws Exception {
try {
// The worker instance is started and the meters are registered before the connection check below,
// so these steps run even when the manager turns out to be connected already.
workerInstance.start(srcCluster, dstCluster, routeId, federatedDeploymentName);
KafkaUReplicatorMetricsReporter.get()
.registerMetric(METRIC_ADD_TOPIC_PARTITION_FAILURE, addTopicPartitionFailureMeter);
KafkaUReplicatorMetricsReporter.get()
.registerMetric(METRIC_DELETE_TOPIC_PARTITION_FAILURE, deleteTopicPartitionFailureMeter);
if (helixManager != null && helixManager.isConnected()) {
LOGGER.warn("ControllerWorkerHelixManager already connected");
// Early return: the "finished" log line at the end of the method is skipped on this path.
return;
}
LOGGER.info("Starting ControllerWorkerHelixHandler");
helixManager = HelixManagerFactory.getZKHelixManager(helixClusterName,
workerInstanceId,
InstanceType.PARTICIPANT,
helixZkURL);
helixManager.connect();
// NOTE(review): the state model factory is registered after connect() here — confirm this order is intended.
helixManager.getStateMachineEngine()
.registerStateModelFactory(OnlineOfflineStateFactory.STATE_MODE_DEF,
new OnlineOfflineStateFactory(this));
} catch (Exception e) {
// Log with the cluster name for context, then propagate unchanged to the caller.
LOGGER.error("Add instance to helix cluster failed. helixCluster: {}",
helixClusterName, e);
throw e;
}
LOGGER.info("Register ControllerWorkerHelixHandler finished");
}
/**
 * Creates the Helix admin, connects an administrator Helix manager and opens the cluster's property store.
 *
 * @param zkAddress ZooKeeper address used by the admin, the manager and the property store
 * @param clusterName name of the Helix cluster to operate on
 * @throws RuntimeException wrapping any exception thrown while connecting the Helix manager
 */
public PinotZKChanger(String zkAddress, String clusterName) {
  this.clusterName = clusterName;
  helixAdmin = new ZKHelixAdmin(zkAddress);

  helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "PinotNumReplicaChanger",
      InstanceType.ADMINISTRATOR, zkAddress);
  try {
    helixManager.connect();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }

  String propertyStorePath = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
  propertyStore = new ZkHelixPropertyStore<>(zkAddress, new ZNRecordSerializer(), propertyStorePath);
}
/**
 * Sets up the cluster: adds {@code NUM_NODES} instances, creates and rebalances the default target
 * resource in FULL_AUTO mode, starts one participant per instance with a delayed MasterSlave state model,
 * then connects an administrator manager and starts a controller.
 *
 * @throws Exception if any setup step fails
 */
@BeforeClass
public void beforeClass() throws Exception {
  _gSetupTool.addCluster(CLUSTER_NAME, true);
  // Allocated once; the original allocated this array twice with identical arguments.
  _participants = new MockParticipantManager[NUM_NODES];
  for (int i = 0; i < NUM_NODES; i++) {
    String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
  }

  _gSetupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
      MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
  _gSetupTool
      .rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);

  for (int i = 0; i < NUM_NODES; i++) {
    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
    _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);

    // add a delayed state model
    StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
    MockDelayMSStateModelFactory delayFactory =
        new MockDelayMSStateModelFactory().setDelay(-300000L);
    stateMachine.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, delayFactory);
    _participants[i].syncStart();
  }

  _manager = HelixManagerFactory
      .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
  _manager.connect();

  String controllerName = CONTROLLER_PREFIX + "_0";
  _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
  _controller.syncStart();

  _configAccessor = new ConfigAccessor(_gZkClient);
}
/**
 * Registers a test message handler factory on every participant, sends one message from the first
 * participant to all participants (the sender is not excluded) and checks that {@code NODE_NR} replies
 * arrive and that the first reply carries the expected reply text.
 */
@Test()
public void sendSelfMsg() {
  String hostSrc = "localhost_" + START_PORT;

  for (int i = 0; i < NODE_NR; i++) {
    TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
    _participants[i].getMessagingService().registerMessageHandlerFactory(
        factory.getMessageTypes(), factory);
  }

  String msgId = new UUID(123, 456).toString();
  Message msg = new Message(new TestMessagingHandlerFactory().getMessageTypes().get(0), msgId);
  msg.setMsgId(msgId);
  msg.setSrcName(hostSrc);
  msg.setTgtSessionId("*");
  msg.setMsgState(MessageState.NEW);
  String para = "Testing messaging para";
  msg.getRecord().setSimpleField("TestMessagingPara", para);

  Criteria cr = new Criteria();
  cr.setInstanceName("%");
  cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
  cr.setSessionSpecific(false);
  // The sender itself is a valid recipient, which is the point of this test.
  cr.setSelfExcluded(false);

  AsyncCallback callback1 = new MockAsyncCallback();
  // The returned sent-count is not needed; the replies collected by the callback are asserted instead.
  _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);

  AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR);
  AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
      .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
      .equals("TestReplyMessage"));
}
/**
 * Configures the instance, connects it to the cluster as a participant with the lock state model factory
 * and, when {@code startController} is set, also starts a controller.
 *
 * @throws Exception if configuring or connecting fails
 */
public void start() throws Exception {
  System.out.println("STARTING " + instanceName);
  configureInstance(instanceName);

  participantManager = HelixManagerFactory
      .getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress);
  LockFactory lockFactory = new LockFactory();
  participantManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", lockFactory);
  participantManager.connect();

  if (startController) {
    startController();
  }
  System.out.println("STARTED " + instanceName);
}
/**
 * Sends a user-defined Helix message with sub type {@code TOKEN_FILE_UPDATED} to instances of the given type.
 *
 * @param instanceType the {@link InstanceType} of the recipients
 * @param instanceName the recipient instance name; {@code null} or empty is replaced by {@code "%"}
 */
@VisibleForTesting
protected void sendTokenFileUpdatedMessage(InstanceType instanceType, String instanceName) {
  Criteria criteria = new Criteria();
  criteria.setInstanceName(Strings.isNullOrEmpty(instanceName) ? "%" : instanceName);
  criteria.setResource("%");
  criteria.setPartition("%");
  criteria.setPartitionState("%");
  criteria.setRecipientInstanceType(instanceType);
  /*
   * #HELIX-0.6.7-WORKAROUND
   * Add back when LIVESTANCES messaging is ported to 0.6 branch
   * if (instanceType == InstanceType.PARTICIPANT) {
   *   criteria.setDataSource(Criteria.DataSource.LIVEINSTANCES);
   * }
   */
  criteria.setSessionSpecific(true);

  String subType = HelixMessageSubTypes.TOKEN_FILE_UPDATED.toString();
  // Locale.ROOT keeps the id prefix identical on every JVM; the default-locale toLowerCase() can map
  // 'I' to a dotless 'ı' under e.g. a Turkish locale.
  String messageId = subType.toLowerCase(java.util.Locale.ROOT) + UUID.randomUUID().toString();
  Message tokenFileUpdatedMessage = new Message(Message.MessageType.USER_DEFINE_MSG, messageId);
  tokenFileUpdatedMessage.setMsgSubType(subType);
  tokenFileUpdatedMessage.setMsgState(Message.MessageState.NEW);
  if (instanceType == InstanceType.CONTROLLER) {
    tokenFileUpdatedMessage.setTgtSessionId("*");
  }

  // #HELIX-0.6.7-WORKAROUND
  // Temporarily bypass the default messaging service to allow upgrade to 0.6.7 which is missing support
  // for messaging to instances
  //int messagesSent = this.helixManager.getMessagingService().send(criteria, tokenFileUpdatedMessage);
  GobblinHelixMessagingService messagingService = new GobblinHelixMessagingService(helixManager);

  int messagesSent = messagingService.send(criteria, tokenFileUpdatedMessage);
  LOGGER.info(String.format("Sent %d token file updated message(s) to the %s", messagesSent, instanceType));
}
/**
 * Replaces the participants started by the super class with new ones whose Task state model knows the
 * {@link DelayedStopTask} command, then creates the admin manager, the task driver and the admin tool.
 *
 * @throws Exception if any setup step fails
 */
@BeforeClass
public void beforeClass() throws Exception {
  super.beforeClass();

  // Stop participants that have been started in super class
  for (int node = 0; node < _numNodes; node++) {
    super.stopParticipant(node);
  }

  // Check that participants are actually stopped
  for (int node = 0; node < _numNodes; node++) {
    Assert.assertFalse(_participants[node].isConnected());
  }

  // Start new participants that have new TaskStateModel (DelayedStopTask) information
  _participants = new MockParticipantManager[_numNodes];
  for (int node = 0; node < _numNodes; node++) {
    // Each participant gets its own task factory registry.
    Map<String, TaskFactory> taskFactories = new HashMap<>();
    taskFactories.put(DelayedStopTask.TASK_COMMAND, DelayedStopTask::new);

    String participantName = PARTICIPANT_PREFIX + "_" + (_startPort + node);
    MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
    _participants[node] = participant;

    // Register a Task state model factory.
    StateMachineEngine engine = participant.getStateMachineEngine();
    engine.registerStateModelFactory("Task", new TaskStateModelFactory(participant, taskFactories));
    participant.syncStart();
  }

  _manager =
      HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
  _manager.connect();
  _driver = new TaskDriver(_manager);
  _admin = _gSetupTool.getClusterManagementTool();
}
/**
 * Build the {@link HelixManager} for the Application Master.
 *
 * @param config configuration to read the Helix instance name and the cluster name from
 * @param zkConnectionString ZooKeeper connection string handed to the manager
 * @param clusterName despite its name, this is used as the <em>config key</em> under which the Helix
 *        cluster name is looked up in {@code config}
 * @param type the {@link InstanceType} of the manager to build
 * @return a {@link HelixManager} created via {@link HelixManagerFactory} (not yet connected by this method)
 */
protected static HelixManager buildHelixManager(Config config, String zkConnectionString, String clusterName, InstanceType type) {
  // Falls back to the simple class name of GobblinClusterManager when no instance name is configured.
  String defaultInstanceName = GobblinClusterManager.class.getSimpleName();
  String helixInstanceName =
      ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, defaultInstanceName);

  String helixClusterName = config.getString(clusterName);
  return HelixManagerFactory.getZKHelixManager(helixClusterName, helixInstanceName, type, zkConnectionString);
}