下面列出了 io.netty.handler.codec.http.FullHttpRequest # setUri ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//首次连接是FullHttpRequest,处理参数
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String uri = request.uri();
Map paramMap = getUrlParams(uri);
//如果url包含参数,需要处理
if (uri.contains("?")) {
String newUri = uri.substring(0, uri.indexOf("?"));
request.setUri(newUri);
}
Object obj = paramMap.get("token");
if (null == obj || "undefined".equals(obj)) {
ctx.channel().close();
return;
}
ChannelManager.putChannel(ChannelManager.channelLongText(ctx), ctx.channel());
} else if (msg instanceof TextWebSocketFrame) {
//正常的TEXT消息类型
// TextWebSocketFrame frame = (TextWebSocketFrame) msg;
}
super.channelRead(ctx, msg);
}
@Test
public void testDecodeHttpRequest() throws Exception {
ServiceManager serviceManager = ServiceManager.getInstance();
serviceManager.registerService(new EchoServiceImpl(), null);
ByteBuf content = Unpooled.wrappedBuffer(encodeBody(
Echo.EchoRequest.newBuilder().setMessage("hello").build()));
FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.GET,
"/example.EchoService/Echo", content);
httpRequest.headers().set("log-id", 1);
httpRequest.setUri("/example.EchoService/Echo");
httpRequest.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/proto; charset=utf-8");
httpRequest.headers().set("key", "value");
Request request = protocol.decodeRequest(httpRequest);
assertEquals("example.EchoService", request.getRpcMethodInfo().getServiceName());
assertEquals("Echo", request.getRpcMethodInfo().getMethodName());
assertEquals(EchoService.class.getMethods()[0], request.getTargetMethod());
assertEquals(EchoServiceImpl.class, request.getTarget().getClass());
assertEquals(request.getKvAttachment().get("key"), "value");
}
@Override
protected void incoming(FullHttpRequest request) {
// TODO Auto-generated method stub
request.setUri("/test/fromtrans");
request.headers().add("x-incoming", "1");
transit("127.0.0.1", 9080);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
String requestUri = request.uri();
String mapToURI = mapTo(requestUri);
log.trace("Mapping from {} to {}", requestUri, mapToURI);
request.setUri(mapToURI);
}
super.channelRead(ctx, msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
if (request.uri().equals(initUrl)) {
if (ctx.channel().attr(SessionDao.userAttributeKey).get() == null) {
request.setUri(mapToUrlWithoutCookie);
} else {
request.setUri(mapToUrlWithCookie);
}
}
}
super.channelRead(ctx, msg);
}
@Override
protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest req = (FullHttpRequest) msg;
ctx.pipeline().remove(HttpObjectAggregator.class);
ctx.pipeline().get(HttpServerCodec.class).removeInboundHandler();
String host = destination.getHostString() + ":" + destination.getPort();
req.headers().set("Host", host);
req.setUri(uri.getRawPath());
addReceived(req);
ctx.pipeline().get(HttpServerCodec.class).removeOutboundHandler();
return true;
}
@SuppressWarnings("unchecked")
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//check if the request is a handshake
if (msg instanceof FullHttpRequest) {
FullHttpRequest req = (FullHttpRequest) msg;
uri = req.getUri();
URI uriTemp = new URI(uri);
apiContextUri = new URI(uriTemp.getScheme(), uriTemp.getAuthority(), uriTemp.getPath(),
null, uriTemp.getFragment()).toString();
if (req.getUri().contains("/t/")) {
tenantDomain = MultitenantUtils.getTenantDomainFromUrl(req.getUri());
} else {
tenantDomain = MultitenantConstants.SUPER_TENANT_DOMAIN_NAME;
}
String useragent = req.headers().get(HttpHeaders.USER_AGENT);
// '-' is used for empty values to avoid possible errors in DAS side.
// Required headers are stored one by one as validateOAuthHeader()
// removes some of the headers from the request
useragent = useragent != null ? useragent : "-";
headers.add(HttpHeaders.USER_AGENT, useragent);
if (validateOAuthHeader(req)) {
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
// carbon-mediation only support websocket invocation from super tenant APIs.
// This is a workaround to mimic the the invocation came from super tenant.
req.setUri(req.getUri().replaceFirst("/", "-"));
String modifiedUri = uri.replaceFirst("/t/", "-t/");
req.setUri(modifiedUri);
msg = req;
} else {
req.setUri(uri); // Setting endpoint appended uri
}
if (StringUtils.isNotEmpty(token)) {
((FullHttpRequest) msg).headers().set(APIMgtGatewayConstants.WS_JWT_TOKEN_HEADER, token);
}
ctx.fireChannelRead(msg);
// publish google analytics data
GoogleAnalyticsData.DataBuilder gaData = new GoogleAnalyticsData.DataBuilder(null, null, null, null)
.setDocumentPath(uri)
.setDocumentHostName(DataPublisherUtil.getHostAddress())
.setSessionControl("end")
.setCacheBuster(APIMgtGoogleAnalyticsUtils.getCacheBusterId())
.setIPOverride(ctx.channel().remoteAddress().toString());
APIMgtGoogleAnalyticsUtils gaUtils = new APIMgtGoogleAnalyticsUtils();
gaUtils.init(tenantDomain);
gaUtils.publishGATrackingData(gaData, req.headers().get(HttpHeaders.USER_AGENT),
headers.get(HttpHeaders.AUTHORIZATION));
} else {
ctx.writeAndFlush(new TextWebSocketFrame(APISecurityConstants.API_AUTH_INVALID_CREDENTIALS_MESSAGE));
if (log.isDebugEnabled()) {
log.debug("Authentication Failure for the websocket context: " + apiContextUri);
}
throw new APISecurityException(APISecurityConstants.API_AUTH_INVALID_CREDENTIALS,
APISecurityConstants.API_AUTH_INVALID_CREDENTIALS_MESSAGE);
}
} else if ((msg instanceof CloseWebSocketFrame) || (msg instanceof PingWebSocketFrame)) {
//if the inbound frame is a closed frame, throttling, analytics will not be published.
ctx.fireChannelRead(msg);
} else if (msg instanceof WebSocketFrame) {
boolean isAllowed = doThrottle(ctx, (WebSocketFrame) msg);
if (isAllowed) {
ctx.fireChannelRead(msg);
String clientIp = getRemoteIP(ctx);
// publish analytics events if analytics is enabled
if (APIUtil.isAnalyticsEnabled()) {
publishRequestEvent(clientIp, true);
}
} else {
ctx.writeAndFlush(new TextWebSocketFrame("Websocket frame throttled out"));
if (log.isDebugEnabled()) {
log.debug("Inbound Websocket frame is throttled. " + ctx.channel().toString());
}
}
}
}
@Override
protected FullHttpRequest newHandshakeRequest() {
FullHttpRequest request = super.newHandshakeRequest();
request.setUri(super.uri().getRawPath());
return request;
}