下面列出了 org.apache.zookeeper.KeeperException.SessionExpiredException 这个 API 类的用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
 * Reads the shard-leader node for {@code slice} of {@code collection} from ZooKeeper and
 * returns the core URL stored in it.
 *
 * <p>The read is attempted up to 60 times. An attempt that fails because the node does not
 * exist or the session has expired is followed by a 500 ms sleep and then retried. When all
 * attempts are used up, the ZooKeeper layout is printed to {@code System.out} and a
 * {@link RuntimeException} is thrown; the last failure is attached as its cause so the reason
 * for giving up is not lost.
 *
 * @param collection name of the collection
 * @param slice name of the slice whose leader is wanted
 * @return the core URL taken from the leader's properties
 * @throws KeeperException on ZooKeeper failures other than the two retried ones
 * @throws InterruptedException if the thread is interrupted while reading or sleeping
 */
private String getLeaderUrl(final String collection, final String slice)
    throws KeeperException, InterruptedException {
  // Remember the most recent retried failure so it can be reported if we give up.
  KeeperException lastFailure = null;
  for (int attemptsLeft = 60; attemptsLeft > 0; attemptsLeft--) {
    try {
      byte[] data = zkClient.getData(
          ZkStateReader.getShardLeadersPath(collection, slice), null, null,
          true);
      ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(
          ZkNodeProps.load(data));
      return leaderProps.getCoreUrl();
    } catch (NoNodeException | SessionExpiredException e) {
      lastFailure = e;
      Thread.sleep(500);
    }
  }
  zkClient.printLayoutToStream(System.out);
  throw new RuntimeException(
      "Could not get leader props for " + collection + " " + slice, lastFailure);
}
/**
 * Looks up partitioned-topic metadata twice through the discovery provider: once for
 * {@code my-topic-1}, expecting zero partitions, and once for {@code my-topic-2} after the
 * mock ZooKeeper has been told to fail GETs on that topic's metadata path with
 * {@code SESSIONEXPIRED}, expecting the future to fail with a
 * {@link SessionExpiredException} cause.
 */
@Test
public void testGetPartitionsMetadata() throws Exception {
  final TopicName firstTopic = TopicName.get("persistent://test/local/ns/my-topic-1");
  final PartitionedTopicMetadata metadata = service.getDiscoveryProvider()
      .getPartitionedTopicMetadata(service, firstTopic, "role", null)
      .get();
  assertEquals(metadata.partitions, 0);

  // Simulate a ZK error: only GETs on the second topic's metadata path are made to fail.
  final String failingPath = "/admin/partitioned-topics/test/local/ns/persistent/my-topic-2";
  mockZooKeeper.failConditional(Code.SESSIONEXPIRED,
      (op, path) -> op == MockZooKeeper.Op.GET && path.equals(failingPath));

  final TopicName secondTopic = TopicName.get("persistent://test/local/ns/my-topic-2");
  final CompletableFuture<PartitionedTopicMetadata> lookup = service.getDiscoveryProvider()
      .getPartitionedTopicMetadata(service, secondTopic, "role", null);
  try {
    lookup.get();
    fail("Partition metadata lookup should have failed");
  } catch (ExecutionException e) {
    // The injected ZK failure must surface as the cause of the failed future.
    assertEquals(e.getCause().getClass(), SessionExpiredException.class);
  }
}
/**
 * Reports whether the number of children under the shard's election node is at least the
 * number of entries in the shard's replica map.
 *
 * <p>Returns {@code false} (after a warning) when the collection or the shard cannot be found
 * in the current cluster state. A ZooKeeper error other than session expiry is logged and
 * treated as "zero participants found".
 *
 * @return {@code true} if enough election participants were found, {@code false} otherwise
 * @throws InterruptedException if interrupted while querying ZooKeeper
 * @throws SolrException if the ZooKeeper session has expired
 */
private boolean areAllReplicasParticipating() throws InterruptedException {
  final String electionNodePath = electionPath + LeaderElector.ELECTION_NODE;
  final DocCollection coll = zkController.getClusterState().getCollectionOrNull(collection);

  if (coll == null || coll.getSlice(shardId) == null) {
    log.warn("Shard not found: {} for collection {}", shardId, collection);
    return false;
  }

  final Slice shard = coll.getSlice(shardId);
  int participants = 0;
  try {
    participants = zkClient.getChildren(electionNodePath, null, true).size();
  } catch (KeeperException.SessionExpiredException e) {
    // if the session has expired, then another election will be launched, so
    // quit here
    throw new SolrException(ErrorCode.SERVER_ERROR,
        "ZK session expired - cancelling election for " + collection + " " + shardId);
  } catch (KeeperException e) {
    SolrException.log(log, "Error checking for the number of election participants", e);
  }

  if (participants < shard.getReplicasMap().size()) {
    return false;
  }
  log.debug("All replicas are ready to participate in election.");
  return true;
}
/**
 * Builds a Mockito mock of {@link ZkController} wired to a ZooKeeper client, a state reader
 * and a mostly-real {@link CoreContainer} mock.
 *
 * <p>When {@code zkClient} is {@code null} a new {@link SolrZkClient} is created against the
 * test server and closed when the mock controller's {@code close()} is invoked; otherwise
 * {@code close()} on the mock does nothing and the supplied client is used as is.
 *
 * @param zkAddress address passed on to {@code getCloudDataProvider}
 * @param zkClient client to use, or {@code null} to have one created
 * @param reader state reader returned by the mock's {@code getZkStateReader()}
 * @return the configured mock controller
 */
private ZkController createMockZkController(String zkAddress, SolrZkClient zkClient, ZkStateReader reader)
    throws InterruptedException, NoSuchFieldException, SecurityException, SessionExpiredException {
  final ZkController controller = mock(ZkController.class);

  final SolrZkClient effectiveClient;
  if (zkClient != null) {
    doNothing().when(controller).close();
    effectiveClient = zkClient;
  } else {
    final SolrZkClient ownedClient =
        new SolrZkClient(server.getZkAddress(), AbstractZkTestCase.TIMEOUT);
    // The controller owns this client, so closing the controller closes the client.
    Mockito.doAnswer(invocation -> {
      ownedClient.close();
      return null;
    }).when(controller).close();
    effectiveClient = ownedClient;
  }

  final CoreContainer coreContainer = mock(CoreContainer.class,
      Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
  when(coreContainer.isShutDown()).thenReturn(testDone); // Allow retry on session expiry
  when(coreContainer.getResourceLoader()).thenReturn(new SolrResourceLoader());

  // Inject the collaborators into the mock's private fields.
  FieldSetter.setField(controller, ZkController.class.getDeclaredField("zkClient"), effectiveClient);
  FieldSetter.setField(controller, ZkController.class.getDeclaredField("cc"), coreContainer);

  when(controller.getCoreContainer()).thenReturn(coreContainer);
  when(controller.getZkClient()).thenReturn(effectiveClient);
  when(controller.getZkStateReader()).thenReturn(reader);

  // Both getLeaderProps overloads run the real implementation.
  when(controller.getLeaderProps(anyString(), anyString(), anyInt())).thenCallRealMethod();
  when(controller.getLeaderProps(anyString(), anyString(), anyInt(), anyBoolean())).thenCallRealMethod();

  doReturn(getCloudDataProvider(zkAddress, effectiveClient, reader))
      .when(controller).getSolrCloudManager();
  return controller;
}
/**
 * Builds a {@link ZkController} mock (real methods by default) whose {@code pathExists}
 * delegates to {@code client.exists(path, true)} but may, at most once, throw a
 * {@link SessionExpiredException} either before or after the real lookup.
 *
 * @param client ZooKeeper client the mock delegates to
 * @return the configured mock controller
 */
private ZkController createZkController(SolrZkClient client) throws KeeperException, InterruptedException {
  assumeWorkingMockito();

  final CoreContainer coreContainer = mock(CoreContainer.class,
      Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
  when(coreContainer.isShutDown()).thenReturn(Boolean.FALSE); // Allow retry on session expiry

  final ZkController controller = mock(ZkController.class,
      Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
  when(controller.getCoreContainer()).thenReturn(coreContainer);
  when(controller.getZkClient()).thenReturn(client);

  final Answer<Boolean> flakyPathExists = new Answer<Boolean>() {
    // Flipped the first time an expiry is simulated; no further expiry is thrown afterwards.
    volatile boolean sessionExpired = false;

    /** Throws a simulated session expiry, but only rarely and only once per answer. */
    private void maybeExpireSession() throws SessionExpiredException {
      if (sessionExpired || !rarely()) {
        return;
      }
      sessionExpired = true;
      throw new KeeperException.SessionExpiredException();
    }

    @Override
    public Boolean answer(InvocationOnMock invocation) throws Throwable {
      final String requestedPath = (String) invocation.getArguments()[0];
      maybeExpireSession();
      final Boolean present = client.exists(requestedPath, true);
      maybeExpireSession();
      return present;
    }
  };
  Mockito.doAnswer(flakyPathExists).when(controller).pathExists(Mockito.anyString());
  return controller;
}
/**
 * Reloads the ZooKeeper watcher for the peer cluster when {@code ke} is a connection loss,
 * a session expiry or an authentication failure; any other exception type is ignored.
 * A failure to reload the watcher is logged as a warning and not rethrown.
 *
 * @param ke the ZooKeeper exception that prompted the reconnect attempt
 */
protected void reconnect(KeeperException ke) {
  final boolean connectionUnusable = ke instanceof ConnectionLossException
      || ke instanceof SessionExpiredException
      || ke instanceof AuthFailedException;
  if (!connectionUnusable) {
    return;
  }

  final String peerClusterKey = ctx.getPeerConfig().getClusterKey();
  LOG.warn("Lost the ZooKeeper connection for peer " + peerClusterKey, ke);
  try {
    reloadZkWatcher();
  } catch (IOException io) {
    LOG.warn("Creation of ZookeeperWatcher failed for peer " + peerClusterKey, io);
  }
}
/**
 * Orders aborts by cause before delegating to {@code super.abort}.
 *
 * <p>An abort caused by a {@link SessionExpiredException} waits on {@code latch}; an abort
 * with any other cause counts the latch down. Either way {@code super.abort} is then invoked.
 *
 * <p>If the wait is interrupted the abort still proceeds, and the thread's interrupt status
 * is restored once {@code super.abort} has returned (or thrown), so the interruption is not
 * silently lost to the caller.
 *
 * @param reason description of why the abort was requested
 * @param cause the throwable that triggered the abort
 */
@Override
public void abort(String reason, Throwable cause) {
  boolean interrupted = false;
  try {
    if (cause instanceof SessionExpiredException) {
      // called from ZKWatcher, let's wait a bit to make sure that we call stop before calling
      // abort.
      try {
        latch.await();
      } catch (InterruptedException e) {
        // Keep going with the abort, but remember the interrupt so it can be re-asserted.
        interrupted = true;
      }
    } else {
      // abort from other classes, usually LogRoller, now we can make progress on abort.
      latch.countDown();
    }
    super.abort(reason, cause);
  } finally {
    if (interrupted) {
      // Restored only after super.abort so the abort itself runs exactly as before.
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Get leader props directly from zk nodes.
 *
 * <p>Convenience overload: forwards to the four-argument {@code getLeaderProps}, passing
 * {@code true} as its last argument.
 *
 * @param collection name of the collection
 * @param slice name of the slice whose leader props are wanted
 * @param timeoutms timeout in milliseconds, passed through unchanged
 * @return whatever the four-argument overload returns
 * @throws InterruptedException propagated from the four-argument overload
 * @throws SessionExpiredException on zk session expiration.
 */
public ZkCoreNodeProps getLeaderProps(
    final String collection,
    final String slice,
    final int timeoutms) throws InterruptedException, SessionExpiredException {
  return getLeaderProps(collection, slice, timeoutms, true);
}
/**
 * Polls until the shard's election node has at least as many children as the shard has
 * TLOG and NRT replicas, or until the timeout passes.
 *
 * <p>The cluster state is re-read every 500 ms. The loop ends with {@code false} when this
 * object is closed, the core container is shut down, the shard can no longer be found, or
 * the deadline is exceeded. A ZooKeeper error other than session expiry is logged and
 * treated as "zero participants found" for that round.
 *
 * <p>The deadline is checked with {@code System.nanoTime() - timeoutAt > 0} rather than a
 * direct comparison, because {@code nanoTime} values may overflow and are only meaningful
 * as differences.
 *
 * @param timeoutms maximum time to wait, in milliseconds
 * @return {@code true} if enough replicas were seen, {@code false} otherwise
 * @throws InterruptedException if interrupted while querying ZooKeeper or sleeping
 * @throws SolrException if the ZooKeeper session has expired
 */
private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
  final long timeoutAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutms);
  final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;

  DocCollection docCollection = zkController.getClusterState().getCollectionOrNull(collection);
  Slice slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
  int cnt = 0;
  while (!isClosed && !cc.isShutDown()) {
    // wait for everyone to be up
    if (slices == null) {
      log.warn("Shard not found: {} for collection {}", shardId, collection);
      return false;
    }

    int found = 0;
    try {
      found = zkClient.getChildren(shardsElectZkPath, null, true).size();
    } catch (KeeperException e) {
      if (e instanceof KeeperException.SessionExpiredException) {
        // if the session has expired, then another election will be launched, so
        // quit here
        throw new SolrException(ErrorCode.SERVER_ERROR,
            "ZK session expired - cancelling election for " + collection + " " + shardId);
      }
      SolrException.log(log,
          "Error checking for the number of election participants", e);
    }

    // on startup and after connection timeout, wait for all known shards
    final int expected = slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size();
    if (found >= expected) {
      log.info("Enough replicas found to continue.");
      return true;
    }

    // Only report progress on every 40th round to keep the log quiet.
    if (cnt % 40 == 0 && log.isInfoEnabled()) {
      log.info("Waiting until we see more replicas up for shard {}: total={} found={} timeoute in={}ms"
          , shardId, expected, found,
          TimeUnit.NANOSECONDS.toMillis(timeoutAt - System.nanoTime()));
    }

    // Overflow-safe deadline check: compare the difference, never the raw nanoTime values.
    if (System.nanoTime() - timeoutAt > 0) {
      log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
      return false;
    }

    Thread.sleep(500);
    docCollection = zkController.getClusterState().getCollectionOrNull(collection);
    slices = (docCollection == null) ? null : docCollection.getSlice(shardId);
    cnt++;
  }
  return false;
}
/**
 * Runs this command.
 *
 * @throws SessionExpiredException declared for implementations to throw; presumably signals
 *     that the ZooKeeper session expired while the command was running (confirm against the
 *     implementations and callers)
 */
void command() throws SessionExpiredException;