下面列出了 org.springframework.http.codec.ServerSentEvent 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Requests {@code /event} as {@code text/event-stream}, decodes the body into
 * {@link ServerSentEvent} envelopes with {@code String} data, and expects exactly two
 * events (ids "0" and "1") followed by completion.
 */
@Test
public void sseAsEvent() {
    ParameterizedTypeReference<ServerSentEvent<String>> eventType =
            new ParameterizedTypeReference<ServerSentEvent<String>>() {};
    Flux<ServerSentEvent<String>> events = this.webClient.get()
            .uri("/event")
            .accept(TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(eventType);

    StepVerifier verifier = StepVerifier.create(events)
            // First event: id, data and comment are set; event name and retry are absent.
            .consumeNextWith(first -> {
                assertEquals("0", first.id());
                assertEquals("foo", first.data());
                assertEquals("bar", first.comment());
                assertNull(first.event());
                assertNull(first.retry());
            })
            // Second event differs from the first only in its id.
            .consumeNextWith(second -> {
                assertEquals("1", second.id());
                assertEquals("foo", second.data());
                assertEquals("bar", second.comment());
                assertNull(second.event());
                assertNull(second.retry());
            })
            .expectComplete();

    // Bounded wait so the verification cannot block indefinitely.
    verifier.verify(Duration.ofSeconds(5L));
}
/**
 * Checks that {@code result} begins with two events whose ids are "0" and "1", whose data
 * equals {@code Person("foo <id>")} and whose comment is {@code "bar <id>"}, with no event
 * name or retry value. The subscription is cancelled after the second event.
 *
 * @param result the stream of person events to verify
 */
private void verifyPersonEvents(Flux<ServerSentEvent<Person>> result) {
    StepVerifier verifier = StepVerifier.create(result)
            .consumeNextWith(first -> {
                assertEquals("0", first.id());
                assertEquals(new Person("foo 0"), first.data());
                assertEquals("bar 0", first.comment());
                assertNull(first.event());
                assertNull(first.retry());
            })
            .consumeNextWith(second -> {
                assertEquals("1", second.id());
                assertEquals(new Person("foo 1"), second.data());
                assertEquals("bar 1", second.comment());
                assertNull(second.event());
                assertNull(second.retry());
            })
            // Only the first two events are inspected; cancel instead of awaiting completion.
            .thenCancel();

    verifier.verify(Duration.ofSeconds(5L));
}
/**
 * Feeds three builder-created events (data plus id) through the {@code SseEmitter} obtained
 * from {@code handleValue} and checks the text collected by the {@code EmitterHandler}.
 */
@Test
public void writeServerSentEventsWithBuilder() throws Exception {
    EmitterProcessor<ServerSentEvent<?>> source = EmitterProcessor.create();
    ResolvableType eventType = ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class);
    SseEmitter emitter = (SseEmitter) handleValue(source, Flux.class, eventType);

    EmitterHandler handler = new EmitterHandler();
    emitter.initialize(handler);

    // Each pair is {data, id}; the events are emitted in this order.
    String[][] dataAndIds = {{"foo", "1"}, {"bar", "2"}, {"baz", "3"}};
    for (String[] pair : dataAndIds) {
        source.onNext(ServerSentEvent.builder(pair[0]).id(pair[1]).build());
    }
    source.onComplete();

    String written = handler.getValuesAsText();
    assertEquals("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n", written);
}
/**
 * Requests {@code /event} as {@code text/event-stream}, decodes the body into
 * {@link ServerSentEvent} envelopes with {@code String} data, and expects exactly two
 * events (ids "0" and "1") followed by completion.
 */
@Test
public void sseAsEvent() {
    ParameterizedTypeReference<ServerSentEvent<String>> eventType =
            new ParameterizedTypeReference<ServerSentEvent<String>>() {};
    Flux<ServerSentEvent<String>> events = this.webClient.get()
            .uri("/event")
            .accept(TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(eventType);

    StepVerifier verifier = StepVerifier.create(events)
            // First event: id, data and comment are set; event name and retry are absent.
            .consumeNextWith(first -> {
                assertEquals("0", first.id());
                assertEquals("foo", first.data());
                assertEquals("bar", first.comment());
                assertNull(first.event());
                assertNull(first.retry());
            })
            // Second event differs from the first only in its id.
            .consumeNextWith(second -> {
                assertEquals("1", second.id());
                assertEquals("foo", second.data());
                assertEquals("bar", second.comment());
                assertNull(second.event());
                assertNull(second.retry());
            })
            .expectComplete();

    // Bounded wait so the verification cannot block indefinitely.
    verifier.verify(Duration.ofSeconds(5L));
}
/**
 * Checks that {@code result} begins with two events whose ids are "0" and "1", whose data
 * equals {@code Person("foo <id>")} and whose comment is {@code "bar <id>"}, with no event
 * name or retry value. The subscription is cancelled after the second event.
 *
 * @param result the stream of person events to verify
 */
private void verifyPersonEvents(Flux<ServerSentEvent<Person>> result) {
    StepVerifier verifier = StepVerifier.create(result)
            .consumeNextWith(first -> {
                assertEquals("0", first.id());
                assertEquals(new Person("foo 0"), first.data());
                assertEquals("bar 0", first.comment());
                assertNull(first.event());
                assertNull(first.retry());
            })
            .consumeNextWith(second -> {
                assertEquals("1", second.id());
                assertEquals(new Person("foo 1"), second.data());
                assertEquals("bar 1", second.comment());
                assertNull(second.event());
                assertNull(second.retry());
            })
            // Only the first two events are inspected; cancel instead of awaiting completion.
            .thenCancel();

    verifier.verify(Duration.ofSeconds(5L));
}
/**
 * Feeds three builder-created events (data plus id) through the {@code SseEmitter} obtained
 * from {@code handleValue} and checks the text collected by the {@code EmitterHandler}.
 */
@Test
public void writeServerSentEventsWithBuilder() throws Exception {
    EmitterProcessor<ServerSentEvent<?>> source = EmitterProcessor.create();
    ResolvableType eventType = ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class);
    SseEmitter emitter = (SseEmitter) handleValue(source, Flux.class, eventType);

    EmitterHandler handler = new EmitterHandler();
    emitter.initialize(handler);

    // Each pair is {data, id}; the events are emitted in this order.
    String[][] dataAndIds = {{"foo", "1"}, {"bar", "2"}, {"baz", "3"}};
    for (String[] pair : dataAndIds) {
        source.onNext(ServerSentEvent.builder(pair[0]).id(pair[1]).build());
    }
    source.onComplete();

    String written = handler.getValuesAsText();
    assertEquals("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n", written);
}
/**
 * Serves {@code /sse/stocks} as a stream of Server-Sent Events: a leading "Stocks" event
 * whose data is the key set of {@code stringStocksServiceMap}, followed by one "StockItem"
 * event (with the item's id) for every item emitted by the services' streams.
 *
 * @return the combined event stream
 */
@GetMapping("/sse/stocks")
public Flux<ServerSentEvent<?>> streamStocks() {
    // Merge the streams of all registered services and wrap each item in an SSE envelope.
    Flux<ServerSentEvent<?>> itemEvents = Flux
            .fromIterable(stringStocksServiceMap.values())
            .flatMap(StocksService::stream)
            .<ServerSentEvent<?>>map(item ->
                    ServerSentEvent.builder(item).event("StockItem").id(item.getId()).build());

    // startWith places the "Stocks" event ahead of every item event.
    return itemEvents.startWith(
            ServerSentEvent.builder().event("Stocks").data(stringStocksServiceMap.keySet()).build());
}
/**
 * Calls {@code /randomNumber} on {@code http://localhost:8080} as an event stream, parses each
 * event's data as a {@code long}, and expects exactly five values followed by completion.
 *
 * NOTE(review): {@code verify()} is called without a timeout, so this test presumably blocks
 * for as long as the server keeps the stream open — confirm that is acceptable.
 */
@Test
public void serverSentEvent() {
    ParameterizedTypeReference<ServerSentEvent<String>> eventType =
            new ParameterizedTypeReference<ServerSentEvent<String>>() {};
    Flux<ServerSentEvent<String>> events = WebClient.create("http://localhost:8080")
            .get()
            .uri("/randomNumber")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(eventType);
    Flux<Long> numbers = events.map(event -> Long.parseLong(event.data()));

    StepVerifier verifier = StepVerifier.create(numbers)
            .expectNextCount(5)
            .expectComplete();
    verifier.verify();
}
/**
 * Streams the tight-coupling events of a project as Server-Sent Events.
 * <p>Each event is sent with its id as the SSE id and the string form of its created date as
 * the SSE event name. When the request carries a {@code Last-Event-Id} header, everything up
 * to and including the event with that id is skipped so only later events are replayed.
 *
 * @param projectId the project whose events are streamed
 * @param request the current request, read for the {@code Last-Event-Id} header
 * @return the event stream, with 100 ms between elements
 */
@GetMapping(value = "/projects/{projectId}/tightCouplingEvents", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<TightCouplingEvent>> streamTightCouplingEvents(@PathVariable Long projectId,
        HttpServletRequest request) {
    // Stream the events from MongoDB
    Flux<TightCouplingEvent> events = eventRepository.findByProjectId(projectId);

    // Check if this is an SSE reconnection from a client
    String lastEventId = request.getHeader("Last-Event-Id");

    // On SSE client reconnect, skip ahead in the stream to play back only new events
    if (lastEventId != null) {
        // Compare from lastEventId (non-null here) so an event with a null id cannot raise a
        // NullPointerException. skipUntil passes the matching event through, so skip(1) drops
        // it as already delivered.
        events = events.skipUntil(e -> lastEventId.equals(e.getId())).skip(1);
    }

    // Subscribe to the tailing events from the reactive repository query
    return events.map(s -> ServerSentEvent.builder(s)
                    .event(s.getCreatedDate().toString())
                    .id(s.getId())
                    .build())
            .delayElements(Duration.ofMillis(100));
}
/**
 * Exchanges a {@code text/event-stream} request against {@code /event}, extracts the body as
 * {@link ServerSentEvent} envelopes with {@code String} data, and checks the first two events
 * (ids "0" and "1") before cancelling.
 */
@Test
@SuppressWarnings("Duplicates")
public void sseAsEvent() {
    // The previously declared ResolvableType local was never read and has been dropped.
    Flux<ServerSentEvent<String>> result = this.webClient.get().uri("/event")
            .accept(TEXT_EVENT_STREAM).exchange()
            .flatMapMany(response -> response.body(
                    toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
                    })));

    StepVerifier.create(result).consumeNextWith(event -> {
        assertThat(event.id()).isEqualTo("0");
        assertThat(event.data()).isEqualTo("foo");
        assertThat(event.comment()).isEqualTo("bar");
        assertThat(event.event()).isNull();
        assertThat(event.retry()).isNull();
    }).consumeNextWith(event -> {
        assertThat(event.id()).isEqualTo("1");
        assertThat(event.data()).isEqualTo("foo");
        assertThat(event.comment()).isEqualTo("bar");
        assertThat(event.event()).isNull();
        assertThat(event.retry()).isNull();
    }).thenCancel().verify(Duration.ofSeconds(5L));
}
/**
 * Exchanges a request against {@code /event}, extracts the body as {@link ServerSentEvent}
 * envelopes with {@code String} data, and checks the first two events (ids "0" and "1")
 * before cancelling.
 *
 * NOTE(review): despite the "WithoutAcceptHeader" name, the request still sets
 * {@code Accept: text/event-stream} — confirm whether the header was meant to be omitted.
 */
@Test
@SuppressWarnings("Duplicates")
public void sseAsEventWithoutAcceptHeader() {
    ParameterizedTypeReference<ServerSentEvent<String>> eventType =
            new ParameterizedTypeReference<ServerSentEvent<String>>() {
            };
    Flux<ServerSentEvent<String>> events = this.webClient.get()
            .uri("/event")
            .accept(TEXT_EVENT_STREAM)
            .exchange()
            .flatMapMany(response -> response.body(toFlux(eventType)));

    StepVerifier verifier = StepVerifier.create(events)
            .consumeNextWith(first -> {
                assertThat(first.id()).isEqualTo("0");
                assertThat(first.data()).isEqualTo("foo");
                assertThat(first.comment()).isEqualTo("bar");
                assertThat(first.event()).isNull();
                assertThat(first.retry()).isNull();
            })
            .consumeNextWith(second -> {
                assertThat(second.id()).isEqualTo("1");
                assertThat(second.data()).isEqualTo("foo");
                assertThat(second.comment()).isEqualTo("bar");
                assertThat(second.event()).isNull();
                assertThat(second.retry()).isNull();
            })
            .thenCancel();

    verifier.verify(Duration.ofSeconds(5L));
}
/**
 * Create an inserter that writes the given publisher of {@code ServerSentEvent}s to the
 * response body as {@link MediaType#TEXT_EVENT_STREAM text/event-stream}.
 * <p>As an alternative, plain event data objects can be supplied through
 * {@link #fromPublisher(Publisher, Class)} with the "Content-Type" set to
 * {@link MediaType#TEXT_EVENT_STREAM text/event-stream}.
 * @param eventsPublisher the {@code ServerSentEvent} publisher to write to the response body;
 * must not be {@code null}
 * @param <T> the type of the data elements in the {@link ServerSentEvent}
 * @return the inserter to write a {@code ServerSentEvent} publisher
 * @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events W3C recommendation</a>
 */
// Parameterized for server-side use
public static <T, S extends Publisher<ServerSentEvent<T>>> BodyInserter<S, ServerHttpResponse> fromServerSentEvents(
        S eventsPublisher) {

    Assert.notNull(eventsPublisher, "Publisher must not be null");
    return (response, insertContext) -> {
        // Look up a writer for the SSE element type and the event-stream media type.
        HttpMessageWriter<ServerSentEvent<T>> sseWriter =
                findWriter(insertContext, SSE_TYPE, MediaType.TEXT_EVENT_STREAM);
        return write(eventsPublisher, SSE_TYPE, MediaType.TEXT_EVENT_STREAM,
                response, insertContext, sseWriter);
    };
}
/**
 * Inserts a single-event {@code Flux} into a mock response through
 * {@code BodyInserters.fromServerSentEvents} and expects the resulting {@code Mono<Void>}
 * to complete without emitting any element.
 */
@Test
public void ofServerSentEventFlux() {
    Flux<ServerSentEvent<String>> events = Flux.just(ServerSentEvent.builder("foo").build());
    BodyInserter<Flux<ServerSentEvent<String>>, ServerHttpResponse> sseInserter =
            BodyInserters.fromServerSentEvents(events);

    Mono<Void> completion = sseInserter.insert(new MockServerHttpResponse(), this.context);

    StepVerifier.create(completion)
            .expectNextCount(0)
            .expectComplete()
            .verify();
}
/**
 * Requests {@code /event} as {@code text/event-stream}, decodes the body into
 * {@link ServerSentEvent} envelopes with {@code Person} data and delegates the checks to
 * {@code verifyPersonEvents}. The assumption restricts the test to runs where
 * {@code server} is a {@code JettyHttpServer}; the reason is not visible in this snippet.
 */
@Test
public void sseAsEvent() {
    Assume.assumeTrue(server instanceof JettyHttpServer);

    ParameterizedTypeReference<ServerSentEvent<Person>> eventType =
            new ParameterizedTypeReference<ServerSentEvent<Person>>() {};
    Flux<ServerSentEvent<Person>> events = this.webClient.get()
            .uri("/event")
            .accept(TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(eventType);

    verifyPersonEvents(events);
}
/**
 * Requests {@code /event}, decodes the body into {@link ServerSentEvent} envelopes with
 * {@code Person} data and delegates the checks to {@code verifyPersonEvents}.
 *
 * NOTE(review): despite the "WithoutAcceptHeader" name, the request still sets
 * {@code Accept: text/event-stream} — confirm whether the header was meant to be omitted.
 */
@Test
public void sseAsEventWithoutAcceptHeader() {
    ParameterizedTypeReference<ServerSentEvent<Person>> eventType =
            new ParameterizedTypeReference<ServerSentEvent<Person>>() {};
    Flux<ServerSentEvent<Person>> events = this.webClient.get()
            .uri("/event")
            .accept(TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(eventType);

    verifyPersonEvents(events);
}
/**
 * Handles {@code /event} by mapping the first two values of {@code INTERVAL} (defined
 * elsewhere) to events whose data is {@code Person("foo <n>")}, whose id is the value as a
 * string, and whose comment is {@code "bar <n>"}.
 */
@GetMapping("/event")
Flux<ServerSentEvent<Person>> sse() {
    return INTERVAL.take(2).map(tick -> {
        Person payload = new Person("foo " + tick);
        return ServerSentEvent.builder(payload)
                .id(Long.toString(tick))
                .comment("bar " + tick)
                .build();
    });
}
/**
 * Sends one element through the emitter. A {@code ServerSentEvent} is adapted and sent via
 * the emitter cast to {@code SseEmitter}; any other element is sent as
 * {@code MediaType.APPLICATION_JSON}.
 *
 * @param element the element to send
 * @throws IOException if the emitter fails to send
 */
@Override
protected void send(Object element) throws IOException {
    if (!(element instanceof ServerSentEvent)) {
        getEmitter().send(element, MediaType.APPLICATION_JSON);
        return;
    }
    SseEmitter sseEmitter = (SseEmitter) getEmitter();
    sseEmitter.send(adapt((ServerSentEvent<?>) element));
}
/**
 * Asserts that the encoder refuses: a {@code String} with no MIME type, a {@code Pojo} with
 * {@code APPLICATION_XML}, and a {@code ServerSentEvent} with {@code CBOR_MIME_TYPE}.
 */
@Test
public void canNotEncode() {
    ResolvableType stringType = ResolvableType.forClass(String.class);
    assertFalse(this.encoder.canEncode(stringType, null));

    ResolvableType pojoType = ResolvableType.forClass(Pojo.class);
    assertFalse(this.encoder.canEncode(pojoType, APPLICATION_XML));

    // The SSE envelope type is rejected even with this encoder's own MIME type constant.
    assertFalse(this.encoder.canEncode(ResolvableType.forClass(ServerSentEvent.class), CBOR_MIME_TYPE));
}
/**
 * Asserts that the encoder refuses: a {@code String} with no MIME type, a {@code Pojo} with
 * {@code APPLICATION_XML}, and a {@code ServerSentEvent} with {@code APPLICATION_JSON}.
 */
@Test
public void canNotEncode() {
    ResolvableType stringType = ResolvableType.forClass(String.class);
    assertFalse(this.encoder.canEncode(stringType, null));

    ResolvableType pojoType = ResolvableType.forClass(Pojo.class);
    assertFalse(this.encoder.canEncode(pojoType, APPLICATION_XML));

    // The SSE envelope type is rejected even when paired with the JSON media type.
    assertFalse(this.encoder.canEncode(ResolvableType.forClass(ServerSentEvent.class), APPLICATION_JSON));
}
/**
 * Asserts that the encoder refuses: a {@code String} with no MIME type, a {@code Pojo} with
 * {@code APPLICATION_XML}, and a {@code ServerSentEvent} with {@code SMILE_MIME_TYPE}.
 */
@Test
public void canNotEncode() {
    ResolvableType stringType = ResolvableType.forClass(String.class);
    assertFalse(this.encoder.canEncode(stringType, null));

    ResolvableType pojoType = ResolvableType.forClass(Pojo.class);
    assertFalse(this.encoder.canEncode(pojoType, APPLICATION_XML));

    // The SSE envelope type is rejected even when paired with the Smile MIME type constant.
    assertFalse(this.encoder.canEncode(ResolvableType.forClass(ServerSentEvent.class), SMILE_MIME_TYPE));
}
/**
 * Streams updates for one instance as Server-Sent Events. Events from {@code eventStore}
 * are filtered to the requested instance id, the instance is looked up in the registry for
 * each of them, and the result is wrapped in an SSE envelope. The stream is merged with
 * {@code ping()} (defined elsewhere).
 *
 * @param id the instance id taken from the path
 * @return the merged event stream
 */
@GetMapping(path = "/instances/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Instance>> instanceStream(@PathVariable String id) {
    Flux<Instance> instances = Flux.from(eventStore)
            .filter(event -> event.getInstance().equals(InstanceId.of(id)))
            .flatMap(event -> registry.getInstance(event.getInstance()));

    Flux<ServerSentEvent<Instance>> updates =
            instances.map(instance -> ServerSentEvent.builder(instance).build());
    return updates.mergeWith(ping());
}
/**
 * Streams application updates as Server-Sent Events. For every event from
 * {@code eventPublisher} the instance is looked up in the registry, resolved to its
 * application group via {@code getApplicationForInstance}, converted with
 * {@code toApplication}, and wrapped in an SSE envelope. The stream is merged with
 * {@code ping()} (defined elsewhere).
 *
 * @return the merged event stream
 */
@GetMapping(path = "/applications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Application>> applicationsStream() {
    Flux<Application> applications = Flux.from(eventPublisher)
            .flatMap(event -> registry.getInstance(event.getInstance()))
            .map(this::getApplicationForInstance)
            .flatMap(group -> toApplication(group.getT1(), group.getT2()));

    Flux<ServerSentEvent<Application>> updates =
            applications.map(application -> ServerSentEvent.builder(application).build());
    return updates.mergeWith(ping());
}
/**
 * Create an inserter that writes the given publisher of {@code ServerSentEvent}s to the
 * response body as {@link MediaType#TEXT_EVENT_STREAM text/event-stream}.
 * <p>As an alternative, plain event data objects can be supplied through
 * {@link #fromPublisher(Publisher, Class)} with the "Content-Type" set to
 * {@link MediaType#TEXT_EVENT_STREAM text/event-stream}.
 * @param eventsPublisher the {@code ServerSentEvent} publisher to write to the response body;
 * must not be {@code null}
 * @param <T> the type of the data elements in the {@link ServerSentEvent}
 * @return the inserter to write a {@code ServerSentEvent} publisher
 * @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events W3C recommendation</a>
 */
// Parameterized for server-side use
public static <T, S extends Publisher<ServerSentEvent<T>>> BodyInserter<S, ServerHttpResponse> fromServerSentEvents(
        S eventsPublisher) {

    Assert.notNull(eventsPublisher, "Publisher must not be null");
    return (response, insertContext) -> {
        // Look up a writer for the SSE element type and the event-stream media type.
        HttpMessageWriter<ServerSentEvent<T>> sseWriter =
                findWriter(insertContext, SSE_TYPE, MediaType.TEXT_EVENT_STREAM);
        return write(eventsPublisher, SSE_TYPE, MediaType.TEXT_EVENT_STREAM,
                response, insertContext, sseWriter);
    };
}
/**
 * Inserts a single-event {@code Flux} into a mock response through
 * {@code BodyInserters.fromServerSentEvents} and expects the resulting {@code Mono<Void>}
 * to complete without emitting any element.
 */
@Test
public void ofServerSentEventFlux() {
    Flux<ServerSentEvent<String>> events = Flux.just(ServerSentEvent.builder("foo").build());
    BodyInserter<Flux<ServerSentEvent<String>>, ServerHttpResponse> sseInserter =
            BodyInserters.fromServerSentEvents(events);

    Mono<Void> completion = sseInserter.insert(new MockServerHttpResponse(), this.context);

    StepVerifier.create(completion)
            .expectNextCount(0)
            .expectComplete()
            .verify();
}
/**
 * Requests {@code /event} as {@code text/event-stream}, decodes the body into
 * {@link ServerSentEvent} envelopes with {@code Person} data and delegates the checks to
 * {@code verifyPersonEvents}. The assumption restricts the test to runs where
 * {@code server} is a {@code JettyHttpServer}; the reason is not visible in this snippet.
 */
@Test
public void sseAsEvent() {
    Assume.assumeTrue(server instanceof JettyHttpServer);

    ParameterizedTypeReference<ServerSentEvent<Person>> eventType =
            new ParameterizedTypeReference<ServerSentEvent<Person>>() {};
    Flux<ServerSentEvent<Person>> events = this.webClient.get()
            .uri("/event")
            .accept(TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(eventType);

    verifyPersonEvents(events);
}
/**
 * Requests {@code /event}, decodes the body into {@link ServerSentEvent} envelopes with
 * {@code Person} data and delegates the checks to {@code verifyPersonEvents}.
 *
 * NOTE(review): despite the "WithoutAcceptHeader" name, the request still sets
 * {@code Accept: text/event-stream} — confirm whether the header was meant to be omitted.
 */
@Test
public void sseAsEventWithoutAcceptHeader() {
    ParameterizedTypeReference<ServerSentEvent<Person>> eventType =
            new ParameterizedTypeReference<ServerSentEvent<Person>>() {};
    Flux<ServerSentEvent<Person>> events = this.webClient.get()
            .uri("/event")
            .accept(TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(eventType);

    verifyPersonEvents(events);
}
/**
 * Handles {@code /event} by mapping the first two values of {@code INTERVAL} (defined
 * elsewhere) to events whose data is {@code Person("foo <n>")}, whose id is the value as a
 * string, and whose comment is {@code "bar <n>"}.
 */
@GetMapping("/event")
Flux<ServerSentEvent<Person>> sse() {
    return INTERVAL.take(2).map(tick -> {
        Person payload = new Person("foo " + tick);
        return ServerSentEvent.builder(payload)
                .id(Long.toString(tick))
                .comment("bar " + tick)
                .build();
    });
}
/**
 * Sends one element through the emitter. A {@code ServerSentEvent} is adapted and sent via
 * the emitter cast to {@code SseEmitter}; any other element is sent as
 * {@code MediaType.APPLICATION_JSON}.
 *
 * @param element the element to send
 * @throws IOException if the emitter fails to send
 */
@Override
protected void send(Object element) throws IOException {
    if (!(element instanceof ServerSentEvent)) {
        getEmitter().send(element, MediaType.APPLICATION_JSON);
        return;
    }
    SseEmitter sseEmitter = (SseEmitter) getEmitter();
    sseEmitter.send(adapt((ServerSentEvent<?>) element));
}
/**
 * Asserts that the encoder refuses: a {@code String} with no MIME type, a {@code Pojo} with
 * {@code APPLICATION_XML}, and a {@code ServerSentEvent} with {@code APPLICATION_JSON}.
 */
@Test
public void canNotEncode() {
    ResolvableType stringType = ResolvableType.forClass(String.class);
    assertFalse(this.encoder.canEncode(stringType, null));

    ResolvableType pojoType = ResolvableType.forClass(Pojo.class);
    assertFalse(this.encoder.canEncode(pojoType, APPLICATION_XML));

    // The SSE envelope type is rejected even when paired with the JSON media type.
    assertFalse(this.encoder.canEncode(ResolvableType.forClass(ServerSentEvent.class), APPLICATION_JSON));
}
/**
 * Asserts that the encoder refuses: a {@code String} with no MIME type, a {@code Pojo} with
 * {@code APPLICATION_XML}, and a {@code ServerSentEvent} with {@code SMILE_MIME_TYPE}.
 */
@Test
public void canNotEncode() {
    ResolvableType stringType = ResolvableType.forClass(String.class);
    assertFalse(this.encoder.canEncode(stringType, null));

    ResolvableType pojoType = ResolvableType.forClass(Pojo.class);
    assertFalse(this.encoder.canEncode(pojoType, APPLICATION_XML));

    // The SSE envelope type is rejected even when paired with the Smile MIME type constant.
    assertFalse(this.encoder.canEncode(ResolvableType.forClass(ServerSentEvent.class), SMILE_MIME_TYPE));
}