类javax.ws.rs.sse.SseEventSink源码实例Demo

下面列出了 javax.ws.rs.sse.SseEventSink 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: servicetalk   文件: AsynchronousResources.java
/**
 * SSE endpoint that wraps the injected sink in an {@code SseEmitter} and passes it to
 * {@code scheduleSseEventSend} together with the {@link Sse} factory, a counter reference
 * starting at 0 and the executor of the current execution context.
 *
 * @param eventSink sink bound to the requesting client; events are sent to it and it is closed
 *                  when the emitter is closed
 * @param sse       factory handed on to {@code scheduleSseEventSend}
 */
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/stream")
@GET
public void getSseStream(@Context final SseEventSink eventSink,
                         @Context final Sse sse) {
    // Adapter that forwards emit/close straight to the client's sink.
    final SseEmitter sinkEmitter = new SseEmitter() {
        @Override
        public CompletionStage<?> emit(final OutboundSseEvent event) {
            return eventSink.send(event);
        }

        @Override
        public void close() {
            eventSink.close();
        }
    };
    scheduleSseEventSend(sinkEmitter, sse, Refs.of(0), ctx.executionContext().executor());
}
 
源代码2 项目: servicetalk   文件: AsynchronousResources.java
/**
 * SSE endpoint that first sends a {@code "bar"} event directly to the caller's sink, then
 * registers that sink with a newly created broadcaster and passes a broadcaster-backed
 * {@code SseEmitter} to {@code scheduleSseEventSend}.
 *
 * @param eventSink sink bound to the requesting client
 * @param sse       factory used to create the initial event and the broadcaster
 */
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/broadcast")
@GET
public void getSseBroadcast(@Context final SseEventSink eventSink,
                            @Context final Sse sse) {
    eventSink.send(sse.newEvent("bar"));
    final SseBroadcaster broadcaster = sse.newBroadcaster();
    broadcaster.register(eventSink);

    // Adapter that forwards emit/close to the broadcaster rather than to the sink itself.
    final SseEmitter broadcastEmitter = new SseEmitter() {
        @Override
        public CompletionStage<?> emit(final OutboundSseEvent event) {
            return broadcaster.broadcast(event);
        }

        @Override
        public void close() {
            broadcaster.close();
        }
    };
    scheduleSseEventSend(broadcastEmitter, sse, Refs.of(0), ctx.executionContext().executor());
}
 
/**
 * Sends a single JSON-typed event (id {@code "1"}) carrying a {@code VanillaJavaImmutableData}
 * payload and then closes the sink.
 *
 * @param sink sink bound to the requesting client; always closed once the send was attempted
 */
@GET
@Path("/sse")
@SseElementType(MediaType.APPLICATION_JSON)
@Produces(MediaType.SERVER_SENT_EVENTS)
public void serverSentEvents(@Context SseEventSink sink) {
    final VanillaJavaImmutableData payload = new VanillaJavaImmutableData("sse", "ssevalue");
    try {
        final OutboundSseEvent jsonEvent = sse.newEventBuilder()
                .id(String.valueOf(1))
                .mediaType(MediaType.APPLICATION_JSON_TYPE)
                .data(payload)
                .name("stream of json data")
                .build();
        sink.send(jsonEvent);
    } finally {
        sink.close();
    }
}
 
源代码4 项目: quarkus   文件: ServerSentEventResource.java
/**
 * Sends three plain-text events whose id and payload are the numbers 0, 1 and 2, then closes
 * the sink.
 *
 * @param sink sink bound to the requesting client; closed in all cases
 */
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sendData(@Context SseEventSink sink) {
    try {
        for (int number = 0; number < 3; number++) {
            final OutboundSseEvent numberEvent = this.sse.newEventBuilder()
                    .id(String.valueOf(number))
                    .mediaType(MediaType.TEXT_PLAIN_TYPE)
                    .data(Integer.class, number)
                    .name("stream of numbers")
                    .build();
            sink.send(numberEvent);
        }
    } finally {
        sink.close();
    }
}
 
源代码5 项目: quarkus   文件: ServerSentEventResource.java
/**
 * Sends three HTML-typed events, each a minimal page whose body is the event number (0..2),
 * then closes the sink.
 *
 * @param sink sink bound to the requesting client; closed in all cases
 */
@GET
@Path("/stream-html")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType("text/html")
public void sendHtmlData(@Context SseEventSink sink) {
    try {
        for (int page = 0; page < 3; page++) {
            final OutboundSseEvent pageEvent = this.sse.newEventBuilder()
                    .id(String.valueOf(page))
                    .mediaType(MediaType.TEXT_HTML_TYPE)
                    .data("<html><body>" + page + "</body></html>")
                    .name("stream of pages")
                    .build();
            sink.send(pageEvent);
        }
    } finally {
        sink.close();
    }
}
 
源代码6 项目: quarkus   文件: ServerSentEventResource.java
/**
 * Sends three XML-typed events, each a small {@code <settings>} document whose {@code bar}
 * attribute is the event number (0..2), then closes the sink.
 *
 * @param sink sink bound to the requesting client; closed in all cases
 */
@GET
@Path("/stream-xml")
@SseElementType(MediaType.TEXT_XML)
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sendXmlData(@Context SseEventSink sink) {
    try {
        for (int index = 0; index < 3; index++) {
            final OutboundSseEvent xmlEvent = this.sse.newEventBuilder()
                    .id(String.valueOf(index))
                    .mediaType(MediaType.TEXT_XML_TYPE)
                    .data("<settings><foo bar=\"" + index + "\"/></settings>")
                    .name("stream of XML data")
                    .build();
            sink.send(xmlEvent);
        }
    } finally {
        sink.close();
    }
}
 
/**
 * Submits a background task that streams "domain-progress" events for the given domain to the
 * client: a start message followed by five progress updates spaced 200 ms apart.
 *
 * <p>The sink is closed when the task ends, whether it completed, was interrupted or failed
 * while sending.
 *
 * @param id        domain identifier taken from the request path; echoed in the first event
 * @param eventSink sink bound to the requesting client
 */
@POST
@Path("domains/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void startDomain(@PathParam("id") final String id, @Context SseEventSink eventSink) {

    executorService.submit(() -> {
        try {
            eventSink.send(sse.newEventBuilder().name("domain-progress").data(String.class, "starting domain " + id + " ...").build());
            for (final String progress : new String[] {"50%", "60%", "70%", "99%", "Done."}) {
                Thread.sleep(200);
                eventSink.send(sse.newEvent("domain-progress", progress));
            }
        } catch (final InterruptedException e) {
            // Keep the interrupt visible to the executor instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            // Only this task holds the sink, so it has to be released here on every exit path.
            eventSink.close();
        }
    });
}
 
/**
 * Schedules a background task that streams progress events for the given report to the client:
 * a "report-progress" start message followed by "Progress" updates of 25%, 50% and 75%, with a
 * 500 ms pause between the updates.
 *
 * <p>The sink is closed when the task ends, whether it completed, was interrupted or failed
 * while sending.
 *
 * @param id  report identifier taken from the request path; echoed in the first event
 * @param es  sink bound to the requesting client
 * @param sse factory used to build the outbound events
 */
@POST
@Path("progress/{report_id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void eventStream(@PathParam("report_id") final String id,
                        @Context SseEventSink es,
                        @Context Sse sse) {
    executorService.execute(() -> {
        try {
            // Send through the injected parameter; no sink named eventSink is declared here.
            es.send(sse.newEventBuilder().name("report-progress")
                    .data(String.class, "Commencing process for report " + id + " ...").build());
            es.send(sse.newEvent("Progress", "25%"));
            Thread.sleep(500);
            es.send(sse.newEvent("Progress", "50%"));
            Thread.sleep(500);
            es.send(sse.newEvent("Progress", "75%"));
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the executor instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            // Only this task holds the sink, so it has to be released here on every exit path.
            es.close();
        }
    });
}
 
源代码9 项目: liberty-bikes   文件: PartyQueue.java
/**
 * Places a player at the end of the waiting queue, removing an earlier entry for the same
 * client first, and then either promotes waiting clients (when the current round is open) or
 * notifies the new client of its position.
 *
 * @param playerId identifier of the player joining the queue
 * @param sink     SSE sink associated with the player
 * @param sse      SSE factory associated with the player
 */
public void add(String playerId, SseEventSink sink, Sse sse) {
    final QueuedClient newcomer = new QueuedClient(playerId, sink, sse);

    // A client that is already waiting is moved to the back rather than queued twice.
    final boolean wasAlreadyQueued = waitingPlayers.removeFirstOccurrence(newcomer);
    if (wasAlreadyQueued) {
        party.log("Removed client " + playerId + " from queue before adding at end");
        GameMetrics.counterDec(GameMetrics.currentQueuedPlayersCounter);
    }

    party.log("Adding client " + playerId + " into the queue in position " + newcomer.queuePosition());
    waitingPlayers.add(newcomer);
    GameMetrics.counterInc(GameMetrics.currentQueuedPlayersCounter);

    if (party.getCurrentRound().isOpen()) {
        promoteClients();
    } else {
        newcomer.notifyPosition();
    }
}
 
源代码10 项目: openhab-core   文件: SseResource.java
/**
 * Subscribes the connecting client to events matching the given topic filter.
 *
 * <p>An invalid filter sets the response status to 400 and registers nothing; otherwise the sink
 * is added to the topic broadcaster and the common response headers are applied.
 *
 * @param sseEventSink sink bound to the requesting client
 * @param response     servlet response used for the status code and headers
 * @param eventFilter  value of the {@code topics} query parameter
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@ApiOperation(value = "Get all events.")
@ApiResponses(value = { @ApiResponse(code = 200, message = "OK"),
        @ApiResponse(code = 400, message = "Topic is empty or contains invalid characters") })
public void listen(@Context final SseEventSink sseEventSink, @Context final HttpServletResponse response,
        @QueryParam("topics") @ApiParam(value = "topics") String eventFilter) {
    if (SseUtil.isValidTopicFilter(eventFilter)) {
        topicBroadcaster.add(sseEventSink, new SseSinkTopicInfo(eventFilter));

        addCommonResponseHeaders(response);
    } else {
        response.setStatus(Status.BAD_REQUEST.getStatusCode());
    }
}
 
源代码11 项目: openhab-core   文件: SseResource.java
/**
 * Subscribes the connecting client for state updates. It will initially only send a "ready" event with a unique
 * connectionId that the client can use to dynamically alter the list of tracked items.
 *
 * <p>This method returns nothing; events are delivered through the injected sink.
 *
 * @param sseEventSink sink bound to the incoming connection; registered with the item states broadcaster
 * @param response servlet response that receives the common response headers
 */
@GET
@Path("/states")
@Produces(MediaType.SERVER_SENT_EVENTS)
@ApiOperation(value = "Initiates a new item state tracker connection")
@ApiResponses(value = { @ApiResponse(code = 200, message = "OK") })
public void getStateEvents(@Context final SseEventSink sseEventSink, @Context final HttpServletResponse response) {
    final SseSinkItemInfo sinkItemInfo = new SseSinkItemInfo();
    itemStatesBroadcaster.add(sseEventSink, sinkItemInfo);

    addCommonResponseHeaders(response);

    // The "ready" event carries the connection id and is sent only to sinks matching that id.
    String connectionId = sinkItemInfo.getConnectionId();
    OutboundSseEvent readyEvent = sse.newEventBuilder().id("0").name("ready").data(connectionId).build();
    itemStatesBroadcaster.sendIf(readyEvent, hasConnectionId(connectionId));
}
 
源代码12 项目: cxf   文件: BookStore.java
/**
 * Asynchronously sends five "book" events (numbered 1 to 5) without any delay between them and
 * closes the sink once the asynchronous task has finished, successfully or not.
 *
 * @param sink sink bound to the requesting client
 * @param id   path parameter; not used by this method
 */
@GET
@Path("nodelay/sse/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void forBookNoDelay(@Context SseEventSink sink, @PathParam("id") final String id) {
    final Builder eventBuilder = sse.newEventBuilder();

    final CompletableFuture<Void> sending = CompletableFuture.runAsync(() -> {
        for (int bookId = 1; bookId <= 5; bookId++) {
            sink.send(createEvent(eventBuilder.name("book"), bookId));
        }
    });
    sending.whenComplete((result, failure) -> sink.close());
}
 
源代码13 项目: cxf   文件: BookStore.java
/**
 * Asynchronously sends five raw "book" events (numbered 1 to 5) and closes the sink once the
 * asynchronous task has finished, successfully or not.
 *
 * @param sink sink bound to the requesting client
 */
@GET
@Path("/titles/sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void forBookTitlesOnly(@Context SseEventSink sink) {
    final Builder eventBuilder = sse.newEventBuilder();

    final CompletableFuture<Void> sending = CompletableFuture.runAsync(() -> {
        for (int bookId = 1; bookId <= 5; bookId++) {
            sink.send(createRawEvent(eventBuilder.name("book"), bookId));
        }
    });
    sending.whenComplete((result, failure) -> sink.close());
}
 
源代码14 项目: cxf   文件: BookStore2.java
/**
 * Asynchronously sends five "book" events (numbered 1 to 5) without any delay between them and
 * closes the sink once the asynchronous task has finished, successfully or not.
 *
 * @param sink sink bound to the requesting client
 * @param id   path parameter; not used by this method
 */
@GET
@Path("nodelay/sse/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void forBookNoDelay(@Context SseEventSink sink, @PathParam("id") final String id) {
    final Builder eventBuilder = sse.newEventBuilder();

    final CompletableFuture<Void> sending = CompletableFuture.runAsync(() -> {
        for (int bookId = 1; bookId <= 5; bookId++) {
            sink.send(createEvent(eventBuilder.name("book"), bookId));
        }
    });
    sending.whenComplete((result, failure) -> sink.close());
}
 
源代码15 项目: cxf   文件: BookStore2.java
/**
 * Asynchronously sends five raw "book" events (numbered 1 to 5) and closes the sink once the
 * asynchronous task has finished, successfully or not.
 *
 * @param sink sink bound to the requesting client
 */
@GET
@Path("/titles/sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void forBookTitlesOnly(@Context SseEventSink sink) {
    final Builder eventBuilder = sse.newEventBuilder();

    final CompletableFuture<Void> sending = CompletableFuture.runAsync(() -> {
        for (int bookId = 1; bookId <= 5; bookId++) {
            sink.send(createRawEvent(eventBuilder.name("book"), bookId));
        }
    });
    sending.whenComplete((result, failure) -> sink.close());
}
 
源代码16 项目: cxf   文件: SseEventSinkContextProvider.java
/**
 * Builds the {@link SseEventSink} for the given message and stores it on the message under the
 * {@code SseEventSink.class} key.
 *
 * @param message current message; must carry the HTTP servlet request
 * @return the newly created sink
 * @throws IllegalStateException if the message has no HTTP request attached
 */
@Override
public SseEventSink createContext(Message message) {
    final HttpServletRequest httpRequest =
        (HttpServletRequest)message.get(AbstractHTTPDestination.HTTP_REQUEST);
    if (httpRequest == null) {
        throw new IllegalStateException("Unable to retrieve HTTP request from the context");
    }

    final MessageBodyWriter<OutboundSseEvent> eventWriter = new OutboundSseEventBodyWriter(
        ServerProviderFactory.getInstance(message), message.getExchange());
    final AsyncResponse asyncResponse = new AsyncResponseImpl(message);
    // Buffer size is read from the message properties and passed through as-is.
    final Integer configuredBufferSize =
        PropertyUtils.getInteger(message, SseEventSinkImpl.BUFFER_SIZE_PROPERTY);

    final SseEventSink eventSink =
        createSseEventSink(httpRequest, eventWriter, asyncResponse, configuredBufferSize);
    message.put(SseEventSink.class, eventSink);
    return eventSink;
}
 
源代码17 项目: cxf   文件: SseEventSinkContextProviderTest.java
/**
 * Verifies that a sink created without an explicit buffer size accepts 10000 pending events and
 * rejects the next one with a "buffer is full" failure.
 */
@Test
public void testCreateSseEventSinkWithDefaultBufferSize() {
    final SseEventSink sink = provider.createContext(message);

    for (int i = 0; i < 10000; i++) {
        final CompletableFuture<?> queued = sink.send(EVENT).toCompletableFuture();
        // The event is buffered, but nothing has been delivered so far
        assertThat(queued.isDone(), equalTo(false));
    }

    // The buffer overflow should trigger message rejection and exceptional completion
    final CompletableFuture<?> rejected = sink.send(EVENT).toCompletableFuture();
    assertThat(rejected.isCompletedExceptionally(), equalTo(true));

    exception.expect(CompletionException.class);
    exception.expectMessage("The buffer is full (10000), unable to queue SSE event for send.");

    rejected.join();
}
 
源代码18 项目: cxf   文件: SseEventSinkContextProviderTest.java
/**
 * Verifies that a sink created with the buffer size property set to 20000 accepts 20000 pending
 * events and rejects the next one with a "buffer is full" failure.
 */
@Test
public void testCreateSseEventSinkWithCustomBufferSize() {
    message.put(SseEventSinkImpl.BUFFER_SIZE_PROPERTY, 20000);
    final SseEventSink sink = provider.createContext(message);

    for (int i = 0; i < 20000; i++) {
        final CompletableFuture<?> queued = sink.send(EVENT).toCompletableFuture();
        // The event is buffered, but nothing has been delivered so far
        assertThat(queued.isDone(), equalTo(false));
    }

    // The buffer overflow should trigger message rejection and exceptional completion
    final CompletableFuture<?> rejected = sink.send(EVENT).toCompletableFuture();
    assertThat(rejected.isCompletedExceptionally(), equalTo(true));

    exception.expect(CompletionException.class);
    exception.expectMessage("The buffer is full (20000), unable to queue SSE event for send.");

    rejected.join();
}
 
源代码19 项目: onos   文件: RestSBControllerImplTest.java
/**
 * Starts a thread that sends ten "message-to-rest-sb" events to the client, waiting 100 ms
 * before each one, and closes the sink when the thread finishes.
 *
 * @param eventSink sink bound to the requesting client
 * @param sse       factory used to build the outbound events
 * @throws InterruptedException declared for signature compatibility; the waiting happens on the
 *                              spawned thread, not on the calling one
 */
@GET
@Path("server-sent-events")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getServerSentEvents(@Context SseEventSink eventSink, @Context Sse sse) throws InterruptedException {
    new Thread(() -> {
        try {
            for (int i = 0; i < 10; i++) {
                // ... code that waits 0.1 second
                Thread.sleep(100L);
                final OutboundSseEvent event = sse.newEventBuilder()
                        .id(String.valueOf(i))
                        .name("message-to-rest-sb")
                        .data(String.class, "Test message " + i + "!")
                        .build();
                eventSink.send(event);
                System.out.println("Message " + i + " sent");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing the interruption.
            Thread.currentThread().interrupt();
        } finally {
            // Only this thread holds the sink, so it has to be released here on every exit path.
            eventSink.close();
        }
    }).start();
}
 
源代码20 项目: servicetalk   文件: CancellableResources.java
/**
 * SSE endpoint that starts {@code sendSseUntilFailure} for the caller's sink, using the executor
 * of the connection's execution context.
 *
 * @param eventSink sink bound to the requesting client
 * @param sse       factory used to build the outbound events
 * @param ctx       connection context supplying the executor
 */
@Produces(SERVER_SENT_EVENTS)
@Path("/sse")
@GET
public void getSseStream(@Context final SseEventSink eventSink,
                         @Context final Sse sse,
                         @Context final ConnectionContext ctx) {
    final Executor connectionExecutor = ctx.executionContext().executor();
    sendSseUntilFailure(eventSink, sse, connectionExecutor);
}
 
源代码21 项目: servicetalk   文件: CancellableResources.java
/**
 * Sends a {@code "foo"} event and reschedules itself 10 ms later, repeating until sending or
 * scheduling throws.
 *
 * <p>On failure the latch {@code sseSinkClosedLatch} is counted down if the sink reports itself
 * closed; any other failure is rethrown as an {@link IllegalStateException}.
 *
 * @param eventSink sink to send to
 * @param sse       factory used to build the event
 * @param executor  executor used to schedule the next invocation
 */
private void sendSseUntilFailure(final SseEventSink eventSink, final Sse sse, final Executor executor) {
    try {
        eventSink.send(sse.newEvent("foo"));
        executor.schedule(() -> sendSseUntilFailure(eventSink, sse, executor), 10, MILLISECONDS);
    } catch (final Throwable failure) {
        if (!eventSink.isClosed()) {
            throw new IllegalStateException("SseEventSink should be closed", failure);
        }
        sseSinkClosedLatch.countDown();
    }
}
 
源代码22 项目: servicetalk   文件: AsynchronousResources.java
/**
 * SSE endpoint that sends a single plain-text event whose payload is declared as a
 * {@code Buffer} holding the ASCII text {@code "foo"}.
 *
 * @param eventSink sink bound to the requesting client
 * @param sse       factory used to build the event
 */
@Produces(SERVER_SENT_EVENTS)
@Path("/sse/unsupported")
@GET
public void getSseUnsupportedType(@Context final SseEventSink eventSink,
                                  @Context final Sse sse) {
    final OutboundSseEvent.Builder eventBuilder = sse.newEventBuilder();
    final Buffer payload = ctx.executionContext().bufferAllocator().fromAscii("foo");
    final OutboundSseEvent bufferEvent = eventBuilder
            .data(Buffer.class, payload)
            .mediaType(MediaType.TEXT_PLAIN_TYPE)
            .build();
    eventSink.send(bufferEvent);
}
 
源代码23 项目: Java-EE-8-and-Angular   文件: TaskUpdatesSSE.java
/**
 * Registers the caller's sink with this resource's broadcaster, creating the broadcaster on
 * first use, and stores the injected {@link Sse} instance on this resource.
 *
 * @param sse       factory stored on the resource and used to create the broadcaster
 * @param eventSink sink bound to the requesting client
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context Sse sse, @Context SseEventSink eventSink) {
    this.sse = sse;
    // NOTE(review): lazy creation is unsynchronized; confirm this resource is not hit concurrently.
    if (broadcaster == null) {
        broadcaster = sse.newBroadcaster();
    }
    broadcaster.register(eventSink);
}
 
源代码24 项目: Java-EE-8-and-Angular   文件: IssueUpdatesSSE.java
/**
 * Registers the caller's sink with this resource's broadcaster, creating the broadcaster on
 * first use, and stores the injected {@link Sse} instance on this resource.
 *
 * @param sse       factory stored on the resource and used to create the broadcaster
 * @param eventSink sink bound to the requesting client
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context Sse sse, @Context SseEventSink eventSink) {
    this.sse = sse;
    // NOTE(review): lazy creation is unsynchronized; confirm this resource is not hit concurrently.
    if (broadcaster == null) {
        broadcaster = sse.newBroadcaster();
    }
    broadcaster.register(eventSink);
}
 
源代码25 项目: quarkus   文件: SseResource.java
/**
 * Creates a new broadcaster, registers the caller's sink with it and broadcasts a single
 * {@code "hello"} event.
 *
 * @param sink sink bound to the requesting client
 * @throws IllegalStateException if no sink was injected
 * @throws IOException declared by the signature; nothing in this body throws it directly
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context SseEventSink sink) throws IOException {
    if (sink == null) {
        throw new IllegalStateException("No client connected.");
    }

    final SseBroadcaster helloBroadcaster = sse.newBroadcaster();
    helloBroadcaster.register(sink);

    final OutboundSseEvent helloEvent = sse.newEventBuilder().data("hello").build();
    helloBroadcaster.broadcast(helloEvent);
}
 
源代码26 项目: javaee8-cookbook   文件: ServerSentService.java
/**
 * Registers the caller's sink with the broadcaster of the {@code UserEvent} stored in
 * {@code POOL} under the given id.
 *
 * @param id           key looked up in {@code POOL}
 * @param sseEventSink sink bound to the requesting client
 * @throws NotFoundException if {@code POOL} has no entry for {@code id}
 */
@Path("register/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
@GET
public void register(@PathParam("id") Long id,
        @Context SseEventSink sseEventSink) {
    final UserEvent userEvent = POOL.get(id);
    if (userEvent == null) {
        throw new NotFoundException();
    }

    userEvent.getSseBroadcaster().register(sseEventSink);
}
 
源代码27 项目: Java-EE-8-Sampler   文件: SseResource.java
/**
 * Sends a confirmation event directly to the caller's sink and then registers the sink with the
 * broadcaster.
 *
 * @param eventSink sink bound to the requesting client
 */
@GET
@Path("subscribe")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void subscribe(@Context SseEventSink eventSink) {
    final OutboundSseEvent confirmation = sse.newEvent("You are subscribed");
    eventSink.send(confirmation);

    broadcaster.register(eventSink);
}
 
/**
 * Stores the caller's sink as this resource's sink while holding {@code outputLock}.
 *
 * @param eventSink sink bound to the requesting client
 * @throws IllegalStateException if a sink has already been stored
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void getMessageQueue(@Context SseEventSink eventSink) {
    synchronized (outputLock) {
        final boolean alreadyServed = this.eventSink != null;
        if (alreadyServed) {
            throw new IllegalStateException("Server sink already served.");
        }
        this.eventSink = eventSink;
    }
}
 
/**
 * Registers the caller's sink with the broadcaster; when the request carries a non-negative
 * last-event-id header, {@code replayLastMessages} is called for that id first.
 *
 * @param lastEventId value of the last-event-id request header, {@code -1} when absent
 * @param eventSink   sink bound to the requesting client
 */
@GET
@Lock(READ)
@Produces(MediaType.SERVER_SENT_EVENTS)
public void itemEvents(@HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER)
                       @DefaultValue("-1") int lastEventId,
                       @Context SseEventSink eventSink) {

    final boolean clientIsResuming = lastEventId >= 0;
    if (clientIsResuming) {
        replayLastMessages(lastEventId, eventSink);
    }

    sseBroadcaster.register(eventSink);
}
 
/**
 * Sends every stored message from index {@code lastEventId} onwards to the given sink; the
 * message at index {@code i} is passed to {@code createEvent} together with {@code i + 1}.
 *
 * @param lastEventId index of the first stored message to replay
 * @param eventSink   sink that receives the replayed events
 * @throws InternalServerErrorException if anything fails while replaying
 */
private void replayLastMessages(int lastEventId, SseEventSink eventSink) {
    try {
        int index = lastEventId;
        while (index < messages.size()) {
            eventSink.send(createEvent(messages.get(index), index + 1));
            index++;
        }
    } catch (Exception e) {
        throw new InternalServerErrorException("Could not replay messages ", e);
    }
}
 
 类所在包
 类方法
 同包方法