类io.reactivex.netty.protocol.http.server.HttpServerRequest源码实例Demo

下面列出了怎么用io.reactivex.netty.protocol.http.server.HttpServerRequest的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: ReactiveLab   文件: BookmarksService.java
/**
 * Writes one mock bookmark event per {@code videoId} query parameter and closes the
 * response once the (artificially delayed) stream completes.
 *
 * @param request  incoming request; its {@code videoId} query parameters select the videos
 * @param response response each bookmark is written to as a {@code data: <json>} line
 * @return the delayed write stream
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> requestedIds = request.getQueryParameters().get("videoId");

    // 1ms by default, 10ms whenever the random draw exceeds 80
    int simulatedLatencyMs = Random.randomIntFrom0to100() > 80 ? 10 : 1;

    Observable<Map<String, Object>> bookmarks = Observable.from(requestedIds).map(id -> {
        Map<String, Object> bookmark = new HashMap<>();
        bookmark.put("videoId", id);
        bookmark.put("position", (int) (Math.random() * 5000));
        return bookmark;
    });

    return bookmarks
            .flatMap(bookmark -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(bookmark) + "\n"))
            .delay(simulatedLatencyMs, TimeUnit.MILLISECONDS) // simulate latency
            .doOnCompleted(response::close);
}
 
源代码2 项目: ReactiveLab   文件: GeoService.java
/**
 * For each item emitted by the request content, writes a single event mapping every
 * {@code ip} query parameter to a fixed mock location, then closes the response.
 * The resulting stream is delayed by 10ms.
 *
 * @param request  incoming request; its {@code ip} query parameters are the lookup keys
 * @param response response the JSON event is written to
 * @return the delayed write stream
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    Observable<Void> written = request.getContent().flatMap(ignored -> {
        Map<String, Object> locationsByIp = new HashMap<>();
        for (String address : request.getQueryParameters().get("ip")) {
            // every address gets the same hard-coded location
            Map<String, Object> location = new HashMap<>();
            location.put("country_code", "GB");
            location.put("longitude", "-0.13");
            location.put("latitude", "51.5");
            locationsByIp.put(address, location);
        }
        String event = "data: " + SimpleJson.mapToJson(locationsByIp) + "\n";
        return response.writeStringAndFlush(event).doOnCompleted(response::close);
    });
    return written.delay(10, TimeUnit.MILLISECONDS);
}
 
源代码3 项目: ReactiveLab   文件: UserService.java
/**
 * Writes one mock user event per {@code userId} query parameter.
 *
 * @param request  incoming request; at least one {@code userId} query parameter is required
 * @param response response each user is written to as a {@code data: <json>} line
 * @return the write stream (delayed by 500-1000ms), or the error response when no
 *         {@code userId} was supplied
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> userIds = request.getQueryParameters().get("userId");
    if (userIds == null || userIds.isEmpty()) {
        return writeError(request, response, "At least one parameter of 'userId' must be included.");
    }
    return Observable.from(userIds).map(userId -> {
        Map<String, Object> user = new HashMap<>();
        user.put("userId", userId);
        user.put("name", "Name Here");
        user.put("other_data", "goes_here");
        return user;
    }).flatMap(user -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(user) + "\n"))
            // Close exactly once, after every user's write has completed. Closing inside the
            // flatMap would close the response as soon as the first write finished and then
            // again for every further user.
            .doOnCompleted(response::close)
            .delay(((long) (Math.random() * 500) + 500), TimeUnit.MILLISECONDS); // simulate latency
}
 
源代码4 项目: ReactiveLab   文件: RouteForDeviceHome.java
/**
 * Handles the device-home route: resolves the user, then streams the personalized catalog
 * (each video combined with its bookmark, rating and metadata) merged with the user's social
 * data, writing every resulting map as a {@code data: <json>} line.
 *
 * @param request  incoming request; exactly one {@code userId} query parameter is required
 * @param response response the JSON lines are written to
 * @return the write stream, or the error response when {@code userId} is missing or repeated
 */
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    List<String> userId = request.getQueryParameters().get("userId");
    boolean exactlyOneUser = userId != null && userId.size() == 1;
    if (!exactlyOneUser) {
        return StartGatewayServer.writeError(request, response, "A single 'userId' is required.");
    }

    return new UserCommand(userId).observe().flatMap(user -> {
        // catalog: every video of every catalog list, enriched with its three side lookups
        Observable<Map<String, Object>> catalogData = new PersonalizedCatalogCommand(user).observe()
                .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
                        video -> {
                            Observable<Bookmark> bookmarkLookup = new BookmarkCommand(video).observe();
                            Observable<Rating> ratingLookup = new RatingsCommand(video).observe();
                            Observable<VideoMetadata> metadataLookup = new VideoMetadataCommand(video).observe();
                            return Observable.zip(bookmarkLookup, ratingLookup, metadataLookup,
                                    (b, r, m) -> combineVideoData(video, b, r, m));
                        }));

        Observable<Map<String, Object>> socialData =
                new SocialCommand(user).observe().map(s -> s.getDataAsMap());

        return Observable.merge(catalogData, socialData);
    }).flatMap(data -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(data) + "\n"));
}
 
源代码5 项目: ribbon   文件: RxMovieServer.java
/**
 * Builds the HTTP server for the movie service. Requests are dispatched on substrings of the
 * request path: {@code /users} (GET reads recommendations, any other method updates them),
 * {@code /recommendations}, and {@code /movies}; everything else gets a 404.
 *
 * @return the built server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    RequestHandler<ByteBuf, ByteBuf> dispatcher = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
            String path = request.getPath();
            if (path.contains("/users")) {
                return request.getHttpMethod().equals(HttpMethod.GET)
                        ? handleRecommendationsByUserId(request, response)
                        : handleUpdateRecommendationsForUser(request, response);
            }
            if (path.contains("/recommendations")) {
                return handleRecommendationsBy(request, response);
            }
            if (path.contains("/movies")) {
                return handleRegisterMovie(request, response);
            }
            // no route matched
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };

    HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, dispatcher)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .enableWireLogging(LogLevel.ERROR)
            .build();

    System.out.println("RxMovie server started...");
    return server;
}
 
源代码6 项目: ribbon   文件: RxMovieServer.java
/**
 * Responds with the movies recommended for the user identified in the request path, one
 * movie per line. Answers 400 when no user id can be extracted from the path and 404 when
 * the user has no recommendations entry.
 *
 * @param request  incoming request whose path carries the user id
 * @param response response to write the movie list to
 * @return the observable returned by closing the response
 */
private Observable<Void> handleRecommendationsByUserId(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    System.out.println("HTTP request -> recommendations by user id request: " + request.getPath());
    final String userId = userIdFromPath(request.getPath());

    HttpResponseStatus rejection = null;
    if (userId == null) {
        rejection = HttpResponseStatus.BAD_REQUEST;
    } else if (!userRecommendations.containsKey(userId)) {
        rejection = HttpResponseStatus.NOT_FOUND;
    }
    if (rejection != null) {
        response.setStatus(rejection);
        return response.close();
    }

    StringBuilder listing = new StringBuilder();
    for (String movieId : userRecommendations.get(userId)) {
        System.out.println("    returning: " + movies.get(movieId));
        listing.append(movies.get(movieId)).append('\n');
    }

    byte[] payload = listing.toString().getBytes(Charset.defaultCharset());
    ByteBuf body = UnpooledByteBufAllocator.DEFAULT.buffer();
    body.writeBytes(payload);

    response.write(body);
    return response.close();
}
 
源代码7 项目: ribbon   文件: RxMovieServer.java
/**
 * Registers the movie described by the request body. Each content buffer is decoded with the
 * platform default charset and parsed via {@code Movie.from}; success answers 201, any
 * exception during parsing or storing answers 400.
 *
 * @param request  incoming request carrying the serialized movie
 * @param response response whose status reflects the outcome
 * @return the observable produced by closing the response for each content buffer
 */
private Observable<Void> handleRegisterMovie(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
    System.out.println("Http request -> register movie: " + request.getPath());

    Func1<ByteBuf, Observable<Void>> registerFromBody = new Func1<ByteBuf, Observable<Void>>() {
        @Override
        public Observable<Void> call(ByteBuf body) {
            String serialized = body.toString(Charset.defaultCharset());
            System.out.println("    movie: " + serialized);
            try {
                Movie parsed = Movie.from(serialized);
                movies.put(parsed.getId(), parsed);
                response.setStatus(HttpResponseStatus.CREATED);
            } catch (Exception e) {
                // malformed payload: report it and answer 400
                System.err.println("Invalid movie content");
                e.printStackTrace();
                response.setStatus(HttpResponseStatus.BAD_REQUEST);
            }
            return response.close();
        }
    };

    return request.getContent().flatMap(registerFromBody);
}
 
源代码8 项目: karyon   文件: GovernatorHttpInterceptorSupport.java
/**
 * Registers the given duplex interceptor classes for the current key. Every class is added
 * to both the inbound and the outbound holder, in the order supplied.
 *
 * @param interceptors duplex interceptor classes to register
 * @return the interceptor support, for chaining
 */
public GovernatorHttpInterceptorSupport<I, O> intercept(List<Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>> interceptors) {
    ArrayList<Class<? extends InboundInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>> inbound =
            new ArrayList<Class<? extends InboundInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>>();
    ArrayList<Class<? extends OutboundInterceptor<HttpServerResponse<O>>>> outbound =
            new ArrayList<Class<? extends OutboundInterceptor<HttpServerResponse<O>>>>();

    // a duplex interceptor participates in both directions
    for (Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>> duplex : interceptors) {
        inbound.add(duplex);
        outbound.add(duplex);
    }

    interceptorSupport.inboundInterceptorClasses.add(new HttpInClassHolder<I, O>(key, inbound));
    interceptorSupport.outboundInterceptorClasses.add(new HttpOutClassHolder<I, O>(key, outbound));
    return interceptorSupport;
}
 
源代码9 项目: karyon   文件: HttpKeyEvaluationContext.java
/**
 * Returns the path component of the request URI, parsing the URI first if no decoder has
 * been cached on this context yet.
 *
 * @param httpRequest HTTP request for which the URI path is to be returned.
 *
 * @return The path component of the URI (as returned by {@link HttpRequest#getUri()}) or
 * {@code null} if the URI is null.
 */
String getRequestUriPath(HttpServerRequest<?> httpRequest) {
    String uri = httpRequest.getUri();
    if (uri == null) {
        return null;
    }

    if (queryStringDecoder == null) {
        // without a channel there is nothing to share the decoder through, so build a local one
        queryStringDecoder = (channel == null)
                ? new QueryStringDecoder(uri)
                : getOrCreateQueryStringDecoder(httpRequest);
    }

    return queryStringDecoder.nettyDecoder().path();
}
 
源代码10 项目: karyon   文件: HttpKeyEvaluationContext.java
/**
 * Returns the query string decoder cached on this context's channel, creating and caching
 * one for the request URI when none is present yet.
 *
 * @param request request whose URI is decoded; must not be {@code null}
 * @return the decoder stored on the channel, or {@code null} if the request URI is null
 * @throws NullPointerException if {@code request} is null
 */
private QueryStringDecoder getOrCreateQueryStringDecoder(HttpServerRequest<?> request) {
    if (null == request) {
        throw new NullPointerException("Request can not be null.");
    }
    String uri = request.getUri();
    if (null == uri) {
        return null;
    }

    Attribute<QueryStringDecoder> queryDecoderAttr = channel.attr(queryDecoderKey);

    QueryStringDecoder _queryStringDecoder = queryDecoderAttr.get();

    if (null == _queryStringDecoder) {
        QueryStringDecoder created = new QueryStringDecoder(uri);
        // setIfAbsent returns the value already present when another caller won the race;
        // return that one so every caller sees the decoder actually stored on the channel.
        QueryStringDecoder existing = queryDecoderAttr.setIfAbsent(created);
        _queryStringDecoder = (null == existing) ? created : existing;
    }
    return _queryStringDecoder;
}
 
源代码11 项目: karyon   文件: HttpKeyEvaluationContext.java
/**
 * Returns the query string decoder cached on the given channel handler context, creating and
 * caching one for the request URI when none is present yet.
 *
 * @param request               request whose URI is decoded; must not be {@code null}
 * @param channelHandlerContext context whose attribute holds the cached decoder
 * @return the decoder stored on the context, or {@code null} if the request URI is null
 * @throws NullPointerException if {@code request} is null
 */
public static QueryStringDecoder getOrCreateQueryStringDecoder(HttpServerRequest<?> request,
                                                               ChannelHandlerContext channelHandlerContext) {
    if (null == request) {
        throw new NullPointerException("Request can not be null.");
    }

    String uri = request.getUri();
    if (null == uri) {
        return null;
    }

    Attribute<QueryStringDecoder> queryDecoderAttr = channelHandlerContext.attr(queryDecoderKey);

    QueryStringDecoder _queryStringDecoder = queryDecoderAttr.get();

    if (null == _queryStringDecoder) {
        QueryStringDecoder created = new QueryStringDecoder(uri);
        // setIfAbsent returns the value already present when another caller won the race;
        // return that one so every caller sees the decoder actually stored on the context.
        QueryStringDecoder existing = queryDecoderAttr.setIfAbsent(created);
        _queryStringDecoder = (null == existing) ? created : existing;
    }
    return _queryStringDecoder;
}
 
源代码12 项目: karyon   文件: HelloWorldEndpoint.java
/**
 * Answers {@code /hello/to/{username}} with a JSON greeting. When no username follows the
 * prefix, the status is set to 400 and the JSON body carries an error message instead.
 *
 * @param request  incoming request; the username is taken from the path after "/hello/to"
 * @param response response the JSON body is written to
 * @return the observable returned by closing the response, or an error observable if the
 *         JSON body could not be built
 */
public Observable<Void> sayHelloToUser(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    JSONObject body = new JSONObject();

    // whatever follows the "/hello/to" prefix, still carrying its leading '/'
    String suffix = request.getPath().substring("/hello/to".length());
    // empty: the uri is /hello/to; length 1: the uri is /hello/to/ but no name
    boolean nameMissing = suffix.isEmpty() || suffix.length() == 1;

    try {
        if (nameMissing) {
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            body.put("Error", "Please provide a username to say hello. The URI should be /hello/to/{username}");
        } else {
            String name = suffix.substring(1); // remove the / prefix
            body.put("Message", "Hello " + name + " from Netflix OSS");
        }
    } catch (JSONException e) {
        logger.error("Error creating json response.", e);
        return Observable.error(e);
    }

    response.write(body.toString(), StringTransformer.DEFAULT_INSTANCE);
    return response.close();
}
 
源代码13 项目: WSPerfLab   文件: StartMockService.java
/**
 * Reads an integer query parameter.
 *
 * @param request      request whose query parameters are consulted
 * @param key          name of the query parameter
 * @param defaultValue value returned unless the parameter occurs exactly once
 * @return the parsed value of the single occurrence, otherwise {@code defaultValue}
 * @throws NumberFormatException if the single occurrence is not a valid integer
 */
protected static int getParameter(HttpServerRequest<?> request, String key, int defaultValue) {
    List<String> values = request.getQueryParameters().get(key);
    boolean suppliedOnce = values != null && values.size() == 1;
    if (suppliedOnce) {
        return Integer.parseInt(String.valueOf(values.get(0)));
    }
    return defaultValue;
}
 
源代码14 项目: netty-cookbook   文件: NettyRxJavaServer.java
/**
 * Starts an HTTP server on port 8080 that echoes the requested path (and fails on
 * {@code /error}), issues three client requests against it, then keeps the server running.
 *
 * @param args ignored
 * @throws InterruptedException if interrupted while waiting for the server to shut down
 */
public static void main(String... args) throws InterruptedException {
    HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080,
            (HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) -> {
        System.out.println("Server => Request: " + request.getPath());
        try {
            if ("/error".equals(request.getPath())) {
                throw new RuntimeException("forced error");
            }
            response.setStatus(HttpResponseStatus.OK);
            response.writeString("Path Requested =>: " + request.getPath() + '\n');
            return response.close();
        } catch (Throwable e) {
            System.err.println("Server => Error [" + request.getPath() + "] => " + e);
            // A failure inside the handler is a server-side error, so the status code and the
            // body text both say 500.
            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
            response.writeString("Error 500: Internal Server Error\n");
            return response.close();
        }
    });

    // start() returns once the server is listening; startAndWait() would block here and the
    // client requests below would never be issued.
    server.start();

    for (String path : new String[] {"/", "/error", "/data"}) {
        RxNetty.createHttpGet("http://localhost:8080" + path)
               .flatMap(response -> response.getContent())
               .map(data -> "Client => " + data.toString(Charset.defaultCharset()))
               .toBlocking().forEach(System.out::println);
    }

    // keep serving until the server is shut down externally
    server.waitTillShutdown();
}
 
源代码15 项目: ReactiveLab   文件: HystrixMetricsStreamHandler.java
/**
 * Routes requests whose path ends with the Hystrix prefix to the metrics stream and hands
 * every other request to the wrapped application handler.
 *
 * @param request  incoming request
 * @param response response to write to
 * @return the observable produced by whichever handler served the request
 */
@Override
public Observable<Void> handle(HttpServerRequest<I> request, HttpServerResponse<O> response) {
    boolean metricsRequested = request.getPath().endsWith(hystrixPrefix);
    if (!metricsRequested) {
        return appHandler.handle(request, response);
    }
    return handleHystrixRequest(response);
}
 
源代码16 项目: ReactiveLab   文件: AbstractMiddleTierService.java
/**
 * Looks up an integer-valued query parameter, falling back to a default.
 *
 * @param request      request whose query parameters are consulted
 * @param key          name of the query parameter
 * @param defaultValue result when the parameter is absent or given more than once
 * @return the parsed single value, otherwise {@code defaultValue}
 * @throws NumberFormatException if the single value is not a valid integer
 */
protected static int getParameter(HttpServerRequest<?> request, String key, int defaultValue) {
    List<String> occurrences = request.getQueryParameters().get(key);
    return (occurrences != null && occurrences.size() == 1)
            ? Integer.parseInt(String.valueOf(occurrences.get(0)))
            : defaultValue;
}
 
源代码17 项目: ReactiveLab   文件: PersonalizedCatalogService.java
/**
 * Writes one mock personalized-catalog event per {@code userId} query parameter and closes
 * the response once the delayed stream completes.
 *
 * @param request  incoming request; its {@code userId} query parameters select the users
 * @param response response each catalog is written to as a {@code data: <json>} line
 * @return the write stream, delayed by 20-119ms
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> userIds = request.getQueryParameters().get("userId");

    Observable<Map<String, Object>> catalogs = Observable.from(userIds).map(userId -> {
        Map<String, Object> catalog = new HashMap<>();
        catalog.put("user_id", userId);
        catalog.put("list_title", "Really quirky and over detailed list title!");
        catalog.put("other_data", "goes_here");
        catalog.put("videos", Arrays.asList(12345, 23456, 34567, 45678, 56789, 67890));
        return catalog;
    });

    long simulatedLatencyMs = (long) (Math.random() * 100) + 20;
    return catalogs
            .flatMap(catalog -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(catalog) + "\n"))
            .delay(simulatedLatencyMs, TimeUnit.MILLISECONDS) // simulate latency
            .doOnCompleted(response::close);
}
 
源代码18 项目: ReactiveLab   文件: SocialService.java
/**
 * Writes one mock social event (the user id plus four random friends) per {@code userId}
 * query parameter and closes the response once the delayed stream completes.
 *
 * @param request  incoming request; its {@code userId} query parameters select the users
 * @param response response each event is written to as a {@code data: <json>} line
 * @return the write stream, delayed by 20-119ms
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> userIds = request.getQueryParameters().get("userId");

    Observable<Map<String, Object>> socialGraphs = Observable.from(userIds).map(userId -> {
        Map<String, Object> graph = new HashMap<>();
        graph.put("userId", userId);
        graph.put("friends", Arrays.asList(randomUser(), randomUser(), randomUser(), randomUser()));
        return graph;
    });

    long simulatedLatencyMs = (long) (Math.random() * 100) + 20;
    return socialGraphs
            .flatMap(graph -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(graph) + "\n"))
            .delay(simulatedLatencyMs, TimeUnit.MILLISECONDS) // simulate latency
            .doOnCompleted(response::close);
}
 
源代码19 项目: ReactiveLab   文件: VideoMetadataService.java
/**
 * Writes one mock video-metadata event per {@code videoId} query parameter.
 *
 * @param request  incoming request; its {@code videoId} query parameters select the videos
 * @param response response each video is written to as a {@code data: <json>} line
 * @return the write stream, delayed by 20-39ms
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> videoIds = request.getQueryParameters().get("videoId");
    return Observable.from(videoIds).map(videoId -> {
        Map<String, Object> video = new HashMap<>();
        video.put("videoId", videoId);
        video.put("title", "Video Title");
        video.put("other_data", "goes_here");
        return video;
    }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n"))
            // Close exactly once, after every video's write has completed. Closing inside the
            // flatMap would close the response as soon as the first write finished and then
            // again for every further video.
            .doOnCompleted(response::close)
            .delay(((long) (Math.random() * 20) + 20), TimeUnit.MILLISECONDS); // simulate latency
}
 
源代码20 项目: ReactiveLab   文件: MockService.java
/**
 * Generates a mock JSON payload whose shape is controlled by query parameters and writes it
 * as {@code data:<json>} lines.
 *
 * <p>Query parameters: {@code id} (required, exactly one, numeric), {@code delay} in
 * milliseconds (0-60000, default 50), {@code itemSize} in bytes (1-51200, default 128) and
 * {@code numItems} (1-100, default 10). Any violation yields the error response.
 *
 * @param request  incoming request carrying the query parameters
 * @param response response the payload is written to
 * @return the write stream, or the error response for invalid parameters
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> _id = request.getQueryParameters().get("id");
    if (_id == null || _id.size() != 1) {
        return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid). Received => " + _id);
    }
    long id;
    try {
        id = Long.parseLong(_id.get(0));
    } catch (NumberFormatException e) {
        // a non-numeric id gets the same client-facing error as a missing one instead of
        // escaping the handler as an exception
        return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid). Received => " + _id);
    }

    int delay = getParameter(request, "delay", 50); // default to 50ms server-side delay
    int itemSize = getParameter(request, "itemSize", 128); // default to 128 bytes item size (assuming ascii text)
    int numItems = getParameter(request, "numItems", 10); // default to 10 items in a list

    // no more than 100 items
    if (numItems < 1 || numItems > 100) {
        return writeError(request, response, "Please choose a 'numItems' value from 1 to 100.");
    }

    // no larger than 50KB per item
    if (itemSize < 1 || itemSize > 1024 * 50) {
        return writeError(request, response, "Please choose an 'itemSize' value from 1 to 1024*50 (50KB).");
    }

    // no larger than 60 second delay
    if (delay < 0 || delay > 60000) {
        return writeError(request, response, "Please choose a 'delay' value from 0 to 60000 (60 seconds).");
    }

    response.setStatus(HttpResponseStatus.OK);
    return MockResponse.generateJson(id, delay, itemSize, numItems)
            .flatMap(json -> response.writeStringAndFlush("data:" + json + "\n"))
            .doOnCompleted(response::close);
}
 
源代码21 项目: ReactiveLab   文件: RatingsService.java
/**
 * Writes one mock ratings event per {@code videoId} query parameter and closes the response
 * once the delayed stream completes.
 *
 * @param request  incoming request; its {@code videoId} query parameters select the videos
 * @param response response each rating is written to as a {@code data: <json>} line
 * @return the write stream, delayed by 20ms
 */
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> requestedIds = request.getQueryParameters().get("videoId");

    Observable<Map<String, Object>> ratings = Observable.from(requestedIds).map(id -> {
        Map<String, Object> rating = new HashMap<>();
        rating.put("videoId", id);
        rating.put("estimated_user_rating", 3.5);
        rating.put("actual_user_rating", 4);
        rating.put("average_user_rating", 3.1);
        return rating;
    });

    return ratings
            .flatMap(rating -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(rating) + "\n"))
            .delay(20, TimeUnit.MILLISECONDS) // simulate latency
            .doOnCompleted(response::close);
}
 
源代码22 项目: ReactiveLab   文件: HystrixMetricsStreamHandler.java
/**
 * Creates a handler with no application delegate: the fallback handler installed here
 * answers with a message naming the only supported path.
 *
 * @param hystrixPrefix    path prefix stored for the metrics route
 * @param intervalInMillis interval, in milliseconds, stored for the metrics stream
 */
public HystrixMetricsStreamHandler(String hystrixPrefix, long intervalInMillis) {
    this.hystrixPrefix = hystrixPrefix;
    this.interval = intervalInMillis;
    this.appHandler = (request, response) ->
            response.writeStringAndFlush("Only supported path is /" + hystrixPrefix);
}
 
源代码23 项目: ReactiveLab   文件: HystrixMetricsStreamHandler.java
/**
 * Routes requests whose path starts with the Hystrix prefix to the metrics stream and
 * delegates every other request to the application handler.
 *
 * @param request  incoming request
 * @param response response to write to
 * @return the observable produced by whichever handler served the request
 */
@Override
public Observable<Void> handle(HttpServerRequest<I> request, HttpServerResponse<O> response) {
    return request.getPath().startsWith(hystrixPrefix)
            ? handleHystrixRequest(response)
            : appHandler.handle(request, response);
}
 
源代码24 项目: ReactiveLab   文件: StartGatewayServer.java
/**
 * Hard-coded route table: dispatches on the exact request path and answers with an error
 * for any path that is not listed.
 *
 * @param request  incoming request
 * @param response response to write to
 * @return the observable produced by the matched route, or the error response
 */
private static Observable<Void> handleRoutes(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    String path = request.getPath();
    switch (path) {
        case "/device/home":
            return new RouteForDeviceHome().handle(request, response);
        case "/testBasic":
            return TestRouteBasic.handle(request, response);
        case "/testWithSimpleFaultTolerance":
            return TestRouteWithSimpleFaultTolerance.handle(request, response);
        case "/testWithHystrix":
            return TestRouteWithHystrix.handle(request, response);
        default:
            return writeError(request, response, "Unknown path: " + path);
    }
}
 
源代码25 项目: Prana   文件: ProxyHandler.java
/**
 * Copies the incoming request's headers onto the outgoing proxy request, leaving out any
 * content-length header, and adds the configured accept-encoding when one is set.
 *
 * @param serverRequest request received by the proxy; source of the headers
 * @param request       outgoing client request that receives the headers
 */
private void populateRequestHeaders(HttpServerRequest<ByteBuf> serverRequest, HttpClientRequest<ByteBuf> request) {
    Set<String> headerNames = serverRequest.getHeaders().names();
    for (String name : headerNames) {
        // HTTP header names are case-insensitive, so compare on a lower-cased copy; a
        // case-sensitive check lets the usual "Content-Length" spelling through.
        if (name.toLowerCase(java.util.Locale.ROOT).contains("content-length")) {
            continue;
        }
        request.getHeaders().add(name, serverRequest.getHeaders().getHeader(name));
    }
    // Normally always request gzipped from the server. But can be overridden with a Dynamic Property.
    if (PROXY_REQ_ACCEPT_ENCODING != null && PROXY_REQ_ACCEPT_ENCODING.length() > 0) {
        request.getHeaders().addHeader("accept-encoding", PROXY_REQ_ACCEPT_ENCODING);
    }
    //TODO Write X-Forwarded-Host, X-Forwarded-Port, X-Forwarded-Proto, X-Forwarded-For in the headers
}
 
源代码26 项目: ribbon   文件: RxMovieServer.java
/**
 * Adds the movie id carried in the request body to the recommendations of the user
 * identified in the request path. Answers 400 when no user id can be extracted from the
 * path, otherwise 200.
 *
 * @param request  incoming request; the path carries the user id, the body the movie id
 * @param response response whose status reflects the outcome
 * @return the observable produced by closing the response
 */
private Observable<Void> handleUpdateRecommendationsForUser(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
    System.out.println("HTTP request -> update recommendations for user: " + request.getPath());
    final String userId = userIdFromPath(request.getPath());
    if (userId == null) {
        response.setStatus(HttpResponseStatus.BAD_REQUEST);
        return response.close();
    }
    return request.getContent().flatMap(new Func1<ByteBuf, Observable<Void>>() {
        @Override
        public Observable<Void> call(ByteBuf byteBuf) {
            String movieId = byteBuf.toString(Charset.defaultCharset());
            System.out.println(format("    updating: {user=%s, movie=%s}", userId, movieId));
            // Lock on the shared map. Inside this anonymous Func1, `this` is an object created
            // for this call only, so synchronizing on it would not serialize the
            // check-then-put against other requests.
            synchronized (userRecommendations) {
                Set<String> recommendations = userRecommendations.get(userId);
                if (recommendations == null) {
                    recommendations = new ConcurrentSet<String>();
                    userRecommendations.put(userId, recommendations);
                }
                recommendations.add(movieId);
            }
            response.setStatus(HttpResponseStatus.OK);
            return response.close();
        }
    });
}
 
源代码27 项目: WSPerfLab   文件: StartMockService.java
/**
 * Serves the mock endpoint. URIs starting with {@code /hello} get a fixed greeting; every
 * other request produces a generated JSON payload shaped by query parameters.
 *
 * <p>Query parameters: {@code id} (required, exactly one, numeric), {@code delay} in
 * milliseconds (0-60000, default 50), {@code itemSize} in bytes (1-51200, default 128) and
 * {@code numItems} (1-100, default 10). Any violation yields the error response.
 *
 * @param request  incoming request
 * @param response response the payload is written to
 * @return the write stream, or the error response for invalid parameters
 */
private static Observable<Void> handleRequest(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {

    if (request.getUri().startsWith("/hello")) {
        return response.writeStringAndFlush("Hello world!");
    }

    List<String> _id = request.getQueryParameters().get("id");
    if (_id == null || _id.size() != 1) {
        return writeError(request, response,
                          "Please provide a numerical 'id' value. It can be a random number (uuid). Received => "
                          + _id);
    }
    long id;
    try {
        id = Long.parseLong(_id.get(0));
    } catch (NumberFormatException e) {
        // a non-numeric id gets the same client-facing error as a missing one instead of
        // escaping the handler as an exception
        return writeError(request, response,
                          "Please provide a numerical 'id' value. It can be a random number (uuid). Received => "
                          + _id);
    }

    int delay = getParameter(request, "delay", 50); // default to 50ms server-side delay
    int itemSize = getParameter(request, "itemSize", 128); // default to 128 bytes item size (assuming ascii text)
    int numItems = getParameter(request, "numItems", 10); // default to 10 items in a list

    // no more than 100 items
    if (numItems < 1 || numItems > 100) {
        return writeError(request, response, "Please choose a 'numItems' value from 1 to 100.");
    }

    // no larger than 50KB per item
    if (itemSize < 1 || itemSize > 1024 * 50) {
        return writeError(request, response, "Please choose an 'itemSize' value from 1 to 1024*50 (50KB).");
    }

    // no larger than 60 second delay
    if (delay < 0 || delay > 60000) {
        return writeError(request, response, "Please choose a 'delay' value from 0 to 60000 (60 seconds).");
    }

    response.setStatus(HttpResponseStatus.OK);
    return MockResponse.generateJson(id, delay, itemSize, numItems)
                       .doOnNext(json -> counter.add(CounterEvent.BYTES, json.readableBytes()))
                       .flatMap(response::writeAndFlush)
                       .doOnTerminate(response::close);
}
 
源代码28 项目: karyon   文件: GovernatorHttpInterceptorSupport.java
/**
 * Registers a single duplex interceptor class by wrapping it in a one-element list and
 * delegating to the list-based overload.
 *
 * @param interceptor duplex interceptor class to register
 * @return whatever the list-based overload returns
 */
@SuppressWarnings("unchecked")
public GovernatorHttpInterceptorSupport<I, O> intercept(Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>> interceptor) {
    ArrayList<Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>> single =
            new ArrayList<Class<? extends DuplexInterceptor<HttpServerRequest<I>, HttpServerResponse<O>>>>();
    single.add(interceptor);
    return intercept(single);
}
 
源代码29 项目: karyon   文件: Karyon.java
/**
 * Creates a new {@link KaryonServer} backed by a single HTTP server instance that forwards
 * every request to the supplied {@link RequestHandler}.
 * The {@link HttpServer} is created using {@link KaryonTransport#newHttpServer(int, HttpRequestHandler)}
 *
 * @param port Port for the server.
 * @param handler Request Handler
 * @param bootstrapModules Additional bootstrapModules if any.
 *
 * @return {@link KaryonServer} which is to be used to start the created server.
 */
public static KaryonServer forRequestHandler(int port, final RequestHandler<ByteBuf, ByteBuf> handler,
                                             BootstrapModule... bootstrapModules) {
    // thin wrapper that forwards every call to the caller-supplied handler
    RequestHandler<ByteBuf, ByteBuf> forwarding = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                       HttpServerResponse<ByteBuf> response) {
            return handler.handle(request, response);
        }
    };
    HttpServer<ByteBuf, ByteBuf> httpServer = KaryonTransport.newHttpServer(port, forwarding);
    return new RxNettyServerBackedServer(httpServer, bootstrapModules);
}
 
源代码30 项目: karyon   文件: KaryonHttpModuleTest.java
/**
 * Test handler that picks the response status from the request path: paths containing
 * {@code /sendOK} get 200, paths containing {@code /sendNotFound} get 404, and anything
 * else gets 500. No body is written.
 *
 * @param request  incoming request
 * @param response response whose status is set
 * @return an empty observable
 */
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
    HttpResponseStatus status;
    if (request.getPath().contains("/sendOK")) {
        status = HttpResponseStatus.OK;
    } else if (request.getPath().contains("/sendNotFound")) {
        status = HttpResponseStatus.NOT_FOUND;
    } else {
        status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
    }
    response.setStatus(status);
    return Observable.empty();
}
 
 类所在包
 类方法
 同包方法