类javax.ws.rs.sse.OutboundSseEvent源码实例Demo

下面列出了怎么用javax.ws.rs.sse.OutboundSseEvent的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: servicetalk   文件: AsynchronousResources.java
/**
 * Streams scheduled server-sent events to the client that issued this request.
 *
 * <p>An {@code SseEmitter} that forwards to {@code eventSink} is handed to
 * {@code scheduleSseEventSend} together with {@code sse}, a reference holding {@code 0}
 * and the executor of the current execution context.
 *
 * @param eventSink the sink bound to the requesting client
 * @param sse the SSE entry point passed on to the scheduler
 */
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/stream")
@GET
public void getSseStream(@Context final SseEventSink eventSink,
                         @Context final Sse sse) {
    final SseEmitter sinkEmitter = new SseEmitter() {
        @Override
        public CompletionStage<?> emit(final OutboundSseEvent event) {
            // Deliver directly to the requesting client's sink.
            return eventSink.send(event);
        }

        @Override
        public void close() {
            eventSink.close();
        }
    };

    scheduleSseEventSend(sinkEmitter, sse, Refs.of(0), ctx.executionContext().executor());
}
 
源代码2 项目: servicetalk   文件: AsynchronousResources.java
/**
 * Sends an initial {@code "bar"} event to the client, registers its sink with a freshly
 * created broadcaster and then schedules further events to be broadcast through it.
 *
 * @param eventSink the sink bound to the requesting client
 * @param sse the SSE entry point used to create the first event and the broadcaster
 */
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/broadcast")
@GET
public void getSseBroadcast(@Context final SseEventSink eventSink,
                            @Context final Sse sse) {
    // The first event goes directly to the sink, before any broadcaster exists.
    eventSink.send(sse.newEvent("bar"));

    final SseBroadcaster broadcaster = sse.newBroadcaster();
    broadcaster.register(eventSink);

    final SseEmitter broadcastEmitter = new SseEmitter() {
        @Override
        public CompletionStage<?> emit(final OutboundSseEvent event) {
            return broadcaster.broadcast(event);
        }

        @Override
        public void close() {
            broadcaster.close();
        }
    };

    scheduleSseEventSend(broadcastEmitter, sse, Refs.of(0), ctx.executionContext().executor());
}
 
/**
 * Sends one JSON-typed server-sent event carrying a {@code VanillaJavaImmutableData}
 * instance and then closes the sink.
 *
 * @param sink the sink bound to the requesting client; always closed before returning
 */
@GET
@Path("/sse")
@SseElementType(MediaType.APPLICATION_JSON)
@Produces(MediaType.SERVER_SENT_EVENTS)
public void serverSentEvents(@Context SseEventSink sink) {
    final VanillaJavaImmutableData payload = new VanillaJavaImmutableData("sse", "ssevalue");
    try {
        final OutboundSseEvent jsonEvent = sse.newEventBuilder()
                .id(String.valueOf(1))
                .mediaType(MediaType.APPLICATION_JSON_TYPE)
                .data(payload)
                .name("stream of json data")
                .build();

        sink.send(jsonEvent);
    } finally {
        // Release the connection even when building or sending the event fails.
        sink.close();
    }
}
 
源代码4 项目: quarkus   文件: ServerSentEventResource.java
/**
 * Sends three plain-text events whose id and data are the numbers 0, 1 and 2,
 * then closes the sink.
 *
 * @param sink the sink bound to the requesting client; always closed before returning
 */
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sendData(@Context SseEventSink sink) {
    try {
        for (int number = 0; number < 3; number++) {
            // A fresh builder is requested for every event.
            final OutboundSseEvent numberEvent = this.sse.newEventBuilder()
                    .id(String.valueOf(number))
                    .mediaType(MediaType.TEXT_PLAIN_TYPE)
                    .data(Integer.class, number)
                    .name("stream of numbers")
                    .build();

            sink.send(numberEvent);
        }
    } finally {
        sink.close();
    }
}
 
源代码5 项目: quarkus   文件: ServerSentEventResource.java
/**
 * Sends three HTML events, each a minimal page whose body is the event index,
 * then closes the sink.
 *
 * @param sink the sink bound to the requesting client; always closed before returning
 */
@GET
@Path("/stream-html")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType("text/html")
public void sendHtmlData(@Context SseEventSink sink) {
    try {
        for (int page = 0; page < 3; page++) {
            final String html = "<html><body>" + page + "</body></html>";
            final OutboundSseEvent pageEvent = this.sse.newEventBuilder()
                    .id(String.valueOf(page))
                    .mediaType(MediaType.TEXT_HTML_TYPE)
                    .data(html)
                    .name("stream of pages")
                    .build();

            sink.send(pageEvent);
        }
    } finally {
        sink.close();
    }
}
 
源代码6 项目: quarkus   文件: ServerSentEventResource.java
/**
 * Sends three XML events, each a {@code <settings>} document whose {@code bar}
 * attribute is the event index, then closes the sink.
 *
 * @param sink the sink bound to the requesting client; always closed before returning
 */
@GET
@Path("/stream-xml")
@SseElementType(MediaType.TEXT_XML)
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sendXmlData(@Context SseEventSink sink) {
    try {
        for (int index = 0; index < 3; index++) {
            final String xml = "<settings><foo bar=\"" + index + "\"/></settings>";
            final OutboundSseEvent xmlEvent = this.sse.newEventBuilder()
                    .id(String.valueOf(index))
                    .mediaType(MediaType.TEXT_XML_TYPE)
                    .data(xml)
                    .name("stream of XML data")
                    .build();

            sink.send(xmlEvent);
        }
    } finally {
        sink.close();
    }
}
 
源代码7 项目: mycore   文件: MCREventHandler.java
/**
 * Broadcasts object events as JSON server-sent events.
 *
 * <p>Events whose object type is not {@code MCREvent.OBJECT_TYPE} are ignored. The JSON
 * payload carries the user info, the current object data under {@code "current"} and,
 * when the event holds a previous object, its data under {@code "old"}.
 *
 * @param evt the event to translate and broadcast
 */
@Override
public void doHandleEvent(MCREvent evt) throws MCRException {
    if (!evt.getObjectType().equals(MCREvent.OBJECT_TYPE)) {
        return;
    }

    final MCRObject currentObject = (MCRObject) evt.get(MCREvent.OBJECT_KEY);
    final JsonObject payload = new JsonObject();
    final JsonObject currentData = getData(currentObject);
    addUserInfo(payload);
    payload.add("current", currentData);

    final MCRObject previousObject = (MCRObject) evt.get(MCREvent.OBJECT_OLD_KEY);
    if (previousObject != null) {
        payload.add("old", getData(previousObject));
    }

    sseBroadcaster.broadcast(sse.newEventBuilder()
        .mediaType(MediaType.APPLICATION_JSON_TYPE)
        .id(getId(evt))
        .name(getName(evt))
        .data(payload.toString())
        .build());
}
 
源代码8 项目: mycore   文件: MCREventHandler.java
/**
 * Broadcasts derivate events as JSON server-sent events.
 *
 * <p>Events whose object type is not {@code MCREvent.DERIVATE_TYPE} are ignored. The JSON
 * payload carries the user info, the current derivate data under {@code "current"} and,
 * when the event holds a previous derivate, its data under {@code "old"}.
 *
 * @param evt the event to translate and broadcast
 */
@Override
public void doHandleEvent(MCREvent evt) throws MCRException {
    if (!evt.getObjectType().equals(MCREvent.DERIVATE_TYPE)) {
        return;
    }

    final MCRDerivate currentDerivate = (MCRDerivate) evt.get(MCREvent.DERIVATE_KEY);
    final JsonObject payload = new JsonObject();
    addUserInfo(payload);
    payload.add("current", getData(currentDerivate));

    final MCRDerivate previousDerivate = (MCRDerivate) evt.get(MCREvent.DERIVATE_OLD_KEY);
    if (previousDerivate != null) {
        payload.add("old", getData(previousDerivate));
    }

    sseBroadcaster.broadcast(sse.newEventBuilder()
        .mediaType(MediaType.APPLICATION_JSON_TYPE)
        .id(getId(evt))
        .name(getName(evt))
        .data(payload.toString())
        .build());
}
 
源代码9 项目: openhab-core   文件: SseResource.java
/**
 * Subscribes the connecting client for state updates. The sink is added to the item states
 * broadcaster, and a "ready" event carrying the connection id of the new sink info is sent
 * to the sinks matching that connection id. The client can use the id to dynamically alter
 * the list of tracked items.
 *
 * @param sseEventSink the sink associated with the incoming connection
 * @param response the servlet response that receives the common response headers
 */
@GET
@Path("/states")
@Produces(MediaType.SERVER_SENT_EVENTS)
@ApiOperation(value = "Initiates a new item state tracker connection")
@ApiResponses(value = { @ApiResponse(code = 200, message = "OK") })
public void getStateEvents(@Context final SseEventSink sseEventSink, @Context final HttpServletResponse response) {
    final SseSinkItemInfo trackerInfo = new SseSinkItemInfo();
    itemStatesBroadcaster.add(sseEventSink, trackerInfo);

    addCommonResponseHeaders(response);

    final String connectionId = trackerInfo.getConnectionId();
    itemStatesBroadcaster.sendIf(
            sse.newEventBuilder().id("0").name("ready").data(connectionId).build(),
            hasConnectionId(connectionId));
}
 
源代码10 项目: openhab-core   文件: SseResource.java
/**
 * Alters the list of tracked items for a given state update connection and, when an event
 * can be built for the new item list, sends it to that connection.
 *
 * @param connectionId the connection Id to change
 * @param itemNames the list of items to track
 * @return a 404 response when no sink info matches the connection id, otherwise a 200 response
 */
@POST
@Path("/states/{connectionId}")
@ApiOperation(value = "Changes the list of items a SSE connection will receive state updates to.")
@ApiResponses(value = { @ApiResponse(code = 200, message = "OK"),
        @ApiResponse(code = 404, message = "Unknown connectionId") })
public Object updateTrackedItems(@PathParam("connectionId") String connectionId,
        @ApiParam("items") Set<String> itemNames) {
    final SseSinkItemInfo trackedInfo = itemStatesBroadcaster.getInfoIf(hasConnectionId(connectionId))
            .findFirst()
            .orElse(null);
    if (trackedInfo == null) {
        return Response.status(Status.NOT_FOUND).build();
    }

    trackedInfo.updateTrackedItems(itemNames);

    // The builder yields null when it has nothing to report; in that case nothing is sent.
    final OutboundSseEvent currentStates = itemStatesEventBuilder.buildEvent(sse.newEventBuilder(), itemNames);
    if (currentStates != null) {
        itemStatesBroadcaster.sendIf(currentStates, hasConnectionId(connectionId));
    }

    return Response.ok().build();
}
 
源代码11 项目: openhab-core   文件: SseItemStatesEventBuilder.java
/**
 * Builds a JSON event mapping each known item name to its state.
 *
 * <p>Item names the registry does not know are logged and left out.
 *
 * @param eventBuilder the builder used to create the event
 * @param itemNames the names of the items whose states should be reported
 * @return the event, or {@code null} when no state could be collected
 */
public @Nullable OutboundSseEvent buildEvent(Builder eventBuilder, Set<String> itemNames) {
    final Map<String, StateDTO> statesByItem = new HashMap<>(itemNames.size());

    for (final String name : itemNames) {
        try {
            final Item item = itemRegistry.getItem(name);
            final StateDTO dto = new StateDTO();
            dto.state = item.getState().toString();

            final String shownState = getDisplayState(item, Locale.getDefault());
            // Only include the display state if it's different than the raw state
            if (dto.state != null && !dto.state.equals(shownState)) {
                dto.displayState = shownState;
            }

            statesByItem.put(name, dto);
        } catch (ItemNotFoundException e) {
            logger.warn("Attempting to send a state update of an item which doesn't exist: {}", name);
        }
    }

    if (statesByItem.isEmpty()) {
        return null;
    }

    return eventBuilder.mediaType(MediaType.APPLICATION_JSON_TYPE).data(statesByItem).build();
}
 
源代码12 项目: cxf   文件: SseEventSinkImpl.java
/**
 * Queues an event for sending and reports the outcome through the returned stage.
 *
 * <p>The stage is completed exceptionally right away when the sink is closed or has no
 * writer, when a previously recorded throwable is present, or when the buffer rejects the
 * event. Otherwise the event is queued, and {@code dequeue} is started through {@code ctx}
 * if the {@code dispatching} flag could be switched from {@code false} to {@code true}.
 *
 * @param event the event to send
 * @return the stage associated with the queued event
 */
@Override
public CompletionStage<?> send(OutboundSseEvent event) {
    final CompletableFuture<?> result = new CompletableFuture<>();

    if (closed.get() || writer == null) {
        result.completeExceptionally(new IllegalStateException(
            "The sink is already closed, unable to queue SSE event for send"));
        return result;
    }

    final Throwable recorded = throwable.get();
    if (recorded != null) {
        result.completeExceptionally(recorded);
        return result;
    }

    if (!buffer.offer(new QueuedEvent(event, result))) {
        result.completeExceptionally(new IllegalStateException("The buffer is full ("
            + bufferSize + "), unable to queue SSE event for send. Please use '"
                + BUFFER_SIZE_PROPERTY + "' property to increase the limit."));
        return result;
    }

    // Start dequeuing only when the flag was actually flipped by this call.
    if (dispatching.compareAndSet(false, true)) {
        ctx.start(this::dequeue);
    }

    return result;
}
 
源代码13 项目: cxf   文件: SseEventSinkContextProvider.java
/**
 * Creates the {@code SseEventSink} for the given message and stores it in the message
 * under {@code SseEventSink.class}.
 *
 * @param message the message being processed
 * @return the newly created sink
 * @throws IllegalStateException when the message carries no HTTP servlet request
 */
@Override
public SseEventSink createContext(Message message) {
    final HttpServletRequest servletRequest =
        (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
    if (servletRequest == null) {
        throw new IllegalStateException("Unable to retrieve HTTP request from the context");
    }

    // The collaborators are created in this order on purpose; their constructors receive the message.
    final MessageBodyWriter<OutboundSseEvent> eventWriter = new OutboundSseEventBodyWriter(
        ServerProviderFactory.getInstance(message), message.getExchange());
    final AsyncResponse asyncResponse = new AsyncResponseImpl(message);
    final Integer configuredBufferSize = PropertyUtils.getInteger(message, SseEventSinkImpl.BUFFER_SIZE_PROPERTY);

    final SseEventSink eventSink = createSseEventSink(servletRequest, eventWriter, asyncResponse,
        configuredBufferSize);
    message.put(SseEventSink.class, eventSink);

    return eventSink;
}
 
源代码14 项目: cxf   文件: OutboundSseEventImplTest.java
/**
 * A user should not need to specify a media type when creating an outbound event. This test
 * asserts that the media type then is <code>MediaType.TEXT_PLAIN_TYPE</code> for all three
 * ways of creating an event.
 */
@Test
public void testDefaultMediaType() {
    final Sse sse = new SseImpl();

    // newEvent(data)
    final OutboundSseEvent dataOnly = sse.newEvent("myData");
    assertNull(dataOnly.getName());
    assertEquals("myData", dataOnly.getData());
    assertEquals(MediaType.TEXT_PLAIN_TYPE, dataOnly.getMediaType());

    // newEvent(name, data)
    final OutboundSseEvent named = sse.newEvent("myName", "myData2");
    assertEquals("myName", named.getName());
    assertEquals("myData2", named.getData());
    assertEquals(MediaType.TEXT_PLAIN_TYPE, named.getMediaType());

    // newEventBuilder()...build()
    final OutboundSseEvent built = sse.newEventBuilder().comment("myComment").data("myData3").build();
    assertEquals("myComment", built.getComment());
    assertEquals("myData3", built.getData());
    assertEquals(MediaType.TEXT_PLAIN_TYPE, built.getMediaType());
}
 
源代码15 项目: onos   文件: RestSBControllerImplTest.java
/**
 * Starts a background thread that sends ten test events to the client, waiting 0.1 second
 * before each one.
 *
 * <p>Each event has its index as id, the name {@code "message-to-rest-sb"} and the text
 * {@code "Test message <index>!"} as data. The method returns as soon as the thread is started.
 *
 * @param eventSink the sink bound to the requesting client
 * @param sse the SSE entry point used to build the events
 * @throws InterruptedException declared by the signature; the waiting itself happens on the
 *         worker thread, which handles its own interruption
 */
@GET
@Path("server-sent-events")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getServerSentEvents(@Context SseEventSink eventSink, @Context Sse sse) throws InterruptedException {
    new Thread(() -> {
        try {
            for (int i = 0; i < 10; i++) {
                // ... code that waits 0.1 second
                Thread.sleep(100L);
                final OutboundSseEvent event = sse.newEventBuilder()
                        .id(String.valueOf(i))
                        .name("message-to-rest-sb")
                        .data(String.class, "Test message " + i + "!")
                        .build();
                eventSink.send(event);
                System.out.println("Message " + i + " sent");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not silently swallowed.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }).start();
}
 
/**
 * Records the contents of an observed domain event and broadcasts them as a server-sent
 * event carrying the next event id.
 *
 * @param domainEvent the observed event
 */
@Lock(WRITE)
public void onEvent(@Observes DomainEvent domainEvent) {
    final String contents = domainEvent.getContents();
    messages.add(contents);

    // Advance the id first so the broadcast event carries the incremented value.
    ++lastEventId;
    sseBroadcaster.broadcast(createEvent(contents, lastEventId));
}
 
源代码17 项目: liberty-bikes   文件: PartyQueue.java
/**
 * Sends the client its current queue position as a JSON event and logs the notification.
 */
public void notifyPosition() {
    final int currentPosition = queuePosition();

    sink.send(sse.newEventBuilder()
                    .mediaType(MediaType.APPLICATION_JSON_TYPE)
                    .data(new OutboundMessage.QueuePosition(currentPosition))
                    .build());

    party.log("Notified queued client " + playerId + " who is currently at position " + currentPosition);
}
 
源代码18 项目: liberty-bikes   文件: PartyQueue.java
/**
 * Sends the client a JSON requeue message for the given round, logs the promotion and then
 * closes this queue entry.
 *
 * @param roundId the id of the round the client is promoted into
 */
public void promoteToGame(String roundId) {
    final OutboundMessage.RequeueGame requeue = new OutboundMessage.RequeueGame(roundId);

    sink.send(sse.newEventBuilder()
                    .mediaType(MediaType.APPLICATION_JSON_TYPE)
                    .data(requeue)
                    .build());

    party.log("Promoted queued client " + playerId + " into round " + roundId);
    close();
}
 
源代码19 项目: mycore   文件: MCREventHandler.java
/**
 * Broadcasts a JSON server-sent event describing a file of a derivate.
 *
 * <p>Only {@code MCRPath} paths with {@code MCRFileAttributes} are handled; anything else is
 * logged with a warning and skipped. The payload holds the user info plus the properties
 * {@code uri}, {@code derivate}, {@code object}, {@code size}, {@code modified}, {@code md5}
 * and {@code mimeType}.
 *
 * @param evt the event providing the id and name of the broadcast event
 * @param path the path of the file
 * @param attrs the attributes of the file
 */
public void sendEvent(MCREvent evt, Path path, BasicFileAttributes attrs) {
    final boolean supported = path instanceof MCRPath && attrs instanceof MCRFileAttributes;
    if (!supported) {
        LogManager.getLogger().warn("Cannot handle {} {}", path.getClass(), attrs.getClass());
        return;
    }

    final MCRPath mcrPath = (MCRPath) path;
    final JsonObject fileJson = new JsonObject();
    addUserInfo(fileJson);

    final String derivateId = mcrPath.getOwner();
    final String ownerRelativePath = mcrPath.getOwnerRelativePath();
    final String objectId = MCRMetadataManager.getObjectId(MCRObjectID.getInstance(derivateId), 1, TimeUnit.MINUTES)
        .toString();
    final String relativeUri = String.format(Locale.ROOT, "objects/%s/derivates/%s/contents/%s", objectId,
        derivateId, ownerRelativePath);
    final String resolvedUri = uriResolver.apply(getPathURI(relativeUri)).toString();

    fileJson.addProperty("uri", resolvedUri);
    fileJson.addProperty("derivate", derivateId);
    fileJson.addProperty("object", objectId);
    fileJson.addProperty("size", attrs.size());
    fileJson.addProperty("modified", attrs.lastModifiedTime().toInstant().toString());
    fileJson.addProperty("md5", ((MCRFileAttributes) attrs).md5sum());
    fileJson.addProperty("mimeType", context.getMimeType(path.getFileName().toString()));

    sseBroadcaster.broadcast(sse.newEventBuilder()
        .mediaType(MediaType.APPLICATION_JSON_TYPE)
        .id(getId(evt))
        .name(getName(evt))
        .data(fileJson.toString())
        .build());
}
 
源代码20 项目: openhab-core   文件: SseBroadcaster.java
/**
 * Sends an event to every open sink whose info satisfies the predicate.
 *
 * <p>Sinks found closed are removed. When sending fails, the sink is closed and removed;
 * the failure is logged unless it is one of the known "connection is gone" cases.
 *
 * @param event the event to send
 * @param predicate decides, per sink info, whether the event is sent to that sink
 */
public void sendIf(final OutboundSseEvent event, Predicate<I> predicate) {
    logger.trace("broadcast to potential {} sinks", sinks.size());
    sinks.forEach((sink, info) -> {
        if (sink.isClosed()) {
            // We are using a concurrent collection, so we are allowed to modify the collection asynchronous (we
            // don't know if there is currently an iteration in progress or not, but it does not matter).
            handleRemoval(sink);
        } else if (predicate.test(info)) {
            sink.send(event).exceptionally(failure -> {
                close(sink);

                // We are using a concurrent collection, so we are allowed to modify the collection asynchronous
                // (we don't know if there is currently an iteration in progress or not, but it does not matter).
                handleRemoval(sink);

                final String failureClass = failure.getClass().toString();
                final String failureMessage = failure.getMessage();

                // The peer terminates the connection.
                final boolean peerDisconnected = failureClass.equals("class org.eclipse.jetty.io.EofException");

                // java.lang.IllegalStateException: The sink is already closed, unable to queue SSE event for
                // send
                // java.lang.IllegalStateException: AsyncContext completed and/or Request lifecycle recycled
                final boolean sinkNoLongerUsable = failure instanceof IllegalStateException
                        && failureMessage != null
                        && (failureMessage.equals("The sink is already closed, unable to queue SSE event for send")
                                || failureMessage.equals("AsyncContext completed and/or Request lifecycle recycled"));

                if (!peerDisconnected && !sinkNoLongerUsable) {
                    logger.warn("failure", failure);
                }
                return null;
            });
        }
    });
}
 
源代码21 项目: openhab-core   文件: SseResource.java
/**
 * Sends a state event for the changed item to the clients tracking that item.
 *
 * <p>Nothing is sent when no registered sink info tracks the item or when no event could be
 * built for it.
 *
 * @param stateChangeEvent the {@link ItemStateChangedEvent} containing the new state
 */
public void handleEventBroadcastItemState(final ItemStateChangedEvent stateChangeEvent) {
    final String itemName = stateChangeEvent.getItemName();

    final boolean anyoneTracking = itemStatesBroadcaster.getInfoIf(info -> true).anyMatch(tracksItem(itemName));
    if (!anyoneTracking) {
        return;
    }

    final OutboundSseEvent stateEvent = itemStatesEventBuilder.buildEvent(sse.newEventBuilder(),
            Set.of(itemName));
    if (stateEvent == null) {
        return;
    }

    itemStatesBroadcaster.sendIf(stateEvent, tracksItem(itemName));
}
 
源代码22 项目: openhab-core   文件: SseUtil.java
/**
 * Creates a new {@link OutboundSseEvent} named "message" that carries the given
 * {@link EventDTO} as JSON data.
 *
 * @param eventBuilder the builder that should be used
 * @param event the event data transfer object
 * @return a new OutboundSseEvent
 */
public static OutboundSseEvent buildEvent(OutboundSseEvent.Builder eventBuilder, EventDTO event) {
    return eventBuilder
            .name("message")
            .mediaType(MediaType.APPLICATION_JSON_TYPE)
            .data(event)
            .build();
}
 
源代码23 项目: openhab-core   文件: SitemapResource.java
/**
 * Sends a sitemap event, as a JSON event named "event", to the subscriptions whose sitemap
 * name and page id both match the event.
 *
 * <p>The broadcast is skipped when no {@code Sse} instance is available yet.
 *
 * @param event the sitemap event to forward
 */
@Override
public void onEvent(SitemapEvent event) {
    final Sse currentSse = this.sse;
    if (currentSse == null) {
        logger.trace("broadcast skipped (no one listened since activation)");
        return;
    }

    final OutboundSseEvent outbound = currentSse.newEventBuilder()
            .name("event")
            .mediaType(MediaType.APPLICATION_JSON_TYPE)
            .data(event)
            .build();

    broadcaster.sendIf(outbound, info -> {
        final String sitemapName = event.sitemapName;
        final String pageId = event.pageId;

        final boolean subscriptionMatches = sitemapName != null
                && sitemapName.equals(subscriptions.getSitemapName(info.subscriptionId))
                && pageId != null
                && pageId.equals(subscriptions.getPageId(info.subscriptionId));
        if (!subscriptionMatches) {
            return false;
        }

        if (logger.isDebugEnabled()) {
            if (event instanceof SitemapWidgetEvent) {
                logger.debug("Sent sitemap event for widget {} to subscription {}.",
                        ((SitemapWidgetEvent) event).widgetId, info.subscriptionId);
            } else if (event instanceof ServerAliveEvent) {
                logger.debug("Sent alive event to subscription {}.", info.subscriptionId);
            }
        }
        return true;
    });
}
 
源代码24 项目: cxf   文件: StatsRestServiceImpl.java
/**
 * Receives the injected {@code Sse} instance, creates the broadcaster and the event builder
 * from it, and subscribes a pipeline that broadcasts a "stats" event for every tick of a
 * 500 ms interval.
 *
 * @param sse the injected SSE entry point
 */
@Context 
public void setSse(Sse sse) {
    this.broadcaster = sse.newBroadcaster();
    this.builder = sse.newEventBuilder();

    // Source of builders: each request yields the shared builder with its name set to "stats".
    final Flowable<OutboundSseEvent.Builder> statsBuilders = Flowable.generate(
        (Emitter<OutboundSseEvent.Builder> emitter) -> emitter.onNext(builder.name("stats")));

    Flowable
        .interval(500, TimeUnit.MILLISECONDS)
        .zipWith(statsBuilders, (tick, statsBuilder) -> createStatsEvent(statsBuilder, tick))
        .subscribeOn(Schedulers.single())
        .subscribe(broadcaster::broadcast);
}
 
源代码25 项目: cxf   文件: StatsRestServiceImpl.java
/**
 * Builds a JSON event carrying a {@code Stats} sample.
 *
 * <p>The sample is created from the current time in milliseconds and a random integer below 100.
 *
 * @param builder the builder used to create the event
 * @param eventId the number used as the event id
 * @return the built event
 */
private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final long eventId) {
    final String id = Long.toString(eventId);
    return builder
        .id(id)
        .data(Stats.class, new Stats(System.currentTimeMillis(), RANDOM.nextInt(100)))
        .mediaType(MediaType.APPLICATION_JSON_TYPE)
        .build();
}
 
源代码26 项目: cxf   文件: StatsRestServiceImpl.java
/**
 * Builds a JSON event carrying a {@code Stats} sample.
 *
 * <p>The media type is set before the data. The sample is created from the current time in
 * milliseconds and a random integer below 100.
 *
 * @param builder the builder used to create the event
 * @param eventId the number used as the event id
 * @return the built event
 */
private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
    final String id = Integer.toString(eventId);
    return builder
        .id(id)
        .mediaType(MediaType.APPLICATION_JSON_TYPE)
        .data(Stats.class, new Stats(System.currentTimeMillis(), RANDOM.nextInt(100)))
        .build();
}
 
源代码27 项目: cxf   文件: StatsRestServiceImpl.java
/**
 * Builds a JSON event carrying a {@code Stats} sample.
 *
 * <p>The sample is created from the current time in milliseconds and a random integer below 100.
 *
 * @param builder the builder used to create the event
 * @param eventId the number used as the event id
 * @return the built event
 */
private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
    builder.id(String.valueOf(eventId));
    builder.data(Stats.class, new Stats(System.currentTimeMillis(), RANDOM.nextInt(100)));
    builder.mediaType(MediaType.APPLICATION_JSON_TYPE);
    return builder.build();
}
 
源代码28 项目: cxf   文件: StatsRestServiceImpl.java
/**
 * Builds a JSON event carrying a {@code Stats} sample.
 *
 * <p>The sample is created from the current time in milliseconds and a random integer below 100.
 *
 * @param builder the builder used to create the event
 * @param eventId the number used as the event id
 * @return the built event
 */
private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
    final OutboundSseEvent.Builder identified = builder.id(Integer.toString(eventId));
    final long timestampMillis = System.currentTimeMillis();
    final int randomLoad = RANDOM.nextInt(100);
    return identified
        .data(Stats.class, new Stats(timestampMillis, randomLoad))
        .mediaType(MediaType.APPLICATION_JSON_TYPE)
        .build();
}
 
源代码29 项目: cxf   文件: StatsRestServiceImpl.java
/**
 * Builds a JSON event carrying a {@code Stats} sample.
 *
 * <p>The sample is created from the current time in milliseconds and a random integer below 100.
 *
 * @param builder the builder used to create the event
 * @param eventId the number used as the event id
 * @return the built event
 */
private static OutboundSseEvent createStatsEvent(final OutboundSseEvent.Builder builder, final int eventId) {
    final OutboundSseEvent.Builder withId = builder.id(String.valueOf(eventId));
    final OutboundSseEvent.Builder withData =
        withId.data(Stats.class, new Stats(System.currentTimeMillis(), RANDOM.nextInt(100)));
    return withData.mediaType(MediaType.APPLICATION_JSON_TYPE).build();
}
 
源代码30 项目: cxf   文件: BookStoreClientCloseable.java
/**
 * Builds a JSON event carrying a {@code Book} named {@code "New Book #<eventId>"}.
 *
 * @param builder the builder used to create the event
 * @param eventId the number used as the event id, in the book name and as the second
 *        {@code Book} constructor argument
 * @return the built event
 */
protected static OutboundSseEvent createEvent(final OutboundSseEvent.Builder builder, final int eventId) {
    final String id = Integer.toString(eventId);
    final OutboundSseEvent.Builder identified = builder.id(id);
    return identified
        .data(Book.class, new Book("New Book #" + eventId, eventId))
        .mediaType(MediaType.APPLICATION_JSON_TYPE)
        .build();
}
 
 类所在包
 类方法
 同包方法