类io.reactivex.netty.RxNetty源码实例Demo

下面列出了怎么用io.reactivex.netty.RxNetty的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: mesos-rxjava   文件: MesosClient.java
/**
 * The Mesos HTTP Scheduler API will send a redirect to a client if it is not the leader. The client that is
 * constructed during {@link #openStream} is bound to a specific host and port, due to this behavior
 * we "probe" Mesos to try and find out where it's "master" is before we configure the client.
 *
 * This method will attempt to send a simple GET to {@code mesosUri}, however instead of going to the path
 * specified, it will go to {@code /redirect} and construct a uri relative to mesosUri using the host and port
 * returned in the Location header of the response.
 */
@NotNull
private static URI resolveMesosUri(final @NotNull URI mesosUri) {
    final String redirectUri = createRedirectUri(mesosUri);
    LOGGER.info("Probing Mesos server at {}", redirectUri);

    // When sending an individual request (rather than creating a client) RxNetty WILL follow redirects,
    // so here we tell it not to do that for this request by creating the config object below.
    final HttpClient.HttpClientConfig config =
        HttpClient.HttpClientConfig.Builder.fromDefaultConfig()
            .setFollowRedirect(false)
            .build();
    final HttpClientResponse<ByteBuf> redirectResponse =
        RxNetty.createHttpRequest(HttpClientRequest.createGet(redirectUri), config)
            .toBlocking()
            .first();

    return getUriFromRedirectResponse(mesosUri, redirectResponse);
}
 
源代码2 项目: mesos-rxjava   文件: MesosClientIntegrationTest.java
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos5xxResponse() throws Throwable {
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
        return response.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClient(uri);

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up because subscribe will 500");
    } catch (Mesos5xxException e) {
        // expected
        final MesosClientErrorContext ctx = e.getContext();
        assertThat(ctx.getStatusCode()).isEqualTo(500);
    } finally {
        server.shutdown();
    }
}
 
源代码3 项目: mesos-rxjava   文件: MesosClientIntegrationTest.java
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mismatchContentType() throws Throwable {
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "application/json");
        return response.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClient(uri);

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up because of content type mismatch");
    } catch (MesosException e) {
        // expected
        final MesosClientErrorContext ctx = e.getContext();
        assertThat(ctx.getStatusCode()).isEqualTo(200);
        assertThat(ctx.getMessage()).isEqualTo("Response had Content-Type \"application/json\" expected \"text/plain;charset=utf-8\"");
    } finally {
        server.shutdown();
    }
}
 
源代码4 项目: mesos-rxjava   文件: MesosServerSimulationTest.java
@Test
public void server400_nonPost() throws Exception {
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort));
    final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort())
        .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
        .build();

    final HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(uri.getPath())
        .withHeader("Accept", StringMessageCodec.UTF8_STRING.mediaType());

    final HttpClientResponse<ByteBuf> response = httpClient.submit(request)
        .toBlocking()
        .last();

    assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.BAD_REQUEST);
    final HttpResponseHeaders headers = response.getHeaders();
    assertThat(headers.getHeader("Accept")).isEqualTo(StringMessageCodec.UTF8_STRING.mediaType());

    assertThat(mesosServerSimulation.getCallsReceived()).hasSize(0);
}
 
源代码5 项目: mesos-rxjava   文件: MesosServerSimulationTest.java
@Test
public void server400_invalidContentType() throws Exception {
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort));
    final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort())
        .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
        .build();

    final byte[] data = StringMessageCodec.UTF8_STRING.encode("decline");
    final HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost(uri.getPath())
        .withHeader("Content-Type", "application/octet-stream")
        .withHeader("Accept", "application/octet-stream")
        .withContent(data);

    final HttpClientResponse<ByteBuf> response = httpClient.submit(request)
        .toBlocking()
        .last();

    assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.BAD_REQUEST);
    final HttpResponseHeaders headers = response.getHeaders();
    assertThat(headers.getHeader("Accept")).isEqualTo(StringMessageCodec.UTF8_STRING.mediaType());

    assertThat(mesosServerSimulation.getCallsReceived()).hasSize(0);
}
 
源代码6 项目: mesos-rxjava   文件: MesosServerSimulationTest.java
@Test
public void server400_emptyBody() throws Exception {
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort));
    final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort())
        .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
        .build();

    final HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost(uri.getPath())
        .withHeader("Content-Type", StringMessageCodec.UTF8_STRING.mediaType())
        .withHeader("Accept", StringMessageCodec.UTF8_STRING.mediaType())
        .withContent(new byte[]{});

    final HttpClientResponse<ByteBuf> response = httpClient.submit(request)
        .toBlocking()
        .last();

    assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.BAD_REQUEST);
    final HttpResponseHeaders headers = response.getHeaders();
    assertThat(headers.getHeader("Accept")).isEqualTo(StringMessageCodec.UTF8_STRING.mediaType());

    assertThat(mesosServerSimulation.getCallsReceived()).hasSize(0);
}
 
源代码7 项目: mesos-rxjava   文件: MesosServerSimulationTest.java
@NotNull
private static HttpClientResponse<ByteBuf> sendCall(final URI uri, final String call) {
    final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort())
        .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
        .build();

    final byte[] data = call.getBytes(StandardCharsets.UTF_8);
    final HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost(uri.getPath())
        .withHeader("Content-Type", StringMessageCodec.UTF8_STRING.mediaType())
        .withHeader("Accept", StringMessageCodec.UTF8_STRING.mediaType())
        .withContent(data);

    return httpClient.submit(request)
        .toBlocking()
        .last();
}
 
源代码8 项目: ReactiveLab   文件: ClientServer.java
public static HttpServer<ByteBuf, ByteBuf> startServer(int port) {

        /**
         * Creates an HTTP server which returns "Hello World!" responses.
         */
        return RxNetty.createHttpServer(port,
                                        /*
                                         * HTTP Request handler for RxNetty where you control what you write as the
                                         * response for each and every request the server receives.
                                         */
                                        (request, response) -> {
                                            /**
                                             * In a real server, you would be writing different responses based on the
                                             * URI of the request.
                                             * This example just returns a "Hello World!!" string unconditionally.
                                             */
                                            return response.writeStringAndFlush("Hello World!!");
                                        })
                      .start();
    }
 
源代码9 项目: Prana   文件: HealthCheckHandler.java
private Observable<HttpClientResponse<ByteBuf>> getResponse(String externalHealthCheckURL) {
    String host = "localhost";
    int port = DEFAULT_APPLICATION_PORT;
    String path = "/healthcheck";
    try {
        URL url = new URL(externalHealthCheckURL);
        host = url.getHost();
        port = url.getPort();
        path = url.getPath();
    } catch (MalformedURLException e) {
        //continue
    }
    Integer timeout = DynamicProperty.getInstance("prana.host.healthcheck.timeout").getInteger(DEFAULT_CONNECTION_TIMEOUT);
    HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
            .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
            .build();
    return httpClient.submit(HttpClientRequest.createGet(path));

}
 
源代码10 项目: ribbon   文件: LoadBalancingTcpClient.java
@Override
protected RxClient<I, O> createRxClient(Server server) {
    ClientBuilder<I, O> builder = RxNetty.newTcpClientBuilder(server.getHost(), server.getPort());
    if (pipelineConfigurator != null) {
        builder.pipelineConfigurator(pipelineConfigurator);
    }
    Integer connectTimeout = getProperty(IClientConfigKey.Keys.ConnectTimeout, null, DefaultClientConfigImpl.DEFAULT_CONNECT_TIMEOUT);
    builder.channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
    if (isPoolEnabled()) {
        builder.withConnectionPoolLimitStrategy(poolStrategy)
        .withIdleConnectionsTimeoutMillis(idleConnectionEvictionMills)
        .withPoolIdleCleanupScheduler(poolCleanerScheduler);
    } else {
        builder.withNoConnectionPooling();
    }
    RxClient<I, O> client = builder.build();
    return client;
}
 
源代码11 项目: ribbon   文件: HelloUdpServer.java
public UdpServer<DatagramPacket, DatagramPacket> createServer() {
    UdpServer<DatagramPacket, DatagramPacket> server = RxNetty.createUdpServer(port, new ConnectionHandler<DatagramPacket, DatagramPacket>() {
        @Override
        public Observable<Void> handle(final ObservableConnection<DatagramPacket, DatagramPacket> newConnection) {
            return newConnection.getInput().flatMap(new Func1<DatagramPacket, Observable<Void>>() {
                @Override
                public Observable<Void> call(final DatagramPacket received) {
                    return Observable.interval(delay, TimeUnit.MILLISECONDS).take(1).flatMap(new Func1<Long, Observable<Void>>() {
                        @Override
                        public Observable<Void> call(Long aLong) {
                            InetSocketAddress sender = received.sender();
                            System.out.println("Received datagram. Sender: " + sender);
                            ByteBuf data = newConnection.getChannel().alloc().buffer(WELCOME_MSG_BYTES.length);
                            data.writeBytes(WELCOME_MSG_BYTES);
                            return newConnection.writeAndFlush(new DatagramPacket(data, sender));
                        }
                    });
                }
            });
        }
    });
    System.out.println("UDP hello server started at port: " + port);
    return server;
}
 
源代码12 项目: ribbon   文件: RxMovieServer.java
public HttpServer<ByteBuf, ByteBuf> createServer() {
    HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
            if (request.getPath().contains("/users")) {
                if (request.getHttpMethod().equals(HttpMethod.GET)) {
                    return handleRecommendationsByUserId(request, response);
                } else {
                    return handleUpdateRecommendationsForUser(request, response);
                }
            }
            if (request.getPath().contains("/recommendations")) {
                return handleRecommendationsBy(request, response);
            }
            if (request.getPath().contains("/movies")) {
                return handleRegisterMovie(request, response);
            }
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).enableWireLogging(LogLevel.ERROR).build();

    System.out.println("RxMovie server started...");
    return server;
}
 
源代码13 项目: karyon   文件: TcpRxServerProvider.java
@Inject
public void setInjector(Injector injector) {
    ServerConfig config = injector.getInstance(serverConfigKey);

    ConnectionHandler<I, O> connectionHandler = injector.getInstance(connectionHandlerKey);

    ServerBuilder<I, O> builder = RxNetty.newTcpServerBuilder(config.getPort(), connectionHandler);

    if (injector.getExistingBinding(pipelineConfiguratorKey) != null) {
        builder.appendPipelineConfigurator(injector.getInstance(pipelineConfiguratorKey));
    }

    if (injector.getExistingBinding(metricEventsListenerFactoryKey) != null) {
        builder.withMetricEventsListenerFactory(injector.getInstance(metricEventsListenerFactoryKey));
    }

    server = builder.build().start();
    logger.info("Starting server {} on port {}...", nameAnnotation.value(), server.getServerPort());
}
 
源代码14 项目: karyon   文件: WebSocketsRxServerProvider.java
@Inject
@SuppressWarnings("unchecked")
public void setInjector(Injector injector) {
    KaryonWebSocketsModule.WebSocketsServerConfig config = (KaryonWebSocketsModule.WebSocketsServerConfig) injector.getInstance(serverConfigKey);

    ConnectionHandler<I, O> connectionHandler = injector.getInstance(connectionHandlerKey);

    WebSocketServerBuilder<I, O> builder = RxNetty.newWebSocketServerBuilder(config.getPort(), connectionHandler)
            .withMessageAggregator(config.isMessageAggregator());

    if (injector.getExistingBinding(pipelineConfiguratorKey) != null) {
        builder.appendPipelineConfigurator(injector.getInstance(pipelineConfiguratorKey));
    }

    if (injector.getExistingBinding(metricEventsListenerFactoryKey) != null) {
        builder.withMetricEventsListenerFactory(injector.getInstance(metricEventsListenerFactoryKey));
    }

    server = builder.build().start();
    logger.info("Starting WebSockets server {} on port {}...", nameAnnotation.value(), server.getServerPort());
}
 
源代码15 项目: karyon   文件: KaryonTcpModuleTest.java
@Test
public void testGovernatedTcpServer() throws Exception {
    String message = RxNetty.createTcpClient("localhost", server.getServerPort()).connect()
            .flatMap(new Func1<ObservableConnection<ByteBuf, ByteBuf>, Observable<String>>() {
                @Override
                public Observable<String> call(ObservableConnection<ByteBuf, ByteBuf> connection) {
                    return connection.getInput().map(new Func1<ByteBuf, String>() {
                        @Override
                        public String call(ByteBuf byteBuf) {
                            return byteBuf.toString(Charset.defaultCharset());
                        }
                    });
                }
            }).single().toBlocking().toFuture().get(60, TimeUnit.SECONDS);

    assertEquals("Invalid message received from server", SERVER_MESSAGE, message);
}
 
源代码16 项目: karyon   文件: KaryonWebSocketsModuleTest.java
@Test
public void testGovernatedTcpServer() throws Exception {
    String message = RxNetty.<TextWebSocketFrame, TextWebSocketFrame>newWebSocketClientBuilder("localhost", server.getServerPort())
            .build()
            .connect()
            .flatMap(new Func1<ObservableConnection<TextWebSocketFrame, TextWebSocketFrame>, Observable<String>>() {
                @Override
                public Observable<String> call(ObservableConnection<TextWebSocketFrame, TextWebSocketFrame> connection) {
                    return connection.getInput().map(new Func1<TextWebSocketFrame, String>() {
                        @Override
                        public String call(TextWebSocketFrame frame) {
                            return frame.text();
                        }
                    });
                }
            }).single().toBlocking().toFuture().get(60, TimeUnit.SECONDS);

    assertEquals("Invalid message received from server", SERVER_MESSAGE, message);
}
 
源代码17 项目: WSPerfLab   文件: StartMockService.java
public static void main(String[] args) {
    int port = 8989;
    if (args.length > 0) {
        port = Integer.parseInt(args[0]);
    }
    System.out.println("Starting mock service on port " + port + "...");
    startMonitoring();
    RxNetty.<ByteBuf, ByteBuf>newHttpServerBuilder(port, (request, response) -> {
        try {
            long startTime = System.currentTimeMillis();
            counter.increment(CounterEvent.REQUESTS);
            return handleRequest(request, response)
                    .doOnCompleted(() ->  {
                        counter.increment(CounterEvent.SUCCESS);
                        latency.addValue((int)(System.currentTimeMillis() - startTime));
                    })
                    .doOnError(t -> counter.increment(CounterEvent.NETTY_ERROR));
        } catch (Throwable e) {
            System.err.println("Server => Error [" + request.getPath() + "] => " + e);
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            counter.increment(CounterEvent.HTTP_ERROR);
            return response.writeStringAndFlush("Error 500: Bad Request\n" + e.getMessage() + '\n');
        }
    }).build().startAndWait();
}
 
源代码18 项目: MarketData   文件: RxNettyEventEventStreamClient.java
private Observable<String> initializeStream() {
    HttpClient<ByteBuf, ServerSentEvent> client =
            RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator());

    return client.submit(HttpClientRequest.createGet("/hello")).
            flatMap(response -> {
                printResponseHeader(response);
                return response.getContent();
            }).map(serverSentEvent -> serverSentEvent.contentAsString());
}
 
源代码19 项目: MarketData   文件: RxNettyRequestReplyServer.java
public HttpServer<ByteBuf, ByteBuf> createServer() {
    return RxNetty.createHttpServer(port, (request, response) -> {

        HttpRequest httpRequest = new HttpRequest(request.getQueryParameters());
        String content = getResponseContent(httpRequest);
        return response.writeStringAndFlush(content);
    });
}
 
源代码20 项目: MarketData   文件: RxNettyRequestReplyClient.java
@Override
public Observable<String> request(String parameter) {
    return RxNetty.createHttpGet("http://localhost:" + port + "?" + paramName + "=" + parameter)
            .flatMap(response
                    -> response.getContent()
                        .<String> map(content -> content.toString(Charset.defaultCharset()))
            );
}
 
源代码21 项目: MarketData   文件: RxNettyEventServer.java
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    HttpServer<ByteBuf, ServerSentEvent> server = RxNetty.createHttpServer(port,
            (request, response) -> {
                response.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                response.getHeaders().set(CACHE_CONTROL, "no-cache");
                response.getHeaders().set(CONNECTION, "keep-alive");
                response.getHeaders().set(CONTENT_TYPE, "text/event-stream");
                return getIntervalObservable(request, response);
            }, PipelineConfigurators.<ByteBuf>serveSseConfigurator());
    System.out.println("HTTP Server Sent Events server started...");
    return server;
}
 
源代码22 项目: mesos-rxjava   文件: MesosClient.java
/**
 * Sends the subscribe call to Mesos and starts processing the stream of {@code Receive} events.
 * The {@code streamProcessor} function provided to the constructor will be applied to the stream of events
 * received from Mesos.
 * <p>
 * The stream processing will then process any {@link SinkOperation} that should be sent to Mesos.
 * @return The subscription representing the processing of the event stream. This subscription can then be used
 * to block the invoking thread using {@link AwaitableSubscription#await()} (For example to block a main thread
 * from exiting while events are being processed.)
 */
@NotNull
public AwaitableSubscription openStream() {

    final URI uri = resolveMesosUri(mesosUri);

    final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), getPort(uri))
        .withName(userAgent.getEntries().get(0).getName())
        .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
        .build();

    final Observable<Receive> receives = createPost.call(subscribe)
        .flatMap(httpClient::submit)
        .subscribeOn(Rx.io())
        .flatMap(verifyResponseOk(subscribe, mesosStreamId, receiveCodec.mediaType()))
        .lift(new RecordIOOperator())
        .compose(backpressureTransformer)
        .observeOn(Rx.compute())
        /* Begin temporary back-pressure */
        .buffer(250, TimeUnit.MILLISECONDS)
        .flatMap(Observable::from)
        /* end temporary back-pressure */
        .map(receiveCodec::decode)
        ;

    final Observable<SinkOperation<Send>> sends = streamProcessor.apply(receives)
        .filter(Optional::isPresent)
        .map(Optional::get);

    final Subscriber<SinkOperation<Send>> subscriber = new SinkSubscriber<>(httpClient, createPost);
    final SubscriberDecorator<SinkOperation<Send>> decorator = new SubscriberDecorator<>(subscriber);
    final Subscription subscription = sends
        .subscribeOn(Rx.compute())
        .observeOn(Rx.compute())
        .subscribe(decorator);

    return new ObservableAwaitableSubscription(Observable.from(exec.submit(decorator)), subscription);
}
 
源代码23 项目: mesos-rxjava   文件: MesosClientIntegrationTest.java
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos4xxResponse() throws Throwable {
    final String errorMessage = "Error message that should come from the server";
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.BAD_REQUEST);
        final byte[] msgBytes = errorMessage.getBytes(StandardCharsets.UTF_8);
        response.getHeaders().setHeader("Content-Length", msgBytes.length);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        response.writeBytes(msgBytes);
        return response.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClient(uri);

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up because subscribe will 400");
    } catch (Mesos4xxException e) {
        // expected
        final MesosClientErrorContext ctx = e.getContext();
        assertThat(ctx.getStatusCode()).isEqualTo(400);
        assertThat(ctx.getMessage()).isEqualTo(errorMessage);
    } finally {
        server.shutdown();
    }
}
 
@Test
@Ignore
public void testBurstyObservable_missingBackpressureException() throws Throwable {
    final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
    final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
    final byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8);
    final byte[] hbytes = String.format("%d\n", heartbeatMessage.getBytes().length).getBytes(StandardCharsets.UTF_8);

    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        writeRecordIOMessage(response, subscribedMessage);
        for (int i = 0; i < 20000; i++) {
            response.writeBytes(hbytes);
            response.writeBytes(hmsg);
        }
        return response.flush();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClientForStreaming(uri).build();

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up due to backpressure");
    } catch (MissingBackpressureException e) {
        // expected
        e.printStackTrace();
        assertThat(e.getMessage()).isNullOrEmpty();
    } finally {
        server.shutdown();
    }
}
 
@Test
public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable {
    msgNo = 0;
    final int numMessages = 20000;
    final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
    final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        writeRecordIOMessage(response, subscribedMessage);
        for (int i = 0; i < numMessages; i++) {
            writeRecordIOMessage(response, heartbeatMessage);
        }
        return response.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClientForStreaming(uri)
            .onBackpressureBuffer()
            .build();

    try {
        client.openStream().await();
    } finally {
        // 20000 heartbeats PLUS 1 subscribe
        assertEquals("All heartbeats received (plus the subscribed)", 1 + numMessages, msgNo);
        server.shutdown();
    }
}
 
源代码26 项目: mesos-rxjava   文件: TcpSocketProxyTest.java
@Before
public void setUp() throws Exception {
    server = RxNetty.createHttpServer(0, (request, response) -> {
        response.writeString("Hello World");
        return response.close();
    });
    server.start();
}
 
源代码27 项目: mesos-rxjava   文件: TcpSocketProxyTest.java
@Test
public void testConnectionTerminatedOnClose() throws Exception {
    final TcpSocketProxy proxy = new TcpSocketProxy(
        new InetSocketAddress("localhost", 0),
        new InetSocketAddress("localhost", server.getServerPort())
    );
    proxy.start();

    final int listenPort = proxy.getListenPort();
    final HttpClient<ByteBuf, ByteBuf> client = RxNetty.createHttpClient("localhost", listenPort);

    final String first = client.submit(HttpClientRequest.createGet("/"))
        .flatMap(AbstractHttpContentHolder::getContent)
        .map(bb -> bb.toString(StandardCharsets.UTF_8))
        .toBlocking()
        .first();

    assertThat(first).isEqualTo("Hello World");
    LOGGER.info("first request done");
    proxy.shutdown();
    if (proxy.isShutdown()) {
        proxy.close();
    } else {
        fail("proxy should have been shutdown");
    }

    try {
        final URI uri = URI.create(String.format("http://localhost:%d/", listenPort));
        uri.toURL().getContent();
        fail("Shouldn't have been able to get content");
    } catch (IOException e) {
        // expected
    }
}
 
源代码28 项目: netty-cookbook   文件: NettyRxJavaServer.java
public static void main(String... args) throws InterruptedException {
    HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080, 
    		(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) -> {
        System.out.println("Server => Request: " + request.getPath());            
        try {
            if ("/error".equals(request.getPath())) {
                throw new RuntimeException("forced error");
            }
            response.setStatus(HttpResponseStatus.OK);
            response.writeString("Path Requested =>: " + request.getPath() + '\n');
            return response.close();
        } catch (Throwable e) {
            System.err.println("Server => Error [" + request.getPath() + "] => " + e);
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            response.writeString("Error 500: Bad Request\n");
            return response.close();
        }
    });

    server.startAndWait();

    RxNetty.createHttpGet("http://localhost:8080/")
           .flatMap(response -> response.getContent())
           .map(data -> "Client => " + data.toString(Charset.defaultCharset()))
           .toBlocking().forEach(System.out::println);

    RxNetty.createHttpGet("http://localhost:8080/error")
           .flatMap(response -> response.getContent())
           .map(data -> "Client => " + data.toString(Charset.defaultCharset()))
           .toBlocking().forEach(System.out::println);

    RxNetty.createHttpGet("http://localhost:8080/data")
           .flatMap(response -> response.getContent())
           .map(data -> "Client => " + data.toString(Charset.defaultCharset()))
           .toBlocking().forEach(System.out::println);

    //server.shutdown();
}
 
源代码29 项目: ReactiveLab   文件: StartEurekaServer.java
private static void startEurekaDashboard(final int port, EurekaClient client) {
    final StaticFileHandler staticFileHandler = new StaticFileHandler();

    RxNetty.createHttpServer(port, (request, response) -> {
        if (request.getUri().startsWith("/dashboard")) {
            return staticFileHandler.handle(request, response);
        } else if (request.getUri().startsWith("/data")) {
            response.getHeaders().set(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream");
            response.getHeaders().set(HttpHeaders.Names.CACHE_CONTROL, "no-cache");
            return client.forInterest(Interests.forFullRegistry())
                    .flatMap(notification -> {
                        ByteBuf data = response.getAllocator().buffer();
                        data.writeBytes("data: ".getBytes());
                        Map<String, String> dataAttributes = new HashMap<>();
                        dataAttributes.put("type", notification.getKind().toString());
                        dataAttributes.put("instance-id", notification.getData().getId());
                        dataAttributes.put("vip", notification.getData().getVipAddress());
                        if (notification.getData().getStatus() != null) {
                            dataAttributes.put("status", notification.getData().getStatus().name());
                        }
                        HashSet<ServicePort> servicePorts = notification.getData().getPorts();
                        int port1 = servicePorts.iterator().next().getPort();
                        dataAttributes.put("port", String.valueOf(port1));
                        String jsonData = SimpleJson.mapToJson(dataAttributes);
                        data.writeBytes(jsonData.getBytes());
                        data.writeBytes("\n\n".getBytes());
                        return response.writeBytesAndFlush(data);
                    });
        } else {
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return Observable.empty();
        }
    }).start();
}
 
源代码30 项目: ReactiveLab   文件: AbstractMiddleTierService.java
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) {
    System.out.println("Start " + getClass().getSimpleName() + " on port: " + port);

    // declare handler chain (wrapped in Hystrix)
    // TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202)
    HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> handlerChain 
      = new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (request, response) -> {
        try {
            long startTime = System.currentTimeMillis();
            return handleRequest(request, response)
                    .doOnCompleted(() -> System.out.println("Response => " + request.getPath() + " Time => " + (int) (System.currentTimeMillis() - startTime) + "ms"))
                    .doOnCompleted(() -> metrics.getRollingPercentile().addValue((int) (System.currentTimeMillis() - startTime)))
                    .doOnCompleted(() -> metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1))
                    .doOnError(t -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1));
        } catch (Throwable e) {
            e.printStackTrace();
            System.err.println("Server => Error [" + request.getPath() + "] => " + e);
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            return response.writeStringAndFlush("data: Error 500: Bad Request\n" + e.getMessage() + "\n");
        }
    });

    return RxNetty.createHttpServer(port, (request, response) -> {
        // System.out.println("Server => Request: " + request.getPath());
            return handlerChain.handle(request, response);
        }, PipelineConfigurators.<ByteBuf> serveSseConfigurator());
}
 
 类所在包
 同包方法