下面列出了org.springframework.context.expression.MapAccessor#de.codecentric.boot.admin.server.domain.entities.Instance 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Creates the function that maps the actuator-index response of an instance to its endpoints.
 * <p>
 * A non-2xx status, or a Content-Type that {@code actuatorMediaType} is not compatible with
 * (including a missing one), releases the body and yields an empty {@link Mono}. Otherwise the
 * body is read as {@code Response}, converted and aligned with the management URL.
 * @param instance the queried instance; its id is used for logging and alignment
 * @param managementUrl the management URL the index was requested from
 * @return the function converting the client response into endpoints
 */
protected Function<ClientResponse, Mono<Endpoints>> convert(Instance instance, String managementUrl) {
    return (clientResponse) -> {
        boolean successful = clientResponse.statusCode().is2xxSuccessful();
        if (!successful) {
            log.debug("Querying actuator-index for instance {} on '{}' failed with status {}.", instance.getId(),
                    managementUrl, clientResponse.rawStatusCode());
            return clientResponse.releaseBody().then(Mono.empty());
        }

        // A missing Content-Type header counts as incompatible.
        boolean compatible = clientResponse.headers().contentType().map(actuatorMediaType::isCompatibleWith)
                .orElse(false);
        if (!compatible) {
            log.debug("Querying actuator-index for instance {} on '{}' failed with incompatible Content-Type '{}'.",
                    instance.getId(), managementUrl,
                    clientResponse.headers().contentType().map(Objects::toString).orElse("(missing)"));
            return clientResponse.releaseBody().then(Mono.empty());
        }

        log.debug("Querying actuator-index for instance {} on '{}' successful.", instance.getId(), managementUrl);
        return clientResponse.bodyToMono(Response.class)
                .flatMap(this::convertResponse)
                .map(this.alignWithManagementUrl(instance.getId(), managementUrl));
    };
}
/**
 * The first request to "/info" is answered with a 5000 ms delay, the second one with JSON;
 * the update must still complete and store the info.
 */
@Test
public void should_retry() {
    // given: a registered instance that is UP and exposes an "info" endpoint
    Registration registration = Registration.create("foo", this.wireMock.url("/health")).build();
    Endpoints endpoints = Endpoints.single("info", this.wireMock.url("/info"));
    Instance instance = Instance.create(InstanceId.of("onl")).register(registration).withEndpoints(endpoints)
            .withStatusInfo(StatusInfo.ofUp());
    StepVerifier.create(this.repository.save(instance)).expectNextCount(1).verifyComplete();

    // the delayed first answer moves the scenario to "recovered"
    this.wireMock.stubFor(get("/info").inScenario("retry").whenScenarioStateIs(STARTED)
            .willReturn(aResponse().withFixedDelay(5000)).willSetStateTo("recovered"));
    String json = "{ \"foo\": \"bar\" }";
    String contentLength = Integer.toString(json.length());
    this.wireMock.stubFor(get("/info").inScenario("retry").whenScenarioStateIs("recovered")
            .willReturn(okJson(json).withHeader("Content-Length", contentLength)));

    // when
    StepVerifier.create(this.eventStore).expectSubscription()
            .then(() -> StepVerifier.create(this.updater.updateInfo(instance.getId())).verifyComplete())
            // then
            .assertNext((published) -> assertThat(published).isInstanceOf(InstanceInfoChangedEvent.class))
            .thenCancel().verify();

    Info expected = Info.from(singletonMap("foo", "bar"));
    StepVerifier.create(this.repository.find(instance.getId()))
            .assertNext((stored) -> assertThat(stored.getInfo()).isEqualTo(expected)).verifyComplete();
}
/**
 * Detection must complete without emitting endpoints when the probed path answers 404.
 */
@Test
public void should_return_empty() {
    // given
    Registration registration = Registration.create("test", wireMock.url("/mgmt/health"))
            .managementUrl(wireMock.url("/mgmt")).build();
    Instance instance = Instance.create(InstanceId.of("id")).register(registration);
    // OPTIONS /mgmt/stats is answered with 404
    wireMock.stubFor(options(urlEqualTo("/mgmt/stats")).willReturn(aResponse().withStatus(HttpStatus.NOT_FOUND_404)));
    String[] probedEndpoints = {"metrics:stats"};
    ProbeEndpointsStrategy strategy = new ProbeEndpointsStrategy(instanceWebClient, probedEndpoints);

    // when/then
    StepVerifier.create(strategy.detectEndpoints(instance)).verifyComplete();
}
/**
 * A server error on the "info" endpoint must publish an info-changed event and leave the
 * stored instance with empty info.
 */
@Test
public void should_clear_info_on_http_error() {
    // given: an UP instance that already carries info
    Registration registration = Registration.create("foo", this.wireMock.url("/health")).build();
    Endpoints endpoints = Endpoints.single("info", this.wireMock.url("/info"));
    Info existingInfo = Info.from(singletonMap("foo", "bar"));
    Instance instance = Instance.create(InstanceId.of("onl")).register(registration).withEndpoints(endpoints)
            .withStatusInfo(StatusInfo.ofUp()).withInfo(existingInfo);
    StepVerifier.create(this.repository.save(instance)).expectNextCount(1).verifyComplete();

    this.wireMock.stubFor(get("/info").willReturn(serverError()));

    // when
    StepVerifier.create(this.eventStore).expectSubscription()
            .then(() -> StepVerifier.create(this.updater.updateInfo(instance.getId())).verifyComplete())
            // then
            .assertNext((published) -> assertThat(published).isInstanceOf(InstanceInfoChangedEvent.class))
            .thenCancel().verify();

    StepVerifier.create(this.repository.find(instance.getId()))
            .assertNext((stored) -> assertThat(stored.getInfo()).isEqualTo(Info.empty())).verifyComplete();
}
/**
 * Creates a fresh event store and repository, saves one registered instance and builds the
 * updater with 2 second connect/read timeouts and a single retry for the health endpoint.
 */
@Before
public void setup() {
    this.eventStore = new InMemoryEventStore();
    this.repository = new EventsourcingInstanceRepository(this.eventStore);

    Registration registration = Registration.create("foo", this.wireMock.url("/health")).build();
    this.instance = Instance.create(InstanceId.of("id")).register(registration);
    StepVerifier.create(this.repository.save(this.instance)).expectNextCount(1).verifyComplete();

    InstanceWebClient webClient = InstanceWebClient.builder()
            .connectTimeout(Duration.ofSeconds(2))
            .readTimeout(Duration.ofSeconds(2))
            .retries(singletonMap(Endpoint.HEALTH, 1))
            .build();
    this.updater = new StatusUpdater(this.repository, webClient);
}
/**
 * Renders the mail template for the event and sends it via the mail sender.
 * <p>
 * The work is deferred into the returned {@link Mono} through {@code Mono.fromRunnable}.
 * @param event the event to notify about
 * @param instance the instance exposed to the template as "instance"
 * @return a Mono wrapping the send operation
 * @throws RuntimeException (inside the Mono) wrapping any {@link MessagingException}
 */
@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
    return Mono.fromRunnable(() -> {
        // Template model: user supplied properties first, then the fixed variables.
        Context templateContext = new Context();
        templateContext.setVariables(additionalProperties);
        templateContext.setVariable("baseUrl", this.baseUrl);
        templateContext.setVariable("event", event);
        templateContext.setVariable("instance", instance);
        templateContext.setVariable("lastStatus", getLastStatus(event.getInstance()));

        try {
            MimeMessage mail = mailSender.createMimeMessage();
            MimeMessageHelper helper = new MimeMessageHelper(mail, StandardCharsets.UTF_8.name());
            // Collapse whitespace runs that end in a newline into a single newline.
            String html = getBody(templateContext).replaceAll("\\s+\\n", "\n");
            helper.setText(html, true);
            helper.setSubject(getSubject(templateContext));
            helper.setTo(this.to);
            helper.setCc(this.cc);
            helper.setFrom(this.from);
            mailSender.send(mail);
        } catch (MessagingException cause) {
            throw new RuntimeException("Error sending mail notification", cause);
        }
    });
}
/**
 * A text/plain answer from the index must make the detection complete without endpoints.
 */
@Test
public void should_return_empty_on_wrong_content_type() {
    // given
    Registration registration = Registration.create("test", this.wireMock.url("/mgmt/health"))
            .managementUrl(this.wireMock.url("/mgmt")).build();
    Instance instance = Instance.create(InstanceId.of("id")).register(registration);
    String plainText = "HELLOW WORLD";
    this.wireMock.stubFor(
            get("/mgmt").willReturn(ok(plainText).withHeader("Content-Type", MediaType.TEXT_PLAIN_VALUE)));
    QueryIndexEndpointStrategy strategy = new QueryIndexEndpointStrategy(this.instanceWebClient);

    // when/then
    StepVerifier.create(strategy.detectEndpoints(instance)).verifyComplete();
}
/**
 * The index strategy must emit the "metrics" and "info" links from the actuator index;
 * the "self" and "metrics-requiredMetricName" links are not part of the expected result.
 */
@Test
public void should_return_endpoints() {
    // given
    Registration registration = Registration.create("test", wireMock.url("/mgmt/health"))
            .managementUrl(wireMock.url("/mgmt")).build();
    Instance instance = Instance.create(InstanceId.of("id")).register(registration);
    String indexJson = "{\"_links\":{\"metrics-requiredMetricName\":{\"templated\":true,\"href\":\"\\/mgmt\\/metrics\\/{requiredMetricName}\"},\"self\":{\"templated\":false,\"href\":\"\\/mgmt\"},\"metrics\":{\"templated\":false,\"href\":\"\\/mgmt\\/stats\"},\"info\":{\"templated\":false,\"href\":\"\\/mgmt\\/info\"}}}";
    String contentLength = Integer.toString(indexJson.length());
    wireMock.stubFor(get("/mgmt").willReturn(ok(indexJson).withHeader("Content-Type", ActuatorMediaType.V2_JSON)
            .withHeader("Content-Length", contentLength)));
    QueryIndexEndpointStrategy strategy = new QueryIndexEndpointStrategy(instanceWebClient);

    // when/then
    Endpoints expected = Endpoints.single("metrics", "/mgmt/stats").withEndpoint("info", "/mgmt/info");
    StepVerifier.create(strategy.detectEndpoints(instance)).expectNext(expected).verifyComplete();
}
/**
 * A text/plain answer from the index must make the detection complete without endpoints.
 */
@Test
public void should_return_empty_on_wrong_content_type() {
    // given
    Registration registration = Registration.create("test", wireMock.url("/mgmt/health"))
            .managementUrl(wireMock.url("/mgmt")).build();
    Instance instance = Instance.create(InstanceId.of("id")).register(registration);
    String plainText = "HELLOW WORLD";
    String contentLength = Integer.toString(plainText.length());
    wireMock.stubFor(get("/mgmt").willReturn(ok(plainText).withHeader("Content-Type", MediaType.TEXT_PLAIN_VALUE)
            .withHeader("Content-Length", contentLength)));
    QueryIndexEndpointStrategy strategy = new QueryIndexEndpointStrategy(instanceWebClient);

    // when/then
    StepVerifier.create(strategy.detectEndpoints(instance)).verifyComplete();
}
/**
 * While a filter that matches everything is registered the delegate must not see the event;
 * once the filter is removed the event must reach the delegate.
 */
@Test
public void test_filter() {
    TestNotifier delegate = new TestNotifier();
    FilteringNotifier notifier = new FilteringNotifier(delegate, repository);

    // a filter whose filter(..) returns true for every event
    AbstractNotificationFilter matchAll = new AbstractNotificationFilter() {
        @Override
        public boolean filter(InstanceEvent event, Instance instance) {
            return true;
        }
    };

    // with the filter in place
    notifier.addFilter(matchAll);
    StepVerifier.create(notifier.notify(event)).verifyComplete();
    assertThat(delegate.getEvents()).doesNotContain(event);

    // after removing the filter
    notifier.removeFilter(matchAll.getId());
    StepVerifier.create(notifier.notify(event)).verifyComplete();
    assertThat(delegate.getEvents()).contains(event);
}
/**
 * Builds the HTTP entity posted to Discord: a JSON payload plus content-type and user-agent headers.
 * <p>
 * "avatar_url" and "username" are only added when the corresponding fields are non-null.
 * @param event the event used to create the message content
 * @param instance the instance used to create the message content
 * @return an {@code HttpEntity} carrying the payload map and the headers
 */
protected Object createDiscordNotification(InstanceEvent event, Instance instance) {
    HttpHeaders httpHeaders = new HttpHeaders();
    httpHeaders.setContentType(MediaType.APPLICATION_JSON);
    httpHeaders.add(HttpHeaders.USER_AGENT, "RestTemplate");

    Map<String, Object> payload = new HashMap<>();
    payload.put("content", createContent(event, instance));
    payload.put("tts", tts);
    // optional fields
    if (avatarUrl != null) {
        payload.put("avatar_url", avatarUrl);
    }
    if (username != null) {
        payload.put("username", username);
    }

    return new HttpEntity<>(payload, httpHeaders);
}
/**
 * Sends a mail notification for the given event.
 * <p>
 * Template variables are populated, the body and subject are rendered and the message is sent;
 * all of it runs lazily inside the returned {@link Mono} ({@code Mono.fromRunnable}).
 * @param event the event to notify about
 * @param instance the affected instance, available to the template as "instance"
 * @return a Mono wrapping the send operation
 * @throws RuntimeException (inside the Mono) wrapping any {@link MessagingException}
 */
@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
    return Mono.fromRunnable(() -> {
        Context model = new Context();
        model.setVariables(this.additionalProperties);
        model.setVariable("baseUrl", this.baseUrl);
        model.setVariable("event", event);
        model.setVariable("instance", instance);
        model.setVariable("lastStatus", getLastStatus(event.getInstance()));

        try {
            MimeMessage outgoing = this.mailSender.createMimeMessage();
            MimeMessageHelper composer = new MimeMessageHelper(outgoing, StandardCharsets.UTF_8.name());
            // Whitespace runs ending in a newline are reduced to a single newline before sending.
            String renderedBody = getBody(model).replaceAll("\\s+\\n", "\n");
            composer.setText(renderedBody, true);
            composer.setSubject(getSubject(model));
            composer.setTo(this.to);
            composer.setCc(this.cc);
            composer.setFrom(this.from);
            this.mailSender.send(outgoing);
        }
        catch (MessagingException failure) {
            throw new RuntimeException("Error sending mail notification", failure);
        }
    });
}
/**
 * Registering the same registration a second time must return the same id and keep the
 * previously stored info and status.
 */
@Test
public void refresh() {
    // Given instance is already registered and has status and info.
    Registration registration = Registration.create("abc", "http://localhost:8080/health").build();
    InstanceId id = idGenerator.generateId(registration);
    StatusInfo status = StatusInfo.ofUp();
    Info info = Info.from(singletonMap("foo", "bar"));
    Instance existing = Instance.create(id).register(registration).withStatusInfo(status).withInfo(info);
    StepVerifier.create(repository.save(existing)).expectNextCount(1).verifyComplete();

    // When instance registers second time
    Registration sameRegistration = Registration.create("abc", "http://localhost:8080/health").build();
    InstanceId refreshId = registry.register(sameRegistration).block();

    assertThat(refreshId).isEqualTo(id);
    // Then info and status are retained
    StepVerifier.create(registry.getInstance(id)).assertNext((registered) -> {
        assertThat(registered.getInfo()).isEqualTo(info);
        assertThat(registered.getStatusInfo()).isEqualTo(status);
    }).verifyComplete();
}
/**
 * Assembles the message for an instance: one section listing status, service/health/management
 * URL and source as facts, wrapped in a message with the given title.
 * @param instance the instance whose status and registration are reported
 * @param registeredTitle the message title
 * @param activitySubtitle the subtitle of the single section
 * @return the message carrying exactly one section
 */
protected Message createMessage(Instance instance, String registeredTitle, String activitySubtitle) {
    List<Fact> details = new ArrayList<>();
    details.add(new Fact(STATUS_KEY, instance.getStatusInfo().getStatus()));
    details.add(new Fact(SERVICE_URL_KEY, instance.getRegistration().getServiceUrl()));
    details.add(new Fact(HEALTH_URL_KEY, instance.getRegistration().getHealthUrl()));
    details.add(new Fact(MANAGEMENT_URL_KEY, instance.getRegistration().getManagementUrl()));
    details.add(new Fact(SOURCE_KEY, instance.getRegistration().getSource()));

    // The section is titled with the registered application name.
    Section instanceSection = Section.builder().activityTitle(instance.getRegistration().getName())
            .activitySubtitle(activitySubtitle).facts(details).build();

    return Message.builder().title(registeredTitle).summary(messageSummary).themeColor(themeColor)
            .sections(singletonList(instanceSection)).build();
}
/**
 * The first OPTIONS probe of "/mgmt/metrics" is delayed by 5000 ms; detection must still
 * emit the "metrics" (at "/mgmt/stats") and "info" endpoints.
 */
@Test
public void should_retry() {
    // given
    Registration registration = Registration.create("test", this.wireMock.url("/mgmt/health"))
            .managementUrl(this.wireMock.url("/mgmt")).build();
    Instance instance = Instance.create(InstanceId.of("id")).register(registration);

    // delayed first answer, then ok once the scenario is "recovered"
    this.wireMock.stubFor(options(urlEqualTo("/mgmt/metrics")).inScenario("retry").whenScenarioStateIs(STARTED)
            .willReturn(aResponse().withFixedDelay(5000)).willSetStateTo("recovered"));
    this.wireMock.stubFor(options(urlEqualTo("/mgmt/metrics")).inScenario("retry")
            .whenScenarioStateIs("recovered").willReturn(ok()));
    this.wireMock.stubFor(options(urlEqualTo("/mgmt/stats")).willReturn(ok()));
    this.wireMock.stubFor(options(urlEqualTo("/mgmt/info")).willReturn(ok()));
    this.wireMock.stubFor(options(urlEqualTo("/mgmt/non-exist")).willReturn(notFound()));

    String[] probedEndpoints = { "metrics:stats", "metrics", "info", "non-exist" };
    ProbeEndpointsStrategy strategy = new ProbeEndpointsStrategy(this.instanceWebClient, probedEndpoints);

    // when/then
    Endpoints expected = Endpoints.single("metrics", this.wireMock.url("/mgmt/stats")).withEndpoint("info",
            this.wireMock.url("/mgmt/info"));
    StepVerifier.create(strategy.detectEndpoints(instance)).expectNext(expected).verifyComplete();
}
/**
 * The index reports its links with an "http" scheme on the HTTPS port; the detected endpoints
 * are expected to carry the "https" scheme instead.
 */
@Test
public void should_return_endpoints_with_aligned_scheme() {
    // given
    Registration registration = Registration.create("test", this.wireMock.url("/mgmt/health"))
            .managementUrl(this.wireMock.url("/mgmt")).build();
    Instance instance = Instance.create(InstanceId.of("id")).register(registration);

    String plainHost = "http://localhost:" + this.wireMock.httpsPort();
    String indexJson = "{\"_links\":{\"metrics-requiredMetricName\":{\"templated\":true,\"href\":\"" + plainHost
            + "/mgmt/metrics/{requiredMetricName}\"},\"self\":{\"templated\":false,\"href\":\"" + plainHost
            + "/mgmt\"},\"metrics\":{\"templated\":false,\"href\":\"" + plainHost
            + "/mgmt/stats\"},\"info\":{\"templated\":false,\"href\":\"" + plainHost + "/mgmt/info\"}}}";
    this.wireMock.stubFor(
            get("/mgmt").willReturn(ok(indexJson).withHeader("Content-Type", ActuatorMediaType.V2_JSON)));
    QueryIndexEndpointStrategy strategy = new QueryIndexEndpointStrategy(this.instanceWebClient);

    // when/then
    String secureHost = "https://localhost:" + this.wireMock.httpsPort();
    Endpoints expected = Endpoints.single("metrics", secureHost + "/mgmt/stats").withEndpoint("info",
            secureHost + "/mgmt/info");
    StepVerifier.create(strategy.detectEndpoints(instance)).expectNext(expected).verifyComplete();
}
/**
 * Extends the headers produced by the super implementation with the request header info
 * supplied by {@code DiscoveryHeaderHelper}; each value is stored as a singleton list and
 * replaces any existing entry of the same name.
 * @param instance the instance the headers are built for
 * @return the headers from the super implementation plus the discovery headers
 */
@Override
public HttpHeaders getHeaders(Instance instance) {
    HttpHeaders result = super.getHeaders(instance);

    // Collect first, then merge in one step.
    Map<String, List<String>> discoveryHeaders = new HashMap<>(1);
    DiscoveryHeaderHelper.getInstance().getRequestHeaderInfo()
            .forEach((name, value) -> discoveryHeaders.put(name, Collections.singletonList(value)));

    result.putAll(discoveryHeaders);
    return result;
}
/**
 * Forwards a proxied request to the given instance and translates transport failures into
 * synthetic gateway responses instead of propagating the error.
 * <ul>
 * <li>{@code ReadTimeoutException} results in 504 GATEWAY_TIMEOUT</li>
 * <li>{@code ResolveEndpointException} results in 404 NOT_FOUND</li>
 * <li>{@code ConnectException} and any other {@code IOException} result in 502 BAD_GATEWAY</li>
 * </ul>
 * @param instance the target instance
 * @param uri the URI to call on the instance
 * @param method the HTTP method to use
 * @param headers the incoming headers; they are passed through {@code filterHeaders} first
 * @param bodyInserter supplies the request body; only invoked when {@code requiresBody(method)}
 * @return the response of the instance or a synthetic error response
 */
private Mono<ClientResponse> forward(Instance instance,
                                     URI uri,
                                     HttpMethod method,
                                     HttpHeaders headers,
                                     Supplier<BodyInserter<?, ? super ClientHttpRequest>> bodyInserter) {
    WebClient.RequestBodySpec bodySpec = instanceWebClient.instance(instance)
                                                          .method(method)
                                                          .uri(uri)
                                                          .headers(h -> h.addAll(filterHeaders(headers)));

    WebClient.RequestHeadersSpec<?> headersSpec = bodySpec;
    if (requiresBody(method)) {
        try {
            headersSpec = bodySpec.body(bodyInserter.get());
        } catch (Exception ex) {
            // A failing body supplier is reported through the Mono rather than thrown.
            return Mono.error(ex);
        }
    }

    return headersSpec.exchange().onErrorResume(ReadTimeoutException.class, ex -> Mono.fromSupplier(() -> {
        log.trace("Timeout for Proxy-Request for instance {} with URL '{}'", instance.getId(), uri);
        return ClientResponse.create(HttpStatus.GATEWAY_TIMEOUT, strategies).build();
    })).onErrorResume(ResolveEndpointException.class, ex -> Mono.fromSupplier(() -> {
        log.trace("No Endpoint found for Proxy-Request for instance {} with URL '{}'", instance.getId(), uri);
        return ClientResponse.create(HttpStatus.NOT_FOUND, strategies).build();
    // The connect handler has to come before the generic IOException handler: assuming
    // ConnectException is java.net.ConnectException (an IOException subtype), the IOException
    // handler would otherwise resume first and this branch could never run.
    })).onErrorResume(ConnectException.class, ex -> Mono.fromSupplier(() -> {
        log.trace("Connect for Proxy-Request for instance {} with URL '{}' failed", instance.getId(), uri, ex);
        return ClientResponse.create(HttpStatus.BAD_GATEWAY, strategies).build();
    })).onErrorResume(IOException.class, ex -> Mono.fromSupplier(() -> {
        log.trace("Proxy-Request for instance {} with URL '{}' errored", instance.getId(), uri, ex);
        return ClientResponse.create(HttpStatus.BAD_GATEWAY, strategies).build();
    }));
}
/**
 * Runs the endpoint detection strategy for an instance and applies the result to it.
 * <p>
 * Nothing is emitted when the instance has no management URL or its status is offline or unknown.
 * @param instance the instance to inspect
 * @return the instance with detected endpoints, or an empty Mono when detection is skipped
 */
private Mono<Instance> doDetectEndpoints(Instance instance) {
    boolean hasManagementUrl = StringUtils.hasText(instance.getRegistration().getManagementUrl());
    if (!hasManagementUrl) {
        return Mono.empty();
    }
    if (instance.getStatusInfo().isOffline() || instance.getStatusInfo().isUnknown()) {
        return Mono.empty();
    }

    log.debug("Detect endpoints for {}", instance);
    return strategy.detectEndpoints(instance).map(instance::withEndpoints);
}
/**
 * When management URL and service URL are identical the strategy must complete empty and
 * must not send any request to that path.
 */
@Test
public void should_return_empty_when_mgmt_equals_service_url() {
    // given
    Registration registration = Registration.create("test", this.wireMock.url("/app/health"))
            .managementUrl(this.wireMock.url("/app")).serviceUrl(this.wireMock.url("/app")).build();
    Instance instance = Instance.create(InstanceId.of("id")).register(registration);
    QueryIndexEndpointStrategy strategy = new QueryIndexEndpointStrategy(this.instanceWebClient);

    // when
    StepVerifier.create(strategy.detectEndpoints(instance)).verifyComplete();

    // then
    this.wireMock.verify(0, anyRequestedFor(urlPathEqualTo("/app")));
}
/**
 * Lists all applications: registered instances are grouped by their registration name and
 * each group is turned into one {@code Application}.
 * @return the applications built from all registered instances
 */
@GetMapping(path = "/applications", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<Application> applications() {
    Flux<Instance> registered = registry.getInstances().filter(Instance::isRegistered);
    return registered.groupBy(candidate -> candidate.getRegistration().getName())
                     .flatMap(byName -> toApplication(byName.key(), byName));
}
/**
 * Looks up a single application by name.
 * @param name the application name from the path
 * @return 200 with the application when it has at least one registered instance, otherwise 404
 */
@GetMapping(path = "/applications/{name}", produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<ResponseEntity<Application>> application(@PathVariable("name") String name) {
    Flux<Instance> registered = registry.getInstances(name).filter(Instance::isRegistered);
    return this.toApplication(name, registered)
               .filter(found -> !found.getInstances().isEmpty())
               .map(ResponseEntity::ok)
               .defaultIfEmpty(ResponseEntity.notFound().build());
}
/**
 * A request with the URI "/" against an instance without endpoints must fail with a
 * {@code ResolveEndpointException}.
 */
@Test
public void should_error_on_missing_endpoint() {
    Registration registration = Registration.create("test", wireMock.url("/status")).build();
    Instance instance = Instance.create(InstanceId.of("id")).register(registration);

    Mono<ClientResponse> exchange = instanceWebClient.instance(instance).get().uri("/").exchange();

    StepVerifier.create(exchange).verifyErrorSatisfies(error -> assertThat(error)
            .isInstanceOf(ResolveEndpointException.class).hasMessageContaining("No endpoint specified"));
}
/**
 * Registers an instance.
 * <p>
 * The id is derived from the registration by the id generator. An instance already stored
 * under that id is re-registered; otherwise a new instance is created first.
 * @param registration the registration of the instance, must not be null
 * @return the id of the registered instance
 */
public Mono<InstanceId> register(Registration registration) {
    Assert.notNull(registration, "'registration' must not be null");
    InstanceId id = generator.generateId(registration);
    Assert.notNull(id, "'id' must not be null");

    return repository.compute(id, (key, existing) -> {
        Instance target = (existing != null) ? existing : Instance.create(key);
        return Mono.just(target.register(registration));
    }).map(Instance::getId);
}
/**
 * The first request to "/info" is answered with a 5000 ms delay, the following one with JSON;
 * the info update must still complete and the info must end up in the repository.
 */
@Test
public void should_retry() {
    // given
    Registration registration = Registration.create("foo", wireMock.url("/health")).build();
    Endpoints infoEndpoint = Endpoints.single("info", wireMock.url("/info"));
    Instance instance = Instance.create(InstanceId.of("onl")).register(registration).withEndpoints(infoEndpoint)
            .withStatusInfo(StatusInfo.ofUp());
    StepVerifier.create(repository.save(instance)).expectNextCount(1).verifyComplete();

    // the delayed answer switches the scenario to "recovered"
    wireMock.stubFor(get("/info").inScenario("retry").whenScenarioStateIs(STARTED)
            .willReturn(aResponse().withFixedDelay(5000)).willSetStateTo("recovered"));
    String payload = "{ \"foo\": \"bar\" }";
    String payloadLength = Integer.toString(payload.length());
    wireMock.stubFor(get("/info").inScenario("retry").whenScenarioStateIs("recovered")
            .willReturn(okJson(payload).withHeader("Content-Length", payloadLength)));

    // when
    StepVerifier.create(eventStore).expectSubscription()
            .then(() -> StepVerifier.create(updater.updateInfo(instance.getId())).verifyComplete())
            // then
            .assertNext(published -> assertThat(published).isInstanceOf(InstanceInfoChangedEvent.class))
            .thenCancel().verify();

    Info expectedInfo = Info.from(singletonMap("foo", "bar"));
    StepVerifier.create(repository.find(instance.getId()))
            .assertNext(stored -> assertThat(stored.getInfo()).isEqualTo(expectedInfo)).verifyComplete();
}
/**
 * Determines the URL to call for an event.
 * @param event the event being notified
 * @param instance the instance used to generate the alias
 * @return {@code <url>/<alias>/close} when the event is a status change to UP, otherwise the
 * plain configured url
 */
protected String buildUrl(InstanceEvent event, Instance instance) {
    boolean statusChanged = event instanceof InstanceStatusChangedEvent;
    boolean changedToUp = statusChanged
            && StatusInfo.STATUS_UP.equals(((InstanceStatusChangedEvent) event).getStatusInfo().getStatus());
    if (!changedToUp) {
        return url.toString();
    }
    return String.format("%s/%s/close", url.toString(), generateAlias(instance));
}
/**
 * Creates the {@code MultRegisterCenter} bean: for every entry of {@code URL_MAP} a Nacos
 * naming service is created, this host is registered as "halo-moss" and an
 * {@code InstanceRegisteredEvent} is published.
 * <p>
 * NOTE(review): a {@code NacosException} is only printed via {@code printStackTrace()}; the
 * affected register center is then simply missing from the result.
 * @return the register center, which is also stored in the {@code multRegisterCenter} field
 */
@Bean
public MultRegisterCenter initMultNacos() {
    URL_MAP.put("nacos1","127.0.0.1:8848");
    Map<String, NamingService> namingServices = Maps.newConcurrentMap();
    // Stays empty: the MossNacosAutoServiceRegistration wiring that used to fill it is disabled.
    Map<String, MossNacosAutoServiceRegistration> registrations = Maps.newConcurrentMap();
    Map<NamingService, HeartbeatMonitor> heartbeatMonitors = new ConcurrentHashMap<>();

    URL_MAP.forEach((centerName, serverAddr) -> {
        NacosDiscoveryProperties discoveryProperties = new NacosDiscoveryProperties();
        discoveryProperties.setService("halo-moss");
        discoveryProperties.setServerAddr(serverAddr);
        try {
            NamingService namingService = NacosFactory.createNamingService(serverAddr);

            // Describe this host using its first non-loopback address.
            com.alibaba.nacos.api.naming.pojo.Instance self = new com.alibaba.nacos.api.naming.pojo.Instance();
            self.setIp(inetUtils.findFirstNonLoopbackHostInfo().getIpAddress());
            self.setPort(-1);
            self.setWeight(1);
            self.setClusterName("DEFAULT");
            namingService.registerInstance("halo-moss", self);

            this.context.publishEvent(new InstanceRegisteredEvent<>(this, namingService));
            namingServices.put(centerName, namingService);
            heartbeatMonitors.put(namingService, new HeartbeatMonitor());
        } catch (NacosException ex) {
            ex.printStackTrace();
        }
    });

    multRegisterCenter = new MultRegisterCenter(namingServices, registrations, heartbeatMonitors);
    return multRegisterCenter;
}
/**
 * A chain whose only delegate yields nothing must emit {@code Endpoints.empty()}.
 */
@Test
public void should_return_empty_endpoints_when_all_empty() {
    // given
    Instance instance = Instance.create(InstanceId.of("id"));
    ChainingStrategy strategy = new ChainingStrategy((ignored) -> Mono.empty());
    Endpoints expected = Endpoints.empty();

    // when/then
    StepVerifier.create(strategy.detectEndpoints(instance)).expectNext(expected).verifyComplete();
}
/**
 * Adapts an instance-aware filter to a plain {@code ExchangeFilterFunction}.
 * <p>
 * The delegate is only invoked when the request carries an {@code Instance} under
 * {@code ATTRIBUTE_INSTANCE}; any other request is passed straight to the next exchange function.
 * @param delegate the instance-aware filter to adapt
 * @return the adapting filter function
 */
public static ExchangeFilterFunction toExchangeFilterFunction(InstanceExchangeFilterFunction delegate) {
    return (request, next) -> {
        Object candidate = request.attribute(ATTRIBUTE_INSTANCE).orElse(null);
        if (candidate instanceof Instance) {
            return delegate.exchange((Instance) candidate, request, next);
        }
        return next.exchange(request);
    };
}
/**
 * A chain whose only delegate yields nothing must emit {@code Endpoints.empty()}.
 */
@Test
public void should_return_empty_endpoints_when_all_empty() {
    // given: a chain with a single delegate that never emits
    ChainingStrategy strategy = new ChainingStrategy((unused) -> Mono.empty());
    Instance instance = Instance.create(InstanceId.of("id"));

    // when/then
    Endpoints expected = Endpoints.empty();
    StepVerifier.create(strategy.detectEndpoints(instance)).expectNext(expected).verifyComplete();
}