下面列出了 io.reactivex.netty.protocol.http.server.HttpServerRequest 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Writes one {@code data:} line per {@code videoId} query parameter, each carrying the video id and a
 * random bookmark position in the range [0, 5000).
 *
 * @param request  incoming request; its {@code videoId} query parameters select the videos
 * @param response response the events are written to; closed once the delayed stream completes
 * @return observable of the write results, delayed to simulate latency
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> ids = request.getQueryParameters().get("videoId");
    // Use the slower 10 ms latency whenever the random draw exceeds 80, otherwise 1 ms.
    int latencyMillis = Random.randomIntFrom0to100() > 80 ? 10 : 1;
    Observable<Map<String, Object>> bookmarks = Observable.from(ids).map(id -> {
        Map<String, Object> bookmark = new HashMap<>();
        bookmark.put("videoId", id);
        bookmark.put("position", (int) (Math.random() * 5000));
        return bookmark;
    });
    return bookmarks
            .flatMap(bookmark -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(bookmark) + "\n"))
            .delay(latencyMillis, TimeUnit.MILLISECONDS) // simulate latency
            .doOnCompleted(response::close);
}
/**
 * For each content element of the request, writes a single {@code data:} line that maps every {@code ip}
 * query parameter to a fixed geo record (country code GB, longitude -0.13, latitude 51.5), then closes
 * the response once that write completes.
 *
 * @param request  incoming request; its {@code ip} query parameters are the keys of the emitted map
 * @param response response the event is written to
 * @return observable of the write results, delayed by 10 ms
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    return request.getContent().flatMap(chunk -> {
        Map<String, Object> geoByIp = new HashMap<>();
        for (String address : request.getQueryParameters().get("ip")) {
            Map<String, Object> location = new HashMap<>();
            location.put("country_code", "GB");
            location.put("longitude", "-0.13");
            location.put("latitude", "51.5");
            geoByIp.put(address, location);
        }
        String event = "data: " + SimpleJson.mapToJson(geoByIp) + "\n";
        return response.writeStringAndFlush(event).doOnCompleted(response::close);
    }).delay(10, TimeUnit.MILLISECONDS);
}
/**
 * Writes one {@code data:} line per {@code userId} query parameter, each carrying placeholder user data.
 *
 * @param request  incoming request; at least one {@code userId} query parameter is required
 * @param response response the events are written to
 * @return observable of the write results (or of the error response when no {@code userId} is given),
 *         delayed by 500-1000 ms to simulate latency
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> userIds = request.getQueryParameters().get("userId");
    if (userIds == null || userIds.isEmpty()) {
        return writeError(request, response, "At least one parameter of 'userId' must be included.");
    }
    return Observable.from(userIds).map(userId -> {
        Map<String, Object> user = new HashMap<>();
        user.put("userId", userId);
        user.put("name", "Name Here");
        user.put("other_data", "goes_here");
        return user;
    }).flatMap(user -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(user) + "\n"))
            // Close once, after every user's write has completed. Attaching the close to each inner
            // write would close the response as soon as the first write finished, even though
            // several userIds may be streamed.
            .doOnCompleted(response::close)
            .delay(((long) (Math.random() * 500) + 500), TimeUnit.MILLISECONDS); // simulate latency
}
/**
 * Builds the device-home response for a single user: looks up the user, then streams the personalized
 * catalog (each video combined with its bookmark, rating and metadata) merged with the user's social
 * data, writing every resulting map as a {@code data:} line.
 *
 * @param request  incoming request; exactly one {@code userId} query parameter is required
 * @param response response the events are written to
 * @return observable of the write results, or of the error response when {@code userId} is missing
 *         or given more than once
 */
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    List<String> userId = request.getQueryParameters().get("userId");
    boolean exactlyOneUser = userId != null && userId.size() == 1;
    if (!exactlyOneUser) {
        return StartGatewayServer.writeError(request, response, "A single 'userId' is required.");
    }
    return new UserCommand(userId).observe().flatMap(user -> {
        // Catalog branch: every video is zipped with its three per-video lookups.
        Observable<Map<String, Object>> catalogData = new PersonalizedCatalogCommand(user).observe()
                .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
                        video -> {
                            Observable<Bookmark> bookmarks = new BookmarkCommand(video).observe();
                            Observable<Rating> ratings = new RatingsCommand(video).observe();
                            Observable<VideoMetadata> details = new VideoMetadataCommand(video).observe();
                            return Observable.zip(bookmarks, ratings, details,
                                    (b, r, m) -> combineVideoData(video, b, r, m));
                        }));
        // Social branch: the social result is exposed as a plain map.
        Observable<Map<String, Object>> socialData =
                new SocialCommand(user).observe().map(s -> s.getDataAsMap());
        return Observable.merge(catalogData, socialData);
    }).flatMap(data -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(data) + "\n"));
}
/**
 * Builds the HTTP server and wires its routing: paths containing {@code /users} go to the recommendation
 * lookup (GET) or update (any other method), {@code /recommendations} and {@code /movies} go to their
 * handlers, and anything else is answered with 404.
 *
 * @return the built (not yet started by this method) server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    RequestHandler<ByteBuf, ByteBuf> router = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
            final String path = request.getPath();
            if (path.contains("/users")) {
                return request.getHttpMethod().equals(HttpMethod.GET)
                        ? handleRecommendationsByUserId(request, response)
                        : handleUpdateRecommendationsForUser(request, response);
            }
            if (path.contains("/recommendations")) {
                return handleRecommendationsBy(request, response);
            }
            if (path.contains("/movies")) {
                return handleRegisterMovie(request, response);
            }
            // No route matched.
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };
    HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, router)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .enableWireLogging(LogLevel.ERROR)
            .build();
    System.out.println("RxMovie server started...");
    return server;
}
/**
 * Answers a recommendations-by-user request with one line per recommended movie.
 * Responds 400 when no user id can be extracted from the path and 404 when the user has no
 * recommendations registered.
 *
 * @param request  incoming request whose path carries the user id
 * @param response response to write the movie list to
 * @return the observable returned by closing the response
 */
private Observable<Void> handleRecommendationsByUserId(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    System.out.println("HTTP request -> recommendations by user id request: " + request.getPath());
    final String userId = userIdFromPath(request.getPath());
    if (userId == null) {
        response.setStatus(HttpResponseStatus.BAD_REQUEST);
        return response.close();
    }
    if (!userRecommendations.containsKey(userId)) {
        response.setStatus(HttpResponseStatus.NOT_FOUND);
        return response.close();
    }

    // Render every recommended movie on its own line.
    StringBuilder lines = new StringBuilder();
    for (String recommendedId : userRecommendations.get(userId)) {
        System.out.println(" returning: " + movies.get(recommendedId));
        lines.append(movies.get(recommendedId)).append('\n');
    }

    byte[] encoded = lines.toString().getBytes(Charset.defaultCharset());
    ByteBuf payload = UnpooledByteBufAllocator.DEFAULT.buffer();
    payload.writeBytes(encoded);
    response.write(payload);
    return response.close();
}
/**
 * Registers the movie described by the request body. Each content buffer is decoded with the platform
 * default charset and parsed via {@code Movie.from}; success answers 201, any exception answers 400.
 *
 * @param request  incoming request whose content carries the formatted movie
 * @param response response whose status is set and which is then closed
 * @return observable produced by closing the response for each content buffer
 */
private Observable<Void> handleRegisterMovie(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
    System.out.println("Http request -> register movie: " + request.getPath());
    Func1<ByteBuf, Observable<Void>> register = new Func1<ByteBuf, Observable<Void>>() {
        @Override
        public Observable<Void> call(ByteBuf content) {
            String raw = content.toString(Charset.defaultCharset());
            System.out.println(" movie: " + raw);
            try {
                Movie parsed = Movie.from(raw);
                movies.put(parsed.getId(), parsed);
                response.setStatus(HttpResponseStatus.CREATED);
            } catch (Exception e) {
                // Unparseable content is reported to the client as a bad request.
                System.err.println("Invalid movie content");
                e.printStackTrace();
                response.setStatus(HttpResponseStatus.BAD_REQUEST);
            }
            return response.close();
        }
    };
    return request.getContent().flatMap(register);
}
/**
 * Registers the given duplex interceptor classes for the current key. Because a duplex interceptor acts
 * in both directions, every class is added to both the inbound and the outbound holder.
 *
 * @param interceptors duplex interceptor classes to register, in order
 * @return the interceptor support these classes were registered on
 */
public GovernatorHttpInterceptorSupport<I, O> intercept(List<Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>> interceptors) {
    // Copy the same classes, in the same order, into one list per direction.
    ArrayList<Class<? extends InboundInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>> inbound =
            new ArrayList<Class<? extends InboundInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>>(interceptors);
    ArrayList<Class<? extends OutboundInterceptor<HttpServerResponse<O>>>> outbound =
            new ArrayList<Class<? extends OutboundInterceptor<HttpServerResponse<O>>>>(interceptors);

    interceptorSupport.inboundInterceptorClasses.add(new HttpInClassHolder<I, O>(key, inbound));
    interceptorSupport.outboundInterceptorClasses.add(new HttpOutClassHolder<I, O>(key, outbound));
    return interceptorSupport;
}
/**
 * Parses (if not done previously) and returns the path component of the request URI.
 * The decoder is cached in {@code queryStringDecoder}; when a channel is available it is obtained via
 * {@code getOrCreateQueryStringDecoder}, otherwise it is created directly from the URI.
 *
 * @param httpRequest HTTP request for which the URI path is to be returned.
 *
 * @return The path component of the URI (as returned by {@link HttpRequest#getUri()}) or {@code null} if
 * the URI is null.
 */
String getRequestUriPath(HttpServerRequest<?> httpRequest) {
    final String uri = httpRequest.getUri();
    if (uri == null) {
        return null;
    }
    if (queryStringDecoder == null) {
        queryStringDecoder = (channel == null)
                ? new QueryStringDecoder(uri)
                : getOrCreateQueryStringDecoder(httpRequest);
    }
    return queryStringDecoder.nettyDecoder().path();
}
/**
 * Returns the query string decoder stored as an attribute on {@code channel}, creating and storing one
 * from the request URI when none is present yet.
 *
 * @param request request whose URI is decoded; must not be {@code null}
 * @return the decoder held by the channel attribute, or {@code null} if the request URI is {@code null}
 * @throws NullPointerException if {@code request} is {@code null}
 */
private QueryStringDecoder getOrCreateQueryStringDecoder(HttpServerRequest<?> request) {
    if (null == request) {
        throw new NullPointerException("Request can not be null.");
    }
    String uri = request.getUri();
    if (null == uri) {
        return null;
    }
    Attribute<QueryStringDecoder> queryDecoderAttr = channel.attr(queryDecoderKey);
    QueryStringDecoder _queryStringDecoder = queryDecoderAttr.get();
    if (null == _queryStringDecoder) {
        QueryStringDecoder created = new QueryStringDecoder(uri);
        // setIfAbsent returns the value already present when the set did not happen; prefer that
        // one so the caller always gets the decoder that is actually stored on the channel.
        QueryStringDecoder existing = queryDecoderAttr.setIfAbsent(created);
        _queryStringDecoder = (null != existing) ? existing : created;
    }
    return _queryStringDecoder;
}
/**
 * Returns the query string decoder stored as an attribute on the given channel handler context,
 * creating and storing one from the request URI when none is present yet.
 *
 * @param request               request whose URI is decoded; must not be {@code null}
 * @param channelHandlerContext context holding the decoder attribute
 * @return the decoder held by the attribute, or {@code null} if the request URI is {@code null}
 * @throws NullPointerException if {@code request} is {@code null}
 */
public static QueryStringDecoder getOrCreateQueryStringDecoder(HttpServerRequest<?> request,
                                                               ChannelHandlerContext channelHandlerContext) {
    if (null == request) {
        throw new NullPointerException("Request can not be null.");
    }
    String uri = request.getUri();
    if (null == uri) {
        return null;
    }
    Attribute<QueryStringDecoder> queryDecoderAttr = channelHandlerContext.attr(queryDecoderKey);
    QueryStringDecoder _queryStringDecoder = queryDecoderAttr.get();
    if (null == _queryStringDecoder) {
        QueryStringDecoder created = new QueryStringDecoder(uri);
        // setIfAbsent returns the value already present when the set did not happen; prefer that
        // one so the caller always gets the decoder that is actually stored on the attribute.
        QueryStringDecoder existing = queryDecoderAttr.setIfAbsent(created);
        _queryStringDecoder = (null != existing) ? existing : created;
    }
    return _queryStringDecoder;
}
/**
 * Greets the user named in the path {@code /hello/to/{username}} with a JSON message.
 * When the name is missing, the status is set to 400 and the JSON body carries an error instead.
 *
 * @param request  incoming request; everything after the {@code /hello/to} prefix is taken as the name
 * @param response response the JSON body is written to before it is closed
 * @return the observable returned by closing the response, or an error observable if building the
 *         JSON fails
 */
public Observable<Void> sayHelloToUser(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    JSONObject content = new JSONObject();
    // Remainder of the path after the prefix: "", "/" or "/{username}".
    String remainder = request.getPath().substring("/hello/to".length());
    try {
        boolean hasName = remainder.length() > 1;
        if (hasName) {
            String name = remainder.substring(1); // drop the leading '/'
            content.put("Message", "Hello " + name + " from Netflix OSS");
        } else {
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            content.put("Error", "Please provide a username to say hello. The URI should be /hello/to/{username}");
        }
    } catch (JSONException e) {
        logger.error("Error creating json response.", e);
        return Observable.error(e);
    }
    response.write(content.toString(), StringTransformer.DEFAULT_INSTANCE);
    return response.close();
}
/**
 * Reads an integer query parameter.
 *
 * @param request      request whose query parameters are inspected
 * @param key          name of the query parameter
 * @param defaultValue value returned when the parameter is absent or given more than once
 * @return the parsed value of the single occurrence of {@code key}, otherwise {@code defaultValue}
 * @throws NumberFormatException if the single value is not a valid integer
 */
protected static int getParameter(HttpServerRequest<?> request, String key, int defaultValue) {
    List<String> values = request.getQueryParameters().get(key);
    boolean exactlyOne = values != null && values.size() == 1;
    return exactlyOne ? Integer.parseInt(String.valueOf(values.get(0))) : defaultValue;
}
/**
 * Starts an HTTP server on port 8080 that echoes the requested path, and deliberately fails for
 * {@code /error}; then issues GET requests for {@code /}, {@code /error} and {@code /data} and prints
 * whatever content comes back.
 *
 * @param args unused
 * @throws InterruptedException declared by the original signature
 */
public static void main(String... args) throws InterruptedException {
    HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080,
            (HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) -> {
                System.out.println("Server => Request: " + request.getPath());
                try {
                    if ("/error".equals(request.getPath())) {
                        throw new RuntimeException("forced error");
                    }
                    response.setStatus(HttpResponseStatus.OK);
                    response.writeString("Path Requested =>: " + request.getPath() + '\n');
                    return response.close();
                } catch (Throwable e) {
                    System.err.println("Server => Error [" + request.getPath() + "] => " + e);
                    // The failure originates in the handler itself, so report a server error and
                    // keep the status line and the body text in agreement (previously 400 was sent
                    // together with an "Error 500" body).
                    response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
                    response.writeString("Error 500: Internal Server Error\n");
                    return response.close();
                }
            });
    // NOTE(review): startAndWait presumably blocks until the server shuts down - confirm whether the
    // client calls below are reachable or whether start() was intended.
    server.startAndWait();

    // Same client pipeline for each path: fetch, decode with the default charset, print.
    for (String path : new String[] {"/", "/error", "/data"}) {
        RxNetty.createHttpGet("http://localhost:8080" + path)
                .flatMap(response -> response.getContent())
                .map(data -> "Client => " + data.toString(Charset.defaultCharset()))
                .toBlocking().forEach(System.out::println);
    }
    //server.shutdown();
}
/**
 * Routes requests whose path ends with {@code hystrixPrefix} to the Hystrix stream handler and
 * delegates every other request to the wrapped application handler.
 *
 * @param request  incoming request
 * @param response response passed on to whichever handler is chosen
 * @return the observable produced by the chosen handler
 */
@Override
public Observable<Void> handle(HttpServerRequest<I> request, HttpServerResponse<O> response) {
    boolean isHystrixPath = request.getPath().endsWith(hystrixPrefix);
    if (!isHystrixPath) {
        return appHandler.handle(request, response);
    }
    return handleHystrixRequest(response);
}
/**
 * Looks up an integer query parameter, falling back to a default.
 *
 * @param request      request whose query parameters are inspected
 * @param key          name of the query parameter
 * @param defaultValue value returned unless the parameter occurs exactly once
 * @return the parsed single value of {@code key}, otherwise {@code defaultValue}
 * @throws NumberFormatException if the single value is not a valid integer
 */
protected static int getParameter(HttpServerRequest<?> request, String key, int defaultValue) {
    List<String> candidates = request.getQueryParameters().get(key);
    if (candidates != null && candidates.size() == 1) {
        return Integer.parseInt(String.valueOf(candidates.get(0)));
    }
    return defaultValue;
}
/**
 * Writes one {@code data:} line per {@code userId} query parameter, each carrying a placeholder
 * personalized list with a fixed set of video ids.
 *
 * @param request  incoming request; its {@code userId} query parameters select the users
 * @param response response the events are written to; closed once the delayed stream completes
 * @return observable of the write results, delayed by 20-120 ms to simulate latency
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    Observable<Map<String, Object>> catalogs =
            Observable.from(request.getQueryParameters().get("userId")).map(userId -> {
                Map<String, Object> catalog = new HashMap<>();
                catalog.put("user_id", userId);
                catalog.put("list_title", "Really quirky and over detailed list title!");
                catalog.put("other_data", "goes_here");
                catalog.put("videos", Arrays.asList(12345, 23456, 34567, 45678, 56789, 67890));
                return catalog;
            });
    return catalogs
            .flatMap(catalog -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(catalog) + "\n"))
            .delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS) // simulate latency
            .doOnCompleted(response::close);
}
/**
 * Writes one {@code data:} line per {@code userId} query parameter, each carrying the user id and four
 * friends produced by {@code randomUser()}.
 *
 * @param request  incoming request; its {@code userId} query parameters select the users
 * @param response response the events are written to; closed once the delayed stream completes
 * @return observable of the write results, delayed by 20-120 ms to simulate latency
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    Observable<Map<String, Object>> socialGraphs =
            Observable.from(request.getQueryParameters().get("userId")).map(userId -> {
                Map<String, Object> graph = new HashMap<>();
                graph.put("userId", userId);
                graph.put("friends", Arrays.asList(randomUser(), randomUser(), randomUser(), randomUser()));
                return graph;
            });
    return socialGraphs
            .flatMap(graph -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(graph) + "\n"))
            .delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS) // simulate latency
            .doOnCompleted(response::close);
}
/**
 * Writes one {@code data:} line per {@code videoId} query parameter, each carrying placeholder video
 * metadata.
 *
 * @param request  incoming request; its {@code videoId} query parameters select the videos
 * @param response response the events are written to
 * @return observable of the write results, delayed by 20-40 ms to simulate latency
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> videoIds = request.getQueryParameters().get("videoId");
    return Observable.from(videoIds).map(videoId -> {
        Map<String, Object> video = new HashMap<>();
        video.put("videoId", videoId);
        video.put("title", "Video Title");
        video.put("other_data", "goes_here");
        return video;
    }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n"))
            // Close once, after every video's write has completed. Attaching the close to each
            // inner write would close the response as soon as the first write finished, even
            // though several videoIds may be streamed.
            .doOnCompleted(response::close)
            .delay(((long) (Math.random() * 20) + 20), TimeUnit.MILLISECONDS); // simulate latency
}
/**
 * Streams generated mock JSON as {@code data:} lines.
 * Query parameters: {@code id} (required, exactly one, numeric), {@code delay} (default 50, 0-60000 ms),
 * {@code itemSize} (default 128, 1 to 1024*50) and {@code numItems} (default 10, 1-100). Any violation
 * is answered through {@code writeError}.
 *
 * @param request  incoming request carrying the query parameters
 * @param response response the events are written to; closed when the stream completes
 * @return observable of the write results, or of the error response
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> _id = request.getQueryParameters().get("id");
    if (_id == null || _id.size() != 1) {
        return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid). Received => " + _id);
    }
    long id;
    try {
        id = Long.parseLong(String.valueOf(_id.get(0)));
    } catch (NumberFormatException e) {
        // A non-numeric id is a client error; report it the same way as a missing id instead of
        // letting the exception escape the handler.
        return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid). Received => " + _id);
    }

    int delay = getParameter(request, "delay", 50); // default to 50ms server-side delay
    int itemSize = getParameter(request, "itemSize", 128); // default to 128 bytes item size (assuming ascii text)
    int numItems = getParameter(request, "numItems", 10); // default to 10 items in a list

    // no more than 100 items
    if (numItems < 1 || numItems > 100) {
        return writeError(request, response, "Please choose a 'numItems' value from 1 to 100.");
    }
    // no larger than 50KB per item
    if (itemSize < 1 || itemSize > 1024 * 50) {
        return writeError(request, response, "Please choose an 'itemSize' value from 1 to 1024*50 (50KB).");
    }
    // no larger than 60 second delay
    if (delay < 0 || delay > 60000) {
        return writeError(request, response, "Please choose a 'delay' value from 0 to 60000 (60 seconds).");
    }

    response.setStatus(HttpResponseStatus.OK);
    return MockResponse.generateJson(id, delay, itemSize, numItems)
            .flatMap(json -> response.writeStringAndFlush("data:" + json + "\n"))
            .doOnCompleted(response::close);
}
/**
 * Writes one {@code data:} line per {@code videoId} query parameter, each carrying fixed estimated,
 * actual and average user ratings.
 *
 * @param request  incoming request; its {@code videoId} query parameters select the videos
 * @param response response the events are written to; closed once the delayed stream completes
 * @return observable of the write results, delayed by 20 ms to simulate latency
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> ids = request.getQueryParameters().get("videoId");
    Observable<Map<String, Object>> ratings = Observable.from(ids).map(id -> {
        Map<String, Object> rating = new HashMap<>();
        rating.put("videoId", id);
        rating.put("estimated_user_rating", 3.5);
        rating.put("actual_user_rating", 4);
        rating.put("average_user_rating", 3.1);
        return rating;
    });
    return ratings
            .flatMap(rating -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(rating) + "\n"))
            .delay(20, TimeUnit.MILLISECONDS) // simulate latency
            .doOnCompleted(response::close);
}
/**
 * Creates a handler with a fallback application handler that only reports which path is supported.
 *
 * @param hystrixPrefix    path prefix stored for recognising Hystrix requests
 * @param intervalInMillis value stored as the handler's interval, in milliseconds
 */
public HystrixMetricsStreamHandler(String hystrixPrefix, long intervalInMillis) {
    this.hystrixPrefix = hystrixPrefix;
    this.interval = intervalInMillis;
    // Fallback used for every request passed to the application handler.
    this.appHandler = (HttpServerRequest<I> request, HttpServerResponse<O> response) ->
            response.writeStringAndFlush("Only supported path is /" + hystrixPrefix);
}
/**
 * Routes requests whose path starts with {@code hystrixPrefix} to the Hystrix stream handler and
 * delegates every other request to the application handler.
 *
 * @param request  incoming request
 * @param response response passed on to whichever handler is chosen
 * @return the observable produced by the chosen handler
 */
@Override
public Observable<Void> handle(HttpServerRequest<I> request, HttpServerResponse<O> response) {
    boolean isHystrixPath = request.getPath().startsWith(hystrixPrefix);
    if (!isHystrixPath) {
        return appHandler.handle(request, response);
    }
    return handleHystrixRequest(response);
}
/**
 * Hard-coded route handling: dispatches on the exact request path and answers unknown paths through
 * {@code writeError}.
 *
 * @param request  incoming request whose path selects the route
 * @param response response handed to the selected route
 * @return the observable produced by the selected route or by the error writer
 */
private static Observable<Void> handleRoutes(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    final String path = request.getPath();
    switch (path) {
        case "/device/home":
            return new RouteForDeviceHome().handle(request, response);
        case "/testBasic":
            return TestRouteBasic.handle(request, response);
        case "/testWithSimpleFaultTolerance":
            return TestRouteWithSimpleFaultTolerance.handle(request, response);
        case "/testWithHystrix":
            return TestRouteWithHystrix.handle(request, response);
        default:
            return writeError(request, response, "Unknown path: " + path);
    }
}
/**
 * Copies the headers of the incoming server request onto the outgoing client request, skipping any
 * header whose name contains {@code content-length}, and adds the configured {@code accept-encoding}
 * value when one is set.
 *
 * @param serverRequest request received by this server; source of the headers
 * @param request       outgoing client request; receives the headers
 */
private void populateRequestHeaders(HttpServerRequest<ByteBuf> serverRequest, HttpClientRequest<ByteBuf> request) {
    Set<String> headerNames = serverRequest.getHeaders().names();
    for (String name : headerNames) {
        // HTTP header names are case-insensitive, so match on a lower-cased copy; a plain
        // contains("content-length") misses the common "Content-Length" spelling.
        if (name.toLowerCase(java.util.Locale.ROOT).contains("content-length")) {
            continue;
        }
        request.getHeaders().add(name, serverRequest.getHeaders().getHeader(name));
    }

    // Normally always request gzipped from the server. But can be overridden with a Dynamic Property.
    if (PROXY_REQ_ACCEPT_ENCODING != null && PROXY_REQ_ACCEPT_ENCODING.length() > 0) {
        request.getHeaders().addHeader("accept-encoding", PROXY_REQ_ACCEPT_ENCODING);
    }

    //TODO Write X-Forwarded-Host, X-Forwarded-Port, X-Forwarded-Proto, X-Forwarded-For in the headers
}
/**
 * Adds the movie id carried in the request body to the recommendations of the user named in the path.
 * Responds 400 when no user id can be extracted from the path, otherwise 200.
 *
 * @param request  incoming request; the path carries the user id, the content carries the movie id
 * @param response response whose status is set and which is then closed
 * @return observable produced by closing the response
 */
private Observable<Void> handleUpdateRecommendationsForUser(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
    System.out.println("HTTP request -> update recommendations for user: " + request.getPath());
    final String userId = userIdFromPath(request.getPath());
    if (userId == null) {
        response.setStatus(HttpResponseStatus.BAD_REQUEST);
        return response.close();
    }
    return request.getContent().flatMap(new Func1<ByteBuf, Observable<Void>>() {
        @Override
        public Observable<Void> call(ByteBuf byteBuf) {
            String movieId = byteBuf.toString(Charset.defaultCharset());
            System.out.println(format(" updating: {user=%s, movie=%s}", userId, movieId));
            // Lock on the shared map rather than on `this`: `this` is the per-request Func1
            // instance, so locking it never excludes another request and two requests could each
            // install a fresh set for the same user.
            synchronized (userRecommendations) {
                Set<String> recommendations = userRecommendations.get(userId);
                if (recommendations == null) {
                    recommendations = new ConcurrentSet<String>();
                    userRecommendations.put(userId, recommendations);
                }
                recommendations.add(movieId);
            }
            response.setStatus(HttpResponseStatus.OK);
            return response.close();
        }
    });
}
/**
 * Answers {@code /hello} with a fixed greeting; otherwise streams generated mock JSON buffers.
 * Query parameters: {@code id} (required, exactly one, numeric), {@code delay} (default 50, 0-60000 ms),
 * {@code itemSize} (default 128, 1 to 1024*50) and {@code numItems} (default 10, 1-100). Any violation
 * is answered through {@code writeError}.
 *
 * @param request  incoming request carrying the query parameters
 * @param response response the payload is written to; closed when the stream terminates
 * @return observable of the write results, or of the error response
 */
private static Observable<Void> handleRequest(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    if (request.getUri().startsWith("/hello")) {
        return response.writeStringAndFlush("Hello world!");
    }

    List<String> _id = request.getQueryParameters().get("id");
    if (_id == null || _id.size() != 1) {
        return writeError(request, response,
                "Please provide a numerical 'id' value. It can be a random number (uuid). Received => "
                        + _id);
    }
    long id;
    try {
        id = Long.parseLong(String.valueOf(_id.get(0)));
    } catch (NumberFormatException e) {
        // A non-numeric id is a client error; report it the same way as a missing id instead of
        // letting the exception escape the handler.
        return writeError(request, response,
                "Please provide a numerical 'id' value. It can be a random number (uuid). Received => "
                        + _id);
    }

    int delay = getParameter(request, "delay", 50); // default to 50ms server-side delay
    int itemSize = getParameter(request, "itemSize", 128); // default to 128 bytes item size (assuming ascii text)
    int numItems = getParameter(request, "numItems", 10); // default to 10 items in a list

    // no more than 100 items
    if (numItems < 1 || numItems > 100) {
        return writeError(request, response, "Please choose a 'numItems' value from 1 to 100.");
    }
    // no larger than 50KB per item
    if (itemSize < 1 || itemSize > 1024 * 50) {
        return writeError(request, response, "Please choose an 'itemSize' value from 1 to 1024*50 (50KB).");
    }
    // no larger than 60 second delay
    if (delay < 0 || delay > 60000) {
        return writeError(request, response, "Please choose a 'delay' value from 0 to 60000 (60 seconds).");
    }

    response.setStatus(HttpResponseStatus.OK);
    return MockResponse.generateJson(id, delay, itemSize, numItems)
            .doOnNext(json -> counter.add(CounterEvent.BYTES, json.readableBytes()))
            .flatMap(response::writeAndFlush)
            .doOnTerminate(response::close);
}
/**
 * Convenience overload that registers a single duplex interceptor class by delegating to the
 * list-based {@code intercept}.
 *
 * @param interceptor duplex interceptor class to register
 * @return whatever the list-based {@code intercept} returns
 */
@SuppressWarnings("unchecked")
public GovernatorHttpInterceptorSupport<I, O> intercept(Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>> interceptor) {
    // Wrap the lone class in a one-element list so the list overload does the actual work.
    ArrayList<Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>> single =
            new ArrayList<Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>>();
    single.add(interceptor);
    return intercept(single);
}
/**
 * Creates a new {@link KaryonServer} backed by a single HTTP server instance that delegates all request
 * handling to the supplied {@link RequestHandler}.
 * The {@link HttpServer} is created through {@code KaryonTransport.newHttpServer}.
 *
 * @param port Port for the server.
 * @param handler Request Handler
 * @param bootstrapModules Additional bootstrapModules if any.
 *
 * @return {@link KaryonServer} which is to be used to start the created server.
 */
public static KaryonServer forRequestHandler(int port, final RequestHandler<ByteBuf, ByteBuf> handler,
                                             BootstrapModule... bootstrapModules) {
    // Thin wrapper that forwards every request to the caller-supplied handler.
    RequestHandler<ByteBuf, ByteBuf> forwarding = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                       HttpServerResponse<ByteBuf> response) {
            return handler.handle(request, response);
        }
    };
    HttpServer<ByteBuf, ByteBuf> httpServer = KaryonTransport.newHttpServer(port, forwarding);
    return new RxNettyServerBackedServer(httpServer, bootstrapModules);
}
/**
 * Sets the response status from the request path: 200 when it contains {@code /sendOK}, 404 when it
 * contains {@code /sendNotFound}, and 500 otherwise. No body is written.
 *
 * @param request  incoming request whose path selects the status
 * @param response response whose status is set
 * @return an empty observable
 */
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    final HttpResponseStatus status;
    if (request.getPath().contains("/sendOK")) {
        status = HttpResponseStatus.OK;
    } else if (request.getPath().contains("/sendNotFound")) {
        status = HttpResponseStatus.NOT_FOUND;
    } else {
        status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
    }
    response.setStatus(status);
    return Observable.empty();
}