下面列出了org.apache.zookeeper.KeeperException.SessionExpiredException#org.apache.solr.common.cloud.ZkNodeProps 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
void invokeAction(SolrQueryRequest req, SolrQueryResponse rsp,
CoreContainer cores,
CollectionParams.CollectionAction action,
CollectionOperation operation) throws Exception {
Map<String, Object> result = null;
if (action == CollectionParams.CollectionAction.COLLECTIONPROP) {
//Fake this action, since we don't want to write to ZooKeeper in this test
result = new HashMap<>();
result.put(NAME, req.getParams().required().get(NAME));
result.put(PROPERTY_NAME, req.getParams().required().get(PROPERTY_NAME));
result.put(PROPERTY_VALUE, req.getParams().required().get(PROPERTY_VALUE));
} else {
result = operation.execute(req, rsp, this);
}
if (result != null) {
result.put(QUEUE_OPERATION, operation.action.toLower());
rsp.add(ZkNodeProps.class.getName(), new ZkNodeProps(result));
}
}
/**
* Executes a query against each live and active replica of the specified shard
* and aserts that the results are identical.
*
* @see #queryAndCompare
*/
public QueryResponse queryAndCompareReplicas(SolrParams params, String shard)
throws Exception {
ArrayList<SolrClient> shardClients = new ArrayList<>(7);
updateMappingsFromZk(jettys, clients);
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
List<CloudJettyRunner> solrJetties = shardToJetty.get(shard);
assertNotNull("no jetties found for shard: " + shard, solrJetties);
for (CloudJettyRunner cjetty : solrJetties) {
ZkNodeProps props = cjetty.info;
String nodeName = props.getStr(ZkStateReader.NODE_NAME_PROP);
boolean active = Replica.State.getState(props.getStr(ZkStateReader.STATE_PROP)) == Replica.State.ACTIVE;
boolean live = zkStateReader.getClusterState().liveNodesContain(nodeName);
if (active && live) {
shardClients.add(cjetty.client.solrClient);
}
}
return queryAndCompare(params, shardClients);
}
public static String getUrlFromZk(ClusterState clusterState, String collection) {
Map<String,Slice> slices = clusterState.getCollection(collection).getSlicesMap();
if (slices == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection:" + collection);
}
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
Slice slice = entry.getValue();
Map<String,Replica> shards = slice.getReplicasMap();
Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
for (Map.Entry<String,Replica> shardEntry : shardEntries) {
final ZkNodeProps node = shardEntry.getValue();
if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP))) {
return ZkCoreNodeProps.getCoreUrl(node.getStr(ZkStateReader.BASE_URL_PROP), collection); //new ZkCoreNodeProps(node).getCoreUrl();
}
}
}
throw new RuntimeException("Could not find a live node for collection:" + collection);
}
private String getLeaderUrl(final String collection, final String slice)
throws KeeperException, InterruptedException {
int iterCount = 60;
while (iterCount-- > 0) {
try {
byte[] data = zkClient.getData(
ZkStateReader.getShardLeadersPath(collection, slice), null, null,
true);
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(
ZkNodeProps.load(data));
return leaderProps.getCoreUrl();
} catch (NoNodeException | SessionExpiredException e) {
Thread.sleep(500);
}
}
zkClient.printLayoutToStream(System.out);
throw new RuntimeException("Could not get leader props for " + collection + " " + slice);
}
private void rejoinElectionQueue(Slice slice, String electionNode, String core, boolean rejoinAtHead)
throws KeeperException, InterruptedException {
Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
Map<String, Object> propMap = new HashMap<>();
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, slice.getName());
propMap.put(QUEUE_OPERATION, REBALANCELEADERS.toLower());
propMap.put(CORE_NAME_PROP, core);
propMap.put(CORE_NODE_NAME_PROP, replica.getName());
propMap.put(ZkStateReader.BASE_URL_PROP, replica.getProperties().get(ZkStateReader.BASE_URL_PROP));
propMap.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line.
propMap.put(ELECTION_NODE_PROP, electionNode);
String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime());
propMap.put(ASYNC, asyncId);
asyncRequests.add(asyncId);
collectionsHandler.sendToOCPQueue(new ZkNodeProps(propMap)); // ignore response; we construct our own
}
@Test
public void testBasic() throws IOException {
Map<String,Object> props = new HashMap<>();
props.put("prop1", "value1");
props.put("prop2", "value2");
props.put("prop3", "value3");
props.put("prop4", "value4");
props.put("prop5", "value5");
props.put("prop6", "value6");
ZkNodeProps zkProps = new ZkNodeProps(props);
byte[] bytes = Utils.toJSON(zkProps);
ZkNodeProps props2 = ZkNodeProps.load(bytes);
props.forEach((s, o) -> assertEquals(o, props2.get(s)));
SimplePostTool.BAOS baos = new SimplePostTool.BAOS();
try (JavaBinCodec jbc = new JavaBinCodec()) {
jbc.marshal(zkProps.getProperties(), baos);
}
bytes = baos.toByteArray();
System.out.println("BIN size : " + bytes.length);
ZkNodeProps props3 = ZkNodeProps.load(bytes);
props.forEach((s, o) -> assertEquals(o, props3.get(s)));
}
public void checkOverseerDesignate() {
try {
byte[] data = zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true);
if (data == null) return;
@SuppressWarnings({"rawtypes"})
Map roles = (Map) Utils.fromJSON(data);
if (roles == null) return;
@SuppressWarnings({"rawtypes"})
List nodeList = (List) roles.get("overseer");
if (nodeList == null) return;
if (nodeList.contains(getNodeName())) {
ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ADDROLE.toString().toLowerCase(Locale.ROOT),
"node", getNodeName(),
"role", "overseer");
log.info("Going to add role {} ", props);
getOverseerCollectionQueue().offer(Utils.toJSON(props));
}
} catch (NoNodeException nne) {
return;
} catch (Exception e) {
log.warn("could not read the overseer designate ", e);
}
}
static List<String> verifyReplicaAvailability(List<ZkNodeProps> sourceReplicas, ClusterState state) {
List<String> res = new ArrayList<>();
for (ZkNodeProps sourceReplica : sourceReplicas) {
String coll = sourceReplica.getStr(COLLECTION_PROP);
String shard = sourceReplica.getStr(SHARD_ID_PROP);
String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
DocCollection collection = state.getCollection(coll);
Slice slice = collection.getSlice(shard);
if (slice.getReplicas().size() < 2) {
// can't delete the only replica in existence
res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
} else { // check replica types
int otherNonPullReplicas = 0;
for (Replica r : slice.getReplicas()) {
if (!r.getName().equals(replicaName) && !r.getType().equals(Replica.Type.PULL)) {
otherNonPullReplicas++;
}
}
// can't delete - there are no other non-pull replicas
if (otherNonPullReplicas == 0) {
res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
}
}
}
return res;
}
public static List<String> populateShardNames(ZkNodeProps message, String router) {
List<String> shardNames = new ArrayList<>();
Integer numSlices = message.getInt(OverseerCollectionMessageHandler.NUM_SLICES, null);
if (ImplicitDocRouter.NAME.equals(router)) {
ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
numSlices = shardNames.size();
} else {
if (numSlices == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " is a required param (when using CompositeId router).");
}
if (numSlices <= 0) {
throw new SolrException(ErrorCode.BAD_REQUEST, OverseerCollectionMessageHandler.NUM_SLICES + " must be > 0");
}
ClusterStateMutator.getShardNames(numSlices, shardNames);
}
return shardNames;
}
public static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> liveNodes, final ZkNodeProps message, final Random random) {
List<String> nodeList;
final String createNodeSetStr = message.getStr(CREATE_NODE_SET);
final List<String> createNodeList = (createNodeSetStr == null) ? null :
StrUtils.splitSmart((OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY.equals(createNodeSetStr) ?
"" : createNodeSetStr), ",", true);
if (createNodeList != null) {
nodeList = new ArrayList<>(createNodeList);
nodeList.retainAll(liveNodes);
if (message.getBool(OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE,
OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE_DEFAULT)) {
Collections.shuffle(nodeList, random);
}
} else {
nodeList = new ArrayList<>(liveNodes);
Collections.shuffle(nodeList, random);
}
return nodeList;
}
private static void testAutoAddReplicas() throws Exception {
TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
byte[] data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
ZkNodeProps loaded = ZkNodeProps.load(data);
@SuppressWarnings({"rawtypes"})
Map triggers = (Map) loaded.get("triggers");
if (triggers != null && triggers.containsKey(".auto_add_replicas")) {
@SuppressWarnings({"unchecked"})
Map<String, Object> autoAddReplicasTrigger = (Map<String, Object>) triggers.get(".auto_add_replicas");
assertNotNull(autoAddReplicasTrigger);
@SuppressWarnings({"unchecked"})
List<Map<String, Object>> actions = (List<Map<String, Object>>) autoAddReplicasTrigger.get("actions");
assertNotNull(actions);
assertEquals(2, actions.size());
assertEquals("auto_add_replicas_plan", actions.get(0).get("name").toString());
assertEquals("solr.AutoAddReplicasPlanAction", actions.get(0).get("class").toString());
break;
} else {
Thread.sleep(300);
}
}
if (timeOut.hasTimedOut()) {
fail("Timeout waiting for .auto_add_replicas being created");
}
}
@SuppressWarnings({"rawtypes"})
public void createSystemCollection() throws IOException {
try {
synchronized (this) {
if (colShardReplicaMap.containsKey(CollectionAdminParams.SYSTEM_COLL)) {
return;
}
}
String repFactor = String.valueOf(Math.min(3, liveNodes.size()));
ZkNodeProps props = new ZkNodeProps(
NAME, CollectionAdminParams.SYSTEM_COLL,
REPLICATION_FACTOR, repFactor,
OverseerCollectionMessageHandler.NUM_SLICES, "1",
CommonAdminParams.WAIT_FOR_FINAL_STATE, "true");
simCreateCollection(props, new NamedList());
CloudUtil.waitForState(cloudManager, CollectionAdminParams.SYSTEM_COLL, 120, TimeUnit.SECONDS,
CloudUtil.clusterShape(1, Integer.parseInt(repFactor), false, true));
} catch (Exception e) {
throw new IOException(e);
}
}
static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState state) {
List<ZkNodeProps> sourceReplicas = new ArrayList<>();
for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
for (Slice slice : e.getValue().getSlices()) {
for (Replica replica : slice.getReplicas()) {
if (source.equals(replica.getNodeName())) {
ZkNodeProps props = new ZkNodeProps(
COLLECTION_PROP, e.getKey(),
SHARD_ID_PROP, slice.getName(),
ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
ZkStateReader.REPLICA_PROP, replica.getName(),
ZkStateReader.REPLICA_TYPE, replica.getType().name(),
ZkStateReader.LEADER_PROP, String.valueOf(replica.equals(slice.getLeader())),
CoreAdminParams.NODE, source);
sourceReplicas.add(props);
}
}
}
}
return sourceReplicas;
}
/**
* Send request to all replicas of a collection
* @return List of replicas which is not live for receiving the request
*/
List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
NamedList<Object> results, Replica.State stateMatcher, String asyncId, Set<String> okayExceptions) {
log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
String collectionName = message.getStr(NAME);
@SuppressWarnings("deprecation")
ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollection(collectionName);
List<Replica> notLivesReplicas = new ArrayList<>();
final ShardRequestTracker shardRequestTracker = new ShardRequestTracker(asyncId);
for (Slice slice : coll.getSlices()) {
notLivesReplicas.addAll(shardRequestTracker.sliceCmd(clusterState, params, stateMatcher, slice, shardHandler));
}
shardRequestTracker.processResponses(results, shardHandler, false, null, okayExceptions);
return notLivesReplicas;
}
@SuppressWarnings({"unchecked"})
public void testCommands() throws Exception {
try (ConfigSetsHandler handler = new ConfigSetsHandler(null) {
@Override
protected void sendToZk(SolrQueryResponse rsp,
ConfigSetOperation operation,
Map<String, Object> result)
throws KeeperException, InterruptedException {
result.put(QUEUE_OPERATION, operation.action.toLower());
rsp.add(ZkNodeProps.class.getName(), new ZkNodeProps(result));
}
}) {
ApiBag apiBag = new ApiBag(false);
for (Api api : handler.getApis()) apiBag.register(api, EMPTY_MAP);
compareOutput(apiBag, "/cluster/configs/sample", DELETE, null, null,
"{name :sample, operation:delete}");
compareOutput(apiBag, "/cluster/configs", POST, "{create:{name : newconf, baseConfigSet: sample }}", null,
"{operation:create, name :newconf, baseConfigSet: sample, immutable: false }");
}
}
/**
* Grabs an exclusive lock for this particular task.
* @return <code>null</code> if locking is not possible. When locking is not possible, it will remain
* impossible for the passed value of <code>batchSessionId</code>. This is to guarantee tasks are executed
* in queue order (and a later task is not run earlier than its turn just because it happens that a lock got released).
*/
@Override
public Lock lockTask(ZkNodeProps message, long batchSessionId) {
if (sessionId != batchSessionId) {
//this is always called in the same thread.
//Each batch is supposed to have a new taskBatch
//So if taskBatch changes we must create a new Session
lockSession = lockTree.getSession();
sessionId = batchSessionId;
}
return lockSession.lock(getCollectionAction(message.getStr(Overseer.QUEUE_OPERATION)),
Arrays.asList(
getTaskKey(message),
message.getStr(ZkStateReader.SHARD_ID_PROP),
message.getStr(ZkStateReader.REPLICA_PROP))
);
}
public void showCounts() {
Set<String> theShards = shardToJetty.keySet();
for (String shard : theShards) {
List<CloudJettyRunner> solrJetties = shardToJetty.get(shard);
for (CloudJettyRunner cjetty : solrJetties) {
ZkNodeProps props = cjetty.info;
System.err.println("PROPS:" + props);
try {
SolrParams query = params("q", "*:*", "rows", "0", "distrib",
"false", "tests", "checkShardConsistency"); // "tests" is just a
// tag that won't do
// anything except be
// echoed in logs
long num = cjetty.client.solrClient.query(query).getResults()
.getNumFound();
System.err.println("DOCS:" + num);
} catch (SolrServerException | SolrException | IOException e) {
System.err.println("error contacting client: " + e.getMessage()
+ "\n");
continue;
}
boolean live = false;
String nodeName = props.getStr(ZkStateReader.NODE_NAME_PROP);
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
if (zkStateReader.getClusterState().liveNodesContain(nodeName)) {
live = true;
}
System.err.println(" live:" + live);
}
}
}
private Type getTypeForJetty(String sliceName, CloudJettyRunner cjetty) {
DocCollection docCollection = zkStateReader.getClusterState().getCollection(collection);
Slice slice = docCollection.getSlice(sliceName);
ZkNodeProps props = slice.getReplicasMap().get(cjetty.coreNodeName);
if (props == null) {
throw new RuntimeException("shard name " + cjetty.coreNodeName + " not found in " + slice.getReplicasMap().keySet());
}
return Replica.Type.valueOf(props.getStr(ZkStateReader.REPLICA_TYPE));
}
private boolean canKillIndexer(String sliceName) throws KeeperException, InterruptedException {
int numIndexersFoundInShard = 0;
for (CloudJettyRunner cloudJetty : shardToJetty.get(sliceName)) {
// get latest cloud state
zkStateReader.forceUpdateCollection(collection);
DocCollection docCollection = zkStateReader.getClusterState().getCollection(collection);
Slice slice = docCollection.getSlice(sliceName);
ZkNodeProps props = slice.getReplicasMap().get(cloudJetty.coreNodeName);
if (props == null) {
throw new RuntimeException("shard name " + cloudJetty.coreNodeName + " not found in " + slice.getReplicasMap().keySet());
}
final Replica.State state = Replica.State.getState(props.getStr(ZkStateReader.STATE_PROP));
final Replica.Type replicaType = Replica.Type.valueOf(props.getStr(ZkStateReader.REPLICA_TYPE));
final String nodeName = props.getStr(ZkStateReader.NODE_NAME_PROP);
if (cloudJetty.jetty.isRunning()
&& state == Replica.State.ACTIVE
&& (replicaType == Replica.Type.TLOG || replicaType == Replica.Type.NRT)
&& zkStateReader.getClusterState().liveNodesContain(nodeName)) {
numIndexersFoundInShard++;
}
}
return numIndexersFoundInShard > 1;
}
public ZkWriteCommand removeRoutingRule(final ClusterState clusterState, ZkNodeProps message) {
String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
String routeKeyStr = message.getStr("routeKey");
log.info("Overseer.removeRoutingRule invoked for collection: {} shard: {} routeKey: {}"
, collectionName, shard, routeKeyStr);
DocCollection collection = clusterState.getCollection(collectionName);
Slice slice = collection.getSlice(shard);
if (slice == null) {
log.warn("Unknown collection: {} shard: {}", collectionName, shard);
return ZkStateWriter.NO_OP;
}
Map<String, RoutingRule> routingRules = slice.getRoutingRules();
if (routingRules != null) {
routingRules.remove(routeKeyStr); // no rules left
Map<String, Object> props = slice.shallowCopy();
props.put("routingRules", routingRules);
Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props,collectionName);
return new ZkWriteCommand(collectionName,
CollectionMutator.updateSlice(collectionName, collection, newSlice));
}
return ZkStateWriter.NO_OP;
}
private SolrCore checkProps(ZkNodeProps zkProps) {
String corename;
SolrCore core = null;
if (cores.getZkController().getNodeName().equals(zkProps.getStr(NODE_NAME_PROP))) {
corename = zkProps.getStr(CORE_NAME_PROP);
core = cores.getCore(corename);
}
return core;
}
@SuppressWarnings({"unchecked"})
void invokeAction(SolrQueryRequest req, SolrQueryResponse rsp, CoreContainer cores, CollectionAction action, CollectionOperation operation) throws Exception {
if (!coreContainer.isZooKeeperAware()) {
throw new SolrException(BAD_REQUEST,
"Invalid request. collections can be accessed only in SolrCloud mode");
}
Map<String, Object> props = operation.execute(req, rsp, this);
if (props == null) {
return;
}
String asyncId = req.getParams().get(ASYNC);
if (asyncId != null) {
props.put(ASYNC, asyncId);
}
props.put(QUEUE_OPERATION, operation.action.toLower());
if (operation.sendToOCPQueue) {
ZkNodeProps zkProps = new ZkNodeProps(props);
SolrResponse overseerResponse = sendToOCPQueue(zkProps, operation.timeOut);
rsp.getValues().addAll(overseerResponse.getResponse());
Exception exp = overseerResponse.getException();
if (exp != null) {
rsp.setException(exp);
}
//TODO yuck; shouldn't create-collection at the overseer do this? (conditionally perhaps)
if (action.equals(CollectionAction.CREATE) && asyncId == null) {
if (rsp.getException() == null) {
waitForActiveCollection(zkProps.getStr(NAME), cores, overseerResponse);
}
}
} else {
// submits and doesn't wait for anything (no response)
coreContainer.getZkController().getOverseer().offerStateUpdate(Utils.toJSON(props));
}
}
private Map<String, Object> getNewProperties(ZkNodeProps message) {
Map<String, Object> properties = null;
for (Map.Entry<String, Object> entry : message.getProperties().entrySet()) {
if (entry.getKey().startsWith(PROPERTY_PREFIX + ".")) {
if (properties == null) {
properties = new HashMap<String, Object>();
}
properties.put(entry.getKey().substring((PROPERTY_PREFIX + ".").length()),
entry.getValue());
}
}
return properties;
}
public ZkWriteCommand updateShardState(ClusterState clusterState, ZkNodeProps message) {
String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
log.info("Update shard state invoked for collection: {} with message: {}", collectionName, message);
DocCollection collection = clusterState.getCollection(collectionName);
Map<String, Slice> slicesCopy = new LinkedHashMap<>(collection.getSlicesMap());
for (String key : message.keySet()) {
if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
if (Overseer.QUEUE_OPERATION.equals(key)) continue;
Slice slice = collection.getSlice(key);
if (slice == null) {
throw new RuntimeException("Overseer.updateShardState unknown collection: " + collectionName + " slice: " + key);
}
if (log.isInfoEnabled()) {
log.info("Update shard state {} to {}", key, message.getStr(key));
}
Map<String, Object> props = slice.shallowCopy();
if (Slice.State.getState(message.getStr(key)) == Slice.State.ACTIVE) {
props.remove(Slice.PARENT);
props.remove("shard_parent_node");
props.remove("shard_parent_zk_session");
}
props.put(ZkStateReader.STATE_PROP, message.getStr(key));
// we need to use epoch time so that it's comparable across Overseer restarts
props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props,collectionName);
slicesCopy.put(slice.getName(), newSlice);
}
return new ZkWriteCommand(collectionName, collection.copyWithSlices(slicesCopy));
}
private void checkIfIAmLeader() throws KeeperException, InterruptedException {
SolrZkClient zkClient = core.getCoreContainer().getZkController().getZkClient();
ZkNodeProps props = ZkNodeProps.load(zkClient.getData(CdcrLeaderStateManager.this.getZnodePath(), null, null, true));
if (props != null) {
CdcrLeaderStateManager.this.setAmILeader(props.get("core").equals(core.getName()));
}
}
private String getLeaderUrl() {
ZkController zkController = core.getCoreContainer().getZkController();
ClusterState cstate = zkController.getClusterState();
DocCollection docCollection = cstate.getCollection(collection);
ZkNodeProps leaderProps = docCollection.getLeader(shardId);
if (leaderProps == null) { // we might not have a leader yet, returns null
return null;
}
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
return nodeProps.getCoreUrl();
}
public static CreateReplica assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState,
ZkNodeProps message, ReplicaPosition replicaPosition) {
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
String collection = message.getStr(COLLECTION_PROP);
String node = replicaPosition.node;
String shard = message.getStr(SHARD_ID_PROP);
String coreName = message.getStr(CoreAdminParams.NAME);
String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
Replica.Type replicaType = replicaPosition.type;
if (StringUtils.isBlank(coreName)) {
coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
}
log.info("Node Identified {} for creating new replica of shard {} for collection {}", node, shard, collection);
if (!clusterState.liveNodesContain(node)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
}
DocCollection coll = clusterState.getCollection(collection);
if (coreName == null) {
coreName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), coll, shard, replicaType);
} else if (!skipCreateReplicaInClusterState) {
//Validate that the core name is unique in that collection
for (Slice slice : coll.getSlices()) {
for (Replica replica : slice.getReplicas()) {
String replicaCoreName = replica.getStr(CORE_NAME_PROP);
if (coreName.equals(replicaCoreName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Another replica with the same core name already exists" +
" for this collection");
}
}
}
}
log.info("Returning CreateReplica command.");
return new CreateReplica(collection, shard, node, replicaType, coreName, coreNodeName);
}
static ZkNodeProps compareOutput(final ApiBag apiBag, final String path, final SolrRequest.METHOD method,
final String payload, final CoreContainer cc, String expectedOutputMapJson) throws Exception {
Pair<SolrQueryRequest, SolrQueryResponse> ctx = makeCall(apiBag, path, method, payload, cc);
ZkNodeProps output = (ZkNodeProps) ctx.second().getValues().get(ZkNodeProps.class.getName());
@SuppressWarnings({"rawtypes"})
Map expected = (Map) fromJSONString(expectedOutputMapJson);
assertMapEqual(expected, output);
return output;
}
public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head, OverseerMessageHandler.Lock lock) {
this.message = message;
this.operation = operation;
this.head = head;
this.messageHandler = messageHandler;
this.lock = lock;
response = null;
}
private void joinElection(CoreDescriptor cd, boolean afterExpiration, boolean joinAtHead)
throws InterruptedException, KeeperException, IOException {
// look for old context - if we find it, cancel it
String collection = cd.getCloudDescriptor().getCollectionName();
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
ContextKey contextKey = new ContextKey(collection, coreNodeName);
ElectionContext prevContext = electionContexts.get(contextKey);
if (prevContext != null) {
prevContext.cancelElection();
}
String shardId = cd.getCloudDescriptor().getShardId();
Map<String, Object> props = new HashMap<>();
// we only put a subset of props into the leader node
props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
ZkNodeProps ourProps = new ZkNodeProps(props);
LeaderElector leaderElector = new LeaderElector(zkClient, contextKey, electionContexts);
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
collection, coreNodeName, ourProps, this, cc);
leaderElector.setup(context);
electionContexts.put(contextKey, context);
leaderElector.joinElection(context, false, joinAtHead);
}