下面列出了 io.reactivex.netty.protocol.http.server.HttpServer 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Starts the HTTP server with wire logging at DEBUG level, then delegates to {@code super.init()}.
 *
 * <p>The request handler answers {@code 400 Bad Request} when the
 * {@code Constants.HAWKULAR_APM_TRACEID} header is absent and {@code 200 OK} otherwise. For POST
 * and PUT requests the received content is printed to standard output.
 */
@Override
public void init() {
    server = HttpServer.newServer()
            .enableWireLogging(LogLevel.DEBUG)
            .start((req, resp) -> {
                if (req.getHeader(Constants.HAWKULAR_APM_TRACEID) == null) {
                    return resp.setStatus(HttpResponseStatus.BAD_REQUEST);
                }
                if (req.getHttpMethod() == HttpMethod.POST
                        || req.getHttpMethod() == HttpMethod.PUT) {
                    // Decode the payload explicitly: the no-arg ByteBuf.toString() only reports
                    // the buffer's key properties (indices, capacity), not its content.
                    req.getContent().subscribe(bb -> System.out.println(
                            "DATA = " + bb.toString(java.nio.charset.StandardCharsets.UTF_8)));
                }
                resp.setStatus(HttpResponseStatus.OK);
                return resp;
            });
    super.init();
}
/**
 * Starts the HTTP server with wire logging at DEBUG level, then delegates to {@code super.init()}.
 *
 * <p>The request handler answers {@code 400 Bad Request} when the
 * {@code Constants.HAWKULAR_APM_TRACEID} header is absent; otherwise it writes
 * {@code HELLO_WORLD} as the response body. For POST and PUT requests the received content is
 * printed to standard output.
 */
@Override
public void init() {
    server = HttpServer.newServer()
            .enableWireLogging(LogLevel.DEBUG)
            .start((req, resp) -> {
                if (req.getHeader(Constants.HAWKULAR_APM_TRACEID) == null) {
                    return resp.setStatus(HttpResponseStatus.BAD_REQUEST);
                }
                if (req.getHttpMethod() == HttpMethod.POST
                        || req.getHttpMethod() == HttpMethod.PUT) {
                    // Decode the payload explicitly: the no-arg ByteBuf.toString() only reports
                    // the buffer's key properties (indices, capacity), not its content.
                    req.getContent().subscribe(bb -> System.out.println(
                            "DATA = " + bb.toString(java.nio.charset.StandardCharsets.UTF_8)));
                }
                return resp.writeString(Observable.just(HELLO_WORLD));
            });
    super.init();
}
/**
 * Expects {@code openStream().await()} to throw a {@link Mesos5xxException} whose context
 * reports status 500 when the stub server answers every request with
 * {@code 500 Internal Server Error}.
 */
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos5xxResponse() throws Throwable {
    // Stub endpoint: set status 500 and close the response without a body.
    final RequestHandler<ByteBuf, ByteBuf> alwaysServerError = (req, resp) -> {
        resp.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
        return resp.close();
    };

    // Port 0 is requested; the port actually in use is read back via getServerPort().
    final HttpServer<ByteBuf, ByteBuf> stubServer = RxNetty.createHttpServer(0, alwaysServerError);
    stubServer.start();

    final String schedulerUrl =
            String.format("http://localhost:%d/api/v1/scheduler", stubServer.getServerPort());
    final MesosClient<String, String> mesosClient = createClient(URI.create(schedulerUrl));

    try {
        mesosClient.openStream().await();
        fail("Expect an exception to be propagated up because subscribe will 500");
    } catch (Mesos5xxException expected) {
        final MesosClientErrorContext errorContext = expected.getContext();
        assertThat(errorContext.getStatusCode()).isEqualTo(500);
    } finally {
        stubServer.shutdown();
    }
}
/**
 * Expects {@code openStream().await()} to throw a {@link MesosException} when the stub server
 * replies {@code 200 OK} with a {@code Content-Type} of {@code application/json}; the error
 * context must carry status 200 and the content-type mismatch message.
 */
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mismatchContentType() throws Throwable {
    // Stub endpoint: 200 OK, JSON content type, no body.
    final RequestHandler<ByteBuf, ByteBuf> jsonReply = (req, resp) -> {
        resp.setStatus(HttpResponseStatus.OK);
        resp.getHeaders().setHeader("Content-Type", "application/json");
        return resp.close();
    };

    // Port 0 is requested; the port actually in use is read back via getServerPort().
    final HttpServer<ByteBuf, ByteBuf> stubServer = RxNetty.createHttpServer(0, jsonReply);
    stubServer.start();

    final String schedulerUrl =
            String.format("http://localhost:%d/api/v1/scheduler", stubServer.getServerPort());
    final MesosClient<String, String> mesosClient = createClient(URI.create(schedulerUrl));

    try {
        mesosClient.openStream().await();
        fail("Expect an exception to be propagated up because of content type mismatch");
    } catch (MesosException expected) {
        final MesosClientErrorContext errorContext = expected.getContext();
        assertThat(errorContext.getStatusCode()).isEqualTo(200);
        assertThat(errorContext.getMessage()).isEqualTo("Response had Content-Type \"application/json\" expected \"text/plain;charset=utf-8\"");
    } finally {
        stubServer.shutdown();
    }
}
/**
 * Starts the example HTTP server on port 8088, sends it a single request and prints every
 * item of the response to standard output.
 */
public static void main(String[] args) {
    // Start our HTTP server.
    HttpServer<ByteBuf, ByteBuf> httpServer = startServer(8088);
    int serverPort = httpServer.getServerPort();

    // Submit the request, block until the response arrives and print each content item.
    // In a real world application one should not block but chain this into the response
    // sent to the caller.
    createRequest("localhost", serverPort)
            .toBlocking()
            .forEach(System.out::println);
}
/**
 * Creates and starts an HTTP server that answers every request with "Hello World!!".
 *
 * @param port port passed to {@code RxNetty.createHttpServer}
 * @return the server returned by {@code start()}
 */
public static HttpServer<ByteBuf, ByteBuf> startServer(int port) {
    // The request handler decides what is written for each request the server receives.
    // A real server would vary the response by request URI; this example writes the same
    // string unconditionally.
    return RxNetty.createHttpServer(port,
                    (req, resp) -> resp.writeStringAndFlush("Hello World!!"))
            .start();
}
/**
 * Builds the HTTP server whose handler routes requests by path fragment:
 * <ul>
 *   <li>path containing {@code /users}: GET goes to {@code handleRecommendationsByUserId},
 *       any other method to {@code handleUpdateRecommendationsForUser};</li>
 *   <li>path containing {@code /recommendations}: {@code handleRecommendationsBy};</li>
 *   <li>path containing {@code /movies}: {@code handleRegisterMovie};</li>
 *   <li>anything else: {@code 404 Not Found}.</li>
 * </ul>
 * The checks run in the order listed, so the first matching fragment wins.
 *
 * @return the built server; this method only calls {@code build()}, not a start method
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    RequestHandler<ByteBuf, ByteBuf> router = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
            final String path = request.getPath();

            if (path.contains("/users")) {
                return request.getHttpMethod().equals(HttpMethod.GET)
                        ? handleRecommendationsByUserId(request, response)
                        : handleUpdateRecommendationsForUser(request, response);
            }
            if (path.contains("/recommendations")) {
                return handleRecommendationsBy(request, response);
            }
            if (path.contains("/movies")) {
                return handleRegisterMovie(request, response);
            }

            // No route matched.
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };

    HttpServer<ByteBuf, ByteBuf> builtServer = RxNetty.newHttpServerBuilder(port, router)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .enableWireLogging(LogLevel.ERROR)
            .build();
    System.out.println("RxMovie server started...");
    return builtServer;
}
/**
 * Benchmark fixture: starts a local HTTP server on {@code SERVER_PORT}, creates the OkHttp
 * client, builds the Feign client targeting that server, and prepares the raw query request.
 */
@Setup
public void setup() {
    final String baseUrl = "http://localhost:" + SERVER_PORT;

    // The request handler returns null for every request.
    server = HttpServer.newServer(SERVER_PORT)
            .start((req, resp) -> null);

    client = new OkHttpClient();
    // NOTE(review): the result of this call is discarded; if this is a getter on the client
    // (as in OkHttp 3) the statement has no effect - confirm the intent.
    client.retryOnConnectionFailure();

    okFeign = Feign.builder()
            .client(new feign.okhttp.OkHttpClient(client))
            .logLevel(Level.NONE)
            .logger(new Logger.ErrorLogger())
            .retryer(new Retryer.Default())
            .target(FeignTestInterface.class, baseUrl);

    queryRequest = new Request.Builder()
            .url(baseUrl + "/?Action=GetUser&Version=2010-05-08&limit=1")
            .build();
}
/**
 * Initialises the {@code events} stream and then delegates server creation to the superclass.
 *
 * <p>When {@code flaky} is set, the stream is wrapped with
 * {@code SubscriptionLimiter.limitSubscriptions(1, ...)}; otherwise it is used as returned by
 * {@code initializeEventStream()}.
 *
 * @return the server created by {@code super.createServer()}
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    if (!flaky) {
        events = initializeEventStream();
    } else {
        events = SubscriptionLimiter.limitSubscriptions(1, initializeEventStream());
    }
    return super.createServer();
}
/**
 * Creates an HTTP server on {@code port} that wraps each request's query parameters in an
 * {@code HttpRequest}, obtains the body from {@code getResponseContent} and writes it out.
 *
 * @return the created server
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    return RxNetty.createHttpServer(port, (req, resp) ->
            resp.writeStringAndFlush(
                    getResponseContent(new HttpRequest(req.getQueryParameters()))));
}
/**
 * Creates a server-sent-events HTTP server on {@code port}.
 *
 * <p>For every request the handler sets the CORS, cache-control, connection and
 * {@code text/event-stream} content-type headers and then returns the observable produced by
 * {@code getIntervalObservable}.
 *
 * @return the created server (no start method is called here)
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    HttpServer<ByteBuf, ServerSentEvent> sseServer = RxNetty.createHttpServer(
            port,
            (req, resp) -> {
                // Header order is kept exactly as before.
                resp.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                resp.getHeaders().set(CACHE_CONTROL, "no-cache");
                resp.getHeaders().set(CONNECTION, "keep-alive");
                resp.getHeaders().set(CONTENT_TYPE, "text/event-stream");
                return getIntervalObservable(req, resp);
            },
            PipelineConfigurators.<ByteBuf>serveSseConfigurator());

    System.out.println("HTTP Server Sent Events server started...");
    return sseServer;
}
/**
 * Expects {@code openStream().await()} to throw a {@link Mesos4xxException} when the stub
 * server replies {@code 400 Bad Request} with a plain-text body; the error context must carry
 * status 400 and the body text as its message.
 */
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos4xxResponse() throws Throwable {
    final String errorMessage = "Error message that should come from the server";

    // Stub endpoint: 400 with the error message as a UTF-8 text body.
    final RequestHandler<ByteBuf, ByteBuf> badRequestReply = (req, resp) -> {
        resp.setStatus(HttpResponseStatus.BAD_REQUEST);
        final byte[] body = errorMessage.getBytes(StandardCharsets.UTF_8);
        resp.getHeaders().setHeader("Content-Length", body.length);
        resp.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        resp.writeBytes(body);
        return resp.close();
    };

    // Port 0 is requested; the port actually in use is read back via getServerPort().
    final HttpServer<ByteBuf, ByteBuf> stubServer = RxNetty.createHttpServer(0, badRequestReply);
    stubServer.start();

    final String schedulerUrl =
            String.format("http://localhost:%d/api/v1/scheduler", stubServer.getServerPort());
    final MesosClient<String, String> mesosClient = createClient(URI.create(schedulerUrl));

    try {
        mesosClient.openStream().await();
        fail("Expect an exception to be propagated up because subscribe will 400");
    } catch (Mesos4xxException expected) {
        final MesosClientErrorContext errorContext = expected.getContext();
        assertThat(errorContext.getStatusCode()).isEqualTo(400);
        assertThat(errorContext.getMessage()).isEqualTo(errorMessage);
    } finally {
        stubServer.shutdown();
    }
}
/**
 * (Currently ignored.) Streams one SUBSCRIBED message followed by 20000 heartbeat records and
 * expects {@code openStream().await()} to fail with a {@link MissingBackpressureException}
 * that has no message.
 *
 * <p>Each heartbeat is written as a decimal length line followed by the message bytes.
 */
@Test
@Ignore
public void testBurstyObservable_missingBackpressureException() throws Throwable {
    // NOTE(review): the braces in this literal are not balanced (one closing '}' short);
    // left untouched because the text is sent verbatim.
    final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
    final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
    final byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8);
    // The length prefix must describe the bytes actually written, so derive it from the
    // UTF-8 encoded payload rather than re-encoding with the platform default charset.
    final byte[] hbytes = String.format("%d\n", hmsg.length).getBytes(StandardCharsets.UTF_8);
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        writeRecordIOMessage(response, subscribedMessage);
        for (int i = 0; i < 20000; i++) {
            response.writeBytes(hbytes);
            response.writeBytes(hmsg);
        }
        return response.flush();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClientForStreaming(uri).build();
    try {
        client.openStream().await();
        fail("Expect an exception to be propagated up due to backpressure");
    } catch (MissingBackpressureException e) {
        // expected
        e.printStackTrace();
        assertThat(e.getMessage()).isNullOrEmpty();
    } finally {
        server.shutdown();
    }
}
/**
 * Streams one SUBSCRIBED message followed by 20000 heartbeats to a client built with
 * {@code onBackpressureBuffer()} and checks that {@code msgNo} ends at 20001.
 */
@Test
public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable {
    msgNo = 0;
    final int numMessages = 20000;
    final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
    final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
    final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
        response.setStatus(HttpResponseStatus.OK);
        response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
        writeRecordIOMessage(response, subscribedMessage);
        for (int i = 0; i < numMessages; i++) {
            writeRecordIOMessage(response, heartbeatMessage);
        }
        return response.close();
    };
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
    server.start();
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
    final MesosClient<String, String> client = createClientForStreaming(uri)
            .onBackpressureBuffer()
            .build();
    try {
        client.openStream().await();
        // Assert inside the try block: an assertion in finally would replace any exception
        // thrown by await() and, on failure, skip the server shutdown below.
        // 20000 heartbeats PLUS 1 subscribe
        assertEquals("All heartbeats received (plus the subscribed)", 1 + numMessages, msgNo);
    } finally {
        server.shutdown();
    }
}
/**
 * Starts an HTTP server whose handler waits {@code latencyMillis} milliseconds before
 * replying {@code 200 OK} with an {@code X-Instance} header set to the local address of the
 * response's channel.
 *
 * @param latencyMillis delay applied before each response, in milliseconds
 * @return the address reported by the started server
 */
protected static SocketAddress startServer(long latencyMillis) {
    return HttpServer.newServer()
            .start((req, resp) ->
                    Observable.timer(latencyMillis, TimeUnit.MILLISECONDS)
                            .flatMap(tick ->
                                    resp.addHeader("X-Instance", resp.unsafeNettyChannel().localAddress())
                                            .setStatus(HttpResponseStatus.OK)))
            .getServerAddress();
}
/**
 * Starts an HTTP server that answers every request with {@code cannedStatus} and an
 * {@code X-Instance} header set to the local address of the response's channel.
 *
 * @param cannedStatus status set on every response
 * @return the address reported by the started server
 */
protected static SocketAddress startServer(HttpResponseStatus cannedStatus) {
    return HttpServer.newServer()
            .start((req, resp) ->
                    resp.addHeader("X-Instance", resp.unsafeNettyChannel().localAddress())
                            .setStatus(cannedStatus))
            .getServerAddress();
}
/**
 * Demo: starts an HTTP server on port 8080, issues GET requests for {@code /}, {@code /error}
 * and {@code /data}, prints each response chunk, and shuts the server down again.
 *
 * <p>The server echoes the requested path; a request for {@code /error} triggers a forced
 * exception that is reported with status {@code 400 Bad Request}.
 *
 * @throws InterruptedException if interrupted while shutting the server down
 */
public static void main(String... args) throws InterruptedException {
    HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080,
            (HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) -> {
                System.out.println("Server => Request: " + request.getPath());
                try {
                    if ("/error".equals(request.getPath())) {
                        throw new RuntimeException("forced error");
                    }
                    response.setStatus(HttpResponseStatus.OK);
                    response.writeString("Path Requested =>: " + request.getPath() + '\n');
                    return response.close();
                } catch (Throwable e) {
                    System.err.println("Server => Error [" + request.getPath() + "] => " + e);
                    // NOTE(review): the body text says "500" while the status set is 400.
                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
                    response.writeString("Error 500: Bad Request\n");
                    return response.close();
                }
            });

    // Use the non-blocking start(): startAndWait() waits for the server to shut down, which
    // would keep the client requests below from ever being issued.
    server.start();
    try {
        for (String path : new String[] {"/", "/error", "/data"}) {
            RxNetty.createHttpGet("http://localhost:8080" + path)
                    .flatMap(response -> response.getContent())
                    .map(data -> "Client => " + data.toString(Charset.defaultCharset()))
                    .toBlocking().forEach(System.out::println);
        }
    } finally {
        // Release the port and event loops even when a request fails.
        server.shutdown();
    }
}
/**
 * Creates a server-sent-events HTTP server on the given port.
 *
 * <p>Requests are passed through a {@code HystrixMetricsStreamHandler} (constructed with the
 * {@code /hystrix.stream} path and the value 1000) that wraps {@code handleRequest}. On
 * completion the elapsed time is printed and recorded in the rolling percentile and a SUCCESS
 * event is counted; on error a FAILURE event is counted. An exception thrown while setting up
 * the response is printed and answered with status {@code 400 Bad Request}.
 *
 * @param port port passed to {@code RxNetty.createHttpServer}
 * @return the created server
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) {
    System.out.println("Start " + getClass().getSimpleName() + " on port: " + port);

    // Handler chain, wrapped by the Hystrix metrics stream handler.
    // TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202)
    HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> metricsAwareHandler =
            new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (req, resp) -> {
                try {
                    long begin = System.currentTimeMillis();
                    return handleRequest(req, resp)
                            .doOnCompleted(() -> System.out.println("Response => " + req.getPath() + " Time => " + (int) (System.currentTimeMillis() - begin) + "ms"))
                            .doOnCompleted(() -> metrics.getRollingPercentile().addValue((int) (System.currentTimeMillis() - begin)))
                            .doOnCompleted(() -> metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1))
                            .doOnError(t -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1));
                } catch (Throwable failure) {
                    failure.printStackTrace();
                    System.err.println("Server => Error [" + req.getPath() + "] => " + failure);
                    resp.setStatus(HttpResponseStatus.BAD_REQUEST);
                    return resp.writeStringAndFlush("data: Error 500: Bad Request\n" + failure.getMessage() + "\n");
                }
            });

    return RxNetty.createHttpServer(
            port,
            (req, resp) -> metricsAwareHandler.handle(req, resp),
            PipelineConfigurators.<ByteBuf> serveSseConfigurator());
}
/**
 * Creates the module and derives the binding keys for the router, the interceptor support and
 * the HTTP server from the given I/O types and {@code nameAnnotation}.
 *
 * @param moduleName name passed on to the superclass constructor
 * @param iType      input type passed on to the superclass and used for the keys
 * @param oType      output type passed on to the superclass and used for the keys
 */
protected KaryonHttpModule(String moduleName, Class<I> iType, Class<O> oType) {
    super(moduleName, iType, oType);

    // The three keys are independent of each other.
    httpServerKey = keyFor(HttpServer.class, iType, oType, nameAnnotation);
    interceptorSupportKey = keyFor(GovernatorHttpInterceptorSupport.class, iType, oType, nameAnnotation);
    routerKey = keyFor(RequestHandler.class, iType, oType, nameAnnotation);
}
/**
 * Runs {@code configureServer()}, binds the built server configuration and the interceptor
 * support instance, and registers an eagerly created {@code HttpRxServerProvider} in the
 * {@code String -> RxServer} map binder under the name taken from {@code nameAnnotation}.
 */
@Override
protected void configure() {
    configureServer();

    bind(serverConfigKey).toInstance(serverConfigBuilder.build());
    bind(interceptorSupportKey).toInstance(interceptorSupportInstance);

    // The same name is used as the map key and handed to the provider.
    final String serverName = nameAnnotation.value();
    MapBinder.newMapBinder(binder(), String.class, RxServer.class)
            .addBinding(serverName)
            .toProvider(new HttpRxServerProvider<I, O, HttpServer<I, O>>(serverName, iType, oType))
            .asEagerSingleton();
}
/**
 * Creates a new {@link KaryonServer} backed by a single HTTP server instance that delegates
 * all request handling to the passed {@link RequestHandler}.
 * The {@link HttpServer} is obtained from {@link KaryonTransport}'s {@code newHttpServer}
 * factory, which is given a {@link RequestHandler} that forwards to {@code handler}.
 *
 * @param port Port for the server.
 * @param handler Request handler every request is forwarded to.
 * @param bootstrapModules Additional bootstrap modules, if any.
 *
 * @return {@link KaryonServer} which is to be used to start the created server.
 */
public static KaryonServer forRequestHandler(int port, final RequestHandler<ByteBuf, ByteBuf> handler,
                                             BootstrapModule... bootstrapModules) {
    // Thin forwarding wrapper around the caller's handler.
    RequestHandler<ByteBuf, ByteBuf> forwarding = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                       HttpServerResponse<ByteBuf> response) {
            return handler.handle(request, response);
        }
    };
    HttpServer<ByteBuf, ByteBuf> backingServer = KaryonTransport.newHttpServer(port, forwarding);
    return new RxNettyServerBackedServer(backingServer, bootstrapModules);
}
/**
 * Demo: starts an embedded eureka server and an RxNetty server, registers the RxNetty server
 * with eureka under a VIP, looks the server up again through eureka and sends it a request,
 * printing each item of the response.
 */
public static void main(String[] args) throws Exception {
    final int readPort = 7005;
    final int writePort = 7006;

    // Start an embedded eureka server with the read and write ports defined above.
    startEurekaServer(readPort, writePort);

    // Create a eureka client using the same read and write ports as the embedded server.
    EurekaClient discoveryClient = createEurekaClient(readPort, writePort);

    // Reuse the ClientServer example to start an RxNetty server on the passed port.
    HttpServer<ByteBuf, ByteBuf> backend = ClientServer.startServer(8089);
    final int backendPort = backend.getServerPort();

    // Register the server with eureka using a unique virtual IP address (VIP). Eureka uses
    // VIPs to group homogeneous instances of a service together, so that clients can use
    // them interchangeably.
    String vip = "mock_server-" + backendPort;
    registerWithEureka(backendPort, discoveryClient, vip);

    // Retrieve the instance information of the registered server from eureka, to show how
    // eureka can be used to look up any server in a deployment. The VIP address of the
    // server MUST be known beforehand in order to do so.
    InstanceInfo registeredInstance = getServerInfo(discoveryClient, vip);

    // Extract IP address and port from the instance information returned by eureka.
    Host target = getServerHostAndPort(registeredInstance);

    // Reuse the ClientServer example to send an HTTP request to the server found via eureka.
    // Block until the response arrives and print each content item. In a real world
    // application one should not block but chain this into the response sent to the caller.
    ClientServer.createRequest(target.getIpAddress(), target.getPort())
            .toBlocking()
            .forEach(System.out::println);
}
/**
 * Demo: starts an embedded eureka server and an RxNetty server, registers the RxNetty server
 * with eureka under a VIP, builds a host event stream from eureka and executes the request
 * through {@code MyCommand}, printing each item of the response.
 */
public static void main(String[] args) throws Exception {
    final int readPort = 7008;
    final int writePort = 7010;

    // Start an embedded eureka server with the read and write ports defined above.
    ClientServerWithDiscovery.startEurekaServer(readPort, writePort);

    // Create a eureka client using the same read and write ports as the embedded server.
    EurekaClient discoveryClient = ClientServerWithDiscovery.createEurekaClient(readPort, writePort);

    // Reuse the ClientServer example to start an RxNetty server on the passed port.
    HttpServer<ByteBuf, ByteBuf> backend = ClientServer.startServer(8089);
    final int backendPort = backend.getServerPort();

    // Register the server with eureka using a unique virtual IP address (VIP). Eureka uses
    // VIPs to group homogeneous instances of a service together, so that clients can use
    // them interchangeably.
    String vip = "mock_server-" + backendPort;
    ClientServerWithDiscovery.registerWithEureka(backendPort, discoveryClient, vip);

    // Using the eureka client, create an Ocelli host event stream. Ocelli uses this stream
    // to learn about the available hosts.
    Observable<MembershipEvent<Host>> hostEvents =
            ClientServerWithLoadBalancer.createEurekaHostStream(discoveryClient, vip);

    // Execute the request on the client (just as ClientServerWithLoadBalancer does), but
    // using hystrix. Block until the response arrives and print each content item. In a real
    // world application one should not block but chain this into the response sent to the
    // caller.
    MyCommand command = new MyCommand(hostEvents);
    command.toObservable()
            .toBlocking()
            .forEach(System.out::println);
}
/**
 * Demo: starts an embedded eureka server and an RxNetty server, registers the RxNetty server
 * with eureka under a VIP, builds a host event stream from eureka and sends a request to a
 * host chosen via {@code createRequestFromLB}, printing each item of the response.
 */
public static void main(String[] args) throws Exception {
    final int readPort = 7007;
    final int writePort = 7008;

    // Start an embedded eureka server with the read and write ports defined above.
    ClientServerWithDiscovery.startEurekaServer(readPort, writePort);

    // Create a eureka client using the same read and write ports as the embedded server.
    EurekaClient discoveryClient = ClientServerWithDiscovery.createEurekaClient(readPort, writePort);

    // Reuse the ClientServer example to start an RxNetty server on the passed port.
    HttpServer<ByteBuf, ByteBuf> backend = ClientServer.startServer(8089);
    final int backendPort = backend.getServerPort();

    // Register the server with eureka using a unique virtual IP address (VIP). Eureka uses
    // VIPs to group homogeneous instances of a service together, so that clients can use
    // them interchangeably.
    String vip = "mock_server-" + backendPort;
    ClientServerWithDiscovery.registerWithEureka(backendPort, discoveryClient, vip);

    // Using the eureka client, create an Ocelli host event stream. Ocelli uses this stream
    // to learn about the available hosts.
    Observable<MembershipEvent<Host>> hostEvents = createEurekaHostStream(discoveryClient, vip);

    // Instead of using the host and port from eureka directly, as the
    // ClientServerWithDiscovery example does, choose a host from the load balancer.
    // Block until the response arrives and print each content item. In a real world
    // application one should not block but chain this into the response sent to the caller.
    createRequestFromLB(hostEvents)
            .toBlocking()
            .forEach(System.out::println);
}
/**
 * Builds an {@link HttpServer} for the given port and router by delegating to
 * {@code newHttpServerBuilder(port, router)} and calling {@code build()} on the result.
 *
 * @param port port passed to the builder
 * @param router request handler passed to the builder
 * @param <I> first type parameter of the resulting server and of the router
 * @param <O> second type parameter of the resulting server and of the router
 * @return the built server
 */
public static <I, O> HttpServer<I, O> newHttpServer(int port, RequestHandler<I, O> router) {
return newHttpServerBuilder(port, router).build();
}
/**
 * Builds an {@link HttpServer} for the given port and request handler by delegating to
 * {@code newHttpServerBuilder(port, requestHandler)} and calling {@code build()} on the result.
 *
 * @param port port passed to the builder
 * @param requestHandler HTTP request handler passed to the builder
 * @param <I> first type parameter of the resulting server and of the handler
 * @param <O> second type parameter of the resulting server and of the handler
 * @return the built server
 */
public static <I, O> HttpServer<I, O> newHttpServer(int port, HttpRequestHandler<I, O> requestHandler) {
return newHttpServerBuilder(port, requestHandler).build();
}
/**
 * Creates a new {@link KaryonServer} which combines the lifecycle of the passed {@link HttpServer} with
 * its own lifecycle.
 * The modules are converted with {@code toBootstrapModule} and passed to the
 * {@code BootstrapModule} overload of this method.
 *
 * @param server HTTP server
 * @param modules Additional modules, if any.
 *
 * @return {@link KaryonServer} which is to be used to start the created server.
 */
public static KaryonServer forHttpServer(HttpServer<?, ?> server, Module... modules) {
return forHttpServer(server, toBootstrapModule(modules));
}
/**
 * Creates a new {@link KaryonServer} which combines the lifecycle of the passed {@link HttpServer} with
 * its own lifecycle.
 * The returned server is an {@code RxNettyServerBackedServer} constructed from the passed
 * server and bootstrap modules.
 *
 * @param server HTTP server
 * @param bootstrapModules Additional bootstrapModules if any.
 *
 * @return {@link KaryonServer} which is to be used to start the created server.
 */
public static KaryonServer forHttpServer(HttpServer<?, ?> server,
BootstrapModule... bootstrapModules) {
return new RxNettyServerBackedServer(server, bootstrapModules);
}