下面列出了怎么用io.grpc.stub.CallStreamObserver的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Receives one element from the upstream source.
 *
 * <p>When {@code sourceMode} is {@code ASYNC} or {@code NOT_FUSED} the argument is not forwarded by this
 * method; only {@link #drain()} is invoked. Otherwise, unless the producer is cancelled, the element is
 * null-checked, handed to the downstream observer, the {@code isRequested} flag is cleared and a drain is
 * triggered.
 *
 * <p>Any {@link Throwable} raised while forwarding cancels this producer and is reported to the downstream
 * through {@code onError} after being passed through {@code prepareError}.
 *
 * @param t the element emitted by the upstream
 */
@Override
public void onNext(T t) {
    final boolean drainOnly = sourceMode == ASYNC || sourceMode == NOT_FUSED;
    if (drainOnly) {
        drain();
        return;
    }

    if (isCanceled()) {
        return;
    }

    checkNotNull(t);
    final CallStreamObserver<T> target = downstream;
    try {
        target.onNext(t);
        isRequested = false;
        drain();
    } catch (Throwable failure) {
        cancel();
        try {
            target.onError(prepareError(failure));
        } catch (Throwable ignored) {
            // Best effort only: a failure while reporting the error is deliberately dropped.
        }
    }
}
/**
 * Checks that passing a {@link StatusException} to {@code onError} delivers that exact error to the
 * subscriber, runs the termination callback, and leaves {@code outputFused} false.
 */
@Test
public void statusExceptionTriggersHandler() {
    final AtomicBoolean terminated = new AtomicBoolean(false);
    final Runnable markTerminated = new Runnable() {
        @Override
        public void run() {
            terminated.set(true);
        }
    };
    CallStreamObserver upstream = mock(CallStreamObserver.class);
    AbstractStreamObserverAndPublisher publisher =
            new AbstractStreamObserverAndPublisher(new ArrayBlockingQueue(1), null, markTerminated) { };
    publisher.onSubscribe(upstream);

    TestSubscriber subscriber = Flowable.fromPublisher(publisher)
            .test();

    StatusException cancelled = Status.CANCELLED.asException();
    publisher.onError(cancelled);

    subscriber.awaitTerminalEvent();
    subscriber.assertError(cancelled);
    assertThat(terminated.get()).isTrue();
    assertThat(publisher.outputFused).isFalse();
}
/**
 * Checks that passing a {@link StatusRuntimeException} to {@code onError} delivers that exact error to the
 * subscriber, runs the termination callback, and leaves {@code outputFused} false.
 */
@Test
public void statusRuntimeExceptionTriggersHandler() {
    final AtomicBoolean terminated = new AtomicBoolean(false);
    final Runnable markTerminated = new Runnable() {
        @Override
        public void run() {
            terminated.set(true);
        }
    };
    CallStreamObserver upstream = mock(CallStreamObserver.class);
    AbstractStreamObserverAndPublisher publisher =
            new AbstractStreamObserverAndPublisher(new ArrayBlockingQueue(1), null, markTerminated) { };
    publisher.onSubscribe(upstream);

    TestSubscriber subscriber = Flowable.fromPublisher(publisher)
            .test();

    StatusRuntimeException cancelled = Status.CANCELLED.asRuntimeException();
    publisher.onError(cancelled);

    subscriber.awaitTerminalEvent();
    subscriber.assertError(cancelled);
    assertThat(terminated.get()).isTrue();
    assertThat(publisher.outputFused).isFalse();
}
/**
 * Adapts a stream &rarr; unary call to Reactor types: a {@link Flux} of requests is consumed and a
 * {@link Mono} with the response is returned.
 *
 * @param fluxSource the request messages; subscribed to immediately by this method
 * @param delegate   function applied to the response observer; the observer it returns is not used here
 * @param options    call options; not consulted by this method
 * @return the response publisher narrowed with {@code singleOrEmpty()}, or {@code Mono.error} carrying any
 *         {@link Throwable} thrown while wiring the call
 */
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Mono<TResponse> manyToOne(
        Flux<TRequest> fluxSource,
        Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
        CallOptions options) {
    try {
        ReactorSubscriberAndClientProducer<TRequest> requestProducer =
                fluxSource.subscribeWith(new ReactorSubscriberAndClientProducer<>());

        ReactorClientStreamObserverAndPublisher<TResponse> responsePublisher =
                new ReactorClientStreamObserverAndPublisher<>(
                        observer -> requestProducer.subscribe((CallStreamObserver<TRequest>) observer),
                        requestProducer::cancel);

        delegate.apply(responsePublisher);

        Flux<TResponse> responses = Flux.from(responsePublisher);
        return responses.singleOrEmpty();
    } catch (Throwable setupFailure) {
        return Mono.error(setupFailure);
    }
}
/**
 * Adapts a bidirectional stream &rarr; stream call to Reactor types: a {@link Flux} of requests is consumed
 * and a {@link Flux} of responses is returned.
 *
 * @param fluxSource the request messages; subscribed to immediately by this method
 * @param delegate   function applied to the response observer; the observer it returns is not used here
 * @param options    call options from which the prefetch and low-tide values are read
 * @return a {@link Flux} over the response publisher, or {@code Flux.error} carrying any {@link Throwable}
 *         thrown while reading the options or wiring the call
 */
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Flux<TResponse> manyToMany(
        Flux<TRequest> fluxSource,
        Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
        CallOptions options) {
    try {
        final int prefetch = ReactorCallOptions.getPrefetch(options);
        final int lowTide = ReactorCallOptions.getLowTide(options);

        ReactorSubscriberAndClientProducer<TRequest> requestProducer =
                fluxSource.subscribeWith(new ReactorSubscriberAndClientProducer<>());

        ReactorClientStreamObserverAndPublisher<TResponse> responsePublisher =
                new ReactorClientStreamObserverAndPublisher<>(
                        observer -> requestProducer.subscribe((CallStreamObserver<TRequest>) observer),
                        requestProducer::cancel,
                        prefetch,
                        lowTide);

        delegate.apply(responsePublisher);

        Flux<TResponse> responses = Flux.from(responsePublisher);
        return responses;
    } catch (Throwable setupFailure) {
        return Flux.error(setupFailure);
    }
}
/**
 * Adapts a stream &rarr; unary call to RxJava types: a {@link Flowable} of requests is consumed and a
 * {@link Single} with the response is returned.
 *
 * @param flowableSource the request messages; subscribed to immediately by this method
 * @param delegate       function applied to the response observer; the observer it returns is not used here
 * @param options        call options; not consulted by this method
 * @return the response publisher narrowed with {@code singleOrError()}, or {@code Single.error} carrying any
 *         {@link Throwable} thrown while wiring the call
 */
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Single<TResponse> manyToOne(
        final Flowable<TRequest> flowableSource,
        final Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
        final CallOptions options) {
    try {
        final RxSubscriberAndClientProducer<TRequest> requestProducer =
                flowableSource.subscribeWith(new RxSubscriberAndClientProducer<TRequest>());

        // Connects the request producer to the call's request observer once it is handed over.
        final com.salesforce.reactivegrpc.common.Consumer<CallStreamObserver<?>> attachRequests =
                new com.salesforce.reactivegrpc.common.Consumer<CallStreamObserver<?>>() {
                    @Override
                    public void accept(CallStreamObserver<?> observer) {
                        requestProducer.subscribe((CallStreamObserver<TRequest>) observer);
                    }
                };

        // Cancels the request producer when the response side signals termination.
        final Runnable cancelRequests = new Runnable() {
            @Override
            public void run() {
                requestProducer.cancel();
            }
        };

        final RxClientStreamObserverAndPublisher<TResponse> responsePublisher =
                new RxClientStreamObserverAndPublisher<TResponse>(attachRequests, cancelRequests);

        delegate.apply(responsePublisher);

        final Flowable<TResponse> responses = Flowable.fromPublisher(responsePublisher);
        return responses.singleOrError();
    } catch (Throwable setupFailure) {
        return Single.error(setupFailure);
    }
}
/**
 * Implements a bidirectional stream &rarr; stream call as {@link Flowable} &rarr; {@link Flowable}, where
 * both the client and the server independently stream to each other.
 *
 * <p>All setup work, including reading the prefetch and low-tide values from {@code options}, happens inside
 * the guarded section, so every setup failure is reported through the returned {@link Flowable} rather than
 * thrown at the caller.
 *
 * @param flowableSource the request messages; subscribed to immediately by this method
 * @param delegate       function applied to the response observer; the observer it returns is not used here
 * @param options        call options from which the prefetch and low-tide values are read
 * @return a {@link Flowable} over the response publisher, or {@code Flowable.error} carrying any
 *         {@link Throwable} thrown while reading the options or wiring the call
 */
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Flowable<TResponse> manyToMany(
        final Flowable<TRequest> flowableSource,
        final Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
        final CallOptions options) {
    try {
        // Read inside the try so a failure here becomes Flowable.error instead of a synchronous throw.
        final int prefetch = RxCallOptions.getPrefetch(options);
        final int lowTide = RxCallOptions.getLowTide(options);

        final RxSubscriberAndClientProducer<TRequest> subscriberAndGRPCProducer =
                flowableSource.subscribeWith(new RxSubscriberAndClientProducer<TRequest>());
        final RxClientStreamObserverAndPublisher<TResponse> observerAndPublisher =
                new RxClientStreamObserverAndPublisher<TResponse>(
                        new com.salesforce.reactivegrpc.common.Consumer<CallStreamObserver<?>>() {
                            @Override
                            public void accept(CallStreamObserver<?> observer) {
                                subscriberAndGRPCProducer.subscribe((CallStreamObserver<TRequest>) observer);
                            }
                        },
                        new Runnable() {
                            @Override
                            public void run() {
                                subscriberAndGRPCProducer.cancel();
                            }
                        },
                        prefetch, lowTide);
        delegate.apply(observerAndPublisher);
        return Flowable.fromPublisher(observerAndPublisher);
    } catch (Throwable throwable) {
        return Flowable.error(throwable);
    }
}
/**
 * Creates an instance with explicit flow-control settings.
 *
 * <p>The buffer passed to the superclass is a {@code SpscArrayQueue} constructed with {@code prefetch},
 * wrapped in a {@code SimpleQueueAdapter}.
 *
 * @param onSubscribe callback forwarded unchanged to the superclass
 * @param onTerminate callback forwarded unchanged to the superclass
 * @param prefetch    forwarded to the superclass and also used as the queue's constructor argument
 * @param lowTide     forwarded unchanged to the superclass
 */
RxClientStreamObserverAndPublisher(
Consumer<CallStreamObserver<?>> onSubscribe,
Runnable onTerminate,
int prefetch,
int lowTide) {
super(new SimpleQueueAdapter<T>(new SpscArrayQueue<T>(prefetch)), onSubscribe, onTerminate, prefetch, lowTide);
}
/**
 * Creates an instance bound to the given server call observer, with explicit flow-control settings.
 *
 * <p>The buffer passed to the superclass is a {@code SpscArrayQueue} constructed with {@code prefetch},
 * wrapped in a {@code SimpleQueueAdapter}.
 *
 * @param serverCallStreamObserver server call observer forwarded unchanged to the superclass
 * @param onSubscribe              callback forwarded unchanged to the superclass
 * @param prefetch                 forwarded to the superclass and also used as the queue's constructor argument
 * @param lowTide                  forwarded unchanged to the superclass
 */
RxServerStreamObserverAndPublisher(
ServerCallStreamObserver<?> serverCallStreamObserver,
Consumer<CallStreamObserver<?>> onSubscribe,
int prefetch,
int lowTide) {
super(serverCallStreamObserver, new SimpleQueueAdapter<T>(new SpscArrayQueue<T>(prefetch)), onSubscribe, prefetch, lowTide);
}
/**
 * Creates an instance with a caller-supplied queue, both callbacks and explicit flow-control settings.
 *
 * <p>All arguments are passed straight to the superclass; note that the superclass constructor takes them in
 * a different order ({@code queue, prefetch, lowTide, onSubscribe, onTerminate}).
 *
 * @param queue       buffer forwarded to the superclass
 * @param onSubscribe callback forwarded to the superclass
 * @param onTerminate callback forwarded to the superclass
 * @param prefetch    value forwarded to the superclass
 * @param lowTide     value forwarded to the superclass
 */
public AbstractClientStreamObserverAndPublisher(
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe,
Runnable onTerminate,
int prefetch,
int lowTide) {
super(queue, prefetch, lowTide, onSubscribe, onTerminate);
}
/**
 * Creates an instance without a termination callback.
 *
 * <p>Delegates to the five-argument constructor, passing {@code null} as {@code onTerminate}.
 *
 * @param queue       buffer to use
 * @param prefetch    value passed on as {@code prefetch}
 * @param lowTide     value passed on as {@code lowTide}
 * @param onSubscribe callback passed on as {@code onSubscribe}
 */
AbstractStreamObserverAndPublisher(
Queue<T> queue,
int prefetch,
int lowTide,
Consumer<CallStreamObserver<?>> onSubscribe) {
this(queue, prefetch, lowTide, onSubscribe, null);
}
/**
 * Primary constructor: stores every argument in the corresponding field without validation.
 *
 * @param queue       stored in {@code this.queue}
 * @param prefetch    stored in {@code this.prefetch}
 * @param lowTide     stored in {@code this.limit}
 * @param onSubscribe stored in {@code this.onSubscribe}
 * @param onTerminate stored in {@code this.onTerminate}
 */
AbstractStreamObserverAndPublisher(
Queue<T> queue,
int prefetch,
int lowTide,
Consumer<CallStreamObserver<?>> onSubscribe,
Runnable onTerminate) {
this.prefetch = prefetch;
// The low-tide value is kept under the field name "limit".
this.limit = lowTide;
this.queue = queue;
this.onSubscribe = onSubscribe;
this.onTerminate = onTerminate;
}
/**
 * Installs the upstream call observer. Only the first call succeeds.
 *
 * <p>On success, automatic inbound flow control is disabled on {@code upstream} and, when an
 * {@code onSubscribe} callback is configured, that callback is invoked with {@code upstream}.
 *
 * @param upstream the call observer to install as the subscription
 * @throws IllegalStateException if a subscription is already present or the atomic install loses a race
 */
protected void onSubscribe(final CallStreamObserver<?> upstream) {
    final boolean rejected = subscription != null || !SUBSCRIPTION.compareAndSet(this, null, upstream);
    if (rejected) {
        throw new IllegalStateException(getClass().getSimpleName() + " supports only a single subscription");
    }

    upstream.disableAutoInboundFlowControl();
    if (onSubscribe != null) {
        onSubscribe.accept(upstream);
    }
}
/**
 * Attaches the downstream call observer. Only the first call succeeds.
 *
 * <p>On success this instance is registered as the downstream's on-ready handler.
 *
 * @param downstream the observer to attach; checked with {@code checkNotNull}
 * @throws IllegalStateException if a downstream is already attached or the atomic install loses a race
 */
public void subscribe(final CallStreamObserver<T> downstream) {
    checkNotNull(downstream);

    final boolean installed = this.downstream == null && DOWNSTREAM.compareAndSet(this, null, downstream);
    if (!installed) {
        throw new IllegalStateException(getClass().getSimpleName() + " does not support multiple subscribers");
    }

    downstream.setOnReadyHandler(this);
}
/**
 * Attaches the downstream through the superclass, then registers an on-cancel handler on it that invokes
 * the superclass {@code cancel()}.
 *
 * @param downstream the observer to attach; it is cast to {@link ServerCallStreamObserver}, so any other
 *                   type fails with a {@link ClassCastException} after the superclass call has run
 */
@Override
public void subscribe(CallStreamObserver<T> downstream) {
    super.subscribe(downstream);

    final ServerCallStreamObserver<?> serverObserver = (ServerCallStreamObserver<?>) downstream;
    final Runnable cancelOnCallCancelled = new Runnable() {
        @Override
        public void run() {
            AbstractSubscriberAndServerProducer.super.cancel();
        }
    };
    serverObserver.setOnCancelHandler(cancelOnCallCancelled);
}
/**
 * Creates an instance with a caller-supplied queue and immediately subscribes it to the given server call
 * observer.
 *
 * @param serverCallStreamObserver observer passed to the superclass {@code onSubscribe} right after
 *                                 superclass construction
 * @param queue                    buffer forwarded to the superclass
 * @param onSubscribe              callback forwarded to the superclass
 */
public AbstractServerStreamObserverAndPublisher(
ServerCallStreamObserver<?> serverCallStreamObserver,
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe) {
super(queue, onSubscribe);
// Explicit super call: invokes the superclass implementation directly, bypassing any override.
super.onSubscribe(serverCallStreamObserver);
}
/**
 * Creates an instance with a caller-supplied queue and explicit flow-control settings, and immediately
 * subscribes it to the given server call observer.
 *
 * @param serverCallStreamObserver observer passed to the superclass {@code onSubscribe} right after
 *                                 superclass construction
 * @param queue                    buffer forwarded to the superclass
 * @param onSubscribe              callback forwarded to the superclass
 * @param prefetch                 value forwarded to the superclass
 * @param lowTide                  value forwarded to the superclass
 */
public AbstractServerStreamObserverAndPublisher(
ServerCallStreamObserver<?> serverCallStreamObserver,
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe,
int prefetch,
int lowTide) {
super(queue, prefetch, lowTide, onSubscribe);
// Explicit super call: invokes the superclass implementation directly, bypassing any override.
super.onSubscribe(serverCallStreamObserver);
}
/**
 * Fusion variant: with {@code observeOn(Schedulers.trampoline())} between the publisher and the subscriber,
 * a {@link StatusException} passed to {@code onError} must reach the subscriber, run the termination
 * callback, and leave {@code outputFused} true.
 */
@Test
public void statusExceptionTriggersHandlerFuseable() {
    final AtomicBoolean terminated = new AtomicBoolean(false);
    final Runnable markTerminated = new Runnable() {
        @Override
        public void run() {
            terminated.set(true);
        }
    };
    CallStreamObserver upstream = mock(CallStreamObserver.class);
    AbstractStreamObserverAndPublisher publisher =
            new TestStreamObserverAndPublisherWithFusion(new ArrayBlockingQueue(1), null, markTerminated);
    publisher.onSubscribe(upstream);

    TestSubscriber subscriber = Flowable.fromPublisher(publisher)
            .observeOn(Schedulers.trampoline())
            .test();

    StatusException cancelled = Status.CANCELLED.asException();
    publisher.onError(cancelled);

    subscriber.awaitTerminalEvent();
    subscriber.assertError(cancelled);
    assertThat(terminated.get()).isTrue();
    assertThat(publisher.outputFused).isTrue();
}
/**
 * Fusion variant: with {@code observeOn(Schedulers.trampoline())} between the publisher and the subscriber,
 * a {@link StatusRuntimeException} passed to {@code onError} must reach the subscriber, run the termination
 * callback, and leave {@code outputFused} true.
 */
@Test
public void statusRuntimeExceptionTriggersHandlerFuseable() {
    final AtomicBoolean terminated = new AtomicBoolean(false);
    final Runnable markTerminated = new Runnable() {
        @Override
        public void run() {
            terminated.set(true);
        }
    };
    CallStreamObserver upstream = mock(CallStreamObserver.class);
    AbstractStreamObserverAndPublisher publisher =
            new TestStreamObserverAndPublisherWithFusion(new ArrayBlockingQueue(1), null, markTerminated);
    publisher.onSubscribe(upstream);

    TestSubscriber subscriber = Flowable.fromPublisher(publisher)
            .observeOn(Schedulers.trampoline())
            .test();

    StatusRuntimeException cancelled = Status.CANCELLED.asRuntimeException();
    publisher.onError(cancelled);

    subscriber.awaitTerminalEvent();
    subscriber.assertError(cancelled);
    assertThat(terminated.get()).isTrue();
    assertThat(publisher.outputFused).isTrue();
}
/**
 * Creates an instance bound to the given server call observer, with explicit flow-control settings.
 *
 * <p>The buffer is obtained from {@code Queues.get(prefetch)} so that its capacity follows the configured
 * prefetch instead of a fixed default; a prefetch larger than the default chunk size would otherwise be
 * paired with a smaller buffer.
 *
 * @param serverCallStreamObserver server call observer forwarded unchanged to the superclass
 * @param onSubscribe              callback forwarded unchanged to the superclass
 * @param prefetch                 forwarded to the superclass and also used to size the queue
 * @param lowTide                  forwarded unchanged to the superclass
 */
ReactorServerStreamObserverAndPublisher(
        ServerCallStreamObserver<?> serverCallStreamObserver,
        Consumer<CallStreamObserver<?>> onSubscribe,
        int prefetch,
        int lowTide) {
    super(serverCallStreamObserver, Queues.<T>get(prefetch).get(), onSubscribe, prefetch, lowTide);
}
/**
 * Creates an instance with both callbacks and explicit flow-control settings.
 *
 * <p>The buffer is obtained from {@code Queues.get(prefetch)} so that its capacity follows the configured
 * prefetch instead of a fixed default; a prefetch larger than the default chunk size would otherwise be
 * paired with a smaller buffer.
 *
 * @param onSubscribe callback forwarded unchanged to the superclass
 * @param onTerminate callback forwarded unchanged to the superclass
 * @param prefetch    forwarded to the superclass and also used to size the queue
 * @param lowTide     forwarded unchanged to the superclass
 */
ReactorClientStreamObserverAndPublisher(
        Consumer<CallStreamObserver<?>> onSubscribe,
        Runnable onTerminate,
        int prefetch,
        int lowTide) {
    super(Queues.<T>get(prefetch).get(), onSubscribe, onTerminate, prefetch, lowTide);
}
/**
 * Wraps the given call observer and arranges for threads waiting on {@code lock} to be woken whenever the
 * delegate reports readiness, and also on cancellation when the delegate is a
 * {@link ServerCallStreamObserver}.
 *
 * @param delegate the call observer to wrap
 */
BlockingStreamObserver(CallStreamObserver<T> delegate) {
    this.delegate = delegate;

    final Runnable wakeWaiters = () -> {
        synchronized (lock) {
            lock.notifyAll(); // wake up our thread
        }
    };

    delegate.setOnReadyHandler(wakeWaiters);
    if (delegate instanceof ServerCallStreamObserver) {
        final ServerCallStreamObserver<T> serverDelegate = (ServerCallStreamObserver<T>) delegate;
        serverDelegate.setOnCancelHandler(wakeWaiters);
    }
}
/**
 * Creates an instance with both callbacks and no explicit flow-control settings.
 *
 * <p>The buffer passed to the superclass is a {@code SpscArrayQueue} constructed with
 * {@code DEFAULT_CHUNK_SIZE}, wrapped in a {@code SimpleQueueAdapter}.
 *
 * @param onSubscribe callback forwarded unchanged to the superclass
 * @param onTerminate callback forwarded unchanged to the superclass
 */
RxClientStreamObserverAndPublisher(
Consumer<CallStreamObserver<?>> onSubscribe,
Runnable onTerminate) {
super(new SimpleQueueAdapter<T>(new SpscArrayQueue<T>(DEFAULT_CHUNK_SIZE)), onSubscribe, onTerminate);
}
/**
 * Creates an instance with a caller-supplied queue and a subscribe callback; both are forwarded unchanged
 * to the superclass.
 *
 * @param queue       buffer forwarded to the superclass
 * @param onSubscribe callback forwarded to the superclass
 */
public AbstractClientStreamObserverAndPublisher(
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe) {
super(queue, onSubscribe);
}
/**
 * Creates an instance with a caller-supplied queue and both callbacks; all three are forwarded unchanged to
 * the superclass.
 *
 * @param queue       buffer forwarded to the superclass
 * @param onSubscribe callback forwarded to the superclass
 * @param onTerminate callback forwarded to the superclass
 */
public AbstractClientStreamObserverAndPublisher(
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe,
Runnable onTerminate) {
super(queue, onSubscribe, onTerminate);
}
/**
 * Creates an instance using the default flow-control settings.
 *
 * <p>Delegates to the four-argument constructor with {@code DEFAULT_CHUNK_SIZE} as the prefetch and
 * {@code TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE} as the low-tide value.
 *
 * @param queue       buffer to use
 * @param onSubscribe callback passed on as {@code onSubscribe}
 */
AbstractStreamObserverAndPublisher(
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe) {
this(queue, DEFAULT_CHUNK_SIZE, TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE, onSubscribe);
}
/**
 * Creates an instance with both callbacks and the default flow-control settings.
 *
 * <p>Delegates to the five-argument constructor with {@code DEFAULT_CHUNK_SIZE} as the prefetch and
 * {@code TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE} as the low-tide value.
 *
 * @param queue       buffer to use
 * @param onSubscribe callback passed on as {@code onSubscribe}
 * @param onTerminate callback passed on as {@code onTerminate}
 */
AbstractStreamObserverAndPublisher(
Queue<T> queue,
Consumer<CallStreamObserver<?>> onSubscribe,
Runnable onTerminate) {
this(queue, DEFAULT_CHUNK_SIZE, TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE, onSubscribe, onTerminate);
}
/**
 * Drain loop that polls buffered items from {@code queue} and hands them to {@code subscriber} for as long
 * as the emitted count has not caught up with {@code requested}, replenishing the upstream in batches of
 * {@code limit}.
 *
 * <p>The loop exits either when {@code checkTerminated} reports termination or when the {@code WIP} counter
 * drops to zero.
 *
 * NOTE(review): presumably only entered by the caller that currently owns the {@code WIP} counter - confirm
 * against the call site.
 *
 * @param subscriber the downstream that receives the polled items
 */
private void drainRegular(final Subscriber<? super T> subscriber) {
// Number of drain signals this pass accounts for; reconciled against `wip` at the bottom of the loop.
int missed = 1;
final CallStreamObserver<?> s = subscription;
final Queue<T> q = queue;
// Continue from the emitted count saved by a previous pass (written back to `produced` below).
int sent = produced;
long r;
for (;;) {
r = requested;
// Emit while the emitted count differs from the requested amount.
while (r != sent) {
// `done` is sampled before polling the queue.
boolean d = done;
T t = q.poll();
boolean empty = t == null;
if (checkTerminated(d, empty, subscriber, q)) {
return;
}
// Nothing buffered right now: leave the inner loop and re-check for missed signals.
if (empty) {
break;
}
subscriber.onNext(t);
sent++;
// Batch boundary: `limit` items were emitted since the last upstream request.
if (sent == limit) {
// Long.MAX_VALUE is left untouched (presumably it denotes unbounded demand - confirm).
if (r != Long.MAX_VALUE) {
r = REQUESTED.addAndGet(this, -sent);
}
// Ask the upstream for as many items as were just emitted, then restart the batch count.
s.request(sent);
sent = 0;
}
}
// Emitted count equals the requested amount: termination is still checked.
if (r == sent) {
if (checkTerminated(done, q.isEmpty(), subscriber, q)) {
return;
}
}
int w = wip;
if (missed == w) {
// No new drain signals since the last check: save progress and subtract the handled signals.
produced = sent;
missed = WIP.addAndGet(this, -missed);
if (missed == 0) {
break;
}
} else {
// More signals arrived in the meantime: go around again with the updated count.
missed = w;
}
}
}
/**
 * Entry point of the drain logic.
 *
 * <p>Every call increments {@code WIP}; only the call that observes the previous value 0 continues, all
 * others return immediately. The continuing call first resolves the source mode if it is still
 * {@code NOT_FUSED}, then dispatches to {@code drainSync()}, {@code drainAsync()} or {@code drainRegular()}
 * when a downstream is attached.
 */
private void drain() {
// Only the caller that moves WIP away from 0 proceeds.
if (WIP.getAndIncrement(this) != 0) {
return;
}
int mode = sourceMode;
int missed = 1;
// The downstream is read once here; the loop below does not re-read the field.
final CallStreamObserver<? super T> subscriber = downstream;
if (mode == NOT_FUSED) {
// Source mode not resolved yet: take it from the subscription if it can report one.
final Subscription s = subscription;
if (s instanceof FusionModeAwareSubscription) {
mode = ((FusionModeAwareSubscription) s).mode();
sourceMode = mode;
if (mode == SYNC) {
// In SYNC mode the `done` flag is set up front.
done = true;
} else {
// Any other reported mode: request one item from the subscription.
s.request(1);
}
} else {
// The subscription cannot report a mode: fall back to NONE.
mode = NONE;
sourceMode = mode;
}
}
for (;;) {
if (subscriber != null) {
if (mode == SYNC) {
drainSync();
} else if (mode == ASYNC) {
drainAsync();
} else {
drainRegular();
}
// NOTE(review): returns without decrementing WIP here - presumably the drain* methods do that; confirm.
return;
}
// No downstream attached: subtract the handled signals and stop once the counter reaches zero.
missed = WIP.addAndGet(this, -missed);
if (missed == 0) {
break;
}
}
}
/**
 * Creates a test instance backed by a new {@code ConcurrentLinkedQueue}.
 *
 * @param onSubscribe callback forwarded unchanged to the superclass
 */
public TestStreamObserverAndPublisher(
com.salesforce.reactivegrpc.common.Consumer<CallStreamObserver<?>> onSubscribe) {
super(new ConcurrentLinkedQueue<T>(), onSubscribe);
}