下面列出了org.springframework.context.expression.MapAccessor#de.codecentric.boot.admin.server.domain.events.InstanceEvent 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void should_store_events() {
InstanceEventStore store = createStore(100);
StepVerifier.create(store.findAll()).verifyComplete();
Instant now = Instant.now();
InstanceEvent event1 = new InstanceRegisteredEvent(id, 0L, now, registration);
InstanceEvent eventOther = new InstanceRegisteredEvent(InstanceId.of("other"), 0L, now.plusMillis(10),
registration);
InstanceEvent event2 = new InstanceDeregisteredEvent(id, 1L, now.plusMillis(20));
StepVerifier.create(store).expectSubscription()
.then(() -> StepVerifier.create(store.append(singletonList(event1))).verifyComplete())
.expectNext(event1)
.then(() -> StepVerifier.create(store.append(singletonList(eventOther))).verifyComplete())
.expectNext(eventOther)
.then(() -> StepVerifier.create(store.append(singletonList(event2))).verifyComplete())
.expectNext(event2).thenCancel().verify();
StepVerifier.create(store.find(id)).expectNext(event1, event2).verifyComplete();
StepVerifier.create(store.find(InstanceId.of("-"))).verifyComplete();
StepVerifier.create(store.findAll()).expectNext(event1, eventOther, event2).verifyComplete();
}
@Override
protected Mono<Void> sendNotifications(InstanceEvent event) {
while (true) {
Long lastSentEvent = this.sentNotifications.getOrDefault(event.getInstance(), -1L);
if (lastSentEvent >= event.getVersion()) {
log.debug("Notifications already sent. Not triggering notifiers for {}", event);
return Mono.empty();
}
if (lastSentEvent < 0) {
if (this.sentNotifications.putIfAbsent(event.getInstance(), event.getVersion()) == null) {
log.debug("Triggering notifiers for {}", event);
return super.sendNotifications(event);
}
}
else {
if (this.sentNotifications.replace(event.getInstance(), lastSentEvent, event.getVersion())) {
log.debug("Triggering notifiers for {}", event);
return super.sendNotifications(event);
}
}
}
}
@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
return Mono.fromRunnable(() -> {
Context ctx = new Context();
ctx.setVariables(additionalProperties);
ctx.setVariable("baseUrl", this.baseUrl);
ctx.setVariable("event", event);
ctx.setVariable("instance", instance);
ctx.setVariable("lastStatus", getLastStatus(event.getInstance()));
try {
MimeMessage mimeMessage = mailSender.createMimeMessage();
MimeMessageHelper message = new MimeMessageHelper(mimeMessage, StandardCharsets.UTF_8.name());
message.setText(getBody(ctx).replaceAll("\\s+\\n", "\n"), true);
message.setSubject(getSubject(ctx));
message.setTo(this.to);
message.setCc(this.cc);
message.setFrom(this.from);
mailSender.send(mimeMessage);
}
catch (MessagingException ex) {
throw new RuntimeException("Error sending mail notification", ex);
}
});
}
@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
return Mono.fromRunnable(() -> {
Context ctx = new Context();
ctx.setVariables(additionalProperties);
ctx.setVariable("baseUrl", this.baseUrl);
ctx.setVariable("event", event);
ctx.setVariable("instance", instance);
ctx.setVariable("lastStatus", getLastStatus(event.getInstance()));
try {
MimeMessage mimeMessage = mailSender.createMimeMessage();
MimeMessageHelper message = new MimeMessageHelper(mimeMessage, StandardCharsets.UTF_8.name());
message.setText(getBody(ctx).replaceAll("\\s+\\n", "\n"), true);
message.setSubject(getSubject(ctx));
message.setTo(this.to);
message.setCc(this.cc);
message.setFrom(this.from);
mailSender.send(mimeMessage);
} catch (MessagingException ex) {
throw new RuntimeException("Error sending mail notification", ex);
}
});
}
@Test
public void should_resubscribe_after_error() {
TestPublisher<InstanceEvent> eventPublisher = TestPublisher.create();
Flux<InstanceEvent> emittedNotifications = Flux.create((emitter) -> {
Notifier notifier = (event) -> {
emitter.next(event);
if (event.equals(errorTriggeringEvent)) {
return Mono.error(new IllegalArgumentException("TEST-ERROR"));
}
return Mono.empty();
};
RemindingNotifier reminder = new RemindingNotifier(notifier, this.repository);
eventPublisher.flux().flatMap(reminder::notify).subscribe();
reminder.setCheckReminderInverval(Duration.ofMillis(10));
reminder.setReminderPeriod(Duration.ofMillis(10));
reminder.start();
});
StepVerifier.create(emittedNotifications).expectSubscription().then(() -> eventPublisher.next(appDown))
.expectNext(appDown, appDown).then(() -> eventPublisher.next(errorTriggeringEvent))
.thenConsumeWhile((e) -> !e.equals(errorTriggeringEvent))
.expectNext(errorTriggeringEvent, appDown, appDown).thenCancel().verify();
}
@Test
public void test_filter() {
TestNotifier delegate = new TestNotifier();
FilteringNotifier notifier = new FilteringNotifier(delegate, repository);
AbstractNotificationFilter trueFilter = new AbstractNotificationFilter() {
@Override
public boolean filter(InstanceEvent event, Instance instance) {
return true;
}
};
notifier.addFilter(trueFilter);
StepVerifier.create(notifier.notify(event)).verifyComplete();
assertThat(delegate.getEvents()).doesNotContain(event);
notifier.removeFilter(trueFilter.getId());
StepVerifier.create(notifier.notify(event)).verifyComplete();
assertThat(delegate.getEvents()).contains(event);
}
protected Object createMessage(InstanceEvent event, Instance instance) {
Map<String, Object> messageJson = new HashMap<>();
messageJson.put("username", username);
if (icon != null) {
messageJson.put("icon_emoji", ":" + icon + ":");
}
if (channel != null) {
messageJson.put("channel", channel);
}
Map<String, Object> attachments = new HashMap<>();
attachments.put("text", getText(event, instance));
attachments.put("color", getColor(event));
attachments.put("mrkdwn_in", Collections.singletonList("text"));
messageJson.put("attachments", Collections.singletonList(attachments));
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
return new HttpEntity<>(messageJson, headers);
}
@Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnMissingBean
public StatusUpdateTrigger statusUpdateTrigger(StatusUpdater statusUpdater, Publisher<InstanceEvent> events) {
StatusUpdateTrigger trigger = new StatusUpdateTrigger(statusUpdater, events);
trigger.setInterval(this.adminServerProperties.getMonitor().getStatusInterval());
trigger.setLifetime(this.adminServerProperties.getMonitor().getStatusLifetime());
return trigger;
}
@Test
public void verifyDeserializeOfInstanceStatusChangedEvent() throws JSONException, JsonProcessingException {
String json = new JSONObject().put("instance", "test123").put("timestamp", 1587751031.000000000)
.put("type", "STATUS_CHANGED").put("statusInfo", new JSONObject().put("status", "OFFLINE"))
.toString();
InstanceEvent event = objectMapper.readValue(json, InstanceEvent.class);
assertThat(event).isInstanceOf(InstanceStatusChangedEvent.class);
}
@Test
public void verifyDeserializeOfInstanceEndpointsDetectedEvent() throws JSONException, JsonProcessingException {
String json = new JSONObject().put("instance", "test123").put("timestamp", 1587751031.000000000)
.put("type", "ENDPOINTS_DETECTED").toString();
InstanceEvent event = objectMapper.readValue(json, InstanceEvent.class);
assertThat(event).isInstanceOf(InstanceEndpointsDetectedEvent.class);
}
@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
Scheduler scheduler = Schedulers.newSingle("endpoint-detector");
return publisher.subscribeOn(scheduler)
.filter(event -> event instanceof InstanceStatusChangedEvent ||
event instanceof InstanceRegistrationUpdatedEvent)
.flatMap(this::detectEndpoints)
.doFinally(s -> scheduler.dispose());
}
protected HttpEntity<Map<String, Object>> createHipChatNotification(InstanceEvent event, Instance instance) {
Map<String, Object> body = new HashMap<>();
body.put("color", getColor(event));
body.put("message", getMessage(event, instance));
body.put("notify", getNotify());
body.put("message_format", "html");
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
return new HttpEntity<>(body, headers);
}
@Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnMissingBean
public InfoUpdateTrigger infoUpdateTrigger(InfoUpdater infoUpdater, Publisher<InstanceEvent> events) {
InfoUpdateTrigger trigger = new InfoUpdateTrigger(infoUpdater, events);
trigger.setInterval(this.adminServerProperties.getMonitor().getInfoInterval());
trigger.setLifetime(this.adminServerProperties.getMonitor().getInfoLifetime());
return trigger;
}
@Test
public void should_throw_when_applied_wrong_event() {
Instance instance = Instance.create(InstanceId.of("id"));
assertThatThrownBy(() -> instance.apply((InstanceEvent) null)).isInstanceOf(IllegalArgumentException.class)
.hasMessage("'event' must not be null");
assertThatThrownBy(() -> instance.apply(new InstanceDeregisteredEvent(InstanceId.of("wrong"), 0L)))
.isInstanceOf(IllegalArgumentException.class).hasMessage("'event' must refer the same instance");
assertThatThrownBy(() -> instance.apply(new InstanceDeregisteredEvent(InstanceId.of("id"), 1L))
.apply(new InstanceDeregisteredEvent(InstanceId.of("id"), 1L)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Event 1 must be greater or equal to 2");
}
@Override
public Mono<Void> append(List<InstanceEvent> events) {
return Mono.fromRunnable(() -> {
while (true) {
if (doAppend(events)) {
return;
}
}
});
}
@Test
public void should_not_propagate_error() {
Notifier notifier = new AbstractStatusChangeNotifier(repository) {
@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance application) {
return Mono.error(new IllegalStateException("test"));
}
};
StepVerifier
.create(notifier.notify(
new InstanceStatusChangedEvent(instance.getId(), instance.getVersion(), StatusInfo.ofUp())))
.verifyComplete();
}
protected HttpEntity<?> createRequest(InstanceEvent event, Instance instance) {
Map<String, Object> body = new HashMap<>();
if (user != null) {
body.put("user", user);
}
if (source != null) {
body.put("source", source);
}
if (event instanceof InstanceStatusChangedEvent &&
!StatusInfo.STATUS_UP.equals(((InstanceStatusChangedEvent) event).getStatusInfo().getStatus())) {
body.put("message", getMessage(event, instance));
body.put("alias", generateAlias(instance));
body.put("description", getDescription(event, instance));
if (actions != null) {
body.put("actions", actions);
}
if (tags != null) {
body.put("tags", tags);
}
if (entity != null) {
body.put("entity", entity);
}
Map<String, Object> details = new HashMap<>();
details.put("type", "link");
details.put("href", instance.getRegistration().getHealthUrl());
details.put("text", "Instance health-endpoint");
body.put("details", details);
}
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set(HttpHeaders.AUTHORIZATION, "GenieKey " + apiKey);
return new HttpEntity<>(body, headers);
}
@Nullable
protected String getText(InstanceEvent event, Instance instance) {
Map<String, Object> root = new HashMap<>();
root.put("event", event);
root.put("instance", instance);
root.put("lastStatus", getLastStatus(event.getInstance()));
StandardEvaluationContext context = new StandardEvaluationContext(root);
context.addPropertyAccessor(new MapAccessor());
return message.getValue(context, String.class);
}
private Map<String, Object> createMessage(InstanceEvent event, Instance instance) {
Map<String, Object> parameters = new HashMap<>();
parameters.put("chat_id", this.chatId);
parameters.put("parse_mode", this.parseMode);
parameters.put("disable_notification", this.disableNotify);
parameters.put("text", getText(event, instance));
return parameters;
}
@Test
public void should_shorten_log_on_exceeded_capacity() {
InstanceEventStore store = createStore(2);
InstanceEvent event1 = new InstanceRegisteredEvent(id, 0L, registration);
InstanceEvent event2 = new InstanceStatusChangedEvent(id, 1L, StatusInfo.ofDown());
InstanceEvent event3 = new InstanceStatusChangedEvent(id, 2L, StatusInfo.ofUp());
StepVerifier.create(store.append(asList(event1, event2, event3))).verifyComplete();
StepVerifier.create(store.findAll()).expectNext(event1, event3).verifyComplete();
}
@Nullable
protected String getDescription(InstanceEvent event, Instance instance) {
Map<String, Object> root = new HashMap<>();
root.put("event", event);
root.put("instance", instance);
root.put("lastStatus", getLastStatus(event.getInstance()));
StandardEvaluationContext context = new StandardEvaluationContext(root);
context.addPropertyAccessor(new MapAccessor());
return description.getValue(context, String.class);
}
protected String getColor(InstanceEvent event) {
if (event instanceof InstanceStatusChangedEvent) {
return StatusInfo.STATUS_UP.equals(((InstanceStatusChangedEvent) event).getStatusInfo().getStatus())
? "good" : "danger";
}
else {
return "#439FE0";
}
}
@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
return Mono.fromRunnable(() -> restTemplate.postForEntity(buildUrl(),
createHipChatNotification(event, instance),
Void.class
));
}
protected HttpEntity<Map<String, Object>> createHipChatNotification(InstanceEvent event, Instance instance) {
Map<String, Object> body = new HashMap<>();
body.put("color", getColor(event));
body.put("message", getMessage(event, instance));
body.put("notify", getNotify());
body.put("message_format", "html");
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
return new HttpEntity<>(body, headers);
}
@Nullable
protected String getDescription(InstanceEvent event, Instance instance) {
Map<String, Object> root = new HashMap<>();
root.put("event", event);
root.put("instance", instance);
root.put("lastStatus", getLastStatus(event.getInstance()));
StandardEvaluationContext context = new StandardEvaluationContext(root);
context.addPropertyAccessor(new MapAccessor());
return description.getValue(context, String.class);
}
@Override
protected Mono<Void> doNotify(InstanceEvent event, Instance instance) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
// Let's Chat requiers the token as basic username, the password can be an
// arbitrary string.
String auth = Base64Utils
.encodeToString(String.format("%s:%s", token, username).getBytes(StandardCharsets.UTF_8));
headers.add(HttpHeaders.AUTHORIZATION, String.format("Basic %s", auth));
return Mono.fromRunnable(() -> restTemplate.exchange(createUrl(), HttpMethod.POST,
new HttpEntity<>(createMessage(event, instance), headers), Void.class));
}
@Override
public Mono<Void> notify(InstanceEvent event) {
if (event.getInstance().getValue().equals("ERROR")) {
throw new IllegalArgumentException("TEST-ERROR");
}
this.publish(Collections.singletonList(event));
return Mono.empty();
}
@Test
public void should_shorten_log_on_exceeded_capacity() {
InstanceEventStore store = createStore(2);
InstanceEvent event1 = new InstanceRegisteredEvent(id, 0L, registration);
InstanceEvent event2 = new InstanceStatusChangedEvent(id, 1L, StatusInfo.ofDown());
InstanceEvent event3 = new InstanceStatusChangedEvent(id, 2L, StatusInfo.ofUp());
StepVerifier.create(store.append(asList(event1, event2, event3))).verifyComplete();
StepVerifier.create(store.findAll()).expectNext(event1, event3).verifyComplete();
}
@Nullable
protected String createContent(InstanceEvent event, Instance instance) {
Map<String, Object> root = new HashMap<>();
root.put("event", event);
root.put("instance", instance);
root.put("lastStatus", getLastStatus(event.getInstance()));
StandardEvaluationContext context = new StandardEvaluationContext(root);
context.addPropertyAccessor(new MapAccessor());
return message.getValue(context, String.class);
}
private boolean filter(InstanceEvent event, Instance instance) {
cleanUp();
for (Entry<String, NotificationFilter> entry : getNotificationFilters().entrySet()) {
if (entry.getValue().filter(event, instance)) {
LOGGER.debug("The event '{}' was suppressed by filter '{}'", event, entry);
return true;
}
}
return false;
}