类org.springframework.core.io.buffer.DataBuffer源码实例Demo

下面列出了怎么用org.springframework.core.io.buffer.DataBuffer的API类实例代码及写法，或者点击链接到GitHub查看源代码。

/**
 * Serialize the form taken from {@code inputStream} and write it as the body
 * of {@code message}.
 * <p>The content type header is set from the resolved media type, and the
 * content length header from the size of the encoded form.
 * @param inputStream the publisher providing the form data
 * @param elementType the element type (not used by this implementation)
 * @param mediaType the requested media type, resolved via {@code getMediaType}
 * @param message the output message to write to
 * @param hints hints passed on to form data logging
 * @return a {@code Mono} signalling completion of the write
 */
@Override
public Mono<Void> write(Publisher<? extends MultiValueMap<String, String>> inputStream,
		ResolvableType elementType, @Nullable MediaType mediaType, ReactiveHttpOutputMessage message,
		Map<String, Object> hints) {

	MediaType resolvedType = getMediaType(mediaType);
	message.getHeaders().setContentType(resolvedType);

	// Fall back to the default charset when the media type does not declare one.
	Charset declaredCharset = resolvedType.getCharset();
	Charset formCharset = (declaredCharset != null ? declaredCharset : getDefaultCharset());

	return Mono.from(inputStream).flatMap(formData -> {
		logFormData(formData, hints);
		ByteBuffer encoded = formCharset.encode(serializeForm(formData, formCharset));
		// wrapping only, no allocation
		DataBuffer body = message.bufferFactory().wrap(encoded);
		message.getHeaders().setContentLength(encoded.remaining());
		return message.writeWith(Mono.just(body));
	});
}
 
/**
 * Tokenize the incoming buffers into {@code TokenBuffer}s and map each one to
 * a value using an {@code ObjectReader} for the given element type.
 * <p>{@code null} values are skipped; an {@link IOException} raised while
 * reading is converted via {@code processException} and emitted as an error.
 */
@Override
public Flux<Object> decode(Publisher<DataBuffer> input, ResolvableType elementType,
		@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

	Flux<TokenBuffer> tokenStream = Jackson2Tokenizer.tokenize(
			Flux.from(input), this.jsonFactory, getObjectMapper(), true);
	ObjectReader objectReader = getObjectReader(elementType, hints);

	return tokenStream.handle((tokens, sink) -> {
		Object decoded;
		try {
			decoded = objectReader.readValue(tokens.asParser(getObjectMapper()));
			logValue(decoded, hints);
		}
		catch (IOException ex) {
			sink.error(processException(ex));
			return;
		}
		if (decoded != null) {
			sink.next(decoded);
		}
	});
}
 
/**
 * A text/plain body of "foo" must be exposed by {@code toEntityList} as a
 * single-element list, together with the mocked status and content type.
 */
@Test
public void toEntityList() {
	DefaultDataBuffer fooBuffer = new DefaultDataBufferFactory()
			.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(fooBuffer);
	mockTextPlainResponse(body);

	List<HttpMessageReader<?>> readers = Collections
			.singletonList(new DecoderHttpMessageReader<>(StringDecoder.allMimeTypes()));
	given(mockExchangeStrategies.messageReaders()).willReturn(readers);

	ResponseEntity<List<String>> entity = defaultClientResponse.toEntityList(String.class).block();

	assertEquals(Collections.singletonList("foo"), entity.getBody());
	assertEquals(HttpStatus.OK, entity.getStatusCode());
	assertEquals(HttpStatus.OK.value(), entity.getStatusCodeValue());
	assertEquals(MediaType.TEXT_PLAIN, entity.getHeaders().getContentType());
}
 
/**
 * Perform a single read from the request body InputStream into the shared
 * byte array and copy whatever was read into a newly allocated DataBuffer.
 * Invoked only when {@link ServletInputStream#isReady()} returns "true".
 * @return a DataBuffer holding the bytes read, {@link #EOF_BUFFER} if the
 * input stream returned -1, or {@code null} if 0 bytes were read
 * @throws IOException if reading from the input stream fails
 */
@Nullable
DataBuffer readFromInputStream() throws IOException {
	int count = this.request.getInputStream().read(this.buffer);
	logBytesRead(count);

	if (count == -1) {
		return EOF_BUFFER;
	}
	if (count <= 0) {
		// Nothing was read this time around.
		return null;
	}

	// Size the buffer to exactly the number of bytes that were read.
	DataBuffer result = this.bufferFactory.allocateBuffer(count);
	result.write(this.buffer, 0, count);
	return result;
}
 
/**
 * Write the given map as a multipart body.
 * <p>A boundary is generated and advertised, together with the charset, in
 * the content type header; each map entry is then encoded in order and the
 * closing boundary line is appended last.
 * @param map the parts to write, keyed by part name
 * @param outputMessage the message to write to
 * @param hints hints used for the log prefix
 * @return a {@code Mono} signalling completion of the write
 */
private Mono<Void> writeMultipart(
		MultiValueMap<String, ?> map, ReactiveHttpOutputMessage outputMessage, Map<String, Object> hints) {

	byte[] boundary = generateMultipartBoundary();

	Map<String, String> contentTypeParams = new HashMap<>(2);
	contentTypeParams.put("boundary", new String(boundary, StandardCharsets.US_ASCII));
	contentTypeParams.put("charset", getCharset().name());
	MediaType contentType = new MediaType(MediaType.MULTIPART_FORM_DATA, contentTypeParams);
	outputMessage.getHeaders().setContentType(contentType);

	LogFormatUtils.traceDebug(logger, traceOn -> {
		String prefix = Hints.getLogPrefix(hints) + "Encoding ";
		if (isEnableLoggingRequestDetails()) {
			return prefix + LogFormatUtils.formatValue(map, !traceOn);
		}
		// Only part names are logged unless request details logging is enabled.
		return prefix + "parts " + map.keySet() + " (content masked)";
	});

	Flux<DataBuffer> parts = Flux.fromIterable(map.entrySet())
			.concatMap(part -> encodePartValues(boundary, part.getKey(), part.getValue()));
	Flux<DataBuffer> body = parts.concatWith(Mono.just(generateLastLine(boundary)));

	return outputMessage.writeWith(body);
}
 
/**
 * Reply to a failed authentication with a 401 response whose JSON body is the
 * error DTO built from {@code errorMsg}.
 * <p>If the DTO cannot be serialized, the response is completed with a 500
 * status and no body instead.
 *
 * @param exchange the current exchange
 * @param errorMsg the message to place in the error DTO
 * @return a {@code Mono} signalling completion of the response
 */
protected Mono<Void> handleAuthenticationFailure(ServerWebExchange exchange, String errorMsg) {
    CommonResponseDto responseDto = CommonResponseDto.error(errorMsg);
    ServerHttpResponse response = exchange.getResponse();

    // Serialize first so that the response is untouched if serialization fails.
    byte[] payload;
    try {
        payload = objectMapper.writeValueAsBytes(responseDto);
    } catch (JsonProcessingException e) {
        log.debug("failed to process json", e);
        response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
        return response.setComplete();
    }

    DataBuffer body = response.bufferFactory().wrap(payload);
    response.setStatusCode(HttpStatus.UNAUTHORIZED);
    HttpHeaders headers = response.getHeaders();
    headers.add("Content-Type", "application/json;charset=UTF-8");
    headers.add(HttpHeaders.WWW_AUTHENTICATE, headerValue);
    return response.writeWith(Mono.just(body));
}
 
源代码7 项目: graphql-java-examples   文件: GraphQLController.java
/**
 * Stream the initial execution result followed by every deferred result as
 * parts of a {@code multipart/mixed} response, flushing after each part.
 * <p>Implements the Apollo defer spec:
 * https://github.com/apollographql/apollo-server/blob/defer-support/docs/source/defer-support.md
 * <p>The spec asks for CRLF + "-----" + CRLF at the very end, but the response
 * works without it and clients report errors when it is present, so it is
 * skipped.
 *
 * @param serverHttpResponse the response to write to
 * @param executionResult the initial result, written first
 * @param deferredResults the deferred results, each written as its own part
 * @return a {@code Mono} signalling completion of the write
 */
private Mono<Void> sendDeferResponse(ServerHttpResponse serverHttpResponse, ExecutionResult executionResult, Publisher<DeferredExecutionResult> deferredResults) {
    serverHttpResponse.setStatusCode(HttpStatus.OK);
    HttpHeaders responseHeaders = serverHttpResponse.getHeaders();
    responseHeaders.set("Content-Type", "multipart/mixed; boundary=\"-\"");
    responseHeaders.set("Connection", "keep-alive");

    Flux<Mono<DataBuffer>> deferredParts = Flux.from(deferredResults).map(deferred -> {
        DeferPart part = new DeferPart(deferred.toSpecification());
        // Each deferred part is preceded by a boundary line.
        String framed = new StringBuilder()
                .append(CRLF).append("---").append(CRLF)
                .append(part.write())
                .toString();
        return strToDataBuffer(framed);
    });
    Flux<Mono<DataBuffer>> initialPart = Flux.just(firstResult(executionResult));

    // mergeSequential keeps the initial result ahead of the deferred ones.
    return serverHttpResponse.writeAndFlushWith(Flux.mergeSequential(initialPart, deferredParts));
}
 
/**
 * A URL-encoded request body must be extracted into a multi-value map with
 * "+" decoded as a space, "%2B" decoded as "+", repeated names collected in
 * order, and a name without a value mapped to {@code null}.
 */
@Test
public void toFormData() {
	String encodedForm = "name+1=value+1&name+2=value+2%2B1&name+2=value+2%2B2&name+3";
	DefaultDataBuffer formBuffer = new DefaultDataBufferFactory()
			.wrap(ByteBuffer.wrap(encodedForm.getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(formBuffer);

	MockServerHttpRequest request = MockServerHttpRequest.post("/")
			.contentType(MediaType.APPLICATION_FORM_URLENCODED)
			.body(body);

	Mono<MultiValueMap<String, String>> extracted =
			BodyExtractors.toFormData().extract(request, this.context);

	StepVerifier.create(extracted)
			.consumeNextWith(formData -> {
				assertEquals("Invalid result", 3, formData.size());
				assertEquals("Invalid result", "value 1", formData.getFirst("name 1"));
				List<String> secondValues = formData.get("name 2");
				assertEquals("Invalid result", 2, secondValues.size());
				assertEquals("Invalid result", "value 2+1", secondValues.get(0));
				assertEquals("Invalid result", "value 2+2", secondValues.get(1));
				assertNull("Invalid result", formData.getFirst("name 3"));
			})
			.expectComplete()
			.verify();
}
 
/**
 * The {@code toFlux(String.class)} extractor must emit the request body
 * "foo" as a single string and then complete.
 */
@Test
public void toFlux() {
	BodyExtractor<Flux<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toFlux(String.class);

	DefaultDataBuffer fooBuffer = new DefaultDataBufferFactory()
			.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
	Flux<DataBuffer> body = Flux.just(fooBuffer);
	MockServerHttpRequest request = MockServerHttpRequest.post("/").body(body);

	Flux<String> extracted = extractor.extract(request, this.context);

	StepVerifier.create(extracted)
			.expectNext("foo")
			.expectComplete()
			.verify();
}
 
/**
 * Create a Jetty request for the given method and URI, apply
 * {@code requestCallback} to it, and adapt the Jetty response.
 * <p>A non-absolute URI, or a failure to start the HTTP client, is reported
 * through an error {@code Mono} rather than thrown.
 * @param method the HTTP method
 * @param uri the target URI; must be absolute
 * @param requestCallback callback that prepares and writes the request
 * @return a {@code Mono} for the response
 */
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
		Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {

	if (!uri.isAbsolute()) {
		return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
	}

	// Start the client on first use.
	if (!this.httpClient.isStarted()) {
		try {
			this.httpClient.start();
		}
		catch (Exception ex) {
			return Mono.error(ex);
		}
	}

	JettyClientHttpRequest request = new JettyClientHttpRequest(
			this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory);

	// The callback is applied before the reactive request is obtained.
	Mono<Void> requestWritten = requestCallback.apply(request);
	Mono<ClientHttpResponse> responseMono = Mono.from(
			request.getReactiveRequest().response((jettyResponse, chunks) -> {
				Flux<DataBuffer> content = Flux.from(chunks).map(this::toDataBuffer);
				return Mono.just(new JettyClientHttpResponse(jettyResponse, content));
			}));

	return requestWritten.then(responseMono);
}
 
/**
 * With a limit of 5 bytes, a response body of "foo", "bar", "baz" must be
 * truncated to "foo" followed by "ba".
 */
@Test
public void limitResponseSize() {
	DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
	DataBuffer foo = dataBuffer("foo", bufferFactory);
	DataBuffer bar = dataBuffer("bar", bufferFactory);
	DataBuffer baz = dataBuffer("baz", bufferFactory);

	ClientRequest request = ClientRequest.create(HttpMethod.GET, DEFAULT_URL).build();
	ClientResponse response = ClientResponse.create(HttpStatus.OK).body(Flux.just(foo, bar, baz)).build();

	Mono<ClientResponse> limited = ExchangeFilterFunctions.limitResponseSize(5)
			.filter(request, ignored -> Mono.just(response));
	Flux<DataBuffer> limitedBody = limited.flatMapMany(res -> res.body(BodyExtractors.toDataBuffers()));

	StepVerifier.create(limitedBody)
			.consumeNextWith(chunk -> assertEquals("foo", string(chunk)))
			.consumeNextWith(chunk -> assertEquals("ba", string(chunk)))
			.expectComplete()
			.verify();
}
 
源代码12 项目: spring-analysis-note   文件: ByteBufferEncoder.java
/**
 * Wrap the given {@link ByteBuffer} in a {@code DataBuffer} obtained from
 * {@code bufferFactory}, logging the readable byte count at debug level
 * unless logging is suppressed through the hints.
 */
@Override
public DataBuffer encodeValue(ByteBuffer byteBuffer, DataBufferFactory bufferFactory,
		ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

	DataBuffer wrapped = bufferFactory.wrap(byteBuffer);
	boolean shouldLog = logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints);
	if (shouldLog) {
		logger.debug(Hints.getLogPrefix(hints) + "Writing " + wrapped.readableByteCount() + " bytes");
	}
	return wrapped;
}
 
源代码13 项目: spring-analysis-note   文件: DataBufferEncoder.java
/**
 * Pass the incoming buffers through unchanged, attaching a logging callback
 * per buffer when debug logging is enabled and not suppressed via the hints.
 */
@Override
public Flux<DataBuffer> encode(Publisher<? extends DataBuffer> inputStream,
		DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType,
		@Nullable Map<String, Object> hints) {

	Flux<DataBuffer> buffers = Flux.from(inputStream);
	if (!logger.isDebugEnabled() || Hints.isLoggingSuppressed(hints)) {
		return buffers;
	}
	return buffers.doOnNext(dataBuffer -> logValue(dataBuffer, hints));
}
 
源代码14 项目: spring-analysis-note   文件: StringDecoderTests.java
/**
 * Three newline-separated, non-ASCII strings must be decoded back into the
 * three individual strings. The second argument passed to
 * {@code toDataBuffers} is 1.
 */
@Override
@Test
public void decode() {
	String u = "ü";
	String e = "é";
	String o = "ø";
	String joined = String.join("\n", u, e, o);
	Flux<DataBuffer> input = toDataBuffers(joined, 1, UTF_8);

	testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
}
 
/**
 * With Aalto disabled, an error in the source after the first buffer must be
 * propagated to the decoded event stream.
 */
@Test
public void decodeErrorNonAalto() {
	decoder.useAalto = false;

	Flux<DataBuffer> failingSource = Flux.concat(
			stringBuffer("<pojo>"),
			Flux.error(new RuntimeException()));

	Flux<XMLEvent> decoded =
			this.decoder.decode(failingSource, null, null, Collections.emptyMap());

	StepVerifier.create(decoded)
			.expectError(RuntimeException.class)
			.verify();
}
 
/**
 * The encoder must accept {@code DataBuffer} for text/plain and
 * application/json, and reject {@code Integer} as well as
 * {@code ResolvableType.NONE} with a {@code null} mime type.
 */
@Override
@Test
public void canEncode() {
	ResolvableType dataBufferType = ResolvableType.forClass(DataBuffer.class);
	ResolvableType integerType = ResolvableType.forClass(Integer.class);

	assertTrue(this.encoder.canEncode(dataBufferType, MimeTypeUtils.TEXT_PLAIN));
	assertFalse(this.encoder.canEncode(integerType, MimeTypeUtils.TEXT_PLAIN));
	assertTrue(this.encoder.canEncode(dataBufferType, MimeTypeUtils.APPLICATION_JSON));

	// SPR-15464
	assertFalse(this.encoder.canEncode(ResolvableType.NONE, null));
}
 
/**
 * Turn the return value of a handler method into a stream of encoded buffers.
 * <p>The content is adapted to a {@code Publisher} when a reactive adapter is
 * available for the declared return type, otherwise wrapped in a
 * {@code Mono}. Each element is then encoded with the encoder selected for
 * the element type and mime type; for a void element type no encoder is
 * looked up.
 * @param content the value returned from the handler method, if any
 * @param returnType the declared return type
 * @param bufferFactory factory for the output buffers
 * @param mimeType the mime type to encode to, if any
 * @param hints hints passed on to the encoder
 * @return the encoded content
 */
@SuppressWarnings("unchecked")
private Flux<DataBuffer> encodeContent(
		@Nullable Object content, MethodParameter returnType, DataBufferFactory bufferFactory,
		@Nullable MimeType mimeType, Map<String, Object> hints) {

	ResolvableType declaredType = ResolvableType.forMethodParameter(returnType);
	ReactiveAdapter adapter = getAdapterRegistry().getAdapter(declaredType.resolve(), content);

	Publisher<?> source;
	ResolvableType elementType;
	if (adapter == null) {
		source = Mono.justOrEmpty(content);
		// For a declared Object return type, fall back to the runtime type of the value.
		if (declaredType.toClass() == Object.class && content != null) {
			elementType = ResolvableType.forInstance(content);
		}
		else {
			elementType = declaredType;
		}
	}
	else {
		source = adapter.toPublisher(content);
		// Kotlin suspending functions (other than those returning Flow) use the
		// declared type as is instead of its generic parameter.
		boolean isUnwrapped = KotlinDetector.isKotlinReflectPresent() &&
				KotlinDetector.isKotlinType(returnType.getContainingClass()) &&
				KotlinDelegate.isSuspend(returnType.getMethod()) &&
				!COROUTINES_FLOW_CLASS_NAME.equals(declaredType.toClass().getName());
		elementType = getElementType(adapter, isUnwrapped ? declaredType : declaredType.getGeneric());
	}

	Class<?> elementClass = elementType.resolve();
	if (elementClass == void.class || elementClass == Void.class) {
		return Flux.from(source).cast(DataBuffer.class);
	}

	Encoder<?> encoder = getEncoder(elementType, mimeType);
	return Flux.from((Publisher) source).map(value ->
			encodeValue(value, elementType, encoder, bufferFactory, mimeType, hints));
}
 
/**
 * Write the nested body through a body flush processor.
 * <p>Only the first call is honoured; if {@code writeCalled} was already set,
 * an {@link IllegalStateException} is returned through the {@code Mono}.
 */
@Override
protected final Mono<Void> writeAndFlushWithInternal(
		Publisher<? extends Publisher<? extends DataBuffer>> body) {

	if (!this.writeCalled.compareAndSet(false, true)) {
		return Mono.error(new IllegalStateException(
				"writeWith() or writeAndFlushWith() has already been called"));
	}

	Processor<? super Publisher<? extends DataBuffer>, Void> flushProcessor = createBodyFlushProcessor();
	// Connect body -> processor -> subscriber once the returned Mono is subscribed to.
	return Mono.from(downstream -> {
		body.subscribe(flushProcessor);
		flushProcessor.subscribe(downstream);
	});
}
 
/**
 * Create a {@code Mono} that, on each subscription, allocates a fresh buffer
 * holding the UTF-8 bytes of the given string.
 */
private Mono<DataBuffer> stringBuffer(String value) {
	return Mono.defer(() -> {
		byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
		DataBuffer allocated = this.bufferFactory.allocateBuffer(utf8.length);
		allocated.write(utf8);
		return Mono.just(allocated);
	});
}
 
源代码20 项目: spring-analysis-note   文件: MessagingRSocket.java
/**
 * Wrap the payload data in a message addressed to the payload's destination
 * and pass it to the handler.
 * <p>When handling finishes, the data buffer is released only if its
 * reference count is still the same as before handling.
 */
private Mono<Void> handle(Payload payload) {
	MessageHeaders headers = createHeaders(getDestination(payload), null);
	DataBuffer payloadData = retainDataAndReleasePayload(payload);
	int initialRefCount = refCount(payloadData);
	Message<?> message = MessageBuilder.createMessage(payloadData, headers);

	return Mono.defer(() -> this.handler.apply(message))
			.doFinally(signal -> {
				if (refCount(payloadData) != initialRefCount) {
					return;
				}
				DataBufferUtils.release(payloadData);
			});
}
 
/**
 * Tokenize the incoming buffers and delegate the decoding of the resulting
 * token buffers to {@code decodeInternal}.
 */
@Override
public Flux<Object> decode(Publisher<DataBuffer> input, ResolvableType elementType,
		@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

	return decodeInternal(
			Jackson2Tokenizer.tokenize(Flux.from(input), this.jsonFactory, true),
			elementType, mimeType, hints);
}
 
源代码22 项目: java-technology-stack   文件: WiretapConnector.java
/**
 * Create a recorder around at most one of the two given publishers.
 * <p>The publisher that is present is decorated so that subscription is
 * noted, each buffer is written to the recording buffer, and error, cancel
 * and completion signals reach the corresponding handlers. When neither
 * publisher is given, the recorded content is completed right away.
 * @param publisher a flat publisher of buffers, or {@code null}
 * @param publisherNested a nested publisher of buffers, or {@code null}
 * @throws IllegalArgumentException if both publishers are given
 */
public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
		@Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {

	if (publisher != null && publisherNested != null) {
		throw new IllegalArgumentException("At most one publisher expected");
	}

	if (publisher != null) {
		this.publisher = Flux.from(publisher)
				.doOnSubscribe(s -> this.hasContentConsumer = true)
				.doOnNext(this.buffer::write)
				.doOnError(this::handleOnError)
				.doOnCancel(this::handleOnComplete)
				.doOnComplete(this::handleOnComplete);
	}
	else {
		this.publisher = null;
	}

	if (publisherNested != null) {
		this.publisherNested = Flux.from(publisherNested)
				.doOnSubscribe(s -> this.hasContentConsumer = true)
				.map(inner -> Flux.from(inner).doOnNext(this.buffer::write).doOnError(this::handleOnError))
				.doOnError(this::handleOnError)
				.doOnCancel(this::handleOnComplete)
				.doOnComplete(this::handleOnComplete);
	}
	else {
		this.publisherNested = null;
	}

	// Nothing to record: complete the content immediately.
	if (publisher == null && publisherNested == null) {
		this.content.onComplete();
	}
}
 
/**
 * Test a {@link Encoder#encode encode} scenario where the output stream is
 * canceled after the first element.
 * The first encoded buffer is consumed and released, after which the
 * subscription is canceled.
 *
 * @param input the input to be provided to the encoder
 * @param inputType the input type
 * @param mimeType the mime type to use for encoding. May be {@code null}.
 * @param hints the hints used for encoding. May be {@code null}.
 */
protected void testEncodeCancel(Publisher<?> input, ResolvableType inputType,
		@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

	Flux<DataBuffer> encoded =
			encoder().encode(input, this.bufferFactory, inputType, mimeType, hints);

	StepVerifier.create(encoded)
			.consumeNextWith(DataBufferUtils::release)
			.thenCancel()
			.verify();
}
 
/**
 * Expose the request content as a stream of {@code DataBuffer}s.
 * <p>Each {@code ByteBufHolder} emitted by the resolved content processor is
 * wrapped using a buffer factory backed by the channel's allocator. An empty
 * stream is returned when either the channel or the content processor cannot
 * be resolved for the request.
 */
@SuppressWarnings("SubscriberImplementation")
@Override
public reactor.core.publisher.Flux<DataBuffer> getBody() {
    final Optional<Channel> channelHolder = channelResolver.resolveChannel(request);
    if (!channelHolder.isPresent()) {
        return Flux.empty();
    }

    final Channel channel = channelHolder.get();
    final NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(channel.alloc());

    final Optional<HttpContentProcessor<ByteBufHolder>> processorHolder = channelResolver.resolveContentProcessor(request);
    if (!processorHolder.isPresent()) {
        return Flux.empty();
    }

    final HttpContentProcessor<ByteBufHolder> contentProcessor = processorHolder.get();
    // All signals are forwarded as is; only onNext converts the payload.
    return Flux.from(downstream -> contentProcessor.subscribe(new Subscriber<ByteBufHolder>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            downstream.onSubscribe(subscription);
        }

        @Override
        public void onNext(ByteBufHolder holder) {
            downstream.onNext(bufferFactory.wrap(holder.content()));
        }

        @Override
        public void onError(Throwable failure) {
            downstream.onError(failure);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }
    }));
}
 
/**
 * Read the next buffer if the input stream is ready.
 * @return the buffer read, or {@code null} when the stream is not ready,
 * nothing was read, or the end of the input was reached
 * @throws IOException if reading from the input stream fails
 */
@Override
@Nullable
protected DataBuffer read() throws IOException {
	if (!this.inputStream.isReady()) {
		return null;
	}

	DataBuffer dataBuffer = readFromInputStream();
	if (dataBuffer != EOF_BUFFER) {
		return dataBuffer;
	}

	// No need to wait for container callback...
	onAllDataRead();
	return null;
}
 
/**
 * Gzip-compress the content of the given buffer.
 * @param original the buffer whose content is to be compressed
 * @return the compressed bytes
 * @throws IllegalStateException if compression fails with an {@link IOException}
 */
@Override
public byte[] encode(DataBuffer original) {
	ByteArrayOutputStream compressed = new ByteArrayOutputStream();
	try {
		GZIPOutputStream gzip = new GZIPOutputStream(compressed);
		// FileCopyUtils.copy is documented to close both streams when done, so
		// the gzip stream is finished before the bytes are collected below.
		FileCopyUtils.copy(original.asInputStream(), gzip);
	}
	catch (IOException e) {
		throw new IllegalStateException("couldn't encode body to gzip", e);
	}
	return compressed.toByteArray();
}
 
源代码27 项目: spring-analysis-note   文件: WebSocketMessage.java
/**
 * Create a WebSocketMessage of the given type with the given payload.
 * <p>See static factory methods in {@link WebSocketSession} or alternatively
 * use {@link WebSocketSession#bufferFactory()} to create the payload and
 * then invoke this constructor.
 * @param type the message type; must not be {@code null}
 * @param payload the message payload; must not be {@code null}
 */
public WebSocketMessage(Type type, DataBuffer payload) {
	// Validate both arguments before assigning any state.
	Assert.notNull(type, "'type' must not be null");
	Assert.notNull(payload, "'payload' must not be null");

	this.payload = payload;
	this.type = type;
}
 
/**
 * Create a consumer that deserializes a buffer into a {@code Pojo}, asserts
 * that it equals the expected one, and releases the buffer.
 * <p>The buffer is released in a {@code finally} block so that it is not
 * leaked when the assertion fails or the content cannot be read.
 * @param expected the expected pojo
 * @return the verifying consumer
 */
public Consumer<DataBuffer> pojoConsumer(Pojo expected) {
	return dataBuffer -> {
		try {
			Pojo actual = this.mapper.reader().forType(Pojo.class)
					.readValue(DataBufferTestUtils.dumpBytes(dataBuffer));
			assertEquals(expected, actual);
		}
		catch (IOException ex) {
			throw new UncheckedIOException(ex);
		}
		finally {
			// Release on every path, including assertion failures.
			release(dataBuffer);
		}
	};
}
 
/**
 * Create a consumer that parses a delimited {@code Msg} from a buffer and
 * asserts that it equals the given message. The buffer is released whether
 * or not the check succeeds.
 * @param msg the expected message
 * @return the verifying consumer
 */
protected final Consumer<DataBuffer> expect(Msg msg) {
	return buffer -> {
		try {
			Msg parsed = Msg.parseDelimitedFrom(buffer.asInputStream());
			assertEquals(msg, parsed);
		}
		catch (IOException ex) {
			throw new UncheckedIOException(ex);
		}
		finally {
			DataBufferUtils.release(buffer);
		}
	};
}
 
/**
 * Extracting {@code Mono<Void>} from a client response must subscribe to the
 * body and complete once the body completes without emitting data.
 */
@Test
public void toMonoVoidAsClientWithEmptyBody() {
	TestPublisher<DataBuffer> bodyPublisher = TestPublisher.create();
	MockClientHttpResponse response = new MockClientHttpResponse(HttpStatus.OK);
	response.setBody(bodyPublisher.flux());

	BodyExtractor<Mono<Void>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(Void.class);
	Mono<Void> extracted = extractor.extract(response, this.context);

	StepVerifier.create(extracted)
			.then(() -> {
				bodyPublisher.assertWasSubscribed();
				bodyPublisher.complete();
			})
			.verifyComplete();
}
 
 同包方法