下面列出了org.apache.zookeeper.KeeperException.SessionExpiredException#org.apache.solr.common.cloud.Slice 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Finds the name of the replica in {@code collection} that is registered for the given node
 * and core.
 *
 * @param collection the collection to search; may be {@code null}
 * @param forNodeName the node name compared against each replica's node-name property
 * @param forCoreName the core name compared against each replica's core-name property
 * @return the name of the first matching replica, or {@code null} when the collection is
 *     {@code null}, has no slices, or contains no matching replica
 */
public static String getAssignedCoreNodeName(DocCollection collection, String forNodeName, String forCoreName) {
  if (collection == null) {
    return null;
  }
  Collection<Slice> slices = collection.getSlices();
  if (slices == null) {
    return null;
  }
  for (Slice slice : slices) {
    for (Replica replica : slice.getReplicas()) {
      String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
      String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
      // A replica that lacks either property cannot be the one being looked for; skip it
      // rather than failing the whole lookup with a NullPointerException.
      if (nodeName != null && core != null
          && nodeName.equals(forNodeName) && core.equals(forCoreName)) {
        return replica.getName();
      }
    }
  }
  return null;
}
/**
 * Polls the cluster state until every replica of "shard1" in "collection1" is hosted on a live
 * node and is ACTIVE. Makes up to 60 attempts, sleeping 3 seconds before each one, and fails
 * the test (after printing the layout) if the condition is never met.
 */
private void waitTillAllNodesActive() throws Exception {
  for (int attempt = 0; attempt < 60; attempt++) {
    Thread.sleep(3000);
    ZkStateReader reader = cloudClient.getZkStateReader();
    ClusterState state = reader.getClusterState();
    Collection<Replica> shardReplicas =
        state.getCollection("collection1").getSlice("shard1").getReplicas();
    boolean everyReplicaActive = shardReplicas.stream().allMatch(
        r -> state.liveNodesContain(r.getNodeName()) && r.getState() == Replica.State.ACTIVE);
    if (everyReplicaActive) {
      return;
    }
  }
  printLayout();
  fail("timeout waiting to see all nodes active");
}
/**
 * Sends the request described by {@code params} to the replicas of every slice of the
 * collection named in {@code message}, then processes the responses into {@code results}.
 *
 * @return the replicas reported by {@code ShardRequestTracker#sliceCmd} for each slice
 *     (per the original documentation: replicas that are not live to receive the request)
 */
List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
                            NamedList<Object> results, Replica.State stateMatcher, String asyncId, Set<String> okayExceptions) {
  log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
  final String targetCollection = message.getStr(NAME);
  @SuppressWarnings("deprecation")
  final ShardHandler handler = shardHandlerFactory.getShardHandler(
      overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
  final ClusterState state = zkStateReader.getClusterState();
  final DocCollection docCollection = state.getCollection(targetCollection);

  final List<Replica> skipped = new ArrayList<>();
  final ShardRequestTracker tracker = new ShardRequestTracker(asyncId);
  for (Slice eachSlice : docCollection.getSlices()) {
    List<Replica> skippedInSlice = tracker.sliceCmd(state, params, stateMatcher, eachSlice, handler);
    skipped.addAll(skippedInSlice);
  }

  tracker.processResponses(results, handler, false, null, okayExceptions);
  return skipped;
}
/**
 * Queries every replica of {@code shard} in the default collection directly (non-distributed)
 * and asserts that each replica has at least one document and that all replicas report the
 * same document count.
 *
 * @param shard name of the shard whose replicas are compared
 */
protected void checkSubShardConsistency(String shard) throws SolrServerException, IOException {
  final SolrQuery localQuery = new SolrQuery("*:*").setRows(1000).setFields("id", "_version_");
  localQuery.set("distrib", false);

  final ClusterState state = cloudClient.getZkStateReader().getClusterState();
  final Slice slice = state.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getSlice(shard);
  final int replicaCount = slice.getReplicasMap().size();
  final long[] docCounts = new long[replicaCount];

  int next = 0;
  for (Replica replica : slice.getReplicas()) {
    final String coreUrl = new ZkCoreNodeProps(replica).getCoreUrl();
    final QueryResponse response;
    try (HttpSolrClient client = getHttpSolrClient(coreUrl)) {
      response = client.query(localQuery);
    }
    final long found = response.getResults().getNumFound();
    docCounts[next++] = found;
    if (log.isInfoEnabled()) {
      log.info("Shard: {} Replica: {} has {} docs", shard, coreUrl, String.valueOf(found));
    }
    assertTrue("Shard: " + shard + " Replica: " + coreUrl + " has 0 docs", found > 0);
  }

  // Every replica must agree with the first one.
  for (long count : docCounts) {
    assertEquals(shard + " is not consistent", docCounts[0], count);
  }
}
/**
 * Builds a predicate that holds when the collection has exactly {@code expectedSliceCount}
 * slices, every replica is active and located on {@code expectedNodeName}, and the total
 * number of replicas equals {@code expectedReplicaCount}.
 */
private static CollectionStatePredicate expectAllReplicasOnSpecificNode
    (final String expectedNodeName,
     final int expectedSliceCount,
     final int expectedReplicaCount) {
  return (liveNodes, collection) -> {
    if (collection == null) {
      return false;
    }
    if (collection.getSlices().size() != expectedSliceCount) {
      return false;
    }
    int seen = 0;
    for (Slice slice : collection) {
      for (Replica replica : slice) {
        // A single inactive or misplaced replica is enough to reject the state.
        if (!replica.isActive(liveNodes) || !expectedNodeName.equals(replica.getNodeName())) {
          return false;
        }
        seen++;
      }
    }
    return seen == expectedReplicaCount;
  };
}
/**
 * Maps the core URL of every replica in {@code collection} to the string form of its replica
 * type. Each URL is registered twice, once with and once without a trailing slash, because
 * (per the original author's note) replicas and query shard info appear to disagree on it.
 *
 * @return a mutable map from core URL (both spellings) to replica type name
 */
public static Map<String, String> mapReplicasToReplicaType(DocCollection collection) {
  Map<String, String> typeByUrl = new HashMap<>();
  for (Slice slice : collection.getSlices()) {
    for (Replica replica : slice.getReplicas()) {
      final String url = replica.getCoreUrl();
      typeByUrl.put(url, replica.getType().toString());
      // Register the other spelling as well: strip the trailing slash or append one.
      final String otherSpelling = url.endsWith("/")
          ? url.substring(0, url.length() - 1)
          : url + "/";
      typeByUrl.put(otherSpelling, replica.getType().toString());
    }
  }
  return typeByUrl;
}
/**
 * Asserts that no update log directory exists for any replica of {@code collection} whose type
 * is not {@link org.apache.solr.common.cloud.Replica.Type#NRT}. NRT replicas are skipped.
 *
 * @param collection the collection whose replicas are inspected
 */
private void assertUlogPresence(DocCollection collection) {
  for (Slice s : collection.getSlices()) {
    for (Replica r : s.getReplicas()) {
      if (r.getType() == Replica.Type.NRT) {
        continue;
      }
      SolrCore core = null;
      try {
        core = cluster.getReplicaJetty(r).getCoreContainer().getCore(r.getCoreName());
        assertNotNull(core);
        assertFalse("Update log should not exist for replicas of type Passive but file is present: " + core.getUlogDir(),
            new java.io.File(core.getUlogDir()).exists());
      } finally {
        // The lookup may have thrown or returned null; closing unconditionally would raise a
        // NullPointerException that hides the real failure.
        if (core != null) {
          core.close();
        }
      }
    }
  }
}
/**
 * Refreshes the state of {@code collection} and fails the test unless every slice and every
 * replica of it is ACTIVE.
 *
 * @throws IllegalArgumentException if the collection (or its slices) cannot be found
 */
protected static void assertAllActive(String collection, ZkStateReader zkStateReader)
    throws KeeperException, InterruptedException {
  zkStateReader.forceUpdateCollection(collection);
  final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
  if (docCollection == null || docCollection.getSlices() == null) {
    throw new IllegalArgumentException("Cannot find collection:" + collection);
  }

  for (Slice slice : docCollection.getSlicesMap().values()) {
    if (slice.getState() != Slice.State.ACTIVE) {
      fail("Not all shards are ACTIVE - found a shard " + slice.getName() + " that is: " + slice.getState());
    }
    for (Replica replica : slice.getReplicasMap().values()) {
      if (replica.getState() != Replica.State.ACTIVE) {
        fail("Not all replicas are ACTIVE - found a replica " + replica.getName() + " that is: " + replica.getState());
      }
    }
  }
}
/**
 * Builds a predicate that holds when the collection exists, has exactly
 * {@code expectedShards} slices, and exactly {@code expectedReplicas} of its replicas are
 * active with respect to the supplied live nodes.
 */
public static CollectionStatePredicate expectedShardsAndActiveReplicas(int expectedShards, int expectedReplicas) {
  return (liveNodes, collectionState) -> {
    if (collectionState == null || collectionState.getSlices().size() != expectedShards) {
      return false;
    }
    int active = 0;
    for (Slice slice : collectionState) {
      for (Replica replica : slice) {
        active += replica.isActive(liveNodes) ? 1 : 0;
      }
    }
    return active == expectedReplicas;
  };
}
/**
 * Verifies the core-name pattern produced by {@code Assign.buildSolrCoreName} for an NRT and a
 * PULL replica, using a throwaway ZooKeeper test server as the backing state manager.
 */
@Test
public void testBuildCoreName() throws Exception {
  final Path dataDir = createTempDir("zkData");
  final ZkTestServer zkServer = new ZkTestServer(dataDir);
  zkServer.run();
  try (SolrZkClient client = new SolrZkClient(zkServer.getZkAddress(), 10000)) {
    // TODO: fix this to be independent of ZK
    final ZkDistribStateManager stateManager = new ZkDistribStateManager(client);

    // Two shards without any replicas.
    final Map<String, Slice> emptyShards = new HashMap<>();
    for (String shardName : new String[] {"shard1", "shard2"}) {
      emptyShards.put(shardName, new Slice(shardName, new HashMap<>(), null, "collection1"));
    }
    final DocCollection coll = new DocCollection("collection1", emptyShards, null, DocRouter.DEFAULT);

    final String nrtCoreName = Assign.buildSolrCoreName(stateManager, coll, "shard1", Replica.Type.NRT);
    assertEquals("Core name pattern changed", "collection1_shard1_replica_n1", nrtCoreName);

    final String pullCoreName = Assign.buildSolrCoreName(stateManager, coll, "shard2", Replica.Type.PULL);
    assertEquals("Core name pattern changed", "collection1_shard2_replica_p2", pullCoreName);
  } finally {
    zkServer.shutdown();
  }
}
/**
 * For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD}.
 *
 * <p>Returns {@code true} when this core's shard is in CONSTRUCTION or RECOVERY state, this
 * core is that shard's leader, and either no parent slice was given (delete-by-query case) or
 * the shard's range is a subset of the parent's range and the router targets this shard for
 * the given document.
 */
protected boolean amISubShardLeader(DocCollection coll, Slice parentSlice, String id, SolrInputDocument doc) throws InterruptedException {
  final String myShardId = cloudDesc.getShardId();
  final Slice mySlice = coll.getSlice(myShardId);
  final Slice.State state = mySlice.getState();

  // Only shards still under construction or recovering can be sub shards of a split.
  if (state != Slice.State.CONSTRUCTION && state != Slice.State.RECOVERY) {
    return false;
  }

  final Replica leader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
  if (!leader.getName().equals(cloudDesc.getCoreNodeName())) {
    return false;
  }

  // Does the document belong to my hash range as well?
  DocRouter.Range myRange = mySlice.getRange();
  if (myRange == null) {
    myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
  }
  if (parentSlice == null) {
    // delete by query case -- as long as I am a sub shard leader we're fine
    return true;
  }
  final boolean withinParent = parentSlice.getRange() != null && myRange.isSubsetOf(parentSlice.getRange());
  return withinParent && coll.getRouter().isTargetSlice(id, doc, req.getParams(), myShardId, coll);
}
/**
 * Waits until the replica property {@code property.<prop>} (lower-cased) is spread evenly over
 * the slices of the collection: the slice with the most replicas carrying the property and
 * the slice with the fewest may differ by at most one. Fails the test if that does not happen
 * before the timeout elapses.
 *
 * @param prop the property name, without the {@code property.} prefix
 */
private void verifyPropCorrectlyDistributed(String prop) throws KeeperException, InterruptedException {
  TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
  String propLC = prop.toLowerCase(Locale.ROOT);
  DocCollection docCollection = null;
  while (timeout.hasTimedOut() == false) {
    forceUpdateCollectionStatus();
    docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
    // Start the running maximum at the smallest value and the running minimum at the largest,
    // so that the first slice's count replaces both. Seeding them the other way round leaves
    // them untouched and makes the check below pass unconditionally.
    int maxPropCount = Integer.MIN_VALUE;
    int minPropCount = Integer.MAX_VALUE;
    for (Slice slice : docCollection.getSlices()) {
      int repCount = 0;
      for (Replica rep : slice.getReplicas()) {
        if (rep.getBool("property." + propLC, false)) {
          repCount++;
        }
      }
      maxPropCount = Math.max(maxPropCount, repCount);
      minPropCount = Math.min(minPropCount, repCount);
    }
    // Subtract in long arithmetic so the untouched seeds (collection without slices) cannot
    // overflow; max >= min whenever at least one slice was seen.
    if ((long) maxPropCount - (long) minPropCount < 2) return;
  }
  log.error("Property {} is not distributed evenly. {}", prop, docCollection);
  fail("Property is not distributed evenly " + prop);
}
/**
 * Builds a {@code {!hash_range}} filter query over {@code fromField} covering the hash range
 * of this core's shard, widened to whole 16-bit prefixes.
 *
 * @return the filter query string, or {@code null} when not routed by the join key
 */
private String createHashRangeFq() {
  if (!routedByJoinKey) {
    return null;
  }
  final ClusterState state = searcher.getCore().getCoreContainer().getZkController().getClusterState();
  final CloudDescriptor cloud = searcher.getCore().getCoreDescriptor().getCloudDescriptor();
  final Slice mySlice = state.getCollection(cloud.getCollectionName()).getSlicesMap().get(cloud.getShardId());
  final DocRouter.Range shardRange = mySlice.getRange();
  // In CompositeIdRouter, the routing prefix only affects the top 16 bits
  final int lower = shardRange.min & 0xffff0000;
  final int upper = shardRange.max | 0x0000ffff;
  return String.format(Locale.ROOT, "{!hash_range f=%s l=%d u=%d}", fromField, lower, upper);
}
/**
 * Asserts that two cluster states have the same live nodes, the same collection names, the
 * same slice names per collection, and that every replica of {@code one} has an equal
 * counterpart (by name) in {@code two}.
 */
private static void assertClusterStateEquals(ClusterState one, ClusterState two) {
  assertEquals(one.getLiveNodes(), two.getLiveNodes());
  assertEquals(one.getCollectionsMap().keySet(), two.getCollectionsMap().keySet());
  one.forEachCollection(expectedColl -> {
    final DocCollection actualColl = two.getCollection(expectedColl.getName());
    final Map<String, Slice> expectedSlices = expectedColl.getSlicesMap();
    final Map<String, Slice> actualSlices = actualColl.getSlicesMap();
    assertEquals(expectedSlices.keySet(), actualSlices.keySet());
    for (Map.Entry<String, Slice> entry : expectedSlices.entrySet()) {
      final Slice actualSlice = actualSlices.get(entry.getKey());
      for (Replica expectedReplica : entry.getValue().getReplicas()) {
        final Replica actualReplica = actualSlice.getReplica(expectedReplica.getName());
        assertNotNull(actualReplica);
        SimSolrCloudTestCase.assertReplicaEquals(expectedReplica, actualReplica);
      }
    }
  });
}
/**
 * Polls the cluster state (up to 600 times, 100 ms apart) while operations are still pending.
 * For each slice, the first active replica that is both the preferred leader and the current
 * leader, and whose slice is still pending, is recorded as a success and removed from the
 * pending set. Whatever is left afterwards is handed to {@code addAnyFailures()}.
 */
private void checkLeaderStatus() throws InterruptedException, KeeperException {
  int polls = 0;
  while (!pendingOps.isEmpty() && polls < 600) {
    final ClusterState state = coreContainer.getZkController().getClusterState();
    final Set<String> liveNodes = state.getLiveNodes();
    final DocCollection coll = state.getCollection(collectionName);
    for (Slice slice : coll.getSlices()) {
      for (Replica replica : slice.getReplicas()) {
        final boolean preferredIsLeader = replica.isActive(liveNodes)
            && replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false)
            && replica.getBool(LEADER_PROP, false);
        if (preferredIsLeader && pendingOps.containsKey(slice.getName())) {
          // Record for return that the leader changed successfully
          pendingOps.remove(slice.getName());
          addToSuccesses(slice, replica);
          break;
        }
      }
    }
    TimeUnit.MILLISECONDS.sleep(100);
    coreContainer.getZkController().getZkStateReader().forciblyRefreshAllClusterStateSlow();
    polls++;
  }
  addAnyFailures();
}
/**
 * Waits (up to 600 polls, 100 ms apart) for the election node that has the same node name as
 * {@code electionNode} to appear in the slice's leader-election queue with a different
 * sequence number.
 *
 * @return the new sequence number, or {@code -1} if no change was observed in time
 */
int waitForNodeChange(Slice slice, String electionNode) throws InterruptedException, KeeperException {
  final String watchedName = LeaderElector.getNodeName(electionNode);
  final int previousSeq = LeaderElector.getSeq(electionNode);
  int remaining = 600;
  while (remaining-- > 0) {
    final ZkStateReader reader = coreContainer.getZkController().getZkStateReader();
    final String electPath = ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName());
    final List<String> candidates = OverseerTaskProcessor.getSortedElectionNodes(reader.getZkClient(), electPath);
    for (String candidate : candidates) {
      if (!LeaderElector.getNodeName(candidate).equals(watchedName)) {
        continue;
      }
      final int candidateSeq = LeaderElector.getSeq(candidate);
      if (candidateSeq != previousSeq) {
        return candidateSeq;
      }
    }
    TimeUnit.MILLISECONDS.sleep(100);
    reader.forciblyRefreshAllClusterStateSlow();
  }
  return -1;
}
/**
 * Collects the core URLs of all replicas in the active slices of {@code collection} that are
 * ACTIVE, hosted on a live node, and not named {@code localCoreNodeName}.
 *
 * @return a mutable list of core URLs; empty when the collection is unknown or has no active
 *     slices
 */
protected static List<String> getActiveReplicaCoreUrls(ZkController zkController, String collection, String localCoreNodeName) {
  final List<String> urls = new ArrayList<>();
  final ClusterState state = zkController.getZkStateReader().getClusterState();
  final Set<String> liveNodes = state.getLiveNodes();
  final DocCollection docCollection = state.getCollectionOrNull(collection);
  if (docCollection == null) {
    return urls;
  }

  for (Slice activeSlice : docCollection.getActiveSlicesArr()) {
    final Map<String, Replica> byName = activeSlice.getReplicasMap();
    if (byName == null) {
      continue;
    }
    for (Replica replica : byName.values()) {
      final boolean isOtherCore = !localCoreNodeName.equals(replica.getName());
      if (isOtherCore
          && replica.getState() == Replica.State.ACTIVE
          && liveNodes.contains(replica.getNodeName())) {
        urls.add(new ZkCoreNodeProps(replica).getCoreUrl());
      }
    }
  }
  return urls;
}
/**
 * Builds a mock cluster state from the spec string "csrStRpDnF" and checks that it yields one
 * collection with one slice holding four replicas, one for each expected type/state pairing.
 */
@Test
public void testBuildClusterState_ReplicaStateAndType() {
  try (ZkStateReader reader = ClusterStateMockUtil.buildClusterState("csrStRpDnF", "baseUrl1_")) {
    final ClusterState state = reader.getClusterState();
    assertNotNull(state);
    assertEquals(1, state.getCollectionStates().size());

    final DocCollection coll = state.getCollectionOrNull("collection1");
    assertNotNull(coll);
    assertEquals(DocRouter.DEFAULT, coll.getRouter());
    assertEquals(1, coll.getActiveSlices().size());
    assertEquals(1, coll.getSlices().size());

    final Slice onlySlice = coll.getSlice("slice1");
    assertNotNull(onlySlice);
    assertEquals(4, onlySlice.getReplicas().size());

    // Exactly one replica per (type, state) combination.
    assertEquals(1, onlySlice.getReplicas(
        r -> r.getType() == Replica.Type.NRT && r.getState() == Replica.State.ACTIVE).size());
    assertEquals(1, onlySlice.getReplicas(
        r -> r.getType() == Replica.Type.NRT && r.getState() == Replica.State.RECOVERY_FAILED).size());
    assertEquals(1, onlySlice.getReplicas(
        r -> r.getType() == Replica.Type.TLOG && r.getState() == Replica.State.RECOVERING).size());
    assertEquals(1, onlySlice.getReplicas(
        r -> r.getType() == Replica.Type.PULL && r.getState() == Replica.State.DOWN).size());
  }
}
/**
 * Validates that {@code count} replicas can be removed from {@code slice}. Does nothing when
 * {@code slice} is {@code null}. Otherwise rejects the request with a BAD_REQUEST
 * {@link SolrException} when the slice has no replica collection, has exactly one replica, or
 * has {@code count} or fewer replicas.
 *
 * <p>NOTE(review): the wording of the last error message ("lesser num replicas requested ...
 * than are available") reads opposite to the condition that triggers it; confirm before
 * relying on the text.
 */
private void validateReplicaAvailability(Slice slice, String shard, String collectionName, int count) {
  // Only validate when a specific shard was passed.
  if (slice == null) {
    return;
  }
  final Collection<Replica> shardReplicas = slice.getReplicas();
  final String location = shard + "/" + collectionName;
  if (shardReplicas == null) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
        "No replicas found in shard/collection: " + location);
  }
  final int available = shardReplicas.size();
  if (available == 1) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
        "There is only one replica available in shard/collection: " + location + ". Cannot delete that.");
  }
  if (available <= count) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
        "There are lesser num replicas requested to be deleted than are available in shard/collection : "
            + location + " Requested: " + count + " Available: " + available + ".");
  }
}
/**
 * Waits until the replica {@code replicaName} is no longer part of {@code shard} in the given
 * collection; a missing collection or missing shard counts as gone.
 *
 * @param timeoutms maximum time to wait, in milliseconds
 * @return {@code true} if the replica disappeared in time, {@code false} on timeout
 */
boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
  try {
    zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (c) -> {
      if (c == null) {
        return true;
      }
      final Slice owningSlice = c.getSlice(shard);
      return owningSlice == null || owningSlice.getReplica(replicaName) == null;
    });
    return true;
  } catch (TimeoutException e) {
    return false;
  }
}
/**
 * Checks what happens when the leader of a single-shard collection becomes unreachable:
 * adding documents must fail with a SolrException, while the PULL replica and the cluster
 * client still report the 10 documents indexed beforehand. The leader's proxy is always
 * reopened at the end.
 */
public void testCantConnectToLeader() throws Exception {
int numShards = 1;
// NOTE(review): the numeric arguments presumably are numShards, NRT, TLOG and PULL replica counts - confirm.
CollectionAdminRequest.createCollection(collectionName, "conf", numShards, 1, 0, 1)
.setMaxShardsPerNode(1)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collectionName, numShards, numShards * 2);
addDocs(10);
DocCollection docCollection = assertNumberOfReplicas(numShards, 0, numShards, false, true);
// There is only one shard, so the first slice is the one under test.
Slice s = docCollection.getSlices().iterator().next();
SocketProxy proxy = getProxyForReplica(s.getLeader());
try {
// wait for replication
try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
assertNumDocs(10, pullReplicaClient);
}
// Cut the connection to the leader; updates are now expected to fail.
proxy.close();
expectThrows(SolrException.class, ()->addDocs(1));
// The PULL replica must still serve the documents indexed before the cut.
try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
assertNumDocs(10, pullReplicaClient);
}
assertNumDocs(10, cluster.getSolrClient());
} finally {
// Always restore the leader connection, even if an assertion above failed.
log.info("Opening leader node");
proxy.reopen();
}
// Back to normal
// Even if the leader is back to normal, the replica can get broken pipe for some time when trying to connect to it. The commit
// can fail if it's sent to the replica and it forwards it to the leader, and since it uses CUSC the error is hidden! That breaks
// the last part of this test.
// addDocs(20);
// assertNumDocs(20, cluster.getSolrClient(), 300);
// try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
// assertNumDocs(20, pullReplicaClient);
// }
}
/**
 * Removes the routing rule stored under the message's "routeKey" from the given shard and
 * returns a write command carrying the updated slice.
 *
 * @return the write command, or {@code ZkStateWriter.NO_OP} when the collection key check
 *     fails, the shard is unknown, or the shard has no routing rules
 */
public ZkWriteCommand removeRoutingRule(final ClusterState clusterState, ZkNodeProps message) {
  final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
  if (!checkCollectionKeyExistence(message)) {
    return ZkStateWriter.NO_OP;
  }
  final String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
  final String routeKeyStr = message.getStr("routeKey");

  log.info("Overseer.removeRoutingRule invoked for collection: {} shard: {} routeKey: {}",
      collectionName, shard, routeKeyStr);

  final DocCollection collection = clusterState.getCollection(collectionName);
  final Slice slice = collection.getSlice(shard);
  if (slice == null) {
    log.warn("Unknown collection: {} shard: {}", collectionName, shard);
    return ZkStateWriter.NO_OP;
  }

  final Map<String, RoutingRule> rules = slice.getRoutingRules();
  if (rules == null) {
    return ZkStateWriter.NO_OP;
  }

  // Drop the rule, then rebuild the slice around the remaining rules.
  rules.remove(routeKeyStr);
  final Map<String, Object> sliceProps = slice.shallowCopy();
  sliceProps.put("routingRules", rules);
  final Slice updatedSlice = new Slice(slice.getName(), slice.getReplicasCopy(), sliceProps, collectionName);
  return new ZkWriteCommand(collectionName,
      CollectionMutator.updateSlice(collectionName, collection, updatedSlice));
}
/**
 * Tells whether slice {@code s} contains a replica whose core name equals {@code coreName}.
 */
private static boolean isReplicaAvailable (Slice s, String coreName) {
  return s.getReplicas().stream()
      .anyMatch(candidate -> coreName.equals(candidate.getCoreName()));
}
/**
 * Adds every slice to {@code target}, keyed by slice name. When several collections are being
 * queried at once ({@code multiCollection}), the key is prefixed with
 * {@code collectionName + "_"} so that equally named slices of different collections do not
 * collide.
 */
public static void addSlices(Map<String,Slice> target, String collectionName, Collection<Slice> slices, boolean multiCollection) {
  for (Slice each : slices) {
    final String sliceKey = multiCollection
        ? collectionName + "_" + each.getName()
        : each.getName();
    target.put(sliceKey, each);
  }
}
/**
 * Wait for all the collection shards to be ready.
 *
 * <p>Re-reads the cluster state once per second until no replica hosted on a live node is
 * in the RECOVERING or DOWN state.
 *
 * @param server client whose ZooKeeper state reader is polled
 * @param collection name of the collection to wait for
 * @throws NullPointerException if the cluster state has no slices for {@code collection}
 */
private static void waitForRecoveriesToFinish(CloudSolrClient server, String collection) throws KeeperException, InterruptedException {
  ZkStateReader zkStateReader = server.getZkStateReader();
  try {
    boolean cont = true;
    while (cont) {
      boolean sawLiveRecovering = false;
      zkStateReader.updateClusterState();
      ClusterState clusterState = zkStateReader.getClusterState();
      Map<String, Slice> slices = clusterState.getSlicesMap(collection);
      // The reference under test comes first and the message second; with the arguments the
      // other way round the check can never fail.
      Preconditions.checkNotNull(slices, "Could not find collection:" + collection);
      for (Map.Entry<String, Slice> entry : slices.entrySet()) {
        Map<String, Replica> shards = entry.getValue().getReplicasMap();
        for (Map.Entry<String, Replica> shard : shards.entrySet()) {
          String state = shard.getValue().getStr(ZkStateReader.STATE_PROP);
          if ((state.equals(Replica.State.RECOVERING.toString())
              || state.equals(Replica.State.DOWN.toString()))
              && clusterState.liveNodesContain(shard.getValue().getStr(
              ZkStateReader.NODE_NAME_PROP))) {
            sawLiveRecovering = true;
          }
        }
      }
      if (!sawLiveRecovering) {
        cont = false;
      } else {
        Thread.sleep(1000);
      }
    }
  } finally {
    logger.info("Exiting solr wait");
  }
}
/**
 * Resolves the active slices of one or more collections.
 *
 * <p>{@code collectionName} is split on commas. Each element is looked up in the cluster
 * state, after alias resolution when {@code checkAlias} is {@code true}. If none of the
 * resolved names yields an active slice, the complete {@code collectionName} is matched
 * case-insensitively against the known collection names as a fallback.
 *
 * @param collectionName a collection name, alias, or comma-separated list of them
 * @param zkStateReader source of the cluster state and aliases
 * @param checkAlias whether each name should be resolved through the alias table
 * @return the active slices of all resolved collections
 * @throws IOException if no slices are found
 */
public static Slice[] getSlices(String collectionName, ZkStateReader zkStateReader, boolean checkAlias) throws IOException {
  ClusterState clusterState = zkStateReader.getClusterState();
  Map<String, DocCollection> collectionsMap = clusterState.getCollectionsMap();

  // check for alias or collection, one comma-separated element at a time
  List<String> allCollections = new ArrayList<>();
  for (String col : collectionName.split(",")) {
    // Use the individual element in both branches; falling back to the whole
    // comma-separated string would never match a collection.
    List<String> collections = checkAlias
        ? zkStateReader.getAliases().resolveAliases(col) // if not an alias, returns the name itself
        : Collections.singletonList(col);
    allCollections.addAll(collections);
  }

  // Lookup all actives slices for these collections
  List<Slice> slices = allCollections.stream()
      .map(collectionsMap::get)
      .filter(Objects::nonNull)
      .flatMap(docCol -> Arrays.stream(docCol.getActiveSlicesArr()))
      .collect(Collectors.toList());
  if (!slices.isEmpty()) {
    return slices.toArray(new Slice[slices.size()]);
  }

  // Check collection case insensitive
  for (Entry<String, DocCollection> entry : collectionsMap.entrySet()) {
    if (entry.getKey().equalsIgnoreCase(collectionName)) {
      return entry.getValue().getActiveSlicesArr();
    }
  }

  throw new IOException("Slices not found for " + collectionName);
}
/**
 * Picks, for every slice of this stream's collection, one randomly chosen replica that is
 * ACTIVE and hosted on a live node, and returns the core URLs of the chosen replicas.
 *
 * @return one core URL per slice, in slice order
 * @throws IOException wrapping any failure, including a slice that has no active replica on a
 *     live node
 */
protected List<String> getShardUrls() throws IOException {
  try {
    ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
    Slice[] slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
    ClusterState clusterState = zkStateReader.getClusterState();
    Set<String> liveNodes = clusterState.getLiveNodes();

    List<String> baseUrls = new ArrayList<>();
    for (Slice slice : slices) {
      List<Replica> shuffler = new ArrayList<>();
      for (Replica replica : slice.getReplicas()) {
        if (replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
          shuffler.add(replica);
        }
      }
      // Report the actual problem instead of an IndexOutOfBoundsException from get(0).
      if (shuffler.isEmpty()) {
        throw new IOException("No active replica on a live node for slice " + slice.getName()
            + " of collection " + this.collection);
      }
      Collections.shuffle(shuffler, new Random());
      Replica rep = shuffler.get(0);
      ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
      baseUrls.add(zkProps.getCoreUrl());
    }
    return baseUrls;
  } catch (Exception e) {
    throw new IOException(e);
  }
}
/**
 * Picks a replica of {@code slice} at random, using the test's {@code random()} source so the
 * choice is reproducible. Fails the test when the slice has no replicas.
 */
protected static Replica getRandomReplica(Slice slice) {
  final List<Replica> candidates = new ArrayList<>(slice.getReplicas());
  if (candidates.isEmpty()) {
    fail("Couldn't get random replica from shard as it has no replicas!\n" + slice.toString());
  }
  Collections.shuffle(candidates, random());
  return candidates.get(0);
}
/**
 * Loads previously stored checkpoints from the checkpoint collection. The first replica found
 * that is ACTIVE and on a live node is asked for the document with this stream's {@code id};
 * if it exists, each value of its "checkpoint_ss" field (formatted {@code key~value}) is put
 * into {@code this.checkpoints}. Only that one replica is consulted.
 *
 * @throws IOException wrapping any failure while fetching or parsing the document
 */
private void getPersistedCheckpoints() throws IOException {
  final ZkStateReader reader = cloudSolrClient.getZkStateReader();
  final Slice[] slices = CloudSolrStream.getSlices(checkpointCollection, reader, false);
  final ClusterState state = reader.getClusterState();
  final Set<String> liveNodes = state.getLiveNodes();

  for (Slice slice : slices) {
    for (Replica replica : slice.getReplicas()) {
      final boolean usable = replica.getState() == Replica.State.ACTIVE
          && liveNodes.contains(replica.getNodeName());
      if (!usable) {
        continue;
      }
      final HttpSolrClient httpClient = streamContext.getSolrClientCache().getHttpSolrClient(replica.getCoreUrl());
      try {
        final SolrDocument stored = httpClient.getById(id);
        if (stored != null) {
          @SuppressWarnings({"unchecked"})
          final List<String> entries = (List<String>) stored.getFieldValue("checkpoint_ss");
          for (String entry : entries) {
            final String[] keyAndValue = entry.split("~");
            this.checkpoints.put(keyAndValue[0], Long.parseLong(keyAndValue[1]));
          }
        }
      } catch (Exception e) {
        throw new IOException(e);
      }
      // One usable replica is enough; stop searching.
      return;
    }
  }
}
/**
 * Creates a {@code SolrStream} that sends {@code sql} to the "/sql" handler of one randomly
 * chosen replica of the connection's collection (aliases are resolved). All connection
 * properties are forwarded as request parameters.
 *
 * @param sql the SQL statement to execute
 * @return a stream bound to the chosen replica's core URL
 * @throws IOException wrapping any failure, including a collection without replicas
 */
protected SolrStream constructStream(String sql) throws IOException {
  try {
    ZkStateReader zkStateReader = this.connection.getClient().getZkStateReader();
    Slice[] slices = CloudSolrStream.getSlices(this.connection.getCollection(), zkStateReader, true);

    List<Replica> shuffler = new ArrayList<>();
    for (Slice slice : slices) {
      shuffler.addAll(slice.getReplicas());
    }
    // Report the actual problem instead of an IndexOutOfBoundsException from get(0).
    if (shuffler.isEmpty()) {
      throw new IOException("No replicas found for collection " + this.connection.getCollection());
    }
    Collections.shuffle(shuffler, new Random());

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CommonParams.QT, "/sql");
    params.set("stmt", sql);
    for (String propertyName : this.connection.getProperties().stringPropertyNames()) {
      params.set(propertyName, this.connection.getProperties().getProperty(propertyName));
    }

    Replica rep = shuffler.get(0);
    ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
    String url = zkProps.getCoreUrl();
    return new SolrStream(url, params);
  } catch (Exception e) {
    throw new IOException(e);
  }
}