类io.reactivex.netty.protocol.http.server.HttpServer源码实例Demo

下面列出了 io.reactivex.netty.protocol.http.server.HttpServer 的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: hawkular-apm   文件: NettyNoResponseHttpITest.java
/**
 * Starts a wire-logged RxNetty HTTP server for this test and then runs the parent
 * initialisation.
 *
 * <p>Requests without the {@code Constants.HAWKULAR_APM_TRACEID} header are answered with
 * 400 Bad Request. For POST and PUT requests each received content buffer is printed to
 * stdout. All accepted requests are answered with status 200 and no explicit body.
 */
@Override
public void init() {
    server = HttpServer.newServer()
            .enableWireLogging(LogLevel.DEBUG)
            .start((req, resp) -> {
                if (req.getHeader(Constants.HAWKULAR_APM_TRACEID) == null) {
                    return resp.setStatus(HttpResponseStatus.BAD_REQUEST);
                }
                final HttpMethod method = req.getHttpMethod();
                if (HttpMethod.POST.equals(method) || HttpMethod.PUT.equals(method)) {
                    // ByteBuf.toString() only describes the buffer (indices/capacity); decode the
                    // readable bytes so the printed line shows the actual payload.
                    req.getContent().subscribe(bb -> System.out.println(
                            "DATA = " + bb.toString(java.nio.charset.StandardCharsets.UTF_8)));
                }
                resp.setStatus(HttpResponseStatus.OK);
                return resp;
            });

    super.init();
}
 
源代码2 项目: hawkular-apm   文件: NettyHttpITest.java
/**
 * Starts a wire-logged RxNetty HTTP server for this test and then runs the parent
 * initialisation.
 *
 * <p>Requests without the {@code Constants.HAWKULAR_APM_TRACEID} header are answered with
 * 400 Bad Request. For POST and PUT requests each received content buffer is printed to
 * stdout. All accepted requests are answered by writing {@code HELLO_WORLD}.
 */
@Override
public void init() {
    server = HttpServer.newServer()
            .enableWireLogging(LogLevel.DEBUG)
            .start((req, resp) -> {
                if (req.getHeader(Constants.HAWKULAR_APM_TRACEID) == null) {
                    return resp.setStatus(HttpResponseStatus.BAD_REQUEST);
                }
                final HttpMethod method = req.getHttpMethod();
                if (HttpMethod.POST.equals(method) || HttpMethod.PUT.equals(method)) {
                    // ByteBuf.toString() only describes the buffer (indices/capacity); decode the
                    // readable bytes so the printed line shows the actual payload.
                    req.getContent().subscribe(bb -> System.out.println(
                            "DATA = " + bb.toString(java.nio.charset.StandardCharsets.UTF_8)));
                }
                return resp.writeString(Observable.just(HELLO_WORLD));
            });
    super.init();
}
 
源代码3 项目: mesos-rxjava   文件: MesosClientIntegrationTest.java
/**
 * Checks that opening the stream surfaces a {@code Mesos5xxException} whose context reports
 * status 500 when the server answers every request with 500 Internal Server Error.
 */
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos5xxResponse() throws Throwable {
    // Server side: reply 500 and close the response straight away.
    final RequestHandler<ByteBuf, ByteBuf> alwaysFail = (req, resp) -> {
        resp.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
        return resp.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, alwaysFail);
    server.start();

    // The port is read back from the started server to build the client endpoint.
    final String endpoint = String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort());
    final MesosClient<String, String> client = createClient(URI.create(endpoint));

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up because subscribe will 500");
    } catch (Mesos5xxException expected) {
        final MesosClientErrorContext errorContext = expected.getContext();
        assertThat(errorContext.getStatusCode()).isEqualTo(500);
    } finally {
        server.shutdown();
    }
}
 
源代码4 项目: mesos-rxjava   文件: MesosClientIntegrationTest.java
/**
 * Checks that opening the stream fails with a {@code MesosException} when the server answers
 * 200 OK but with a {@code Content-Type} of {@code application/json}; the error context must
 * report status 200 and the content-type mismatch message.
 */
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mismatchContentType() throws Throwable {
    // Server side: a successful status paired with a JSON content type.
    final RequestHandler<ByteBuf, ByteBuf> jsonReply = (req, resp) -> {
        resp.setStatus(HttpResponseStatus.OK);
        resp.getHeaders().setHeader("Content-Type", "application/json");
        return resp.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, jsonReply);
    server.start();

    // The port is read back from the started server to build the client endpoint.
    final String endpoint = String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort());
    final MesosClient<String, String> client = createClient(URI.create(endpoint));

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up because of content type mismatch");
    } catch (MesosException expected) {
        final MesosClientErrorContext errorContext = expected.getContext();
        assertThat(errorContext.getStatusCode()).isEqualTo(200);
        assertThat(errorContext.getMessage()).isEqualTo("Response had Content-Type \"application/json\" expected \"text/plain;charset=utf-8\"");
    } finally {
        server.shutdown();
    }
}
 
源代码5 项目: ReactiveLab   文件: ClientServer.java
/**
 * Demo entry point: starts the HTTP server via {@code startServer(8088)}, issues one request
 * against {@code localhost} on the server's reported port, and prints each item of the response.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Bring the HTTP server up first so there is something to send the request to.
    final HttpServer<ByteBuf, ByteBuf> httpServer = startServer(8088);
    final int serverPort = httpServer.getServerPort();

    // Blocking here keeps the demo simple; a real application would chain the response
    // back to its caller instead of blocking.
    createRequest("localhost", serverPort)
            .toBlocking()
            .forEach(System.out::println);
}
 
源代码6 项目: ReactiveLab   文件: ClientServer.java
/**
 * Creates and starts an HTTP server whose handler writes {@code "Hello World!!"} for every
 * request, regardless of the request itself.
 *
 * @param port port passed to {@code RxNetty.createHttpServer}
 * @return the server returned by {@code start()}
 */
public static HttpServer<ByteBuf, ByteBuf> startServer(int port) {
    // A real server would choose the response based on the request URI; this example
    // answers unconditionally.
    return RxNetty.createHttpServer(port,
                                    (request, response) -> response.writeStringAndFlush("Hello World!!"))
                  .start();
}
 
源代码7 项目: ribbon   文件: RxMovieServer.java
/**
 * Builds the movie HTTP server with a path-based router and ERROR-level wire logging.
 *
 * <p>Routing, checked in this order on the request path: paths containing {@code /users}
 * (GET goes to the recommendations-by-user handler, anything else to the update handler),
 * then {@code /recommendations}, then {@code /movies}. Anything else gets a 404 response.
 *
 * @return the built server; this method itself only calls {@code build()}
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    final RequestHandler<ByteBuf, ByteBuf> router = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
            if (request.getPath().contains("/users")) {
                return request.getHttpMethod().equals(HttpMethod.GET)
                        ? handleRecommendationsByUserId(request, response)
                        : handleUpdateRecommendationsForUser(request, response);
            }
            if (request.getPath().contains("/recommendations")) {
                return handleRecommendationsBy(request, response);
            }
            if (request.getPath().contains("/movies")) {
                return handleRegisterMovie(request, response);
            }
            // No route matched.
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };

    HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, router)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .enableWireLogging(LogLevel.ERROR)
            .build();

    System.out.println("RxMovie server started...");
    return server;
}
 
源代码8 项目: feign   文件: RealRequestBenchmarks.java
/**
 * Benchmark fixture: starts the local HTTP server on {@code SERVER_PORT} and prepares the
 * OkHttp client, the OkHttp-backed Feign client, and the raw OkHttp request used by the
 * benchmarks.
 */
@Setup
public void setup() {

  // NOTE(review): the handler returns null instead of an Observable; confirm this is intended.
  server = HttpServer.newServer(SERVER_PORT)
      .start((request, response) -> null);
  // The previous bare call to client.retryOnConnectionFailure() only read the current setting
  // and discarded it, so it was removed; the client configuration is unchanged.
  client = new OkHttpClient();
  okFeign = Feign.builder()
      .client(new feign.okhttp.OkHttpClient(client))
      .logLevel(Level.NONE)
      .logger(new Logger.ErrorLogger())
      .retryer(new Retryer.Default())
      .target(FeignTestInterface.class, "http://localhost:" + SERVER_PORT);
  queryRequest = new Request.Builder()
      .url("http://localhost:" + SERVER_PORT + "/?Action=GetUser&Version=2010-05-08&limit=1")
      .build();
}
 
源代码9 项目: MarketData   文件: RxNettyEventBroadcaster.java
/**
 * Initialises {@code events} and then delegates server creation to the superclass.
 *
 * <p>When {@code flaky} is set, the event stream is wrapped with
 * {@code SubscriptionLimiter.limitSubscriptions(1, ...)}; otherwise it is used as is.
 *
 * @return the server created by {@code super.createServer()}
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    if (!flaky) {
        events = initializeEventStream();
    } else {
        events = SubscriptionLimiter.limitSubscriptions(1, initializeEventStream());
    }
    return super.createServer();
}
 
源代码10 项目: MarketData   文件: RxNettyRequestReplyServer.java
/**
 * Creates a request/reply HTTP server on {@code port}.
 *
 * <p>Each request's query parameters are wrapped in an {@code HttpRequest}, passed to
 * {@code getResponseContent}, and the resulting string is written and flushed as the response.
 *
 * @return the created (not yet started by this method) server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    return RxNetty.createHttpServer(port, (request, response) ->
            response.writeStringAndFlush(
                    getResponseContent(new HttpRequest(request.getQueryParameters()))));
}
 
源代码11 项目: MarketData   文件: RxNettyEventServer.java
/**
 * Creates a Server-Sent-Events HTTP server on {@code port}.
 *
 * <p>Every response gets CORS, no-cache, keep-alive and {@code text/event-stream} headers
 * before the handler hands over to {@code getIntervalObservable}.
 *
 * @return the created server
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    final HttpServer<ByteBuf, ServerSentEvent> sseServer = RxNetty.createHttpServer(
            port,
            (req, resp) -> {
                // Headers for a long-lived event stream readable from any origin.
                resp.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                resp.getHeaders().set(CACHE_CONTROL, "no-cache");
                resp.getHeaders().set(CONNECTION, "keep-alive");
                resp.getHeaders().set(CONTENT_TYPE, "text/event-stream");
                return getIntervalObservable(req, resp);
            },
            PipelineConfigurators.<ByteBuf>serveSseConfigurator());

    System.out.println("HTTP Server Sent Events server started...");
    return sseServer;
}
 
源代码12 项目: mesos-rxjava   文件: MesosClientIntegrationTest.java
/**
 * Checks that opening the stream surfaces a {@code Mesos4xxException} when the server answers
 * 400 Bad Request with a plain-text body; the error context must report status 400 and the
 * server-supplied message.
 */
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos4xxResponse() throws Throwable {
    final String errorMessage = "Error message that should come from the server";

    // Server side: 400 with the message as a UTF-8 text body and a matching Content-Length.
    final RequestHandler<ByteBuf, ByteBuf> badRequestReply = (req, resp) -> {
        resp.setStatus(HttpResponseStatus.BAD_REQUEST);
        final byte[] payload = errorMessage.getBytes(StandardCharsets.UTF_8);
        resp.getHeaders().setHeader("Content-Length", payload.length);
        resp.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        resp.writeBytes(payload);
        return resp.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, badRequestReply);
    server.start();

    // The port is read back from the started server to build the client endpoint.
    final String endpoint = String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort());
    final MesosClient<String, String> client = createClient(URI.create(endpoint));

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up because subscribe will 400");
    } catch (Mesos4xxException expected) {
        final MesosClientErrorContext errorContext = expected.getContext();
        assertThat(errorContext.getStatusCode()).isEqualTo(400);
        assertThat(errorContext.getMessage()).isEqualTo(errorMessage);
    } finally {
        server.shutdown();
    }
}
 
/**
 * (Ignored) Checks that a burst of 20000 heartbeat records written by the server makes
 * {@code openStream().await()} fail with a {@code MissingBackpressureException} when the
 * client is built without a backpressure strategy.
 *
 * <p>Each heartbeat is written as a decimal length line followed by the message bytes.
 */
@Test
@Ignore
public void testBurstyObservable_missingBackpressureException() throws Throwable {
    final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
    final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
    final byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8);
    // The length prefix must describe the bytes actually sent, so derive it from the UTF-8
    // encoded payload rather than re-encoding with the platform default charset.
    final byte[] hbytes = String.format("%d\n", hmsg.length).getBytes(StandardCharsets.UTF_8);

    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        writeRecordIOMessage(response, subscribedMessage);
        for (int i = 0; i < 20000; i++) {
            response.writeBytes(hbytes);
            response.writeBytes(hmsg);
        }
        return response.flush();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClientForStreaming(uri).build();

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up due to backpressure");
    } catch (MissingBackpressureException e) {
        // expected
        e.printStackTrace();
        assertThat(e.getMessage()).isNullOrEmpty();
    } finally {
        server.shutdown();
    }
}
 
/**
 * Checks that a client built with {@code onBackpressureBuffer()} receives every message of a
 * burst: one SUBSCRIBED record followed by {@code numMessages} heartbeats, counted through
 * {@code msgNo}.
 */
@Test
public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable {
    msgNo = 0;
    final int numMessages = 20000;
    final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
    final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        writeRecordIOMessage(response, subscribedMessage);
        for (int i = 0; i < numMessages; i++) {
            writeRecordIOMessage(response, heartbeatMessage);
        }
        return response.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClientForStreaming(uri)
            .onBackpressureBuffer()
            .build();

    try {
        client.openStream().await();
        // Assert inside the try: an assertion in the finally block would hide any exception
        // thrown by await() and would skip server.shutdown() when the count is wrong.
        // 20000 heartbeats PLUS 1 subscribe
        assertEquals("All heartbeats received (plus the subscribed)", 1 + numMessages, msgNo);
    } finally {
        server.shutdown();
    }
}
 
源代码15 项目: ocelli   文件: HttpExampleUtils.java
/**
 * Starts an HTTP server whose handler waits {@code latencyMillis} before replying 200 OK with
 * an {@code X-Instance} header holding the local address of the response's channel.
 *
 * @param latencyMillis delay, in milliseconds, applied before each response
 * @return the address reported by the started server
 */
protected static SocketAddress startServer(long latencyMillis) {
    return HttpServer.newServer()
                     .start((request, response) ->
                                    Observable.timer(latencyMillis, TimeUnit.MILLISECONDS)
                                              .flatMap(aTick -> response
                                                      .addHeader("X-Instance",
                                                                 response.unsafeNettyChannel().localAddress())
                                                      .setStatus(HttpResponseStatus.OK)))
                     .getServerAddress();
}
 
源代码16 项目: ocelli   文件: HttpExampleUtils.java
/**
 * Starts an HTTP server that answers every request with {@code cannedStatus} and an
 * {@code X-Instance} header holding the local address of the response's channel.
 *
 * @param cannedStatus status set on every response
 * @return the address reported by the started server
 */
protected static SocketAddress startServer(HttpResponseStatus cannedStatus) {
    return HttpServer.newServer()
                     .start((request, response) ->
                                    response.addHeader("X-Instance", response.unsafeNettyChannel().localAddress())
                                            .setStatus(cannedStatus))
                     .getServerAddress();
}
 
源代码17 项目: netty-cookbook   文件: NettyRxJavaServer.java
/**
 * Demo entry point: starts an HTTP server on port 8080 that echoes the requested path (or an
 * error body for {@code /error}), then issues GET requests for {@code /}, {@code /error} and
 * {@code /data} and prints each response chunk.
 *
 * @param args unused
 * @throws InterruptedException if shutting the server down is interrupted
 */
public static void main(String... args) throws InterruptedException {
    HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080,
            (HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) -> {
        System.out.println("Server => Request: " + request.getPath());
        try {
            if ("/error".equals(request.getPath())) {
                throw new RuntimeException("forced error");
            }
            response.setStatus(HttpResponseStatus.OK);
            response.writeString("Path Requested =>: " + request.getPath() + '\n');
            return response.close();
        } catch (Throwable e) {
            System.err.println("Server => Error [" + request.getPath() + "] => " + e);
            // NOTE(review): the status is 400 while the body text says "500" — confirm which
            // one is intended.
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            response.writeString("Error 500: Bad Request\n");
            return response.close();
        }
    });

    // start() returns once the server is started; startAndWait() would block here until
    // shutdown, so the client requests below would never be issued.
    server.start();
    try {
        for (String path : new String[] {"/", "/error", "/data"}) {
            RxNetty.createHttpGet("http://localhost:8080" + path)
                   .flatMap(response -> response.getContent())
                   .map(data -> "Client => " + data.toString(Charset.defaultCharset()))
                   .toBlocking().forEach(System.out::println);
        }
    } finally {
        // Always release the port, even if one of the requests fails.
        server.shutdown();
    }
}
 
源代码18 项目: ReactiveLab   文件: AbstractMiddleTierService.java
/**
 * Creates the SSE HTTP server for this middle-tier service on the given port.
 *
 * <p>Requests go through a {@code HystrixMetricsStreamHandler} registered for
 * {@code /hystrix.stream}, which wraps {@code handleRequest}. Completed requests are logged
 * with their elapsed time and recorded in {@code metrics}; errors from the returned
 * observable are counted as failures. If {@code handleRequest} throws, the error is printed
 * and an error payload is written with status 400.
 *
 * @param port port passed to {@code RxNetty.createHttpServer}
 * @return the created server
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) {
    System.out.println("Start " + getClass().getSimpleName() + " on port: " + port);

    // Handler chain (wrapped in Hystrix).
    // TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202)
    final HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> hystrixWrapped =
            new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (req, resp) -> {
                try {
                    final long startedAt = System.currentTimeMillis();
                    return handleRequest(req, resp)
                            .doOnCompleted(() -> System.out.println("Response => " + req.getPath() + " Time => " + (int) (System.currentTimeMillis() - startedAt) + "ms"))
                            .doOnCompleted(() -> metrics.getRollingPercentile().addValue((int) (System.currentTimeMillis() - startedAt)))
                            .doOnCompleted(() -> metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1))
                            .doOnError(failure -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1));
                } catch (Throwable thrown) {
                    thrown.printStackTrace();
                    System.err.println("Server => Error [" + req.getPath() + "] => " + thrown);
                    resp.setStatus(HttpResponseStatus.BAD_REQUEST);
                    return resp.writeStringAndFlush("data: Error 500: Bad Request\n" + thrown.getMessage() + "\n");
                }
            });

    return RxNetty.createHttpServer(
            port,
            (req, resp) -> hystrixWrapped.handle(req, resp),
            PipelineConfigurators.<ByteBuf> serveSseConfigurator());
}
 
源代码19 项目: karyon   文件: KaryonHttpModule.java
/**
 * Creates the module and derives the keys for the router, the interceptor support and the
 * HTTP server from the I/O types and {@code nameAnnotation}.
 *
 * @param moduleName name passed to the superclass
 * @param iType      input type, passed to the superclass and used for key derivation
 * @param oType      output type, passed to the superclass and used for key derivation
 */
protected KaryonHttpModule(String moduleName, Class<I> iType, Class<O> oType) {
    super(moduleName, iType, oType);

    this.routerKey = keyFor(RequestHandler.class, iType, oType, nameAnnotation);
    this.interceptorSupportKey =
            keyFor(GovernatorHttpInterceptorSupport.class, iType, oType, nameAnnotation);
    this.httpServerKey = keyFor(HttpServer.class, iType, oType, nameAnnotation);
}
 
源代码20 项目: karyon   文件: KaryonHttpModule.java
/**
 * Configures the server and registers its bindings: the built server config, the interceptor
 * support instance, and an eager-singleton {@code RxServer} map entry keyed by the value of
 * {@code nameAnnotation}.
 */
@Override
protected void configure() {
    configureServer();

    bind(serverConfigKey).toInstance(serverConfigBuilder.build());
    bind(interceptorSupportKey).toInstance(interceptorSupportInstance);

    // The annotation value serves both as the map key and as the provider's name.
    final String serverName = nameAnnotation.value();
    MapBinder.newMapBinder(binder(), String.class, RxServer.class)
             .addBinding(serverName)
             .toProvider(new HttpRxServerProvider<I, O, HttpServer<I, O>>(serverName, iType, oType))
             .asEagerSingleton();
}
 
源代码21 项目: karyon   文件: Karyon.java
/**
 * Creates a new {@link KaryonServer} backed by a single HTTP server that forwards every
 * request to the supplied {@link RequestHandler}.
 * The {@link HttpServer} is obtained from {@link KaryonTransport#newHttpServer(int, RequestHandler)}.
 *
 * @param port Port for the server.
 * @param handler Request handler that receives all requests.
 * @param bootstrapModules Additional bootstrap modules, if any.
 *
 * @return {@link KaryonServer} which is to be used to start the created server.
 */
public static KaryonServer forRequestHandler(int port, final RequestHandler<ByteBuf, ByteBuf> handler,
                                             BootstrapModule... bootstrapModules) {
    // Thin wrapper that simply forwards to the caller's handler.
    final RequestHandler<ByteBuf, ByteBuf> delegating = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                       HttpServerResponse<ByteBuf> response) {
            return handler.handle(request, response);
        }
    };
    HttpServer<ByteBuf, ByteBuf> httpServer = KaryonTransport.newHttpServer(port, delegating);
    return new RxNettyServerBackedServer(httpServer, bootstrapModules);
}
 
源代码22 项目: ReactiveLab   文件: ClientServerWithDiscovery.java
/**
 * Demo entry point: starts an embedded eureka server, starts the {@code ClientServer} HTTP
 * server on port 8089, registers it with eureka under a VIP address, looks it up again
 * through eureka, and sends a request to the host/port obtained from that lookup.
 *
 * @param args unused
 */
public static void main(String[] args) throws Exception {
    final int eurekaReadServerPort = 7005;
    final int eurekaWriteServerPort = 7006;

    // Embedded eureka server on the read/write ports above.
    startEurekaServer(eurekaReadServerPort, eurekaWriteServerPort);

    // Eureka client pointing at the same read/write ports.
    final EurekaClient eurekaClient = createEurekaClient(eurekaReadServerPort, eurekaWriteServerPort);

    // Reuse the ClientServer example to start an RxNetty server on the passed port.
    final HttpServer<ByteBuf, ByteBuf> server = ClientServer.startServer(8089);
    final int serverPort = server.getServerPort();

    // Register the server with eureka under a virtual IP address (VIP). Eureka uses VIPs to
    // group homogeneous instances of a service so clients can use them interchangeably.
    final String vipAddress = "mock_server-" + serverPort;
    registerWithEureka(server.getServerPort(), eurekaClient, vipAddress);

    // Fetch the instance information back from eureka; the VIP address must be known
    // beforehand in order to query it.
    final InstanceInfo serverInfo = getServerInfo(eurekaClient, vipAddress);

    // Extract IP address and port from the instance information.
    final Host host = getServerHostAndPort(serverInfo);

    // Reuse the ClientServer example to send a request to the server found via eureka.
    // Blocking keeps the demo simple; a real application would chain the response back to
    // its caller instead.
    ClientServer.createRequest(host.getIpAddress(), host.getPort())
                .toBlocking()
                .forEach(System.out::println);
}
 
/**
 * Demo entry point: starts an embedded eureka server, starts the {@code ClientServer} HTTP
 * server on port 8089, registers it with eureka, builds a host event stream from eureka, and
 * executes the request through {@code MyCommand}, printing each item of the result.
 *
 * @param args unused
 */
public static void main(String[] args) throws Exception {
    final int eurekaReadServerPort = 7008;
    final int eurekaWriteServerPort = 7010;

    // Embedded eureka server on the read/write ports above.
    ClientServerWithDiscovery.startEurekaServer(eurekaReadServerPort, eurekaWriteServerPort);

    // Eureka client pointing at the same read/write ports.
    final EurekaClient eurekaClient =
            ClientServerWithDiscovery.createEurekaClient(eurekaReadServerPort, eurekaWriteServerPort);

    // Reuse the ClientServer example to start an RxNetty server on the passed port.
    final HttpServer<ByteBuf, ByteBuf> server = ClientServer.startServer(8089);
    final int serverPort = server.getServerPort();

    // Register the server with eureka under a virtual IP address (VIP). Eureka uses VIPs to
    // group homogeneous instances of a service so clients can use them interchangeably.
    final String vipAddress = "mock_server-" + serverPort;
    ClientServerWithDiscovery.registerWithEureka(server.getServerPort(), eurekaClient, vipAddress);

    // Ocelli learns about available hosts from this eureka-backed host event stream.
    final Observable<MembershipEvent<Host>> eurekaHostSource =
            ClientServerWithLoadBalancer.createEurekaHostStream(eurekaClient, vipAddress);

    final MyCommand myCommand = new MyCommand(eurekaHostSource);

    // Executes the request on the client (just as ClientServerWithLoadBalancer does) but
    // using hystrix. Blocking keeps the demo simple; a real application would chain the
    // response back to its caller instead.
    myCommand.toObservable()
             .toBlocking()
             .forEach(System.out::println);
}
 
源代码24 项目: ReactiveLab   文件: ClientServerWithLoadBalancer.java
/**
 * Demo entry point: starts an embedded eureka server, starts the {@code ClientServer} HTTP
 * server on port 8089, registers it with eureka, builds a host event stream from eureka, and
 * sends a request to a host chosen via {@code createRequestFromLB}, printing each item of the
 * response.
 *
 * @param args unused
 */
public static void main(String[] args) throws Exception {
    final int eurekaReadServerPort = 7007;
    final int eurekaWriteServerPort = 7008;

    // Embedded eureka server on the read/write ports above.
    ClientServerWithDiscovery.startEurekaServer(eurekaReadServerPort, eurekaWriteServerPort);

    // Eureka client pointing at the same read/write ports.
    final EurekaClient eurekaClient =
            ClientServerWithDiscovery.createEurekaClient(eurekaReadServerPort, eurekaWriteServerPort);

    // Reuse the ClientServer example to start an RxNetty server on the passed port.
    final HttpServer<ByteBuf, ByteBuf> server = ClientServer.startServer(8089);
    final int serverPort = server.getServerPort();

    // Register the server with eureka under a virtual IP address (VIP). Eureka uses VIPs to
    // group homogeneous instances of a service so clients can use them interchangeably.
    final String vipAddress = "mock_server-" + serverPort;
    ClientServerWithDiscovery.registerWithEureka(server.getServerPort(), eurekaClient, vipAddress);

    // Ocelli learns about available hosts from this eureka-backed host event stream.
    final Observable<MembershipEvent<Host>> eurekaHostSource =
            createEurekaHostStream(eurekaClient, vipAddress);

    // Unlike ClientServerWithDiscovery, which uses the eureka host and port directly, pick a
    // host through the load balancer. Blocking keeps the demo simple; a real application
    // would chain the response back to its caller instead.
    createRequestFromLB(eurekaHostSource)
            .toBlocking()
            .forEach(System.out::println);
}
 
源代码25 项目: karyon   文件: KaryonTransport.java
/**
 * Builds an HTTP server for the given port and router via {@code newHttpServerBuilder}.
 *
 * @param port   port passed to the builder
 * @param router request handler passed to the builder
 * @return the built server
 */
public static <I, O> HttpServer<I, O> newHttpServer(int port, RequestHandler<I, O> router) {
    final HttpServer<I, O> builtServer = newHttpServerBuilder(port, router).build();
    return builtServer;
}
 
源代码26 项目: karyon   文件: KaryonTransport.java
/**
 * Builds an HTTP server for the given port and request handler via
 * {@code newHttpServerBuilder}.
 *
 * @param port           port passed to the builder
 * @param requestHandler HTTP request handler passed to the builder
 * @return the built server
 */
public static <I, O> HttpServer<I, O> newHttpServer(int port, HttpRequestHandler<I, O> requestHandler) {
    final HttpServer<I, O> builtServer = newHttpServerBuilder(port, requestHandler).build();
    return builtServer;
}
 
源代码27 项目: karyon   文件: Karyon.java
/**
 * Creates a new {@link KaryonServer} which combines the lifecycle of the passed
 * {@link HttpServer} with its own lifecycle. The modules are converted with
 * {@code toBootstrapModule} and handed to the {@code BootstrapModule} overload.
 *
 * @param server HTTP server
 * @param modules Additional modules, if any.
 *
 * @return {@link KaryonServer} which is to be used to start the created server.
 */
public static KaryonServer forHttpServer(HttpServer<?, ?> server, Module... modules) {
    return forHttpServer(
            server,
            toBootstrapModule(modules));
}
 
源代码28 项目: karyon   文件: Karyon.java
/**
 * Creates a new {@link KaryonServer} which combines the lifecycle of the passed
 * {@link HttpServer} with its own lifecycle.
 *
 * @param server HTTP server
 * @param bootstrapModules Additional bootstrap modules, if any.
 *
 * @return {@link KaryonServer} which is to be used to start the created server.
 */
public static KaryonServer forHttpServer(HttpServer<?, ?> server,
                                         BootstrapModule... bootstrapModules) {
    final KaryonServer karyonServer = new RxNettyServerBackedServer(server, bootstrapModules);
    return karyonServer;
}
 
 类所在包
 同包方法