下面列出了org.springframework.http.server.reactive.AbstractServerHttpResponse#reactor.netty.http.server.HttpServerResponse 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Upgrades the exchange to a WebSocket connection using the native Reactor Netty response.
 *
 * @param exchange the current exchange; its response is cast to {@code AbstractServerHttpResponse}
 *        and its buffer factory to {@code NettyDataBufferFactory}
 * @param handler the handler that receives the newly created WebSocket session
 * @param subProtocol the sub-protocol forwarded to {@code sendWebsocket}; may be {@code null}
 * @param handshakeInfoFactory supplies the handshake information attached to the session
 * @return the {@code Mono} produced by {@code sendWebsocket}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpResponse springResponse = exchange.getResponse();
    AbstractServerHttpResponse abstractResponse = (AbstractServerHttpResponse) springResponse;
    HttpServerResponse nativeResponse = abstractResponse.getNativeResponse();
    HandshakeInfo info = handshakeInfoFactory.get();
    NettyDataBufferFactory nettyBufferFactory = (NettyDataBufferFactory) springResponse.bufferFactory();

    return nativeResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
            (inbound, outbound) -> {
                ReactorNettyWebSocketSession webSocketSession = new ReactorNettyWebSocketSession(
                        inbound, outbound, info, nettyBufferFactory, this.maxFramePayloadLength);
                URI requestUri = exchange.getRequest().getURI();
                // The checkpoint tags the handler pipeline with the request URI for error traces.
                return handler.handle(webSocketSession)
                        .checkpoint(requestUri + " [ReactorNettyRequestUpgradeStrategy]");
            });
}
/**
 * Upgrades the response to a WebSocket and wires the connection to {@code handleObject}.
 *
 * <p>Once the connection is available, the request attribute is stored on the channel,
 * {@code onConnect} is invoked, every aggregated inbound frame is forwarded to
 * {@code handleEvent}, and {@code onClose} is invoked when the connection is disposed.
 *
 * @param request the incoming request (not read by this method)
 * @param response the response on which the WebSocket upgrade is performed
 * @param handleObject receiver of the connect, frame and close callbacks
 * @param requestAttributeObject cast to {@code RequestAttribute} and stored on the channel
 * @return the upgrade {@code Mono}, re-typed to {@code Mono<Object>}
 */
public Mono<Object> sendMessage(HttpServerRequest request, HttpServerResponse response, WebSocketServerHandle handleObject, Object requestAttributeObject) {
    return response.header("content-type", "text/plain")
            .sendWebsocket((inbound, outbound) -> outbound.withConnection(
                    connection -> {
                        Channel nettyChannel = connection.channel();
                        // Completion of onDispose() signals that the connection has been closed.
                        connection.onDispose().subscribe(null, null,
                                () -> handleObject.onClose(null, nettyChannel));
                        nettyChannel.attr(RequestAttribute.REQUEST_ATTRIBUTE)
                                .set((RequestAttribute) requestAttributeObject);
                        handleObject.onConnect(nettyChannel);
                        inbound.aggregateFrames().receiveFrames().subscribe(
                                webSocketFrame -> handleObject.handleEvent(webSocketFrame, nettyChannel));
                    })
                    .sendString(UnicastProcessor.create()))
            // Identity mapping only widens Mono<Void> to Mono<Object>.
            .map(passThrough -> passThrough);
}
/**
 * Performs the WebSocket upgrade on the underlying Reactor Netty response and hands the
 * resulting session to the given handler.
 *
 * @param exchange the current exchange; its response is cast to {@code AbstractServerHttpResponse}
 * @param handler the WebSocket handler invoked with the new session
 * @param subProtocol the sub-protocol forwarded to {@code sendWebsocket}; may be {@code null}
 * @param handshakeInfoFactory supplies the handshake information for the session
 * @return the {@code Mono} produced by {@code sendWebsocket}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpResponse serverResponse = exchange.getResponse();
    HttpServerResponse nettyResponse =
            ((AbstractServerHttpResponse) serverResponse).getNativeResponse();
    HandshakeInfo info = handshakeInfoFactory.get();
    NettyDataBufferFactory factory = (NettyDataBufferFactory) serverResponse.bufferFactory();

    return nettyResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
            (inbound, outbound) -> handler.handle(
                    new ReactorNettyWebSocketSession(
                            inbound, outbound, info, factory, this.maxFramePayloadLength)));
}
/**
 * Verifies that a transport created without an explicit frame size calls
 * {@code sendWebsocket} with {@code FRAME_LENGTH_MASK} as the frame length.
 */
public void testThatSetupWithUnSpecifiedFrameSizeShouldSetMaxFrameSize() {
    // Spy the server so the handler registered by the transport can be captured without binding.
    ArgumentCaptor<BiFunction> handlerCaptor = ArgumentCaptor.forClass(BiFunction.class);
    HttpServer spiedServer = Mockito.spy(HttpServer.create());
    Mockito.doAnswer(invocation -> spiedServer).when(spiedServer).handle(handlerCaptor.capture());
    Mockito.doAnswer(invocation -> Mono.empty()).when(spiedServer).bind();

    WebsocketServerTransport transport = WebsocketServerTransport.create(spiedServer);
    transport.start(connection -> Mono.empty()).subscribe();

    // Invoke the captured handler against mocks and inspect the upgrade call.
    HttpServerRequest requestMock = Mockito.mock(HttpServerRequest.class);
    HttpServerResponse responseMock = Mockito.mock(HttpServerResponse.class);
    handlerCaptor.getValue().apply(requestMock, responseMock);

    Mockito.verify(responseMock).sendWebsocket(
            Mockito.nullable(String.class), Mockito.eq(FRAME_LENGTH_MASK), Mockito.any());
}
/**
 * Handles an authentication POST: validates the request, deserializes the body and dispatches
 * to the continuation-token or access-token handler depending on the payload type.
 *
 * <p>{@code BadRequestException} is answered through {@code handleBadRequest}; any other error
 * is logged as "Unexpected error" and answered through {@code handleInternalError}.
 *
 * @param request the incoming HTTP request
 * @param response the HTTP response to write to
 * @return a {@code Mono} completing when the request has been handled
 */
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response) {
    return Mono.from(metricFactory.decoratePublisherWithTimerMetricLogP99("JMAP-authentication-post",
            Mono.just(request)
                .map(this::assertJsonContentType)
                .map(this::assertAcceptJsonOnly)
                .flatMap(this::deserialize)
                .flatMap(payload -> {
                    if (payload instanceof ContinuationTokenRequest) {
                        return handleContinuationTokenRequest((ContinuationTokenRequest) payload, response);
                    }
                    if (payload instanceof AccessTokenRequest) {
                        return handleAccessTokenRequest((AccessTokenRequest) payload, response);
                    }
                    // Neither known request type: surfaces as an error signal in the pipeline.
                    throw new RuntimeException(payload.getClass() + " " + payload);
                })))
        .onErrorResume(BadRequestException.class, badRequest -> handleBadRequest(response, LOGGER, badRequest))
        .doOnEach(logOnError(error -> LOGGER.error("Unexpected error", error)))
        .onErrorResume(error -> handleInternalError(response, error))
        .subscriberContext(jmapContext(request))
        .subscriberContext(jmapAction("auth-post"))
        .subscribeOn(Schedulers.elastic());
}
/**
 * Writes the JMAP end-points description as a JSON body with status {@code OK}.
 *
 * @param resp the HTTP response to write to
 * @return a {@code Mono} completing once the body has been sent
 * @throws InternalErrorException if the end-points response cannot be serialized
 */
private Mono<Void> returnEndPointsResponse(HttpServerResponse resp) {
    try {
        // Status and header are set before serialization, matching the order of a chained call.
        HttpServerResponse prepared = resp.status(OK)
            .header(CONTENT_TYPE, JSON_CONTENT_TYPE_UTF8);
        String json = mapper.writeValueAsString(EndPointsResponse
            .builder()
            .api(JMAPUrls.JMAP)
            .eventSource(JMAPUrls.NOT_IMPLEMENTED)
            .upload(JMAPUrls.UPLOAD)
            .download(JMAPUrls.DOWNLOAD)
            .build());
        return prepared.sendString(Mono.just(json)).then();
    } catch (JsonProcessingException e) {
        throw new InternalErrorException("Error serializing endpoint response", e);
    }
}
/**
 * Grants an access token for the given user and writes it, together with the JMAP end-point
 * URLs, as a JSON body with status {@code CREATED}.
 *
 * @param resp the HTTP response to write to
 * @param username the user for whom the access token is granted
 * @return a {@code Mono} completing once the body has been sent
 */
private Mono<Void> returnAccessTokenResponse(HttpServerResponse resp, Username username) {
    return Mono.from(accessTokenManager.grantAccessToken(username))
        .map(grantedToken -> AccessTokenResponse.builder()
            .accessToken(grantedToken)
            .api(JMAPUrls.JMAP)
            .eventSource(JMAPUrls.NOT_IMPLEMENTED)
            .upload(JMAPUrls.UPLOAD)
            .download(JMAPUrls.DOWNLOAD)
            .build())
        .flatMap(body -> {
            try {
                HttpServerResponse prepared = resp.status(CREATED)
                    .header(CONTENT_TYPE, JSON_CONTENT_TYPE_UTF8);
                String json = mapper.writeValueAsString(body);
                return prepared.sendString(Mono.just(json)).then();
            } catch (JsonProcessingException e) {
                throw new InternalErrorException("Could not serialize access token response", e);
            }
        });
}
/**
 * Handles an upload POST.
 *
 * <p>A request without a {@code Content-Type} header (missing or empty) is answered with
 * {@code BAD_REQUEST}. Otherwise the request is authenticated and delegated to the
 * session-aware {@code post} overload. Cancelled uploads, bad requests and authentication
 * failures have dedicated handlers; any other error is logged as "Unexpected error" and
 * answered through {@code handleInternalError}.
 *
 * @param request the incoming HTTP request
 * @param response the HTTP response to write to
 * @return a {@code Mono} completing when the request has been handled
 */
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response) {
    String contentType = request.requestHeaders().get(CONTENT_TYPE);
    if (Strings.isNullOrEmpty(contentType)) {
        return response.status(BAD_REQUEST).send();
    }
    return authenticator.authenticate(request)
        .flatMap(session -> post(request, response, ContentType.of(contentType), session)
            .subscriberContext(jmapAuthContext(session)))
        .onErrorResume(CancelledUploadException.class, e -> handleCanceledUpload(response, e))
        .onErrorResume(BadRequestException.class, e -> handleBadRequest(response, e))
        .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(response, LOGGER, e))
        .doOnEach(logOnError(e -> LOGGER.error("Unexpected error", e)))
        .onErrorResume(e -> handleInternalError(response, e))
        .subscriberContext(jmapContext(request))
        // This handler serves POST; the action label previously read "upload-get".
        .subscriberContext(jmapAction("upload-post"))
        .subscribeOn(Schedulers.elastic());
}
/**
 * Interceptor callback run after the target method: when the arguments pass
 * {@code isReceived}, reports the request as destroyed together with the response status code.
 *
 * <p>Any {@code Throwable} raised while doing so is caught and only logged at info level.
 *
 * @param target the intercepted object
 * @param args the arguments of the intercepted call
 * @param result the value returned by the intercepted call
 * @param throwable the error thrown by the intercepted call, if any
 */
@Override
public void after(Object target, Object[] args, Object result, Throwable throwable) {
    if (isDebug) {
        logger.afterInterceptor(target, args, result, throwable);
    }

    try {
        if (isReceived(args)) {
            // NOTE(review): request and response are both read from args[0]; presumably that
            // argument implements both interfaces and isReceived checks it - confirm.
            final HttpServerRequest request = (HttpServerRequest) args[0];
            final HttpServerResponse response = (HttpServerResponse) args[0];
            final int statusCode = getStatusCode(response);
            this.servletRequestListenerInterceptorHelper.destroyed(request, throwable, statusCode);
        }
    } catch (Throwable t) {
        if (isInfo) {
            logger.info("Failed to servlet request event handle.", t);
        }
    }
}
/**
 * Resolves the request URI against the configured resource locations and sends the matching
 * file, or redirects to the URI with a trailing slash when the resolved path is a directory.
 *
 * <p>The first non-root location whose key prefixes the URI wins; otherwise the {@code "/"}
 * location is used when one is configured. A URI ending in {@code "/"} gets
 * {@code "index.html"} appended.
 *
 * @param req the incoming request whose URI selects the resource
 * @param resp the response to write to
 * @return a {@code Mono} completing when the file or redirect has been sent
 */
public Mono<Object> sendFile(HttpServerRequest req, HttpServerResponse resp) {
    String resourceUri = "";

    for (String uriDir : resourceLocations.keySet()) {
        // The root mapping is only a fallback and is handled after the loop.
        if (uriDir.equals("/") || !req.uri().startsWith(uriDir)) {
            continue;
        }
        resourceUri = resourceLocations.get(uriDir) + req.uri().substring(uriDir.length());
        if (req.uri().endsWith("/")) {
            resourceUri += "index.html";
        }
        break;
    }

    if (resourceUri.isEmpty() && resourceLocations.get("/") != null) {
        resourceUri = resourceLocations.get("/") + req.uri().substring(1);
        if (req.uri().endsWith("/")) {
            resourceUri += "index.html";
        }
    }

    return ReactorGuiceServer.classResourcePath(resourceUri)
            .flatMap(path -> {
                resp.header(HttpHeaderNames.CONTENT_TYPE, contentType(path.toString())+"; charset=UTF-8");
                if (!Files.isDirectory(path)) {
                    return resp.sendFile(path).then();
                }
                return resp.sendRedirect(req.uri() + "/");
            });
}
/**
 * Sets the {@code Content-Length} and {@code Content-Type} headers for a regular file; a
 * directory path is passed through without touching the response.
 *
 * @param path the resolved resource path
 * @param resp the response whose headers are populated
 * @return a {@code Mono} emitting the same {@code path}, subscribed on the elastic scheduler
 */
private Mono<Path> setHeader(Path path, HttpServerResponse resp) {
    return Mono.fromCallable(() -> {
        if (Files.isDirectory(path)) {
            return path;
        }
        resp.header(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(Files.size(path)));
        resp.header(HttpHeaderNames.CONTENT_TYPE, contentType(path.toString())+"; charset=UTF-8");
        return path;
    }).subscribeOn(Schedulers.elastic());
}
/**
 * Upgrades the response to a WebSocket, binds the connection to {@code handleObject} and
 * sends the objects published by {@code rp} to the client.
 *
 * <p>Once the connection is available, the inbound handle and the channel are printed to
 * standard output, the request attribute is stored on the channel, {@code onConnect} is
 * invoked, every aggregated inbound frame is forwarded to {@code handleEvent}, and
 * {@code onClose} is invoked when the connection is disposed.
 *
 * @param request the incoming request (not read by this method)
 * @param response the response on which the WebSocket upgrade is performed
 * @param handleObject receiver of the connect, frame and close callbacks
 * @param requestAttributeObject cast to {@code RequestAttribute} and stored on the channel
 * @return the upgrade {@code Mono}, typed as {@code Mono<Object>}
 */
public Mono<Object> sendMessage2(HttpServerRequest request,
                                 HttpServerResponse response,
                                 WebSocketServerHandle handleObject,
                                 Object requestAttributeObject) {
    return Mono.just((RequestAttribute) requestAttributeObject)
            .flatMap(attribute ->
                    response.header("content-type", "text/plain")
                            .sendWebsocket((inbound, outbound) -> outbound.withConnection(
                                    connection -> {
                                        Channel nettyChannel = connection.channel();
                                        System.out.println("\n\n >>>" + inbound);
                                        System.out.println(nettyChannel);
                                        // Completion of onDispose() signals the disconnect.
                                        connection.onDispose().subscribe(null, null,
                                                () -> handleObject.onClose(null, nettyChannel));
                                        // Make the request attribute reachable from the channel.
                                        nettyChannel.attr(RequestAttribute.REQUEST_ATTRIBUTE).set(attribute);
                                        handleObject.onConnect(nettyChannel);
                                        // Forward each aggregated inbound frame to the handler.
                                        inbound.aggregateFrames().receiveFrames().subscribe(
                                                webSocketFrame -> handleObject.handleEvent(webSocketFrame, nettyChannel));
                                    })
                                    .sendObject(rp))
            );
}
/**
 * Upgrades the response to a WebSocket that answers every inbound text message with the
 * fixed string {@code "a"}.
 *
 * @param request the incoming request (not read by this method)
 * @param response the response on which the WebSocket upgrade is performed
 * @param handleObject not used by this method
 * @param requestAttributeObject not used by this method
 * @return a {@code Mono} that completes empty after the upgrade {@code Mono} completes
 */
public Mono<Object> sendMessage5(HttpServerRequest request,
                                 HttpServerResponse response,
                                 WebSocketServerHandle handleObject,
                                 Object requestAttributeObject) {
    return response.header("content-type", "text/plain")
            .sendWebsocket((inbound, outbound) -> {
                return outbound.sendString(
                        inbound.receive()
                                .asString()
                                .publishOn(Schedulers.single())
                                .map(received -> "a"));
            })
            .then(Mono.empty());
}
/**
 * Pass-through filter: emits the given request attribute unchanged.
 *
 * @param request the incoming request (not read by this method)
 * @param response the outgoing response (not used by this method)
 * @param requestAttribute the attribute to emit
 * @return a {@code Mono} emitting {@code requestAttribute}
 */
@Override
public Mono<Object> doFilter(HttpServerRequest request, HttpServerResponse response, RequestAttribute requestAttribute) {
    return Mono.just(requestAttribute);
}
/**
 * Responds with the remaining mock SMTP behaviors serialized as JSON.
 *
 * @param req the incoming request (not read by this method)
 * @param res the response to write to
 * @return a publisher that sends the JSON body with status {@code OK}, or an empty
 *         {@code INTERNAL_SERVER_ERROR} response when serialization fails
 */
private Publisher<Void> getBehaviors(HttpServerRequest req, HttpServerResponse res) {
    MockSmtpBehaviors remaining = new MockSmtpBehaviors(smtpBehaviorRepository.remainingBehaviors()
        .map(MockSMTPBehaviorInformation::getBehavior)
        .collect(Guavate.toImmutableList()));

    try {
        // Status and header are set before serialization, matching the order of a chained call.
        HttpServerResponse prepared = res.status(OK)
            .header(CONTENT_TYPE, APPLICATION_JSON);
        String json = OBJECT_MAPPER.writeValueAsString(remaining);
        return prepared.sendString(Mono.just(json));
    } catch (JsonProcessingException e) {
        LOGGER.error("Could not serialize JSON", e);
        return res.status(INTERNAL_SERVER_ERROR).send();
    }
}
/**
 * Replaces the stored mock SMTP behaviors with the ones parsed from the JSON request body.
 *
 * @param req the incoming request whose aggregated body is parsed
 * @param res the response to write to
 * @return a publisher that answers {@code NO_CONTENT} on success, or {@code BAD_REQUEST}
 *         when an {@code IOException} is raised while reading or storing the behaviors
 */
private Publisher<Void> putBehaviors(HttpServerRequest req, HttpServerResponse res) {
    return req.receive().aggregate().asInputStream()
        .flatMap(body -> {
            try {
                MockSmtpBehaviors parsed = OBJECT_MAPPER.readValue(body, MockSmtpBehaviors.class);
                smtpBehaviorRepository.setBehaviors(parsed);
                return res.status(NO_CONTENT).send();
            } catch (IOException e) {
                LOGGER.info("Bad request", e);
                return res.status(BAD_REQUEST).send();
            }
        });
}
/**
 * Responds with the list of received mails serialized as JSON.
 *
 * @param req the incoming request (not read by this method)
 * @param res the response to write to
 * @return a publisher that sends the JSON body with status {@code OK}, or an empty
 *         {@code INTERNAL_SERVER_ERROR} response when serialization fails
 */
private Publisher<Void> getMails(HttpServerRequest req, HttpServerResponse res) {
    Mails received = new Mails(receivedMailRepository.list());

    try {
        // Status and header are set before serialization, matching the order of a chained call.
        HttpServerResponse prepared = res.status(OK)
            .header(CONTENT_TYPE, APPLICATION_JSON);
        String json = OBJECT_MAPPER.writeValueAsString(received);
        return prepared.sendString(Mono.just(json));
    } catch (JsonProcessingException e) {
        LOGGER.error("Could not serialize JSON", e);
        return res.status(INTERNAL_SERVER_ERROR).send();
    }
}
/**
 * Handles a JMAP API POST: authenticates the request, runs user and default-mailbox
 * provisioning, then processes the request under the "JMAP-request" timer metric.
 *
 * <p>Bad requests and authentication failures have dedicated handlers; any other error is
 * logged as "Unexpected error" and answered through {@code handleInternalError}.
 *
 * @param request the incoming HTTP request
 * @param response the HTTP response to write to
 * @return a {@code Mono} completing when the request has been handled
 */
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response) {
    return authenticator.authenticate(request)
        .flatMap(mailboxSession -> {
            return Flux.merge(
                    userProvisioner.provisionUser(mailboxSession),
                    defaultMailboxesProvisioner.createMailboxesIfNeeded(mailboxSession))
                .then(Mono.from(metricFactory.decoratePublisherWithTimerMetric("JMAP-request",
                    post(request, response, mailboxSession))))
                .subscriberContext(jmapAuthContext(mailboxSession));
        })
        .onErrorResume(BadRequestException.class, badRequest -> handleBadRequest(response, LOGGER, badRequest))
        .onErrorResume(UnauthorizedException.class, unauthorized -> handleAuthenticationFailure(response, LOGGER, unauthorized))
        .doOnEach(logOnError(error -> LOGGER.error("Unexpected error", error)))
        .onErrorResume(error -> handleInternalError(response, error))
        .subscriberContext(jmapContext(request))
        .subscribeOn(Schedulers.elastic());
}
/**
 * Processes the JSON invocations of an authenticated request, in order, and sends the
 * collected responses.
 *
 * @param request the incoming HTTP request whose body is read as a JSON stream
 * @param response the HTTP response to write to
 * @param session the session each invocation is decorated with
 * @return the {@code Mono} returned by {@code sendResponses}
 */
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response, MailboxSession session) {
    return sendResponses(response,
        requestAsJsonStream(request)
            .map(InvocationRequest::deserialize)
            .map(invocation -> AuthenticatedRequest.decorate(invocation, session))
            // concatMap keeps the responses in request order.
            .concatMap(requestHandler::handle)
            .map(InvocationResponse::asProtocolSpecification));
}
/**
 * Collects all responses, serializes them as one JSON document and sends it with status
 * {@code OK}.
 *
 * @param response the HTTP response to write to
 * @param responses the individual invocation responses
 * @return a {@code Mono} completing once the body has been sent; it fails with
 *         {@code InternalErrorException} when the responses cannot be serialized
 */
private Mono<Void> sendResponses(HttpServerResponse response, Flux<Object[]> responses) {
    return responses.collectList()
        .map(objects -> {
            try {
                return objectMapper.writeValueAsString(objects);
            } catch (JsonProcessingException e) {
                // Keep the Jackson failure as the cause so the root problem stays diagnosable.
                throw new InternalErrorException("error serialising JMAP API response json", e);
            }
        })
        .flatMap(json -> response.status(OK)
            .header(CONTENT_TYPE, JSON_CONTENT_TYPE)
            .sendString(Mono.just(json))
            .then());
}
/**
 * Authenticates the request and, on success, writes the JMAP end-points description.
 *
 * <p>Bad requests and authentication failures are answered by their dedicated handlers
 * before the "Unexpected error" logging step, so only errors that are actually unexpected
 * are logged as such. {@code InternalErrorException} is then answered through
 * {@code handleInternalError}; other errors propagate to the caller.
 *
 * @param req the incoming HTTP request
 * @param resp the HTTP response to write to
 * @return a {@code Mono} completing when the request has been handled
 */
private Mono<Void> returnEndPointsResponse(HttpServerRequest req, HttpServerResponse resp) {
    return authenticator.authenticate(req)
        .flatMap(session -> returnEndPointsResponse(resp)
            .subscriberContext(jmapAuthContext(session)))
        .onErrorResume(BadRequestException.class, e -> handleBadRequest(resp, LOGGER, e))
        // Handled ahead of the logging step: an authentication failure is an expected outcome.
        .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(resp, LOGGER, e))
        .doOnEach(logOnError(e -> LOGGER.error("Unexpected error", e)))
        .onErrorResume(InternalErrorException.class, e -> handleInternalError(resp, e))
        .subscriberContext(jmapContext(req))
        .subscriberContext(jmapAction("returnEndPoints"))
        .subscribeOn(Schedulers.elastic());
}
/**
 * Revokes the access token carried in the {@code Authorization} header of an authenticated
 * request and answers {@code NO_CONTENT}.
 *
 * @param req the incoming HTTP request
 * @param resp the HTTP response to write to
 * @return a {@code Mono} completing when the request has been handled; authentication
 *         failures are answered through {@code handleAuthenticationFailure}
 */
private Mono<Void> delete(HttpServerRequest req, HttpServerResponse resp) {
    String authorizationHeader = req.requestHeaders().get("Authorization");

    return authenticator.authenticate(req)
        .flatMap(mailboxSession -> {
            return Mono.from(accessTokenManager.revoke(AccessToken.fromString(authorizationHeader)))
                .then(resp.status(NO_CONTENT).send().then())
                .subscriberContext(jmapAuthContext(mailboxSession));
        })
        .onErrorResume(UnauthorizedException.class, unauthorized -> handleAuthenticationFailure(resp, LOGGER, unauthorized))
        .subscriberContext(jmapContext(req))
        .subscriberContext(jmapAction("auth-delete"))
        .subscribeOn(Schedulers.elastic());
}
/**
 * Generates a continuation token for the requesting user and writes it as JSON, advertising
 * {@code PASSWORD} as the authentication method.
 *
 * @param request the continuation token request carrying the username
 * @param resp the HTTP response to write to
 * @return a {@code Mono} completing once the body has been sent
 * @throws InternalErrorException if building or serializing the response fails
 */
private Mono<Void> handleContinuationTokenRequest(ContinuationTokenRequest request, HttpServerResponse resp) {
    try {
        ContinuationTokenResponse body = ContinuationTokenResponse
            .builder()
            .continuationToken(simpleTokenFactory.generateContinuationToken(request.getUsername()))
            .methods(ContinuationTokenResponse.AuthenticationMethod.PASSWORD)
            .build();
        // The header is set before serialization, matching the order of a chained call.
        HttpServerResponse prepared = resp.header(CONTENT_TYPE, JSON_CONTENT_TYPE_UTF8);
        String json = mapper.writeValueAsString(body);
        return prepared.sendString(Mono.just(json)).then();
    } catch (Exception e) {
        throw new InternalErrorException("Error while responding to continuation token", e);
    }
}
/**
 * Dispatches an access token request according to the validity of its continuation token.
 *
 * @param request the access token request carrying the continuation token
 * @param resp the HTTP response to write to
 * @return the response for a valid ({@code OK}), {@code EXPIRED} or {@code INVALID} token
 * @throws InternalErrorException if the validity status is not one of the handled values
 */
private Mono<Void> handleAccessTokenRequest(AccessTokenRequest request, HttpServerResponse resp) {
    SimpleTokenManager.TokenStatus status = simpleTokenManager.getValidity(request.getToken());
    switch (status) {
        case OK:
            return manageAuthenticationResponse(request, resp);
        case EXPIRED:
            return returnForbiddenAuthentication(resp);
        case INVALID:
            return returnUnauthorizedResponse(resp)
                .doOnEach(log(() -> LOGGER.warn("Use of an invalid ContinuationToken : {}", request.getToken().serialize())));
        default:
            throw new InternalErrorException(String.format("Validity %s is not implemented", status));
    }
}
/**
 * Authenticates the user named in the continuation token and answers with an access token
 * on success or an unauthorized response on failure.
 *
 * @param request the access token request
 * @param resp the HTTP response to write to
 * @return a {@code Mono} completing once the response has been written
 */
private Mono<Void> manageAuthenticationResponse(AccessTokenRequest request, HttpServerResponse resp) {
    Username username = Username.of(request.getToken().getUsername());

    return authenticate(request, username)
        .flatMap(authenticated -> {
            if (!authenticated) {
                return returnUnauthorizedResponse(resp)
                    .doOnEach(log(() -> LOGGER.info("Authentication failure for {}", username)));
            }
            return returnAccessTokenResponse(resp, username);
        });
}
/**
 * Handles a download POST: authenticates the request and responds with an attachment access
 * token for the given download path, under the "JMAP-download-post" timer metric.
 *
 * <p>Authentication failures have a dedicated handler; any other error is logged as
 * "Unexpected error" and answered through {@code handleInternalError}.
 *
 * @param request the incoming HTTP request
 * @param response the HTTP response to write to
 * @param downloadPath the blob the token is requested for
 * @return a {@code Mono} completing when the request has been handled
 */
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response, DownloadPath downloadPath) {
    return authenticator.authenticate(request)
        .flatMap(mailboxSession -> {
            return Mono.from(metricFactory.decoratePublisherWithTimerMetric("JMAP-download-post",
                    respondAttachmentAccessToken(mailboxSession, downloadPath, response)))
                .subscriberContext(jmapAuthContext(mailboxSession));
        })
        .onErrorResume(UnauthorizedException.class, unauthorized -> handleAuthenticationFailure(response, LOGGER, unauthorized))
        .doOnEach(logOnError(error -> LOGGER.error("Unexpected error", error)))
        .onErrorResume(error -> handleInternalError(response, error))
        .subscriberContext(jmapContext(request))
        .subscriberContext(jmapAction("download-post"))
        .subscribeOn(Schedulers.elastic());
}
/**
 * Serves a download identified by the blob id and the URL-encoded name taken from the
 * request path parameters.
 *
 * @param request the incoming HTTP request carrying both path parameters
 * @param response the HTTP response to write to
 * @return the {@code Mono} returned by {@code get}
 * @throws BadRequestException if decoding raises {@code UnsupportedEncodingException}
 */
private Mono<Void> getFromIdAndName(HttpServerRequest request, HttpServerResponse response) {
    String blobId = request.param(BLOB_ID_PATH_PARAM);

    DownloadPath downloadPath;
    try {
        String decodedName = URLDecoder.decode(request.param(NAME_PATH_PARAM), StandardCharsets.UTF_8.toString());
        downloadPath = DownloadPath.of(blobId, decodedName);
    } catch (UnsupportedEncodingException e) {
        throw new BadRequestException("Wrong url encoding", e);
    }
    return get(request, response, downloadPath);
}
/**
 * Handles a download GET: authenticates the request and streams the blob designated by the
 * download path, under the "JMAP-download-get" timer metric.
 *
 * <p>Authentication failures have a dedicated handler; any other error is logged as
 * "Unexpected error" and answered through {@code handleInternalError}.
 *
 * @param request the incoming HTTP request
 * @param response the HTTP response to write to
 * @param downloadPath the blob to download
 * @return a {@code Mono} completing when the request has been handled
 */
private Mono<Void> get(HttpServerRequest request, HttpServerResponse response, DownloadPath downloadPath) {
    return authenticator.authenticate(request)
        .flatMap(mailboxSession -> {
            return Mono.from(metricFactory.decoratePublisherWithTimerMetric("JMAP-download-get",
                    download(mailboxSession, downloadPath, response)))
                .subscriberContext(jmapAuthContext(mailboxSession));
        })
        .onErrorResume(UnauthorizedException.class, unauthorized -> handleAuthenticationFailure(response, LOGGER, unauthorized))
        .doOnEach(logOnError(error -> LOGGER.error("Unexpected error", error)))
        .onErrorResume(error -> handleInternalError(response, error))
        .subscriberContext(jmapContext(request))
        .subscriberContext(jmapAction("download-get"))
        .subscribeOn(Schedulers.elastic());
}
/**
 * Responds with a serialized attachment access token for the requested blob, or with
 * {@code NOT_FOUND} when {@code attachmentExists} reports that the attachment is missing.
 *
 * @param mailboxSession the session of the requesting user
 * @param downloadPath the download path whose blob id identifies the attachment
 * @param resp the HTTP response to write to
 * @return a {@code Mono} completing once the response has been written
 * @throws InternalErrorException if a {@code MailboxException} is raised
 */
private Mono<Void> respondAttachmentAccessToken(MailboxSession mailboxSession, DownloadPath downloadPath, HttpServerResponse resp) {
    String blobId = downloadPath.getBlobId();
    try {
        boolean found = attachmentExists(mailboxSession, blobId);
        if (!found) {
            return resp.status(NOT_FOUND).send();
        }

        AttachmentAccessToken token = simpleTokenFactory.generateAttachmentAccessToken(mailboxSession.getUser().asString(), blobId);
        HttpServerResponse prepared = resp.header(CONTENT_TYPE, TEXT_PLAIN_CONTENT_TYPE)
            .status(OK);
        return prepared.sendString(Mono.just(token.serialize())).then();
    } catch (MailboxException e) {
        throw new InternalErrorException("Error while asking attachment access token", e);
    }
}
/**
 * Streams a blob to the client in chunks of {@code BUFFER_SIZE}, after setting the content
 * disposition, length and type headers and the {@code OK} status.
 *
 * @param optionalName the optional name passed to {@code addContentDispositionHeader}
 * @param response the HTTP response to write to
 * @param blobSize the value sent in the {@code Content-Length} header
 * @param blobContentType the content type of the blob
 * @param stream the blob content; it is read on the elastic scheduler
 * @return a {@code Mono} completing once the content has been sent
 */
private Mono<Void> downloadBlob(Optional<String> optionalName, HttpServerResponse response, long blobSize, ContentType blobContentType, InputStream stream) {
    // Headers and status are applied first, matching the order of a chained call.
    HttpServerResponse prepared = addContentDispositionHeader(optionalName, response)
        .header("Content-Length", String.valueOf(blobSize))
        .header(CONTENT_TYPE, blobContentType.asString())
        .status(OK);

    return prepared
        .send(ReactorUtils.toChunks(stream, BUFFER_SIZE)
            .map(Unpooled::wrappedBuffer)
            .subscribeOn(Schedulers.elastic()))
        .then();
}