io.reactivex.netty.pipeline.PipelineConfigurators 类的源码示例

下面列出了 io.reactivex.netty.pipeline.PipelineConfigurators 类的 API 用法示例代码,也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: Prana   文件: HealthCheckHandler.java
/**
 * Submits a GET request to the external health-check endpoint.
 *
 * <p>Host, port and path are taken from {@code externalHealthCheckURL}. When the URL cannot be
 * parsed, the request falls back to {@code localhost}, {@code DEFAULT_APPLICATION_PORT} and
 * {@code /healthcheck}. When the URL has no explicit port, the protocol's default port is used;
 * when it has no path, {@code /} is requested.
 *
 * @param externalHealthCheckURL absolute URL of the health-check endpoint
 * @return the observable produced by submitting the GET request
 */
private Observable<HttpClientResponse<ByteBuf>> getResponse(String externalHealthCheckURL) {
    String host = "localhost";
    int port = DEFAULT_APPLICATION_PORT;
    String path = "/healthcheck";
    try {
        URL url = new URL(externalHealthCheckURL);
        host = url.getHost();

        // URL.getPort() yields -1 when no port is written in the URL; passing that to the
        // client builder would target an invalid port, so resolve the protocol default instead.
        int explicitPort = url.getPort();
        if (explicitPort != -1) {
            port = explicitPort;
        } else if (url.getDefaultPort() != -1) {
            port = url.getDefaultPort();
        }

        // A URL such as "http://host:8080" has an empty path; an HTTP request target must not
        // be empty, so request the root instead.
        String urlPath = url.getPath();
        path = urlPath.isEmpty() ? "/" : urlPath;
    } catch (MalformedURLException ignored) {
        // Deliberate best effort: an unparsable URL falls back to the defaults set above.
    }
    Integer timeout = DynamicProperty.getInstance("prana.host.healthcheck.timeout").getInteger(DEFAULT_CONNECTION_TIMEOUT);
    HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
            .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
            .build();
    return httpClient.submit(HttpClientRequest.createGet(path));
}
 
源代码2 项目: ribbon   文件: RxMovieServer.java
/**
 * Builds the RxMovie HTTP server with a handler that routes on the request path.
 *
 * <p>Routing is by substring match, checked in this order: {@code /users},
 * {@code /recommendations}, {@code /movies}. Anything else is answered with 404.
 * This method only builds the server; it does not call {@code start()}.
 *
 * @return the built server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    RequestHandler<ByteBuf, ByteBuf> router = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
            final String requestPath = request.getPath();

            if (requestPath.contains("/users")) {
                // GET reads recommendations; every other method updates them.
                if (request.getHttpMethod().equals(HttpMethod.GET)) {
                    return handleRecommendationsByUserId(request, response);
                }
                return handleUpdateRecommendationsForUser(request, response);
            }
            if (requestPath.contains("/recommendations")) {
                return handleRecommendationsBy(request, response);
            }
            if (requestPath.contains("/movies")) {
                return handleRegisterMovie(request, response);
            }

            // No route matched.
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };

    HttpServer<ByteBuf, ByteBuf> movieServer = RxNetty.newHttpServerBuilder(port, router)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .enableWireLogging(LogLevel.ERROR)
            .build();

    System.out.println("RxMovie server started...");
    return movieServer;
}
 
/**
 * Opens a server-sent-events stream against {@code /hello} on localhost and exposes each
 * event's content as a string.
 *
 * @return an observable of the string content of every received event
 */
private Observable<String> initializeStream() {
    HttpClient<ByteBuf, ServerSentEvent> sseClient = RxNetty.createHttpClient(
            "localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator());

    return sseClient
            .submit(HttpClientRequest.createGet("/hello"))
            .flatMap(httpResponse -> {
                // Print the headers before switching over to the event content.
                printResponseHeader(httpResponse);
                return httpResponse.getContent();
            })
            .map(ServerSentEvent::contentAsString);
}
 
源代码4 项目: MarketData   文件: RxNettyEventServer.java
/**
 * Creates an HTTP server that answers every request with a server-sent-events response.
 *
 * <p>Before delegating to {@code getIntervalObservable}, the handler sets the CORS,
 * cache-control, connection and content-type headers on the response.
 *
 * @return the created server
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    HttpServer<ByteBuf, ServerSentEvent> sseServer = RxNetty.createHttpServer(
            port,
            (req, resp) -> {
                // Headers are set before the event stream is produced.
                resp.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                resp.getHeaders().set(CACHE_CONTROL, "no-cache");
                resp.getHeaders().set(CONNECTION, "keep-alive");
                resp.getHeaders().set(CONTENT_TYPE, "text/event-stream");
                return getIntervalObservable(req, resp);
            },
            PipelineConfigurators.<ByteBuf>serveSseConfigurator());

    System.out.println("HTTP Server Sent Events server started...");
    return sseServer;
}
 
源代码5 项目: ReactiveLab   文件: AbstractMiddleTierService.java
/**
 * Creates the SSE HTTP server for this service on the given port.
 *
 * <p>Requests go through a {@code HystrixMetricsStreamHandler} registered for
 * {@code /hystrix.stream}, which wraps {@code handleRequest}. On completion the elapsed time is
 * printed and recorded in the metrics together with a success count; on error a failure count
 * is recorded. A throwable raised while setting up the response is reported to the client with
 * status 400.
 *
 * @param port port passed to the server factory
 * @return the created server
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) {
    System.out.println("Start " + getClass().getSimpleName() + " on port: " + port);

    // Handler chain (wrapped in Hystrix).
    // TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202)
    HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> handlerChain =
            new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (req, resp) -> {
                try {
                    final long startedAt = System.currentTimeMillis();
                    return handleRequest(req, resp)
                            .doOnCompleted(() -> System.out.println("Response => " + req.getPath() + " Time => " + (int) (System.currentTimeMillis() - startedAt) + "ms"))
                            .doOnCompleted(() -> metrics.getRollingPercentile().addValue((int) (System.currentTimeMillis() - startedAt)))
                            .doOnCompleted(() -> metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1))
                            .doOnError(error -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1));
                } catch (Throwable failure) {
                    failure.printStackTrace();
                    System.err.println("Server => Error [" + req.getPath() + "] => " + failure);
                    // NOTE(review): the status is 400 while the message text says "Error 500" - confirm which is intended.
                    resp.setStatus(HttpResponseStatus.BAD_REQUEST);
                    return resp.writeStringAndFlush("data: Error 500: Bad Request\n" + failure.getMessage() + "\n");
                }
            });

    // The server's own handler simply delegates to the chain above.
    return RxNetty.createHttpServer(
            port,
            (req, resp) -> handlerChain.handle(req, resp),
            PipelineConfigurators.<ByteBuf> serveSseConfigurator());
}
 
源代码6 项目: Prana   文件: HealthCheckHandlerTest.java
/**
 * Runs the parent set-up, then starts an external HTTP server on port 0 and records the port
 * the server reports.
 */
@Before
public void setUp() {
    super.setUp();

    // Port 0 is passed to the builder; the actual port is read back after start().
    externalServer = RxNetty
            .newHttpServerBuilder(0, new ExternalServerHandler())
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .build();
    externalServer.start();

    externalServerPort = externalServer.getServerPort();
}
 
源代码7 项目: Prana   文件: AbstractIntegrationTest.java
/**
 * Starts a test HTTP server using the handler from {@code getHandler()} and builds a client
 * that targets it on localhost with a 2000 ms connect timeout.
 */
@Before
public void setUp() {
    server = RxNetty
            .newHttpServerBuilder(0, getHandler())
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .build();
    server.start();

    // The client connects to whatever port the started server reports.
    final int boundPort = server.getServerPort();
    client = RxNetty
            .<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", boundPort)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
            .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000)
            .build();
}
 
源代码8 项目: karyon   文件: ShutdownListener.java
public ShutdownListener(int shutdownPort, final Func1<String, Observable<Void>> commandHandler) {
    shutdownCmdServer = RxNetty.createTcpServer(shutdownPort,
                                     PipelineConfigurators.stringMessageConfigurator(),
                                     new ShutdownConnectionHandler(commandHandler));
}
 
 类所在包
 同包方法