下面列出了 javax.ws.rs.sse.InboundSseEvent 这个 API 类的示例代码及用法,也可以点击链接到 GitHub 查看源代码。
/**
 * Opens an SSE connection to the {@code sse} endpoint and checks that the first
 * event delivered carries the payload {@code "hello"}.
 *
 * @throws Exception if no event arrives within five seconds or the stream reports an error
 */
@Test
public void testSse() throws Exception {
    Client client = ClientBuilder.newBuilder().build();
    try {
        WebTarget target = client.target(uri.toString() + "sse");
        try (SseEventSource eventSource = SseEventSource.target(target).build()) {
            CompletableFuture<String> res = new CompletableFuture<>();
            // The first event completes the future; a stream error fails it instead.
            eventSource.register(
                    inboundSseEvent -> res.complete(inboundSseEvent.readData()),
                    res::completeExceptionally);
            eventSource.open();
            Assertions.assertEquals("hello", res.get(5, TimeUnit.SECONDS));
        }
    } finally {
        // The client was previously never closed; release it even when the assertion fails.
        client.close();
    }
}
/**
 * Wraps an SSE event source in a {@code Multi}.
 *
 * <p>On subscription the emitter is registered with the event source: each inbound
 * event is cast to {@code InboundSseEventImpl}, mapped through {@code tSupplier} and
 * emitted; errors fail the emitter and stream completion completes it. The source is
 * then opened, under {@code monitor}, unless it is already open.
 *
 * @param sseEventSource the event source to listen on
 * @param tSupplier      maps each inbound event to the emitted item
 * @param verb           passed through to {@code open}
 * @param entity         passed through to {@code open}
 * @param mediaTypes     passed through to {@code open}
 * @param <T>            item type of the resulting stream
 * @return a {@code Multi} created with the configured {@code backpressureStrategy}
 */
private <T> Multi<T> eventSourceToMulti(
        final SseEventSourceImpl sseEventSource,
        final Function<InboundSseEventImpl, T> tSupplier,
        final String verb,
        final Entity<?> entity,
        final MediaType[] mediaTypes) {
    return Multi.createFrom().emitter(sink -> {
        sseEventSource.register(
                (InboundSseEvent inbound) -> {
                    final T item = tSupplier.apply((InboundSseEventImpl) inbound);
                    sink.emit(item);
                },
                (Throwable failure) -> sink.fail(failure),
                () -> sink.complete());
        synchronized (monitor) {
            final boolean alreadyOpen = sseEventSource.isOpen();
            if (!alreadyOpen) {
                sseEventSource.open(null, verb, entity, mediaTypes);
            }
        }
    }, backpressureStrategy);
}
/**
 * Has the server emit three data-only events and checks that the subscriber
 * collected exactly the payloads {@code foo}, {@code bar} and {@code baz}.
 *
 * @throws Exception if waiting on the latch is interrupted or the server cannot be launched
 */
@Test
public void testDataOnlySse_InboundSseEvent() throws Exception {
    final CountDownLatch eventsLatch = new CountDownLatch(3);
    final AtomicReference<Throwable> subscriptionException = new AtomicReference<>();
    final AtomicReference<Throwable> serverException = launchServer(eventsLatch, es -> {
        es.emitData("foo");
        es.emitData("bar");
        es.emitData("baz");
    });

    final URI endpoint = URI.create("http://localhost:" + PORT + "/string/sse");
    final RsSseClient client = RestClientBuilder.newBuilder()
            .baseUri(endpoint)
            .build(RsSseClient.class);
    final Publisher<InboundSseEvent> events = client.getEvents();
    final InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3, eventsLatch);
    events.subscribe(subscriber);

    final boolean allReceived = eventsLatch.await(30, TimeUnit.SECONDS);
    assertTrue(allReceived);
    assertEquals(subscriber.data, new HashSet<>(Arrays.asList("foo", "bar", "baz")));
    assertNull(serverException.get());
    assertNull(subscriptionException.get());
}
/**
 * Has the server emit three comment-only events and checks that the subscriber
 * collected exactly the comments {@code huey}, {@code dewey} and {@code louie}.
 *
 * @throws Exception if waiting on the latch is interrupted or the server cannot be launched
 */
@Test
public void testCommentOnlySse() throws Exception {
    final CountDownLatch eventsLatch = new CountDownLatch(3);
    final AtomicReference<Throwable> subscriptionException = new AtomicReference<>();
    final AtomicReference<Throwable> serverException = launchServer(eventsLatch, es -> {
        es.emitComment("huey");
        es.emitComment("dewey");
        es.emitComment("louie");
    });

    final URI endpoint = URI.create("http://localhost:" + PORT + "/string/sse");
    final RsSseClient client = RestClientBuilder.newBuilder()
            .baseUri(endpoint)
            .build(RsSseClient.class);
    final Publisher<InboundSseEvent> events = client.getEvents();
    final InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3, eventsLatch);
    events.subscribe(subscriber);

    final boolean allReceived = eventsLatch.await(30, TimeUnit.SECONDS);
    assertTrue(allReceived);
    assertEquals(subscriber.comments, new HashSet<>(Arrays.asList("huey", "dewey", "louie")));
    assertNull(serverException.get());
    assertNull(subscriptionException.get());
}
/**
 * Adds {@code n} to the outstanding demand and then hands buffered events to the
 * subscriber for as long as fewer events have been delivered than were requested.
 *
 * <p>Does nothing once the subscription is canceled. A non-positive {@code n} is
 * reported through {@code fireError} with an {@link IllegalArgumentException} and
 * leaves the demand untouched.
 *
 * @param n number of additional events requested; must be at least 1
 */
@Override
public void request(long n) {
    if (canceled.get()) {
        return;
    }
    if (n < 1) {
        fireError(new IllegalArgumentException("Only positive values may be requested - passed-in " + n));
        return;
    }
    // Saturate at Long.MAX_VALUE instead of wrapping around: a plain addAndGet would turn
    // e.g. two request(Long.MAX_VALUE) calls into a negative demand and stall delivery.
    requested.accumulateAndGet(n, (current, extra) -> {
        final long sum = current + extra;
        return sum < 0 ? Long.MAX_VALUE : sum;
    });
    // Lock order: buffer first, then delivered.
    synchronized (buffer) {
        synchronized (delivered) {
            InboundSseEvent bufferedEvent;
            while (delivered.get() < requested.get()
                    && (bufferedEvent = buffer.pollFirst()) != null) {
                subscriber.onNext(bufferedEvent);
                delivered.incrementAndGet();
            }
        }
    }
}
/**
 * Delivers {@code event} to the subscriber if there is unfulfilled demand, otherwise
 * stores it via {@code buffer(event)}; afterwards calls {@code fireCompleteIfReady()}.
 * Events arriving after completion or cancellation are dropped.
 *
 * <p>The deliver-or-buffer decision used to run inside
 * {@code delivered.updateAndGet(...)}. That update function may be re-applied when
 * the compare-and-set loses a race (for instance when {@code delivered} is incremented
 * by a concurrent or re-entrant {@code request} call), which could hand the same event
 * to the subscriber more than once. The decision is now made once, under the
 * {@code buffer} and {@code delivered} monitors taken in the same order that
 * {@code request(long)} uses.
 *
 * @param event the inbound event to deliver or buffer
 */
void fireEvent(InboundSseEvent event) {
    if (completed.get() || canceled.get()) {
        return;
    }
    // Lock order (buffer, then delivered) must match request(long) to avoid deadlock.
    synchronized (buffer) {
        synchronized (delivered) {
            if (delivered.get() < requested.get()) {
                subscriber.onNext(event);
                delivered.incrementAndGet();
            } else {
                buffer(event);
            }
        }
    }
    fireCompleteIfReady();
}
/**
 * Builds an {@code SsePublisher} over the entity stream and returns it in the shape
 * implied by {@code genericType}.
 *
 * <p>When the first type argument of {@code genericType} is {@code InboundSseEvent}
 * the raw publisher is returned; for any other type argument it is wrapped in an
 * {@code SseTypeSafeProcessor} for that type. If {@code genericType} is not
 * parameterized the method returns {@code null}.
 *
 * @return the publisher, a typed processor around it, or {@code null}
 */
@Override
public Publisher<?> readFrom(Class<Publisher<?>> type, Type genericType,
        Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, String> httpHeaders,
        InputStream entityStream) throws IOException, WebApplicationException {
    // Unwrap the thread-local holder, if any, to reach the concrete providers instance.
    final ProvidersImpl resolvedProviders;
    if (providers instanceof ThreadLocalProviders) {
        resolvedProviders = (ProvidersImpl) ((ThreadLocalProviders) providers).get();
    } else {
        resolvedProviders = (ProvidersImpl) providers;
    }
    final ExecutorService executorService = Utils.getExecutorService(mc);
    final SsePublisher ssePublisher = new SsePublisher(entityStream, executorService, resolvedProviders);

    if (!(genericType instanceof ParameterizedType)) {
        return null;
    }
    final Type elementType = ((ParameterizedType) genericType).getActualTypeArguments()[0];
    if (elementType.equals(InboundSseEvent.class)) {
        return ssePublisher;
    }
    return new SseTypeSafeProcessor<Object>(new GenericType<Object>(elementType), ssePublisher);
}
/**
 * Exercises the low-level {@code getServerSentEvents} call of the REST SB controller.
 *
 * <p>Note: an error thrown inside the event consumer is not propagated back to this
 * method. Instead the source goes into its error state and no further callbacks
 * are executed.
 */
@Test
public void testGetServerSentEvents() {
    final Consumer<InboundSseEvent> onEvent = event -> {
        System.out.println("ServerSentEvent received: " + event);
        assertEquals("message-to-rest-sb", event.getName());
        // Stop before the last message is sent, just to show that cancelling works.
        final int eventId = Integer.parseInt(event.getId());
        if (eventId == 8) {
            controller.cancelServerSentEvents(device1.deviceId());
        }
    };
    final Consumer<Throwable> onError = error -> {
        System.err.println(error);
        controller.cancelServerSentEvents(device1.deviceId());
        // Calling fail(error.toString()) here does nothing, as it runs in lambda scope.
    };

    final int status = controller.getServerSentEvents(
            device1.deviceId(),
            "/testme/server-sent-events",
            onEvent,
            onError);
    assertEquals(204, status);
}
/**
 * Has the server emit three named events (names {@code 1}, {@code 2}, {@code 3}) with
 * JSON payloads, half a second apart, and checks that the subscriber collected exactly
 * those names and payloads.
 *
 * @throws Exception if waiting on the latch is interrupted or the server cannot be launched
 */
@Test
public void testNamedEventSse() throws Exception {
    final String snowfall = "{\"date\":\"2020-01-21\", \"description\":\"Significant snowfall\"}";
    final String hailStorm = "{\"date\":\"2020-02-16\", \"description\":\"Hail storm\"}";
    final String blizzard = "{\"date\":\"2020-04-12\", \"description\":\"Blizzard\"}";

    final CountDownLatch eventsLatch = new CountDownLatch(3);
    final AtomicReference<Throwable> subscriptionException = new AtomicReference<>();
    final AtomicReference<Throwable> serverException = launchServer(eventsLatch, es -> {
        es.emitNamedEvent("1", snowfall);
        sleep(500);
        es.emitNamedEvent("2", hailStorm);
        sleep(500);
        es.emitNamedEvent("3", blizzard);
    });

    final URI endpoint = URI.create("http://localhost:" + PORT + "/string/sse");
    final RsSseClient client = RestClientBuilder.newBuilder()
            .baseUri(endpoint)
            .build(RsSseClient.class);
    final Publisher<InboundSseEvent> events = client.getEvents();
    final InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(3, eventsLatch);
    events.subscribe(subscriber);

    final boolean allReceived = eventsLatch.await(40, TimeUnit.SECONDS);
    assertTrue(allReceived);
    assertEquals(subscriber.names, new HashSet<>(Arrays.asList("1", "2", "3")));
    assertEquals(subscriber.data, new HashSet<>(Arrays.asList(snowfall, hailStorm, blizzard)));
    assertNull(serverException.get());
    assertNull(subscriptionException.get());
}
/**
 * Has the server emit five data events with pauses in between and then close the
 * connection; checks that all five payloads arrived and that the subscriber was
 * completed.
 *
 * <p>The latch starts at six: the subscriber used here also counts down in
 * {@code onComplete}, in addition to whatever its {@code onNext} does for the five events.
 *
 * @throws Exception if waiting on the latch is interrupted or the server cannot be launched
 */
@Test
public void testServerClosesConnection() throws Exception {
    final CountDownLatch eventsLatch = new CountDownLatch(6);
    final AtomicReference<Throwable> subscriptionException = new AtomicReference<>();
    final AtomicReference<Throwable> serverException = launchServer(eventsLatch, es -> {
        es.emitData("one");
        es.emitData("two");
        sleep(500);
        es.emitData("three");
        sleep(500);
        es.emitData("four");
        es.emitData("five");
        sleep(500);
        es.close();
    });

    final URI endpoint = URI.create("http://localhost:" + PORT + "/string/sse");
    final RsSseClient client = RestClientBuilder.newBuilder()
            .baseUri(endpoint)
            .build(RsSseClient.class);
    final Publisher<InboundSseEvent> events = client.getEvents();
    final InboundSseEventSubscriber subscriber = new InboundSseEventSubscriber(20, eventsLatch) {
        @Override
        public void onComplete() {
            super.onComplete();
            // Completion counts towards the latch as well.
            eventLatch.countDown();
        }
    };
    events.subscribe(subscriber);

    final boolean allReceived = eventsLatch.await(45, TimeUnit.SECONDS);
    assertTrue(allReceived);
    assertEquals(subscriber.data, new HashSet<>(Arrays.asList("one", "two", "three", "four", "five")));
    assertTrue(subscriber.completed);
    assertNull(serverException.get());
    assertNull(subscriptionException.get());
}
/**
 * Records the data, comment, name and id of the received event in the matching
 * collections and counts down {@code eventLatch} once.
 *
 * @param event the event that was received
 */
@Override
public void onNext(InboundSseEvent event) {
    LOG.debug("InboundSseEventSubscriber onNext " + event);

    final String payload = event.readData();
    data.add(payload);

    final String comment = event.getComment();
    comments.add(comment);

    final String name = event.getName();
    names.add(name);

    final String id = event.getId();
    ids.add(id);

    eventLatch.countDown();
}
/**
 * Launches a server that emits up to {@code elements} data events (the loop index as
 * text, skipped whenever {@code inMethod} is false) and returns a publisher obtained
 * from a REST client pointed at that server.
 *
 * @param elements number of events the server loop iterates over
 * @return the client-side publisher, or {@code null} if anything failed along the way
 */
@Override
public Publisher<InboundSseEvent> createPublisher(long elements) {
    LOG.debug("createPublisher (" + elements + ")");
    final CountDownLatch latch = new CountDownLatch(1);
    try {
        final AtomicReference<Throwable> serverException = launchServer(latch, es -> {
            long index = 0;
            while (index < elements) {
                if (inMethod.get()) {
                    es.emitData(Long.toString(index));
                }
                index++;
            }
            latch.countDown();
        }, cleanupLatch);

        final Throwable serverFailure = serverException.get();
        if (serverFailure != null) {
            throw serverFailure;
        }

        final URI endpoint = URI.create("http://localhost:" + PORT + "/string/sse");
        final RsSseClient client = RestClientBuilder.newBuilder()
                .baseUri(endpoint)
                .build(RsSseClient.class);
        final Publisher<InboundSseEvent> publisher = client.getEvents();
        LOG.debug("createPublisher --> " + publisher);
        return publisher;
    } catch (Throwable t) {
        LOG.error("Failed to create publisher", t);
        t.printStackTrace();
        return null;
    }
}
/**
 * Creates a new {@code SseSubscription} for the subscriber, records it in
 * {@code subscriptions}, signals the subscription via {@code fireSubscribe()} and
 * then calls {@code start()}.
 *
 * @param subscriber the subscriber to attach
 */
@Override
public void subscribe(Subscriber<? super InboundSseEvent> subscriber) {
    final SseSubscription newSubscription = new SseSubscription(this, subscriber);
    subscriptions.add(newSubscription);
    newSubscription.fireSubscribe();
    start();
}
/**
 * Appends the event to the tail of {@code buffer}; if the buffer then holds more
 * than {@code bufferSize} entries, the entry at the head is removed.
 *
 * @param event the event to store
 */
private void buffer(InboundSseEvent event) {
    synchronized (buffer) {
        buffer.addLast(event);
        final boolean overCapacity = buffer.size() > bufferSize.get();
        if (overCapacity) {
            buffer.removeFirst();
        }
    }
}
/**
 * Reads the event payload as {@code type} and passes it to every downstream
 * subscriber, unless this processor has been closed.
 *
 * @param t the inbound event
 * @throws IllegalStateException if no incoming subscription has been set
 */
@Override
public void onNext(InboundSseEvent t) {
    final String className = SseTypeSafeProcessor.class.getName();
    LOG.entering(className, "onNext", t);
    if (incomingSubscription == null) {
        throw new IllegalStateException("not subscribed");
    }
    final boolean open = !isClosed.get();
    if (open) {
        @SuppressWarnings("unchecked")
        final T payload = (T) t.readData(type);
        for (Subscriber<? super T> downstream : subscribers) {
            downstream.onNext(payload);
        }
    }
    LOG.exiting(className, "onNext");
}
/**
 * Remembers the id of the event as {@code lastEventId}, forwards the event to all
 * listeners and, when the event carries a reconnect delay, adopts that delay.
 *
 * @param event the inbound event
 */
@Override
public void onNext(InboundSseEvent event) {
    lastEventId = event.getId();
    listeners.forEach(each -> each.onNext(event));

    if (!event.isReconnectDelaySet()) {
        return;
    }
    // The reconnect delay carried by the event is expressed in milliseconds.
    unit = TimeUnit.MILLISECONDS;
    delay = event.getReconnectDelay();
}
/**
 * Opens a Server Sent Events stream for {@code request} on the given device and
 * blocks the calling thread, polling every 1010 ms, until the stream is no longer open.
 *
 * @param deviceId device to listen to; must not be null
 * @param request  request path; must not be null or empty
 * @param onEvent  callback registered for inbound events
 * @param onError  callback registered for stream errors
 * @return {@code PRECONDITION_FAILED} for a missing device id or request,
 *         {@code CONFLICT} if the device already has an entry in {@code sseEventSourceMap},
 *         {@code NO_CONTENT} once an opened stream has closed, or
 *         {@code INTERNAL_SERVER_ERROR} if the stream could not be opened
 */
@Override
public int getServerSentEvents(DeviceId deviceId, String request,
                               Consumer<InboundSseEvent> onEvent,
                               Consumer<Throwable> onError) {
    // The log calls below previously passed arguments without "{}" placeholders,
    // so the device and request never made it into the log output.
    if (deviceId == null) {
        log.warn("Device ID is null for request {}", request);
        return Status.PRECONDITION_FAILED.getStatusCode();
    }
    if (request == null || request.isEmpty()) {
        log.warn("Request cannot be empty");
        return Status.PRECONDITION_FAILED.getStatusCode();
    }
    if (sseEventSourceMap.containsKey(deviceId)) {
        log.warn("Device {} is already listening to an SSE stream", deviceId);
        return Status.CONFLICT.getStatusCode();
    }
    WebTarget wt = getWebTarget(deviceId, request);
    SseEventSource sseEventSource = SseEventSource.target(wt).build();
    sseEventSource.register(onEvent, onError);
    sseEventSource.open();
    if (sseEventSource.isOpen()) {
        sseEventSourceMap.put(deviceId, sseEventSource);
        log.info("Opened Server Sent Events request to {} on {}", request, deviceId);
        while (sseEventSource.isOpen()) {
            try {
                Thread.sleep(1010);
                System.out.println("Listening for SSEs");
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt is logged and the wait continues; the
                // interrupt flag is not restored. Confirm whether callers rely on this.
                log.error("Error", e);
            }
        }
        return Status.NO_CONTENT.getStatusCode();
    } else {
        log.error("Unable to open Server Sent Events request to {} to {}", request, deviceId);
        return Status.INTERNAL_SERVER_ERROR.getStatusCode();
    }
}
/**
 * Creates an event for the given device by copying the data, id, name and comment
 * out of the inbound SSE event.
 *
 * @param type     event type, passed to the superclass
 * @param deviceId device the event belongs to, passed to the superclass
 * @param sseEvent inbound SSE event to copy from; checked with {@code checkNotNull}
 */
public RestSBServerSentEvent(Type type, DeviceId deviceId, InboundSseEvent sseEvent) {
    super(type, deviceId);
    checkNotNull(sseEvent);
    this.data = sseEvent.readData();
    this.id = sseEvent.getId();
    this.name = sseEvent.getName();
    this.comment = sseEvent.getComment();
}
/**
 * Replays an inbound SSE event as an HTTP POST against {@code this.target}.
 *
 * <p>Events named {@code ping} or {@code ready} are ignored. Otherwise the event data
 * is parsed as a JSON object: a {@code query} field (URL-encoded) replaces the query of
 * the target URI, a {@code body} field becomes the request entity, and every remaining
 * field except {@code content-length} is sent as a request header. The outcome is
 * logged; parse and I/O failures are logged as warnings and not rethrown.
 *
 * @param event the inbound event to forward
 */
private void onMessage(InboundSseEvent event) {
    if ("ping".equals(event.getName()) || "ready".equals(event.getName())) {
        return;
    }
    ObjectMapper mapper = new ObjectMapper();
    try {
        ObjectNode data = (ObjectNode) mapper.readTree(event.readData());
        var urib = UriBuilder.fromUri(this.target);
        if (data.has("query")) {
            urib.replaceQuery(URLEncoder.encode(data.get("query").asText(), "UTF-8"));
            data.remove("query");
        }
        try (CloseableHttpClient client = HttpClientBuilder.create().disableCookieManagement().build()) {
            var request = new HttpPost(urib.build());
            if (data.has("body")) {
                request.setEntity(new StringEntity(data.get("body").asText()));
                data.remove("body");
            }
            // "query" and "body" were removed above, so the remaining fields are headers.
            data.fieldNames().forEachRemaining(s -> {
                if (!s.equalsIgnoreCase("content-length")) {
                    request.setHeader(s, data.get(s).asText());
                }
            });
            // Close the response as well; it was previously left open and leaked.
            try (CloseableHttpResponse response = client.execute(request)) {
                if (response.getStatusLine().getStatusCode() != 200) {
                    log.severe(response.getStatusLine().toString());
                } else {
                    log.info(request.getMethod() + " " + request.getURI() + " - "
                            + response.getStatusLine().getStatusCode());
                }
            }
        }
    } catch (IOException | RuntimeException e) {
        log.warning("Could not parse event data: " + e.getMessage());
        e.printStackTrace();
    }
}
/**
 * Counts down {@code cleanupLatch} and returns {@code null}; no failed publisher is
 * provided yet.
 *
 * @return always {@code null}
 */
@Override
public Publisher<InboundSseEvent> createFailedPublisher() {
cleanupLatch.countDown();
return null; // TODO: implement for failed publisher test support (optional tests)
}
/**
 * Declares a GET on the relative path {@code ssePath2} that produces
 * {@code MediaType.SERVER_SENT_EVENTS}, exposed as a publisher of inbound SSE events.
 *
 * @return publisher of the inbound events
 */
@GET
@Path("ssePath2")
@Produces(MediaType.SERVER_SENT_EVENTS)
Publisher<InboundSseEvent> getEvents();
/**
 * Reads the event payload as JSON into a {@code Stats} object and prints its
 * timestamp and load percentage to standard output.
 *
 * @param event the inbound event carrying the statistics
 */
private static void print(InboundSseEvent event) {
    final Stats sample = event.readData(Stats.class, MediaType.APPLICATION_JSON_TYPE);
    final String line = sample.getTimestamp() + ": " + sample.getLoad() + "%";
    System.out.println(line);
}
/**
 * Creates a consumer that reads each event's payload as a JSON {@code Book} and
 * adds it to {@code books}.
 *
 * @param books collection that receives the decoded books
 * @return the collecting consumer
 */
private static Consumer<InboundSseEvent> collect(final Collection< Book > books) {
    return event -> {
        final Book book = event.readData(Book.class, javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE);
        books.add(book);
    };
}
/**
 * Creates a consumer that reads each event's payload as a JSON {@code Book} and
 * adds it to {@code books}.
 *
 * @param books collection that receives the decoded books
 * @return the collecting consumer
 */
private static Consumer<InboundSseEvent> collect(final Collection<Book> books) {
    return event -> {
        final Book book = event.readData(Book.class, MediaType.APPLICATION_JSON_TYPE);
        books.add(book);
    };
}
/**
 * Creates a consumer that reads each event's payload as plain text and adds it
 * to {@code titles}.
 *
 * @param titles collection that receives the raw strings
 * @return the collecting consumer
 */
private static Consumer<InboundSseEvent> collectRaw(final Collection<String> titles) {
    return event -> {
        final String title = event.readData(String.class, MediaType.TEXT_PLAIN_TYPE);
        titles.add(title);
    };
}
/**
 * Creates an {@code InboundSseEventImpl} from the providers and the name, id,
 * comment and data currently held by this builder.
 *
 * @return the newly created event
 */
InboundSseEvent build() {
    final InboundSseEvent event =
            new InboundSseEventImpl(providers, theName, theId, theComment, theData);
    return event;
}
/**
 * Creates a subscription that links the given publisher and subscriber.
 *
 * @param publisher  the publisher this subscription belongs to
 * @param subscriber the subscriber held by this subscription
 */
SseSubscription(SsePublisher publisher, Subscriber<? super InboundSseEvent> subscriber) {
    this.subscriber = subscriber;
    this.publisher = publisher;
}
/**
 * Records the received event by adding it to {@code receivedEvents}.
 *
 * @param t the event that was received
 */
@Override
public void onNext(InboundSseEvent t) {
receivedEvents.add(t);
}
/**
 * Creates a listener that only handles events: delegates to the three-argument
 * constructor with a throwable consumer and a third callback that both do nothing.
 *
 * @param e consumer invoked with inbound events
 */
InboundSseEventListenerImpl(Consumer<InboundSseEvent> e) {
    this(e, ignored -> { }, () -> { });
}
/**
 * Creates a listener from an event consumer and a throwable consumer; delegates to
 * the three-argument constructor with a no-op third callback
 * (presumably the completion callback; confirm against that constructor).
 *
 * @param e consumer of inbound events
 * @param t consumer of throwables
 */
InboundSseEventListenerImpl(Consumer<InboundSseEvent> e, Consumer<Throwable> t) {
this(e, t, () -> { });
}