下面列出了org.springframework.boot.actuate.endpoint.annotation.ReadOperation#com.alibaba.nacos.api.naming.NamingService 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static void main(String[] args) throws NacosException {
Properties properties = new Properties();
properties.setProperty("serverAddr", Constants.NACOS_SERVER_ADDRESS);
properties.setProperty("namespace", Constants.NAMESPACE);
NamingService naming = NamingFactory.createNamingService(properties);
naming.subscribe(Constants.SERVICE_NAME, new EventListener() {
@Override
public void onEvent(Event event) {
NamingEvent namingEvent = (NamingEvent) event;
printInstances(namingEvent);
mockConsume(naming, Constants.SERVICE_NAME);
}
});
try {
int in = System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void mockConsume(NamingService naming, String serviceName) {
int i = 0, loop = 5;
try {
while (i++ < loop) {
Instance instance = naming.selectOneHealthyInstance(serviceName);
if (instance != null) {
System.out.println("get one healthy instance of service:" + serviceName
+ "\nip=" + instance.getIp() + ", port=" + instance.getPort()
+ ", cluster=" + instance.getClusterName()
+ "\n=========================================\n");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws NacosException {
Properties properties = new Properties();
properties.setProperty("serverAddr", Constants.NACOS_SERVER_ADDRESS);
properties.setProperty("namespace", Constants.NAMESPACE);
NamingService naming = NamingFactory.createNamingService(properties);
naming.registerInstance(Constants.SERVICE_NAME, Constants.IP_1, Constants.PORT_1, Constants.CLUSTER_NAME_1);
naming.registerInstance(Constants.SERVICE_NAME, Constants.IP_2, Constants.PORT_2, Constants.CLUSTER_NAME_2);
List<Instance> instances = naming.getAllInstances(Constants.SERVICE_NAME);
System.out.println("getAllInstances after registered\ninstance size="
+ instances.size() + "\ninstance list=" + instances);
try {
int in = System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public List<TaskModel> getAllInstance(InstanceQueryModel instanceQueryModel) {
NamingService namingService = nacosServerHolder
.get(instanceQueryModel.getSourceClusterId(), instanceQueryModel.getGroupName());
try {
ListView<String> servicesOfServer = namingService
.getServicesOfServer(instanceQueryModel.getPageNo(),
instanceQueryModel.getPageSize());
return servicesOfServer.getData().stream()
.map(serviceName -> buildTaskModel(instanceQueryModel, serviceName))
.collect(Collectors.toList());
} catch (NacosException e) {
log.error("When using nacos client failure query tasks", e);
return Collections.emptyList();
}
}
@Override
public boolean delete(TaskDO taskDO) {
try {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
sourceNamingService.unsubscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
// 删除目标集群中同步的实例列表
List<Instance> instances = destNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : instances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort());
}
}
} catch (Exception e) {
log.error("delete task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
return false;
}
return true;
}
@Override
public boolean delete(TaskDO taskDO) {
try {
specialSyncEventBus.unsubscribe(taskDO);
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), taskDO.getNameSpace());
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName());
deleteAllInstance(taskDO, destNamingService, allInstances);
} catch (Exception e) {
log.error("delete a task from eureka to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
return false;
}
return true;
}
@Override
public boolean sync(TaskDO taskDO) {
try {
EurekaNamingService eurekaNamingService = eurekaServerHolder.get(taskDO.getSourceClusterId(), null);
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
List<InstanceInfo> eurekaInstances = eurekaNamingService.getApplications(taskDO.getServiceName());
List<Instance> nacosInstances = destNamingService.getAllInstances(taskDO.getServiceName());
if (CollectionUtils.isEmpty(eurekaInstances)) {
// Clear all instance from Nacos
deleteAllInstance(taskDO, destNamingService, nacosInstances);
} else {
if (!CollectionUtils.isEmpty(nacosInstances)) {
// Remove invalid instance from Nacos
removeInvalidInstance(taskDO, destNamingService, eurekaInstances, nacosInstances);
}
addValidInstance(taskDO, destNamingService, eurekaInstances);
}
specialSyncEventBus.subscribe(taskDO, this::sync);
} catch (Exception e) {
log.error("sync task from eureka to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
return false;
}
return true;
}
@Override
public boolean delete(TaskDO taskDO) {
try {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
EurekaNamingService destNamingService =
eurekaServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
sourceNamingService.unsubscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
// 删除目标集群中同步的实例列表
List<InstanceInfo> allInstances = destNamingService.getApplications(taskDO.getServiceName());
if (allInstances != null) {
for (InstanceInfo instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService.deregisterInstance(instance);
}
}
}
} catch (Exception e) {
log.error("delete task from nacos to eureka was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
return false;
}
return true;
}
@Override
public boolean delete(TaskDO taskDO) {
try {
specialSyncEventBus.unsubscribe(taskDO);
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort());
}
}
} catch (Exception e) {
log.error("delete task from consul to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
return false;
}
return true;
}
@Override
public boolean sync(TaskDO taskDO) {
try {
ConsulClient consulClient = consulServerHolder.get(taskDO.getSourceClusterId(), null);
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
Response<List<HealthService>> response =
consulClient.getHealthServices(taskDO.getServiceName(), true, QueryParams.DEFAULT);
List<HealthService> healthServiceList = response.getValue();
Set<String> instanceKeys = new HashSet<>();
overrideAllInstance(taskDO, destNamingService, healthServiceList, instanceKeys);
cleanAllOldInstance(taskDO, destNamingService, instanceKeys);
specialSyncEventBus.subscribe(taskDO, this::sync);
} catch (Exception e) {
log.error("Sync task from consul to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);
return false;
}
return true;
}
@Override
public boolean delete(TaskDO taskDO) {
try {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
EventListener eventListener = nacosListenerMap.remove(taskDO.getTaskId());
PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(taskDO.getTaskId());
sourceNamingService.unsubscribe(taskDO.getServiceName(), eventListener);
CloseableUtils.closeQuietly(pathChildrenCache);
Set<String> instanceUrlSet = instanceBackupMap.get(taskDO.getTaskId());
CuratorFramework client = zookeeperServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
for (String instanceUrl : instanceUrlSet) {
client.delete().quietly().forPath(instanceUrl);
}
} catch (Exception e) {
log.error("delete task from nacos to zk was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
return false;
}
return true;
}
private void tryToCompensate(TaskDO taskDO, NamingService sourceNamingService, List<Instance> sourceInstances) {
if (!CollectionUtils.isEmpty(sourceInstances)) {
final PathChildrenCache pathCache = getPathCache(taskDO);
if (pathCache.getListenable().size() == 0) { // 防止重复注册
pathCache.getListenable().addListener((zkClient, zkEvent) -> {
if (zkEvent.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
List<Instance> allInstances =
sourceNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : allInstances) {
String instanceUrl = buildSyncInstance(instance, taskDO);
String zkInstancePath = zkEvent.getData().getPath();
if (zkInstancePath.equals(instanceUrl)) {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.forPath(zkInstancePath);
break;
}
}
}
});
}
}
}
private void processEvent(TaskDO taskDO, NamingService destNamingService, PathChildrenCacheEvent event, String path,
Map<String, String> queryParam) throws NacosException {
Map<String, String> ipAndPortParam = parseIpAndPortString(path);
Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO);
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
destNamingService.registerInstance(
getServiceNameFromCache(taskDO.getTaskId(), queryParam), instance);
break;
case CHILD_REMOVED:
destNamingService.deregisterInstance(
getServiceNameFromCache(taskDO.getTaskId(), queryParam),
ipAndPortParam.get(INSTANCE_IP_KEY),
Integer.parseInt(ipAndPortParam.get(INSTANCE_PORT_KEY)));
nacosServiceNameMap.remove(taskDO.getTaskId());
break;
default:
break;
}
}
@Override
public boolean delete(TaskDO taskDO) {
try {
CloseableUtils.closeQuietly(pathChildrenCacheMap.get(taskDO.getTaskId()));
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId(), null);
if (nacosServiceNameMap.containsKey(taskDO.getTaskId())) {
List<Instance> allInstances =
destNamingService.getAllInstances(nacosServiceNameMap.get(taskDO.getTaskId()));
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService.deregisterInstance(instance.getServiceName(), instance.getIp(),
instance.getPort());
}
nacosServiceNameMap.remove(taskDO.getTaskId());
}
}
} catch (Exception e) {
log.error("delete task from zookeeper to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
return false;
}
return true;
}
@SuppressWarnings("unchecked")
public void publishDeferService(ApplicationContext context) throws NacosException {
setApplicationContext(context);
for (DeferServiceHolder holder : deferServiceCache) {
final Object o = holder.getHolder();
final Properties properties = holder.getProperties();
if (o instanceof ConfigService) {
ConfigService configService = (ConfigService) o;
createWorkerManager.get(ServiceType.CONFIG).run(properties,
configService);
}
else if (o instanceof NamingService) {
NamingService namingService = (NamingService) o;
createWorkerManager.get(ServiceType.NAMING).run(properties,
namingService);
}
else if (o instanceof NamingMaintainService) {
NamingMaintainService maintainService = (NamingMaintainService) o;
createWorkerManager.get(ServiceType.MAINTAIN).run(properties,
maintainService);
}
}
deferServiceCache.clear();
}
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
builder.up();
NacosServiceFactory nacosServiceFactory = CacheableEventPublishingNacosServiceFactory
.getSingleton();
for (NamingService namingService : nacosServiceFactory.getNamingServices()) {
if (namingService instanceof NacosServiceMetaData) {
NacosServiceMetaData nacosServiceMetaData = (NacosServiceMetaData) namingService;
Properties properties = nacosServiceMetaData.getProperties();
builder.withDetail(
JSON.toJSONString(
PropertiesUtils.extractSafeProperties(properties)),
namingService.getServerStatus());
}
if (!namingService.getServerStatus().equalsIgnoreCase(UP_STATUS)) {
builder.down();
}
}
}
@Override
public void deregister(Registration registration) {
log.info("De-registering from Nacos Server now...");
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No dom to de-register for nacos client...");
return;
}
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
try {
namingService.deregisterInstance(serviceId, group, registration.getHost(),
registration.getPort(), nacosDiscoveryProperties.getClusterName());
}
catch (Exception e) {
log.error("ERR_NACOS_DEREGISTER, de-register failed...{},",
registration.toString(), e);
}
log.info("De-registration finished.");
}
/**
* @return nacos discovery endpoint
*/
@ReadOperation
public Map<String, Object> nacosDiscovery() {
Map<String, Object> result = new HashMap<>();
result.put("NacosDiscoveryProperties", nacosDiscoveryProperties);
NamingService namingService = nacosDiscoveryProperties.namingServiceInstance();
List<ServiceInfo> subscribe = Collections.emptyList();
try {
subscribe = namingService.getSubscribeServices();
}
catch (Exception e) {
log.error("get subscribe services from nacos fail,", e);
}
result.put("subscribe", subscribe);
return result;
}
@Test
@SuppressWarnings("unchecked")
public void testEmptyInstancesReturnsEmptyList() throws Exception {
NacosDiscoveryProperties nacosDiscoveryProperties = mock(
NacosDiscoveryProperties.class);
NamingService namingService = mock(NamingService.class);
when(nacosDiscoveryProperties.namingServiceInstance()).thenReturn(namingService);
when(namingService.selectInstances(anyString(), eq("DEFAULT"), eq(true)))
.thenReturn(null);
NacosServerList serverList = new NacosServerList(nacosDiscoveryProperties);
List<NacosServer> servers = serverList.getInitialListOfServers();
assertThat(servers).isEmpty();
}
public MultRegisterCenter(Map<String, NamingService> multEurekaMap, Map<String, MossNacosAutoServiceRegistration> multRegistrationMap,
Map<NamingService, HeartbeatMonitor> multHeartbeatMonitorMap) {
this.multNacosMap = multEurekaMap;
this.multRegistrationMap = multRegistrationMap;
this.multNacosCodeMap = multEurekaMap.entrySet().stream().collect(Collectors.toMap(e -> e.getValue(), e -> e.getKey()));
this.multHeartbeatMonitorMap=multHeartbeatMonitorMap;
}
@Bean
public MultRegisterCenter initMultNacos() {
URL_MAP.put("nacos1","127.0.0.1:8848");
Map<String, NamingService> multEurekaMap = Maps.newConcurrentMap();
Map<String, MossNacosAutoServiceRegistration> multRegistrationMap = Maps.newConcurrentMap();
Map<NamingService, HeartbeatMonitor> multHeartbeatMonitorMap=new ConcurrentHashMap<NamingService, HeartbeatMonitor>();
URL_MAP.entrySet().forEach(e -> {
NacosDiscoveryProperties nacosDiscoveryProperties=new NacosDiscoveryProperties();
nacosDiscoveryProperties.setService("halo-moss");
nacosDiscoveryProperties.setServerAddr(e.getValue());
try {
NamingService namingService=NacosFactory.createNamingService(e.getValue());
com.alibaba.nacos.api.naming.pojo.Instance instance = new com.alibaba.nacos.api.naming.pojo.Instance();
instance.setIp(inetUtils.findFirstNonLoopbackHostInfo().getIpAddress());
instance.setPort(-1);
instance.setWeight(1);
instance.setClusterName("DEFAULT");
namingService.registerInstance("halo-moss", instance);
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, namingService));
multEurekaMap.put(e.getKey(),namingService);
multHeartbeatMonitorMap.put(namingService,new HeartbeatMonitor());
} catch (NacosException e1) {
e1.printStackTrace();
}
//NacosServiceRegistry serviceRegistry=new NacosServiceRegistry();
//AutoServiceRegistrationProperties autoServiceRegistrationProperties=new AutoServiceRegistrationProperties();
//MossNacosAutoServiceRegistration autoServiceRegistration = new MossNacosAutoServiceRegistration(serviceRegistry,autoServiceRegistrationProperties,registration,registration);
//autoServiceRegistration.setRegistration(registration);
//multRegistrationMap.put(e.getKey(), autoServiceRegistration);
});
multRegisterCenter = new MultRegisterCenter(multEurekaMap, multRegistrationMap,multHeartbeatMonitorMap);
return multRegisterCenter;
}
/**
* Create an instance of {@link NamingService} from specified {@link URL connection url}
*
* @param connectionURL {@link URL connection url}
* @return {@link NamingService}
* @since 2.7.5
*/
public static NamingService createNamingService(URL connectionURL) {
Properties nacosProperties = buildNacosProperties(connectionURL);
NamingService namingService;
try {
namingService = NacosFactory.createNamingService(nacosProperties);
} catch (NacosException e) {
if (logger.isErrorEnabled()) {
logger.error(e.getErrMsg(), e);
}
throw new IllegalStateException(e);
}
return namingService;
}
@Override
NamingService createServer(String clusterId, Supplier<String> serverAddressSupplier, String namespace) throws Exception {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddressSupplier.get());
properties.setProperty(PropertyKeyConst.NAMESPACE, namespace);
return NamingFactory.createNamingService(properties);
}
private void addValidInstance(TaskDO taskDO, NamingService destNamingService, List<InstanceInfo> eurekaInstances)
throws NacosException {
for (InstanceInfo instance : eurekaInstances) {
if (needSync(instance.getMetadata())) {
log.info("Add service instance from Eureka, serviceName={}, Ip={}, port={}",
instance.getAppName(), instance.getIPAddr(), instance.getPort());
destNamingService.registerInstance(taskDO.getServiceName(), buildSyncInstance(instance, taskDO));
}
}
}
private void removeInvalidInstance(TaskDO taskDO, NamingService destNamingService,
List<InstanceInfo> eurekaInstances, List<Instance> nacosInstances) throws NacosException {
for (Instance instance : nacosInstances) {
if (!isExistInEurekaInstance(eurekaInstances, instance) && needDelete(instance.getMetadata(), taskDO)) {
log.info("Remove invalid service instance from Nacos, serviceName={}, Ip={}, port={}",
instance.getServiceName(), instance.getIp(), instance.getPort());
destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort());
}
}
}
private void deleteAllInstance(TaskDO taskDO, NamingService destNamingService, List<Instance> allInstances)
throws NacosException {
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance);
}
}
}
@Override
public boolean delete(TaskDO taskDO) {
try {
NamingService sourceNamingService =
nacosServerHolder.get(taskDO.getSourceClusterId(), taskDO.getGroupName());
ConsulClient consulClient = consulServerHolder.get(taskDO.getDestClusterId(), taskDO.getGroupName());
sourceNamingService.unsubscribe(taskDO.getServiceName(), nacosListenerMap.get(taskDO.getTaskId()));
// 删除目标集群中同步的实例列表
Response<List<HealthService>> serviceResponse =
consulClient.getHealthServices(taskDO.getServiceName(), true, QueryParams.DEFAULT);
List<HealthService> healthServices = serviceResponse.getValue();
for (HealthService healthService : healthServices) {
if (needDelete(ConsulUtils.transferMetadata(healthService.getService().getTags()), taskDO)) {
consulClient.agentServiceDeregister(healthService.getService().getId());
}
}
} catch (Exception e) {
log.error("delete a task from nacos to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
metricsManager.recordError(MetricsStatisticsType.DELETE_ERROR);
return false;
}
return true;
}
private void cleanAllOldInstance(TaskDO taskDO, NamingService destNamingService, Set<String> instanceKeys)
throws NacosException {
List<Instance> allInstances = destNamingService.getAllInstances(taskDO.getServiceName());
for (Instance instance : allInstances) {
if (needDelete(instance.getMetadata(), taskDO)
&& !instanceKeys.contains(composeInstanceKey(instance.getIp(), instance.getPort()))) {
destNamingService.deregisterInstance(taskDO.getServiceName(), instance.getIp(), instance.getPort());
}
}
}
private void overrideAllInstance(TaskDO taskDO, NamingService destNamingService,
List<HealthService> healthServiceList, Set<String> instanceKeys) throws NacosException {
for (HealthService healthService : healthServiceList) {
if (needSync(ConsulUtils.transferMetadata(healthService.getService().getTags()))) {
destNamingService.registerInstance(taskDO.getServiceName(),
buildSyncInstance(healthService, taskDO));
instanceKeys.add(composeInstanceKey(healthService.getService().getAddress(),
healthService.getService().getPort()));
}
}
}
private void registerAllInstances(TaskDO taskDO, PathChildrenCache pathChildrenCache,
NamingService destNamingService) throws NacosException {
List<ChildData> currentData = pathChildrenCache.getCurrentData();
for (ChildData childData : currentData) {
String path = childData.getPath();
Map<String, String> queryParam = parseQueryString(childData.getPath());
if (isMatch(taskDO, queryParam) && needSync(queryParam)) {
Map<String, String> ipAndPortParam = parseIpAndPortString(path);
Instance instance = buildSyncInstance(queryParam, ipAndPortParam, taskDO);
destNamingService.registerInstance(getServiceNameFromCache(taskDO.getTaskId(), queryParam),
instance);
}
}
}