类io.grpc.stub.CallStreamObserver源码实例Demo

下面列出了 io.grpc.stub.CallStreamObserver 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。

/**
 * Receives one element from the upstream source.
 *
 * <p>When the source mode is {@code ASYNC} or {@code NOT_FUSED} the element is not
 * forwarded from here; only {@link #drain()} is triggered. Otherwise, unless this
 * instance is already canceled, the element is null-checked, pushed to the downstream
 * observer, the {@code isRequested} flag is cleared and a drain is started.
 *
 * <p>Any failure while forwarding cancels this instance and is reported to the
 * downstream observer via {@code onError} after being passed through
 * {@code prepareError}.
 *
 * @param t the element emitted by upstream
 */
@Override
public void onNext(T t) {
    if (sourceMode == ASYNC || sourceMode == NOT_FUSED) {
        drain();
        return;
    }

    if (isCanceled()) {
        return;
    }

    checkNotNull(t);

    final CallStreamObserver<T> target = downstream;

    try {
        target.onNext(t);
        isRequested = false;
        drain();
    } catch (Throwable failure) {
        cancel();
        try {
            target.onError(prepareError(failure));
        } catch (Throwable ignored) {
            // Best effort: a failure while reporting the error is deliberately dropped.
        }
    }
}
 
/**
 * Pushing a {@link StatusException} into a non-fused observer must surface the error
 * to the subscriber and run the termination callback.
 */
@Test
public void statusExceptionTriggersHandler() {
    CallStreamObserver upstream = mock(CallStreamObserver.class);
    final AtomicBoolean terminated = new AtomicBoolean(false);

    // Termination callback: records that it was invoked.
    final Runnable onTerminate = new Runnable() {
        @Override
        public void run() {
            terminated.set(true);
        }
    };

    AbstractStreamObserverAndPublisher observer =
        new AbstractStreamObserverAndPublisher(new ArrayBlockingQueue(1), null, onTerminate) { };

    observer.onSubscribe(upstream);

    TestSubscriber subscriber = Flowable.fromPublisher(observer).test();

    StatusException failure = Status.CANCELLED.asException();
    observer.onError(failure);

    subscriber.awaitTerminalEvent();
    subscriber.assertError(failure);

    assertThat(terminated.get()).isTrue();
    assertThat(observer.outputFused).isFalse();
}
 
/**
 * Pushing a {@link StatusRuntimeException} into a non-fused observer must surface the
 * error to the subscriber and run the termination callback.
 */
@Test
public void statusRuntimeExceptionTriggersHandler() {
    CallStreamObserver upstream = mock(CallStreamObserver.class);
    final AtomicBoolean terminated = new AtomicBoolean(false);

    // Termination callback: records that it was invoked.
    final Runnable onTerminate = new Runnable() {
        @Override
        public void run() {
            terminated.set(true);
        }
    };

    AbstractStreamObserverAndPublisher observer =
        new AbstractStreamObserverAndPublisher(new ArrayBlockingQueue(1), null, onTerminate) { };

    observer.onSubscribe(upstream);

    TestSubscriber subscriber = Flowable.fromPublisher(observer).test();

    StatusRuntimeException failure = Status.CANCELLED.asRuntimeException();
    observer.onError(failure);

    subscriber.awaitTerminalEvent();
    subscriber.assertError(failure);

    assertThat(terminated.get()).isTrue();
    assertThat(observer.outputFused).isFalse();
}
 
源代码4 项目: reactive-grpc   文件: ClientCalls.java
/**
 * Implements a stream → unary call as {@link Flux} → {@link Mono}, where the client transmits a stream of
 * messages.
 *
 * <p>The request {@code Flux} is subscribed with a producer that feeds the gRPC call, and the
 * response side is exposed as a publisher narrowed with {@code singleOrEmpty()}. Anything thrown
 * while wiring the call is returned as an errored {@code Mono} instead of being propagated.
 *
 * @param fluxSource the stream of request messages
 * @param delegate   starts the underlying gRPC call given the response observer
 * @param options    call options (not read by this method)
 * @return a {@code Mono} carrying the response, or the wiring error
 */
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Mono<TResponse> manyToOne(
        Flux<TRequest> fluxSource,
        Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
        CallOptions options) {
    try {
        final ReactorSubscriberAndClientProducer<TRequest> requestProducer =
                fluxSource.subscribeWith(new ReactorSubscriberAndClientProducer<>());

        final ReactorClientStreamObserverAndPublisher<TResponse> responsePublisher =
                new ReactorClientStreamObserverAndPublisher<>(
                    upstream -> requestProducer.subscribe((CallStreamObserver<TRequest>) upstream),
                    requestProducer::cancel
                );

        delegate.apply(responsePublisher);

        final Flux<TResponse> responses = Flux.from(responsePublisher);
        return responses.singleOrEmpty();
    } catch (Throwable wiringFailure) {
        return Mono.error(wiringFailure);
    }
}
 
源代码5 项目: reactive-grpc   文件: ClientCalls.java
/**
 * Implements a bidirectional stream → stream call as {@link Flux} → {@link Flux}, where both the client
 * and the server independently stream to each other.
 *
 * <p>Prefetch and low-tide values are read from {@code options} and handed to the response
 * publisher. Anything thrown while reading the options or wiring the call is returned as an
 * errored {@code Flux} instead of being propagated.
 *
 * @param fluxSource the stream of request messages
 * @param delegate   starts the underlying gRPC call given the response observer
 * @param options    call options supplying prefetch and low-tide settings
 * @return a {@code Flux} of responses, or the wiring error
 */
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Flux<TResponse> manyToMany(
        Flux<TRequest> fluxSource,
        Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
        CallOptions options) {
    try {
        final int prefetch = ReactorCallOptions.getPrefetch(options);
        final int lowTide = ReactorCallOptions.getLowTide(options);

        final ReactorSubscriberAndClientProducer<TRequest> requestProducer =
            fluxSource.subscribeWith(new ReactorSubscriberAndClientProducer<>());

        final ReactorClientStreamObserverAndPublisher<TResponse> responsePublisher =
            new ReactorClientStreamObserverAndPublisher<>(
                upstream -> requestProducer.subscribe((CallStreamObserver<TRequest>) upstream),
                requestProducer::cancel,
                prefetch,
                lowTide
            );

        delegate.apply(responsePublisher);

        return Flux.from(responsePublisher);
    } catch (Throwable wiringFailure) {
        return Flux.error(wiringFailure);
    }
}
 
源代码6 项目: reactive-grpc   文件: ClientCalls.java
/**
 * Implements a stream → unary call as {@link Flowable} → {@link Single}, where the client transmits a stream of
 * messages.
 *
 * <p>The request {@code Flowable} is subscribed with a producer that feeds the gRPC call, and the
 * response side is exposed as a publisher narrowed with {@code singleOrError()}. Anything thrown
 * while wiring the call is returned as an errored {@code Single} instead of being propagated.
 *
 * @param flowableSource the stream of request messages
 * @param delegate       starts the underlying gRPC call given the response observer
 * @param options        call options (not read by this method)
 * @return a {@code Single} carrying the response, or the wiring error
 */
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Single<TResponse> manyToOne(
        final Flowable<TRequest> flowableSource,
        final Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
        final CallOptions options) {
    try {
        final RxSubscriberAndClientProducer<TRequest> requestProducer =
                flowableSource.subscribeWith(new RxSubscriberAndClientProducer<TRequest>());

        // Hooks the request producer up to the call's outbound observer.
        final com.salesforce.reactivegrpc.common.Consumer<CallStreamObserver<?>> attachProducer =
            new com.salesforce.reactivegrpc.common.Consumer<CallStreamObserver<?>>() {
                @Override
                public void accept(CallStreamObserver<?> observer) {
                    requestProducer.subscribe((CallStreamObserver<TRequest>) observer);
                }
            };

        // Cancels the request producer when the response side terminates.
        final Runnable cancelProducer = new Runnable() {
            @Override
            public void run() {
                requestProducer.cancel();
            }
        };

        final RxClientStreamObserverAndPublisher<TResponse> responsePublisher =
            new RxClientStreamObserverAndPublisher<TResponse>(attachProducer, cancelProducer);

        delegate.apply(responsePublisher);

        final Flowable<TResponse> responses = Flowable.fromPublisher(responsePublisher);
        return responses.singleOrError();
    } catch (Throwable wiringFailure) {
        return Single.error(wiringFailure);
    }
}
 
源代码7 项目: reactive-grpc   文件: ClientCalls.java
/**
 * Implements a bidirectional stream → stream call as {@link Flowable} → {@link Flowable}, where both the client
 * and the server independently stream to each other.
 *
 * <p>Prefetch and low-tide values are read from {@code options} before the call is wired, so a
 * failure while reading them is thrown to the caller. Anything thrown while wiring the call itself
 * is returned as an errored {@code Flowable}.
 *
 * @param flowableSource the stream of request messages
 * @param delegate       starts the underlying gRPC call given the response observer
 * @param options        call options supplying prefetch and low-tide settings
 * @return a {@code Flowable} of responses, or the wiring error
 */
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Flowable<TResponse> manyToMany(
        final Flowable<TRequest> flowableSource,
        final Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
        final CallOptions options) {

    final int prefetch = RxCallOptions.getPrefetch(options);
    final int lowTide = RxCallOptions.getLowTide(options);

    try {
        final RxSubscriberAndClientProducer<TRequest> requestProducer =
                flowableSource.subscribeWith(new RxSubscriberAndClientProducer<TRequest>());

        // Hooks the request producer up to the call's outbound observer.
        final com.salesforce.reactivegrpc.common.Consumer<CallStreamObserver<?>> attachProducer =
            new com.salesforce.reactivegrpc.common.Consumer<CallStreamObserver<?>>() {
                @Override
                public void accept(CallStreamObserver<?> observer) {
                    requestProducer.subscribe((CallStreamObserver<TRequest>) observer);
                }
            };

        // Cancels the request producer when the response side terminates.
        final Runnable cancelProducer = new Runnable() {
            @Override
            public void run() {
                requestProducer.cancel();
            }
        };

        final RxClientStreamObserverAndPublisher<TResponse> responsePublisher =
            new RxClientStreamObserverAndPublisher<TResponse>(attachProducer, cancelProducer, prefetch, lowTide);

        delegate.apply(responsePublisher);

        return Flowable.fromPublisher(responsePublisher);
    } catch (Throwable wiringFailure) {
        return Flowable.error(wiringFailure);
    }
}
 
/**
 * Creates a client-side observer/publisher with explicit flow-control settings.
 *
 * <p>The backing queue is a {@code SpscArrayQueue} constructed with {@code prefetch} and
 * wrapped in a {@code SimpleQueueAdapter}; all other arguments are passed to the superclass.
 *
 * @param onSubscribe callback handed to the superclass
 * @param onTerminate callback handed to the superclass
 * @param prefetch    value used for the queue and forwarded to the superclass
 * @param lowTide     forwarded to the superclass
 */
RxClientStreamObserverAndPublisher(
        Consumer<CallStreamObserver<?>> onSubscribe,
        Runnable onTerminate,
        int prefetch,
        int lowTide) {
    super(new SimpleQueueAdapter<T>(new SpscArrayQueue<T>(prefetch)), onSubscribe, onTerminate, prefetch, lowTide);
}
 
/**
 * Creates a server-side observer/publisher with explicit flow-control settings.
 *
 * <p>The backing queue is a {@code SpscArrayQueue} constructed with {@code prefetch} and
 * wrapped in a {@code SimpleQueueAdapter}; all other arguments are passed to the superclass.
 *
 * @param serverCallStreamObserver the server call observer handed to the superclass
 * @param onSubscribe              callback handed to the superclass
 * @param prefetch                 value used for the queue and forwarded to the superclass
 * @param lowTide                  forwarded to the superclass
 */
RxServerStreamObserverAndPublisher(
        ServerCallStreamObserver<?> serverCallStreamObserver,
        Consumer<CallStreamObserver<?>> onSubscribe,
        int prefetch,
        int lowTide) {
    super(serverCallStreamObserver, new SimpleQueueAdapter<T>(new SpscArrayQueue<T>(prefetch)), onSubscribe, prefetch, lowTide);
}
 
/**
 * Creates a client-side observer/publisher with an explicit queue, callbacks and
 * flow-control settings.
 *
 * <p>Note that the superclass constructor takes the arguments in a different order
 * ({@code queue, prefetch, lowTide, onSubscribe, onTerminate}).
 *
 * @param queue       buffer handed to the superclass
 * @param onSubscribe callback handed to the superclass
 * @param onTerminate callback handed to the superclass
 * @param prefetch    forwarded to the superclass
 * @param lowTide     forwarded to the superclass
 */
public AbstractClientStreamObserverAndPublisher(
        Queue<T> queue,
        Consumer<CallStreamObserver<?>> onSubscribe,
        Runnable onTerminate,
        int prefetch,
        int lowTide) {
    super(queue, prefetch, lowTide, onSubscribe, onTerminate);
}
 
/**
 * Creates an observer/publisher without a termination callback.
 *
 * <p>Delegates to the five-argument constructor, passing {@code null} for
 * {@code onTerminate}.
 *
 * @param queue       buffer for received elements
 * @param prefetch    forwarded to the full constructor
 * @param lowTide     forwarded to the full constructor
 * @param onSubscribe callback forwarded to the full constructor
 */
AbstractStreamObserverAndPublisher(
        Queue<T> queue,
        int prefetch,
        int lowTide,
        Consumer<CallStreamObserver<?>> onSubscribe) {
    this(queue, prefetch, lowTide, onSubscribe, null);
}
 
/**
 * Primary constructor: stores the queue, the flow-control settings and both callbacks.
 *
 * @param queue       buffer for received elements
 * @param prefetch    stored as {@code prefetch}
 * @param lowTide     stored as {@code limit}
 * @param onSubscribe callback stored for later use; may be {@code null}
 * @param onTerminate callback stored for later use; may be {@code null}
 */
AbstractStreamObserverAndPublisher(
        Queue<T> queue,
        int prefetch,
        int lowTide,
        Consumer<CallStreamObserver<?>> onSubscribe,
        Runnable onTerminate) {
    // Buffer and callbacks.
    this.queue = queue;
    this.onSubscribe = onSubscribe;
    this.onTerminate = onTerminate;

    // Flow-control settings; the low-tide value is kept in the "limit" field.
    this.prefetch = prefetch;
    this.limit = lowTide;
}
 
/**
 * Attaches the upstream call observer. Only the first call succeeds.
 *
 * <p>On success, automatic inbound flow control is disabled on {@code upstream} and the
 * optional {@code onSubscribe} callback is notified.
 *
 * @param upstream the call observer to attach
 * @throws IllegalStateException if a subscription has already been attached
 */
protected void onSubscribe(final CallStreamObserver<?> upstream) {
    final boolean claimed = subscription == null && SUBSCRIPTION.compareAndSet(this, null, upstream);

    if (!claimed) {
        throw new IllegalStateException(getClass().getSimpleName() + " supports only a single subscription");
    }

    upstream.disableAutoInboundFlowControl();
    if (onSubscribe != null) {
        onSubscribe.accept(upstream);
    }
}
 
/**
 * Attaches the downstream call observer. Only the first call succeeds.
 *
 * <p>On success, this instance is registered as the observer's on-ready handler.
 *
 * @param downstream the call observer that will receive elements; must not be {@code null}
 * @throws IllegalStateException if a downstream observer has already been attached
 */
public void subscribe(final CallStreamObserver<T> downstream) {
    checkNotNull(downstream);

    final boolean claimed = this.downstream == null && DOWNSTREAM.compareAndSet(this, null, downstream);

    if (!claimed) {
        throw new IllegalStateException(getClass().getSimpleName() + " does not support multiple subscribers");
    }

    downstream.setOnReadyHandler(this);
}
 
/**
 * Attaches the downstream observer through the superclass, then registers an on-cancel
 * handler on it that invokes the superclass {@code cancel()}.
 *
 * @param downstream the call observer; cast to {@link ServerCallStreamObserver} here
 */
@Override
public void subscribe(CallStreamObserver<T> downstream) {
    super.subscribe(downstream);

    final ServerCallStreamObserver<?> serverObserver = (ServerCallStreamObserver<?>) downstream;

    final Runnable cancelOnCallCancel = new Runnable() {
        @Override
        public void run() {
            AbstractSubscriberAndServerProducer.super.cancel();
        }
    };

    serverObserver.setOnCancelHandler(cancelOnCallCancel);
}
 
/**
 * Creates a server-side observer/publisher and immediately attaches the given server
 * call observer by calling the superclass {@code onSubscribe}.
 *
 * @param serverCallStreamObserver the server call observer to attach
 * @param queue                    buffer handed to the superclass
 * @param onSubscribe              callback handed to the superclass
 */
public AbstractServerStreamObserverAndPublisher(
    ServerCallStreamObserver<?> serverCallStreamObserver,
    Queue<T> queue,
    Consumer<CallStreamObserver<?>> onSubscribe) {
    super(queue, onSubscribe);
    // Attach the call observer as part of construction.
    super.onSubscribe(serverCallStreamObserver);
}
 
/**
 * Creates a server-side observer/publisher with explicit flow-control settings and
 * immediately attaches the given server call observer by calling the superclass
 * {@code onSubscribe}.
 *
 * @param serverCallStreamObserver the server call observer to attach
 * @param queue                    buffer handed to the superclass
 * @param onSubscribe              callback handed to the superclass
 * @param prefetch                 forwarded to the superclass
 * @param lowTide                  forwarded to the superclass
 */
public AbstractServerStreamObserverAndPublisher(
        ServerCallStreamObserver<?> serverCallStreamObserver,
        Queue<T> queue,
        Consumer<CallStreamObserver<?>> onSubscribe,
        int prefetch,
        int lowTide) {
    super(queue, prefetch, lowTide, onSubscribe);
    // Attach the call observer as part of construction.
    super.onSubscribe(serverCallStreamObserver);
}
 
/**
 * Pushing a {@link StatusException} into an observer consumed through
 * {@code observeOn} must surface the error to the subscriber, run the termination
 * callback, and leave the observer in output-fused state.
 */
@Test
public void statusExceptionTriggersHandlerFuseable() {
    CallStreamObserver upstream = mock(CallStreamObserver.class);
    final AtomicBoolean terminated = new AtomicBoolean(false);

    // Termination callback: records that it was invoked.
    final Runnable onTerminate = new Runnable() {
        @Override
        public void run() {
            terminated.set(true);
        }
    };

    AbstractStreamObserverAndPublisher observer =
        new TestStreamObserverAndPublisherWithFusion(new ArrayBlockingQueue(1), null, onTerminate);

    observer.onSubscribe(upstream);

    TestSubscriber subscriber = Flowable.fromPublisher(observer)
                                        .observeOn(Schedulers.trampoline())
                                        .test();

    StatusException failure = Status.CANCELLED.asException();
    observer.onError(failure);

    subscriber.awaitTerminalEvent();
    subscriber.assertError(failure);

    assertThat(terminated.get()).isTrue();
    assertThat(observer.outputFused).isTrue();
}
 
/**
 * Pushing a {@link StatusRuntimeException} into an observer consumed through
 * {@code observeOn} must surface the error to the subscriber, run the termination
 * callback, and leave the observer in output-fused state.
 */
@Test
public void statusRuntimeExceptionTriggersHandlerFuseable() {
    CallStreamObserver upstream = mock(CallStreamObserver.class);
    final AtomicBoolean terminated = new AtomicBoolean(false);

    // Termination callback: records that it was invoked.
    final Runnable onTerminate = new Runnable() {
        @Override
        public void run() {
            terminated.set(true);
        }
    };

    AbstractStreamObserverAndPublisher observer =
        new TestStreamObserverAndPublisherWithFusion(new ArrayBlockingQueue(1), null, onTerminate);

    observer.onSubscribe(upstream);

    TestSubscriber subscriber = Flowable.fromPublisher(observer)
                                        .observeOn(Schedulers.trampoline())
                                        .test();

    StatusRuntimeException failure = Status.CANCELLED.asRuntimeException();
    observer.onError(failure);

    subscriber.awaitTerminalEvent();
    subscriber.assertError(failure);

    assertThat(terminated.get()).isTrue();
    assertThat(observer.outputFused).isTrue();
}
 
/**
 * Creates a server-side observer/publisher with explicit flow-control settings.
 *
 * <p>The backing queue is obtained from {@code Queues.get(DEFAULT_CHUNK_SIZE)}; all other
 * arguments are passed to the superclass.
 *
 * <p>NOTE(review): the queue is sized from {@code DEFAULT_CHUNK_SIZE} rather than from
 * {@code prefetch}, unlike the Rx sibling that builds its queue from {@code prefetch} —
 * confirm this is intended for prefetch values above the default chunk size.
 *
 * @param serverCallStreamObserver the server call observer handed to the superclass
 * @param onSubscribe              callback handed to the superclass
 * @param prefetch                 forwarded to the superclass
 * @param lowTide                  forwarded to the superclass
 */
ReactorServerStreamObserverAndPublisher(
        ServerCallStreamObserver<?> serverCallStreamObserver,
        Consumer<CallStreamObserver<?>> onSubscribe,
        int prefetch,
        int lowTide) {
    super(serverCallStreamObserver, Queues.<T>get(DEFAULT_CHUNK_SIZE).get(), onSubscribe, prefetch, lowTide);
}
 
/**
 * Creates a client-side observer/publisher with explicit flow-control settings.
 *
 * <p>The backing queue is obtained from {@code Queues.get(DEFAULT_CHUNK_SIZE)}; all other
 * arguments are passed to the superclass.
 *
 * <p>NOTE(review): the queue is sized from {@code DEFAULT_CHUNK_SIZE} rather than from
 * {@code prefetch}, unlike the Rx sibling that builds its queue from {@code prefetch} —
 * confirm this is intended for prefetch values above the default chunk size.
 *
 * @param onSubscribe callback handed to the superclass
 * @param onTerminate callback handed to the superclass
 * @param prefetch    forwarded to the superclass
 * @param lowTide     forwarded to the superclass
 */
ReactorClientStreamObserverAndPublisher(
        Consumer<CallStreamObserver<?>> onSubscribe,
        Runnable onTerminate,
        int prefetch,
        int lowTide) {
    super(Queues.<T>get(DEFAULT_CHUNK_SIZE).get(), onSubscribe, onTerminate, prefetch, lowTide);
}
 
源代码22 项目: mirror   文件: BlockingStreamObserver.java
/**
 * Wraps {@code delegate} and registers a handler that calls {@code notifyAll()} on
 * {@code lock} whenever the delegate reports it is ready and, for server-side observers,
 * whenever the call is cancelled.
 *
 * @param delegate the call observer to wrap
 */
BlockingStreamObserver(CallStreamObserver<T> delegate) {
  this.delegate = delegate;

  // Wakes any thread waiting on the lock.
  final Runnable wakeWaiters = () -> {
    synchronized (lock) {
      lock.notifyAll();
    }
  };

  this.delegate.setOnReadyHandler(wakeWaiters);

  if (delegate instanceof ServerCallStreamObserver) {
    final ServerCallStreamObserver<T> serverDelegate = (ServerCallStreamObserver<T>) delegate;
    serverDelegate.setOnCancelHandler(wakeWaiters);
  }
}
 
/**
 * Creates a client-side observer/publisher without explicit flow-control settings.
 *
 * <p>The backing queue is a {@code SpscArrayQueue} constructed with
 * {@code DEFAULT_CHUNK_SIZE} and wrapped in a {@code SimpleQueueAdapter}.
 *
 * @param onSubscribe callback handed to the superclass
 * @param onTerminate callback handed to the superclass
 */
RxClientStreamObserverAndPublisher(
        Consumer<CallStreamObserver<?>> onSubscribe,
        Runnable onTerminate) {
    super(new SimpleQueueAdapter<T>(new SpscArrayQueue<T>(DEFAULT_CHUNK_SIZE)), onSubscribe, onTerminate);
}
 
/**
 * Creates a client-side observer/publisher from a queue and a subscribe callback,
 * forwarding both to the superclass.
 *
 * @param queue       buffer handed to the superclass
 * @param onSubscribe callback handed to the superclass
 */
public AbstractClientStreamObserverAndPublisher(
        Queue<T> queue,
        Consumer<CallStreamObserver<?>> onSubscribe) {
    super(queue, onSubscribe);
}
 
/**
 * Creates a client-side observer/publisher from a queue and both callbacks,
 * forwarding all of them to the superclass.
 *
 * @param queue       buffer handed to the superclass
 * @param onSubscribe callback handed to the superclass
 * @param onTerminate callback handed to the superclass
 */
public AbstractClientStreamObserverAndPublisher(
        Queue<T> queue,
        Consumer<CallStreamObserver<?>> onSubscribe,
        Runnable onTerminate) {
    super(queue, onSubscribe, onTerminate);
}
 
/**
 * Creates an observer/publisher with default flow-control settings and no termination
 * callback.
 *
 * <p>Delegates with {@code DEFAULT_CHUNK_SIZE} as prefetch and
 * {@code TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE} as low tide.
 *
 * @param queue       buffer for received elements
 * @param onSubscribe callback forwarded to the delegated constructor
 */
AbstractStreamObserverAndPublisher(
        Queue<T> queue,
        Consumer<CallStreamObserver<?>> onSubscribe) {
    this(queue, DEFAULT_CHUNK_SIZE, TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE, onSubscribe);
}
 
/**
 * Creates an observer/publisher with default flow-control settings and a termination
 * callback.
 *
 * <p>Delegates with {@code DEFAULT_CHUNK_SIZE} as prefetch and
 * {@code TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE} as low tide.
 *
 * @param queue       buffer for received elements
 * @param onSubscribe callback forwarded to the delegated constructor
 * @param onTerminate callback forwarded to the delegated constructor
 */
AbstractStreamObserverAndPublisher(
        Queue<T> queue,
        Consumer<CallStreamObserver<?>> onSubscribe,
        Runnable onTerminate) {
    this(queue, DEFAULT_CHUNK_SIZE, TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE, onSubscribe, onTerminate);
}
 
/**
 * Drain loop for the non-fused path: moves elements from the queue to {@code subscriber}
 * while demand is outstanding, and replenishes upstream in batches of {@code limit}.
 *
 * <p>The loop exits when {@code checkTerminated} reports termination or when the
 * work-in-progress counter drops to zero.
 *
 * @param subscriber the subscriber that receives the queued elements
 */
private void drainRegular(final Subscriber<? super T> subscriber) {
    // Number of drain requests this pass has accounted for so far.
    int missed = 1;

    final CallStreamObserver<?> s = subscription;
    final Queue<T> q = queue;
    // Resume the emitted-count from where the previous drain left off.
    int sent = produced;
    long r;

    for (;;) {
        r = requested;
        // Emit while the emitted count has not caught up with the requested amount.
        while (r != sent) {
            // Snapshot the done flag before polling; it is paired with this poll's result below.
            boolean d = done;

            T t = q.poll();
            boolean empty = t == null;

            if (checkTerminated(d, empty, subscriber, q)) {
                return;
            }

            // Nothing buffered right now; leave the inner loop.
            if (empty) {
                break;
            }

            subscriber.onNext(t);

            sent++;

            // Batch boundary reached: settle the demand counter and ask upstream for more.
            if (sent == limit) {
                // Long.MAX_VALUE is left untouched (treated as unbounded demand).
                if (r != Long.MAX_VALUE) {
                    r = REQUESTED.addAndGet(this, -sent);
                }

                s.request(sent);
                sent = 0;
            }
        }

        // Demand exhausted: still check for termination using the queue's emptiness.
        if (r == sent) {
            if (checkTerminated(done, q.isEmpty(), subscriber, q)) {
                return;
            }
        }

        int w = wip;
        if (missed == w) {
            // No new drain requests arrived: persist progress and release the counter.
            produced = sent;
            missed = WIP.addAndGet(this, -missed);
            if (missed == 0) {
                break;
            }
        } else {
            // More drain requests arrived meanwhile: adopt the new count and loop again.
            missed = w;
        }
    }
}
 
/**
 * Entry point of the drain logic: resolves the source's fusion mode on first use and
 * dispatches to the matching drain routine.
 *
 * <p>Only the caller that moves the work-in-progress counter from zero proceeds; every
 * other caller just increments the counter and returns.
 */
private void drain() {
    // A non-zero previous value means the counter was already taken; only record the request.
    if (WIP.getAndIncrement(this) != 0) {
        return;
    }

    int mode = sourceMode;

    int missed = 1;
    final CallStreamObserver<? super T> subscriber = downstream;


    // Fusion mode not resolved yet: determine it from the subscription and cache it.
    if (mode == NOT_FUSED) {
        final Subscription s = subscription;

        if (s instanceof FusionModeAwareSubscription) {
            mode = ((FusionModeAwareSubscription) s).mode();
            sourceMode = mode;

            if (mode == SYNC) {
                // In SYNC mode the done flag is set immediately.
                done = true;
            } else {
                // Otherwise ask the subscription for one element.
                s.request(1);
            }
        } else {
            // The subscription does not report a fusion mode: fall back to NONE.
            mode = NONE;
            sourceMode = mode;
        }
    }


    for (;;) {
        if (subscriber != null) {
            // Dispatch to the routine for the resolved mode. This path returns without touching
            // WIP here — presumably the drain routines release it; verify in their bodies.
            if (mode == SYNC) {
                drainSync();
            } else if (mode == ASYNC) {
                drainAsync();
            } else {
                drainRegular();
            }

            return;
        }

        // NOTE(review): `subscriber` was captured once above and is not re-read inside this
        // loop, so with no downstream attached this only winds the counter back down.
        missed = WIP.addAndGet(this, -missed);

        if (missed == 0) {
            break;
        }
    }
}
 
/**
 * Creates a test observer/publisher backed by a new {@code ConcurrentLinkedQueue}.
 *
 * @param onSubscribe callback handed to the superclass
 */
public TestStreamObserverAndPublisher(
    com.salesforce.reactivegrpc.common.Consumer<CallStreamObserver<?>> onSubscribe) {
    super(new ConcurrentLinkedQueue<T>(), onSubscribe);
}
 
 类所在包
 类方法
 同包方法