下面列出了org.springframework.http.MediaType#TEXT_EVENT_STREAM_VALUE 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Flux : 返回0-n个元素 注:需要指定MediaType
*
* @return
*/
@GetMapping(value = "/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> flux() {
Flux<String> result = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {
if (i == 3) {
throw new ResultException("flux 错误测试");
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
return "flux data--" + i;
}));
return result;
}
@GetMapping(path = "/events", produces = { //
MediaType.APPLICATION_STREAM_JSON_VALUE, //
MediaType.TEXT_EVENT_STREAM_VALUE //
})
Flux<Event> streamEvents() {
return eventRepository.findPeopleBy();
}
@PostMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<Publisher<?>>> postStream(WebRequest request,
@RequestBody(required = false) String body) {
FunctionWrapper wrapper = wrapper(request);
return this.processor.post(wrapper, body, true)
.map(response -> ResponseEntity.ok().headers(response.getHeaders())
.body((Publisher<?>) response.getBody()));
}
@GetMapping(value = "/numbers1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Long> handleSeries1() {
Flux<Long> fibonacciGenerator = Flux.generate(() -> Tuples.<Long,
Long>of(0L, 1L), (state, sink) -> {
if (state.getT1() < 0)
sink.complete();
else
sink.next(state.getT1());
System.out.println("numbers1 generated :"+state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
return fibonacciGenerator.delayElements(Duration.ofSeconds(1));
}
@GetMapping(value = "/client/{id}/_subscribe/{type}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Object> subscribe(@PathVariable String id, @PathVariable PayloadType type) {
return networkManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT, id)
.flatMapMany(TcpClient::subscribe)
.map(tcpMessage -> type.read(tcpMessage.getPayload()))
;
}
@GetMapping(value = "/{dashboard}/{object}/{measurement}/{dimension}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@Authorize(merge = false)
public Flux<MeasurementValue> getMeasurementValue(@PathVariable String dashboard,
@PathVariable String object,
@PathVariable String dimension,
@PathVariable String measurement,
@RequestParam Map<String, Object> params) {
return dashboardManager
.getDashboard(dashboard)
.flatMap(dash -> dash.getObject(object))
.flatMap(obj -> obj.getMeasurement(measurement))
.flatMap(meas -> meas.getDimension(dimension))
.switchIfEmpty(Mono.error(() -> new NotFoundException("不支持的仪表盘")))
.flatMapMany(dim -> dim.getValue(MeasurementParameter.of(params)));
}
/**
* 使用EventSource方式批量获取仪表数据,支持获取实时数据.
*
* @param requestJson 请求集合json
* @return 仪表数据
*/
@GetMapping(value = "/_multi", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@Authorize(merge = false)
public Flux<DashboardMeasurementResponse> getMultiMeasurementValue(@RequestParam String requestJson) {
return Flux.fromIterable(JSON.parseArray(requestJson, DashboardMeasurementRequest.class))
.flatMap(request -> dashboardManager
.getDashboard(request.getDashboard())
.flatMap(dash -> dash.getObject(request.getObject()))
.flatMap(obj -> obj.getMeasurement(request.getMeasurement()))
.flatMap(meas -> meas.getDimension(request.getDimension()))
.flatMapMany(dim -> dim.getValue(MeasurementParameter.of(request.getParams())))
.map(val -> DashboardMeasurementResponse.of(request.getGroup(), val)));
}
@ResponseBody // The response payload for this request will be rendered in JSON, not HTML
@RequestMapping(
value = "/match/{matchId}/commentStream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<MatchComment> matchCommentStream(
@PathVariable String matchId, @RequestParam String timestamp) {
// Get the stream of MatchComment objects after the timestamp, based on a tailable cursor.
// See https://docs.mongodb.com/manual/core/tailable-cursors/
return this.matchCommentRepository.findByMatchIdAndTimestampGreaterThan(matchId, timestamp);
}
@GetMapping(value = "/numbers1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Long> handleSeries1() {
Flux<Long> fibonacciGenerator = Flux.generate(() -> Tuples.<Long,
Long>of(0L, 1L), (state, sink) -> {
if (state.getT1() < 0)
sink.complete();
else
sink.next(state.getT1());
System.out.println("numbers1 generated :"+state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
return fibonacciGenerator.delayElements(Duration.ofSeconds(1));
}
/**
* Flux : 返回0-n个元素 注:需要指定MediaType
*
* @return
*/
@GetMapping(value = "/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> flux() {
Flux<String> result = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
return "flux data--" + i;
}));
return result;
}
@GetMapping(value = "/{singer}/comments", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Comment> querySingerComments(@PathVariable String singer) {
// generate one flux element per second
Flux<Long> intervalToGenerateComments = Flux.interval(Duration.ofSeconds(1));
Flux<Comment> comments = Flux.fromStream(Stream.generate(() -> new Comment(composeComment(singer), new Date())));
return Flux.zip(intervalToGenerateComments, comments)
.map(fluxTuple -> fluxTuple.getT2());
}
@Operation(description = "Tweets are Sent to the client as Server Sent Events", responses = {
@ApiResponse(responseCode = "200", description = "stream All Tweets") })
@GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<TweetDTO> streamAllTweets() {
return null;
}
@GetMapping(path = "/applications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Application>> applicationsStream() {
return registry.getApplicationStream().map((application) -> ServerSentEvent.builder(application).build())
.mergeWith(ping());
}
@GetMapping(value = "/deploy", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@SaveAction
public Flux<DeviceDeployResult> deployAll(QueryParamEntity query) {
query.setPaging(false);
return service.query(query).as(service::deploy);
}
/**
* 以 Server sent events形式多次返回数据
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> getUsersStream() {
return userService.getUsers();
}
@GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Comment> feed() {
return this.commentRepository.findAll();
}
@GetMapping(path = "/search", produces = {MediaType.TEXT_EVENT_STREAM_VALUE,
MediaType.APPLICATION_STREAM_JSON_VALUE})
Flux<Item> getItemsStream(@Valid SearchRequest searchRequest);
@PostMapping(value = "/search", consumes = MediaType.APPLICATION_JSON_VALUE,
produces = {MediaType.TEXT_EVENT_STREAM_VALUE, MediaType.APPLICATION_STREAM_JSON_VALUE})
Flux<Item> getItemsPostStream(@Valid @RequestBody SearchRequest searchRequest);
/**
* Created Books sent as stream to the client as Server Sent Events.
* @return books Events.
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Book> streamAllBooks() {
// return this.events.map(bce -> (Book)bce.getSource());
return this.reactiveBookService.findAllBooks();
}
@GetMapping(path = "/spr16869", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<String> sseFlux() {
return Flux.interval(Duration.ofSeconds(1)).take(3)
.map(aLong -> String.format("event%d", aLong));
}