org.springframework.http.server.reactive.AbstractServerHttpResponse#reactor.netty.http.server.HttpServerResponse源码实例Demo

下面列出了org.springframework.http.server.reactive.AbstractServerHttpResponse#reactor.netty.http.server.HttpServerResponse 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
		@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	ServerHttpResponse response = exchange.getResponse();
	HttpServerResponse reactorResponse = ((AbstractServerHttpResponse) response).getNativeResponse();
	HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
	NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();

	return reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
			(in, out) -> {
				ReactorNettyWebSocketSession session =
						new ReactorNettyWebSocketSession(
								in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
				URI uri = exchange.getRequest().getURI();
				return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]");
			});
}
 
源代码2 项目: reactor-guice   文件: WebsocketPublisher.java
public Mono<Object> sendMessage(HttpServerRequest request, HttpServerResponse response, WebSocketServerHandle handleObject, Object requestAttributeObject) {
    // return
    return response.header("content-type", "text/plain")
            .sendWebsocket((in, out) -> {
                // return
                return out.withConnection(
                        connect -> {
                            Channel channel = connect.channel();
                            connect.onDispose().subscribe(null, null, () -> {
                                handleObject.onClose(null, channel);
                            });
                            channel.attr(RequestAttribute.REQUEST_ATTRIBUTE).set((RequestAttribute) requestAttributeObject);
                            handleObject.onConnect(channel);
                            in.aggregateFrames().receiveFrames().subscribe(
                                    frame->handleObject.handleEvent(frame, channel)
                            );
                        })
                        .sendString(UnicastProcessor.create());
            })
            .map(s->s);
}
 
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
		@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	ServerHttpResponse response = exchange.getResponse();
	HttpServerResponse reactorResponse = ((AbstractServerHttpResponse) response).getNativeResponse();
	HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
	NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();

	return reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
			(in, out) -> {
				ReactorNettyWebSocketSession session =
						new ReactorNettyWebSocketSession(
								in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
				return handler.handle(session);
			});
}
 
public void testThatSetupWithUnSpecifiedFrameSizeShouldSetMaxFrameSize() {
  ArgumentCaptor<BiFunction> captor = ArgumentCaptor.forClass(BiFunction.class);
  HttpServer httpServer = Mockito.spy(HttpServer.create());
  Mockito.doAnswer(a -> httpServer).when(httpServer).handle(captor.capture());
  Mockito.doAnswer(a -> Mono.empty()).when(httpServer).bind();

  WebsocketServerTransport serverTransport = WebsocketServerTransport.create(httpServer);

  serverTransport.start(c -> Mono.empty()).subscribe();

  HttpServerRequest httpServerRequest = Mockito.mock(HttpServerRequest.class);
  HttpServerResponse httpServerResponse = Mockito.mock(HttpServerResponse.class);

  captor.getValue().apply(httpServerRequest, httpServerResponse);

  Mockito.verify(httpServerResponse)
      .sendWebsocket(
          Mockito.nullable(String.class), Mockito.eq(FRAME_LENGTH_MASK), Mockito.any());
}
 
源代码5 项目: james-project   文件: AuthenticationRoutes.java
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response) {
    return Mono.from(metricFactory.decoratePublisherWithTimerMetricLogP99("JMAP-authentication-post",
        Mono.just(request)
            .map(this::assertJsonContentType)
            .map(this::assertAcceptJsonOnly)
            .flatMap(this::deserialize)
            .flatMap(objectRequest -> {
                if (objectRequest instanceof ContinuationTokenRequest) {
                    return handleContinuationTokenRequest((ContinuationTokenRequest) objectRequest, response);
                } else if (objectRequest instanceof AccessTokenRequest) {
                    return handleAccessTokenRequest((AccessTokenRequest) objectRequest, response);
                } else {
                    throw new RuntimeException(objectRequest.getClass() + " " + objectRequest);
                }
            })))
        .onErrorResume(BadRequestException.class, e -> handleBadRequest(response, LOGGER, e))
        .doOnEach(logOnError(e -> LOGGER.error("Unexpected error", e)))
        .onErrorResume(e -> handleInternalError(response, e))
        .subscriberContext(jmapContext(request))
        .subscriberContext(jmapAction("auth-post"))
        .subscribeOn(Schedulers.elastic());
}
 
源代码6 项目: james-project   文件: AuthenticationRoutes.java
private Mono<Void> returnEndPointsResponse(HttpServerResponse resp) {
    try {
        return resp.status(OK)
            .header(CONTENT_TYPE, JSON_CONTENT_TYPE_UTF8)
            .sendString(Mono.just(mapper.writeValueAsString(EndPointsResponse
                .builder()
                .api(JMAPUrls.JMAP)
                .eventSource(JMAPUrls.NOT_IMPLEMENTED)
                .upload(JMAPUrls.UPLOAD)
                .download(JMAPUrls.DOWNLOAD)
                .build())))
            .then();
    } catch (JsonProcessingException e) {
        throw new InternalErrorException("Error serializing endpoint response", e);
    }
}
 
源代码7 项目: james-project   文件: AuthenticationRoutes.java
private Mono<Void> returnAccessTokenResponse(HttpServerResponse resp, Username username) {
    return Mono.from(accessTokenManager.grantAccessToken(username))
        .map(accessToken -> AccessTokenResponse.builder()
            .accessToken(accessToken)
            .api(JMAPUrls.JMAP)
            .eventSource(JMAPUrls.NOT_IMPLEMENTED)
            .upload(JMAPUrls.UPLOAD)
            .download(JMAPUrls.DOWNLOAD)
            .build())
        .flatMap(accessTokenResponse -> {
            try {
                return resp.status(CREATED)
                    .header(CONTENT_TYPE, JSON_CONTENT_TYPE_UTF8)
                    .sendString(Mono.just(mapper.writeValueAsString(accessTokenResponse)))
                    .then();
            } catch (JsonProcessingException e) {
                throw new InternalErrorException("Could not serialize access token response", e);
            }
        });
}
 
源代码8 项目: james-project   文件: UploadRoutes.java
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response)  {
    String contentType = request.requestHeaders().get(CONTENT_TYPE);
    if (Strings.isNullOrEmpty(contentType)) {
        return response.status(BAD_REQUEST).send();
    } else {
        return authenticator.authenticate(request)
            .flatMap(session -> post(request, response, ContentType.of(contentType), session)
                .subscriberContext(jmapAuthContext(session)))
            .onErrorResume(CancelledUploadException.class, e -> handleCanceledUpload(response, e))
            .onErrorResume(BadRequestException.class, e -> handleBadRequest(response, e))
            .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(response, LOGGER, e))
            .doOnEach(logOnError(e -> LOGGER.error("Unexpected error", e)))
            .onErrorResume(e -> handleInternalError(response, e))
            .subscriberContext(jmapContext(request))
            .subscriberContext(jmapAction("upload-get"))
            .subscribeOn(Schedulers.elastic());
    }
}
 
@Override
public void after(Object target, Object[] args, Object result, Throwable throwable) {
    if (isDebug) {
        logger.afterInterceptor(target, args, result, throwable);
    }

    try {
        if (!isReceived(args)) {
            return;
        }
        final HttpServerRequest request = (HttpServerRequest) args[0];
        final HttpServerResponse response = (HttpServerResponse) args[0];
        final int statusCode = getStatusCode(response);
        this.servletRequestListenerInterceptorHelper.destroyed(request, throwable, statusCode);
    } catch (Throwable t) {
        if (isInfo) {
            logger.info("Failed to servlet request event handle.", t);
        }
    }
}
 
源代码10 项目: reactor-guice   文件: StaticFilePublisher.java
public Mono<Object> sendFile(HttpServerRequest req, HttpServerResponse resp) {

        String resourceUri = "";

        for(String uriDir : resourceLocations.keySet()) {
            if (!uriDir.equals("/") && req.uri().startsWith(uriDir)) {
                resourceUri = req.uri().endsWith("/")
                    ? resourceLocations.get(uriDir) + req.uri().substring(uriDir.length()) + "index.html"
                    : resourceLocations.get(uriDir) + req.uri().substring(uriDir.length());
//                System.out.printf("{\n  %s\n  %s\n  %s\n  %s\n  %s\n}\n",
//                    uriDir,
//                    resourceLocations.get(uriDir),
//                    req.uri().substring(uriDir.length()),
//                    req.uri(),
//                    resourceUri
//                );
                break;
            }
        }

        if (resourceUri.equals("") && resourceLocations.get("/")!=null) {
            resourceUri = req.uri().endsWith("/")
                ? resourceLocations.get("/") + req.uri().substring(1) + "index.html"
                : resourceLocations.get("/") + req.uri().substring(1);
        }

        return ReactorGuiceServer.classResourcePath(resourceUri)
                // .flatMap(path->this.setHeader(path, resp))
                .flatMap(path -> {
                    resp.header(HttpHeaderNames.CONTENT_TYPE, contentType(path.toString())+"; charset=UTF-8");
                    if (Files.isDirectory(path)) {
                        return resp.sendRedirect(req.uri() + "/");
                    }
                    return resp.sendFile(path).then();
                });
    }
 
源代码11 项目: reactor-guice   文件: StaticFilePublisher.java
private Mono<Path> setHeader(Path path, HttpServerResponse resp) {
    return Mono.fromCallable(()->{
        if (!Files.isDirectory(path)) {
            resp.header(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(Files.size(path)));
            resp.header(HttpHeaderNames.CONTENT_TYPE, contentType(path.toString())+"; charset=UTF-8");
        }
        return path;
    }).subscribeOn(Schedulers.elastic());
}
 
源代码12 项目: reactor-guice   文件: WebsocketPublisher.java
public Mono<Object> sendMessage2(HttpServerRequest request,
                                HttpServerResponse response,
                                WebSocketServerHandle handleObject,
                                Object requestAttributeObject) {

    return Mono.just((RequestAttribute) requestAttributeObject)
            .flatMap(requestAttribute ->
                    response.header("content-type", "text/plain")
                            .sendWebsocket((in, out) -> {
                                // return
                                return out.withConnection(
                                        connect -> {
                                            // channel
                                            Channel channel = connect.channel();
                                            System.out.println("\n\n >>>" + in);
                                            System.out.println(channel);
                                            // on disconnect
                                            connect.onDispose().subscribe(null, null, () -> {
                                                // System.out.println(handleObject);
                                                handleObject.onClose(null, channel);
                                            });
                                            // set requestAttribute to channel
                                            channel.attr(RequestAttribute.REQUEST_ATTRIBUTE).set(requestAttribute);
                                            // set channel to requestAttribute
                                            // requestAttribute.setAttribute(CURRENT_CHANNEL, channel);
                                            // on connect
                                            handleObject.onConnect(channel);
                                            // System.out.println("on connect");
                                            // receive frame
                                            in.aggregateFrames().receiveFrames().subscribe(
                                                    frame->handleObject.handleEvent(frame, channel)
                                            );
                                        })
                                        // options
                                        // .options(NettyPipeline.SendOptions::flushOnEach)
                                        // .sendObject(Flux.just("abc".getBytes()));
                                        .sendObject(rp);
                            })
            );
}
 
源代码13 项目: reactor-guice   文件: WebsocketPublisher.java
public Mono<Object> sendMessage5(HttpServerRequest request,
                                 HttpServerResponse response,
                                 WebSocketServerHandle handleObject,
                                 Object requestAttributeObject) {
    return response.header("content-type", "text/plain")
            .sendWebsocket((in, out) ->
                    out.sendString(in.receive()
                            .asString()
                            .publishOn(Schedulers.single())
                            // .doOnNext(s -> serverRes.incrementAndGet())
                            // .map(it -> it + ' ' + request.param("param") + '!')
                            .map(s-> "a")))
            .then(Mono.empty());
}
 
源代码14 项目: reactor-guice   文件: TestFilter.java
@Override
public Mono<Object> doFilter(HttpServerRequest request, HttpServerResponse response, RequestAttribute requestAttribute) {
    // return Mono.error(new Exception("sorry"));
    // throw new RuntimeException("abc");
    return Mono.just(requestAttribute);
    // return Mono.error(new KReactorException(100, "zzz"));
}
 
源代码15 项目: james-project   文件: HTTPConfigurationServer.java
private Publisher<Void> getBehaviors(HttpServerRequest req, HttpServerResponse res) {
    MockSmtpBehaviors mockSmtpBehaviors = new MockSmtpBehaviors(smtpBehaviorRepository.remainingBehaviors()
        .map(MockSMTPBehaviorInformation::getBehavior)
        .collect(Guavate.toImmutableList()));

    try {
        return res.status(OK)
            .header(CONTENT_TYPE, APPLICATION_JSON)
            .sendString(Mono.just(OBJECT_MAPPER.writeValueAsString(mockSmtpBehaviors)));
    } catch (JsonProcessingException e) {
        LOGGER.error("Could not serialize JSON", e);
        return res.status(INTERNAL_SERVER_ERROR).send();
    }
}
 
源代码16 项目: james-project   文件: HTTPConfigurationServer.java
private Publisher<Void> putBehaviors(HttpServerRequest req, HttpServerResponse res) {
    return req.receive().aggregate().asInputStream()
        .flatMap(inputStream -> {
            try {
                MockSmtpBehaviors behaviors = OBJECT_MAPPER.readValue(inputStream, MockSmtpBehaviors.class);
                smtpBehaviorRepository.setBehaviors(behaviors);
                return res.status(NO_CONTENT).send();
            } catch (IOException e) {
                LOGGER.info("Bad request", e);
                return res.status(BAD_REQUEST).send();
            }
        });
}
 
源代码17 项目: james-project   文件: HTTPConfigurationServer.java
private Publisher<Void> getMails(HttpServerRequest req, HttpServerResponse res) {
    Mails mails = new Mails(receivedMailRepository.list());

    try {
        return res.status(OK)
            .header(CONTENT_TYPE, APPLICATION_JSON)
            .sendString(Mono.just(OBJECT_MAPPER.writeValueAsString(mails)));
    } catch (JsonProcessingException e) {
        LOGGER.error("Could not serialize JSON", e);
        return res.status(INTERNAL_SERVER_ERROR).send();
    }
}
 
源代码18 项目: james-project   文件: JMAPApiRoutes.java
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response) {
    return authenticator.authenticate(request)
        .flatMap(session -> Flux.merge(
            userProvisioner.provisionUser(session),
            defaultMailboxesProvisioner.createMailboxesIfNeeded(session))
            .then(Mono.from(metricFactory.decoratePublisherWithTimerMetric("JMAP-request",
                post(request, response, session))))
            .subscriberContext(jmapAuthContext(session)))
        .onErrorResume(BadRequestException.class, e -> handleBadRequest(response, LOGGER, e))
        .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(response, LOGGER, e))
        .doOnEach(logOnError(e -> LOGGER.error("Unexpected error", e)))
        .onErrorResume(e -> handleInternalError(response, e))
        .subscriberContext(jmapContext(request))
        .subscribeOn(Schedulers.elastic());
}
 
源代码19 项目: james-project   文件: JMAPApiRoutes.java
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response, MailboxSession session) {
    Flux<Object[]> responses =
        requestAsJsonStream(request)
            .map(InvocationRequest::deserialize)
            .map(invocationRequest -> AuthenticatedRequest.decorate(invocationRequest, session))
            .concatMap(requestHandler::handle)
            .map(InvocationResponse::asProtocolSpecification);

    return sendResponses(response, responses);
}
 
源代码20 项目: james-project   文件: JMAPApiRoutes.java
private Mono<Void> sendResponses(HttpServerResponse response, Flux<Object[]> responses) {
    return responses.collectList()
        .map(objects -> {
            try {
                return objectMapper.writeValueAsString(objects);
            } catch (JsonProcessingException e) {
                throw new InternalErrorException("error serialising JMAP API response json");
            }
        })
        .flatMap(json -> response.status(OK)
            .header(CONTENT_TYPE, JSON_CONTENT_TYPE)
            .sendString(Mono.just(json))
            .then());
}
 
源代码21 项目: james-project   文件: AuthenticationRoutes.java
private Mono<Void> returnEndPointsResponse(HttpServerRequest req, HttpServerResponse resp) {
        return authenticator.authenticate(req)
            .flatMap(session -> returnEndPointsResponse(resp)
                .subscriberContext(jmapAuthContext(session)))
            .onErrorResume(BadRequestException.class, e -> handleBadRequest(resp, LOGGER, e))
            .doOnEach(logOnError(e -> LOGGER.error("Unexpected error", e)))
            .onErrorResume(InternalErrorException.class, e -> handleInternalError(resp, e))
            .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(resp, LOGGER, e))
            .subscriberContext(jmapContext(req))
            .subscriberContext(jmapAction("returnEndPoints"))
            .subscribeOn(Schedulers.elastic());
}
 
源代码22 项目: james-project   文件: AuthenticationRoutes.java
private Mono<Void> delete(HttpServerRequest req, HttpServerResponse resp) {
    String authorizationHeader = req.requestHeaders().get("Authorization");

    return authenticator.authenticate(req)
        .flatMap(session -> Mono.from(accessTokenManager.revoke(AccessToken.fromString(authorizationHeader)))
                .then(resp.status(NO_CONTENT).send().then())
            .subscriberContext(jmapAuthContext(session)))
        .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(resp, LOGGER, e))
        .subscriberContext(jmapContext(req))
        .subscriberContext(jmapAction("auth-delete"))
        .subscribeOn(Schedulers.elastic());
}
 
源代码23 项目: james-project   文件: AuthenticationRoutes.java
private Mono<Void> handleContinuationTokenRequest(ContinuationTokenRequest request, HttpServerResponse resp) {
    try {
        ContinuationTokenResponse continuationTokenResponse = ContinuationTokenResponse
            .builder()
            .continuationToken(simpleTokenFactory.generateContinuationToken(request.getUsername()))
            .methods(ContinuationTokenResponse.AuthenticationMethod.PASSWORD)
            .build();
        return resp.header(CONTENT_TYPE, JSON_CONTENT_TYPE_UTF8)
            .sendString(Mono.just(mapper.writeValueAsString(continuationTokenResponse)))
            .then();
    } catch (Exception e) {
        throw new InternalErrorException("Error while responding to continuation token", e);
    }
}
 
源代码24 项目: james-project   文件: AuthenticationRoutes.java
private Mono<Void> handleAccessTokenRequest(AccessTokenRequest request, HttpServerResponse resp) {
    SimpleTokenManager.TokenStatus validity = simpleTokenManager.getValidity(request.getToken());
    switch (validity) {
        case EXPIRED:
            return returnForbiddenAuthentication(resp);
        case INVALID:
            return returnUnauthorizedResponse(resp)
                .doOnEach(log(() -> LOGGER.warn("Use of an invalid ContinuationToken : {}", request.getToken().serialize())));
        case OK:
            return manageAuthenticationResponse(request, resp);
        default:
            throw new InternalErrorException(String.format("Validity %s is not implemented", validity));
    }
}
 
源代码25 项目: james-project   文件: AuthenticationRoutes.java
private Mono<Void> manageAuthenticationResponse(AccessTokenRequest request, HttpServerResponse resp) {
    Username username = Username.of(request.getToken().getUsername());

    return authenticate(request, username)
        .flatMap(success -> {
            if (success) {
                return returnAccessTokenResponse(resp, username);
            } else {
                return returnUnauthorizedResponse(resp)
                    .doOnEach(log(() -> LOGGER.info("Authentication failure for {}", username)));
            }
        });
}
 
源代码26 项目: james-project   文件: DownloadRoutes.java
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response, DownloadPath downloadPath) {
    return authenticator.authenticate(request)
        .flatMap(session -> Mono.from(metricFactory.decoratePublisherWithTimerMetric("JMAP-download-post",
                respondAttachmentAccessToken(session, downloadPath, response)))
            .subscriberContext(jmapAuthContext(session)))
        .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(response, LOGGER, e))
        .doOnEach(logOnError(e -> LOGGER.error("Unexpected error", e)))
        .onErrorResume(e -> handleInternalError(response, e))
        .subscriberContext(jmapContext(request))
        .subscriberContext(jmapAction("download-post"))
        .subscribeOn(Schedulers.elastic());
}
 
源代码27 项目: james-project   文件: DownloadRoutes.java
private Mono<Void> getFromIdAndName(HttpServerRequest request, HttpServerResponse response) {
    String blobId = request.param(BLOB_ID_PATH_PARAM);
    try {
        String name = URLDecoder.decode(request.param(NAME_PATH_PARAM), StandardCharsets.UTF_8.toString());
        DownloadPath downloadPath = DownloadPath.of(blobId, name);
        return get(request, response, downloadPath);
    } catch (UnsupportedEncodingException e) {
        throw new BadRequestException("Wrong url encoding", e);
    }
}
 
源代码28 项目: james-project   文件: DownloadRoutes.java
private Mono<Void> get(HttpServerRequest request, HttpServerResponse response, DownloadPath downloadPath) {
    return authenticator.authenticate(request)
        .flatMap(session -> Mono.from(metricFactory.decoratePublisherWithTimerMetric("JMAP-download-get",
                download(session, downloadPath, response)))
            .subscriberContext(jmapAuthContext(session)))
        .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(response, LOGGER, e))
        .doOnEach(logOnError(e -> LOGGER.error("Unexpected error", e)))
        .onErrorResume(e -> handleInternalError(response, e))
        .subscriberContext(jmapContext(request))
        .subscriberContext(jmapAction("download-get"))
        .subscribeOn(Schedulers.elastic());
}
 
源代码29 项目: james-project   文件: DownloadRoutes.java
private Mono<Void> respondAttachmentAccessToken(MailboxSession mailboxSession, DownloadPath downloadPath, HttpServerResponse resp) {
    String blobId = downloadPath.getBlobId();
    try {
        if (!attachmentExists(mailboxSession, blobId)) {
            return resp.status(NOT_FOUND).send();
        }
        AttachmentAccessToken attachmentAccessToken = simpleTokenFactory.generateAttachmentAccessToken(mailboxSession.getUser().asString(), blobId);
        return resp.header(CONTENT_TYPE, TEXT_PLAIN_CONTENT_TYPE)
            .status(OK)
            .sendString(Mono.just(attachmentAccessToken.serialize()))
            .then();
    } catch (MailboxException e) {
        throw new InternalErrorException("Error while asking attachment access token", e);
    }
}
 
源代码30 项目: james-project   文件: DownloadRoutes.java
private Mono<Void> downloadBlob(Optional<String> optionalName, HttpServerResponse response, long blobSize, ContentType blobContentType, InputStream stream) {
    return addContentDispositionHeader(optionalName, response)
        .header("Content-Length", String.valueOf(blobSize))
        .header(CONTENT_TYPE, blobContentType.asString())
        .status(OK)
        .send(ReactorUtils.toChunks(stream, BUFFER_SIZE)
            .map(Unpooled::wrappedBuffer)
            .subscribeOn(Schedulers.elastic()))
        .then();
}