类io.reactivex.netty.protocol.http.sse.ServerSentEvent源码实例Demo

下面列出了怎么用io.reactivex.netty.protocol.http.sse.ServerSentEvent的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: MarketData   文件: RxNettyEventServer.java
private Observable<Void> getIntervalObservable(HttpServerRequest<?> request, final HttpServerResponse<ServerSentEvent> response) {
    HttpRequest simpleRequest = new HttpRequest(request.getQueryParameters());
    return getEvents(simpleRequest)
            .flatMap(event -> {
                System.out.println("Writing SSE event: " + event);
                ByteBuf data = response.getAllocator().buffer().writeBytes(( event + "\n").getBytes());
                ServerSentEvent sse = new ServerSentEvent(data);
                return response.writeAndFlush(sse);
            }).materialize()
            .takeWhile(notification -> {
                if (notification.isOnError()) {
                    System.out.println("Write to client failed, stopping response sending.");
                    notification.getThrowable().printStackTrace(System.err);
                }
                return !notification.isOnError();
            })
            .map((Func1<Notification<Void>, Void>) notification -> null);
}
 
源代码2 项目: ReactiveLab   文件: BookmarksService.java
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> videoIds = request.getQueryParameters().get("videoId");

    int latency = 1;
    if (Random.randomIntFrom0to100() > 80) {
        latency = 10;
    }

    return Observable.from(videoIds).map(videoId -> {
        Map<String, Object> video = new HashMap<>();
        video.put("videoId", videoId);
        video.put("position", (int) (Math.random() * 5000));
        return video;
    }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n"))
            .delay(latency, TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latency
}
 
源代码3 项目: ReactiveLab   文件: GeoService.java
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    return request.getContent().flatMap(i -> {
        List<String> ips = request.getQueryParameters().get("ip");
        Map<String, Object> data = new HashMap<>();
        for (String ip : ips) {
            Map<String, Object> ip_data = new HashMap<>();
            ip_data.put("country_code", "GB");
            ip_data.put("longitude", "-0.13");
            ip_data.put("latitude", "51.5");
            data.put(ip, ip_data);
        }
        return response.writeStringAndFlush("data: " + SimpleJson.mapToJson(data) + "\n")
                .doOnCompleted(response::close);
    }).delay(10, TimeUnit.MILLISECONDS);
}
 
源代码4 项目: ReactiveLab   文件: UserService.java
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> userIds = request.getQueryParameters().get("userId");
    if (userIds == null || userIds.size() == 0) {
        return writeError(request, response, "At least one parameter of 'userId' must be included.");
    }
    return Observable.from(userIds).map(userId -> {
        Map<String, Object> user = new HashMap<>();
        user.put("userId", userId);
        user.put("name", "Name Here");
        user.put("other_data", "goes_here");
        return user;
    }).flatMap(user -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(user) + "\n")
            .doOnCompleted(response::close))
            .delay(((long) (Math.random() * 500) + 500), TimeUnit.MILLISECONDS); // simulate latency
}
 
@Override
public Publisher<String> stream() {
    Observable<String> rxStream = rxClient.createGet("/logs")
                                          .flatMap(HttpClientResponse::getContentAsServerSentEvents)
                                          .map(ServerSentEvent::contentAsString);

    return RxReactiveStreams.toPublisher(rxStream);
}
 
private Observable<String> initializeStream() {
    HttpClient<ByteBuf, ServerSentEvent> client =
            RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator());

    return client.submit(HttpClientRequest.createGet("/hello")).
            flatMap(response -> {
                printResponseHeader(response);
                return response.getContent();
            }).map(serverSentEvent -> serverSentEvent.contentAsString());
}
 
private static void printResponseHeader(HttpClientResponse<ServerSentEvent> response) {
    System.out.println("New response received.");
    System.out.println("========================");
    System.out.println(response.getHttpVersion().text() + ' ' + response.getStatus().code()
            + ' ' + response.getStatus().reasonPhrase());
    for (Map.Entry<String, String> header : response.getHeaders().entries()) {
        System.out.println(header.getKey() + ": " + header.getValue());
    }
}
 
源代码8 项目: MarketData   文件: RxNettyEventBroadcaster.java
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    if (flaky) {
        events = SubscriptionLimiter
                    .limitSubscriptions(1,initializeEventStream());
    } else {
        events  = initializeEventStream();
    }
    return super.createServer();
}
 
源代码9 项目: MarketData   文件: RxNettyEventServer.java
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    HttpServer<ByteBuf, ServerSentEvent> server = RxNetty.createHttpServer(port,
            (request, response) -> {
                response.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                response.getHeaders().set(CACHE_CONTROL, "no-cache");
                response.getHeaders().set(CONNECTION, "keep-alive");
                response.getHeaders().set(CONTENT_TYPE, "text/event-stream");
                return getIntervalObservable(request, response);
            }, PipelineConfigurators.<ByteBuf>serveSseConfigurator());
    System.out.println("HTTP Server Sent Events server started...");
    return server;
}
 
源代码10 项目: ReactiveLab   文件: AbstractMiddleTierService.java
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) {
    System.out.println("Start " + getClass().getSimpleName() + " on port: " + port);

    // declare handler chain (wrapped in Hystrix)
    // TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202)
    HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> handlerChain 
      = new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (request, response) -> {
        try {
            long startTime = System.currentTimeMillis();
            return handleRequest(request, response)
                    .doOnCompleted(() -> System.out.println("Response => " + request.getPath() + " Time => " + (int) (System.currentTimeMillis() - startTime) + "ms"))
                    .doOnCompleted(() -> metrics.getRollingPercentile().addValue((int) (System.currentTimeMillis() - startTime)))
                    .doOnCompleted(() -> metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1))
                    .doOnError(t -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1));
        } catch (Throwable e) {
            e.printStackTrace();
            System.err.println("Server => Error [" + request.getPath() + "] => " + e);
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            return response.writeStringAndFlush("data: Error 500: Bad Request\n" + e.getMessage() + "\n");
        }
    });

    return RxNetty.createHttpServer(port, (request, response) -> {
        // System.out.println("Server => Request: " + request.getPath());
            return handlerChain.handle(request, response);
        }, PipelineConfigurators.<ByteBuf> serveSseConfigurator());
}
 
源代码11 项目: ReactiveLab   文件: PersonalizedCatalogService.java
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    return Observable.from(request.getQueryParameters().get("userId")).map(userId -> {
        Map<String, Object> userData = new HashMap<>();
        userData.put("user_id", userId);

        userData.put("list_title", "Really quirky and over detailed list title!");
        userData.put("other_data", "goes_here");
        userData.put("videos", Arrays.asList(12345, 23456, 34567, 45678, 56789, 67890));
        return userData;
    }).flatMap(list -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(list) + "\n"))
            .delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS)
            .doOnCompleted(response::close); // simulate latency
}
 
源代码12 项目: ReactiveLab   文件: SocialService.java
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    return Observable.from(request.getQueryParameters().get("userId")).map(userId -> {
        Map<String, Object> user = new HashMap<>();
        user.put("userId", userId);
        user.put("friends", Arrays.asList(randomUser(), randomUser(), randomUser(), randomUser()));
        return user;
    }).flatMap(list -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(list) + "\n"))
            .delay(((long) (Math.random() * 100) + 20), TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latency
}
 
源代码13 项目: ReactiveLab   文件: VideoMetadataService.java
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> videoIds = request.getQueryParameters().get("videoId");
    return Observable.from(videoIds).map(videoId -> {
        Map<String, Object> video = new HashMap<>();
        video.put("videoId", videoId);
        video.put("title", "Video Title");
        video.put("other_data", "goes_here");
        return video;
    }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n")
            .doOnCompleted(response::close))
            .delay(((long) (Math.random() * 20) + 20), TimeUnit.MILLISECONDS); // simulate latency
}
 
源代码14 项目: ReactiveLab   文件: MockService.java
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> _id = request.getQueryParameters().get("id");
    if (_id == null || _id.size() != 1) {
        return writeError(request, response, "Please provide a numerical 'id' value. It can be a random number (uuid). Received => " + _id);
    }
    long id = Long.parseLong(String.valueOf(_id.get(0)));

    int delay = getParameter(request, "delay", 50); // default to 50ms server-side delay
    int itemSize = getParameter(request, "itemSize", 128); // default to 128 bytes item size (assuming ascii text)
    int numItems = getParameter(request, "numItems", 10); // default to 10 items in a list

    // no more than 100 items
    if (numItems < 1 || numItems > 100) {
        return writeError(request, response, "Please choose a 'numItems' value from 1 to 100.");
    }

    // no larger than 50KB per item
    if (itemSize < 1 || itemSize > 1024 * 50) {
        return writeError(request, response, "Please choose an 'itemSize' value from 1 to 1024*50 (50KB).");
    }

    // no larger than 60 second delay
    if (delay < 0 || delay > 60000) {
        return writeError(request, response, "Please choose a 'delay' value from 0 to 60000 (60 seconds).");
    }

    response.setStatus(HttpResponseStatus.OK);
    return MockResponse.generateJson(id, delay, itemSize, numItems)
            .flatMap(json -> response.writeStringAndFlush("data:" + json + "\n"))
            .doOnCompleted(response::close);
}
 
源代码15 项目: ReactiveLab   文件: RatingsService.java
@Override
protected Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response) {
    List<String> videoIds = request.getQueryParameters().get("videoId");
    return Observable.from(videoIds).map(videoId -> {
        Map<String, Object> video = new HashMap<>();
        video.put("videoId", videoId);
        video.put("estimated_user_rating", 3.5);
        video.put("actual_user_rating", 4);
        video.put("average_user_rating", 3.1);
        return video;
    }).flatMap(video -> response.writeStringAndFlush("data: " + SimpleJson.mapToJson(video) + "\n"))
            .delay(20, TimeUnit.MILLISECONDS).doOnCompleted(response::close); // simulate latenc
}
 
源代码16 项目: ReactiveLab   文件: LoadBalancerFactory.java
public LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> forVip(String targetVip) {
    Observable<MembershipEvent<Host>> eurekaHostSource = membershipSource.forInterest(Interests.forVips(targetVip), instanceInfo -> {
        String ipAddress = instanceInfo.getDataCenterInfo()
                .getAddresses().stream()
                .filter(na -> na.getProtocolType() == ProtocolType.IPv4)
                .collect(Collectors.toList()).get(0).getIpAddress();
        HashSet<ServicePort> servicePorts = instanceInfo.getPorts();
        ServicePort portToUse = servicePorts.iterator().next();
        return new Host(ipAddress, portToUse.getPort());
    });

    final Map<Host, HttpClientHolder<ByteBuf, ServerSentEvent>> hostVsHolders = new ConcurrentHashMap<>();

    String lbName = targetVip + "-lb";
    return LoadBalancers.newBuilder(eurekaHostSource.map(
            hostEvent -> {
                HttpClient<ByteBuf, ServerSentEvent> client = clientPool.getClientForHost(hostEvent.getClient());
                HttpClientHolder<ByteBuf, ServerSentEvent> holder;
                if (hostEvent.getType() == MembershipEvent.EventType.REMOVE) {
                    holder = hostVsHolders.remove(hostEvent.getClient());
                } else {
                    holder = new HttpClientHolder<>(client);
                    hostVsHolders.put(hostEvent.getClient(), holder);
                }
                return new MembershipEvent<>(hostEvent.getType(), holder);
            })).withWeightingStrategy(new LinearWeightingStrategy<>(new RxNettyPendingRequests<>()))
               .withName(lbName)
            .withFailureDetector(new RxNettyFailureDetector<>()).build();
}
 
源代码17 项目: ReactiveLab   文件: PersonalizedCatalogCommand.java
@Override
protected Observable<Catalog> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/catalog?" + UrlGenerator.generate("userId", users));
    return loadBalancer.choose()
                       .map(holder -> holder.getClient())
                       .<Catalog>flatMap(client -> client.submit(request)
                                                .flatMap(r -> r.getContent()
                                                               .map((ServerSentEvent sse) -> Catalog.fromJson(sse.contentAsString()))))
                       .retry(1);
}
 
源代码18 项目: ReactiveLab   文件: SocialCommand.java
@Override
protected Observable<Social> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/social?" + UrlGenerator.generate("userId", users));
    return loadBalancer.choose().map(holder -> holder.getClient())
            .<Social>flatMap(client -> client.submit(request)
                                     .flatMap(r -> r.getContent().map((ServerSentEvent sse) -> {
                                         String social = sse.contentAsString();
                                         return Social.fromJson(social);
                                     })))
            .retry(1);
}
 
源代码19 项目: ReactiveLab   文件: GeoCommand.java
@Override
protected Observable<GeoIP> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/geo?" + UrlGenerator.generate("ip", ips));
    return loadBalancer.choose()
                       .map(holder -> holder.getClient())
                       .<GeoIP>flatMap(client -> client.submit(request)
                                                .flatMap(r -> r.getContent()
                                                               .map((ServerSentEvent sse) -> GeoIP.fromJson(sse.contentAsString()))))
                       .retry(1);
}
 
源代码20 项目: ReactiveLab   文件: UserCommand.java
@Override
protected Observable<User> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/user?" + UrlGenerator.generate("userId", userIds));
    return loadBalancer.choose().map(holder -> holder.getClient())
            .<User>flatMap(client -> client.submit(request)
                                     .flatMap(r -> r.getContent().map(
                                             (ServerSentEvent sse) -> {
                                                 String user = sse.contentAsString();
                                                 return User.fromJson(user);
                                             })))
            .retry(1);
}
 
源代码21 项目: ReactiveLab   文件: VideoMetadataCommand.java
@Override
protected Observable<VideoMetadata> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/metadata?" + UrlGenerator.generate("videoId",
                                                                                                          videos));
    return loadBalancer.choose()
                       .map(holder -> holder.getClient())
                       .<VideoMetadata>flatMap(client -> client.submit(request)
                                                .flatMap(r -> r.getContent()
                                                               .map((ServerSentEvent sse) -> VideoMetadata.fromJson(sse.contentAsString()))))
                       .retry(1);
}
 
源代码22 项目: ReactiveLab   文件: RatingsCommand.java
@Override
protected Observable<Rating> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/ratings?" + UrlGenerator.generate("videoId", videos));
    return loadBalancer.choose()
                       .map(holder -> holder.getClient())
                       .<Rating>flatMap(client -> client.submit(request)
                                                .flatMap(r -> r.getContent()
                                                               .map((ServerSentEvent sse) -> Rating.fromJson(sse.contentAsString()))))
                       .retry(1);
}
 
源代码23 项目: ReactiveLab   文件: BookmarksCommand.java
public BookmarksCommand(List<Video> videos, LoadBalancer<HttpClientHolder<ByteBuf, ServerSentEvent>> loadBalancer) {
    super(HystrixCommandGroupKey.Factory.asKey("GetBookmarks"));
    this.videos = videos;
    this.loadBalancer = loadBalancer;
    StringBuilder b = new StringBuilder();
    for (Video v : videos) {
        b.append(v.getId()).append("-");
    }
    this.cacheKey = b.toString();
}
 
源代码24 项目: ReactiveLab   文件: BookmarksCommand.java
@Override
public Observable<Bookmark> construct() {
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/bookmarks?" + UrlGenerator.generate("videoId", videos));
    return loadBalancer.choose()
            .map(holder -> holder.getClient())
            .<Bookmark>flatMap(client -> client.submit(request)
                                     .flatMap(r -> r.getContent().map((ServerSentEvent sse) -> Bookmark.fromJson(sse.contentAsString()))))
            .retry(1);
}
 
源代码25 项目: ReactiveLab   文件: LoadBalancerFactory.java
public LoadBalancerFactory(EurekaMembershipSource membershipSource,
                           HttpClientPool<ByteBuf, ServerSentEvent> clientPool) {
    this.membershipSource = membershipSource;
    this.clientPool = clientPool;
}
 
源代码26 项目: ReactiveLab   文件: AbstractMiddleTierService.java
protected abstract Observable<Void> handleRequest(HttpServerRequest<?> request, HttpServerResponse<ServerSentEvent> response); 
 类所在包
 同包方法