类org.springframework.http.codec.ServerSentEvent源码实例Demo

下面列出了怎么用org.springframework.http.codec.ServerSentEvent的API类实例代码及写法,或者点击链接到github查看源代码。


@Test
public void sseAsEvent() {
	Flux<ServerSentEvent<String>> result = this.webClient.get()
			.uri("/event")
			.accept(TEXT_EVENT_STREAM)
			.retrieve()
			.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {});

	StepVerifier.create(result)
			.consumeNextWith( event -> {
				assertEquals("0", event.id());
				assertEquals("foo", event.data());
				assertEquals("bar", event.comment());
				assertNull(event.event());
				assertNull(event.retry());
			})
			.consumeNextWith( event -> {
				assertEquals("1", event.id());
				assertEquals("foo", event.data());
				assertEquals("bar", event.comment());
				assertNull(event.event());
				assertNull(event.retry());
			})
			.expectComplete()
			.verify(Duration.ofSeconds(5L));
}
 

private void verifyPersonEvents(Flux<ServerSentEvent<Person>> result) {
	StepVerifier.create(result)
			.consumeNextWith( event -> {
				assertEquals("0", event.id());
				assertEquals(new Person("foo 0"), event.data());
				assertEquals("bar 0", event.comment());
				assertNull(event.event());
				assertNull(event.retry());
			})
			.consumeNextWith( event -> {
				assertEquals("1", event.id());
				assertEquals(new Person("foo 1"), event.data());
				assertEquals("bar 1", event.comment());
				assertNull(event.event());
				assertNull(event.retry());
			})
			.thenCancel()
			.verify(Duration.ofSeconds(5L));
}
 

@Test
public void writeServerSentEventsWithBuilder() throws Exception {

	ResolvableType type = ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class);

	EmitterProcessor<ServerSentEvent<?>> processor = EmitterProcessor.create();
	SseEmitter sseEmitter = (SseEmitter) handleValue(processor, Flux.class, type);

	EmitterHandler emitterHandler = new EmitterHandler();
	sseEmitter.initialize(emitterHandler);

	processor.onNext(ServerSentEvent.builder("foo").id("1").build());
	processor.onNext(ServerSentEvent.builder("bar").id("2").build());
	processor.onNext(ServerSentEvent.builder("baz").id("3").build());
	processor.onComplete();

	assertEquals("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n",
			emitterHandler.getValuesAsText());
}
 

@Test
public void sseAsEvent() {
	Flux<ServerSentEvent<String>> result = this.webClient.get()
			.uri("/event")
			.accept(TEXT_EVENT_STREAM)
			.retrieve()
			.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {});

	StepVerifier.create(result)
			.consumeNextWith( event -> {
				assertEquals("0", event.id());
				assertEquals("foo", event.data());
				assertEquals("bar", event.comment());
				assertNull(event.event());
				assertNull(event.retry());
			})
			.consumeNextWith( event -> {
				assertEquals("1", event.id());
				assertEquals("foo", event.data());
				assertEquals("bar", event.comment());
				assertNull(event.event());
				assertNull(event.retry());
			})
			.expectComplete()
			.verify(Duration.ofSeconds(5L));
}
 

private void verifyPersonEvents(Flux<ServerSentEvent<Person>> result) {
	StepVerifier.create(result)
			.consumeNextWith( event -> {
				assertEquals("0", event.id());
				assertEquals(new Person("foo 0"), event.data());
				assertEquals("bar 0", event.comment());
				assertNull(event.event());
				assertNull(event.retry());
			})
			.consumeNextWith( event -> {
				assertEquals("1", event.id());
				assertEquals(new Person("foo 1"), event.data());
				assertEquals("bar 1", event.comment());
				assertNull(event.event());
				assertNull(event.retry());
			})
			.thenCancel()
			.verify(Duration.ofSeconds(5L));
}
 

@Test
public void writeServerSentEventsWithBuilder() throws Exception {

	ResolvableType type = ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class);

	EmitterProcessor<ServerSentEvent<?>> processor = EmitterProcessor.create();
	SseEmitter sseEmitter = (SseEmitter) handleValue(processor, Flux.class, type);

	EmitterHandler emitterHandler = new EmitterHandler();
	sseEmitter.initialize(emitterHandler);

	processor.onNext(ServerSentEvent.builder("foo").id("1").build());
	processor.onNext(ServerSentEvent.builder("bar").id("2").build());
	processor.onNext(ServerSentEvent.builder("baz").id("3").build());
	processor.onComplete();

	assertEquals("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n",
			emitterHandler.getValuesAsText());
}
 

@GetMapping("/sse/stocks")
public Flux<ServerSentEvent<?>> streamStocks() {
    return Flux
        .fromIterable(stringStocksServiceMap.values())
        .flatMap(StocksService::stream)
        .<ServerSentEvent<?>>map(item ->
            ServerSentEvent
                .builder(item)
                .event("StockItem")
                .id(item.getId())
                .build()
        )
        .startWith(
            ServerSentEvent
                .builder()
                .event("Stocks")
                .data(stringStocksServiceMap.keySet())
                .build()
        );
}
 

@Test
public void serverSentEvent() {
	 Flux<Long> numbers = 
	     WebClient.create("http://localhost:8080")
	              .get()
	              .uri("/randomNumber")
	              .accept(MediaType.TEXT_EVENT_STREAM)
	              .retrieve()
	              .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})
	              .map(event -> Long.parseLong(event.data()));

    StepVerifier.create(numbers)
                .expectNextCount(5)
                .expectComplete()
                .verify();
}
 

@GetMapping(value = "/projects/{projectId}/tightCouplingEvents", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<TightCouplingEvent>> streamTightCouplingEvents(@PathVariable Long projectId,
                                                                           HttpServletRequest request) {
    // Stream the events from MongoDB
    Flux<TightCouplingEvent> events = eventRepository.findByProjectId(projectId);

    // Check if this is an SSE reconnection from a client
    String lastEventId = request.getHeader("Last-Event-Id");

    // On SSE client reconnect, skip ahead in the stream to play back only new events
    if (lastEventId != null)
        events = events.skipUntil(e -> e.getId().equals(lastEventId)).skip(1);

    // Subscribe to the tailing events from the reactive repository query
    return events.map(s -> ServerSentEvent.builder(s)
            .event(s.getCreatedDate().toString())
            .id(s.getId())
            .build())
            .delayElements(Duration.ofMillis(100));
}
 

@Test
@SuppressWarnings("Duplicates")
public void sseAsEvent() {
	ResolvableType type = forClassWithGenerics(ServerSentEvent.class, String.class);
	Flux<ServerSentEvent<String>> result = this.webClient.get().uri("/event")
			.accept(TEXT_EVENT_STREAM).exchange()
			.flatMapMany(response -> response.body(
					toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
					})));

	StepVerifier.create(result).consumeNextWith(event -> {
		assertThat(event.id()).isEqualTo("0");
		assertThat(event.data()).isEqualTo("foo");
		assertThat(event.comment()).isEqualTo("bar");
		assertThat(event.event()).isNull();
		assertThat(event.retry()).isNull();
	}).consumeNextWith(event -> {
		assertThat(event.id()).isEqualTo("1");
		assertThat(event.data()).isEqualTo("foo");
		assertThat(event.comment()).isEqualTo("bar");
		assertThat(event.event()).isNull();
		assertThat(event.retry()).isNull();
	}).thenCancel().verify(Duration.ofSeconds(5L));
}
 

@Test
@SuppressWarnings("Duplicates")
public void sseAsEventWithoutAcceptHeader() {
	Flux<ServerSentEvent<String>> result = this.webClient.get().uri("/event")
			.accept(TEXT_EVENT_STREAM).exchange()
			.flatMapMany(response -> response.body(
					toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
					})));

	StepVerifier.create(result).consumeNextWith(event -> {
		assertThat(event.id()).isEqualTo("0");
		assertThat(event.data()).isEqualTo("foo");
		assertThat(event.comment()).isEqualTo("bar");
		assertThat(event.event()).isNull();
		assertThat(event.retry()).isNull();
	}).consumeNextWith(event -> {
		assertThat(event.id()).isEqualTo("1");
		assertThat(event.data()).isEqualTo("foo");
		assertThat(event.comment()).isEqualTo("bar");
		assertThat(event.event()).isNull();
		assertThat(event.retry()).isNull();
	}).thenCancel().verify(Duration.ofSeconds(5L));
}
 

/**
 * Inserter to write the given {@code ServerSentEvent} publisher.
 * <p>Alternatively, you can provide event data objects via
 * {@link #fromPublisher(Publisher, Class)}, and set the "Content-Type" to
 * {@link MediaType#TEXT_EVENT_STREAM text/event-stream}.
 * @param eventsPublisher the {@code ServerSentEvent} publisher to write to the response body
 * @param <T> the type of the data elements in the {@link ServerSentEvent}
 * @return the inserter to write a {@code ServerSentEvent} publisher
 * @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events W3C recommendation</a>
 */
// Parameterized for server-side use
public static <T, S extends Publisher<ServerSentEvent<T>>> BodyInserter<S, ServerHttpResponse> fromServerSentEvents(
		S eventsPublisher) {

	Assert.notNull(eventsPublisher, "Publisher must not be null");
	return (serverResponse, context) -> {
		ResolvableType elementType = SSE_TYPE;
		MediaType mediaType = MediaType.TEXT_EVENT_STREAM;
		HttpMessageWriter<ServerSentEvent<T>> writer = findWriter(context, elementType, mediaType);
		return write(eventsPublisher, elementType, mediaType, serverResponse, context, writer);
	};
}
 

@Test
public void ofServerSentEventFlux() {
	ServerSentEvent<String> event = ServerSentEvent.builder("foo").build();
	Flux<ServerSentEvent<String>> body = Flux.just(event);
	BodyInserter<Flux<ServerSentEvent<String>>, ServerHttpResponse> inserter =
			BodyInserters.fromServerSentEvents(body);

	MockServerHttpResponse response = new MockServerHttpResponse();
	Mono<Void> result = inserter.insert(response, this.context);
	StepVerifier.create(result).expectNextCount(0).expectComplete().verify();
}
 

@Test
public void sseAsEvent() {

	Assume.assumeTrue(server instanceof JettyHttpServer);

	Flux<ServerSentEvent<Person>> result = this.webClient.get()
			.uri("/event")
			.accept(TEXT_EVENT_STREAM)
			.retrieve()
			.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<Person>>() {});

	verifyPersonEvents(result);
}
 

@Test
public void sseAsEventWithoutAcceptHeader() {
	Flux<ServerSentEvent<Person>> result = this.webClient.get()
			.uri("/event")
			.accept(TEXT_EVENT_STREAM)
			.retrieve()
			.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<Person>>() {});

	verifyPersonEvents(result);
}
 

@GetMapping("/event")
Flux<ServerSentEvent<Person>> sse() {
	return INTERVAL.take(2).map(l ->
			ServerSentEvent.builder(new Person("foo " + l))
					.id(Long.toString(l))
					.comment("bar " + l)
					.build());
}
 

@Override
protected void send(Object element) throws IOException {
	if (element instanceof ServerSentEvent) {
		ServerSentEvent<?> event = (ServerSentEvent<?>) element;
		((SseEmitter) getEmitter()).send(adapt(event));
	}
	else {
		getEmitter().send(element, MediaType.APPLICATION_JSON);
	}
}
 

@Test
public void canNotEncode() {
	assertFalse(this.encoder.canEncode(ResolvableType.forClass(String.class), null));
	assertFalse(this.encoder.canEncode(ResolvableType.forClass(Pojo.class), APPLICATION_XML));

	ResolvableType sseType = ResolvableType.forClass(ServerSentEvent.class);
	assertFalse(this.encoder.canEncode(sseType, CBOR_MIME_TYPE));
}
 

@Test
public void canNotEncode() {
	assertFalse(this.encoder.canEncode(ResolvableType.forClass(String.class), null));
	assertFalse(this.encoder.canEncode(ResolvableType.forClass(Pojo.class), APPLICATION_XML));

	ResolvableType sseType = ResolvableType.forClass(ServerSentEvent.class);
	assertFalse(this.encoder.canEncode(sseType, APPLICATION_JSON));
}
 

@Test
public void canNotEncode() {
	assertFalse(this.encoder.canEncode(ResolvableType.forClass(String.class), null));
	assertFalse(this.encoder.canEncode(ResolvableType.forClass(Pojo.class), APPLICATION_XML));

	ResolvableType sseType = ResolvableType.forClass(ServerSentEvent.class);
	assertFalse(this.encoder.canEncode(sseType, SMILE_MIME_TYPE));
}
 
源代码21 项目: Moss   文件: InstancesController.java

@GetMapping(path = "/instances/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Instance>> instanceStream(@PathVariable String id) {
    return Flux.from(eventStore)
               .filter(event -> event.getInstance().equals(InstanceId.of(id)))
               .flatMap(event -> registry.getInstance(event.getInstance()))
               .map(event -> ServerSentEvent.builder(event).build())
               .mergeWith(ping());
}
 
源代码22 项目: Moss   文件: ApplicationsController.java

@GetMapping(path = "/applications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Application>> applicationsStream() {
    return Flux.from(eventPublisher)
               .flatMap(event -> registry.getInstance(event.getInstance()))
               .map(this::getApplicationForInstance)
               .flatMap(group -> toApplication(group.getT1(), group.getT2()))
               .map(application -> ServerSentEvent.builder(application).build())
               .mergeWith(ping());
}
 

/**
 * Inserter to write the given {@code ServerSentEvent} publisher.
 * <p>Alternatively, you can provide event data objects via
 * {@link #fromPublisher(Publisher, Class)}, and set the "Content-Type" to
 * {@link MediaType#TEXT_EVENT_STREAM text/event-stream}.
 * @param eventsPublisher the {@code ServerSentEvent} publisher to write to the response body
 * @param <T> the type of the data elements in the {@link ServerSentEvent}
 * @return the inserter to write a {@code ServerSentEvent} publisher
 * @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events W3C recommendation</a>
 */
// Parameterized for server-side use
public static <T, S extends Publisher<ServerSentEvent<T>>> BodyInserter<S, ServerHttpResponse> fromServerSentEvents(
		S eventsPublisher) {

	Assert.notNull(eventsPublisher, "Publisher must not be null");
	return (serverResponse, context) -> {
		ResolvableType elementType = SSE_TYPE;
		MediaType mediaType = MediaType.TEXT_EVENT_STREAM;
		HttpMessageWriter<ServerSentEvent<T>> writer = findWriter(context, elementType, mediaType);
		return write(eventsPublisher, elementType, mediaType, serverResponse, context, writer);
	};
}
 

@Test
public void ofServerSentEventFlux() {
	ServerSentEvent<String> event = ServerSentEvent.builder("foo").build();
	Flux<ServerSentEvent<String>> body = Flux.just(event);
	BodyInserter<Flux<ServerSentEvent<String>>, ServerHttpResponse> inserter =
			BodyInserters.fromServerSentEvents(body);

	MockServerHttpResponse response = new MockServerHttpResponse();
	Mono<Void> result = inserter.insert(response, this.context);
	StepVerifier.create(result).expectNextCount(0).expectComplete().verify();
}
 

@Test
public void sseAsEvent() {

	Assume.assumeTrue(server instanceof JettyHttpServer);

	Flux<ServerSentEvent<Person>> result = this.webClient.get()
			.uri("/event")
			.accept(TEXT_EVENT_STREAM)
			.retrieve()
			.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<Person>>() {});

	verifyPersonEvents(result);
}
 

@Test
public void sseAsEventWithoutAcceptHeader() {
	Flux<ServerSentEvent<Person>> result = this.webClient.get()
			.uri("/event")
			.accept(TEXT_EVENT_STREAM)
			.retrieve()
			.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<Person>>() {});

	verifyPersonEvents(result);
}
 

@GetMapping("/event")
Flux<ServerSentEvent<Person>> sse() {
	return INTERVAL.take(2).map(l ->
			ServerSentEvent.builder(new Person("foo " + l))
					.id(Long.toString(l))
					.comment("bar " + l)
					.build());
}
 

@Override
protected void send(Object element) throws IOException {
	if (element instanceof ServerSentEvent) {
		ServerSentEvent<?> event = (ServerSentEvent<?>) element;
		((SseEmitter) getEmitter()).send(adapt(event));
	}
	else {
		getEmitter().send(element, MediaType.APPLICATION_JSON);
	}
}
 

@Test
public void canNotEncode() {
	assertFalse(this.encoder.canEncode(ResolvableType.forClass(String.class), null));
	assertFalse(this.encoder.canEncode(ResolvableType.forClass(Pojo.class), APPLICATION_XML));

	ResolvableType sseType = ResolvableType.forClass(ServerSentEvent.class);
	assertFalse(this.encoder.canEncode(sseType, APPLICATION_JSON));
}
 

@Test
public void canNotEncode() {
	assertFalse(this.encoder.canEncode(ResolvableType.forClass(String.class), null));
	assertFalse(this.encoder.canEncode(ResolvableType.forClass(Pojo.class), APPLICATION_XML));

	ResolvableType sseType = ResolvableType.forClass(ServerSentEvent.class);
	assertFalse(this.encoder.canEncode(sseType, SMILE_MIME_TYPE));
}
 
 类所在包
 同包方法