下面列出了 io.netty.handler.codec.http.HttpClientUpgradeHandler #reactor.netty.NettyPipeline 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Installs the client HTTP/2 handlers into the pipeline, each one inserted
 * before {@link NettyPipeline#ReactiveBridge}.
 *
 * @param p the pipeline to configure
 * @param decoder supplies the header validation flag for the frame codec
 * @param http2Settings the initial settings handed to the frame codec
 * @param observer the observer passed to the {@code HttpTrafficHandler}
 */
static void configureHttp2Pipeline(ChannelPipeline p, HttpResponseDecoderSpec decoder, Http2Settings http2Settings,
        ConnectionObserver observer) {
    Http2FrameCodecBuilder codecBuilder = Http2FrameCodecBuilder.forClient()
            .validateHeaders(decoder.validateHeaders())
            .initialSettings(http2Settings);

    // A frame logger is only attached when a logging handler is already present in the pipeline.
    boolean hasLoggingHandler = p.get(NettyPipeline.LoggingHandler) != null;
    if (hasLoggingHandler) {
        Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, "reactor.netty.http.client.h2");
        codecBuilder.frameLogger(frameLogger);
    }

    // Resulting order: frame codec -> multiplex handler -> traffic handler -> reactive bridge.
    p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, codecBuilder.build());
    p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2MultiplexHandler,
            new Http2MultiplexHandler(new H2Codec()));
    p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler,
            new HttpTrafficHandler(observer));
}
/**
 * Reacts to the channel becoming active. Without an HTTP/2 multiplex handler the
 * event is simply forwarded; otherwise the {@code CONNECTED} state is published and,
 * when no SSL handler is installed, a flush and a read are triggered as well.
 *
 * @param ctx the handler context
 */
@Override
public void channelActive(ChannelHandlerContext ctx) {
    Channel ch = ctx.channel();
    if (!ch.isActive()) {
        return;
    }

    if (ctx.pipeline().get(NettyPipeline.H2MultiplexHandler) == null) {
        // Proceed with HTTP/1.x as per configuration
        ctx.fireChannelActive();
        return;
    }

    // Looked up before the state change is published, exactly as the branches require.
    boolean noSslHandler = ctx.pipeline().get(NettyPipeline.SslHandler) == null;

    // Proceed with H2C (no SSL handler) or H2 (SSL handler present) as per configuration
    sendNewState(Connection.from(ch), ConnectionObserver.State.CONNECTED);
    if (noSslHandler) {
        ctx.flush();
        ctx.read();
    }
}
/**
 * Appends the server-side per-stream handlers to the channel pipeline and then
 * installs the reactive bridge.
 *
 * @param ch the channel whose pipeline is extended
 * @param opsFactory the operations factory given to the reactive bridge
 * @param listener the connection observer
 * @param readForwardHeaders forwarded to {@code Http2StreamBridgeServerHandler}
 * @param encoder the server cookie encoder
 * @param decoder the server cookie decoder
 */
static void addStreamHandlers(Channel ch, ChannelOperations.OnSetup opsFactory,
        ConnectionObserver listener, boolean readForwardHeaders,
        ServerCookieEncoder encoder, ServerCookieDecoder decoder) {
    // The access log handler, when enabled, goes in ahead of the codec.
    if (ACCESS_LOG) {
        ch.pipeline().addLast(NettyPipeline.AccessLogHandler, new AccessLogHandlerH2());
    }

    ch.pipeline().addLast(NettyPipeline.H2ToHttp11Codec, new Http2StreamFrameToHttpObjectCodec(true));
    ch.pipeline().addLast(NettyPipeline.HttpTrafficHandler,
            new Http2StreamBridgeServerHandler(listener, readForwardHeaders, encoder, decoder));

    ChannelOperations.addReactiveBridge(ch, opsFactory, listener);

    if (!log.isDebugEnabled()) {
        return;
    }
    log.debug(format(ch, "Initialized HTTP/2 stream pipeline {}"), ch.pipeline());
}
/**
 * Replaces the reactive bridge of the pipeline with the server HTTP/2 frame codec
 * followed by a multiplex handler.
 *
 * @param p the pipeline to configure
 * @param cookieDecoder the server cookie decoder given to the stream codec
 * @param cookieEncoder the server cookie encoder given to the stream codec
 * @param forwarded forwarded to the stream codec
 * @param http2Settings the initial settings for the frame codec
 * @param listener the connection observer
 * @param opsFactory the operations factory given to the stream codec
 * @param validate whether the frame codec validates headers
 */
static void configureH2Pipeline(ChannelPipeline p,
        ServerCookieDecoder cookieDecoder,
        ServerCookieEncoder cookieEncoder,
        boolean forwarded,
        Http2Settings http2Settings,
        ConnectionObserver listener,
        ChannelOperations.OnSetup opsFactory,
        boolean validate) {
    p.remove(NettyPipeline.ReactiveBridge);

    Http2FrameCodecBuilder codecBuilder = Http2FrameCodecBuilder.forServer()
            .validateHeaders(validate)
            .initialSettings(http2Settings);

    // A frame logger is only attached when a logging handler is already present in the pipeline.
    boolean hasLoggingHandler = p.get(NettyPipeline.LoggingHandler) != null;
    if (hasLoggingHandler) {
        Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, "reactor.netty.http.server.h2");
        codecBuilder.frameLogger(frameLogger);
    }

    p.addLast(NettyPipeline.HttpCodec, codecBuilder.build());
    p.addLast(NettyPipeline.H2MultiplexHandler,
            new Http2MultiplexHandler(new H2Codec(opsFactory, listener, forwarded, cookieEncoder, cookieDecoder)));
}
/**
 * Detects whether the inbound bytes start with an HAProxy protocol header and
 * rearranges the pipeline accordingly. Nothing is ever added to {@code out}.
 *
 * @param ctx the handler context
 * @param in the inbound bytes inspected for a proxy protocol header
 * @param out unused
 */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    ProtocolDetectionResult<HAProxyProtocolVersion> detection = HAProxyMessageDecoder.detectProtocol(in);

    // Not enough bytes yet: wait for the next invocation.
    if (detection.equals(ProtocolDetectionResult.needsMoreData())) {
        return;
    }

    // No proxy protocol header: this detector simply steps out of the way.
    if (detection.equals(ProtocolDetectionResult.invalid())) {
        ctx.pipeline().remove(this);
        return;
    }

    // Header detected: register the reader after this handler's name, then swap
    // this detector for the real decoder under the same name.
    ctx.pipeline().addAfter(NettyPipeline.ProxyProtocolDecoder,
            NettyPipeline.ProxyProtocolReader,
            new HAProxyMessageReader());
    ctx.pipeline().replace(this, NettyPipeline.ProxyProtocolDecoder, new HAProxyMessageDecoder());
}
/**
 * Sends a GET request through a proxied client and returns the response body paired
 * with the response headers. A {@code Logging-Handler} header is added to the response
 * headers, reporting whether the proxy logging handler was found in the pipeline.
 *
 * @param proxyOptions the proxy configuration applied to the client
 * @param connectAddressSupplier only checked for {@code null}; when non-null the client
 * is pointed at {@code server::address}
 * @param uri the request URI
 * @param wiretap the value passed to {@code wiretap}
 * @return the response body and the response headers
 */
private Mono<Tuple2<String, HttpHeaders>> sendRequest(
        Consumer<? super ProxyProvider.TypeSpec> proxyOptions,
        Supplier<? extends SocketAddress> connectAddressSupplier,
        String uri,
        boolean wiretap) {
    HttpClient base =
            HttpClient.create()
                      .proxy(proxyOptions)
                      .doOnResponse((res, conn) -> {
                          boolean proxyLoggingFound =
                                  conn.channel().pipeline().get(NettyPipeline.ProxyLoggingHandler) != null;
                          res.responseHeaders()
                             .add("Logging-Handler", proxyLoggingFound ? "FOUND" : "NOT FOUND");
                      });

    HttpClient client = connectAddressSupplier == null ? base : base.remoteAddress(server::address);

    return client.wiretap(wiretap)
                 .get()
                 .uri(uri)
                 .responseSingle((response, body) ->
                         Mono.zip(body.asString(), Mono.just(response.responseHeaders())));
}
/**
 * Verifies that the server request scheme is reported as {@code https} when the SSL
 * handler is added by user code in {@code doOnChannelInit}.
 */
@Test
public void httpsUserAddedSslHandler() throws SSLException {
    SslContext clientCtx = SslContextBuilder.forClient()
            .trustManager(InsecureTrustManagerFactory.INSTANCE)
            .build();
    SslContext serverCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
            .build();

    testClientRequest(
            clientRequestHeaders -> {},
            serverRequest -> Assertions.assertThat(serverRequest.scheme()).isEqualTo("https"),
            httpClient -> httpClient.secure(ssl -> ssl.sslContext(clientCtx)),
            httpServer -> httpServer.doOnChannelInit((obs, ch, addr) -> {
                // The handler is created up front and only installed when none is registered yet.
                SslHandler userSslHandler = serverCtx.newHandler(ch.alloc());
                boolean alreadyInstalled = ch.pipeline().get(NettyPipeline.SslHandler) != null;
                if (!alreadyInstalled) {
                    ch.pipeline().addFirst(NettyPipeline.SslHandler, userSslHandler);
                }
            }),
            true);
}
/**
 * Converts every message to a WebSocket frame and writes the frames to the
 * delegate's outbound with the {@code flushOnEach} send option.
 *
 * @param messages the messages to send
 * @return a {@code Mono} obtained from the outbound via {@code then()}
 */
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
    Flux<WebSocketFrame> outgoing = Flux.from(messages)
            .doOnNext(msg -> {
                // Trace logging only; the message itself is passed on untouched.
                if (!logger.isTraceEnabled()) {
                    return;
                }
                logger.trace(getLogPrefix() + "Sending " + msg);
            })
            .map(this::toFrame);

    return getDelegate().getOutbound()
            .options(NettyPipeline.SendOptions::flushOnEach)
            .sendObject(outgoing)
            .then();
}
/**
 * Maps each message to a WebSocket frame and sends the resulting frames through
 * the delegate's outbound, using the {@code flushOnEach} send option.
 *
 * @param messages the messages to send
 * @return a {@code Mono} obtained from the outbound via {@code then()}
 */
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
    Flux<WebSocketFrame> frameStream = Flux.from(messages)
            .doOnNext(outbound -> {
                // Purely diagnostic; skipped entirely unless trace logging is on.
                if (!logger.isTraceEnabled()) {
                    return;
                }
                logger.trace(getLogPrefix() + "Sending " + outbound);
            })
            .map(this::toFrame);

    return getDelegate().getOutbound()
            .options(NettyPipeline.SendOptions::flushOnEach)
            .sendObject(frameStream)
            .then();
}
/**
 * Registers a {@link NettyPipeline#ChannelMetricsHandler} as the first handler of the channel pipeline.
 *
 * @param ch the channel
 * @param recorder the configured metrics recorder
 * @param remoteAddress the remote address; when {@code null} the channel's own remote address is used
 * @param onServer true if {@link ChannelMetricsRecorder} is for the server, false if it is for the client
 */
public static void addMetricsHandler(Channel ch, ChannelMetricsRecorder recorder,
        @Nullable SocketAddress remoteAddress, boolean onServer) {
    SocketAddress effectiveRemote = remoteAddress != null ? remoteAddress : ch.remoteAddress();
    ch.pipeline()
      .addFirst(NettyPipeline.ChannelMetricsHandler,
              new ChannelMetricsHandler(recorder, effectiveRemote, onServer));
}
/**
 * When this handler is not on the server side, adds a {@code ConnectMetricsHandler}
 * right after {@link NettyPipeline#ChannelMetricsHandler}; the registration event is
 * forwarded in every case.
 *
 * @param ctx the handler context
 */
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
    boolean onClient = !onServer;
    if (onClient) {
        ConnectMetricsHandler connectMetrics = new ConnectMetricsHandler(recorder);
        ctx.pipeline().addAfter(NettyPipeline.ChannelMetricsHandler,
                NettyPipeline.ConnectMetricsHandler,
                connectMetrics);
    }
    ctx.fireChannelRegistered();
}
/**
 * Picks up the recorder from the {@link NettyPipeline#ChannelMetricsHandler}, when one
 * is installed, and stores the current {@code System.nanoTime()} as the TLS handshake
 * start. The registration event is forwarded in every case.
 *
 * @param ctx the handler context
 */
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
    ChannelHandler metricsHandler = ctx.pipeline().get(NettyPipeline.ChannelMetricsHandler);
    if (metricsHandler != null) {
        ChannelMetricsHandler channelMetrics = (ChannelMetricsHandler) metricsHandler;
        recorder = channelMetrics.recorder();
        tlsHandshakeTimeStart = System.nanoTime();
    }
    ctx.fireChannelRegistered();
}
/**
 * Installs the client HTTP/1.1 codec before the reactive bridge, plus the optional
 * content decompressor and HTTP metrics handler.
 *
 * @param p the pipeline to configure
 * @param acceptGzip whether an {@code HttpContentDecompressor} is added after the codec
 * @param decoder the response decoder configuration used to build the codec
 * @param metricsRecorder optional supplier of the metrics recorder
 * @param uriTagValue optional function passed to the metrics handler
 */
static void configureHttp11Pipeline(ChannelPipeline p,
        boolean acceptGzip,
        HttpResponseDecoderSpec decoder,
        @Nullable Supplier<? extends ChannelMetricsRecorder> metricsRecorder,
        @Nullable Function<String, String> uriTagValue) {
    HttpClientCodec codec = new HttpClientCodec(
            decoder.maxInitialLineLength(),
            decoder.maxHeaderSize(),
            decoder.maxChunkSize(),
            decoder.failOnMissingResponse,
            decoder.validateHeaders(),
            decoder.initialBufferSize(),
            decoder.parseHttpAfterConnectRequest);
    p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, codec);

    if (acceptGzip) {
        p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
    }

    if (metricsRecorder == null) {
        return;
    }

    // The HTTP metrics handler is only installed for HTTP-aware recorders.
    ChannelMetricsRecorder suppliedRecorder = metricsRecorder.get();
    if (!(suppliedRecorder instanceof HttpClientMetricsRecorder)) {
        return;
    }
    p.addBefore(NettyPipeline.ReactiveBridge,
            NettyPipeline.HttpMetricsHandler,
            new HttpClientMetricsHandler((HttpClientMetricsRecorder) suppliedRecorder, uriTagValue));
}
/**
 * Once added, inserts the HTTP/2 frame codec and a multiplex handler right after this
 * handler, drops the decompressor (if any) and the reactive bridge, and finally removes
 * this handler from the pipeline.
 *
 * @param ctx the handler context
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
    ChannelPipeline pipeline = ctx.pipeline();

    pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, http2FrameCodec);

    Http2MultiplexHandler multiplexHandler =
            new Http2MultiplexHandler(new H2Codec(opsFactory), new H2Codec(opsFactory));
    pipeline.addAfter(NettyPipeline.HttpCodec, NettyPipeline.H2MultiplexHandler, multiplexHandler);

    // The decompressor is optional, so it is removed only when it is actually installed.
    boolean hasDecompressor = pipeline.get(NettyPipeline.HttpDecompressor) != null;
    if (hasDecompressor) {
        pipeline.remove(NettyPipeline.HttpDecompressor);
    }

    pipeline.remove(NettyPipeline.ReactiveBridge);
    pipeline.remove(this);
}
/**
 * Appends the client-side per-stream handlers to the channel pipeline and then
 * installs the reactive bridge.
 *
 * @param ch the channel whose pipeline is extended
 * @param obs the connection observer
 * @param opsFactory the operations factory
 */
static void addStreamHandlers(Channel ch, ConnectionObserver obs, ChannelOperations.OnSetup opsFactory) {
    ch.pipeline().addLast(NettyPipeline.H2ToHttp11Codec, new Http2StreamFrameToHttpObjectCodec(false));
    ch.pipeline().addLast(NettyPipeline.HttpTrafficHandler, new Http2StreamBridgeClientHandler(obs, opsFactory));

    ChannelOperations.addReactiveBridge(ch, opsFactory, obs);

    if (!log.isDebugEnabled()) {
        return;
    }
    log.debug(format(ch, "Initialized HTTP/2 stream pipeline {}"), ch.pipeline());
}
/**
 * Configures the channel pipeline according to the {@code protocols} bit mask and to
 * whether an SSL provider is configured.
 *
 * @param observer the connection observer
 * @param channel the channel being initialized
 * @param remoteAddress the remote address, passed to the SSL provider
 */
@Override
public void onChannelInit(ConnectionObserver observer, Channel channel, @Nullable SocketAddress remoteAddress) {
    if (sslProvider == null) {
        // No SSL provider: combined HTTP/1.1 + H2C takes precedence, then HTTP/1.1, then H2C.
        if ((protocols & h11orH2C) == h11orH2C) {
            configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, decoder, http2Settings,
                    metricsRecorder, observer, opsFactory, uriTagValue);
        }
        else if ((protocols & h11) == h11) {
            configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, uriTagValue);
        }
        else if ((protocols & h2c) == h2c) {
            configureHttp2Pipeline(channel.pipeline(), decoder, http2Settings, observer);
        }
        return;
    }

    // SSL provider present: the SSL handler goes in first, then the protocol handlers.
    sslProvider.addSslHandler(channel, remoteAddress, SSL_DEBUG);

    if ((protocols & h11orH2) == h11orH2) {
        H2OrHttp11Codec negotiatingCodec =
                new H2OrHttp11Codec(acceptGzip, decoder, http2Settings, metricsRecorder, observer, uriTagValue);
        channel.pipeline()
               .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2OrHttp11Codec, negotiatingCodec);
    }
    else if ((protocols & h11) == h11) {
        configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, uriTagValue);
    }
    else if ((protocols & h2) == h2) {
        configureHttp2Pipeline(channel.pipeline(), decoder, http2Settings, observer);
    }
}
/**
 * Consumes the first {@code Http2SettingsFrame}: publishes the {@code CONFIGURED} state,
 * removes the reactive bridge and this handler, and does not forward the frame.
 * Any other message is forwarded unchanged.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (!(msg instanceof Http2SettingsFrame)) {
        ctx.fireChannelRead(msg);
        return;
    }

    sendNewState(Connection.from(ctx.channel()), ConnectionObserver.State.CONFIGURED);
    ctx.pipeline().remove(NettyPipeline.ReactiveBridge);
    ctx.pipeline().remove(this);
}
/**
 * Creates the operations for the given connection, starting from a default
 * {@code GET /} HTTP/1.1 request.
 *
 * @param c the underlying connection
 * @param listener the connection observer
 * @param encoder the client cookie encoder
 * @param decoder the client cookie decoder
 */
HttpClientOperations(Connection c, ConnectionObserver listener, ClientCookieEncoder encoder, ClientCookieDecoder decoder) {
    super(c, listener);
    this.cookieEncoder = encoder;
    this.cookieDecoder = decoder;
    this.nettyRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
    this.requestHeaders = this.nettyRequest.headers();
    // Secure means an SSL handler is registered in the channel pipeline.
    this.isSecure = c.channel().pipeline().get(NettyPipeline.SslHandler) != null;
}
/**
 * Prepares the pipeline for a WebSocket handshake and rebinds the channel to new
 * {@code WebsocketClientOperations}. Nothing happens when the headers were already
 * marked as sent.
 *
 * @param websocketClientSpec the WebSocket configuration; its {@code compress()} flag
 * decides whether the compression handler is installed
 * @param compress not read by this method
 */
@SuppressWarnings("FutureReturnValueIgnored")
final void withWebsocketSupport(WebsocketClientSpec websocketClientSpec, boolean compress) {
    URI url = websocketUri();

    //prevent further header to be sent for handshaking
    if (!markSentHeaders()) {
        return;
    }

    // Returned value is deliberately ignored
    addHandlerFirst(NettyPipeline.HttpAggregator, new HttpObjectAggregator(8192));
    removeHandler(NettyPipeline.HttpMetricsHandler);

    if (websocketClientSpec.compress()) {
        requestHeaders().remove(HttpHeaderNames.ACCEPT_ENCODING);
        // Returned value is deliberately ignored
        removeHandler(NettyPipeline.HttpDecompressor);
        // Returned value is deliberately ignored
        addHandlerFirst(NettyPipeline.WsCompressionHandler, WebSocketClientCompressionHandler.INSTANCE);
    }

    if (log.isDebugEnabled()) {
        log.debug(format(channel(), "Attempting to perform websocket handshake with {}"), url);
    }

    WebsocketClientOperations websocketOps = new WebsocketClientOperations(url, websocketClientSpec, this);
    boolean rebound = rebind(websocketOps);
    if (!rebound) {
        log.error(format(channel(), "Error while rebinding websocket in channel attribute: " +
                get(channel()) + " to " + websocketOps));
    }
}
/**
 * Checks whether the pooled channel has the H2C upgrade handler but no multiplex handler
 * yet. When that holds and operations are bound to the channel, the sink is completed
 * with those operations.
 *
 * @return {@code true} only when the sink has been completed by this call
 */
boolean isH2cUpgrade() {
    Channel channel = pooledRef.poolable().channel();

    if (channel.pipeline().get(NettyPipeline.H2CUpgradeHandler) == null) {
        return false;
    }
    if (channel.pipeline().get(NettyPipeline.H2MultiplexHandler) != null) {
        return false;
    }

    ChannelOperations<?, ?> boundOps = ChannelOperations.get(channel);
    if (boundOps == null) {
        return false;
    }

    sink.success(boundOps);
    return true;
}
/**
 * Once added, inserts the HTTP/2 frame codec and a multiplex handler right after this
 * handler, removes this handler, and then drops the handlers that are no longer wanted:
 * the optional access log and compression handlers, the HTTP traffic handler and the
 * reactive bridge.
 *
 * @param ctx the handler context
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
    ChannelPipeline pipeline = ctx.pipeline();

    pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, upgrader.http2FrameCodec);
    pipeline.addAfter(NettyPipeline.HttpCodec, NettyPipeline.H2MultiplexHandler,
            new Http2MultiplexHandler(upgrader));
    pipeline.remove(this);

    // Optional handlers: removed only when present.
    boolean hasAccessLog = pipeline.get(NettyPipeline.AccessLogHandler) != null;
    if (hasAccessLog) {
        pipeline.remove(NettyPipeline.AccessLogHandler);
    }
    boolean hasCompression = pipeline.get(NettyPipeline.CompressionHandler) != null;
    if (hasCompression) {
        pipeline.remove(NettyPipeline.CompressionHandler);
    }

    pipeline.remove(NettyPipeline.HttpTrafficHandler);
    pipeline.remove(NettyPipeline.ReactiveBridge);
}
/**
 * Puts a new proxy handler at the head of the channel pipeline. When a logging handler
 * is already registered, a dedicated proxy logging handler is placed in front of the
 * proxy handler.
 *
 * @param channel the channel whose pipeline is modified
 */
public void addProxyHandler(Channel channel) {
    ChannelPipeline p = channel.pipeline();
    p.addFirst(NettyPipeline.ProxyHandler, newProxyHandler());

    if (p.get(NettyPipeline.LoggingHandler) == null) {
        return;
    }
    LoggingHandler proxyLogger = new LoggingHandler("reactor.netty.proxy");
    p.addBefore(NettyPipeline.ProxyHandler, NettyPipeline.ProxyLoggingHandler, proxyLogger);
}
/**
 * Builds the initial pipeline of a channel: optional metrics handler and allocator
 * metrics, optional logging handler, the reactive bridge, and then the configured
 * channel-init callbacks. This initializer removes itself afterwards.
 *
 * @param channel the channel to initialize
 */
@Override
protected void initChannel(Channel channel) {
    ChannelPipeline pipeline = channel.pipeline();

    if (config.metricsRecorder != null) {
        ChannelOperations.addMetricsHandler(channel,
                Objects.requireNonNull(config.metricsRecorder.get(), "Metrics recorder supplier returned null"),
                remoteAddress,
                onServer);

        // Allocator metrics are registered under a name matching the allocator kind.
        ByteBufAllocator allocator = channel.alloc();
        if (allocator instanceof PooledByteBufAllocator) {
            PooledByteBufAllocator pooled = (PooledByteBufAllocator) allocator;
            ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", pooled.metric());
        }
        else if (allocator instanceof UnpooledByteBufAllocator) {
            UnpooledByteBufAllocator unpooled = (UnpooledByteBufAllocator) allocator;
            ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", unpooled.metric());
        }
    }

    boolean loggingConfigured = config.loggingHandler != null;
    if (loggingConfigured) {
        pipeline.addFirst(NettyPipeline.LoggingHandler, config.loggingHandler);
    }

    ChannelOperations.addReactiveBridge(channel, config.channelOperationsProvider(), connectionObserver);

    // Default initialization runs first, followed by the user supplied callback.
    config.defaultOnChannelInit()
          .then(config.doOnChannelInit)
          .onChannelInit(connectionObserver, channel, remoteAddress);

    pipeline.remove(this);

    if (!log.isDebugEnabled()) {
        return;
    }
    log.debug(format(channel, "Initialized pipeline {}"), pipeline.toString());
}
/**
 * Verifies that the replacement constructor of {@code HttpClientOperations} carries
 * over the channel and the listed state fields of the source instance.
 */
@Test
public void testConstructorWithProvidedReplacement() {
    EmbeddedChannel embedded = new EmbeddedChannel();
    // A placeholder handler registered under the SSL handler name.
    embedded.pipeline().addFirst(NettyPipeline.SslHandler, new ChannelHandlerAdapter() {
    });

    HttpClientOperations source = new HttpClientOperations(() -> embedded,
            ConnectionObserver.emptyListener(),
            ClientCookieEncoder.STRICT, ClientCookieDecoder.STRICT);
    source.followRedirectPredicate((req, res) -> true);
    source.started = true;
    source.retrying = true;
    source.redirecting = new RedirectClientException(new DefaultHttpHeaders().add(HttpHeaderNames.LOCATION, "/"));

    HttpClientOperations replacement = new HttpClientOperations(source);

    assertSame(source.channel(), replacement.channel());
    assertSame(source.started, replacement.started);
    assertSame(source.retrying, replacement.retrying);
    assertSame(source.redirecting, replacement.redirecting);
    assertSame(source.redirectedFrom, replacement.redirectedFrom);
    assertSame(source.isSecure, replacement.isSecure);
    assertSame(source.nettyRequest, replacement.nettyRequest);
    assertSame(source.responseState, replacement.responseState);
    assertSame(source.followRedirectPredicate, replacement.followRedirectPredicate);
    assertSame(source.requestHeaders, replacement.requestHeaders);
}
/**
 * Handles a connection obtained from the pool.
 * <p>
 * An inactive channel is invalidated and, unless this is already a retry, a new
 * acquisition is started; on a retry the sink is failed with an {@code IOException}.
 * For an active channel this object is installed as the channel's {@code OWNER}.
 * Depending on the previous owner, the pending events are replayed, the close
 * registration is done, or the connection is treated as acquired and the sink is completed.
 */
@Override
public void run() {
PooledConnection pooledConnection = pooledRef.poolable();
Channel c = pooledConnection.channel;
// The connection might be closed after checking the eviction predicate
if (!c.isActive()) {
// Hand the dead connection back to the pool; only the completion callback logs.
pooledRef.invalidate()
.subscribe(null, null, () -> {
if (log.isDebugEnabled()) {
log.debug(format(c, "Channel closed, now {} active connections and {} inactive connections"),
pool.metrics().acquiredSize(),
pool.metrics().idleSize());
}
});
if (!retried) {
if (log.isDebugEnabled()) {
log.debug(format(c, "Immediately aborted pooled channel, re-acquiring new channel"));
}
// Retry with a new DisposableAcquire built from this one
// (presumably marked as retried by that constructor - not visible here).
pool.acquire(Duration.ofMillis(pendingAcquireTimeout))
.subscribe(new DisposableAcquire(this));
}
else {
// This was already the retry: give up and fail the sink.
sink.error(new IOException("Error while acquiring from " + pool));
}
return;
}
// Set the owner only if the channel is active
ConnectionObserver current = c.attr(OWNER)
.getAndSet(this);
if (current instanceof PendingConnectionObserver) {
// The previous owner only buffered events: register the close handling, then
// replay every queued error / state change through this observer.
PendingConnectionObserver pending = (PendingConnectionObserver) current;
PendingConnectionObserver.Pending p;
// Reset so that the "current != null" branch below is skipped.
current = null;
registerClose(pooledRef, pool);
while ((p = pending.pendingQueue.poll()) != null) {
if (p.error != null) {
onUncaughtException(p.connection, p.error);
}
else if (p.state != null) {
onStateChange(p.connection, p.state);
}
}
}
else if (current == null) {
// No previous owner: only the close handling needs to be registered.
registerClose(pooledRef, pool);
}
if (current != null) {
// A previous owner that was not a PendingConnectionObserver: notify ACQUIRED and
// complete the sink with either new operations or the pooled connection itself.
if (log.isDebugEnabled()) {
log.debug(format(c, "Channel acquired, now {} active connections and {} inactive connections"),
pool.metrics().acquiredSize(),
pool.metrics().idleSize());
}
obs.onStateChange(pooledConnection, State.ACQUIRED);
ChannelOperations<?, ?> ops = opsFactory.create(pooledConnection, pooledConnection, null);
if (ops != null) {
// Without an HTTP/2 multiplex handler the operations are bound and reported as
// CONFIGURED; with one, CONFIGURED is reported for the pooled connection instead.
if (c.pipeline().get(NettyPipeline.H2MultiplexHandler) == null) {
ops.bind();
obs.onStateChange(ops, State.CONFIGURED);
}
else {
obs.onStateChange(pooledConnection, State.CONFIGURED);
}
sink.success(ops);
}
else {
// Already configured, just forward the connection
sink.success(pooledConnection);
}
return;
}
// Connected, leave onStateChange forward the event if factory
if (log.isDebugEnabled()) {
log.debug(format(c, "Channel connected, now {} active connections and {} inactive connections"),
pool.metrics().acquiredSize(),
pool.metrics().idleSize());
}
// With the empty operations factory the sink is completed right away with the raw connection.
if (opsFactory == ChannelOperations.OnSetup.empty()) {
sink.success(Connection.from(c));
}
}
/**
 * Creates an SSL handler, configures it and registers it in the channel pipeline
 * together with an {@code SslReadHandler}.
 *
 * @param channel the channel whose pipeline is modified
 * @param remoteAddress the remote address; when it is an {@code InetSocketAddress} its host
 * string and port are passed to the SSL context when creating the handler
 * @param sslDebug whether an SSL logging handler is added; only honoured when a logging
 * handler is already present in the pipeline
 */
public void addSslHandler(Channel channel, @Nullable SocketAddress remoteAddress, boolean sslDebug) {
    final SslHandler handler;
    if (!(remoteAddress instanceof InetSocketAddress)) {
        handler = getSslContext().newHandler(channel.alloc());

        if (log.isDebugEnabled()) {
            log.debug(format(channel, "SSL enabled using engine {}"),
                    handler.engine().getClass().getSimpleName());
        }
    }
    else {
        InetSocketAddress peer = (InetSocketAddress) remoteAddress;
        handler = getSslContext()
                .newHandler(channel.alloc(), peer.getHostString(), peer.getPort());

        if (log.isDebugEnabled()) {
            log.debug(format(channel, "SSL enabled using engine {} and SNI {}"),
                    handler.engine().getClass().getSimpleName(), peer);
        }
    }

    configure(handler);

    ChannelPipeline p = channel.pipeline();

    // The SSL handler sits right after the proxy handler when there is one, else first.
    boolean hasProxyHandler = p.get(NettyPipeline.ProxyHandler) != null;
    if (hasProxyHandler) {
        p.addAfter(NettyPipeline.ProxyHandler, NettyPipeline.SslHandler, handler);
    }
    else {
        p.addFirst(NettyPipeline.SslHandler, handler);
    }

    // Without a logging handler the SSL reader directly follows the SSL handler.
    if (p.get(NettyPipeline.LoggingHandler) == null) {
        p.addAfter(NettyPipeline.SslHandler, NettyPipeline.SslReader, new SslReadHandler());
        return;
    }

    p.addAfter(NettyPipeline.LoggingHandler, NettyPipeline.SslReader, new SslReadHandler());
    if (sslDebug) {
        p.addBefore(NettyPipeline.SslHandler,
                NettyPipeline.SslLoggingHandler,
                new LoggingHandler("reactor.netty.tcp.ssl"));
    }
}
/**
 * Installs the client handlers for HTTP/1.1 with an optional cleartext upgrade to
 * HTTP/2 (H2C): the HTTP/1.1 codec, the upgrade handler and the HTTP traffic handler,
 * plus the optional content decompressor and HTTP metrics handler. All of them are
 * inserted before {@link NettyPipeline#ReactiveBridge}.
 *
 * @param p the pipeline to configure
 * @param acceptGzip whether an {@code HttpContentDecompressor} is installed
 * @param decoder the response decoder configuration
 * @param http2Settings the initial settings for the HTTP/2 frame codec
 * @param metricsRecorder optional supplier of the metrics recorder
 * @param observer the observer passed to the {@code HttpTrafficHandler}
 * @param opsFactory the operations factory passed to the {@code H2CleartextCodec}
 * @param uriTagValue optional function passed to the metrics handler
 */
static void configureHttp11OrH2CleartextPipeline(
        ChannelPipeline p,
        boolean acceptGzip,
        HttpResponseDecoderSpec decoder,
        Http2Settings http2Settings,
        @Nullable Supplier<? extends ChannelMetricsRecorder> metricsRecorder,
        ConnectionObserver observer,
        ChannelOperations.OnSetup opsFactory,
        @Nullable Function<String, String> uriTagValue) {
    HttpClientCodec httpClientCodec =
            new HttpClientCodec(
                    decoder.maxInitialLineLength(),
                    decoder.maxHeaderSize(),
                    decoder.maxChunkSize(),
                    decoder.failOnMissingResponse,
                    decoder.validateHeaders(),
                    decoder.initialBufferSize(),
                    decoder.parseHttpAfterConnectRequest);

    Http2FrameCodecBuilder http2FrameCodecBuilder =
            Http2FrameCodecBuilder.forClient()
                                  .validateHeaders(decoder.validateHeaders())
                                  .initialSettings(http2Settings);

    // A frame logger is only attached when a logging handler is already present in the pipeline.
    if (p.get(NettyPipeline.LoggingHandler) != null) {
        http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG,
                "reactor.netty.http.client.h2"));
    }

    Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build();

    Http2ClientUpgradeCodec upgradeCodec =
            new Http2ClientUpgradeCodec(http2FrameCodec, new H2CleartextCodec(http2FrameCodec, opsFactory));

    HttpClientUpgradeHandler upgradeHandler =
            new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());

    // The HTTP/1.1 codec is registered without an explicit name (null), so no handler named
    // NettyPipeline.HttpCodec exists in this pipeline at this point.
    // NOTE(review): presumably the name is kept free for the HTTP/2 frame codec installed
    // on a successful upgrade - confirm against H2CleartextCodec.
    p.addBefore(NettyPipeline.ReactiveBridge, null, httpClientCodec)
     .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler)
     .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(observer));

    if (acceptGzip) {
        // Anchor on the reactive bridge: anchoring on NettyPipeline.HttpCodec would fail with
        // NoSuchElementException because the codec above is not registered under that name.
        p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
    }

    if (metricsRecorder != null) {
        ChannelMetricsRecorder channelMetricsRecorder = metricsRecorder.get();
        // The HTTP metrics handler is only installed for HTTP-aware recorders.
        if (channelMetricsRecorder instanceof HttpClientMetricsRecorder) {
            p.addBefore(NettyPipeline.ReactiveBridge,
                    NettyPipeline.HttpMetricsHandler,
                    new HttpClientMetricsHandler((HttpClientMetricsRecorder) channelMetricsRecorder, uriTagValue));
        }
    }
}
/**
 * Encodes the form configured through {@code formCallback} into the parent's request,
 * writes it to the parent's channel and completes the subscriber. Any throwable that is
 * not JVM-fatal leads to cleaning the request's HTTP data and to {@code onError}.
 *
 * @param s the subscriber to complete or fail
 */
@SuppressWarnings("FutureReturnValueIgnored")
void _subscribe(CoreSubscriber<? super Void> s) {
HttpDataFactory df = DEFAULT_FACTORY;
try {
HttpClientFormEncoder encoder = new HttpClientFormEncoder(df,
parent.nettyRequest,
false,
HttpConstants.DEFAULT_CHARSET,
HttpPostRequestEncoder.EncoderMode.RFC1738);
// Let the user callback populate the form, then apply its changes; from here on the
// encoder's own factory is the one used for cleanup in the catch block.
formCallback.accept(parent, encoder);
encoder = encoder.applyChanges(parent.nettyRequest);
df = encoder.newFactory;
if (!encoder.isMultipart()) {
parent.requestHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING);
}
// Returned value is deliberately ignored
parent.addHandlerFirst(NettyPipeline.ChunkedWriter, new ChunkedWriteHandler());
// Captured before finalizeRequest() is called on the encoder.
boolean chunked = HttpUtil.isTransferEncodingChunked(parent.nettyRequest);
HttpRequest r = encoder.finalizeRequest();
if (!chunked) {
// The request was not chunked beforehand: force a fixed content length.
HttpUtil.setTransferEncodingChunked(r, false);
HttpUtil.setContentLength(r, encoder.length());
}
ChannelFuture f = parent.channel()
.writeAndFlush(r);
Flux<Long> tail = encoder.progressFlux.onBackpressureLatest();
if (encoder.cleanOnTerminate) {
// The encoder itself is registered as the cancel / after-terminate callback.
tail = tail.doOnCancel(encoder)
.doAfterTerminate(encoder);
}
if (encoder.isChunked()) {
if (progressCallback != null) {
progressCallback.accept(tail);
}
//"FutureReturnValueIgnored" this is deliberate
parent.channel()
.writeAndFlush(encoder);
}
else {
if (progressCallback != null) {
// Not chunked: progress is a single value, the encoder length, emitted once the
// write future yields no value of its own.
progressCallback.accept(FutureMono.from(f)
.cast(Long.class)
.switchIfEmpty(Mono.just(encoder.length()))
.flux());
}
}
s.onComplete();
}
catch (Throwable e) {
Exceptions.throwIfJvmFatal(e);
// Clean the HTTP data of the request using the current factory before reporting the error.
df.cleanRequestHttpData(parent.nettyRequest);
s.onError(Exceptions.unwrap(e));
}
}
/**
 * Installs the server handlers for HTTP/1.1 with cleartext HTTP/2 (H2C) support: the
 * upgrade handler and the HTTP traffic handler go before the reactive bridge, followed
 * by the optional access log, compression and metrics handlers.
 *
 * @param p the pipeline to configure
 * @param compressPredicate optional compression predicate
 * @param cookieDecoder the server cookie decoder
 * @param cookieEncoder the server cookie encoder
 * @param decoder the request decoder configuration
 * @param forwarded forwarded to the upgrade codec and the traffic handler
 * @param http2Settings the HTTP/2 settings given to the upgrade codec
 * @param listener the connection observer
 * @param metricsRecorder optional supplier of the metrics recorder
 * @param minCompressionSize the minimum compression size; together with a {@code null}
 * predicate, a value of {@code 0} enables the compression handler
 * @param opsFactory the operations factory
 * @param uriTagValue optional function passed to the metrics handler
 */
static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p,
        @Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
        ServerCookieDecoder cookieDecoder,
        ServerCookieEncoder cookieEncoder,
        HttpRequestDecoderSpec decoder,
        boolean forwarded,
        Http2Settings http2Settings,
        ConnectionObserver listener,
        @Nullable Supplier<? extends ChannelMetricsRecorder> metricsRecorder,
        int minCompressionSize,
        ChannelOperations.OnSetup opsFactory,
        @Nullable Function<String, String> uriTagValue) {
    HttpServerCodec httpServerCodec = new HttpServerCodec(
            decoder.maxInitialLineLength(),
            decoder.maxHeaderSize(),
            decoder.maxChunkSize(),
            decoder.validateHeaders(),
            decoder.initialBufferSize());

    Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(
            cookieDecoder,
            cookieEncoder,
            p.get(NettyPipeline.LoggingHandler) != null,
            forwarded,
            http2Settings,
            listener,
            opsFactory,
            decoder.validateHeaders());

    ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader);
    HttpServerUpgradeHandler http11UpgradeHandler =
            new HttpServerUpgradeHandler(httpServerCodec, upgrader, decoder.h2cMaxContentLength());
    CleartextHttp2ServerUpgradeHandler h2cUpgradeHandler =
            new CleartextHttp2ServerUpgradeHandler(httpServerCodec, http11UpgradeHandler, http2ServerHandler);

    p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, h2cUpgradeHandler);
    p.addBefore(NettyPipeline.ReactiveBridge,
            NettyPipeline.HttpTrafficHandler,
            new HttpTrafficHandler(listener, forwarded, compressPredicate, cookieEncoder, cookieDecoder));

    if (ACCESS_LOG) {
        p.addAfter(NettyPipeline.H2CUpgradeHandler, NettyPipeline.AccessLogHandler, new AccessLogHandler());
    }

    // No predicate and a zero minimum size: the compression handler is always installed.
    if (compressPredicate == null && minCompressionSize == 0) {
        p.addBefore(NettyPipeline.HttpTrafficHandler, NettyPipeline.CompressionHandler, new SimpleCompressionHandler());
    }

    if (metricsRecorder == null) {
        return;
    }
    ChannelMetricsRecorder suppliedRecorder = metricsRecorder.get();
    if (!(suppliedRecorder instanceof HttpServerMetricsRecorder)) {
        return;
    }

    p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.HttpMetricsHandler,
            new HttpServerMetricsHandler((HttpServerMetricsRecorder) suppliedRecorder, uriTagValue));
    if (suppliedRecorder instanceof MicrometerHttpServerMetricsRecorder) {
        // MicrometerHttpServerMetricsRecorder does not implement metrics on protocol level
        // ChannelMetricsHandler will be removed from the pipeline
        p.remove(NettyPipeline.ChannelMetricsHandler);
    }
}
/**
 * Installs the server HTTP/1.1 handlers: the codec and the HTTP traffic handler go
 * before the reactive bridge, followed by the optional access log, compression and
 * metrics handlers.
 *
 * @param p the pipeline to configure
 * @param compressPredicate optional compression predicate
 * @param cookieDecoder the server cookie decoder
 * @param cookieEncoder the server cookie encoder
 * @param decoder the request decoder configuration used to build the codec
 * @param forwarded forwarded to the traffic handler
 * @param listener the connection observer
 * @param metricsRecorder optional supplier of the metrics recorder
 * @param minCompressionSize the minimum compression size; together with a {@code null}
 * predicate, a value of {@code 0} enables the compression handler
 * @param uriTagValue optional function passed to the metrics handler
 */
static void configureHttp11Pipeline(ChannelPipeline p,
        @Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
        ServerCookieDecoder cookieDecoder,
        ServerCookieEncoder cookieEncoder,
        HttpRequestDecoderSpec decoder,
        boolean forwarded,
        ConnectionObserver listener,
        @Nullable Supplier<? extends ChannelMetricsRecorder> metricsRecorder,
        int minCompressionSize,
        @Nullable Function<String, String> uriTagValue) {
    HttpServerCodec codec = new HttpServerCodec(
            decoder.maxInitialLineLength(),
            decoder.maxHeaderSize(),
            decoder.maxChunkSize(),
            decoder.validateHeaders(),
            decoder.initialBufferSize());
    p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, codec);
    p.addBefore(NettyPipeline.ReactiveBridge,
            NettyPipeline.HttpTrafficHandler,
            new HttpTrafficHandler(listener, forwarded, compressPredicate, cookieEncoder, cookieDecoder));

    if (ACCESS_LOG) {
        p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.AccessLogHandler, new AccessLogHandler());
    }

    // No predicate and a zero minimum size: the compression handler is always installed.
    if (compressPredicate == null && minCompressionSize == 0) {
        p.addBefore(NettyPipeline.HttpTrafficHandler, NettyPipeline.CompressionHandler, new SimpleCompressionHandler());
    }

    if (metricsRecorder == null) {
        return;
    }
    ChannelMetricsRecorder suppliedRecorder = metricsRecorder.get();
    if (!(suppliedRecorder instanceof HttpServerMetricsRecorder)) {
        return;
    }

    p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.HttpMetricsHandler,
            new HttpServerMetricsHandler((HttpServerMetricsRecorder) suppliedRecorder, uriTagValue));
    if (suppliedRecorder instanceof MicrometerHttpServerMetricsRecorder) {
        // MicrometerHttpServerMetricsRecorder does not implement metrics on protocol level
        // ChannelMetricsHandler will be removed from the pipeline
        p.remove(NettyPipeline.ChannelMetricsHandler);
    }
}