下面列出了io.reactivex.netty.protocol.http.server.RequestHandler#io.reactivex.netty.protocol.http.server.HttpServerResponse 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private Observable<Void> getIntervalObservable(HttpServerRequest<?> request, final HttpServerResponse<ServerSentEvent> response) {
HttpRequest simpleRequest = new HttpRequest(request.getQueryParameters());
return getEvents(simpleRequest)
.flatMap(event -> {
System.out.println("Writing SSE event: " + event);
ByteBuf data = response.getAllocator().buffer().writeBytes(( event + "\n").getBytes());
ServerSentEvent sse = new ServerSentEvent(data);
return response.writeAndFlush(sse);
}).materialize()
.takeWhile(notification -> {
if (notification.isOnError()) {
System.out.println("Write to client failed, stopping response sending.");
notification.getThrowable().printStackTrace(System.err);
}
return !notification.isOnError();
})
.map((Func1<Notification<Void>, Void>) notification -> null);
}
private Observable<Void> handleHystrixRequest(final HttpServerResponse<O> response) {
writeHeaders(response);
final Subject<Void, Void> subject = PublishSubject.create();
final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
.subscribe(new Action1<Long>() {
@Override
public void call(Long tick) {
if (!response.getChannel().isOpen()) {
subscription.unsubscribe();
return;
}
try {
writeMetric(JsonMapper.toJson(metrics), response);
} catch (Exception e) {
subject.onError(e);
}
}
});
subscription.set(actionSubscription);
return subject;
}
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
List<String> videoIds = request.getQueryParameters().get("videoId");
int latency = 1;
if (Random.randomIntFrom0to100() > 80) {
latency = 10;
}
return Observable.from(videoIds).map(videoId -> {
Map<String, Object> video = new HashMap<>();
video.put("videoId", videoId);
video.put("position", (int) (Math.random() * 5000));
return video;
}).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n"))
.delay(latency, TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latency
}
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
return request.getContent().flatMap(i -> {
List<String> ips = request.getQueryParameters().get("ip");
Map<String, Object> data = new HashMap<>();
for (String ip : ips) {
Map<String, Object> ip_data = new HashMap<>();
ip_data.put("country_code", "GB");
ip_data.put("longitude", "-0.13");
ip_data.put("latitude", "51.5");
data.put(ip, ip_data);
}
return response.writeStringAndFlush("data: " + SimpleJson.mapToJson(data) + "\n")
.doOnCompleted(response::close);
}).delay(10, TimeUnit.MILLISECONDS);
}
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
List<String> userIds = request.getQueryParameters().get("userId");
if (userIds == null || userIds.size() == 0) {
return writeError(request, response, "At least one parameter of 'userId' must be included.");
}
return Observable.from(userIds).map(userId -> {
Map<String, Object> user = new HashMap<>();
user.put("userId", userId);
user.put("name", "Name Here");
user.put("other_data", "goes_here");
return user;
}).flatMap(user -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(user) + "\n")
.doOnCompleted(response::close))
.delay(((long) (Math.random() * 500) + 500), TimeUnit.MILLISECONDS); // simulate latency
}
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
List<String> userId = request.getQueryParameters().get("userId");
if (userId == null || userId.size() != 1) {
return StartGatewayServer.writeError(request, response, "A single 'userId' is required.");
}
return new UserCommand(userId).observe().flatMap(user -> {
Observable<Map<String, Object>> catalog = new PersonalizedCatalogCommand(user).observe()
.flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
video -> {
Observable<Bookmark> bookmark = new BookmarkCommand(video).observe();
Observable<Rating> rating = new RatingsCommand(video).observe();
Observable<VideoMetadata> metadata = new VideoMetadataCommand(video).observe();
return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
}));
Observable<Map<String, Object>> social = new SocialCommand(user).observe().map(s -> {
return s.getDataAsMap();
});
return Observable.merge(catalog, social);
}).flatMap(data -> {
String json = SimpleJson.mapToJson(data);
return response.writeStringAndFlush("data: " + json + "\n");
});
}
public HttpServer<ByteBuf, ByteBuf> createServer() {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
if (request.getPath().contains("/users")) {
if (request.getHttpMethod().equals(HttpMethod.GET)) {
return handleRecommendationsByUserId(request, response);
} else {
return handleUpdateRecommendationsForUser(request, response);
}
}
if (request.getPath().contains("/recommendations")) {
return handleRecommendationsBy(request, response);
}
if (request.getPath().contains("/movies")) {
return handleRegisterMovie(request, response);
}
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).enableWireLogging(LogLevel.ERROR).build();
System.out.println("RxMovie server started...");
return server;
}
private Observable<Void> handleRecommendationsByUserId(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
System.out.println("HTTP request -> recommendations by user id request: " + request.getPath());
final String userId = userIdFromPath(request.getPath());
if (userId == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.close();
}
if (!userRecommendations.containsKey(userId)) {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
StringBuilder builder = new StringBuilder();
for (String movieId : userRecommendations.get(userId)) {
System.out.println(" returning: " + movies.get(movieId));
builder.append(movies.get(movieId)).append('\n');
}
ByteBuf byteBuf = UnpooledByteBufAllocator.DEFAULT.buffer();
byteBuf.writeBytes(builder.toString().getBytes(Charset.defaultCharset()));
response.write(byteBuf);
return response.close();
}
private Observable<Void> handleRegisterMovie(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
System.out.println("Http request -> register movie: " + request.getPath());
return request.getContent().flatMap(new Func1<ByteBuf, Observable<Void>>() {
@Override
public Observable<Void> call(ByteBuf byteBuf) {
String formatted = byteBuf.toString(Charset.defaultCharset());
System.out.println(" movie: " + formatted);
try {
Movie movie = Movie.from(formatted);
movies.put(movie.getId(), movie);
response.setStatus(HttpResponseStatus.CREATED);
} catch (Exception e) {
System.err.println("Invalid movie content");
e.printStackTrace();
response.setStatus(HttpResponseStatus.BAD_REQUEST);
}
return response.close();
}
});
}
public GovernatorHttpInterceptorSupport<I, O> intercept(List<Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>> interceptors) {
ArrayList<Class<? extends InboundInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>> ins =
new ArrayList<Class<? extends InboundInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>>();
ArrayList<Class<? extends OutboundInterceptor<HttpServerResponse<O>>>> outs =
new ArrayList<Class<? extends OutboundInterceptor<HttpServerResponse<O>>>>();
for (Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>> interceptor : interceptors) {
ins.add(interceptor);
outs.add(interceptor);
}
HttpInClassHolder<I, O> inHolder = new HttpInClassHolder<I, O>(key, ins);
interceptorSupport.inboundInterceptorClasses.add(inHolder);
HttpOutClassHolder<I, O> outHolder = new HttpOutClassHolder<I, O>(key, outs);
interceptorSupport.outboundInterceptorClasses.add(outHolder);
return interceptorSupport;
}
ContainerResponseWriter bridgeResponse(final HttpServerResponse<ByteBuf> serverResponse) {
return new ContainerResponseWriter() {
private final ByteBuf contentBuffer = serverResponse.getChannel().alloc().buffer();
@Override
public OutputStream writeStatusAndHeaders(long contentLength, ContainerResponse response) {
int responseStatus = response.getStatus();
serverResponse.setStatus(HttpResponseStatus.valueOf(responseStatus));
HttpResponseHeaders responseHeaders = serverResponse.getHeaders();
for(Map.Entry<String, List<Object>> header : response.getHttpHeaders().entrySet()){
responseHeaders.setHeader(header.getKey(), header.getValue());
}
return new ByteBufOutputStream(contentBuffer);
}
@Override
public void finish() {
serverResponse.writeAndFlush(contentBuffer);
}
};
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
if (request.getUri().startsWith(healthCheckUri)) {
return healthCheckEndpoint.handle(request, response);
} else if (request.getUri().startsWith("/hello/to/")) {
int prefixLength = "/hello/to".length();
String userName = request.getPath().substring(prefixLength);
if (userName.isEmpty() || userName.length() == 1 /*The uri is /hello/to/ but no name */) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.writeStringAndFlush(
"{\"Error\":\"Please provide a username to say hello. The URI should be /hello/to/{username}\"}");
} else {
String msg = "Hello " + userName.substring(1) /*Remove the / prefix*/ + " from Netflix OSS";
return response.writeStringAndFlush("{\"Message\":\"" + msg + "\"}");
}
} else if (request.getUri().startsWith("/hello")) {
return response.writeStringAndFlush("{\"Message\":\"Hello newbee from Netflix OSS\"}");
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}
public Observable<Void> sayHelloToUser(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
JSONObject content = new JSONObject();
int prefixLength = "/hello/to".length();
String userName = request.getPath().substring(prefixLength);
try {
if (userName.isEmpty() || userName.length() == 1 /*The uri is /hello/to/ but no name */) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
content.put("Error", "Please provide a username to say hello. The URI should be /hello/to/{username}");
} else {
content.put("Message", "Hello " + userName.substring(1) /*Remove the / prefix*/ + " from Netflix OSS");
}
} catch (JSONException e) {
logger.error("Error creating json response.", e);
return Observable.error(e);
}
response.write(content.toString(), StringTransformer.DEFAULT_INSTANCE);
return response.close();
}
public static void main(String... args) throws InterruptedException {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080,
(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) -> {
System.out.println("Server => Request: " + request.getPath());
try {
if ("/error".equals(request.getPath())) {
throw new RuntimeException("forced error");
}
response.setStatus(HttpResponseStatus.OK);
response.writeString("Path Requested =>: " + request.getPath() + '\n');
return response.close();
} catch (Throwable e) {
System.err.println("Server => Error [" + request.getPath() + "] => " + e);
response.setStatus(HttpResponseStatus.BAD_REQUEST);
response.writeString("Error 500: Bad Request\n");
return response.close();
}
});
server.startAndWait();
RxNetty.createHttpGet("http://localhost:8080/")
.flatMap(response -> response.getContent())
.map(data -> "Client => " + data.toString(Charset.defaultCharset()))
.toBlocking().forEach(System.out::println);
RxNetty.createHttpGet("http://localhost:8080/error")
.flatMap(response -> response.getContent())
.map(data -> "Client => " + data.toString(Charset.defaultCharset()))
.toBlocking().forEach(System.out::println);
RxNetty.createHttpGet("http://localhost:8080/data")
.flatMap(response -> response.getContent())
.map(data -> "Client => " + data.toString(Charset.defaultCharset()))
.toBlocking().forEach(System.out::println);
//server.shutdown();
}
@Override
public Observable<Void> handle(HttpServerRequest<I> request, HttpServerResponse<O> response) {
if (request.getPath().endsWith(hystrixPrefix)) {
return handleHystrixRequest(response);
}
return appHandler.handle(request, response);
}
@SuppressWarnings("unchecked")
private void writeMetric(String json, HttpServerResponse<O> response) {
byte[] bytes = json.getBytes(Charset.defaultCharset());
ByteBuf byteBuf = UnpooledByteBufAllocator.DEFAULT.buffer(bytes.length + EXTRA_SPACE);
byteBuf.writeBytes(HEADER);
byteBuf.writeBytes(bytes);
byteBuf.writeBytes(FOOTER);
response.writeAndFlush((O) byteBuf);
}
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
return Observable.from(request.getQueryParameters().get("userId")).map(userId -> {
Map<String, Object> userData = new HashMap<>();
userData.put("user_id", userId);
userData.put("list_title", "Really quirky and over detailed list title!");
userData.put("other_data", "goes_here");
userData.put("videos", Arrays.asList(12345, 23456, 34567, 45678, 56789, 67890));
return userData;
}).flatMap(list -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(list) + "\n"))
.delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS)
.doOnCompleted(response::close); // simulate latency
}
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
return Observable.from(request.getQueryParameters().get("userId")).map(userId -> {
Map<String, Object> user = new HashMap<>();
user.put("userId", userId);
user.put("friends", Arrays.asList(randomUser(), randomUser(), randomUser(), randomUser()));
return user;
}).flatMap(list -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(list) + "\n"))
.delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latency
}
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
List<String> videoIds = request.getQueryParameters().get("videoId");
return Observable.from(videoIds).map(videoId -> {
Map<String, Object> video = new HashMap<>();
video.put("videoId", videoId);
video.put("title", "Video Title");
video.put("other_data", "goes_here");
return video;
}).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n")
.doOnCompleted(response::close))
.delay(((long) (Math.random() * 20) + 20), TimeUnit.MILLISECONDS); // simulate latency
}
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
List<String> _id = request.getQueryParameters().get("id");
if (_id == null || _id.size() != 1) {
return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid). Received => " + _id);
}
long id = Long.parseLong(String.valueOf(_id.get(0)));
int delay = getParameter(request, "delay", 50); // default to 50ms server-side delay
int itemSize = getParameter(request, "itemSize", 128); // default to 128 bytes item size (assuming ascii text)
int numItems = getParameter(request, "numItems", 10); // default to 10 items in a list
// no more than 100 items
if (numItems < 1 || numItems > 100) {
return writeError(request, response, "Please choose a 'numItems' value from 1 to 100.");
}
// no larger than 50KB per item
if (itemSize < 1 || itemSize > 1024 * 50) {
return writeError(request, response, "Please choose an 'itemSize' value from 1 to 1024*50 (50KB).");
}
// no larger than 60 second delay
if (delay < 0 || delay > 60000) {
return writeError(request, response, "Please choose a 'delay' value from 0 to 60000 (60 seconds).");
}
response.setStatus(HttpResponseStatus.OK);
return MockResponse.generateJson(id, delay, itemSize, numItems)
.flatMap(json -> response.writeStringAndFlush("data:" + json + "\n"))
.doOnCompleted(response::close);
}
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
List<String> videoIds = request.getQueryParameters().get("videoId");
return Observable.from(videoIds).map(videoId -> {
Map<String, Object> video = new HashMap<>();
video.put("videoId", videoId);
video.put("estimated_user_rating", 3.5);
video.put("actual_user_rating", 4);
video.put("average_user_rating", 3.1);
return video;
}).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n"))
.delay(20, TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latenc
}
/**
* Add various headers used for logging and statistics.
*/
public static void addResponseHeaders(HttpServerResponse<?> response, long startTime) {
System.out.println("response headers");
Map<String, String> perfResponseHeaders = getPerfResponseHeaders(startTime);
for (Map.Entry<String, String> entry : perfResponseHeaders.entrySet()) {
response.getHeaders().add(entry.getKey(), entry.getValue());
}
}
public HystrixMetricsStreamHandler(String hystrixPrefix, long intervalInMillis) {
this.hystrixPrefix = hystrixPrefix;
this.interval = intervalInMillis;
this.appHandler = (HttpServerRequest<I> request, HttpServerResponse<O> response) -> {
return response.writeStringAndFlush("Only supported path is /" + hystrixPrefix);
};
}
@Override
public Observable<Void> handle(HttpServerRequest<I> request, HttpServerResponse<O> response) {
if (request.getPath().startsWith(hystrixPrefix)) {
return handleHystrixRequest(response);
}
return appHandler.handle(request, response);
}
private Observable<Void> handleHystrixRequest(final HttpServerResponse<O> response) {
writeHeaders(response);
final Subject<Void, Void> subject = PublishSubject.create();
final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
.subscribe(new Action1<Long>() {
@Override
public void call(Long tick) {
if (!response.getChannel().isOpen()) {
subscription.unsubscribe();
return;
}
try {
for (HystrixCommandMetrics commandMetrics : HystrixCommandMetrics.getInstances()) {
writeMetric(JsonMapper.toJson(commandMetrics), response);
}
for (HystrixThreadPoolMetrics threadPoolMetrics : HystrixThreadPoolMetrics.getInstances()) {
writeMetric(JsonMapper.toJson(threadPoolMetrics), response);
}
} catch (Exception e) {
subject.onError(e);
}
}
});
subscription.set(actionSubscription);
return subject;
}
@SuppressWarnings("unchecked")
private void writeMetric(String json, HttpServerResponse<O> response) {
byte[] bytes = json.getBytes(Charset.defaultCharset());
ByteBuf byteBuf = UnpooledByteBufAllocator.DEFAULT.buffer(bytes.length + EXTRA_SPACE);
byteBuf.writeBytes(HEADER);
byteBuf.writeBytes(bytes);
byteBuf.writeBytes(FOOTER);
response.writeAndFlush((O) byteBuf);
}
/**
* Hard-coded route handling.
*/
private static Observable<Void> handleRoutes(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
if (request.getPath().equals("/device/home")) {
return new RouteForDeviceHome().handle(request, response);
} else if (request.getPath().equals("/testBasic")) {
return TestRouteBasic.handle(request, response);
} else if (request.getPath().equals("/testWithSimpleFaultTolerance")) {
return TestRouteWithSimpleFaultTolerance.handle(request, response);
} else if (request.getPath().equals("/testWithHystrix")) {
return TestRouteWithHystrix.handle(request, response);
} else {
return writeError(request, response, "Unknown path: " + request.getPath());
}
}
private Observable<Void> handleUpdateRecommendationsForUser(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
System.out.println("HTTP request -> update recommendations for user: " + request.getPath());
final String userId = userIdFromPath(request.getPath());
if (userId == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.close();
}
return request.getContent().flatMap(new Func1<ByteBuf, Observable<Void>>() {
@Override
public Observable<Void> call(ByteBuf byteBuf) {
String movieId = byteBuf.toString(Charset.defaultCharset());
System.out.println(format(" updating: {user=%s, movie=%s}", userId, movieId));
synchronized (this) {
Set<String> recommendations;
if (userRecommendations.containsKey(userId)) {
recommendations = userRecommendations.get(userId);
} else {
recommendations = new ConcurrentSet<String>();
userRecommendations.put(userId, recommendations);
}
recommendations.add(movieId);
}
response.setStatus(HttpResponseStatus.OK);
return response.close();
}
});
}
@SuppressWarnings("unchecked")
public GovernatorHttpInterceptorSupport<I, O> intercept(Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>> interceptor) {
ArrayList<Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>> interceptors =
new ArrayList<Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>>();
interceptors.add(interceptor);
return intercept(interceptors);
}
/**
* Creates a new {@link KaryonServer} that has a single HTTP server instance which delegates all request
* handling to {@link RequestHandler}.
* The {@link HttpServer} is created using {@link KaryonTransport#newHttpServer(int, HttpRequestHandler)}
*
* @param port Port for the server.
* @param handler Request Handler
* @param bootstrapModules Additional bootstrapModules if any.
*
* @return {@link KaryonServer} which is to be used to start the created server.
*/
public static KaryonServer forRequestHandler(int port, final RequestHandler<ByteBuf, ByteBuf> handler,
BootstrapModule... bootstrapModules) {
HttpServer<ByteBuf, ByteBuf> httpServer =
KaryonTransport.newHttpServer(port, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
HttpServerResponse<ByteBuf> response) {
return handler.handle(request, response);
}
});
return new RxNettyServerBackedServer(httpServer, bootstrapModules);
}