下面列出了 org.springframework.http.client.reactive.ReactorClientHttpConnector 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Demo entry point: issues a GET against {@code http://localhost:8086/query} through a
 * {@link WebClient} backed by Reactor Netty and prints the response body to standard output.
 *
 * @param args command-line arguments; not read by this method
 * @throws JsonProcessingException declared by the signature; the currently active statements
 *         do not appear to raise it (it belongs to the disabled Jackson variant below)
 */
public static void main(String[] args) throws JsonProcessingException {
    // Disabled alternative that went through the influxdb-java client and Jackson:
    // ObjectMapper objectMapper = new ObjectMapper();
    // objectMapper.setSerializationInclusion(NON_NULL);
    // InfluxDB influxdb = InfluxDBFactory.connect("http://172.29.64.250:18086");
    // QueryResult result = influxdb.query(new Query("select * from health limit 1", "micrometerDb"));
    // System.out.println(objectMapper.writeValueAsString(result));
    // influxdb.close();

    // Connection attempts are bounded by a 1000 ms connect timeout.
    TcpClient tcp = TcpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);

    WebClient influxClient = WebClient.builder()
            .baseUrl("http://localhost:8086")
            .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcp)))
            .filter(logRequest())
            .build();

    // Blocks the calling thread until the whole response has been read as a String.
    String body = influxClient.get()
            .uri(builder -> builder
                    .path("query")
                    .queryParam("db", "micrometerDb")
                    .queryParam("q", "select * from cpu")
                    .build())
            .exchange()
            .flatMap(response -> response.toEntity(String.class))
            .block()
            .getBody();
    System.out.println(body);
}
/**
 * Creates a {@link WebClient.Builder} whose connector is a compressing Reactor Netty
 * {@link HttpClient} with the given connect and read timeouts.
 *
 * <p>The connect timeout is applied through {@link ChannelOption#CONNECT_TIMEOUT_MILLIS}, which
 * is an {@code int} option. Durations longer than {@link Integer#MAX_VALUE} milliseconds are
 * clamped to that maximum rather than being narrowed with a cast, which would silently wrap
 * around to an unrelated (possibly negative) value.
 *
 * @param connectTimeout maximum time to wait for the TCP connection to be established
 * @param readTimeout    read timeout handed to the {@link ReadTimeoutHandler} that is installed
 *                       once a connection reports the {@code CONNECTED} state
 * @return a builder pre-configured with the Reactor Netty connector
 */
private static WebClient.Builder createDefaultWebClient(Duration connectTimeout, Duration readTimeout) {
    HttpClient httpClient = HttpClient.create()
            .compress(true)
            .tcpConfiguration(tcp -> tcp
                    .bootstrap(bootstrap -> bootstrap.option(
                            ChannelOption.CONNECT_TIMEOUT_MILLIS,
                            // Clamp instead of "(int) millis": the cast overflows for large durations.
                            (int) Math.min(connectTimeout.toMillis(), Integer.MAX_VALUE)
                    ))
                    .observe((connection, newState) -> {
                        // Install the read timeout only after the connection is established.
                        if (ConnectionObserver.State.CONNECTED.equals(newState)) {
                            connection.addHandlerLast(new ReadTimeoutHandler(
                                    readTimeout.toMillis(),
                                    TimeUnit.MILLISECONDS
                            ));
                        }
                    }));
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
    return WebClient.builder().clientConnector(connector);
}
/**
 * Builds a {@link WebClient} whose Reactor Netty client is configured with
 * {@link InsecureTrustManagerFactory}, i.e. without server certificate verification.
 *
 * <p>If the SSL context cannot be built, the failure is only logged at debug level and the
 * client is returned without the custom context.
 *
 * @return the configured web client
 */
private WebClient getSslIgnoringWebClient() {
    HttpClient insecureClient = HttpClient.create().secure(spec -> {
        try {
            spec.sslContext(SslContextBuilder
                    .forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
                    .build());
        }
        catch (SSLException e) {
            // Best effort: the problem is reported at debug level only.
            if (LOG.isDebugEnabled()) {
                LOG.debug("problem ignoring SSL in WebClient", e);
            }
        }
    });
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(insecureClient);
    return WebClient.builder().clientConnector(connector).build();
}
/**
 * Creates a Reactor Netty based {@link ClientHttpConnector}.
 *
 * <p>SSL is configured only when {@code hasSslConfiguration} accepts the given configuration;
 * the connect timeout from {@code options} is always applied.
 *
 * @param options          supplies the connection timeout
 * @param sslConfiguration SSL settings handed to {@code configureSsl} when present
 * @return a new connector wrapping the configured {@link HttpClient}
 */
static ClientHttpConnector usingReactorNetty(ClientOptions options, SslConfiguration sslConfiguration) {
    HttpClient plain = HttpClient.create();
    HttpClient configured = plain;

    if (hasSslConfiguration(sslConfiguration)) {
        SslContextBuilder contextBuilder = SslContextBuilder.forClient();
        configureSsl(sslConfiguration, contextBuilder);
        configured = plain.secure(spec -> spec.sslContext(contextBuilder));
    }

    // toIntExact fails loudly instead of truncating an oversized timeout.
    HttpClient withTimeout = configured.tcpConfiguration(tcp -> tcp.option(
            ChannelOption.CONNECT_TIMEOUT_MILLIS,
            Math.toIntExact(options.getConnectionTimeout().toMillis())));
    return new ReactorClientHttpConnector(withTimeout);
}
/**
 * Posts to {@code sampleUrl} through a wiretap-enabled Reactor Netty client and verifies that
 * the appender received a log event whose formatted message contains {@code "00000300"}.
 */
@Test
public void givenNettyHttpClientWithWiretap_whenEndpointIsConsumed_thenRequestAndResponseBodyLogged() {
    reactor.netty.http.client.HttpClient wiretappedClient = HttpClient.create().wiretap(true);
    WebClient webClient = WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(wiretappedClient))
            .build();

    // Block so that the exchange has completed before the appender is inspected.
    webClient.post()
            .uri(sampleUrl)
            .body(BodyInserters.fromObject(post))
            .exchange()
            .block();

    verify(nettyAppender).doAppend(argThat(event ->
            ((LoggingEvent) event).getFormattedMessage().contains("00000300")));
}
/**
 * Posts to {@code sampleUrl} through a Reactor Netty client whose bootstrap log support is
 * replaced by a {@code CustomLogger}, then verifies that the appender received a log event
 * whose formatted message contains {@code sampleResponseBody}.
 */
@Test
public void givenNettyHttpClientWithCustomLogger_whenEndpointIsConsumed_thenRequestAndResponseBodyLogged() {
    reactor.netty.http.client.HttpClient loggingClient = HttpClient.create()
            .tcpConfiguration(tcp -> tcp.bootstrap(bootstrap ->
                    BootstrapHandlers.updateLogSupport(bootstrap, new CustomLogger(HttpClient.class))));
    WebClient webClient = WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(loggingClient))
            .build();

    // Block so that the exchange has completed before the appender is inspected.
    webClient.post()
            .uri(sampleUrl)
            .body(BodyInserters.fromObject(post))
            .exchange()
            .block();

    verify(nettyAppender).doAppend(argThat(event ->
            ((LoggingEvent) event).getFormattedMessage().contains(sampleResponseBody)));
}
/**
 * Builds a connector that trusts every server certificate and applies {@code TIMEOUT_MILLIS}
 * as connect, read and write timeout.
 *
 * @return the configured Reactor Netty connector
 * @throws SSLException if the SSL context cannot be built
 */
private ReactorClientHttpConnector getConnector() throws SSLException {
    SslContext trustAllContext = SslContextBuilder.forClient()
            .trustManager(InsecureTrustManagerFactory.INSTANCE)
            .build();

    // Read/write timeout handlers are attached once a connection has been established.
    TcpClient tcp = TcpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, TIMEOUT_MILLIS)
            .doOnConnected(conn -> conn
                    .addHandlerLast(new ReadTimeoutHandler(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
                    .addHandlerLast(new WriteTimeoutHandler(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)));

    HttpClient secured = HttpClient.from(tcp).secure(spec -> spec.sslContext(trustAllContext));
    return new ReactorClientHttpConnector(secured);
}
/**
 * Returns the {@link WebClient} cached under the node's URL, creating and caching one when
 * none exists yet.
 *
 * <p>The cache is probed once without holding the monitor and once more inside a block
 * synchronized on {@code webClientCache} before a new client is built.
 *
 * @param node backend node providing the base URL and optional query/write timeouts
 * @return the cached or newly created client for {@code node.getUrl()}
 */
public WebClient getWebClientFromCacheOrCreate(BackendNode node) {
    WebClient cached = webClientCache.get(node.getUrl());
    if (cached != null) {
        return cached;
    }
    synchronized (webClientCache) {
        // Re-check under the monitor: another thread may have created the client meanwhile.
        cached = webClientCache.get(node.getUrl());
        if (cached == null) {
            int queryTimeout = Optional.ofNullable(node.getQueryTimeout()).orElse(DEFAULT_QUERY_TIMEOUT);
            int writeTimeout = Optional.ofNullable(node.getWriteTimeout()).orElse(DEFAULT_WRITE_TIMEOUT);
            int timeout = Math.max(queryTimeout, writeTimeout);

            // NOTE(review): the same number is used for CONNECT_TIMEOUT_MILLIS (milliseconds) and
            // for the single-argument Read/WriteTimeoutHandler constructors, which Netty documents
            // as seconds - confirm the intended unit.
            TcpClient tcp = TcpClient.create()
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
                    .doOnConnected(conn -> conn
                            .addHandlerLast(new ReadTimeoutHandler(timeout))
                            .addHandlerLast(new WriteTimeoutHandler(timeout)));

            HttpClient nonPersistent = HttpClient.from(tcp).keepAlive(false);
            WebClient created = WebClient.builder()
                    .baseUrl(node.getUrl())
                    .clientConnector(new ReactorClientHttpConnector(nonPersistent))
                    .filter(logRequest())
                    .build();

            webClientCache.put(node.getUrl(), created);
            cached = created;
        }
        return cached;
    }
}
/**
 * Resolves the {@link ExchangeFunction} to use: the explicitly configured one if set,
 * otherwise one created from the configured connector, otherwise one created from a default
 * {@link ReactorClientHttpConnector}.
 *
 * @return the exchange function to use
 */
private ExchangeFunction initExchangeFunction() {
    if (this.exchangeFunction != null) {
        return this.exchangeFunction;
    }
    if (this.connector == null) {
        // Nothing configured: fall back to a default Reactor Netty connector.
        return ExchangeFunctions.create(new ReactorClientHttpConnector(), this.exchangeStrategies);
    }
    return ExchangeFunctions.create(this.connector, this.exchangeStrategies);
}
/**
 * Supplies one parameter row per client connector: Jetty first, then Reactor Netty.
 *
 * @return the parameter rows for the parameterized runner
 */
@Parameterized.Parameters(name = "webClient [{0}]")
public static Object[][] arguments() {
    Object[] jettyRow = {new JettyClientHttpConnector()};
    Object[] reactorNettyRow = {new ReactorClientHttpConnector()};
    return new Object[][] {jettyRow, reactorNettyRow};
}
/**
 * Creates the Reactor Netty connector. When {@code bufferFactory} is a
 * {@link NettyDataBufferFactory}, its {@link ByteBufAllocator} is set as the channel
 * allocator; otherwise a default connector is returned.
 *
 * @return the connector to use
 */
private ReactorClientHttpConnector initConnector() {
    if (!(bufferFactory instanceof NettyDataBufferFactory)) {
        return new ReactorClientHttpConnector();
    }
    ByteBufAllocator allocator = ((NettyDataBufferFactory) bufferFactory).getByteBufAllocator();
    return new ReactorClientHttpConnector(this.factory, client ->
            client.tcpConfiguration(tcp -> tcp.option(ChannelOption.ALLOCATOR, allocator)));
}
/**
 * Supplies every server/client combination: each of the Jetty, Reactor, Tomcat and Undertow
 * servers paired with the Reactor Netty and the Jetty client connector.
 *
 * @return the parameter rows for the parameterized runner
 */
@Parameterized.Parameters(name = "server [{0}] webClient [{1}]")
public static Object[][] arguments() {
    // The Tomcat server is given the JVM temp directory as its base directory.
    String tomcatBaseDir = new File(System.getProperty("java.io.tmpdir")).getAbsolutePath();
    Object[][] combinations = {
        {new JettyHttpServer(), new ReactorClientHttpConnector()},
        {new JettyHttpServer(), new JettyClientHttpConnector()},
        {new ReactorHttpServer(), new ReactorClientHttpConnector()},
        {new ReactorHttpServer(), new JettyClientHttpConnector()},
        {new TomcatHttpServer(tomcatBaseDir), new ReactorClientHttpConnector()},
        {new TomcatHttpServer(tomcatBaseDir), new JettyClientHttpConnector()},
        {new UndertowHttpServer(), new ReactorClientHttpConnector()},
        {new UndertowHttpServer(), new JettyClientHttpConnector()}
    };
    return combinations;
}
/**
 * Provides a {@link WebClient} that skips server certificate verification. The bean is only
 * registered when the property {@code cf.sslValidationSkipped} equals {@code true}.
 *
 * @param builder the builder to attach the insecure connector to
 * @return the web client using {@link InsecureTrustManagerFactory}
 * @throws SSLException if the SSL context cannot be built
 */
@Bean
@ConditionalOnProperty(name = "cf.sslValidationSkipped", havingValue="true")
public WebClient insecureWebClient(WebClient.Builder builder) throws SSLException {
    SslContext trustAllContext = SslContextBuilder.forClient()
            .trustManager(InsecureTrustManagerFactory.INSTANCE)
            .build();
    TcpClient insecureTcp = TcpClient.create().secure(spec -> spec.sslContext(trustAllContext));
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(HttpClient.from(insecureTcp));
    return builder.clientConnector(connector).build();
}
/**
 * Resolves the {@link ExchangeFunction} to use: the explicitly configured one if set,
 * otherwise one created from the configured connector, otherwise one created from a default
 * {@link ReactorClientHttpConnector}.
 *
 * @return the exchange function to use
 */
private ExchangeFunction initExchangeFunction() {
    if (this.exchangeFunction != null) {
        return this.exchangeFunction;
    }
    if (this.connector == null) {
        // Nothing configured: fall back to a default Reactor Netty connector.
        return ExchangeFunctions.create(new ReactorClientHttpConnector(), this.exchangeStrategies);
    }
    return ExchangeFunctions.create(this.connector, this.exchangeStrategies);
}
/**
 * Supplies one parameter row per client connector: Jetty first, then Reactor Netty.
 *
 * @return the parameter rows for the parameterized runner
 */
@Parameterized.Parameters(name = "webClient [{0}]")
public static Object[][] arguments() {
    Object[] jettyRow = {new JettyClientHttpConnector()};
    Object[] reactorNettyRow = {new ReactorClientHttpConnector()};
    return new Object[][] {jettyRow, reactorNettyRow};
}
/**
 * Creates the Reactor Netty connector. When {@code bufferFactory} is a
 * {@link NettyDataBufferFactory}, its {@link ByteBufAllocator} is set as the channel
 * allocator; otherwise a default connector is returned.
 *
 * @return the connector to use
 */
private ReactorClientHttpConnector initConnector() {
    if (!(bufferFactory instanceof NettyDataBufferFactory)) {
        return new ReactorClientHttpConnector();
    }
    ByteBufAllocator allocator = ((NettyDataBufferFactory) bufferFactory).getByteBufAllocator();
    return new ReactorClientHttpConnector(this.factory, client ->
            client.tcpConfiguration(tcp -> tcp.option(ChannelOption.ALLOCATOR, allocator)));
}
/**
 * Supplies every server/client combination: each of the Jetty, Reactor, Tomcat and Undertow
 * servers paired with the Reactor Netty and the Jetty client connector.
 *
 * @return the parameter rows for the parameterized runner
 */
@Parameterized.Parameters(name = "server [{0}] webClient [{1}]")
public static Object[][] arguments() {
    // The Tomcat server is given the JVM temp directory as its base directory.
    String tomcatBaseDir = new File(System.getProperty("java.io.tmpdir")).getAbsolutePath();
    Object[][] combinations = {
        {new JettyHttpServer(), new ReactorClientHttpConnector()},
        {new JettyHttpServer(), new JettyClientHttpConnector()},
        {new ReactorHttpServer(), new ReactorClientHttpConnector()},
        {new ReactorHttpServer(), new JettyClientHttpConnector()},
        {new TomcatHttpServer(tomcatBaseDir), new ReactorClientHttpConnector()},
        {new TomcatHttpServer(tomcatBaseDir), new JettyClientHttpConnector()},
        {new UndertowHttpServer(), new ReactorClientHttpConnector()},
        {new UndertowHttpServer(), new JettyClientHttpConnector()}
    };
    return combinations;
}
/**
 * Registers the {@link WebClientPlugin}, backed by a {@link WebClient} that uses the
 * available Reactor Netty {@link HttpClient}.
 *
 * @param httpClient provider of the http client; a {@link NullPointerException} is raised
 *                   when no client is available
 * @return the soul plugin
 */
@Bean
public SoulPlugin webClientPlugin(final ObjectProvider<HttpClient> httpClient) {
    HttpClient resolvedClient = Objects.requireNonNull(httpClient.getIfAvailable());
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(resolvedClient);
    return new WebClientPlugin(WebClient.builder().clientConnector(connector).build());
}
/**
 * Demo entry point: builds a {@link WebClient} for {@code http://localhost:8080} with custom
 * Jackson codecs, subscribes to {@code GET /posts}, prints every received {@code Post}, and
 * then waits for input on standard in so the process stays alive.
 *
 * @param args command-line arguments; not read by this method
 * @throws IOException if reading from standard input fails
 */
public static final void main(String[] args) throws IOException {
    // A Jetty based connector could be used instead of Reactor Netty,
    // see: https://github.com/jetty-project/jetty-reactive-httpclient
    //     new JettyClientHttpConnector()
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector();

    WebClient client = WebClient.builder()
            .clientConnector(connector)
            .codecs(configurer -> {
                // Alternatives: configurer.defaultCodecs() applies the default codecs, and
                // configurer.defaultCodecs().jackson2Encoder(...) alters a registered
                // encoder/decoder. Here the codecs are registered from scratch instead.
                configurer.customCodecs().register(new Jackson2JsonDecoder());
                configurer.customCodecs().register(new Jackson2JsonEncoder());
            })
            .exchangeStrategies(ExchangeStrategies.withDefaults())
            // Further options left disabled in this demo:
            // .exchangeFunction(ExchangeFunctions.create(new ReactorClientHttpConnector())
            //         .filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {})))
            // .filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {clientRequest.}))
            // .defaultHeaders(httpHeaders -> httpHeaders.addAll())
            .baseUrl("http://localhost:8080")
            .build();

    // Non-blocking subscription: results are printed as they arrive.
    client.get()
            .uri("/posts")
            .exchange()
            .flatMapMany(response -> response.bodyToFlux(Post.class))
            .log()
            .subscribe(received -> System.out.println("post: " + received));

    System.out.println("Client is started!");
    // Keep the JVM alive until something is entered on standard input.
    System.in.read();
}
/**
 * Create a {@link ClientHttpConnector} for the given {@link ClientOptions}.
 *
 * <p>With custom certificates the client trusts the CA files from the options through a JDK
 * SSL provider; otherwise it wraps the JVM default {@link SSLContext}. A connection timeout
 * is applied only when the options define one.
 *
 * @param options must not be {@literal null}
 * @return a new {@link ClientHttpConnector}.
 */
public static ClientHttpConnector create(ClientOptions options) {
    HttpClient base = HttpClient.create();

    HttpClient secured;
    if (usingCustomCerts(options)) {
        TrustManagerFactory customTrust = sslCertificateUtils
                .createTrustManagerFactory(options.getCaCertFiles());
        secured = base.secure(spec -> spec.sslContext(
                SslContextBuilder.forClient().sslProvider(SslProvider.JDK).trustManager(customTrust)));
    }
    else {
        secured = base.secure(spec -> {
            try {
                spec.sslContext(new JdkSslContext(SSLContext.getDefault(), true, null,
                        IdentityCipherSuiteFilter.INSTANCE, null, ClientAuth.REQUIRE, null, false));
            }
            catch (NoSuchAlgorithmException ex) {
                // Log and rethrow unchecked so the failure is not lost inside the callback.
                logger.error("Error configuring HTTP connections", ex);
                throw new RuntimeException("Error configuring HTTP connections", ex);
            }
        });
    }

    if (options.getConnectionTimeout() == null) {
        return new ReactorClientHttpConnector(secured);
    }
    HttpClient withTimeout = secured.tcpConfiguration(tcp -> tcp.option(
            ChannelOption.CONNECT_TIMEOUT_MILLIS,
            Math.toIntExact(options.getConnectionTimeout().toMillis())));
    return new ReactorClientHttpConnector(withTimeout);
}
/**
 * Decorates the given {@link HttpClient} via {@code addLoggingCallbacks} and then
 * {@code addMetricCallbacks}, and installs it as the connector of the supplied builder.
 *
 * @param clientBuilder   the builder to configure
 * @param httpClient      the Reactor Netty client to decorate
 * @param webClientMetric metrics sink handed to {@code addMetricCallbacks}
 * @return the result of {@code clientBuilder.clientConnector(...)}
 */
public static WebClient.Builder addTitusDefaults(WebClient.Builder clientBuilder,
                                                 HttpClient httpClient,
                                                 WebClientMetric webClientMetric) {
    HttpClient instrumentedClient = addMetricCallbacks(addLoggingCallbacks(httpClient), webClientMetric);
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(instrumentedClient);
    return clientBuilder.clientConnector(connector);
}
/**
 * Creates a Reactor Netty connector from the {@code httpClient} field with the connect, read
 * and write timeouts of the given configuration applied.
 *
 * @param configuration supplies the connection, read and write timeouts
 * @return a new connector using the configured timeouts
 */
@Override
public ClientHttpConnector createConnector(TimeoutConfiguration configuration) {
    return new ReactorClientHttpConnector(httpClient.tcpConfiguration(tcp -> tcp
            .option(CONNECT_TIMEOUT_MILLIS, toMillis(configuration.getConnection()))
            .doOnConnected(conn -> {
                // Timeout handlers are attached once a connection has been established.
                conn.addHandlerLast(new ReadTimeoutHandler(configuration.getRead().toMillis(), MILLISECONDS));
                conn.addHandlerLast(new WriteTimeoutHandler(configuration.getWrite().toMillis(), MILLISECONDS));
            })));
}
/**
 * Prepares the test with a Reactor Netty connector that trusts every server certificate and
 * targets {@code https://localhost:<port>}.
 *
 * @throws Exception declared by the signature; an {@link SSLException} raised in the body is
 *         rethrown wrapped in a {@link RuntimeException}
 */
@Before
public void setup() throws Exception {
    try {
        SslContext trustAllContext = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE)
                .build();
        HttpClient insecureClient = HttpClient.create().secure(spec -> spec.sslContext(trustAllContext));
        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(insecureClient);
        String serverUrl = "https://localhost:" + port;
        setup(connector, serverUrl);
    }
    catch (SSLException sslFailure) {
        throw new RuntimeException(sslFailure);
    }
}
/**
 * Prepares the test with a Reactor Netty connector that trusts every server certificate and
 * targets {@code https://localhost:<port>}. An {@link SSLException} raised in the body is
 * rethrown wrapped in a {@link RuntimeException}.
 */
@Before
public void setup() {
    try {
        SslContext trustAllContext = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE)
                .build();
        HttpClient insecureClient = HttpClient.create().secure(spec -> spec.sslContext(trustAllContext));
        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(insecureClient);
        String serverUrl = "https://localhost:" + port;
        setup(connector, serverUrl);
    }
    catch (SSLException sslFailure) {
        throw new RuntimeException(sslFailure);
    }
}
/**
 * Customizes every {@code WebClient.Builder} with the given base URL and a connector built
 * from the given {@link HttpClient}. Normally the HTTP connector would be statically
 * initialized; this ensures the {@link HttpClient} is configured for the mock endpoint.
 *
 * @param httpClient the Reactor Netty client to wrap in the connector
 * @param baseUrl    the base URL applied to the builder
 * @return the customizer, registered with order {@code 0}
 */
@Bean
@Order(0)
public WebClientCustomizer clientConnectorCustomizer(HttpClient httpClient,
        URI baseUrl) {
    return builder -> {
        String rootUrl = baseUrl.toString();
        builder.baseUrl(rootUrl).clientConnector(new ReactorClientHttpConnector(httpClient));
    };
}
/**
 * Builds a Reactor Netty connector whose SSL context uses {@link InsecureTrustManagerFactory},
 * i.e. server certificates are not verified.
 *
 * @return the configured connector
 * @throws SSLException if the SSL context cannot be built
 */
private ReactorClientHttpConnector getConnector() throws SSLException {
    SslContext trustAllContext = SslContextBuilder.forClient()
            .trustManager(InsecureTrustManagerFactory.INSTANCE)
            .build();
    HttpClient insecureClient = HttpClient.create().secure(spec -> spec.sslContext(trustAllContext));
    return new ReactorClientHttpConnector(insecureClient);
}
/**
 * Connect to server via Reactor Netty.
 *
 * <p>Delegates to the single-argument constructor, passing a newly created
 * {@code ReactorClientHttpConnector} with default settings.
 */
DefaultWebTestClientBuilder() {
this(new ReactorClientHttpConnector());
}
/**
 * Connect to server via Reactor Netty.
 *
 * <p>Delegates to the single-argument constructor, passing a newly created
 * {@code ReactorClientHttpConnector} with default settings.
 */
DefaultWebTestClientBuilder() {
this(new ReactorClientHttpConnector());
}
/**
 * Exposes a {@link WebClient} that sends requests through the given Reactor Netty client.
 *
 * @param httpClient the Reactor Netty client to wrap in the connector
 * @return the web client bean
 */
@Bean
public WebClient webClientWithNetty(reactor.netty.http.client.HttpClient httpClient) {
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
    return WebClient.builder().clientConnector(connector).build();
}
/**
 * Builds a {@link WebClient} that uses the client from {@code buildHttpClient()} and the base
 * URL from {@code buildEsUrl()}.
 *
 * @return the configured web client
 */
@Override
public WebClient buildWebClient() {
    ReactorClientHttpConnector connector = new ReactorClientHttpConnector(buildHttpClient());
    return WebClient.builder()
            .clientConnector(connector)
            .baseUrl(buildEsUrl())
            .build();
}