类org.springframework.http.ReactiveHttpInputMessage源码实例Demo

下面列出了怎么用org.springframework.http.ReactiveHttpInputMessage的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: spring-analysis-note   文件: BodyExtractors.java
private static <T, S extends Publisher<T>> S readWithMessageReaders(
		ReactiveHttpInputMessage message, BodyExtractor.Context context, ResolvableType elementType,
		Function<HttpMessageReader<T>, S> readerFunction,
		Function<UnsupportedMediaTypeException, S> errorFunction,
		Supplier<S> emptySupplier) {

	if (VOID_TYPE.equals(elementType)) {
		return emptySupplier.get();
	}
	MediaType contentType = Optional.ofNullable(message.getHeaders().getContentType())
			.orElse(MediaType.APPLICATION_OCTET_STREAM);

	return context.messageReaders().stream()
			.filter(reader -> reader.canRead(elementType, contentType))
			.findFirst()
			.map(BodyExtractors::<T>cast)
			.map(readerFunction)
			.orElseGet(() -> {
				List<MediaType> mediaTypes = context.messageReaders().stream()
						.flatMap(reader -> reader.getReadableMediaTypes().stream())
						.collect(Collectors.toList());
				return errorFunction.apply(
						new UnsupportedMediaTypeException(contentType, mediaTypes, elementType));
			});
}
 
源代码2 项目: spring-analysis-note   文件: BodyExtractors.java
private static <T> Flux<T> unsupportedErrorHandler(
		ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) {

	Flux<T> result;
	if (message.getHeaders().getContentType() == null) {
		// Maybe it's okay there is no content type, if there is no content..
		result = message.getBody().map(buffer -> {
			DataBufferUtils.release(buffer);
			throw ex;
		});
	}
	else {
		result = message instanceof ClientHttpResponse ?
				consumeAndCancel(message).thenMany(Flux.error(ex)) : Flux.error(ex);
	}
	return result;
}
 
@Test
public void toMono() {
	BodyExtractor<Mono<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(String.class);

	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(dataBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);
	Mono<String> result = extractor.extract(request, this.context);

	StepVerifier.create(result)
			.expectNext("foo")
			.expectComplete()
			.verify();
}
 
@Test
public void toMonoParameterizedTypeReference() {
	BodyExtractor<Mono<Map<String, String>>, ReactiveHttpInputMessage> extractor =
			BodyExtractors.toMono(new ParameterizedTypeReference<Map<String, String>>() {});

	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("{\"username\":\"foo\",\"password\":\"bar\"}".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(dataBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/").contentType(MediaType.APPLICATION_JSON).body(body);
	Mono<Map<String, String>> result = extractor.extract(request, this.context);

	Map<String, String > expected = new LinkedHashMap<>();
	expected.put("username", "foo");
	expected.put("password", "bar");
	StepVerifier.create(result)
			.expectNext(expected)
			.expectComplete()
			.verify();
}
 
@Test
public void toMonoWithHints() {
	BodyExtractor<Mono<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(User.class);
	this.hints.put(JSON_VIEW_HINT, SafeToDeserialize.class);

	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("{\"username\":\"foo\",\"password\":\"bar\"}".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(dataBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/")
			.contentType(MediaType.APPLICATION_JSON)
			.body(body);

	Mono<User> result = extractor.extract(request, this.context);

	StepVerifier.create(result)
			.consumeNextWith(user -> {
				assertEquals("foo", user.getUsername());
				assertNull(user.getPassword());
			})
			.expectComplete()
			.verify();
}
 
@Test
public void toMonoVoidAsClientShouldConsumeAndCancel() {
	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
	TestPublisher<DataBuffer> body = TestPublisher.create();

	BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
	MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
	response.setBody(body.flux());

	StepVerifier.create(extractor.extract(response, this.context))
			.then(() -> {
				body.assertWasSubscribed();
				body.emit(dataBuffer);
			})
			.verifyComplete();

	body.assertCancelled();
}
 
@Test
public void toFlux() {
	BodyExtractor<Flux<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toFlux(String.class);

	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(dataBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);
	Flux<String> result = extractor.extract(request, this.context);

	StepVerifier.create(result)
			.expectNext("foo")
			.expectComplete()
			.verify();
}
 
@Test
public void toDataBuffers() {
	BodyExtractor<Flux<DataBuffer>, ReactiveHttpInputMessage> extractor = BodyExtractors.toDataBuffers();

	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(dataBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);
	Flux<DataBuffer> result = extractor.extract(request, this.context);

	StepVerifier.create(result)
			.expectNext(dataBuffer)
			.expectComplete()
			.verify();
}
 
@Override
public Mono<MultiValueMap<String, String>> readMono(ResolvableType elementType,
		ReactiveHttpInputMessage message, Map<String, Object> hints) {

	MediaType contentType = message.getHeaders().getContentType();
	Charset charset = getMediaTypeCharset(contentType);

	return DataBufferUtils.join(message.getBody())
			.map(buffer -> {
				CharBuffer charBuffer = charset.decode(buffer.asByteBuffer());
				String body = charBuffer.toString();
				DataBufferUtils.release(buffer);
				MultiValueMap<String, String> formData = parseFormData(charset, body);
				logFormData(formData, hints);
				return formData;
			});
}
 
@Override
public Mono<MultiValueMap<String, Part>> readMono(ResolvableType elementType,
		ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {


	Map<String, Object> allHints = Hints.merge(hints, Hints.SUPPRESS_LOGGING_HINT, true);

	return this.partReader.read(elementType, inputMessage, allHints)
			.collectMultimap(Part::name)
			.doOnNext(map ->
				LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Parsed " +
						(isEnableLoggingRequestDetails() ?
								LogFormatUtils.formatValue(map, !traceOn) :
								"parts " + map.keySet() + " (content masked)"))
			)
			.map(this::toMultiValueMap);
}
 
@Override
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
	byte[] boundary = boundary(message);
	if (boundary == null) {
		return Flux.error(new CodecException("No multipart boundary found in Content-Type: \"" +
				message.getHeaders().getContentType() + "\""));
	}
	if (logger.isTraceEnabled()) {
		logger.trace("Boundary: " + toString(boundary));
	}

	byte[] boundaryNeedle = concat(BOUNDARY_PREFIX, boundary);
	Flux<DataBuffer> body = skipUntilFirstBoundary(message.getBody(), boundary);

	return DataBufferUtils.split(body, boundaryNeedle)
			.takeWhile(DefaultMultipartMessageReader::notLastBoundary)
			.map(DefaultMultipartMessageReader::toPart)
			.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
			.doOnDiscard(DefaultPart.class, part -> DataBufferUtils.release(part.body));
}
 
源代码12 项目: java-technology-stack   文件: BodyExtractors.java
private static <T, S extends Publisher<T>> S readWithMessageReaders(
		ReactiveHttpInputMessage message, BodyExtractor.Context context, ResolvableType elementType,
		Function<HttpMessageReader<T>, S> readerFunction,
		Function<UnsupportedMediaTypeException, S> errorFunction,
		Supplier<S> emptySupplier) {

	if (VOID_TYPE.equals(elementType)) {
		return emptySupplier.get();
	}
	MediaType contentType = Optional.ofNullable(message.getHeaders().getContentType())
			.orElse(MediaType.APPLICATION_OCTET_STREAM);

	return context.messageReaders().stream()
			.filter(reader -> reader.canRead(elementType, contentType))
			.findFirst()
			.map(BodyExtractors::<T>cast)
			.map(readerFunction)
			.orElseGet(() -> {
				List<MediaType> mediaTypes = context.messageReaders().stream()
						.flatMap(reader -> reader.getReadableMediaTypes().stream())
						.collect(Collectors.toList());
				return errorFunction.apply(
						new UnsupportedMediaTypeException(contentType, mediaTypes, elementType));
			});
}
 
源代码13 项目: java-technology-stack   文件: BodyExtractors.java
private static <T> Flux<T> unsupportedErrorHandler(
		ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) {

	Flux<T> result;
	if (message.getHeaders().getContentType() == null) {
		// Maybe it's okay there is no content type, if there is no content..
		result = message.getBody().map(buffer -> {
			DataBufferUtils.release(buffer);
			throw ex;
		});
	}
	else {
		result = message instanceof ClientHttpResponse ?
				consumeAndCancel(message).thenMany(Flux.error(ex)) : Flux.error(ex);
	}
	return result;
}
 
@Test
public void toMono() {
	BodyExtractor<Mono<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(String.class);

	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(dataBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);
	Mono<String> result = extractor.extract(request, this.context);

	StepVerifier.create(result)
			.expectNext("foo")
			.expectComplete()
			.verify();
}
 
@Test
public void toMonoParameterizedTypeReference() {
	BodyExtractor<Mono<Map<String, String>>, ReactiveHttpInputMessage> extractor =
			BodyExtractors.toMono(new ParameterizedTypeReference<Map<String, String>>() {});

	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("{\"username\":\"foo\",\"password\":\"bar\"}".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(dataBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/").contentType(MediaType.APPLICATION_JSON).body(body);
	Mono<Map<String, String>> result = extractor.extract(request, this.context);

	Map<String, String > expected = new LinkedHashMap<>();
	expected.put("username", "foo");
	expected.put("password", "bar");
	StepVerifier.create(result)
			.expectNext(expected)
			.expectComplete()
			.verify();
}
 
@Test
public void toMonoWithHints() {
	BodyExtractor<Mono<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(User.class);
	this.hints.put(JSON_VIEW_HINT, SafeToDeserialize.class);

	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("{\"username\":\"foo\",\"password\":\"bar\"}".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(dataBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/")
			.contentType(MediaType.APPLICATION_JSON)
			.body(body);

	Mono<User> result = extractor.extract(request, this.context);

	StepVerifier.create(result)
			.consumeNextWith(user -> {
				assertEquals("foo", user.getUsername());
				assertNull(user.getPassword());
			})
			.expectComplete()
			.verify();
}
 
@Test
public void toMonoVoidAsClientShouldConsumeAndCancel() {
	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
	TestPublisher<DataBuffer> body = TestPublisher.create();

	BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
	MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
	response.setBody(body.flux());

	StepVerifier.create(extractor.extract(response, this.context))
			.then(() -> {
				body.assertWasSubscribed();
				body.emit(dataBuffer);
			})
			.verifyComplete();

	body.assertCancelled();
}
 
@Test
public void toFlux() {
	BodyExtractor<Flux<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toFlux(String.class);

	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(dataBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);
	Flux<String> result = extractor.extract(request, this.context);

	StepVerifier.create(result)
			.expectNext("foo")
			.expectComplete()
			.verify();
}
 
@Test
public void toDataBuffers() {
	BodyExtractor<Flux<DataBuffer>, ReactiveHttpInputMessage> extractor = BodyExtractors.toDataBuffers();

	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	DefaultDataBuffer dataBuffer =
			factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(dataBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);
	Flux<DataBuffer> result = extractor.extract(request, this.context);

	StepVerifier.create(result)
			.expectNext(dataBuffer)
			.expectComplete()
			.verify();
}
 
@Override
public Mono<MultiValueMap<String, String>> readMono(ResolvableType elementType,
		ReactiveHttpInputMessage message, Map<String, Object> hints) {

	MediaType contentType = message.getHeaders().getContentType();
	Charset charset = getMediaTypeCharset(contentType);

	return DataBufferUtils.join(message.getBody())
			.map(buffer -> {
				CharBuffer charBuffer = charset.decode(buffer.asByteBuffer());
				String body = charBuffer.toString();
				DataBufferUtils.release(buffer);
				MultiValueMap<String, String> formData = parseFormData(charset, body);
				logFormData(formData, hints);
				return formData;
			});
}
 
@Override
public Mono<MultiValueMap<String, Part>> readMono(ResolvableType elementType,
		ReactiveHttpInputMessage inputMessage, Map<String, Object> hints) {


	Map<String, Object> allHints = Hints.merge(hints, Hints.SUPPRESS_LOGGING_HINT, true);

	return this.partReader.read(elementType, inputMessage, allHints)
			.collectMultimap(Part::name)
			.doOnNext(map -> {
				LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Parsed " +
						(isEnableLoggingRequestDetails() ?
								LogFormatUtils.formatValue(map, !traceOn) :
								"parts " + map.keySet() + " (content masked)"));
			})
			.map(this::toMultiValueMap);
}
 
源代码22 项目: spring-analysis-note   文件: BodyExtractors.java
private static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) {
	return (inputMessage, context) ->
			readWithMessageReaders(inputMessage, context, elementType,
					(HttpMessageReader<T> reader) -> readToMono(inputMessage, context, elementType, reader),
					ex -> Mono.from(unsupportedErrorHandler(inputMessage, ex)),
					skipBodyAsMono(inputMessage));
}
 
源代码23 项目: spring-analysis-note   文件: BodyExtractors.java
@SuppressWarnings("unchecked")
private static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) {
	return (inputMessage, context) ->
			readWithMessageReaders(inputMessage, context, elementType,
					(HttpMessageReader<T> reader) -> readToFlux(inputMessage, context, elementType, reader),
					ex -> unsupportedErrorHandler(inputMessage, ex),
					skipBodyAsFlux(inputMessage));
}
 
源代码24 项目: spring-analysis-note   文件: BodyExtractors.java
/**
 * Extractor to read form data into {@code MultiValueMap<String, String>}.
 * <p>As of 5.1 this method can also be used on the client side to read form
 * data from a server response (e.g. OAuth).
 * @return {@code BodyExtractor} for form data
 */
public static BodyExtractor<Mono<MultiValueMap<String, String>>, ReactiveHttpInputMessage> toFormData() {
	return (message, context) -> {
		ResolvableType elementType = FORM_DATA_TYPE;
		MediaType mediaType = MediaType.APPLICATION_FORM_URLENCODED;
		HttpMessageReader<MultiValueMap<String, String>> reader = findReader(elementType, mediaType, context);
		return readToMono(message, context, elementType, reader);
	};
}
 
源代码25 项目: spring-analysis-note   文件: BodyExtractors.java
private static <T> Mono<T> readToMono(ReactiveHttpInputMessage message, BodyExtractor.Context context,
		ResolvableType type, HttpMessageReader<T> reader) {

	return context.serverResponse()
			.map(response -> reader.readMono(type, type, (ServerHttpRequest) message, response, context.hints()))
			.orElseGet(() -> reader.readMono(type, message, context.hints()));
}
 
源代码26 项目: spring-analysis-note   文件: BodyExtractors.java
private static <T> Flux<T> readToFlux(ReactiveHttpInputMessage message, BodyExtractor.Context context,
		ResolvableType type, HttpMessageReader<T> reader) {

	return context.serverResponse()
			.map(response -> reader.read(type, type, (ServerHttpRequest) message, response, context.hints()))
			.orElseGet(() -> reader.read(type, message, context.hints()));
}
 
源代码27 项目: spring-analysis-note   文件: BodyExtractors.java
private static Mono<Void> consumeAndCancel(ReactiveHttpInputMessage message) {
	return message.getBody()
			.map(buffer -> {
				DataBufferUtils.release(buffer);
				throw new ReadCancellationException();
			})
			.onErrorResume(ReadCancellationException.class, ex -> Mono.empty())
			.then();
}
 
@Override
public Mono<Object> readMono(
		ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {

	// We're ahead of String + "*/*"
	// Let's see if we can aggregate the output (lest we time out)...

	if (elementType.resolve() == String.class) {
		Flux<DataBuffer> body = message.getBody();
		return stringDecoder.decodeToMono(body, elementType, null, null).cast(Object.class);
	}

	return Mono.error(new UnsupportedOperationException(
			"ServerSentEventHttpMessageReader only supports reading stream of events as a Flux"));
}
 
源代码29 项目: spring-analysis-note   文件: BodyExtractorsTests.java
@Test
public void toMonoVoidAsClientWithEmptyBody() {
	TestPublisher<DataBuffer> body = TestPublisher.create();

	BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
	MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
	response.setBody(body.flux());

	StepVerifier.create(extractor.extract(response, this.context))
			.then(() -> {
				body.assertWasSubscribed();
				body.complete();
			})
			.verifyComplete();
}
 
源代码30 项目: spring-analysis-note   文件: BodyExtractorsTests.java
@Test
public void toFluxWithHints() {
	BodyExtractor<Flux<User>, ReactiveHttpInputMessage> extractor = BodyExtractors.toFlux(User.class);
	this.hints.put(JSON_VIEW_HINT, SafeToDeserialize.class);

	DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
	String text = "[{\"username\":\"foo\",\"password\":\"bar\"},{\"username\":\"bar\",\"password\":\"baz\"}]";
	DefaultDataBuffer dataBuffer = factory.wrap(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(dataBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/")
			.contentType(MediaType.APPLICATION_JSON)
			.body(body);

	Flux<User> result = extractor.extract(request, this.context);

	StepVerifier.create(result)
			.consumeNextWith(user -> {
				assertEquals("foo", user.getUsername());
				assertNull(user.getPassword());
			})
			.consumeNextWith(user -> {
				assertEquals("bar", user.getUsername());
				assertNull(user.getPassword());
			})
			.expectComplete()
			.verify();
}
 
 类所在包
 同包方法