下面列出了怎么用 io.netty.handler.codec.haproxy.HAProxyMessage 的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Read is invoked automatically by Netty as messages arrive on the socket.
*/
protected void read(Object msg) {
LOG.debug("Reading: {}", msg);
lastReadTime = System.currentTimeMillis();
if (tunneling) {
// In tunneling mode, this connection is simply shoveling bytes
readRaw((ByteBuf) msg);
} else if ( msg instanceof HAProxyMessage) {
readHAProxyMessage((HAProxyMessage)msg);
} else {
// If not tunneling, then we are always dealing with HttpObjects.
readHTTP((HttpObject) msg);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HAProxyMessage) {
HAProxyMessage proxyMessage = (HAProxyMessage) msg;
if (proxyMessage.sourceAddress() != null && proxyMessage.sourcePort() != 0) {
InetSocketAddress remoteAddress = AddressUtils
.createUnresolved(proxyMessage.sourceAddress(), proxyMessage.sourcePort());
ctx.channel()
.attr(REMOTE_ADDRESS_FROM_PROXY_PROTOCOL)
.set(remoteAddress);
}
proxyMessage.release();
ctx.channel()
.pipeline()
.remove(this);
ctx.read();
} else {
super.channelRead(ctx, msg);
}
}
@Override
protected void decode(ChannelHandlerContext ctx, HAProxyMessage msg, List<Object> out)
throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("PROXY message {}: {}:{} -> {}:{} (next: {})",
msg.protocolVersion().name(),
msg.sourceAddress(), msg.sourcePort(),
msg.destinationAddress(), msg.destinationPort(),
proxiedCandidates);
}
final ChannelPipeline p = ctx.pipeline();
final InetAddress src = InetAddress.getByAddress(
NetUtil.createByteArrayFromIpAddressString(msg.sourceAddress()));
final InetAddress dst = InetAddress.getByAddress(
NetUtil.createByteArrayFromIpAddressString(msg.destinationAddress()));
final ProxiedAddresses proxiedAddresses =
ProxiedAddresses.of(new InetSocketAddress(src, msg.sourcePort()),
new InetSocketAddress(dst, msg.destinationPort()));
configurePipeline(p, proxiedCandidates, proxiedAddresses);
p.remove(this);
}
@Ignore
@Test
public void detectsSplitPpv1Message() {
EmbeddedChannel channel = new EmbeddedChannel();
channel.pipeline().addLast(ElbProxyProtocolChannelHandler.NAME, new ElbProxyProtocolChannelHandler(registry, true));
ByteBuf buf1 = Unpooled.wrappedBuffer(
"PROXY TCP4".getBytes(StandardCharsets.US_ASCII));
channel.writeInbound(buf1);
ByteBuf buf2 = Unpooled.wrappedBuffer(
"192.168.0.1 124.123.111.111 10008 443\r\n".getBytes(StandardCharsets.US_ASCII));
channel.writeInbound(buf2);
Object msg = channel.readInbound();
assertTrue(msg instanceof HAProxyMessage);
buf1.release();
buf2.release();
((HAProxyMessage) msg).release();
// The handler should remove itself.
assertNull(channel.pipeline().context(ElbProxyProtocolChannelHandler.class));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if (sessionHandler == null) {
// No session handler available, do nothing
return;
}
if (sessionHandler.beforeHandle()) {
return;
}
if (msg instanceof MinecraftPacket) {
MinecraftPacket pkt = (MinecraftPacket) msg;
if (!pkt.handle(sessionHandler)) {
sessionHandler.handleGeneric((MinecraftPacket) msg);
}
} else if (msg instanceof HAProxyMessage) {
HAProxyMessage proxyMessage = (HAProxyMessage) msg;
this.remoteAddress = new InetSocketAddress(proxyMessage.sourceAddress(),
proxyMessage.sourcePort());
} else if (msg instanceof ByteBuf) {
sessionHandler.handleUnknown((ByteBuf) msg);
}
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof HAProxyMessage) {
ctx.attr(CLIENT_IP).set(((HAProxyMessage) msg).sourceAddress());
} else {
super.channelRead(ctx, msg);
}
}
@Override
protected void readHAProxyMessage(HAProxyMessage msg) {
// NO-OP,
// We never expect server to send a proxy protocol message.
}
@Override
public void onEnable() {
try {
Field remoteAddressField = AbstractChannel.class.getDeclaredField("remoteAddress");
remoteAddressField.setAccessible(true);
Field serverChild = PipelineUtils.class.getField("SERVER_CHILD");
serverChild.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(serverChild, serverChild.getModifiers() & ~Modifier.FINAL);
ChannelInitializer<Channel> bungeeChannelInitializer = PipelineUtils.SERVER_CHILD;
Method initChannelMethod = ChannelInitializer.class.getDeclaredMethod("initChannel", Channel.class);
initChannelMethod.setAccessible(true);
serverChild.set(null, new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
initChannelMethod.invoke(bungeeChannelInitializer, channel);
channel.pipeline().addAfter(PipelineUtils.TIMEOUT_HANDLER, "haproxy-decoder", new HAProxyMessageDecoder());
channel.pipeline().addAfter("haproxy-decoder", "haproxy-handler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HAProxyMessage) {
HAProxyMessage message = (HAProxyMessage) msg;
remoteAddressField.set(channel, new InetSocketAddress(message.sourceAddress(), message.sourcePort()));
} else {
super.channelRead(ctx, msg);
}
}
});
}
});
} catch (Exception e) {
getLogger().log(Level.SEVERE, e.getMessage(), e);
getProxy().stop();
}
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
// Flag that we have now received the LastContent for this request from the client.
// This is needed for ClientResponseReceiver to know whether it's yet safe to start writing
// a response to the client channel.
if (msg instanceof LastHttpContent) {
ctx.channel().attr(ATTR_LAST_CONTENT_RECEIVED).set(Boolean.TRUE);
}
if (msg instanceof HttpRequest) {
clientRequest = (HttpRequest) msg;
zuulRequest = buildZuulHttpRequest(clientRequest, ctx);
// Handle invalid HTTP requests.
if (clientRequest.decoderResult().isFailure()) {
LOG.warn(
"Invalid http request. clientRequest = {} , uri = {}, info = {}",
clientRequest.toString(),
clientRequest.uri(),
ChannelUtils.channelInfoForLogging(ctx.channel()),
clientRequest.decoderResult().cause());
RejectionUtils.rejectByClosingConnection(
ctx,
ZuulStatusCategory.FAILURE_CLIENT_BAD_REQUEST,
"decodefailure",
clientRequest,
/* injectedLatencyMillis= */ null);
return;
} else if (zuulRequest.hasBody() && zuulRequest.getBodyLength() > zuulRequest.getMaxBodySize()) {
String errorMsg = "Request too large. "
+ "clientRequest = " + clientRequest.toString()
+ ", uri = " + String.valueOf(clientRequest.uri())
+ ", info = " + ChannelUtils.channelInfoForLogging(ctx.channel());
final ZuulException ze = new ZuulException(errorMsg);
ze.setStatusCode(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE.code());
StatusCategoryUtils.setStatusCategory(
zuulRequest.getContext(),
ZuulStatusCategory.FAILURE_CLIENT_BAD_REQUEST);
zuulRequest.getContext().setError(ze);
zuulRequest.getContext().setShouldSendErrorResponse(true);
}
handleExpect100Continue(ctx, clientRequest);
//Send the request down the filter pipeline
ctx.fireChannelRead(zuulRequest);
}
else if (msg instanceof HttpContent) {
if ((zuulRequest != null) && (! zuulRequest.getContext().isCancelled())) {
ctx.fireChannelRead(msg);
} else {
//We already sent response for this request, these are laggard request body chunks that are still arriving
ReferenceCountUtil.release(msg);
}
}
else if (msg instanceof HAProxyMessage) {
// do nothing, should already be handled by ElbProxyProtocolHandler
LOG.debug("Received HAProxyMessage for Proxy Protocol IP: {}", ((HAProxyMessage) msg).sourceAddress());
ReferenceCountUtil.release(msg);
}
else {
LOG.debug("Received unrecognized message type. " + msg.getClass().getName());
ReferenceCountUtil.release(msg);
}
}
/**
* Read an {@link HAProxyMessage}
* @param msg {@link HAProxyMessage}
*/
protected abstract void readHAProxyMessage(HAProxyMessage msg);