下面列出了怎么用io.reactivex.netty.RxNetty的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* The Mesos HTTP Scheduler API will send a redirect to a client if it is not the leader. The client that is
* constructed during {@link #openStream} is bound to a specific host and port, due to this behavior
* we "probe" Mesos to try and find out where it's "master" is before we configure the client.
*
* This method will attempt to send a simple GET to {@code mesosUri}, however instead of going to the path
* specified, it will go to {@code /redirect} and construct a uri relative to mesosUri using the host and port
* returned in the Location header of the response.
*/
@NotNull
private static URI resolveMesosUri(final @NotNull URI mesosUri) {
final String redirectUri = createRedirectUri(mesosUri);
LOGGER.info("Probing Mesos server at {}", redirectUri);
// When sending an individual request (rather than creating a client) RxNetty WILL follow redirects,
// so here we tell it not to do that for this request by creating the config object below.
final HttpClient.HttpClientConfig config =
HttpClient.HttpClientConfig.Builder.fromDefaultConfig()
.setFollowRedirect(false)
.build();
final HttpClientResponse<ByteBuf> redirectResponse =
RxNetty.createHttpRequest(HttpClientRequest.createGet(redirectUri), config)
.toBlocking()
.first();
return getUriFromRedirectResponse(mesosUri, redirectResponse);
}
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos5xxResponse() throws Throwable {
final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
return response.close();
};
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
server.start();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
final MesosClient<String, String> client = createClient(uri);
try {
client.openStream().await();
fail("Expect an exception to be propagated up because subscribe will 500");
} catch (Mesos5xxException e) {
// expected
final MesosClientErrorContext ctx = e.getContext();
assertThat(ctx.getStatusCode()).isEqualTo(500);
} finally {
server.shutdown();
}
}
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mismatchContentType() throws Throwable {
final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
response.setStatus(HttpResponseStatus.OK);
response.getHeaders().setHeader("Content-Type", "application/json");
return response.close();
};
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
server.start();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
final MesosClient<String, String> client = createClient(uri);
try {
client.openStream().await();
fail("Expect an exception to be propagated up because of content type mismatch");
} catch (MesosException e) {
// expected
final MesosClientErrorContext ctx = e.getContext();
assertThat(ctx.getStatusCode()).isEqualTo(200);
assertThat(ctx.getMessage()).isEqualTo("Response had Content-Type \"application/json\" expected \"text/plain;charset=utf-8\"");
} finally {
server.shutdown();
}
}
@Test
public void server400_nonPost() throws Exception {
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort));
final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort())
.pipelineConfigurator(new HttpClientPipelineConfigurator<>())
.build();
final HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(uri.getPath())
.withHeader("Accept", StringMessageCodec.UTF8_STRING.mediaType());
final HttpClientResponse<ByteBuf> response = httpClient.submit(request)
.toBlocking()
.last();
assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.BAD_REQUEST);
final HttpResponseHeaders headers = response.getHeaders();
assertThat(headers.getHeader("Accept")).isEqualTo(StringMessageCodec.UTF8_STRING.mediaType());
assertThat(mesosServerSimulation.getCallsReceived()).hasSize(0);
}
@Test
public void server400_invalidContentType() throws Exception {
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort));
final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort())
.pipelineConfigurator(new HttpClientPipelineConfigurator<>())
.build();
final byte[] data = StringMessageCodec.UTF8_STRING.encode("decline");
final HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost(uri.getPath())
.withHeader("Content-Type", "application/octet-stream")
.withHeader("Accept", "application/octet-stream")
.withContent(data);
final HttpClientResponse<ByteBuf> response = httpClient.submit(request)
.toBlocking()
.last();
assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.BAD_REQUEST);
final HttpResponseHeaders headers = response.getHeaders();
assertThat(headers.getHeader("Accept")).isEqualTo(StringMessageCodec.UTF8_STRING.mediaType());
assertThat(mesosServerSimulation.getCallsReceived()).hasSize(0);
}
@Test
public void server400_emptyBody() throws Exception {
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort));
final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort())
.pipelineConfigurator(new HttpClientPipelineConfigurator<>())
.build();
final HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost(uri.getPath())
.withHeader("Content-Type", StringMessageCodec.UTF8_STRING.mediaType())
.withHeader("Accept", StringMessageCodec.UTF8_STRING.mediaType())
.withContent(new byte[]{});
final HttpClientResponse<ByteBuf> response = httpClient.submit(request)
.toBlocking()
.last();
assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.BAD_REQUEST);
final HttpResponseHeaders headers = response.getHeaders();
assertThat(headers.getHeader("Accept")).isEqualTo(StringMessageCodec.UTF8_STRING.mediaType());
assertThat(mesosServerSimulation.getCallsReceived()).hasSize(0);
}
@NotNull
private static HttpClientResponse<ByteBuf> sendCall(final URI uri, final String call) {
final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort())
.pipelineConfigurator(new HttpClientPipelineConfigurator<>())
.build();
final byte[] data = call.getBytes(StandardCharsets.UTF_8);
final HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost(uri.getPath())
.withHeader("Content-Type", StringMessageCodec.UTF8_STRING.mediaType())
.withHeader("Accept", StringMessageCodec.UTF8_STRING.mediaType())
.withContent(data);
return httpClient.submit(request)
.toBlocking()
.last();
}
public static HttpServer<ByteBuf, ByteBuf> startServer(int port) {
/**
* Creates an HTTP server which returns "Hello World!" responses.
*/
return RxNetty.createHttpServer(port,
/*
* HTTP Request handler for RxNetty where you control what you write as the
* response for each and every request the server receives.
*/
(request, response) -> {
/**
* In a real server, you would be writing different responses based on the
* URI of the request.
* This example just returns a "Hello World!!" string unconditionally.
*/
return response.writeStringAndFlush("Hello World!!");
})
.start();
}
private Observable<HttpClientResponse<ByteBuf>> getResponse(String externalHealthCheckURL) {
String host = "localhost";
int port = DEFAULT_APPLICATION_PORT;
String path = "/healthcheck";
try {
URL url = new URL(externalHealthCheckURL);
host = url.getHost();
port = url.getPort();
path = url.getPath();
} catch (MalformedURLException e) {
//continue
}
Integer timeout = DynamicProperty.getInstance("prana.host.healthcheck.timeout").getInteger(DEFAULT_CONNECTION_TIMEOUT);
HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port)
.pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
.channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
.build();
return httpClient.submit(HttpClientRequest.createGet(path));
}
@Override
protected RxClient<I, O> createRxClient(Server server) {
ClientBuilder<I, O> builder = RxNetty.newTcpClientBuilder(server.getHost(), server.getPort());
if (pipelineConfigurator != null) {
builder.pipelineConfigurator(pipelineConfigurator);
}
Integer connectTimeout = getProperty(IClientConfigKey.Keys.ConnectTimeout, null, DefaultClientConfigImpl.DEFAULT_CONNECT_TIMEOUT);
builder.channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
if (isPoolEnabled()) {
builder.withConnectionPoolLimitStrategy(poolStrategy)
.withIdleConnectionsTimeoutMillis(idleConnectionEvictionMills)
.withPoolIdleCleanupScheduler(poolCleanerScheduler);
} else {
builder.withNoConnectionPooling();
}
RxClient<I, O> client = builder.build();
return client;
}
public UdpServer<DatagramPacket, DatagramPacket> createServer() {
UdpServer<DatagramPacket, DatagramPacket> server = RxNetty.createUdpServer(port, new ConnectionHandler<DatagramPacket, DatagramPacket>() {
@Override
public Observable<Void> handle(final ObservableConnection<DatagramPacket, DatagramPacket> newConnection) {
return newConnection.getInput().flatMap(new Func1<DatagramPacket, Observable<Void>>() {
@Override
public Observable<Void> call(final DatagramPacket received) {
return Observable.interval(delay, TimeUnit.MILLISECONDS).take(1).flatMap(new Func1<Long, Observable<Void>>() {
@Override
public Observable<Void> call(Long aLong) {
InetSocketAddress sender = received.sender();
System.out.println("Received datagram. Sender: " + sender);
ByteBuf data = newConnection.getChannel().alloc().buffer(WELCOME_MSG_BYTES.length);
data.writeBytes(WELCOME_MSG_BYTES);
return newConnection.writeAndFlush(new DatagramPacket(data, sender));
}
});
}
});
}
});
System.out.println("UDP hello server started at port: " + port);
return server;
}
public HttpServer<ByteBuf, ByteBuf> createServer() {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() {
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
if (request.getPath().contains("/users")) {
if (request.getHttpMethod().equals(HttpMethod.GET)) {
return handleRecommendationsByUserId(request, response);
} else {
return handleUpdateRecommendationsForUser(request, response);
}
}
if (request.getPath().contains("/recommendations")) {
return handleRecommendationsBy(request, response);
}
if (request.getPath().contains("/movies")) {
return handleRegisterMovie(request, response);
}
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).enableWireLogging(LogLevel.ERROR).build();
System.out.println("RxMovie server started...");
return server;
}
@Inject
public void setInjector(Injector injector) {
ServerConfig config = injector.getInstance(serverConfigKey);
ConnectionHandler<I, O> connectionHandler = injector.getInstance(connectionHandlerKey);
ServerBuilder<I, O> builder = RxNetty.newTcpServerBuilder(config.getPort(), connectionHandler);
if (injector.getExistingBinding(pipelineConfiguratorKey) != null) {
builder.appendPipelineConfigurator(injector.getInstance(pipelineConfiguratorKey));
}
if (injector.getExistingBinding(metricEventsListenerFactoryKey) != null) {
builder.withMetricEventsListenerFactory(injector.getInstance(metricEventsListenerFactoryKey));
}
server = builder.build().start();
logger.info("Starting server {} on port {}...", nameAnnotation.value(), server.getServerPort());
}
@Inject
@SuppressWarnings("unchecked")
public void setInjector(Injector injector) {
KaryonWebSocketsModule.WebSocketsServerConfig config = (KaryonWebSocketsModule.WebSocketsServerConfig) injector.getInstance(serverConfigKey);
ConnectionHandler<I, O> connectionHandler = injector.getInstance(connectionHandlerKey);
WebSocketServerBuilder<I, O> builder = RxNetty.newWebSocketServerBuilder(config.getPort(), connectionHandler)
.withMessageAggregator(config.isMessageAggregator());
if (injector.getExistingBinding(pipelineConfiguratorKey) != null) {
builder.appendPipelineConfigurator(injector.getInstance(pipelineConfiguratorKey));
}
if (injector.getExistingBinding(metricEventsListenerFactoryKey) != null) {
builder.withMetricEventsListenerFactory(injector.getInstance(metricEventsListenerFactoryKey));
}
server = builder.build().start();
logger.info("Starting WebSockets server {} on port {}...", nameAnnotation.value(), server.getServerPort());
}
@Test
public void testGovernatedTcpServer() throws Exception {
String message = RxNetty.createTcpClient("localhost", server.getServerPort()).connect()
.flatMap(new Func1<ObservableConnection<ByteBuf, ByteBuf>, Observable<String>>() {
@Override
public Observable<String> call(ObservableConnection<ByteBuf, ByteBuf> connection) {
return connection.getInput().map(new Func1<ByteBuf, String>() {
@Override
public String call(ByteBuf byteBuf) {
return byteBuf.toString(Charset.defaultCharset());
}
});
}
}).single().toBlocking().toFuture().get(60, TimeUnit.SECONDS);
assertEquals("Invalid message received from server", SERVER_MESSAGE, message);
}
@Test
public void testGovernatedTcpServer() throws Exception {
String message = RxNetty.<TextWebSocketFrame, TextWebSocketFrame>newWebSocketClientBuilder("localhost", server.getServerPort())
.build()
.connect()
.flatMap(new Func1<ObservableConnection<TextWebSocketFrame, TextWebSocketFrame>, Observable<String>>() {
@Override
public Observable<String> call(ObservableConnection<TextWebSocketFrame, TextWebSocketFrame> connection) {
return connection.getInput().map(new Func1<TextWebSocketFrame, String>() {
@Override
public String call(TextWebSocketFrame frame) {
return frame.text();
}
});
}
}).single().toBlocking().toFuture().get(60, TimeUnit.SECONDS);
assertEquals("Invalid message received from server", SERVER_MESSAGE, message);
}
public static void main(String[] args) {
int port = 8989;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
System.out.println("Starting mock service on port " + port + "...");
startMonitoring();
RxNetty.<ByteBuf, ByteBuf>newHttpServerBuilder(port, (request, response) -> {
try {
long startTime = System.currentTimeMillis();
counter.increment(CounterEvent.REQUESTS);
return handleRequest(request, response)
.doOnCompleted(() -> {
counter.increment(CounterEvent.SUCCESS);
latency.addValue((int)(System.currentTimeMillis() - startTime));
})
.doOnError(t -> counter.increment(CounterEvent.NETTY_ERROR));
} catch (Throwable e) {
System.err.println("Server => Error [" + request.getPath() + "] => " + e);
response.setStatus(HttpResponseStatus.BAD_REQUEST);
counter.increment(CounterEvent.HTTP_ERROR);
return response.writeStringAndFlush("Error 500: Bad Request\n" + e.getMessage() + '\n');
}
}).build().startAndWait();
}
private Observable<String> initializeStream() {
HttpClient<ByteBuf, ServerSentEvent> client =
RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator());
return client.submit(HttpClientRequest.createGet("/hello")).
flatMap(response -> {
printResponseHeader(response);
return response.getContent();
}).map(serverSentEvent -> serverSentEvent.contentAsString());
}
public HttpServer<ByteBuf, ByteBuf> createServer() {
return RxNetty.createHttpServer(port, (request, response) -> {
HttpRequest httpRequest = new HttpRequest(request.getQueryParameters());
String content = getResponseContent(httpRequest);
return response.writeStringAndFlush(content);
});
}
@Override
public Observable<String> request(String parameter) {
return RxNetty.createHttpGet("http://localhost:" + port + "?" + paramName + "=" + parameter)
.flatMap(response
-> response.getContent()
.<String> map(content -> content.toString(Charset.defaultCharset()))
);
}
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
HttpServer<ByteBuf, ServerSentEvent> server = RxNetty.createHttpServer(port,
(request, response) -> {
response.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
response.getHeaders().set(CACHE_CONTROL, "no-cache");
response.getHeaders().set(CONNECTION, "keep-alive");
response.getHeaders().set(CONTENT_TYPE, "text/event-stream");
return getIntervalObservable(request, response);
}, PipelineConfigurators.<ByteBuf>serveSseConfigurator());
System.out.println("HTTP Server Sent Events server started...");
return server;
}
/**
* Sends the subscribe call to Mesos and starts processing the stream of {@code Receive} events.
* The {@code streamProcessor} function provided to the constructor will be applied to the stream of events
* received from Mesos.
* <p>
* The stream processing will then process any {@link SinkOperation} that should be sent to Mesos.
* @return The subscription representing the processing of the event stream. This subscription can then be used
* to block the invoking thread using {@link AwaitableSubscription#await()} (For example to block a main thread
* from exiting while events are being processed.)
*/
@NotNull
public AwaitableSubscription openStream() {
final URI uri = resolveMesosUri(mesosUri);
final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), getPort(uri))
.withName(userAgent.getEntries().get(0).getName())
.pipelineConfigurator(new HttpClientPipelineConfigurator<>())
.build();
final Observable<Receive> receives = createPost.call(subscribe)
.flatMap(httpClient::submit)
.subscribeOn(Rx.io())
.flatMap(verifyResponseOk(subscribe, mesosStreamId, receiveCodec.mediaType()))
.lift(new RecordIOOperator())
.compose(backpressureTransformer)
.observeOn(Rx.compute())
/* Begin temporary back-pressure */
.buffer(250, TimeUnit.MILLISECONDS)
.flatMap(Observable::from)
/* end temporary back-pressure */
.map(receiveCodec::decode)
;
final Observable<SinkOperation<Send>> sends = streamProcessor.apply(receives)
.filter(Optional::isPresent)
.map(Optional::get);
final Subscriber<SinkOperation<Send>> subscriber = new SinkSubscriber<>(httpClient, createPost);
final SubscriberDecorator<SinkOperation<Send>> decorator = new SubscriberDecorator<>(subscriber);
final Subscription subscription = sends
.subscribeOn(Rx.compute())
.observeOn(Rx.compute())
.subscribe(decorator);
return new ObservableAwaitableSubscription(Observable.from(exec.submit(decorator)), subscription);
}
@Test
public void testStreamDoesNotRunWhenSubscribeFails_mesos4xxResponse() throws Throwable {
final String errorMessage = "Error message that should come from the server";
final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
final byte[] msgBytes = errorMessage.getBytes(StandardCharsets.UTF_8);
response.getHeaders().setHeader("Content-Length", msgBytes.length);
response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
response.writeBytes(msgBytes);
return response.close();
};
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
server.start();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
final MesosClient<String, String> client = createClient(uri);
try {
client.openStream().await();
fail("Expect an exception to be propagated up because subscribe will 400");
} catch (Mesos4xxException e) {
// expected
final MesosClientErrorContext ctx = e.getContext();
assertThat(ctx.getStatusCode()).isEqualTo(400);
assertThat(ctx.getMessage()).isEqualTo(errorMessage);
} finally {
server.shutdown();
}
}
@Test
@Ignore
public void testBurstyObservable_missingBackpressureException() throws Throwable {
final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
final byte[] hmsg = heartbeatMessage.getBytes(StandardCharsets.UTF_8);
final byte[] hbytes = String.format("%d\n", heartbeatMessage.getBytes().length).getBytes(StandardCharsets.UTF_8);
final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
response.setStatus(HttpResponseStatus.OK);
response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
writeRecordIOMessage(response, subscribedMessage);
for (int i = 0; i < 20000; i++) {
response.writeBytes(hbytes);
response.writeBytes(hmsg);
}
return response.flush();
};
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
server.start();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
final MesosClient<String, String> client = createClientForStreaming(uri).build();
try {
client.openStream().await();
fail("Expect an exception to be propagated up due to backpressure");
} catch (MissingBackpressureException e) {
// expected
e.printStackTrace();
assertThat(e.getMessage()).isNullOrEmpty();
} finally {
server.shutdown();
}
}
@Test
public void testBurstyObservable_unboundedBufferSucceeds() throws Throwable {
msgNo = 0;
final int numMessages = 20000;
final String subscribedMessage = "{\"type\": \"SUBSCRIBED\",\"subscribed\": {\"framework_id\": {\"value\":\"12220-3440-12532-2345\"},\"heartbeat_interval_seconds\":15.0}";
final String heartbeatMessage = "{\"type\":\"HEARTBEAT\"}";
final RequestHandler<ByteBuf, ByteBuf> handler = (request, response) -> {
response.setStatus(HttpResponseStatus.OK);
response.getHeaders().setHeader("Content-Type", "text/plain;charset=utf-8");
writeRecordIOMessage(response, subscribedMessage);
for (int i = 0; i < numMessages; i++) {
writeRecordIOMessage(response, heartbeatMessage);
}
return response.close();
};
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, handler);
server.start();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", server.getServerPort()));
final MesosClient<String, String> client = createClientForStreaming(uri)
.onBackpressureBuffer()
.build();
try {
client.openStream().await();
} finally {
// 20000 heartbeats PLUS 1 subscribe
assertEquals("All heartbeats received (plus the subscribed)", 1 + numMessages, msgNo);
server.shutdown();
}
}
@Before
public void setUp() throws Exception {
server = RxNetty.createHttpServer(0, (request, response) -> {
response.writeString("Hello World");
return response.close();
});
server.start();
}
@Test
public void testConnectionTerminatedOnClose() throws Exception {
final TcpSocketProxy proxy = new TcpSocketProxy(
new InetSocketAddress("localhost", 0),
new InetSocketAddress("localhost", server.getServerPort())
);
proxy.start();
final int listenPort = proxy.getListenPort();
final HttpClient<ByteBuf, ByteBuf> client = RxNetty.createHttpClient("localhost", listenPort);
final String first = client.submit(HttpClientRequest.createGet("/"))
.flatMap(AbstractHttpContentHolder::getContent)
.map(bb -> bb.toString(StandardCharsets.UTF_8))
.toBlocking()
.first();
assertThat(first).isEqualTo("Hello World");
LOGGER.info("first request done");
proxy.shutdown();
if (proxy.isShutdown()) {
proxy.close();
} else {
fail("proxy should have been shutdown");
}
try {
final URI uri = URI.create(String.format("http://localhost:%d/", listenPort));
uri.toURL().getContent();
fail("Shouldn't have been able to get content");
} catch (IOException e) {
// expected
}
}
public static void main(String... args) throws InterruptedException {
HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080,
(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) -> {
System.out.println("Server => Request: " + request.getPath());
try {
if ("/error".equals(request.getPath())) {
throw new RuntimeException("forced error");
}
response.setStatus(HttpResponseStatus.OK);
response.writeString("Path Requested =>: " + request.getPath() + '\n');
return response.close();
} catch (Throwable e) {
System.err.println("Server => Error [" + request.getPath() + "] => " + e);
response.setStatus(HttpResponseStatus.BAD_REQUEST);
response.writeString("Error 500: Bad Request\n");
return response.close();
}
});
server.startAndWait();
RxNetty.createHttpGet("http://localhost:8080/")
.flatMap(response -> response.getContent())
.map(data -> "Client => " + data.toString(Charset.defaultCharset()))
.toBlocking().forEach(System.out::println);
RxNetty.createHttpGet("http://localhost:8080/error")
.flatMap(response -> response.getContent())
.map(data -> "Client => " + data.toString(Charset.defaultCharset()))
.toBlocking().forEach(System.out::println);
RxNetty.createHttpGet("http://localhost:8080/data")
.flatMap(response -> response.getContent())
.map(data -> "Client => " + data.toString(Charset.defaultCharset()))
.toBlocking().forEach(System.out::println);
//server.shutdown();
}
private static void startEurekaDashboard(final int port, EurekaClient client) {
final StaticFileHandler staticFileHandler = new StaticFileHandler();
RxNetty.createHttpServer(port, (request, response) -> {
if (request.getUri().startsWith("/dashboard")) {
return staticFileHandler.handle(request, response);
} else if (request.getUri().startsWith("/data")) {
response.getHeaders().set(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream");
response.getHeaders().set(HttpHeaders.Names.CACHE_CONTROL, "no-cache");
return client.forInterest(Interests.forFullRegistry())
.flatMap(notification -> {
ByteBuf data = response.getAllocator().buffer();
data.writeBytes("data: ".getBytes());
Map<String, String> dataAttributes = new HashMap<>();
dataAttributes.put("type", notification.getKind().toString());
dataAttributes.put("instance-id", notification.getData().getId());
dataAttributes.put("vip", notification.getData().getVipAddress());
if (notification.getData().getStatus() != null) {
dataAttributes.put("status", notification.getData().getStatus().name());
}
HashSet<ServicePort> servicePorts = notification.getData().getPorts();
int port1 = servicePorts.iterator().next().getPort();
dataAttributes.put("port", String.valueOf(port1));
String jsonData = SimpleJson.mapToJson(dataAttributes);
data.writeBytes(jsonData.getBytes());
data.writeBytes("\n\n".getBytes());
return response.writeBytesAndFlush(data);
});
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return Observable.empty();
}
}).start();
}
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) {
System.out.println("Start " + getClass().getSimpleName() + " on port: " + port);
// declare handler chain (wrapped in Hystrix)
// TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202)
HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> handlerChain
= new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (request, response) -> {
try {
long startTime = System.currentTimeMillis();
return handleRequest(request, response)
.doOnCompleted(() -> System.out.println("Response => " + request.getPath() + " Time => " + (int) (System.currentTimeMillis() - startTime) + "ms"))
.doOnCompleted(() -> metrics.getRollingPercentile().addValue((int) (System.currentTimeMillis() - startTime)))
.doOnCompleted(() -> metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1))
.doOnError(t -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1));
} catch (Throwable e) {
e.printStackTrace();
System.err.println("Server => Error [" + request.getPath() + "] => " + e);
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.writeStringAndFlush("data: Error 500: Bad Request\n" + e.getMessage() + "\n");
}
});
return RxNetty.createHttpServer(port, (request, response) -> {
// System.out.println("Server => Request: " + request.getPath());
return handlerChain.handle(request, response);
}, PipelineConfigurators.<ByteBuf> serveSseConfigurator());
}