类io.reactivex.netty.protocol.http.server.RequestHandler源码实例Demo

下面列出了怎么用io.reactivex.netty.protocol.http.server.RequestHandler的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: mesos-rxjava   文件: MesosClientIntegrationTest.java
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos5xxResponse() throws Throwable {
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
        return response.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClient(uri);

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up because subscribe will 500");
    } catch (Mesos5xxException e) {
        // expected
        final MesosClientErrorContext ctx = e.getContext();
        assertThat(ctx.getStatusCode()).isEqualTo(500);
    } finally {
        server.shutdown();
    }
}
 
源代码2 项目: mesos-rxjava   文件: MesosClientIntegrationTest.java
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mismatchContentType() throws Throwable {
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "application/json");
        return response.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClient(uri);

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up because of content type mismatch");
    } catch (MesosException e) {
        // expected
        final MesosClientErrorContext ctx = e.getContext();
        assertThat(ctx.getStatusCode()).isEqualTo(200);
        assertThat(ctx.getMessage()).isEqualTo("Response had Content-Type \"application/json\" expected \"text/plain;charset=utf-8\"");
    } finally {
        server.shutdown();
    }
}
 
源代码3 项目: ribbon   文件: RxMovieServer.java
public HttpServer<ByteBuf, ByteBuf> createServer() {
    HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
            if (request.getPath().contains("/users")) {
                if (request.getHttpMethod().equals(HttpMethod.GET)) {
                    return handleRecommendationsByUserId(request, response);
                } else {
                    return handleUpdateRecommendationsForUser(request, response);
                }
            }
            if (request.getPath().contains("/recommendations")) {
                return handleRecommendationsBy(request, response);
            }
            if (request.getPath().contains("/movies")) {
                return handleRegisterMovie(request, response);
            }
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).enableWireLogging(LogLevel.ERROR).build();

    System.out.println("RxMovie server started...");
    return server;
}
 
源代码4 项目: mesos-rxjava   文件: MesosClientIntegrationTest.java
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos4xxResponse() throws Throwable {
    final String errorMessage = "Error message that should come from the server";
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.BAD_REQUEST);
        final byte[] msgBytes = errorMessage.getBytes(StandardCharsets.UTF_8);
        response.getHeaders().setHeader("Content-Length", msgBytes.length);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        response.writeBytes(msgBytes);
        return response.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClient(uri);

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up because subscribe will 400");
    } catch (Mesos4xxException e) {
        // expected
        final MesosClientErrorContext ctx = e.getContext();
        assertThat(ctx.getStatusCode()).isEqualTo(400);
        assertThat(ctx.getMessage()).isEqualTo(errorMessage);
    } finally {
        server.shutdown();
    }
}
 
@Test
@Ignore
public void testBurstyObservable_missingBackpressureException() throws Throwable {
    final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
    final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
    final byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8);
    final byte[] hbytes = String.format("%d\n", heartbeatMessage.getBytes().length).getBytes(StandardCharsets.UTF_8);

    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        writeRecordIOMessage(response, subscribedMessage);
        for (int i = 0; i < 20000; i++) {
            response.writeBytes(hbytes);
            response.writeBytes(hmsg);
        }
        return response.flush();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClientForStreaming(uri).build();

    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up due to backpressure");
    } catch (MissingBackpressureException e) {
        // expected
        e.printStackTrace();
        assertThat(e.getMessage()).isNullOrEmpty();
    } finally {
        server.shutdown();
    }
}
 
@Test
public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable {
    msgNo = 0;
    final int numMessages = 20000;
    final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
    final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        writeRecordIOMessage(response, subscribedMessage);
        for (int i = 0; i < numMessages; i++) {
            writeRecordIOMessage(response, heartbeatMessage);
        }
        return response.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClientForStreaming(uri)
            .onBackpressureBuffer()
            .build();

    try {
        client.openStream().await();
    } finally {
        // 20000 heartbeats PLUS 1 subscribe
        assertEquals("All heartbeats received (plus the subscribed)", 1 + numMessages, msgNo);
        server.shutdown();
    }
}
 
源代码7 项目: Prana   文件: HostsHandlerTest.java
@Override
protected RequestHandler<ByteBuf, ByteBuf> getHandler() {
    ArrayList<InstanceInfo> instanceInfos = new ArrayList<>();
    instanceInfos.add(InstanceInfo.Builder.newBuilder().setAppName("foo").setVIPAddress("bar").setHostName("host1").build());
    instanceInfos.add(InstanceInfo.Builder.newBuilder().setAppName("foo").setVIPAddress("bar").setHostName("host2").build());
    when(hostService.getHosts("foo")).thenReturn(instanceInfos);
    return new HostsHandler(hostService, new ObjectMapper());
}
 
源代码8 项目: karyon   文件: KaryonHttpModule.java
protected KaryonHttpModule(String moduleName, Class<I> iType, Class<O> oType) {
    super(moduleName, iType, oType);

    routerKey = keyFor(RequestHandler.class, iType, oType, nameAnnotation);
    interceptorSupportKey = keyFor(GovernatorHttpInterceptorSupport.class, iType, oType, nameAnnotation);
    httpServerKey = keyFor(HttpServer.class, iType, oType, nameAnnotation);
}
 
源代码9 项目: karyon   文件: HttpRxServerProvider.java
public HttpRxServerProvider(String name, Class<I> iType, Class<O> oType) {
    nameAnnotation = Names.named(name);

    routerKey = keyFor(RequestHandler.class, iType, oType, nameAnnotation);
    interceptorSupportKey = keyFor(GovernatorHttpInterceptorSupport.class, iType, oType, nameAnnotation);
    pipelineConfiguratorKey = Key.get(PipelineConfigurator.class, nameAnnotation);
    metricEventsListenerFactoryKey = Key.get(MetricEventsListenerFactory.class, nameAnnotation);
    serverConfigKey = Key.get(ServerConfig.class, nameAnnotation);
}
 
源代码10 项目: karyon   文件: HttpRxServerProvider.java
@SuppressWarnings("rawtypes")
@Inject
public void setInjector(Injector injector) {
    HttpServerConfig config = (HttpServerConfig) injector.getInstance(serverConfigKey);

    RequestHandler router = injector.getInstance(routerKey);

    GovernatorHttpInterceptorSupport<I, O> interceptorSupport = injector.getInstance(interceptorSupportKey);
    interceptorSupport.finish(injector);
    HttpRequestHandler<I, O> httpRequestHandler = new HttpRequestHandler<I, O>(router, interceptorSupport);

    HttpServerBuilder<I, O> builder = KaryonTransport.newHttpServerBuilder(config.getPort(), httpRequestHandler);

    if (config.requiresThreadPool()) {
        builder.withRequestProcessingThreads(config.getThreadPoolSize());
    }

    if (injector.getExistingBinding(pipelineConfiguratorKey) != null) {
        builder.appendPipelineConfigurator(injector.getInstance(pipelineConfiguratorKey));
    }

    if (injector.getExistingBinding(metricEventsListenerFactoryKey) != null) {
        builder.withMetricEventsListenerFactory(injector.getInstance(metricEventsListenerFactoryKey));
    }

    httpServer = builder.build().start();
    logger.info("Starting server {} on port {}...", nameAnnotation.value(), httpServer.getServerPort());
}
 
源代码11 项目: karyon   文件: Karyon.java
/**
 * Creates a new {@link KaryonServer} that has a single HTTP server instance which delegates all request
 * handling to {@link RequestHandler}.
 * The {@link HttpServer} is created using {@link KaryonTransport#newHttpServer(int, HttpRequestHandler)}
 *
 * @param port Port for the server.
 * @param handler Request Handler
 * @param bootstrapModules Additional bootstrapModules if any.
 *
 * @return {@link KaryonServer} which is to be used to start the created server.
 */
public static KaryonServer forRequestHandler(int port, final RequestHandler<ByteBuf, ByteBuf> handler,
                                             BootstrapModule... bootstrapModules) {
    HttpServer<ByteBuf, ByteBuf> httpServer =
            KaryonTransport.newHttpServer(port, new RequestHandler<ByteBuf, ByteBuf>() {
                @Override
                public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                               HttpServerResponse<ByteBuf> response) {
                    return handler.handle(request, response);
                }
            });
    return new RxNettyServerBackedServer(httpServer, bootstrapModules);
}
 
源代码12 项目: ReactiveLab   文件: HystrixMetricsStreamHandler.java
public HystrixMetricsStreamHandler(Metrics metrics, String hystrixPrefix, long interval, RequestHandler<I, O> appHandler) {
    this.metrics = metrics;
    this.hystrixPrefix = hystrixPrefix;
    this.interval = interval;
    this.appHandler = appHandler;
}
 
源代码13 项目: ReactiveLab   文件: HystrixMetricsStreamHandler.java
public HystrixMetricsStreamHandler(RequestHandler<I, O> appHandler) {
    this(DEFAULT_HYSTRIX_PREFIX, DEFAULT_INTERVAL, appHandler);
}
 
源代码14 项目: ReactiveLab   文件: HystrixMetricsStreamHandler.java
public HystrixMetricsStreamHandler(String hystrixPrefix, long interval, RequestHandler<I, O> appHandler) {
    this.hystrixPrefix = hystrixPrefix;
    this.interval = interval;
    this.appHandler = appHandler;
}
 
源代码15 项目: Prana   文件: HealthCheckHandlerTest.java
@Override
protected RequestHandler<ByteBuf, ByteBuf> getHandler() {
    return new HealthCheckHandler(objectMapper);
}
 
源代码16 项目: Prana   文件: StatusHandlerTest.java
@Override
protected RequestHandler<ByteBuf, ByteBuf> getHandler() {
    return new StatusHandler(objectMapper, applicationInfoManager);
}
 
源代码17 项目: Prana   文件: PingHandlerTest.java
@Override
protected RequestHandler<ByteBuf, ByteBuf> getHandler() {
    return new PingHandler(objectMapper);
}
 
源代码18 项目: Prana   文件: DynamicPropertiesHandlerTest.java
@Override
protected RequestHandler<ByteBuf, ByteBuf> getHandler() {
    return new DynamicPropertiesHandler(objectMapper);
}
 
源代码19 项目: karyon   文件: KaryonHttpModule.java
protected LinkedBindingBuilder<RequestHandler<I, O>> bindRouter() {
    return bind(routerKey);
}
 
源代码20 项目: karyon   文件: KaryonTransport.java
public static <I, O> HttpServerBuilder<I, O> newHttpServerBuilder(int port, RequestHandler<I, O> router) {
    return RxContexts.newHttpServerBuilder(port, new HttpRequestHandler<I, O>(router), RxContexts.DEFAULT_CORRELATOR);
}
 
源代码21 项目: karyon   文件: KaryonTransport.java
public static <I, O> HttpServer<I, O> newHttpServer(int port, RequestHandler<I, O> router) {
    return newHttpServerBuilder(port, router).build();
}
 
源代码22 项目: karyon   文件: HttpRequestHandler.java
public HttpRequestHandler(RequestHandler<I, O> router) {
    this(router, new HttpInterceptorSupport<I, O>());
}
 
源代码23 项目: karyon   文件: HttpRequestHandler.java
public HttpRequestHandler(RequestHandler<I, O> router,
                          AbstractInterceptorSupport<HttpServerRequest<I>, HttpServerResponse<O>, HttpKeyEvaluationContext, ?, ?> interceptorSupport) {
    executor = new InterceptorExecutor<HttpServerRequest<I>, HttpServerResponse<O>, HttpKeyEvaluationContext>(interceptorSupport, router);
}
 
源代码24 项目: karyon   文件: HttpRequestHandlerBuilder.java
public HttpRequestHandlerBuilder(RequestHandler<I, O> router) {
    this(new HttpInterceptorSupport<I, O>(), router);
}
 
源代码25 项目: karyon   文件: HttpRequestHandlerBuilder.java
public HttpRequestHandlerBuilder(HttpInterceptorSupport<I, O> interceptorSupport,
                                 RequestHandler<I, O> router) {
    this.interceptorSupport = interceptorSupport;
    this.router = router;
}
 
源代码26 项目: karyon   文件: HttpRequestHandlerBuilder.java
public RequestHandler<I, O> getRouter() {
    return router;
}
 
源代码27 项目: karyon   文件: SimpleUriRouter.java
public Route(InterceptorKey<HttpServerRequest<I>, HttpKeyEvaluationContext> key,
              RequestHandler<I, O> handler) {
    this.key = key;
    this.handler = handler;
}
 
源代码28 项目: karyon   文件: SimpleUriRouter.java
public RequestHandler<I, O> getHandler() {
    return handler;
}
 
源代码29 项目: karyon   文件: Karyon.java
/**
 * Creates a new {@link KaryonServer} that has a single HTTP server instance which delegates all request
 * handling to {@link RequestHandler}.
 * The {@link HttpServer} is created using {@link KaryonTransport#newHttpServer(int, HttpRequestHandler)}
 *
 * @param port Port for the server.
 * @param handler Request Handler
 * @param modules Additional modules if any.
 *
 * @return {@link KaryonServer} which is to be used to start the created server.
 */
public static KaryonServer forRequestHandler(int port, final RequestHandler<ByteBuf, ByteBuf> handler,
                                             Module... modules) {
    return forRequestHandler(port, handler, toBootstrapModule(modules));
}
 
源代码30 项目: karyon   文件: SimpleUriRouter.java
/**
 * Add a new URI -&lt; Handler route to this router.
 * @param uri URI to match.
 * @param handler Request handler.
 * @return The updated router.
 */
public SimpleUriRouter<I, O> addUri(String uri, RequestHandler<I, O> handler) {
    routes.add(new Route(new ServletStyleUriConstraintKey<I>(uri, ""), handler));
    return this;
}
 
 类所在包
 同包方法