下面列出了 org.springframework.http.server.reactive.ServerHttpResponse#bufferFactory() 的实例代码，也可以点击链接到 GitHub 查看源代码，或者在右侧发表评论。
/**
 * Upgrades the exchange to a WebSocket connection through the native Reactor Netty
 * response, and tags the handler's result with a checkpoint naming the request URI.
 *
 * @param exchange the current exchange; its response must be an
 *        {@code AbstractServerHttpResponse} whose buffer factory is a
 *        {@code NettyDataBufferFactory} (both are cast unconditionally)
 * @param handler the handler invoked with the new WebSocket session
 * @param subProtocol the negotiated sub-protocol, possibly {@code null}
 * @param handshakeInfoFactory supplies the handshake details handed to the session
 * @return the {@code Mono} returned by {@code sendWebsocket}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpResponse httpResponse = exchange.getResponse();
    HttpServerResponse nativeResponse = ((AbstractServerHttpResponse) httpResponse).getNativeResponse();
    HandshakeInfo info = handshakeInfoFactory.get();
    NettyDataBufferFactory nettyBufferFactory = (NettyDataBufferFactory) httpResponse.bufferFactory();

    return nativeResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength, (inbound, outbound) -> {
        ReactorNettyWebSocketSession webSocketSession = new ReactorNettyWebSocketSession(
                inbound, outbound, info, nettyBufferFactory, this.maxFramePayloadLength);
        URI requestUri = exchange.getRequest().getURI();
        // The checkpoint description makes the originating URI visible in assembly traces.
        return handler.handle(webSocketSession)
                .checkpoint(requestUri + " [ReactorNettyRequestUpgradeStrategy]");
    });
}
/**
 * Decorates the response so that every buffer of a {@code Flux} body is copied into a
 * fresh byte array before being written, then invokes
 * {@code accessLogService.sendLog(exchange, null)} after the filter chain.
 *
 * @param exchange the current exchange
 * @param chain the remaining filter chain, invoked with the decorated response
 * @return the chain's result followed by the access-log call
 */
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    ServerHttpResponse originalResponse = exchange.getResponse();
    DataBufferFactory factory = originalResponse.bufferFactory();

    ServerHttpResponseDecorator copyingResponse = new ServerHttpResponseDecorator(originalResponse) {
        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            // Only Flux bodies are copied; any other publisher is written through unchanged.
            if (!(body instanceof Flux)) {
                return super.writeWith(body);
            }
            Flux<? extends DataBuffer> chunks = (Flux<? extends DataBuffer>) body;
            Flux<DataBuffer> copies = chunks.map(chunk -> {
                // probably should reuse buffers
                byte[] bytes = new byte[chunk.readableByteCount()];
                chunk.read(bytes);
                // Release the source buffer now that its content has been copied out.
                DataBufferUtils.release(chunk);
                return factory.wrap(bytes);
            });
            return super.writeWith(copies);
        }
    };

    Mono<Void> downstream = chain.filter(exchange.mutate().response(copyingResponse).build());
    return downstream.then(Mono.fromRunnable(() -> accessLogService.sendLog(exchange, null)));
}
/**
 * Upgrades the exchange to a WebSocket connection through the native Reactor Netty
 * response and hands the resulting session to the given handler.
 *
 * @param exchange the current exchange; its response must be an
 *        {@code AbstractServerHttpResponse} whose buffer factory is a
 *        {@code NettyDataBufferFactory} (both are cast unconditionally)
 * @param handler the handler invoked with the new WebSocket session
 * @param subProtocol the negotiated sub-protocol, possibly {@code null}
 * @param handshakeInfoFactory supplies the handshake details handed to the session
 * @return the {@code Mono} returned by {@code sendWebsocket}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpResponse httpResponse = exchange.getResponse();
    HttpServerResponse nativeResponse = ((AbstractServerHttpResponse) httpResponse).getNativeResponse();
    HandshakeInfo info = handshakeInfoFactory.get();
    NettyDataBufferFactory nettyBufferFactory = (NettyDataBufferFactory) httpResponse.bufferFactory();

    return nativeResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
            (inbound, outbound) -> handler.handle(new ReactorNettyWebSocketSession(
                    inbound, outbound, info, nettyBufferFactory, this.maxFramePayloadLength)));
}
/**
 * Joins the cached body into a single buffer and, unless the target class is
 * {@code byte[]}, re-encodes it with the first encoder registered for one of the
 * response's {@code Content-Encoding} header values.
 *
 * @param httpResponse the response whose headers and buffer factory are used
 * @param message the cached body to join
 * @param outClass the output class; {@code byte[]} skips encoding entirely
 * @return the joined body, encoded if a matching encoder was found
 */
private Mono<DataBuffer> writeBody(ServerHttpResponse httpResponse,
        CachedBodyOutputMessage message, Class<?> outClass) {
    Mono<DataBuffer> joined = DataBufferUtils.join(message.getBody());
    if (byte[].class.isAssignableFrom(outClass)) {
        return joined;
    }

    for (String encoding : httpResponse.getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING)) {
        MessageBodyEncoder encoder = messageBodyEncoders.get(encoding);
        if (encoder == null) {
            continue;
        }
        // Only the first matching encoding is applied; encoding runs on the parallel scheduler.
        DataBufferFactory factory = httpResponse.bufferFactory();
        return joined.publishOn(Schedulers.parallel())
                .map(encoder::encode)
                .map(factory::wrap);
    }
    return joined;
}
/**
 * Serves a fixed JSON string, gzip-compressed, with a {@code Content-Encoding: gzip}
 * header and status 200.
 *
 * @param exchange the current exchange whose response is written
 * @return completion signal of the response write
 * @throws IOException if JSON serialization or compression fails
 */
@RequestMapping(path = "/gzip", produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<Void> gzip(ServerWebExchange exchange) throws IOException {
    if (log.isDebugEnabled()) {
        log.debug("httpbin /gzip");
    }

    byte[] plainBytes = OBJECT_MAPPER.writeValueAsString("httpbin compatible home")
            .getBytes(StandardCharsets.UTF_8);

    ServerHttpResponse httpResponse = exchange.getResponse();
    httpResponse.getHeaders().add(HttpHeaders.CONTENT_ENCODING, "gzip");
    DataBufferFactory factory = httpResponse.bufferFactory();
    httpResponse.setStatusCode(HttpStatus.OK);

    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    GZIPOutputStream gzipOut = new GZIPOutputStream(compressed);
    // FileCopyUtils.copy closes the target stream when done (per Spring's documentation),
    // which is what finishes the gzip stream before the bytes are read back.
    FileCopyUtils.copy(plainBytes, gzipOut);

    DataBuffer payload = factory.wrap(compressed.toByteArray());
    return httpResponse.writeWith(Flux.just(payload));
}
/**
 * Performs the WebSocket handshake through the Jetty {@code WebSocketServerFactory}
 * held in {@code this.factory}.
 *
 * @param exchange the current exchange, adapted to the underlying servlet request/response
 * @param handler the handler wrapped in a {@code JettyWebSocketHandlerAdapter}
 * @param subProtocol the negotiated sub-protocol, possibly {@code null}
 * @param handshakeInfoFactory supplies the handshake details handed to the session
 * @return an empty {@code Mono} on success, or an error {@code Mono} carrying the
 *         {@code IOException} raised while accepting the WebSocket
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest httpRequest = exchange.getRequest();
    ServerHttpResponse httpResponse = exchange.getResponse();
    HttpServletRequest nativeRequest = getHttpServletRequest(httpRequest);
    HttpServletResponse nativeResponse = getHttpServletResponse(httpResponse);

    HandshakeInfo info = handshakeInfoFactory.get();
    // Named to avoid shadowing the this.factory field used below.
    DataBufferFactory bufferFactory = httpResponse.bufferFactory();
    JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(
            handler, nativeSession -> new JettyWebSocketSession(nativeSession, info, bufferFactory));

    startLazily(nativeRequest);
    Assert.state(this.factory != null, "No WebSocketServerFactory available");
    Assert.isTrue(this.factory.isUpgradeRequest(nativeRequest, nativeResponse),
            "Not a WebSocket handshake");

    try {
        // The holder is populated only for the duration of acceptWebSocket and always cleared.
        adapterHolder.set(new WebSocketHandlerContainer(handlerAdapter, subProtocol));
        this.factory.acceptWebSocket(nativeRequest, nativeResponse);
        return Mono.empty();
    }
    catch (IOException ex) {
        return Mono.error(ex);
    }
    finally {
        adapterHolder.remove();
    }
}
/**
 * Performs the WebSocket handshake through Tomcat's {@code WsServerContainer}.
 *
 * @param exchange the current exchange, adapted to the underlying servlet request/response
 * @param handler the handler wrapped in a {@code StandardWebSocketHandlerAdapter}
 * @param subProtocol the negotiated sub-protocol; {@code null} configures no sub-protocols
 * @param handshakeInfoFactory supplies the handshake details handed to the session
 * @return an empty {@code Mono} on success, or an error {@code Mono} carrying the
 *         {@code ServletException} or {@code IOException} raised during the upgrade
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest httpRequest = exchange.getRequest();
    ServerHttpResponse httpResponse = exchange.getResponse();
    HttpServletRequest nativeRequest = getHttpServletRequest(httpRequest);
    HttpServletResponse nativeResponse = getHttpServletResponse(httpResponse);

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory factory = httpResponse.bufferFactory();
    Endpoint wsEndpoint = new StandardWebSocketHandlerAdapter(
            handler, nativeSession -> new TomcatWebSocketSession(nativeSession, info, factory));

    // The endpoint is registered under the path of the request being upgraded.
    DefaultServerEndpointConfig endpointConfig =
            new DefaultServerEndpointConfig(nativeRequest.getRequestURI(), wsEndpoint);
    if (subProtocol != null) {
        endpointConfig.setSubprotocols(Collections.singletonList(subProtocol));
    }
    else {
        endpointConfig.setSubprotocols(Collections.emptyList());
    }

    try {
        getContainer(nativeRequest)
                .doUpgrade(nativeRequest, nativeResponse, endpointConfig, Collections.emptyMap());
        return Mono.empty();
    }
    catch (ServletException | IOException ex) {
        return Mono.error(ex);
    }
}
/**
 * Writes {@code result} to the response as a JSON body with the given HTTP status.
 *
 * <p>The body is always encoded as UTF-8 so that it matches the
 * {@code application/json;charset=UTF-8} content type declared in the headers;
 * encoding with the platform default charset would corrupt non-ASCII characters
 * on JVMs whose default is not UTF-8.
 *
 * @param exchange the current exchange whose response is written
 * @param httpStatus the numeric HTTP status; {@code 0} is treated as 500
 * @param result the payload serialized with {@code JSONObject.toJSONString}
 * @return completion signal of the response write
 * @throws IllegalArgumentException if {@code httpStatus} is not a status code known
 *         to {@code HttpStatus.valueOf}
 */
public static Mono<Void> responseWrite(ServerWebExchange exchange, int httpStatus, Result result) {
    // A status of 0 means "not specified" here and falls back to 500.
    int effectiveStatus = (httpStatus == 0) ? HttpStatus.INTERNAL_SERVER_ERROR.value() : httpStatus;

    ServerHttpResponse response = exchange.getResponse();
    // NOTE(review): browsers reject a wildcard Access-Control-Allow-Origin on credentialed
    // requests; left unchanged because callers may depend on these exact headers.
    response.getHeaders().setAccessControlAllowCredentials(true);
    response.getHeaders().setAccessControlAllowOrigin("*");
    response.setStatusCode(HttpStatus.valueOf(effectiveStatus));
    response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);

    byte[] jsonBytes = JSONObject.toJSONString(result)
            .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    DataBuffer buffer = response.bufferFactory().wrap(jsonBytes);
    return response.writeWith(Mono.just(buffer))
            .doOnError(error -> DataBufferUtils.release(buffer));
}
/**
 * Serializes {@code object} with {@code objectMapper} and writes the bytes as the
 * response body. Serialization happens lazily, when the body publisher is subscribed.
 *
 * <p>Best effort: if producing the buffer throws, the failure is logged at WARN level
 * and an empty body is written instead of propagating the error.
 *
 * @param httpResponse the response to write to
 * @param object the value to serialize
 * @param <T> the type of the value
 * @return completion signal of the response write
 */
public <T> Mono<Void> write(ServerHttpResponse httpResponse, T object) {
    DataBufferFactory factory = httpResponse.bufferFactory();
    return httpResponse.writeWith(Mono.fromSupplier(() -> {
        try {
            byte[] serialized = objectMapper.writeValueAsBytes(object);
            return factory.wrap(serialized);
        }
        catch (Exception ex) {
            log.warn("Error writing response", ex);
            return factory.wrap(new byte[0]);
        }
    }));
}
/**
 * Performs the WebSocket handshake through the Jetty {@code WebSocketServerFactory}
 * held in {@code this.factory}.
 *
 * @param exchange the current exchange, adapted to the underlying servlet request/response
 * @param handler the handler wrapped in a {@code JettyWebSocketHandlerAdapter}
 * @param subProtocol the negotiated sub-protocol, possibly {@code null}
 * @param handshakeInfoFactory supplies the handshake details handed to the session
 * @return an empty {@code Mono} on success, or an error {@code Mono} carrying the
 *         {@code IOException} raised while accepting the WebSocket
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest httpRequest = exchange.getRequest();
    ServerHttpResponse httpResponse = exchange.getResponse();
    HttpServletRequest nativeRequest = getHttpServletRequest(httpRequest);
    HttpServletResponse nativeResponse = getHttpServletResponse(httpResponse);

    HandshakeInfo info = handshakeInfoFactory.get();
    // Named to avoid shadowing the this.factory field used below.
    DataBufferFactory bufferFactory = httpResponse.bufferFactory();
    JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(
            handler, nativeSession -> new JettyWebSocketSession(nativeSession, info, bufferFactory));

    startLazily(nativeRequest);
    Assert.state(this.factory != null, "No WebSocketServerFactory available");
    Assert.isTrue(this.factory.isUpgradeRequest(nativeRequest, nativeResponse),
            "Not a WebSocket handshake");

    try {
        // The holder is populated only for the duration of acceptWebSocket and always cleared.
        adapterHolder.set(new WebSocketHandlerContainer(handlerAdapter, subProtocol));
        this.factory.acceptWebSocket(nativeRequest, nativeResponse);
        return Mono.empty();
    }
    catch (IOException ex) {
        return Mono.error(ex);
    }
    finally {
        adapterHolder.remove();
    }
}
/**
 * Performs the WebSocket handshake through Tomcat's {@code WsServerContainer}.
 *
 * @param exchange the current exchange, adapted to the underlying servlet request/response
 * @param handler the handler wrapped in a {@code StandardWebSocketHandlerAdapter}
 * @param subProtocol the negotiated sub-protocol; {@code null} configures no sub-protocols
 * @param handshakeInfoFactory supplies the handshake details handed to the session
 * @return an empty {@code Mono} on success, or an error {@code Mono} carrying the
 *         {@code ServletException} or {@code IOException} raised during the upgrade
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest httpRequest = exchange.getRequest();
    ServerHttpResponse httpResponse = exchange.getResponse();
    HttpServletRequest nativeRequest = getHttpServletRequest(httpRequest);
    HttpServletResponse nativeResponse = getHttpServletResponse(httpResponse);

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory factory = httpResponse.bufferFactory();
    Endpoint wsEndpoint = new StandardWebSocketHandlerAdapter(
            handler, nativeSession -> new TomcatWebSocketSession(nativeSession, info, factory));

    // The endpoint is registered under the path of the request being upgraded.
    DefaultServerEndpointConfig endpointConfig =
            new DefaultServerEndpointConfig(nativeRequest.getRequestURI(), wsEndpoint);
    if (subProtocol != null) {
        endpointConfig.setSubprotocols(Collections.singletonList(subProtocol));
    }
    else {
        endpointConfig.setSubprotocols(Collections.emptyList());
    }

    try {
        getContainer(nativeRequest)
                .doUpgrade(nativeRequest, nativeResponse, endpointConfig, Collections.emptyMap());
        return Mono.empty();
    }
    catch (ServletException | IOException ex) {
        return Mono.error(ex);
    }
}
/**
 * Joins the request body into a single buffer, passes it to {@code decorate}, and
 * applies {@code function} to the resulting request. An empty body is replaced by an
 * empty Netty buffer before decoration.
 *
 * <p>Per the original contract, the body is cached in the exchange attribute
 * {@link #CACHED_REQUEST_BODY_ATTR}; when called from a place that cannot mutate the
 * exchange (such as a Predicate), passing {@code cacheDecoratedRequest = true} also
 * stores a {@link ServerHttpRequestDecorator} under
 * {@link #CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR} for later adaptation. That
 * caching is done by {@code decorate}, which is not shown here.
 *
 * @param exchange the available ServerWebExchange; its response buffer factory must
 *        be a {@code NettyDataBufferFactory} (it is cast unconditionally).
 * @param cacheDecoratedRequest if true, the ServerHttpRequestDecorator will be
 * cached.
 * @param function a function that accepts a ServerHttpRequest. It can be the created
 * ServerHttpRequestDecorator or the original if there is no body.
 * @param <T> generic type for the return {@link Mono}.
 * @return Mono of type T created by the function parameter.
 */
private static <T> Mono<T> cacheRequestBody(ServerWebExchange exchange,
        boolean cacheDecoratedRequest,
        Function<ServerHttpRequest, Mono<T>> function) {
    ServerHttpResponse httpResponse = exchange.getResponse();
    NettyDataBufferFactory nettyFactory = (NettyDataBufferFactory) httpResponse.bufferFactory();

    // Join all the DataBuffers so there is a single DataBuffer for the body,
    // substituting an empty buffer when the request carries no body at all.
    Mono<ServerHttpRequest> decoratedRequest = DataBufferUtils.join(exchange.getRequest().getBody())
            .defaultIfEmpty(nettyFactory.wrap(new EmptyByteBuf(nettyFactory.getByteBufAllocator())))
            .map(body -> decorate(exchange, body, cacheDecoratedRequest));

    return decoratedRequest
            .switchIfEmpty(Mono.just(exchange.getRequest()))
            .flatMap(function);
}