类javax.ws.rs.sse.InboundSseEvent源码实例Demo

下面列出了怎么用javax.ws.rs.sse.InboundSseEvent的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: quarkus   文件: SseTestCase.java
@Test
public void testSse() throws Exception {

    Client client = ClientBuilder.newBuilder().build();
    WebTarget target = client.target(uri.toString() + "sse");
    try (SseEventSource eventSource = SseEventSource.target(target).build()) {
        CompletableFuture<String> res = new CompletableFuture<>();
        eventSource.register(new Consumer<InboundSseEvent>() {
            @Override
            public void accept(InboundSseEvent inboundSseEvent) {
                res.complete(inboundSseEvent.readData());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
                res.completeExceptionally(throwable);
            }
        });
        eventSource.open();
        Assertions.assertEquals("hello", res.get(5, TimeUnit.SECONDS));
    }
}
 
源代码2 项目: quarkus   文件: MultiRxInvokerImpl.java
private <T> Multi<T> eventSourceToMulti(
        final SseEventSourceImpl sseEventSource,
        final Function<InboundSseEventImpl, T> tSupplier,
        final String verb,
        final Entity<?> entity,
        final MediaType[] mediaTypes) {
    final Multi<T> multi = Multi.createFrom().emitter(emitter -> {
        sseEventSource.register(
                (InboundSseEvent e) -> emitter.emit(tSupplier.apply((InboundSseEventImpl) e)),
                (Throwable t) -> emitter.fail(t),
                () -> emitter.complete());
        synchronized (monitor) {
            if (!sseEventSource.isOpen()) {
                sseEventSource.open(null, verb, entity, mediaTypes);
            }
        }
    },
            backpressureStrategy);
    return multi;
}
 
@Test
public void testDataOnlySse_InboundSseEvent() throws Exception {
    CountDownLatch resultsLatch = new CountDownLatch(3);
    AtomicReference<Throwable> subscriptionException = new AtomicReference<>();
    AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
        es.emitData("foo");
        es.emitData("bar");
        es.emitData("baz");
    });

    RsSseClient client = RestClientBuilder.newBuilder()
                                        .baseUri(URI.create("http://localhost:" + PORT + "/string/sse"))
                                        .build(RsSseClient.class);
    Publisher<InboundSseEvent> publisher = client.getEvents();
    InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3, resultsLatch);
    publisher.subscribe(subscriber);

    assertTrue(resultsLatch.await(30, TimeUnit.SECONDS));
    assertEquals(subscriber.data, new HashSet<>(Arrays.asList("foo", "bar", "baz")));
    assertNull(serverException.get());
    assertNull(subscriptionException.get());
}
 
@Test
public void testCommentOnlySse() throws Exception {
    CountDownLatch resultsLatch = new CountDownLatch(3);
    AtomicReference<Throwable> subscriptionException = new AtomicReference<>();
    AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
        es.emitComment("huey");
        es.emitComment("dewey");
        es.emitComment("louie");
    });

    RsSseClient client = RestClientBuilder.newBuilder()
                                        .baseUri(URI.create("http://localhost:" + PORT+ "/string/sse"))
                                        .build(RsSseClient.class);
    Publisher<InboundSseEvent> publisher = client.getEvents();
    InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3, resultsLatch);
    publisher.subscribe(subscriber);

    assertTrue(resultsLatch.await(30, TimeUnit.SECONDS));
    assertEquals(subscriber.comments, new HashSet<>(Arrays.asList("huey", "dewey", "louie")));
    assertNull(serverException.get());
    assertNull(subscriptionException.get());
}
 
源代码5 项目: cxf   文件: SseSubscription.java
@Override
public void request(long n) {
    if (canceled.get()) {
        return;
    }
    if (n < 1) {
        fireError(new IllegalArgumentException("Only positive values may be requested - passed-in " + n));
        return;
    }
    requested.addAndGet(n);
    synchronized (buffer) {
        InboundSseEvent bufferedEvent = null;
        synchronized (delivered) {
            while (delivered.get() < requested.get()
                   && (bufferedEvent = buffer.pollFirst()) != null) {

                subscriber.onNext(bufferedEvent);
                delivered.incrementAndGet();
            }
        }
    }
}
 
源代码6 项目: cxf   文件: SseSubscription.java
void fireEvent(InboundSseEvent event) {
    if (completed.get() || canceled.get()) {
        return;
    }
    delivered.updateAndGet(l -> {
        if (l < requested.get()) {
            subscriber.onNext(event);
            return l + 1;
        } else {
            buffer(event);
        }
        return l;
    });

    fireCompleteIfReady();
}
 
源代码7 项目: cxf   文件: SseMessageBodyReader.java
@Override
public Publisher<?> readFrom(Class<Publisher<?>> type, Type genericType,
        Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, String> httpHeaders,
        InputStream entityStream) throws IOException, WebApplicationException {
    ProvidersImpl providersImpl = (ProvidersImpl) (providers instanceof ThreadLocalProviders
        ? ((ThreadLocalProviders)providers).get() : providers);
    ExecutorService executor = Utils.getExecutorService(mc);
    SsePublisher publisher = new SsePublisher(entityStream, executor, providersImpl);
    if (genericType instanceof ParameterizedType) {
        Type typeArgument = ((ParameterizedType)genericType).getActualTypeArguments()[0];
        if (typeArgument.equals(InboundSseEvent.class)) {
            return publisher;
        }

        return new SseTypeSafeProcessor<Object>(new GenericType<Object>(typeArgument), publisher);
    }
    return null;
}
 
源代码8 项目: onos   文件: RestSBControllerImplTest.java
/**
 * Tests the low level getServerSentEvents function of the REST SB Controller.
 *
 * Note: If the consumer throws an error it will not be propagated back up
 * to here - instead the source will go in to error and no more callbacks
 * will be executed
 */
@Test
public void testGetServerSentEvents() {
    Consumer<InboundSseEvent> sseEventConsumer = (event) -> {
        System.out.println("ServerSentEvent received: " + event);
        assertEquals("message-to-rest-sb", event.getName());
        // Just to show it works we stop before the last message is sent
        if (Integer.parseInt(event.getId()) == 8) {
            controller.cancelServerSentEvents(device1.deviceId());
        }
    };

    Consumer<Throwable> sseError = (error) -> {
        System.err.println(error);
        controller.cancelServerSentEvents(device1.deviceId());
        //fail(error.toString()); //Does nothing as it's in lambda scope
    };

    int response = controller.getServerSentEvents(device1.deviceId(),
            "/testme/server-sent-events",
            sseEventConsumer,
            sseError
    );
    assertEquals(204, response);
}
 
@Test
public void testNamedEventSse() throws Exception {
    CountDownLatch resultsLatch = new CountDownLatch(3);
    AtomicReference<Throwable> subscriptionException = new AtomicReference<>();
    AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
        es.emitNamedEvent("1", "{\"date\":\"2020-01-21\", \"description\":\"Significant snowfall\"}");
        sleep(500);
        es.emitNamedEvent("2", "{\"date\":\"2020-02-16\", \"description\":\"Hail storm\"}");
        sleep(500);
        es.emitNamedEvent("3", "{\"date\":\"2020-04-12\", \"description\":\"Blizzard\"}");
    });

    RsSseClient client = RestClientBuilder.newBuilder()
                                        .baseUri(URI.create("http://localhost:" + PORT+ "/string/sse"))
                                        .build(RsSseClient.class);
    Publisher<InboundSseEvent> publisher = client.getEvents();
    InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3, resultsLatch);
    publisher.subscribe(subscriber);

    assertTrue(resultsLatch.await(40, TimeUnit.SECONDS));
    assertEquals(subscriber.names, new HashSet<>(Arrays.asList("1", "2", "3")));
    assertEquals(subscriber.data, new HashSet<>(Arrays.asList(
        "{\"date\":\"2020-01-21\", \"description\":\"Significant snowfall\"}",
        "{\"date\":\"2020-02-16\", \"description\":\"Hail storm\"}",
        "{\"date\":\"2020-04-12\", \"description\":\"Blizzard\"}")));
    assertNull(serverException.get());
    assertNull(subscriptionException.get());
}
 
@Test
public void testServerClosesConnection() throws Exception {
    CountDownLatch resultsLatch = new CountDownLatch(6);
    AtomicReference<Throwable> subscriptionException = new AtomicReference<>();
    AtomicReference<Throwable> serverException = launchServer(resultsLatch, es -> {
        es.emitData("one");
        es.emitData("two");
        sleep(500);
        es.emitData("three");
        sleep(500);
        es.emitData("four");
        es.emitData("five");
        sleep(500);
        es.close();
    });

    RsSseClient client = RestClientBuilder.newBuilder()
                                        .baseUri(URI.create("http://localhost:" + PORT+ "/string/sse"))
                                        .build(RsSseClient.class);
    Publisher<InboundSseEvent> publisher = client.getEvents();
    InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(20, resultsLatch){
        @Override
        public void onComplete() {
            super.onComplete();
            eventLatch.countDown();
        }
    };
    publisher.subscribe(subscriber);

    assertTrue(resultsLatch.await(45, TimeUnit.SECONDS));
    assertEquals(subscriber.data, new HashSet<>(Arrays.asList("one", "two", "three", "four", "five")));
    assertTrue(subscriber.completed);
    assertNull(serverException.get());
    assertNull(subscriptionException.get());
}
 
@Override
public void onNext(InboundSseEvent event) {
    LOG.debug("InboundSseEventSubscriber onNext " + event);
    data.add(event.readData());
    comments.add(event.getComment());
    names.add(event.getName());
    ids.add(event.getId());
    eventLatch.countDown();
}
 
@Override
public Publisher<InboundSseEvent> createPublisher(long elements) {
    LOG.debug("createPublisher (" + elements + ")");
    CountDownLatch latch = new CountDownLatch(1);
    try {
        AtomicReference<Throwable> serverException = launchServer(latch, es -> {
            for (long i = 0; i < elements; i++) {
                if (inMethod.get()) {
                    es.emitData(Long.toString(i));
                }
            }
            latch.countDown();
        }, cleanupLatch);

        if (serverException.get() != null) {
            throw serverException.get();
        }

        RsSseClient client = RestClientBuilder.newBuilder()
                .baseUri(URI.create("http://localhost:" + PORT + "/string/sse")).build(RsSseClient.class);
        Publisher<InboundSseEvent> publisher = client.getEvents();
        LOG.debug("createPublisher --> " + publisher);
        return publisher;
    }
    catch (Throwable t) {
        LOG.error("Failed to create publisher", t);
        t.printStackTrace();
        return null;
    }
}
 
源代码13 项目: cxf   文件: SsePublisher.java
@Override
public void subscribe(Subscriber<? super InboundSseEvent> subscriber) {
    SseSubscription subscription = new SseSubscription(this, subscriber);
    subscriptions.add(subscription);
    subscription.fireSubscribe();
    start();
}
 
源代码14 项目: cxf   文件: SseSubscription.java
private void buffer(InboundSseEvent event) {
    synchronized (buffer) {
        buffer.addLast(event);
        if (buffer.size() > bufferSize.get()) {
            buffer.removeFirst();
        }
    }
}
 
源代码15 项目: cxf   文件: SseTypeSafeProcessor.java
@Override
public void onNext(InboundSseEvent t) {
    LOG.entering(SseTypeSafeProcessor.class.getName(), "onNext", t);
    if (incomingSubscription == null) {
        throw new IllegalStateException("not subscribed");
    }
    if (!isClosed.get()) {
        @SuppressWarnings("unchecked")
        T data = (T) t.readData(type);
        for (Subscriber<? super T> subscriber : subscribers) {
            subscriber.onNext(data);
        }
    }
    LOG.exiting(SseTypeSafeProcessor.class.getName(), "onNext");
}
 
源代码16 项目: cxf   文件: SseEventSourceImpl.java
@Override
public void onNext(InboundSseEvent event) {
    lastEventId = event.getId();
    listeners.forEach(listener -> listener.onNext(event));
    
    // Reconnect delay is set in milliseconds
    if (event.isReconnectDelaySet()) {
        unit = TimeUnit.MILLISECONDS;
        delay = event.getReconnectDelay();
    }
}
 
源代码17 项目: onos   文件: HttpSBControllerImpl.java
@Override
public int getServerSentEvents(DeviceId deviceId, String request,
                               Consumer<InboundSseEvent> onEvent,
                               Consumer<Throwable> onError) {
    if (deviceId == null) {
        log.warn("Device ID is null", request);
        return Status.PRECONDITION_FAILED.getStatusCode();
    }

    if (request == null || request.isEmpty()) {
        log.warn("Request cannot be empty", request);
        return Status.PRECONDITION_FAILED.getStatusCode();
    }

    if (sseEventSourceMap.containsKey(deviceId)) {
        log.warn("Device", deviceId, "is already listening to an SSE stream");
        return Status.CONFLICT.getStatusCode();
    }

    WebTarget wt = getWebTarget(deviceId, request);
    SseEventSource sseEventSource = SseEventSource.target(wt).build();
    sseEventSource.register(onEvent, onError);
    sseEventSource.open();
    if (sseEventSource.isOpen()) {
        sseEventSourceMap.put(deviceId, sseEventSource);
        log.info("Opened Server Sent Events request to ", request, "on", deviceId);
        while (sseEventSource.isOpen()) {
            try {
                Thread.sleep(1010);
                System.out.println("Listening for SSEs");
            } catch (InterruptedException e) {
                log.error("Error", e);
            }
        }
        return Status.NO_CONTENT.getStatusCode();
    } else {
        log.error("Unable to open Server Sent Events request to ", request, "to", deviceId);
        return Status.INTERNAL_SERVER_ERROR.getStatusCode();
    }
}
 
源代码18 项目: onos   文件: RestSBServerSentEvent.java
public RestSBServerSentEvent(Type type, DeviceId deviceId, InboundSseEvent sseEvent) {
    super(type, deviceId);
    checkNotNull(sseEvent);
    data = sseEvent.readData();
    id = sseEvent.getId();
    name = sseEvent.getName();
    comment = sseEvent.getComment();
}
 
源代码19 项目: jbang   文件: smee.java
private void onMessage(InboundSseEvent event) {
    if("ping".equals(event.getName()) || "ready".equals(event.getName())) {
        return;
    }
    ObjectMapper mapper = new ObjectMapper();
    try {
        ObjectNode data = (ObjectNode) mapper.readTree(event.readData());

        var urib = UriBuilder.fromUri(this.target);

        if(data.has("query")) {
            urib.replaceQuery(URLEncoder.encode(data.get("query").asText(), "UTF-8"));
            data.remove("query");
        }

        try(CloseableHttpClient client = HttpClientBuilder.create().disableCookieManagement().build()) {

            var request = new HttpPost(urib.build());

            if(data.has("body")) {
                request.setEntity(new StringEntity(data.get("body").asText()));
                data.remove("body");
            }

            data.fieldNames().forEachRemaining(s ->
            {
                if(!s.equalsIgnoreCase("content-length")) {
                request.setHeader(s, data.get(s).asText());
            }});


            CloseableHttpResponse response = client.execute(request);

            if(response.getStatusLine().getStatusCode()!=200) {
                log.severe(response.getStatusLine().toString());
            } else {
                log.info(request.getMethod() + " " + request.getURI() + " - " + response.getStatusLine().getStatusCode());
            }
        }

    } catch (IOException e) {
        log.warning("Could not parse event data: " + e.getMessage());
        e.printStackTrace();
    } catch (RuntimeException re) {
        log.warning("Could not parse event data: " + re.getMessage());
        re.printStackTrace();
    }

}
 
@Override
public Publisher<InboundSseEvent> createFailedPublisher() {
    cleanupLatch.countDown();
    return null; // TODO: implement for failed publisher test support (optional tests)
}
 
源代码21 项目: microprofile-rest-client   文件: RsSseClient.java
@GET
@Path("ssePath2")
@Produces(MediaType.SERVER_SENT_EVENTS)
Publisher<InboundSseEvent> getEvents();
 
源代码22 项目: cxf   文件: StatsClient.java
private static void print(InboundSseEvent event) {
    final Stats stats = event.readData(Stats.class, MediaType.APPLICATION_JSON_TYPE);
    System.out.println(stats.getTimestamp() + ": " + stats.getLoad() + "%");
}
 
源代码23 项目: cxf   文件: SpringSseEmitterTest.java
private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
    return event -> books.add(event.readData(Book.class, javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE));
}
 
源代码24 项目: cxf   文件: AbstractSseTest.java
private static Consumer<InboundSseEvent> collect(final Collection<Book> books) {
    return event -> books.add(event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE));
}
 
源代码25 项目: cxf   文件: AbstractSseTest.java
private static Consumer<InboundSseEvent> collectRaw(final Collection<String> titles) {
    return event -> titles.add(event.readData(String.class, MediaType.TEXT_PLAIN_TYPE));
}
 
源代码26 项目: cxf   文件: SseEventBuilder.java
InboundSseEvent build() {
    return new InboundSseEventImpl(providers, theName, theId, theComment, theData);
}
 
源代码27 项目: cxf   文件: SseSubscription.java
SseSubscription(SsePublisher publisher, Subscriber<? super InboundSseEvent> subscriber) {
    this.publisher = publisher;
    this.subscriber = subscriber;
}
 
源代码28 项目: cxf   文件: SseSubscriptionTest.java
@Override
public void onNext(InboundSseEvent t) {
    receivedEvents.add(t);
}
 
源代码29 项目: cxf   文件: SseEventSourceImpl.java
InboundSseEventListenerImpl(Consumer<InboundSseEvent> e) {
    this(e, ex -> { }, () -> { });
}
 
源代码30 项目: cxf   文件: SseEventSourceImpl.java
InboundSseEventListenerImpl(Consumer<InboundSseEvent> e, Consumer<Throwable> t) {
    this(e, t, () -> { });    
}
 
 类所在包
 类方法
 同包方法